Celery Utils Module
Overview
The ibutsu_server.celery_utils module provides consolidated factory functions for creating Celery applications in Ibutsu. It implements two distinct configuration modes:
Broker-Only Mode - Minimal configuration for monitoring (Flower)
Flask-Integrated Mode - Full application context for task execution (Workers, Scheduler)
This architecture eliminates code duplication and provides a single source of truth for Celery app creation.
Architecture
The module provides a unified interface for all Celery app creation:
┌─────────────────────────────────────────────────────────────┐
│                       celery_utils.py                       │
│                  (Single Source of Truth)                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ create_broker_celery_app()                           │   │
│  │   • Broker-only configuration                        │   │
│  │   • No Flask app required                            │   │
│  │   • No database access                               │   │
│  │   • Used by: Flower monitoring                       │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ create_flask_celery_app(flask_app, name)             │   │
│  │   • Full Flask integration                           │   │
│  │   • Database access via IbutsuTask                   │   │
│  │   • Task imports & beat schedule                     │   │
│  │   • Used by: Workers, Scheduler                      │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Factory Functions
create_broker_celery_app()
Creates a minimal broker-only Celery app for monitoring purposes.
Function Signature:
def create_broker_celery_app(name="ibutsu_server_flower") -> Celery
Use Case: Flower monitoring dashboard
Configuration:
Reads CELERY_BROKER_URL from environment (required)
Reads CELERY_RESULT_BACKEND from environment (optional, defaults to broker URL)
Configures Redis socket timeouts and retry behavior
Does NOT import task modules
Does NOT require database access
Does NOT initialize Flask app
Example:
from ibutsu_server.celery_utils import create_broker_celery_app
# Create broker-only app for Flower
flower_app = create_broker_celery_app(name="ibutsu_server_flower")
# Use with Celery CLI:
# celery --app=ibutsu_server:flower_app flower --port=5555
Environment Variables:
CELERY_BROKER_URL=redis://:password@redis.example.com:6379/0
CELERY_RESULT_BACKEND=redis://:password@redis.example.com:6379/0 # Optional
Raises:
ValueError if CELERY_BROKER_URL is not set
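The environment handling listed above can be sketched in plain Python. This is a minimal illustration, not the module's actual code: `resolve_broker_settings` is a hypothetical helper, and only the documented behavior (a required `CELERY_BROKER_URL`, an optional `CELERY_RESULT_BACKEND` that defaults to the broker URL, and a `ValueError` when the broker URL is missing) is taken from this page.

```python
import os


def resolve_broker_settings(environ=None):
    """Hypothetical helper mirroring the documented environment handling."""
    env = os.environ if environ is None else environ
    broker_url = env.get("CELERY_BROKER_URL")
    if not broker_url:
        raise ValueError("CELERY_BROKER_URL environment variable must be set")
    # The result backend is optional and falls back to the broker URL.
    result_backend = env.get("CELERY_RESULT_BACKEND") or broker_url
    return {"broker_url": broker_url, "result_backend": result_backend}


# Passing a plain dict instead of os.environ keeps the demo self-contained.
settings = resolve_broker_settings({"CELERY_BROKER_URL": "redis://localhost:6379/0"})
print(settings["result_backend"])
```

Accepting the environment as a parameter is a design choice for this sketch only; it makes the fallback and the error path easy to exercise without touching the real process environment.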
create_flask_celery_app()
Creates a Flask-integrated Celery app with full application context.
Function Signature:
def create_flask_celery_app(app: Flask, name="ibutsu_server") -> Celery
Use Case: Worker and Scheduler containers that execute tasks
Configuration:
Requires Flask app instance with database configuration
Stores Flask app reference for IbutsuTask context management
Configures from Flask app config using CELERY namespace
Imports all task modules (db, importers, query, results, runs)
Configures beat schedule for periodic tasks
Sets up signal handlers for task failure retry
Stores Celery app in flask_app.extensions["celery"]
Example:
from flask import Flask
from ibutsu_server.celery_utils import create_flask_celery_app
# Create Flask app with configuration
flask_app = Flask(__name__)
flask_app.config['CELERY'] = {
    'broker_url': 'redis://localhost:6379/0',
    'result_backend': 'redis://localhost:6379/0',
}
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://...'
# Create worker app
worker_app = create_flask_celery_app(flask_app, name="ibutsu_server_worker")
# Use with Celery CLI:
# celery --app=ibutsu_server:worker_app worker --loglevel=info
Flask Config Requirements:
flask_app.config['CELERY'] = {
    'broker_url': 'redis://...',
    'result_backend': 'redis://...',
    'broker_connection_retry': True,
    'broker_connection_retry_on_startup': True,
    'worker_cancel_long_running_tasks_on_connection_loss': True,
    'include': [
        'ibutsu_server.tasks.db',
        'ibutsu_server.tasks.importers',
        'ibutsu_server.tasks.query',
        'ibutsu_server.tasks.results',
        'ibutsu_server.tasks.runs',
    ],
}
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://...'
Raises:
ValueError if Flask app is None
Integration with _AppRegistry
The _AppRegistry class in ibutsu_server/__init__.py delegates to these factory functions:
class _AppRegistry:
    @classmethod
    def get_flower_app(cls):
        if cls.flower_app is None:
            cls.flower_app = create_broker_celery_app(
                name="ibutsu_server_flower"
            )
        return cls.flower_app

    @classmethod
    def get_worker_app(cls):
        if cls.worker_app is None:
            flask_app = cls.get_flask_app()
            cls.worker_app = create_flask_celery_app(
                flask_app, name="ibutsu_server_worker"
            )
        return cls.worker_app

    @classmethod
    def get_scheduler_app(cls):
        if cls.scheduler_app is None:
            flask_app = cls.get_flask_app()
            cls.scheduler_app = create_flask_celery_app(
                flask_app, name="ibutsu_server_scheduler"
            )
        return cls.scheduler_app
Module-Level Exports
The ibutsu_server package defines a module-level __getattr__, so each app is created lazily on first access:
# Import any of these and they'll be initialized on first access
from ibutsu_server import flower_app # Broker-only
from ibutsu_server import worker_app # Flask-integrated
from ibutsu_server import scheduler_app # Flask-integrated
Container Entry Points
Worker Container
CMD ["celery", "--app", "ibutsu_server:worker_app", \
"worker", "--events", "--loglevel=info"]
Scheduler Container
CMD ["celery", "--app", "ibutsu_server:scheduler_app", \
"beat", "--loglevel=info"]
Flower Container
CMD ["celery", "--app=ibutsu_server:flower_app", \
"flower", "--port=5555"]
Configuration Comparison
| Feature | Broker-Only (Flower) | Flask-Integrated (Worker/Scheduler) |
|---|---|---|
| Factory Function | create_broker_celery_app() | create_flask_celery_app() |
| Flask App Required | ❌ No | ✅ Yes |
| Database Access | ❌ No | ✅ Yes |
| Task Imports | ❌ No | ✅ Yes |
| Beat Schedule | ❌ No | ✅ Yes (periodic tasks) |
| IbutsuTask Context | ❌ No | ✅ Yes (automatic) |
| Signal Handlers | ❌ No | ✅ Yes (retry on failure) |
| Env Vars Required | CELERY_BROKER_URL | All Flask config + DB |
| Use Case | Monitoring | Task execution |
Socket Timeout Configuration
Both factory functions configure Redis socket timeouts from ibutsu_server.constants:
from ibutsu_server.constants import (
    SOCKET_TIMEOUT,
    SOCKET_CONNECT_TIMEOUT,
)

# Applied to both broker-only and Flask-integrated apps
celery_app.conf.redis_socket_timeout = SOCKET_TIMEOUT  # 5 seconds
celery_app.conf.redis_socket_connect_timeout = SOCKET_CONNECT_TIMEOUT
celery_app.conf.redis_retry_on_timeout = True
celery_app.conf.broker_transport_options = {
    "socket_timeout": SOCKET_TIMEOUT,
    "socket_connect_timeout": SOCKET_CONNECT_TIMEOUT,
}
celery_app.conf.result_backend_transport_options = {
    "socket_timeout": SOCKET_TIMEOUT,
    "socket_connect_timeout": SOCKET_CONNECT_TIMEOUT,
}
Beat Schedule (Flask-Integrated Only)
The Flask-integrated factory configures periodic tasks:
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "prune-old-artifact-files": {
        "task": "ibutsu_server.tasks.db.prune_old_files",
        "schedule": crontab(minute=0, hour=4, day_of_week=6),
        "args": (3,),  # Delete files older than 3 months
    },
    "prune-old-results": {
        "task": "ibutsu_server.tasks.db.prune_old_results",
        "schedule": crontab(minute=0, hour=5, day_of_week=6),
        "args": (5,),  # Delete results older than 5 months
    },
    "prune-old-runs": {
        "task": "ibutsu_server.tasks.db.prune_old_runs",
        "schedule": crontab(minute=0, hour=6, day_of_week=6),
        "args": (12,),  # Delete runs older than 12 months
    },
    "sync-aborted-runs": {
        "task": "ibutsu_server.tasks.runs.sync_aborted_runs",
        "schedule": 0.5 * 60 * 60,  # Every 30 minutes
    },
}
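To make the timing of these entries concrete, the following standard-library sketch computes when a weekly `crontab(minute=0, hour=..., day_of_week=6)` slot next falls. It is an illustration only and does not use Celery's own scheduler: `next_weekly_run` is a hypothetical helper, and the actual fire time also depends on the timezone configured for the Celery app, which this sketch ignores.

```python
from datetime import datetime, timedelta


def next_weekly_run(now, hour, celery_day_of_week):
    """Return the next datetime strictly after `now` for a weekly cron slot.

    Celery's crontab numbers days with Sunday = 0, while
    datetime.weekday() uses Monday = 0, hence the conversion below.
    """
    target_weekday = (celery_day_of_week - 1) % 7
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    candidate += timedelta(days=(target_weekday - now.weekday()) % 7)
    if candidate <= now:
        candidate += timedelta(days=7)
    return candidate


now = datetime(2024, 1, 1, 12, 0)  # a Monday
# day_of_week=6 is Saturday, so the file-pruning slot is Saturday 04:00.
print(next_weekly_run(now, hour=4, celery_day_of_week=6))
# The numeric schedule for sync-aborted-runs is a plain interval in seconds.
print(timedelta(seconds=0.5 * 60 * 60))
```

Read this way, the three pruning tasks run one hour apart on Saturdays (04:00, 05:00, 06:00), while `sync-aborted-runs` repeats every 1800 seconds.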
Backward Compatibility
The old ibutsu_server.tasks.create_celery_app() function is maintained as a thin wrapper:
# Old code (still works)
from ibutsu_server.tasks import create_celery_app
celery_app = create_celery_app(flask_app, name="my_app")
# New code (preferred)
from ibutsu_server.celery_utils import create_flask_celery_app
celery_app = create_flask_celery_app(flask_app, name="my_app")
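A thin wrapper of this kind typically does nothing except forward its arguments. The sketch below shows the shape of such a wrapper using stand-ins, since the real implementations are not reproduced on this page: the body of `create_flask_celery_app` here is a placeholder that returns a dict, and the default value of `name` on the legacy function is an assumption.

```python
def create_flask_celery_app(app, name="ibutsu_server"):
    """Stand-in for the real factory; returns a simple record for the demo."""
    if app is None:
        raise ValueError("Flask app is required")
    return {"flask_app": app, "name": name}


def create_celery_app(app, name="ibutsu_server"):
    """Legacy entry point that only forwards its arguments (default assumed)."""
    return create_flask_celery_app(app, name=name)


# Both entry points produce the same result for the same inputs.
legacy = create_celery_app("fake-flask-app", name="my_app")
modern = create_flask_celery_app("fake-flask-app", name="my_app")
print(legacy == modern)
```

Keeping the wrapper free of logic means the old and new import paths cannot drift apart in behavior.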
Testing
Comprehensive tests are available in tests/test_celery_utils.py:
from ibutsu_server.celery_utils import (
    create_broker_celery_app,
    create_flask_celery_app,
)


# Test broker-only factory (CELERY_BROKER_URL must be set in the environment)
def test_create_broker_celery_app_configuration():
    app = create_broker_celery_app()
    assert app.conf.broker_url is not None
    assert app.main == "ibutsu_server_flower"


# Test Flask-integrated factory
def test_create_flask_celery_app_configuration(flask_app):
    client, _ = flask_app  # the fixture yields a tuple; the client comes first
    app = create_flask_celery_app(client.application)
    assert "celery" in client.application.extensions
    assert app.conf.beat_schedule is not None
Migration Guide
For New Code
Use the factory functions directly:
# For Flower monitoring
from ibutsu_server.celery_utils import create_broker_celery_app
flower_app = create_broker_celery_app()
# For workers/scheduler
from ibutsu_server.celery_utils import create_flask_celery_app
worker_app = create_flask_celery_app(flask_app, name="worker")
For Existing Code
No changes required. The old API still works:
# This continues to work
from ibutsu_server.tasks import create_celery_app
celery_app = create_celery_app(flask_app)
Troubleshooting
Flower Can’t Connect to Broker
Symptom: Flower fails to start with connection errors
Solution: Ensure CELERY_BROKER_URL environment variable is set:
export CELERY_BROKER_URL=redis://:password@redis.example.com:6379/0
Worker Can’t Access Database
Symptom: Tasks fail with “No application context” or database connection errors
Solution: Ensure Flask app is properly configured with database URI:
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://...'
Tasks Not Discovered
Symptom: Worker starts but tasks are not registered
Solution: Ensure task modules are imported in create_flask_celery_app(). Check that all task files are in ibutsu_server/tasks/ and properly decorated with @shared_task.
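One quick way to narrow this down is to check whether the task modules can be located on the import path at all. The helper below is a hypothetical diagnostic, not part of Ibutsu. It uses only `importlib.util.find_spec`, and the module list is copied from the `include` setting shown earlier. Note that `find_spec` imports parent packages as a side effect, so run it in the same environment as the worker.

```python
import importlib.util

TASK_MODULES = [
    "ibutsu_server.tasks.db",
    "ibutsu_server.tasks.importers",
    "ibutsu_server.tasks.query",
    "ibutsu_server.tasks.results",
    "ibutsu_server.tasks.runs",
]


def find_missing_modules(module_names):
    """Return the names that cannot be located on the current import path."""
    missing = []
    for name in module_names:
        try:
            spec = importlib.util.find_spec(name)
        except (ImportError, ValueError):
            # Raised when a parent package is missing or cannot be imported.
            spec = None
        if spec is None:
            missing.append(name)
    return missing


# An empty list means every task module was found.
print(find_missing_modules(TASK_MODULES))
```

If this reports no missing modules but tasks are still absent, the problem is more likely in how the tasks are decorated or registered than in the import path.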
Beat Schedule Not Running
Symptom: Periodic tasks don’t execute
Solution: Ensure you’re using the scheduler app, not the worker app:
celery --app=ibutsu_server:scheduler_app beat --loglevel=info
See Also
ibutsu_server/util/celery_task.py - IbutsuTask base class
ibutsu_server/util/app_context.py - Flask context management
tests/test_celery_utils.py - Comprehensive test suite
Deployment Architecture - Deployment architecture overview