MCPcopy Index your code
hub / github.com/OpenPipe/OpenPipe / defineTask

Function defineTask

app/src/server/tasks/defineTask.ts:16–51  ·  view source on GitHub ↗
(options: {
  id: string;
  handler: (payload: TPayload, helpers: Helpers) => Promise<void>;

  // This is run in the caller's process before the task is enqueued. Ensure
  // that nothing long-running happens here.
  beforeEnqueue?: (payload: TPayload) => Promise<void>;

  // Default values for things like `priority` and `maxAttempts`.
  specDefaults?: TaskSpec;
})

Source from the content-addressed store, hash-verified

14}
15
/**
 * Creates a task definition around a payload handler.
 *
 * The returned object exposes three things:
 *  - `enqueue`: merges `options.specDefaults` with the per-call spec, awaits
 *    `options.beforeEnqueue` (when provided), then adds a job under
 *    `options.id` through `workerUtils().addJob`.
 *  - `runNow`: calls `options.handler` directly, passing the resolved
 *    `workerUtils()` value as the helpers argument. It neither runs
 *    `beforeEnqueue` nor emits the debug log line.
 *  - `task`: the `{ identifier, handler }` pair, where `handler` logs the
 *    payload at debug level before delegating to `options.handler`.
 *
 * @param options Task id, handler, optional pre-enqueue hook and spec defaults.
 * @returns An object with `runNow`, `enqueue` and `task`.
 */
function defineTask<TPayload>(options: {
  id: string;
  handler: (payload: TPayload, helpers: Helpers) => Promise<void>;

  // Executed in the caller's process ahead of enqueueing, so it must not do
  // anything long-running.
  beforeEnqueue?: (payload: TPayload) => Promise<void>;

  // Fallback values for spec fields such as `priority` and `maxAttempts`.
  specDefaults?: TaskSpec;
}) {
  // Queues a job for this task; values in `spec` override `specDefaults`.
  async function enqueue(payload: TPayload, spec?: TaskSpec) {
    const jobSpec = merge({}, options.specDefaults, spec);

    await options.beforeEnqueue?.(payload);

    const worker = await workerUtils();
    return await worker.addJob(options.id, payload, jobSpec);
  }

  // Handler registered on the task: logs the payload, then delegates.
  function loggedHandler(payload: TPayload, helpers: Helpers) {
    const message = `Running task ${options.id} with payload: ${JSON.stringify(payload)}`;
    helpers.logger.debug(message);
    return options.handler(payload, helpers);
  }

  // Runs the raw handler immediately instead of going through the queue.
  async function runNow(payload: TPayload) {
    const helpers = await workerUtils();
    return options.handler(payload, helpers);
  }

  return {
    runNow,
    enqueue,
    task: {
      identifier: options.id,
      handler: loggedHandler as Task,
    },
  };
}
52
53export default defineTask;

Calls 1

workerUtils · Function · 0.85

Tested by

no test coverage detected