Seal every still-open bucket whose hour has fully elapsed. A bucket ``[t, t+1h)`` is finalized once ``now >= t + 1h + grace``: the hour is over and the grace window has absorbed any late-arriving event, so nothing more can land in it. Run on the collector tick *after*
(
cls,
session,
meter_key: str,
now: datetime,
grace_seconds: int,
)
| 206 | |
| 207 | @classmethod |
| 208 | async def seal_due( |
| 209 | cls, |
| 210 | session, |
| 211 | meter_key: str, |
| 212 | now: datetime, |
| 213 | grace_seconds: int, |
| 214 | ) -> None: |
| 215 | """Seal every still-open bucket whose hour has fully elapsed. |
| 216 | |
| 217 | A bucket ``[t, t+1h)`` is finalized once ``now >= t + 1h + grace``: the |
| 218 | hour is over and the grace window has absorbed any late-arriving event, |
| 219 | so nothing more can land in it. Run on the collector tick *after* |
| 220 | settling, so a still-running resource's current hour is written before |
| 221 | it becomes eligible. Idempotent — only NULL ``sealed_at`` rows of this |
| 222 | meter are touched. |
| 223 | """ |
| 224 | cutoff = now - timedelta(hours=1, seconds=grace_seconds) |
| 225 | await session.exec( |
| 226 | update(cls) |
| 227 | .where( |
| 228 | cls.meter_key == meter_key, |
| 229 | cls.sealed_at.is_(None), |
| 230 | cls.bucket_start <= cutoff, |
| 231 | ) |
| 232 | .values(sealed_at=now) |
| 233 | ) |
| 234 | await session.commit() |
| 235 | |
| 236 | |
| 237 | # --------------------------------------------------------------------------- |