Streaming
Catzilla provides advanced streaming capabilities for real-time communication, including Server-Sent Events (SSE), chunked responses, and efficient connection management for high-performance streaming applications.
Overview
Catzilla’s streaming system provides:
Server-Sent Events (SSE) - Real-time data streaming to browsers
Chunked Transfer Encoding - Efficient streaming of large responses
Connection Management - Active connection tracking and cleanup
Real-Time Communication - Live data feeds and notifications
Memory Efficient - Stream large datasets without memory overflow
Auto-Reconnection - Client-side reconnection support
Quick Start
Basic Server-Sent Events
Create a simple SSE endpoint:
from catzilla import Catzilla, Request, Response, StreamingResponse, JSONResponse
import json
import time

app = Catzilla()


@app.get("/events")
def sse_endpoint(request: Request) -> Response:
    """Stream server-sent events (SSE) for real-time updates."""
    def generate_sse():
        # Format according to the SSE specification: an "id:" line,
        # a "data:" line, then a blank line terminating the event.
        for i in range(20):
            yield f"id: {i}\n"
            yield f"data: {{'count': {i}, 'timestamp': {time.time()}}}\n\n"
            time.sleep(0.5)  # Add a delay to simulate real-time updates

    return StreamingResponse(
        generate_sse(),
        content_type="text/event-stream"
    )


@app.get("/")
def home(request: Request) -> Response:
    """Landing page."""
    # JSONResponse is now included in the catzilla import above --
    # the original example used it without importing it.
    return JSONResponse({"message": "Hello with streaming!"})


if __name__ == "__main__":
    print("🚀 Starting Catzilla streaming example...")
    print("Try: curl -N http://localhost:8000/events")
    app.listen(port=8000)
Real-Time Data Streaming
Stream live data updates:
from catzilla import Catzilla, Request, Response, StreamingResponse
import json
import time
from datetime import datetime
app = Catzilla()
def generate_realtime_data():
    """Yield ten live data points as JSON lines, one per second,
    followed by a completion summary record."""
    count = 0
    for _ in range(10):
        count += 1
        payload = {
            "timestamp": datetime.now().isoformat(),
            "count": count,
            "value": count * 1.5,
            "status": "active"
        }
        # Newline-delimited JSON: one record per line.
        yield json.dumps(payload) + "\n"
        time.sleep(1)

    # Closing summary once the stream is exhausted.
    yield json.dumps({"status": "completed", "total_items": count}) + "\n"
@app.get("/stream/realtime")
def stream_realtime_data(request: Request) -> Response:
    """Stream real-time data as newline-delimited JSON."""
    stream_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive"
    }
    return StreamingResponse(
        generate_realtime_data(),
        content_type="text/plain",
        headers=stream_headers
    )


if __name__ == "__main__":
    print("🚀 Starting real-time streaming example...")
    print("Try: curl -N http://localhost:8000/stream/realtime")
    app.listen(port=8000)
Advanced Streaming Patterns
Chunked File Streaming
Stream large files efficiently:
from catzilla import Catzilla, Request, Response, StreamingResponse, JSONResponse
import os
from pathlib import Path
app = Catzilla()
def file_streamer(file_path, chunk_size=8192):
    """Yield the contents of *file_path* in chunks of *chunk_size* bytes."""
    with open(file_path, "rb") as fh:
        # iter() with a sentinel calls fh.read until it returns b"" (EOF),
        # so the whole file is never held in memory at once.
        yield from iter(lambda: fh.read(chunk_size), b"")
@app.get("/download/{filename}")
def stream_file_download(request: Request) -> Response:
    """Stream a large file download without loading it into memory.

    Returns a 404 JSON error when the requested file does not exist
    in the uploads/ directory.
    """
    # The route placeholder was garbled in the original; it must match
    # the path parameter read below.
    filename = request.path_params["filename"]
    file_path = Path("uploads") / filename

    if not file_path.exists():
        return JSONResponse({"error": "File not found"}, status_code=404)

    # Content-Length lets clients show accurate download progress.
    file_size = file_path.stat().st_size

    return StreamingResponse(
        file_streamer(file_path),
        content_type="application/octet-stream",
        headers={
            "Content-Disposition": f"attachment; filename={filename}",
            "Content-Length": str(file_size)
        }
    )


if __name__ == "__main__":
    # Create uploads directory if it doesn't exist
    Path("uploads").mkdir(exist_ok=True)
    print("🚀 Starting file streaming server...")
    print("Try: curl http://localhost:8000/download/sample.txt")
    app.listen(port=8000)
Data Processing Pipeline
Stream processed data in real-time:
from catzilla import Catzilla, Request, Response, StreamingResponse
import json
import time
from datetime import datetime
app = Catzilla()
def process_and_stream():
    """Process a simulated dataset and yield each result as a JSON line."""
    total_items = 100

    for item_id in range(total_items):
        # Simulated per-item processing result, streamed immediately.
        record = {
            "item_id": item_id,
            "processed_at": datetime.now().isoformat(),
            "result": f"Processed item {item_id}",
            "progress": round((item_id + 1) / total_items * 100, 2)
        }
        yield json.dumps(record) + "\n"
        time.sleep(0.01)  # Simulated processing time

    # Tell the consumer that the stream finished normally.
    summary = {
        "event": "completed",
        "total_processed": total_items,
        "completed_at": datetime.now().isoformat()
    }
    yield json.dumps(summary) + "\n"
@app.post("/process-stream")
def stream_data_processing(request: Request) -> Response:
    """Stream data-processing results as NDJSON."""
    stream_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive"
    }
    return StreamingResponse(
        process_and_stream(),
        content_type="application/x-ndjson",
        headers=stream_headers
    )


if __name__ == "__main__":
    print("🚀 Starting data processing streaming server...")
    print("Try: curl -X POST http://localhost:8000/process-stream")
    app.listen(port=8000)
Connection Management
Simple Connection Tracking
Track streaming connections with basic monitoring:
from catzilla import Catzilla, Request, Response, StreamingResponse
import json
import time
from datetime import datetime
from collections import defaultdict
app = Catzilla()
# Simple connection tracking
active_connections = defaultdict(dict)
connection_count = 0


def generate_connection_status():
    """Emit ten SSE status frames describing current connection counts."""
    # Reads the module-level counters; no mutation happens here.
    for update_number in range(1, 11):
        snapshot = {
            "timestamp": datetime.now().isoformat(),
            "active_connections": len(active_connections),
            "total_connections": connection_count,
            "update_number": update_number
        }
        yield "data: " + json.dumps(snapshot) + "\n\n"
        time.sleep(2)
@app.get("/stream/status")
def stream_connection_status(request: Request) -> Response:
    """Stream connection status updates, registering this connection."""
    global connection_count

    # Register the new connection under a sequential id.
    connection_id = f"conn_{connection_count}"
    connection_count += 1

    client_ip = request.client.host if hasattr(request, 'client') else "unknown"
    active_connections[connection_id] = {
        "connected_at": datetime.now(),
        "client_ip": client_ip
    }
    print(f"➕ New connection: {connection_id}")

    # NOTE(review): in a real implementation, removal from
    # active_connections would happen on client disconnect; a finally
    # block around this return cannot observe that event.
    return StreamingResponse(
        generate_connection_status(),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Connection-ID": connection_id
        }
    )


if __name__ == "__main__":
    print("🚀 Starting connection tracking server...")
    print("Try: curl -N http://localhost:8000/stream/status")
    app.listen(port=8000)
Broadcasting Example
Simple message broadcasting:
from catzilla import Catzilla, Request, Response, StreamingResponse
import json
import time
from datetime import datetime
from threading import Thread, Lock
import queue
app = Catzilla()
# Simple broadcasting system
message_queues = {}
queues_lock = Lock()


def broadcast_message(message):
    """Deliver *message* to every connected client's queue."""
    with queues_lock:
        for connection_id, msg_queue in message_queues.items():
            try:
                msg_queue.put_nowait(message)
            except queue.Full:
                # Slow consumer: drop the message rather than block.
                print(f"Queue full for connection {connection_id}")


def periodic_broadcaster():
    """Background loop broadcasting a counter message every 5 seconds."""
    counter = 0
    while True:
        broadcast_message({
            "type": "broadcast",
            "id": counter,
            "timestamp": datetime.now().isoformat(),
            "message": f"Broadcast message {counter}"
        })
        counter += 1
        time.sleep(5)


# Start background broadcaster
Thread(target=periodic_broadcaster, daemon=True).start()


def stream_broadcasts(connection_id):
    """Yield SSE frames for one subscriber, with 1-second heartbeats."""
    msg_queue = queue.Queue(maxsize=100)
    with queues_lock:
        message_queues[connection_id] = msg_queue

    try:
        while True:
            try:
                message = msg_queue.get(timeout=1.0)
            except queue.Empty:
                # Heartbeat keeps the connection alive while quiet.
                message = {"type": "heartbeat", "timestamp": datetime.now().isoformat()}
            yield f"data: {json.dumps(message)}\n\n"
    finally:
        # Unregister on generator close / client disconnect.
        with queues_lock:
            message_queues.pop(connection_id, None)
@app.get("/broadcast-stream")
def join_broadcast_stream(request: Request) -> Response:
    """Subscribe this client to the broadcast stream."""
    import uuid

    connection_id = str(uuid.uuid4())
    print(f"➕ New broadcast subscriber: {connection_id}")

    return StreamingResponse(
        stream_broadcasts(connection_id),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Connection-ID": connection_id
        }
    )


@app.post("/broadcast")
def send_broadcast(request: Request) -> Response:
    """Broadcast a fixed message to all connected clients."""
    # Get message from request body (simplified for example)
    message = {
        "type": "manual_broadcast",
        "message": "Hello from server!",
        "timestamp": datetime.now().isoformat()
    }
    broadcast_message(message)

    body = json.dumps({
        "status": "broadcast_sent",
        "message": message,
        "recipients": len(message_queues)
    })
    return Response(body, content_type="application/json")


if __name__ == "__main__":
    print("🚀 Starting broadcasting server...")
    print("Try: curl -N http://localhost:8000/broadcast-stream")
    print("Send: curl -X POST http://localhost:8000/broadcast")
    app.listen(port=8000)
Real-Time Applications
Simple Chat System
Build a basic chat with streaming:
from catzilla import Catzilla, Request, Response, StreamingResponse
import json
import time
from datetime import datetime
from threading import Lock
import queue
app = Catzilla()
# Simple chat room
chat_clients = {}
chat_lock = Lock()
message_history = []


def add_chat_client(client_id, client_name):
    """Register a client and announce the join to the room.

    Returns the per-client message queue the caller should consume.
    """
    client_queue = queue.Queue(maxsize=100)
    with chat_lock:
        chat_clients[client_id] = {
            "name": client_name,
            "queue": client_queue,
            "joined_at": datetime.now()
        }
        clients_count = len(chat_clients)

    # Broadcast OUTSIDE the lock: broadcast_chat_message() re-acquires
    # chat_lock, and threading.Lock is not reentrant -- calling it while
    # holding the lock would deadlock.
    broadcast_chat_message({
        "type": "user_joined",
        "user": client_name,
        "timestamp": datetime.now().isoformat(),
        "clients_count": clients_count
    })
    return client_queue


def remove_chat_client(client_id):
    """Remove a client and announce the departure to the room."""
    with chat_lock:
        client = chat_clients.pop(client_id, None)
        clients_count = len(chat_clients)

    if client:
        # Broadcast outside the lock (see add_chat_client).
        broadcast_chat_message({
            "type": "user_left",
            "user": client["name"],
            "timestamp": datetime.now().isoformat(),
            "clients_count": clients_count
        })


def broadcast_chat_message(message):
    """Record *message* in history and fan it out to every client."""
    with chat_lock:
        # History is mutated under the lock so concurrent broadcasters
        # cannot corrupt it (the original mutated it unsynchronized).
        message_history.append(message)
        # Keep only last 50 messages
        if len(message_history) > 50:
            message_history[:] = message_history[-50:]

        for client_id, client in chat_clients.items():
            try:
                client["queue"].put_nowait(message)
            except queue.Full:
                print(f"Message queue full for client {client_id}")


def stream_chat_messages(client_id, client_name):
    """Yield SSE frames for one chat client: recent history, then live messages."""
    client_queue = add_chat_client(client_id, client_name)
    try:
        # Replay the last 10 messages so late joiners get context.
        for msg in message_history[-10:]:
            yield f"data: {json.dumps(msg)}\n\n"

        # Stream new messages, with a heartbeat every idle second.
        while True:
            try:
                message = client_queue.get(timeout=1.0)
            except queue.Empty:
                message = {"type": "heartbeat", "timestamp": datetime.now().isoformat()}
            yield f"data: {json.dumps(message)}\n\n"
    finally:
        remove_chat_client(client_id)
@app.get("/chat/stream")
def join_chat_stream(request: Request) -> Response:
    """Subscribe a client to the chat stream."""
    import uuid

    client_id = str(uuid.uuid4())
    client_name = request.query_params.get("name", f"User_{client_id[:8]}")
    print(f"🎯 Chat client joined: {client_name} ({client_id})")

    return StreamingResponse(
        stream_chat_messages(client_id, client_name),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )


@app.post("/chat/send")
def send_chat_message(request: Request) -> Response:
    """Publish a chat message taken from the query string."""
    # Simple message sending (in real app, would parse JSON from body)
    chat_message = {
        "type": "chat_message",
        "sender": request.query_params.get("sender", "Anonymous"),
        "message": request.query_params.get("message", "Hello!"),
        "timestamp": datetime.now().isoformat()
    }
    broadcast_chat_message(chat_message)

    return Response(
        json.dumps({"status": "message_sent", "message": chat_message}),
        content_type="application/json"
    )


if __name__ == "__main__":
    print("🚀 Starting chat server...")
    print("Join chat: curl -N 'http://localhost:8000/chat/stream?name=Alice'")
    print("Send message: curl -X POST 'http://localhost:8000/chat/send?sender=Alice&message=Hello'")
    app.listen(port=8000)
Live Analytics Dashboard
Stream analytics data for dashboards:
from catzilla import Catzilla, Request, Response, StreamingResponse
import json
import time
import random
import queue
from datetime import datetime
from threading import Thread, Lock
app = Catzilla()
# Simple analytics data
analytics_data = {
    "page_views": 0,
    "unique_visitors": set(),
    "api_calls": 0,
    "errors": 0
}
analytics_lock = Lock()
analytics_subscribers = {}


def track_page_view(visitor_id):
    """Record a page view and notify subscribers."""
    with analytics_lock:
        analytics_data["page_views"] += 1
        analytics_data["unique_visitors"].add(visitor_id)
    # Broadcast OUTSIDE the lock: broadcast_analytics_update() re-acquires
    # analytics_lock, and threading.Lock is not reentrant -- calling it
    # while holding the lock would deadlock.
    broadcast_analytics_update()


def track_api_call():
    """Record an API call and notify subscribers."""
    with analytics_lock:
        analytics_data["api_calls"] += 1
    broadcast_analytics_update()


def track_error():
    """Record an error and notify subscribers."""
    with analytics_lock:
        analytics_data["errors"] += 1
    broadcast_analytics_update()


def broadcast_analytics_update():
    """Push a snapshot of the current counters to every subscriber."""
    with analytics_lock:
        update = {
            "page_views": analytics_data["page_views"],
            "unique_visitors": len(analytics_data["unique_visitors"]),
            "api_calls": analytics_data["api_calls"],
            "errors": analytics_data["errors"],
            "timestamp": datetime.now().isoformat()
        }
        # The loop variable must NOT be named "queue": that shadowed the
        # queue module in the original, so "except queue.Full" raised
        # AttributeError instead of catching the full-queue condition.
        for subscriber_id, subscriber_queue in analytics_subscribers.items():
            try:
                subscriber_queue.put_nowait(update)
            except queue.Full:
                print(f"Analytics queue full for subscriber {subscriber_id}")


def simulate_analytics_data():
    """Background loop that generates simulated analytics events."""
    import uuid

    counter = 0
    while True:
        counter += 1
        # Simulate random events
        if counter % 3 == 0:
            track_page_view(str(uuid.uuid4()))
        if counter % 2 == 0:
            track_api_call()
        if counter % 10 == 0:
            track_error()
        time.sleep(2)


# Start analytics simulation
Thread(target=simulate_analytics_data, daemon=True).start()
def stream_analytics_updates(subscriber_id):
    """Yield SSE frames: the current counters first, then live updates."""
    subscriber_queue = queue.Queue(maxsize=50)
    analytics_subscribers[subscriber_id] = subscriber_queue

    try:
        # Send current state first so the dashboard renders immediately.
        snapshot = {
            "page_views": analytics_data["page_views"],
            "unique_visitors": len(analytics_data["unique_visitors"]),
            "api_calls": analytics_data["api_calls"],
            "errors": analytics_data["errors"],
            "timestamp": datetime.now().isoformat()
        }
        yield f"data: {json.dumps(snapshot)}\n\n"

        # Stream updates, with a heartbeat every idle 5 seconds.
        while True:
            try:
                update = subscriber_queue.get(timeout=5.0)
            except queue.Empty:
                update = {"type": "heartbeat", "timestamp": datetime.now().isoformat()}
            yield f"data: {json.dumps(update)}\n\n"
    finally:
        analytics_subscribers.pop(subscriber_id, None)


@app.get("/analytics-stream")
def analytics_dashboard_stream(request: Request) -> Response:
    """Stream analytics updates for a dashboard client."""
    import uuid

    subscriber_id = str(uuid.uuid4())
    print(f"📊 Analytics subscriber connected: {subscriber_id}")

    return StreamingResponse(
        stream_analytics_updates(subscriber_id),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )


if __name__ == "__main__":
    print("🚀 Starting analytics dashboard server...")
    print("Try: curl -N http://localhost:8000/analytics-stream")
    app.listen(port=8000)
Performance Optimization
Memory Efficient Streaming
Stream large datasets without memory issues:
from catzilla import Catzilla, Request, Response, StreamingResponse
import json
import time
from datetime import datetime
app = Catzilla()
def efficient_data_stream():
    """Yield a large dataset as NDJSON, one batch envelope per line."""
    batch_size = 100
    total_records = 10000  # Smaller for demo

    for batch_start in range(0, total_records, batch_size):
        batch_end = min(batch_start + batch_size, total_records)
        batch = [
            {
                "id": record_id,
                "data": f"Record {record_id}",
                "timestamp": datetime.now().isoformat()
            }
            for record_id in range(batch_start, batch_end)
        ]

        # Each yielded line carries the batch plus progress metadata.
        envelope = {
            "batch": batch,
            "batch_start": batch_start,
            "batch_size": len(batch),
            "total_records": total_records,
            "progress": round((batch_start + len(batch)) / total_records * 100, 2)
        }
        yield json.dumps(envelope) + "\n"

        # Small delay to prevent overwhelming the client
        time.sleep(0.01)
@app.get("/large-dataset")
def stream_large_dataset(request: Request) -> Response:
    """Stream the large demo dataset as chunked NDJSON."""
    return StreamingResponse(
        efficient_data_stream(),
        content_type="application/x-ndjson",
        headers={
            "Cache-Control": "no-cache",
            "Transfer-Encoding": "chunked"
        }
    )


if __name__ == "__main__":
    print("🚀 Starting large dataset streaming server...")
    print("Try: curl -N http://localhost:8000/large-dataset")
    app.listen(port=8000)
Connection Health Monitoring
Monitor connection health with heartbeats:
from catzilla import Catzilla, Request, Response, StreamingResponse
import json
import time
from datetime import datetime
app = Catzilla()
def monitored_stream():
    """Yield SSE data frames every second, with a ping every 10 seconds."""
    last_ping = time.time()
    ping_interval = 10  # seconds between pings
    counter = 0

    while True:
        now = time.time()

        # Emit a named "ping" event when the interval has elapsed.
        if now - last_ping >= ping_interval:
            ping = {
                "type": "ping",
                "timestamp": datetime.now().isoformat(),
                "server_time": now
            }
            yield "event: ping\n"
            yield f"data: {json.dumps(ping)}\n\n"
            last_ping = now

        # Regular data frame.
        payload = {
            "type": "data",
            "counter": counter,
            "timestamp": datetime.now().isoformat(),
            "status": "healthy"
        }
        yield f"data: {json.dumps(payload)}\n\n"
        counter += 1
        time.sleep(1)
@app.get("/health-monitored-stream")
def health_monitored_stream(request: Request) -> Response:
    """Stream data with periodic pings for connection-health monitoring."""
    return StreamingResponse(
        monitored_stream(),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )


if __name__ == "__main__":
    print("🚀 Starting health monitored streaming server...")
    print("Try: curl -N http://localhost:8000/health-monitored-stream")
    app.listen(port=8000)
Best Practices
Client-Side JavaScript
Example client-side code for consuming streams:
// Basic SSE client
const eventSource = new EventSource('/events');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};

eventSource.onerror = (event) => {
  console.error('SSE error:', event);
};
// Auto-reconnection wrapper
function createReconnectingEventSource(url, options = {}) {
  const reconnectInterval = options.reconnectInterval || 3000;
  let eventSource = null;
  let stopped = false;

  function connect() {
    if (stopped) return;
    eventSource = new EventSource(url);

    eventSource.onopen = function() {
      console.log('Connected to stream');
    };

    eventSource.onmessage = function(event) {
      if (options.onMessage) {
        options.onMessage(JSON.parse(event.data));
      }
    };

    eventSource.onerror = function() {
      console.log('Connection lost, reconnecting...');
      eventSource.close();
      setTimeout(connect, reconnectInterval);
    };
  }

  connect();

  // Return a stable handle instead of the raw EventSource: the raw
  // object is replaced on every reconnect, so a returned reference goes
  // stale and its close() cannot stop the reconnect loop. The handle's
  // close() cancels reconnection and always targets the live connection.
  return {
    close() {
      stopped = true;
      if (eventSource) eventSource.close();
    },
    get source() {
      return eventSource;
    }
  };
}
Performance Guidelines
# ✅ Good: Use generator functions for streaming
def good_stream():
    for n in range(1000):
        yield f"data: {n}\n"
        time.sleep(0.1)  # Control flow rate


# ❌ Avoid: Creating all data in memory
def bad_stream():
    buffered = [f"data: {n}\n" for n in range(1000)]  # Memory intensive
    yield from buffered


# ✅ Good: Use appropriate chunk sizes
chunk_size = 8192  # 8KB chunks for files
batch_size = 100   # 100 records for data


# ✅ Good: Add delays to prevent overwhelming clients
def controlled_stream():
    for n in range(100):
        yield f"data: {n}\n"
        time.sleep(0.01)  # Small delay


# ✅ Good: Use proper content types
# For SSE: content_type="text/event-stream"
# For JSON Lines: content_type="application/x-ndjson"
# For binary: content_type="application/octet-stream"
This streaming system enables real-time communication and efficient data transfer for modern web applications that require live updates and interactive features.