(request: Entity.Request<any>)
| 332 | }, |
| 333 | |
/**
 * Entity handler that runs one activity of the current workflow execution.
 *
 * Steps, in the order the code performs them:
 *  1. Derives `activityId` as `${executionId}/${request.payload.name}`, builds
 *     a workflow instance via `WorkflowInstance.initial(workflow, executionId)`
 *     and removes `activityId` from `interruptedActivities`.
 *  2. Looks the activity up in `activities`. While no entry exists, it stores a
 *     fresh latch in `activityLatches` under `activityId`, awaits it, and then
 *     re-reads `activities`.
 *     NOTE(review): the code that registers the entry and opens the latch is
 *     not visible here; presumably the workflow side does so. Confirm.
 *  3. Copies the entry's runtime context into a new `Map`, overrides
 *     `Activity.CurrentAttempt` with `request.payload.attempt` and
 *     `WorkflowInstance` with the instance from step 1, and builds a `Runtime`
 *     from that context, the entry's `fiberRefs` and
 *     `Runtime.defaultRuntimeFlags`.
 *  4. Runs `entry.activity.executeEncoded` with that runtime provided.
 *
 * The resulting effect is then piped through:
 *  - `Workflow.intoResult`
 *    (NOTE(review): presumably converts the outcome into a workflow result
 *    value; verify against its definition);
 *  - a `catchAllCause` that collects every fiber id among the cause's
 *    interruptors. If `RpcServer.fiberIdClientInterrupt.id` is among them, it
 *    records `activityId` in `interruptedActivities` and succeeds with
 *    `new Workflow.Suspended()`. Any other cause is re-failed unchanged;
 *  - `WorkflowInstance` and `Activity.CurrentAttempt` service provision;
 *  - a finalizer that deletes `activityId` from `activities`;
 *  - `Rpc.wrap({ fork: true, uninterruptible: true })`
 *    (NOTE(review): exact fork / uninterruptible semantics are defined by
 *    `Rpc.wrap`, which is not shown here; confirm before changing the flags).
 *
 * @param request - Entity request; only `payload.name` and `payload.attempt`
 *   are read here.
 * @returns The wrapped effect described above.
 */
| 334 | activity(request: Entity.Request<any>) { |
| 335 | const activityId = `${executionId}/${request.payload.name}` |
| 336 | const instance = WorkflowInstance.initial(workflow, executionId) |
| 337 | interruptedActivities.delete(activityId) |
| 338 | return Effect.gen(function*() { |
| 339 | let entry = activities.get(activityId) |
| 340 | while (!entry) { |
| 341 | const latch = Effect.unsafeMakeLatch() |
| 342 | activityLatches.set(activityId, latch) |
| 343 | yield* latch.await |
| 344 | entry = activities.get(activityId) |
| 345 | } |
| 346 | const contextMap = new Map(entry.runtime.context.unsafeMap) |
| 347 | contextMap.set(Activity.CurrentAttempt.key, request.payload.attempt) |
| 348 | contextMap.set(WorkflowInstance.key, instance) |
| 349 | const runtime = Runtime.make({ |
| 350 | context: Context.unsafeMake(contextMap), |
| 351 | fiberRefs: entry.runtime.fiberRefs, |
| 352 | runtimeFlags: Runtime.defaultRuntimeFlags |
| 353 | }) |
| 354 | return yield* entry.activity.executeEncoded.pipe( |
| 355 | Effect.provide(runtime) |
| 356 | ) |
| 357 | }).pipe( |
| 358 | Workflow.intoResult, |
| 359 | Effect.catchAllCause((cause) => { |
| 360 | const interruptors = Cause.interruptors(cause) |
| 361 | // we only want to store interrupts as suspends when the |
| 362 | // client requested it |
| 363 | const ids = Array.from(interruptors, (id) => Array.from(FiberId.ids(id))).flat() |
| 364 | const suspend = ids.includes(RpcServer.fiberIdClientInterrupt.id) |
| 365 | if (suspend) { |
| 366 | interruptedActivities.add(activityId) |
| 367 | return Effect.succeed(new Workflow.Suspended()) |
| 368 | } |
| 369 | return Effect.failCause(cause) |
| 370 | }), |
| 371 | Effect.provideService(WorkflowInstance, instance), |
| 372 | Effect.provideService(Activity.CurrentAttempt, request.payload.attempt), |
| 373 | Effect.ensuring(Effect.sync(() => { |
| 374 | activities.delete(activityId) |
| 375 | })), |
| 376 | Rpc.wrap({ |
| 377 | fork: true, |
| 378 | uninterruptible: true |
| 379 | }) |
| 380 | ) |
| 381 | }, |
| 382 | |
| 383 | deferred: Effect.fnUntraced(function*(request: Entity.Request<any>) { |
| 384 | yield* ensureSuccess(resume(workflow, executionId)) |
nothing calls this directly
no test coverage detected
searching dependent graphs…