Apache Beam AI agent API key: scoping vendor calls in parallel data pipelines
Apache Beam is a unified programming model for large-scale batch and streaming data processing, built around ParDo for parallel per-element transforms, DoFn for the per-element processing logic, and PCollection as the distributed dataset abstraction. Beam pipelines run on Google Cloud Dataflow, Apache Flink, Apache Spark, and the local Direct runner. AI agent teams adopt Beam when they need to process large record sets and take an action per record: enriching a customer database, triggering per-customer billing, or sending outbound notifications at scale. When those actions call Stripe, Twilio, or Resend, Beam's parallel bundle model becomes a vendor spend amplifier. A single ParDo over a million-row PCollection can fan out to hundreds or thousands of concurrent DoFn.process() calls, each making vendor API calls with no per-pipeline dollar cap. Bundle retries multiply spend further, because a bundle that fails partway through is reprocessed from the start. This page covers the vault-key pattern that bounds vendor spend per Beam pipeline run.
TL;DR
Issue a vault key once, before the pipeline is submitted: in the launcher code that builds the pipeline, with the budget passed as a pipeline option. Don't issue it in DoFn.setup(), which runs once per DoFn instance and would give each instance its own key and its own cap. Pass the vault key string into your DoFn as a constructor argument or side input. Each pipeline run gets its own vault key with its own dollar cap and endpoint allowlist. All parallel DoFn.process() calls share the same vault key, so the cap accumulates atomically across every bundle. The real Stripe or Twilio secret stays in Keybrake, never in your DoFn code or pipeline options. Revoking a runaway pipeline is a single API call: no key rotation, no worker restart.
How Beam AI agent pipelines call vendor APIs
A typical Beam AI agent pipeline uses a ParDo transform with a DoFn that calls a vendor API per element:
```python
import json

import apache_beam as beam
import stripe


class ChargeCustomerFn(beam.DoFn):
    def setup(self):
        # setup() runs once per DoFn instance (a worker may host several);
        # every bundle that instance processes reuses this client
        self.stripe = stripe.StripeClient("sk_live_xxx")  # full-access key

    def process(self, customer):
        # process() runs per element, in parallel across threads, workers, and bundles
        try:
            charge = self.stripe.payment_intents.create(
                params={
                    "amount": customer["amount_cents"],
                    "currency": "usd",
                    "customer": customer["id"],
                }
            )
            yield {"customer_id": customer["id"], "charge_id": charge.id}
        except Exception as e:
            yield beam.pvalue.TaggedOutput("errors", {"customer_id": customer["id"], "error": str(e)})


def run_billing_pipeline(customers_path: str):
    with beam.Pipeline(runner="DataflowRunner") as p:
        customers = (
            p
            | "Read" >> beam.io.ReadFromText(customers_path)
            | "Parse" >> beam.Map(json.loads)
        )
        results, errors = (
            customers
            | "Charge" >> beam.ParDo(ChargeCustomerFn()).with_outputs("errors", main="charged")
        )
        # Serialize dicts as JSON lines rather than Python reprs
        results | "FormatResults" >> beam.Map(json.dumps) | "WriteResults" >> beam.io.WriteToText("gs://my-bucket/results")
        errors | "FormatErrors" >> beam.Map(json.dumps) | "WriteErrors" >> beam.io.WriteToText("gs://my-bucket/errors")
```
This is standard Beam. DoFn.setup() runs once per DoFn instance (a worker typically hosts several), initializing the Stripe client with a full-access key. process() then runs for every element in every bundle assigned to that instance. On Dataflow with autoscaling, a million-row input can scale out to many workers, each running several process() calls concurrently, all with the same full-access Stripe key and no per-pipeline dollar cap. If customers_path points to a file with a data bug that includes 100,000 records instead of 1,000, all 100,000 Stripe calls fire before anyone notices.
Three gaps Beam's native tooling doesn't fill for vendor spend control
| Gap | What happens in practice | Beam's answer |
|---|---|---|
| No per-pipeline spend cap | Beam's resource model tracks CPU, memory, and data throughput, not vendor API dollars. Setting max_num_workers caps how many workers run at once, not the total dollar value of vendor calls across the run: a pipeline processing 10,000 records on 10 workers still makes 10,000 vendor calls, just with less concurrency. Nothing stops the pipeline when cumulative vendor spend reaches a dollar threshold. | Dataflow quotas and Cloud Billing budget alerts cover Google Cloud spend (compute hours, storage), not the vendor API spend triggered by pipeline logic. |
| No mid-pipeline vendor revoke | You can cancel a Dataflow job mid-run (or drain a streaming job), but a vendor request that has already been sent cannot be recalled, and drain deliberately lets in-flight work finish. The key in DoFn.setup() is the real Stripe secret: revoking it in Stripe stops the workers' calls, but it also breaks every other system that uses that key, and the pipeline must be redeployed with a new one. | Dataflow job cancellation and drain. No per-run API key scoping or targeted cut-off of vendor calls. |
| No per-call audit with pipeline context | Beam's logging captures worker logs and DoFn metrics. It doesn't parse dollar amounts from Stripe responses, correlate Stripe PaymentIntent.id values with the Dataflow job ID and bundle ID in a queryable cost table, or produce a per-pipeline spend summary. Debugging an overcharge means cross-referencing Dataflow worker logs with the Stripe dashboard, matching on timestamps because the two systems share no identifier. | Beam's metrics API (apache_beam.metrics) supports custom counters, but only if you instrument the DoFn yourself, and it doesn't parse vendor response amounts automatically. |
Bundle retry behavior and vendor spend multiplication
Beam runners, including Dataflow, retry failed bundles automatically. If DoFn.process() raises an unhandled exception, or the worker crashes or is preempted mid-bundle, the runner marks the bundle as failed and reschedules it, possibly on a different worker. The retry reruns every element in the bundle, not just the failing one, so elements whose vendor calls had already succeeded are processed again: one duplicate Stripe call per successfully charged element. Dataflow batch jobs fail after a bundle has failed four times; streaming jobs retry indefinitely.
Beam expects DoFns to tolerate being run more than once for the same element, that is, to be idempotent. A vendor call that moves real money is not idempotent unless you make it so. In a bundle of 500 customers where element 450 fails, the retry reprocesses all 500: elements 1 to 449 are charged a second time (449 duplicate calls), element 450 is retried, and elements 451 to 500 are charged for the first time. Without idempotency keys and a spend cap, a single transient worker failure can nearly double the vendor spend for that bundle.
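The arithmetic above can be checked with a small pure-Python simulation of one retried bundle. It is a sketch under stated assumptions, not Beam code: `simulate_bundle_retry` is a hypothetical helper, the first attempt is assumed to fail at element 450 after that element's vendor call is sent, and the retried bundle is assumed to complete. No idempotency keys are modeled, so every call counts as a charge attempt.

```python
from collections import Counter


def simulate_bundle_retry(bundle_size: int, fail_at: int) -> Counter:
    """Count vendor calls per element when a bundle fails once and is retried.

    Elements are numbered 1..bundle_size. On the first attempt, element
    `fail_at` fails after its call is sent, so the bundle is abandoned.
    The runner then reruns the whole bundle, which succeeds.
    """
    calls = Counter()

    # First attempt: elements 1..fail_at each send one request, then the bundle fails.
    for element in range(1, fail_at + 1):
        calls[element] += 1

    # Retry: the runner reprocesses every element in the bundle from the start.
    for element in range(1, bundle_size + 1):
        calls[element] += 1

    return calls


BUNDLE_SIZE, FAIL_AT = 500, 450
calls = simulate_bundle_retry(BUNDLE_SIZE, FAIL_AT)
total_calls = sum(calls.values())

# Elements before the failure succeeded on the first attempt, so their second
# call is a duplicate charge when no idempotency key is sent.
duplicate_charges = sum(1 for e in range(1, FAIL_AT) if calls[e] == 2)
print(total_calls, duplicate_charges)  # 950 449
```

Without idempotency keys, 950 calls go out for a 500-element bundle, and 449 of them re-charge customers who had already been charged.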
Scoping vault keys per Beam pipeline run
```python
import json
import os
import time

import apache_beam as beam
import requests
from apache_beam.options.pipeline_options import PipelineOptions

# Keybrake admin key: used only on the machine that launches the job, never shipped to workers.
KEYBRAKE_API_KEY = os.environ["KEYBRAKE_API_KEY"]


class VaultKeyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Plain arguments (not ValueProviders): the values are needed at launch time,
        # before the job is submitted, to issue the vault key.
        parser.add_argument("--customers_path", help="Input file of JSON-lines customers")
        parser.add_argument("--budget_usd", type=float, default=500.0)


class ChargeCustomerFn(beam.DoFn):
    def __init__(self, vault_key: str, job_id: str):
        # Constructor arguments are pickled into the pipeline graph and shipped to every worker.
        self._vault_key = vault_key
        self._job_id = job_id

    def setup(self):
        import stripe  # imported on the worker

        # Authenticate with the vault key and send requests to the Keybrake proxy.
        self._stripe = stripe.StripeClient(
            self._vault_key,
            base_addresses={"api": "https://proxy.keybrake.com/stripe"},
        )

    def process(self, customer):
        try:
            charge = self._stripe.payment_intents.create(
                params={
                    "amount": customer["amount_cents"],
                    "currency": "usd",
                    "customer": customer["id"],
                },
                # Stable across bundle retries, so Stripe deduplicates re-sent requests.
                options={"idempotency_key": f"{self._job_id}-{customer['id']}"},
            )
            yield {"customer_id": customer["id"], "charge_id": charge.id}
        except Exception as e:
            if "cap_exhausted" in str(e):
                # Fail the bundle instead of swallowing the error. A Dataflow batch job fails
                # once a bundle has failed four times; streaming jobs retry indefinitely,
                # so pair this with revoking the key or cancelling the job.
                raise
            yield beam.pvalue.TaggedOutput("errors", {"customer_id": customer["id"], "error": str(e)})


def run_billing_pipeline(argv=None):
    options = PipelineOptions(argv)
    args = options.view_as(VaultKeyOptions)

    # Issue the vault key on the launcher, before the pipeline is submitted.
    resp = requests.post(
        "https://proxy.keybrake.com/vault/keys",
        headers={"Authorization": f"Bearer {KEYBRAKE_API_KEY}"},
        json={
            "vendor": "stripe",
            "daily_usd_cap": args.budget_usd,
            "allowed_endpoints": ["POST /v1/payment_intents"],
            "expires_in": "4h",
            "agent_run_label": f"beam/{args.customers_path}",
        },
        timeout=30,
    )
    resp.raise_for_status()
    vault_key = resp.json()["vault_key"]
    job_id = f"billing-{int(time.time())}"

    with beam.Pipeline(runner="DataflowRunner", options=options) as p:
        customers = (
            p
            | "Read" >> beam.io.ReadFromText(args.customers_path)
            | "Parse" >> beam.Map(json.loads)
        )
        results, errors = (
            customers
            | "Charge" >> beam.ParDo(
                ChargeCustomerFn(vault_key=vault_key, job_id=job_id)
            ).with_outputs("errors", main="charged")
        )
        results | "WriteResults" >> beam.io.WriteToText("gs://my-bucket/results")
        errors | "WriteErrors" >> beam.io.WriteToText("gs://my-bucket/errors")
```
The vault key is issued once before the pipeline is submitted to Dataflow. It's passed as a constructor argument to ChargeCustomerFn, which means it's serialized into the pipeline graph and distributed to all workers at job startup. DoFn.setup() uses the vault key to initialize the Stripe client pointed at the Keybrake proxy. Every process() call across every bundle on every worker shares the same vault key — the cap accumulates atomically in Keybrake across all parallel calls.
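To see what "serialized into the pipeline graph" means for a plain string argument, the sketch below round-trips a stand-in object. It is an illustration under assumptions: `FakeChargeFn` is a hypothetical class holding the same constructor arguments as ChargeCustomerFn (so the example runs without Apache Beam installed), the standard-library `pickle` module stands in for Beam's own pickler, and `vk_test_123` is a made-up key.

```python
import pickle


class FakeChargeFn:
    """Stand-in for ChargeCustomerFn: holds the same constructor arguments."""

    def __init__(self, vault_key: str, job_id: str):
        self._vault_key = vault_key
        self._job_id = job_id


fn = FakeChargeFn(vault_key="vk_test_123", job_id="billing-1700000000")

# Serialize the object, as the launcher does before shipping the graph to workers.
payload = pickle.dumps(fn)

# The key string is stored verbatim in the serialized bytes...
print(b"vk_test_123" in payload)  # True

# ...and every worker that deserializes the object gets the same key back.
restored = pickle.loads(payload)
print(restored._vault_key == fn._vault_key)  # True
```

Anyone who can read the staged pipeline can therefore read the key it carries, which is why it should be a scoped, capped vault key rather than the real Stripe secret.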
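The agent_run_label includes the input path, so every vendor call in the audit log can be traced back to the input it was processing; if the same input may be processed more than once, add job_id to the label so separate runs stay distinguishable. The idempotency key combines job_id and the customer ID. job_id is fixed when the pipeline is launched and serialized into the DoFn along with the vault key, so it stays the same across bundle retries, and a failed-and-retried bundle doesn't charge the same customer twice (Stripe deduplicates requests that reuse an idempotency key and keeps keys for at least 24 hours). A full relaunch generates a new job_id, so rerunning the whole pipeline is not deduplicated.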
How Keybrake fits
Keybrake is the proxy layer between your Beam DoFns and Stripe, Twilio, or Resend. All parallel process() calls on all workers share a single vault key whose dollar cap applies across the entire pipeline run. When the cap is hit, Keybrake returns a 429 to any DoFn worker that makes a subsequent vendor call — no further charges fire regardless of how many workers are still running. Revoking a runaway pipeline is a single DELETE /vault/keys/{key_id} call from any terminal — no Dataflow job drain, no Stripe key rotation, no worker restart. The audit log correlates every Stripe PaymentIntent.id with the agent_run_label, giving you a per-pipeline spend breakdown without manually cross-referencing Dataflow logs and the Stripe dashboard.
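Revocation is a plain HTTP DELETE. This sketch only builds the request with the standard-library `urllib.request` module, so it can be inspected without network access. The `/vault/keys/{key_id}` path and Bearer header follow the text above; the `https://proxy.keybrake.com` host (taken from the key-issuing call), the `build_revoke_request` helper, and the form of `key_id` (the issuing example above reads only `vault_key` from the response) are assumptions.

```python
import urllib.parse
import urllib.request

# Assumed: revocation lives on the same host as the key-issuing endpoint.
KEYBRAKE_BASE = "https://proxy.keybrake.com"


def build_revoke_request(key_id: str, admin_key: str) -> urllib.request.Request:
    """Build (but do not send) the DELETE that revokes one vault key."""
    # Quote the id so an unexpected character cannot change the request path.
    url = f"{KEYBRAKE_BASE}/vault/keys/{urllib.parse.quote(key_id, safe='')}"
    return urllib.request.Request(
        url,
        method="DELETE",
        headers={"Authorization": f"Bearer {admin_key}"},
    )


req = build_revoke_request("vk_123", "placeholder-admin-key")
print(req.get_method(), req.full_url)  # DELETE https://proxy.keybrake.com/vault/keys/vk_123

# Sending it is one more line: urllib.request.urlopen(req, timeout=10)
```

Because the vault key is shared by every worker, this one request cuts off all of them at once; no Dataflow job state is touched.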
Related questions
Does the vault key serialized into the pipeline graph expose my Keybrake secret?
The vault key passed to ChargeCustomerFn is a scoped key with a dollar cap, an endpoint allowlist, and a TTL, not the Keybrake admin API key used to create it. If someone extracts it from a serialized pipeline graph, they can only call the allowed endpoints up to your configured cap (e.g., $500), and the key expires in 4 hours. Your real Stripe secret never leaves Keybrake. For higher security, store the vault key in Google Cloud Secret Manager and have each worker read it at startup instead of serializing it into the graph; DoFn.setup() is the right place for that fetch.
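A minimal sketch of that fetch, assuming the launcher has stored the vault key as a Secret Manager secret and the workers' service account is allowed to read it. It uses the `google-cloud-secretmanager` client (`SecretManagerServiceClient.access_secret_version`); the `read_vault_key` helper and the project and secret names are hypothetical. The client is injectable so the function can be exercised without Google Cloud credentials.

```python
def read_vault_key(project_id: str, secret_id: str, client=None, version: str = "latest") -> str:
    """Return the vault key stored in Secret Manager.

    Meant to be called from DoFn.setup(), so each DoFn instance fetches the key
    once at startup instead of carrying it in the serialized pipeline graph.
    """
    if client is None:
        # Imported lazily so this module loads even where the library is absent.
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()

    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("utf-8")


# Hypothetical wiring in ChargeCustomerFn: the constructor takes the secret's
# coordinates (project_id, secret_id) instead of the key itself, and setup() does:
#
#     def setup(self):
#         self._vault_key = read_vault_key(self._project_id, self._secret_id)
#         ...build the Stripe client with self._vault_key as before...
```

Only the secret's name travels in the graph; reading the key requires the workers' IAM identity.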
How do bundle retries interact with the spend cap?
When a bundle retries, every element in the bundle runs through process() again. Elements that already completed their vendor API calls send the same Stripe request again, but the idempotency key (job_id-customer_id) lets Stripe recognize the repeat and return the original result instead of creating a second charge. The Keybrake proxy still counts the retried call against the cap, because it sees every incoming request regardless of Stripe's deduplication. Set your cap high enough to absorb 2–3× retry amplification on your expected spend.
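The gap between what the proxy counts and what Stripe actually charges can be modeled in a few lines. This is a toy model under stated assumptions: every request carries the `job_id-customer_id` idempotency key, the proxy counts each request's amount against the cap (as described above), and Stripe creates at most one charge per key. `replay_requests` and `cap_for` are hypothetical helpers, and the $12.50 average amount is a made-up figure.

```python
def replay_requests(requests_sent):
    """Return (dollars counted by the proxy, dollars actually charged).

    `requests_sent` is a list of (idempotency_key, amount_usd) tuples in the
    order workers sent them, including re-sends caused by bundle retries.
    """
    proxy_counted = 0.0
    charged = {}  # idempotency_key -> amount; a repeated key creates no new charge
    for key, amount in requests_sent:
        proxy_counted += amount  # the proxy sees every request, retries included
        charged.setdefault(key, amount)
    return proxy_counted, sum(charged.values())


def cap_for(expected_elements: int, avg_amount_usd: float, retry_factor: float = 3.0) -> float:
    """Size the cap as expected spend times the retry amplification you tolerate."""
    return expected_elements * avg_amount_usd * retry_factor


# One $20 customer whose request was sent three times (two bundle retries).
job_id = "billing-1700000000"
sends = [(f"{job_id}-cus_1", 20.0)] * 3
print(replay_requests(sends))  # (60.0, 20.0)
print(cap_for(1_000, 12.5))    # 37500.0
```

The retry factor is a trade-off: a larger factor avoids tripping the cap on ordinary retries, but it also lets a larger data bug through before the cap stops it.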
What if my Beam pipeline runs for more than 4 hours?
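Set expires_in to match or exceed your expected pipeline duration, including time lost to retries. For very long-running pipelines (8+ hours), either issue the vault key with a longer TTL, or have the launcher write the current vault key to a Cloud Storage path that workers re-read. Do that re-read in start_bundle() or on a timer rather than only in DoFn.setup(): setup() runs once per DoFn instance, so a key read there is never refreshed while the instance lives. Streaming pipelines with unbounded runtimes have no natural end, so rotate on a fixed interval instead: issue a new vault key per interval with a TTL matching that interval, and have every worker pick up the current key, so each interval gets a fresh cap shared across all workers.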
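The re-read can be a small cache that fetches the current key whenever the cached one is older than the rotation interval. This is a sketch under assumptions: some launcher process (not shown) writes each new vault key to a shared location, `read_current_key` is whatever reads it (a Cloud Storage download in practice), and `RotatingVaultKey` is a hypothetical class. A DoFn would create it in setup() and call get() in start_bundle() or process().

```python
import time
from typing import Callable, Optional


class RotatingVaultKey:
    """Cache a vault key and re-read it at most once per rotation interval."""

    def __init__(
        self,
        read_current_key: Callable[[], str],
        interval_s: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._read = read_current_key
        self._interval_s = interval_s
        self._clock = clock
        self._key: Optional[str] = None
        self._fetched_at = float("-inf")  # forces a read on the first get()

    def get(self) -> str:
        now = self._clock()
        # Re-read when the cached key is at least one interval old (or never fetched).
        if now - self._fetched_at >= self._interval_s:
            self._key = self._read()
            self._fetched_at = now
        return self._key


# Usage with a fake key store and a fake clock (seconds).
keys = iter(["vk_interval_1", "vk_interval_2"])
fake_now = [0.0]
cache = RotatingVaultKey(lambda: next(keys), interval_s=300, clock=lambda: fake_now[0])
print(cache.get())  # vk_interval_1
fake_now[0] = 299.0
print(cache.get())  # vk_interval_1 (still inside the interval)
fake_now[0] = 300.0
print(cache.get())  # vk_interval_2
```

Keep the re-read interval somewhat shorter than the key's TTL, so a worker picks up the next key before the one it holds expires.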
Further reading
- Airflow AI agent API key — if you orchestrate Beam jobs from Airflow DAGs, the vault-key pattern applies at both layers: one vault key per DAG run (in the Airflow operator), not per Beam pipeline.
- Prefect AI agent API key — similar Python-native orchestration pattern; Prefect tasks map to Beam DoFns, and the vault-key-per-flow-run approach is equivalent.
- AI agent idempotency — the full idempotency key design that prevents duplicate charges on Beam bundle retries, covering the Stripe window, Twilio deduplication, and how to build a stable key from pipeline context.
- AI agent API key best practices — the five operational controls that reduce vendor spend risk across all agent orchestration frameworks, including Beam pipelines.