Write implements the [Connection] interface.
(ctx context.Context, msg jsonrpc.Message)
| 1371 | |
| 1372 | // Write implements the [Connection] interface. |
| 1373 | func (c *streamableServerConn) Write(ctx context.Context, msg jsonrpc.Message) error { |
| 1374 | // Throughout this function, note that any error that wraps ErrRejected |
| 1375 | // indicates a failure that does not cause the connection to break. |
| 1376 | // |
| 1377 | // Most errors don't break the connection: unlike a true bidirectional |
| 1378 | // stream, a failure to deliver to a stream is not an indication that the |
| 1379 | // logical session is broken. |
| 1380 | data, err := jsonrpc2.EncodeMessage(msg) |
| 1381 | if err != nil { |
| 1382 | return err |
| 1383 | } |
| 1384 | |
| 1385 | if req, ok := msg.(*jsonrpc.Request); ok && req.IsCall() && (c.stateless || c.sessionID == "") { |
| 1386 | // Requests aren't possible with stateless servers, or when there's no session ID. |
| 1387 | return fmt.Errorf("%w: stateless servers cannot make requests", jsonrpc2.ErrRejected) |
| 1388 | } |
| 1389 | |
| 1390 | // Find the incoming request that this write relates to, if any. |
| 1391 | var ( |
| 1392 | relatedRequest jsonrpc.ID |
| 1393 | responseTo jsonrpc.ID // if valid, the message is a response to this request |
| 1394 | ) |
| 1395 | if resp, ok := msg.(*jsonrpc.Response); ok { |
| 1396 | // If the message is a response, it relates to its request (of course). |
| 1397 | relatedRequest = resp.ID |
| 1398 | responseTo = resp.ID |
| 1399 | } else { |
| 1400 | // Otherwise, we check to see if it was made in the context of an |
| 1401 | // ongoing request. This may not be the case if the request was made with |
| 1402 | // an unrelated context. |
| 1403 | if v := ctx.Value(idContextKey{}); v != nil { |
| 1404 | relatedRequest = v.(jsonrpc.ID) |
| 1405 | } |
| 1406 | } |
| 1407 | |
| 1408 | // If the stream is application/json, but the message is not a response, we |
| 1409 | // must send it out of band to the standalone SSE stream. |
| 1410 | if c.jsonResponse && !responseTo.IsValid() { |
| 1411 | relatedRequest = jsonrpc.ID{} |
| 1412 | } |
| 1413 | |
| 1414 | // Write the message to the stream. |
| 1415 | var s *stream |
| 1416 | c.mu.Lock() |
| 1417 | if relatedRequest.IsValid() { |
| 1418 | if streamID, ok := c.requestStreams[relatedRequest]; ok { |
| 1419 | s = c.streams[streamID] |
| 1420 | } |
| 1421 | } else { |
| 1422 | s = c.streams[""] // standalone SSE stream |
| 1423 | } |
| 1424 | if responseTo.IsValid() { |
| 1425 | // Once we've responded to a request, disallow related messages by removing |
| 1426 | // the stream association. This also releases memory. |
| 1427 | delete(c.requestStreams, responseTo) |
| 1428 | } |
| 1429 | sessionClosed := c.isDone |
| 1430 | c.mu.Unlock() |
nothing calls this directly
no test coverage detected