readStream pumps Chunks from one FakeConn onto the shared events channel. It returns on EOF / ErrClosed / deadline-exceeded; the supervisor.Session's ctx is observed indirectly: when the ctx is cancelled the relay closes the FakeConn streams, which surfaces here as ErrClosed.
(fc *fakeconn.FakeConn, dir fakeconn.Direction, out chan<- chunkEvent, done chan<- struct{})
| 247 | // cancelled the relay closes the FakeConn streams, which surfaces |
| 248 | // here as ErrClosed. |
| 249 | func readStream(fc *fakeconn.FakeConn, dir fakeconn.Direction, out chan<- chunkEvent, done chan<- struct{}) { |
| 250 | defer func() { done <- struct{}{} }() |
| 251 | if fc == nil { |
| 252 | out <- chunkEvent{dir: dir, err: io.EOF} |
| 253 | return |
| 254 | } |
| 255 | for { |
| 256 | c, err := fc.ReadChunk() |
| 257 | if err != nil { |
| 258 | out <- chunkEvent{dir: dir, err: err} |
| 259 | return |
| 260 | } |
| 261 | // Empty chunk with no error shouldn't happen on a real stream |
| 262 | // but guard against it so the consumer never stalls. |
| 263 | if len(c.Bytes) == 0 { |
| 264 | continue |
| 265 | } |
| 266 | out <- chunkEvent{ |
| 267 | dir: dir, |
| 268 | bytes: c.Bytes, |
| 269 | readAt: c.ReadAt, |
| 270 | writtenAt: c.WrittenAt, |
| 271 | } |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | // isBenignReadErr reports whether err is one of the expected end-of-stream |
| 276 | // signals from a FakeConn. |
no test coverage detected