MCPcopy
hub / github.com/canopy-network/canopy / Send

Method Send

p2p/conn.go:147–173  ·  view source on GitHub ↗

Send() queues the sending of a message to a specific Stream

(topic lib.Topic, bz []byte)

Source from the content-addressed store, hash-verified

145
146// Send() queues the sending of a message to a specific Stream
147func (c *MultiConn) Send(topic lib.Topic, bz []byte) (ok bool) {
148 defer lib.TimeTrack(c.log, time.Now(), time.Second)
149 startTime := time.Now()
150 stream, ok := c.streams[topic]
151 if !ok {
152 c.log.Errorf("Stream %s does not exist", topic)
153 return
154 }
155 chunks := split(bz, int(maxDataChunkSize))
156 var packets []*Packet
157 for i, chunk := range chunks {
158 packets = append(packets, &Packet{
159 StreamId: topic,
160 Eof: i == len(chunks)-1,
161 Bytes: chunk,
162 })
163 }
164 if c.p2p.metrics != nil {
165 c.p2p.metrics.MessageSize.Observe(float64(len(bz)))
166 c.p2p.metrics.PacketsPerMessage.Observe(float64(len(packets)))
167 }
168 ok = stream.queueSends(packets, startTime, c.p2p.metrics)
169 if !ok {
170 c.log.Errorf("Packet(ID:%s) packet failed in queue for: %s", lib.Topic_name[int32(topic)], lib.BytesToTruncatedString(c.Address.PublicKey))
171 }
172 return
173}
174
175// startSendService() starts the main send service
176// - converges and writes the send queue from all streams into the underlying tcp connection.

Callers 1

sendMethod · 0.80

Calls 5

TimeTrackFunction · 0.92
BytesToTruncatedStringFunction · 0.92
splitFunction · 0.85
queueSendsMethod · 0.80
ErrorfMethod · 0.65

Tested by

no test coverage detected