Saga & Outbox
Saga/outbox — external calls beyond the local transaction, always in a known state.
AbandonedHook
Protocol: implement these members in your own class; do not subclass.
class AbandonedHook(Protocol): ...
A host callback fired when an outbox entry is abandoned.
Re-driving abandoned work is already available as a pull (Outbox.list_abandoned /
Outbox.requeue); this hook is the push counterpart. Wire one in to page someone or emit a
metric the moment a subject's request stalls, instead of polling.
This protocol is public API, extended additively only. Sync by design (ADR 0006) — the hook runs on the runner’s thread, between blocking bookkeeping calls.
AbandonedHook.on_abandoned
def on_abandoned(signal: AbandonedSignal) -> None
React to one entry’s abandonment.
Called after the entry’s transition to ABANDONED is durable
and its audit event is written, so the hook is side-effect-isolated
from both: the runner swallows whatever this raises and does not
wait on it beyond the call returning. A slow or failing alerting
backend therefore cannot corrupt or block the state transition or
the audit trail — keeping this fast and resilient is the host’s job.
Args:
- signal (AbandonedSignal): The PII-free summary of the abandoned entry.
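A minimal sketch of a metrics-style hook, assuming the runner calls on_abandoned with an object carrying the AbandonedSignal fields documented in the next section. SignalStandIn and AbandonmentCounter are hypothetical names; the class satisfies the protocol structurally rather than subclassing anything:

```python
from dataclasses import dataclass
from typing import Dict, Tuple
from uuid import UUID, uuid4


@dataclass(frozen=True)
class SignalStandIn:
    # Hypothetical stand-in with the documented AbandonedSignal fields.
    attempts: int
    entry_id: UUID
    error: str
    operation: str
    resolver: str
    subject_id: str


class AbandonmentCounter:
    """Hypothetical hook: matches the AbandonedHook shape structurally, no subclassing."""

    def __init__(self) -> None:
        # (resolver, operation, error class name) -> abandonments seen
        self.counts: Dict[Tuple[str, str, str], int] = {}

    def on_abandoned(self, signal: SignalStandIn) -> None:
        # Runs synchronously on the runner's thread: do cheap in-memory work only.
        # subject_id is deliberately left out of the key to keep metric cardinality low.
        key = (signal.resolver, signal.operation, signal.error)
        self.counts[key] = self.counts.get(key, 0) + 1


hook = AbandonmentCounter()
hook.on_abandoned(SignalStandIn(8, uuid4(), "TimeoutError", "erase", "crm", "subject-1"))
```

A host would then pass an instance as SagaRunner(registry, outbox, audit, on_abandoned=AbandonmentCounter()). Keying on resolver, operation and error class keeps the metric low-cardinality; subject_id is better suited to a page than to a metric label.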
AbandonedSignal
class AbandonedSignal(BaseModel):
    attempts: int = Field(ge=0)
    entry_id: UUID
    error: str = Field(min_length=1, max_length=255)
    operation: OutboxOperation
    resolver: str = Field(min_length=1, max_length=255)
    subject_id: str = Field(min_length=1, max_length=255)
One entry’s terminal abandonment, summarised for an alerting host.
Handed to an AbandonedHook after the entry has flipped
to ABANDONED. Carries exactly what a host needs to page or emit a
metric and no PII: the error is the exception class name only,
never its message (provider errors embed identifiers), and corrections
never appear here.
Fields:
- attempts (int): How many times the call was claimed before abandonment.
- entry_id (UUID): The abandoned entry’s id: the idempotency key, and what Outbox.requeue takes to re-drive an erase entry.
- error (str): The terminating exception’s class name (no message, no PII).
- operation (OutboxOperation): Whether this was an erase or a rectify entry; a rectify abandonment cannot be requeued (its corrections are gone).
- resolver (str): Which resolver’s call exhausted its retries.
- subject_id (str): The data subject whose request is now unfinished.
BackoffPolicy
class BackoffPolicy(BaseModel):
    base_delay: timedelta = timedelta(seconds=30)
    lease: timedelta = timedelta(minutes=5)
    max_delay: timedelta = timedelta(hours=1)
Deterministic exponential backoff for outbox retries.
The delay after the n-th attempt is
min(base_delay * 2**(n - 1), max_delay) — no jitter, so tests can
pin exact schedules; concurrent runners are already spread by
SKIP LOCKED claiming.
Fields:
- base_delay (timedelta): Delay after the first failed attempt.
- lease (timedelta): How long a claim protects an IN_FLIGHT entry from other runners. Must comfortably exceed the slowest expected resolver call: a lease that expires mid-call lets another runner execute the same entry again, which converges (resolvers are idempotent) but is wasteful and noisy.
- max_delay (timedelta): Ceiling the doubling never exceeds.
BackoffPolicy.delay
def delay(attempts: int) -> timedelta
The wait before the next try, given attempts tries so far.
Args:
- attempts (int): How many times the entry has been attempted (≥ 1).
Returns:
timedelta: min(base_delay * 2**(attempts - 1), max_delay).
Raises:
ValueError: If attempts is not positive.
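The formula can be checked with a plain dataclass. BackoffSketch is a hypothetical stand-in for the Pydantic model, not the library's implementation; the OverflowError guard is an addition of this sketch for attempt counts far beyond any sane max_attempts:

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class BackoffSketch:
    # Hypothetical stand-in for BackoffPolicy; defaults copied from the documented fields.
    base_delay: timedelta = timedelta(seconds=30)
    lease: timedelta = timedelta(minutes=5)
    max_delay: timedelta = timedelta(hours=1)

    def delay(self, attempts: int) -> timedelta:
        if attempts < 1:
            raise ValueError("attempts must be positive")
        try:
            # Double per attempt, no jitter: base_delay * 2**(attempts - 1).
            raw = self.base_delay * 2 ** (attempts - 1)
        except OverflowError:
            # Guard added in this sketch only: huge attempt counts overflow timedelta.
            return self.max_delay
        return min(raw, self.max_delay)
```

With the defaults, the waits after attempts 1 through 8 are 30 s, 1 min, 2 min, 4 min, 8 min, 16 min, 32 min, and then the 1 h cap.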
Outbox
class Outbox:
    def __init__(session_factory: sessionmaker, outbox: Table, *, status_counts_source: StatusCountsSource | None = None, audit_sink: AuditSink | None = None) -> None
Transactional outbox in the application’s own database.
Entries are enqueued inside the caller’s transaction (atomically with
the local erasure) and claimed by the saga runner afterwards.
enqueue uses the caller’s session; the claim and bookkeeping
methods run outside any caller transaction and use the factory
(ADR 0006).
Outbox.claim_batch
def claim_batch(limit: int = 50, *, lease: timedelta = _DEFAULT_LEASE) -> Sequence[OutboxEntry]
Atomically claim due entries for execution, oldest first.
An entry is due when it is not terminal and its next_attempt_at gate is NULL or
in the past. That covers:
- fresh PENDING entries;
- FAILED entries whose backoff has elapsed;
- IN_FLIGHT entries whose claim lease has expired (a crashed runner’s work, healed here);
- SCHEDULED entries whose retention horizon has passed (parked by mark_scheduled and re-claimed to verify expiry, ADR 0022).
Claimed entries move to IN_FLIGHT, with attempts incremented and
next_attempt_at set to now + lease. The claim itself counts as the
attempt, so an entry that crashes its runner every time still
converges to abandonment.
On PostgreSQL the selection runs FOR UPDATE SKIP LOCKED, so
concurrent runners never double-claim. SQLite ignores row locking;
the no-double-claim guarantee holds only on dialects that support
FOR UPDATE.
Args:
- limit (int): Maximum entries to claim in one batch.
- lease (timedelta): How long the claim protects the entries from other runners. Must comfortably exceed the slowest expected resolver call; a too-short lease causes double execution (absorbed by resolver idempotency, but wasteful).
Returns:
Sequence[OutboxEntry]: The claimed entries in their post-claim state, oldest first.
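The due predicate and the claim transition can be modelled in memory, as a sketch only: EntryStandIn, is_due and claim_in_memory are hypothetical, statuses are plain strings mirroring OutboxStatus values, and there is no locking (the real method gets that from FOR UPDATE SKIP LOCKED). Treating a gate equal to now as due, and breaking enqueued_at ties by entry_id, are assumptions:

```python
from dataclasses import dataclass, replace
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Sequence

TERMINAL = {"succeeded", "abandoned"}


@dataclass(frozen=True)
class EntryStandIn:
    # Hypothetical stand-in for the few OutboxEntry fields the claim touches.
    entry_id: str
    status: str
    enqueued_at: datetime
    attempts: int = 0
    next_attempt_at: Optional[datetime] = None


def is_due(entry: EntryStandIn, now: datetime) -> bool:
    # Not terminal, and the gate is NULL or not in the future (<= is an assumption).
    if entry.status in TERMINAL:
        return False
    return entry.next_attempt_at is None or entry.next_attempt_at <= now


def claim_in_memory(entries: Sequence[EntryStandIn], now: datetime, limit: int = 50,
                    lease: timedelta = timedelta(minutes=5)) -> List[EntryStandIn]:
    # Oldest first; entry_id as the tie-breaker is an assumption of this sketch.
    due = sorted((e for e in entries if is_due(e, now)),
                 key=lambda e: (e.enqueued_at, e.entry_id))[:limit]
    # The claim is the attempt: increment attempts and make the lease the new gate.
    return [replace(e, status="in_flight", attempts=e.attempts + 1,
                    next_attempt_at=now + lease) for e in due]
```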
Outbox.enqueue
def enqueue(session: Session, entries: Sequence[OutboxEntry]) -> None
Persist entries inside the caller’s open transaction.
Never commits — the entries become durable exactly when the
caller’s transaction does, and a rollback takes them with it.
The nested SubjectRef is flattened into the
table’s ref_kind/ref_value/ref_extra columns.
Args:
- session (Session): The SAME session the local erasure runs in. That is the whole point: the entries commit or roll back with it.
- entries (Sequence[OutboxEntry]): The external calls to record.
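The atomicity enqueue relies on is the classic transactional outbox pattern. A sketch using only the standard-library sqlite3 module shows the local delete and the outbox rows committing or rolling back together. The users and outbox tables are hypothetical, not the library's schema:

```python
import sqlite3
import uuid
from typing import Sequence


def make_db() -> sqlite3.Connection:
    # Hypothetical schema: one application table plus a minimal outbox table.
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT);
        CREATE TABLE outbox (entry_id TEXT PRIMARY KEY, subject_id TEXT,
                             resolver TEXT, status TEXT);
        INSERT INTO users VALUES ('u1', 'a@example.com');
        """
    )
    return conn


def erase_and_enqueue(conn: sqlite3.Connection, subject_id: str,
                      resolvers: Sequence[str], fail: bool = False) -> None:
    # One transaction: `with conn` commits on success and rolls back on an exception.
    with conn:
        conn.execute("DELETE FROM users WHERE id = ?", (subject_id,))
        conn.executemany(
            "INSERT INTO outbox VALUES (?, ?, ?, 'pending')",
            [(str(uuid.uuid4()), subject_id, r) for r in resolvers],
        )
        if fail:
            raise RuntimeError("simulated crash before commit")
```

Calling erase_and_enqueue(conn, "u1", ["crm"], fail=True) leaves both tables untouched; without fail, the user row is gone and the outbox holds one pending row per resolver.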
Outbox.list_abandoned
def list_abandoned(*, limit: int = 100) -> Sequence[OutboxEntry]
Return abandoned entries for operator inspection, oldest first.
The read half of “abandoned loudly”: every entry whose retries are
exhausted stays visible here (and in the audit trail) until it is
handled out of band. The ids it returns are exactly what
requeue consumes once the underlying cause is fixed.
Abandonment is terminal under ADR 0010 until an operator
requeues (ADR 0015); whether a given abandoned erasure should be
requeued is a determination only you can make.
Args:
- limit (int): Maximum entries to return.
Returns:
Sequence[OutboxEntry]: ABANDONED entries, oldest first (by enqueued_at, then entry_id).
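A hypothetical operator helper might pair list_abandoned with requeue, assuming outbox is an Outbox constructed with an audit_sink. It filters out rectify entries up front, since requeue rejects the whole call if one is present. The helper name and the resolver filter are illustrative, not part of the API:

```python
from typing import Any, List, Optional


def requeue_abandoned_erasures(outbox: Any, resolver: Optional[str] = None,
                               limit: int = 100) -> List[Any]:
    """Hypothetical operator helper built on list_abandoned and requeue."""
    abandoned = outbox.list_abandoned(limit=limit)
    # Rectify entries cannot be requeued (their corrections were cleared), and
    # requeue raises before anything flips if one is passed, so drop them here.
    # OutboxOperation is a StrEnum, so comparing against "erase" works.
    ids = [
        e.entry_id
        for e in abandoned
        if e.operation == "erase" and (resolver is None or e.resolver == resolver)
    ]
    return list(outbox.requeue(ids)) if ids else []
```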
Outbox.list_scheduled
def list_scheduled(*, limit: int = 100) -> Sequence[OutboxEntry]
Return entries parked for vendor expiry, nearest horizon first.
The read half of retention-only erasure (ADR 0022): every erase
entry that a retention-only resolver could only schedule sits
SCHEDULED until its vendor horizon, then is re-claimed to verify the
data is gone. The list shows which subject is waiting on which
resolver, and until when. next_attempt_at is the gate the runner
re-claims on, so the list is ordered by it and the subject expiring
soonest surfaces first. A SCHEDULED entry is neither a fault nor
terminal (contrast list_abandoned): it is pending vendor expiry, and
it blocks ERASURE_COMPLETED for its subject until verified gone.
Args:
- limit (int): Maximum entries to return.
Returns:
Sequence[OutboxEntry]: SCHEDULED entries, nearest horizon first (by next_attempt_at, then entry_id).
Outbox.mark_abandoned
def mark_abandoned(entry: OutboxEntry, *, error: str) -> None
Record a terminal failure; the runner never retries the entry again (only an operator requeue can re-drive an erase entry).
Abandonment is loud by contract: the runner appends the step-failed
audit event before calling this, so an abandoned entry always has
its audit record. The row’s payload is cleared — a terminal
entry never retains corrected values (ADR 0013).
Args:
- entry (OutboxEntry): The claimed entry whose retries are exhausted or whose failure is non-retryable.
- error (str): The exception class name, never its message.
Outbox.mark_failed
def mark_failed(entry: OutboxEntry, *, error: str, next_attempt_at: datetime) -> None
Record a retryable failure and schedule the next attempt.
The row’s payload survives a retryable failure on purpose: the
retry needs the corrected values.
Args:
- entry (OutboxEntry): The claimed entry whose resolver call failed.
- error (str): The exception class name, never its message, which may embed PII.
- next_attempt_at (datetime): When the entry becomes claimable again (the backoff schedule).
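The payload rule for the two failure paths can be sketched on an immutable in-memory row. RowStandIn, error_name, mark_failed and mark_abandoned here are hypothetical stand-ins that return updated copies rather than writing to a table:

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import Optional, Tuple


@dataclass(frozen=True)
class RowStandIn:
    # Hypothetical in-memory stand-in for an outbox row.
    status: str
    corrections: Tuple[str, ...] = ()
    last_error: Optional[str] = None
    next_attempt_at: Optional[datetime] = None


def error_name(exc: BaseException) -> str:
    # Class name only: str(exc) may embed identifiers or other PII.
    return type(exc).__name__


def mark_failed(row: RowStandIn, *, error: str, next_attempt_at: datetime) -> RowStandIn:
    # Retryable: the corrections payload survives because the retry needs it.
    return replace(row, status="failed", last_error=error, next_attempt_at=next_attempt_at)


def mark_abandoned(row: RowStandIn, *, error: str) -> RowStandIn:
    # Terminal: clear the payload and the gate (terminal entries carry None).
    return replace(row, status="abandoned", last_error=error,
                   corrections=(), next_attempt_at=None)
```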
Outbox.mark_scheduled
def mark_scheduled(entry: OutboxEntry, *, resume_at: datetime) -> None
Park the entry until its retention horizon (ADR 0022).
Records that the external system can only expire the data: the
entry moves to SCHEDULED with next_attempt_at=resume_at
(the same gate that schedules retries), attempts=0, and
last_error=NULL. Resetting these gives the verification after the
horizon the full retry budget, following the ADR 0015 requeue
precedent; the prior struggle is preserved in the
ERASURE_EXPIRY_SCHEDULED event that the runner appends before
calling this. A SCHEDULED entry is not terminal: it blocks
ERASURE_COMPLETED until it is re-claimed after the horizon and
verified gone.
Args:
- entry (OutboxEntry): The claimed entry whose erasure was scheduled.
- resume_at (datetime): When the entry becomes claimable again: the retention horizon, clamped by the runner to at least one backoff step from now.
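The clamp on resume_at fits in a one-line helper. clamp_resume_at is hypothetical, and min_step is a parameter because which backoff step the runner uses is not specified here:

```python
from datetime import datetime, timedelta, timezone


def clamp_resume_at(horizon: datetime, now: datetime, min_step: timedelta) -> datetime:
    """Hypothetical helper: never park an entry closer than one backoff step from now."""
    # Without the clamp, a past or imminent horizon would make the entry
    # due again immediately; the clamp turns that into a backoff-sized wait.
    return max(horizon, now + min_step)
```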
Outbox.mark_succeeded
def mark_succeeded(entry: OutboxEntry, *, on_subject_complete: Callable[[], None]) -> None
Record a successful external call; detect per-operation completion.
Runs in one transaction that first locks, FOR UPDATE and ordered by
entry_id, all of the subject’s entries of the entry’s operation. Two
runners finishing the same subject’s last two entries therefore
serialize on the same lock order instead of deadlocking, and exactly
one of them observes the all-succeeded transition. If, after this
update, every same-operation entry for the subject is SUCCEEDED,
on_subject_complete is invoked inside the open transaction. An
ABANDONED sibling blocks completion permanently, and entries of the
other operation are invisible here (ADR 0013). If the callback
raises, the update rolls back and the entry stays IN_FLIGHT for the
lease to heal, so a success is never recorded without its
completion check.
The update also clears the row’s payload: a terminal entry
never retains corrected values.
With the default DatabaseAuditSink the callback
opens a second pooled connection while this transaction still
holds the first — size the pool for two connections per concurrent
runner thread. An exhausted pool times out, rolls back, and the
lease heals the entry, but the work is wasted.
Args:
- entry (OutboxEntry): The claimed entry whose resolver call succeeded.
- on_subject_complete (Callable[[], None]): Invoked at most once, while the subject’s same-operation rows are still locked, when its last entry of that operation lands.
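The completion check can be modelled in memory, with a single lock standing in for the FOR UPDATE row locks. InMemoryCompletion is a hypothetical sketch, not the library's implementation; it shows the per-operation sibling check and the rollback when the callback raises:

```python
import threading
from typing import Callable, Dict, Tuple


class InMemoryCompletion:
    """Hypothetical stand-in: one lock plays the role of the FOR UPDATE row locks."""

    def __init__(self, rows: Dict[str, Tuple[str, str, str]]) -> None:
        # entry_id -> (subject_id, operation, status)
        self.rows = dict(rows)
        self._lock = threading.Lock()

    def mark_succeeded(self, entry_id: str, on_subject_complete: Callable[[], None]) -> None:
        with self._lock:
            subject, operation, previous = self.rows[entry_id]
            self.rows[entry_id] = (subject, operation, "succeeded")
            # Only same-subject, same-operation siblings count (completion is per operation).
            siblings = [
                status for (s, op, status) in self.rows.values()
                if s == subject and op == operation
            ]
            if all(status == "succeeded" for status in siblings):
                try:
                    on_subject_complete()
                except BaseException:
                    # Roll back: the entry keeps its prior (IN_FLIGHT) status for the lease to heal.
                    self.rows[entry_id] = (subject, operation, previous)
                    raise
```

Here a rectify entry still IN_FLIGHT for the same subject does not hold back erase completion, matching the per-operation rule.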
Outbox.requeue
def requeue(entry_ids: Iterable[UUID]) -> Sequence[OutboxEntry]
Return abandoned erase entries to the queue with a fresh budget.
The single supervised mutation of the operator surface (ADR 0015):
after fixing whatever caused an entry’s abandonment, the operator
hands back the ids list_abandoned produced. Each id that is
currently ABANDONED flips to PENDING with
next_attempt_at = NULL (due immediately), attempts = 0 (the
full budget, not one borrowed attempt), and last_error = NULL.
The prior struggle is not lost: it moves into the requeue audit
event, where history belongs, instead of lingering in columns that
now describe a fresh entry. entry_id is unchanged, so the
resolver-side idempotency key holds and re-execution converges.
Erase-only — rectify entries cannot be requeued. A rectify
entry’s corrections (real PII) live in the row’s payload and
are cleared at abandonment under ADR 0013, exactly as on success.
A requeued rectify entry would re-execute with no corrections — a
silent no-op that would still fire RECTIFICATION_COMPLETED, an
Art. 16 correctness hole. So an ABANDONED rectify entry among
the ids raises ConfigurationError naming it,
before any audit append or status change (validation-first, the
same ordering as the rest of the trail): nothing flips and no event
is written. The remediation for an abandoned rectification is to
re-issue it through the Rectifier, which
re-enqueues a fresh entry carrying the corrections again.
Append-first audit. Before any row flips, one
ERASURE_REQUEUED event is appended per entry, payload
{entry_id, resolver, prior_attempts, prior_error} — the prior
error is the exception class name only, never a message. The
append precedes the status change under ADR 0010’s ordering rule:
if the sink is down, the append raises and nothing transitions; a
crash between the sink commit and the outbox commit can duplicate
an event but never lose one. With the default
DatabaseAuditSink each append opens a second
pooled connection while this transaction still holds the FOR UPDATE locks — the same connection-budget footgun as
mark_succeeded; size the pool for two connections per
caller, or an exhausted pool deadlocks the requeue against itself.
Idempotent and skip-tolerant. Ids that are missing, or whose
entry is no longer ABANDONED (a colleague requeued first, a
generation already succeeded), are silently skipped — never
errors. Calling requeue twice with the same ids is success;
the return value reports only the entries that actually flipped.
The whole call runs in one transaction that first locks the
affected rows FOR UPDATE ordered by entry_id — the same
lock order as mark_succeeded’s completion check, so a
requeue racing a concurrent runner serializes instead of
deadlocking. (SQLite ignores row locking; the serialization
guarantee holds only on dialects that support FOR UPDATE.)
Args:
- entry_ids (Iterable[UUID]): The ids to requeue, as produced by list_abandoned. Order is irrelevant; locking always follows entry_id order.
Returns:
Sequence[OutboxEntry]: The entries that actually flipped, in their post-requeue PENDING state. Empty when no supplied id was ABANDONED.
Raises:
ConfigurationError: If the outbox was constructed without an audit_sink (the supervised requeue event has nowhere to land), or if any supplied ABANDONED entry is a rectify entry (its corrections were cleared at abandonment, ADR 0013; re-issue it via the Rectifier). Both are raised before any event or flip.
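The ordering requeue documents (skip-tolerant selection, validation first, audit append first, then the flip) can be sketched over an in-memory dict. ConfigurationError, AbandonedRow and requeue here are hypothetical stand-ins, and a plain list plays the audit sink:

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Dict, Iterable, List, Optional


class ConfigurationError(Exception):
    """Hypothetical stand-in for the library's ConfigurationError."""


@dataclass(frozen=True)
class AbandonedRow:
    # Hypothetical stand-in for the OutboxEntry fields requeue touches.
    entry_id: str
    operation: str  # "erase" or "rectify"
    status: str
    resolver: str
    attempts: int = 0
    last_error: Optional[str] = None
    next_attempt_at: Optional[datetime] = None


def requeue(rows: Dict[str, AbandonedRow], entry_ids: Iterable[str],
            audit: List[dict]) -> List[AbandonedRow]:
    # Process (in the real method: lock) in entry_id order; duplicates collapse.
    ids = sorted(set(entry_ids))
    # Skip-tolerant: missing ids and entries no longer ABANDONED are ignored.
    targets = [rows[i] for i in ids if i in rows and rows[i].status == "abandoned"]
    # Validation first: refuse before any audit append or status change.
    for row in targets:
        if row.operation == "rectify":
            raise ConfigurationError(f"rectify entry {row.entry_id}: re-issue it instead")
    # Append first: every ERASURE_REQUEUED event lands before any row flips.
    for row in targets:
        audit.append({"type": "ERASURE_REQUEUED", "entry_id": row.entry_id,
                      "resolver": row.resolver, "prior_attempts": row.attempts,
                      "prior_error": row.last_error})
    flipped = []
    for row in targets:
        fresh = replace(row, status="pending", attempts=0,
                        last_error=None, next_attempt_at=None)
        rows[row.entry_id] = fresh
        flipped.append(fresh)
    return flipped
```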
Outbox.status_counts
def status_counts() -> dict[OutboxStatus, int]
Count entries per lifecycle status, for dashboards and health checks.
Read-only. Every OutboxStatus member is present in
the result, zero-filled — a healthy, drained outbox reports explicit
zeros rather than missing keys. A growing ABANDONED count is the
operator signal that erasures need out-of-band attention.
Counting materializes every row in Python by default, since core
does not import SQLAlchemy at runtime. For large outboxes, inject a
SqlStatusCountsSource at construction to push the
aggregation into a single GROUP BY query — the result is
identical either way.
Returns:
dict[OutboxStatus, int]: A mapping with one entry per status.
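The zero-fill contract is easy to reproduce on the Python side. A sketch, assuming rows expose their status as the string values of OutboxStatus; Status is a stand-in enum mirroring the documented members, and zero_filled_counts is a hypothetical name:

```python
from collections import Counter
from enum import Enum
from typing import Dict, Iterable


class Status(str, Enum):
    # Stand-in mirroring the documented OutboxStatus members and values.
    PENDING = "pending"
    IN_FLIGHT = "in_flight"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    SCHEDULED = "scheduled"
    ABANDONED = "abandoned"


def zero_filled_counts(statuses: Iterable[str]) -> Dict[Status, int]:
    # Count the rows that exist, then emit every member so a drained outbox
    # reports explicit zeros instead of missing keys.
    seen = Counter(Status(s) for s in statuses)
    return {status: seen[status] for status in Status}
```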
OutboxEntry
class OutboxEntry(BaseModel):
    attempts: int = Field(default=0, ge=0)
    corrections: tuple[Correction, ...] = ()
    enqueued_at: datetime
    entry_id: UUID
    last_attempt_at: datetime | None = None
    last_error: str | None = None
    next_attempt_at: datetime | None = None
    operation: OutboxOperation = OutboxOperation.ERASE
    ref: SubjectRef
    resolver: str = Field(min_length=1, max_length=255)
    status: OutboxStatus = OutboxStatus.PENDING
    subject_id: StoredSubjectId
One durable external call awaiting (or done with) execution.
Entries are written in the same transaction as the local phase, so an operation is never half-recorded: either the local changes and all their external follow-ups are committed together, or none are.
Fields:
- attempts (int): How many times the call has been claimed for execution.
- corrections (tuple[Correction, ...]): The corrected values for a rectify entry. These are real PII, stored in the row’s payload only while the entry is non-terminal (cleared on success and abandonment alike) and never mirrored into any audit event.
- enqueued_at (datetime): When the entry was committed (UTC).
- entry_id (UUID): Unique id; doubles as the idempotency key for the call.
- last_attempt_at (datetime | None): When the last try started, if any (UTC).
- last_error (str | None): Short error note from the last failed try: the exception class name only, never its message (no PII).
- next_attempt_at (datetime | None): Earliest instant any runner may (re)claim the entry (UTC). None means due immediately. Doubles as the crash lease while IN_FLIGHT, as the backoff schedule while FAILED, and as the retention horizon while SCHEDULED; terminal entries carry None.
- operation (OutboxOperation): Which external call the entry performs (erase by default; rectify entries also carry corrections).
- ref (SubjectRef): The subject reference to pass to it.
- resolver (str): Which resolver will perform the call.
- status (OutboxStatus): Current lifecycle state.
- subject_id (StoredSubjectId): The subject’s identifier, exactly as passed to erase_subject / rectify_subject. It groups a subject’s entries so the runner can tell when the last one of an operation lands (completion is per subject and operation). It must be globally unique per data subject: two subjects sharing a value would be treated as one completion group.
OutboxOperation
class OutboxOperation(StrEnum): ...
Which external operation an outbox entry performs.
Completion is tracked per (subject, operation): ERASURE_COMPLETED
considers only erase entries, RECTIFICATION_COMPLETED only rectify
entries (ADR 0013). Adding members is a MINOR change; removing or
renaming is MAJOR (stored rows must stay readable forever).
| Member | Value | Description |
|---|---|---|
| ERASE | erase | An Art. 17 erasure call (Resolver.erase_subject). |
| RECTIFY | rectify | An Art. 16 rectification call (RectifyingResolver.rectify_subject). |
OutboxStatus
class OutboxStatus(StrEnum): ...
Lifecycle of one durable external-call entry.
| Member | Value | Description |
|---|---|---|
| PENDING | pending | Enqueued, not yet attempted. |
| IN_FLIGHT | in_flight | Claimed by a runner; being attempted right now. |
| SUCCEEDED | succeeded | The external call completed (including “already gone”). |
| FAILED | failed | Last attempt failed; will be retried with backoff. |
| SCHEDULED | scheduled | Erasure scheduled to expire externally (ADR 0022); parked until the retention horizon, then re-claimed to verify the data is gone. |
| ABANDONED | abandoned | Retries exhausted or failure non-retryable; surfaced loudly for operator action and never silently dropped, since the audit trail records the abandonment. |
SagaRunner
class SagaRunner:
    def __init__(registry: ResolverRegistry, outbox: Outbox, audit: AuditSink, *, max_attempts: int = 8, batch_size: int = 50, backoff: BackoffPolicy | None = None, on_abandoned: AbandonedHook | None = None) -> None
Executes outbox entries with retries, backoff, and idempotency.
Designed to be driven by whatever the application already has — a
background task, a worker process, a cron job. One call to
run_once processes one batch; the runner owns no event loop.
Entries are dispatched by their operation: erase entries call
Resolver.erase_subject — or RetentionOnlyResolver.schedule_erasure
when the resolver can only schedule expiry (ADR 0022) — and rectify
entries call RectifyingResolver.rectify_subject with the entry’s
corrections. A scheduled erasure parks its entry until the retention
horizon, then re-verifies; it never counts as a completed erasure.
Failure taxonomy (ADR 0010): a ResolverError abandons the entry
immediately. It is raised by a resolver for a non-retryable failure,
by the registry for an unknown resolver name, or by the runner itself
for a rectify entry whose resolver does not implement rectify_subject.
Any other exception is treated as transient and retried with
exponential backoff until max_attempts, then abandoned. Every terminal
outcome is audited; an abandonment is never silent.
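The taxonomy reduces to one decision per failed attempt. A sketch, where ResolverError is a stand-in for the library's exception, classify_failure and Decision are hypothetical, and attempts is assumed to already include the claim that just failed (claims increment it):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


class ResolverError(Exception):
    """Hypothetical stand-in for the library's non-retryable ResolverError."""


@dataclass(frozen=True)
class Decision:
    action: str                                # "abandon" or "retry"
    error: str                                 # exception class name only
    next_attempt_at: Optional[datetime] = None


def classify_failure(exc: BaseException, attempts: int, max_attempts: int, now: datetime,
                     delay: Callable[[int], timedelta]) -> Decision:
    name = type(exc).__name__
    # ResolverError is non-retryable; anything else is transient until the budget runs out.
    if isinstance(exc, ResolverError) or attempts >= max_attempts:
        return Decision("abandon", name)
    return Decision("retry", name, now + delay(attempts))
```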
SagaRunner.run_once
async def run_once() -> int
Claim and execute one batch of due entries.
Each entry’s resolver call is idempotent (the entry id is the
idempotency key), so a crash between execution and bookkeeping is
safe — the entry stays IN_FLIGHT, its claim lease expires, and
the retry converges on the same outcome.
Per entry: success appends ERASURE_STEP_SUCCEEDED (erase) or
RECTIFICATION_STEP_SUCCEEDED (rectify) and — when the subject’s
last entry of that operation lands — ERASURE_COMPLETED /
RECTIFICATION_COMPLETED; a terminal failure appends the
matching step-failed event before the entry is marked
ABANDONED. The audit append always precedes the status change,
so no recorded outcome lacks its audit record; if the sink is down
the entry stays claimed and the lease heals it. Transient failures
are not audited — the row’s last_error carries the exception
class name and the entry retries on the backoff schedule (a failed
rectify row keeps its corrections payload; terminal rows never do).
A retention-only resolver reporting a future horizon appends
ERASURE_EXPIRY_SCHEDULED and parks the entry SCHEDULED
until the horizon (ADR 0022); the parked entry blocks
ERASURE_COMPLETED until a later claim verifies the data gone
(already_absent=True), which succeeds the step with
verified_expiry. A vendor that keeps reporting fresh horizons
re-parks loudly each time — every slip is audited, never silent.
When an erase entry’s resolver implements
VerifyingResolver, the runner re-queries the
external system with verify_absent after the successful erase and
appends ERASURE_EXTERNAL_VERIFIED (read-back confirmed absence)
or ERASURE_EXTERNAL_VERIFICATION_FAILED (still present despite the
reported success) — additional trail over the settled success, never
a gate that reverts the erase (ADR 0027). Resolvers without the
capability are skipped silently: no event, no error.
Awaits resolver calls concurrently but makes blocking database calls (claiming, audit appends) between awaits — run it in a worker, cron job, or background task, never on a serving event loop (ADR 0006).
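A minimal worker loop, assuming runner is a constructed SagaRunner. drive is a hypothetical name; it is meant to run on an event loop owned by the worker (for example started with asyncio.run(drive(runner)) in a dedicated thread or process), so blocking calls inside run_once stall only that worker:

```python
import asyncio
from typing import Any, Optional


async def drive(runner: Any, *, idle_sleep: float = 5.0,
                max_batches: Optional[int] = None) -> int:
    """Hypothetical worker loop around SagaRunner.run_once."""
    batches = 0
    while max_batches is None or batches < max_batches:
        processed = await runner.run_once()
        batches += 1
        if processed == 0:
            # Nothing was due: wait before polling again instead of spinning.
            await asyncio.sleep(idle_sleep)
    return batches
```

Keeping one loop for the worker's lifetime, rather than one per batch, also keeps valid any loop-bound client a resolver may hold across batches.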
Returns:
int: Number of entries processed in this batch.
StatusCountsSource
Protocol: implement these members in your own class; do not subclass.
class StatusCountsSource(Protocol): ...
Computes the outbox’s per-status entry counts.
Lets status_counts push the aggregation into
the database instead of materializing every row in Python. Core stays
storage-agnostic — it never imports SQLAlchemy at runtime — so the
actual GROUP BY lives in an adapter implementation
(SqlStatusCountsSource) injected at construction.
This protocol is public API. It is extended additively only — a custom source must never break on upgrade.
StatusCountsSource.status_counts
def status_counts(outbox: Table, session_factory: sessionmaker) -> dict[OutboxStatus, int]
Count outbox entries per lifecycle status.
Contract: the returned mapping is zero-filled over every
OutboxStatus member — a drained outbox reports
explicit zeros, never missing keys — so the result is a drop-in
replacement for the Python-side count.
Args:
- outbox (Table): The effaced_outbox table handle to count over.
- session_factory (sessionmaker): Factory producing sessions on the database holding that table; the source opens its own short-lived read session.
Returns:
dict[OutboxStatus, int]: A mapping with exactly one entry per status.
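The GROUP BY a custom source pushes into the database can be sketched with the standard-library sqlite3 module. This is not a drop-in StatusCountsSource: it takes a plain connection instead of the documented Table and sessionmaker, keys by string values rather than OutboxStatus members, and assumes the effaced_outbox table has a status column holding those values. grouped_status_counts is a hypothetical name:

```python
import sqlite3
from typing import Dict

# The documented OutboxStatus values, in declaration order.
STATUSES = ("pending", "in_flight", "succeeded", "failed", "scheduled", "abandoned")


def grouped_status_counts(conn: sqlite3.Connection) -> Dict[str, int]:
    """One GROUP BY in the database, then zero-fill every documented status."""
    rows = conn.execute(
        "SELECT status, COUNT(*) FROM effaced_outbox GROUP BY status"
    ).fetchall()
    found = dict(rows)
    return {status: found.get(status, 0) for status in STATUSES}
```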