| 31 | import { captureIngestionWarning } from './utils' |
| 32 | |
| 33 | export class EventsProcessor { |
| 34 | pluginsServer: Hub |
| 35 | db: DB |
| 36 | clickhouse: ClickHouse |
| 37 | kafkaProducer: KafkaProducerWrapper |
| 38 | teamManager: TeamManager |
| 39 | groupTypeManager: GroupTypeManager |
| 40 | |
| 41 | constructor(pluginsServer: Hub) { |
| 42 | this.pluginsServer = pluginsServer |
| 43 | this.db = pluginsServer.db |
| 44 | this.clickhouse = pluginsServer.clickhouse |
| 45 | this.kafkaProducer = pluginsServer.kafkaProducer |
| 46 | this.teamManager = pluginsServer.teamManager |
| 47 | this.groupTypeManager = new GroupTypeManager(pluginsServer.db, this.teamManager, pluginsServer.SITE_URL) |
| 48 | } |
| 49 | |
| 50 | public async processEvent( |
| 51 | distinctId: string, |
| 52 | ip: string | null, |
| 53 | data: PluginEvent, |
| 54 | teamId: number, |
| 55 | timestamp: DateTime, |
| 56 | eventUuid: string |
| 57 | ): Promise<PreIngestionEvent | null> { |
| 58 | if (!UUID.validateString(eventUuid, false)) { |
| 59 | captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', { |
| 60 | eventUuid: JSON.stringify(eventUuid), |
| 61 | }) |
| 62 | throw new Error(`Not a valid UUID: "${eventUuid}"`) |
| 63 | } |
| 64 | const singleSaveTimer = new Date() |
| 65 | const timeout = timeoutGuard('Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!', { |
| 66 | event: JSON.stringify(data), |
| 67 | }) |
| 68 | |
| 69 | let result: PreIngestionEvent | null = null |
| 70 | try { |
| 71 | // We know `normalizeEvent` has been called here. |
| 72 | const properties: Properties = data.properties! |
| 73 | |
| 74 | const team = await this.teamManager.fetchTeam(teamId) |
| 75 | if (!team) { |
| 76 | throw new Error(`No team found with ID ${teamId}. Can't ingest event.`) |
| 77 | } |
| 78 | |
| 79 | if (data['event'] === '$snapshot') { |
| 80 | if (team.session_recording_opt_in) { |
| 81 | const snapshotEventTimeout = timeoutGuard( |
| 82 | 'Still running "createSessionRecordingEvent". Timeout warning after 30 sec!', |
| 83 | { eventUuid } |
| 84 | ) |
| 85 | try { |
| 86 | result = await this.createSessionRecordingEvent( |
| 87 | eventUuid, |
| 88 | teamId, |
| 89 | distinctId, |
| 90 | timestamp, |
nothing calls this directly
no outgoing calls
no test coverage detected
searching dependent graphs…