Dagster Stripe Integration: Restricted API Keys, Spend Caps, and Agent Governance
Dagster's RetryPolicy is the idiomatic way to make a billing op resilient. It is also the most reliable mechanism for silently creating duplicate Stripe charges when any step downstream of the charge fails.
Dagster is the asset-native data orchestration platform used by engineering teams who model their pipelines as graphs of data assets — tables, ML models, reports — rather than task sequences. When those pipelines touch Stripe — usage-based billing runs, subscription renewals, invoicing — Dagster's retry, partitioning, and re-execution primitives introduce billing failure modes that do not appear when testing against Stripe's sandbox and are invisible in Dagster's event log until a customer disputes a charge.
This post covers three failure modes specific to Dagster's architecture:

- RetryPolicy re-running a billing op that already charged.
- A partitioned asset backfill creating concurrent, independent Stripe calls for each partition.
- Dagster's run re-execution feature re-running a billing op that charged before it failed.

Each section includes Python code and the governance pattern that closes the gap. At the Stripe layer, that pattern is content-hash idempotency keys. At the key-management layer, it is per-op vault keys issued through the Keybrake proxy. A gap analysis then covers four additional Dagster-specific edge cases, followed by a pytest suite and an FAQ.
Failure mode 1: RetryPolicy re-runs billing op on downstream failure
Dagster's RetryPolicy is configured at the op or asset level and specifies how many times Dagster should retry the op on failure, with optional delay and backoff. The intent for a billing op is to retry on transient infrastructure failures — a database write timeout, a network hiccup calling an internal service. The problem is that Dagster retries the entire op callable from line 1: there is no checkpoint within an op, and no awareness of what side effects succeeded before the exception that triggered the retry.
# billing.py (UNSAFE): RetryPolicy re-fires the charge on any failure after it
import os
import stripe
from dagster import Backoff, RetryPolicy, op
stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # shared across all ops in the process
@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=30,
        backoff=Backoff.EXPONENTIAL,
    )
)
def charge_customer(context, customer_id: str, amount_cents: int, billing_month: str) -> str:
    # Charge succeeds: Stripe returns ch_A
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_month}",
        # No idempotency_key: every retry creates a new Stripe charge
    )
    # If this write raises an exception, RetryPolicy re-runs the op from the top.
    # Retry 1: stripe.Charge.create() runs again → ch_B
    # Retry 2: → ch_C. Retry 3: → ch_D. Four charges total.
    save_charge_to_warehouse(customer_id, charge["id"], billing_month)  # hypothetical helper
    context.log.info(f"Charged {customer_id}: {charge['id']}")
    return charge["id"]
The failure sequence runs as follows:

1. stripe.Charge.create() returns ch_A.
2. save_charge_to_warehouse() raises a DatabaseTimeoutError.
3. Dagster catches it, waits out the backoff delay, and re-runs the op from the start.
4. On retry 1, stripe.Charge.create() fires again. Stripe has no link to the prior request, so it creates ch_B.

With max_retries=3 and a persistent database issue, the customer is charged four times. The Dagster event log shows the step going up for retry three times, followed by a final step failure. The duplicate charges appear only in the Stripe dashboard.
This pattern is especially dangerous because the step most likely to fail is usually not the Stripe call. Stripe's API tends to be more stable than a data warehouse write or an internal RPC. Any transient error in the recording step triggers the retry chain. Because the Stripe call comes first in the op, before the write, every retry re-fires it. Teams often discover the bug only when a customer emails about being charged four times for a billing run that the Dagster UI shows as "Succeeded", because a later retry's warehouse write finally went through.
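The mechanism is easy to reproduce without Dagster or Stripe. The stdlib-only sketch below is a simulation, not code from either library. `FakeStripe` and `run_with_retry_policy` are hypothetical stand-ins:

- The runner re-invokes the whole callable after any exception, as RetryPolicy does, minus the delay and backoff.
- The fake charge endpoint has no idempotency key to match on.

```python
import itertools


class FakeStripe:
    """Hypothetical stand-in for a charges endpoint with no idempotency key."""

    def __init__(self):
        self.charges = []
        self._ids = itertools.count(1)

    def create_charge(self, customer_id: str, amount_cents: int) -> str:
        charge_id = f"ch_{next(self._ids)}"
        self.charges.append((charge_id, customer_id, amount_cents))
        return charge_id


def run_with_retry_policy(op_fn, max_retries: int):
    """Mimic RetryPolicy: re-invoke the whole callable from the top after any exception."""
    for attempt in range(max_retries + 1):
        try:
            return op_fn()
        except RuntimeError:
            if attempt == max_retries:
                raise  # retries exhausted: the step fails


stripe_api = FakeStripe()


def charge_customer() -> str:
    charge_id = stripe_api.create_charge("cus_abc", 2999)  # side effect on every attempt
    raise RuntimeError(f"warehouse write failed after {charge_id}")  # persistent DB failure


try:
    run_with_retry_policy(charge_customer, max_retries=3)
except RuntimeError:
    pass  # the step ends as failed, but the charges already happened

print(len(stripe_api.charges))  # 4: the original attempt plus three retries
```

Every attempt reaches the charge before it reaches the failing write, so the charge count tracks the attempt count exactly.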
The fix: content-hash idempotency key + vault key via proxy
The idempotency key must be derived from the billing parameters themselves — not generated with uuid4() at op entry, which produces a different value on every retry. A SHA-256 hash of (customer_id, amount_cents, billing_month) is stable across every retry of the same billing op, so Stripe deduplicates all retries into the original ch_A regardless of how many times the RetryPolicy fires.
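To see the difference, derive both kinds of key across the four attempts that max_retries=3 allows. This sketch reuses the post's `billing_idempotency_key` derivation. `uuid_key_at_op_entry` is a hypothetical helper standing in for the `uuid4()` anti-pattern.

```python
import hashlib
import uuid


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_month: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_month}:dagster-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


def uuid_key_at_op_entry() -> str:
    # Anti-pattern: a fresh UUID generated each time the op callable starts
    return uuid.uuid4().hex


attempts = range(4)  # original attempt + max_retries=3
content_keys = {billing_idempotency_key("cus_abc", 2999, "2026-06-01") for _ in attempts}
uuid_keys = {uuid_key_at_op_entry() for _ in attempts}

print(len(content_keys))  # 1: every attempt sends the same key
print(len(uuid_keys))     # 4: every attempt looks like a new request to Stripe
```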
# billing.py (SAFE): content-hash idempotency key + vault key per op run
import hashlib
import stripe
from dagster import Backoff, OpExecutionContext, RetryPolicy, op
def billing_idempotency_key(customer_id: str, amount_cents: int, billing_month: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_month}:dagster-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]
@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=30,
        backoff=Backoff.EXPONENTIAL,
    )
)
def charge_customer(
    context: OpExecutionContext,
    customer_id: str,
    amount_cents: int,
    billing_month: str,
    vault_key: str,  # scoped to POST /v1/charges only, daily cap = amount + 10%
) -> str:
    # Route API calls through the proxy via StripeClient's base_addresses option
    stripe_client = stripe.StripeClient(
        vault_key,
        base_addresses={"api": "https://proxy.keybrake.com/stripe"},
    )
    idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_month)
    # Same key on every retry: Stripe returns ch_A on all retries after the first
    charge = stripe_client.charges.create(
        params={
            "amount": amount_cents,
            "currency": "usd",
            "customer": customer_id,
            "description": f"Subscription {billing_month}",
            "metadata": {"billing_month": billing_month},
        },
        options={"idempotency_key": idempotency_key},
    )
    save_charge_to_warehouse(customer_id, charge.id, billing_month)  # hypothetical helper
    context.log.info(f"Charged {customer_id}: {charge.id}")
    return charge.id
With this pattern, every retry of the same op produces the same idempotency key. Stripe sees retries 1 through 3, recognizes the key, and returns the original ch_A without creating a new charge. The vault_key is passed as an op input rather than read from a module-level environment variable, which closes the key-isolation problem: each billing run gets its own key scoped to that run's expected charge amount.
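Stripe's side of this contract can be modeled with a small in-memory store:

- The first request for a key creates the charge.
- Later requests with the same key and the same parameters get the original result back.
- A reused key with different parameters is rejected. Stripe returns an error in that case.

`IdempotentChargeStore` is a hypothetical simulation, not Stripe code, and it ignores key expiry.

```python
import hashlib
import itertools


class IdempotencyMismatch(Exception):
    """Raised when a key is reused with different request parameters."""


class IdempotentChargeStore:
    """Hypothetical in-memory model of Stripe's idempotency-key handling."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._by_key = {}  # idempotency_key -> (params, charge_id)
        self.created = []

    def create_charge(self, params: dict, idempotency_key: str) -> str:
        if idempotency_key in self._by_key:
            saved_params, charge_id = self._by_key[idempotency_key]
            if saved_params != params:
                raise IdempotencyMismatch(idempotency_key)
            return charge_id  # replay: no new charge is created
        charge_id = f"ch_{next(self._ids)}"
        self._by_key[idempotency_key] = (dict(params), charge_id)
        self.created.append(charge_id)
        return charge_id


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_month: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_month}:dagster-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


store = IdempotentChargeStore()
params = {"amount": 2999, "currency": "usd", "customer": "cus_abc"}
key = billing_idempotency_key("cus_abc", 2999, "2026-06-01")
ids = [store.create_charge(params, key) for _ in range(4)]  # attempt + 3 retries
print(ids)            # ['ch_1', 'ch_1', 'ch_1', 'ch_1']
print(store.created)  # ['ch_1']

try:
    store.create_charge({**params, "description": "changed"}, key)
    mismatch_rejected = False
except IdempotencyMismatch:
    mismatch_rejected = True
```

Because amount_cents is part of the hash, a changed amount produces a new key and therefore a new charge, not a mismatch error. Only changes to parameters outside the hash hit the rejection path.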
Failure mode 2: Partitioned asset backfill creates concurrent Stripe calls per partition
Dagster's partitions system is one of its most powerful features. You define a PartitionsDefinition on an asset, and Dagster can materialize each partition (a day, a month, a customer segment) independently and concurrently. For a billing asset partitioned by month, a backfill of the last six months typically launches one run per partition. Those runs can execute in parallel, up to the run coordinator's concurrency limits. The problem is that each partition's run is fully independent. It has no awareness of other runs executing at the same time, including other runs of the same partition.
# UNSAFE: partitioned billing asset without cross-run dedup
import stripe  # assumes stripe.api_key is set at module level, as in the first example
from dagster import AssetExecutionContext, MonthlyPartitionsDefinition, asset
monthly_partitions = MonthlyPartitionsDefinition(start_date="2026-01-01")
@asset(partitions_def=monthly_partitions)
def monthly_billing(context: AssetExecutionContext):
    billing_month = context.partition_key  # e.g., "2026-06-01"
    customers = get_billable_customers(billing_month)  # hypothetical helper
    for customer in customers:
        # If two runs materialize this same partition at once (a backfill overlapping
        # a scheduled run, or a daemon retry alongside a new run), both iterate the
        # same customer list and both charge every customer.
        # No cross-run lock or dedup: each materialization is fully isolated.
        stripe.Charge.create(
            amount=customer["amount_cents"],
            currency="usd",
            customer=customer["id"],
            # No idempotency_key: two concurrent runs = two charges per customer
        )
The concurrency risk is highest during backfills — when you trigger materialization of multiple historical partitions at once. In production single-partition runs, each partition materializes independently on its own schedule, so collisions are less likely (though not impossible — a sensor trigger and a scheduled run can overlap on the same partition). But when an engineer runs a 3-month billing backfill after a bug fix, or when Dagster's daemon automatically retries a failed partition alongside a newly-scheduled run for the same window, two materializations can execute concurrently against the same customer list.
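The same-partition race can be reproduced with two threads standing in for two runs of the same partition, for example a backfill run and a scheduled run for June. `materialize_partition`, `charge`, and the customer list are hypothetical. The lock only keeps the shared list consistent and does nothing to coordinate billing, which mirrors how separate Dagster runs share no billing state.

```python
import threading

charges = []
lock = threading.Lock()  # protects the list itself, not the billing decision


def charge(customer_id: str, amount_cents: int, partition_key: str) -> None:
    with lock:
        charges.append((customer_id, amount_cents, partition_key))


def materialize_partition(partition_key: str, customers: list) -> None:
    # Each run walks the full customer list with no cross-run coordination
    for customer_id, amount_cents in customers:
        charge(customer_id, amount_cents, partition_key)


customers = [("cus_abc", 2999), ("cus_def", 4999)]
runs = [
    threading.Thread(target=materialize_partition, args=("2026-06-01", customers))
    for _ in range(2)  # e.g., a backfill run and a scheduled run for the same month
]
for t in runs:
    t.start()
for t in runs:
    t.join()

print(len(charges))  # 4: every customer charged once per run
```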
The fix requires both an idempotency key that incorporates the partition key, and per-partition vault keys issued at backfill dispatch time with spend caps sized for that partition's expected total charge volume.
# SAFE: idempotency key scoped to partition + per-partition vault key
import hashlib
import stripe
from dagster import AssetExecutionContext, MonthlyPartitionsDefinition, asset
monthly_partitions = MonthlyPartitionsDefinition(start_date="2026-01-01")
def partition_billing_key(customer_id: str, amount_cents: int, partition_key: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{partition_key}:dagster-monthly-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]
@asset(
    partitions_def=monthly_partitions,
    required_resource_keys={"vault_keys"},
)
def monthly_billing(context: AssetExecutionContext):
    billing_month = context.partition_key  # e.g., "2026-06-01"
    # vault_key scoped to POST /v1/charges only, daily cap = total expected partition charges.
    # Issued by the dispatch job and exposed through a hypothetical "vault_keys" resource;
    # indexing (rather than .get) fails fast if no key was issued for this partition.
    vault_key = context.resources.vault_keys[billing_month]
    stripe_client = stripe.StripeClient(
        vault_key,
        base_addresses={"api": "https://proxy.keybrake.com/stripe"},
    )
    customers = get_billable_customers(billing_month)  # hypothetical helper
    for customer in customers:
        idempotency_key = partition_billing_key(
            customer["id"], customer["amount_cents"], billing_month
        )
        # Two concurrent runs of the same partition produce the same key
        # → Stripe deduplicates; only one charge per customer per month
        charge = stripe_client.charges.create(
            params={
                "amount": customer["amount_cents"],
                "currency": "usd",
                "customer": customer["id"],
                "description": f"Subscription {billing_month}",
                "metadata": {"billing_month": billing_month},
            },
            options={"idempotency_key": idempotency_key},
        )
        save_charge(customer["id"], charge.id, billing_month)  # hypothetical helper
        context.log.info(f"Charged {customer['id']} for {billing_month}: {charge.id}")
Including the partition key in the idempotency hash means two concurrent materializations of the same partition produce the same idempotency keys for the same customers — Stripe deduplicates them. Materializations of different partitions (different months) produce different keys, so each billing month still creates its own charges. The vault key's daily cap on the partition's total expected volume provides the backstop: even if a bug causes a single partition to loop, it cannot charge more than the cap across all customers in that partition.
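The cap backstop is enforced by the proxy, not by Stripe, and this post does not show Keybrake's enforcement logic. As a rough model of the concept only, the hypothetical `SpendCap` class below tracks cumulative spend per key in cents. It refuses any charge that would push the total past the cap. The cap uses the expected-total-plus-10% sizing from the code comments.

```python
from dataclasses import dataclass


class SpendCapExceeded(Exception):
    pass


@dataclass
class SpendCap:
    """Hypothetical model of a per-key cumulative spend cap, in cents."""
    cap_cents: int
    spent_cents: int = 0

    def authorize(self, amount_cents: int) -> None:
        if self.spent_cents + amount_cents > self.cap_cents:
            raise SpendCapExceeded(f"{self.spent_cents + amount_cents} > {self.cap_cents}")
        self.spent_cents += amount_cents


# Partition with 3 customers at $29.99; cap = expected total + 10%, rounded down
cap = SpendCap(cap_cents=(3 * 2999 * 110) // 100)
approved = 0
try:
    while True:  # a buggy loop that keeps charging inside one partition
        cap.authorize(2999)
        approved += 1
except SpendCapExceeded:
    pass

print(cap.cap_cents, approved)  # 9896 3
```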
Failure mode 3: Run re-execution re-runs billing ops that already charged
Dagster's "Re-execute from failure" feature is designed to save time: instead of re-running an entire pipeline after a downstream step fails, it skips the steps that succeeded and re-runs only the failed step and its downstream dependencies. For most pipeline ops — data transformations, model training, report generation — this is safe. For billing ops, the failure scenario that triggers re-execution is precisely the scenario where the charge already succeeded.
# Timeline that makes re-execution dangerous:
#
# 1. charge_customer op runs:
#    → stripe.Charge.create() succeeds, ch_A created ✓
#    → save_charge_to_warehouse() raises DatabaseConnectionError ✗
#    → Dagster marks charge_customer FAILED
#    → RetryPolicy exhausted (or not configured)
#
# 2. Engineer opens Dagster UI → sees the run failed at charge_customer
# 3. Engineer clicks "Re-execute from failure", which selects the failed
#    charge_customer step and everything downstream of it
# 4. charge_customer re-runs from line 1 in a new run:
#    → stripe.Charge.create() fires again; no idempotency key → ch_B created
#    → save_charge_to_warehouse() succeeds this time
#    → Dagster marks charge_customer SUCCEEDED
#
# Net result: customer charged twice. Dagster shows one success. Stripe shows two charges.
# The second charge appears only in the new run's event log, not in the original run's.
The same failure sequence occurs when re-execution is triggered programmatically. With Dagster's Python API, consider `execute_job(reconstructable(billing_job), instance=instance, reexecution_options=ReexecutionOptions.from_failure(run_id, instance))`, where billing_job is the job containing the charge op. That call re-runs the charge op if it was the step that failed. This pattern is common in automated failure-handling pipelines, where a sensor monitors run events and re-triggers failed runs. The effect is to auto-retry every failed op, including billing ops that charged before failing.
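Re-execution from failure follows a simple selection rule. It skips succeeded steps and re-runs failed or skipped steps along with everything downstream of them. That rule can be modeled in a few lines. `steps_to_reexecute` and the three-step graph are hypothetical; in Dagster, `ReexecutionOptions.from_failure` performs the real selection. The model shows that a billing op that charged and then raised is classified as failed, so it lands in the re-run set.

```python
from graphlib import TopologicalSorter


def steps_to_reexecute(deps: dict, status: dict) -> list:
    """Return non-succeeded steps plus everything downstream of them, in run order.

    deps maps each step to the set of steps it depends on.
    status maps each step to "success", "failure", or "skipped".
    """
    order = list(TopologicalSorter(deps).static_order())
    selected = set()
    for step in order:
        upstream_selected = any(d in selected for d in deps.get(step, ()))
        if status.get(step) != "success" or upstream_selected:
            selected.add(step)
    return [s for s in order if s in selected]


deps = {
    "load_customers": set(),
    "charge_customer": {"load_customers"},
    "send_receipt": {"charge_customer"},
}
# charge_customer created ch_A, then its warehouse write raised
status = {"load_customers": "success", "charge_customer": "failure", "send_receipt": "skipped"}
print(steps_to_reexecute(deps, status))  # ['charge_customer', 'send_receipt']
```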
The fix is the same content-hash idempotency key, but here the pre-flight check is especially valuable. A pre-flight check using a read-only audit vault key covers two cases:

- Re-executions where the original run did not use an idempotency key. This is a common migration scenario when retrofitting governance to an existing pipeline, and the check only works if the original charge was tagged with billing_month metadata.
- Re-executions that happen after Stripe has pruned the idempotency key, which it may do once a key is at least 24 hours old.
# SAFE: idempotency key + pre-flight check protects re-execution scenarios
import hashlib
import stripe
from dagster import OpExecutionContext, RetryPolicy, op
def billing_idempotency_key(customer_id: str, amount_cents: int, billing_month: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_month}:dagster-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]
@op(retry_policy=RetryPolicy(max_retries=2, delay=60))
def charge_customer(
    context: OpExecutionContext,
    customer_id: str,
    amount_cents: int,
    billing_month: str,
    vault_key: str,        # POST /v1/charges only, daily cap = amount + 10%
    audit_vault_key: str,  # GET /v1/charges only, $0 daily cap: cannot create charges
) -> str:
    # Pre-flight: check if this customer was already charged this billing_month.
    # Catches re-execution, re-delivery after a crash, and any scenario where the
    # charge succeeded but the op failed before returning the charge ID.
    audit_client = stripe.StripeClient(
        audit_vault_key,
        base_addresses={"api": "https://proxy.keybrake.com/stripe"},
    )
    # Inspects only the 10 most recent charges; page further back if a customer
    # can accumulate more charges than that between the original run and a re-run.
    existing = audit_client.charges.list(params={"customer": customer_id, "limit": 10})
    for ch in existing.data:
        if (ch.metadata.get("billing_month") == billing_month
                and ch.status == "succeeded"):
            context.log.info(
                f"Pre-flight: existing charge {ch.id} found for {customer_id} / {billing_month}"
                " (skipping charge, re-execution safe)"
            )
            save_charge_to_warehouse(customer_id, ch.id, billing_month)  # hypothetical helper
            return ch.id
    # No existing charge: safe to proceed
    stripe_client = stripe.StripeClient(
        vault_key,
        base_addresses={"api": "https://proxy.keybrake.com/stripe"},
    )
    idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_month)
    charge = stripe_client.charges.create(
        params={
            "amount": amount_cents,
            "currency": "usd",
            "customer": customer_id,
            "description": f"Subscription {billing_month}",
            "metadata": {"billing_month": billing_month},
        },
        options={"idempotency_key": idempotency_key},
    )
    save_charge_to_warehouse(customer_id, charge.id, billing_month)  # hypothetical helper
    context.log.info(f"Charged {customer_id} / {billing_month}: {charge.id}")
    return charge.id
The two layers are complementary:

- The idempotency key handles re-runs within Stripe's 24-hour idempotency window. That includes all RetryPolicy retries, which happen within minutes.
- The pre-flight check handles re-executions triggered days later, for example when an engineer investigates a billing discrepancy and re-runs a historical pipeline.

The audit vault key's GET-only endpoint allowlist is what stops the pre-flight read from creating a charge, even if the pre-flight code has a logic bug. Its zero-dollar cap is a second backstop.
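The pre-flight in the op above has two limits. It inspects only the customer's 10 most recent charges, and it only matches charges that carry billing_month metadata. If a customer can accumulate more than 10 charges between the original run and a re-execution, the scan has to page further back.

The sketch below isolates the matching and cursor logic as a plain function. `fetch_page` is a hypothetical callable standing in for a cursor-paginated list call; stripe-python list results also offer `auto_paging_iter()`. When the function runs out of pages, it raises instead of returning None, so an incomplete scan is never mistaken for "not yet charged".

```python
from typing import Callable, List, Optional, Tuple

Page = Tuple[List[dict], bool]  # (charges newest-first, has_more)


def find_existing_charge(
    fetch_page: Callable[[Optional[str]], Page],
    billing_month: str,
    max_pages: int = 10,
) -> Optional[str]:
    """Return the ID of a succeeded charge tagged with billing_month, or None.

    fetch_page(starting_after) returns one page of charges (newest first) and a
    has_more flag. Raises if max_pages is reached before the history is exhausted.
    """
    starting_after = None
    for _ in range(max_pages):
        charges, has_more = fetch_page(starting_after)
        for ch in charges:
            if ch["status"] == "succeeded" and ch["metadata"].get("billing_month") == billing_month:
                return ch["id"]
        if not has_more or not charges:
            return None  # whole history scanned, no match
        starting_after = charges[-1]["id"]
    raise RuntimeError("pre-flight scan incomplete; refusing to treat as not charged")


# Hypothetical history: 25 recent one-off charges, then the June subscription charge
history = [{"id": f"ch_{i}", "status": "succeeded", "metadata": {}} for i in range(25)]
history.append({"id": "ch_june", "status": "succeeded", "metadata": {"billing_month": "2026-06-01"}})


def fake_pager(starting_after: Optional[str], page_size: int = 10) -> Page:
    ids = [c["id"] for c in history]
    start = 0 if starting_after is None else ids.index(starting_after) + 1
    return history[start:start + page_size], start + page_size < len(history)


print(find_existing_charge(fake_pager, "2026-06-01"))  # ch_june (found on page 3)
print(find_existing_charge(fake_pager, "2026-07-01"))  # None
```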
Approach comparison
| Approach | Retry safe? | Re-execution safe? | Partition backfill safe? | Per-op spend cap? | Key revoke? | Audit log? |
|---|---|---|---|---|---|---|
| Module-level stripe.api_key | No | No | No | No | Rotate only (all ops) | No |
| UUID idempotency key | No — new UUID each retry | No | No | No | Rotate only | No |
| Content-hash idempotency key | Yes | Partial — only within 24h Stripe window | Yes — if partition key in hash | No | Rotate only | No |
| Stripe restricted key (direct) | No — still duplicates without idem key | No | No | No (endpoint only, not spend) | Rotate only | No |
| Pre-flight check + idem key | Yes | Yes — catches expired 24h window | Yes | No | Rotate only | No |
| Vault key via Keybrake proxy + content-hash key + pre-flight | Yes | Yes | Yes | Yes — per-op daily cap | Instant, no rotation needed | Yes |
Gap analysis
Dagster daemon run_retries auto-retry fires billing op again
Dagster's daemon can be configured with run_retries in dagster.yaml (enabled: true, max_retries: N). When a run fails, the daemon automatically launches a new retry run. This is common in production deployments as a guard against transient infrastructure failures.

Which ops re-run depends on the retry strategy:

- By default, the retry re-executes from failure, so a billing op that charged and then raised is re-run.
- If the job sets the dagster/retry_strategy tag to ALL_STEPS, every op re-runs, including billing ops that succeeded in the original run.

Either way, with max_retries: 2, a persistent failure can run the billing op up to three times: the original run plus two retries. Run retries also operate at the run level, independently of any op-level RetryPolicy, so the two multiply. Each retry run can exhaust the op's RetryPolicy again.

The content-hash idempotency key closes this gap. The dagster/max_retries tag can also opt a billing job out of run retries entirely.
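A small model makes the arithmetic explicit. `billing_op_executions` is hypothetical and assumes the failure is persistent, meaning every attempt fails the same way. Its parameters:

- `billing_op_failed` separates a billing op that charged and then raised from one that succeeded while a later op failed.
- `op_max_retries` adds an op-level RetryPolicy inside each run.

```python
def billing_op_executions(
    run_max_retries: int,
    strategy: str,
    billing_op_failed: bool,
    op_max_retries: int = 0,
) -> int:
    """Count billing-op executions across the original run and daemon retry runs."""
    if strategy not in ("FROM_FAILURE", "ALL_STEPS"):
        raise ValueError(f"unknown strategy: {strategy}")
    # FROM_FAILURE re-runs failed steps (and their downstream); ALL_STEPS re-runs every op
    reruns_billing_op = strategy == "ALL_STEPS" or billing_op_failed
    runs_with_billing_op = 1 + (run_max_retries if reruns_billing_op else 0)
    # Within each run, a RetryPolicy re-invokes a failing op up to op_max_retries more times
    attempts_per_run = (op_max_retries + 1) if billing_op_failed else 1
    return runs_with_billing_op * attempts_per_run


print(billing_op_executions(2, "FROM_FAILURE", True))                    # 3
print(billing_op_executions(2, "FROM_FAILURE", False))                   # 1
print(billing_op_executions(2, "ALL_STEPS", False))                      # 3
print(billing_op_executions(2, "FROM_FAILURE", True, op_max_retries=3))  # 12
```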
Sensor-triggered re-runs on failure events create auto-retry billing loops
A common Dagster pattern uses a run_status_sensor or run_failure_sensor to trigger compensating actions or automatic re-runs when a pipeline fails. If the sensor triggers a new run of the billing pipeline on failure (a reasonable pattern for data pipelines that should be retried after infrastructure issues), and if the billing op is in that pipeline, each sensor-triggered run fires the billing op again. Without an idempotency key and a pre-flight check, a persistent downstream failure (a flaky database that fails every other run) creates an automatic billing loop: sensor detects failure → triggers re-run → charge op fires → downstream fails → sensor detects failure → triggers re-run → charge op fires again. Ensure any auto-retry sensor excludes billing pipelines from automatic re-triggering, or confirm idempotency + pre-flight are in place before enabling it.
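One way to enforce that rule is a guard the failure sensor consults before it requests a new run. This is a sketch under assumptions: the `billing` tag convention, `MAX_AUTO_RETRIGGERS`, and `should_auto_retrigger` are hypothetical, and the sensor wiring itself is omitted.

```python
BILLING_TAG = "billing"   # hypothetical tag convention: billing jobs carry {"billing": "true"}
MAX_AUTO_RETRIGGERS = 2   # hypothetical ceiling on sensor-driven re-runs per original run


def should_auto_retrigger(job_tags: dict, prior_retriggers: int, billing_protected: bool) -> bool:
    """Decide whether a failure sensor may request a re-run of a failed job.

    billing_protected: True only if the job's charge ops use content-hash
    idempotency keys AND a pre-flight check.
    """
    is_billing = job_tags.get(BILLING_TAG) == "true"
    if is_billing and not billing_protected:
        return False  # never auto-retry an unprotected billing job
    return prior_retriggers < MAX_AUTO_RETRIGGERS  # bound the loop for every job


print(should_auto_retrigger({"billing": "true"}, 0, False))  # False
print(should_auto_retrigger({"billing": "true"}, 0, True))   # True
print(should_auto_retrigger({}, 2, True))                    # False: ceiling reached
```

The ceiling applies to every job. Without it, a persistent downstream failure would keep the sensor launching runs indefinitely.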
Multi-asset ops look like one atomic unit, but Stripe is not transactional
Dagster's @multi_asset decorator lets you define an op that materializes multiple assets in a single callable. For example, one op might create a Stripe charge AND update your billing database in one step, yielding both as outputs. If the Stripe call succeeds but the database update raises an exception, the entire multi-asset op is marked as failed, and Dagster re-runs the whole callable on retry, including the Stripe call. This is the same failure mode as the single op case. It is less obvious because the billing side effect appears to be "part of" an asset materialization rather than a standalone billing task. Apply the same idempotency key pattern within multi-asset ops. Grouping outputs into one materialization does not make the Stripe call atomic with the database write.
Dynamic partitions created mid-backfill can overlap with existing partition runs
Dagster's DynamicPartitionsDefinition lets you add partition keys at runtime rather than defining them statically. If you add a new partition key (e.g., a new customer cohort) while a backfill is running, Dagster can materialize the new partition concurrently with the in-progress backfill runs. The new dynamic partition may overlap with a static monthly partition, so that two materializations land on the same customer. When that happens, both fire Stripe charges for that customer independently.

An idempotency key that includes the partition key does not help here, because the two partitions have different keys. Instead, derive the key from the customer ID and the billing period the charge covers. Overlapping partitions that bill the same customer for the same period will then collide on one key. Also set the vault key's daily cap conservatively to limit damage from unexpected partition overlaps.
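The difference shows up as soon as two partitions overlap on the same customer. In the sketch below, `charge_key`, `billing_period`, the cohort partition name, and the cohort's charge date are all hypothetical. `billing_period` assumes the period is the calendar month of the charge. Each materialization must derive the period the same way for the keys to collide.

```python
import hashlib


def charge_key(customer_id: str, amount_cents: int, scope: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{scope}:dagster-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


def billing_period(date_str: str) -> str:
    # Hypothetical rule: the billing period is the calendar month of the date
    return date_str[:7]  # "2026-06-15" -> "2026-06"


monthly_partition = "2026-06-01"           # static MonthlyPartitionsDefinition key
cohort_partition = "cohort-enterprise-q2"  # hypothetical dynamic partition key
cohort_charge_date = "2026-06-15"          # date the cohort run bills for (assumption)

# Scoped by partition key: the overlapping runs use different keys
by_partition = (
    charge_key("cus_abc", 2999, monthly_partition),
    charge_key("cus_abc", 2999, cohort_partition),
)
# Scoped by billing period: both runs derive "2026-06" and share one key
by_period = (
    charge_key("cus_abc", 2999, billing_period(monthly_partition)),
    charge_key("cus_abc", 2999, billing_period(cohort_charge_date)),
)
print(by_partition[0] == by_partition[1])  # False: two keys, so two charges
print(by_period[0] == by_period[1])        # True: one key, so Stripe replays the first charge
```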
Pytest enforcement suite
from unittest.mock import MagicMock, patch
# Import the production helper instead of redefining it here, so the tests
# exercise the exact key derivation the op uses
from billing import billing_idempotency_key
def test_idempotency_key_stable_across_retries():
    k1 = billing_idempotency_key("cus_abc", 2999, "2026-06-01")
    k2 = billing_idempotency_key("cus_abc", 2999, "2026-06-01")
    assert k1 == k2, "Key must be identical on every retry"
def test_idempotency_key_differs_by_partition():
    k_june = billing_idempotency_key("cus_abc", 2999, "2026-06-01")
    k_july = billing_idempotency_key("cus_abc", 2999, "2026-07-01")
    assert k_june != k_july, "Different partitions must produce different keys"
def test_idempotency_key_differs_by_customer():
    k1 = billing_idempotency_key("cus_abc", 2999, "2026-06-01")
    k2 = billing_idempotency_key("cus_def", 2999, "2026-06-01")
    assert k1 != k2, "Different customers must produce different keys"
# save_charge_to_warehouse is patched so the op tests never touch the warehouse
@patch("billing.save_charge_to_warehouse")
@patch("stripe.StripeClient")
def test_charge_op_uses_vault_key_not_env_var(mock_client_class, mock_save):
    mock_client = MagicMock()
    mock_client_class.return_value = mock_client
    mock_client.charges.create.return_value = MagicMock(id="ch_test")
    mock_client.charges.list.return_value = MagicMock(data=[])  # pre-flight finds nothing
    from billing import charge_customer
    from dagster import build_op_context
    ctx = build_op_context()
    vault_key = "vault_key_per_op_xyz"
    result = charge_customer(ctx, "cus_abc", 2999, "2026-06-01", vault_key, "audit_key_xyz")
    # First client built is the audit (pre-flight) client, second is the charge client
    assert mock_client_class.call_args_list[0][0][0] == "audit_key_xyz"
    assert mock_client_class.call_args_list[1][0][0] == vault_key
    assert result == "ch_test"
    mock_save.assert_called_once_with("cus_abc", "ch_test", "2026-06-01")
@patch("billing.save_charge_to_warehouse")
@patch("stripe.StripeClient")
def test_pre_flight_skips_charge_on_reexecution(mock_client_class, mock_save):
    existing = MagicMock(id="ch_existing", status="succeeded")
    existing.metadata = {"billing_month": "2026-06-01"}
    audit_mock = MagicMock()
    audit_mock.charges.list.return_value = MagicMock(data=[existing])
    charge_mock = MagicMock()
    mock_client_class.side_effect = [audit_mock, charge_mock]
    from billing import charge_customer
    from dagster import build_op_context
    ctx = build_op_context()
    result = charge_customer(ctx, "cus_abc", 2999, "2026-06-01", "vault_key", "audit_key")
    assert result == "ch_existing"
    assert mock_client_class.call_count == 1  # the charge client is never constructed
    charge_mock.charges.create.assert_not_called()
FAQ
Can I use the Dagster run ID or op execution ID as an idempotency key?
No. The Dagster run ID changes with every run, including re-executions, retry runs launched by the daemon's run_retries, sensor-triggered re-runs, and engineer-initiated "Re-execute" actions. The run ID does stay the same across RetryPolicy retries within a run. However, the step's retry number (context.retry_number) increments on each of those retries, so any key derived from it changes too. Only parameters that describe the billing transaction itself (customer ID, amount, billing period) are stable across every possible re-run path, which is why the content-hash key uses exactly those.
Does Dagster's built-in "Re-execute from failure" skip the billing op if it succeeded?
"Re-execute from failure" skips ops that completed with a Success status. The billing op is only skipped if it reached the end of the callable and returned its output value without raising an exception. If the op raised an exception at any point — including after stripe.charges.create() returned successfully but before the op returned — Dagster marks the op as failed, and "Re-execute from failure" will re-run it. The failure scenario that triggers re-execution is precisely the one where stripe.charges.create() succeeded and the downstream step failed. This is why the pre-flight check is necessary: it catches the case where the op is classified as "failed" by Dagster even though the charge went through.
What is the recommended vault key scope for a partitioned billing asset?
Issue one vault key per partition materialization, scoped to POST /v1/charges only, with a daily USD cap equal to the sum of all expected charges in that partition plus a 10-15% buffer. For a monthly billing partition covering 1,000 customers at $29.99 each, the vault key cap should be approximately $33,000 (1,000 × $29.99 × 1.1). Issue the key immediately before the partition run starts — not at pipeline definition time — so the expiry window covers the expected materialization duration. For backfills covering multiple months, issue separate vault keys for each partition: a shared key across six partitions would need a cap of $198,000, which offers no per-partition protection.
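The cap arithmetic is safest in integer cents. Here is a minimal sketch: `partition_cap_cents` is a hypothetical helper for whatever dispatch code issues the per-partition keys, and it rounds the buffer down.

```python
def partition_cap_cents(customer_count: int, amount_cents: int, buffer_pct: int = 10) -> int:
    """Daily cap for one partition's vault key: expected total plus a percentage buffer."""
    expected = customer_count * amount_cents
    return expected * (100 + buffer_pct) // 100  # integer cents, buffer rounded down


one_partition = partition_cap_cents(1_000, 2999)  # one month's key
shared_six = 6 * one_partition                    # a single key shared across six months
print(f"${one_partition / 100:,.2f}")  # $32,989.00
print(f"${shared_six / 100:,.2f}")     # $197,934.00
```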
How do I handle billing in a Dagster Software-Defined Asset with upstream dependencies?
In a Software-Defined Asset graph, a billing asset may depend on an upstream customer data asset. It can then be re-materialized automatically whenever the upstream asset updates, if it has an eager auto-materialize policy (or an automation condition, in newer Dagster releases) or is targeted by an asset sensor. If your billing asset reads the upstream asset and calls Stripe for each customer, a routine upstream data refresh can trigger an unexpected billing run. Leave automatic materialization off for the billing asset: set no auto-materialize policy or automation condition. Then gate it behind an explicit partition boundary or a manual launch, so it does not fire on upstream changes. The idempotency key and pre-flight check still protect against duplicate charges if it does re-run. The correct first defense, though, is to ensure the billing asset only materializes when explicitly intended.
Can I use Dagster's IOManager to make billing idempotent?
An IOManager controls how op outputs are stored and retrieved — not whether the op's side effects (the Stripe API call) are deduplicated. Even if you write a custom IOManager that detects an existing output for a given run ID and partition, the IOManager is invoked after the op callable completes. If the op raised before returning, the IOManager is not called. The idempotency must be at the Stripe call layer inside the op callable, not at the IOManager layer. Use the IOManager for persisting the charge ID after a successful op; use the idempotency key and pre-flight check to ensure the Stripe call itself is safe to retry.
Does the Keybrake proxy work with Dagster's asset check framework?
Yes. Dagster asset checks (@asset_check) are ops that run after a materialization and assert properties of the resulting asset — for example, checking that the number of charges recorded matches the number of customers billed. You can pass a read-only audit vault key to an asset check op that queries Stripe via the proxy to verify charge counts without risking creating new charges. The audit vault key (scoped to GET /v1/charges only, $0 daily cap) is the right tool for verification checks: it cannot create charges even if the check logic has a bug, and it produces a proxy audit log showing which assets triggered verification reads.
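The verification logic can live in a plain function that an asset check calls, which keeps it unit-testable without Dagster or the proxy. `audit_billing_month` is hypothetical. It takes the customers that should have been billed and the charge records read through the audit key. It then reports customers with no succeeded charge for the month, and customers with more than one.

```python
from collections import Counter


def audit_billing_month(expected_customers, charges, billing_month):
    """Compare expected customers against succeeded charges for one billing month.

    charges: iterable of dicts with "customer", "status", and "metadata" keys.
    Returns (passed, missing, duplicated).
    """
    counts = Counter(
        ch["customer"]
        for ch in charges
        if ch["status"] == "succeeded" and ch["metadata"].get("billing_month") == billing_month
    )
    missing = sorted(c for c in expected_customers if counts[c] == 0)
    duplicated = sorted(c for c, n in counts.items() if n > 1)
    return (not missing and not duplicated), missing, duplicated


charges = [
    {"customer": "cus_abc", "status": "succeeded", "metadata": {"billing_month": "2026-06-01"}},
    {"customer": "cus_abc", "status": "succeeded", "metadata": {"billing_month": "2026-06-01"}},
    {"customer": "cus_def", "status": "failed", "metadata": {"billing_month": "2026-06-01"}},
]
print(audit_billing_month(["cus_abc", "cus_def"], charges, "2026-06-01"))
# (False, ['cus_def'], ['cus_abc'])
```

Inside an `@asset_check`, the boolean would map to the check result's pass/fail status and the two lists to its metadata.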
Stop handing your Dagster ops a bare Stripe key
Keybrake issues per-op vault keys with endpoint allowlists, daily spend caps, and a full audit log. Combined with content-hash idempotency keys and a pre-flight check, this means a RetryPolicy storm, a 6-month backfill, or a re-execution from failure will not charge a customer twice. A runaway loop stops at the spend cap. It takes one URL change in your StripeClient initializer.