MCPcopy Index your code
hub / github.com/Effect-TS/effect / activity

Function activity

packages/cluster/src/ClusterWorkflowEngine.ts:334–381  ·  view source on GitHub ↗
(request: Entity.Request<any>)

Source from the content-addressed store, hash-verified

332 },
333
/**
 * Handles an activity request for the current workflow execution.
 *
 * The activity id is `<executionId>/<payload.name>`. The handler waits until
 * an entry for that id is present in `activities`, then runs the entry's
 * encoded executor with a runtime derived from the entry's own runtime, with
 * the request's attempt number and a fresh `WorkflowInstance` added to the
 * context.
 *
 * If the run is interrupted and the interruptors include the RPC
 * client-interrupt fiber id, the id is added to `interruptedActivities` and a
 * `Workflow.Suspended` value is returned; any other cause is re-raised as is.
 * The `activities` entry is removed when the effect finishes.
 *
 * @param request - entity request; `payload.name` and `payload.attempt` are read.
 * @returns the effect above, passed through `Rpc.wrap` with `fork` and
 * `uninterruptible` enabled.
 */
activity(request: Entity.Request<any>) {
  const activityId = `${executionId}/${request.payload.name}`
  const instance = WorkflowInstance.initial(workflow, executionId)

  // Clear any interrupt marker recorded for this activity id.
  interruptedActivities.delete(activityId)

  const execute = Effect.gen(function*() {
    // Block on a fresh latch until an entry shows up in `activities`.
    // NOTE(review): presumably whoever registers the entry opens the latch
    // stored in `activityLatches` — confirm against the registering code.
    let registered = activities.get(activityId)
    while (!registered) {
      const gate = Effect.unsafeMakeLatch()
      activityLatches.set(activityId, gate)
      yield* gate.await
      registered = activities.get(activityId)
    }

    // Copy the entry's services and add the per-request ones on top.
    const services = new Map(registered.runtime.context.unsafeMap)
    services.set(Activity.CurrentAttempt.key, request.payload.attempt)
    services.set(WorkflowInstance.key, instance)

    const activityRuntime = Runtime.make({
      context: Context.unsafeMake(services),
      fiberRefs: registered.runtime.fiberRefs,
      runtimeFlags: Runtime.defaultRuntimeFlags
    })

    return yield* registered.activity.executeEncoded.pipe(Effect.provide(activityRuntime))
  })

  return execute.pipe(
    Workflow.intoResult,
    Effect.catchAllCause((cause) => {
      // Only an interrupt requested by the client is stored as a suspend;
      // everything else propagates unchanged.
      const interruptorIds = Array.from(Cause.interruptors(cause)).flatMap(
        (fiberId) => Array.from(FiberId.ids(fiberId))
      )
      if (!interruptorIds.includes(RpcServer.fiberIdClientInterrupt.id)) {
        return Effect.failCause(cause)
      }
      interruptedActivities.add(activityId)
      return Effect.succeed(new Workflow.Suspended())
    }),
    Effect.provideService(WorkflowInstance, instance),
    Effect.provideService(Activity.CurrentAttempt, request.payload.attempt),
    // Always remove the entry for this activity id once the run is over.
    Effect.ensuring(
      Effect.sync(() => {
        activities.delete(activityId)
      })
    ),
    Rpc.wrap({ fork: true, uninterruptible: true })
  )
},
382
383 deferred: Effect.fnUntraced(function*(request: Entity.Request<any>) {
384 yield* ensureSuccess(resume(workflow, executionId))

Callers

nothing calls this directly

Calls 10

initial (Method) · 0.80
failCause (Method) · 0.80
sync (Method) · 0.80
wrap (Method) · 0.80
pipe (Method) · 0.65
get (Method) · 0.65
set (Method) · 0.65
make (Method) · 0.65
provide (Method) · 0.65
add (Method) · 0.65

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…