Real-World Recipes

This section provides practical, production-ready patterns and recipes for common web development scenarios using Catzilla. These examples are based on real-world use cases and demonstrate best practices.

Authentication Patterns

JWT Authentication

Implement secure JWT-based authentication:

from catzilla import Catzilla, Request, Response, JSONResponse, BaseModel, Field
import time
import hashlib
import hmac
import secrets
import json
import base64
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
import uuid

# Application instance for this recipe, created with production=False and with
# the banner and request logging switched on.
app = Catzilla(production=False, show_banner=True, log_requests=True)

# Configuration
# NOTE(review): "jwt_secret" is a placeholder literal; load the real value from
# the environment or a secret store before deploying.
AUTH_CONFIG = {
    "jwt_secret": "your-super-secret-jwt-key-change-in-production",
    "jwt_algorithm": "HS256",
    "jwt_expiry_hours": 24,  # token lifetime read by generate_jwt and the auth responses
    "password_salt_rounds": 12,  # NOTE(review): not read by any code visible in this recipe
    "max_login_attempts": 5,  # failed logins before login() locks the account
    "lockout_duration_minutes": 15  # lock length applied by login()
}

# User roles
class UserRole(str, Enum):
    """Roles an account can hold; members are also ``str`` instances."""
    ADMIN = "admin"
    USER = "user"
    READONLY = "readonly"

# Data models
class LoginModel(BaseModel):
    """Request body for ``POST /auth/login``: an email address and a password."""
    # The pattern only checks for the shape "<something>@<something>.<something>".
    email: str = Field(regex=r'^[^@]+@[^@]+\.[^@]+$', description="Email address")
    password: str = Field(min_length=6, description="Password")

class RegisterModel(BaseModel):
    """Request body for ``POST /auth/register``."""
    name: str = Field(min_length=2, max_length=50, description="Full name")
    # The pattern only checks for the shape "<something>@<something>.<something>".
    email: str = Field(regex=r'^[^@]+@[^@]+\.[^@]+$', description="Email address")
    password: str = Field(min_length=6, description="Password")
    # Defaults to USER when omitted.
    # NOTE(review): because this field is part of a public payload, a caller can
    # ask for any role (including ADMIN) unless the handler overrides it.
    role: UserRole = UserRole.USER

@dataclass
class User:
    """Account record kept in the in-memory ``users_db`` mapping."""
    id: str  # also the key under which the record is stored in users_db
    name: str
    email: str
    password_hash: str  # "<salt>:<digest-hex>" string as produced by hash_password
    role: UserRole
    is_active: bool = True
    login_attempts: int = 0  # failed-password counter maintained by login()
    locked_until: Optional[datetime] = None  # set by login() once the attempt limit is reached
    created_at: Optional[datetime] = None  # filled in by __post_init__ when omitted

    def __post_init__(self):
        """Default ``created_at`` to the current local time when none was given."""
        if self.created_at is None:
            self.created_at = datetime.now()

# In-memory storage
users_db: Dict[str, User] = {}

# Utility functions
def hash_password(password: str) -> str:
    """Derive a salted PBKDF2-HMAC-SHA256 digest for ``password``.

    Returns:
        ``"<salt>:<digest-hex>"`` where the salt is a fresh random
        32-character hex string and the digest uses 100000 iterations.
    """
    salt_hex = secrets.token_hex(16)
    derived = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode(),
        salt_hex.encode(),
        100000,
    )
    return ":".join((salt_hex, derived.hex()))

def verify_password(password: str, password_hash: str) -> bool:
    """Check ``password`` against a stored ``"<salt>:<digest-hex>"`` string.

    Returns:
        True when the recomputed digest matches the stored one; False on a
        mismatch or when ``password_hash`` is not exactly two ``:``-separated
        parts.
    """
    try:
        stored_salt, stored_digest = password_hash.split(':')
        candidate = hashlib.pbkdf2_hmac(
            'sha256', password.encode(), stored_salt.encode(), 100000
        ).hex()
    except ValueError:
        return False
    # Constant-time comparison of the two hex digests.
    return hmac.compare_digest(stored_digest, candidate)

def _b64url_encode(raw: bytes) -> str:
    """Base64url-encode ``raw`` without ``=`` padding, as JWT segments require."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    """Decode an unpadded base64url segment; raises ValueError when malformed."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _jwt_signature(signing_input: str) -> str:
    """Return the base64url HMAC-SHA256 of ``signing_input`` under the configured secret."""
    secret = AUTH_CONFIG["jwt_secret"].encode()
    mac = hmac.new(secret, signing_input.encode(), hashlib.sha256)
    return _b64url_encode(mac.digest())


def generate_jwt(user: User) -> str:
    """Issue an HS256-signed JWT for ``user``.

    The token is ``<header>.<payload>.<signature>``; the payload carries
    ``user_id``, ``email``, ``role``, ``iat`` and ``exp`` (Unix seconds).

    Args:
        user: Account the token is issued for.

    Returns:
        The compact serialized token.
    """
    # time.time() is a true Unix timestamp; datetime.utcnow().timestamp() is
    # off by the local UTC offset because the naive value is read as local time.
    issued_at = int(time.time())
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        'user_id': user.id,
        'email': user.email,
        'role': user.role.value,
        'exp': issued_at + AUTH_CONFIG["jwt_expiry_hours"] * 3600,
        'iat': issued_at
    }
    signing_input = ".".join(
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    return f"{signing_input}.{_jwt_signature(signing_input)}"


def verify_jwt(token: str) -> Optional[Dict[str, Any]]:
    """Validate an HS256 JWT and return its payload.

    Args:
        token: Compact token as produced by ``generate_jwt``.

    Returns:
        The decoded payload dict, or None when the token is malformed, its
        signature does not match, its algorithm is not HS256, or it has
        expired.
    """
    try:
        header_b64, payload_b64, signature = token.split(".")
        expected = _jwt_signature(f"{header_b64}.{payload_b64}")
        # Check the signature before trusting anything inside the token.
        if not hmac.compare_digest(signature.encode(), expected.encode()):
            return None
        header = json.loads(_b64url_decode(header_b64))
        payload = json.loads(_b64url_decode(payload_b64))
    except (AttributeError, ValueError):
        # Not a string, wrong number of segments, bad base64 or bad JSON.
        return None

    if not isinstance(header, dict) or header.get("alg") != "HS256":
        return None
    if not isinstance(payload, dict):
        return None

    expires_at = payload.get('exp')
    if expires_at:
        if not isinstance(expires_at, (int, float)) or expires_at < time.time():
            return None
    return payload

# Authentication middleware
@app.middleware(priority=20, pre_route=True, name="authentication")
def authentication_middleware(request: Request) -> Optional[Response]:
    """Require a valid Bearer token on every non-public path.

    On success the matching user and the decoded payload are stored in
    ``request.context`` under ``'user'`` and ``'auth_payload'``.

    Returns:
        None to let the request continue, or a 401 JSON response when the
        header is missing, the token is rejected, or the user is unknown or
        inactive.
    """
    # Public paths that don't require auth
    public_paths = {'/', '/auth/login', '/auth/register', '/health'}
    if request.path in public_paths:
        return None

    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return JSONResponse({"error": "Authentication required"}, status_code=401)

    payload = verify_jwt(auth_header[7:])
    # A payload without a string 'user_id' claim is rejected with 401 rather
    # than surfacing a KeyError/TypeError as a server error.
    user_id = payload.get('user_id') if payload else None
    user = users_db.get(user_id) if isinstance(user_id, str) else None
    if user is None or not user.is_active:
        return JSONResponse({"error": "Invalid or expired token"}, status_code=401)

    if not hasattr(request, 'context'):
        request.context = {}
    request.context['user'] = user
    request.context['auth_payload'] = payload
    return None

# Authentication endpoints
@app.post("/auth/register")
def register(request: Request, user_data: RegisterModel) -> Response:
    """Create an account and return it together with a fresh token.

    Returns:
        201 with ``user``, ``token`` and ``expires_in`` (seconds), or 409 when
        the email is already registered.
    """
    if any(existing.email == user_data.email for existing in users_db.values()):
        return JSONResponse({"error": "Email already registered"}, status_code=409)

    # This endpoint is public, so the role in the payload is not honoured:
    # otherwise anyone could self-register as an administrator. Elevated roles
    # have to be granted through a separate, authenticated path.
    user = User(
        id=str(uuid.uuid4()),
        name=user_data.name,
        email=user_data.email,
        password_hash=hash_password(user_data.password),
        role=UserRole.USER
    )
    users_db[user.id] = user

    token = generate_jwt(user)

    return JSONResponse({
        "user": {
            "id": user.id,
            "name": user.name,
            "email": user.email,
            "role": user.role.value
        },
        "token": token,
        "expires_in": AUTH_CONFIG["jwt_expiry_hours"] * 3600
    }, status_code=201)

@app.post("/auth/login")
def login(request: Request, login_data: LoginModel) -> Response:
    """Authenticate by email and password, with lockout after repeated failures.

    Returns:
        200 with ``user``, ``token`` and ``expires_in`` on success; 401 for an
        unknown email or wrong password; 423 while the account is locked; 403
        when the account is deactivated.
    """
    user = next((u for u in users_db.values() if u.email == login_data.email), None)
    if not user:
        return JSONResponse({"error": "Invalid email or password"}, status_code=401)

    now = datetime.now()
    if user.locked_until:
        if now < user.locked_until:
            seconds_left = (user.locked_until - now).total_seconds()
            # Round up so the message never reads "0 minutes" while still locked.
            minutes_left = int(-(-seconds_left // 60))
            return JSONResponse(
                {"error": f"Account locked. Try again in {minutes_left} minutes"},
                status_code=423
            )
        # The lock has expired: start a fresh allowance. Without this the
        # counter stays at the limit and one bad password re-locks the account.
        user.login_attempts = 0
        user.locked_until = None

    if not verify_password(login_data.password, user.password_hash):
        user.login_attempts += 1
        if user.login_attempts >= AUTH_CONFIG["max_login_attempts"]:
            user.locked_until = now + timedelta(minutes=AUTH_CONFIG["lockout_duration_minutes"])
            return JSONResponse(
                {"error": f"Too many failed attempts. Account locked for {AUTH_CONFIG['lockout_duration_minutes']} minutes"},
                status_code=423
            )
        return JSONResponse({"error": "Invalid email or password"}, status_code=401)

    if not user.is_active:
        return JSONResponse({"error": "Account deactivated"}, status_code=403)

    # Successful login clears any accumulated failures.
    user.login_attempts = 0
    user.locked_until = None

    token = generate_jwt(user)

    return JSONResponse({
        "user": {
            "id": user.id,
            "name": user.name,
            "email": user.email,
            "role": user.role.value
        },
        "token": token,
        "expires_in": AUTH_CONFIG["jwt_expiry_hours"] * 3600
    })

@app.get("/auth/profile")
def get_profile(request: Request) -> Response:
    """Return the profile of the user stored in ``request.context``."""
    context = getattr(request, 'context', {})
    user = context.get('user')
    profile = {
        "id": user.id,
        "name": user.name,
        "email": user.email,
        "role": user.role.value,
        "is_active": user.is_active,
        "created_at": user.created_at.isoformat()
    }
    return JSONResponse(profile)

# Protected endpoint example
@app.get("/api/protected")
def protected_endpoint(request: Request) -> Response:
    """Example route that reports which user stored in the context reached it."""
    context = getattr(request, 'context', {})
    user = context.get('user')
    caller = {
        "id": user.id,
        "name": user.name,
        "role": user.role.value
    }
    return JSONResponse({
        "message": "Access granted to protected resource",
        "user": caller
    })

if __name__ == "__main__":
    # Seed admin user for testing
    admin_user = User(
        id=str(uuid.uuid4()),
        name="System Admin",
        email="admin@example.com",
        password_hash=hash_password("admin123"),
        role=UserRole.ADMIN
    )
    users_db[admin_user.id] = admin_user

    # Announce the seeded credentials on the console.
    for line in (
        "🔐 Admin user created:",
        "   Email: admin@example.com",
        "   Password: admin123",
    ):
        print(line)

    app.listen(port=8000)

Session-Based Authentication

Alternative session-based authentication:

from catzilla import Catzilla, Request, Response, JSONResponse, BaseModel, Field
import uuid
import time
from datetime import datetime, timedelta
from typing import Optional, Dict

app = Catzilla(production=False, show_banner=True, log_requests=True)

# Session storage (use Redis in production)
sessions: Dict[str, Dict] = {}

class LoginModel(BaseModel):
    """Request body for ``POST /session-login``: a username and a password."""
    username: str = Field(min_length=3, max_length=20, description="Username")
    password: str = Field(min_length=6, description="Password")

class SessionManager:
    """Create, look up and remove entries in the module-level ``sessions`` dict.

    A session expires once it has been idle for longer than the configured
    timeout; every successful lookup refreshes its idle timer.
    """

    def __init__(self, session_timeout_minutes: int = 30):
        """Store the idle timeout as a ``timedelta``."""
        self.session_timeout = timedelta(minutes=session_timeout_minutes)

    def create_session(self, user_data: dict) -> str:
        """Register ``user_data`` under a new UUID4 session id and return the id."""
        new_id = str(uuid.uuid4())

        record = {"user": user_data}
        record["created_at"] = datetime.utcnow()
        record["last_accessed"] = datetime.utcnow()
        sessions[new_id] = record

        return new_id

    def get_session(self, session_id: str) -> Optional[dict]:
        """Return the user data for ``session_id``, or None if unknown or expired.

        An expired session is deleted as a side effect.
        """
        record = sessions.get(session_id)
        if not record:
            return None

        idle_for = datetime.utcnow() - record["last_accessed"]
        if idle_for > self.session_timeout:
            del sessions[session_id]
            return None

        # Still valid: restart the idle timer.
        record["last_accessed"] = datetime.utcnow()
        return record["user"]

    def destroy_session(self, session_id: str):
        """Remove ``session_id`` if present; unknown ids are ignored."""
        if session_id in sessions:
            del sessions[session_id]

session_manager = SessionManager()

# Mock user database
# Keyed by username. Passwords are stored here as plain strings and compared
# directly by authenticate_user.
# NOTE(review): demo data only; store password hashes in a real deployment.
users_db = {
    "admin": {"username": "admin", "password": "admin123", "email": "admin@example.com"},
    "user": {"username": "user", "password": "user123", "email": "user@example.com"}
}

def authenticate_user(username: str, password: str) -> Optional[dict]:
    """Check credentials against ``users_db``.

    Args:
        username: Key to look up in ``users_db``.
        password: Candidate password.

    Returns:
        ``{"username": ..., "email": ...}`` on success, otherwise None.
    """
    # Imported locally so the session recipe stays self-contained.
    import hmac

    user_data = users_db.get(username)
    if not user_data:
        return None

    # Constant-time comparison: `==` returns at the first differing character,
    # which leaks how much of the password was right. Bytes are used because
    # compare_digest rejects non-ASCII str arguments.
    if not hmac.compare_digest(user_data["password"].encode(), password.encode()):
        return None

    return {"username": user_data["username"], "email": user_data["email"]}

@app.post("/session-login")
def session_login(request: Request, login_data: LoginModel) -> Response:
    """Verify credentials, open a session and hand its id back in a cookie.

    Returns:
        200 with the user and a ``Set-Cookie`` header carrying ``session_id``,
        or 401 when the credentials are rejected.
    """
    user = authenticate_user(login_data.username, login_data.password)
    if not user:
        return JSONResponse({"error": "Invalid credentials"}, status_code=401)

    session_id = session_manager.create_session(user)

    response = JSONResponse({
        "message": "Login successful",
        "user": user
    })

    # Take the cookie lifetime from the session manager so the two cannot
    # drift apart when the timeout is reconfigured (1800 s with the default).
    max_age = int(session_manager.session_timeout.total_seconds())

    # Set session cookie (simplified - in production use secure settings)
    response.headers["Set-Cookie"] = f"session_id={session_id}; Path=/; Max-Age={max_age}; HttpOnly"

    return response

def _session_id_from_cookies(request: Request) -> Optional[str]:
    """Return the value of the ``session_id`` cookie, or None when absent or empty.

    The cookie name is compared exactly. A substring test would also match
    cookies such as ``other_session_id`` and return the wrong value.
    """
    cookie_header = request.headers.get("Cookie", "")
    for pair in cookie_header.split(";"):
        name, _, value = pair.strip().partition("=")
        if name == "session_id":
            return value.strip() or None
    return None


@app.post("/session-logout")
def session_logout(request: Request) -> Response:
    """Destroy the caller's session (if any) and expire the session cookie."""
    session_id = _session_id_from_cookies(request)
    if session_id:
        session_manager.destroy_session(session_id)

    response = JSONResponse({"message": "Logout successful"})
    # Max-Age=0 tells the browser to drop the cookie immediately.
    response.headers["Set-Cookie"] = "session_id=; Path=/; Max-Age=0; HttpOnly"

    return response


@app.get("/session-profile")
def session_profile(request: Request) -> Response:
    """Return the user bound to the caller's session cookie.

    Returns:
        200 with the user, or 401 when no cookie is sent or the session is
        unknown or expired.
    """
    session_id = _session_id_from_cookies(request)
    if not session_id:
        return JSONResponse({"error": "No session found"}, status_code=401)

    user = session_manager.get_session(session_id)
    if not user:
        return JSONResponse({"error": "Invalid or expired session"}, status_code=401)

    return JSONResponse({"user": user, "session_active": True})

if __name__ == "__main__":
    app.listen(port=8000)

REST API Patterns

RESTful Resource Management

Complete CRUD operations with validation:

from catzilla import Catzilla, Request, Response, JSONResponse, BaseModel, Field, Query, Path
from typing import Optional, List, Dict, Any
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
import uuid

app = Catzilla(production=False, show_banner=True, log_requests=True)

# Data models
class UserStatus(str, Enum):
    """Account status values; members are also ``str`` instances.

    NOTE(review): not referenced by the task code visible in this recipe.
    """
    ACTIVE = "active"
    INACTIVE = "inactive"
    SUSPENDED = "suspended"

class TaskCreate(BaseModel):
    """Request body for creating a task."""
    title: str = Field(min_length=1, max_length=200, description="Task title")
    description: Optional[str] = Field(None, max_length=1000, description="Task description")
    priority: int = Field(1, ge=1, le=5, description="Priority level")  # defaults to 1
    due_date: Optional[str] = None  # plain string; no format is enforced here

class TaskUpdate(BaseModel):
    """Partial update for a task; every field is optional.

    TaskService.update_task skips fields left as None, so None cannot be used
    to clear a value.
    """
    title: Optional[str] = Field(None, min_length=1, max_length=200)
    description: Optional[str] = Field(None, max_length=1000)
    priority: Optional[int] = Field(None, ge=1, le=5)
    due_date: Optional[str] = None
    completed: Optional[bool] = None

@dataclass
class Task:
    """Task record kept in the in-memory ``tasks_db`` mapping."""
    id: str  # also the key under which the record is stored in tasks_db
    title: str
    description: Optional[str]
    priority: int
    due_date: Optional[str]
    completed: bool = False
    created_at: Optional[datetime] = None  # filled in by __post_init__ when omitted
    updated_at: Optional[datetime] = None  # filled in by __post_init__ when omitted

    def __post_init__(self):
        """Default both timestamps to the current UTC time when not supplied."""
        if self.created_at is None:
            self.created_at = datetime.utcnow()
        if self.updated_at is None:
            self.updated_at = datetime.utcnow()

# Mock database
tasks_db: Dict[str, Task] = {}

class TaskService:
    """Stateless CRUD helpers operating on the module-level ``tasks_db`` dict."""

    @staticmethod
    def create_task(task_data: TaskCreate) -> Task:
        """Build a Task with a new UUID4 id from ``task_data``, store and return it."""
        new_id = str(uuid.uuid4())
        created = Task(
            id=new_id,
            title=task_data.title,
            description=task_data.description,
            priority=task_data.priority,
            due_date=task_data.due_date
        )
        tasks_db[new_id] = created
        return created

    @staticmethod
    def get_task(task_id: str) -> Optional[Task]:
        """Return the task stored under ``task_id``, or None."""
        return tasks_db.get(task_id)

    @staticmethod
    def update_task(task_id: str, task_data: TaskUpdate) -> Optional[Task]:
        """Copy every non-None field of ``task_data`` onto the stored task.

        Returns the updated task with ``updated_at`` refreshed, or None when
        ``task_id`` is unknown.
        """
        task = tasks_db.get(task_id)
        if not task:
            return None

        for attr in ("title", "description", "priority", "due_date", "completed"):
            new_value = getattr(task_data, attr)
            if new_value is not None:
                setattr(task, attr, new_value)

        task.updated_at = datetime.utcnow()
        return task

    @staticmethod
    def delete_task(task_id: str) -> bool:
        """Remove ``task_id``; return True only if a task was actually removed."""
        removed = tasks_db.pop(task_id, None)
        return removed is not None

    @staticmethod
    def list_tasks(skip: int = 0, limit: int = 100, completed: Optional[bool] = None) -> List[Task]:
        """Return one page of tasks, newest first.

        ``completed`` filters on the flag when not None; ``skip`` and ``limit``
        are applied after filtering and sorting.
        """
        if completed is None:
            selected = list(tasks_db.values())
        else:
            selected = [task for task in tasks_db.values() if task.completed == completed]

        newest_first = sorted(selected, key=lambda task: task.created_at, reverse=True)
        return newest_first[skip:skip + limit]

def serialize_task(task: Task) -> Dict[str, Any]:
    """Convert ``task`` to a JSON-ready dict, with both timestamps as ISO 8601 strings."""
    copied_as_is = ("id", "title", "description", "priority", "due_date", "completed")
    data: Dict[str, Any] = {name: getattr(task, name) for name in copied_as_is}
    data["created_at"] = task.created_at.isoformat()
    data["updated_at"] = task.updated_at.isoformat()
    return data

# REST API endpoints
@app.get("/api/tasks")
def list_tasks(
    request: Request,
    skip: int = Query(0, ge=0, description="Number of tasks to skip"),
    limit: int = Query(10, ge=1, le=100, description="Number of tasks to return"),
    completed: Optional[bool] = Query(None, description="Filter by completion status")
) -> Response:
    """List tasks with pagination and an optional completion filter.

    Returns:
        200 with the page of tasks, pagination figures and the filter applied.
        ``pagination.total`` counts the tasks that match the filter, so a
        client can work out how many pages exist.
    """
    tasks = TaskService.list_tasks(skip=skip, limit=limit, completed=completed)

    # Count under the same filter as the page itself; len(tasks_db) would
    # overstate the total whenever `completed` is set.
    if completed is None:
        total = len(tasks_db)
    else:
        total = sum(1 for task in tasks_db.values() if task.completed == completed)

    return JSONResponse({
        "tasks": [serialize_task(task) for task in tasks],
        "pagination": {
            "skip": skip,
            "limit": limit,
            "total": total,
            "returned": len(tasks)
        },
        "filters": {"completed": completed}
    })

@app.post("/api/tasks")
def create_task(request: Request, task: TaskCreate) -> Response:
    """Store a new task and answer 201 with its serialized form."""
    stored = TaskService.create_task(task)
    body = {"task": serialize_task(stored), "message": "Task created successfully"}
    return JSONResponse(body, status_code=201)

@app.get("/api/tasks/{task_id}")
def get_task(request: Request, task_id: str = Path(..., description="Task ID")) -> Response:
    """Return the task with ``task_id``, or 404 when there is none."""
    found = TaskService.get_task(task_id)
    if found:
        return JSONResponse({"task": serialize_task(found)})
    return JSONResponse({"error": "Task not found"}, status_code=404)

@app.put("/api/tasks/{task_id}")
def update_task(
    request: Request,
    task_id: str = Path(..., description="Task ID"),
    task_update: TaskUpdate = None
) -> Response:
    """Apply a partial update to an existing task.

    Returns:
        200 with the updated task, 400 when no update body was supplied, or
        404 when ``task_id`` is unknown.
    """
    # The parameter defaults to None; passing that on would raise
    # AttributeError inside TaskService.update_task instead of a client error.
    if task_update is None:
        return JSONResponse({"error": "Request body is required"}, status_code=400)

    updated_task = TaskService.update_task(task_id, task_update)
    if not updated_task:
        return JSONResponse({"error": "Task not found"}, status_code=404)

    return JSONResponse({
        "task": serialize_task(updated_task),
        "message": "Task updated successfully"
    })

@app.delete("/api/tasks/{task_id}")
def delete_task(request: Request, task_id: str = Path(..., description="Task ID")) -> Response:
    """Remove the task with ``task_id``; 404 when nothing was removed."""
    if TaskService.delete_task(task_id):
        return JSONResponse({"message": "Task deleted successfully"})
    return JSONResponse({"error": "Task not found"}, status_code=404)

# Bulk operations
@app.post("/api/tasks/bulk")
def bulk_create_tasks(request: Request, tasks: List[TaskCreate]) -> Response:
    """Create up to 50 tasks in one call.

    Each item is attempted independently; failures are reported by index
    alongside the tasks that were created. More than 50 items yields 400.
    """
    if len(tasks) > 50:
        return JSONResponse({"error": "Maximum 50 tasks per bulk operation"}, status_code=400)

    created_tasks, errors = [], []
    for index, payload in enumerate(tasks):
        try:
            stored = TaskService.create_task(payload)
            created_tasks.append(serialize_task(stored))
        except Exception as exc:
            errors.append({"index": index, "error": str(exc)})

    summary = {
        "total_submitted": len(tasks),
        "successful": len(created_tasks),
        "failed": len(errors)
    }
    return JSONResponse({
        "created": created_tasks,
        "errors": errors,
        "summary": summary
    })

if __name__ == "__main__":
    app.listen(port=8000)

API Versioning

Implement API versioning strategies:

from catzilla import Catzilla, Request, Response, JSONResponse, Path
from catzilla.router import RouterGroup

app = Catzilla(production=False, show_banner=True, log_requests=True)

# URL path versioning with RouterGroups
v1_router = RouterGroup(prefix="/api/v1")
v2_router = RouterGroup(prefix="/api/v2")

@v1_router.get("/users/{user_id}")
def get_user_v1(request: Request, user_id: str = Path(..., description="User ID")) -> Response:
    """v1 user representation: id, name and version only."""
    body = {
        "id": user_id,
        "name": f"User {user_id}",
        "version": "1.0"
    }
    return JSONResponse(body)

@v2_router.get("/users/{user_id}")
def get_user_v2(request: Request, user_id: str = Path(..., description="User ID")) -> Response:
    """v2 user representation: the v1 fields plus email and a profile object."""
    profile = {
        "created_at": "2023-01-01",
        "last_login": "2024-01-01"
    }
    body = {
        "id": user_id,
        "name": f"User {user_id}",
        "email": f"user{user_id}@example.com",
        "profile": profile,
        "version": "2.0"
    }
    return JSONResponse(body)

# Header-based versioning
def get_api_version(request: Request) -> str:
    """Return the ``API-Version`` request header, or ``"v1"`` when it is absent."""
    headers = request.headers
    return headers.get("API-Version", "v1")

@app.get("/api/users/{user_id}")
def get_user_versioned(request: Request, user_id: str = Path(..., description="User ID")) -> Response:
    """Serve the v2 shape when the version header is ``"v2"``, otherwise v1."""
    body = {
        "id": user_id,
        "name": f"User {user_id}"
    }

    if get_api_version(request) != "v2":
        body["version"] = "1.0"
        return JSONResponse(body)

    body.update({
        "email": f"user{user_id}@example.com",
        "profile": {
            "created_at": "2023-01-01",
            "last_login": "2024-01-01"
        },
        "version": "2.0"
    })
    return JSONResponse(body)

# Content negotiation versioning
@app.get("/api/data/{item_id}")
def get_data_with_content_negotiation(request: Request, item_id: str = Path(..., description="Item ID")) -> Response:
    """Pick the response shape from the ``Accept`` header.

    ``application/vnd.api.v2+json`` selects the v2 envelope; anything else
    gets the flat v1 body.
    """
    accept_header = request.headers.get("Accept", "application/json")
    wants_v2 = "application/vnd.api.v2+json" in accept_header

    item = {
        "id": item_id,
        "name": f"Item {item_id}"
    }

    if not wants_v2:
        # Version 1 format (default)
        item["version"] = "1.0"
        return JSONResponse(item)

    # Version 2 format
    item["metadata"] = {
        "created_at": "2023-01-01",
        "updated_at": "2024-01-01"
    }
    return JSONResponse({"data": item, "version": "2.0"})

# Register router groups
app.include_routes(v1_router)
app.include_routes(v2_router)

if __name__ == "__main__":
    app.listen(port=8000)

Error Handling Patterns

Comprehensive Error Handling

Structured error responses and logging:

from catzilla import Catzilla, Request, Response, JSONResponse, BaseModel, Field, Path
import uuid
import time
from enum import Enum
from typing import Optional, Dict, Any

app = Catzilla(production=False, show_banner=True, log_requests=True)

class ErrorCode(Enum):
    """Error identifiers; each member's value is the string put in error responses."""
    VALIDATION_ERROR = "VALIDATION_ERROR"
    NOT_FOUND = "NOT_FOUND"
    UNAUTHORIZED = "UNAUTHORIZED"
    FORBIDDEN = "FORBIDDEN"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    RATE_LIMITED = "RATE_LIMITED"

class APIError(Exception):
    """Application error carrying a code, a message, details and an HTTP status."""

    def __init__(self, code: ErrorCode, message: str, details: dict = None, status_code: int = 400):
        """Record the error fields; ``details`` falls back to an empty dict."""
        super().__init__(message)
        self.code = code
        self.message = message
        self.details = details if details else {}
        self.status_code = status_code

class UserCreate(BaseModel):
    """Request body for ``POST /api/users``: a name and an email address."""
    name: str = Field(min_length=2, max_length=50, description="User name")
    # The pattern only checks for the shape "<something>@<something>.<something>".
    email: str = Field(regex=r'^[^@]+@[^@]+\.[^@]+$', description="Email address")

# Custom exception handlers
def validation_error_handler(request: Request, exc: Exception) -> Response:
    """Turn ``exc`` into a 422 JSON body with a millisecond-timestamp request id."""
    message = str(exc)
    request_id = f"req_{int(time.time() * 1000)}"
    body = {
        "error": "validation_error",
        "message": message,
        "status_code": 422,
        "request_id": request_id
    }
    return JSONResponse(body, status_code=422)

def api_error_handler(request: Request, exc: APIError) -> Response:
    """Render an ``APIError`` as JSON using the status code it carries."""
    status = exc.status_code
    body = {
        "error": exc.code.value,
        "message": exc.message,
        "details": exc.details,
        "status_code": status,
        "request_id": f"req_{int(time.time() * 1000)}"
    }
    return JSONResponse(body, status_code=status)

# Register exception handlers
app.set_exception_handler(APIError, api_error_handler)
app.set_exception_handler(ValueError, validation_error_handler)

# Custom 404 handler
@app.set_not_found_handler
def custom_404_handler(request: Request) -> Response:
    """Answer unknown routes with a 404 body that lists the known endpoints."""
    known_endpoints = [
        "/api/users/{user_id}",
        "/api/validation-error",
        "/api/server-error",
        "/api/protected"
    ]
    body = {
        "error": "not_found",
        "message": f"Endpoint {request.path} not found",
        "method": request.method,
        "path": request.path,
        "available_endpoints": known_endpoints
    }
    return JSONResponse(body, status_code=404)

# Custom 500 handler
@app.set_internal_error_handler
def custom_500_handler(request: Request, exc: Exception) -> Response:
    """Render an unhandled exception as a 500 response.

    When ``app.production`` is falsy the body includes the exception text,
    its type and the request path/method; otherwise only a generic message
    and a request id are returned.
    """
    if not app.production:
        debug_body = {
            "error": "internal_server_error",
            "message": str(exc),
            "type": type(exc).__name__,
            "status_code": 500,
            "path": request.path,
            "method": request.method
        }
        return JSONResponse(debug_body, status_code=500)

    generic_body = {
        "error": "internal_server_error",
        "message": "An internal error occurred",
        "status_code": 500,
        "request_id": f"req_{int(time.time() * 1000)}"
    }
    return JSONResponse(generic_body, status_code=500)

# Routes that demonstrate error handling
@app.get("/api/users/{user_id}")
def get_user(request: Request, user_id: str = Path(..., description="User ID")) -> Response:
    """Return a placeholder user; the id ``"999"`` raises a NOT_FOUND ``APIError``."""
    if user_id != "999":
        return JSONResponse({
            "id": user_id,
            "name": f"User {user_id}",
            "email": f"user{user_id}@example.com"
        })

    raise APIError(
        ErrorCode.NOT_FOUND,
        f"User {user_id} not found",
        {"resource": "User", "resource_id": user_id},
        status_code=404
    )

@app.post("/api/users")
def create_user(request: Request, user_data: UserCreate) -> Response:
    """Echo the submitted user back with a generated UUID4 id and status 201."""
    new_user = {
        "id": str(uuid.uuid4()),
        "name": user_data.name,
        "email": user_data.email
    }
    body = {
        "message": "User created successfully",
        "user": new_user
    }
    return JSONResponse(body, status_code=201)

@app.get("/api/validation-error")
def trigger_validation_error(request: Request) -> Response:
    """Always raise a VALIDATION_ERROR ``APIError`` (status 422) for testing."""
    test_error = APIError(
        ErrorCode.VALIDATION_ERROR,
        "This is a test validation error",
        {"field": "test_field"},
        status_code=422
    )
    raise test_error

@app.get("/api/server-error")
def trigger_server_error(request: Request) -> Response:
    """Deliberately raise ZeroDivisionError to exercise server-error handling."""
    numerator, denominator = 1, 0
    result = numerator / denominator  # This will trigger a ZeroDivisionError
    return JSONResponse({"result": result})

@app.get("/api/protected")
def protected_endpoint(request: Request) -> Response:
    """Demo route that accepts only the literal bearer token ``valid-token``.

    Raises:
        APIError: UNAUTHORIZED (401) when the header lacks the ``Bearer ``
            prefix or the token is anything else.
    """
    bearer_prefix = "Bearer "
    auth_header = request.headers.get("Authorization", "")

    if auth_header[:len(bearer_prefix)] != bearer_prefix:
        raise APIError(
            ErrorCode.UNAUTHORIZED,
            "Authentication required",
            {"required_header": "Authorization: Bearer <token>"},
            status_code=401
        )

    token = auth_header[len(bearer_prefix):]
    if token == "valid-token":
        return JSONResponse({
            "message": "Access granted to protected resource",
            "user": "authenticated_user"
        })

    raise APIError(
        ErrorCode.UNAUTHORIZED,
        "Invalid token",
        {"token": token},
        status_code=401
    )

@app.get("/health")
def health_check(request: Request) -> Response:
    """Report a healthy status plus a summary of the error-handling setup."""
    error_handling = {
        "production_mode": app.production,
        "custom_handlers": ["APIError", "ValueError"],
        "custom_404": True,
        "custom_500": True
    }
    return JSONResponse({
        "status": "healthy",
        "error_handling": error_handling
    })

if __name__ == "__main__":
    app.listen(port=8000)

Rate Limiting and Throttling

Implement rate limiting for API protection:

from catzilla import Catzilla, Request, Response, JSONResponse, Path
import time
from collections import defaultdict
from typing import Dict, Tuple, Optional

app = Catzilla(production=False, show_banner=True, log_requests=True)

class RateLimiter:
    """In-memory sliding-window rate limiter.

    Request timestamps are kept per identifier; a tier decides how many
    requests are permitted within its window (in seconds).
    """

    def __init__(self):
        """Start with no recorded requests and the three built-in tiers."""
        self.requests: Dict[str, list] = defaultdict(list)
        self.limits = {
            "default": {"count": 100, "window": 3600},  # 100 requests per hour
            "premium": {"count": 1000, "window": 3600},  # 1000 requests per hour
            "admin": {"count": 10000, "window": 3600}    # 10000 requests per hour
        }

    def is_allowed(self, identifier: str, tier: str = "default") -> Tuple[bool, dict]:
        """Record a request for ``identifier`` if the tier's limit permits it.

        Args:
            identifier: Client key the request is counted against.
            tier: Name of the limit to apply; unknown names use ``"default"``.

        Returns:
            ``(allowed, info)`` where ``info`` holds ``limit``, ``remaining``
            (never negative), ``reset_time`` (Unix time at which the oldest
            counted request leaves the window) and ``window``.
        """
        now = time.time()
        limit_config = self.limits.get(tier, self.limits["default"])
        window = limit_config["window"]
        window_start = now - window

        # Keep only the timestamps that are still inside the window.
        recent = [stamp for stamp in self.requests[identifier] if stamp > window_start]

        allowed = len(recent) < limit_config["count"]
        if allowed:
            recent.append(now)
        self.requests[identifier] = recent

        # A slot frees up when the oldest retained request ages out.
        # `window_start + window` is simply `now`, which made Retry-After 0.
        oldest = recent[0] if recent else now

        return allowed, {
            "limit": limit_config["count"],
            # Clamp: the history is shared across tiers, so it can exceed a smaller limit.
            "remaining": max(0, limit_config["count"] - len(recent)),
            "reset_time": oldest + window,
            "window": window
        }

rate_limiter = RateLimiter()

def rate_limit_middleware(tier: str = "default"):
    """Build a per-route rate-limiting middleware.

    Args:
        tier: Limit applied when the request carries no recognised API key.

    Returns:
        A middleware that answers 429 once the client is over its limit and
        otherwise stores the limit info in ``request.context['rate_limit']``.
    """
    # Recognised API keys and the tier each one unlocks.
    key_tiers = {"premium-key": "premium", "admin-key": "admin"}

    def middleware_func(request: Request) -> Optional[Response]:
        """Apply the rate limit to a single request."""
        # Identify client (could use IP, user ID, API key, etc.)
        # NOTE(review): X-Forwarded-For is client-supplied unless a trusted
        # proxy overwrites it, so it can be spoofed to dodge the limit.
        client_id = request.headers.get("X-Forwarded-For", "127.0.0.1")
        api_key = request.headers.get("X-API-Key")

        # A recognised key picks its own tier; otherwise the route's tier
        # applies. Reassigning `tier` here would shadow the factory argument
        # and make it dead, so a separate name is used.
        effective_tier = key_tiers.get(api_key, tier)

        allowed, limit_info = rate_limiter.is_allowed(client_id, effective_tier)

        if not allowed:
            retry_after = max(0, int(limit_info["reset_time"] - time.time()))
            return JSONResponse({
                "error": "Rate limit exceeded",
                "limit": limit_info["limit"],
                "window": limit_info["window"],
                "retry_after": retry_after
            }, status_code=429, headers={
                "X-RateLimit-Limit": str(limit_info["limit"]),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(limit_info["reset_time"])),
                "Retry-After": str(retry_after)
            })

        # Add rate limit info to request context
        if not hasattr(request, 'context'):
            request.context = {}
        request.context['rate_limit'] = limit_info

        return None  # Continue to next middleware/handler

    return middleware_func

# Apply rate limiting to different endpoint groups
@app.get("/api/public/data", middleware=[rate_limit_middleware("default")])
def public_data_endpoint(request: Request) -> Response:
    """Serve public data and echo any recorded rate limit state as headers.

    The rate limit info is read from ``request.context['rate_limit']``; when
    it is missing or empty the response carries no rate limit headers.
    """
    context = getattr(request, 'context', {})
    limit_info = context.get('rate_limit', {})
    response = JSONResponse({"data": "public information"})

    # No limit info on this request: return the plain payload.
    if not limit_info:
        return response

    limit_headers = {
        "X-RateLimit-Limit": str(limit_info["limit"]),
        "X-RateLimit-Remaining": str(limit_info["remaining"]),
        "X-RateLimit-Reset": str(int(limit_info["reset_time"]))
    }
    response.headers.update(limit_headers)
    return response

@app.get("/api/premium/analytics", middleware=[rate_limit_middleware("premium")])
def premium_analytics_endpoint(request: Request) -> Response:
    """Serve premium analytics behind the middleware built for the "premium" tier.

    Any rate limit info found in ``request.context['rate_limit']`` is echoed
    back as ``X-RateLimit-*`` response headers.
    """
    context = getattr(request, 'context', {})
    limit_info = context.get('rate_limit', {})
    response = JSONResponse({"analytics": "premium data"})

    # No limit info on this request: return the plain payload.
    if not limit_info:
        return response

    limit_headers = {
        "X-RateLimit-Limit": str(limit_info["limit"]),
        "X-RateLimit-Remaining": str(limit_info["remaining"]),
        "X-RateLimit-Reset": str(int(limit_info["reset_time"]))
    }
    response.headers.update(limit_headers)
    return response

@app.get("/api/rate-limit-status")
def rate_limit_status(request: Request) -> Response:
    """Report the caller's rate limit state for every configured tier.

    The client is identified by the ``X-Forwarded-For`` header, falling back
    to ``127.0.0.1``. The response maps each tier name in
    ``rate_limiter.limits`` to its ``allowed``, ``limit``, ``remaining`` and
    ``reset_time`` values.
    """
    client_id = request.headers.get("X-Forwarded-For", "127.0.0.1")

    status = {}
    # NOTE(review): is_allowed() appears to record a request whenever it
    # returns allowed, so polling this endpoint may itself consume quota for
    # each tier - confirm, and consider a read-only query on RateLimiter.
    # Only the tier names are needed here, so iterate the keys directly.
    for tier_name in rate_limiter.limits:
        allowed, limit_info = rate_limiter.is_allowed(client_id, tier_name)
        status[tier_name] = {
            "allowed": allowed,
            "limit": limit_info["limit"],
            "remaining": limit_info["remaining"],
            "reset_time": limit_info["reset_time"]
        }

    return JSONResponse({
        "client_id": client_id,
        "rate_limits": status
    })

# Listen on port 8000 only when this file is executed as a script.
if __name__ == "__main__":
    app.listen(port=8000)

Performance Monitoring

Monitor API performance and health:

from catzilla import Catzilla, Request, Response, JSONResponse
import time
import psutil
from collections import deque
from dataclasses import dataclass
from typing import Dict, Any

# Application instance for this example: production mode off, banner and request logging on.
app = Catzilla(production=False, show_banner=True, log_requests=True)

@dataclass
class PerformanceMetrics:
    """Running counters kept by the performance monitor."""
    # Number of requests recorded.
    request_count: int = 0
    # Number of recorded requests counted as errors.
    error_count: int = 0
    # Sum of all recorded request durations.
    total_response_time: float = 0.0
    # Timestamp the counters started from; filled in on creation when omitted.
    start_time: float = None

    def __post_init__(self):
        # An explicit start time wins; otherwise stamp the creation time.
        if self.start_time is not None:
            return
        self.start_time = time.time()

class PerformanceMonitor:
    """Collect request timings and expose aggregate performance metrics.

    Keeps the most recent ``max_samples`` request durations for the average
    response time, plus lifetime counters in a ``PerformanceMetrics`` record.
    """

    def __init__(self, max_samples: int = 1000) -> None:
        """Create a monitor that retains at most ``max_samples`` durations."""
        # Bounded deque: the oldest sample is dropped once the limit is hit.
        self.request_times = deque(maxlen=max_samples)
        self.metrics = PerformanceMetrics()

    def record_request(self, duration: float, status_code: int) -> None:
        """Record one finished request.

        Args:
            duration: Request duration in seconds.
            status_code: HTTP status of the response; values of 400 and
                above are counted as errors.
        """
        self.request_times.append(duration)
        self.metrics.request_count += 1
        self.metrics.total_response_time += duration

        if status_code >= 400:
            self.metrics.error_count += 1

    def get_metrics(self) -> Dict[str, Any]:
        """Return current performance and system metrics.

        Returns:
            ``{"message": ...}`` when nothing has been recorded yet,
            otherwise a dict with a ``"performance"`` section (average
            response time over the retained samples, lifetime totals,
            error rate, throughput, uptime) and a ``"system"`` section
            (CPU and memory figures from psutil, zeroed when unavailable).
        """
        if not self.request_times:
            return {"message": "No requests recorded yet"}

        avg_response_time = sum(self.request_times) / len(self.request_times)
        uptime = time.time() - self.metrics.start_time

        # System metrics are best-effort: fall back to zeros if psutil fails.
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt and SystemExit still propagate.
        try:
            cpu_percent = psutil.cpu_percent(interval=None)
            memory = psutil.virtual_memory()
        except Exception:
            cpu_percent = 0.0
            memory = None

        return {
            "performance": {
                "avg_response_time_ms": round(avg_response_time * 1000, 3),
                "total_requests": self.metrics.request_count,
                # max(..., 1) guards the divisions against a zero denominator.
                "error_rate_percent": round((self.metrics.error_count / max(self.metrics.request_count, 1)) * 100, 2),
                "requests_per_second": round(self.metrics.request_count / max(uptime, 1), 2),
                "uptime_seconds": round(uptime, 2)
            },
            "system": {
                "cpu_percent": cpu_percent,
                "memory_percent": memory.percent if memory else 0,
                "memory_available_mb": round(memory.available / 1024 / 1024, 2) if memory else 0
            }
        }

# Shared monitor instance used by the recording middleware and the health/metrics endpoints below.
performance_monitor = PerformanceMonitor()

@app.middleware(priority=5, pre_route=True, name="performance_monitor")
def performance_monitoring_middleware(request: Request) -> None:
    """Stamp the request with its start time in ``request.context['start_time']``."""
    context_missing = not hasattr(request, 'context')
    if context_missing:
        # Create the context dict on requests that do not carry one yet.
        request.context = {}
    request.context['start_time'] = time.time()

@app.middleware(priority=5, pre_route=False, post_route=True, name="performance_recorder")
def performance_recording_middleware(request: Request) -> None:
    """Feed the elapsed time of the request into the performance monitor.

    Requests without a ``start_time`` entry in their context are ignored.
    """
    context = getattr(request, 'context', {})
    started_at = context.get('start_time')
    if not started_at:
        return None

    elapsed = time.time() - started_at
    # The response is not inspected here, so every request is recorded with a
    # placeholder success status (a real implementation would read it from
    # the response).
    placeholder_status = 200
    performance_monitor.record_request(elapsed, placeholder_status)
    return None

@app.get("/api/health")
def health_check(request: Request) -> Response:
    """Report overall health derived from the monitor's current metrics.

    The status is "degraded" when any threshold below is exceeded and
    "healthy" otherwise. Thresholds are only evaluated once the metrics
    contain a "performance" section.
    """
    metrics = performance_monitor.get_metrics()

    # (metrics section, metric key, threshold, issue label), in report order.
    thresholds = (
        ("performance", "avg_response_time_ms", 1000, "High response time"),
        ("performance", "error_rate_percent", 5, "High error rate"),
        ("system", "cpu_percent", 80, "High CPU usage"),
        ("system", "memory_percent", 85, "High memory usage"),
    )

    issues = []
    if "performance" in metrics:
        issues = [
            label
            for section, key, threshold, label in thresholds
            if metrics[section][key] > threshold
        ]

    health_status = "degraded" if issues else "healthy"

    return JSONResponse({
        "status": health_status,
        "timestamp": time.time(),
        "issues": issues,
        "metrics": metrics
    })

@app.get("/api/metrics")
def get_metrics(request: Request) -> Response:
    """Return the monitor's current metrics as a JSON response."""
    snapshot = performance_monitor.get_metrics()
    return JSONResponse(snapshot)

@app.get("/api/slow-endpoint")
def slow_endpoint(request: Request) -> Response:
    """Respond after an artificial half-second delay."""
    delay_seconds = 0.5  # Stand-in for a slow operation
    time.sleep(delay_seconds)
    payload = {"message": "This endpoint is intentionally slow"}
    return JSONResponse(payload)

@app.get("/api/error-endpoint")
def error_endpoint(request: Request) -> Response:
    """Always respond with a 500 status and a test error body."""
    body = {"error": "This is a test error"}
    return JSONResponse(body, status_code=500)

@app.get("/api/fast-endpoint")
def fast_endpoint(request: Request) -> Response:
    """Respond immediately with a message and the current timestamp."""
    payload = {"message": "This endpoint is fast", "timestamp": time.time()}
    return JSONResponse(payload)

# Listen on port 8000 only when this file is executed as a script.
if __name__ == "__main__":
    app.listen(port=8000)

These real-world recipes provide production-ready patterns that you can adapt and extend for your specific use cases with Catzilla. All examples use actual Catzilla APIs and demonstrate best practices for building high-performance web applications.