MCPcopy
hub / github.com/PostHog/posthog / EventsProcessor

Class EventsProcessor

plugin-server/src/worker/ingestion/process-event.ts:33–351  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

31import { captureIngestionWarning } from './utils'
32
33export class EventsProcessor {
34 pluginsServer: Hub
35 db: DB
36 clickhouse: ClickHouse
37 kafkaProducer: KafkaProducerWrapper
38 teamManager: TeamManager
39 groupTypeManager: GroupTypeManager
40
41 constructor(pluginsServer: Hub) {
42 this.pluginsServer = pluginsServer
43 this.db = pluginsServer.db
44 this.clickhouse = pluginsServer.clickhouse
45 this.kafkaProducer = pluginsServer.kafkaProducer
46 this.teamManager = pluginsServer.teamManager
47 this.groupTypeManager = new GroupTypeManager(pluginsServer.db, this.teamManager, pluginsServer.SITE_URL)
48 }
49
50 public async processEvent(
51 distinctId: string,
52 ip: string | null,
53 data: PluginEvent,
54 teamId: number,
55 timestamp: DateTime,
56 eventUuid: string
57 ): Promise<PreIngestionEvent | null> {
58 if (!UUID.validateString(eventUuid, false)) {
59 captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', {
60 eventUuid: JSON.stringify(eventUuid),
61 })
62 throw new Error(`Not a valid UUID: "${eventUuid}"`)
63 }
64 const singleSaveTimer = new Date()
65 const timeout = timeoutGuard('Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!', {
66 event: JSON.stringify(data),
67 })
68
69 let result: PreIngestionEvent | null = null
70 try {
71 // We know `normalizeEvent` has been called here.
72 const properties: Properties = data.properties!
73
74 const team = await this.teamManager.fetchTeam(teamId)
75 if (!team) {
76 throw new Error(`No team found with ID ${teamId}. Can't ingest event.`)
77 }
78
79 if (data['event'] === '$snapshot') {
80 if (team.session_recording_opt_in) {
81 const snapshotEventTimeout = timeoutGuard(
82 'Still running "createSessionRecordingEvent". Timeout warning after 30 sec!',
83 { eventUuid }
84 )
85 try {
86 result = await this.createSessionRecordingEvent(
87 eventUuid,
88 teamId,
89 distinctId,
90 timestamp,

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…