Oracle streams the oracle state to the alphas. The first entry sent by Zero contains the entire state of transactions. Zero periodically confirms receipt from the group, and truncates its state. This 2-way acknowledgement is a safe way to get the status of all the transactions.
(_ *api.Payload, server pb.Zero_OracleServer)
| 443 | // confirms receipt from the group, and truncates its state. This 2-way acknowledgement is a |
| 444 | // safe way to get the status of all the transactions. |
| 445 | func (s *Server) Oracle(_ *api.Payload, server pb.Zero_OracleServer) error { |
| 446 | if !s.Node.AmLeader() { |
| 447 | return errNotLeader |
| 448 | } |
| 449 | ch, id := s.orc.newSubscriber() |
| 450 | defer s.orc.removeSubscriber(id) |
| 451 | |
| 452 | ctx := server.Context() |
| 453 | leaderChangeCh := s.leaderChangeChannel() |
| 454 | for { |
| 455 | select { |
| 456 | case <-leaderChangeCh: |
| 457 | return errNotLeader |
| 458 | case delta, open := <-ch: |
| 459 | if !open { |
| 460 | return errClosed |
| 461 | } |
| 462 | // Pass in the latest group checksum as well, so the Alpha can use that to determine |
| 463 | // when not to service a read. |
| 464 | delta.GroupChecksums = s.groupChecksums() |
| 465 | if err := server.Send(&delta); err != nil { |
| 466 | return err |
| 467 | } |
| 468 | case <-ctx.Done(): |
| 469 | return ctx.Err() |
| 470 | case <-s.closer.HasBeenClosed(): |
| 471 | return errServerShutDown |
| 472 | } |
| 473 | } |
| 474 | } |
| 475 | |
| 476 | // TryAbort attempts to abort the given transactions which are not already committed.. |
| 477 | func (s *Server) TryAbort(ctx context.Context, |
nothing calls this directly
no test coverage detected