(eventMessage resource.WsOutMessage, msg *redis.Message, typename string, cruds map[string]*resource.DbResource, columnInfo api2go.ColumnInfo, store ydb.Store)
| 13 | ) |
| 14 | |
| 15 | func ProcessEventMessage(eventMessage resource.WsOutMessage, msg *redis.Message, typename string, cruds map[string]*resource.DbResource, columnInfo api2go.ColumnInfo, store ydb.Store) error { |
| 16 | var err error |
| 17 | err = eventMessage.UnmarshalBinary([]byte(msg.Payload)) |
| 18 | if err != nil { |
| 19 | resource.CheckErr(err, "Failed to read message on channel "+typename) |
| 20 | return nil |
| 21 | } |
| 22 | if eventMessage.Event == "update" && eventMessage.Topic == typename { |
| 23 | eventDataMap := make(map[string]interface{}) |
| 24 | err = json.Unmarshal(eventMessage.Data, &eventDataMap) |
| 25 | if err != nil { |
| 26 | resource.CheckErr(err, "Failed to unmarshal message ["+eventMessage.Topic+"]") |
| 27 | return nil |
| 28 | } |
| 29 | stringReferenceId := eventDataMap["reference_id"] |
| 30 | if stringReferenceId == nil { |
| 31 | logrus.Warnf("no reference id in event data map %v", eventDataMap) |
| 32 | return nil |
| 33 | } |
| 34 | |
| 35 | stringRefId, ok := stringReferenceId.(string) |
| 36 | if !ok { |
| 37 | logrus.Warnf("reference_id is not a string: %v", stringReferenceId) |
| 38 | return nil |
| 39 | } |
| 40 | referenceId, parseErr := uuid.Parse(stringRefId) |
| 41 | if parseErr != nil { |
| 42 | logrus.Warnf("failed to parse reference_id as UUID: %v", stringRefId) |
| 43 | return nil |
| 44 | } |
| 45 | |
| 46 | colData, ok := eventDataMap[columnInfo.ColumnName] |
| 47 | if ok && colData != nil { |
| 48 | colDataMap, ok := colData.([]interface{}) |
| 49 | if ok { |
| 50 | for _, file := range colDataMap { |
| 51 | fileMap, ok := file.(map[string]interface{}) |
| 52 | if !ok { |
| 53 | continue |
| 54 | } |
| 55 | if fileMap["type"] != "x-crdt/yjs" { |
| 56 | return nil |
| 57 | } |
| 58 | } |
| 59 | } |
| 60 | } |
| 61 | |
| 62 | transaction1, txErr := cruds[typename].Connection().Beginx() |
| 63 | if txErr != nil { |
| 64 | resource.CheckErr(txErr, "Failed to begin transaction [788]") |
| 65 | return nil |
| 66 | } |
| 67 | defer transaction1.Rollback() |
| 68 | |
| 69 | object, _, getErr := cruds[typename].GetSingleRowByReferenceIdWithTransaction(typename, daptinid.DaptinReferenceId(referenceId), map[string]bool{ |
| 70 | columnInfo.ColumnName: true, |
| 71 | }, transaction1) |
| 72 | logrus.Tracef("Completed dtopicMapListener GetSingleRowByReferenceIdWithTransaction") |
no test coverage detected