WriteProtoNodeStatusEvent writes a node status event into the error stream.
(r *observerpb.GetFlowsResponse)
| 438 | |
| 439 | // WriteProtoNodeStatusEvent writes a node status event into the error stream. |
| 440 | func (p *Printer) WriteProtoNodeStatusEvent(r *observerpb.GetFlowsResponse) error { |
| 441 | s := r.GetNodeStatus() |
| 442 | if s == nil { |
| 443 | return errors.New("not a node status event") |
| 444 | } |
| 445 | |
| 446 | if !p.opts.enableDebug { |
| 447 | switch s.GetStateChange() { |
| 448 | case relaypb.NodeState_NODE_ERROR, relaypb.NodeState_NODE_UNAVAILABLE: |
| 449 | break |
| 450 | default: |
| 451 | // skips informal messages in non-debug mode |
| 452 | return nil |
| 453 | } |
| 454 | } |
| 455 | |
| 456 | switch p.opts.output { |
| 457 | case JSONPBOutput: |
| 458 | return json.NewEncoder(p.opts.werr).Encode(r) |
| 459 | case DictOutput: |
| 460 | w := p.createStderrWriter() |
| 461 | // this is a bit crude, but in case stdout and stderr are interleaved, |
| 462 | // we want to make sure the separators are still printed to clearly |
| 463 | // separate flows from node events. |
| 464 | if p.line != 0 { |
| 465 | w.print(dictSeparator + "\n") |
| 466 | } else { |
| 467 | p.line++ |
| 468 | } |
| 469 | nodeNames := joinWithCutOff(s.GetNodeNames(), ", ", nodeNamesCutOff) |
| 470 | message := "N/A" |
| 471 | if m := s.GetMessage(); len(m) != 0 { |
| 472 | message = strconv.Quote(m) |
| 473 | } |
| 474 | w.print( |
| 475 | " TIMESTAMP: ", fmtTimestamp(p.opts.timeFormat, r.GetTime()), newline, |
| 476 | " STATE: ", s.GetStateChange().String(), newline, |
| 477 | " NODES: ", nodeNames, newline, |
| 478 | " MESSAGE: ", message, newline, |
| 479 | ) |
| 480 | if w.err != nil { |
| 481 | return fmt.Errorf("failed to write out node status: %w", w.err) |
| 482 | } |
| 483 | case TabOutput, CompactOutput: |
| 484 | w := p.createStderrWriter() |
| 485 | numNodes := len(s.GetNodeNames()) |
| 486 | nodeNames := joinWithCutOff(s.GetNodeNames(), ", ", nodeNamesCutOff) |
| 487 | prefix := fmt.Sprintf("%s [%s]", fmtTimestamp(p.opts.timeFormat, r.GetTime()), r.GetNodeName()) |
| 488 | msg := fmt.Sprintf("%s: unknown node status event: %+v", prefix, s) |
| 489 | switch s.GetStateChange() { |
| 490 | case relaypb.NodeState_NODE_CONNECTED: |
| 491 | msg = fmt.Sprintf("%s: Receiving flows from %d nodes: %s", prefix, numNodes, nodeNames) |
| 492 | case relaypb.NodeState_NODE_UNAVAILABLE: |
| 493 | msg = fmt.Sprintf("%s: %d nodes are unavailable: %s", prefix, numNodes, nodeNames) |
| 494 | case relaypb.NodeState_NODE_GONE: |
| 495 | msg = fmt.Sprintf("%s: %d nodes removed from cluster: %s", prefix, numNodes, nodeNames) |
| 496 | case relaypb.NodeState_NODE_ERROR: |
| 497 | msg = fmt.Sprintf("%s: Error %q on %d nodes: %s", prefix, s.GetMessage(), numNodes, nodeNames) |
no test coverage detected