(
ctx: ExecutionContext,
block: SerializedBlock,
backend: PiBackendRun<P>,
params: P,
memoryConfig: PiMemoryConfig
)
| 193 | } |
| 194 | |
| 195 | private async runPi<P extends PiRunParams>( |
| 196 | ctx: ExecutionContext, |
| 197 | block: SerializedBlock, |
| 198 | backend: PiBackendRun<P>, |
| 199 | params: P, |
| 200 | memoryConfig: PiMemoryConfig |
| 201 | ): Promise<BlockOutput | StreamingExecution> { |
| 202 | const startTime = Date.now() |
| 203 | const startTimeISO = new Date(startTime).toISOString() |
| 204 | |
| 205 | logger.info('Executing Pi block', { |
| 206 | blockId: block.id, |
| 207 | mode: params.mode, |
| 208 | model: params.model, |
| 209 | workflowId: ctx.workflowId, |
| 210 | executionId: ctx.executionId, |
| 211 | }) |
| 212 | |
| 213 | if (this.isContentSelectedForStreaming(ctx, block)) { |
| 214 | const output: NormalizedBlockOutput = { content: '', model: params.model } |
| 215 | const stream = new ReadableStream<Uint8Array>({ |
| 216 | start: async (controller) => { |
| 217 | const encoder = new TextEncoder() |
| 218 | try { |
| 219 | const result = await backend(params, { |
| 220 | onEvent: (event) => { |
| 221 | const text = streamTextForEvent(event) |
| 222 | if (text) controller.enqueue(encoder.encode(text)) |
| 223 | }, |
| 224 | signal: ctx.abortSignal, |
| 225 | }) |
| 226 | if (result.totals.errorMessage) { |
| 227 | controller.error(new Error(result.totals.errorMessage)) |
| 228 | return |
| 229 | } |
| 230 | Object.assign( |
| 231 | output, |
| 232 | this.buildOutput(result, params.model, params.isBYOK, startTime, startTimeISO) |
| 233 | ) |
| 234 | await appendPiMemory(ctx, memoryConfig, params.task, result.totals.finalText) |
| 235 | controller.close() |
| 236 | } catch (error) { |
| 237 | controller.error(error) |
| 238 | } |
| 239 | }, |
| 240 | }) |
| 241 | |
| 242 | return { |
| 243 | stream, |
| 244 | execution: { |
| 245 | success: true, |
| 246 | output, |
| 247 | blockId: block.id, |
| 248 | logs: [], |
| 249 | metadata: { startTime: startTimeISO, duration: 0 }, |
| 250 | isStreaming: true, |
| 251 | } as StreamingExecution['execution'] & { blockId: string }, |
| 252 | } |
no test coverage detected