(self, record_type, records)
| 56 | self._handle_global_state(state) |
| 57 | |
| 58 | def _write(self, record_type, records): |
| 59 | if record_type.startswith(AIRBYTE_STREAM_RECORD_PREFIX): |
| 60 | for record in records: |
| 61 | payload = record.pop(AIRBYTE_DATA_RECORD_FIELD) |
| 62 | self.on_event(payload) |
| 63 | elif record_type == AIRBYTE_STATE_RECORD_TYPE: |
| 64 | # Parse state according to the schema described in the protocol |
| 65 | # https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytestatemessage |
| 66 | for record in records: |
| 67 | full_state = json.loads(record[AIRBYTE_DATA_RECORD_FIELD]) |
| 68 | state_type = full_state.get("type", "LEGACY") |
| 69 | if state_type == "LEGACY": |
| 70 | self._handle_legacy_state(full_state) |
| 71 | elif state_type == "GLOBAL": |
| 72 | self._handle_global_state(full_state) |
| 73 | elif state_type == "STREAM" or state_type == "PER_STREAM": |
| 74 | # two different names in the docs, hence two clauses |
| 75 | self._handle_stream_state(full_state) |
| 76 | else: |
| 77 | logging.warning( |
| 78 | f"Unknown state type: {state_type}. Ignoring state: {full_state}" |
| 79 | ) |
| 80 | self.on_state(self.get_state()) |
| 81 | |
| 82 | def _handle_stream_state(self, full_state): |
| 83 | stream = full_state.get("stream") |
nothing calls this directly
no test coverage detected