| 424 | ) |
| 425 | |
| 426 | func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { |
| 427 | buf := make([]byte, s.bufferSize) |
| 428 | for { |
| 429 | nr1, err := src.Read(buf) |
| 430 | if err != nil { |
| 431 | if errors.Is(err, io.EOF) { |
| 432 | return |
| 433 | } |
| 434 | // connection already closed |
| 435 | if strings.HasSuffix(err.Error(), "read: connection reset by peer") { |
| 436 | return |
| 437 | } |
| 438 | if strings.HasSuffix(err.Error(), "use of closed network connection") { |
| 439 | return |
| 440 | } |
| 441 | select { |
| 442 | case s.errc <- err: |
| 443 | select { |
| 444 | case <-s.donec: |
| 445 | return |
| 446 | default: |
| 447 | } |
| 448 | case <-s.donec: |
| 449 | return |
| 450 | } |
| 451 | s.lg.Debug("failed to read", zap.Error(err)) |
| 452 | return |
| 453 | } |
| 454 | if nr1 == 0 { |
| 455 | return |
| 456 | } |
| 457 | data := buf[:nr1] |
| 458 | |
| 459 | // alters/corrupts/drops data |
| 460 | switch ptype { |
| 461 | case proxyTx: |
| 462 | s.modifyTxMu.RLock() |
| 463 | if s.modifyTx != nil { |
| 464 | data = s.modifyTx(data) |
| 465 | } |
| 466 | s.modifyTxMu.RUnlock() |
| 467 | case proxyRx: |
| 468 | s.modifyRxMu.RLock() |
| 469 | if s.modifyRx != nil { |
| 470 | data = s.modifyRx(data) |
| 471 | } |
| 472 | s.modifyRxMu.RUnlock() |
| 473 | default: |
| 474 | panic("unknown proxy type") |
| 475 | } |
| 476 | nr2 := len(data) |
| 477 | switch ptype { |
| 478 | case proxyTx: |
| 479 | s.lg.Debug( |
| 480 | "modified tx", |
| 481 | zap.String("data-received", humanize.Bytes(uint64(nr1))), |
| 482 | zap.String("data-modified", humanize.Bytes(uint64(nr2))), |
| 483 | zap.String("from", s.From()), |