diff --git a/.python-version b/.python-version index c8cfe39..24ee5b1 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.10 +3.13 diff --git a/backend/main.py b/backend/main.py index 25b9f1c..f819b88 100644 --- a/backend/main.py +++ b/backend/main.py @@ -8,9 +8,14 @@ from typing import List, Dict, Any import uuid import json import asyncio +import os from . import storage from .council import run_full_council, generate_conversation_title, stage1_collect_responses, stage2_collect_rankings, stage3_synthesize_final, calculate_aggregate_rankings +from . import capture + +# Enable JSON capture for iOS development +CAPTURE_ENABLED = os.getenv("CAPTURE_JSON", "false").lower() == "true" app = FastAPI(title="LLM Council API") @@ -59,7 +64,10 @@ async def root(): @app.get("/api/conversations", response_model=List[ConversationMetadata]) async def list_conversations(): """List all conversations (metadata only).""" - return storage.list_conversations() + result = storage.list_conversations() + if CAPTURE_ENABLED: + capture.capture_conversation_list(result) + return result @app.post("/api/conversations", response_model=Conversation) @@ -76,6 +84,8 @@ async def get_conversation(conversation_id: str): conversation = storage.get_conversation(conversation_id) if conversation is None: raise HTTPException(status_code=404, detail="Conversation not found") + if CAPTURE_ENABLED: + capture.capture_conversation(conversation) return conversation @@ -114,14 +124,19 @@ async def send_message(conversation_id: str, request: SendMessageRequest): stage3_result ) - # Return the complete response with metadata - return { + response = { "stage1": stage1_results, "stage2": stage2_results, "stage3": stage3_result, "metadata": metadata } + # Capture JSON for iOS development if enabled + if CAPTURE_ENABLED: + capture.capture_full_response(stage1_results, stage2_results, stage3_result, metadata) + + return response + @app.post("/api/conversations/{conversation_id}/message/stream") async def send_message_stream(conversation_id: str, request: SendMessageRequest): @@ -138,6 +153,8 @@ async def send_message_stream(conversation_id: str, request: SendMessageRequest) is_first_message = len(conversation["messages"]) == 0 async def event_generator(): + captured_events = [] # For iOS development capture + try: # Add user message storage.add_user_message(conversation_id, request.content) @@ -148,26 +165,43 @@ async def send_message_stream(conversation_id: str, request: SendMessageRequest) title_task = asyncio.create_task(generate_conversation_title(request.content)) # Stage 1: Collect responses - yield f"data: {json.dumps({'type': 'stage1_start'})}\n\n" + event1_start = {'type': 'stage1_start'} + captured_events.append(event1_start) + yield f"data: {json.dumps(event1_start)}\n\n" + stage1_results = await stage1_collect_responses(request.content) - yield f"data: {json.dumps({'type': 'stage1_complete', 'data': stage1_results})}\n\n" + event1_complete = {'type': 'stage1_complete', 'data': stage1_results} + captured_events.append(event1_complete) + yield f"data: {json.dumps(event1_complete)}\n\n" # Stage 2: Collect rankings - yield f"data: {json.dumps({'type': 'stage2_start'})}\n\n" + event2_start = {'type': 'stage2_start'} + captured_events.append(event2_start) + yield f"data: {json.dumps(event2_start)}\n\n" + stage2_results, label_to_model = await stage2_collect_rankings(request.content, stage1_results) aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model) - yield f"data: {json.dumps({'type': 'stage2_complete', 'data': stage2_results, 'metadata': {'label_to_model': label_to_model, 'aggregate_rankings': aggregate_rankings}})}\n\n" + event2_complete = {'type': 'stage2_complete', 'data': stage2_results, 'metadata': {'label_to_model': label_to_model, 'aggregate_rankings': aggregate_rankings}} + captured_events.append(event2_complete) + yield f"data: {json.dumps(event2_complete)}\n\n" # Stage 3: Synthesize final answer - yield f"data: {json.dumps({'type': 'stage3_start'})}\n\n" + event3_start = {'type': 'stage3_start'} + captured_events.append(event3_start) + yield f"data: {json.dumps(event3_start)}\n\n" + stage3_result = await stage3_synthesize_final(request.content, stage1_results, stage2_results) - yield f"data: {json.dumps({'type': 'stage3_complete', 'data': stage3_result})}\n\n" + event3_complete = {'type': 'stage3_complete', 'data': stage3_result} + captured_events.append(event3_complete) + yield f"data: {json.dumps(event3_complete)}\n\n" # Wait for title generation if it was started if title_task: title = await title_task storage.update_conversation_title(conversation_id, title) - yield f"data: {json.dumps({'type': 'title_complete', 'data': {'title': title}})}\n\n" + title_event = {'type': 'title_complete', 'data': {'title': title}} + captured_events.append(title_event) + yield f"data: {json.dumps(title_event)}\n\n" # Save complete assistant message storage.add_assistant_message( @@ -178,11 +212,21 @@ async def send_message_stream(conversation_id: str, request: SendMessageRequest) ) # Send completion event - yield f"data: {json.dumps({'type': 'complete'})}\n\n" + complete_event = {'type': 'complete'} + captured_events.append(complete_event) + yield f"data: {json.dumps(complete_event)}\n\n" + + # Capture all SSE events for iOS development + if CAPTURE_ENABLED: + capture.capture_sse_events(captured_events) except Exception as e: # Send error event - yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" + error_event = {'type': 'error', 'message': str(e)} + captured_events.append(error_event) + if CAPTURE_ENABLED: + capture.capture_sse_events(captured_events) + yield f"data: {json.dumps(error_event)}\n\n" return StreamingResponse( event_generator(), @@ -222,6 +266,8 @@ async def run_stage1(request: Stage1Request): Run Stage 1 independently - collect individual responses from all council models. """ results = await stage1_collect_responses(request.query) + if CAPTURE_ENABLED: + capture.capture_stage1(results) return results @@ -235,6 +281,9 @@ async def run_stage2(request: Stage2Request): ) aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model) + if CAPTURE_ENABLED: + capture.capture_stage2(stage2_results, label_to_model, aggregate_rankings) + return { "rankings": stage2_results, "label_to_model": label_to_model, @@ -250,6 +299,8 @@ async def run_stage3(request: Stage3Request): result = await stage3_synthesize_final( request.query, request.stage1_results, request.stage2_results ) + if CAPTURE_ENABLED: + capture.capture_stage3(result) return result diff --git a/mcp_server/server.py b/mcp_server/server.py index 2cf40fb..06421a5 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -13,6 +13,39 @@ FASTAPI_URL = os.getenv("COUNCIL_FASTAPI_URL", "http://localhost:8001") # Create MCP server instance mcp = FastMCP("council") +# Model display name mapping for human-readable names +MODEL_DISPLAY_NAMES = { + "openai/gpt-4o": "GPT-4o", + "openai/gpt-4o-mini": "GPT-4o Mini", + "openai/gpt-4-turbo": "GPT-4 Turbo", + "openai/o1": "o1", + "openai/o1-mini": "o1 Mini", + "openai/o1-preview": "o1 Preview", + "anthropic/claude-3.5-sonnet": "Claude 3.5 Sonnet", + "anthropic/claude-3-opus": "Claude 3 Opus", + "anthropic/claude-3-haiku": "Claude 3 Haiku", + "google/gemini-pro": "Gemini Pro", + "google/gemini-pro-1.5": "Gemini Pro 1.5", + "google/gemini-2.0-flash-001": "Gemini 2.0 Flash", + "google/gemini-2.0-flash-thinking-exp": "Gemini 2.0 Flash Thinking", + "meta-llama/llama-3.1-405b-instruct": "Llama 3.1 405B", + "meta-llama/llama-3.1-70b-instruct": "Llama 3.1 70B", + "mistralai/mistral-large": "Mistral Large", + "deepseek/deepseek-chat": "DeepSeek Chat", + "deepseek/deepseek-r1": "DeepSeek R1", +} + + +def get_display_name(model_id: str) -> str: + """Get human-readable display name for a model ID.""" + if model_id in MODEL_DISPLAY_NAMES: + return MODEL_DISPLAY_NAMES[model_id] + # Fallback: extract the model name from the ID + parts = model_id.split("/") + if len(parts) > 1: + return parts[-1].replace("-", " ").title() + return model_id + # HTTP client for FastAPI communication _http_client: httpx.AsyncClient | None = None @@ -83,6 +116,7 @@ async def council_query( Returns: Complete council response with all 3 stages and metadata including: - conversation_id: The conversation ID used + - content_blocks: Structured blocks for each model response (for UI rendering) - stage1: Individual model responses - stage2: Peer rankings with aggregate scores - stage3: Chairman's synthesized final answer @@ -100,8 +134,35 @@ async def council_query( {"content": query} ) + # Build content_blocks for structured UI rendering + content_blocks = [] + + # Add Stage 1 responses as individual blocks + stage1_results = result.get("stage1", []) + for resp in stage1_results: + model_id = resp.get("model", "unknown") + content_blocks.append({ + "type": "council_response", + "model": model_id, + "model_display_name": get_display_name(model_id), + "response": resp.get("response", ""), + "stage": 1 + }) + + # Add Stage 3 synthesis block + stage3_result = result.get("stage3", {}) + if stage3_result: + model_id = stage3_result.get("model", "unknown") + content_blocks.append({ + "type": "council_synthesis", + "model": model_id, + "model_display_name": get_display_name(model_id), + "response": stage3_result.get("response", "") + }) + return { "conversation_id": conversation_id, + "content_blocks": content_blocks, **result }