Flyte Stripe Integration: Restricted API Keys, Spend Caps, and Agent Governance

Flyte is an ML and data workflow orchestrator built for production — tasks are cached, retried, and composable. That reliability makes it a natural fit for recurring billing pipelines. It also makes Stripe billing failures especially painful, because the same retry and fan-out mechanics that make workflows recoverable can silently create duplicate charges.

This post covers three Flyte-specific Stripe billing failure modes, the Python code that exposes each one, and a two-layer governance pattern (content-hash idempotency keys, plus per-task vault keys issued through a spend-cap proxy) that guards against them without restructuring your workflow graph.

Failure mode 1: @task(retries=N) re-executes billing from line 1 on any downstream exception

Flyte tasks are decorated with @task(retries=N) to make them fault-tolerant. When a task raises an exception, Flyte re-executes the entire task function from the beginning; there is no mid-function checkpoint. If stripe.Charge.create() succeeded and write_charge_to_database() raised a few lines later, the retry starts from the top and calls stripe.Charge.create() a second time with identical parameters and no idempotency key.

# billing_workflow.py — UNSAFE: task retry re-fires stripe.Charge.create() on database error
import stripe
import os
from flytekit import task, workflow

stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # unrestricted live key, no scope

# write_charge_to_database() is an application helper defined elsewhere

@task(retries=3)
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    # Retry re-runs from here — no idempotency key, no guard
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
    )
    # Database write fails intermittently — raises on 30% of executions
    write_charge_to_database(customer_id, charge.id, billing_period)
    return charge.id

On the first retry, Stripe has no way to know the previous attempt already succeeded, because the request carried no idempotency key. Stripe creates a second charge object (ch_B) for the same customer, same amount, same billing period. With retries=3 and a 30% database failure rate, each customer is charged 1 + 0.3 + 0.09 + 0.027 ≈ 1.42 times on average, and about 30% of customers are charged at least twice. The Flyte execution graph shows every retry as expected workflow behavior; the duplicate charges stay invisible until a customer disputes them.
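Under a simple model (every attempt charges the card before the database write, and each write fails independently with probability p), the expected number of charges per customer is a short geometric sum. A sketch with illustrative function names:

```python
def expected_charges(db_failure_rate: float, retries: int) -> float:
    """Expected Stripe charges per customer when every attempt charges first
    and the database write afterwards fails with probability db_failure_rate.

    Attempt k (0-indexed) only runs if the k previous database writes failed,
    so the expectation is 1 + p + p^2 + ... + p^retries.
    """
    return sum(db_failure_rate ** k for k in range(retries + 1))


def duplicate_probability(db_failure_rate: float, retries: int) -> float:
    """Probability of at least two charges: the first database write must fail
    and at least one retry must remain to re-fire the charge."""
    return db_failure_rate if retries >= 1 else 0.0


if __name__ == "__main__":
    print(round(expected_charges(0.3, 3), 3))  # 1.417
    print(duplicate_probability(0.3, 3))       # 0.3
```

With p = 0.3 and retries=3 this gives about 1.42 charges per customer, with a 30% chance that a given customer is charged more than once.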

The fix: derive a content-hash idempotency key from the inputs that are stable across all retries (customer_id, amount_cents, and billing_period) and pass it with every Stripe call. Flyte passes the same input values to every retry of the same task execution, so the key is identical on every attempt. Stripe's idempotency layer then returns the original ch_A for every repeated request instead of creating a new charge, for as long as it retains the key (Stripe may prune keys once they are 24 hours old).

# billing_workflow.py — SAFE: content-hash idempotency key survives all retries
import stripe
import hashlib
import os
from flytekit import task, workflow

stripe.api_key = os.environ["STRIPE_SECRET_KEY"]

def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:flyte-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

@task(retries=3)
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_period)

    # Same key on every retry — Stripe returns ch_A without creating ch_B, ch_C, ch_D
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
        idempotency_key=idempotency_key,
    )
    write_charge_to_database(customer_id, charge.id, billing_period)
    return charge.id
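To see why the key matters, here is a toy model of the retry loop. FakeIdempotentCharges imitates Stripe's documented idempotency behavior in memory (it is not the Stripe API), and run_with_retries imitates @task(retries=N) re-running the whole function from the top; both names are hypothetical.

```python
import hashlib
from typing import Callable, Dict, Optional, Tuple


def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:flyte-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


class FakeIdempotentCharges:
    """In-memory imitation of Stripe's idempotency behaviour (not the real API).

    The first request with a key creates a charge; later requests with the same
    key replay the stored result. Requests without a key always create a charge.
    """

    def __init__(self) -> None:
        self._by_key: Dict[str, str] = {}
        self.created = 0

    def create(self, amount: int, customer: str, idempotency_key: Optional[str] = None) -> str:
        if idempotency_key is not None and idempotency_key in self._by_key:
            return self._by_key[idempotency_key]  # replayed: no new charge
        self.created += 1
        charge_id = f"ch_{self.created}"
        if idempotency_key is not None:
            self._by_key[idempotency_key] = charge_id
        return charge_id


def run_with_retries(attempt: Callable[[], str], retries: int) -> str:
    """Imitate @task(retries=N): re-run the whole function from the top on failure."""
    last_error: Optional[Exception] = None
    for _ in range(retries + 1):
        try:
            return attempt()
        except RuntimeError as exc:
            last_error = exc
    assert last_error is not None
    raise last_error


def simulate(use_key: bool, db_failures: int = 3, retries: int = 3) -> Tuple[str, int]:
    """Charge first, then fail the database write `db_failures` times before it succeeds."""
    stripe_fake = FakeIdempotentCharges()
    failures_left = db_failures

    def charge_customer_body() -> str:
        nonlocal failures_left
        key = billing_idempotency_key("cus_abc123", 2999, "2026-07") if use_key else None
        charge_id = stripe_fake.create(2999, "cus_abc123", idempotency_key=key)
        if failures_left > 0:
            failures_left -= 1
            raise RuntimeError("database write failed")
        return charge_id

    return run_with_retries(charge_customer_body, retries), stripe_fake.created
```

simulate(use_key=False) ends with four charges (ch_1 to ch_4); simulate(use_key=True) ends with one, and every attempt sees ch_1.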

Two additional improvements. First, scope the key the task uses to POST /v1/charges only, so the task cannot read customer data or issue refunds regardless of what the workflow passes as input. (Stripe's own restricted keys grant permissions per resource, such as Charges: Write; method-and-path scoping like this is enforced by the proxy.) Second, set a per-task daily cap equal to the expected charge amount plus a small buffer. A unit bug that converts an amount already in cents to cents a second time (a 100× overcharge) is then rejected at the proxy on the first attempt, instead of creating a 100× charge on every retry.
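The cap arithmetic can be modelled in a few lines. This is a hypothetical sketch of a per-key daily cap, not Keybrake's actual enforcement code; CappedKey and cap_for are illustrative names, and the 10% buffer matches the 1.1 multiplier used in the fan-out example.

```python
from dataclasses import dataclass


@dataclass
class CappedKey:
    """Toy per-key spend cap checked before a request would be forwarded."""
    daily_usd_cap: float
    spent_usd: float = 0.0

    def authorize(self, amount_cents: int) -> bool:
        amount_usd = amount_cents / 100
        if self.spent_usd + amount_usd > self.daily_usd_cap:
            return False  # rejected: the request never reaches Stripe
        self.spent_usd += amount_usd
        return True


def cap_for(expected_cents: int, buffer: float = 0.10) -> float:
    """Expected charge in dollars plus a small buffer."""
    return round(expected_cents / 100 * (1 + buffer), 2)


if __name__ == "__main__":
    key = CappedKey(cap_for(2999))        # $32.99 cap for a $29.99 charge
    print(key.authorize(2999 * 100))      # cents converted twice: rejected
    print(key.authorize(2999))            # correct amount: allowed
    print(key.authorize(2999))            # a second identical charge: over the cap
```

Because the buffer is small, the same cap also stops a second full-price charge on the same key, which is a useful side effect on top of the idempotency key.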

Failure mode 2: map_task() fanout dispatches concurrent instances that share one Stripe key

Flyte's map_task() runs the same task function across a list of inputs in parallel, with each invocation running independently on a separate Flyte worker. This is the standard pattern for processing a customer cohort — one charge_customer invocation per customer, all running concurrently. All of those invocations share the same STRIPE_SECRET_KEY environment variable injected into every worker pod. There is no per-invocation Stripe key scope and no mechanism that halts the fanout when one invocation returns a billing error.

# fanout_workflow.py — UNSAFE: all map_task workers share one Stripe key, no per-customer cap
import functools
import os
from typing import List

import stripe
from flytekit import map_task, task, workflow

stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # same key in all worker pods

@task(retries=2)
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    # All N instances run concurrently — no per-instance spend cap
    charge = stripe.Charge.create(
        amount=amount_cents * 100,  # bug: amount is already in cents, so this is a 100x overcharge
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
    )
    write_charge_to_database(customer_id, charge.id, billing_period)
    return charge.id

@workflow
def monthly_billing(customer_ids: List[str], amounts: List[int], billing_period: str) -> List[str]:
    # map_task maps over list inputs, so bind the scalar billing_period with functools.partial.
    # Instances start together (up to any concurrency limit); an error in one does not halt the others.
    charge_for_period = functools.partial(charge_customer, billing_period=billing_period)
    return map_task(charge_for_period)(
        customer_id=customer_ids,
        amount_cents=amounts,
    )

The problem is blast-radius amplification: map_task() dispatches instances without waiting for any results (subject to the concurrency limit, if you set one). A unit bug that converts an amount already in cents to cents a second time, a one-line mistake, charges every customer in the cohort 100× the intended amount at once. Nothing in the fanout compares the amount against what the customer owes, so by the time anyone notices, all N Stripe charges exist. Sequential processing with a sanity check would expose the error after the first customer; parallel processing creates N errors simultaneously.

The fix: issue a per-customer vault key in a setup task before the map_task() group, and pass each vault key alongside the customer data. Each vault key is scoped to POST /v1/charges and capped at that customer's expected charge amount plus a small buffer. If a bug inside the charge task inflates the amount, every invocation's over-cap request is rejected at the proxy before it reaches Stripe; those invocations fail, and the parent workflow surfaces the errors without any overcharge having been created.

# fanout_workflow.py — SAFE: per-customer vault keys issued before map_task dispatch
import functools
import hashlib
import os
from typing import List

import httpx
import stripe
from flytekit import map_task, task, workflow

def issue_vault_key(customer_id: str, amount_cents: int) -> str:
    # Ask the proxy for a key limited to one endpoint and ~110% of this customer's charge
    resp = httpx.post(
        "https://proxy.keybrake.com/admin/vault_keys",
        headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_ADMIN_KEY']}"},
        json={
            "label": f"flyte-fanout-{customer_id}",
            "vendor": "stripe",
            "allowed_endpoints": ["POST /v1/charges"],
            "daily_usd_cap": round(amount_cents / 100 * 1.1, 2),
            "expires_in_seconds": 3600,
        },
        timeout=5.0,
    )
    resp.raise_for_status()
    return resp.json()["vault_key"]

@task
def prepare_vault_keys(customer_ids: List[str], amounts: List[int]) -> List[str]:
    return [issue_vault_key(cid, amt) for cid, amt in zip(customer_ids, amounts)]

@task(retries=2)
def charge_customer_with_vault_key(
    customer_id: str, amount_cents: int, billing_period: str, vault_key: str
) -> str:
    idempotency_key = hashlib.sha256(
        f"{customer_id}:{amount_cents}:{billing_period}:flyte-billing".encode()
    ).hexdigest()[:32]

    # Route through the proxy using the per-customer vault key. StripeClient takes
    # base_addresses (not base_url); the SDK appends paths such as /v1/charges.
    stripe_client = stripe.StripeClient(
        vault_key,
        base_addresses={"api": "https://proxy.keybrake.com/stripe"},
    )
    charge = stripe_client.charges.create(
        params={
            "amount": amount_cents,
            "currency": "usd",
            "customer": customer_id,
            "description": f"Subscription {billing_period}",
        },
        options={"idempotency_key": idempotency_key},
    )
    write_charge_to_database(customer_id, charge.id, billing_period)
    return charge.id

@workflow
def monthly_billing(customer_ids: List[str], amounts: List[int], billing_period: str) -> List[str]:
    vault_keys = prepare_vault_keys(customer_ids=customer_ids, amounts=amounts)
    # Bind the scalar billing_period; the three list inputs are mapped in lockstep
    charge_for_period = functools.partial(
        charge_customer_with_vault_key, billing_period=billing_period
    )
    return map_task(charge_for_period)(
        customer_id=customer_ids,
        amount_cents=amounts,
        vault_key=vault_keys,
    )

Each vault key issued by the setup task is capped at one customer's expected charge plus a 10% buffer. If a charge task sends more than the amount its key was issued for, the proxy rejects the request before it reaches Stripe. The vault keys expire after one hour, so they cannot be reused in a later billing run even if they leak through workflow outputs or Flyte's metadata store.
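The cap only helps if it is computed from something the buggy code path did not touch. In the safe example, prepare_vault_keys and the charge task receive the same amounts list, so a bug upstream of that list inflates the charge and the cap together. A sketch of deriving caps from an independent source instead, assuming a hypothetical plan catalog (PLAN_PRICES_CENTS and the helper names are illustrative):

```python
from typing import Dict, List

# Hypothetical plan catalog: the independent source of truth for prices.
PLAN_PRICES_CENTS: Dict[str, int] = {"starter": 1499, "pro": 2999}


def caps_from_catalog(plans: List[str], buffer: float = 0.10) -> List[float]:
    """Daily USD caps computed from the catalog, not from the billing math's output."""
    return [round(PLAN_PRICES_CENTS[p] / 100 * (1 + buffer), 2) for p in plans]


def buggy_amounts(plans: List[str]) -> List[int]:
    """Billing logic with a unit bug: the catalog price is already in cents."""
    return [PLAN_PRICES_CENTS[p] * 100 for p in plans]


def over_cap(amounts_cents: List[int], caps_usd: List[float]) -> List[bool]:
    """True where a charge would be rejected by its key's cap."""
    return [amount / 100 > cap for amount, cap in zip(amounts_cents, caps_usd)]


if __name__ == "__main__":
    plans = ["starter", "pro"]
    amounts = buggy_amounts(plans)
    print(over_cap(amounts, caps_from_catalog(plans)))  # [True, True]: caught
    self_caps = [round(a / 100 * 1.1, 2) for a in amounts]
    print(over_cap(amounts, self_caps))  # [False, False]: the bug inflated the caps too
```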

Failure mode 3: re-executing a workflow after a downstream failure re-fires a completed billing task

Flyte can recover a failed workflow execution instead of re-running it from scratch: recovery reuses the outputs of nodes that already succeeded and re-runs only the nodes that failed or never ran. This is a valuable feature for long ML pipelines: if a model evaluation task fails after a 4-hour training run, you can recover the execution without repeating the training. The billing failure comes from the re-execution paths that do not reuse those outputs. Relaunching the execution, or re-triggering the workflow from a script or scheduler, runs the whole graph again with the same inputs, including a billing task that already succeeded. When the charge went through and only a downstream node failed, a relaunch re-fires the charge.

# pipeline.py — UNSAFE: relaunching after send_confirmation_email fails re-runs charge_customer
import os

import stripe
from flytekit import task, workflow

stripe.api_key = os.environ["STRIPE_SECRET_KEY"]

@task(retries=2)
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    # This task succeeded — charge ch_A was created
    charge = stripe.Charge.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        description=f"Subscription {billing_period}",
        # No idempotency key — re-execution creates ch_B
    )
    return charge.id

@task(retries=1)
def send_confirmation_email(customer_id: str, charge_id: str) -> bool:
    # This task fails: email provider rate limit
    # Flyte marks this node failed
    ...

@workflow
def billing_pipeline(customer_id: str, amount_cents: int, billing_period: str) -> bool:
    charge_id = charge_customer(
        customer_id=customer_id,
        amount_cents=amount_cents,
        billing_period=billing_period,
    )
    # A relaunch after the email failure re-executes charge_customer too, creating ch_B
    return send_confirmation_email(customer_id=customer_id, charge_id=charge_id)

Flyte's task cache can mitigate this if the billing task is decorated with @task(cache=True, cache_version="1"): on a relaunch, Flyte returns the cached output (ch_A) instead of re-executing the task. However, the cache is easy to bypass: bumping cache_version, changing the task's input or output signature, or re-running with cache overwrite enabled (for example pyflyte run --overwrite-cache) all skip the cache and re-fire the Stripe call. Do not rely on Flyte's task cache as the sole deduplication mechanism for billing.

The correct fix is the same content-hash idempotency key: if Flyte re-runs the billing task (cache miss, cache invalidation, or explicit overwrite), the key derived from the stable inputs makes Stripe return the original ch_A instead of creating a new charge. Stripe may prune an idempotency key once it is 24 hours old, though, and a relaunch can happen days later. Add a pre-flight audit check as a secondary guard: before calling Stripe, query the Keybrake audit log for an existing charge with the same idempotency key. If one exists, short-circuit and return the existing charge ID without touching Stripe at all.

# billing_pipeline.py — SAFE: idempotency key + pre-flight audit check + Flyte cache
import stripe
import hashlib
import httpx
import os
from flytekit import task, workflow

def billing_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:flyte-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

def check_existing_charge(idempotency_key: str) -> str | None:
    resp = httpx.get(
        "https://proxy.keybrake.com/audit",
        headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_AUDIT_KEY']}"},
        params={"idempotency_key": idempotency_key},
        timeout=5.0,
    )
    if resp.status_code == 200:
        entries = resp.json().get("entries", [])
        if entries:
            return entries[0].get("stripe_charge_id")
    return None

@task(retries=2, cache=True, cache_version="v1")
def charge_customer(customer_id: str, amount_cents: int, billing_period: str) -> str:
    idempotency_key = billing_idempotency_key(customer_id, amount_cents, billing_period)

    # Pre-flight: check if charge already exists in audit log
    existing = check_existing_charge(idempotency_key)
    if existing:
        return existing

    # StripeClient takes base_addresses (not base_url); the SDK appends /v1/charges
    stripe_client = stripe.StripeClient(
        os.environ["KEYBRAKE_BILLING_VAULT_KEY"],
        base_addresses={"api": "https://proxy.keybrake.com/stripe"},
    )
    charge = stripe_client.charges.create(
        params={
            "amount": amount_cents,
            "currency": "usd",
            "customer": customer_id,
            "description": f"Subscription {billing_period}",
        },
        options={"idempotency_key": idempotency_key},
    )
    write_charge_to_database(customer_id, charge.id, billing_period)
    return charge.id

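The three-layer defense (Flyte task cache, idempotency key at Stripe, pre-flight audit check at the proxy) means a re-execution from any source (task retry, workflow relaunch, manual cache overwrite) produces one charge per customer per billing period: Stripe deduplicates within its idempotency window, and the audit check covers re-runs that arrive after Stripe has pruned the key, as long as the audit log is reachable.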
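Why the pre-flight check matters for late re-runs can be shown with a toy model of key expiry. The sketch assumes keys vanish exactly 24 hours after first use (Stripe only promises to keep them at least that long) and uses an in-memory dict in place of the Keybrake audit log; all names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, Tuple

# Assumption: keys disappear exactly 24h after first use.
IDEMPOTENCY_WINDOW = timedelta(hours=24)


class FakeStripeWithPruning:
    """Toy model of Stripe idempotency with key expiry (not the real API)."""

    def __init__(self) -> None:
        self._keys: Dict[str, Tuple[str, datetime]] = {}
        self.created = 0

    def create(self, idempotency_key: str, now: datetime) -> str:
        stored = self._keys.get(idempotency_key)
        if stored is not None and now - stored[1] < IDEMPOTENCY_WINDOW:
            return stored[0]  # replayed within the window
        self.created += 1
        charge_id = f"ch_{self.created}"
        self._keys[idempotency_key] = (charge_id, now)
        return charge_id


def charge_once(
    key: str,
    now: datetime,
    stripe_fake: FakeStripeWithPruning,
    audit_log: Dict[str, str],
    use_preflight: bool,
) -> str:
    if use_preflight and key in audit_log:
        return audit_log[key]  # pre-flight hit: never touch Stripe
    charge_id = stripe_fake.create(key, now)
    audit_log[key] = charge_id
    return charge_id


def charges_after_relaunch(use_preflight: bool, days_later: float) -> int:
    """Run once, relaunch `days_later` days afterwards, and count charges created."""
    stripe_fake = FakeStripeWithPruning()
    audit_log: Dict[str, str] = {}
    first_run = datetime(2026, 7, 1, 9, 0)
    key = "cus_abc123:2999:2026-07"  # stands in for the content-hash key
    charge_once(key, first_run, stripe_fake, audit_log, use_preflight)
    charge_once(key, first_run + timedelta(days=days_later), stripe_fake, audit_log, use_preflight)
    return stripe_fake.created
```

A relaunch 12 hours later creates one charge either way; three days later, only the variant with the pre-flight check still ends with one.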

Approach comparison

Approach | Retry safe? | Fanout isolated? | Resume safe? | Per-task scoping? | Audit trail
Bare Stripe key, no idempotency | No: each retry creates a new charge | No: shared key, no cap | No: relaunch re-fires the charge | No | Stripe Dashboard only
Idempotency key only | Yes | No: one shared key | Yes, within Stripe's 24-hour key retention | No | Stripe Dashboard only
Flyte cache only (no idempotency) | No: failed attempts write no cache entry, so retries re-fire | No: one shared key | Partial: cache hit is safe; overwrite or version bump re-fires | No | Flyte execution log only
Vault key only (no idempotency) | No: a retry fires a new charge (the cap may block it) | Yes: per-customer key, capped | No: relaunch fires a new charge | Yes: per task | Proxy + Stripe
Idempotency + cache + vault key | Yes | Yes | Yes | Yes | Proxy + Stripe
Keybrake proxy (recommended) | Yes | Yes: per-customer vault key | Yes: proxy dedup + Stripe dedup | Yes: enforced at proxy | Full queryable audit log

Gap analysis: four additional Flyte failure modes

1. @dynamic workflows create sub-tasks at runtime with shared key context

Flyte's @dynamic decorator lets a task generate and execute sub-tasks at runtime based on its inputs. A dynamic workflow that generates one billing sub-task per customer creates every sub-task with access to the same environment variables as the parent task, including the unrestricted STRIPE_SECRET_KEY. Unlike the map_task() pattern, there may be no static input list to pre-generate vault keys from in a setup step. The mitigation is to move vault key generation into the dynamic function itself: before creating each sub-task, call the Keybrake admin API to issue a scoped key for that customer and pass it as an input. Each dynamically generated task also needs an explicit content-hash idempotency key argument. If the dynamic function itself is retried, it issues fresh vault keys and recreates the sub-tasks, and only a key derived from stable inputs stays the same across those attempts.
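The per-customer planning step inside a dynamic function can be sketched without Flyte. In a real @dynamic body, each BillingSubtaskInputs would become the arguments of one sub-task call; the flytekit wiring is left out so the sketch runs on its own. issue_vault_key is injected (for example, the helper from the fan-out example), and every name here is hypothetical.

```python
import hashlib
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass(frozen=True)
class BillingSubtaskInputs:
    """Arguments one dynamically created billing sub-task would receive."""
    customer_id: str
    amount_cents: int
    billing_period: str
    vault_key: str
    idempotency_key: str


def content_idempotency_key(customer_id: str, amount_cents: int, billing_period: str) -> str:
    raw = f"{customer_id}:{amount_cents}:{billing_period}:flyte-billing"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


def plan_dynamic_billing(
    customers: List[Tuple[str, int]],
    billing_period: str,
    issue_vault_key: Callable[[str, int], str],
) -> List[BillingSubtaskInputs]:
    """Issue one scoped vault key per customer and pair it with a stable idempotency key.

    If this function is retried, the vault keys are freshly issued and differ,
    but the idempotency keys depend only on the billing inputs and stay the same.
    """
    planned = []
    for customer_id, amount_cents in customers:
        planned.append(
            BillingSubtaskInputs(
                customer_id=customer_id,
                amount_cents=amount_cents,
                billing_period=billing_period,
                vault_key=issue_vault_key(customer_id, amount_cents),
                idempotency_key=content_idempotency_key(customer_id, amount_cents, billing_period),
            )
        )
    return planned
```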

2. FlyteFile and FlyteDirectory output caching can mask billing state

Flyte's caching system stores task outputs, including FlyteFile and FlyteDirectory, in its artifact store. If a billing task returns a FlyteFile containing charge IDs (a common pattern for audit log files), a cache hit returns the stored file without executing the task. That is the desired behavior for re-execution safety, but the cache key is built from the task's inputs and cache_version (plus its name and signature), not from anything the task computes internally. If the billing period is derived inside the task (for example, from the current date) instead of passed in, a cache hit can return the audit file from a previous billing period. Always pass billing_period as a task input so it participates in the cache key, and bump cache_version whenever the billing logic changes in a way that affects charge amounts or customer selection.
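A cheap extra guard is to stamp the billing period into every row of the audit file and check it when the file is read, so a stale cached file fails loudly instead of being trusted. A sketch with plain CSV; the column names are hypothetical, and in a Flyte task the path would come from the FlyteFile you return.

```python
import csv
import tempfile
from pathlib import Path
from typing import List


def write_audit_file(path: Path, billing_period: str, charge_ids: List[str]) -> None:
    """Stamp the billing period into every row so the file describes itself."""
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["billing_period", "charge_id"])
        for charge_id in charge_ids:
            writer.writerow([billing_period, charge_id])


def read_audit_file(path: Path, expected_period: str) -> List[str]:
    """Return charge IDs, refusing a file whose rows belong to another period."""
    with path.open(newline="") as f:
        rows = list(csv.DictReader(f))
    other_periods = {row["billing_period"] for row in rows} - {expected_period}
    if other_periods:
        raise ValueError(f"audit file covers {sorted(other_periods)}, expected {expected_period}")
    return [row["charge_id"] for row in rows]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        audit = Path(tmp) / "charges.csv"
        write_audit_file(audit, "2026-06", ["ch_1", "ch_2"])
        print(read_audit_file(audit, "2026-06"))  # ['ch_1', 'ch_2']
        try:
            read_audit_file(audit, "2026-07")
        except ValueError as exc:
            print("stale cache hit:", exc)
```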

3. LaunchPlan schedules can create overlapping executions

Flyte LaunchPlan objects support cron-based scheduling via CronSchedule. If a monthly billing workflow takes longer than one month to complete, due to a long retry backlog or a Flyte cluster outage, the next scheduled launch fires before the previous execution finishes. If both executions resolve the same customer list and billing period (for example, because the period comes from a fixed launch plan default rather than from the schedule), both will attempt to charge the same customers. Idempotency keys do not reliably cover this case: Stripe may prune a key once it is 24 hours old, far sooner than a month-long overlap. Flyte does not enforce single-execution concurrency for LaunchPlans by default. Add a pre-flight check in the root workflow task: query the Keybrake audit log to verify no successful charges exist for the current billing period before proceeding. If charges are found, exit cleanly with the existing charge IDs rather than re-billing.
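The period-level guard can be sketched with the audit query injected as a callable. The entry shape ({"stripe_charge_id": ...}) mirrors check_existing_charge in the failure mode 3 example, but filtering the audit log by billing period is an assumption about what your audit store supports; the function names are hypothetical.

```python
from typing import Callable, Dict, List, Optional

AuditLookup = Callable[[str], List[Dict[str, str]]]


def preflight_period_guard(billing_period: str, lookup_charges: AuditLookup) -> Optional[List[str]]:
    """Return existing charge IDs if this period was already billed, else None."""
    entries = lookup_charges(billing_period)
    charge_ids = [e["stripe_charge_id"] for e in entries if e.get("stripe_charge_id")]
    return charge_ids or None


def run_monthly_billing(
    billing_period: str,
    lookup_charges: AuditLookup,
    bill_cohort: Callable[[str], List[str]],
) -> List[str]:
    existing = preflight_period_guard(billing_period, lookup_charges)
    if existing is not None:
        return existing  # an overlapping run already billed this period: exit cleanly
    return bill_cohort(billing_period)
```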

4. Propeller-level task retries bypass application-level retry configuration

Flyte's execution engine (Propeller) also retries tasks after system failures, independently of the retries=N parameter in the task decorator. System retries happen when a worker pod is evicted by the Kubernetes scheduler, when a node becomes unavailable during execution, or when a platform-level failure kills the task container before it returns a result. They draw on a separate system-retry budget configured by whoever operates the Flyte deployment, so they can happen even when the workflow definition says retries=0. A billing task that relied on retries=0 to prevent duplicate charges is not protected against them. The content-hash idempotency key is the reliable protection here: any re-execution of the task, whether triggered by the application layer, the Flyte engine, or the Kubernetes scheduler, sends the same key and gets the same Stripe outcome.

Pytest enforcement suite

"""
pytest test_flyte_stripe_governance.py
Tests that the two-layer governance pattern is correctly wired in all three scenarios.
"""
import hashlib
import pytest
from unittest.mock import patch, MagicMock

# ── Shared helper ──

def billing_idempotency_key(customer_id, amount_cents, billing_period, context="flyte-billing"):
    raw = f"{customer_id}:{amount_cents}:{billing_period}:{context}"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

# ── Test 1: task retry idempotency — same parameters → same key across all attempts ──

def test_idempotency_key_stable_across_task_retries():
    key1 = billing_idempotency_key("cus_abc123", 2999, "2026-07")
    key2 = billing_idempotency_key("cus_abc123", 2999, "2026-07")
    assert key1 == key2, "Idempotency key must be identical across all task retry attempts"
    assert len(key1) == 32

# ── Test 2: map_task isolation — each customer receives a unique vault key ──

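def test_vault_key_per_map_task_customer(monkeypatch):
    from fanout_workflow import issue_vault_key  # the SAFE module shown above

    monkeypatch.setenv("KEYBRAKE_ADMIN_KEY", "test-admin-key")
    customers = [(f"cus_{i}", (i + 1) * 1499) for i in range(10)]
    with patch("httpx.post") as mock_post:
        # One distinct fake response per admin API call
        mock_post.side_effect = [
            MagicMock(json=MagicMock(return_value={"vault_key": f"vk_flyte_{i}_unique"}))
            for i in range(len(customers))
        ]
        vault_keys = [issue_vault_key(cid, amt) for cid, amt in customers]

    labels = [c.kwargs["json"]["label"] for c in mock_post.call_args_list]
    assert mock_post.call_count == 10, "Each map_task customer needs its own admin API call"
    assert len(set(labels)) == 10, "Each vault key must be labelled for exactly one customer"
    assert len(set(vault_keys)) == 10, "Each map_task customer must receive a unique vault key"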

# ── Test 3: resume safety — same billing period → same idempotency key on re-execution ──

def test_workflow_resume_produces_same_idempotency_key():
    run1_key = billing_idempotency_key("cus_abc123", 2999, "2026-07", "flyte-billing")
    run2_key = billing_idempotency_key("cus_abc123", 2999, "2026-07", "flyte-billing")
    assert run1_key == run2_key, (
        "Resumed workflow must produce the same idempotency key for the same "
        "customer and billing period — Stripe deduplicates the re-execution"
    )

# ── Test 4: vault key scope check — billing key must not allow GET endpoints ──

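def test_billing_vault_key_scoped_to_post_charges_only(monkeypatch):
    from fanout_workflow import issue_vault_key

    monkeypatch.setenv("KEYBRAKE_ADMIN_KEY", "test-admin-key")
    with patch("httpx.post") as mock_post:
        mock_post.return_value.json.return_value = {"vault_key": "vk_test"}
        issue_vault_key("cus_abc123", 2999)

    # Inspect the scope actually requested from the proxy, not a hard-coded list
    allowed = mock_post.call_args.kwargs["json"]["allowed_endpoints"]
    forbidden = ["GET /v1/charges", "POST /v1/refunds", "GET /v1/customers"]
    for endpoint in forbidden:
        assert endpoint not in allowed, f"Billing vault key must not allow {endpoint}"
    assert allowed == ["POST /v1/charges"]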

# ── Test 5: pre-flight audit check short-circuits on existing charge ──

def test_preflight_check_returns_existing_charge_id(monkeypatch):
    from billing_pipeline import check_existing_charge  # the helper shown above

    monkeypatch.setenv("KEYBRAKE_AUDIT_KEY", "test-audit-key")
    existing_charge_id = "ch_existing123"
    with patch("httpx.get") as mock_get:
        mock_get.return_value = MagicMock(status_code=200)
        mock_get.return_value.json.return_value = {
            "entries": [{"stripe_charge_id": existing_charge_id}]
        }
        result = check_existing_charge("abc123")

    # The real helper must query by idempotency key and return the stored charge ID
    assert mock_get.call_args.kwargs["params"] == {"idempotency_key": "abc123"}
    assert result == existing_charge_id, "Pre-flight check must return existing charge ID to short-circuit Stripe call"

FAQ

Does Flyte's task cache replace the need for Stripe idempotency keys?

No. Flyte's task cache is an optimization layer, not a guarantee: it can be bypassed with --overwrite-cache on any re-execution, it only hits while cache_version and the task signature stay the same, and it does not protect against Propeller-level retries after a pod eviction, because a failed attempt never writes a cache entry. Stripe idempotency keys enforce deduplication at the API level however many times, and from whatever source, the task function is invoked, for as long as Stripe retains the key (at least 24 hours). Use both together: Flyte cache to avoid unnecessary task re-execution, and Stripe idempotency keys as the primary deduplication primitive at the payment layer, backed by the pre-flight audit check for re-runs that arrive after Stripe has pruned the key.

What should I use as the idempotency key — the Flyte execution ID or a content hash?

Use a content hash of the billing parameters (customer_id + amount_cents + billing_period), not the Flyte execution ID. The execution ID changes on every workflow launch — a re-run after a failed downstream node creates a new execution ID, which would generate a new idempotency key and allow a duplicate charge. The content hash is stable across all executions that bill the same customer the same amount for the same period, which is the correct deduplication scope for billing. Include a context suffix (e.g., "flyte-billing") to namespace the key away from idempotency keys generated by other services billing the same customer.
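The difference is easy to demonstrate: two launches of the same billing work get different execution IDs, so a key derived from the execution ID changes, while the content hash does not. The execution IDs below are made up, and the helper names are illustrative.

```python
import hashlib


def key_from_execution_id(execution_id: str, customer_id: str) -> str:
    # Anti-pattern: a relaunch gets a new execution ID, so this key changes.
    return hashlib.sha256(f"{execution_id}:{customer_id}".encode()).hexdigest()[:32]


def key_from_content(
    customer_id: str, amount_cents: int, billing_period: str, context: str = "flyte-billing"
) -> str:
    # Stable for the same customer, amount and period, whichever execution runs it.
    raw = f"{customer_id}:{amount_cents}:{billing_period}:{context}"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]


if __name__ == "__main__":
    original, relaunch = "f1a2b3c4", "f9e8d7c6"  # made-up execution IDs
    print(key_from_execution_id(original, "cus_abc123") == key_from_execution_id(relaunch, "cus_abc123"))  # False
    print(key_from_content("cus_abc123", 2999, "2026-07") == key_from_content("cus_abc123", 2999, "2026-07"))  # True
```

The context suffix keeps the namespace separate: the same customer and period under context="invoicing" produce a different key.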

How do vault keys interact with Flyte's map_task() parallelism limit?

Flyte's map_task() accepts a concurrency parameter that limits how many instances run simultaneously. Vault keys do not need to align with this limit: issue one vault key per customer in the setup task before the map_task() group, regardless of the parallelism cap. The proxy handles concurrent requests from all running instances without serialization. Each vault key expires after its configured TTL (expires_in_seconds, 3600 in the setup-task example), so for a large cohort that is processed over several hours, set the TTL to cover the expected duration of the full map_task() run.
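A rough way to size the TTL, assuming instances run in waves of `concurrency` and each wave takes about one task duration including pod start-up. The function, the safety factor, and the example numbers are illustrative, not Flyte or Keybrake defaults.

```python
import math


def vault_key_ttl_seconds(
    n_customers: int,
    concurrency: int,
    seconds_per_task: float,
    safety_factor: float = 2.0,
) -> int:
    """Rough TTL so keys issued up front outlive the whole map_task run.

    About ceil(n / concurrency) waves run one after another; the safety
    factor leaves room for retries and scheduling delays.
    """
    waves = math.ceil(n_customers / concurrency)
    return math.ceil(waves * seconds_per_task * safety_factor)


if __name__ == "__main__":
    # 5,000 customers, 50 at a time, ~20 s per task: 100 waves
    print(vault_key_ttl_seconds(5000, 50, 20))  # 4000, longer than a one-hour key
```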

Can I use a single workflow-level vault key for the entire billing run instead of per-customer keys?

You can issue one vault key with a cap equal to the total expected charges for the run, but you lose per-customer blast-radius isolation. In a cohort of more than about 90 customers, a single 100× overcharge fits inside a run-level cap with a 10% buffer, so it goes through; customers processed after it are blocked only once the cap runs out, which in a large enough cohort may never happen. Per-customer vault keys limit each error to one customer's expected amount, so a calculation error fails fast on that customer without affecting the rest of the cohort. The overhead is one extra request per customer, issued in the setup task before any charge runs.
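The arithmetic for a concrete cohort, using the same 10% buffer as the setup-task example. The cohort size and plan price are illustrative.

```python
from typing import List


def run_level_cap_usd(amounts_cents: List[int], buffer: float = 0.10) -> float:
    return round(sum(amounts_cents) / 100 * (1 + buffer), 2)


def per_customer_caps_usd(amounts_cents: List[int], buffer: float = 0.10) -> List[float]:
    return [round(a / 100 * (1 + buffer), 2) for a in amounts_cents]


cohort = [2999] * 1000             # 1,000 customers on a $29.99 plan (illustrative)
overcharge_usd = 2999 * 100 / 100  # one customer billed 100x: $2,999.00

run_cap = run_level_cap_usd(cohort)
customer_cap = per_customer_caps_usd(cohort)[0]

print(run_cap, overcharge_usd <= run_cap)            # 32989.0 True
print(customer_cap, overcharge_usd <= customer_cap)  # 32.99 False
```

With the run-level key, the overcharge and all 999 correct charges together still fit under the cap, so nothing is blocked; with per-customer keys the overcharge is rejected outright.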

Does routing through the Keybrake proxy affect Flyte's native Stripe observability?

The proxy is transparent to Flyte — it intercepts requests at the HTTP layer before they reach Stripe, so Flyte sees the same responses it would receive from Stripe directly. The proxy adds a queryable audit log with per-task metadata (which vault key was used, which customer was charged, what amount, at what timestamp) that complements Flyte's native execution log. You can correlate Flyte execution IDs with proxy audit entries by passing the execution ID as a metadata field in the Stripe charge request — the proxy records it in the audit log alongside the charge ID.
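A sketch of charge parameters that carry the execution ID in Stripe metadata. Inside a task, flytekit exposes the current execution ID through flytekit.current_context().execution_id (its name field); here it is passed in as a plain string so the sketch runs without Flyte. The metadata key names are our own choice, and the dict would be passed as params= to stripe_client.charges.create(...) in the examples above.

```python
from typing import Any, Dict


def charge_params_with_execution_id(
    customer_id: str, amount_cents: int, billing_period: str, execution_id: str
) -> Dict[str, Any]:
    """Charge params tagged with the Flyte execution ID for audit correlation.

    Stripe stores metadata values as strings, so everything in it is a str.
    """
    return {
        "amount": amount_cents,
        "currency": "usd",
        "customer": customer_id,
        "description": f"Subscription {billing_period}",
        "metadata": {
            "flyte_execution_id": execution_id,
            "billing_period": billing_period,
        },
    }
```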

What happens if the Keybrake proxy is unreachable during a billing run?

The proxy is a network hop before Stripe. If the proxy is unreachable, the StripeClient call raises a connection error (stripe.APIConnectionError), the same error you would see if Stripe itself were unreachable. With @task(retries=N) and a content-hash idempotency key, the normal retry logic applies: each retry targets the same proxy URL with the same idempotency key. If the proxy stays unreachable for every retry, Flyte marks the node as failed and the workflow surfaces the error. No duplicate charges are created: either no request reached Stripe, or one did and every retry carried the same idempotency key. Deploy the proxy in the same cloud region as your Flyte cluster so the extra hop does not cross regional network boundaries.

Put the brakes on your Flyte workflow's Stripe keys

Keybrake issues scoped vault keys for each Flyte task, map_task customer, or billing workflow run — each capped at one customer's expected charge, scoped to the endpoints your task actually needs, and logged to a queryable audit trail. When a task retries, a workflow resumes, or a schedule fires twice, the proxy absorbs the blast. Join the waitlist for early access.