Dramatiq AI agent API key: scoping vendor calls in actor-model task queues

Dramatiq is a modern Python task queue built around actors, with simple middleware, predictable retry semantics, and first-class support for parallel and sequential dispatch via group() and pipeline(). When AI agents use Dramatiq to fan out vendor API calls, those same primitives become spending risks: group() enqueues hundreds of messages at once with no per-group spend cap, max_retries=N can turn one failing call into up to N+1 vendor calls, and pipeline() triggers actors in sequence without any per-pipeline dollar limit. This page covers the vault-key pattern that bounds per-group vendor spend without restructuring your actor definitions.

TL;DR

Dramatiq actors are designed for reliable background work, not for vendor API calls that cost money per invocation. Issue a vault key before dispatching a group(), pass it as a message argument to every actor in the group, enforce a per-group dollar cap at the proxy layer, and get a structured per-call audit log with the actor name and message ID attached. Revoke the vault key from the Keybrake dashboard without rotating the real API key that every Dramatiq worker would otherwise read from its environment.

How Dramatiq AI agent actors call vendor APIs

In a Dramatiq application, vendor API calls live inside @dramatiq.actor-decorated functions. An AI billing agent that processes subscription renewals might use dramatiq.group() to fan out charges:

import dramatiq
from dramatiq.brokers.redis import RedisBroker
import stripe
import os

broker = RedisBroker(url="redis://localhost:6379")
dramatiq.set_broker(broker)

@dramatiq.actor(max_retries=3, min_backoff=1000, max_backoff=60000)
def charge_customer(customer_id: str, amount_cents: int) -> None:
    stripe.api_key = os.environ["STRIPE_SECRET_KEY"]  # full-access key
    stripe.PaymentIntent.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
    )

def run_billing(customer_ids: list[str], amount_cents: int) -> None:
    group = dramatiq.group([
        charge_customer.message(cid, amount_cents)
        for cid in customer_ids
    ])
    group.run()

This is standard Dramatiq. group.run() enqueues every message up front; max_retries=3 handles transient network errors. The problem is threefold: STRIPE_SECRET_KEY is a full-access key read from the environment on every worker, there's no cap on how many charges the group can issue, and a retry after a network timeout may create a duplicate charge because no idempotency key is sent.

Three gaps Dramatiq's native tooling doesn't fill for vendor spend control

Gap: No per-group spend cap
What happens in practice: A billing group receives 4,000 customer IDs due to a data pipeline bug that duplicated a query result. Dramatiq enqueues all 4,000 messages immediately. Workers process them in parallel at whatever process and thread concurrency you've configured. Stripe confirms each charge. The only limit on the total is whatever limit Stripe applies to your account, not the $300 intended for the day's renewal run.
Dramatiq's answer: None. Dramatiq's middleware and monitoring hooks expose message states and throughput, not dollar amounts spent on vendor calls.

Gap: No actor-level vendor revoke
What happens in practice: Dramatiq does not have a built-in mid-run cancel for enqueued messages. You can flush the broker queue, but messages already picked up by workers will execute. Rotating the real Stripe secret to stop in-flight actor calls breaks every other Dramatiq worker that reads that environment variable.
Dramatiq's answer: No built-in equivalent to Celery's revoke(). Clearing the queue stops pending messages but does not interrupt actors already executing.

Gap: No per-call audit with actor context
What happens in practice: Dramatiq's result backend (Redis or Memcached) stores actor results, but doesn't parse dollar amounts from Stripe responses or cross-reference Stripe Request-Id values with Dramatiq message IDs in a queryable format.
Dramatiq's answer: Prometheus metrics for throughput and failure rates. No structured cost tracking, no actor-message-to-Stripe-charge correlation.

The group() risk: simultaneous dispatch and parallel vendor calls

Calling run() on a Dramatiq group enqueues every message in the group up front. The broker hands them to available workers immediately. On a Dramatiq deployment with 8 worker processes at 8 threads each, that's up to 64 simultaneous actor executions, and so up to 64 simultaneous Stripe API calls. Unlike Celery's chord, a Dramatiq group has no completion callback unless you add one with add_completion_callback() and enable the GroupCallbacks middleware. group.run() only enqueues; waiting is a separate group.wait() or group.get_results() call that needs a result backend. The fan-out to vendor calls therefore happens before any aggregation or budget check.
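To put rough numbers on that exposure, here is a small arithmetic sketch. The function name and the 5,000-cent charge amount are illustrative assumptions; the other figures (4,000 messages, max_retries=3, 8 processes at 8 threads) come from the examples on this page. It computes an upper bound that assumes no idempotency keys and that every attempt reaches the vendor.

```python
def worst_case_exposure(messages, amount_cents, max_retries, processes, threads):
    """Upper bounds for an uncapped group (hypothetical helper, not a Dramatiq API)."""
    # Each worker thread can run one actor at a time.
    concurrent_calls = processes * threads
    # One initial attempt plus up to max_retries retries per message.
    max_vendor_calls = messages * (max_retries + 1)
    max_spend_usd = max_vendor_calls * amount_cents / 100
    return concurrent_calls, max_vendor_calls, max_spend_usd


concurrent, calls, spend_usd = worst_case_exposure(
    messages=4000, amount_cents=5000, max_retries=3, processes=8, threads=8
)
print(concurrent, calls, spend_usd)
```

The real figure is normally far lower, since most calls succeed on the first attempt, but the bound shows why a cap needs to sit outside the queue.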

A per-group vault key with a dollar cap turns this into a bounded operation. The proxy enforces the cap atomically across all concurrent actor calls: once cumulative spend hits the limit, further calls return 429. Actors should treat that 429 as non-retryable, for example by raising an exception listed in the actor's throws option. That is the right behavior when the cap is hit intentionally: the budget is exhausted, so this is not a transient error.
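What "atomically" means here can be illustrated with a toy in-process model. This is not Keybrake's implementation, and GroupBudget and fake_actor are hypothetical names. The check and the increment happen under one lock, so 64 concurrent callers cannot jointly overshoot the cap. In this sketch a call that would push spend past the cap is the one that gets the 429.

```python
import threading


class GroupBudget:
    """Toy model of a per-group cap (hypothetical; not the proxy's real code)."""

    def __init__(self, cap_cents: int) -> None:
        self._cap = cap_cents
        self._spent = 0
        self._lock = threading.Lock()

    def try_spend(self, amount_cents: int) -> int:
        """Return 200 if the amount fits under the cap, otherwise 429."""
        with self._lock:  # check-and-increment must be a single atomic step
            if self._spent + amount_cents > self._cap:
                return 429
            self._spent += amount_cents
            return 200

    @property
    def spent_cents(self) -> int:
        with self._lock:
            return self._spent


budget = GroupBudget(cap_cents=30_000)  # the $300 cap from the example
statuses = []
statuses_lock = threading.Lock()


def fake_actor() -> None:
    status = budget.try_spend(5_000)  # assumed $50 charge per call
    with statuses_lock:
        statuses.append(status)


# 64 threads stand in for 8 processes x 8 worker threads.
threads = [threading.Thread(target=fake_actor) for _ in range(64)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(statuses.count(200), statuses.count(429), budget.spent_cents)
```

A real proxy would need the same property across processes and machines, typically via an atomic operation in a shared store rather than an in-memory lock.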

The max_retries risk: duplicate charges on network failures

Dramatiq retries messages that raise exceptions during actor execution, up to max_retries times with exponential backoff. Applied to vendor API calls, this creates a duplicate-charge risk identical to Celery's autoretry_for:

  1. A charge_customer actor calls Stripe and the TCP connection drops before the response arrives.
  2. Dramatiq sees an uncaught exception and schedules a retry with backoff.
  3. The retry calls Stripe again — but Stripe may have already processed the first request before the connection dropped.
  4. Without a stable idempotency key, Stripe creates a second charge.

Idempotency keys are the primary defense. A vault key proxy adds a second layer: if the first call succeeded (recorded in the proxy audit log), the proxy can return a cached response on retries for the same idempotency key instead of forwarding again. This second layer still depends on the retry carrying the same idempotency key as the first attempt, so the key must be derived from stable message arguments rather than generated fresh on each execution.
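One way to get a key that is stable across retries but distinct across billing runs is to derive it from values carried in the message itself. The sketch below is a minimal example under assumptions: the function name and the run_id argument (for example a billing-period identifier passed as a message argument) are hypothetical and not part of Dramatiq or Stripe.

```python
import hashlib


def idempotency_key(run_id: str, customer_id: str, amount_cents: int) -> str:
    """Derive a deterministic key from message arguments (hypothetical helper)."""
    raw = f"{run_id}:{customer_id}:{amount_cents}"
    # A SHA-256 hex digest is 64 characters, well within Stripe's 255-character limit.
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


first_attempt = idempotency_key("2024-06-renewals", "cus_123", 5000)
retry_attempt = idempotency_key("2024-06-renewals", "cus_123", 5000)
next_run = idempotency_key("2024-07-renewals", "cus_123", 5000)
print(first_attempt == retry_attempt, first_attempt == next_run)
```

Because Dramatiq retries re-deliver the same message arguments, a retry recomputes the same key, while the next run for the same customer and amount gets a different one.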

Scoping vault keys per Dramatiq group

Issue the vault key before dispatching the group, then pass it as an explicit argument in every actor message:

import httpx
import dramatiq
from dramatiq.brokers.redis import RedisBroker
import stripe
import os
import uuid

broker = RedisBroker(url="redis://localhost:6379")
dramatiq.set_broker(broker)

def issue_vault_key(group_label: str, budget_usd: float) -> str:
    r = httpx.post(
        "https://proxy.keybrake.com/vault/keys",
        headers={"Authorization": f"Bearer {os.environ['KEYBRAKE_API_KEY']}"},
        json={
            "vendor": "stripe",
            "daily_usd_cap": budget_usd,
            "allowed_endpoints": ["POST /v1/payment_intents"],
            "expires_in": "2h",
            "agent_run_label": f"dramatiq/{group_label}",
        },
    )
    r.raise_for_status()  # fail before dispatch if no key was issued
    return r.json()["vault_key"]

@dramatiq.actor(max_retries=3, min_backoff=1000, max_backoff=60000)
def charge_customer(customer_id: str, amount_cents: int, vault_key: str) -> None:
    stripe.api_key = vault_key                          # scoped vault key
    stripe.api_base = "https://proxy.keybrake.com/stripe"
    stripe.PaymentIntent.create(
        amount=amount_cents,
        currency="usd",
        customer=customer_id,
        idempotency_key=f"{customer_id}-{amount_cents}",  # stable retry key
    )

def run_billing(customer_ids: list[str], amount_cents: int, budget_usd: float = 300.0) -> None:
    group_label = str(uuid.uuid4())[:8]
    vault_key = issue_vault_key(group_label, budget_usd)
    group = dramatiq.group([
        charge_customer.message(cid, amount_cents, vault_key)
        for cid in customer_ids
    ])
    group.run()

The vault key is issued once and embedded in every message in the group. All actor executions in the group share the same vault key and its per-group cap. The real Stripe secret stays in Keybrake's environment, never in the Dramatiq broker queue or worker environment variables. The audit log records each call with agent_run_label: "dramatiq/{group_label}" — queryable by group or by time window.

How Keybrake fits

Keybrake is the proxy layer between your Dramatiq actors and Stripe, Twilio, or Resend. You swap stripe.api_key for the vault key and set stripe.api_base to https://proxy.keybrake.com/stripe. The real Stripe secret stays in Keybrake, not in your Dramatiq broker messages or worker process environment. Each group dispatch gets its own vault key with its own dollar cap, endpoint allowlist, and expiry. Calls made after a group exceeds its cap return 429s. Raise a dedicated exception for these and list it in @dramatiq.actor(throws=...) so it is not retried, which prevents retry storms on intentional cap exhaustion.

Related questions

How should I handle the vault key in Dramatiq pipeline() compositions?

For dramatiq.pipeline(), each stage in the pipeline is a separate actor message. Issue the vault key before running the pipeline and pass it as an explicit message argument to each stage that makes vendor calls, including stages that both process data and call a vendor. Dramatiq appends the previous stage's result as the last positional argument of the next message, so the vault key comes before it in the actor's signature. The per-pipeline cap accumulates across all stages, which is correct if the pipeline represents a single logical transaction. If stages in a pipeline represent independent operations with independent budgets, issue separate vault keys per stage.
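The argument ordering is easy to get wrong, so here is a plain-Python model of it. This is not Dramatiq code: run_pipeline_model, fetch_invoice, charge_invoice, and the vk_example key are hypothetical stand-ins. It mirrors only the rule that a pipeline appends each stage's result as the last positional argument of the next stage.

```python
def run_pipeline_model(stages):
    """Model of pipeline argument passing: (function, message_args) pairs in order."""
    result = None
    for index, (func, args) in enumerate(stages):
        # The first stage gets only its own args; later stages also get the
        # previous result appended as the final positional argument.
        call_args = args if index == 0 else (*args, result)
        result = func(*call_args)
    return result


def fetch_invoice(customer_id):
    # Pure data stage: no vendor call, so it takes no vault key.
    return {"customer": customer_id, "amount_cents": 5000}


def charge_invoice(vault_key, invoice):
    # Vendor stage: the vault key is an explicit message argument and the
    # piped invoice arrives after it.
    return f"charged {invoice['customer']} {invoice['amount_cents']} with {vault_key}"


outcome = run_pipeline_model([
    (fetch_invoice, ("cus_123",)),
    (charge_invoice, ("vk_example",)),
])
print(outcome)
```

In a real pipeline the same shape would be written as actor messages, with the vault key placed in the message arguments of each vendor-calling stage.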

Does passing the vault key inside Dramatiq messages expose it in the broker?

The vault key string is serialized into the message payload in the broker (Redis, RabbitMQ) just like any other argument. Apply the same security practices you would for any sensitive argument: ensure your broker connection is TLS-encrypted and access-controlled. The key advantage over embedding the real API key is the vault key's properties: it has a short TTL, a dollar cap, an endpoint allowlist, and can be revoked from the Keybrake dashboard even after it's been distributed across hundreds of queued messages. The real Stripe secret never enters the broker queue at all.

How do I configure max_retries so retries don't fire on intentional 429 cap exhaustion?

Dramatiq's Retries middleware offers two options for this. Either pass a custom retry_when predicate, which receives the retry count and the exception and decides whether to retry, or raise a specific exception class for cap exhaustion and exclude it from retry. When the proxy returns 429 due to cap exhaustion, your actor receives an HTTP error response; raise a custom BudgetExhausted exception and configure @dramatiq.actor(throws=BudgetExhausted, max_retries=3). Exceptions listed in throws are never retried, while other failures still get the normal retry budget. Stripe's own 429s (rate limits, not cap exhaustion) should still be retried. Distinguish the two by response headers: Keybrake cap exhaustion returns a specific X-Keybrake-Cap-Hit: true header you can check in the exception handler.
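The classification step can be sketched as follows, assuming your HTTP or Stripe client exposes the response headers on the error it raises. The exception classes and helper names are hypothetical; only the X-Keybrake-Cap-Hit header comes from the text above. The predicate takes the two arguments Dramatiq passes to a retry_when callable: the number of retries so far and the exception.

```python
class BudgetExhausted(Exception):
    """The vault key's cap was reached; retrying cannot succeed."""


class VendorRateLimited(Exception):
    """The vendor itself returned 429; a later attempt may succeed."""


def classify_429(headers: dict) -> Exception:
    """Map a 429 response's headers to the exception the actor should raise."""
    normalized = {key.lower(): value for key, value in headers.items()}
    if normalized.get("x-keybrake-cap-hit", "").lower() == "true":
        return BudgetExhausted("group budget exhausted")
    return VendorRateLimited("vendor rate limit")


def should_retry(retries_so_far: int, exception: Exception) -> bool:
    """Retry predicate: never retry cap exhaustion, allow up to 3 retries otherwise."""
    if isinstance(exception, BudgetExhausted):
        return False
    return retries_so_far < 3


cap_hit = classify_429({"X-Keybrake-Cap-Hit": "true"})
rate_limited = classify_429({"Retry-After": "1"})
print(type(cap_hit).__name__, should_retry(0, cap_hit))
print(type(rate_limited).__name__, should_retry(0, rate_limited))
```

Inside the actor you would raise the classified exception when a 429 arrives, then either list BudgetExhausted in the actor's throws option or pass should_retry as retry_when. The two approaches are alternatives; using one is enough.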