MCPcopy Index your code
hub / github.com/aws/aws-cli / RecordingSubscriber

Class RecordingSubscriber

tests/utils/s3transfer/__init__.py:258–277  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

256
257
class RecordingSubscriber(BaseSubscriber):
    """Subscriber that records the keyword arguments of every callback.

    Each ``on_*`` hook appends the kwargs dict it was invoked with to the
    matching ``on_*_calls`` list, in call order, so the recorded
    invocations can be inspected afterwards.
    """

    def __init__(self):
        # One list per hook; each entry is the kwargs dict of one invocation.
        self.on_queued_calls = []
        self.on_progress_calls = []
        self.on_done_calls = []

    def on_queued(self, **kwargs):
        """Record the kwargs of an ``on_queued`` invocation."""
        self.on_queued_calls.append(kwargs)

    def on_progress(self, **kwargs):
        """Record the kwargs of an ``on_progress`` invocation."""
        self.on_progress_calls.append(kwargs)

    def on_done(self, **kwargs):
        """Record the kwargs of an ``on_done`` invocation."""
        self.on_done_calls.append(kwargs)

    def calculate_bytes_seen(self, **kwargs):
        """Return the total of ``bytes_transferred`` over recorded progress.

        Sums the ``'bytes_transferred'`` entry of every recorded
        ``on_progress`` call; returns ``0`` when none were recorded. Any
        keyword arguments passed here are accepted but unused.

        :raises KeyError: if a recorded ``on_progress`` call was made
            without a ``bytes_transferred`` keyword argument.
        """
        return sum(
            recorded['bytes_transferred']
            for recorded in self.on_progress_calls
        )
278
279
280class TransferCoordinatorWithInterrupt(TransferCoordinator):

Calls

no outgoing calls