GetOperationEvents implements GET /v1/events/operations
(ctx context.Context, req *connect.Request[emptypb.Empty], resp *connect.ServerStream[v1.OperationEvent])
| 362 | |
| 363 | // GetOperationEvents implements GET /v1/events/operations |
| 364 | func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.Request[emptypb.Empty], resp *connect.ServerStream[v1.OperationEvent]) error { |
| 365 | errChan := make(chan error, 1) |
| 366 | events := make(chan *v1.OperationEvent, 100) |
| 367 | |
| 368 | timer := time.NewTicker(60 * time.Second) |
| 369 | defer timer.Stop() |
| 370 | |
| 371 | callback := func(ops []*v1.Operation, eventType oplog.OperationEvent) { |
| 372 | var event *v1.OperationEvent |
| 373 | switch eventType { |
| 374 | case oplog.OPERATION_ADDED: |
| 375 | event = &v1.OperationEvent{ |
| 376 | Event: &v1.OperationEvent_CreatedOperations{ |
| 377 | CreatedOperations: &v1.OperationList{ |
| 378 | Operations: ops, |
| 379 | }, |
| 380 | }, |
| 381 | } |
| 382 | case oplog.OPERATION_UPDATED: |
| 383 | event = &v1.OperationEvent{ |
| 384 | Event: &v1.OperationEvent_UpdatedOperations{ |
| 385 | UpdatedOperations: &v1.OperationList{ |
| 386 | Operations: ops, |
| 387 | }, |
| 388 | }, |
| 389 | } |
| 390 | case oplog.OPERATION_DELETED: |
| 391 | ids := make([]int64, len(ops)) |
| 392 | for i, o := range ops { |
| 393 | ids[i] = o.Id |
| 394 | } |
| 395 | |
| 396 | event = &v1.OperationEvent{ |
| 397 | Event: &v1.OperationEvent_DeletedOperations{ |
| 398 | DeletedOperations: &types.Int64List{ |
| 399 | Values: ids, |
| 400 | }, |
| 401 | }, |
| 402 | } |
| 403 | default: |
| 404 | zap.L().Error("Unknown event type") |
| 405 | } |
| 406 | |
| 407 | select { |
| 408 | case events <- event: |
| 409 | default: |
| 410 | select { |
| 411 | case errChan <- errors.New("event buffer overflow, closing stream for client retry and catchup"): |
| 412 | default: |
| 413 | } |
| 414 | } |
| 415 | } |
| 416 | |
| 417 | s.oplog.Subscribe(oplog.SelectAll, &callback) |
| 418 | defer func() { |
| 419 | if err := s.oplog.Unsubscribe(&callback); err != nil { |
| 420 | zap.L().Error("failed to unsubscribe from oplog", zap.Error(err)) |
| 421 | } |
nothing calls this directly
no test coverage detected