Saga & Outbox
This content is for version 0.1.
Saga/outbox — erasure beyond the local transaction, always in a known state.
BackoffPolicy
class BackoffPolicy(BaseModel):
    base_delay: timedelta = timedelta(seconds=30)
    lease: timedelta = timedelta(minutes=5)
    max_delay: timedelta = timedelta(hours=1)

Deterministic exponential backoff for outbox retries.
The delay after the n-th attempt is
min(base_delay * 2**(n - 1), max_delay) — no jitter, so tests can
pin exact schedules; concurrent runners are already spread by
SKIP LOCKED claiming.
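With the default fields (base_delay of 30 s, max_delay of 1 h), the doubling reaches the ceiling at the eighth attempt, where the uncapped value would be 3840 s:

$$
d(n) = \min\left(30\,\mathrm{s}\cdot 2^{\,n-1},\ 3600\,\mathrm{s}\right)
$$

$$
\begin{array}{c|cccccccc}
n & 1 & 2 & 3 & 4 & 5 & 6 & 7 & 8 \\ \hline
d(n)\ (\mathrm{s}) & 30 & 60 & 120 & 240 & 480 & 960 & 1920 & 3600
\end{array}
$$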
Fields:
- base_delay (timedelta): Delay after the first failed attempt.
- lease (timedelta): How long a claim protects an IN_FLIGHT entry from other runners. Must comfortably exceed the slowest expected resolver call: a lease that expires mid-call lets another runner execute the same entry again, which converges (resolvers are idempotent) but is wasteful and noisy.
- max_delay (timedelta): Ceiling the doubling never exceeds.
BackoffPolicy.delay
def delay(attempts: int) -> timedelta

The wait before the next try, given attempts tries so far.
Args:
- attempts (int): How many times the entry has been attempted (≥ 1).
Returns:
timedelta: min(base_delay * 2**(attempts - 1), max_delay).
Raises:
ValueError: If attempts is not positive.
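The documented formula can be sketched as a plain function. This is a hypothetical stand-in, not the library's class: BackoffPolicySketch is a frozen dataclass instead of a pydantic model, and the doubling loop (rather than a direct power) is an assumption chosen so that a very large attempts value cannot overflow timedelta; it returns the same value as min(base_delay * 2**(attempts - 1), max_delay).

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class BackoffPolicySketch:
    """Dataclass stand-in for BackoffPolicy (the real class is a pydantic model)."""

    base_delay: timedelta = timedelta(seconds=30)
    lease: timedelta = timedelta(minutes=5)
    max_delay: timedelta = timedelta(hours=1)

    def delay(self, attempts: int) -> timedelta:
        """Return min(base_delay * 2**(attempts - 1), max_delay)."""
        if attempts < 1:
            raise ValueError("attempts must be positive")
        wait = self.base_delay
        # Double once per earlier attempt, stopping as soon as the ceiling is
        # reached so a large attempts value cannot overflow timedelta.
        for _ in range(attempts - 1):
            if wait >= self.max_delay:
                break
            wait *= 2
        return min(wait, self.max_delay)
```

Because nothing random is involved, a test can assert exact delays without patching a jitter source.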
Outbox
class Outbox:
    def __init__(session_factory: sessionmaker, outbox: Table) -> None

Transactional outbox in the application’s own database.
Entries are enqueued inside the caller’s transaction (atomically with
the local erasure) and claimed by the saga runner afterwards.
enqueue uses the caller’s session; the claim and bookkeeping
methods run outside any caller transaction and use the factory
(ADR 0006).
Outbox.claim_batch
def claim_batch(limit: int = 50, *, lease: timedelta = _DEFAULT_LEASE) -> Sequence[OutboxEntry]

Atomically claim due entries for execution, oldest first.
An entry is due when its next_attempt_at gate is NULL or in
the past and it is not terminal: fresh PENDING entries, FAILED
entries whose backoff has elapsed, and IN_FLIGHT entries whose
claim lease has expired (a crashed runner’s work, healed here).
Claimed entries move to IN_FLIGHT with attempts incremented —
the claim is the attempt, so an entry that crashes its runner every
time still converges to abandonment — and next_attempt_at set to
now + lease.
On PostgreSQL the selection runs FOR UPDATE SKIP LOCKED, so
concurrent runners never double-claim. SQLite ignores row locking;
the no-double-claim guarantee holds only on dialects that support
FOR UPDATE.
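The due predicate and the claim transition described above can be modelled in memory. This is a hypothetical single-process sketch: Row, is_due, and claim_batch_sketch are illustrative names, nothing here takes a lock (so it shows the state change, not the SKIP LOCKED guarantee), and it assumes "in the past" includes a gate exactly equal to now.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional


class Status(str, Enum):
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ABANDONED = "abandoned"


TERMINAL = {Status.SUCCEEDED, Status.ABANDONED}


@dataclass(eq=False)  # compare rows by identity, like distinct database rows
class Row:
    enqueued_at: datetime
    status: Status = Status.PENDING
    attempts: int = 0
    next_attempt_at: Optional[datetime] = None


def is_due(row: Row, now: datetime) -> bool:
    # Not terminal, and the gate is unset (NULL) or has already passed.
    if row.status in TERMINAL:
        return False
    return row.next_attempt_at is None or row.next_attempt_at <= now


def claim_batch_sketch(rows: List[Row], now: datetime, limit: int, lease: timedelta) -> List[Row]:
    due = sorted((r for r in rows if is_due(r, now)), key=lambda r: r.enqueued_at)
    claimed = due[:limit]  # oldest first, at most `limit` entries
    for row in claimed:
        row.status = Status.IN_FLIGHT
        row.attempts += 1  # the claim itself counts as the attempt
        row.next_attempt_at = now + lease  # the gate now acts as the crash lease
    return claimed
```

A lease-expired IN_FLIGHT row is claimed exactly like a fresh PENDING one, which is how a crashed runner's work gets picked up again.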
Args:
- limit (int): Maximum entries to claim in one batch.
- lease (timedelta): How long the claim protects the entries from other runners. Must comfortably exceed the slowest expected resolver call; a too-short lease causes double execution (absorbed by resolver idempotency, but wasteful).
Returns:
Sequence[OutboxEntry]: The claimed entries in their post-claim state, oldest first.
Outbox.enqueue
def enqueue(session: Session, entries: Sequence[OutboxEntry]) -> None

Persist entries inside the caller’s open transaction.
Never commits — the entries become durable exactly when the
caller’s transaction does, and a rollback takes them with it.
The nested SubjectRef is flattened into the
table’s ref_kind/ref_value/ref_extra columns.
Args:
- session (Session): The SAME session the local erasure runs in. That is the whole point: the entries commit or roll back with it.
- entries (Sequence[OutboxEntry]): The external calls to record.
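The atomicity enqueue relies on can be illustrated with the standard library's sqlite3 module standing in for a SQLAlchemy Session. The users and outbox tables, their columns, make_db, and erase_with_outbox are all hypothetical; the only point is that the local delete and the outbox inserts commit or roll back together.

```python
import sqlite3
import uuid
from typing import List


def make_db() -> sqlite3.Connection:
    """Toy schema: one business table plus a minimal outbox table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE outbox (entry_id TEXT PRIMARY KEY, subject_id TEXT NOT NULL,"
        " resolver TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending')"
    )
    with conn:
        conn.executemany("INSERT INTO users (id) VALUES (?)", [("u1",), ("u2",)])
    return conn


def erase_with_outbox(
    conn: sqlite3.Connection,
    subject_id: str,
    resolvers: List[str],
    fail_before_commit: bool = False,
) -> None:
    # `with conn` commits on normal exit and rolls back if the block raises,
    # so the local delete and the outbox rows share one fate.
    with conn:
        conn.execute("DELETE FROM users WHERE id = ?", (subject_id,))
        for resolver in resolvers:
            conn.execute(
                "INSERT INTO outbox (entry_id, subject_id, resolver) VALUES (?, ?, ?)",
                (str(uuid.uuid4()), subject_id, resolver),
            )
        if fail_before_commit:
            raise RuntimeError("simulated failure before commit")
```

After a simulated failure, the user row is still present and no outbox rows exist for it, so there is no state in which external follow-ups are recorded for an erasure that never happened, or the reverse.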
Outbox.list_abandoned
def list_abandoned(*, limit: int = 100) -> Sequence[OutboxEntry]

Return abandoned entries for operator inspection, oldest first.
The read half of “abandoned loudly”: every entry whose retries are exhausted stays visible here (and in the audit trail) until it is handled out of band. Read-only by design — abandonment is permanent under ADR 0010, so there is deliberately no requeue surface; what an abandoned erasure requires is a determination only you can make.
Args:
- limit (int): Maximum entries to return.
Returns:
Sequence[OutboxEntry]: ABANDONED entries, oldest first (by enqueued_at, then entry_id).
Outbox.mark_abandoned
def mark_abandoned(entry: OutboxEntry, *, error: str) -> None

Record a terminal failure; the entry is never retried.
Abandonment is loud by contract: the runner appends the
ERASURE_STEP_FAILED audit event before calling this, so an
abandoned entry always has its audit record.
Args:
- entry (OutboxEntry): The claimed entry whose retries are exhausted or whose failure is non-retryable.
- error (str): The exception class name, never its message.
Outbox.mark_failed
def mark_failed(entry: OutboxEntry, *, error: str, next_attempt_at: datetime) -> None

Record a retryable failure and schedule the next attempt.
Args:
- entry (OutboxEntry): The claimed entry whose resolver call failed.
- error (str): The exception class name, never its message, which may embed PII.
- next_attempt_at (datetime): When the entry becomes claimable again (the backoff schedule).
Outbox.mark_succeeded
def mark_succeeded(entry: OutboxEntry, *, on_subject_complete: Callable[[], None]) -> None

Record a successful external call; detect subject completion.
Runs in one transaction that first locks all of the subject’s
entries FOR UPDATE ordered by entry_id — two runners
finishing the same subject’s last two entries serialize on the same
lock order instead of deadlocking, so exactly one of them observes
the all-succeeded transition. If, after this update, every entry
for the subject is SUCCEEDED (an ABANDONED sibling blocks
completion permanently), on_subject_complete is invoked inside
the open transaction; if it raises, the update rolls back and the
entry stays IN_FLIGHT for the lease to heal — a success is
never recorded without its completion check.
With the default DatabaseAuditSink the callback
opens a second pooled connection while this transaction still
holds the first — size the pool for two connections per concurrent
runner thread. An exhausted pool times out, rolls back, and the
lease heals the entry, but the work is wasted.
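The completion check and its rollback behaviour can be sketched in memory. This is a hypothetical model, not the library's code: a dict of sibling statuses stands in for the subject's locked rows, sorting by entry_id only mirrors the documented lock order (no lock is taken), and restoring a snapshot stands in for the transaction rollback.

```python
from enum import Enum
from typing import Callable, Dict
from uuid import UUID


class Status(str, Enum):
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    ABANDONED = "abandoned"


def mark_succeeded_sketch(
    rows: Dict[UUID, Status],
    entry_id: UUID,
    on_subject_complete: Callable[[], None],
) -> None:
    """Model of the documented flow: update, completion check, callback, rollback on error."""
    # Real code locks the subject's rows ordered by entry_id; sorting here only
    # mirrors that order, it does not take any lock.
    siblings = sorted(rows)
    snapshot = dict(rows)  # what a transaction rollback would restore
    rows[entry_id] = Status.SUCCEEDED
    try:
        # Any non-SUCCEEDED sibling, including an ABANDONED one, blocks completion.
        if all(rows[sibling] is Status.SUCCEEDED for sibling in siblings):
            on_subject_complete()
    except Exception:
        rows.clear()
        rows.update(snapshot)  # the entry is back to IN_FLIGHT for the lease to heal
        raise
```

If the callback raises, the entry's SUCCEEDED status disappears along with it, so a success is never recorded without its completion check.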
Args:
- entry (OutboxEntry): The claimed entry whose resolver call succeeded.
- on_subject_complete (Callable[[], None]): Invoked at most once, while the subject’s rows are still locked, when its last entry lands.
Outbox.status_counts
def status_counts() -> dict[OutboxStatus, int]

Count entries per lifecycle status, for dashboards and health checks.
Read-only. Every OutboxStatus member is present in
the result, zero-filled — a healthy, drained outbox reports explicit
zeros rather than missing keys. A growing ABANDONED count is the
operator signal that erasures need out-of-band attention.
Returns:
dict[OutboxStatus, int]: A mapping with one entry per status.
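A zero-filled count is typically built by seeding every member before applying a grouped query result. In this sketch OutboxStatus is redeclared as a str-based Enum stand-in, zero_filled_counts is a hypothetical helper, and the input is assumed to be (status value, count) pairs such as a SELECT status, COUNT(*) ... GROUP BY status query would return.

```python
from enum import Enum
from typing import Dict, Iterable, Tuple


class OutboxStatus(str, Enum):  # stand-in for the documented StrEnum
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ABANDONED = "abandoned"


def zero_filled_counts(grouped: Iterable[Tuple[str, int]]) -> Dict[OutboxStatus, int]:
    # Seed every member with 0 so statuses with no rows still appear.
    counts = {status: 0 for status in OutboxStatus}
    for value, n in grouped:
        counts[OutboxStatus(value)] = n  # GROUP BY yields at most one row per status
    return counts
```

A health check can then compare counts[OutboxStatus.ABANDONED] against a threshold without guarding against a missing key.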
OutboxEntry
class OutboxEntry(BaseModel):
    attempts: int = Field(default=0, ge=0)
    enqueued_at: datetime
    entry_id: UUID
    last_attempt_at: datetime | None = None
    last_error: str | None = None
    next_attempt_at: datetime | None = None
    ref: SubjectRef
    resolver: str = Field(min_length=1, max_length=255)
    status: OutboxStatus = OutboxStatus.PENDING
    subject_id: str = Field(min_length=1, max_length=255)

One durable external call awaiting (or done with) execution.
Entries are written in the same transaction as the local erasure, so an erasure is never half-recorded: either the local delete and all its external follow-ups are committed together, or none are.
Fields:
- attempts (int): How many times the call has been claimed for execution.
- enqueued_at (datetime): When the entry was committed (UTC).
- entry_id (UUID): Unique id; doubles as the idempotency key for the call.
- last_attempt_at (datetime | None): When the last try started, if any (UTC).
- last_error (str | None): Short error note from the last failed try: the exception class name only, never its message (no PII).
- next_attempt_at (datetime | None): Earliest instant any runner may (re)claim the entry (UTC). None means due immediately. Doubles as the crash lease while IN_FLIGHT and as the backoff schedule while FAILED; terminal entries carry None.
- ref (SubjectRef): The subject reference to pass to the resolver.
- resolver (str): Which resolver will perform the call.
- status (OutboxStatus): Current lifecycle state.
- subject_id (str): The erased subject’s identifier, exactly as passed to erase_subject. It groups a subject’s entries so the runner can tell when its last one lands, and must be globally unique per data subject: two subjects sharing a value would be treated as one completion group.
OutboxStatus
class OutboxStatus(StrEnum): ...

Lifecycle of one durable external-call entry.
| Member | Value | Description |
|---|---|---|
| PENDING | pending | Enqueued, not yet attempted. |
| IN_FLIGHT | in_flight | Claimed by a runner; being attempted right now. |
| SUCCEEDED | succeeded | The external call completed (including “already gone”). |
| FAILED | failed | Last attempt failed; will be retried with backoff. |
| ABANDONED | abandoned | Retries exhausted or failure non-retryable; surfaced loudly for operator action and never silently dropped, since the audit trail records the abandonment. |
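Reading this page's method descriptions together suggests the transition map below. It is an interpretation assembled for illustration (TRANSITIONS and can_transition are hypothetical names, not library exports): claims move PENDING, backed-off FAILED, and lease-expired IN_FLIGHT entries to IN_FLIGHT, and the mark_* methods move a claimed entry to one of the three outcome states.

```python
from enum import Enum
from typing import Dict, FrozenSet


class OutboxStatus(str, Enum):  # stand-in for the documented StrEnum
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ABANDONED = "abandoned"


TRANSITIONS: Dict[OutboxStatus, FrozenSet[OutboxStatus]] = {
    OutboxStatus.PENDING: frozenset({OutboxStatus.IN_FLIGHT}),  # first claim
    OutboxStatus.FAILED: frozenset({OutboxStatus.IN_FLIGHT}),  # claim once backoff elapses
    OutboxStatus.IN_FLIGHT: frozenset(
        {
            OutboxStatus.IN_FLIGHT,  # re-claim after the lease expires (crashed runner)
            OutboxStatus.SUCCEEDED,  # mark_succeeded
            OutboxStatus.FAILED,  # mark_failed
            OutboxStatus.ABANDONED,  # mark_abandoned
        }
    ),
    OutboxStatus.SUCCEEDED: frozenset(),  # terminal
    OutboxStatus.ABANDONED: frozenset(),  # terminal; no requeue surface (ADR 0010)
}


def can_transition(src: OutboxStatus, dst: OutboxStatus) -> bool:
    return dst in TRANSITIONS[src]
```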
SagaRunner
class SagaRunner:
    def __init__(
        registry: ResolverRegistry,
        outbox: Outbox,
        audit: AuditSink,
        *,
        max_attempts: int = 8,
        batch_size: int = 50,
        backoff: BackoffPolicy | None = None
    ) -> None

Executes outbox entries with retries, backoff, and idempotency.
Designed to be driven by whatever the application already has — a
background task, a worker process, a cron job. One call to
run_once processes one batch; the runner owns no event loop.
Failure taxonomy (ADR 0010): ResolverError — raised
by a resolver for a non-retryable failure, or by the registry for an
unknown resolver name — abandons the entry immediately; any other
exception is treated as transient and retried with exponential backoff
until max_attempts, then abandoned. Every terminal outcome is
audited; an abandonment is never silent.
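The taxonomy reduces to a small decision per failed call. In this sketch ResolverError is a local stand-in for the library's exception, classify and Outcome are hypothetical names, the backoff schedule is passed in as a callable, and the abandonment boundary is assumed to be attempts >= max_attempts, since attempts already counts the current claim.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional


class ResolverError(Exception):
    """Local stand-in for the library's non-retryable resolver error."""


@dataclass(frozen=True)
class Outcome:
    abandon: bool
    error: str  # exception class name only: messages may embed PII
    next_attempt_at: Optional[datetime] = None  # set only for retryable failures


def classify(
    exc: Exception,
    attempts: int,
    max_attempts: int,
    now: datetime,
    backoff: Callable[[int], timedelta],
) -> Outcome:
    error = type(exc).__name__
    # Non-retryable by type, or retries exhausted: abandon (after auditing).
    if isinstance(exc, ResolverError) or attempts >= max_attempts:
        return Outcome(abandon=True, error=error)
    # Anything else is transient: schedule the next claim after the backoff delay.
    return Outcome(abandon=False, error=error, next_attempt_at=now + backoff(attempts))
```

In the flow described on this page, an abandon outcome corresponds to appending ERASURE_STEP_FAILED and then calling mark_abandoned, while a retry outcome maps onto mark_failed with the computed next_attempt_at.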
SagaRunner.run_once
async def run_once() -> int

Claim and execute one batch of due entries.
Each entry’s resolver call is idempotent (the entry id is the
idempotency key), so a crash between execution and bookkeeping is
safe — the entry stays IN_FLIGHT, its claim lease expires, and
the retry converges on the same outcome.
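Why a replay is harmless can be seen with a toy external service keyed by the entry id. FakeCrm and resolve are hypothetical; real resolvers and the services behind them define their own idempotency mechanisms. Treating "already gone" as success is what lets a second execution converge on the same result.

```python
from typing import Dict, Set
from uuid import UUID


class FakeCrm:
    """Hypothetical external service that honours an idempotency key."""

    def __init__(self, records: Set[str]) -> None:
        self.records = records
        self.outcomes: Dict[UUID, str] = {}  # idempotency key -> first outcome

    def delete(self, key: UUID, record_id: str) -> str:
        if key in self.outcomes:  # a replay: report what the first call did
            return self.outcomes[key]
        outcome = "deleted" if record_id in self.records else "already_gone"
        self.records.discard(record_id)
        self.outcomes[key] = outcome
        return outcome


def resolve(crm: FakeCrm, entry_id: UUID, record_id: str) -> bool:
    # "Already gone" counts as success, so a second execution still converges.
    return crm.delete(entry_id, record_id) in ("deleted", "already_gone")
```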
Per entry: success appends ERASURE_STEP_SUCCEEDED and — when
the subject’s last entry lands — ERASURE_COMPLETED; a terminal
failure appends ERASURE_STEP_FAILED before the entry is marked
ABANDONED. The audit append always precedes the status change,
so no recorded outcome lacks its audit record; if the sink is down
the entry stays claimed and the lease heals it. Transient failures
are not audited — the row’s last_error carries the exception
class name and the entry retries on the backoff schedule.
Awaits resolver calls concurrently but makes blocking database calls (claiming, audit appends) between awaits — run it in a worker, cron job, or background task, never on a serving event loop (ADR 0006).
Returns:
int: Number of entries processed in this batch.
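One way to keep run_once off a serving event loop is a dedicated worker thread that calls it repeatedly. drive, RunsOnce, and the idle_sleep default are assumptions for illustration; only run_once itself comes from this page.

```python
import asyncio
import threading
from typing import Protocol


class RunsOnce(Protocol):
    """Anything with the documented run_once coroutine, e.g. a SagaRunner."""

    async def run_once(self) -> int: ...


def drive(runner: RunsOnce, stop: threading.Event, idle_sleep: float = 5.0) -> int:
    """Process batches until stop is set; return the total entries processed."""
    total = 0
    while not stop.is_set():
        # A private event loop per batch, owned by this worker thread, so the
        # runner's blocking database calls never stall a serving loop.
        processed = asyncio.run(runner.run_once())
        total += processed
        if processed == 0:
            stop.wait(idle_sleep)  # outbox drained: pause, but wake early on stop
    return total
```

Start it with threading.Thread(target=drive, args=(runner, stop), daemon=True).start() and call stop.set() on shutdown. A fresh asyncio.run per batch keeps each batch's loop short-lived; a single long-lived loop inside the same worker thread would also work.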