HandleStdIOClient is blocking; it returns if there is an error, or on EOF of input.
(logName string, input chan utilfn.LineOutput, output io.Writer)
| 278 | |
| 279 | // blocking, returns if there is an error, or on EOF of input |
| 280 | func HandleStdIOClient(logName string, input chan utilfn.LineOutput, output io.Writer) { |
| 281 | proxy := MakeRpcProxy(logName) |
| 282 | linkId := DefaultRouter.RegisterTrustedRouter(proxy) |
| 283 | rawCh := make(chan []byte, DefaultInputChSize) |
| 284 | go func() { |
| 285 | defer func() { |
| 286 | panichandler.PanicHandler("HandleStdIOClient:ParseWithLinesChan", recover()) |
| 287 | }() |
| 288 | packetparser.ParseWithLinesChan(input, proxy.FromRemoteCh, rawCh) |
| 289 | }() |
| 290 | doneCh := make(chan struct{}) |
| 291 | var doneOnce sync.Once |
| 292 | closeDoneCh := func() { |
| 293 | doneOnce.Do(func() { |
| 294 | close(doneCh) |
| 295 | DefaultRouter.UnregisterLink(linkId) |
| 296 | close(proxy.FromRemoteCh) |
| 297 | }) |
| 298 | } |
| 299 | go func() { |
| 300 | defer func() { |
| 301 | panichandler.PanicHandler("HandleStdIOClient:ToRemoteChLoop", recover()) |
| 302 | }() |
| 303 | defer closeDoneCh() |
| 304 | for msg := range proxy.ToRemoteCh { |
| 305 | err := packetparser.WritePacket(output, msg) |
| 306 | if err != nil { |
| 307 | log.Printf("[%s] error writing to output: %v\n", logName, err) |
| 308 | break |
| 309 | } |
| 310 | } |
| 311 | }() |
| 312 | go func() { |
| 313 | defer func() { |
| 314 | panichandler.PanicHandler("HandleStdIOClient:RawChLoop", recover()) |
| 315 | }() |
| 316 | defer closeDoneCh() |
| 317 | for msg := range rawCh { |
| 318 | if !bytes.HasSuffix(msg, []byte{'\n'}) { |
| 319 | msg = append(msg, '\n') |
| 320 | } |
| 321 | log.Printf("[%s:stdout] %s", logName, msg) |
| 322 | } |
| 323 | }() |
| 324 | <-doneCh |
| 325 | } |
| 326 | |
| 327 | func handleDomainSocketClient(conn net.Conn, readCallback func()) { |
| 328 | var linkIdContainer atomic.Int32 |
no test coverage detected