( scheduler: ReturnType<typeof createScheduler>, rules: CompiledRule[], bus: ReturnType<typeof createEventBus>, cronProbeResolver: CronProbeResolver, )
| 2147 | * RAILWAY_PROJECT_ID, RAILWAY_ENVIRONMENT_ID, AIMOCK_URL). |
| 2148 | */ |
| 2149 | export async function diffCronSchedules( |
| 2150 | scheduler: ReturnType<typeof createScheduler>, |
| 2151 | rules: CompiledRule[], |
| 2152 | bus: ReturnType<typeof createEventBus>, |
| 2153 | cronProbeResolver: CronProbeResolver, |
| 2154 | ): Promise<void> { |
| 2155 | const desired = new Map< |
| 2156 | string, |
| 2157 | { schedule: string; ruleId: string; dimension: string } |
| 2158 | >(); |
| 2159 | for (const r of rules) { |
| 2160 | for (const [idx, cron] of r.cronTriggers.entries()) { |
| 2161 | const id = `${r.id}:cron:${idx}`; |
| 2162 | desired.set(id, { |
| 2163 | schedule: cron.schedule, |
| 2164 | ruleId: r.id, |
| 2165 | dimension: r.signal.dimension, |
| 2166 | }); |
| 2167 | } |
| 2168 | } |
| 2169 | const currentIds = scheduler.list().map((e) => e.id); |
| 2170 | for (const id of currentIds) { |
| 2171 | if (!desired.has(id) && id.includes(":cron:")) { |
| 2172 | // R2-B.1: await the unregister, mirroring the CR-A2.2 fix on the |
| 2173 | // probe-rule path. Pre-fix this was fire-and-forget — a rejection |
| 2174 | // became an unhandled rejection AND the orphan got no structured |
| 2175 | // log. On rejection we log (operators get a first-class signal) |
| 2176 | // and leave the entry in scheduler.list(); next diff sweep will |
| 2177 | // retry. Same design choice as CR-A2.2: the orphan stays VISIBLE |
| 2178 | // rather than being silently dropped from bookkeeping. |
| 2179 | try { |
| 2180 | await scheduler.unregister(id); |
| 2181 | } catch (err) { |
| 2182 | logger.error("orchestrator.cron-unregister-failed", { |
| 2183 | id, |
| 2184 | err: String(err), |
| 2185 | }); |
| 2186 | // R2-B.4: dedicated `scheduler_unregister_failures_total` counter |
| 2187 | // would belong here, but adding it requires extending metrics.ts |
| 2188 | // COUNTER_NAMES (out of scope for this fix). Defer to bucket-b |
| 2189 | // follow-up; the structured log above is the first-class |
| 2190 | // observability surface in the meantime. |
| 2191 | } |
| 2192 | } |
| 2193 | } |
| 2194 | for (const [id, { schedule, ruleId, dimension }] of desired) { |
| 2195 | const invoker = cronProbeResolver(dimension); |
| 2196 | // A2: wrap `scheduler.register` per-rule so a single typoed cron |
| 2197 | // expression (validateCron throws synchronously inside register()) |
| 2198 | // does NOT poison the rest of the reload. Pre-fix, one bad rule |
| 2199 | // aborted the for-loop → every subsequent rule silently unscheduled. |
| 2200 | // Pinned by the "continues registering subsequent rules" regression |
| 2201 | // test in orchestrator.test.ts — do not remove without updating it. |
| 2202 | try { |
| 2203 | scheduler.register({ |
| 2204 | id, |
| 2205 | cron: schedule, |
| 2206 | handler: async () => { |
no test coverage detected
searching dependent graphs…