MCPcopy
hub / github.com/apache/devlake / scanOutputPipe

Function scanOutputPipe

backend/core/utils/ipc.go:237–253  ·  view source on GitHub ↗
(pipe io.ReadCloser, wg *sync.WaitGroup, onReceive func([]byte),
	responseCreator func([]byte) *ProcessResponse, outboundChannel chan<- *ProcessResponse)

Source from the content-addressed store, hash-verified

235}
236
237func scanOutputPipe(pipe io.ReadCloser, wg *sync.WaitGroup, onReceive func([]byte),
238 responseCreator func([]byte) *ProcessResponse, outboundChannel chan<- *ProcessResponse) func() {
239 return func() {
240 scanner := bufio.NewScanner(pipe)
241 scanner.Split(bufio.ScanLines)
242 for scanner.Scan() {
243 src := scanner.Bytes()
244 data := make([]byte, len(src))
245 copy(data, src)
246 if onReceive != nil {
247 onReceive(data)
248 }
249 outboundChannel <- responseCreator(data)
250 }
251 wg.Done()
252 }
253}
254
255func scanErrorPipe(pipe io.ReadCloser, onReceive func([]byte), outboundChannel chan<- *ProcessResponse) (func(), *strings.Builder) {
256 remoteErrorMsg := &strings.Builder{}

Callers 1

StreamProcessFunction · 0.85

Calls 1

ScanMethod · 0.65

Tested by

no test coverage detected