Luigi AI agent API key: scoping vendor calls in Spotify's pipeline orchestrator
Luigi is Spotify's open-source Python pipeline framework — dependency graphs expressed as Task classes, requires() for upstream dependencies, and output() for completion markers. It's widely used for batch data processing, ML feature pipelines, and increasingly for AI agent workloads that need reliable task sequencing with automatic dependency resolution. When AI agents use Luigi to call vendor APIs, the framework's dynamic task generation and automatic failure re-scheduling create vendor spend risks that Luigi's native tooling doesn't address: no per-pipeline dollar cap, no task-level API key revoke, and no per-call audit log with Luigi task context. This page covers the vault-key pattern that bounds vendor spend per Luigi run.
TL;DR
Issue a vault key in a Luigi setup task that downstream vendor-calling tasks depend on via requires(). A run_id, declared as an explicit luigi.Parameter() on each task class that makes vendor calls, ties those tasks to the same setup task, and each one reads the vault key from that task's output. Every task in the pipeline run shares the same vault key and its per-run dollar cap. The real Stripe or Twilio secret never appears in Luigi worker processes or task output files. Revoking a stuck run is a single DELETE call: it stops further vendor calls from any task in the pipeline without disrupting other Luigi pipelines using the same central scheduler.
How Luigi AI agent tasks call vendor APIs
A typical Luigi AI agent billing pipeline uses a root task that generates subtasks dynamically via requires(). Each subtask reads the Stripe key from the environment:
```python
import os

import luigi
import stripe


class ChargeCustomer(luigi.Task):
    customer_id = luigi.Parameter()
    amount_cents = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget(f"/tmp/charged/{self.customer_id}.json")

    def run(self):
        stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # full-access key
        intent = stripe.PaymentIntent.create(
            amount=self.amount_cents,
            currency="usd",
            customer=self.customer_id,
        )
        with self.output().open("w") as f:
            f.write(intent.to_json())


class BillingPipeline(luigi.Task):
    customer_ids = luigi.ListParameter()
    amount_cents = luigi.IntParameter()

    def requires(self):
        # dynamically generates one ChargeCustomer task per customer
        return [
            ChargeCustomer(
                customer_id=cid,
                amount_cents=self.amount_cents
            )
            for cid in self.customer_ids
        ]

    def run(self):
        # aggregation step after all charges complete
        pass
```
This is standard Luigi. BillingPipeline.requires() generates one ChargeCustomer task per customer ID. Luigi's worker pool runs them in parallel, up to the configured --workers N count. Every task reads the same Stripe key from os.environ, so one full-access secret is shared by everything running on the worker. If customer_ids contains 500 entries due to a data pipeline error, all 500 Stripe calls execute, N at a time, with no dollar cap.
Three gaps Luigi's native tooling doesn't fill for vendor spend control
| Gap | What happens in practice | Luigi's answer |
|---|---|---|
| No per-pipeline spend cap | A data feed passes a duplicated customer list to BillingPipeline: 2,000 IDs instead of 200. Luigi generates 2,000 ChargeCustomer tasks. With --workers 16, up to 16 Stripe calls are in flight at a time until all 2,000 complete. Luigi's output() markers mean a successfully charged customer is considered "done" and is not re-run, but the 2,000 first-time charges still execute. There is no per-pipeline dollar limit that halts execution partway through. | None. Luigi tracks task completion states (pending/running/done/failed) but has no visibility into the dollar value of vendor API calls made inside run(). |
| No task-level vendor revoke | Luigi workers are long-running processes. Stopping vendor calls mid-pipeline requires either killing the worker processes (which may leave tasks in an indeterminate state) or rotating the Stripe secret in the environment and restarting workers. Rotating the key breaks every other Luigi pipeline on the same worker that reads the same environment variable. Luigi's central scheduler doesn't provide a way to cancel individual tasks that are currently executing: it can mark tasks as failed and prevent retries, but the running process continues until the run() method returns. | The central scheduler's /api/task_list endpoint shows task states. No per-task API key scoping or revocation mechanism exists. |
| No per-call audit with Luigi task context | Luigi's event hooks (@luigi.Task.event_handler) fire on task start, success, and failure. They capture task parameters and timing, not the dollar amounts from Stripe responses. There's no built-in way to correlate a Stripe PaymentIntent.id with the Luigi customer_id parameter and pipeline_run_id in a queryable audit table. | Luigi's history backend stores task execution records. No structured vendor-call cost tracking or external transaction ID correlation. |
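To make the audit gap in the last row concrete, here is a minimal sketch of the kind of record that would tie a vendor transaction ID to Luigi task context. The VendorCallRecord type, its field names, and the audit_line helper are hypothetical illustrations; they are not part of Luigi, Stripe, or Keybrake.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class VendorCallRecord:
    """One row per vendor call (hypothetical schema, for illustration only)."""

    pipeline_run_id: str   # the Luigi run this call belongs to
    task_family: str       # e.g. "ChargeCustomer"
    customer_id: str       # Luigi task parameter
    vendor: str            # e.g. "stripe"
    vendor_object_id: str  # e.g. the PaymentIntent id from the response
    amount_cents: int
    currency: str


def audit_line(record):
    """Serialize a record as one JSON line, suitable for an append-only log."""
    return json.dumps(asdict(record), sort_keys=True)


record = VendorCallRecord(
    pipeline_run_id="a1b2c3d4",
    task_family="ChargeCustomer",
    customer_id="cus_example",
    vendor="stripe",
    vendor_object_id="pi_example",
    amount_cents=500,
    currency="usd",
)
line = audit_line(record)
```

A log of such lines can be loaded into any table and queried by pipeline_run_id or customer_id, which is the correlation Luigi's event hooks do not provide on their own.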
The requires() fan-out risk: dynamic task generation and parallel calls
Luigi's power is in requires() — the ability to compute upstream task dependencies dynamically at runtime. For AI agent pipelines, this often means reading a customer list from a database or upstream task output, then generating one vendor-calling task per customer. The task list is computed before any tasks run, which means Luigi will attempt to execute all generated tasks regardless of the cumulative vendor spend they represent.
Luigi's worker parallelism is configurable (--workers N on the command line), but this controls CPU/IO parallelism, not vendor spend rate. Setting --workers 1 serializes execution and slows the spend rate, but doesn't cap total spend — it just means the overcharge takes longer to complete. A dollar cap enforced at the proxy layer fires regardless of worker count.
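The arithmetic behind the fan-out risk can be checked before any task is generated. The sketch below is an optional pre-flight guard, not a replacement for a proxy-side cap. It assumes the budget is measured against the total charged amount, and the function names projected_spend_usd and check_fan_out are hypothetical.

```python
def projected_spend_usd(customer_ids, amount_cents):
    """Total charge volume the fan-out would request, in dollars."""
    return len(customer_ids) * amount_cents / 100


def check_fan_out(customer_ids, amount_cents, budget_usd):
    """Raise if the generated task list cannot fit inside the run's budget."""
    projected = projected_spend_usd(customer_ids, amount_cents)
    if projected > budget_usd:
        raise ValueError(
            f"projected ${projected:.2f} exceeds budget ${budget_usd:.2f} "
            f"for {len(customer_ids)} customers"
        )
    return projected


# 200 customers at $1.00 each fit a $300 budget; a duplicated list of 2,000 does not.
expected = check_fan_out([f"cus_{i}" for i in range(200)], 100, 300.0)
```

Calling a check like this at the top of BillingPipeline.requires() would fail the run before any ChargeCustomer task is scheduled. The worker count plays no part in the calculation, which is the point made above: parallelism changes how fast the money is spent, not how much.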
The automatic re-run risk: failed tasks and repeated vendor calls
Luigi re-runs failed tasks when the pipeline is re-submitted, unless an output() marker exists confirming completion. This creates a subtle risk: if a Stripe call succeeds (the charge is created) but the task fails after the call (network error before writing the output file), Luigi will re-run the task and call Stripe again. Without an idempotency key, this creates a duplicate charge.
The two-part defense: idempotency keys on every vendor call (keyed to a stable task parameter combination), and a proxy deduplication layer that caches responses for a short window keyed on the idempotency key. If Luigi re-runs a task and the vault key for that run is still active, the proxy returns the cached response instead of forwarding the duplicate call.
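Both halves of that defense can be sketched in a few lines. The code below is a toy model under stated assumptions: idempotency_key is a hypothetical helper that hashes the stable task parameters, and DedupCache is an in-memory stand-in for whatever the proxy actually does, with an arbitrary TTL. Neither reflects Keybrake's real implementation.

```python
import hashlib
import time


def idempotency_key(run_id, customer_id, amount_cents):
    """Derive a stable key: identical task parameters always give the same key."""
    raw = f"{run_id}|{customer_id}|{amount_cents}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]


class DedupCache:
    """Caches vendor responses per idempotency key for ttl_seconds."""

    def __init__(self, ttl_seconds=300.0, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries = {}  # key -> (stored_at, response)

    def call(self, key, forward):
        """Return the cached response for key, or call forward() and cache it."""
        now = self.clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self.ttl_seconds:
            return hit[1]  # duplicate within the window: do not forward
        response = forward()
        self._entries[key] = (now, response)
        return response


# Simulate a task that is re-run after failing to write its output file.
forwarded = []

def fake_vendor_call():
    forwarded.append(1)
    return {"id": "pi_example"}

cache = DedupCache(ttl_seconds=60.0)
key = idempotency_key("a1b2c3d4", "cus_example", 500)
first = cache.call(key, fake_vendor_call)
second = cache.call(key, fake_vendor_call)  # the re-run
```

After both calls, the vendor function has been invoked once and the re-run received the original response. Hashing the parameters rather than concatenating them keeps the key at a fixed length regardless of how long the customer ID is.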
Scoping vault keys per Luigi pipeline run
Add a VaultKeyTask that other vendor-calling tasks depend on, and thread a run_id through as a luigi.Parameter() so every task in the run resolves to the same VaultKeyTask and reads the vault key from its output:

```python
import os

import httpx
import luigi
import stripe


class VaultKeyTask(luigi.Task):
    """Issues one vault key per pipeline run and stores it as the task output."""

    # Required: the caller supplies one run_id per pipeline run (for example a
    # short UUID), so every task in the run points at the same VaultKeyTask.
    run_id = luigi.Parameter()
    budget_usd = luigi.FloatParameter(default=300.0)

    def output(self):
        return luigi.LocalTarget(f"/tmp/vault-keys/{self.run_id}.txt")

    def run(self):
        r = httpx.post(
            "https://proxy.keybrake.com/vault/keys",
            headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_API_KEY']}"},
            json={
                "vendor": "stripe",
                "daily_usd_cap": self.budget_usd,
                "allowed_endpoints": ["POST /v1/payment_intents"],
                "expires_in": "4h",
                "agent_run_label": f"luigi/{self.run_id}",
            },
        )
        # Fail the task on an HTTP error instead of writing a bad key file.
        r.raise_for_status()
        vault_key = r.json()["vault_key"]
        with self.output().open("w") as f:
            f.write(vault_key)


class ChargeCustomer(luigi.Task):
    customer_id = luigi.Parameter()
    amount_cents = luigi.IntParameter()
    run_id = luigi.Parameter()
    budget_usd = luigi.FloatParameter(default=300.0)

    def requires(self):
        return VaultKeyTask(run_id=self.run_id, budget_usd=self.budget_usd)

    def output(self):
        return luigi.LocalTarget(f"/tmp/charged/{self.run_id}/{self.customer_id}.json")

    def run(self):
        # The vault key comes from the upstream task's output, not the environment.
        with self.input().open("r") as f:
            vault_key = f.read().strip()
        stripe.api_key = vault_key
        stripe.api_base = "https://proxy.keybrake.com/stripe"
        intent = stripe.PaymentIntent.create(
            amount=self.amount_cents,
            currency="usd",
            customer=self.customer_id,
            idempotency_key=f"{self.run_id}-{self.customer_id}-{self.amount_cents}",
        )
        with self.output().open("w") as f:
            f.write(intent.to_json())


class BillingPipeline(luigi.Task):
    customer_ids = luigi.ListParameter()
    amount_cents = luigi.IntParameter()
    run_id = luigi.Parameter()  # required; see VaultKeyTask
    budget_usd = luigi.FloatParameter(default=300.0)

    def requires(self):
        return [
            ChargeCustomer(
                customer_id=cid,
                amount_cents=self.amount_cents,
                run_id=self.run_id,
                budget_usd=self.budget_usd,
            )
            for cid in self.customer_ids
        ]
```
VaultKeyTask runs once and writes the vault key to a local output file. All ChargeCustomer tasks declare VaultKeyTask as a dependency — Luigi ensures the vault key exists before running any charge tasks. The vault key is read from the output file rather than the environment, so it's scoped to this pipeline run without modifying the worker's environment variables. The real Stripe secret never appears in Luigi task parameters, output files, or the central scheduler's history.
How Keybrake fits
Keybrake is the proxy layer between your Luigi tasks and Stripe, Twilio, or Resend. Tasks that previously set stripe.api_key = os.environ["STRIPE_SECRET_KEY"] now read a vault key from the upstream VaultKeyTask output and set stripe.api_base = "https://proxy.keybrake.com/stripe". Each pipeline run gets its own vault key with its own budget, endpoint allowlist, and TTL. When Luigi re-runs a failed task, the vault key is already present in the output file: no new key is issued, and no additional budget is consumed until actual vendor calls are made.
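Because each run has its own key, stopping one run means revoking one key, which the TL;DR describes as a single DELETE call. This page does not give the route for that call, so the sketch below only constructs the request and does not send it. The path /vault/keys/{vault_key_id} and the vault_key_id argument are assumptions made by analogy with the POST /vault/keys call used to issue keys; check the Keybrake API reference for the actual route.

```python
import urllib.request

PROXY_BASE = "https://proxy.keybrake.com"  # host used in the examples on this page


def build_revoke_request(vault_key_id, api_key):
    """Build (but do not send) a DELETE request that revokes one vault key.

    ASSUMPTION: the path below is hypothetical, inferred from POST /vault/keys.
    """
    return urllib.request.Request(
        url=f"{PROXY_BASE}/vault/keys/{vault_key_id}",
        method="DELETE",
        headers={"Authorization": f"Bearer {api_key}"},
    )


request = build_revoke_request("vk_example", "kb_example_api_key")
# To send it: urllib.request.urlopen(request)
```

Nothing in this request touches the Luigi workers or the environment they share, which is why other pipelines on the same scheduler are unaffected.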
Related questions
Does Luigi's output() marker interact well with vault key scoping?
Yes, and it's one of Luigi's advantages for this pattern. Luigi only re-runs a task if its output() target doesn't exist. If a ChargeCustomer task completes successfully (charge created, output file written), Luigi won't re-run it even on pipeline re-submission. The vault key output file (VaultKeyTask.output()) persists across re-runs of the same run_id, so re-submitted pipelines read the same vault key rather than issuing a new one. If the vault key expires (TTL reached) before a re-run, issue a new run_id to trigger a fresh key issuance. Be aware that in the code above both the ChargeCustomer output path and the idempotency key include run_id, so a new run_id makes Luigi re-run every charge task, including customers already charged under the old run_id, with idempotency keys that no longer match the earlier calls. Restrict the re-submitted customer_ids to the customers whose charges did not complete.
How do I handle cap exhaustion in Luigi task retry logic?
When the proxy returns 429 due to cap exhaustion, raise a specific exception class in your run() method rather than a generic Exception. Luigi's default behavior is to mark the task as failed and allow it to be retried on the next pipeline submission. To prevent retries on intentional cap exhaustion (as opposed to transient network errors), check the response for X-Keybrake-Cap-Hit: true and either raise a dedicated exception that your submission logic treats as final, or write a sentinel output file that marks the task as "skipped due to cap" rather than "failed". The sentinel works because Luigi treats any task whose output() exists as complete, so it will not re-run cap-exhausted tasks on the next pipeline submission without a new vault key.
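The distinction between a cap hit and a transient failure can be sketched as a small classifier plus a sentinel writer. The X-Keybrake-Cap-Hit header comes from the answer above. The exception class, the function names, the category labels, and the sentinel file format are hypothetical choices for illustration.

```python
import json
from pathlib import Path


class CapExhausted(Exception):
    """The run's budget is spent; retrying with the same vault key cannot succeed."""


def classify_response(status_code, headers):
    """Map a proxy response to 'ok', 'cap_hit', 'transient', or 'error'."""
    normalized = {name.lower(): value for name, value in headers.items()}
    if status_code == 429 and normalized.get("x-keybrake-cap-hit") == "true":
        return "cap_hit"
    if status_code == 429 or status_code >= 500:
        return "transient"  # rate limiting or server trouble: worth retrying
    if 200 <= status_code < 300:
        return "ok"
    return "error"


def write_cap_sentinel(path, run_id, customer_id):
    """Write an output file that marks the task complete but skipped."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "status": "skipped_due_to_cap",
        "run_id": run_id,
        "customer_id": customer_id,
    }
    target.write_text(json.dumps(payload))
    return target


outcome = classify_response(429, {"X-Keybrake-Cap-Hit": "true"})
```

Inside run(), a "cap_hit" outcome would either raise CapExhausted or call write_cap_sentinel with the task's own output path, while "transient" would raise an ordinary exception so the next submission retries it. Any downstream aggregation step then needs to check the status field, since a sentinel file is not a charge record.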
Can I use this pattern with Luigi's batch_method for grouped vendor calls?
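Yes. In Luigi, batch_method is an argument on a parameter (for example luigi.Parameter(batch_method=...)), not a decorator: the scheduler combines pending tasks of the same class whose non-batched parameters are equal and executes them in a single run() call. Issue the vault key in the setup task as above and keep run_id as a non-batched parameter, so only tasks from the same pipeline run are combined and the batched task reads the vault key from VaultKeyTask's output. All tasks in the batch share the same vault key and its cap, which is correct, since they represent the same logical pipeline run. If different batches represent different budget boundaries, use different run_id values to trigger separate vault key issuance per batch.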
Further reading
- Airflow AI agent API key — Airflow's TaskFlow API creates a similar dynamic task generation pattern; the vault-key injection approach is nearly identical.
- Prefect AI agent API key — Prefect's subflow pattern maps to Luigi's pipeline hierarchy; per-subflow vault keys are the equivalent of per-run vault keys here.
- AI agent API key rotation — why per-run short-lived vault keys eliminate the rotation schedule problem for Luigi pipelines.
- AI agent audit trail schema — what the per-call audit log looks like when agent_run_label carries Luigi task parameters.