
Dagster AI agent API key: scoping vendor calls in data pipeline assets

Dagster is a data orchestration platform built around software-defined assets: assets declare their upstream dependencies, Dagster resolves the execution graph, and the asset catalog tracks every materialization. When AI agent pipelines use Dagster assets to trigger vendor API calls, the platform's strengths become spending risks. A backfill over dynamic partitions can materialize hundreds of partitions concurrently with no cap on total vendor spend. RetryPolicy can turn one failing materialization into several vendor charges. A backfill also re-executes every selected partition, including partitions that already sent the charge. This page covers a vault-key pattern, built on Dagster's resource system, that bounds per-run spend without restructuring your asset graph.

TL;DR

Dagster resources are the idiomatic way to inject external clients into assets. It is the same mechanism you use for database connections, S3 clients, and API clients. Wrap the vault key in a custom Dagster resource, issue the vault key when the resource is initialized, and pass the resource to every asset that makes vendor calls. Every materialization served by that resource instance shares the same vault key and its cap. You can revoke the key from the Keybrake dashboard without rotating the real API key, which stays in Keybrake rather than in your Dagster deployment.

How Dagster AI agent assets call vendor APIs

In a Dagster pipeline, vendor API calls live inside @asset-decorated functions that receive resources via dependency injection. An AI billing pipeline that processes subscription renewals might use dynamic partitions to materialize one asset per customer:

from dagster import asset, DynamicPartitionsDefinition, RetryPolicy
import stripe
import os

customer_partitions = DynamicPartitionsDefinition(name="customers")

@asset(
    partitions_def=customer_partitions,
    retry_policy=RetryPolicy(max_retries=3, delay=1.0),
)
def charged_customer(context) -> dict:
    customer_id = context.partition_key
    stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # full-access key
    result = stripe.PaymentIntent.create(
        amount=2999,
        currency="usd",
        customer=customer_id,
    )
    return {"status": result.status, "id": result.id}

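This is standard Dagster. Materializing charged_customer for every customer partition executes one materialization per customer. A backfill launches one run per partition by default, and Dagster's run queue executes those runs concurrently up to your configured limits. The problem is threefold. First, STRIPE_SECRET_KEY is a full-access key read from the environment. Second, nothing caps total charges across the backfill. Third, RetryPolicy(max_retries=3) means each failing materialization can make up to 4 Stripe calls for the same customer: one attempt plus three retries. Because this version sends no idempotency key, a failure that occurs after Stripe has already accepted the request re-sends it as a new request. A read timeout is one example of such a failure.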
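The retry arithmetic is worth making concrete. The sketch below computes an upper bound on Stripe requests and on the amount charged for a backfill. It assumes every attempt reaches Stripe and creates a separate charge because no idempotency key deduplicates it. The function names are illustrative. The 2,000-partition and 2999-cent figures are the example values used on this page.

```python
def worst_case_vendor_calls(partitions: int, max_retries: int) -> int:
    """Upper bound on vendor requests if every attempt fails and is retried."""
    # One initial attempt plus up to max_retries retries per materialization.
    return partitions * (1 + max_retries)


def worst_case_spend_cents(partitions: int, max_retries: int, amount_cents: int) -> int:
    """Upper bound on charges if no idempotency key deduplicates the retries."""
    return worst_case_vendor_calls(partitions, max_retries) * amount_cents


# RetryPolicy(max_retries=3) over 2,000 customer partitions at 2999 cents each.
calls = worst_case_vendor_calls(2_000, 3)        # 8000 requests
spend = worst_case_spend_cents(2_000, 3, 2_999)  # 23,992,000 cents
print(calls, spend / 100)                        # 8000 239920.0
```

Real runs only retry on failure, so typical numbers are far lower. The point is that the ceiling scales with partitions times attempts, and nothing in Dagster bounds it.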

Three gaps Dagster's native tooling doesn't fill for vendor spend control

Gap: No per-run spend cap
What happens in practice: A billing pipeline runs a backfill to catch up on 2,000 customer partitions. Those partitions were created from a query joined against an unfiltered table. Dagster executes the partitions in parallel, and each materialization calls Stripe independently. The asset catalog records 2,000 successful materializations while Stripe records 2,000 unintended charges. Every step is well-behaved from Dagster's perspective.
Dagster's answer: None. The asset catalog tracks materialization status, metadata, and lineage. It does not track the dollar amounts spent on vendor calls inside materializations.

Gap: No asset-level vendor revoke
What happens in practice: You can terminate a Dagster run from the Dagster UI or through Dagster's GraphQL API, and Dagster will attempt to interrupt executing steps. But a materialization that has already sent its Stripe request will finish that call before the termination is processed. Rotating the real Stripe secret to stop in-flight materializations breaks every other asset and non-Dagster process using the same key.
Dagster's answer: Run termination via the Dagster UI or API. Termination is graceful: a materialization mid-execution may complete its current operation, including the vendor call, before it stops.

Gap: No per-call audit with asset context
What happens in practice: Dagster's metadata system can attach arbitrary metadata to each materialization via context.add_output_metadata(). It does not parse Stripe responses, though, and it does not cross-reference Stripe Request-Id values with Dagster run IDs and partition keys.
Dagster's answer: Asset materialization metadata and the event log. You can attach metadata manually in asset code, but there is no automatic vendor-call correlation out of the box.
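Closing the audit gap by hand mostly means attaching the right identifiers to each materialization. Here is a minimal sketch of a hypothetical helper, vendor_call_metadata, that builds a flat dict you could pass to context.add_output_metadata(). Inside an asset, run_id and partition_key would come from the asset context. The Stripe request ID would come from the response (stripe-python exposes it as result.last_response.request_id). The correlation string format is an assumption.

```python
from typing import Any, Dict


def vendor_call_metadata(
    run_id: str, partition_key: str, request_id: str, object_id: str, status: str
) -> Dict[str, Any]:
    """Build metadata that ties one Stripe call to its Dagster run and partition.

    Hypothetical helper: pass the result to context.add_output_metadata(...).
    """
    return {
        "dagster_run_id": run_id,
        "partition_key": partition_key,
        "stripe_request_id": request_id,
        "stripe_object_id": object_id,
        "stripe_status": status,
        # One searchable string for log or dashboard queries (illustrative format).
        "correlation": f"{run_id}/{partition_key}/{request_id}",
    }


meta = vendor_call_metadata("run-abc", "cus_123", "req_456", "pi_789", "succeeded")
print(meta["correlation"])  # run-abc/cus_123/req_456
```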

The dynamic partition risk: concurrent materializations and vendor call bursts

Dagster's dynamic partitions let you define the partition key set at runtime. This is useful for processing a changing list of customers, orders, or events. When you trigger a backfill over a large partition set, Dagster launches one run per partition by default. The run queue then starts as many runs as your concurrency limits allow, for example the run queue's max_concurrent_runs setting. Inside a single run, the multiprocess executor's max_concurrent setting bounds how many steps execute in parallel. With 50 concurrent runs allowed, 50 Stripe calls go out in the first wave. Each call is independent, and each counts against the same Stripe key's rate limits and your daily charge volume.

A vault key with a dollar cap turns this into a bounded operation. The proxy enforces the cap atomically across every concurrent materialization that uses the key. Once the cap is reached, further calls return 429. stripe-python raises an exception on that 429, and Dagster records it as a failed materialization. The full traceback appears in the run's event log and the Dagster UI without any additional instrumentation.
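To see why an atomic check matters, here is a toy in-process model of a cap shared by concurrent callers. Threads stand in for concurrent materializations, and a lock makes the check-and-increment atomic. This illustrates the behavior described above; it is not Keybrake's implementation. SharedSpendCap and its 200/429 return codes are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SharedSpendCap:
    """Toy model of one dollar cap shared by many concurrent callers.

    Amounts are integer cents so comparisons are exact.
    """

    def __init__(self, cap_cents: int) -> None:
        self.cap_cents = cap_cents
        self.spent_cents = 0
        self._lock = threading.Lock()

    def try_charge(self, amount_cents: int) -> int:
        """Return 200 if the charge fits under the cap, else 429."""
        # Check and increment under one lock so two callers cannot both
        # slip past the cap between the read and the write.
        with self._lock:
            if self.spent_cents + amount_cents > self.cap_cents:
                return 429
            self.spent_cents += amount_cents
            return 200


cap = SharedSpendCap(cap_cents=10_000)  # a $100 cap for the whole run
with ThreadPoolExecutor(max_workers=50) as pool:
    # 50 "materializations" each try to charge $29.99 at once.
    statuses = list(pool.map(lambda _: cap.try_charge(2_999), range(50)))

print(statuses.count(200), statuses.count(429), cap.spent_cents)  # 3 47 8997
```

Without the lock, two threads could both read spent_cents below the cap and both charge, overshooting it. That race is what a proxy-side atomic cap has to rule out, however many materializations hit it at once.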

Using Dagster resources to inject vault keys

Dagster resources are initialized before asset code runs and injected into every asset that declares them as a dependency. That makes a resource the idiomatic place to issue a vault key. The key is issued once, when the resource is initialized at the start of execution, and then shared by every materialization that uses it:

import os
from dataclasses import dataclass

import httpx
import stripe
from dagster import (
    Definitions,
    DynamicPartitionsDefinition,
    RetryPolicy,
    asset,
    resource,
)

customer_partitions = DynamicPartitionsDefinition(name="customers")

@dataclass
class KeybrakeResource:
    vault_key: str
    base_url: str = "https://proxy.keybrake.com/stripe"

@resource(config_schema={"budget_usd": float})
def keybrake_stripe_resource(context):
    # Issue one scoped vault key each time Dagster initializes this resource.
    r = httpx.post(
        "https://proxy.keybrake.com/vault/keys",
        headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_API_KEY']}"},
        json={
            "vendor": "stripe",
            "daily_usd_cap": context.resource_config["budget_usd"],
            "allowed_endpoints": ["POST /v1/payment_intents"],
            "expires_in": "4h",
            "agent_run_label": f"dagster/{context.run_id}",
        },
        timeout=10.0,
    )
    r.raise_for_status()  # fail up front if no key was issued
    return KeybrakeResource(vault_key=r.json()["vault_key"])

@asset(
    partitions_def=customer_partitions,
    retry_policy=RetryPolicy(max_retries=3, delay=1.0),
    required_resource_keys={"keybrake"},
)
def charged_customer(context) -> dict:
    kb = context.resources.keybrake
    stripe.api_key = kb.vault_key   # scoped vault key, not the real Stripe secret
    stripe.api_base = kb.base_url   # route Stripe SDK requests through the proxy
    customer_id = context.partition_key
    # Confirmation (confirm=True with a payment method) omitted for brevity.
    result = stripe.PaymentIntent.create(
        amount=2999,
        currency="usd",
        customer=customer_id,
        idempotency_key=f"{customer_id}-2999",  # same key on every retry
    )
    return {"status": result.status, "id": result.id}

defs = Definitions(
    assets=[charged_customer],
    # budget_usd=500.0 is an example value; set it per deployment.
    resources={"keybrake": keybrake_stripe_resource.configured({"budget_usd": 500.0})},
)

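The keybrake_stripe_resource issues its vault key when Dagster initializes the resource. Every charged_customer materialization served by that resource instance shares the same KeybrakeResource and vault key, so the cap accumulates across all of them. The real Stripe secret stays in Keybrake, never in Dagster's environment or asset metadata. The audit log records each call with agent_run_label: "dagster/{run_id}", queryable by run ID or time window.

Two Dagster details decide how far one key reaches. First, a backfill launches one run per partition by default, so each partition run issues its own key and cap. To put a whole backfill under one cap, execute it as a single run. One way is backfill_policy=BackfillPolicy.single_run() on the asset; the asset then iterates over context.partition_keys instead of reading context.partition_key. Second, under the default multiprocess executor, resources are initialized once per step process. As a result, several assets in the same run that use the resource each issue their own key. The in_process_executor initializes the resource once for the whole run.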
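The resource is the single point where a run gets its key, so it should fail loudly there rather than let assets start without one. Below is a minimal sketch of the issuance step with the HTTP client injected, so the logic can be exercised without the network. The URL and payload fields are the ones used in the resource on this page. The response shape ({"vault_key": ...}) is an assumption, and issue_vault_key, Transport, and VaultKeyIssueError are hypothetical names.

```python
from typing import Any, Callable, Dict, Tuple

# Hypothetical transport: (url, headers, json_body) -> (status_code, parsed_json).
# In production this would wrap httpx.post; injecting it keeps the logic testable.
Transport = Callable[[str, Dict[str, str], Dict[str, Any]], Tuple[int, Dict[str, Any]]]

VAULT_KEYS_URL = "https://proxy.keybrake.com/vault/keys"


class VaultKeyIssueError(RuntimeError):
    """Raised when the proxy does not hand back a usable vault key."""


def issue_vault_key(
    transport: Transport, admin_token: str, run_id: str, budget_usd: float
) -> str:
    """Request one scoped Stripe vault key and fail loudly if none comes back."""
    payload = {
        "vendor": "stripe",
        "daily_usd_cap": budget_usd,
        "allowed_endpoints": ["POST /v1/payment_intents"],
        "expires_in": "4h",
        "agent_run_label": f"dagster/{run_id}",
    }
    status, body = transport(
        VAULT_KEYS_URL, {"Authorization": f"Bearer {admin_token}"}, payload
    )
    # Raising here fails resource initialization, so no asset starts without a key.
    if not 200 <= status < 300:
        raise VaultKeyIssueError(f"vault key request failed with HTTP {status}")
    key = body.get("vault_key")
    if not isinstance(key, str) or not key:
        raise VaultKeyIssueError("response did not include a vault_key")
    return key


# Usage with a stub transport standing in for the proxy.
def stub_transport(url, headers, payload):
    return 201, {"vault_key": "vk_test_123"}


print(issue_vault_key(stub_transport, "admin-token", "run-42", 250.0))  # vk_test_123
```

Inside the Dagster resource you would call issue_vault_key with a thin wrapper around httpx.post and context.run_id. A failure then surfaces as a resource initialization error before any Stripe call is attempted.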

How Keybrake fits

Keybrake is the proxy layer between your Dagster assets and Stripe, Twilio, or Resend. You wrap it in a Dagster resource, issue the vault key at resource initialization (run start), and inject it into any asset that makes vendor calls. The real Stripe secret stays in Keybrake, not in Dagster's configuration or asset code. Each run gets its own vault key with its own dollar cap, endpoint allowlist, and expiry. Concurrent partition materializations that exceed the cap return 429s — these surface as Dagster materialization failures with structured traceback data, not silent charges across thousands of concurrent partitions.


Related questions

How do I scope vault keys per partition rather than per run if I need per-partition budgets?

Instead of issuing the vault key in the resource (once per run), issue it inside the asset function itself (once per materialization). Read context.partition_key as the label and your per-partition budget from the asset's config. The trade-off: one Keybrake API call per materialization instead of one per run. For large partition sets (thousands of concurrent materializations), the per-run approach is more efficient — one vault key, one cap, all materializations share the budget. For per-partition accounting, the per-materialization approach gives you a separate audit row and cap per partition key.
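For the per-materialization variant, only the cap and the label change from partition to partition. The sketch below builds the request body for one materialization from a per-partition budget mapping with a default. Inside the asset you would call it with context.run_id and context.partition_key and POST the result the same way the resource does. The helper name and the "dagster/{run_id}/{partition_key}" label format are hypothetical.

```python
from typing import Any, Dict, Mapping


def per_partition_key_request(
    run_id: str,
    partition_key: str,
    budgets_usd: Mapping[str, float],
    default_budget_usd: float,
) -> Dict[str, Any]:
    """Build the vault-key request body for a single materialization (hypothetical)."""
    return {
        "vendor": "stripe",
        # Partition-specific budget, with a default for partitions not listed.
        "daily_usd_cap": budgets_usd.get(partition_key, default_budget_usd),
        "allowed_endpoints": ["POST /v1/payment_intents"],
        "expires_in": "4h",
        # The partition key in the label gives each partition its own audit label.
        "agent_run_label": f"dagster/{run_id}/{partition_key}",
    }


budgets = {"cus_enterprise": 500.0}
body = per_partition_key_request("run-42", "cus_enterprise", budgets, 50.0)
print(body["daily_usd_cap"], body["agent_run_label"])  # 500.0 dagster/run-42/cus_enterprise
```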

How does RetryPolicy interact with vault key cap exhaustion?

When a materialization hits the vault key's cap, the proxy returns 429. Dagster's RetryPolicy retries on any exception the asset raises, so each retry hits the same exhausted cap and gets another 429, up to max_retries times per materialization. This creates a retry storm against an already-exhausted cap. RetryPolicy has no per-exception filter. Instead, catch the proxy's 429 inside the asset (stripe-python raises stripe.error.RateLimitError for it) and raise dagster.Failure(description=..., allow_retries=False). That fails the step immediately without consuming retries. stripe-python raises the same RateLimitError for Stripe's own rate limiting, which is transient. You therefore need some way to tell the two apart, such as a distinct marker in the proxy's response. Cap exhaustion is an intentional stop, not a transient error to retry.
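The difference between a retryable error and an intentional stop shows up directly in call counts. This toy loop models a retry policy with max_retries=3 that gives up immediately on a budget-exhaustion error. It is not Dagster's code, and BudgetExhaustedError and TransientVendorError are hypothetical names. In an actual asset, the non-retryable path corresponds to catching the proxy's 429 and raising dagster.Failure(..., allow_retries=False).

```python
from typing import Callable, Tuple


class BudgetExhaustedError(Exception):
    """Vault key cap reached: an intentional stop that should never be retried."""


class TransientVendorError(Exception):
    """Network blip or similar: safe to retry."""


def run_with_retry_policy(step: Callable[[], str], max_retries: int) -> Tuple[str, int]:
    """Toy model of a retry policy that honors a non-retryable error.

    Returns (outcome, attempts) so the call counts are visible.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return step(), attempts
        except BudgetExhaustedError:
            # Stop immediately: retrying would only hit the same exhausted cap.
            return "failed: budget exhausted", attempts
        except TransientVendorError:
            if attempts > max_retries:
                return "failed: retries exhausted", attempts


def capped_call() -> str:
    raise BudgetExhaustedError("proxy returned 429: cap reached")


def flaky_call() -> str:
    raise TransientVendorError("connection reset")


print(run_with_retry_policy(capped_call, max_retries=3))  # ('failed: budget exhausted', 1)
print(run_with_retry_policy(flaky_call, max_retries=3))   # ('failed: retries exhausted', 4)
```

With the budget error treated as non-retryable, an exhausted cap costs one proxy call per materialization instead of four.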

Does the vault key work with Dagster's IO managers and op-based pipelines?

Yes. The same resource pattern works for op-based Dagster pipelines using @op and @job. Declare required_resource_keys={"keybrake"} on the op and access the resource via context.resources.keybrake. Some IO managers make vendor API calls, for example a custom IO manager that writes results to a SaaS API. For those, declare the same resource key on the IO manager (required_resource_keys on @io_manager) and read it from context.resources in handle_output or load_input. Dagster's resource system is consistent across @asset, @op, and IO managers, so the vault key integration works in all three.

Further reading