(clusterID uint64)
| 705 | } |
| 706 | |
| 707 | func (p *workerPool) getStreamJob(clusterID uint64) (job, bool) { |
| 708 | n, ok := p.nodes[clusterID] |
| 709 | if !ok { |
| 710 | return job{}, false |
| 711 | } |
| 712 | req, sinkFn, ok := n.ss.getStreamReq() |
| 713 | if !ok { |
| 714 | return job{}, false |
| 715 | } |
| 716 | return job{ |
| 717 | task: req, |
| 718 | node: n, |
| 719 | sink: sinkFn, |
| 720 | instanceID: n.instanceID, |
| 721 | clusterID: clusterID, |
| 722 | }, true |
| 723 | } |
| 724 | |
| 725 | func (p *workerPool) scheduleTask(j job, n *node, w *ssWorker) { |
| 726 | if n.instanceID == j.instanceID { |
no test coverage detected