Background Tasks

Catzilla provides a powerful background task system for executing long-running operations without blocking your API responses. Schedule tasks, monitor their progress, and handle graceful shutdowns with ease.

Overview

Catzilla’s background task system provides:

  • Non-Blocking Execution - Tasks run independently of HTTP requests

  • Task Scheduling - Schedule tasks for immediate or delayed execution

  • Progress Monitoring - Track task status and progress in real-time

  • Graceful Shutdown - Clean task termination on application shutdown

  • Task Queues - Priority-based task processing

  • Result Storage - Patterns for storing task results and state (the examples use in-memory dicts; use Redis or a database for true persistence in production)

  • Error Handling - Robust error recovery and retry mechanisms

Quick Start

Basic Background Task

Execute a simple background task:

from catzilla import Catzilla, Request, Response, JSONResponse
import time

app = Catzilla()

# Enable background task system
app.enable_background_tasks(
    workers=4,
    enable_profiling=True,
    memory_pool_mb=500
)

def long_running_task():
    """Simulate a long-running operation, printing progress each second."""
    total_steps = 10
    step = 0
    while step < total_steps:
        time.sleep(1)  # Simulate one unit of work
        step += 1
        print(f"Task progress: {step}/{total_steps}")
    return {"result": "Task completed successfully"}

@app.post("/start-task")
def start_background_task(request: Request) -> Response:
    """Start a background task and return its identifier."""
    # Hand the callable to Catzilla's task system; the returned handle
    # carries the assigned task id.
    handle = app.add_task(long_running_task)

    payload = {
        "message": "Task started",
        "task_id": handle.task_id,
        "status": "scheduled",
    }
    return JSONResponse(payload)

@app.get("/")
def home(request: Request) -> Response:
    """Root endpoint confirming the app is up."""
    body = {"message": "Hello with background tasks!"}
    return JSONResponse(body)

if __name__ == "__main__":
    print("🚀 Starting Catzilla background tasks example...")
    print("Try: curl -X POST http://localhost:8000/start-task")
    app.listen(port=8000)

Async Background Tasks

Background tasks with async operations:

from catzilla import Catzilla, Request, Response, JSONResponse
import asyncio
import time

app = Catzilla()

# Enable background task system
app.enable_background_tasks(
    workers=4,
    enable_profiling=True,
    memory_pool_mb=500
)

async def fetch_from_api():
    """Fetch data from external API (simulated with timed delays)."""
    chunks = []
    # Five sequential "calls", each taking half a second.
    for index in range(1, 6):
        await asyncio.sleep(0.5)  # Simulate API delay
        chunks.append(f"Data chunk {index}")

    return {"data": chunks, "total_items": len(chunks)}

@app.post("/fetch-data")
def fetch_external_data(request: Request) -> Response:
    """Start async background task."""
    # Catzilla accepts coroutine functions directly.
    handle = app.add_task(fetch_from_api)

    body = {
        "message": "Async task started",
        "task_id": handle.task_id,
        "estimated_duration": "2-3 seconds",
    }
    return JSONResponse(body)

if __name__ == "__main__":
    print("🚀 Starting async background tasks example...")
    print("Try: curl -X POST http://localhost:8000/fetch-data")
    app.listen(port=8000)

Task Scheduling

Delayed Execution

Schedule tasks for execution (this example enqueues with a priority for immediate processing; true delayed execution requires an external timer or scheduler):

from catzilla import Catzilla, Request, Response, JSONResponse
from catzilla.background_tasks import TaskPriority
from datetime import datetime, timedelta
import time

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

def send_reminder():
    """Send a reminder notification (simulated)."""
    print("📧 Sending reminder email...")
    time.sleep(2)  # Simulate the latency of an email-provider call
    sent_at = datetime.now().isoformat()
    return {"notification": "Reminder sent", "timestamp": sent_at}

@app.post("/schedule-reminder")
def schedule_reminder(request: Request) -> Response:
    """Schedule a reminder task."""
    # NORMAL priority queues the task for immediate execution.
    handle = app.add_task(send_reminder, priority=TaskPriority.NORMAL)

    return JSONResponse({
        "message": "Reminder scheduled",
        "task_id": handle.task_id,
        "scheduled_for": datetime.now().isoformat(),
    })

if __name__ == "__main__":
    print("🚀 Starting task scheduling example...")
    print("Try: curl -X POST http://localhost:8000/schedule-reminder")
    app.listen(port=8000)

Recurring Tasks

Set up tasks intended to run periodically (the example below schedules a single run; for true recurrence, re-schedule from within the task or use an external scheduler):

from catzilla import Catzilla, Request, Response, JSONResponse
from datetime import datetime
import time

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

def check_system_health():
    """Report (simulated) system health metrics with a timestamp."""
    # Hard-coded sample readings stand in for a real psutil-style probe.
    metrics = {
        "cpu_percent": 25.5,
        "memory_percent": 60.2,
    }

    print(f"🖥️  System Health - CPU: {metrics['cpu_percent']}%, Memory: {metrics['memory_percent']}%")

    metrics["timestamp"] = datetime.now().isoformat()
    return metrics

@app.post("/start-monitoring")
def start_system_monitoring(request: Request) -> Response:
    """Start periodic system monitoring."""
    # Single run; repeat scheduling must come from outside Catzilla.
    handle = app.add_task(check_system_health)

    body = {
        "message": "System monitoring started",
        "task_id": handle.task_id,
        "interval": "immediate",
        "note": "For periodic tasks, schedule multiple instances or use external schedulers",
    }
    return JSONResponse(body)

if __name__ == "__main__":
    print("🚀 Starting recurring tasks example...")
    print("Try: curl -X POST http://localhost:8000/start-monitoring")
    app.listen(port=8000)

Task Monitoring

Progress Tracking

Track task progress with status updates:

from catzilla import Catzilla, Request, Response, JSONResponse, Path
import time
import uuid
from datetime import datetime
from typing import Dict, Any

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

# Task storage for tracking (in production, use Redis or database)
task_storage: Dict[str, Dict[str, Any]] = {}

def data_processing_task(task_id: str):
    """Process data while publishing progress into the shared task_storage dict.

    Writes a "running" record keyed by task_id, updates progress per step,
    and finishes with either a "completed" or "failed" record.
    """
    # Seed the tracking record before any work begins.
    task_storage[task_id] = {
        "id": task_id,
        "status": "running",
        "progress": 0,
        "started_at": datetime.now().isoformat()
    }

    steps = (
        "Loading data...",
        "Validating records...",
        "Processing batch 1/3...",
        "Processing batch 2/3...",
        "Processing batch 3/3...",
        "Generating report...",
        "Saving results...",
    )

    try:
        total = len(steps)
        for position, step in enumerate(steps, start=1):
            percent = position * 100 // total
            record = task_storage[task_id]
            record["progress"] = percent
            record["current_step"] = step
            print(f"📊 Task {task_id}: {step} ({percent}%)")
            time.sleep(1)  # Simulate processing time

        task_storage[task_id].update({
            "status": "completed",
            "completed_at": datetime.now().isoformat(),
            "result": {
                "processed_records": 1000,
                "generated_file": "report_2024.pdf",
                "processing_time": f"{total} seconds"
            }
        })

    except Exception as exc:
        # Record the failure so pollers see a terminal state.
        task_storage[task_id].update({
            "status": "failed",
            "error": str(exc),
            "failed_at": datetime.now().isoformat()
        })

@app.post("/process-data")
def process_large_dataset(request: Request) -> Response:
    """Kick off data processing with progress tracking."""
    local_id = f"data_{uuid.uuid4().hex[:8]}"

    # Queue the worker; Catzilla assigns its own internal task id too.
    handle = app.add_task(data_processing_task, local_id)

    return JSONResponse({
        "message": "Data processing started",
        "task_id": local_id,
        "catzilla_task_id": handle.task_id,
        "progress_available": True,
    })

@app.get("/task-progress/{task_id}")
def get_task_progress(request: Request, task_id: str = Path(...)) -> Response:
    """Get detailed task progress for a tracked task."""
    info = task_storage.get(task_id)
    if info is None:
        return JSONResponse({"error": "Task not found"}, status_code=404)

    return JSONResponse({
        "task_id": task_id,
        "progress_percent": info.get("progress", 0),
        "current_step": info.get("current_step", "Unknown"),
        "status": info.get("status", "unknown"),
        "started_at": info.get("started_at"),
        "result": info.get("result"),
    })

if __name__ == "__main__":
    print("🚀 Starting task progress tracking example...")
    print("Try: curl -X POST http://localhost:8000/process-data")
    print("Try: curl http://localhost:8000/task-progress/{task_id}")
    app.listen(port=8000)

Real-Time Task Updates

Get real-time task updates:

from catzilla import Catzilla, Request, Response, JSONResponse
from typing import Dict, Any, List

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

# Task storage for tracking
task_storage: Dict[str, Dict[str, Any]] = {}

@app.get("/active-tasks")
def list_active_tasks(request: Request) -> Response:
    """List all tasks currently in the running state."""
    running = []
    for record in task_storage.values():
        if record.get("status") == "running":
            running.append(record)

    return JSONResponse({
        "active_tasks": running,
        "total_count": len(running),
    })

@app.get("/task-stats")
def get_task_stats(request: Request) -> Response:
    """Expose Catzilla background task system statistics."""
    try:
        # All stats access stays inside the try: attribute lookups on the
        # stats object are part of the failure surface.
        stats = app.get_task_stats()

        queue_metrics = {
            "critical_queue_size": stats.critical_queue_size,
            "high_queue_size": stats.high_queue_size,
            "normal_queue_size": stats.normal_queue_size,
            "low_queue_size": stats.low_queue_size,
            "total_queued": stats.total_queued,
        }
        worker_metrics = {
            "active_workers": stats.active_workers,
            "idle_workers": stats.idle_workers,
            "total_workers": stats.total_workers,
        }
        performance_metrics = {
            "tasks_per_second": stats.tasks_per_second,
            "avg_execution_time_ms": stats.avg_execution_time_ms,
            "memory_usage_mb": stats.memory_usage_mb,
        }

        return JSONResponse({
            "catzilla_stats": {
                "queue_metrics": queue_metrics,
                "worker_metrics": worker_metrics,
                "performance_metrics": performance_metrics,
            }
        })

    except Exception as e:
        return JSONResponse({
            "error": "Failed to get Catzilla task stats",
            "details": str(e)
        }, status_code=500)

if __name__ == "__main__":
    print("🚀 Starting real-time task updates example...")
    print("Try: curl http://localhost:8000/active-tasks")
    print("Try: curl http://localhost:8000/task-stats")
    app.listen(port=8000)

Error Handling and Retry

Task Error Recovery

Handle task failures with proper error handling:

from catzilla import Catzilla, Request, Response, JSONResponse
from catzilla.background_tasks import TaskPriority
import time
import random

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

def unreliable_operation():
    """Simulate an operation that might fail."""
    failure_roll = random.random()
    if failure_roll < 0.3:  # 30% chance of failure
        raise Exception("Simulated network error")

    time.sleep(2)  # Simulate the successful operation's work
    return {"status": "success", "data": "Operation completed"}

@app.post("/unreliable-task")
def start_unreliable_task(request: Request) -> Response:
    """Start task that might fail."""
    # HIGH priority queues this ahead of NORMAL/LOW work.
    handle = app.add_task(unreliable_operation, priority=TaskPriority.HIGH)

    return JSONResponse({
        "message": "Unreliable task started",
        "task_id": handle.task_id,
        "note": "Task may succeed or fail randomly",
    })

if __name__ == "__main__":
    print("🚀 Starting error handling example...")
    print("Try: curl -X POST http://localhost:8000/unreliable-task")
    app.listen(port=8000)

Custom Error Handlers

Define custom error handling strategies:

from catzilla import Catzilla, Request, Response, JSONResponse
import random

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

def potentially_failing_task():
    """Simulate different types of errors.

    Randomly succeeds, raises ConnectionError (retryable), or raises
    ValueError (permanent).
    """
    outcome = random.choice(["connection", "validation", "success"])

    if outcome == "success":
        return {"status": "success", "message": "Task completed successfully"}
    if outcome == "connection":
        raise ConnectionError("Failed to connect to external service")
    raise ValueError("Invalid data format")

@app.post("/task-with-error-handling")
def task_with_custom_errors(request: Request) -> Response:
    """Start task with error handling."""
    handle = app.add_task(potentially_failing_task)

    body = {
        "message": "Task with error handling started",
        "task_id": handle.task_id,
        "note": "Task may succeed or fail with different error types",
    }
    return JSONResponse(body)

if __name__ == "__main__":
    print("🚀 Starting custom error handling example...")
    print("Try: curl -X POST http://localhost:8000/task-with-error-handling")
    app.listen(port=8000)

Production Patterns

Task Queues and Priorities

Manage task execution with priorities:

from catzilla import Catzilla, Request, Response, JSONResponse
from catzilla.background_tasks import TaskPriority
import time

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

def high_priority_task():
    """Critical system maintenance task (simulated 5-second job)."""
    print("🔧 Performing critical system maintenance...")
    time.sleep(5)  # Simulated maintenance window
    outcome = {"maintenance": "completed", "systems": "healthy"}
    return outcome

def process_batch_item(item_id: int):
    """Process a single batch item (simulated one-second workload)."""
    time.sleep(1)  # Simulate processing
    return {"item_id": item_id, "processed": True}

@app.post("/priority-task")
def schedule_priority_task(request: Request) -> Response:
    """Schedule task with specific priority."""
    handle = app.add_task(high_priority_task, priority=TaskPriority.HIGH)

    payload = {
        "message": "High priority task scheduled",
        "task_id": handle.task_id,
        "priority": "HIGH",
    }
    return JSONResponse(payload)

@app.post("/batch-processing")
def schedule_batch_processing(request: Request) -> Response:
    """Schedule multiple related tasks as one logical batch."""
    # `item=index` freezes the loop variable in each lambda's default,
    # avoiding Python's late-binding closure pitfall.
    scheduled_ids = []
    for index in range(10):
        handle = app.add_task(
            lambda item=index: process_batch_item(item),
            priority=TaskPriority.NORMAL
        )
        scheduled_ids.append(handle.task_id)

    return JSONResponse({
        "message": "Batch processing started",
        "batch_tasks": scheduled_ids,
        "total_items": len(scheduled_ids),
    })

if __name__ == "__main__":
    print("🚀 Starting task priorities example...")
    print("Try: curl -X POST http://localhost:8000/priority-task")
    print("Try: curl -X POST http://localhost:8000/batch-processing")
    app.listen(port=8000)

Graceful Shutdown

Handle application shutdown gracefully:

from catzilla import Catzilla, Request, Response, JSONResponse
import signal
import sys
import time

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

# Global shutdown flag
shutdown_requested = False

def setup_graceful_shutdown():
    """Install SIGINT/SIGTERM handlers that drain tasks before exiting."""
    def _handle_signal(signum, frame):
        global shutdown_requested
        print("🛑 Graceful shutdown initiated...")
        shutdown_requested = True

        # Give tasks time to complete
        print("⏳ Waiting for tasks to complete...")
        time.sleep(5)  # Simple wait - in production, monitor actual task completion

        print("✅ Graceful shutdown completed")
        sys.exit(0)

    # Same handler for both interactive Ctrl+C and service-manager stop.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _handle_signal)

# Setup graceful shutdown when app starts
setup_graceful_shutdown()

@app.get("/shutdown-status")
def get_shutdown_status(request: Request) -> Response:
    """Get current shutdown status."""
    if shutdown_requested:
        note = "Shutdown in progress"
    else:
        note = "Server is running normally"
    return JSONResponse({
        "shutdown_requested": shutdown_requested,
        "message": note,
    })

if __name__ == "__main__":
    print("🚀 Starting graceful shutdown example...")
    print("Try: curl http://localhost:8000/shutdown-status")
    print("Press Ctrl+C to test graceful shutdown")
    app.listen(port=8000)

Task Result Storage

Persistent Results

Store and retrieve task results:

from catzilla import Catzilla, Request, Response, JSONResponse, Path
from datetime import datetime
import time
from typing import Dict, Any

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

# Task results storage (in production, use Redis or database)
task_results: Dict[str, Any] = {}

def complex_calculation(task_id: str):
    """Compute the sum of squares of 1..1,000,000 and store the result.

    The result dict is written into the module-level ``task_results``
    store under ``task_id`` so the API can serve it later, and is also
    returned to the caller.
    """
    # Fix: the loop previously iterated range(1000000), i.e. 0..999,999,
    # while the reported "range" claimed "1 to 1,000,000". Sum 1..N so the
    # description and the computed value agree.
    result = 0
    for i in range(1, 1000001):
        result += i ** 2

    calculation_result = {
        "calculation": "sum_of_squares",
        "range": "1 to 1,000,000",
        "result": result,
        "computed_at": datetime.now().isoformat()
    }

    # Store result for later retrieval (in production, use Redis or a database)
    task_results[task_id] = calculation_result
    return calculation_result

@app.post("/long-calculation")
def start_calculation(request: Request) -> Response:
    """Start a calculation with persistent results."""
    import uuid

    local_id = "calc_" + uuid.uuid4().hex[:8]

    # Queue the CPU-bound work; the id doubles as the result-store key.
    handle = app.add_task(complex_calculation, local_id)

    return JSONResponse({
        "message": "Calculation started",
        "task_id": local_id,
        "catzilla_task_id": handle.task_id,
        "note": "Result will be stored for retrieval",
    })

@app.get("/calculation-result/{task_id}")
def get_calculation_result(request: Request, task_id: str = Path(...)) -> Response:
    """Retrieve stored calculation result."""
    if task_id not in task_results:
        return JSONResponse(
            {"error": "Result not found or not yet available"},
            status_code=404,
        )

    payload = {
        "task_id": task_id,
        "result": task_results[task_id],
        "retrieved_at": datetime.now().isoformat(),
    }
    return JSONResponse(payload)

if __name__ == "__main__":
    print("🚀 Starting task result storage example...")
    print("Try: curl -X POST http://localhost:8000/long-calculation")
    print("Try: curl http://localhost:8000/calculation-result/{task_id}")
    app.listen(port=8000)

Task Analytics

Monitor task performance and metrics:

from catzilla import Catzilla, Request, Response, JSONResponse
from datetime import datetime

app = Catzilla()

# Enable background task system
app.enable_background_tasks(workers=4)

@app.get("/task-analytics")
def get_task_analytics(request: Request) -> Response:
    """Get task system analytics."""
    try:
        # Keep every stats attribute access inside the try so any lookup
        # failure maps to the 500 response below.
        stats = app.get_task_stats()

        report = {
            "total_tasks_executed": stats.total_tasks_processed,
            "performance_metrics": {
                "tasks_per_second": stats.tasks_per_second,
                "average_execution_time_ms": stats.avg_execution_time_ms,
                "p95_execution_time_ms": stats.p95_execution_time_ms,
            },
            "queue_status": {
                "total_queued": stats.total_queued,
                "queue_pressure": stats.queue_pressure,
            },
            "worker_status": {
                "active_workers": stats.active_workers,
                "total_workers": stats.total_workers,
                "worker_utilization": stats.avg_worker_utilization,
            },
            "memory_usage": {
                "current_mb": stats.memory_usage_mb,
                "efficiency": stats.memory_efficiency,
            },
            "error_metrics": {
                "failed_tasks": stats.failed_tasks,
                "error_rate": stats.error_rate,
            },
            "uptime_seconds": stats.uptime_seconds,
        }
        return JSONResponse(report)

    except Exception as e:
        return JSONResponse({
            "error": "Failed to get analytics",
            "details": str(e)
        }, status_code=500)

@app.get("/task-performance")
def get_task_performance(request: Request) -> Response:
    """Get detailed performance metrics."""
    try:
        stats = app.get_task_stats()

        summary = {
            "throughput": f"{stats.tasks_per_second:.2f} tasks/sec",
            "avg_response_time": f"{stats.avg_execution_time_ms:.2f}ms",
            "memory_efficiency": f"{stats.memory_efficiency:.1f}%",
            "worker_utilization": f"{stats.avg_worker_utilization:.1f}%",
        }
        execution_times = {
            "average_ms": stats.avg_execution_time_ms,
            "p95_ms": stats.p95_execution_time_ms,
            "p99_ms": stats.p99_execution_time_ms,
        }
        resource_usage = {
            "cpu_usage": stats.worker_cpu_usage,
            "memory_mb": stats.worker_memory_usage,
            "engine_cpu": stats.engine_cpu_usage,
        }

        return JSONResponse({
            "performance_summary": summary,
            "detailed_metrics": {
                "execution_times": execution_times,
                "resource_usage": resource_usage,
            },
            "timestamp": datetime.now().isoformat(),
        })

    except Exception as e:
        return JSONResponse({
            "error": "Failed to get performance metrics",
            "details": str(e)
        }, status_code=500)

if __name__ == "__main__":
    print("🚀 Starting task analytics example...")
    print("Try: curl http://localhost:8000/task-analytics")
    print("Try: curl http://localhost:8000/task-performance")
    app.listen(port=8000)

Best Practices

Task Design Guidelines

from catzilla import Catzilla, Request, Response, JSONResponse

app = Catzilla()
app.enable_background_tasks(workers=4)

# ✅ Good: Stateless tasks
def good_task(data):
    """Pure transformation: depends only on its argument, safe to retry."""
    size = len(data)
    shouted = data.upper()
    return {"processed": size, "result": shouted}

# ❌ Avoid: Tasks with external state (kept as-is: this is an intentional
# anti-example — do not copy this pattern)
global_counter = 0
def bad_task(data):
    """Anti-pattern: the result depends on mutable module-level state.

    Concurrent workers incrementing ``global_counter`` race with each
    other, and a retry changes the observable count — so the task is
    neither thread-safe nor idempotent. Note *data* is accepted but
    never used.
    """
    global global_counter
    global_counter += 1  # Race condition risk
    return {"count": global_counter}

# ✅ Good: Idempotent tasks
def idempotent_task(user_id: str, email: str):
    """Safe to retry: delivers at most one email per user."""
    already_sent = email_already_sent(user_id)
    if not already_sent:
        send_email(email)
    return {"email_sent": True}

# ✅ Good: Proper error handling
def robust_task(url: str):
    """Fetch a URL, distinguishing retryable from permanent failures."""
    try:
        payload = fetch_url(url).json()
    except ConnectionError:
        raise  # Retryable: let Catzilla handle connection errors
    except ValueError as e:
        # Permanent: don't retry validation errors
        return {"error": str(e), "retry": False}
    return {"data": payload}

# Helper functions (would be implemented in real app)
def email_already_sent(user_id: str) -> bool:
    """Placeholder: report whether *user_id* has already been emailed."""
    return False  # Placeholder

def send_email(email: str):
    """Placeholder: deliver an email to *email*."""
    pass  # Placeholder

def fetch_url(url: str):
    """Placeholder: fetch *url*.

    Returns None here; in a real app this would return a response object
    exposing ``.json()`` (see robust_task's usage).
    """
    pass  # Placeholder

@app.post("/good-task")
def schedule_good_task(request: Request) -> Response:
    """Schedule a well-designed task."""
    handle = app.add_task(good_task, "sample data")
    return JSONResponse({"task_id": handle.task_id})

if __name__ == "__main__":
    print("🚀 Starting best practices example...")
    print("Try: curl -X POST http://localhost:8000/good-task")
    app.listen(port=8000)

Performance Tips

from catzilla import Catzilla, Request, Response, JSONResponse
from catzilla.background_tasks import TaskPriority
import asyncio

app = Catzilla()
app.enable_background_tasks(workers=4)

# ✅ Use async for I/O-bound tasks
async def io_bound_task():
    """Simulated awaitable I/O (the event loop is free while it waits)."""
    await asyncio.sleep(1)  # Simulate async I/O operation
    return {"result": "I/O operation completed"}

# ✅ Use sync for CPU-bound tasks
def cpu_bound_task(data):
    """Simulated CPU-intensive work: sum of squares of 0..9,999.

    NOTE(review): *data* is accepted but unused in this demo.
    """
    total = 0
    for value in range(10000):
        total += value ** 2
    return {"computation_result": total}

# ✅ Batch related operations
def batch_task(items):
    """Process multiple items together in one task invocation."""
    labelled = [f"processed_{item}" for item in items]
    return {"batch_results": labelled, "count": len(labelled)}

@app.post("/io-task")
def schedule_io_task(request: Request) -> Response:
    """Schedule I/O-bound task."""
    handle = app.add_task(io_bound_task, priority=TaskPriority.NORMAL)
    payload = {"task_id": handle.task_id, "type": "io_bound"}
    return JSONResponse(payload)

@app.post("/cpu-task")
def schedule_cpu_task(request: Request) -> Response:
    """Schedule CPU-bound task."""
    handle = app.add_task(cpu_bound_task, "sample_data", priority=TaskPriority.HIGH)
    payload = {"task_id": handle.task_id, "type": "cpu_bound"}
    return JSONResponse(payload)

@app.post("/batch-task")
def schedule_batch_task(request: Request) -> Response:
    """Schedule batch processing task."""
    pending = ["item1", "item2", "item3", "item4", "item5"]
    handle = app.add_task(batch_task, pending, priority=TaskPriority.NORMAL)
    payload = {"task_id": handle.task_id, "type": "batch", "items": len(pending)}
    return JSONResponse(payload)

if __name__ == "__main__":
    print("🚀 Starting performance tips example...")
    print("Try: curl -X POST http://localhost:8000/io-task")
    print("Try: curl -X POST http://localhost:8000/cpu-task")
    print("Try: curl -X POST http://localhost:8000/batch-task")
    app.listen(port=8000)

This comprehensive background task system enables you to build scalable, responsive applications that can handle complex workflows and long-running operations efficiently.