(argument: json.JsonObject, context: JobHandlerContext)
| 64 | }; |
| 65 | |
| 66 | function handler(argument: json.JsonObject, context: JobHandlerContext) { |
| 67 | // Add input validation to the inbound bus. |
| 68 | const inboundBusWithInputValidation = context.inboundBus.pipe( |
| 69 | concatMap(async (message) => { |
| 70 | if (message.kind === JobInboundMessageKind.Input) { |
| 71 | const v = message.value as BuilderInput; |
| 72 | const options = mergeOptions(baseOptions, v.options); |
| 73 | |
| 74 | // Validate the options merged from baseOptions and v.options against the options schema. |
| 75 | const validation = await registry.compile(info.optionSchema); |
| 76 | const validationResult = await validation(options); |
| 77 | const { data, success, errors } = validationResult; |
| 78 | |
| 79 | if (!success) { |
| 80 | throw new json.schema.SchemaValidationException(errors); |
| 81 | } |
| 82 | |
| 83 | return { ...message, value: { ...v, options: data } } as JobInboundMessage<BuilderInput>; |
| 84 | } else { |
| 85 | return message as JobInboundMessage<BuilderInput>; |
| 86 | } |
| 87 | }), |
| 88 | // Using a share replay because the job might be synchronously sending input, but |
| 89 | // asynchronously listening to it. |
| 90 | shareReplay(1), |
| 91 | ); |
| 92 | |
| 93 | // Make an inboundBus that completes instead of erroring out. |
| 94 | // We'll merge the errors into the output instead. |
| 95 | const inboundBus = onErrorResumeNext(inboundBusWithInputValidation); |
| 96 | |
| 97 | const output = from(host.loadBuilder(info)).pipe( |
| 98 | concatMap((builder) => { |
| 99 | if (builder === null) { |
| 100 | throw new Error(`Cannot load builder for builderInfo ${JSON.stringify(info, null, 2)}`); |
| 101 | } |
| 102 | |
| 103 | return builder.handler(argument, { ...context, inboundBus }).pipe( |
| 104 | map((output) => { |
| 105 | if (output.kind === JobOutboundMessageKind.Output) { |
| 106 | // Add target to it. |
| 107 | return { |
| 108 | ...output, |
| 109 | value: { |
| 110 | ...output.value, |
| 111 | ...(target ? { target } : 0), |
| 112 | } as unknown as json.JsonObject, |
| 113 | }; |
| 114 | } else { |
| 115 | return output; |
| 116 | } |
| 117 | }), |
| 118 | ); |
| 119 | }), |
| 120 | // Share subscriptions to the output, otherwise the handler will be re-run. |
| 121 | shareReplay(), |
| 122 | ); |
| 123 |
no test coverage detected