MCPcopy
hub / github.com/wavetermdev/waveterm / prepareNextPacket

Method prepareNextPacket

pkg/jobmanager/streammanager.go:359–417  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

357}
358
359func (sm *StreamManager) prepareNextPacket() (done bool, pkt *wshrpc.CommandStreamData, sender DataSender) {
360 sm.lock.Lock()
361 defer sm.lock.Unlock()
362
363 available := sm.buf.Size()
364
365 if sm.closed || sm.terminalEventAcked {
366 return true, nil, nil
367 }
368
369 if !sm.connected {
370 sm.drainCond.Wait()
371 return false, nil, nil
372 }
373
374 if available == 0 {
375 if sm.terminalEvent != nil && !sm.terminalEventSent {
376 return false, sm.prepareTerminalPacket(), sm.dataSender
377 }
378 sm.drainCond.Wait()
379 return false, nil, nil
380 }
381
382 effectiveRwnd := sm.rwndSize
383 if sm.cwndSize < effectiveRwnd {
384 effectiveRwnd = sm.cwndSize
385 }
386 availableToSend := int64(effectiveRwnd) - sm.sentNotAcked
387
388 if availableToSend <= 0 {
389 sm.drainCond.Wait()
390 return false, nil, nil
391 }
392
393 peekSize := int(availableToSend)
394 if peekSize > MaxPacketSize {
395 peekSize = MaxPacketSize
396 }
397 if peekSize > available {
398 peekSize = available
399 }
400
401 data := make([]byte, peekSize)
402 n := sm.buf.PeekDataAt(int(sm.sentNotAcked), data)
403 if n == 0 {
404 sm.drainCond.Wait()
405 return false, nil, nil
406 }
407 data = data[:n]
408
409 seq := sm.buf.HeadPos() + sm.sentNotAcked
410 sm.sentNotAcked += int64(n)
411
412 return false, &wshrpc.CommandStreamData{
413 Id: sm.streamId,
414 Seq: seq,
415 Data64: base64.StdEncoding.EncodeToString(data),
416 }, sm.dataSender

Callers 1

senderLoopMethod · 0.95

Calls 5

prepareTerminalPacketMethod · 0.95
PeekDataAtMethod · 0.80
HeadPosMethod · 0.80
WaitMethod · 0.65
SizeMethod · 0.45

Tested by

no test coverage detected