"""FastAPI backend for LLM Council.""" from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel from typing import List, Dict, Any import uuid import json import asyncio import os from . import storage from .council import run_full_council, generate_conversation_title, stage1_collect_responses, stage2_collect_rankings, stage3_synthesize_final, calculate_aggregate_rankings from . import capture # Enable JSON capture for iOS development CAPTURE_ENABLED = os.getenv("CAPTURE_JSON", "false").lower() == "true" app = FastAPI(title="LLM Council API") # Enable CORS for local development app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:5173", "http://localhost:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class CreateConversationRequest(BaseModel): """Request to create a new conversation.""" pass class SendMessageRequest(BaseModel): """Request to send a message in a conversation.""" content: str class ConversationMetadata(BaseModel): """Conversation metadata for list view.""" id: str created_at: str title: str message_count: int class Conversation(BaseModel): """Full conversation with all messages.""" id: str created_at: str title: str messages: List[Dict[str, Any]] @app.get("/") async def root(): """Health check endpoint.""" return {"status": "ok", "service": "LLM Council API"} @app.get("/api/conversations", response_model=List[ConversationMetadata]) async def list_conversations(): """List all conversations (metadata only).""" result = storage.list_conversations() if CAPTURE_ENABLED: capture.capture_conversation_list(result) return result @app.post("/api/conversations", response_model=Conversation) async def create_conversation(request: CreateConversationRequest): """Create a new conversation.""" conversation_id = str(uuid.uuid4()) conversation = storage.create_conversation(conversation_id) return conversation 
@app.get("/api/conversations/{conversation_id}", response_model=Conversation)
async def get_conversation(conversation_id: str):
    """Get a specific conversation with all its messages.

    Raises:
        HTTPException: 404 when storage has no conversation with this id.
    """
    conversation = storage.get_conversation(conversation_id)
    if conversation is None:
        raise HTTPException(status_code=404, detail="Conversation not found")

    if CAPTURE_ENABLED:
        capture.capture_conversation(conversation)

    return conversation


@app.post("/api/conversations/{conversation_id}/message")
async def send_message(conversation_id: str, request: SendMessageRequest):
    """Send a message and run the 3-stage council process.

    The user message is stored first; on the first message of a conversation
    a title is generated and stored; then the full council is run and its
    result is stored as the assistant message.

    Returns:
        A dict with keys ``stage1``, ``stage2``, ``stage3`` and ``metadata``.

    Raises:
        HTTPException: 404 when the conversation does not exist.
    """
    conversation = storage.get_conversation(conversation_id)
    if conversation is None:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Must be evaluated before the user message is appended below.
    is_first_message = not conversation["messages"]

    storage.add_user_message(conversation_id, request.content)

    # The first message of a conversation also determines its title.
    if is_first_message:
        title = await generate_conversation_title(request.content)
        storage.update_conversation_title(conversation_id, title)

    stage1_results, stage2_results, stage3_result, metadata = await run_full_council(
        request.content
    )

    storage.add_assistant_message(
        conversation_id,
        stage1_results,
        stage2_results,
        stage3_result,
    )

    response = {
        "stage1": stage1_results,
        "stage2": stage2_results,
        "stage3": stage3_result,
        "metadata": metadata,
    }

    # Capture JSON for iOS development if enabled.
    if CAPTURE_ENABLED:
        capture.capture_full_response(
            stage1_results, stage2_results, stage3_result, metadata
        )

    return response


def _sse_frame(event: Dict[str, Any]) -> str:
    """Serialize *event* as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(event)}\n\n"


@app.post("/api/conversations/{conversation_id}/message/stream")
async def send_message_stream(conversation_id: str, request: SendMessageRequest):
    """Send a message and stream the 3-stage council process.

    Returns a ``text/event-stream`` response whose events are emitted as each
    stage starts and completes (``stage1_start`` ... ``stage3_complete``),
    followed by ``title_complete`` on the first message of a conversation and
    a final ``complete``. If anything fails while streaming, a single
    ``error`` event carrying ``str(exception)`` is emitted instead.

    Raises:
        HTTPException: 404 when the conversation does not exist (raised
            before the stream starts).
    """
    conversation = storage.get_conversation(conversation_id)
    if conversation is None:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Must be evaluated before the user message is appended inside the stream.
    is_first_message = not conversation["messages"]

    async def event_generator():
        captured_events = []  # For iOS development capture

        def record(event):
            """Remember *event* for capture and return its SSE frame."""
            captured_events.append(event)
            return _sse_frame(event)

        # Bound before the ``try`` so the ``finally`` clause can always see it.
        title_task = None
        try:
            storage.add_user_message(conversation_id, request.content)

            # Start title generation in parallel (it is awaited after stage 3).
            if is_first_message:
                title_task = asyncio.create_task(
                    generate_conversation_title(request.content)
                )

            # Stage 1: Collect responses
            yield record({'type': 'stage1_start'})
            stage1_results = await stage1_collect_responses(request.content)
            yield record({'type': 'stage1_complete', 'data': stage1_results})

            # Stage 2: Collect rankings
            yield record({'type': 'stage2_start'})
            stage2_results, label_to_model = await stage2_collect_rankings(
                request.content, stage1_results
            )
            aggregate_rankings = calculate_aggregate_rankings(
                stage2_results, label_to_model
            )
            yield record({
                'type': 'stage2_complete',
                'data': stage2_results,
                'metadata': {
                    'label_to_model': label_to_model,
                    'aggregate_rankings': aggregate_rankings,
                },
            })

            # Stage 3: Synthesize final answer
            yield record({'type': 'stage3_start'})
            stage3_result = await stage3_synthesize_final(
                request.content, stage1_results, stage2_results
            )
            yield record({'type': 'stage3_complete', 'data': stage3_result})

            # Wait for title generation if it was started.
            if title_task is not None:
                title = await title_task
                storage.update_conversation_title(conversation_id, title)
                yield record({'type': 'title_complete', 'data': {'title': title}})

            # Save complete assistant message.
            storage.add_assistant_message(
                conversation_id,
                stage1_results,
                stage2_results,
                stage3_result,
            )

            yield record({'type': 'complete'})

            # Capture all SSE events for iOS development.
            if CAPTURE_ENABLED:
                capture.capture_sse_events(captured_events)

        except Exception as e:
            # Report the failure to the client as a final SSE event.
            error_event = {'type': 'error', 'message': str(e)}
            captured_events.append(error_event)
            if CAPTURE_ENABLED:
                capture.capture_sse_events(captured_events)
            yield _sse_frame(error_event)

        finally:
            # If a stage failed, or the generator was closed/cancelled (e.g.
            # the client went away) before the title was awaited, the
            # background task would otherwise be orphaned and keep running.
            # cancel() is a no-op on a finished task, but the done() check
            # makes the intent explicit.
            if title_task is not None and not title_task.done():
                title_task.cancel()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )


# ============================================================================
# INDIVIDUAL STAGE ENDPOINTS (for MCP granular control)
# ============================================================================


class Stage1Request(BaseModel):
    """Request for Stage 1."""

    # The user query sent to the council models.
    query: str


class Stage2Request(BaseModel):
    """Request for Stage 2."""

    query: str
    # Output of a previous Stage 1 run, passed through to the ranking stage.
    stage1_results: List[Dict[str, Any]]


class Stage3Request(BaseModel):
    """Request for Stage 3."""

    query: str
    # Outputs of previous Stage 1 and Stage 2 runs, passed to the synthesis stage.
    stage1_results: List[Dict[str, Any]]
    stage2_results: List[Dict[str, Any]]


@app.post("/api/council/stage1")
async def run_stage1(request: Stage1Request):
    """Run Stage 1 independently - collect individual responses from all council models.

    Returns whatever ``stage1_collect_responses`` produces for the query.
    """
    results = await stage1_collect_responses(request.query)

    if CAPTURE_ENABLED:
        capture.capture_stage1(results)

    return results


@app.post("/api/council/stage2")
async def run_stage2(request: Stage2Request):
    """Run Stage 2 independently - collect rankings with anonymization.

    Returns:
        A dict with keys ``rankings``, ``label_to_model`` and
        ``aggregate_rankings``.
    """
    stage2_results, label_to_model = await stage2_collect_rankings(
        request.query,
        request.stage1_results,
    )
    aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)

    if CAPTURE_ENABLED:
        capture.capture_stage2(stage2_results, label_to_model, aggregate_rankings)

    return {
        "rankings": stage2_results,
        "label_to_model": label_to_model,
        "aggregate_rankings": aggregate_rankings,
    }


@app.post("/api/council/stage3")
async def run_stage3(request: Stage3Request):
    """Run Stage 3 independently - chairman synthesis.

    Returns whatever ``stage3_synthesize_final`` produces for the inputs.
    """
    result = await stage3_synthesize_final(
        request.query,
        request.stage1_results,
        request.stage2_results,
    )

    if CAPTURE_ENABLED:
        capture.capture_stage3(result)

    return result


if __name__ == "__main__":
    import uvicorn

    # Listens on all interfaces, port 8001.
    uvicorn.run(app, host="0.0.0.0", port=8001)