hub / github.com/amoffat/sh / StreamBufferer

Class StreamBufferer

src/sh/__init__.py:3249–3372

Takes incoming chunks of stdout/stderr and breaks them up into the chunks that will actually be put into the internal buffers. For example, if you have two processes, one piped to the other, and you want the first process to feed lines of data (instead of chunks however they come in), OProc will use an instance of this class to chop up the data and feed it as lines to be sent down the pipe.
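
As a rough illustration of the re-chunking described above, the sketch below turns arbitrarily sized byte chunks into whole lines. `LineChopper` and its `flush` method are hypothetical names introduced here, not part of sh; the sketch ignores encodings, locking, and the other buffering modes.

```python
class LineChopper:
    """Hypothetical helper (not sh's API): re-chunks byte input into whole lines."""

    def __init__(self):
        # Bytes received since the last newline; not yet emitted.
        self._pending = b""

    def process(self, chunk):
        """Return every complete line now available, each ending in a newline."""
        data = self._pending + chunk
        # Everything before the final newline is complete; the tail is saved.
        *complete, self._pending = data.split(b"\n")
        return [line + b"\n" for line in complete]

    def flush(self):
        """Return any trailing partial line, for use once the stream has ended."""
        rest, self._pending = self._pending, b""
        return [rest] if rest else []


chopper = LineChopper()
for piece in (b"hel", b"lo\nwor", b"ld\nfoo\nba"):
    print(chopper.process(piece))
print(chopper.flush())
```

A chunk with no newline produces no output at all; the data is held until a later chunk completes the line, which is why a final flush step is needed in this sketch.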

Source from the content-addressed store, hash-verified

class StreamBufferer:
    """this is used for feeding in chunks of stdout/stderr, and breaking it up
    into chunks that will actually be put into the internal buffers. for
    example, if you have two processes, one being piped to the other, and you
    want that, first process to feed lines of data (instead of the chunks
    however they come in), OProc will use an instance of this class to chop up
    the data and feed it as lines to be sent down the pipe"""

    def __init__(self, buffer_type, encoding=DEFAULT_ENCODING, decode_errors="strict"):
        # 0 for unbuffered, 1 for line, everything else for that amount
        self.type = buffer_type
        self.buffer = []
        self.n_buffer_count = 0
        self.encoding = encoding
        self.decode_errors = decode_errors

        # this is for if we change buffering types. if we change from line
        # buffered to unbuffered, its very possible that our self.buffer list
        # has data that was being saved up (while we searched for a newline).
        # we need to use that up, so we don't lose it
        self._use_up_buffer_first = False

        # the buffering lock is used because we might change the buffering
        # types from a different thread. for example, if we have a stdout
        # callback, we might use it to change the way stdin buffers. so we
        # lock
        self._buffering_lock = threading.RLock()
        self.log = Logger("stream_bufferer")

    def change_buffering(self, new_type):
        # TODO, when we stop supporting 2.6, make this a with context
        self.log.debug("acquiring buffering lock for changing buffering")
        self._buffering_lock.acquire()
        self.log.debug("got buffering lock for changing buffering")
        try:
            if new_type == 0:
                self._use_up_buffer_first = True

            self.type = new_type
        finally:
            self._buffering_lock.release()
            self.log.debug("released buffering lock for changing buffering")

    def process(self, chunk):
        # MAKE SURE THAT THE INPUT IS PY3 BYTES
        # THE OUTPUT IS ALWAYS PY3 BYTES

        # TODO, when we stop supporting 2.6, make this a with context
        self.log.debug(
            "acquiring buffering lock to process chunk (buffering: %d)", self.type
        )
        self._buffering_lock.acquire()
        self.log.debug("got buffering lock to process chunk (buffering: %d)", self.type)
        try:
            # unbuffered
            if self.type == 0:
                if self._use_up_buffer_first:
                    self._use_up_buffer_first = False
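
The listing stops partway through `process`, so the following is only a sketch of how the pieces shown so far might fit together, not the sh implementation. It uses the `with` form of the lock that the TODO comments mention, and it assumes (based on the comment in `__init__` about not losing saved data) that the first unbuffered call after a switch emits the saved pieces ahead of the new chunk. `MiniBufferer` is a hypothetical name, and only modes 0 and 1 are handled.

```python
import threading


class MiniBufferer:
    """Illustrative stand-in (not sh's class): modes 0 (unbuffered) and 1 (line)."""

    def __init__(self, buffer_type):
        self.type = buffer_type
        self.buffer = []  # pieces saved while waiting for a newline
        self._use_up_buffer_first = False
        # Re-entrant lock: the mode may be changed from another thread.
        self._buffering_lock = threading.RLock()

    def change_buffering(self, new_type):
        with self._buffering_lock:
            if new_type == 0:
                # Saved line fragments must be emitted before new data.
                self._use_up_buffer_first = True
            self.type = new_type

    def process(self, chunk):
        with self._buffering_lock:
            if self.type == 0:
                if self._use_up_buffer_first:
                    self._use_up_buffer_first = False
                    # Assumption: hand back saved pieces, then the new chunk.
                    saved, self.buffer = self.buffer, []
                    return saved + [chunk]
                return [chunk]

            # Line buffered: emit one joined piece per newline found.
            out = []
            while True:
                newline = chunk.find(b"\n")
                if newline == -1:
                    break
                self.buffer.append(chunk[: newline + 1])
                out.append(b"".join(self.buffer))
                self.buffer = []
                chunk = chunk[newline + 1 :]
            if chunk:
                self.buffer.append(chunk)  # keep the partial line for later
            return out


b = MiniBufferer(1)
print(b.process(b"par"))
print(b.process(b"tial\nrest"))
b.change_buffering(0)  # could equally be called from a callback thread
print(b.process(b"!"))
```

Holding the lock for the whole of `process` means a concurrent `change_buffering` call takes effect between chunks rather than in the middle of one.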

Callers 5

test_unbuffered · Method · 0.90
test_newline_buffered · Method · 0.90
test_chunk_buffered · Method · 0.90
__init__ · Method · 0.85
__init__ · Method · 0.85

Calls

no outgoing calls

Tested by 3

test_unbuffered · Method · 0.72
test_newline_buffered · Method · 0.72
test_chunk_buffered · Method · 0.72