(data: Record<string, any> | Record<string, any>[])
| 267 | } |
| 268 | |
| 269 | async push(data: Record<string, any> | Record<string, any>[]): Promise<void> { |
| 270 | this.isPushDataUsed = true |
| 271 | if (!this.stream) { |
| 272 | this.taskFilePath = TaskResults.generateTaskFilePath(this.taskId) |
| 273 | this.stream = new NDJSONWriteStream(this.taskFilePath) |
| 274 | } |
| 275 | |
| 276 | const items = normalizeData(data) |
| 277 | |
| 278 | for (const item of items) { |
| 279 | // Handle deduplication if removeDuplicatesBy is set |
| 280 | if (this.removeDuplicatesBy && this.removeDuplicatesBy in item && !isNullish(item[this.removeDuplicatesBy])) { |
| 281 | const key = item[this.removeDuplicatesBy] |
| 282 | if (this.seen.has(key)) continue |
| 283 | this.seen.add(key) |
| 284 | } |
| 285 | |
| 286 | // Collect keys for normalization |
| 287 | const itemKeys = Object.keys(item) |
| 288 | if (this.allKeysMapping === null) { |
| 289 | // First item: initialize with its keys |
| 290 | this.allKeysMapping = createKeyToNullMapping(item) |
| 291 | this.firstItemKeyCount = itemKeys.length |
| 292 | } else { |
| 293 | // Add any new keys |
| 294 | populateMissingKeys(itemKeys, this.allKeysMapping) |
| 295 | } |
| 296 | |
| 297 | await this.stream.push(item) |
| 298 | this.itemCount++ |
| 299 | } |
| 300 | |
| 301 | // Report count update after each push call |
| 302 | await this.reportCountUpdate() |
| 303 | } |
| 304 | |
| 305 | private async reportCountUpdate(): Promise<void> { |
| 306 | if (!this.onResultCountUpdate) return |
no test coverage detected