MCPcopy
hub / github.com/microsoft/Cream / _async_manager_worker_fn

Method _async_manager_worker_fn

TinyViT/data/augmentation/manager.py:22–58  ·  view source on GitHub ↗
(self, msg_queue, path, rank)

Source from the content-addressed store, hash-verified

20 KILL = 4
21
def _async_manager_worker_fn(self, msg_queue, path, rank):
    """Drain ``msg_queue`` into a keys/values file pair and publish it to ``path``.

    Each queue item is either the ``_Writer._WORKER_MSG.KILL`` sentinel, which
    ends the loop, or a ``(key, value)`` pair. For the first occurrence of
    every key, ``key`` is written as one line of ``rank{rank}-keys.txt`` and
    ``value`` is appended to ``rank{rank}-values.bin``; repeated keys are
    skipped. The files are built in a temporary directory and moved into
    ``path`` only after the sentinel has been received.

    Args:
        msg_queue: Object whose blocking ``get()`` yields the items above.
        path: Destination directory (e.g. ``xxx/logits_top100_epoch0``);
            created if it does not exist.
        rank: Value used to build the ``rank{rank}`` file-name prefix.

    Raises:
        OSError: If a file cannot be written or moved into ``path``
            (previously a failed ``mv`` was silently ignored).
    """
    # Function-scope import so this block does not depend on the module's
    # top-level import list.
    import shutil

    # path: xxx/logits_top100_epoch0
    rank_name = f'rank{rank}'
    # logits_top100_epoch0_rank0
    basename = os.path.basename(path) + f'_{rank_name}'

    # The context manager guarantees the scratch directory is removed even
    # when the worker fails part-way through.
    # tmp_dir/tinyvit_logits_top100_epoch0_rank0
    with tempfile.TemporaryDirectory(prefix='tinyvit_' + basename) as temp_dirname:
        tmp_filename = os.path.join(temp_dirname, rank_name)
        # tmp_dir/tinyvit_logits_top100_epoch0_rank0/rank0-keys.txt
        keys_fname = tmp_filename + '-keys.txt'
        values_fname = tmp_filename + '-values.bin'

        # Only membership is needed to drop duplicate keys.
        seen_keys = set()

        # Both handles are closed (and flushed) before the files are moved,
        # and also if the loop raises.
        with open(keys_fname, 'w') as keys_file, \
                open(values_fname, 'wb') as values_file:
            while True:
                item = msg_queue.get()
                if item == _Writer._WORKER_MSG.KILL:
                    break
                key, value = item
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                keys_file.write(key + '\n')
                values_file.write(value)

        os.makedirs(path, exist_ok=True)
        # Move without a shell: paths containing spaces or shell
        # metacharacters are handled correctly and failures raise instead of
        # being ignored. Passing the full destination file name keeps the
        # overwrite-on-existing behaviour of ``mv``.
        for src in (keys_fname, values_fname):
            shutil.move(src, os.path.join(path, os.path.basename(src)))

    print(f"Save logits over: {path}")
59
60 def __del__(self):
61 if self.worker is not None:

Callers

nothing calls this directly

Calls 3

printFunction · 0.50
getMethod · 0.45
writeMethod · 0.45

Tested by

no test coverage detected