( body: z.infer<typeof IngestBodyRequest>, userId: string, workspaceId: string, activityId?: string, ingestionQueueId?: string, )
| 9 | |
| 10 | // Used in the trigger |
| 11 | export const addToQueue = async ( |
| 12 | body: z.infer<typeof IngestBodyRequest>, |
| 13 | userId: string, |
| 14 | workspaceId: string, |
| 15 | activityId?: string, |
| 16 | ingestionQueueId?: string, |
| 17 | ) => { |
| 18 | if (!workspaceId) { |
| 19 | throw new Error( |
| 20 | "Workspace ID is required to create an ingestion queue entry.", |
| 21 | ); |
| 22 | } |
| 23 | |
| 24 | // Check if workspace has sufficient credits before processing |
| 25 | const hasSufficientCredits = await hasCredits( |
| 26 | workspaceId, |
| 27 | userId, |
| 28 | "addEpisode", |
| 29 | ); |
| 30 | |
| 31 | if (!hasSufficientCredits) { |
| 32 | throw new Error("no credits"); |
| 33 | } |
| 34 | |
| 35 | let labels: string[] = body.labelIds ?? []; |
| 36 | |
| 37 | if (body.sessionId) { |
| 38 | const lastEpisode = await prisma.ingestionQueue.findFirst({ |
| 39 | where: { |
| 40 | sessionId: body.sessionId, |
| 41 | }, |
| 42 | orderBy: { |
| 43 | createdAt: "desc", |
| 44 | }, |
| 45 | }); |
| 46 | |
| 47 | if (lastEpisode?.labels && lastEpisode?.labels.length > 0) { |
| 48 | labels = lastEpisode?.labels; |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | // Validate label access if labelIds are provided |
| 53 | if (body.labelIds && body.labelIds.length > 0) { |
| 54 | const labelService = new LabelService(); |
| 55 | const hasAccess = await labelService.validateLabelAccess( |
| 56 | body.labelIds, |
| 57 | workspaceId, |
| 58 | ); |
| 59 | |
| 60 | if (!hasAccess) { |
| 61 | throw new Error( |
| 62 | "One or more labels are invalid or not accessible in this workspace", |
| 63 | ); |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | // Upsert: update existing or create new ingestion queue entry |
| 68 | const queuePersist = await prisma.ingestionQueue.upsert({ |
No direct callers of this function were found.
No test coverage was detected for this function.