postStreamProcessing handles the post-stream processing of data received from the buffer into the local BadgerDB. It loads the schema, updates the membership state, informs zero about tablets, resets caches, applies initial schema, applies initial types, and resets the GQL schema store.
(ctx context.Context)
| 493 | // It loads the schema, updates the membership state, informs zero about tablets, resets caches, applies initial schema, |
| 494 | // applies initial types, and resets the GQL schema store. |
| 495 | func postStreamProcessing(ctx context.Context) error { |
| 496 | glog.Info("[import:flush] post stream processing") |
| 497 | if err := schema.LoadFromDb(ctx); err != nil { |
| 498 | return errors.Wrapf(err, "cannot load schema after streaming data") |
| 499 | } |
| 500 | if err := UpdateMembershipState(ctx); err != nil { |
| 501 | return errors.Wrapf(err, "cannot update membership state after streaming data") |
| 502 | } |
| 503 | |
| 504 | gr.informZeroAboutTablets() |
| 505 | posting.ResetCache() |
| 506 | ResetAclCache() |
| 507 | groups().applyInitialSchema() |
| 508 | groups().applyInitialTypes() |
| 509 | ResetGQLSchemaStore() |
| 510 | glog.Info("[import:flush] post stream processing done") |
| 511 | return nil |
| 512 | } |
| 513 | |
| 514 | // streamInGroup handles the streaming of data within a group. |
| 515 | // This function is called on both leader and follower nodes with different behaviors: |
no test coverage detected