Celery Utils Module

Overview

The ibutsu_server.celery_utils module provides consolidated factory functions for creating Celery applications in Ibutsu. It implements two distinct configuration modes:

  1. Broker-Only Mode - Minimal configuration for monitoring (Flower)

  2. Flask-Integrated Mode - Full application context for task execution (Workers, Scheduler)

This architecture eliminates code duplication and provides a single source of truth for Celery app creation.

Architecture

The module provides a unified interface for all Celery app creation:

┌─────────────────────────────────────────────────────────────┐
│                    celery_utils.py                          │
│                 (Single Source of Truth)                     │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ create_broker_celery_app()                           │  │
│  │ • Broker-only configuration                          │  │
│  │ • No Flask app required                              │  │
│  │ • No database access                                 │  │
│  │ • Used by: Flower monitoring                         │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ create_flask_celery_app(flask_app, name)            │  │
│  │ • Full Flask integration                             │  │
│  │ • Database access via IbutsuTask                     │  │
│  │ • Task imports & beat schedule                       │  │
│  │ • Used by: Workers, Scheduler                        │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Factory Functions

create_broker_celery_app()

Creates a minimal broker-only Celery app for monitoring purposes.

Function Signature:

def create_broker_celery_app(name="ibutsu_server_flower") -> Celery

Use Case: Flower monitoring dashboard

Configuration:

  • Reads CELERY_BROKER_URL from environment (required)

  • Reads CELERY_RESULT_BACKEND from environment (optional, defaults to broker URL)

  • Configures Redis socket timeouts and retry behavior

  • Does NOT import task modules

  • Does NOT require database access

  • Does NOT initialize Flask app

Example:

from ibutsu_server.celery_utils import create_broker_celery_app

# Create broker-only app for Flower
flower_app = create_broker_celery_app(name="ibutsu_server_flower")

# Use with Celery CLI:
# celery --app=ibutsu_server:flower_app flower --port=5555

Environment Variables:

CELERY_BROKER_URL=redis://:password@redis.example.com:6379/0
CELERY_RESULT_BACKEND=redis://:password@redis.example.com:6379/0  # Optional

Raises:

  • ValueError if CELERY_BROKER_URL is not set

create_flask_celery_app()

Creates a Flask-integrated Celery app with full application context.

Function Signature:

def create_flask_celery_app(app: Flask, name="ibutsu_server") -> Celery

Use Case: Worker and Scheduler containers that execute tasks

Configuration:

  • Requires Flask app instance with database configuration

  • Stores Flask app reference for IbutsuTask context management

  • Configures from Flask app config using CELERY namespace

  • Imports all task modules (db, importers, query, results, runs)

  • Configures beat schedule for periodic tasks

  • Sets up signal handlers for task failure retry

  • Stores Celery app in flask_app.extensions["celery"]

Example:

from flask import Flask
from ibutsu_server.celery_utils import create_flask_celery_app

# Create Flask app with configuration
flask_app = Flask(__name__)
flask_app.config['CELERY'] = {
    'broker_url': 'redis://localhost:6379/0',
    'result_backend': 'redis://localhost:6379/0',
}
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://...'

# Create worker app
worker_app = create_flask_celery_app(flask_app, name="ibutsu_server_worker")

# Use with Celery CLI:
# celery --app=ibutsu_server:worker_app worker --loglevel=info

Flask Config Requirements:

flask_app.config['CELERY'] = {
    'broker_url': 'redis://...',
    'result_backend': 'redis://...',
    'broker_connection_retry': True,
    'broker_connection_retry_on_startup': True,
    'worker_cancel_long_running_tasks_on_connection_loss': True,
    'include': [
        'ibutsu_server.tasks.db',
        'ibutsu_server.tasks.importers',
        'ibutsu_server.tasks.query',
        'ibutsu_server.tasks.results',
        'ibutsu_server.tasks.runs',
    ],
}
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://...'

Raises:

  • ValueError if Flask app is None

Integration with _AppRegistry

The _AppRegistry class in ibutsu_server/__init__.py delegates to these factory functions:

class _AppRegistry:
    @classmethod
    def get_flower_app(cls):
        if cls.flower_app is None:
            cls.flower_app = create_broker_celery_app(
                name="ibutsu_server_flower"
            )
        return cls.flower_app

    @classmethod
    def get_worker_app(cls):
        if cls.worker_app is None:
            flask_app = cls.get_flask_app()
            cls.worker_app = create_flask_celery_app(
                flask_app, name="ibutsu_server_worker"
            )
        return cls.worker_app

    @classmethod
    def get_scheduler_app(cls):
        if cls.scheduler_app is None:
            flask_app = cls.get_flask_app()
            cls.scheduler_app = create_flask_celery_app(
                flask_app, name="ibutsu_server_scheduler"
            )
        return cls.scheduler_app

Module-Level Exports

The module uses __getattr__ for lazy initialization:

# Import any of these and they'll be initialized on first access
from ibutsu_server import flower_app      # Broker-only
from ibutsu_server import worker_app      # Flask-integrated
from ibutsu_server import scheduler_app   # Flask-integrated

Container Entry Points

Worker Container

CMD ["celery", "--app", "ibutsu_server:worker_app", \
     "worker", "--events", "--loglevel=info"]

Scheduler Container

CMD ["celery", "--app", "ibutsu_server:scheduler_app", \
     "beat", "--loglevel=info"]

Flower Container

CMD ["celery", "--app=ibutsu_server:flower_app", \
     "flower", "--port=5555"]

Configuration Comparison

Feature

Broker-Only (Flower)

Flask-Integrated (Worker/Scheduler)

Factory Function

create_broker_celery_app()

create_flask_celery_app()

Flask App Required

❌ No

✅ Yes

Database Access

❌ No

✅ Yes

Task Imports

❌ No

✅ Yes

Beat Schedule

❌ No

✅ Yes (periodic tasks)

IbutsuTask Context

❌ No

✅ Yes (automatic)

Signal Handlers

❌ No

✅ Yes (retry on failure)

Env Vars Required

CELERY_BROKER_URL

All Flask config + DB

Use Case

Monitoring

Task execution

Socket Timeout Configuration

Both factory functions configure Redis socket timeouts from ibutsu_server.constants:

from ibutsu_server.constants import (
    SOCKET_TIMEOUT,
    SOCKET_CONNECT_TIMEOUT
)

# Applied to both broker-only and Flask-integrated apps
celery_app.conf.redis_socket_timeout = SOCKET_TIMEOUT  # 5 seconds
celery_app.conf.redis_socket_connect_timeout = SOCKET_CONNECT_TIMEOUT
celery_app.conf.redis_retry_on_timeout = True
celery_app.conf.broker_transport_options = {
    "socket_timeout": SOCKET_TIMEOUT,
    "socket_connect_timeout": SOCKET_CONNECT_TIMEOUT,
}
celery_app.conf.result_backend_transport_options = {
    "socket_timeout": SOCKET_TIMEOUT,
    "socket_connect_timeout": SOCKET_CONNECT_TIMEOUT,
}

Beat Schedule (Flask-Integrated Only)

The Flask-integrated factory configures periodic tasks:

celery_app.conf.beat_schedule = {
    "prune-old-artifact-files": {
        "task": "ibutsu_server.tasks.db.prune_old_files",
        "schedule": crontab(minute=0, hour=4, day_of_week=6),
        "args": (3,),  # Delete files older than 3 months
    },
    "prune-old-results": {
        "task": "ibutsu_server.tasks.db.prune_old_results",
        "schedule": crontab(minute=0, hour=5, day_of_week=6),
        "args": (5,),  # Delete results older than 5 months
    },
    "prune-old-runs": {
        "task": "ibutsu_server.tasks.db.prune_old_runs",
        "schedule": crontab(minute=0, hour=6, day_of_week=6),
        "args": (12,),  # Delete runs older than 12 months
    },
    "sync-aborted-runs": {
        "task": "ibutsu_server.tasks.runs.sync_aborted_runs",
        "schedule": 0.5 * 60 * 60,  # Every 30 minutes
    },
}

Backward Compatibility

The old ibutsu_server.tasks.create_celery_app() function is maintained as a thin wrapper:

# Old code (still works)
from ibutsu_server.tasks import create_celery_app
celery_app = create_celery_app(flask_app, name="my_app")

# New code (preferred)
from ibutsu_server.celery_utils import create_flask_celery_app
celery_app = create_flask_celery_app(flask_app, name="my_app")

Testing

Comprehensive tests are available in tests/test_celery_utils.py:

# Test broker-only factory
def test_create_broker_celery_app_configuration():
    app = create_broker_celery_app()
    assert app.conf.broker_url is not None
    assert app.main == "ibutsu_server_flower"

# Test Flask-integrated factory
def test_create_flask_celery_app_configuration(flask_app):
    client, _ = flask_app
    app = create_flask_celery_app(client.application)
    assert "celery" in client.application.extensions
    assert app.conf.beat_schedule is not None

Migration Guide

For New Code

Use the factory functions directly:

# For Flower monitoring
from ibutsu_server.celery_utils import create_broker_celery_app
flower_app = create_broker_celery_app()

# For workers/scheduler
from ibutsu_server.celery_utils import create_flask_celery_app
worker_app = create_flask_celery_app(flask_app, name="worker")

For Existing Code

No changes required. The old API still works:

# This continues to work
from ibutsu_server.tasks import create_celery_app
celery_app = create_celery_app(flask_app)

Troubleshooting

Flower Can’t Connect to Broker

Symptom: Flower fails to start with connection errors

Solution: Ensure CELERY_BROKER_URL environment variable is set:

export CELERY_BROKER_URL=redis://:password@redis.example.com:6379/0

Worker Can’t Access Database

Symptom: Tasks fail with “No application context” or database connection errors

Solution: Ensure Flask app is properly configured with database URI:

flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://...'

Tasks Not Discovered

Symptom: Worker starts but tasks are not registered

Solution: Ensure task modules are imported in create_flask_celery_app(). Check that all task files are in ibutsu_server/tasks/ and properly decorated with @shared_task.

Beat Schedule Not Running

Symptom: Periodic tasks don’t execute

Solution: Ensure you’re using the scheduler app, not the worker app:

celery --app=ibutsu_server:scheduler_app beat --loglevel=info

See Also

  • ibutsu_server/util/celery_task.py - IbutsuTask base class

  • ibutsu_server/util/app_context.py - Flask context management

  • tests/test_celery_utils.py - Comprehensive test suite

  • Deployment Architecture - Deployment architecture overview