Write data to both the original destination and the capture dictionary.
(data, *args, **kwargs)
| 3096 | execution_count = self.execution_count |
| 3097 | |
def write(data, *args, **kwargs):
    """Write data to the original destination and record it in the history outputs.

    The call is first forwarded to ``original_write``. Unless recording is
    skipped (see below), ``data`` is then appended to a stream entry stored
    under ``execution_count`` in ``self.history_manager.outputs``. When the
    most recent entry for that execution already has the matching
    ``output_type`` it is reused; otherwise a new ``HistoryOutput`` entry
    is created and appended.

    Recording is skipped when ``self.display_pub.is_publishing``,
    ``self.displayhook.is_active`` or ``self.showing_traceback`` is truthy,
    or when ``data`` is falsy.

    Args:
        data: Payload handed to the wrapped ``write``.
        *args: Extra positional arguments, passed through unchanged.
        **kwargs: Extra keyword arguments, passed through unchanged.

    Returns:
        Whatever ``original_write`` returned.
    """
    # The real write always happens first, so capture never blocks output.
    result = original_write(data, *args, **kwargs)

    if (
        self.display_pub.is_publishing
        or self.displayhook.is_active
        or self.showing_traceback
    ):
        return result
    if not data:
        return result

    output_type = "out_stream" if channel == "stdout" else "err_stream"
    outputs_by_counter = self.history_manager.outputs

    output_stream = None
    if execution_count in outputs_by_counter:
        outputs = outputs_by_counter[execution_count]
        # An entry may exist but hold an empty list; indexing [-1] on it
        # would raise IndexError out of a plain stream write, so check
        # for emptiness before peeking at the latest output.
        if outputs and outputs[-1].output_type == output_type:
            output_stream = outputs[-1]

    if output_stream is None:
        output_stream = HistoryOutput(
            output_type=output_type, bundle={"stream": []}
        )
        # NOTE(review): when the key is absent this lookup relies on the
        # mapping creating the entry on access (e.g. defaultdict(list)) --
        # confirm against the history manager's definition of `outputs`.
        outputs_by_counter[execution_count].append(output_stream)

    # Consecutive writes to the same channel accumulate in one entry.
    output_stream.bundle["stream"].append(data)
    return result
| 3126 | |
| 3127 | stream.write = write |
| 3128 | yield |
no test coverage detected