MCPcopy
hub / github.com/daptin/daptin / ProcessEventMessage

Function ProcessEventMessage

server/event_message_handler.go:15–117  ·  view source on GitHub ↗
(eventMessage resource.WsOutMessage, msg *redis.Message, typename string, cruds map[string]*resource.DbResource, columnInfo api2go.ColumnInfo, store ydb.Store)

Source from the content-addressed store, hash-verified

13)
14
15func ProcessEventMessage(eventMessage resource.WsOutMessage, msg *redis.Message, typename string, cruds map[string]*resource.DbResource, columnInfo api2go.ColumnInfo, store ydb.Store) error {
16 var err error
17 err = eventMessage.UnmarshalBinary([]byte(msg.Payload))
18 if err != nil {
19 resource.CheckErr(err, "Failed to read message on channel "+typename)
20 return nil
21 }
22 if eventMessage.Event == "update" && eventMessage.Topic == typename {
23 eventDataMap := make(map[string]interface{})
24 err = json.Unmarshal(eventMessage.Data, &eventDataMap)
25 if err != nil {
26 resource.CheckErr(err, "Failed to unmarshal message ["+eventMessage.Topic+"]")
27 return nil
28 }
29 stringReferenceId := eventDataMap["reference_id"]
30 if stringReferenceId == nil {
31 logrus.Warnf("no reference id in event data map %v", eventDataMap)
32 return nil
33 }
34
35 stringRefId, ok := stringReferenceId.(string)
36 if !ok {
37 logrus.Warnf("reference_id is not a string: %v", stringReferenceId)
38 return nil
39 }
40 referenceId, parseErr := uuid.Parse(stringRefId)
41 if parseErr != nil {
42 logrus.Warnf("failed to parse reference_id as UUID: %v", stringRefId)
43 return nil
44 }
45
46 colData, ok := eventDataMap[columnInfo.ColumnName]
47 if ok && colData != nil {
48 colDataMap, ok := colData.([]interface{})
49 if ok {
50 for _, file := range colDataMap {
51 fileMap, ok := file.(map[string]interface{})
52 if !ok {
53 continue
54 }
55 if fileMap["type"] != "x-crdt/yjs" {
56 return nil
57 }
58 }
59 }
60 }
61
62 transaction1, txErr := cruds[typename].Connection().Beginx()
63 if txErr != nil {
64 resource.CheckErr(txErr, "Failed to begin transaction [788]")
65 return nil
66 }
67 defer transaction1.Rollback()
68
69 object, _, getErr := cruds[typename].GetSingleRowByReferenceIdWithTransaction(typename, daptinid.DaptinReferenceId(referenceId), map[string]bool{
70 columnInfo.ColumnName: true,
71 }, transaction1)
72 logrus.Tracef("Completed dtopicMapListener GetSingleRowByReferenceIdWithTransaction")

Callers 1

InitializeYjsResourcesFunction · 0.85

Calls 7

CheckErrFunction · 0.92
makeFunction · 0.85
ParseMethod · 0.80
BeginxMethod · 0.65
ConnectionMethod · 0.65
UnmarshalBinaryMethod · 0.45

Tested by

no test coverage detected