hub / github.com/pathwaycom/pathway / _write

Method _write

python/pathway/io/airbyte/logic.py:58–80
(self, record_type, records)

Source from the content-addressed store, hash-verified

        self._handle_global_state(state)

    def _write(self, record_type, records):
        if record_type.startswith(AIRBYTE_STREAM_RECORD_PREFIX):
            for record in records:
                payload = record.pop(AIRBYTE_DATA_RECORD_FIELD)
                self.on_event(payload)
        elif record_type == AIRBYTE_STATE_RECORD_TYPE:
            # Parse state according to the schema described in the protocol
            # https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytestatemessage
            for record in records:
                full_state = json.loads(record[AIRBYTE_DATA_RECORD_FIELD])
                state_type = full_state.get("type", "LEGACY")
                if state_type == "LEGACY":
                    self._handle_legacy_state(full_state)
                elif state_type == "GLOBAL":
                    self._handle_global_state(full_state)
                elif state_type == "STREAM" or state_type == "PER_STREAM":
                    # two different names in the docs, hence two clauses
                    self._handle_stream_state(full_state)
                else:
                    logging.warning(
                        f"Unknown state type: {state_type}. Ignoring state: {full_state}"
                    )
            self.on_state(self.get_state())

    def _handle_stream_state(self, full_state):
        stream = full_state.get("stream")
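
The dispatch in `_write` can be exercised in isolation with a small stand-in class. The following is only a sketch, not Pathway's actual class: `SketchDestination`, its stub handlers, its `get_state` return value, and the three constant values are all hypothetical placeholders, since the real definitions are not shown on this page. It also assumes that `on_state` is called once per batch, after the loop over state records.

```python
import json
import logging

# Placeholder values (assumptions): the real constants are defined
# elsewhere and are not shown on this page.
AIRBYTE_STREAM_RECORD_PREFIX = "_airbyte_raw_"
AIRBYTE_DATA_RECORD_FIELD = "_airbyte_data"
AIRBYTE_STATE_RECORD_TYPE = "_airbyte_states"


class SketchDestination:
    """Hypothetical stand-in for the class that owns _write."""

    def __init__(self, on_event, on_state):
        self.on_event = on_event
        self.on_state = on_state
        self.handled = []  # kinds of state messages seen so far

    # Stub handlers: they only record which branch was taken.
    def _handle_legacy_state(self, full_state):
        self.handled.append("LEGACY")

    def _handle_global_state(self, full_state):
        self.handled.append("GLOBAL")

    def _handle_stream_state(self, full_state):
        self.handled.append("STREAM")

    def get_state(self):
        # Stub: report how many state messages have been handled.
        return {"handled": len(self.handled)}

    def _write(self, record_type, records):
        if record_type.startswith(AIRBYTE_STREAM_RECORD_PREFIX):
            # Data records: forward each payload to the event callback.
            for record in records:
                self.on_event(record.pop(AIRBYTE_DATA_RECORD_FIELD))
        elif record_type == AIRBYTE_STATE_RECORD_TYPE:
            for record in records:
                full_state = json.loads(record[AIRBYTE_DATA_RECORD_FIELD])
                # A message without a "type" key is treated as LEGACY.
                state_type = full_state.get("type", "LEGACY")
                if state_type == "LEGACY":
                    self._handle_legacy_state(full_state)
                elif state_type == "GLOBAL":
                    self._handle_global_state(full_state)
                elif state_type in ("STREAM", "PER_STREAM"):
                    self._handle_stream_state(full_state)
                else:
                    logging.warning("Unknown state type: %s", state_type)
            # Assumption: the state callback fires once per batch.
            self.on_state(self.get_state())


events, states = [], []
dest = SketchDestination(events.append, states.append)

# One data record on a stream-prefixed record type.
dest._write("_airbyte_raw_users", [{"_airbyte_data": {"id": 1}}])

# Two state records: one without "type" (LEGACY), one STREAM.
dest._write(
    "_airbyte_states",
    [
        {"_airbyte_data": json.dumps({"data": {"cursor": 5}})},
        {"_airbyte_data": json.dumps({"type": "STREAM", "stream": {}})},
    ],
)
print(events, states, dest.handled)
```

Record types that match neither branch are silently ignored, and an unknown state type only produces a warning, so neither case raises an error in this sketch.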

Callers

nothing calls this directly

Calls 9

_handle_legacy_state · Method · 0.95
_handle_global_state · Method · 0.95
_handle_stream_state · Method · 0.95
get_state · Method · 0.95
startswith · Method · 0.80
on_event · Method · 0.80
on_state · Method · 0.80
loads · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected