Metaflow AI agent API key: scoping vendor calls in ML workflow steps

Metaflow is Netflix's open-source framework for building and deploying ML and data science workflows: step-based flows with built-in versioning, foreach fan-out for parallelism (written @foreach on this page, although it is an argument to self.next rather than a decorator), and @retry for resilience. When AI agent workflows use Metaflow steps to dispatch vendor API calls, the same patterns that make pipelines reliable become spending risks: @foreach creates parallel branches that each call vendor APIs independently with no per-flow dollar cap, @retry turns each failed step into several vendor charges, and resuming a run after a step failure re-executes the vendor calls in the failed step. This page covers the vault-key pattern, which bounds per-flow vendor spend using a key issued in the start step and stored as a flow artifact.

TL;DR

Metaflow flows use self.* artifacts to pass data between steps, the same mechanism you use to share model weights, dataset references, or config values. Issue a vault key in the start step and store it as self.vault_key. Every downstream step that makes vendor API calls reads self.vault_key and uses it instead of the real API key. All branches created by @foreach share the same vault key and its per-flow cap. If a run misbehaves, revoke its vault key from the Keybrake dashboard without rotating the real vendor key.

How Metaflow AI agent workflows call vendor APIs

In a Metaflow flow, vendor API calls typically live inside a @foreach-parallel step that processes items in a list. An AI billing agent that processes subscription renewals might look like this:

from metaflow import FlowSpec, step
import stripe
import os

class BillingFlow(FlowSpec):

    @step
    def start(self):
        self.customer_ids = ["cus_A", "cus_B", "cus_C"]  # ... could be thousands
        self.amount_cents = 2999
        self.next(self.charge_customers, foreach="customer_ids")

    @step
    def charge_customers(self):
        stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # full-access key
        stripe.PaymentIntent.create(
            amount=self.amount_cents,
            currency="usd",
            customer=self.input,  # current item in the foreach iteration
        )
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [i.input for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        print(f"Processed {len(self.results)} customers")

if __name__ == "__main__":
    BillingFlow()

This is standard Metaflow. The @foreach creates one parallel branch per customer ID. On a Metaflow deployment with Kubernetes or AWS Batch, all branches execute simultaneously up to your cluster's concurrency limit. The problem: STRIPE_SECRET_KEY is a full-access key read from the environment in every branch, there's no cap on how many charges the flow can issue, and re-running the flow after a failure restarts all charge_customers branches from scratch — including ones that already charged the customer successfully.

Three gaps Metaflow's native tooling doesn't fill for vendor spend control

| Gap | What happens in practice | Metaflow's answer |
| --- | --- | --- |
| No per-flow spend cap | A billing flow receives 10,000 customer IDs from a query that joined the wrong table. Metaflow dispatches a charge_customers branch for every ID, as many at a time as Batch or Kubernetes capacity allows. Each branch calls Stripe independently. The flow completes successfully from Metaflow's perspective (all steps returned without exception) while Stripe records 10,000 unintended charges. | No dollar cap. The --max-num-splits option (default 100) limits how many branches a foreach may create, but it counts branches, not money. Metaflow's metadata service tracks step states, execution times, and artifacts, not dollar amounts spent on vendor calls within steps. |
| No step-level vendor revoke | You can terminate the run from Metaflow's CLI or cancel the Batch/Kubernetes job to stop the flow. But vendor API calls already in flight inside executing step containers are not stopped by cancellation. The only way to cut off vendor access is to rotate the real API key, which breaks every other Metaflow flow and non-Metaflow process using that key. | Flow cancellation via Metaflow's CLI or Batch job termination. Neither stops vendor API calls already in progress within executing step containers. |
| No per-call audit with step context | Metaflow's metadata service records step start/end times, artifact values, and exception tracebacks, but doesn't parse dollar amounts from Stripe response bodies or cross-reference Stripe Request-Id values with Metaflow run IDs and step names. | Metaflow Client API for querying run metadata. No structured cost tracking and no Metaflow-step-to-Stripe-charge correlation out of the box. |

The @foreach risk: parallel branches and simultaneous vendor calls

Metaflow's @foreach is semantically a fan-out followed by a join. When the start step calls self.next(self.charge_customers, foreach="customer_ids"), Metaflow creates one task per item and dispatches them to the compute backend (local processes, Batch, Kubernetes) as fast as the run's concurrency limit (for example --max-workers) and the backend's capacity allow. On a Batch cluster with 500 vCPUs available and a matching worker limit, 500 branches can execute in parallel: 500 simultaneous Stripe API calls in the first wave, another 500 in the next, and so on until all items are processed.
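To make the wave arithmetic concrete, here is a minimal sketch that estimates how many waves a fan-out needs and the worst-case amount charged if every branch succeeds. The function name and its parameters are illustrative and not part of Metaflow, and it assumes exactly one charge per branch with no retries.

```python
import math


def fanout_exposure(n_items: int, amount_cents: int, max_parallel: int) -> dict:
    """Estimate waves and worst-case spend for a foreach fan-out.

    Assumes one vendor charge per branch and no retries.
    """
    if max_parallel < 1:
        raise ValueError("max_parallel must be at least 1")
    waves = math.ceil(n_items / max_parallel)
    total_cents = n_items * amount_cents
    return {
        "waves": waves,
        "first_wave_calls": min(n_items, max_parallel),
        "worst_case_usd": total_cents / 100,
    }


# 10,000 customers at $29.99 each on a cluster that runs 500 branches at once.
exposure = fanout_exposure(n_items=10_000, amount_cents=2999, max_parallel=500)
print(exposure)
```

With these inputs the run needs 20 waves and can charge up to $299,900 in total, with the first 500 calls landing before anyone has a chance to notice.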

A vault key stored as self.vault_key in the start step is automatically available in all @foreach branches as a flow artifact. The vault key's dollar cap is enforced atomically across all concurrent branches: once cumulative spend hits the limit, further calls return 429. Branches that receive a 429 raise exceptions, which Metaflow records as step failures — surfaced in the Metaflow UI without any additional instrumentation.
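The property that matters here is that the check against the cap and the increment of recorded spend happen as one atomic step, so concurrent branches cannot both slip under the limit. The toy model below illustrates that idea with threads and a lock. It is not Keybrake's implementation: the SpendCap class is hypothetical, and it assumes the proxy rejects any call that would push cumulative spend past the cap.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SpendCap:
    """Toy stand-in for a proxy-side cap shared by all branches of one run."""

    def __init__(self, cap_cents: int) -> None:
        self.cap_cents = cap_cents
        self.spent_cents = 0
        self._lock = threading.Lock()

    def try_charge(self, amount_cents: int) -> int:
        """Return 200 and record the spend, or 429 if it would exceed the cap."""
        with self._lock:  # check and increment as a single atomic step
            if self.spent_cents + amount_cents > self.cap_cents:
                return 429
            self.spent_cents += amount_cents
            return 200


cap = SpendCap(cap_cents=30_000)  # $300, as in the example flow on this page

# 500 "branches" each attempt one $29.99 charge, 50 at a time.
with ThreadPoolExecutor(max_workers=50) as pool:
    statuses = list(pool.map(lambda _: cap.try_charge(2999), range(500)))

accepted = statuses.count(200)
rejected = statuses.count(429)
print(accepted, rejected, cap.spent_cents)
```

However the threads interleave, exactly 10 charges fit under $300 and the other 490 are rejected. Without the lock, two branches could both read the same spent_cents value and both pass the check.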

The @retry risk: re-running failed steps re-executes vendor calls

Metaflow's @retry decorator retries a failed step up to N times before marking it as failed. Combined with @foreach, a single transient network error in one branch retries that branch — which means another Stripe API call on the retry. Without idempotency keys, retried branches can create duplicate charges.

More subtle: when you resume a failed run with the resume command (python flow.py resume), Metaflow reuses the results of tasks that succeeded and re-executes the ones that failed. If a charge_customers branch completed its Stripe call and then failed on an unrelated error (like a metadata write failure), resuming re-executes that branch and repeats the call. Stable idempotency keys prevent these duplicate charges on resume.
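The difference between a stable and an unstable idempotency key can be shown with a small simulation. The FakeVendor class below is a hypothetical stand-in that deduplicates by key, which is the behavior idempotency keys rely on. The helper names are illustrative. The stable key uses the same customer-and-amount format as the flow example on this page.

```python
import uuid


def stable_key(customer_id: str, amount_cents: int) -> str:
    """Key derived only from the charge's inputs: identical on every attempt."""
    return f"{customer_id}-{amount_cents}"


def unstable_key(customer_id: str) -> str:
    """Anti-pattern: a fresh random suffix makes every attempt look new."""
    return f"{customer_id}-{uuid.uuid4()}"


class FakeVendor:
    """Toy vendor that deduplicates requests by idempotency key."""

    def __init__(self) -> None:
        self.charges = {}  # idempotency key -> amount in cents

    def charge(self, key: str, amount_cents: int) -> None:
        # A repeated key maps to the original charge and creates no new one.
        self.charges.setdefault(key, amount_cents)


vendor = FakeVendor()
for _attempt in range(3):  # first try plus two retries
    vendor.charge(stable_key("cus_A", 2999), 2999)
stable_charges = len(vendor.charges)

vendor = FakeVendor()
for _attempt in range(3):
    vendor.charge(unstable_key("cus_A"), 2999)
unstable_charges = len(vendor.charges)

print(stable_charges, unstable_charges)
```

Three attempts with the stable key produce one charge, while three attempts with random keys produce three. A key built from the Metaflow run ID would also change on resume, because a resumed run gets a new run ID.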

Scoping vault keys per Metaflow flow run

Issue the vault key in the start step and store it as a flow artifact. It automatically propagates to all downstream steps, including all @foreach branches:

import httpx
from metaflow import FlowSpec, step, retry
import stripe
import os

class BillingFlow(FlowSpec):

    @step
    def start(self):
        from metaflow import current
        self.customer_ids = ["cus_A", "cus_B", "cus_C"]  # ... more IDs
        self.amount_cents = 2999
        self.vault_key = self._issue_vault_key(
            run_id=current.run_id,
            budget_usd=300.0,
        )
        self.next(self.charge_customers, foreach="customer_ids")

    def _issue_vault_key(self, run_id: str, budget_usd: float) -> str:
        r = httpx.post(
            "https://proxy.keybrake.com/vault/keys",
            headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_API_KEY']}"},
            json={
                "vendor": "stripe",
                "daily_usd_cap": budget_usd,
                "allowed_endpoints": ["POST /v1/payment_intents"],
                "expires_in": "4h",
                "agent_run_label": f"metaflow/{run_id}",
            },
        )
        r.raise_for_status()  # fail the start step if no key was issued
        return r.json()["vault_key"]

    @retry(times=2)
    @step
    def charge_customers(self):
        stripe.api_key = self.vault_key          # scoped vault key from start step
        stripe.api_base = "https://proxy.keybrake.com/stripe"
        stripe.PaymentIntent.create(
            amount=self.amount_cents,
            currency="usd",
            customer=self.input,
            idempotency_key=f"{self.input}-{self.amount_cents}",  # stable on retry
        )
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [i.input for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        print(f"Processed {len(self.results)} customers")

if __name__ == "__main__":
    BillingFlow()

The vault key is issued once in start and stored as self.vault_key. Metaflow's artifact system makes it available in every downstream step — @foreach branches each get their own copy of self.vault_key pointing to the same vault key string. All branches share the same cap. The real Stripe secret stays in Keybrake, never in Metaflow's artifact store or container environment variables. The audit log records each call with agent_run_label: "metaflow/{run_id}" — queryable by run ID or time window from the Keybrake dashboard.
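The example hard-codes budget_usd=300.0. One way to choose that number is to derive it from the batch the run expects to process, so that a wrong-table join producing far more IDs than expected trips the cap early. The helper below is a sketch under that assumption. Its name and the 10% default headroom are illustrative choices, not Keybrake recommendations.

```python
def run_budget_usd(n_customers: int, amount_cents: int, headroom_pct: int = 10) -> float:
    """Cap sized to the expected batch plus a margin, rounded up to a whole cent."""
    if n_customers < 0 or amount_cents < 0 or headroom_pct < 0:
        raise ValueError("inputs must be non-negative")
    expected_cents = n_customers * amount_cents
    # Integer ceiling division avoids float rounding surprises with money.
    capped_cents = -(-expected_cents * (100 + headroom_pct) // 100)
    return capped_cents / 100


# Three customers at $29.99 each, with 10% headroom.
budget = run_budget_usd(n_customers=3, amount_cents=2999)
print(budget)
```

The expected batch size should come from a trusted source (for example last cycle's count), not from the same query that produces customer_ids. Otherwise a bad query inflates the cap along with the batch.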

How Keybrake fits

Keybrake is the proxy layer between your Metaflow steps and Stripe, Twilio, or Resend. You swap stripe.api_key for the vault key stored in self.vault_key and set stripe.api_base to https://proxy.keybrake.com/stripe. The real Stripe secret stays in Keybrake, not in Metaflow's artifact store or S3 backend. Each flow run gets its own vault key with its own dollar cap, endpoint allowlist, and expiry. Once parallel @foreach branches exhaust the cap, further calls receive 429 responses, which surface as Metaflow step failures with structured exception data rather than as silent charges spread across thousands of parallel containers.

Related questions

Does storing the vault key as a Metaflow artifact expose it in S3 or the metadata service?

Metaflow artifacts are serialized (via pickle by default) and stored in S3 (on AWS) or the local datastore. A vault key string stored as self.vault_key will be persisted like any other artifact. Apply the same access controls to your Metaflow artifact store that you would to any sensitive data — bucket policies, IAM roles, encryption at rest. The key advantage over storing the real API key is the vault key's bounded lifespan: it expires after the configured TTL (e.g. 4h) and can be revoked from Keybrake even after it's been stored in the artifact backend. If the artifact bucket is compromised, the vault key is useless once expired or revoked, while a real Stripe secret would require a full rotation to invalidate.

How should I handle the vault key when resuming a failed Metaflow run?

When you resume a failed run with the resume command (python flow.py resume), Metaflow reuses the results of steps that already succeeded, so the start step is not re-executed. The vault key from the original run's start step is already stored as an artifact and is available to the re-executed @foreach branches. The resumed run gets a new run ID, so audit entries for its calls still carry the original run's agent_run_label. If the original vault key expired before the resume, you'll get 401 errors from the proxy. Issue vault keys with an expiry longer than your maximum expected flow runtime plus the time you might take to resume. For flows that can take hours, use a 12h or 24h TTL and rely on the dollar cap (not the TTL) as the primary safety mechanism.
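The TTL trade-off comes down to a simple comparison of timestamps. The sketch below checks whether a key issued at the start of a run would still be valid when the run is resumed. The function name and the dates are hypothetical, and the check assumes the key expires exactly at issue time plus TTL.

```python
from datetime import datetime, timedelta, timezone


def key_valid_at(issued_at: datetime, ttl: timedelta, when: datetime) -> bool:
    """True if a key issued at issued_at with the given TTL is still valid at when."""
    return when < issued_at + ttl


issued = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
resume_at = issued + timedelta(hours=6)  # run failed and was resumed six hours later

valid_with_4h = key_valid_at(issued, timedelta(hours=4), resume_at)
valid_with_24h = key_valid_at(issued, timedelta(hours=24), resume_at)
print(valid_with_4h, valid_with_24h)
```

A 4h key has already expired by the time of the resume, while a 24h key still works and the dollar cap continues to bound what the resumed branches can spend.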

Can I use a single vault key across multiple concurrent Metaflow flow runs?

Yes, but you lose per-run attribution in the audit log. A vault key can be shared across runs, and all calls using that key accumulate toward the same cap. If two concurrent billing runs share a vault key with a $300 cap, the cap trips once their combined spend reaches $300, and the cap exhaustion alone doesn't tell you which run caused it. For cost attribution and incident investigation, issue one vault key per flow run in the start step. The cost of one extra HTTP call to Keybrake at start is negligible compared to the diagnostic value of per-run audit trails.
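The effect of sharing a key can be seen by replaying the same sequence of calls against one shared cap and against per-run caps. This is a toy model with hypothetical run IDs and a hypothetical replay helper. It assumes a call is rejected when it would push a key's spend past the cap.

```python
from collections import defaultdict


def replay(calls, cap_cents, key_for_run):
    """Apply (run_id, amount_cents) calls in order against per-key caps.

    Returns accepted spend per key and the run IDs of rejected calls.
    """
    spent = defaultdict(int)
    rejected = []
    for run_id, amount in calls:
        key = key_for_run(run_id)
        if spent[key] + amount > cap_cents:
            rejected.append(run_id)
        else:
            spent[key] += amount
    return dict(spent), rejected


# Run "101" misbehaves with 12 charges; run "102" then makes 3 legitimate ones.
calls = [("101", 2999)] * 12 + [("102", 2999)] * 3

shared_spent, shared_rejected = replay(calls, 30_000, lambda run_id: "shared-key")
per_run_spent, per_run_rejected = replay(
    calls, 30_000, lambda run_id: f"metaflow/{run_id}"
)
print(shared_spent, shared_rejected)
print(per_run_spent, per_run_rejected)
```

With a shared key, run 101 consumes the whole cap and all three of run 102's legitimate charges are rejected. With per-run keys, only run 101's two excess calls are rejected and the spend is attributed to each run separately.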