hub / github.com/modelcontextprotocol/go-sdk / Write

Method Write

mcp/streamable.go:1373–1484

Write implements the [Connection] interface.

(ctx context.Context, msg jsonrpc.Message)


// Write implements the [Connection] interface.
func (c *streamableServerConn) Write(ctx context.Context, msg jsonrpc.Message) error {
	// Throughout this function, note that any error that wraps ErrRejected
	// indicates a failure that does not cause the connection to break.
	//
	// Most errors don't break the connection: unlike a true bidirectional
	// stream, a failure to deliver to a stream is not an indication that the
	// logical session is broken.
	data, err := jsonrpc2.EncodeMessage(msg)
	if err != nil {
		return err
	}

	if req, ok := msg.(*jsonrpc.Request); ok && req.IsCall() && (c.stateless || c.sessionID == "") {
		// Requests aren't possible with stateless servers, or when there's no session ID.
		return fmt.Errorf("%w: stateless servers cannot make requests", jsonrpc2.ErrRejected)
	}

	// Find the incoming request that this write relates to, if any.
	var (
		relatedRequest jsonrpc.ID
		responseTo     jsonrpc.ID // if valid, the message is a response to this request
	)
	if resp, ok := msg.(*jsonrpc.Response); ok {
		// If the message is a response, it relates to its request (of course).
		relatedRequest = resp.ID
		responseTo = resp.ID
	} else {
		// Otherwise, we check to see if it was made in the context of an
		// ongoing request. This may not be the case if the request was made with
		// an unrelated context.
		if v := ctx.Value(idContextKey{}); v != nil {
			relatedRequest = v.(jsonrpc.ID)
		}
	}

	// If the stream is application/json, but the message is not a response, we
	// must send it out of band to the standalone SSE stream.
	if c.jsonResponse && !responseTo.IsValid() {
		relatedRequest = jsonrpc.ID{}
	}

	// Write the message to the stream.
	var s *stream
	c.mu.Lock()
	if relatedRequest.IsValid() {
		if streamID, ok := c.requestStreams[relatedRequest]; ok {
			s = c.streams[streamID]
		}
	} else {
		s = c.streams[""] // standalone SSE stream
	}
	if responseTo.IsValid() {
		// Once we've responded to a request, disallow related messages by removing
		// the stream association. This also releases memory.
		delete(c.requestStreams, responseTo)
	}
	sessionClosed := c.isDone
	c.mu.Unlock()

Callers

nothing calls this directly

Calls 7

EncodeMessage · Function · 0.92
formatEventID · Function · 0.85
IsCall · Method · 0.80
IsValid · Method · 0.80
deliverLocked · Method · 0.80
Append · Method · 0.65
Value · Method · 0.45

Tested by

no test coverage detected