MCPcopy
hub / github.com/Kanaries/pygwalker / _init_callback

Method _init_callback

pygwalker/api/pygwalker.py:372–550  ·  view source on GitHub ↗
(self, comm: BaseCommunication, preview_tool: PreviewImageTool = None)

Source from the content-addressed store, hash-verified

370
371 # TODO: find a better way to handle callbacks
372 def _init_callback(self, comm: BaseCommunication, preview_tool: PreviewImageTool = None):
373 upload_tool = BatchUploadDatasToolOnWidgets(comm)
374 self.comm = comm
375
376 def reuqest_data_callback(_):
377 upload_tool.run(
378 records=self.origin_data_source,
379 sample_data_count=0,
380 data_source_id=self.data_source_id
381 )
382 return {}
383
384 def get_latest_vis_spec(_):
385 return {"visSpec": self.vis_spec}
386
387 def save_chart_endpoint(data: Dict[str, Any]):
388 chart_data = ChartData.parse_obj(data)
389 self._chart_map[data["title"]] = chart_data
390
391 def update_spec(data: Dict[str, Any]):
392 spec_obj = {
393 "config": data["visSpec"],
394 "chart_map": {},
395 "version": __version__,
396 "workflow_list": data.get("workflowList", [])
397 }
398 self._update_vis_spec(data["visSpec"])
399 self.spec_version = __version__
400 self.workflow_list = data.get("workflowList", [])
401
402 if self.use_preview:
403 preview_tool.async_render_gw_review(self._get_gw_preview_html())
404
405 save_chart_endpoint(data["chartData"])
406
407 if self.spec_type == "json_file":
408 with open(self.spec, "w", encoding="utf-8") as f:
409 f.write(json.dumps(spec_obj))
410 if self.spec_type == "json_ksf":
411 self.cloud_service.write_config_to_cloud(self.spec[6:], json.dumps(spec_obj))
412
413 def upload_spec_to_cloud(data: Dict[str, Any]):
414 if data["newToken"]:
415 set_config({"kanaries_token": data["newToken"]})
416 GlobalVarManager.kanaries_api_key = data["newToken"]
417 spec_obj = {
418 "config": self.vis_spec,
419 "chart_map": {},
420 "version": __version__,
421 "workflow_list": self.workflow_list,
422 }
423 file_name = data["fileName"]
424 workspace_name = self.cloud_service.get_kanaries_user_info()["workspaceName"]
425 path = f"{workspace_name}/{file_name}"
426 self.cloud_service.write_config_to_cloud(path, json.dumps(spec_obj))
427 return {"specFilePath": path}
428
429 def _get_datas(data: Dict[str, Any]):

Callers 8

walk · Function · 0.95
get_component · Function · 0.95
get_html_on_gradio · Function · 0.95
walk · Function · 0.95
_start_server · Function · 0.80
__init__ · Method · 0.80

Calls 2

register · Method · 0.80

Tested by

no test coverage detected