* Runs `run` under the run coordinator when one is configured. If the coordinator declines this instance, the run is skipped and `onSkipped` is emitted with `'not-elected'`. Fail-closed: if `shouldRun` throws, the run is skipped with `'coordinator-error'`. `onComplete` runs after the task.
(slot: Date, run: () => Promise<any>)
| 112 | * skipped with `'coordinator-error'`. `onComplete` runs after the task. |
| 113 | */ |
private async runCoordinated(slot: Date, run: () => Promise<any>): Promise<void> {
  if (this.runCoordinator) {
    // One election key per slot: the configured prefix plus the slot's ISO timestamp.
    const electionKey = `${this.coordinatorKeyPrefix}:${slot.toISOString()}`;

    // Stays undefined only when the coordinator answered and allowed this run.
    let skipReason: 'coordinator-error' | 'not-elected' | undefined;
    try {
      const elected = await this.runCoordinator.shouldRun(electionKey, this.coordinatorTtl);
      if (!elected) {
        skipReason = 'not-elected';
      }
    } catch (coordinatorError: any) {
      // Fail-closed: a coordinator failure skips the run instead of executing it.
      this.logger.error('Run coordinator failed; skipping execution (fail-closed)', coordinatorError);
      skipReason = 'coordinator-error';
    }

    if (skipReason !== undefined) {
      this.emitSkipped(slot, skipReason);
      return;
    }

    try {
      await run();
    } finally {
      // The completion hook is optional and is invoked whether `run` resolved or
      // rejected; a failure inside the hook is logged and not rethrown.
      try {
        await this.runCoordinator.onComplete?.(electionKey);
      } catch (completionError: any) {
        this.logger.error('Run coordinator onComplete failed', completionError);
      }
    }
    return;
  }

  // No coordinator configured: execute the task directly.
  await run();
}
| 145 | |
| 146 | private emitSkipped(slot: Date, reason: SkipReason){ |
| 147 | Promise.resolve(this.onSkipped(slot, reason)).catch((err) => this.onErrorFallback(slot, err)); |
no test coverage detected