(self, message, buff)
| 290 | |
| 291 | # Stream socket directly to a file |
def handleStream(self, message, buff):
    """Write the raw stream announced by ``message`` into its waiting file.

    The payload may already be partly (or fully) inside ``buff``: whatever
    the msgpack unpacker has not consumed sits at the tail of ``buff``.
    That part is written first, then the rest is read directly from the
    socket in chunks of at most 64 KiB.

    Args:
        message: Decoded message dict. ``message["stream_bytes"]`` is the
            announced payload length and ``message["to"]`` is the key used
            in ``self.waiting_streams`` / ``self.waiting_requests``.
        buff: The most recently received socket buffer.
            NOTE(review): assumes every unprocessed unpacker byte lies at
            the end of this buffer - confirm against the caller.

    Returns:
        The bytes of ``buff`` that follow the stream payload (data that
        belongs to the next message), or ``b""`` when there are none.

    Raises:
        KeyError: If ``message["to"]`` has no entry in
            ``self.waiting_streams`` or ``self.waiting_requests``.
    """
    stream_id = message["to"]
    # Clamp so a zero/negative announced length can never turn into a
    # negative slice length below.
    stream_bytes_left = max(0, message["stream_bytes"])
    stream_file = self.waiting_streams[stream_id]

    unprocessed_bytes_num = self.getUnpackerUnprocessedBytesNum()

    if unprocessed_bytes_num:  # Found stream bytes in unpacker
        unpacker_stream_bytes = min(unprocessed_bytes_num, stream_bytes_left)
        buff_stream_start = len(buff) - unprocessed_bytes_num
        buff_stream_end = buff_stream_start + unpacker_stream_bytes
        stream_file.write(buff[buff_stream_start:buff_stream_end])
        stream_bytes_left -= unpacker_stream_bytes
        # Everything after the stream part is handed back, even when the
        # stream itself took 0 bytes (previously that tail was dropped).
        buff_left = buff[buff_stream_end:]
    else:
        unpacker_stream_bytes = 0
        buff_left = b""

    if config.debug_socket:
        self.log(
            "Starting stream %s: %s bytes (%s from unpacker, buff size: %s, unprocessed: %s)" %
            (stream_id, message["stream_bytes"], unpacker_stream_bytes, len(buff), unprocessed_bytes_num)
        )

    try:
        while stream_bytes_left > 0:
            stream_buff = self.sock.recv(min(64 * 1024, stream_bytes_left))
            if not stream_buff:  # Empty read: stop even though bytes are still missing
                break
            buff_len = len(stream_buff)
            stream_bytes_left -= buff_len
            stream_file.write(stream_buff)

            # Statistics
            self.last_recv_time = time.time()
            self.incomplete_buff_recv += 1
            self.bytes_recv += buff_len
            self.server.bytes_recv += buff_len
    except Exception as err:
        # Best effort: the error is only logged, the waiting request below
        # is still released (the file may then be shorter than announced).
        self.log("Stream read error: %s" % Debug.formatException(err))

    if config.debug_socket:
        self.log("End stream %s, file pos: %s" % (stream_id, stream_file.tell()))

    self.incomplete_buff_recv = 0
    self.waiting_requests[stream_id]["evt"].set(message)  # Set the response to event
    del self.waiting_streams[stream_id]
    del self.waiting_requests[stream_id]

    return buff_left
| 343 | |
| 344 | # My handshake info |
| 345 | def getHandshakeInfo(self): |
no test coverage detected