MCPcopy
hub / github.com/aspen-cloud/triplit / onMessageHandler

Method onMessageHandler

packages/client/src/sync-engine.ts:851–983  ·  view source on GitHub ↗
(session: SyncSession)

Source from the content-addressed store, hash-verified

849
850 // TODO: add an onError handler to gracefully handle errors in message handlers
851 private onMessageHandler(session: SyncSession) {
852 return async (evt: any) => {
853 const message: ServerSyncMessage = JSON.parse(evt.data);
854 this.logger.debug('received', message);
855 for (const handler of this.messageReceivedSubscribers) {
856 handler(message);
857 }
858 if (message.type === 'ERROR') {
859 await this.handleErrorMessage(message);
860 }
861 if (message.type === 'ENTITY_DATA') {
862 const {
863 changes: stringifiedChanges,
864 timestamp,
865 forQueries: queryIds,
866 } = message.payload;
867 const changes = SuperJSON.deserialize<DBChanges>(stringifiedChanges);
868 // first apply changes
869 // the db will push these onto IVMs buffer
870 if (this.client.awaitReady) await this.client.awaitReady;
871 await this.client.db.applyChangesWithTimestamp(changes, timestamp, {
872 skipRules: true,
873 });
874
875 // TODO do in same transaction
876 await this.client.db.setMetadata(
877 ['latest_server_timestamp'],
878 timestamp
879 );
880
881 // then update the query fulfillment state so that
882 // the client can signal in the results handler
883 // that the next time IVM fires, it's because
884 // of the server's response
885 for (const qId of queryIds) {
886 const query = this.queries.get(qId);
887 if (!query) continue;
888 if (query.syncState !== 'FULFILLED') {
889 await this.markQueryAsSeen(qId);
890 query.syncState = 'FULFILLED';
891 }
892 // this.queryFulfillmentCallbacks.delete(qId);
893 }
894
895 // update IVM
896 await this.client.db.updateQueryViews();
897 this.client.db.broadcastToQuerySubscribers();
898
899 // finally, run the query fulfillment callbacks
900 for (const qId of queryIds) {
901 const query = this.queries.get(qId);
902 if (!query) continue;
903 for (const callback of query.syncStateCallbacks) {
904 callback('FULFILLED', message.payload);
905 }
906 }
907 }
908

Callers 1

createConnection · Method · 0.95

Calls 15

handleErrorMessage · Method · 0.95
markQueryAsSeen · Method · 0.95
syncWrites · Method · 0.95
closeConnection · Method · 0.95
sendMessage · Method · 0.95
initializeSync · Method · 0.95
handler · Function · 0.85
setMetadata · Method · 0.80
updateQueryViews · Method · 0.80
has · Method · 0.80

Tested by

no test coverage detected