MCPcopy Index your code
hub / github.com/ipython/ipython / _tee

Method _tee

IPython/core/interactiveshell.py:3087–3129  ·  view source on GitHub ↗

Capture output of a given standard stream and store it in history. Uses patching of write method for maximal compatibility, because ipykernel checks for instances of the stream class, and stream classes in ipykernel implement more complex logic.

(self, channel: Literal["stdout", "stderr"])

Source from the content-addressed store, hash-verified

3085
3086 @contextmanager
3087 def _tee(self, channel: Literal["stdout", "stderr"]):
3088 """Capture output of a given standard stream and store it in history.
3089
3090 Uses patching of write method for maximal compatibility,
3091 because ipykernel checks for instances of the stream class,
3092 and stream classes in ipykernel implement more complex logic.
3093 """
3094 stream = getattr(sys, channel)
3095 original_write = stream.write
3096 execution_count = self.execution_count
3097
3098 def write(data, *args, **kwargs):
3099 """Write data to both the original destination and the capture dictionary."""
3100 result = original_write(data, *args, **kwargs)
3101 if any(
3102 [
3103 self.display_pub.is_publishing,
3104 self.displayhook.is_active,
3105 self.showing_traceback,
3106 ]
3107 ):
3108 return result
3109 if not data:
3110 return result
3111 output_stream = None
3112 outputs_by_counter = self.history_manager.outputs
3113 output_type = "out_stream" if channel == "stdout" else "err_stream"
3114 if execution_count in outputs_by_counter:
3115 outputs = outputs_by_counter[execution_count]
3116 if outputs[-1].output_type == output_type:
3117 output_stream = outputs[-1]
3118 if output_stream is None:
3119 output_stream = HistoryOutput(
3120 output_type=output_type, bundle={"stream": []}
3121 )
3122 outputs_by_counter[execution_count].append(output_stream)
3123
3124 output_stream.bundle["stream"].append(data) # Append to existing stream
3125 return result
3126
3127 stream.write = write
3128 yield
3129 stream.write = original_write
3130
3131 def run_cell(
3132 self,

Callers 1

run_cellMethod · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected