Skip to content

PgQueuer v1.0

v1.0.0 Breaking

This release includes 8 breaking changes for platform teams planning a safe upgrade.

✓ No known CVEs patched
Read the diff → Tool health → What is this tool? →

✓ No known CVEs patched in this version

Topics

postgresql python queue

Affected surfaces

breaking_upgrade

ReleasePort's take

Light signal
editorial:auto 13d

PgQueuer v1.0 requires schema migration and converts entrypoints to async-only. Multiple API breaking changes affect QueueManager, factories, and CLI arguments.

Why it matters: Schema migration to pgq v1.0 is a prerequisite. Sync entrypoints will error at runtime without code changes. API signature changes require QueueManager, factory, and CLI updates. Plan comprehensive migration before deploying.

Summary

AI summary

PgQueuer v1.0 introduces async‑only entrypoints, schema migration for retries, global concurrency limits, and an MCP server for AI agents.

Changes in this release

Breaking High

Synchronous entrypoint handlers are rejected; only async def is allowed

Synchronous entrypoint handlers are rejected; only async def is allowed

Source: granite4.1:30b@2026-05-23-audit

Confidence: low

Breaking Medium

Entrypoint factories must be async context managers with yield

Entrypoint factories must be async context managers with yield

Source: llm_adapter@2026-05-21

Confidence: high

Breaking Medium

serialized_dispatch=True becomes concurrency_limit=1; retry_timer consolidated to heartbeat_timeout

serialized_dispatch=True becomes concurrency_limit=1; retry_timer consolidated to heartbeat_timeout

Source: llm_adapter@2026-05-21

Confidence: high

Breaking Medium

Custom async Driver must implement notify(channel, payload) method

Custom async Driver must implement notify(channel, payload) method

Source: llm_adapter@2026-05-21

Confidence: high

Breaking Medium

Sync entrypoints raise TypeError at decoration; use asyncio.to_thread

Sync entrypoints raise TypeError at decoration; use asyncio.to_thread

Source: llm_adapter@2026-05-21

Confidence: low

Breaking Medium

CLI --pg-host, --pg-user removed; use libpq env vars

CLI --pg-host, --pg-user removed; use libpq env vars

Source: llm_adapter@2026-05-21

Confidence: low

Breaking Medium

RetryWithBackoffEntrypointExecutor renamed to DatabaseRetryEntrypointExecutor

RetryWithBackoffEntrypointExecutor renamed to DatabaseRetryEntrypointExecutor

Source: llm_adapter@2026-05-21

Confidence: low

Breaking Medium

QueueManager(driver) becomes QueueManager(Queries(driver)); PgQueuer unchanged

QueueManager(driver) becomes QueueManager(Queries(driver)); PgQueuer unchanged

Source: llm_adapter@2026-05-21

Confidence: low

Breaking Medium

Run pgq upgrade to add attempts column and failed enum value

Run pgq upgrade to add attempts column and failed enum value

Source: llm_adapter@2026-05-21

Confidence: low

Breaking Medium

14 internal shim modules removed from package root

14 internal shim modules removed from package root

Source: llm_adapter@2026-05-21

Confidence: low

Breaking Medium

CLI flags --pg-host, --pg-user etc. removed; use libpq environment variables or --pg-dsn instead

CLI flags --pg-host, --pg-user etc. removed; use libpq environment variables or --pg-dsn instead

Source: granite4.1:30b@2026-05-23-audit

Confidence: low

Breaking Medium

RetryWithBackoffEntrypointExecutor renamed to DatabaseRetryEntrypointExecutor and no longer depends on async-timeout

RetryWithBackoffEntrypointExecutor renamed to DatabaseRetryEntrypointExecutor and no longer depends on async-timeout

Source: granite4.1:30b@2026-05-23-audit

Confidence: low

Breaking Medium

QueueManager now requires Queries(driver) instead of driver directly; PgQueuer constructor unchanged

QueueManager now requires Queries(driver) instead of driver directly; PgQueuer constructor unchanged

Source: granite4.1:30b@2026-05-23-audit

Confidence: low

Feature Medium

Raise RetryRequested exception to retry failed jobs with delay

Raise RetryRequested exception to retry failed jobs with delay

Source: llm_adapter@2026-05-21

Confidence: high

Feature Medium

on_failure='hold' option parks failed jobs for inspection and requeue

on_failure='hold' option parks failed jobs for inspection and requeue

Source: llm_adapter@2026-05-21

Confidence: high

Feature Medium

Global concurrency_limit enforces limits across entire fleet via database

Global concurrency_limit enforces limits across entire fleet via database

Source: llm_adapter@2026-05-21

Confidence: high

Feature Medium

ScheduleContext provides shared resources to scheduled tasks like pools

ScheduleContext provides shared resources to scheduled tasks like pools

Source: llm_adapter@2026-05-21

Confidence: high

Feature Medium

DatabaseRetryEntrypointExecutor wrapper adds exponential backoff to any handler

DatabaseRetryEntrypointExecutor wrapper adds exponential backoff to any handler

Source: llm_adapter@2026-05-21

Confidence: low

Feature Medium

MCP server with 11 read-only tools for queue inspection

MCP server with 11 read-only tools for queue inspection

Source: llm_adapter@2026-05-21

Confidence: low

Feature Medium

DatabaseRetryEntrypointExecutor adds exponential backoff based on job attempts

DatabaseRetryEntrypointExecutor adds exponential backoff based on job attempts

Source: granite4.1:30b@2026-05-23-audit

Confidence: low

Performance Medium

Dequeue CTE split restores partial-index usage for throughput

Dequeue CTE split restores partial-index usage for throughput

Source: llm_adapter@2026-05-21

Confidence: high

Bugfix Medium

NOTIFY watcher survives bad callbacks instead of dying silent

NOTIFY watcher survives bad callbacks instead of dying silent

Source: llm_adapter@2026-05-21

Confidence: high

Bugfix Medium

Deferred jobs wake within 100ms of eligible time

Deferred jobs wake within 100ms of eligible time

Source: llm_adapter@2026-05-21

Confidence: low

Bugfix Medium

Heartbeat waits interval before first send, prevents redundant write

Heartbeat waits interval before first send, prevents redundant write

Source: llm_adapter@2026-05-21

Confidence: low

Bugfix Medium

TOCTOU race on deferred jobs fixed, eliminates unnecessary sleep

TOCTOU race on deferred jobs fixed, eliminates unnecessary sleep

Source: llm_adapter@2026-05-21

Confidence: low

Bugfix Medium

In-memory adapter releases dedupe_key on held and failed jobs

In-memory adapter releases dedupe_key on held and failed jobs

Source: llm_adapter@2026-05-21

Confidence: low

Bugfix Low

Deferred jobs wake within ~100 ms of their eligible time instead of full dequeue_timeout

Deferred jobs wake within ~100 ms of their eligible time instead of full dequeue_timeout

Source: granite4.1:30b@2026-05-23-audit

Confidence: low

Bugfix Low

Heartbeat waits its configured interval before first send, avoiding redundant DB writes on short jobs

Heartbeat waits its configured interval before first send, avoiding redundant DB writes on short jobs

Source: granite4.1:30b@2026-05-23-audit

Confidence: low

Full changelog

PgQueuer v1.0.0

From here on: strict semver. Patch for bug fixes,
minor for backward-compat features, major only for breaks. Public API frozen.

This release is a hard reset: cleaner core, fewer footguns, smaller surface,
new building blocks for retries, failure handling, and AI introspection.

Why upgrade

Database-level retry, finally done right. Raise RetryRequested from any
handler — the job stays in the queue, gets a bumped attempts counter, and
re-runs after your chosen delay. Survives worker crashes, restarts, OOM kills.

from pgqueuer import RetryRequested
from datetime import timedelta

@pgq.entrypoint("call_api")
async def call_api(job: Job) -> None:
    if response.status == 429:
        raise RetryRequested(delay=timedelta(seconds=30), reason="rate limited")

Exponential backoff in one line. DatabaseRetryEntrypointExecutor wraps
any handler with retries + backoff, computed from job.attempts. No
async-timeout, no in-process state.

@pgq.entrypoint(
    "flaky_api",
    executor_factory=lambda p: DatabaseRetryEntrypointExecutor(
        parameters=p, max_attempts=5, max_delay=timedelta(minutes=5),
    ),
)
async def flaky_api(job: Job) -> None: ...

Hold failed jobs for inspection. on_failure="hold" parks terminally
failed jobs in the queue table with status='failed' instead of deleting
them. Inspect with pgq failed, requeue with pgq requeue 42 43 44.

Concurrency limits are now global. concurrency_limit=5 means 5 across
your entire fleet, enforced at the database via FOR UPDATE SKIP LOCKED.
No more per-process semaphores that lied to you under horizontal scale.

MCP server for AI agents. pip install pgqueuer[mcp] ships an MCP server
with 11 read-only tools (queue size, throughput, stale jobs, schema info,
failed jobs, …). Plug it into Claude Code, Cursor, or any MCP client to let
your AI assistant inspect queue state.

Scheduled tasks get shared resources too. ScheduleContext mirrors the
queue handler Context — pass HTTP clients, DB pools, ML models in once via
PgQueuer(driver, resources={...}).

Bug fixes worth calling out

  • Deferred jobs wake within ~100ms of their eligible time. Previously
    they could wait the full dequeue_timeout (default 30s).
  • TOCTOU race on deferred jobs fixed — no more unnecessary full-timeout
    sleeps when a job becomes eligible mid-dequeue.
  • Dequeue CTE split to restore partial-index usage on the hot path —
    meaningful throughput recovery on busy queues.
  • Heartbeat waits its interval before the first send — no redundant DB
    write on short jobs.
  • Psycopg NOTIFY watcher survives bad callbacks instead of dying silent.
  • In-memory adapter correctly releases dedupe_key on held/failed jobs.

Breaking changes — the short version

Full list and migration steps in
RELEASE.md (all
23 numbered) and the upgrade guide.

The themes you'll hit:

  1. Async only. Sync entrypoints (def handler) raise TypeError at
    decoration time. Wrap blocking calls with asyncio.to_thread().

  2. Factories are async context managers. Use @asynccontextmanager +
    yield. Plain async def factory() -> PgQueuer is rejected.

  3. Schema migration required. Run pgq upgrade (or pgq install on a
    fresh DB). Adds the attempts column and 'failed' enum value.

  4. Concurrency knobs collapsed. serialized_dispatch=True
    concurrency_limit=1. Per-entrypoint retry_timer= → single
    heartbeat_timeout= on pgq.run() (default 30s). requests_per_second
    removed — use concurrency_limit for deterministic backpressure.

  5. CLI connection flags gone. --pg-host / --pg-user / etc. removed.
    Use standard libpq env vars (PGHOST, PGUSER, …) or --pg-dsn.

  6. Retry executor renamed. RetryWithBackoffEntrypointExecutor
    DatabaseRetryEntrypointExecutor. Drops the async-timeout runtime dep.

  7. Manager constructors take a Queries. QueueManager(driver)
    QueueManager(Queries(driver)). Same for SchedulerManager and
    CompletionWatcher. PgQueuer(driver) is unchanged.

  8. 14 internal shim modules removed from package root (pgqueuer.buffers,
    pgqueuer.qb, pgqueuer.helpers, …). Public API shims (pgqueuer.db,
    .queries, .qm, .sm, …) are untouched.

  9. Custom async Driver implementations must add notify(channel, payload).
    pg_notify moved from the queries layer into the driver protocol.
    SyncDriver is unaffected.

Upgrade in 60 seconds

pip install --upgrade pgqueuer
pgq upgrade

Then:

# 1. async handlers
@pgq.entrypoint("my_job")
async def my_job(job: Job) -> None: ...

# 2. async-context-manager factory
@asynccontextmanager
async def factory():
    conn = await asyncpg.connect()
    yield PgQueuer(AsyncpgDriver(conn))
    await conn.close()

# 3. global heartbeat instead of per-entrypoint retry_timer
await pgq.run(heartbeat_timeout=timedelta(seconds=60))

Run your tests. Most breakages surface at decoration or startup time, so
problems are immediately visible.

What's next

v1.x is for additions and fixes. No more API churn without a major bump.

Thanks to everyone who filed issues, sent PRs, and stress-tested early
versions. PgQueuer is production-ready.

Full Changelog: https://github.com/janbjorge/pgqueuer/compare/v0.26.3...v1.0.0

Breaking Changes

  • Sync entrypoints (`def handler`) now raise `TypeError`; all handlers must be `async def`.
  • Factory functions for PgQueuer instances must be async context managers (use `@asynccontextmanager`).
  • Database schema migration required: add `attempts` column and `'failed'` enum value via `pgq upgrade`.
  • Concurrency knobs consolidated: `serialized_dispatch=True` → `concurrency_limit=1`; per‑entrypoint `retry_timer=` removed; use global `heartbeat_timeout=` on `pgq.run()`.
  • CLI connection flags (`--pg-host`, `--pg-user`, etc.) removed; use libpq environment variables or `--pg-dsn`.
  • Retry executor renamed from `RetryWithBackoffEntrypointExecutor` to `DatabaseRetryEntrypointExecutor`.
  • Manager constructors now require a `Queries` object (e.g., `QueueManager(Queries(driver))`).
  • Removed 14 internal shim modules (`pgqueuer.buffers`, `pgqueuer.qb`, …) from the package root; public API unchanged.

Weekly OSS security release digest.

The CVE patches and breaking changes that affected production tools this week. One email, every Sunday.

No spam, unsubscribe anytime.

Share this release

Track PgQueuer v1.0

Get notified when new releases ship.

Sign up free

About PgQueuer v1.0

All releases →

Related context

Related tools

Beta — feedback welcome: [email protected]