MCPcopy
hub / github.com/tinode/chat / saveAndBroadcastMessage

Method saveAndBroadcastMessage

server/topic.go:999–1088  ·  view source on GitHub ↗

Saves a new message (defined by head, content and attachments) in the topic in response to a client request (msg, asUid) and broadcasts it to the attached sessions.

(msg *ClientComMessage, asUid types.Uid, noEcho bool, attachments []string, head map[string]any, content any)

Source from the content-addressed store, hash-verified

997// Saves a new message (defined by head, content and attachments) in the topic
998// in response to a client request (msg, asUid) and broadcasts it to the attached sessions.
999func (t *Topic) saveAndBroadcastMessage(msg *ClientComMessage, asUid types.Uid, noEcho bool, attachments []string, head map[string]any, content any) error {
1000 pud, userFound := t.perUser[asUid]
1001 // Anyone is allowed to post to 'sys' topic.
1002 if t.cat != types.TopicCatSys {
1003 // If it's not 'sys' check write permission.
1004 if !(pud.modeWant & pud.modeGiven).IsWriter() {
1005 msg.sess.queueOut(ErrPermissionDenied(msg.Id, t.original(asUid), msg.Timestamp))
1006 return types.ErrPermissionDenied
1007 }
1008 }
1009
1010 if msg.sess != nil && msg.sess.uid != asUid {
1011 // The "sender" header contains ID of the user who sent the message on behalf of asUid.
1012 if head == nil {
1013 head = map[string]any{}
1014 }
1015 head["sender"] = msg.sess.uid.UserId()
1016 } else if head != nil {
1017 // Make sure the received Head does not include a fake "sender" header.
1018 delete(head, "sender")
1019 }
1020
1021 markedReadBySender := false
1022 if err, unreadUpdated := store.Messages.Save(
1023 &types.Message{
1024 ObjHeader: types.ObjHeader{CreatedAt: msg.Timestamp},
1025 SeqId: t.lastID + 1,
1026 Topic: t.name,
1027 From: asUid.String(),
1028 Head: head,
1029 Content: content,
1030 }, attachments, (pud.modeGiven & pud.modeWant).IsReader()); err != nil {
1031 logs.Warn.Printf("topic[%s]: failed to save message: %v", t.name, err)
1032 msg.sess.queueOut(ErrUnknown(msg.Id, t.original(asUid), msg.Timestamp))
1033
1034 return err
1035 } else {
1036 markedReadBySender = unreadUpdated
1037 }
1038
1039 t.lastID++
1040 t.touched = msg.Timestamp
1041
1042 if userFound {
1043 pud.readID = t.lastID
1044 pud.recvID = t.lastID
1045 t.perUser[asUid] = pud
1046 }
1047
1048 if msg.Id != "" && msg.sess != nil {
1049 reply := NoErrAccepted(msg.Id, t.original(asUid), msg.Timestamp)
1050 reply.Ctrl.Params = map[string]any{"seq": t.lastID}
1051 msg.sess.queueOut(reply)
1052 }
1053
1054 data := &ServerComMessage{
1055 Data: &MsgServerData{
1056 Topic: msg.Original,

Callers 3

handleCallEvent (Method) · 0.95
handlePubBroadcast (Method) · 0.95

Calls 15

original (Method) · 0.95
presSubsOffline (Method) · 0.95
broadcastToSessions (Method) · 0.95
pushForData (Method) · 0.95
ErrPermissionDenied (Function) · 0.85
ErrUnknown (Function) · 0.85
NoErrAccepted (Function) · 0.85
pluginMessage (Function) · 0.85
sendPush (Function) · 0.85
IsWriter (Method) · 0.80
queueOut (Method) · 0.80
UserId (Method) · 0.80

Tested by

no test coverage detected