MCPcopy
hub / github.com/probelabs/goreplay / parse

Method parse

input_file.go:70–137  ·  view source on GitHub ↗
(init chan struct{})

Source from the content-addressed store, hash-verified

68}
69
70func (f *fileInputReader) parse(init chan struct{}) error {
71 payloadSeparatorAsBytes := []byte(payloadSeparator)
72 var buffer bytes.Buffer
73 var initialized bool
74
75 lineNum := 0
76
77 for {
78 line, err := f.reader.ReadBytes('\n')
79 lineNum++
80
81 if err != nil {
82 if err != io.EOF {
83 Debug(1, err)
84 }
85
86 f.Close()
87
88 if !initialized {
89 close(init)
90 initialized = true
91 }
92
93 return err
94 }
95
96 if bytes.Equal(payloadSeparatorAsBytes[1:], line) {
97 asBytes := buffer.Bytes()
98 meta := payloadMeta(asBytes)
99
100 if len(meta) < 3 {
101 Debug(1, fmt.Sprintf("Found malformed record, file: %s, line %d", f.path, lineNum))
102 buffer = bytes.Buffer{}
103 continue
104 }
105
106 timestamp, _ := strconv.ParseInt(string(meta[2]), 10, 64)
107 data := asBytes[:len(asBytes)-1]
108
109 f.queue.Lock()
110 heap.Push(&f.queue, &filePayload{
111 timestamp: timestamp,
112 data: data,
113 })
114 f.queue.Unlock()
115
116 for {
117 if f.queue.Len() < f.readDepth {
118 break
119 }
120
121 if !initialized {
122 close(init)
123 initialized = true
124 }
125
126 if !f.dryRun {
127 time.Sleep(100 * time.Millisecond)

Callers 1

newFileInputReaderFunction · 0.95

Calls 6

CloseMethod · 0.95
payloadMetaFunction · 0.85
PushMethod · 0.80
WriteMethod · 0.80
DebugFunction · 0.70
LenMethod · 0.45

Tested by

no test coverage detected