
Saga & Outbox

This content is for version 0.1.

Saga/outbox — erasure beyond the local transaction, always in a known state.

class BackoffPolicy(BaseModel):
    base_delay: timedelta = timedelta(seconds=30)
    lease: timedelta = timedelta(minutes=5)
    max_delay: timedelta = timedelta(hours=1)

Deterministic exponential backoff for outbox retries.

The delay after the n-th attempt is min(base_delay * 2**(n - 1), max_delay) — no jitter, so tests can pin exact schedules; concurrent runners are already spread by SKIP LOCKED claiming.

Fields:

  • base_delay (timedelta): Delay after the first failed attempt.
  • lease (timedelta): How long a claim protects an IN_FLIGHT entry from other runners. Must comfortably exceed the slowest expected resolver call: a lease that expires mid-call lets another runner execute the same entry again — converging (resolvers are idempotent) but wasteful and noisy.
  • max_delay (timedelta): Ceiling the doubling never exceeds.
def delay(attempts: int) -> timedelta

Compute the wait before the next try, given the number of attempts made so far.

Args:

  • attempts (int): How many times the entry has been attempted (≥ 1).

Returns:

  • timedelta: min(base_delay * 2**(attempts - 1), max_delay).

Raises:

  • ValueError — If attempts is not positive.
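A minimal sketch of the documented schedule, assuming a plain frozen dataclass in place of the pydantic model (BackoffSketch is a hypothetical name, and it omits the lease field). It doubles step by step instead of evaluating 2**(attempts - 1) directly: the result is the same as the formula above, but very large attempt counts cannot overflow timedelta.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class BackoffSketch:
    """Hypothetical stand-in for BackoffPolicy; defaults copied from its signature."""

    base_delay: timedelta = timedelta(seconds=30)
    max_delay: timedelta = timedelta(hours=1)

    def delay(self, attempts: int) -> timedelta:
        if attempts < 1:
            raise ValueError("attempts must be positive")
        # Equivalent to min(base_delay * 2**(attempts - 1), max_delay), but stops
        # doubling once the cap is reached so huge attempt counts cannot overflow.
        wait = self.base_delay
        for _ in range(attempts - 1):
            if wait >= self.max_delay:
                break
            wait *= 2
        return min(wait, self.max_delay)


policy = BackoffSketch()
print([int(policy.delay(n).total_seconds()) for n in range(1, 9)])
# → [30, 60, 120, 240, 480, 960, 1920, 3600]
```

With the defaults, the eighth attempt is the first one to hit the one-hour ceiling, which lines up with the runner's default max_attempts of 8.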
class Outbox:
def __init__(session_factory: sessionmaker, outbox: Table) -> None

Transactional outbox in the application’s own database.

Entries are enqueued inside the caller’s transaction (atomically with the local erasure) and claimed by the saga runner afterwards. enqueue uses the caller’s session; the claim and bookkeeping methods run outside any caller transaction and use the factory (ADR 0006).

def claim_batch(limit: int = 50, *, lease: timedelta = _DEFAULT_LEASE) -> Sequence[OutboxEntry]

Atomically claim due entries for execution, oldest first.

An entry is due when its next_attempt_at gate is NULL or in the past and it is not terminal: fresh PENDING entries, FAILED entries whose backoff has elapsed, and IN_FLIGHT entries whose claim lease has expired (a crashed runner’s work, healed here). Claimed entries move to IN_FLIGHT, with attempts incremented and next_attempt_at set to now + lease. The claim itself counts as the attempt, so an entry that crashes its runner every time still converges to abandonment.

On PostgreSQL the selection runs FOR UPDATE SKIP LOCKED, so concurrent runners never double-claim. SQLite ignores row locking; the no-double-claim guarantee holds only on dialects that support FOR UPDATE.

Args:

  • limit (int): Maximum entries to claim in one batch.
  • lease (timedelta): How long the claim protects the entries from other runners. Must comfortably exceed the slowest expected resolver call; a too-short lease causes double execution (absorbed by resolver idempotency, but wasteful).

Returns:

  • Sequence[OutboxEntry] — The claimed entries in their post-claim state, oldest first.
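The due-selection rule can be sketched in memory. This assumes a trimmed, hypothetical Entry dataclass in place of OutboxEntry, a str/Enum Status in place of OutboxStatus, and (enqueued_at, entry_id) as the "oldest first" order, borrowed from list_abandoned. It also assumes the claim stamps last_attempt_at. It deliberately leaves out concurrency: the real claim is a single SELECT ... FOR UPDATE SKIP LOCKED plus UPDATE, which no in-memory loop reproduces.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import List, Optional
from uuid import UUID, uuid4


class Status(str, Enum):
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ABANDONED = "abandoned"


TERMINAL = {Status.SUCCEEDED, Status.ABANDONED}


@dataclass
class Entry:
    """Trimmed, hypothetical stand-in for OutboxEntry."""

    enqueued_at: datetime
    entry_id: UUID = field(default_factory=uuid4)
    status: Status = Status.PENDING
    attempts: int = 0
    next_attempt_at: Optional[datetime] = None
    last_attempt_at: Optional[datetime] = None


def is_due(entry: Entry, now: datetime) -> bool:
    # Not terminal, and the gate is open (NULL or not in the future). This one
    # check covers PENDING, FAILED after backoff, and IN_FLIGHT after lease expiry.
    return entry.status not in TERMINAL and (
        entry.next_attempt_at is None or entry.next_attempt_at <= now
    )


def claim_batch(entries: List[Entry], now: datetime, limit: int = 50,
                lease: timedelta = timedelta(minutes=5)) -> List[Entry]:
    due = sorted((e for e in entries if is_due(e, now)),
                 key=lambda e: (e.enqueued_at, e.entry_id))[:limit]  # oldest first
    for e in due:
        e.status = Status.IN_FLIGHT
        e.attempts += 1                  # the claim is the attempt
        e.last_attempt_at = now          # assumption: the claim stamps this field
        e.next_attempt_at = now + lease  # the gate now doubles as the crash lease
    return due
```

A claimed entry is invisible to further claims until now passes its lease. After that it is reclaimed with attempts incremented again, which is how a crashed runner's work heals.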
def enqueue(session: Session, entries: Sequence[OutboxEntry]) -> None

Persist entries inside the caller’s open transaction.

Never commits — the entries become durable exactly when the caller’s transaction does, and a rollback takes them with it. The nested SubjectRef is flattened into the table’s ref_kind/ref_value/ref_extra columns.

Args:

  • session (Session): The SAME session the local erasure runs in — that is the whole point; the entries commit or roll back with it.
  • entries (Sequence[OutboxEntry]): The external calls to record.
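The same-transaction guarantee can be demonstrated with the standard library's sqlite3 module, which here stands in for the SQLAlchemy Session. The outbox table layout, the users table, and the RefSketch fields (kind, value, extra) are all hypothetical. Storing ref_extra as JSON is also an assumption about how the nested extras are flattened.

```python
import json
import sqlite3
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict

OUTBOX_DDL = """
CREATE TABLE IF NOT EXISTS outbox (
    entry_id   TEXT PRIMARY KEY,
    subject_id TEXT NOT NULL,
    resolver   TEXT NOT NULL,
    ref_kind   TEXT NOT NULL,
    ref_value  TEXT NOT NULL,
    ref_extra  TEXT,
    status     TEXT NOT NULL DEFAULT 'pending'
)
"""


@dataclass
class RefSketch:
    """Hypothetical shape for SubjectRef: kind, value, and free-form extras."""

    kind: str
    value: str
    extra: Dict[str, Any] = field(default_factory=dict)


def enqueue(conn: sqlite3.Connection, subject_id: str, resolver: str, ref: RefSketch) -> str:
    """Insert one outbox row on the caller's connection. Never commits."""
    entry_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO outbox (entry_id, subject_id, resolver, ref_kind, ref_value, ref_extra)"
        " VALUES (?, ?, ?, ?, ?, ?)",
        # The nested reference is flattened into three columns.
        (entry_id, subject_id, resolver, ref.kind, ref.value, json.dumps(ref.extra)),
    )
    return entry_id


conn = sqlite3.connect(":memory:")
conn.execute(OUTBOX_DDL)
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY)")
conn.execute("INSERT INTO users VALUES ('u-1')")
conn.commit()

# The local erasure and its outbox row share one transaction...
conn.execute("DELETE FROM users WHERE id = 'u-1'")
enqueue(conn, "u-1", "crm", RefSketch("email", "a@example.com"))
conn.rollback()  # ...so a rollback undoes both together.
```

After the rollback, the user row is back and the outbox is empty. Committing instead makes the delete and the outbox row durable together.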
def list_abandoned(*, limit: int = 100) -> Sequence[OutboxEntry]

Return abandoned entries for operator inspection, oldest first.

The read half of “abandoned loudly”: every entry whose retries are exhausted stays visible here (and in the audit trail) until it is handled out of band. Read-only by design — abandonment is permanent under ADR 0010, so there is deliberately no requeue surface; what an abandoned erasure requires is a determination only you can make.

Args:

  • limit (int): Maximum entries to return.

Returns:

  • Sequence[OutboxEntry]: ABANDONED entries, oldest first (by enqueued_at, then entry_id).
def mark_abandoned(entry: OutboxEntry, *, error: str) -> None

Record a terminal failure; the entry is never retried.

Abandonment is loud by contract: the runner appends the ERASURE_STEP_FAILED audit event before calling this, so an abandoned entry always has its audit record.

Args:

  • entry (OutboxEntry): The claimed entry whose retries are exhausted or whose failure is non-retryable.
  • error (str): The exception class name — never its message.
def mark_failed(entry: OutboxEntry, *, error: str, next_attempt_at: datetime) -> None

Record a retryable failure and schedule the next attempt.

Args:

  • entry (OutboxEntry): The claimed entry whose resolver call failed.
  • error (str): The exception class name — never its message, which may embed PII.
  • next_attempt_at (datetime): When the entry becomes claimable again (the backoff schedule).
def mark_succeeded(entry: OutboxEntry, *, on_subject_complete: Callable[[], None]) -> None

Record a successful external call; detect subject completion.

Runs in one transaction that first locks all of the subject’s entries FOR UPDATE ordered by entry_id — two runners finishing the same subject’s last two entries serialize on the same lock order instead of deadlocking, so exactly one of them observes the all-succeeded transition. If, after this update, every entry for the subject is SUCCEEDED (an ABANDONED sibling blocks completion permanently), on_subject_complete is invoked inside the open transaction; if it raises, the update rolls back and the entry stays IN_FLIGHT for the lease to heal — a success is never recorded without its completion check.

With the default DatabaseAuditSink the callback opens a second pooled connection while this transaction still holds the first — size the pool for two connections per concurrent runner thread. An exhausted pool times out, rolls back, and the lease heals the entry, but the work is wasted.

Args:

  • entry (OutboxEntry): The claimed entry whose resolver call succeeded.
  • on_subject_complete (Callable[[], None]): Invoked at most once, while the subject’s rows are still locked, when its last entry lands.
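The completion check and its rollback behaviour can be modelled in memory. This is a sketch with hypothetical names: a dict copy plays the open transaction, and the row locking in entry_id order that serializes two runners is not reproduced. It shows three properties. The callback fires only when every sibling is SUCCEEDED, an ABANDONED sibling blocks it, and a raising callback leaves the entry IN_FLIGHT.

```python
from enum import Enum
from typing import Callable, Dict


class Status(str, Enum):
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ABANDONED = "abandoned"


def mark_succeeded(rows: Dict[str, Status], entry_id: str,
                   on_subject_complete: Callable[[], None]) -> None:
    """Hypothetical model; rows maps entry_id -> status for ONE subject's entries."""
    staged = dict(rows)                  # the "open transaction": not yet visible
    staged[entry_id] = Status.SUCCEEDED
    # Complete only if every sibling succeeded; an ABANDONED one blocks this forever.
    if all(status is Status.SUCCEEDED for status in staged.values()):
        on_subject_complete()            # if this raises, the commit below never runs
    rows.update(staged)                  # "commit"
```

Calling the callback before the commit is what makes "a success is never recorded without its completion check" hold. If the callback fails, nothing is written, and the lease later hands the entry to another runner.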
def status_counts() -> dict[OutboxStatus, int]

Count entries per lifecycle status, for dashboards and health checks.

Read-only. Every OutboxStatus member is present in the result, zero-filled — a healthy, drained outbox reports explicit zeros rather than missing keys. A growing ABANDONED count is the operator signal that erasures need out-of-band attention.

Returns:

  • dict[OutboxStatus, int] — A mapping with one entry per status.
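A GROUP BY status query only returns statuses that actually occur, so the zero-fill has to happen afterwards. Here is a minimal sketch over an iterable of statuses, with a hypothetical str/Enum Status mirroring OutboxStatus. In the real method the counts would come from the database.

```python
from collections import Counter
from enum import Enum
from typing import Dict, Iterable


class Status(str, Enum):
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ABANDONED = "abandoned"


def status_counts(statuses: Iterable[Status]) -> Dict[Status, int]:
    seen = Counter(statuses)
    # Iterate over the enum, not over the rows, so absent statuses report 0.
    return {member: seen[member] for member in Status}
```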
class OutboxEntry(BaseModel):
    attempts: int = Field(default=0, ge=0)
    enqueued_at: datetime
    entry_id: UUID
    last_attempt_at: datetime | None = None
    last_error: str | None = None
    next_attempt_at: datetime | None = None
    ref: SubjectRef
    resolver: str = Field(min_length=1, max_length=255)
    status: OutboxStatus = OutboxStatus.PENDING
    subject_id: str = Field(min_length=1, max_length=255)

One durable external call awaiting (or done with) execution.

Entries are written in the same transaction as the local erasure, so an erasure is never half-recorded: either the local delete and all its external follow-ups are committed together, or none are.

Fields:

  • attempts (int): How many times the call has been claimed for execution.
  • enqueued_at (datetime): When the entry was committed (UTC).
  • entry_id (UUID): Unique id; doubles as the idempotency key for the call.
  • last_attempt_at (datetime | None): When the last try started, if any (UTC).
  • last_error (str | None): Short error note from the last failed try — the exception class name only, never its message (no PII).
  • next_attempt_at (datetime | None): Earliest instant any runner may (re)claim the entry (UTC). None means due immediately. Doubles as the crash lease while IN_FLIGHT and as the backoff schedule while FAILED; terminal entries carry None.
  • ref (SubjectRef): The subject reference to pass to the resolver.
  • resolver (str): Which resolver will perform the call.
  • status (OutboxStatus): Current lifecycle state.
  • subject_id (str): The erased subject’s identifier, exactly as passed to erase_subject — groups a subject’s entries so the runner can tell when its last one lands. Must be globally unique per data subject: two subjects sharing a value would be treated as one completion group.
class OutboxStatus(StrEnum):
...

Lifecycle of one durable external-call entry.

Members:

  • PENDING ("pending"): Enqueued, not yet attempted.
  • IN_FLIGHT ("in_flight"): Claimed by a runner; being attempted right now.
  • SUCCEEDED ("succeeded"): The external call completed (including “already gone”).
  • FAILED ("failed"): Last attempt failed; will be retried with backoff.
  • ABANDONED ("abandoned"): Retries exhausted or the failure was non-retryable. Surfaced loudly for operator action and never silently dropped; the audit trail records the abandonment.
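The transitions implied by the Outbox methods can be summarised as a table. This is a sketch, not a library API: TRANSITIONS, is_terminal, and check_transition are hypothetical names. It uses a str/Enum mixin instead of StrEnum so that it also runs on Python versions before 3.11.

```python
from enum import Enum
from typing import Dict, FrozenSet


class OutboxStatus(str, Enum):
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ABANDONED = "abandoned"


S = OutboxStatus
# Hypothetical transition table read off the Outbox method docs.
TRANSITIONS: Dict[OutboxStatus, FrozenSet[OutboxStatus]] = {
    S.PENDING: frozenset({S.IN_FLIGHT}),  # first claim
    S.FAILED: frozenset({S.IN_FLIGHT}),   # claim after the backoff elapses
    S.IN_FLIGHT: frozenset({
        S.IN_FLIGHT,   # lease expired, reclaimed by another runner
        S.SUCCEEDED,   # mark_succeeded
        S.FAILED,      # mark_failed
        S.ABANDONED,   # mark_abandoned
    }),
    S.SUCCEEDED: frozenset(),  # terminal
    S.ABANDONED: frozenset(),  # terminal; there is no requeue surface
}


def is_terminal(status: OutboxStatus) -> bool:
    return not TRANSITIONS[status]


def check_transition(old: OutboxStatus, new: OutboxStatus) -> None:
    if new not in TRANSITIONS[old]:
        raise ValueError(f"illegal transition {old.value} -> {new.value}")
```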
class SagaRunner:
def __init__(registry: ResolverRegistry, outbox: Outbox, audit: AuditSink, *, max_attempts: int = 8, batch_size: int = 50, backoff: BackoffPolicy | None = None) -> None

Executes outbox entries with retries, backoff, and idempotency.

Designed to be driven by whatever the application already has — a background task, a worker process, a cron job. One call to run_once processes one batch; the runner owns no event loop.

Failure taxonomy (ADR 0010): ResolverError — raised by a resolver for a non-retryable failure, or by the registry for an unknown resolver name — abandons the entry immediately; any other exception is treated as transient and retried with exponential backoff until max_attempts, then abandoned. Every terminal outcome is audited; an abandonment is never silent.
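The failure taxonomy reduces to a small decision function. This sketch uses hypothetical names (classify, backoff_delay, and a local ResolverError stand-in). It also assumes the boundary is attempts >= max_attempts, since attempts already counts the current claim. It returns the action, the next_attempt_at for a retry, and the exception class name to store as last_error.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


class ResolverError(Exception):
    """Stand-in for the library's non-retryable ResolverError."""


def backoff_delay(attempts: int, base: timedelta = timedelta(seconds=30),
                  cap: timedelta = timedelta(hours=1)) -> timedelta:
    # Capped doubling: same value as min(base * 2**(attempts - 1), cap).
    wait = base
    for _ in range(attempts - 1):
        if wait >= cap:
            break
        wait *= 2
    return min(wait, cap)


def classify(exc: Exception, attempts: int, now: datetime,
             max_attempts: int = 8) -> Tuple[str, Optional[datetime], str]:
    """Decide what to record after a failed call: (action, next_attempt_at, error)."""
    error = type(exc).__name__          # class name only, never the message (PII)
    if isinstance(exc, ResolverError):
        return "abandon", None, error   # non-retryable: abandon immediately
    if attempts >= max_attempts:
        return "abandon", None, error   # transient, but out of attempts
    return "retry", now + backoff_delay(attempts), error
```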

async def run_once() -> int

Claim and execute one batch of due entries.

Each entry’s resolver call is idempotent (the entry id is the idempotency key), so a crash between execution and bookkeeping is safe — the entry stays IN_FLIGHT, its claim lease expires, and the retry converges on the same outcome.

Per entry: success appends ERASURE_STEP_SUCCEEDED and — when the subject’s last entry lands — ERASURE_COMPLETED; a terminal failure appends ERASURE_STEP_FAILED before the entry is marked ABANDONED. The audit append always precedes the status change, so no recorded outcome lacks its audit record; if the sink is down the entry stays claimed and the lease heals it. Transient failures are not audited — the row’s last_error carries the exception class name and the entry retries on the backoff schedule.
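The audit-before-status ordering for a single entry can be sketched with fakes. This assumes hypothetical names (process_entry, FakeOutbox, a plain audit callable) and a local ResolverError stand-in. For brevity, the transient path always records FAILED and skips the max_attempts check. If the audit call raises, the status is never written, so the entry stays as claimed and the lease heals it.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, Tuple


class ResolverError(Exception):
    """Stand-in for the library's non-retryable ResolverError."""


class FakeOutbox:
    """Hypothetical fake: records (status, last_error) per entry instead of rows."""

    def __init__(self) -> None:
        self.rows: Dict[str, Tuple[str, Optional[str]]] = {}

    def mark(self, entry_id: str, status: str, error: Optional[str]) -> None:
        self.rows[entry_id] = (status, error)


async def process_entry(
    entry_id: str,
    call: Callable[[str], Awaitable[None]],
    audit: Callable[[str, str], None],
    outbox: FakeOutbox,
) -> None:
    try:
        await call(entry_id)  # entry_id doubles as the idempotency key
    except ResolverError as exc:
        audit("ERASURE_STEP_FAILED", entry_id)                  # audit first...
        outbox.mark(entry_id, "abandoned", type(exc).__name__)  # ...then status
        return
    except Exception as exc:
        # Transient: no audit event, only the class name on the row.
        outbox.mark(entry_id, "failed", type(exc).__name__)
        return
    audit("ERASURE_STEP_SUCCEEDED", entry_id)  # if the sink raises, no mark happens
    outbox.mark(entry_id, "succeeded", None)
```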

Awaits resolver calls concurrently but makes blocking database calls (claiming, audit appends) between awaits — run it in a worker, cron job, or background task, never on a serving event loop (ADR 0006).

Returns:

  • int — Number of entries processed in this batch.
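Two ways to drive run_once off the serving event loop are sketched below, under stated assumptions. The first is a cron-style drain that runs batches until one comes back empty. The second is a long-lived worker thread that polls with an idle sleep. FakeRunner is a hypothetical stand-in with the documented async run_once() -> int shape; in a real deployment a SagaRunner instance would take its place. The idle interval is arbitrary. Each batch gets its own short-lived event loop via asyncio.run, so the blocking database calls inside run_once never stall a web server's loop.

```python
import asyncio
import threading
from typing import List


class FakeRunner:
    """Hypothetical stand-in with the documented shape: async run_once() -> int."""

    def __init__(self, batches: List[int]) -> None:
        self._batches = list(batches)

    async def run_once(self) -> int:
        return self._batches.pop(0) if self._batches else 0


def drain(runner: FakeRunner) -> int:
    """Cron-style entry point: run batches until one comes back empty."""
    total = 0
    while (processed := asyncio.run(runner.run_once())) > 0:
        total += processed
    return total


def worker_loop(runner: FakeRunner, stop: threading.Event, idle: float = 1.0) -> int:
    """Body for a dedicated worker thread; returns the total once stop is set."""
    total = 0
    while not stop.is_set():
        processed = asyncio.run(runner.run_once())
        total += processed
        if processed == 0:
            stop.wait(idle)  # nothing due: sleep until the next poll or shutdown
    return total
```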