MCPcopy
hub / github.com/ScrapeGraphAI/Scrapegraph-ai / BurrBridge

Class BurrBridge

scrapegraphai/integrations/burr_bridge.py:93–231  ·  view source on GitHub ↗

Bridge class to integrate Burr into ScrapeGraphAI graphs. Args: base_graph (BaseGraph): The base graph to convert to a Burr application. burr_config (dict): Configuration parameters for the Burr application. Attributes: base_graph (BaseGraph): The base graph to

Source from the content-addressed store, hash-verified

91
92
93class BurrBridge:
94 """
95 Bridge class to integrate Burr into ScrapeGraphAI graphs.
96
97 Args:
98 base_graph (BaseGraph): The base graph to convert to a Burr application.
99 burr_config (dict): Configuration parameters for the Burr application.
100
101 Attributes:
102 base_graph (BaseGraph): The base graph to convert to a Burr application.
103 burr_config (dict): Configuration parameters for the Burr application.
104 tracker (LocalTrackingClient): The tracking client for the Burr application.
105 app_instance_id (str): The instance ID for the Burr application.
106 burr_inputs (dict): The inputs for the Burr application.
107 burr_app (Application): The Burr application instance.
108
109 Example:
110 >>> burr_bridge = BurrBridge(base_graph, burr_config)
111 >>> result = burr_bridge.execute(initial_state={"input_key": "input_value"})
112 """
113
114 def __init__(self, base_graph, burr_config):
115 self.base_graph = base_graph
116 self.burr_config = burr_config
117 self.project_name = burr_config.get("project_name", "scrapegraph_project")
118 self.app_instance_id = burr_config.get("app_instance_id", "default-instance")
119 self.burr_inputs = burr_config.get("inputs", {})
120 self.burr_app = None
121
122 def _initialize_burr_app(self, initial_state: Dict[str, Any] = None) -> Application:
123 """
124 Initialize a Burr application from the base graph.
125
126 Args:
127 initial_state (dict): The initial state of the Burr application.
128
129 Returns:
130 Application: The Burr application instance.
131 """
132 if initial_state is None:
133 initial_state = {}
134
135 actions = self._create_actions()
136 transitions = self._create_transitions()
137 hooks = [PrintLnHook()]
138 burr_state = State(initial_state)
139 application_context = ApplicationContext.get()
140 builder = (
141 ApplicationBuilder()
142 .with_actions(**actions)
143 .with_transitions(*transitions)
144 .with_entrypoint(self.base_graph.entry_point)
145 .with_state(**burr_state)
146 .with_identifiers(app_id=str(uuid.uuid4())) # TODO -- grab this from state
147 .with_hooks(*hooks)
148 )
149 if application_context is not None:
150 builder = builder.with_tracker(

Callers 1

executeMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected