(proc string, req, resp any, done chan *rpc.Call)
| 343 | } |
| 344 | |
| 345 | func (n *ClusterNode) callAsync(proc string, req, resp any, done chan *rpc.Call) *rpc.Call { |
| 346 | if done != nil && cap(done) == 0 { |
| 347 | logs.Err.Panic("cluster: RPC done channel is unbuffered") |
| 348 | } |
| 349 | |
| 350 | if !n.connected { |
| 351 | call := &rpc.Call{ |
| 352 | ServiceMethod: proc, |
| 353 | Args: req, |
| 354 | Reply: resp, |
| 355 | Error: errors.New("cluster: node '" + n.name + "' not connected"), |
| 356 | Done: done, |
| 357 | } |
| 358 | if done != nil { |
| 359 | done <- call |
| 360 | } |
| 361 | return call |
| 362 | } |
| 363 | |
| 364 | var responseChan chan *rpc.Call |
| 365 | if done != nil { |
| 366 | // Make a separate response callback if we need to notify the caller. |
| 367 | myDone := make(chan *rpc.Call, 1) |
| 368 | go func() { |
| 369 | call := <-myDone |
| 370 | n.handleRpcResponse(call) |
| 371 | if done != nil { |
| 372 | done <- call |
| 373 | } |
| 374 | }() |
| 375 | responseChan = myDone |
| 376 | } else { |
| 377 | responseChan = n.rpcDone |
| 378 | } |
| 379 | |
| 380 | call := n.endpoint.Go(proc, req, resp, responseChan) |
| 381 | |
| 382 | return call |
| 383 | } |
| 384 | |
| 385 | // proxyToMaster forwards request from topic proxy to topic master. |
| 386 | func (n *ClusterNode) proxyToMaster(msg *ClusterReq) error { |
no test coverage detected