ZenML Stripe Integration: Restricted API Keys, Spend Caps, and Agent Governance

ZenML is a framework-agnostic MLOps platform that turns Python functions into reproducible, retryable pipeline steps deployable across Vertex AI Pipelines, Kubeflow, SageMaker, and local runners. That reproducibility is its core value. It also makes Stripe billing failures uniquely dangerous, because the same retry and caching mechanics that make ML experiments reproducible can silently replay billing operations on customers who have already been charged.

This post covers three ZenML-specific Stripe billing failure modes, the Python code that exposes each one, and the two-layer governance pattern — content-hash idempotency keys and per-step vault keys via a spend-cap proxy — that eliminates them without restructuring your pipeline.

Failure mode 1: StepRetryConfig re-executes billing from line 1 on any downstream exception

ZenML supports step-level retry configuration via StepRetryConfig, passed to the @step decorator. When a step raises an unhandled exception, ZenML re-executes the entire step function from the beginning. There is no mid-function checkpoint: if stripe.Charge.create() succeeded on line 8 and write_charge_to_database() raised on line 14, the retry starts at line 1 and calls stripe.Charge.create() a second time with the same parameters and no idempotency key.

# billing_pipeline.py (UNSAFE): step retry re-fires stripe.Charge.create() on database error
import os

import stripe
from zenml import step, pipeline
from zenml.config import StepRetryConfig

stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # unrestricted live key

@step(retry=StepRetryConfig(max_retries=3, delay=5, backoff=2))
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    # Every retry re-runs from here: no idempotency key, no guard
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
    )
    # write_charge_to_database is the application's own persistence helper (not shown).
    # It fails intermittently and raises on 20% of executions.
    write_charge_to_database(customer_id, charge.id, billing_period)
    return charge.id

On the first retry, Stripe has nothing that links the new request to the previous call: no idempotency key was sent, so Stripe treats it as a new request and creates a second charge (ch_B) for the same customer, same amount, same billing period. With max_retries=3 and a 20% database failure rate, each customer sees up to four attempts and an expected 1 + 0.2 + 0.04 + 0.008 ≈ 1.25 charges rather than 1.0, which means roughly one customer in five is charged at least twice. ZenML's pipeline dashboard shows each retry as expected step behavior; the duplicate charges are invisible until customers dispute them or a billing reconciliation catches the discrepancy days later.
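The size of the problem can be checked with a few lines of arithmetic. The sketch below assumes that each attempt fails the database write independently with the same probability and that every attempt creates a charge before it fails; the function name is illustrative.

```python
def expected_charges(failure_rate: float, max_retries: int) -> float:
    """Expected Stripe charges per customer when every attempt creates a charge.

    Attempt k (0-based) only runs if the k previous attempts all failed,
    which happens with probability failure_rate ** k.
    """
    attempts = max_retries + 1  # the initial run plus each retry
    return sum(failure_rate ** k for k in range(attempts))


def share_charged_twice_or_more(failure_rate: float, max_retries: int) -> float:
    """Fraction of customers with at least one duplicate charge."""
    # A duplicate exists as soon as the first attempt fails and a retry is allowed.
    return failure_rate if max_retries >= 1 else 0.0
```

With failure_rate=0.2 and max_retries=3 the first function sums 1 + 0.2 + 0.04 + 0.008, and the second returns 0.2.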

The fix: derive a content-hash idempotency key from the inputs that are stable across all retries — customer_id, amount_cents, and billing_period — and pass it with every Stripe call. ZenML guarantees that all retries of the same step execution receive the same input parameters, so the key is identical on every attempt. Stripe's idempotency layer returns the original ch_A on all subsequent calls without creating a new charge.

# billing_pipeline.py — SAFE: content-hash idempotency key survives all StepRetryConfig retries
import stripe
import hashlib
import os
from zenml import step, pipeline
from zenml.config import StepRetryConfig

stripe.api_key = os.environ["STRIPE_SECRET_KEY"]

def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:zenml-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

@step(retry=StepRetryConfig(max_retries=3, delay=5, backoff=2))
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_period)

    # Same key on every retry: Stripe returns ch_A without creating ch_B, ch_C, ch_D
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
        idempotency_key=idempotency_key,
    )
    write_charge_to_database(customer_id, charge.id, billing_period)
    return charge.id

Two additional improvements. First, scope the Stripe key to POST /v1/charges only, so the step cannot read customer data, issue refunds, or list subscriptions regardless of what the pipeline passes as input. Second, set a per-step daily cap equal to the expected charge total plus a small buffer. A unit bug that multiplies an amount already in cents by 100 is then blocked by the proxy on the first attempt, rather than creating 100× charges across all retries and the full customer cohort.
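The proxy cap is the enforcement point, but the same bound can also be asserted inside the step before any request is sent. The following is a minimal sketch of such a guard; check_amount_cents, its parameters, and the 10% buffer are assumptions for illustration, not part of ZenML, Stripe, or Keybrake.

```python
def check_amount_cents(amount_cents: int, expected_cents: int, buffer: float = 0.10) -> int:
    """Return amount_cents unchanged if it is plausible, otherwise raise.

    Rejects non-integer amounts (a dollars value such as 29.99 passed by mistake)
    and amounts above expected_cents plus the buffer (a value scaled by 100 twice).
    """
    # bool is a subclass of int in Python, so exclude it explicitly.
    if not isinstance(amount_cents, int) or isinstance(amount_cents, bool):
        raise TypeError(f"amount must be an integer number of cents, got {amount_cents!r}")
    ceiling = int(expected_cents * (1 + buffer))
    if amount_cents <= 0 or amount_cents > ceiling:
        raise ValueError(f"amount {amount_cents} is outside (0, {ceiling}] cents")
    return amount_cents
```

Calling check_amount_cents(amount_cents, expected_cents=2999) at the top of a billing step turns a unit bug into an immediate step failure. It complements the proxy cap rather than replacing it, because the guard lives in the same code that may contain the bug.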

Failure mode 2: parallel pipeline steps share one unrestricted Stripe key with no per-step spend cap

ZenML pipelines can execute steps in parallel when those steps have no data dependency between them and the orchestrator supports concurrent execution. A common pattern is to fan out billing across multiple customer segments: separate steps for charge_tier_free, charge_tier_pro, and charge_tier_enterprise can run concurrently if they don't consume each other's outputs. All of those steps run with the same environment variables (including STRIPE_SECRET_KEY) injected at the container or process level. There is no mechanism that halts a parallel sibling step when one step encounters a billing error.

# segmented_billing.py — UNSAFE: parallel steps share one Stripe key, no per-segment cap
import stripe
import os
from zenml import step, pipeline
from typing import List

stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # same key in all parallel step containers

@step
def charge_free_tier(customer_ids: List[str], billing_period: str) -> List[str]:
    results = []
    for cid in customer_ids:
        # No idempotency key, no spend cap; both segment steps run concurrently
        charge = stripe.Charge.create(
            amount=999,  # $9.99, assuming the value really is in cents
            currency="usd",
            customer=cid,
            description=f"Free→Pro upgrade {billing_period}",
        )
        results.append(charge.id)
    return results

@step
def charge_pro_tier(customer_ids: List[str], billing_period: str) -> List[str]:
    results = []
    for cid in customer_ids:
        charge = stripe.Charge.create(
            amount=2999,  # $29.99
            currency="usd",
            customer=cid,
            description=f"Pro subscription {billing_period}",
        )
        results.append(charge.id)
    return results

@pipeline
def monthly_billing_pipeline(
    free_customers: List[str],
    pro_customers: List[str],
    billing_period: str,
) -> None:
    # ZenML runs these in parallel — no dependency, no shared circuit breaker
    charge_free_tier(free_customers, billing_period)
    charge_pro_tier(pro_customers, billing_period)

The blast-radius problem: when both segment steps run simultaneously and one contains a billing bug (wrong amount, wrong customer list, wrong description), both are already in progress by the time the error surfaces. Sequential execution would expose the bug after the first segment; parallel execution lets a shared bug produce errors in every segment before any step result is returned to the pipeline. With a unit error that multiplies amounts already in cents by 100, all concurrent steps overcharge customers 100× at the same time, with no circuit breaker between them.

The fix: issue per-step vault keys in a setup step that runs before the parallel fan-out, and pass each vault key to its corresponding billing step. Each vault key is scoped to POST /v1/charges and capped at the maximum expected charge for that segment plus a small buffer. A wrong-amount request is rejected at the proxy before reaching Stripe; because each step has its own capped vault key, a bug in one segment does not consume the spend budget of another segment.

# segmented_billing.py — SAFE: per-segment vault keys issued before parallel fan-out
import stripe
import hashlib
import httpx
import os
from zenml import step, pipeline
from typing import List, Tuple

def issue_vault_key(label: str, max_amount_cents: int) -> str:
    resp = httpx.post(
        "https://proxy.keybrake.com/admin/vault_keys",
        headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_ADMIN_KEY']}"},
        json={
            "label": label,
            "vendor": "stripe",
            "allowed_endpoints": ["POST /v1/charges"],
            "daily_usd_cap": round(max_amount_cents / 100 * 1.1, 2),
            "expires_in_seconds": 7200,
        },
        timeout=5.0,
    )
    resp.raise_for_status()
    return resp.json()["vault_key"]

@step
def prepare_segment_keys(billing_period: str) -> Tuple[str, str]:
    free_key = issue_vault_key(f"zenml-free-{billing_period}", max_amount_cents=999)
    pro_key = issue_vault_key(f"zenml-pro-{billing_period}", max_amount_cents=2999)
    return free_key, pro_key

@step
def charge_free_tier(
    customer_ids: List[str], billing_period: str, vault_key: str
) -> List[str]:
    # One client per step: requests go to the proxy, authenticated with the vault key.
    # StripeClient takes its API host via base_addresses; it has no base_url argument.
    stripe_client = stripe.StripeClient(
        vault_key,
        base_addresses={"api": "https://proxy.keybrake.com/stripe"},
    )
    results = []
    for cid in customer_ids:
        ikey = hashlib.sha256(
            f"{cid}:999:{billing_period}:zenml-free".encode()
        ).hexdigest()[:32]
        charge = stripe_client.charges.create(
            params={"amount": 999, "currency": "usd", "customer": cid,
                    "description": f"Free→Pro upgrade {billing_period}"},
            options={"idempotency_key": ikey},
        )
        results.append(charge.id)
    return results

@step
def charge_pro_tier(
    customer_ids: List[str], billing_period: str, vault_key: str
) -> List[str]:
    # One client per step: requests go to the proxy, authenticated with the vault key.
    # StripeClient takes its API host via base_addresses; it has no base_url argument.
    stripe_client = stripe.StripeClient(
        vault_key,
        base_addresses={"api": "https://proxy.keybrake.com/stripe"},
    )
    results = []
    for cid in customer_ids:
        ikey = hashlib.sha256(
            f"{cid}:2999:{billing_period}:zenml-pro".encode()
        ).hexdigest()[:32]
        charge = stripe_client.charges.create(
            params={"amount": 2999, "currency": "usd", "customer": cid,
                    "description": f"Pro subscription {billing_period}"},
            options={"idempotency_key": ikey},
        )
        results.append(charge.id)
    return results

@pipeline
def monthly_billing_pipeline(
    free_customers: List[str],
    pro_customers: List[str],
    billing_period: str,
) -> None:
    free_key, pro_key = prepare_segment_keys(billing_period)
    # Both steps now run with isolated vault keys and spend caps
    charge_free_tier(free_customers, billing_period, free_key)
    charge_pro_tier(pro_customers, billing_period, pro_key)

Each vault key expires after two hours, so it cannot be reused if a subsequent pipeline run picks up a cached artifact containing the key string. The per-segment spend caps contain a calculation bug to the segment where it occurs: the proxy rejects the first wrong-amount charge request and the step fails with a 402 rather than silently overcharging the entire segment.
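The isolation property can be illustrated without the proxy. The following is a toy in-memory model, not Keybrake's implementation: CappedKey, CapExceeded, and run_segment are hypothetical names, and the cap is assumed to cover the segment's expected total for three customers.

```python
from dataclasses import dataclass


class CapExceeded(Exception):
    """Raised when a request would push a key past its spend cap."""


@dataclass
class CappedKey:
    """Toy stand-in for a vault key that carries its own spend budget."""
    label: str
    cap_cents: int
    spent_cents: int = 0

    def authorize(self, amount_cents: int) -> None:
        # Reject before recording any spend, so a refused request costs nothing.
        if self.spent_cents + amount_cents > self.cap_cents:
            raise CapExceeded(f"{self.label}: {amount_cents} exceeds remaining budget")
        self.spent_cents += amount_cents


def run_segment(key: CappedKey, amounts_cents: list[int]) -> int:
    """Authorize charges in order; return how many succeeded before a rejection."""
    approved = 0
    for amount in amounts_cents:
        try:
            key.authorize(amount)
        except CapExceeded:
            break  # the step would fail here instead of continuing to charge
        approved += 1
    return approved


free_key = CappedKey("free", cap_cents=3 * 999)
pro_key = CappedKey("pro", cap_cents=3 * 2999)

# The free-tier step has a unit bug: 999 was multiplied by 100.
free_approved = run_segment(free_key, [99_900, 99_900, 99_900])
# The pro-tier step is correct and draws on its own, untouched budget.
pro_approved = run_segment(pro_key, [2999, 2999, 2999])
```

In this model the buggy segment is stopped at its first request while the other segment completes, because neither key can draw on the other's budget.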

Failure mode 3: step cache invalidation silently re-runs billing steps that already completed

ZenML caches step outputs by default, keyed on the step function's source code hash plus its input artifact hashes. This is an excellent property for ML experiment reproducibility — you don't re-run a 4-hour training step if the data and code haven't changed. For billing steps, the same caching behavior creates a dangerous illusion of safety: developers assume that once a billing step has run, it won't run again. That assumption breaks in three common scenarios.

First, any change to the billing step's source code (bumping a version constant, adding a log statement, changing a comment) changes the code hash and invalidates the cache. On the next pipeline run, ZenML re-executes the billing step as if it had never run, including calling stripe.Charge.create() for customers who were charged in the previous run. Second, disabling the cache for a run bypasses it for every step, billing steps included. This happens when a project's run script exposes a --no-cache flag, or when the pipeline is invoked with enable_cache=False, for example via my_pipeline.with_options(enable_cache=False)(). Third, input changes such as updating the customer list or changing the billing period string produce different input hashes and trigger re-execution.

# billing_pipeline.py (UNSAFE): step cache invalidated by code change, re-runs charge
import logging
import os

import stripe
from zenml import step, pipeline

stripe.api_key = os.environ["STRIPE_SECRET_KEY"]

@step  # cache enabled by default, but invalidated by any source code change
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    # This step ran last month and charged ch_A.
    # Developer added a logging statement below, so the code hash changed.
    # ZenML re-executes this step, stripe.Charge.create() fires again, and ch_B is created.
    logging.info(f"Charging customer {customer_id}")  # newly added line

    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
        # No idempotency key: cache invalidation means a duplicate charge
    )
    return charge.id

The insidious part: ZenML gives no warning that the billing step was previously executed for the same customer and billing period. From ZenML's perspective, the cache miss is legitimate — the code changed, so the output might differ. From Stripe's perspective, it receives a structurally identical charge request with no idempotency key and creates a new charge object. The developer sees the pipeline step execute successfully (correct from ZenML's view), the customer receives a second charge (incorrect from the billing view), and the discrepancy surfaces only at end-of-month reconciliation.
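The mismatch between the two views can be shown with a toy model. toy_cache_key below is a deliberate simplification, assumed only for illustration and not ZenML's actual cache-key computation: it hashes the step source together with the inputs. The business-input key is the same helper used elsewhere in this post.

```python
import hashlib


def toy_cache_key(step_source: str, inputs: dict) -> str:
    """Toy model of a step cache key: hash of the step source plus its inputs."""
    payload = step_source + "|" + repr(sorted(inputs.items()))
    return hashlib.sha256(payload.encode()).hexdigest()


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    """Key derived only from business inputs; the step's source code plays no part."""
    raw = f"{customer_id}:{amount_cents}:{billing_period}:zenml-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


inputs = {"customer_id": "cus_abc", "amount_cents": 2999, "billing_period": "2026-07"}
source_v1 = "def charge_customer(...):\n    charge = create_charge(...)\n"
source_v2 = "def charge_customer(...):\n    logging.info('charging')\n    charge = create_charge(...)\n"

# Adding one log line changes the cache key, so the step runs again...
cache_miss = toy_cache_key(source_v1, inputs) != toy_cache_key(source_v2, inputs)

# ...but the key sent to Stripe is computed from the same inputs before and after the edit.
key_before_edit = billing_idempotency_key(**inputs)
key_after_edit = billing_idempotency_key(**inputs)
same_charge_key = key_before_edit == key_after_edit
```

Both flags are true: the cache treats the edited step as new work, while the business-input key still identifies the same logical charge.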

The fix is identical to failure modes 1 and 2: a content-hash idempotency key derived from stable business inputs makes the billing operation safe regardless of how many times ZenML executes the step function. Add a pre-flight audit check as a secondary guard — query the Keybrake audit log before calling Stripe, and short-circuit with the existing charge ID if one is found for this idempotency key. This handles the case where Stripe's own 24-hour idempotency key window has expired but the customer has already been charged in a prior pipeline run.

# billing_pipeline.py — SAFE: idempotency key + pre-flight audit check survive cache invalidation
import stripe
import hashlib
import httpx
import os
from zenml import step, pipeline

def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:zenml-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

def check_existing_charge(idempotency_key: str) -> str | None:
    resp = httpx.get(
        "https://proxy.keybrake.com/audit",
        headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_AUDIT_KEY']}"},
        params={"idempotency_key": idempotency_key},
        timeout=5.0,
    )
    if resp.status_code == 200:
        entries = resp.json().get("entries", [])
        if entries:
            return entries[0].get("stripe_charge_id")
    return None

@step
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_period)

    # Pre-flight: check proxy audit log before touching Stripe
    # Catches re-execution after cache miss, --no-cache flag, or input change
    charge_id = check_existing_charge(idempotency_key)
    if charge_id is None:
        # StripeClient takes its API host via base_addresses; it has no base_url argument
        stripe_client = stripe.StripeClient(
            os.environ["KEYBRAKE_BILLING_VAULT_KEY"],
            base_addresses={"api": "https://proxy.keybrake.com/stripe"},
        )
        charge = stripe_client.charges.create(
            params={
                "amount": amount_cents,
                "currency": "usd",
                "customer": customer_id,
                "description": f"Subscription {billing_period}",
            },
            options={"idempotency_key": idempotency_key},
        )
        charge_id = charge.id

    # Record the charge even when it already existed, so a run that died between
    # the Stripe call and the database write is repaired on re-execution.
    # Assumes write_charge_to_database (not shown) is an idempotent upsert.
    write_charge_to_database(customer_id, charge_id, billing_period)
    return charge_id

Together, the Stripe idempotency key, the pre-flight audit check at the proxy, and the scoped vault key ensure that re-execution from any source (retry, cache miss, --no-cache flag, input change) produces at most one charge for a given customer, amount, and billing period.

Approach comparison

| Approach | Retry safe? | Parallel isolated? | Cache-miss safe? | Per-step scoping? | Audit trail |
|---|---|---|---|---|---|
| Bare Stripe key, no idempotency | No: retry creates new charge | No: shared key, no cap | No: cache miss re-fires charge | No | Stripe Dashboard only |
| Idempotency key only | Yes | No: shared key, no blast-radius limit | Yes: within Stripe's 24h window | No | Stripe Dashboard only |
| Restricted Stripe key | No idempotency: retry still creates new charge | Partial: endpoint scoped, but shared across all steps | No idempotency: cache miss still re-fires charge | No: key shared across all ZenML containers | Stripe Dashboard only |
| Idempotency key + per-step vault key (Keybrake) | Yes: same key sent, original charge returned on retry | Yes: per-step spend cap, isolated blast radius | Yes: pre-flight audit check extends beyond 24h window | Yes: scoped to allowed endpoints + daily cap | Proxy audit log with full request history |

Gap analysis: four ZenML edge cases not covered above

Scheduled pipeline overlaps. ZenML supports cron-scheduled pipelines via Schedule(cron_expression="0 0 1 * *"). If a monthly billing run takes longer than the schedule interval — or if a clock drift causes two scheduler ticks to fire within the same window — ZenML launches a second pipeline run while the first is still executing. Both runs reach the billing step with identical customer lists and billing periods; without idempotency keys, both create Stripe charges. The idempotency key pattern closes this: the second run's charge requests return the first run's charge objects because the keys are identical.

ZenML artifact store contamination. ZenML stores step outputs as serialized artifacts in the artifact store (S3, GCS, local filesystem). If a billing step returns a list of charge IDs as its output artifact, that artifact is readable by any subsequent step or pipeline that depends on it. The same applies to vault keys returned from a setup step, as in the fan-out example above: the key strings are persisted as artifacts, and a downstream step that receives one could use it to make Stripe calls outside the intended billing step until the key expires. The two-hour expiry and the spend cap bound that exposure but do not remove it. Where the exposure matters, call the vault key issuance API inside the billing step itself and keep the key in memory only for the duration of that step invocation.

ZenML Model Control Plane re-training triggers. ZenML's Model Control Plane can automatically trigger a pipeline run when a new model version is registered or when a model transitions between stages (e.g., staging → production). If a billing pipeline is downstream of a model training pipeline in ZenML's DAG, a model promotion event can trigger the billing pipeline outside of its normal schedule. The idempotency key pattern handles this correctly — the same content-hash key will match any existing charge for that customer/amount/period — but teams should audit which model lifecycle events are wired to billing pipelines and ensure the billing step's billing_period input is deterministic, not derived from datetime.now() at step execution time.
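One way to keep billing_period deterministic is to derive it from the run's scheduled time, passed in as a pipeline parameter, rather than from the wall clock inside the step. A minimal sketch, assuming a hypothetical helper named billing_period_for and a timezone-aware scheduled timestamp supplied by whatever launches the run:

```python
from datetime import datetime, timezone


def billing_period_for(scheduled_at: datetime) -> str:
    """Return the 'YYYY-MM' billing period for a run's scheduled time, in UTC."""
    # Refuse naive datetimes: their period would depend on the host's local timezone.
    if scheduled_at.tzinfo is None:
        raise ValueError("scheduled_at must be timezone-aware")
    return scheduled_at.astimezone(timezone.utc).strftime("%Y-%m")


# A run scheduled for 1 July that starts hours late, or is re-triggered by another
# event, still bills the same period because the scheduled time is the input.
scheduled = datetime(2026, 7, 1, 0, 0, tzinfo=timezone.utc)
period = billing_period_for(scheduled)
```

Because the period string feeds the idempotency key, every execution that receives the same scheduled time also produces the same key.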

Local runner vs. remote orchestrator divergence. ZenML's local runner executes steps in the same Python process as the pipeline driver, while remote orchestrators (Vertex AI Pipelines, Kubeflow) run steps in isolated containers. The retry behavior, environment variable injection, and parallelism semantics differ between environments. A billing pipeline tested with @pipeline(settings={"orchestrator.local": {}}) that passes safely locally may have different retry behavior when deployed to Vertex AI Pipelines, where retries are managed by the orchestrator's pod restart policy rather than ZenML's StepRetryConfig. Test idempotency behavior against the same orchestrator backend used in production, not just the local runner.

Pytest enforcement suite

# test_zenml_billing.py
import hashlib
import pytest

def billing_idempotency_key(customer_id, amount_cents, billing_period):
    raw = f"{customer_id}:{amount_cents}:{billing_period}:zenml-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

def test_idempotency_key_stability_across_retries():
    """Same inputs always produce same key — StepRetryConfig retries are safe."""
    k1 = billing_idempotency_key("cus_abc", 2999, "2026-07")
    k2 = billing_idempotency_key("cus_abc", 2999, "2026-07")
    assert k1 == k2

def test_idempotency_key_distinct_per_billing_period():
    """Different billing periods produce different keys — monthly runs are isolated."""
    k_jul = billing_idempotency_key("cus_abc", 2999, "2026-07")
    k_aug = billing_idempotency_key("cus_abc", 2999, "2026-08")
    assert k_jul != k_aug

def test_idempotency_key_distinct_per_customer():
    """Different customers produce different keys — parallel step fan-out is safe."""
    k_a = billing_idempotency_key("cus_aaa", 2999, "2026-07")
    k_b = billing_idempotency_key("cus_bbb", 2999, "2026-07")
    assert k_a != k_b

def test_vault_key_scope_blocks_refund_endpoint(monkeypatch):
    """Vault key scoped to POST /v1/charges cannot call POST /v1/refunds."""
    import httpx

    def mock_post(url, **kwargs):
        if "/v1/refunds" in url:
            return type("R", (), {"status_code": 403, "raise_for_status": lambda self: (_ for _ in ()).throw(httpx.HTTPStatusError("403", request=None, response=None))})()
        return type("R", (), {"status_code": 200, "json": lambda self: {"id": "ch_test"}, "raise_for_status": lambda self: None})()

    monkeypatch.setattr(httpx, "post", mock_post)
    with pytest.raises(httpx.HTTPStatusError):
        httpx.post("https://proxy.keybrake.com/stripe/v1/refunds").raise_for_status()

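def check_existing_charge(idempotency_key):
    """Local copy of the pre-flight helper from billing_pipeline.py (auth header omitted)."""
    import httpx

    resp = httpx.get(
        "https://proxy.keybrake.com/audit",
        params={"idempotency_key": idempotency_key},
        timeout=5.0,
    )
    if resp.status_code == 200:
        entries = resp.json().get("entries", [])
        if entries:
            return entries[0].get("stripe_charge_id")
    return None

def test_preflight_short_circuits_on_existing_charge(monkeypatch):
    """Pre-flight audit check returns the existing charge ID, so a cache miss does not re-fire."""
    import httpx

    def mock_get(url, **kwargs):
        return type("R", (), {
            "status_code": 200,
            "json": lambda self: {"entries": [{"stripe_charge_id": "ch_existing_123"}]},
        })()

    monkeypatch.setattr(httpx, "get", mock_get)

    # Exercise the helper itself rather than the mock, so a parsing regression fails the test
    assert check_existing_charge("abc") == "ch_existing_123"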

Frequently asked questions

Does ZenML's built-in step caching make idempotency keys unnecessary?

No. ZenML's step cache prevents re-execution when the step's code hash and input hashes are unchanged — it is a pipeline optimization feature, not a billing deduplication mechanism. The cache is invalidated by any source code change, by --no-cache at run time, by input changes, and it has no integration with Stripe's API. The idempotency key is the only mechanism that prevents Stripe from creating a second charge object; the step cache is complementary but not a substitute.

Can I use ZenML's execution ID as the idempotency key?

No. Identifiers that ZenML generates per run are not stable across the re-executions that matter here: a rerun after a cache miss, an overlapping scheduled run, or a manual re-trigger each get new IDs. A key built from the pipeline run ID or step run ID therefore differs between those executions, and Stripe creates a new charge each time, which is exactly the failure mode you are trying to prevent. The idempotency key must be derived from stable business inputs (customer_id, amount_cents, billing_period) that are identical across every execution of the same logical billing operation.

How long should per-step vault keys live?

Set the TTL to the maximum expected step duration plus a reasonable buffer — typically 1–2 hours for a billing step that processes a few hundred customers synchronously. Short TTLs prevent leaked vault key artifacts from being reused in a subsequent pipeline run. If your billing step processes large cohorts and might run for longer, issue separate vault keys in a setup step and pass one per customer or batch rather than extending a single key's TTL indefinitely.

What happens if the Keybrake proxy is unreachable during a ZenML step?

The failing call raises a connection exception: httpx.ConnectError from the vault key or audit request, or stripe.APIConnectionError from the Stripe client pointed at the proxy. As described under failure mode 1, StepRetryConfig re-executes the step on any unhandled exception, so ZenML retries without extra configuration. If the request never reached Stripe, no charge was created and the retry is safe. If the connection dropped after Stripe processed the request, the stable idempotency key makes the retry return the original charge instead of creating a new one. When the proxy becomes reachable, the first successful attempt creates the charge; subsequent attempts return the same charge from Stripe's idempotency layer.

Does this pattern work with ZenML's cloud orchestrators (Vertex AI, Kubeflow, SageMaker)?

Yes. The content-hash idempotency key is computed from business inputs, not from ZenML or orchestrator runtime state, so it produces the same string regardless of which orchestrator or container runtime executes the step. The vault key issuance API call and the Stripe proxy call are standard HTTPS requests that work identically in the local runner, Vertex AI Pipelines containers, Kubeflow pods, and SageMaker Processing jobs. The only configuration difference is making sure the KEYBRAKE_ADMIN_KEY, KEYBRAKE_AUDIT_KEY, and KEYBRAKE_BILLING_VAULT_KEY secrets are exposed as environment variables to the step containers on each orchestrator.

Should billing steps use ZenML's step cache at all?

Disable the cache on billing steps explicitly with @step(enable_cache=False). This makes the behavior unambiguous: the billing step always executes, and idempotency keys always handle deduplication. Relying on ZenML's cache for billing safety is fragile — it works until someone changes a log line or passes --no-cache, at which point it silently stops working. The pre-flight audit check at the proxy is a more robust deduplication layer because it operates at the business-logic level (same customer + same amount + same period) rather than at the code hash level.

Add spend caps and per-step vault keys to your ZenML billing pipeline

Keybrake is a scoped API-key proxy for the non-LLM SaaS APIs your agents and pipelines call. Issue vault keys before parallel fan-out, enforce per-step spend caps, and get a full audit trail of every Stripe call across every ZenML step — without changing your pipeline graph.
