MCPcopy
hub / github.com/saltstack/salt / RedirectStdStreams

Class RedirectStdStreams

tests/support/helpers.py:292–355  ·  view source on GitHub ↗

Temporarily redirect system output to file like objects. Default is to redirect to `os.devnull`, which just mutes output, `stdout` and `stderr`.

Source from the content-addressed store, hash-verified

290
291
292class RedirectStdStreams:
293 """
294 Temporarily redirect system output to file like objects.
295 Default is to redirect to `os.devnull`, which just mutes output, `stdout`
296 and `stderr`.
297 """
298
299 def __init__(self, stdout=None, stderr=None):
300 if stdout is None:
301 # pylint: disable=resource-leakage
302 stdout = salt.utils.files.fopen(os.devnull, "w")
303 # pylint: enable=resource-leakage
304 if stderr is None:
305 # pylint: disable=resource-leakage
306 stderr = salt.utils.files.fopen(os.devnull, "w")
307 # pylint: enable=resource-leakage
308
309 self.__stdout = stdout
310 self.__stderr = stderr
311 self.__redirected = False
312 self.patcher = patch.multiple(sys, stderr=self.__stderr, stdout=self.__stdout)
313
314 def __enter__(self):
315 self.redirect()
316 return self
317
318 def __exit__(self, exc_type, exc_value, traceback):
319 self.unredirect()
320
321 def redirect(self):
322 self.old_stdout = sys.stdout
323 self.old_stdout.flush()
324 self.old_stderr = sys.stderr
325 self.old_stderr.flush()
326 self.patcher.start()
327 self.__redirected = True
328
329 def unredirect(self):
330 if not self.__redirected:
331 return
332 try:
333 self.__stdout.flush()
334 self.__stdout.close()
335 except ValueError:
336 # already closed?
337 pass
338 try:
339 self.__stderr.flush()
340 self.__stderr.close()
341 except ValueError:
342 # already closed?
343 pass
344 self.patcher.stop()
345
346 def flush(self):
347 if self.__redirected:
348 try:
349 self.__stdout.flush()

Callers 1

run_run_plusMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected