(self, obj: Union[Dict, List[Dict]], gather_obj: bool = False)
| 59 | self._append(**item) |
| 60 | |
| 61 | def _append(self, obj: Union[Dict, List[Dict]], gather_obj: bool = False): |
| 62 | if isinstance(obj, (list, tuple)) and all(isinstance(item, dict) for item in obj): |
| 63 | obj_list = obj |
| 64 | else: |
| 65 | obj_list = [obj] |
| 66 | if gather_obj and dist.is_initialized(): |
| 67 | obj_list = gather_object(obj_list) |
| 68 | if not self.is_write_rank: |
| 69 | return |
| 70 | obj_list = check_json_format(obj_list) |
| 71 | for i, _obj in enumerate(obj_list): |
| 72 | obj_list[i] = json.dumps(_obj, ensure_ascii=False) + '\n' |
| 73 | self._write_buffer(''.join(obj_list)) |
| 74 | |
| 75 | def append(self, obj: Union[Dict, List[Dict]], gather_obj: bool = False): |
| 76 | if self.enable_async: |
no test coverage detected