
Saga & Outbox

Saga/outbox — external calls beyond the local transaction, always in a known state.

Protocol — implement these members in your own class; do not subclass.

class AbandonedHook(Protocol):
...

A host callback fired when an outbox entry is abandoned.

Re-driving abandoned work is already a pull (Outbox.list_abandoned / Outbox.requeue); this is the push. Wire one in to page or emit a metric the instant a subject request stalls, instead of polling.

This protocol is public API, extended additively only. Sync by design (ADR 0006) — the hook runs on the runner’s thread, between blocking bookkeeping calls.

def on_abandoned(signal: AbandonedSignal) -> None

React to one entry’s abandonment.

Called after the entry’s transition to ABANDONED is durable and its audit event is written, so the hook is side-effect-isolated from both: the runner swallows whatever this raises and does not wait on it beyond the call returning. A slow or failing alerting backend therefore cannot corrupt or block the state transition or the audit trail — keeping this fast and resilient is the host’s job.

Args:

  • signal (AbandonedSignal): The PII-free summary of the abandoned entry.
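A minimal sketch of a host-side hook follows. It assumes a plain dataclass stand-in for AbandonedSignal (the library's is a pydantic model with the same field names), and the MetricHook class and the "outbox.alerts" logger name are hypothetical. No inheritance is needed: any object with a matching on_abandoned method satisfies the Protocol. The hook also guards itself, because staying fast and resilient is the host's job.

```python
import logging
from dataclasses import dataclass
from typing import Protocol
from uuid import UUID

log = logging.getLogger("outbox.alerts")  # hypothetical logger name


@dataclass(frozen=True)
class AbandonedSignal:
    # Stand-in for the library model; same field names, no validation.
    attempts: int
    entry_id: UUID
    error: str
    operation: str
    resolver: str
    subject_id: str


class AbandonedHook(Protocol):
    def on_abandoned(self, signal: AbandonedSignal) -> None: ...


class MetricHook:
    """Hypothetical hook: counts abandonments per (resolver, operation) and logs one line."""

    def __init__(self) -> None:
        self.counts: dict[tuple[str, str], int] = {}

    def on_abandoned(self, signal: AbandonedSignal) -> None:
        try:
            key = (signal.resolver, signal.operation)
            self.counts[key] = self.counts.get(key, 0) + 1
            # Log identifiers the signal already deems safe; subject_id is left out here.
            log.warning(
                "outbox entry abandoned: entry_id=%s resolver=%s operation=%s error=%s attempts=%d",
                signal.entry_id, signal.resolver, signal.operation, signal.error, signal.attempts,
            )
        except Exception:
            # The runner swallows hook errors anyway; this keeps the hook itself quiet and cheap.
            log.exception("abandoned hook failed")
```

Because the check is structural, `hook: AbandonedHook = MetricHook()` type-checks without MetricHook subclassing anything.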
class AbandonedSignal(BaseModel):
attempts: int = Field(ge=0)
entry_id: UUID
error: str = Field(min_length=1, max_length=255)
operation: OutboxOperation
resolver: str = Field(min_length=1, max_length=255)
subject_id: str = Field(min_length=1, max_length=255)

One entry’s terminal abandonment, summarised for an alerting host.

Handed to an AbandonedHook after the entry has flipped to ABANDONED. Carries exactly what a host needs to page or emit a metric and no PII: the error is the exception class name only, never its message (provider errors embed identifiers), and corrections never appear here.

Fields:

  • attempts (int): How many times the call was claimed before abandonment.
  • entry_id (UUID): The abandoned entry’s id — the idempotency key, and what Outbox.requeue takes to re-drive an erase entry.
  • error (str): The terminating exception’s class name (no message, no PII).
  • operation (OutboxOperation): Whether this was an erase or a rectify entry — a rectify abandonment cannot be requeued (its corrections are gone).
  • resolver (str): Which resolver’s call exhausted its retries.
  • subject_id (str): The data subject whose request is now unfinished.
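The two rules these fields encode, the error as class name only and a remediation that depends on operation, can be sketched as below. The helper names error_label and remediation are hypothetical; the 255-character cap mirrors the field constraint above, and the erase/rectify split follows the requeue rules documented for Outbox.requeue.

```python
MAX_ERROR_LEN = 255  # mirrors max_length=255 on AbandonedSignal.error


def error_label(exc: BaseException) -> str:
    """PII-free error note: the exception's class name, never str(exc)."""
    return type(exc).__name__[:MAX_ERROR_LEN]


def remediation(operation: str) -> str:
    """How an operator re-drives an abandoned entry of this operation (hypothetical helper)."""
    if operation == "erase":
        return "requeue"  # Outbox.requeue(entry_id) gives it a fresh budget
    if operation == "rectify":
        return "reissue"  # corrections were cleared at abandonment; re-issue via the Rectifier
    raise ValueError(f"unknown operation: {operation!r}")
```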
class BackoffPolicy(BaseModel):
base_delay: timedelta = timedelta(seconds=30)
lease: timedelta = timedelta(minutes=5)
max_delay: timedelta = timedelta(hours=1)

Deterministic exponential backoff for outbox retries.

The delay after the n-th attempt is min(base_delay * 2**(n - 1), max_delay) — no jitter, so tests can pin exact schedules; concurrent runners are already spread by SKIP LOCKED claiming.

Fields:

  • base_delay (timedelta): Delay after the first failed attempt.
  • lease (timedelta): How long a claim protects an IN_FLIGHT entry from other runners. Must comfortably exceed the slowest expected resolver call: a lease that expires mid-call lets another runner execute the same entry again — converging (resolvers are idempotent) but wasteful and noisy.
  • max_delay (timedelta): Ceiling the doubling never exceeds.
def delay(attempts: int) -> timedelta

The wait before the next try, given attempts tries so far.

Args:

  • attempts (int): How many times the entry has been attempted (≥ 1).

Returns:

  • timedelta — min(base_delay * 2**(attempts - 1), max_delay).

Raises:

  • ValueError — If attempts is not positive.
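The schedule above can be mirrored in a few lines. This sketch uses a stdlib dataclass in place of the pydantic model. Doubling step by step, and stopping once the ceiling is reached, is a choice made here so that very large attempt counts cannot overflow timedelta; the result equals min(base_delay * 2**(attempts - 1), max_delay).

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class BackoffPolicy:
    base_delay: timedelta = timedelta(seconds=30)
    lease: timedelta = timedelta(minutes=5)
    max_delay: timedelta = timedelta(hours=1)

    def delay(self, attempts: int) -> timedelta:
        if attempts < 1:
            raise ValueError(f"attempts must be >= 1, got {attempts}")
        # Equivalent to min(base_delay * 2**(attempts - 1), max_delay), but doubling
        # stops at the ceiling so e.g. attempts=1000 never overflows timedelta.
        delay = self.base_delay
        for _ in range(attempts - 1):
            if delay >= self.max_delay:
                break
            delay *= 2
        return min(delay, self.max_delay)
```

With the defaults the waits run 30 s, 1 min, 2 min, ..., 32 min, and from the 8th attempt on the 1 h ceiling applies (30 s × 2⁷ = 64 min).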
class Outbox:
def __init__(session_factory: sessionmaker, outbox: Table, *, status_counts_source: StatusCountsSource | None = None, audit_sink: AuditSink | None = None) -> None

Transactional outbox in the application’s own database.

Entries are enqueued inside the caller’s transaction (atomically with the local erasure) and claimed by the saga runner afterwards. enqueue uses the caller’s session; the claim and bookkeeping methods run outside any caller transaction and use the factory (ADR 0006).

def claim_batch(limit: int = 50, *, lease: timedelta = _DEFAULT_LEASE) -> Sequence[OutboxEntry]

Atomically claim due entries for execution, oldest first.

An entry is due when its next_attempt_at gate is NULL or in the past and it is not terminal: fresh PENDING entries, FAILED entries whose backoff has elapsed, IN_FLIGHT entries whose claim lease has expired (a crashed runner’s work, healed here), and SCHEDULED entries whose retention horizon has passed (parked by mark_scheduled, re-claimed to verify expiry — ADR 0022). Claimed entries move to IN_FLIGHT with attempts incremented — the claim is the attempt, so an entry that crashes its runner every time still converges to abandonment — and next_attempt_at set to now + lease.

On PostgreSQL the selection runs FOR UPDATE SKIP LOCKED, so concurrent runners never double-claim. SQLite ignores row locking; the no-double-claim guarantee holds only on dialects that support FOR UPDATE.

Args:

  • limit (int): Maximum entries to claim in one batch.
  • lease (timedelta): How long the claim protects the entries from other runners. Must comfortably exceed the slowest expected resolver call; a too-short lease causes double execution (absorbed by resolver idempotency, but wasteful).

Returns:

  • Sequence[OutboxEntry] — The claimed entries in their post-claim state, oldest first.
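The due predicate and the claim update can be sketched against a bare sqlite3 table. The column names, the lowercase status values, and ordering by (enqueued_at, entry_id) are assumptions for illustration. Timestamps are stored as ISO-8601 UTC strings so that string comparison matches time order. SQLite has no row locks, so this sketch is only safe with a single runner; on PostgreSQL the SELECT would carry FOR UPDATE SKIP LOCKED.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

DUE_SQL = """
    SELECT entry_id FROM outbox
    WHERE status IN ('pending', 'failed', 'in_flight', 'scheduled')   -- non-terminal
      AND (next_attempt_at IS NULL OR next_attempt_at <= ?)          -- gate open
    ORDER BY enqueued_at, entry_id                                    -- oldest first
    LIMIT ?
"""  # on PostgreSQL: append FOR UPDATE SKIP LOCKED


def claim_batch(conn: sqlite3.Connection, now: datetime, limit: int = 50,
                lease: timedelta = timedelta(minutes=5)) -> list[str]:
    """Flip due entries to in_flight, count the claim as an attempt, set the lease."""
    with conn:  # commit on success, roll back on error
        ids = [row[0] for row in conn.execute(DUE_SQL, (now.isoformat(), limit))]
        lease_until = (now + lease).isoformat()
        conn.executemany(
            "UPDATE outbox SET status = 'in_flight', attempts = attempts + 1, "
            "next_attempt_at = ? WHERE entry_id = ?",
            [(lease_until, entry_id) for entry_id in ids],
        )
    return ids
```

An expired in_flight row is claimed again exactly like a pending one, which is how a crashed runner's work heals.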
def enqueue(session: Session, entries: Sequence[OutboxEntry]) -> None

Persist entries inside the caller’s open transaction.

Never commits — the entries become durable exactly when the caller’s transaction does, and a rollback takes them with it. The nested SubjectRef is flattened into the table’s ref_kind/ref_value/ref_extra columns.

Args:

  • session (Session): The SAME session the local erasure runs in — that is the whole point; the entries commit or roll back with it.
  • entries (Sequence[OutboxEntry]): The external calls to record.
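The point of passing the same session is that the local erasure and its outbox rows share one commit. The sketch below shows that guarantee with sqlite3 standing in for the SQLAlchemy session; the users table, the trimmed outbox columns, and the erase_and_enqueue helper are hypothetical.

```python
import sqlite3
import uuid
from datetime import datetime, timezone


def erase_and_enqueue(conn: sqlite3.Connection, user_id: str, resolvers: list[str],
                      *, fail_before_commit: bool = False) -> None:
    """Delete the local row and record its external follow-ups in ONE transaction."""
    now = datetime.now(timezone.utc).isoformat()
    with conn:  # commits on clean exit, rolls back everything if the block raises
        conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
        conn.executemany(
            "INSERT INTO outbox (entry_id, subject_id, resolver, status, enqueued_at) "
            "VALUES (?, ?, ?, 'pending', ?)",
            [(str(uuid.uuid4()), user_id, name, now) for name in resolvers],
        )
        if fail_before_commit:
            raise RuntimeError("simulated crash before commit")
```

A failure anywhere before the commit leaves both the user row and an empty outbox, never one without the other.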
def list_abandoned(*, limit: int = 100) -> Sequence[OutboxEntry]

Return abandoned entries for operator inspection, oldest first.

The read half of “abandoned loudly”: every entry whose retries are exhausted stays visible here (and in the audit trail) until it is handled out of band. The ids it returns are exactly what requeue consumes once the underlying cause is fixed — abandonment is terminal under ADR 0010 until an operator requeues (ADR 0015); whether an abandoned erasure needs that is a determination only you can make.

Args:

  • limit (int): Maximum entries to return.

Returns:

  • Sequence[OutboxEntry] — ABANDONED entries, oldest first (by enqueued_at, then entry_id).
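Because requeue rejects rectify entries, an operator script would typically split the output of list_abandoned before acting on it. A minimal sketch follows. The Entry dataclass is a stand-in carrying only the fields used, and split_abandoned is hypothetical. Comparing against the string "erase" works because OutboxOperation is a StrEnum.

```python
from dataclasses import dataclass
from uuid import UUID


@dataclass(frozen=True)
class Entry:  # stand-in for OutboxEntry, only the fields this sketch reads
    entry_id: UUID
    operation: str
    resolver: str


def split_abandoned(entries: list[Entry]) -> tuple[list[UUID], list[Entry]]:
    """Separate requeueable erase ids from rectify entries that must be re-issued."""
    requeue_ids: list[UUID] = []
    reissue: list[Entry] = []
    for entry in entries:
        if entry.operation == "erase":
            requeue_ids.append(entry.entry_id)
        else:
            reissue.append(entry)  # corrections are gone; go back through the Rectifier
    return requeue_ids, reissue
```

Once the cause is fixed, the ids go straight to `outbox.requeue(requeue_ids)`, and the rectify entries are re-issued instead.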
def list_scheduled(*, limit: int = 100) -> Sequence[OutboxEntry]

Return entries parked for vendor expiry, nearest horizon first.

The read half of retention-only erasure (ADR 0022): every erase entry that a retention-only resolver could only schedule sits SCHEDULED until its vendor horizon, then is re-claimed to verify the data is gone. The list shows which subject waits on which resolver, and until when: next_attempt_at is the gate the runner re-claims on, so the list is ordered by it and the subject expiring soonest surfaces first. A SCHEDULED entry is neither a fault nor terminal (contrast list_abandoned); it is pending vendor expiry, and it blocks ERASURE_COMPLETED for its subject until verified gone.

Args:

  • limit (int): Maximum entries to return.

Returns:

  • Sequence[OutboxEntry] — SCHEDULED entries, nearest horizon first (by next_attempt_at, then entry_id).
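Since every SCHEDULED erase entry blocks ERASURE_COMPLETED for its subject, the latest horizon among a subject's scheduled entries is a lower bound on when that subject's erasure can complete. Pending or failed siblings may push it later still. The sketch below derives that bound from list_scheduled output; the Entry stand-in and the helper name are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Entry:  # stand-in for OutboxEntry
    subject_id: str
    resolver: str
    next_attempt_at: datetime  # the retention horizon while SCHEDULED


def completion_lower_bound(scheduled: list[Entry]) -> dict[str, datetime]:
    """Per subject, the earliest instant ERASURE_COMPLETED could possibly fire."""
    bound: dict[str, datetime] = {}
    for e in scheduled:
        # A subject cannot complete before its LATEST scheduled horizon is verified.
        if e.subject_id not in bound or e.next_attempt_at > bound[e.subject_id]:
            bound[e.subject_id] = e.next_attempt_at
    return bound
```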
def mark_abandoned(entry: OutboxEntry, *, error: str) -> None

Record a terminal failure; the entry is never retried.

Abandonment is loud by contract: the runner appends the step-failed audit event before calling this, so an abandoned entry always has its audit record. The row’s payload is cleared — a terminal entry never retains corrected values (ADR 0013).

Args:

  • entry (OutboxEntry): The claimed entry whose retries are exhausted or whose failure is non-retryable.
  • error (str): The exception class name — never its message.
def mark_failed(entry: OutboxEntry, *, error: str, next_attempt_at: datetime) -> None

Record a retryable failure and schedule the next attempt.

The row’s payload survives a retryable failure on purpose: the retry needs the corrected values.

Args:

  • entry (OutboxEntry): The claimed entry whose resolver call failed.
  • error (str): The exception class name — never its message, which may embed PII.
  • next_attempt_at (datetime): When the entry becomes claimable again (the backoff schedule).
def mark_scheduled(entry: OutboxEntry, *, resume_at: datetime) -> None

Park the entry until its retention horizon (ADR 0022).

Records that the external system can only expire the data: the entry moves to SCHEDULED with next_attempt_at=resume_at (the same gate that schedules retries), attempts=0, and last_error=NULL. Resetting the counters gives the verification after the horizon the full retry budget, following the ADR 0015 requeue precedent; the prior struggle lives in the ERASURE_EXPIRY_SCHEDULED event the runner appends before calling this. A SCHEDULED entry is not terminal: it blocks ERASURE_COMPLETED until it is re-claimed after the horizon and verified gone.

Args:

  • entry (OutboxEntry): The claimed entry whose erasure was scheduled.
  • resume_at (datetime): When the entry becomes claimable again — the retention horizon (clamped by the runner to at least one backoff step from now).
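The clamp on resume_at keeps a vendor horizon that is already past, or only moments away, from turning into an immediate re-claim loop. A sketch follows. The helper name is hypothetical, and treating step as the backoff delay for the entry's current attempt count is an assumption about which step the runner uses.

```python
from datetime import datetime, timedelta


def clamp_resume_at(horizon: datetime, now: datetime, step: timedelta) -> datetime:
    """Park no closer than one backoff step from now (step: assumed BackoffPolicy.delay result)."""
    return max(horizon, now + step)
```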
def mark_succeeded(entry: OutboxEntry, *, on_subject_complete: Callable[[], None]) -> None

Record a successful external call; detect per-operation completion.

Runs in one transaction that first locks all of the subject’s entries of the entry’s operation FOR UPDATE ordered by entry_id — two runners finishing the same subject’s last two entries serialize on the same lock order instead of deadlocking, so exactly one of them observes the all-succeeded transition. If, after this update, every same-operation entry for the subject is SUCCEEDED (an ABANDONED sibling blocks completion permanently; entries of the other operation are invisible here — ADR 0013), on_subject_complete is invoked inside the open transaction; if it raises, the update rolls back and the entry stays IN_FLIGHT for the lease to heal — a success is never recorded without its completion check.

The update also clears the row’s payload: a terminal entry never retains corrected values.

With the default DatabaseAuditSink the callback opens a second pooled connection while this transaction still holds the first — size the pool for two connections per concurrent runner thread. An exhausted pool times out, rolls back, and the lease heals the entry, but the work is wasted.

Args:

  • entry (OutboxEntry): The claimed entry whose resolver call succeeded.
  • on_subject_complete (Callable[[], None]): Invoked at most once, while the subject’s same-operation rows are still locked, when its last entry of that operation lands.
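The completion predicate itself is small. Completion is scoped to (subject, operation), and any non-SUCCEEDED sibling of the same operation blocks it. The in-memory sketch below leaves out the part that makes it safe under concurrency, namely locking the subject's rows FOR UPDATE in entry_id order before evaluating it. The Entry stand-in and the lowercase status strings are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:  # stand-in for OutboxEntry
    entry_id: str
    subject_id: str
    operation: str  # "erase" or "rectify"
    status: str     # lowercase OutboxStatus values


def subject_complete(entries: list[Entry], subject_id: str, operation: str) -> bool:
    """True when every same-operation entry for the subject has succeeded."""
    same_op = [e for e in entries if e.subject_id == subject_id and e.operation == operation]
    # Entries of the other operation are invisible; an abandoned sibling blocks forever.
    return bool(same_op) and all(e.status == "succeeded" for e in same_op)
```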
def requeue(entry_ids: Iterable[UUID]) -> Sequence[OutboxEntry]

Return abandoned erase entries to the queue with a fresh budget.

The single supervised mutation of the operator surface (ADR 0015): the operator, having fixed the cause an entry abandoned for, hands back the ids list_abandoned produced. Each id that is currently ABANDONED flips to PENDING with next_attempt_at = NULL (due immediately), attempts = 0 (the full budget, not one borrowed attempt), and last_error = NULL. The prior struggle is not lost — it moves into the requeue audit event, where history belongs, instead of lingering in columns that now describe a fresh entry. entry_id is unchanged, so the resolver-side idempotency key holds and re-execution converges.

Erase-only — rectify entries cannot be requeued. A rectify entry’s corrections (real PII) live in the row’s payload and are cleared at abandonment under ADR 0013, exactly as on success. A requeued rectify entry would re-execute with no corrections — a silent no-op that would still fire RECTIFICATION_COMPLETED, an Art. 16 correctness hole. So an ABANDONED rectify entry among the ids raises ConfigurationError naming it, before any audit append or status change (validation-first, the same ordering as the rest of the trail): nothing flips and no event is written. The remediation for an abandoned rectification is to re-issue it through the Rectifier, which re-enqueues a fresh entry carrying the corrections again.

Append-first audit. Before any row flips, one ERASURE_REQUEUED event is appended per entry, payload {entry_id, resolver, prior_attempts, prior_error} — the prior error is the exception class name only, never a message. The append precedes the status change under ADR 0010’s ordering rule: if the sink is down, the append raises and nothing transitions; a crash between the sink commit and the outbox commit can duplicate an event but never lose one. With the default DatabaseAuditSink each append opens a second pooled connection while this transaction still holds the FOR UPDATE locks — the same connection-budget footgun as mark_succeeded; size the pool for two connections per caller, or an exhausted pool deadlocks the requeue against itself.

Idempotent and skip-tolerant. Ids that are missing, or whose entry is no longer ABANDONED (a colleague requeued first, a generation already succeeded), are silently skipped — never errors. Calling requeue twice with the same ids is success; the return value reports only the entries that actually flipped.

The whole call runs in one transaction that first locks the affected rows FOR UPDATE ordered by entry_id — the same lock order as mark_succeeded’s completion check, so a requeue racing a concurrent runner serializes instead of deadlocking. (SQLite ignores row locking; the serialization guarantee holds only on dialects that support FOR UPDATE.)

Args:

  • entry_ids (Iterable[UUID]): The ids to requeue, as produced by list_abandoned. Order is irrelevant; locking always follows entry_id order.

Returns:

  • Sequence[OutboxEntry] — The entries that actually flipped, in their post-requeue PENDING state. Empty when no supplied id was ABANDONED.

Raises:

  • ConfigurationError — If the outbox was constructed without an audit_sink (the supervised requeue event has nowhere to land), or if any supplied ABANDONED entry is a rectify entry (its corrections were cleared at abandonment, ADR 0013 — re-issue via the Rectifier). Both are raised before any event or flip.
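The ordering rules above (validate first, then append, then flip; skip rather than fail) can be sketched in memory. The dict of rows stands in for the locked table, the append_event callable stands in for the audit sink, and ConfigurationError is a local stand-in class. This is an illustration of the contract, not the library's implementation.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Callable, Iterable, Optional
from uuid import UUID


class ConfigurationError(Exception):
    """Stand-in for the library's ConfigurationError."""


@dataclass(frozen=True)
class Entry:  # stand-in for OutboxEntry
    entry_id: UUID
    operation: str
    resolver: str
    status: str
    attempts: int
    last_error: Optional[str]
    next_attempt_at: Optional[datetime]


def requeue(rows: dict[UUID, Entry], entry_ids: Iterable[UUID],
            append_event: Callable[[str, dict], None]) -> list[Entry]:
    # Missing or non-ABANDONED ids are skipped silently; work always in entry_id order.
    targets = [rows[i] for i in sorted(set(entry_ids)) if i in rows and rows[i].status == "abandoned"]
    # 1. Validate first: a rectify entry aborts before any event or flip.
    rectify = [str(e.entry_id) for e in targets if e.operation != "erase"]
    if rectify:
        raise ConfigurationError(f"rectify entries cannot be requeued: {rectify}")
    # 2. Append first: one audit event per entry before any status change.
    for e in targets:
        append_event("ERASURE_REQUEUED", {"entry_id": str(e.entry_id), "resolver": e.resolver,
                                          "prior_attempts": e.attempts, "prior_error": e.last_error})
    # 3. Flip: fresh budget, due immediately, error history moved into the event.
    flipped = []
    for e in targets:
        fresh = replace(e, status="pending", attempts=0, last_error=None, next_attempt_at=None)
        rows[e.entry_id] = fresh
        flipped.append(fresh)
    return flipped
```

Calling it twice with the same ids returns an empty list the second time, which is the documented idempotency.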
def status_counts() -> dict[OutboxStatus, int]

Count entries per lifecycle status, for dashboards and health checks.

Read-only. Every OutboxStatus member is present in the result, zero-filled — a healthy, drained outbox reports explicit zeros rather than missing keys. A growing ABANDONED count is the operator signal that erasures need out-of-band attention.

Counting materializes every row in Python by default, since core does not import SQLAlchemy at runtime. For large outboxes, inject a SqlStatusCountsSource at construction to push the aggregation into a single GROUP BY query — the result is identical either way.

Returns:

  • dict[OutboxStatus, int] — A mapping with one entry per status.
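The zero-filled contract of the default, Python-side count looks roughly like this. OutboxStatus is redeclared as a (str, Enum) stand-in with the members listed in the OutboxStatus table (the library uses StrEnum), and the input is assumed to be the raw status strings of every row.

```python
from collections import Counter
from enum import Enum
from typing import Iterable


class OutboxStatus(str, Enum):  # stand-in; the library declares a StrEnum
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    SCHEDULED = "scheduled"
    ABANDONED = "abandoned"


def status_counts(statuses: Iterable[str]) -> dict[OutboxStatus, int]:
    """Count per status over materialized rows; every member present, zero-filled."""
    seen = Counter(OutboxStatus(s) for s in statuses)
    return {status: seen[status] for status in OutboxStatus}
```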
class OutboxEntry(BaseModel):
attempts: int = Field(default=0, ge=0)
corrections: tuple[Correction, ...] = ()
enqueued_at: datetime
entry_id: UUID
last_attempt_at: datetime | None = None
last_error: str | None = None
next_attempt_at: datetime | None = None
operation: OutboxOperation = OutboxOperation.ERASE
ref: SubjectRef
resolver: str = Field(min_length=1, max_length=255)
status: OutboxStatus = OutboxStatus.PENDING
subject_id: StoredSubjectId

One durable external call awaiting (or done with) execution.

Entries are written in the same transaction as the local phase, so an operation is never half-recorded: either the local changes and all their external follow-ups are committed together, or none are.

Fields:

  • attempts (int): How many times the call has been claimed for execution.
  • corrections (tuple[Correction, ...]): The corrected values for a rectify entry — real PII, stored in the row’s payload only while the entry is non-terminal (cleared on success and abandonment alike) and never mirrored into any audit event.
  • enqueued_at (datetime): When the entry was committed (UTC).
  • entry_id (UUID): Unique id; doubles as the idempotency key for the call.
  • last_attempt_at (datetime | None): When the last try started, if any (UTC).
  • last_error (str | None): Short error note from the last failed try — the exception class name only, never its message (no PII).
  • next_attempt_at (datetime | None): Earliest instant any runner may (re)claim the entry (UTC). None means due immediately. Doubles as the crash lease while IN_FLIGHT and as the backoff schedule while FAILED; terminal entries carry None.
  • operation (OutboxOperation): Which external call the entry performs (erase by default; rectify entries also carry corrections).
  • ref (SubjectRef): The subject reference to pass to it.
  • resolver (str): Which resolver will perform the call.
  • status (OutboxStatus): Current lifecycle state.
  • subject_id (StoredSubjectId): The subject’s identifier, exactly as passed to erase_subject/rectify_subject — groups a subject’s entries so the runner can tell when its last one of an operation lands (completion is per subject and operation). Must be globally unique per data subject: two subjects sharing a value would be treated as one completion group.
class OutboxOperation(StrEnum):
...

Which external operation an outbox entry performs.

Completion is tracked per (subject, operation): ERASURE_COMPLETED considers only erase entries, RECTIFICATION_COMPLETED only rectify entries (ADR 0013). Adding members is a MINOR change; removing or renaming is MAJOR (stored rows must stay readable forever).

Member | Value | Description
ERASE | erase | An Art. 17 erasure call (Resolver.erase_subject).
RECTIFY | rectify | An Art. 16 rectification call (RectifyingResolver.rectify_subject).
class OutboxStatus(StrEnum):
...

Lifecycle of one durable external-call entry.

Member | Value | Description
PENDING | pending | Enqueued, not yet attempted.
IN_FLIGHT | in_flight | Claimed by a runner; being attempted right now.
SUCCEEDED | succeeded | The external call completed (including “already gone”).
FAILED | failed | Last attempt failed; will be retried with backoff.
SCHEDULED | scheduled | Erasure scheduled to expire externally (ADR 0022); parked until the retention horizon, then re-claimed to verify the data is gone.
ABANDONED | abandoned | Retries exhausted; surfaced loudly for operator action. Never silently dropped: the audit trail records the abandonment.
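Pieced together from the method descriptions on this page, the lifecycle forms a small state machine. The transition map below is a hand-written summary under that reading (hypothetical, not a table the library exposes), with a (str, Enum) stand-in for OutboxStatus.

```python
from enum import Enum


class OutboxStatus(str, Enum):  # stand-in; the library declares a StrEnum
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    SCHEDULED = "scheduled"
    ABANDONED = "abandoned"


S = OutboxStatus
# Hand-written summary of the documented moves (hypothetical, not a library table).
TRANSITIONS: dict[OutboxStatus, frozenset] = {
    S.PENDING: frozenset({S.IN_FLIGHT}),    # claim_batch
    S.FAILED: frozenset({S.IN_FLIGHT}),     # claim once the backoff has elapsed
    S.SCHEDULED: frozenset({S.IN_FLIGHT}),  # re-claim after the horizon to verify
    # lease-expiry re-claim, mark_succeeded, mark_failed, mark_scheduled, mark_abandoned
    S.IN_FLIGHT: frozenset({S.IN_FLIGHT, S.SUCCEEDED, S.FAILED, S.SCHEDULED, S.ABANDONED}),
    S.SUCCEEDED: frozenset(),               # terminal
    S.ABANDONED: frozenset({S.PENDING}),    # Outbox.requeue, erase entries only
}


def check_transition(old: OutboxStatus, new: OutboxStatus) -> None:
    if new not in TRANSITIONS[old]:
        raise ValueError(f"illegal transition {old.value} -> {new.value}")
```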
class SagaRunner:
def __init__(registry: ResolverRegistry, outbox: Outbox, audit: AuditSink, *, max_attempts: int = 8, batch_size: int = 50, backoff: BackoffPolicy | None = None, on_abandoned: AbandonedHook | None = None) -> None

Executes outbox entries with retries, backoff, and idempotency.

Designed to be driven by whatever the application already has — a background task, a worker process, a cron job. One call to run_once processes one batch; the runner owns no event loop.

Entries are dispatched by their operation: erase entries call Resolver.erase_subject — or RetentionOnlyResolver.schedule_erasure when the resolver can only schedule expiry (ADR 0022) — and rectify entries call RectifyingResolver.rectify_subject with the entry’s corrections. A scheduled erasure parks its entry until the retention horizon, then re-verifies; it never counts as a completed erasure.

Failure taxonomy (ADR 0010): ResolverError — raised by a resolver for a non-retryable failure, by the registry for an unknown resolver name, or here for a rectify entry whose resolver does not implement rectify_subject — abandons the entry immediately; any other exception is treated as transient and retried with exponential backoff until max_attempts, then abandoned. Every terminal outcome is audited; an abandonment is never silent.
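The taxonomy reduces to one decision per failed call, sketched below with a local ResolverError stand-in. Treating attempts >= max_attempts as "budget spent" is an assumption about the exact boundary. It relies on attempts already counting the claim that just failed, since the claim is the attempt.

```python
class ResolverError(Exception):
    """Stand-in for the library's non-retryable ResolverError."""


def next_step(exc: BaseException, attempts: int, max_attempts: int = 8) -> str:
    """Return 'abandon' or 'retry' for one failed call (assumed boundary: attempts >= max_attempts)."""
    if isinstance(exc, ResolverError):
        return "abandon"  # non-retryable: abandon immediately, whatever the budget
    if attempts >= max_attempts:
        return "abandon"  # transient, but the retry budget is spent
    return "retry"  # transient: mark_failed with next_attempt_at from the backoff schedule
```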

async def run_once() -> int

Claim and execute one batch of due entries.

Each entry’s resolver call is idempotent (the entry id is the idempotency key), so a crash between execution and bookkeeping is safe — the entry stays IN_FLIGHT, its claim lease expires, and the retry converges on the same outcome.

Per entry: success appends ERASURE_STEP_SUCCEEDED (erase) or RECTIFICATION_STEP_SUCCEEDED (rectify) and — when the subject’s last entry of that operation lands — ERASURE_COMPLETED / RECTIFICATION_COMPLETED; a terminal failure appends the matching step-failed event before the entry is marked ABANDONED. The audit append always precedes the status change, so no recorded outcome lacks its audit record; if the sink is down the entry stays claimed and the lease heals it. Transient failures are not audited — the row’s last_error carries the exception class name and the entry retries on the backoff schedule (a failed rectify row keeps its corrections payload; terminal rows never do).

A retention-only resolver reporting a future horizon appends ERASURE_EXPIRY_SCHEDULED and parks the entry SCHEDULED until the horizon (ADR 0022); the parked entry blocks ERASURE_COMPLETED until a later claim verifies the data gone (already_absent=True), which succeeds the step with verified_expiry. A vendor that keeps reporting fresh horizons re-parks loudly each time — every slip is audited, never silent.

When an erase entry’s resolver implements VerifyingResolver, the runner re-queries the external system with verify_absent after the successful erase and appends ERASURE_EXTERNAL_VERIFIED (read-back confirmed absence) or ERASURE_EXTERNAL_VERIFICATION_FAILED (still present despite the reported success) — additional trail over the settled success, never a gate that reverts the erase (ADR 0027). Resolvers without the capability are skipped silently: no event, no error.

Awaits resolver calls concurrently but makes blocking database calls (claiming, audit appends) between awaits — run it in a worker, cron job, or background task, never on a serving event loop (ADR 0006).

Returns:

  • int — Number of entries processed in this batch.
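One way to drive run_once from a dedicated worker process is a loop that drains back to back while batches are non-empty and sleeps only when a batch comes back empty. Running a single event loop for the whole worker also keeps loop-bound resolver clients valid across batches. The worker_loop function and its stop_after_idle parameter are hypothetical, and any object with an async run_once() -> int fits the BatchRunner protocol.

```python
import asyncio
from typing import Optional, Protocol


class BatchRunner(Protocol):
    async def run_once(self) -> int: ...


async def worker_loop(runner: BatchRunner, *, idle_sleep: float = 5.0,
                      stop_after_idle: Optional[int] = None) -> int:
    """Drain batches back to back; sleep only after an empty batch."""
    total = 0
    idle = 0
    while stop_after_idle is None or idle < stop_after_idle:
        processed = await runner.run_once()  # blocks this loop on DB calls: fine in a worker
        total += processed
        if processed:
            idle = 0
        else:
            idle += 1
            await asyncio.sleep(idle_sleep)
    return total


# In a dedicated worker process, never on a serving event loop:
# asyncio.run(worker_loop(saga_runner))
```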

Protocol — implement these members in your own class; do not subclass.

class StatusCountsSource(Protocol):
...

Computes the outbox’s per-status entry counts.

Lets status_counts push the aggregation into the database instead of materializing every row in Python. Core stays storage-agnostic — it never imports SQLAlchemy at runtime — so the actual GROUP BY lives in an adapter implementation (SqlStatusCountsSource) injected at construction.

This protocol is public API. It is extended additively only — a custom source must never break on upgrade.

def status_counts(outbox: Table, session_factory: sessionmaker) -> dict[OutboxStatus, int]

Count outbox entries per lifecycle status.

Contract: the returned mapping is zero-filled over every OutboxStatus member — a drained outbox reports explicit zeros, never missing keys — so the result is a drop-in replacement for the Python-side count.

Args:

  • outbox (Table): The effaced_outbox table handle to count over.
  • session_factory (sessionmaker): Factory producing sessions on the database holding that table; the source opens its own short-lived read session.

Returns:

  • dict[OutboxStatus, int] — A mapping with exactly one entry per status.
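A custom source pushes the count into one GROUP BY and then zero-fills the statuses that have no rows. The sketch below shows that shape with a raw sqlite3 connection standing in for the real (Table, sessionmaker) signature, so it does not satisfy the protocol as written. The outbox table name and the lowercase status strings are assumptions.

```python
import sqlite3
from enum import Enum


class OutboxStatus(str, Enum):  # stand-in; the library declares a StrEnum
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    SCHEDULED = "scheduled"
    ABANDONED = "abandoned"


def status_counts_sql(conn: sqlite3.Connection) -> dict[OutboxStatus, int]:
    """One GROUP BY in the database, then zero-fill statuses that have no rows."""
    counts = {status: 0 for status in OutboxStatus}
    for status, n in conn.execute("SELECT status, COUNT(*) FROM outbox GROUP BY status"):
        counts[OutboxStatus(status)] = n
    return counts
```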