
Airflow AI agent API key: scoping vendor calls in DAGs and operators

Apache Airflow gives you mature, battle-tested workflow orchestration: DAGs, sensors, XCom, dynamic task mapping, and a scheduler that has run in production for a decade. Those same features become liabilities when your AI agent tasks call Stripe, Twilio, or Resend. A task with retries=3 can make up to four vendor API calls (the first attempt plus three retries) if earlier attempts hit network timeouts, and a DAG that uses expand() for dynamic task mapping can fan out to hundreds of concurrent vendor calls, bounded only by your parallelism and pool settings. This page covers what Airflow's native tooling doesn't handle for vendor API spend control, and the vault-key pattern that does.

TL;DR

Airflow's retries and expand() are designed for idempotent compute — not for vendor API calls that cost money. A vault key proxy sits between your Airflow tasks and the vendor: issue one vault key at DAG-run start using a setup task, enforce a per-run dollar cap, and get a structured per-call audit log with the DAG run ID attached. If a run goes wrong, revoke the vault key without rotating the real Stripe key that every other DAG depends on.

How Airflow AI agent DAGs call vendor APIs

In an Airflow DAG, vendor API calls live inside PythonOperator tasks (or @task-decorated functions in the TaskFlow API). A scheduled billing agent might look like:

from datetime import datetime, timedelta
import os

import stripe
from airflow.decorators import dag, task


def fetch_due_customers() -> list[str]:
    """Hypothetical helper: query your database for customers due today."""
    raise NotImplementedError


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def billing_dag():

    @task(retries=3, retry_delay=timedelta(seconds=30))
    def charge_customer(customer_id: str) -> str:
        stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # long-lived, full-access key
        # No idempotency key: a retry after a timeout can create a second charge.
        intent = stripe.PaymentIntent.create(
            amount=2900,  # $29.00, in cents
            currency="usd",
            customer=customer_id,
        )
        return intent.id  # return only the ID to keep XCom small

    @task
    def get_customer_ids() -> list[str]:
        return fetch_due_customers()

    customer_ids = get_customer_ids()
    charge_customer.expand(customer_id=customer_ids)  # dynamic task mapping: one task per ID


billing_dag()

This is standard modern Airflow. The problem: STRIPE_SECRET_KEY is a long-lived full-access key, retries=3 can reissue the same charge after a timeout because the call carries no idempotency key, and expand() over 300 customer IDs creates 300 mapped Stripe tasks that run as concurrently as your parallelism and pool settings allow, all sharing the same unbounded key.

Three gaps Airflow's native tooling doesn't fill for vendor spend control

| Gap | What happens in practice | Airflow's answer |
| --- | --- | --- |
| No per-run spend cap | A billing DAG receives a corrupted customer list with 5,000 rows instead of 50. Airflow faithfully schedules 5,000 charge tasks, and each task calls Stripe. The cap on real-money damage is your Stripe account limit, not the $500 you intended for that day's billing run. | None. DAG and task run logs show what executed; they don't prevent it mid-run. |
| No per-run revoke | You mark a DAG run as failed at 3am, but tasks that were already RUNNING may have sent Stripe API calls. Rotating the Stripe key to halt in-flight calls breaks every other DAG that uses it, including unrelated billing runs that are succeeding. | You can clear tasks or mark them failed, but vendor API calls that have already left the Airflow worker cannot be recalled. |
| No per-call audit with DAG run context | Airflow task logs show that PaymentIntent.create() was called and whether it raised an exception. They don't parse the dollar amount from the Stripe response, don't cross-reference the Stripe Request-Id with the Airflow run_id, and aren't queryable by spend. | Task instance logs and XCom: no structured cost extraction and no Airflow-run-to-Stripe-charge mapping. |

The dynamic task mapping risk: expand() and vendor call fan-out

Airflow's expand() (introduced in Airflow 2.3) is one of its most powerful features: at runtime it creates one task instance per element of a list. For data processing, this is excellent. For vendor API calls, every mapped instance is a potential charge, so the fan-out needs a spend guard.

Consider what happens when a billing DAG runs during a data error:

  1. A database bug returns 2,000 customer IDs instead of the expected 200.
  2. charge_customer.expand(customer_id=customer_ids) spawns 2,000 task instances. (This assumes [core] max_map_length has been raised above its default of 1,024; at the default, Airflow fails the upstream task that pushed the oversized list instead.)
  3. Airflow's executor begins running them in parallel, limited only by concurrency settings such as parallelism, pool slots, and max_active_tis_per_dag.
  4. With 100 task slots free, Airflow works through all 2,000 tasks in roughly 20 waves, finishing before most engineers start their morning.
  5. Each task issued a real Stripe charge. Your Stripe dashboard now shows 2,000 charges.

A per-run vault key with daily_usd_cap: 500 would have stopped the fan-out after about 17 charges (17 × $29 = $493; an 18th would exceed $500), returned 429s to the remaining tasks (which Airflow logs as task failures), and left a clear audit trail showing exactly where the cap was hit and why.
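The numbers in this scenario can be checked with a few lines: the fan-out arithmetic from steps 4 and 5, plus a toy model of a per-run cap that reproduces the 17-charge cutoff. This is a sketch under stated assumptions, not Keybrake's implementation: RunBudget is hypothetical, calls are modelled as arriving one at a time (under real concurrency, arrival order decides which 17 tasks succeed), and the cap is assumed to reject any call that would push cumulative spend past $500.

```python
import math
from dataclasses import dataclass, field

CHARGE_CENTS = 2900  # amount=2900 in the example DAG: $29.00 per customer


@dataclass
class RunBudget:
    """Hypothetical per-run cap (illustration only, not Keybrake's code).

    Assumes any call that would push cumulative spend past the cap is
    rejected with a 429 before it reaches Stripe.
    """
    cap_cents: int
    spent_cents: int = 0
    statuses: list[int] = field(default_factory=list)

    def charge(self, amount_cents: int) -> int:
        """Return 200 if the call is forwarded to the vendor, 429 if it is blocked."""
        if self.spent_cents + amount_cents > self.cap_cents:
            self.statuses.append(429)
            return 429
        self.spent_cents += amount_cents
        self.statuses.append(200)
        return 200


n_tasks, slots = 2000, 100
print(math.ceil(n_tasks / slots))    # 20 waves of 100 tasks (step 4)
print(n_tasks * CHARGE_CENTS / 100)  # 58000.0 dollars charged with no cap (step 5)

budget = RunBudget(cap_cents=50_000)  # daily_usd_cap: 500, in cents
for _ in range(n_tasks):
    budget.charge(CHARGE_CENTS)
print(budget.statuses.count(200), budget.spent_cents / 100)  # 17 493.0
print(budget.statuses.count(429))  # 1983 calls blocked at the proxy
```

The same arithmetic doubles as a sizing check: at $29 per charge, even the expected 200-customer run totals $5,800, so a $500 cap would also halt a healthy run. In practice you would set the cap from the expected row count plus some headroom.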

The retry risk: task-level retries and idempotency

Airflow's retries parameter reruns the task after any exception, including network timeouts that occur after Stripe has already received and processed the call. A task that fails with stripe.APIConnectionError (which stripe-python raises when the underlying HTTP request times out) doesn't mean Stripe didn't receive the charge; it means Airflow didn't receive Stripe's confirmation.

The correct fix is a stable idempotency key on every Stripe call, built from the Airflow run_id plus a value that identifies the charge within the run (the customer ID, or task_id plus map_index for mapped tasks). A vault key proxy adds a second safety layer: retries that would push the run past its dollar cap are blocked at the proxy before reaching Stripe at all, regardless of whether the original call succeeded.
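The effect of a stable key can be shown with an in-memory stand-in for Stripe. FakeStripe and run_task are hypothetical illustrations of Stripe's replay behaviour (a repeated idempotency key returns the original result instead of creating a new object), not the stripe-python SDK; the run_id value is an example of Airflow's scheduled-run format.

```python
import uuid
from typing import Optional


class FakeStripe:
    """In-memory stand-in for Stripe's idempotency replay (illustration only, not the SDK)."""

    def __init__(self) -> None:
        self.charges: list[tuple[str, str]] = []  # (customer, payment intent ID)
        self._results_by_key: dict[str, str] = {}

    def create_payment_intent(self, customer: str, idempotency_key: Optional[str] = None) -> str:
        # A key that has been seen before returns the stored result instead of charging again.
        if idempotency_key is not None and idempotency_key in self._results_by_key:
            return self._results_by_key[idempotency_key]
        intent_id = f"pi_{uuid.uuid4().hex[:12]}"
        self.charges.append((customer, intent_id))
        if idempotency_key is not None:
            self._results_by_key[idempotency_key] = intent_id
        return intent_id


def run_task(api: FakeStripe, customer: str, idempotency_key: Optional[str], lost_replies: int = 1) -> str:
    """Simulate retries where the first `lost_replies` attempts time out after Stripe processed them."""
    for _ in range(lost_replies):
        api.create_payment_intent(customer, idempotency_key)  # processed, but the reply never arrives
    return api.create_payment_intent(customer, idempotency_key)  # the retry that finally gets a reply


run_id = "scheduled__2024-06-01T00:00:00+00:00"  # example Airflow run_id

without_key = FakeStripe()
run_task(without_key, "cus_123", idempotency_key=None)
print(len(without_key.charges))  # 2: the customer was charged twice

with_key = FakeStripe()
result = run_task(with_key, "cus_123", idempotency_key=f"{run_id}-cus_123")
print(len(with_key.charges))  # 1: the retry replayed the original charge
print(result == with_key.charges[0][1])  # True
```

Stripe can prune idempotency keys once they are about 24 hours old, so a key like this protects retries within a run, not a manual replay days later.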

Scoping vault keys per DAG run in Airflow

Issue the vault key in a setup task that runs first, then pass it downstream via XCom or as a task parameter:

from datetime import datetime, timedelta
import os

import httpx
import stripe
from airflow.decorators import dag, task


def fetch_due_customers() -> list[str]:
    """Hypothetical helper: query your database for customers due today."""
    raise NotImplementedError


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def billing_dag():

    @task
    def issue_vault_key(**context) -> str:
        run_id = context["run_id"]
        r = httpx.post(
            "https://proxy.keybrake.com/vault/keys",
            headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_API_KEY']}"},
            json={
                "vendor": "stripe",
                "daily_usd_cap": 500.0,
                "allowed_endpoints": ["POST /v1/payment_intents"],
                "expires_in": "6h",
                "agent_run_label": f"airflow-billing/{run_id}",
            },
            timeout=30.0,
        )
        r.raise_for_status()  # fail the setup task, and so the run, if no key was issued
        return r.json()["vault_key"]

    @task(retries=3, retry_delay=timedelta(seconds=30))
    def charge_customer(customer_id: str, vault_key: str, **context) -> str:
        run_id = context["run_id"]
        stripe.api_key = vault_key                             # scoped, capped vault key
        stripe.api_base = "https://proxy.keybrake.com/stripe"  # route calls through the proxy
        intent = stripe.PaymentIntent.create(
            amount=2900,
            currency="usd",
            customer=customer_id,
            idempotency_key=f"{run_id}-{customer_id}",  # identical on every retry of this charge
        )
        return intent.id

    @task
    def get_customer_ids() -> list[str]:
        return fetch_due_customers()

    vault_key = issue_vault_key()
    customer_ids = get_customer_ids()
    # .partial() pins the shared vault key; .expand() maps over the customer IDs.
    charge_customer.partial(vault_key=vault_key).expand(customer_id=customer_ids)


billing_dag()

Three changes from the original: (1) STRIPE_SECRET_KEY is gone from task code, so only Keybrake holds the real secret; (2) the vault key is shared across all expanded task instances via .partial(), so the $500 cap is a per-run ceiling, not a per-task ceiling; (3) run_id + customer_id is the idempotency key, so an Airflow retry after a network failure replays the original charge instead of creating a second one. The Keybrake audit log records each call with agent_run_label: "airflow-billing/{run_id}", giving you a queryable per-run spend view.
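Because every call carries agent_run_label, per-run spend reduces to a group-by. The record shape below (agent_run_label, status, amount_cents) is an assumption for illustration only; check Keybrake's actual audit-log format before relying on any field name.

```python
from collections import defaultdict

RUN_1 = "airflow-billing/scheduled__2024-06-01T00:00:00+00:00"
RUN_2 = "airflow-billing/scheduled__2024-06-02T00:00:00+00:00"

# Hypothetical audit records; the field names are assumptions, not a documented schema.
AUDIT_LOG = [
    {"agent_run_label": RUN_1, "status": 200, "amount_cents": 2900},
    {"agent_run_label": RUN_1, "status": 200, "amount_cents": 2900},
    {"agent_run_label": RUN_1, "status": 429, "amount_cents": 2900},
    {"agent_run_label": RUN_2, "status": 200, "amount_cents": 2900},
]


def spend_by_run(records):
    """Sum forwarded spend (in cents) and count blocked calls per agent_run_label."""
    summary = defaultdict(lambda: {"spent_cents": 0, "blocked": 0})
    for rec in records:
        row = summary[rec["agent_run_label"]]
        if rec["status"] == 200:    # forwarded to Stripe: counts as spend
            row["spent_cents"] += rec["amount_cents"]
        elif rec["status"] == 429:  # rejected at the cap: never reached Stripe
            row["blocked"] += 1
    return dict(summary)


report = spend_by_run(AUDIT_LOG)
print(report[RUN_1])  # {'spent_cents': 5800, 'blocked': 1}
print(report[RUN_2])  # {'spent_cents': 2900, 'blocked': 0}
```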

How Keybrake fits

Keybrake is the proxy layer between your Airflow tasks and Stripe, Twilio, or Resend. You swap stripe.api_key for the vault key and set stripe.api_base to https://proxy.keybrake.com/stripe. The real Stripe secret stays in Keybrake, not in Airflow connections or environment variables on your workers. Each DAG run gets its own vault key with its own dollar cap, endpoint allowlist, and expiry. Dynamic task mapping fan-outs that hit the per-run cap return 429s — catchable exceptions in your tasks, not silent charges that appear on next week's Stripe invoice.
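A 429 from a spent cap will not clear on retry, so it is worth failing those tasks immediately instead of burning all of retries=3. The sketch below models that decision in plain Python. Every class name is hypothetical: FailWithoutRetry stands in for Airflow's AirflowFailException (which fails a task without retrying), and run_with_retries only imitates Airflow's retry loop. In a real task you would catch whatever stripe-python raises for the proxy's 429 (for a Stripe-formatted error body that is stripe.RateLimitError, but how the proxy formats its rejection is an assumption here) and raise AirflowFailException.

```python
class CapExceeded(Exception):
    """Hypothetical: the proxy answered 429 because this run's cap is spent."""


class NetworkTimeout(Exception):
    """Hypothetical: the call may or may not have reached the vendor."""


class FailWithoutRetry(Exception):
    """Stand-in for airflow.exceptions.AirflowFailException."""


def charge_step(send):
    """Task body: a spent cap is final, so turn it into a no-retry failure."""
    try:
        return send()
    except CapExceeded as exc:
        raise FailWithoutRetry("per-run cap reached") from exc
    # NetworkTimeout propagates unchanged: retrying is safe with an idempotency key.


def run_with_retries(fn, retries=3):
    """Imitate Airflow: up to 1 + retries attempts; FailWithoutRetry stops at once."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return "success", fn(), attempts
        except FailWithoutRetry:
            return "failed", None, attempts
        except Exception:
            if attempts > retries:
                return "failed", None, attempts


def replay(outcomes):
    """Build a fake vendor call that raises or returns each outcome in turn."""
    it = iter(outcomes)

    def send():
        outcome = next(it)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    return send


naive = replay([CapExceeded()] * 4)
print(run_with_retries(naive))  # ('failed', None, 4): three pointless retries

capped = replay([CapExceeded()] * 4)
print(run_with_retries(lambda: charge_step(capped)))  # ('failed', None, 1)

flaky = replay([NetworkTimeout(), NetworkTimeout(), "pi_123"])
print(run_with_retries(lambda: charge_step(flaky)))  # ('success', 'pi_123', 3)
```

One caveat: Stripe's own rate limiting also returns 429, and those calls are worth retrying, so a real task needs a way to tell a cap rejection from a rate limit, for example an error code set by the proxy. What Keybrake actually returns is an assumption in this sketch.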


Related questions

How do I pass the vault key to dynamically mapped tasks using Airflow's .partial()?

Use task.partial(vault_key=vault_key).expand(customer_id=customer_ids). The .partial() method fixes arguments that are the same across all expanded instances, while .expand() specifies the argument to iterate. The vault key returned by your setup task is an XCom value — Airflow resolves it before the mapped tasks run. All 300 expanded instances share the same vault key and therefore the same per-run cap, which is correct: the $500 limit represents total authorized spend for the entire DAG run, not $500 per customer task.
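In plain Python terms, .partial() behaves much like functools.partial and .expand() like mapping the bound function over the list. This is an analogy only: Airflow creates mapped task instances at runtime rather than calling a Python partial, and the vault key value below is a placeholder.

```python
from functools import partial


def charge_customer(customer_id: str, vault_key: str) -> str:
    # Stand-in task body: report which key this instance would send to the proxy.
    return f"{customer_id} via {vault_key}"


vault_key = "vk_placeholder"  # in the DAG this is issue_vault_key()'s XCom value
customer_ids = ["cus_1", "cus_2", "cus_3"]

charge_with_key = partial(charge_customer, vault_key=vault_key)   # ~ .partial(vault_key=...)
results = [charge_with_key(customer_id=c) for c in customer_ids]  # ~ .expand(customer_id=...)
print(results)  # ['cus_1 via vk_placeholder', 'cus_2 via vk_placeholder', 'cus_3 via vk_placeholder']
```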

What happens to the vault key if I clear and re-run failed tasks?

A vault key issued with expires_in: "6h" remains valid until it expires or you explicitly revoke it. If you clear failed tasks and re-run them within the vault key's TTL, they reuse the same key and the same per-run cap. This is correct behavior: the cap represents authorized spend for that DAG run, including re-runs of failed tasks. If you clear the entire DAG run and restart from scratch, the setup task runs again and issues a fresh vault key with a fresh cap — so the re-run starts with a clean budget, not the remaining balance from the original run.
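The budget semantics in this answer can be modelled in a few lines. VaultKey is a hypothetical stand-in that assumes spend is tracked per key and that the 6-hour TTL is measured from issuance; it is not Keybrake's data model.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class VaultKey:
    """Hypothetical model of a vault key's budget and TTL (assumed semantics)."""
    issued_at: datetime
    cap_cents: int
    ttl: timedelta = timedelta(hours=6)
    spent_cents: int = 0

    def is_valid(self, now: datetime) -> bool:
        return now < self.issued_at + self.ttl

    def remaining_cents(self) -> int:
        return self.cap_cents - self.spent_cents


t0 = datetime(2024, 6, 1, 0, 0)
key = VaultKey(issued_at=t0, cap_cents=50_000)
key.spent_cents = 17 * 2900  # the first pass of the run used $493.00

# Clearing only the failed tasks two hours later reuses the same key and its remaining budget.
later = t0 + timedelta(hours=2)
print(key.is_valid(later), key.remaining_cents())  # True 700

# Clearing the whole run re-executes issue_vault_key, which issues a fresh key and budget.
fresh = VaultKey(issued_at=later, cap_cents=50_000)
print(fresh.remaining_cents())  # 50000

# Past the 6h TTL, the original key no longer works.
print(key.is_valid(t0 + timedelta(hours=7)))  # False
```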

Can I use Airflow Connections to store the Keybrake API key instead of environment variables?

Yes. Create an Airflow Connection with the connection ID keybrake_default using the Generic connection type, and store your KEYBRAKE_API_KEY in the password field. Retrieve it with BaseHook.get_connection("keybrake_default").password (BaseHook is imported from airflow.hooks.base in Airflow 2.x). This integrates with Airflow's secrets backends (AWS Secrets Manager, HashiCorp Vault, etc.) for centralized secret management. The vault keys issued per DAG run are short-lived and ephemeral, so they don't need to be stored in Airflow Connections.
