(msg *ClientComMessage)
| 1103 | } |
| 1104 | |
| 1105 | func (s *Session) get(msg *ClientComMessage) { |
| 1106 | // Expand topic name. |
| 1107 | var resp *ServerComMessage |
| 1108 | msg.RcptTo, resp = s.expandTopicName(msg) |
| 1109 | if resp != nil { |
| 1110 | s.queueOut(resp) |
| 1111 | return |
| 1112 | } |
| 1113 | |
| 1114 | msg.MetaWhat = parseMsgClientMeta(msg.Get.What) |
| 1115 | |
| 1116 | sub := s.getSub(msg.RcptTo) |
| 1117 | if msg.MetaWhat == 0 { |
| 1118 | s.queueOut(ErrMalformedReply(msg, msg.Timestamp)) |
| 1119 | logs.Warn.Println("s.get: invalid Get message action", msg.Get.What) |
| 1120 | } else if sub != nil { |
| 1121 | select { |
| 1122 | case sub.meta <- msg: |
| 1123 | default: |
| 1124 | // Reply with a 503 to the user. |
| 1125 | s.queueOut(ErrServiceUnavailableReply(msg, msg.Timestamp)) |
| 1126 | logs.Err.Println("s.get: sub.meta channel full, topic ", msg.RcptTo, s.sid) |
| 1127 | } |
| 1128 | } else if msg.MetaWhat&(constMsgMetaDesc|constMsgMetaSub) != 0 { |
| 1129 | // Request some minimal info from a topic not currently attached to. |
| 1130 | select { |
| 1131 | case globals.hub.meta <- msg: |
| 1132 | default: |
| 1133 | // Reply with a 503 to the user. |
| 1134 | s.queueOut(ErrServiceUnavailableReply(msg, msg.Timestamp)) |
| 1135 | logs.Err.Println("s.get: hub.meta channel full", s.sid) |
| 1136 | } |
| 1137 | } else { |
| 1138 | logs.Warn.Println("s.get: subscribe first to get=", msg.Get.What) |
| 1139 | s.queueOut(ErrPermissionDeniedReply(msg, msg.Timestamp)) |
| 1140 | } |
| 1141 | } |
| 1142 | |
| 1143 | func (s *Session) set(msg *ClientComMessage) { |
| 1144 | // Expand topic name. |
no test coverage detected