Apache Airflow Stripe Integration: Restricted API Keys, Spend Caps, and Agent Governance
Airflow's task retry is what makes billing pipelines recoverable from transient failures. It is also the mechanism most likely to silently fire duplicate Stripe charges when a downstream step fails after the charge already went through.
Apache Airflow is one of the most widely deployed workflow orchestrators in data engineering, and it increasingly appears as the orchestration backbone for AI agent pipelines, especially at companies that already run Airflow for ETL and want to extend it to LLM-powered automation. When those pipelines involve Stripe (usage-based billing, subscription renewals, batch invoice runs), Airflow's retry semantics and backfill mechanics introduce billing failure modes that are not obvious from the DAG definition alone.
This post covers three failure modes specific to Airflow's architecture: task-level retry re-running a completed charge, DAG backfill and task clearing replaying billing for already-charged customers, and dynamic task mapping with expand() creating concurrent billing fan-out with no cross-task deduplication. Each section includes Python TaskFlow API code and the governance pattern that closes it — content-hash idempotency keys at the Stripe layer and per-run vault keys via the Keybrake proxy at the key-management layer.
Failure mode 1: Task retry re-fires completed Stripe charges on downstream failure
Airflow's task retry mechanism is configured per task with the retries parameter (default 0; commonly set to 3 for production pipelines). When a task fails, meaning the callable raises any exception, Airflow schedules a retry after retry_delay. The retry re-runs the entire Python callable from the top. There is no checkpoint inside the callable: if stripe.Charge.create() succeeded and the next line raised an exception, Airflow has no way to know the charge was already created. The retry starts again from the first line of the callable. Without an idempotency key, stripe.Charge.create() fires again and Stripe creates a second charge object.
```python
# billing_dag.py (UNSAFE): no idempotency key, so task retries = duplicate charges
from datetime import datetime, timedelta

import stripe
from airflow.decorators import dag, task
from airflow.models import Variable


@dag(schedule="@monthly", start_date=datetime(2026, 1, 1))
def monthly_billing():
    @task(retries=3, retry_delay=timedelta(minutes=5))
    def charge_customer(customer_id: str, amount_cents: int, billing_period: str):
        stripe.api_key = Variable.get("STRIPE_SECRET_KEY")
        # Charge succeeds: Stripe returns ch_A
        charge = stripe.Charge.create(
            amount=amount_cents,
            currency="usd",
            customer=customer_id,
            description=f"Subscription {billing_period}",
            # No idempotency_key: every retry = new Stripe charge
        )
        # If this downstream write fails, Airflow retries the entire task.
        # stripe.Charge.create() fires again on retry 1 → ch_B,
        # again on retry 2 → ch_C. Three retries = four total charges.
        update_billing_record(customer_id, charge["id"], billing_period)  # your DB/CRM write (not shown)
        return charge["id"]

    # Calling the task inside the @dag function is what adds it to the DAG
    charge_customer(
        customer_id="{{ dag_run.conf['customer_id'] }}",
        amount_cents="{{ dag_run.conf['amount_cents'] }}",
        billing_period="{{ dag_run.conf['billing_period'] }}",
    )


monthly_billing()
```
The failure sequence: stripe.Charge.create() returns ch_A. update_billing_record() raises a DatabaseConnectionError. The task fails. Airflow waits retry_delay and schedules retry 1, which runs the callable from the top. stripe.Charge.create() fires again, and Stripe creates ch_B. If the database error persists, retry 2 creates ch_C and retry 3 creates ch_D. The task then reaches its retry limit and the task instance is marked failed. The Airflow UI shows one failed task with three retries; it does not show four charges on the customer's account. The duplicates are visible only in the Stripe dashboard, where four charge objects appear within the retry window.
This failure mode is especially common in Airflow billing pipelines because the most frequent failure point is usually not the Stripe API itself, which is highly available, but the downstream database or CRM write that records the charge. A slow database, connection pool exhaustion, or a schema migration running in parallel can all fail the task after the charge has already been created.
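The retry arithmetic in the failure sequence above can be reproduced with a small stdlib-only model. FakeStripe, flaky_db_write, run_with_retries, and make_task are hypothetical stand-ins for Stripe, the database write, and Airflow's retry loop; this is a sketch of the behavior, not of either system's internals.

```python
# Simplified model of Airflow retries around a charge call.
# FakeStripe and flaky_db_write are hypothetical stand-ins, not real APIs.


class FakeStripe:
    """Records every charge; honors idempotency keys if one is supplied."""

    def __init__(self):
        self.charges = []   # every charge object created
        self._by_key = {}   # idempotency_key -> charge id

    def create_charge(self, customer, amount, idempotency_key=None):
        if idempotency_key is not None and idempotency_key in self._by_key:
            return self._by_key[idempotency_key]  # replay: original charge
        charge_id = f"ch_{len(self.charges) + 1}"
        self.charges.append({"id": charge_id, "customer": customer, "amount": amount})
        if idempotency_key is not None:
            self._by_key[idempotency_key] = charge_id
        return charge_id


def flaky_db_write(charge_id):
    # The downstream write keeps failing, as in the failure sequence.
    raise ConnectionError("database unavailable")


def run_with_retries(task, retries):
    """Run task once, then up to `retries` more times, like Airflow's retry loop."""
    for _ in range(retries + 1):
        try:
            return task()
        except ConnectionError:
            continue  # Airflow would wait retry_delay, then rerun from the top
    return None       # task instance ends in the failed state


def make_task(stripe, idempotency_key=None):
    def charge_customer():
        charge_id = stripe.create_charge("cus_abc", 2999, idempotency_key)
        flaky_db_write(charge_id)
        return charge_id
    return charge_customer


unsafe = FakeStripe()
run_with_retries(make_task(unsafe), retries=3)
print(len(unsafe.charges))  # 4: initial attempt + 3 retries

safe = FakeStripe()
run_with_retries(make_task(safe, idempotency_key="cus_abc:2999:2026-06"), retries=3)
print(len(safe.charges))    # 1: retries replay the original charge
```

The only difference between the two runs is whether the retried call carries a stable key; the task still ends failed in both cases, which is why the Airflow UI looks identical.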
The fix: content-hash idempotency key + vault key via proxy
The idempotency key must be computed from parameters that are stable across all retries of the same billing operation. A UUID generated inside the task callable would produce a different value on each retry. The key must derive from the billing parameters themselves — customer ID, amount, and billing period — so that every retry of the same billing operation produces the same key, and Stripe deduplicates them into the original charge.
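A quick way to see the difference is to generate keys for three runs of the same callable (one try plus two retries). This sketch reuses the billing_idempotency_key derivation from this post; key_for_attempt_uuid is a hypothetical helper representing the UUID anti-pattern.

```python
import hashlib
import uuid


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    # Same derivation as the DAG code in this post.
    raw = f"{customer_id}:{amount_cents}:{billing_period}:airflow-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


def key_for_attempt_uuid() -> str:
    # Anti-pattern: a fresh UUID every time the callable runs.
    return uuid.uuid4().hex


# Simulate the callable running three times (initial try + 2 retries).
uuid_keys = {key_for_attempt_uuid() for _ in range(3)}
hash_keys = {billing_idempotency_key("cus_abc", 2999, "2026-06") for _ in range(3)}

print(len(uuid_keys))  # 3 distinct keys: Stripe sees three separate requests
print(len(hash_keys))  # 1 key: all attempts map to the same Stripe request
```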
```python
# billing_dag.py (SAFE): content-hash idempotency key + vault key per run
import hashlib
from datetime import datetime, timedelta

import stripe
from airflow.decorators import dag, task


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:airflow-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


# render_template_as_native_obj=True renders amount_cents from dag_run.conf as an int
@dag(schedule="@monthly", start_date=datetime(2026, 1, 1),
     render_template_as_native_obj=True)
def monthly_billing():
    @task(retries=3, retry_delay=timedelta(minutes=5))
    def charge_customer(customer_id: str, amount_cents: int, billing_period: str,
                        vault_key: str) -> str:
        # vault_key: POST /v1/charges only, $amount_cents+10% daily cap, expires 24h.
        # Passed as a task parameter, not read from a shared Variable.
        stripe_client = stripe.StripeClient(
            vault_key,
            # StripeClient overrides its API host via base_addresses["api"]
            base_addresses={"api": "https://proxy.keybrake.com/stripe"},
        )
        idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_period)
        # Safe to retry: the same key returns the original charge on later calls
        charge = stripe_client.charges.create(
            params={
                "amount": amount_cents,
                "currency": "usd",
                "customer": customer_id,
                "description": f"Subscription {billing_period}",
                "metadata": {"billing_period": billing_period},
            },
            options={"idempotency_key": idempotency_key},
        )
        # Downstream failures no longer produce duplicate charges
        update_billing_record(customer_id, charge.id, billing_period)  # your DB/CRM write (not shown)
        return charge.id

    charge_customer(
        customer_id="{{ dag_run.conf['customer_id'] }}",
        amount_cents="{{ dag_run.conf['amount_cents'] }}",
        billing_period="{{ dag_run.conf['billing_period'] }}",
        vault_key="{{ dag_run.conf['vault_key'] }}",
    )


monthly_billing()
```
The idempotency key is a SHA-256 hash of "{customer_id}:{amount_cents}:{billing_period}:airflow-billing", truncated to 32 hex characters. The string is identical on every retry of the same task instance because the parameters are fixed by the time the callable executes. Stripe keeps idempotency keys for at least 24 hours; a repeat request with the same key in that window returns the original result instead of creating a new charge (reusing a key with different parameters returns an error). If the downstream write still fails after all retries, the task is marked failed and the charge is not duplicated. The vault_key is passed as a task parameter rather than read from a shared Variable, so each billing run can carry a key scoped to that specific customer and amount.
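The window behavior matters for the later failure modes, so here is a toy model of it. IdempotencyCache is a hypothetical class, not Stripe's implementation: it treats the window as exactly 24 hours, whereas Stripe only says keys become eligible for removal once they are at least 24 hours old. The parameter-mismatch error mirrors Stripe's documented behavior.

```python
from datetime import datetime, timedelta


class IdempotencyCache:
    """Toy model of a 24-hour idempotency window (not Stripe's implementation)."""

    def __init__(self, ttl=timedelta(hours=24)):
        self.ttl = ttl
        self.entries = {}   # key -> (created_at, params, charge_id)
        self.created = 0    # number of charges actually created

    def create_charge(self, key, params, now):
        entry = self.entries.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            _created_at, original_params, charge_id = entry
            if original_params != params:
                raise ValueError("idempotency key reused with different parameters")
            return charge_id                  # replay within the window
        self.created += 1                     # new key, or expired entry
        charge_id = f"ch_{self.created}"
        self.entries[key] = (now, params, charge_id)
        return charge_id


cache = IdempotencyCache()
t0 = datetime(2026, 6, 1, 0, 0)
params = {"customer": "cus_abc", "amount": 2999}

first = cache.create_charge("k1", params, t0)
retry = cache.create_charge("k1", params, t0 + timedelta(minutes=15))  # task retry
late = cache.create_charge("k1", params, t0 + timedelta(days=3))       # backfill days later

print(first, retry, late)  # ch_1 ch_1 ch_2
```

The third call is the case the content-hash key alone cannot cover: once the window has passed, the same key creates a new charge.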
Failure mode 2: DAG backfill and task clearing replay billing for already-charged customers
Airflow has two operations, both started by an engineer, that re-run tasks which already ran: backfill (airflow dags backfill, or the backfill option in the Airflow 3 UI) and task clearing (airflow tasks clear, or the Clear button in the UI). Both are legitimate and commonly used: backfill to process historical data intervals, clearing to retry a downstream task that failed after billing succeeded. The problem is that a cleared or backfilled task instance runs its callable from scratch, with no memory of what a previous attempt did. A billing task that already charged a customer will run again, find no guard against re-charging, and call stripe.Charge.create() a second time.
The backfill scenario is the more dangerous of the two. An engineer notices that last month's billing DAG run failed partway through and runs a backfill over the affected logical-date (execution_date) range. The backfill runs the billing task for every interval in the range. If the original failure was in the downstream CRM write rather than the Stripe call, each of those runs re-fires the Stripe charge for every customer in that interval. A monthly billing backfill covering six already-billed months adds six duplicate charges per customer, one per month, on top of the six legitimate ones.
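The arithmetic is easy to get wrong, so here is a simplified model: generate the @monthly logical dates for a January-to-June backfill, assume each month was already charged once, and replay with and without a pre-flight guard. monthly_logical_dates and replay are hypothetical helpers; the model ignores Airflow's data-interval details.

```python
from datetime import date


def monthly_logical_dates(start: date, end: date) -> list[date]:
    """First-of-month logical dates from start to end inclusive (like @monthly)."""
    dates, current = [], start
    while current <= end:
        dates.append(current)
        carry, next_month0 = divmod(current.month, 12)  # zero-based index of next month
        current = date(current.year + carry, next_month0 + 1, 1)
    return dates


def replay(ledger: dict, periods: list, guarded: bool) -> None:
    for p in periods:
        if guarded and ledger.get(p, 0) > 0:
            continue                      # pre-flight finds the existing charge: skip
        ledger[p] = ledger.get(p, 0) + 1  # one more charge for this period


periods = [d.strftime("%Y-%m") for d in monthly_logical_dates(date(2026, 1, 1), date(2026, 6, 1))]
already_charged = {p: 1 for p in periods}  # original runs charged, then the CRM write failed

unguarded = dict(already_charged)
replay(unguarded, periods, guarded=False)
guarded = dict(already_charged)
replay(guarded, periods, guarded=True)

print(len(periods))             # 6
print(sum(unguarded.values()))  # 12: 6 legitimate + 6 duplicates
print(sum(guarded.values()))    # 6
```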
```python
# The unsafe pattern that makes backfill dangerous
import stripe
from airflow.decorators import task
from airflow.models import Variable


@task(retries=2)
def charge_customer_unsafe(customer_id: str, amount_cents: int, billing_period: str):
    stripe.api_key = Variable.get("STRIPE_SECRET_KEY")
    # No pre-flight check. No idempotency key.
    # airflow dags backfill --start-date 2026-01-01 --end-date 2026-06-01 monthly_billing
    # runs this task for Jan, Feb, Mar, Apr, May, Jun.
    # Each run calls stripe.Charge.create() without deduplication.
    # If those months were already charged: 12 charges per customer instead of 6.
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
    )
    update_billing_record(customer_id, charge["id"], billing_period)  # your DB/CRM write (not shown)
    return charge["id"]
```
The task-clearing scenario is subtler. An engineer clears a downstream task, say a CRM sync that failed, and selects the Upstream option in the clear dialog (or passes --upstream to airflow tasks clear, or clears the whole DAG run). Airflow then also clears the tasks the CRM sync depends on. If the billing task is upstream of the CRM sync, it runs again. The engineer intended to re-run only the CRM sync; Airflow re-runs billing too.
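The scope of a clear is a graph traversal, which this simplified model makes concrete. The task names and EDGES graph are hypothetical, and Airflow's real clear logic has more options (past/future runs, recursion into external DAGs, only-failed filters); the sketch only shows which tasks the upstream and downstream options reach.

```python
# Simplified model of which tasks a clear operation selects.
EDGES = {  # upstream task -> downstream tasks (hypothetical DAG)
    "check_existing_charge": ["charge_customer"],
    "charge_customer": ["crm_sync"],
    "crm_sync": ["notify"],
}


def reachable(task: str, graph: dict[str, list[str]]) -> set[str]:
    """All tasks reachable from `task` by following graph edges."""
    seen, stack = set(), [task]
    while stack:
        for nxt in graph.get(stack.pop(), []):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


def reverse(graph: dict[str, list[str]]) -> dict[str, list[str]]:
    rev: dict[str, list[str]] = {}
    for up, downs in graph.items():
        for down in downs:
            rev.setdefault(down, []).append(up)
    return rev


def cleared(task: str, upstream: bool = False, downstream: bool = True) -> set[str]:
    selected = {task}
    if downstream:
        selected |= reachable(task, EDGES)
    if upstream:
        selected |= reachable(task, reverse(EDGES))
    return selected


print(sorted(cleared("crm_sync")))
# ['crm_sync', 'notify']
print(sorted(cleared("crm_sync", upstream=True)))
# ['charge_customer', 'check_existing_charge', 'crm_sync', 'notify']
```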
The fix adds a pre-flight check, using a read-only audit vault key, before the billing step fires. The pre-flight queries Stripe for the customer's charges and looks for a succeeded charge with the same billing_period in its metadata. If one is found, the billing task skips instead of charging. The audit vault key is configured with allowed_endpoints: ["GET /v1/charges"] and a zero-dollar cap, so even if the pre-flight itself retries multiple times, it can never create a charge.
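One detail to get right in any pre-flight: Stripe list endpoints are paginated and newest-first, so a check that reads a single page (for example, a list call with limit=10 iterated via .data) can miss an older matching charge for a busy customer. With stripe-python, the ListObject returned by list() offers auto_paging_iter() to follow the cursor; Stripe's Search API can filter on metadata, but its results are eventually consistent, which makes it a poor fit for read-after-write checks. The sketch below models the cursor loop with a hypothetical fetch_page helper standing in for one list call.

```python
# Toy pagination model; fetch_page stands in for one Stripe list call.
def fetch_page(charges, limit, starting_after=None):
    """Return (page, has_more) from a newest-first list, like a cursor API."""
    start = 0
    if starting_after is not None:
        start = next(i for i, c in enumerate(charges) if c["id"] == starting_after) + 1
    page = charges[start:start + limit]
    return page, start + limit < len(charges)


def find_existing_charge(charges, billing_period, limit=10, paginate=True):
    cursor = None
    while True:
        page, has_more = fetch_page(charges, limit, cursor)
        for ch in page:
            if (ch["metadata"].get("billing_period") == billing_period
                    and ch["status"] == "succeeded"):
                return ch["id"]
        if not (paginate and has_more and page):
            return None
        cursor = page[-1]["id"]  # continue after the last charge seen


# 12 newer usage charges sit in front of the June subscription charge.
history = [{"id": f"ch_usage_{i}", "status": "succeeded", "metadata": {}} for i in range(12)]
history.append({"id": "ch_sub_june", "status": "succeeded",
                "metadata": {"billing_period": "2026-06"}})

print(find_existing_charge(history, "2026-06", paginate=False))  # None: missed
print(find_existing_charge(history, "2026-06"))                  # ch_sub_june
```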
```python
# billing_dag.py (SAFE): pre-flight check guards against backfill and task clearing
# Trigger with, e.g.:
#   airflow dags trigger monthly_billing --conf '{"customer_id": "cus_abc", ...}'
import hashlib
from datetime import datetime

import stripe
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:airflow-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


# render_template_as_native_obj=True renders amount_cents from dag_run.conf as an int
@dag(schedule="@monthly", start_date=datetime(2026, 1, 1),
     render_template_as_native_obj=True)
def monthly_billing():
    @task(retries=2)
    def check_existing_charge(customer_id: str, billing_period: str,
                              audit_vault_key: str) -> str | None:
        # audit_vault_key: GET /v1/charges only, $0/day cap, so it cannot create charges
        audit_client = stripe.StripeClient(
            audit_vault_key,
            base_addresses={"api": "https://proxy.keybrake.com/stripe"},
        )
        charges = audit_client.charges.list(params={"customer": customer_id, "limit": 100})
        # auto_paging_iter() follows pagination, so older charges are checked too
        for ch in charges.auto_paging_iter():
            if (ch.metadata.get("billing_period") == billing_period
                    and ch.status == "succeeded"):
                return ch.id
        return None

    @task(retries=2)
    def charge_customer(customer_id: str, amount_cents: int, billing_period: str,
                        vault_key: str, existing_charge_id: str | None) -> str:
        if existing_charge_id:
            # Already charged: a backfill or task clear triggered a re-run.
            # Raising AirflowSkipException marks the task Skipped, not Failed.
            raise AirflowSkipException(
                f"Customer {customer_id} already charged for {billing_period}: "
                f"{existing_charge_id}"
            )
        stripe_client = stripe.StripeClient(
            vault_key,
            base_addresses={"api": "https://proxy.keybrake.com/stripe"},
        )
        idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_period)
        charge = stripe_client.charges.create(
            params={
                "amount": amount_cents,
                "currency": "usd",
                "customer": customer_id,
                "description": f"Subscription {billing_period}",
                "metadata": {"billing_period": billing_period},
            },
            options={"idempotency_key": idempotency_key},
        )
        update_billing_record(customer_id, charge.id, billing_period)  # your DB/CRM write (not shown)
        return charge.id

    # DAG wiring: check_existing_charge → charge_customer
    existing = check_existing_charge(
        customer_id="{{ dag_run.conf['customer_id'] }}",
        billing_period="{{ dag_run.conf['billing_period'] }}",
        audit_vault_key="{{ dag_run.conf['audit_vault_key'] }}",
    )
    charge_customer(
        customer_id="{{ dag_run.conf['customer_id'] }}",
        amount_cents="{{ dag_run.conf['amount_cents'] }}",
        billing_period="{{ dag_run.conf['billing_period'] }}",
        vault_key="{{ dag_run.conf['vault_key'] }}",
        existing_charge_id=existing,
    )


monthly_billing()
```
Using AirflowSkipException rather than returning silently marks the task as Skipped in the UI, with the reason recorded in the task log, which makes it obvious to an engineer investigating a backfill run why the billing task did not fire. Note that a skip propagates: downstream tasks with the default trigger rule (all_success) are skipped as well. Give CRM sync and notification tasks trigger_rule=TriggerRule.ALL_DONE (or NONE_FAILED) so they continue normally even when billing is skipped.
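The effect of a skipped billing task on its downstream tasks can be sketched as a small decision function. The state strings use Airflow's names, but should_run is a hypothetical, simplified evaluator; in real Airflow an all_success task whose upstream was skipped is itself marked skipped rather than simply not run.

```python
# Toy evaluation of three Airflow trigger rules for one downstream task.
TERMINAL = ("success", "skipped", "failed", "upstream_failed")


def should_run(rule: str, upstream_states: list[str]) -> bool:
    if rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if rule == "none_failed":
        return all(s in ("success", "skipped") for s in upstream_states)
    if rule == "all_done":
        return all(s in TERMINAL for s in upstream_states)
    raise ValueError(f"unknown rule: {rule}")


after_skip = ["skipped"]  # charge_customer raised AirflowSkipException
for rule in ("all_success", "none_failed", "all_done"):
    print(rule, should_run(rule, after_skip))
# all_success False
# none_failed True
# all_done True
```

none_failed is the stricter of the two alternatives: it still holds the CRM sync back if billing actually failed, while all_done runs it regardless.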
Failure mode 3: Dynamic task mapping with expand() creates concurrent billing fan-out
Airflow 2.3+ introduced dynamic task mapping — the ability to generate task instances from a list at runtime using the .expand() API. For billing pipelines, this is a natural pattern: fetch a list of customers, map a billing task over each one, and run them in parallel up to Airflow's concurrency limit. The failure mode emerges when the upstream task producing the customer list returns duplicates, or when the mapped task set is partially cleared and re-run while other instances are still executing.
```python
# UNSAFE: expand() fan-out with no deduplication guard
from datetime import datetime

import stripe
from airflow.decorators import dag, task
from airflow.models import Variable


@dag(schedule="@monthly", start_date=datetime(2026, 1, 1))
def batch_billing():
    @task
    def get_customers(billing_period: str) -> list[dict]:
        # A JOIN of subscribers with active_plans returns duplicates if a
        # subscriber matches more than one plan row for the same billing period.
        # db is your database helper (not shown).
        return db.query("""
            SELECT s.customer_id, s.amount_cents
            FROM subscribers s
            JOIN active_plans p ON s.plan_id = p.id
            WHERE p.billing_period = %s
        """, [billing_period])

    @task(retries=2)
    def charge_one(customer: dict, billing_period: str) -> str:
        stripe.api_key = Variable.get("STRIPE_SECRET_KEY")  # shared key, no per-run cap
        # If get_customers() returns cus_abc twice, two instances of charge_one
        # run concurrently with identical parameters. Both reach stripe.Charge.create()
        # before either returns. Customer charged twice.
        charge = stripe.Charge.create(
            amount=customer["amount_cents"],
            currency="usd",
            customer=customer["customer_id"],
        )
        return charge["id"]

    customers = get_customers(billing_period="{{ dag_run.conf['billing_period'] }}")
    # .partial() fixes the shared argument; .expand() spawns one task instance
    # per list item, including duplicate customer_ids.
    charge_one.partial(
        billing_period="{{ dag_run.conf['billing_period'] }}"
    ).expand(customer=customers)


batch_billing()
```
The expanded task instances run concurrently, bounded by the DAG's max_active_tasks, available pool slots, and the deployment's parallelism setting. If the customer list contains a duplicate, for example from a JOIN without DISTINCT or from an upstream data-quality issue, two task instances run with the same customer ID. Both call stripe.Charge.create() at nearly the same time, with no shared idempotency key. Both succeed, and the customer is charged twice.
The safe pattern adds three guards: a DISTINCT on the customer query, a content-hash idempotency key derived from customer parameters, and per-customer vault keys passed in the task payload rather than read from a shared Airflow Variable.
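SELECT DISTINCT removes only rows that are identical in every selected column. If a customer appears twice with different amounts, DISTINCT keeps both rows, the content-hash keys differ, and both charges go through. A stricter option is to dedupe on customer_id in Python and fail loudly on conflicts; dedupe_customers below is a hypothetical helper you could call at the end of get_customers.

```python
def dedupe_customers(rows: list[dict]) -> list[dict]:
    """Keep one row per customer_id; raise if duplicate rows disagree."""
    by_customer: dict[str, dict] = {}
    for row in rows:
        cid = row["customer_id"]
        existing = by_customer.get(cid)
        if existing is None:
            by_customer[cid] = row
        elif existing != row:
            # Same customer, different amount or key: billing data is ambiguous.
            raise ValueError(f"conflicting billing rows for {cid}: {existing} vs {row}")
    return list(by_customer.values())  # dicts keep insertion order


rows = [
    {"customer_id": "cus_abc", "amount_cents": 2999},
    {"customer_id": "cus_def", "amount_cents": 4999},
    {"customer_id": "cus_abc", "amount_cents": 2999},  # exact JOIN duplicate
]
print([r["customer_id"] for r in dedupe_customers(rows)])  # ['cus_abc', 'cus_def']
```

Raising rather than picking one row is a deliberate choice: a customer with two different amounts is a data problem that should stop the batch before any money moves.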
```python
# SAFE: expand() fan-out with deduplication and per-customer vault keys
import hashlib
from datetime import datetime

import stripe
from airflow.decorators import dag, task


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:airflow-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


@dag(schedule="@monthly", start_date=datetime(2026, 1, 1))
def batch_billing():
    @task
    def get_customers(billing_period: str) -> list[dict]:
        # DISTINCT removes exact duplicate rows produced by the JOIN. It does not
        # merge rows for one customer that differ in amount or vault key.
        # db is your database helper (not shown).
        rows = db.query("""
            SELECT DISTINCT s.customer_id, s.amount_cents, s.vault_key
            FROM subscribers s
            JOIN active_plans p ON s.plan_id = p.id
            WHERE p.billing_period = %s
        """, [billing_period])
        return rows

    @task(retries=2)
    def charge_one(customer: dict, billing_period: str) -> str:
        customer_id = customer["customer_id"]
        amount_cents = customer["amount_cents"]
        # Per-customer Keybrake vault key (POST /v1/charges only, daily cap),
        # not a shared env var or Variable
        vault_key = customer["vault_key"]
        stripe_client = stripe.StripeClient(
            vault_key,
            base_addresses={"api": "https://proxy.keybrake.com/stripe"},
        )
        idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_period)
        # Even if two task instances fire for the same customer (partial clear re-run),
        # Stripe deduplicates on the idempotency key: only one charge is created.
        charge = stripe_client.charges.create(
            params={
                "amount": amount_cents,
                "currency": "usd",
                "customer": customer_id,
                "description": f"Subscription {billing_period}",
                "metadata": {"billing_period": billing_period},
            },
            options={"idempotency_key": idempotency_key},
        )
        return charge.id

    customers = get_customers(billing_period="{{ dag_run.conf['billing_period'] }}")
    charge_one.partial(
        billing_period="{{ dag_run.conf['billing_period'] }}"
    ).expand(customer=customers)


batch_billing()
```
Even in the worst case, where two task instances fire for the same customer, the idempotency key at the Stripe layer deduplicates the charge. A duplicate request that arrives after the first has completed matches the stored key and gets the original charge object back. One that arrives while the first is still in flight receives an idempotency conflict error (HTTP 409), and a retry (the client's network retry or Airflow's task retry) then returns the original charge. The per-customer vault key adds a spend cap as a backstop behind the idempotency key. Either way, no money leaves the customer's account a second time. If you add a billing-record write to charge_one, make it treat a repeated charge ID as a no-op. And if the batch DAG also runs the pre-flight check from failure mode 2, later re-runs catch the existing charge even after Stripe's idempotency window has expired.
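The concurrent case can be simulated with two threads released by a barrier, standing in for two mapped task instances. ToyStripe and InFlightConflict are hypothetical: the toy server creates at most one charge per key and rejects a request whose key is still in flight, loosely modeling Stripe's documented 409 behavior, and charge_with_retry backs off and tries again.

```python
import threading
import time


class InFlightConflict(Exception):
    """Stand-in for an idempotency conflict (HTTP 409) on a concurrent duplicate."""


class ToyStripe:
    """Toy server: one charge per idempotency key, conflict while a key is in flight."""

    def __init__(self):
        self.lock = threading.Lock()
        self.results = {}      # key -> charge id
        self.in_flight = set()
        self.charges = []

    def create_charge(self, customer, key=None):
        with self.lock:
            if key is not None:
                if key in self.results:
                    return self.results[key]   # completed: replay original
                if key in self.in_flight:
                    raise InFlightConflict(key)
                self.in_flight.add(key)
        time.sleep(0.05)                       # simulated processing time
        with self.lock:
            charge_id = f"ch_{len(self.charges) + 1}"
            self.charges.append((customer, charge_id))
            if key is not None:
                self.results[key] = charge_id
                self.in_flight.discard(key)
            return charge_id


def charge_with_retry(server, customer, key, out, attempts=50):
    for _ in range(attempts):
        try:
            out.append(server.create_charge(customer, key))
            return
        except InFlightConflict:
            time.sleep(0.02)                   # back off, like a retry


def fan_out(key):
    server, out = ToyStripe(), []
    barrier = threading.Barrier(2)

    def worker():
        barrier.wait()                         # both mapped instances start together
        charge_with_retry(server, "cus_abc", key, out)

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return server, out


no_key, _ = fan_out(None)
with_key, ids = fan_out("cus_abc:2999:2026-06")
print(len(no_key.charges), len(with_key.charges), len(set(ids)))  # 2 1 1
```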
Approach comparison
| Approach | Retry safe? | Backfill safe? | Fan-out dedup? | Per-run spend cap? | Audit log? | Key revoke? |
|---|---|---|---|---|---|---|
| Bare Stripe key in Variable | No | No | No | No | No | Rotate only |
| UUID idempotency key (generated inside the task) | No: changes on every retry | No | No | No | No | Rotate only |
| Content-hash idempotency key | Yes | Partial: only within Stripe's 24h key window | Yes | No | No | Rotate only |
| Restricted Stripe key (direct) | No: still duplicates without an idempotency key | No | No | No (restricts endpoints, not spend) | No | Rotate only |
| Pre-flight check + content-hash key | Yes | Yes: also catches re-runs after the 24h window | Yes | No | No | Rotate only |
| Pre-flight + content-hash key + Keybrake vault key | Yes | Yes | Yes | Yes: per-customer daily cap | Yes | Instant, no rotation needed |
Gap analysis
Airflow's execution_date is not a deduplication key
A common pattern is to use {{ ds }} (the date portion of the DAG run's logical date, formerly execution_date) as part of an idempotency key or billing period identifier. This works for scheduled DAG runs, but breaks for manually triggered runs: two manual triggers on the same day render the same {{ ds }}, even though their full logical timestamps differ. If both reach stripe.Charge.create() with a ds-based key, Stripe deduplicates them, although the intent was two separate billing operations. Use an explicit billing_period parameter passed via dag_run.conf rather than deriving it from Airflow's scheduling metadata.
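The collision is just date formatting: {{ ds }} renders the logical date as YYYY-MM-DD, so any two runs on the same calendar day share it. The trigger timestamps and billing_period values below are hypothetical examples.

```python
from datetime import datetime, timezone


def render_ds(logical_date: datetime) -> str:
    # {{ ds }} renders the logical date as YYYY-MM-DD.
    return logical_date.strftime("%Y-%m-%d")


# Two manual triggers a few hours apart (hypothetical timestamps).
run_a = datetime(2026, 6, 21, 10, 0, tzinfo=timezone.utc)
run_b = datetime(2026, 6, 21, 15, 30, tzinfo=timezone.utc)

print(run_a == run_b)                        # False: distinct logical dates
print(render_ds(run_a) == render_ds(run_b))  # True: same {{ ds }}, so same key

# Explicit billing periods from dag_run.conf keep the operations distinct.
conf_a = {"billing_period": "2026-06-usage-batch-1"}
conf_b = {"billing_period": "2026-06-usage-batch-2"}
print(conf_a["billing_period"] == conf_b["billing_period"])  # False
```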
TaskGroup retry scope is the entire group
When billing tasks are wrapped in a TaskGroup, clearing the group from the UI clears every task in the group, including ones that succeeded. If the TaskGroup contains both the billing task and a downstream notification task, clearing the group to retry the notification re-runs billing too. Either keep billing out of TaskGroups that hold downstream tasks, or rely on the pre-flight check so the re-run billing task skips, and set trigger_rule=TriggerRule.ALL_DONE on downstream tasks so they still run when billing is skipped.
Failure and retry callbacks can multiply Stripe calls
A task's on_retry_callback runs each time the task fails and is queued for another try, and on_failure_callback runs once when the task finally fails; for ordinary task failures both run on the worker that executed the task. (sla_miss_callback is different: it is a DAG-level callback run by the scheduler's DAG processor, and SLAs were removed in Airflow 3.) If a retry callback calls a Stripe write endpoint (e.g., creating an invoice note), a task with retries=3 makes up to three extra Stripe calls, one per retry, on top of whatever the failure callback does. Keep failure and retry callbacks free of Stripe write calls; use the audit vault key's read path for any post-failure charge lookups.
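The call count follows directly from when each callback fires. This simplified model replays a task that always fails; the two callbacks are hypothetical anti-patterns that record a pretend Stripe write instead of making one, and run_task only approximates Airflow's retry state machine.

```python
# Simplified model of when Airflow task callbacks fire across retries.
stripe_write_calls = []


def on_retry_callback(context):
    stripe_write_calls.append(("retry", context["try_number"]))    # anti-pattern


def on_failure_callback(context):
    stripe_write_calls.append(("failure", context["try_number"]))  # anti-pattern


def run_task(retries):
    for try_number in range(1, retries + 2):
        try:
            raise ConnectionError("downstream write failed")  # task always fails
        except ConnectionError:
            context = {"try_number": try_number}
            if try_number <= retries:
                on_retry_callback(context)    # task goes up_for_retry
            else:
                on_failure_callback(context)  # retries exhausted: task failed


run_task(retries=3)
print(stripe_write_calls)
# [('retry', 1), ('retry', 2), ('retry', 3), ('failure', 4)]
```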
Celery worker concurrency compounds expand() duplication risk
In a Celery-backed Airflow deployment, expand() task instances are distributed across workers. If two workers pick up instances for the same customer ID from a list that contains duplicates, they may both call Stripe before either result is returned to the scheduler. The content-hash idempotency key is the primary guard at that point. The Keybrake proxy's per-customer spend cap is a second layer for cases the idempotency key cannot cover, such as a re-run that arrives after Stripe's 24-hour key window has expired on a monthly billing cycle.
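To make the backstop concrete, here is a hypothetical daily cap sized like the vault-key comment earlier in this post ($amount_cents+10%). DailySpendCap is an illustration only and says nothing about how Keybrake actually enforces caps; it just shows why a cap slightly above one charge admits the first request and rejects a duplicate.

```python
from collections import defaultdict
from datetime import date


class DailySpendCap:
    """Hypothetical per-key daily cap (not Keybrake's implementation)."""

    def __init__(self, cap_cents: int):
        self.cap_cents = cap_cents
        self.spent = defaultdict(int)   # day -> cents authorized so far

    def authorize(self, amount_cents: int, day: date) -> bool:
        if self.spent[day] + amount_cents > self.cap_cents:
            return False                # the proxy would reject the request
        self.spent[day] += amount_cents
        return True


amount = 2999
cap = DailySpendCap(cap_cents=amount + amount // 10)  # 2999 + 299 = 3298
today = date(2026, 7, 1)
print(cap.authorize(amount, today))  # True: first charge fits the cap
print(cap.authorize(amount, today))  # False: a duplicate would exceed 3298
```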
Pytest enforcement suite
```python
# test_billing_dag.py
# Assumes billing_idempotency_key, charge_one, and charge_customer (the
# pre-flight version from failure mode 2) are defined at module level in
# billing_dag.py, outside any @dag function, so they can be imported.
# @task-decorated functions expose the plain callable as .function.
from unittest.mock import MagicMock, patch

import pytest
from airflow.exceptions import AirflowSkipException

from billing_dag import billing_idempotency_key, charge_customer, charge_one


def test_idempotency_key_stable_across_retries():
    k1 = billing_idempotency_key("cus_abc", 2999, "2026-06")
    k2 = billing_idempotency_key("cus_abc", 2999, "2026-06")
    assert k1 == k2, "Key must be identical across retries"


def test_idempotency_key_differs_by_period():
    k_june = billing_idempotency_key("cus_abc", 2999, "2026-06")
    k_july = billing_idempotency_key("cus_abc", 2999, "2026-07")
    assert k_june != k_july, "Different billing periods must produce different keys"


def test_idempotency_key_differs_by_customer():
    k1 = billing_idempotency_key("cus_abc", 2999, "2026-06")
    k2 = billing_idempotency_key("cus_def", 2999, "2026-06")
    assert k1 != k2, "Different customers must produce different keys"


@patch("stripe.StripeClient")
def test_charge_uses_vault_key_not_env_var(mock_client_class):
    mock_client = MagicMock()
    mock_client_class.return_value = mock_client
    mock_client.charges.create.return_value = MagicMock(id="ch_test_abc")
    vault_key = "vault_key_per_customer_xyz"
    charge_one.function(
        customer={"customer_id": "cus_abc", "amount_cents": 2999, "vault_key": vault_key},
        billing_period="2026-06",
    )
    mock_client_class.assert_called_once_with(
        vault_key, base_addresses={"api": "https://proxy.keybrake.com/stripe"}
    )


@patch("stripe.StripeClient")
def test_skip_when_existing_charge_found(mock_client_class):
    with pytest.raises(AirflowSkipException):
        charge_customer.function(
            customer_id="cus_abc",
            amount_cents=2999,
            billing_period="2026-06",
            vault_key="vault_key_xyz",
            existing_charge_id="ch_existing_123",  # pre-flight found this
        )
    mock_client_class.assert_not_called()  # no client built, so no charge attempted
```
FAQ
Can I use the DAG run ID as an idempotency key?
No. Each Airflow DAG run has a run ID fixed when the run is created (for example, manual__2026-06-21T10:00:00+00:00 or scheduled__2026-06-01T00:00:00+00:00). On retry, the same task instance runs again with an incremented try_number, so the run ID is stable across retries within a run. But a manual re-trigger for the same billing period, or a backfill that creates a fresh run, gets a different run ID from the original. If you use the run ID as the idempotency key, that new run produces a different key and Stripe treats it as a new charge. Use a content hash derived from billing parameters instead.
Does Airflow's task-level try_number help?
Airflow exposes context['ti'].try_number inside a task. It starts at 1 and increments on each retry. Some teams use f"{customer_id}:{billing_period}:{try_number}" as an idempotency key. This is no better than having no key at all: it generates a unique key per retry, so every retry produces a new charge. The idempotency key must be the same across all retries; it is a deduplication key, not a versioning mechanism.
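Counting distinct keys across a realistic sequence of attempts shows how the three strategies from these two answers differ. The attempts list is hypothetical (three tries in a scheduled run, then a manual re-run for the same period), and content_key reuses this post's content-hash derivation.

```python
import hashlib


def content_key(customer_id, amount_cents, billing_period):
    raw = f"{customer_id}:{amount_cents}:{billing_period}:airflow-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


# Hypothetical attempts: three tries in the scheduled run, then a manual
# re-run for the same billing period a few days later.
attempts = [
    {"run_id": "scheduled__2026-06-01T00:00:00+00:00", "try_number": 1},
    {"run_id": "scheduled__2026-06-01T00:00:00+00:00", "try_number": 2},
    {"run_id": "scheduled__2026-06-01T00:00:00+00:00", "try_number": 3},
    {"run_id": "manual__2026-06-04T09:00:00+00:00", "try_number": 1},
]

strategies = {
    "try_number": lambda a: f"cus_abc:2026-06:{a['try_number']}",
    "run_id": lambda a: a["run_id"],
    "content_hash": lambda a: content_key("cus_abc", 2999, "2026-06"),
}

for name, make_key in strategies.items():
    print(name, len({make_key(a) for a in attempts}))
# try_number 3
# run_id 2
# content_hash 1
```

Only the content hash maps every attempt for the same billing operation to one key; the re-run days later still needs the pre-flight check, since it falls outside Stripe's 24-hour window.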
What if the customer is billed twice before the pre-flight check catches it?
The content-hash idempotency key at the Stripe layer catches duplicates within Stripe's 24-hour idempotency window: two task instances sending the same key at the same time result in only one charge (the later request either gets the original charge back or receives an idempotency conflict error it can retry). The pre-flight check catches the case where the window has expired, such as a backfill run more than 24 hours after the original charge. Together, the two layers make a second charge for the same billing period very unlikely, regardless of how many times the task runs or how far apart the runs are. The remaining gap is a pre-flight lookup that misses the original charge, for example because that charge lacks the billing_period metadata.
How do vault keys work with Airflow Variables?
The per-customer vault key should come from your application database (or a secrets manager) at orchestration time, when you build the customer list, and be passed into the task through its input parameters. Do not store per-customer vault keys in Airflow Variables: Variables are global to the Airflow instance rather than scoped per run, and fetching a Variable inside a task couples your billing security model to the Airflow metadata database. Pass vault keys as values in the task payload (via dag_run.conf for single-customer runs, or as a field in the per-customer dict for expand() fan-out). Be aware that dag_run.conf and XCom values are also persisted in the metadata database and can be viewed in the UI, which is one more reason to keep vault keys short-lived and narrowly scoped.
Does the Keybrake proxy add latency to Stripe calls?
The proxy adds approximately 2-8ms of forwarding overhead, well within Stripe's own response time variance (typically 50-200ms for charges.create). For Airflow billing pipelines where each task typically takes 200-500ms including database writes, the proxy overhead is negligible in end-to-end DAG execution time. Keybrake runs on the same factory VPS network as the Caddy frontend, with a co-located SQLite audit log, so there is no cross-region hop for the audit write.
What happens to the vault key if the DAG run fails mid-way through a batch?
Vault keys issued by Keybrake have an expires_at timestamp, typically 24-48 hours for a billing run. If the DAG run fails and is re-run the next day, the vault keys in the original dag_run.conf may have expired. Issue fresh vault keys at the start of each DAG run (in the get_customers task, which can also call the Keybrake key-issue API) and pass them through the task payload. The idempotency key still protects against duplicate charges if the customer was already billed in the failed run, as long as the re-run falls within Stripe's 24-hour key window; after that, the pre-flight check is the guard. The fresh vault key only authorizes the new attempt.
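A reuse-or-reissue check at the start of a re-run can be sketched as follows. Everything here is hypothetical: the key dict shape, the min_remaining margin, and issue_key, which stands in for whatever call your setup uses to issue a vault key (the Keybrake API itself is not shown in this post).

```python
from datetime import datetime, timedelta, timezone
from typing import Callable


def ensure_fresh_key(
    key: dict,
    issue_key: Callable[[str], dict],
    now: datetime,
    min_remaining: timedelta = timedelta(hours=1),
) -> dict:
    """Reuse a vault key only if it outlives `now` by at least min_remaining.

    key is a hypothetical {"customer_id", "vault_key", "expires_at"} dict;
    issue_key is a hypothetical stand-in for the key-issue call.
    """
    if key["expires_at"] - now >= min_remaining:
        return key
    return issue_key(key["customer_id"])


def fake_issue(customer_id: str) -> dict:
    return {"customer_id": customer_id, "vault_key": "vk_new",
            "expires_at": datetime(2026, 7, 3, tzinfo=timezone.utc)}


stale = {"customer_id": "cus_abc", "vault_key": "vk_old",
         "expires_at": datetime(2026, 7, 1, 12, tzinfo=timezone.utc)}
rerun_at = datetime(2026, 7, 2, 9, tzinfo=timezone.utc)  # next-day re-run

print(ensure_fresh_key(stale, fake_issue, rerun_at)["vault_key"])  # vk_new
```

The margin exists so a key that is technically valid at task start does not expire partway through a long batch.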
Stop handing your Airflow DAGs a bare Stripe key
Keybrake issues per-run vault keys with endpoint allowlists, daily spend caps, and a full audit log, so that alongside content-hash idempotency keys, a backfill, a retry storm, or an expand() duplicate cannot quietly charge a customer twice or run up charges past the cap. Pointing your StripeClient at the proxy is a one-argument change.