Haystack Stripe Integration: Restricted API Keys, Spend Caps, and Agent Governance
Haystack 2.x makes it easy to build a pipeline that retrieves documents, reasons over them with an LLM, and then calls Stripe to bill the user. What it doesn't handle is what happens when that pipeline fails partway through and you retry it: retrying pipeline.run() re-runs every component from the beginning, including the Stripe component that already fired.
This post covers three failure modes specific to Haystack's pipeline architecture (pipeline retry duplication, concurrent branch execution, and shared component instance state). It then shows a two-layer governance pattern that addresses them alongside per-run idempotency keys: a restricted Stripe API key as the first layer, and per-run vault keys via a spend-cap proxy as the second.
The standard Haystack Stripe pattern
Haystack 2.x is built around @component-decorated Python classes that declare inputs and outputs. You build a pipeline by adding component instances and connecting their output sockets to input sockets. A billing pipeline that has an LLM confirm the charge and then charges Stripe looks like this (retrieval omitted for brevity):
```python
import os

import stripe
from haystack import Pipeline, component
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage


@component
class StripeChargeComponent:
    def __init__(self, stripe_key: str):
        self.stripe_key = stripe_key

    @component.output_types(charge_id=str, amount_cents=int, status=str)
    def run(self, customer_id: str, amount_cents: int, description: str):
        charge = stripe.Charge.create(
            customer=customer_id,
            amount=amount_cents,
            currency="usd",
            description=description,
            api_key=self.stripe_key,
        )
        return {
            "charge_id": charge.id,
            "amount_cents": amount_cents,
            "status": charge.status,
        }


# build the pipeline once
stripe_key = os.environ["STRIPE_SECRET_KEY"]
pipeline = Pipeline()
pipeline.add_component("confirm", OpenAIChatGenerator(model="gpt-4o"))
# "replies" is a list[ChatMessage] and "description" is a str, so they can't be
# connected directly; this adapter extracts the first reply's text
# (ChatMessage.text on recent 2.x releases; older releases use .content)
pipeline.add_component(
    "to_text", OutputAdapter(template="{{ replies[0].text }}", output_type=str)
)
pipeline.add_component("charge", StripeChargeComponent(stripe_key=stripe_key))
pipeline.connect("confirm.replies", "to_text.replies")
pipeline.connect("to_text.output", "charge.description")

# run it
result = pipeline.run({
    "confirm": {
        "messages": [ChatMessage.from_user("Confirm Pro plan charge for cus_ABC123, $29")]
    },
    "charge": {"customer_id": "cus_ABC123", "amount_cents": 2900},
})
```
This works for the happy path. Haystack's component system is elegant for the retrieval-augmented generation use case it was designed around. The problem surfaces when you add the error handling patterns that production systems require.
Failure mode 1: Pipeline retry duplicates the charge
Risk: When pipeline.run() raises a PipelineError because a downstream component failed after the Stripe component succeeded, retrying the full pipeline re-runs the Stripe component, which has no way to know the charge already went through. Without an idempotency key, Stripe bills the customer twice.
Haystack pipelines execute components in dependency order. Suppose your pipeline runs document retrieval, LLM confirmation, and the Stripe charge, followed by a downstream write (saving the result to a database or sending a confirmation email). If the charge succeeds and that write fails, pipeline.run() raises a PipelineRuntimeError, a subclass of PipelineError. The most natural recovery is to catch it and retry:
```python
import time

from haystack.core.errors import PipelineError


def run_billing_pipeline(pipeline, inputs, retries=3):
    for attempt in range(retries):
        try:
            return pipeline.run(inputs)
        except PipelineError:
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise
```
The retry re-runs every component from the beginning. If the Stripe charge component succeeded on attempt 1 and a downstream component failed, attempt 2 calls Stripe again — new charge, new charge ID, no relationship to the first. The customer sees two charges.
The fix is to bind an idempotency key to the pipeline run before it starts, pass it through the component graph as an input, and use it in the Stripe call:
```python
import time
import uuid

import stripe
from haystack import component
from haystack.core.errors import PipelineError
from haystack.dataclasses import ChatMessage


@component
class StripeChargeComponent:
    def __init__(self, stripe_key: str):
        self.stripe_key = stripe_key

    @component.output_types(charge_id=str, amount_cents=int, status=str)
    def run(
        self,
        customer_id: str,
        amount_cents: int,
        description: str,
        idempotency_key: str,  # injected per pipeline run
    ):
        charge = stripe.Charge.create(
            customer=customer_id,
            amount=amount_cents,
            currency="usd",
            description=description,
            api_key=self.stripe_key,
            idempotency_key=idempotency_key,
        )
        return {
            "charge_id": charge.id,
            "amount_cents": amount_cents,
            "status": charge.status,
        }


# caller generates the key once per logical operation and reuses it across retries
def run_billing_pipeline_safe(pipeline, customer_id, amount_cents, description):
    run_id = str(uuid.uuid4())
    idempotency_key = f"billing-{customer_id}-{run_id}"
    inputs = {
        "confirm": {
            "messages": [ChatMessage.from_user(f"Confirm ${amount_cents/100:.2f} charge for {customer_id}")]
        },
        "charge": {
            "customer_id": customer_id,
            "amount_cents": amount_cents,
            # caller-supplied and identical on every retry; in this pipeline the LLM
            # reply is NOT connected to charge.description (a connected input can't
            # also be passed in run data)
            "description": description,
            "idempotency_key": idempotency_key,  # same key on all retries
        },
    }
    for attempt in range(3):
        try:
            return pipeline.run(inputs)
        except PipelineError:
            if attempt < 2:
                time.sleep(2 ** attempt)
            else:
                raise
```
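Stripe keeps idempotency keys for at least 24 hours. On the second pipeline attempt, Stripe sees the same idempotency_key and returns the saved response from the first request instead of creating a new charge, so the customer is billed exactly once however many times the system retries. There is one catch: Stripe compares the parameters of a repeated request with the original and returns an error if they differ. If description came from the LLM, a retry could produce different text and fail with an idempotency error instead of replaying, which is why this caller supplies a fixed description.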
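To see why a reused key replays a charge while a changed parameter breaks the retry, here is a small in-memory stand-in for Stripe's idempotency layer. FakeIdempotentCharges and run_with_retries are hypothetical names; the class only mimics the documented behavior (same key and same parameters replay the saved response, same key with different parameters is an error) and is not Stripe's implementation.

```python
class IdempotencyError(Exception):
    """Raised when a key is reused with different request parameters."""


class FakeIdempotentCharges:
    """Hypothetical in-memory stand-in for Stripe's idempotency layer."""

    def __init__(self):
        self._saved = {}  # idempotency key -> (params, response)
        self.charges_created = 0

    def create(self, params: dict, idempotency_key: str) -> dict:
        if idempotency_key in self._saved:
            saved_params, saved_response = self._saved[idempotency_key]
            if saved_params != params:
                raise IdempotencyError("same key, different parameters")
            return saved_response  # replay: no new charge
        self.charges_created += 1
        response = {"id": f"ch_{self.charges_created}", "amount": params["amount"]}
        self._saved[idempotency_key] = (dict(params), response)
        return response


def run_with_retries(stripe_api, key, descriptions):
    """Simulate one charge attempt per description, all sharing a single key."""
    results = []
    for description in descriptions:
        params = {"customer": "cus_ABC", "amount": 2900, "description": description}
        try:
            results.append(stripe_api.create(params, idempotency_key=key))
        except IdempotencyError as exc:
            results.append(exc)
    return results
```

With a fixed description, `run_with_retries(api, key, ["Pro plan"] * 3)` creates one charge and returns the same response three times. With a drifting, LLM-style description such as `["Pro plan", "Pro plan, monthly"]`, the second attempt yields an IdempotencyError instead of a replay.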
Failure mode 2: Concurrent branch execution charges Stripe twice per run
Risk: Haystack treats components with no data dependency on each other as independent branches. The standard Pipeline runs them one after another; AsyncPipeline can run them concurrently. If two branches each call a Stripe component (a billing branch and a usage-record branch, for example), a single run makes two separate, uncoordinated Stripe calls for one logical user action, and under AsyncPipeline they can fire at the same time.
Concurrent execution is a feature, not a bug: running independent branches in parallel cuts latency in retrieval-augmented workflows. The problem starts when engineers model the billing call and an ancillary Stripe call (a Billing Meter event or a legacy usage record) as independent branches:
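```python
from haystack.components.converters import OutputAdapter

# DANGER: charge and usage are sibling branches fed by the same upstream output
pipeline.add_component("to_text", OutputAdapter(template="{{ replies[0].text }}", output_type=str))
pipeline.add_component("charge", StripeChargeComponent(stripe_key=key))
pipeline.add_component("usage", StripeMeterComponent(stripe_key=key))  # hypothetical meter component
pipeline.connect("confirm.replies", "to_text.replies")
# one output feeding two inputs: nothing orders charge relative to usage
pipeline.connect("to_text.output", "charge.description")
pipeline.connect("to_text.output", "usage.description")
# AsyncPipeline can run both branches concurrently; the standard Pipeline runs
# them back to back. Either way, one run makes two Stripe API calls for one
# logical billing operation.
```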
Haystack treats the branches as independent because nothing connects charge to usage. This isn't a bug in your retry logic: it happens on the first successful run, and if one call succeeds while the other fails, the customer ends up half-billed.
The safer design is either to serialize them (connect an output of the charge component to an input of the usage component, so usage runs only after the charge returns) or to handle both calls in a single component that makes them in a fixed order, each with its own idempotency key:
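```python
import stripe
from haystack import component


@component
class StripeBillingComponent:
    """Single component that makes the charge and then the usage record, in order.

    Not atomic: if the usage call fails, the charge stands. Separate idempotency
    keys let a retry replay whichever call already succeeded.
    """

    def __init__(self, stripe_key: str):
        self.stripe_key = stripe_key

    @component.output_types(charge_id=str, usage_record_id=str, status=str)
    def run(
        self,
        customer_id: str,
        amount_cents: int,
        description: str,
        subscription_item_id: str,
        idempotency_key: str,
    ):
        # charge first
        charge = stripe.Charge.create(
            customer=customer_id,
            amount=amount_cents,
            currency="usd",
            description=description,
            api_key=self.stripe_key,
            idempotency_key=idempotency_key,
        )
        # usage record only after the charge succeeds; this is the legacy
        # usage-records API (newer Stripe integrations use Billing Meter events)
        usage_record = stripe.SubscriptionItem.create_usage_record(
            subscription_item_id,
            quantity=1,
            timestamp="now",
            api_key=self.stripe_key,
            idempotency_key=f"{idempotency_key}-usage",  # distinct key per call
        )
        return {
            "charge_id": charge.id,
            "usage_record_id": usage_record.id,
            "status": charge.status,
        }
```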
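Keeping both Stripe calls in one component removes the concurrency problem: a component's run() makes its calls sequentially, so the usage record can never race the charge within a run. It does not make the pair atomic (if the usage call fails, the charge stands), which is why each call carries its own idempotency key, so a retry replays the call that already succeeded and completes the one that didn't.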
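When one component makes several Stripe calls for the same logical run, each call needs its own idempotency key, and every key must come out identical on a retry. A minimal sketch of deriving them from one run-level key; derive_idempotency_key is a hypothetical helper, and the 255-character ceiling is Stripe's documented maximum key length.

```python
import hashlib

MAX_STRIPE_KEY_LEN = 255  # Stripe's documented maximum idempotency key length


def derive_idempotency_key(run_key: str, operation: str) -> str:
    """Derive a stable per-operation key from one run-level key.

    The same (run_key, operation) pair always yields the same key, so a retried
    run replays each Stripe call instead of repeating it, while different
    operations in the same run never share a key.
    """
    candidate = f"{run_key}-{operation}"
    if len(candidate) <= MAX_STRIPE_KEY_LEN:
        return candidate
    # very long run keys: fall back to a fixed-length digest that is still stable
    digest = hashlib.sha256(candidate.encode()).hexdigest()
    return f"{operation}-{digest}"
```

For example, `derive_idempotency_key("billing-cus_ABC-123", "charge")` and `derive_idempotency_key("billing-cus_ABC-123", "usage")` give two different keys that are rebuilt exactly on every retry of that run.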
Failure mode 3: Shared component instances across pipeline runs
Risk: You instantiate a Haystack component once and hand it to pipeline.add_component(), and the pipeline reuses that same instance on every pipeline.run() call. If you initialize StripeChargeComponent(stripe_key=os.environ["STRIPE_SECRET_KEY"]) at pipeline-build time, every run (for every customer, every agent type, every use case) shares one Stripe API key with one set of permissions. There is no per-run key isolation and no per-agent-type scope.
This is the subtler problem. Compare two component initialization patterns:
```python
# WRONG: key baked into the component at pipeline-build time;
# every pipeline.run() call shares this single Stripe key
pipeline.add_component(
    "charge",
    StripeChargeComponent(stripe_key=os.environ["STRIPE_SECRET_KEY"]),
)


# CORRECT: key injected as a pipeline input and resolved per run
@component
class StripeChargeComponent:
    @component.output_types(charge_id=str, status=str)
    def run(
        self,
        customer_id: str,
        amount_cents: int,
        description: str,
        stripe_key: str,  # injected per run, not set at component init
        idempotency_key: str,
    ):
        charge = stripe.Charge.create(
            customer=customer_id,
            amount=amount_cents,
            currency="usd",
            description=description,
            api_key=stripe_key,
            idempotency_key=idempotency_key,
        )
        return {"charge_id": charge.id, "status": charge.status}


# caller passes the key at run time, so it can be scoped per agent type or customer
# segment; get_scoped_key_for_agent_type is your own lookup helper, and customer_id,
# amount_cents, description, agent_type, and run_id come from the calling code
result = pipeline.run({
    "charge": {
        "customer_id": customer_id,
        "amount_cents": amount_cents,
        "description": description,
        "stripe_key": get_scoped_key_for_agent_type(agent_type),  # per-run scope
        "idempotency_key": f"billing-{customer_id}-{run_id}",
    }
})
```
Injecting the key as a run-time input lets you scope it. A billing pipeline gets a key restricted to charges:write. A refund pipeline gets a key restricted to refunds:write. A reporting pipeline gets a key with only read permissions. If any of these pipelines is compromised or stuck in a loop, the damage radius is bounded by the key's permissions.
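The run-time lookup can be as small as a table from agent type to the environment variable holding that type's restricted key. A minimal sketch of the get_scoped_key_for_agent_type helper called above, assuming one restricted key per agent type stored in environment variables; the variable names are hypothetical. Unknown agent types fail closed instead of falling back to a broader key, and a full secret key (sk_ prefix) is refused because it would defeat the scoping.

```python
import os

# Hypothetical mapping: agent type -> env var holding that type's restricted key
SCOPED_KEY_ENV_VARS = {
    "billing": "STRIPE_RK_BILLING",      # write access to charges only
    "refunds": "STRIPE_RK_REFUNDS",      # write access to refunds only
    "reporting": "STRIPE_RK_REPORTING",  # read-only
}


def get_scoped_key_for_agent_type(agent_type: str) -> str:
    """Return the restricted Stripe key for an agent type, failing closed."""
    env_var = SCOPED_KEY_ENV_VARS.get(agent_type)
    if env_var is None:
        raise PermissionError(f"no Stripe key scope defined for agent type {agent_type!r}")
    key = os.environ.get(env_var)
    if not key:
        raise PermissionError(f"{env_var} is not set")
    if key.startswith(("sk_live_", "sk_test_")):
        # an unrestricted secret key here would give the agent every permission
        raise PermissionError(f"{env_var} holds an unrestricted secret key")
    return key
```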
Adding a proxy layer for spend caps and audit
The idempotency key pattern protects against duplicate charges, and per-run key injection enables scope isolation. But neither gives you a cap on total Stripe spend across pipeline runs, nor a queryable audit log of exactly what the pipeline charged and when. For that you need a proxy layer between the component and Stripe.
The proxy URL replaces Stripe's API base URL in the component. The Stripe SDK itself needs no changes: StripeClient accepts a base_addresses mapping, so you point its "api" entry at the proxy:
```python
import stripe
from haystack import component


@component
class StripeChargeComponent:
    def __init__(self, proxy_base_url: str = "https://proxy.keybrake.com"):
        self.proxy_base_url = proxy_base_url

    @component.output_types(charge_id=str, status=str)
    def run(
        self,
        customer_id: str,
        amount_cents: int,
        description: str,
        vault_key: str,  # vault_key replaces the raw Stripe key
        idempotency_key: str,
    ):
        client = stripe.StripeClient(
            api_key=vault_key,
            # the SDK appends request paths such as /v1/charges to this address
            base_addresses={"api": self.proxy_base_url + "/stripe"},
        )
        charge = client.charges.create(
            params={
                "customer": customer_id,
                "amount": amount_cents,
                "currency": "usd",
                "description": description,
            },
            options={"idempotency_key": idempotency_key},
        )
        return {"charge_id": charge.id, "status": charge.status}


# caller resolves a scoped vault key per pipeline run;
# get_vault_key_for_agent_type is your own lookup helper
def run_billing_pipeline(pipeline, customer_id, amount_cents, description, agent_type, run_id):
    # run_id comes from the caller, so a retried call reuses the same idempotency key
    vault_key = get_vault_key_for_agent_type(agent_type)  # scoped at proxy layer
    return pipeline.run({
        "charge": {
            "customer_id": customer_id,
            "amount_cents": amount_cents,
            "description": description,
            "vault_key": vault_key,
            "idempotency_key": f"billing-{customer_id}-{run_id}",
        }
    })
```
The proxy looks up the real Stripe key in the vault, checks the policy (daily dollar cap, endpoint allowlist, expiry), forwards the request, records the charged amount from Stripe's response, and logs every call to an audit table. The dashboard shows today's spend per agent type, and the kill switch revokes a vault key without rotating anything in your code.
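To make the policy step concrete, here is a minimal sketch of the check a proxy like this might run before forwarding a request. VaultPolicy, check_request, the endpoint string format, and the 401/403/429 status choices are illustrative assumptions, not Keybrake's actual implementation.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class VaultPolicy:
    daily_cap_cents: int
    allowed_endpoints: frozenset  # e.g. {"POST /v1/charges"}
    expires_at: datetime  # timezone-aware
    revoked: bool = False  # the kill switch


@dataclass
class Decision:
    allowed: bool
    status: int  # HTTP status the proxy would return
    reason: str


def check_request(policy, endpoint, amount_cents, spent_today_cents, now=None):
    """Decide whether to forward one request; nothing reaches Stripe unless allowed."""
    now = now or datetime.now(timezone.utc)
    if policy.revoked:
        return Decision(False, 401, "vault key revoked")
    if now >= policy.expires_at:
        return Decision(False, 401, "vault key expired")
    if endpoint not in policy.allowed_endpoints:
        return Decision(False, 403, f"{endpoint} not in allowed endpoints")
    if spent_today_cents + amount_cents > policy.daily_cap_cents:
        return Decision(False, 429, "daily cap exceeded")
    return Decision(True, 200, "forward")
```

The order matters: revocation and expiry are checked first so a killed key is rejected even for endpoints and amounts that would otherwise pass.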
Governance comparison: raw key vs restricted key vs vault key
| Property | Raw Stripe key | Restricted key | Vault key via proxy |
|---|---|---|---|
| Endpoint allowlist | ❌ All endpoints | ✅ Locked to specific resources | ✅ Additional per-policy enforcement |
| Daily spend cap | ❌ None | ❌ None | ✅ Enforced at proxy before call reaches Stripe |
| Per-run isolation | ❌ All runs share one key | Partial: only if you inject a separate restricted key per run | ✅ Vault key scoped per agent type or per run |
| Pipeline retry dedup | ❌ No dedup without idempotency key in code | ❌ No dedup without idempotency key in code | ✅ Proxy-layer dedup on idempotency key |
| Concurrent branch safety | ❌ Both branches hit Stripe simultaneously | ❌ Both branches hit Stripe simultaneously | ✅ Proxy serializes and deduplicates per key+endpoint |
| Audit trail | ❌ Stripe dashboard only, no pipeline context | ❌ Stripe dashboard only | ✅ Per-call log with vault key, endpoint, amount, pipeline run ID |
| Kill switch | Rotate Stripe key → update all code | Delete restricted key → still need redeploy | ✅ Revoke vault key → proxy rejects immediately, no code change |
Testing all three failure modes
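A pytest suite for a Haystack billing pipeline built on the proxy-routed StripeChargeComponent. The tests mock stripe.StripeClient, so they verify the component's wiring (the same idempotency key across retries, proxy 429 and 403 responses surfacing as pipeline failures, and only the vault key reaching the outbound client), not the proxy's policy enforcement itself: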
```python
import uuid
from unittest.mock import MagicMock, patch

import pytest
from haystack import Pipeline

# product_api.components is a placeholder for your own module. Patching
# "stripe.StripeClient" assumes that module does `import stripe` and calls
# stripe.StripeClient(...), as the proxy-routed component does.
# StripeRefundComponent is assumed to call client.refunds.create and to take
# charge_id, amount, vault_key, and idempotency_key inputs.
from product_api.components import StripeChargeComponent, StripeRefundComponent


def make_pipeline():
    """Charge pipeline; the vault key is injected per run, not at build time."""
    p = Pipeline()
    p.add_component("charge", StripeChargeComponent())
    return p


def make_refund_pipeline():
    p = Pipeline()
    p.add_component("refund", StripeRefundComponent())
    return p


def test_pipeline_retry_uses_same_idempotency_key():
    """Retrying a failed pipeline must reuse the same idempotency key: no duplicate charge."""
    call_log = []
    with patch("stripe.StripeClient") as MockClient:
        def side_effect(params=None, options=None):
            call_log.append(options["idempotency_key"])
            if len(call_log) == 1:
                raise Exception("transient failure on first attempt")
            return MagicMock(id="ch_retry", status="succeeded")

        MockClient.return_value.charges.create.side_effect = side_effect
        pipeline = make_pipeline()
        key = f"billing-cus_ABC-{uuid.uuid4()}"
        inputs = {
            "charge": {
                "customer_id": "cus_ABC",
                "amount_cents": 2900,
                "description": "Pro plan",
                "vault_key": "vault_key_billing",
                "idempotency_key": key,
            }
        }
        for _ in range(2):
            try:
                pipeline.run(inputs)
            except Exception:
                pass
    assert len(call_log) == 2, "component called twice (first call failed, second succeeded)"
    assert call_log[0] == call_log[1] == key, "idempotency key must be identical across retries"


def test_spend_cap_429_surfaces_as_pipeline_failure():
    """A proxy 429 (daily cap exceeded) must fail the run, not be swallowed by the component."""
    with patch("stripe.StripeClient") as MockClient:
        MockClient.return_value.charges.create.side_effect = Exception("429 daily cap exceeded")
        pipeline = make_pipeline()
        # Haystack wraps component errors in PipelineRuntimeError, whose message
        # includes the original error text
        with pytest.raises(Exception, match="429"):
            pipeline.run({
                "charge": {
                    "customer_id": "cus_CAP",
                    "amount_cents": 99900,
                    "description": "Over-limit",
                    "vault_key": "vault_key_billing",
                    "idempotency_key": str(uuid.uuid4()),
                }
            })


def test_billing_vault_key_rejected_for_refund_endpoint():
    """A proxy 403 for a billing-scoped vault key on a refund must fail the run."""
    with patch("stripe.StripeClient") as MockClient:
        MockClient.return_value.refunds.create.side_effect = Exception("403 not in allowed_ops")
        pipeline = make_refund_pipeline()
        with pytest.raises(Exception, match="403"):
            pipeline.run({
                "refund": {
                    "charge_id": "ch_old",
                    "amount": 2900,
                    "vault_key": "vault_key_billing",  # billing key: must not allow refunds
                    "idempotency_key": str(uuid.uuid4()),
                }
            })


def test_no_raw_stripe_key_in_outbound_client():
    """Only the vault key, never a raw Stripe key, may reach the outbound client."""
    captured_keys = []
    with patch("stripe.StripeClient") as MockClient:
        def capture_init(*args, **kwargs):
            captured_keys.append(kwargs.get("api_key", args[0] if args else ""))
            client = MagicMock()
            client.charges.create.return_value = MagicMock(id="ch_hdr", status="succeeded")
            return client

        MockClient.side_effect = capture_init
        pipeline = make_pipeline()
        pipeline.run({
            "charge": {
                "customer_id": "cus_HDR",
                "amount_cents": 100,
                "description": "Test",
                "vault_key": "vault_key_test_abc123",
                "idempotency_key": str(uuid.uuid4()),
            }
        })
    assert captured_keys == ["vault_key_test_abc123"]
    for api_key in captured_keys:
        assert not api_key.startswith(("sk_live_", "sk_test_", "rk_live_", "rk_test_")), \
            f"Raw Stripe key found in outbound client: {api_key[:12]}..."
```
Gap analysis: what restricted keys and vault keys don't cover
Async pipeline execution and race conditions
Haystack's AsyncPipeline uses Python asyncio to run independent branches concurrently. If your component implements an async run_async() method with an async Stripe client, generate the idempotency key in the caller before the pipeline starts, not inside the component. A key minted inside each invocation is a fresh UUID every time, so two concurrent invocations of the same logical operation get different keys and Stripe treats them as two operations.
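A small asyncio sketch of the difference. charge() is a hypothetical stand-in for an async Stripe call that only records the key it receives; minting the key inside each invocation gives two concurrent calls two keys, while fixing it once in the caller gives both the same key.

```python
import asyncio
import uuid


async def charge(seen_keys: list, idempotency_key: str) -> str:
    """Hypothetical async Stripe call: records which key each invocation used."""
    await asyncio.sleep(0)  # yield to the event loop, like a real network call
    seen_keys.append(idempotency_key)
    return idempotency_key


async def key_inside_each_call(seen_keys: list) -> None:
    # WRONG: every concurrent invocation mints its own key
    async def invoke():
        return await charge(seen_keys, str(uuid.uuid4()))

    await asyncio.gather(invoke(), invoke())


async def key_from_caller(seen_keys: list, run_id: str) -> None:
    # RIGHT: key fixed once, before any concurrent work starts
    key = f"billing-cus_ABC-{run_id}"
    await asyncio.gather(charge(seen_keys, key), charge(seen_keys, key))


wrong, right = [], []
asyncio.run(key_inside_each_call(wrong))
asyncio.run(key_from_caller(right, str(uuid.uuid4())))
print(len(set(wrong)), len(set(right)))  # 2 1
```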
Pipeline serialization and warm restart
Haystack can serialize a pipeline to YAML (pipeline.dumps()), but that captures the pipeline's definition, not mid-run state. If you resume an interrupted run from a checkpoint, whether your own or a snapshot mechanism in your Haystack version, recover the idempotency key from that checkpoint instead of regenerating it. Store run_id in the pipeline's run inputs and include it in the checkpoint so a restarted pipeline continues with the original key.
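One way to make run_id survive a restart is to persist it before the first Stripe call and read it back on resume. A minimal sketch using a JSON file per job; the file layout and the load_or_create_run_id and idempotency_key_for names are assumptions, and a production system would more likely keep run_id next to the job record in its database.

```python
import json
import os
import uuid
from pathlib import Path


def load_or_create_run_id(checkpoint_path: Path) -> str:
    """Return the run_id saved in the checkpoint, creating and persisting one if absent."""
    if checkpoint_path.exists():
        return json.loads(checkpoint_path.read_text())["run_id"]
    run_id = str(uuid.uuid4())
    # write a temp file, then rename it over the target, so a crash never
    # leaves a half-written checkpoint behind
    tmp_path = checkpoint_path.with_suffix(".tmp")
    tmp_path.write_text(json.dumps({"run_id": run_id}))
    os.replace(tmp_path, checkpoint_path)
    return run_id


def idempotency_key_for(customer_id: str, checkpoint_path: Path) -> str:
    return f"billing-{customer_id}-{load_or_create_run_id(checkpoint_path)}"
```

Call idempotency_key_for before pipeline.run(); a restarted process that points at the same checkpoint file rebuilds the identical key.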
DocumentStore write-before-charge ordering
A common Haystack pattern: retrieve documents → generate answer → write to DocumentStore → charge user. If the charge step fails after the DocumentStore write (say, a timeout after Stripe already processed the request), retrying the pipeline with the same idempotency key replays the original charge result instead of charging again. The retry also repeats the write, so configure your DocumentWriter with a DuplicatePolicy such as SKIP or OVERWRITE. But if the retry uses a new idempotency key because run_id was regenerated, you get a duplicate charge. The rule: run_id must be the caller's responsibility, not the pipeline's, and must survive across retries.
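An alternative that needs no stored state is to derive run_id deterministically from an identifier the caller already owns, such as an order or invoice ID, so any retry, even from a fresh process, rebuilds the same key. A minimal sketch using uuid.uuid5; BILLING_NAMESPACE and the order_id parameter are hypothetical choices for illustration.

```python
import uuid

# Hypothetical fixed namespace for billing run IDs: generate once, then hard-code it.
BILLING_NAMESPACE = uuid.UUID("6f1c2a8e-3b4d-4e5f-9a0b-1c2d3e4f5a6b")


def deterministic_run_id(customer_id: str, order_id: str) -> str:
    """Same customer and order -> same run_id, across retries and process restarts."""
    return str(uuid.uuid5(BILLING_NAMESPACE, f"{customer_id}:{order_id}"))


def billing_idempotency_key(customer_id: str, order_id: str) -> str:
    return f"billing-{customer_id}-{deterministic_run_id(customer_id, order_id)}"
```

The trade-off is that a genuinely new purchase must carry a new order_id; reusing one within Stripe's idempotency window would replay the earlier charge instead of creating a new one.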
Custom component state mutation
Haystack components can hold mutable instance state (counters, caches, accumulators) that persists across pipeline.run() calls. A SpendTrackerComponent that accumulates spend in self.total_spent is only correct for one pipeline instance in one process. With several pipeline instances or worker processes, each keeps its own counter and none sees the others' spend, so the cap is exceeded silently; the counter also resets whenever the process restarts. The proxy tracks spend per vault key in a single shared store, so every caller is checked against the same running total.
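The contrast is easy to demonstrate. Below, two instance-level trackers each approve spend against their own counter, while a shared ledger applies the cap with one conditional UPDATE, so the check and the increment happen in a single statement. InstanceSpendTracker, SharedSpendLedger, and the table layout are hypothetical; SQLite stands in for whatever shared store you actually use.

```python
import sqlite3


class InstanceSpendTracker:
    """Per-instance counter, like a component holding self.total_spent."""

    def __init__(self, cap_cents: int):
        self.cap_cents = cap_cents
        self.total_spent = 0

    def try_spend(self, amount_cents: int) -> bool:
        if self.total_spent + amount_cents > self.cap_cents:
            return False
        self.total_spent += amount_cents
        return True


class SharedSpendLedger:
    """Hypothetical shared ledger: the cap is checked and applied in one statement."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS spend (key TEXT PRIMARY KEY, spent INTEGER, cap INTEGER)"
        )

    def set_cap(self, key: str, cap_cents: int) -> None:
        # (re)initializes the key with zero spend, e.g. at the start of a day
        with self.conn:
            self.conn.execute(
                "INSERT OR REPLACE INTO spend (key, spent, cap) VALUES (?, 0, ?)",
                (key, cap_cents),
            )

    def try_spend(self, key: str, amount_cents: int) -> bool:
        with self.conn:  # commits the UPDATE on success
            cur = self.conn.execute(
                "UPDATE spend SET spent = spent + ? WHERE key = ? AND spent + ? <= cap",
                (amount_cents, key, amount_cents),
            )
        return cur.rowcount == 1  # no row updated means the cap would be exceeded
```

Two InstanceSpendTracker(5000) objects will each approve a 4000-cent charge, 8000 cents in total against a 5000-cent cap, whereas the shared ledger approves the first 4000 and rejects the second.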
FAQ
Does Haystack have built-in retry logic for component failures?
Haystack 2.x does not include automatic component-level retry with backoff in its core pipeline execution. Retry logic lives in the calling code (wrapping pipeline.run()) or inside individual components. This means idempotency key management is entirely the developer's responsibility: Haystack won't generate or preserve keys across retries for you. The Stripe SDK's own max_network_retries setting only retries HTTP requests within a single call, so it doesn't help when pipeline.run() itself is re-invoked.
Can I use Haystack's async pipeline mode with the proxy?
Yes. The proxy handles concurrent requests normally — each request is independently authenticated, policy-checked, and logged. The per-key spend cap is enforced atomically at the proxy (compare-and-decrement in the spend table). Two concurrent branch calls with different vault keys or different idempotency keys are treated as independent operations. Two calls with the same idempotency key within 24 hours return the cached result.
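The cached-result behavior for a repeated idempotency key can be sketched with one lock per (key, endpoint) pair: the first request does the upstream call, and a concurrent or later request with the same key waits and receives the saved response. IdempotentForwarder and the forward callable are hypothetical; a real proxy would persist results with a 24-hour expiry and, like Stripe, reject a reused key whose parameters differ, neither of which this in-memory sketch does.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple


class IdempotentForwarder:
    """Hypothetical per-(key, endpoint) dedup: one upstream call per idempotency key."""

    def __init__(self, forward: Callable[[str, dict], Awaitable[dict]]):
        self._forward = forward
        self._locks: Dict[Tuple[str, str], asyncio.Lock] = {}
        self._results: Dict[Tuple[str, str], dict] = {}

    async def handle(self, idempotency_key: str, endpoint: str, params: dict) -> dict:
        slot = (idempotency_key, endpoint)
        lock = self._locks.setdefault(slot, asyncio.Lock())
        async with lock:  # concurrent requests for the same slot queue here
            if slot not in self._results:
                # only the first request reaches upstream; a failure is not cached
                self._results[slot] = await self._forward(endpoint, params)
            return self._results[slot]
```

Three concurrent `handle("k1", "/v1/charges", ...)` calls produce one upstream call and three identical responses; a request with a different key is forwarded independently.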
How does the vault key interact with Haystack's pipeline serialization (YAML)?
Don't serialize vault keys into pipeline YAML. The vault key is a runtime input — pass it in pipeline.run({"charge": {"vault_key": key}}), not as a component constructor parameter. Constructor parameters are serialized; runtime inputs are not. This separation also makes it safe to version-control your pipeline YAML without secrets.
What's the latency overhead of the proxy layer?
The proxy's own processing (SQLite policy lookup, audit write, forward) adds roughly 2–5 ms per call, which is small next to a Stripe charge's 200–500 ms round trip. Network overhead depends on where the proxy runs: negligible if it is co-located with your application in the same VPC, one extra network hop if you call a hosted endpoint such as proxy.keybrake.com.
Does the vault key work with Haystack's integration with Anthropic or OpenAI models?
The vault key replaces only the Stripe API key — it's specific to your Stripe (or Twilio, Resend) calls. Your LLM API key (Anthropic, OpenAI) is separate and unaffected. Tools like LiteLLM handle LLM spend governance; Keybrake handles the downstream SaaS API spend governance. They're complementary, not overlapping.
What happens if the proxy is down when a pipeline tries to charge?
The charge fails with a connection error, the same way it would during a Stripe API outage. Your existing error handling (catch the exception, retry with exponential backoff, alert ops) applies. Configure a short client timeout (for example 5 s) so a proxy failure fails fast and trips your circuit breaker instead of hanging the entire pipeline. Reuse the same idempotency key on every retry: a request that timed out on your side may still have reached Stripe.
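A minimal sketch of that retry policy: transient failures (timeouts, connection errors, 5xx) are retried with exponential backoff and the same idempotency key, while policy rejections such as a 403 scope error or a 429 cap error are raised immediately, since retrying them only burns attempts. ProxyError and the status-code split are assumptions about how your proxy reports errors, not a documented Keybrake contract; note that Stripe itself also uses 429 for rate limiting, which you would want to retry.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ProxyError(Exception):
    """Hypothetical error carrying the HTTP status the proxy returned."""

    def __init__(self, status: int, message: str = ""):
        super().__init__(f"{status} {message}".strip())
        self.status = status


def is_retryable(exc: Exception) -> bool:
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return True
    # 5xx may be transient; 4xx policy rejections (403 scope, 429 cap) are not
    return isinstance(exc, ProxyError) and exc.status >= 500


def call_with_retries(
    call: Callable[[str], T],
    idempotency_key: str,
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry call(idempotency_key) on transient errors, always with the same key."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return call(idempotency_key)
        except Exception as exc:
            if not is_retryable(exc) or attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)  # 0.5 s, 1 s, ...
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

Injecting `sleep` keeps the function testable without real delays; in production you would leave the default.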
Add spend caps and audit to your Haystack billing pipeline
Keybrake is a scoped API-key proxy for the SaaS APIs your agents call — Stripe, Twilio, Resend. Swap one URL, get daily spend caps, endpoint allowlists, and a queryable audit log. Free tier: 1,000 proxied requests/month.