Skip to content

API Reference

Transport

celery_redis_plus.Transport

Custom transport with sorted set queues, priority encoding, delayed delivery, and Redis Streams fanout.

Usage:

# For Valkey
app.config_from_object({
    'broker_url': 'valkey://localhost:6379/0',
})

# For Redis
app.config_from_object({
    'broker_url': 'redis://localhost:6379/0',
    'broker_transport': 'celery_redis_plus.transport:Transport',
})

Features:

  • Sorted set queues with BZMPOP for atomic consumption
  • Full 256-level priority support (0-255, higher = more important)
  • Native delayed delivery using sorted set timestamps
  • Redis Streams for reliable fanout messaging

Bootstep

celery_redis_plus.DelayedDeliveryBootstep

Worker bootstep that integrates with the Celery consumer lifecycle.

Usage:

from celery_redis_plus import DelayedDeliveryBootstep

app.steps['consumer'].add(DelayedDeliveryBootstep)

Responsibilities:

  • Signals transport when consumer starts/stops
  • Enables native delayed delivery mode on the transport

Configuration Options

broker_transport_options

All options are passed via Celery's broker_transport_options configuration.

Core Options

Option Type Default Description
visibility_timeout int 300 Seconds before unacked messages are reclaimed
global_keyprefix str "" Prefix for all Redis keys
stream_maxlen int 10000 Max messages per fanout stream (approximate)

Message Storage Options

Option Type Default Description
message_key_prefix str "message:" Prefix for per-message hash keys
message_ttl int 259200 TTL in seconds for message hashes (default: 3 days)
messages_index_prefix str "messages_index:" Prefix for per-queue messages index sorted sets

Connection Options

Option Type Default Description
socket_timeout float None Socket timeout in seconds
socket_connect_timeout float None Socket connection timeout in seconds
socket_keepalive bool None Enable TCP keepalive
socket_keepalive_options dict None TCP keepalive options
max_connections int 10 Maximum connections in pool
health_check_interval int 25 Health check interval in seconds
retry_on_timeout bool None Retry on timeout
client_name str None Redis client name for CLIENT SETNAME
ssl bool or dict None SSL/TLS configuration

Fanout Options

Option Type Default Description
fanout_prefix bool or str True Prefix for fanout streams (True uses /{db}.)
fanout_patterns bool True Enable pattern-based fanout routing

Advanced Options

Option Type Default Description
sep str "\x06\x16" Separator for binding key encoding

Example Configuration

app.config_from_object({
    'broker_url': 'valkey://localhost:6379/0',
    'broker_transport_options': {
        'global_keyprefix': 'myapp:',
        'visibility_timeout': 600,
        'stream_maxlen': 50000,
        'message_ttl': 86400,  # 1 day
        'max_connections': 20,
        'health_check_interval': 30,
    },
})

Redis Keys

The transport uses the following Redis key patterns:

Pattern Type Description
queue:{name} Sorted Set Queue storing delivery tags with priority+timestamp scores
message:{delivery_tag} Hash Message payload, routing key, priority, and flags
messages_index:{name} Sorted Set Per-queue index tracking {delivery_tag: queue_at} for visibility timeout and delayed delivery
/{db}.{exchange} Stream Fanout messages
_kombu.binding.{exchange} Set Queue-exchange bindings

Constants

The following constants are used internally and define default behavior:

Constant Value Description
DEFAULT_VISIBILITY_TIMEOUT 300 Default visibility timeout (5 minutes)
DEFAULT_REQUEUE_CHECK_INTERVAL 60 Interval for checking messages to requeue
DEFAULT_REQUEUE_BATCH_LIMIT 1000 Max messages processed per requeue cycle
DEFAULT_STREAM_MAXLEN 10000 Default max length for fanout streams
DEFAULT_MESSAGE_TTL 259200 Default TTL for message hashes (3 days)
PRIORITY_SCORE_MULTIPLIER 10^13 Multiplier for priority in score calculation
QUEUE_KEY_PREFIX "queue:" Prefix for queue sorted sets
MESSAGE_KEY_PREFIX "message:" Prefix for message hashes
MESSAGES_INDEX_PREFIX "messages_index:" Prefix for per-queue message index sorted sets