This is used for feeding in chunks of stdout/stderr and breaking them up into the chunks that will actually be put into the internal buffers. For example, if you have two processes, one being piped to the other, and you want the first process to feed lines of data (instead of the chunks however they come in), OProc will use an instance of this class to chop up the data and feed it as lines to be sent down the pipe.
| 3247 | |
| 3248 | |
| 3249 | class StreamBufferer: |
| 3250 | """this is used for feeding in chunks of stdout/stderr, and breaking it up |
| 3251 | into chunks that will actually be put into the internal buffers. for |
| 3252 | example, if you have two processes, one being piped to the other, and you |
| 3253 | want that, first process to feed lines of data (instead of the chunks |
| 3254 | however they come in), OProc will use an instance of this class to chop up |
| 3255 | the data and feed it as lines to be sent down the pipe""" |
| 3256 | |
def __init__(self, buffer_type, encoding=DEFAULT_ENCODING, decode_errors="strict"):
    """Create an empty bufferer operating in the given buffering mode.

    buffer_type: 0 selects unbuffered, 1 selects line buffered, and any
        other value is taken as "buffer that amount".
    encoding: kept on the instance as ``self.encoding``.
    decode_errors: kept on the instance as ``self.decode_errors``.
    """
    self.log = Logger("stream_bufferer")

    # the buffering type can be switched from another thread (for example,
    # a stdout callback that changes how stdin buffers), so every access to
    # the buffering state goes through this lock
    self._buffering_lock = threading.RLock()

    # buffering mode: 0 for unbuffered, 1 for line, everything else for
    # that amount
    self.type = buffer_type

    # pending data that has not been handed out yet, plus its running count
    self.buffer = []
    self.n_buffer_count = 0

    self.encoding, self.decode_errors = encoding, decode_errors

    # set when the buffering type changes to unbuffered: self.buffer may
    # still hold data that was being saved up while searching for a
    # newline, and that data has to be used up first so it is not lost
    self._use_up_buffer_first = False
| 3277 | |
def change_buffering(self, new_type):
    """Switch this bufferer to a different buffering mode.

    new_type follows the same convention as the constructor's buffer_type:
    0 for unbuffered, 1 for line buffered, anything else for that amount.

    When switching to unbuffered (0), ``_use_up_buffer_first`` is set so
    that data already saved up in ``self.buffer`` is used up first instead
    of being lost.

    The change is made while holding ``_buffering_lock``. The lock is taken
    with a ``with`` block so it is released on every exit path, including
    an exception raised by the debug logging done while the lock is held.
    """
    self.log.debug("acquiring buffering lock for changing buffering")
    with self._buffering_lock:
        self.log.debug("got buffering lock for changing buffering")

        if new_type == 0:
            self._use_up_buffer_first = True

        self.type = new_type
    self.log.debug("released buffering lock for changing buffering")
| 3291 | |
| 3292 | def process(self, chunk): |
| 3293 | # MAKE SURE THAT THE INPUT IS PY3 BYTES |
| 3294 | # THE OUTPUT IS ALWAYS PY3 BYTES |
| 3295 | |
| 3296 | # TODO, when we stop supporting 2.6, make this a with context |
| 3297 | self.log.debug( |
| 3298 | "acquiring buffering lock to process chunk (buffering: %d)", self.type |
| 3299 | ) |
| 3300 | self._buffering_lock.acquire() |
| 3301 | self.log.debug("got buffering lock to process chunk (buffering: %d)", self.type) |
| 3302 | try: |
| 3303 | # unbuffered |
| 3304 | if self.type == 0: |
| 3305 | if self._use_up_buffer_first: |
| 3306 | self._use_up_buffer_first = False |
no outgoing calls