MCPcopy
hub / github.com/NateBJones-Projects/OB1 / handleExecuteJob

Function handleExecuteJob

integrations/smart-ingest/index.ts:895–988  ·  view source on GitHub ↗
(req: Request)

Source from the content-addressed store, hash-verified

893// ── Execute a dry-run job ───────────────────────────────────────────────────
894
895async function handleExecuteJob(req: Request): Promise<Response> {
896 let body: Record<string, unknown>;
897 try { body = await req.json(); } catch { return json({ error: "Invalid JSON body" }, 400); }
898
899 const jobId = typeof body.job_id === "number" ? body.job_id : 0;
900 if (!jobId) return json({ error: "job_id is required" }, 400);
901
902 const { data: job, error: jobErr } = await supabase
903 .from("ingestion_jobs").select("*").eq("id", jobId).single();
904 if (jobErr || !job) return json({ error: `Job #${jobId} not found` }, 404);
905 if (job.status === "complete") return json({ ...job, message: "Job already complete" }, 200);
906 if (job.status !== "dry_run_complete") {
907 return json({ error: `Job status is '${job.status}', expected 'dry_run_complete'` }, 400);
908 }
909
910 const { data: itemRows } = await supabase
911 .from("ingestion_items").select("*").eq("job_id", jobId).order("id");
912 const items = itemRows ?? [];
913
914 // CAS: only transition dry_run_complete -> executing; concurrent requests get 409
915 const { data: casRow, error: casErr } = await supabase
916 .from("ingestion_jobs")
917 .update({ status: "executing" })
918 .eq("id", jobId)
919 .eq("status", "dry_run_complete")
920 .select("id, status")
921 .maybeSingle();
922 if (casErr || !casRow || casRow.status !== "executing") {
923 return json({ error: "Job execution conflict — another request may have claimed this job" }, 409);
924 }
925
926 let addedCount = 0, skippedCount = 0, appendedCount = 0, revisedCount = 0;
927 const sourceLabel = job.source_label ?? null;
928 const jobMeta = (job.metadata ?? {}) as Record<string, unknown>;
929 const sourceType = jobMeta.source_type as string ?? "smart_ingest";
930 const skipClassification = body.skip_classification === true || jobMeta.skip_classification === true;
931 const jobSourceMetadata = (jobMeta.source_client || jobMeta.capture_mode)
932 ? jobMeta as Record<string, unknown>
933 : null;
934
935 for (const item of items) {
936 if (item.action === "skip") { skippedCount++; continue; }
937 try {
938 const fakeItem: IngestionItem = {
939 content: item.extracted_content,
940 type: sanitizeType((item.metadata as Record<string, unknown>)?.type),
941 importance: sanitizeImportance((item.metadata as Record<string, unknown>)?.importance),
942 tags: sanitizeTags((item.metadata as Record<string, unknown>)?.tags),
943 source_snippet: sanitizeSourceSnippet((item.metadata as Record<string, unknown>)?.source_snippet),
944 content_fingerprint: "",
945 action: item.action as ReconcileAction,
946 reason: item.reason ?? "",
947 matched_thought_id: item.matched_thought_id,
948 similarity_score: item.similarity_score,
949 status: "pending",
950 error_message: null,
951 };
952 let embedding: number[] = [];

Callers 1

index.ts · File · 0.85

Calls 9

embedText · Function · 0.90
sanitizeImportance · Function · 0.85
sanitizeTags · Function · 0.85
sanitizeSourceSnippet · Function · 0.85
executeItem · Function · 0.85
updateJobById · Function · 0.85
scheduleEntityExtraction · Function · 0.85
json · Function · 0.70
sanitizeType · Function · 0.70

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…