Wiring the saga runner
SagaRunner.run_once() claims one batch of due outbox entries, fans the
resolver calls out concurrently, and records every outcome in the audit
trail. It owns no event loop and no schedule — you drive it with whatever
your application already operates: a worker process, a cron job, or a
background thread.
Running several drivers at once is safe: claiming uses FOR UPDATE SKIP
LOCKED, so concurrent runners skip each other’s rows, and a crashed
runner’s claims are re-claimed after the lease expires. See the saga
concept page for the full claim, retry, and completion semantics.
Shared wiring
```python
from sqlalchemy.orm import sessionmaker

from effaced import (
    BackoffPolicy,
    DatabaseAuditSink,
    Outbox,
    ResolverRegistry,
    SagaRunner,
    bind_tables,
)

# engine and metadata are your application's SQLAlchemy Engine and MetaData.
session_factory = sessionmaker(engine)
tables = bind_tables(metadata)

registry = ResolverRegistry()
registry.register(StripeResolver(api_key="rk_live_..."))

runner = SagaRunner(
    registry,
    Outbox(session_factory, tables.outbox),
    DatabaseAuditSink(session_factory, tables.audit_events),
    # Size the lease above your slowest resolver call: an expired lease
    # mid-call means double execution (idempotent, but wasteful).
    backoff=BackoffPolicy(),
)
```

Connection budget: when a subject completes, the runner still holds the
outbox transaction while it opens a second connection for the audit
append, so size the engine’s pool for two connections per concurrent
runner thread. An exhausted pool does no damage, because the blocked work
times out and is retried once its lease expires, but that work is wasted.
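The two-connections-per-thread rule translates directly into engine settings. A minimal sketch, assuming one engine shared by the runner threads and the rest of the application; runner_pool_settings and its app_connections parameter are hypothetical names, not part of effaced:

```python
from typing import Dict


def runner_pool_settings(runner_threads: int, app_connections: int = 0) -> Dict[str, int]:
    """Hypothetical helper: SQLAlchemy pool settings for the saga runner.

    Each concurrent runner thread may hold two connections at once: the
    outbox transaction plus the audit append made when a subject completes.
    `app_connections` is whatever the rest of the application needs from
    the same engine (an assumption you must size yourself).
    """
    if runner_threads < 0 or app_connections < 0:
        raise ValueError("connection counts must be non-negative")
    return {
        "pool_size": 2 * runner_threads + app_connections,
        # No overflow: pool_size is a hard ceiling, so a mis-sized budget
        # shows up as pool timeouts rather than as surprise connections.
        "max_overflow": 0,
    }


# Usage with SQLAlchemy (not executed here):
# engine = create_engine(DATABASE_URL, **runner_pool_settings(1, app_connections=5))
```

Dropping max_overflow=0 lets SQLAlchemy open extra connections beyond pool_size under load (its default overflow is 10); either choice works as long as pool_size covers the two-per-thread rule.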
FastAPI: a background thread, not a background task
A daemon thread with its own asyncio.run keeps the runner’s blocking
database calls off the serving loop. (Scheduling run_once with
asyncio.create_task(...) on the app’s loop is exactly the misuse warned
about above: those blocking database calls would then stall request
handling.)
```python
import asyncio
import threading
from contextlib import asynccontextmanager

from fastapi import FastAPI

stop = threading.Event()


def drain_forever() -> None:
    # Runs in the worker thread, on an event loop owned by that thread.
    async def loop() -> None:
        while not stop.is_set():
            if await runner.run_once() == 0:
                await asyncio.sleep(5.0)  # queue empty: poll gently

    asyncio.run(loop())


@asynccontextmanager
async def lifespan(app: FastAPI):
    worker = threading.Thread(target=drain_forever, daemon=True)
    worker.start()
    yield
    # Ask the loop to exit after its current batch or idle sleep.
    stop.set()
    worker.join(timeout=10)


app = FastAPI(lifespan=lifespan)
```

Dedicated worker process
```python
# saga_worker.py: run alongside your web processes.
# `runner` is the SagaRunner from the shared wiring; import it from
# wherever your application builds it.
import asyncio


async def main() -> None:
    while True:
        if await runner.run_once() == 0:
            await asyncio.sleep(5.0)  # queue empty: poll gently


if __name__ == "__main__":
    asyncio.run(main())
```

The same shape drops into a Celery/RQ/Huey periodic task: the task body is
asyncio.run(drain()), where drain calls run_once until it returns 0.
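```python
# saga_drain.py: run from cron, e.g. * * * * *
# `runner` is the SagaRunner from the shared wiring; import it from your app.
import asyncio


async def drain() -> int:
    total = 0
    # run_once returns the size of the batch it handled; 0 means the queue is empty.
    while count := await runner.run_once():
        total += count
    return total


if __name__ == "__main__":
    asyncio.run(drain())
```

Overlapping cron invocations are safe: concurrent runners skip each
other’s locked rows and never double-claim.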
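Both long-running loops in this section, the FastAPI thread and saga_worker.py, stop for good the first time run_once raises, and the FastAPI thread can spend a full idle sleep before it notices stop. A hardened variant is sketched below under stated assumptions: drain_until_stopped is a hypothetical helper that takes runner.run_once as a plain callable, and it assumes rows left claimed by a batch that raised are recovered through the lease, as described for crashed runners.

```python
import asyncio
import logging
import threading
from typing import Awaitable, Callable

log = logging.getLogger("saga_worker")


async def drain_until_stopped(
    run_once: Callable[[], Awaitable[int]],
    stop: threading.Event,
    idle_seconds: float = 5.0,
) -> None:
    """Hypothetical helper: call run_once until `stop` is set.

    A failing batch is logged instead of ending the loop, and setting
    `stop` interrupts the idle wait immediately.
    """
    while not stop.is_set():
        try:
            processed = await run_once()
        except Exception:
            # Assumption: rows a failed batch left claimed are recovered
            # through the lease, like a crashed runner's claims.
            log.exception("saga runner batch failed")
            processed = 0
        if processed == 0:
            # Block on the event in a helper thread; stop.wait returns
            # early (True) as soon as stop.set() is called.
            await asyncio.to_thread(stop.wait, idle_seconds)
```

In the FastAPI wiring the thread target would become lambda: asyncio.run(drain_until_stopped(runner.run_once, stop)). The trade-off is that a persistent failure now logs every idle_seconds instead of killing the worker, so keep alerting on abandoned entries.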
Rectification entries
The outbox carries GDPR Art. 16 (right to rectification) fan-out too: rectify entries
(operation = 'rectify') hold the corrected values in the row’s
payload column. In-flight and retrying rows therefore contain
personal data — treat outbox inspection output accordingly. The payload
is cleared the moment an entry reaches a terminal status (succeeded and
abandoned alike); only the retry path keeps it, because the retry needs
the values.
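If you inspect the outbox from scripts or logs, one way to honour that is to mask the payload before printing. A minimal sketch; redact_outbox_row is a hypothetical helper, and it assumes rows arrive as mappings of column name to value (a SQLAlchemy Row exposes one as row._mapping):

```python
from collections.abc import Mapping
from typing import Any, Dict

REDACTED = "<redacted>"


def redact_outbox_row(row: Mapping) -> Dict[str, Any]:
    """Hypothetical helper: copy an outbox row with `payload` masked.

    Keeps the keys of a mapping payload, so you can still see which
    attributes a rectification touches, but never their values. Rows whose
    payload is NULL (as it is once an entry reaches a terminal status)
    pass through unchanged.
    """
    safe = dict(row)  # shallow copy; the caller's row is left untouched
    payload = safe.get("payload")
    if isinstance(payload, Mapping):
        safe["payload"] = {key: REDACTED for key in payload}
    elif payload is not None:
        # e.g. a driver that returns JSON as text: mask it wholesale
        safe["payload"] = REDACTED
    return safe
```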
Deployments whose effaced_outbox table predates rectification need the
additive columns (deployments created fresh with metadata.create_all get
them automatically). The DEFAULT 'erase' backfills the existing rows, all
of which are erase entries:

```sql
ALTER TABLE effaced_outbox ADD COLUMN operation VARCHAR(32) NOT NULL DEFAULT 'erase';
ALTER TABLE effaced_outbox ADD COLUMN payload JSONB;
```

Monitoring abandonment
An entry whose retries are exhausted (or whose resolver raised the
non-retryable ResolverError) becomes ABANDONED: it is never retried, it
blocks the completion event for its subject and operation permanently
(ERASURE_COMPLETED for erase entries, RECTIFICATION_COMPLETED for
rectify entries), and it is always audited (ERASURE_STEP_FAILED /
RECTIFICATION_STEP_FAILED with abandoned: true). Abandonment means a
data-subject request is not finished — alert on it:
```sql
SELECT subject_id, resolver, operation, last_error, attempts
FROM effaced_outbox
WHERE status = 'abandoned';
```

Remediation: fix the underlying cause (credentials, a deleted API
resource, a resolver bug), delete the abandoned row, and re-run
erase_subject for the subject. The re-run enqueues the external work under
fresh idempotency keys, and resolvers treat “already gone” as success, so
it converges. For an abandoned rectify entry, re-issue the rectification
instead: re-running erase_subject would erase the subject rather than
correct it.
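To wire the abandonment query into alerting, something like the following could run on a schedule. A minimal sketch over any DB-API 2.0 connection (psycopg, sqlite3, and so on); find_abandoned, format_alert and AbandonedEntry are hypothetical names, and where the message goes (pager, chat, metrics) is up to you:

```python
from typing import Any, List, NamedTuple, Optional

# The monitoring query from this page. It takes no parameters, so it runs
# unchanged on any DB-API 2.0 driver regardless of paramstyle.
ABANDONED_SQL = (
    "SELECT subject_id, resolver, operation, last_error, attempts "
    "FROM effaced_outbox WHERE status = 'abandoned'"
)


class AbandonedEntry(NamedTuple):
    subject_id: Any
    resolver: str
    operation: str
    last_error: Optional[str]
    attempts: int


def find_abandoned(connection: Any) -> List[AbandonedEntry]:
    """Hypothetical helper: every abandoned outbox entry, via DB-API 2.0."""
    cursor = connection.cursor()
    try:
        cursor.execute(ABANDONED_SQL)
        return [AbandonedEntry(*row) for row in cursor.fetchall()]
    finally:
        cursor.close()


def format_alert(entries: List[AbandonedEntry]) -> Optional[str]:
    """Hypothetical helper: one line per abandoned entry, or None if all clear."""
    if not entries:
        return None
    lines = [f"{len(entries)} abandoned saga step(s); data-subject requests are unfinished:"]
    for e in entries:
        lines.append(
            f"- subject {e.subject_id}: {e.operation} via {e.resolver} "
            f"after {e.attempts} attempt(s)"
        )
    return "\n".join(lines)
```

Returning None when nothing is abandoned keeps the scheduler logic trivial: alert only when format_alert returns a message.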