hub / github.com/Kanaries/pygwalker / BatchUploadDatasToolOnWidgets

Class BatchUploadDatasToolOnWidgets

pygwalker/services/upload_data.py:77–105

Uploads data in batches (using ipywidgets).

Source from the content-addressed store, hash-verified

# Excerpt: relies on module-level imports of time, typing (Any, Dict, List),
# and BaseCommunication.
class BatchUploadDatasToolOnWidgets:
    """Upload data in batches(use ipywidgets)"""
    def __init__(self, comm: BaseCommunication) -> None:
        self.comm = comm

    def run(
        self,
        *,
        data_source_id: str,
        records: List[Dict[str, Any]],
        sample_data_count: int
    ) -> None:
        # Batch size: 4096 records per message.
        chunk = 1 << 12

        # Start at sample_data_count, so the first sample_data_count
        # records are not sent by this method.
        for i in range(sample_data_count, len(records), chunk):
            data = records[i: min(i+chunk, len(records))]
            msg = {
                'dataSourceId': data_source_id,
                "total": len(records),
                "curIndex": i,
                'data': data,
            }
            self.comm.send_msg_async("postData", msg)

        finish_msg = {
            'dataSourceId': data_source_id,
        }
        # Pause briefly, then signal that the upload is complete.
        time.sleep(0.1)
        self.comm.send_msg_async("finishData", finish_msg)
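
To see how `run` splits the records, the sketch below replays the same loop against a recording stand-in. `RecordingComm` and `upload_in_batches` are hypothetical names introduced only for this illustration and are not part of pygwalker. The stand-in assumes `send_msg_async` takes just an action name and a payload, and the `time.sleep(0.1)` pause is left out.

```python
from typing import Any, Dict, List, Tuple


class RecordingComm:
    """Hypothetical stand-in for BaseCommunication: stores messages instead of sending them."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Dict[str, Any]]] = []

    def send_msg_async(self, action: str, data: Dict[str, Any]) -> None:
        self.sent.append((action, data))


def upload_in_batches(
    comm: RecordingComm,
    data_source_id: str,
    records: List[Dict[str, Any]],
    sample_data_count: int,
    chunk: int = 1 << 12,
) -> None:
    # Same loop shape as run() above, without the sleep before "finishData".
    for i in range(sample_data_count, len(records), chunk):
        comm.send_msg_async("postData", {
            "dataSourceId": data_source_id,
            "total": len(records),
            "curIndex": i,
            "data": records[i:i + chunk],  # slicing already clamps at len(records)
        })
    comm.send_msg_async("finishData", {"dataSourceId": data_source_id})


comm = RecordingComm()
records = [{"row": n} for n in range(10_000)]
upload_in_batches(comm, "demo-source", records, sample_data_count=1_000)

post_msgs = [msg for action, msg in comm.sent if action == "postData"]
start_indices = [msg["curIndex"] for msg in post_msgs]
batch_sizes = [len(msg["data"]) for msg in post_msgs]

print(start_indices)     # [1000, 5096, 9192]
print(batch_sizes)       # [4096, 4096, 808]
print(comm.sent[-1][0])  # finishData
```

With 10,000 records and `sample_data_count=1_000`, the loop sends three `postData` messages covering the remaining 9,000 records, followed by one `finishData` message. If `sample_data_count` is at least `len(records)`, the loop body never runs and only `finishData` is sent.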

Callers 1

_init_callback (Method) · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected