Celery AI agent API key: scoping vendor calls in distributed task queues
Celery is one of the most widely deployed distributed task queues in the Python ecosystem, used in production for background jobs, periodic tasks, and multi-step workflows. When AI agents use Celery to dispatch vendor API calls across workers, the same features that make Celery reliable become spending risks: chord headers fan out to hundreds of simultaneous Stripe calls, autoretry_for turns each network failure into several vendor calls, and celery beat schedules repeat indefinitely until explicitly stopped. This page covers the vault-key pattern that bounds per-chord vendor spend without restructuring your task definitions.
TL;DR
Celery's chord and autoretry_for are designed for reliable distributed compute — not for vendor API calls that cost money per invocation. Issue a vault key at chord-dispatch time, pass it through the task signature, enforce a per-chord dollar cap at the proxy layer, and get a structured per-call audit log with the task ID attached. Revoke the vault key from the Keybrake dashboard without rotating the real API key that every other Celery worker shares.
How Celery AI agent tasks call vendor APIs
In a Celery application, vendor API calls live inside @app.task-decorated functions. An AI billing agent that processes subscription renewals might use a chord to fan out charges:
import os

import stripe
from celery import Celery, chord

app = Celery("billing", broker="redis://localhost:6379/0")


@app.task(
    autoretry_for=(stripe.error.APIConnectionError,),
    retry_backoff=True,
    max_retries=3,
)
def charge_customer(customer_id: str, amount_cents: int) -> dict:
    stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # full-access key
    return stripe.PaymentIntent.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
    )


@app.task
def billing_complete(results: list) -> dict:
    successful = sum(1 for r in results if r.get("status") == "succeeded")
    return {"processed": len(results), "successful": successful}


def run_billing(customer_ids: list[str], amount_cents: int):
    header = [charge_customer.s(cid, amount_cents) for cid in customer_ids]
    callback = billing_complete.s()
    chord(header)(callback)
This is standard Celery. The chord dispatches all charge tasks simultaneously; autoretry_for handles transient network errors. The problem: STRIPE_SECRET_KEY is a full-access key read from the environment on every worker, there's no cap on how many charges the chord can issue, and a retry on a network timeout may create a duplicate charge if an idempotency key isn't used.
Three gaps Celery's native tooling doesn't fill for vendor spend control
| Gap | What happens in practice | Celery's answer |
|---|---|---|
| No per-chord spend cap | A billing chord receives 3,000 customer IDs due to a data pipeline error. Celery dispatches all 3,000 charge tasks. Workers process them in parallel at whatever concurrency your pool supports. Stripe confirms each charge. Nothing in the pipeline enforces the $500 intended for the day's billing run; the only limits are Stripe's own rate limits and each customer's payment method. | None. Celery's Flower monitoring shows task states and throughput, not dollar spend on vendor calls. |
| No worker-level revoke by vendor | You call app.control.revoke(task_id) to stop pending tasks. Tasks already executing on workers may have sent vendor API calls before the revoke signal arrived. Rotating the Stripe secret to stop in-flight calls breaks every other Celery worker reading that environment variable. | Celery's revoke signal stops queued tasks and, with terminate=True, can kill the process running a task, but it cannot recall vendor API calls already dispatched from executing tasks. |
| No per-call audit with task context | Celery's task events and Flower dashboard show success/failure rates and task IDs, but don't parse dollar amounts from Stripe responses or cross-reference Stripe Request-Id values with Celery task IDs in a queryable format. | Task result backend (Redis/database) and Flower. No structured cost tracking, no Celery-task-to-Stripe-charge correlation. |
The chord risk: header tasks and simultaneous vendor calls
A Celery chord dispatches all header tasks to the broker simultaneously. The broker distributes them to available workers. On a Celery deployment with 50 workers at concurrency 4, that's 200 simultaneous task executions — 200 simultaneous Stripe API calls. The chord's callback runs when all header tasks complete.
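The arithmetic above can be made explicit. This is a minimal sketch: the helper names `max_in_flight` and `projected_spend_usd` and the $20.00 charge amount are assumptions for illustration, not part of Celery or Keybrake.

```python
def max_in_flight(workers: int, concurrency: int) -> int:
    """Upper bound on header tasks executing at the same moment."""
    return workers * concurrency


def projected_spend_usd(num_tasks: int, amount_cents: int) -> float:
    """Total charged if every one of num_tasks calls succeeds."""
    return num_tasks * amount_cents / 100


# The deployment described above: 50 workers at concurrency 4.
in_flight = max_in_flight(workers=50, concurrency=4)

# Hypothetical amount of $20.00 per charge.
exposure_usd = projected_spend_usd(in_flight, amount_cents=2_000)

print(in_flight, exposure_usd)  # 200 4000.0
```

Under these assumed numbers, a single wave of simultaneous calls already commits eight times a $500 daily budget before any task has reported back.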
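A per-chord vault key with a dollar cap turns this into a bounded operation. The proxy enforces the cap atomically across all concurrent calls: once cumulative spend hits the limit, further calls return 429. The Stripe client typically surfaces a 429 as an exception (stripe.error.RateLimitError), which autoretry_for retries only if that class, or a parent such as stripe.error.StripeError, is listed. The correct pattern: keep the 429 case out of autoretry_for, because it means the cap was hit intentionally, and let the chord's callback handle partial completion. By default Celery does not run a chord callback when a header task raises, so a task whose cap hit should count as a partial result needs to catch the error and return a status instead.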
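To illustrate what "atomically across all concurrent calls" means, here is a minimal in-memory sketch. The `SpendCap` class is hypothetical and is not Keybrake's implementation; a real proxy would keep the counter in shared storage rather than behind a process-local lock. The $20.00 amount is also an assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SpendCap:
    """In-memory stand-in for a proxy-side spend cap (hypothetical)."""

    def __init__(self, cap_cents: int) -> None:
        self._cap_cents = cap_cents
        self._spent_cents = 0
        self._lock = threading.Lock()

    def try_spend(self, amount_cents: int) -> int:
        """Return 200 and record the spend, or 429 if it would exceed the cap."""
        with self._lock:  # check and increment happen as one step
            if self._spent_cents + amount_cents > self._cap_cents:
                return 429
            self._spent_cents += amount_cents
            return 200

    @property
    def spent_cents(self) -> int:
        with self._lock:
            return self._spent_cents


cap = SpendCap(cap_cents=50_000)  # a $500 budget

# 200 concurrent calls of a hypothetical $20.00 each.
with ThreadPoolExecutor(max_workers=200) as pool:
    statuses = list(pool.map(cap.try_spend, [2_000] * 200))

approved = statuses.count(200)
rejected = statuses.count(429)
```

Because the comparison and the increment sit inside the same lock, no interleaving of the 200 callers can push recorded spend past the cap: 25 calls are approved and the remaining 175 receive 429.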
The autoretry_for risk: network failures and duplicate charges
Celery's autoretry_for is designed for transient errors: Redis connection drops, temporary service unavailability. When applied to vendor API calls, it creates a subtle duplicate-charge risk:
- A charge_customer task calls Stripe and the connection drops before the response arrives.
- Celery sees an APIConnectionError and schedules a retry per autoretry_for.
- The retry calls Stripe again, but Stripe may have already processed the original request before the connection dropped.
- Without a stable idempotency key, Stripe creates a second charge.
A vault key proxy adds a circuit breaker: every call that passes through the proxy is recorded in its audit log and counted against the per-chord cap, so when a retry arrives, the first call's cost is already included in the running total. Retries that would exceed the cap are blocked. This doesn't replace idempotency keys; it complements them as a second safety layer.
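One way to build a key that is stable across retries of the same charge but distinct across billing runs is to derive it from the run identifier as well as the charge details. A minimal sketch, assuming a hypothetical helper named `idempotency_key` and assuming the chord ID is passed to the task as an extra argument:

```python
import hashlib


def idempotency_key(chord_id: str, customer_id: str, amount_cents: int) -> str:
    """Derive a deterministic key for one charge within one chord.

    The same inputs always produce the same key, so a Celery retry
    reuses it; a different chord_id produces a different key.
    """
    raw = f"{chord_id}:{customer_id}:{amount_cents}"
    # SHA-256 hex digest is 64 characters, within Stripe's 255-character limit.
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


first_attempt = idempotency_key("chord-a", "cus_123", 2_000)
retry_attempt = idempotency_key("chord-a", "cus_123", 2_000)
next_run = idempotency_key("chord-b", "cus_123", 2_000)
```

Here `first_attempt` and `retry_attempt` are identical, so Stripe would treat the retry as a replay of the original request, while `next_run` differs and would be processed as a new charge.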
Scoping vault keys per Celery chord
Issue the vault key before dispatching the chord, then pass it through the task signature using .s():
import os
import uuid

import httpx
import stripe
from celery import Celery, chord

app = Celery("billing", broker="redis://localhost:6379/0")


def issue_vault_key(chord_id: str, budget_usd: float) -> str:
    """Request a scoped, short-lived vault key for one chord."""
    r = httpx.post(
        "https://proxy.keybrake.com/vault/keys",
        headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_API_KEY']}"},
        json={
            "vendor": "stripe",
            "daily_usd_cap": budget_usd,
            "allowed_endpoints": ["POST /v1/payment_intents"],
            "expires_in": "2h",
            "agent_run_label": f"celery-billing/{chord_id}",
        },
    )
    r.raise_for_status()  # fail before dispatch if no key was issued
    return r.json()["vault_key"]


@app.task(
    autoretry_for=(stripe.error.APIConnectionError,),
    retry_backoff=True,
    max_retries=3,
)
def charge_customer(customer_id: str, amount_cents: int, vault_key: str) -> dict:
    stripe.api_key = vault_key  # scoped vault key
    stripe.api_base = "https://proxy.keybrake.com/stripe"
    return stripe.PaymentIntent.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        # Stable retry key. Note that it is also identical across separate
        # runs that charge the same customer the same amount.
        idempotency_key=f"{customer_id}-{amount_cents}",
    )


@app.task
def billing_complete(results: list) -> dict:
    successful = sum(1 for r in results if r.get("status") == "succeeded")
    return {"processed": len(results), "successful": successful}


def run_billing(customer_ids: list[str], amount_cents: int, budget_usd: float = 500.0):
    chord_id = str(uuid.uuid4())
    vault_key = issue_vault_key(chord_id, budget_usd)
    header = [
        charge_customer.s(cid, amount_cents, vault_key)
        for cid in customer_ids
    ]
    callback = billing_complete.s()
    chord(header)(callback)
The vault key is issued once and passed in every task signature. All chord header tasks share the same vault key and its per-chord cap. The real Stripe secret stays in Keybrake's environment, never in Celery worker environment variables. The audit log records each call with agent_run_label: "celery-billing/{chord_id}" — queryable by chord or by time window.
How Keybrake fits
Keybrake is the proxy layer between your Celery tasks and Stripe, Twilio, or Resend. You swap stripe.api_key for the vault key and set stripe.api_base to https://proxy.keybrake.com/stripe. The real Stripe secret stays in Keybrake, not in your Celery worker environment. Each chord dispatch gets its own vault key with its own dollar cap, endpoint allowlist, and expiry. Chord fan-outs that exceed the cap return 429s — catchable exceptions in your Celery tasks, not silent charges distributed across your worker pool.
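If tasks catch the cap error and return a status rather than raising, the chord callback can report partial completion. A minimal sketch, assuming each header task returns a dict whose `status` is either Stripe's `succeeded` or a hypothetical `cap_exceeded` value set by your own task code; `summarize_chord` is an illustrative name, not a Celery or Keybrake function.

```python
from collections import Counter


def summarize_chord(results: list[dict]) -> dict:
    """Count outcomes across the header results of one chord."""
    counts = Counter(r.get("status", "unknown") for r in results)
    successful = counts["succeeded"]
    cap_exceeded = counts["cap_exceeded"]
    return {
        "processed": len(results),
        "successful": successful,
        "cap_exceeded": cap_exceeded,
        "other": len(results) - successful - cap_exceeded,
    }


summary = summarize_chord(
    [
        {"status": "succeeded"},
        {"status": "cap_exceeded"},
        {"status": "succeeded"},
        {},  # a result with no status is counted under "other"
    ]
)
```

The body of this function could serve as the body of billing_complete, so that a run stopped by the cap shows how many charges went through and how many were blocked.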
Related questions
How should I handle the vault key in Celery Beat scheduled tasks?
For Celery Beat periodic tasks, issue a new vault key at the start of each beat execution. If your beat task dispatches a chord, follow the same pattern: issue the vault key in the orchestrator task that celery-beat runs, then pass it into the chord header tasks. Each scheduled run gets a fresh vault key with a fresh cap. This prevents spend from one run rolling over into the next, and gives you a clean audit trail per scheduled execution. If a beat task runs more frequently than expected (due to a misconfiguration or clock drift), each extra run gets its own vault key — and each key has its own cap, so you won't silently absorb extra spend from rogue runs.
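A per-run label keeps each scheduled execution separable in the audit trail. A minimal sketch, assuming a hypothetical helper `beat_run_label` whose output would be sent as the agent_run_label when the orchestrator task requests its vault key; the label format is an assumption, not a Keybrake requirement.

```python
from datetime import datetime, timezone


def beat_run_label(task_name: str, started_at: datetime) -> str:
    """Build a label unique to one scheduled run (to the second, in UTC)."""
    stamp = started_at.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"celery-beat/{task_name}/{stamp}"


# Two runs of the same hypothetical task, one hour apart.
expected_run = beat_run_label(
    "nightly-billing", datetime(2024, 1, 1, 2, 0, tzinfo=timezone.utc)
)
extra_run = beat_run_label(
    "nightly-billing", datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc)
)
```

Because the two labels differ, an unexpected extra run appears as its own entry with its own cap rather than blending into the scheduled one.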
Does passing the vault key through task signatures expose it in the broker?
The vault key string is serialized into the task message in the broker (Redis, RabbitMQ) just like any other task argument. This is equivalent to passing any secret through Celery — and the same caveats apply: ensure your broker transport is TLS-encrypted and access-controlled. The key advantage over the real API key is the vault key's properties: it has a short TTL, a dollar cap, and can be revoked from the Keybrake dashboard even after it's been distributed. The real Stripe secret never enters the broker at all.
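For a Redis broker, TLS can be enabled in the Celery configuration module. A minimal sketch in celeryconfig.py style: the host name and CA path are placeholders, and the settings shown (broker_use_ssl, redis_backend_use_ssl) should be checked against the Celery version you run.

```python
import ssl

# Placeholder host and certificate path; replace with your own values.
broker_url = "rediss://broker.internal.example:6379/0"
broker_use_ssl = {
    "ssl_cert_reqs": ssl.CERT_REQUIRED,  # reject brokers without a valid cert
    "ssl_ca_certs": "/etc/ssl/certs/broker-ca.pem",
}

# Task results can contain sensitive data too, so the result backend
# uses the same TLS settings in this sketch.
result_backend = "rediss://broker.internal.example:6379/1"
redis_backend_use_ssl = broker_use_ssl
```

The rediss:// scheme selects TLS for the connection, and ssl.CERT_REQUIRED makes the worker verify the broker's certificate instead of accepting any peer.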
Can I use the vault key with Celery's Canvas primitives besides chord — like group or chain?
Yes. group() dispatches tasks in parallel like a chord header without a callback — issue the vault key before dispatching the group and pass it in each task signature. chain() sequences tasks; issue the vault key before the chain and pass it through using .s(vault_key) in each step. For chain(), the vault key's cap accumulates across all steps in the chain — which is correct if the chain represents a single logical transaction. If steps in a chain represent independent operations with independent budgets, issue separate vault keys per step.
Further reading
- AI agent kill switch patterns — the four ways to stop a runaway agent and their real stop latencies, including distributed queue scenarios.
- AI agent audit trail schema — what belongs in a structured per-call log and the SQL queries that matter when reviewing a billing incident.
- Airflow AI agent API key — similar pattern for Airflow DAGs: per-run spend caps using vault keys with dynamic task mapping.
- AI agent API key rotation — why short-lived vault keys are better than rotation schedules for distributed agent workloads.