SortedEvents fans in several Event channels into a single one, sorted by creation date as reported by the CreatedAt method. This function assumes that each input channel is already ordered.
(inputs ...<-chan Event)
| 171 | // SortedEvents fan-in some Event-channels into one, sorted by creation date, using CreatedAt-method. |
| 172 | // This function assume that each channel is pre-ordered. |
| 173 | func SortedEvents(inputs ...<-chan Event) chan Event { |
| 174 | out := make(chan Event) |
| 175 | |
| 176 | go func() { |
| 177 | defer close(out) |
| 178 | |
| 179 | heads := make([]Event, len(inputs)) |
| 180 | |
| 181 | // pre-fill the head view |
| 182 | for i, input := range inputs { |
| 183 | if event, ok := <-input; ok { |
| 184 | heads[i] = event |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | for { |
| 189 | var earliestEvent Event |
| 190 | var originChannel int |
| 191 | |
| 192 | // pick the earliest event of the heads |
| 193 | for i, head := range heads { |
| 194 | if head != nil && (earliestEvent == nil || head.CreatedAt().Before(earliestEvent.CreatedAt())) { |
| 195 | earliestEvent = head |
| 196 | originChannel = i |
| 197 | } |
| 198 | } |
| 199 | |
| 200 | if earliestEvent == nil { |
| 201 | // no event anymore, we are done |
| 202 | return |
| 203 | } |
| 204 | |
| 205 | // we have an event: consume it and replace it if possible |
| 206 | heads[originChannel] = nil |
| 207 | if event, ok := <-inputs[originChannel]; ok { |
| 208 | heads[originChannel] = event |
| 209 | } |
| 210 | out <- earliestEvent |
| 211 | } |
| 212 | }() |
| 213 | |
| 214 | return out |
| 215 | } |