(session: SyncSession)
| 849 | |
| 850 | // TODO: add an onError handler to gracefully handle errors in message handlers |
| 851 | private onMessageHandler(session: SyncSession) { |
| 852 | return async (evt: any) => { |
| 853 | const message: ServerSyncMessage = JSON.parse(evt.data); |
| 854 | this.logger.debug('received', message); |
| 855 | for (const handler of this.messageReceivedSubscribers) { |
| 856 | handler(message); |
| 857 | } |
| 858 | if (message.type === 'ERROR') { |
| 859 | await this.handleErrorMessage(message); |
| 860 | } |
| 861 | if (message.type === 'ENTITY_DATA') { |
| 862 | const { |
| 863 | changes: stringifiedChanges, |
| 864 | timestamp, |
| 865 | forQueries: queryIds, |
| 866 | } = message.payload; |
| 867 | const changes = SuperJSON.deserialize<DBChanges>(stringifiedChanges); |
| 868 | // first apply changes |
| 869 | // the db will push these onto IVMs buffer |
| 870 | if (this.client.awaitReady) await this.client.awaitReady; |
| 871 | await this.client.db.applyChangesWithTimestamp(changes, timestamp, { |
| 872 | skipRules: true, |
| 873 | }); |
| 874 | |
| 875 | // TODO do in same transaction |
| 876 | await this.client.db.setMetadata( |
| 877 | ['latest_server_timestamp'], |
| 878 | timestamp |
| 879 | ); |
| 880 | |
| 881 | // then update the query fulfillment state so that |
| 882 | // the client can signal in the results handler |
| 883 | // that the next time IVM fires, it's because |
| 884 | // of the server's response |
| 885 | for (const qId of queryIds) { |
| 886 | const query = this.queries.get(qId); |
| 887 | if (!query) continue; |
| 888 | if (query.syncState !== 'FULFILLED') { |
| 889 | await this.markQueryAsSeen(qId); |
| 890 | query.syncState = 'FULFILLED'; |
| 891 | } |
| 892 | // this.queryFulfillmentCallbacks.delete(qId); |
| 893 | } |
| 894 | |
| 895 | // update IVM |
| 896 | await this.client.db.updateQueryViews(); |
| 897 | this.client.db.broadcastToQuerySubscribers(); |
| 898 | |
| 899 | // finally, run the query fulfillment callbacks |
| 900 | for (const qId of queryIds) { |
| 901 | const query = this.queries.get(qId); |
| 902 | if (!query) continue; |
| 903 | for (const callback of query.syncStateCallbacks) { |
| 904 | callback('FULFILLED', message.payload); |
| 905 | } |
| 906 | } |
| 907 | } |
| 908 |
no test coverage detected