This commit is contained in:
karpathy
2025-11-22 14:27:53 -08:00
commit eb0eb26f4c
39 changed files with 6660 additions and 0 deletions

1
backend/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""LLM Council backend package."""

26
backend/config.py Normal file
View File

@@ -0,0 +1,26 @@
"""Configuration for the LLM Council."""
import os
from dotenv import load_dotenv
load_dotenv()
# Credential for OpenRouter, taken from the environment (None when unset).
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")

# OpenRouter model identifiers of the council members.
COUNCIL_MODELS = [
    "openai/gpt-5.1",
    "google/gemini-3-pro-preview",
    "anthropic/claude-sonnet-4.5",
    "x-ai/grok-4",
]

# Model that acts as chairman and writes the final synthesized answer.
CHAIRMAN_MODEL = "google/gemini-3-pro-preview"

# Chat-completions endpoint of the OpenRouter API.
OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions"

# Directory in which conversation JSON files are stored.
DATA_DIR = "data/conversations"

297
backend/council.py Normal file
View File

@@ -0,0 +1,297 @@
"""3-stage LLM Council orchestration."""
from typing import List, Dict, Any, Tuple
from .openrouter import query_models_parallel, query_model
from .config import COUNCIL_MODELS, CHAIRMAN_MODEL
async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]:
    """
    Stage 1: Collect individual responses from all council models.

    Args:
        user_query: The user's question

    Returns:
        List of dicts with 'model' and 'response' keys. Models whose query
        failed (result is None) are left out. 'response' is always a string.
    """
    messages = [{"role": "user", "content": user_query}]

    # Query all models in parallel
    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    # Only include successful responses. query_model always sets the
    # 'content' key but its value may be None, so a `.get('content', '')`
    # default would never apply; normalise None to an empty string here.
    return [
        {"model": model, "response": response.get('content') or ''}
        for model, response in responses.items()
        if response is not None
    ]
async def stage2_collect_rankings(
    user_query: str,
    stage1_results: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
    """
    Stage 2: Each model ranks the anonymized responses.

    Args:
        user_query: The original user query
        stage1_results: Results from Stage 1

    Returns:
        Tuple of (rankings list, label_to_model mapping). Each ranking entry
        has 'model', 'ranking' (full evaluation text, always a string) and
        'parsed_ranking' (list of "Response X" labels, best first).
    """
    # Pair every Stage 1 result with an anonymous label (Response A, Response B, ...).
    # NOTE: labels come from chr(65 + i), so only the first 26 results get a
    # letter that the "Response [A-Z]" ranking parser can recognise.
    labelled = [
        (f"Response {chr(65 + i)}", result)
        for i, result in enumerate(stage1_results)
    ]

    # Create mapping from label to model name
    label_to_model = {label: result['model'] for label, result in labelled}

    # Build the ranking prompt
    responses_text = "\n\n".join(
        f"{label}:\n{result['response']}" for label, result in labelled
    )
    ranking_prompt = f"""You are evaluating different responses to the following question:
Question: {user_query}
Here are the responses from different models (anonymized):
{responses_text}
Your task:
1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
2. Then, at the very end of your response, provide a final ranking.
IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
- Start with the line "FINAL RANKING:" (all caps, with colon)
- Then list the responses from best to worst as a numbered list
- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
- Do not add any other text or explanations in the ranking section
Example of the correct format for your ENTIRE response:
Response A provides good detail on X but misses Y...
Response B is accurate but lacks depth on Z...
Response C offers the most comprehensive answer...
FINAL RANKING:
1. Response C
2. Response A
3. Response B
Now provide your evaluation and ranking:"""
    messages = [{"role": "user", "content": ranking_prompt}]

    # Get rankings from all council models in parallel
    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    # Format results, skipping models whose query failed
    stage2_results = []
    for model, response in responses.items():
        if response is None:
            continue
        # 'content' is always present but may be None; coerce it to a string
        # so the parser is never handed None.
        full_text = response.get('content') or ''
        stage2_results.append({
            "model": model,
            "ranking": full_text,
            "parsed_ranking": parse_ranking_from_text(full_text)
        })
    return stage2_results, label_to_model
async def stage3_synthesize_final(
    user_query: str,
    stage1_results: List[Dict[str, Any]],
    stage2_results: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Stage 3: Chairman synthesizes final response.

    Args:
        user_query: The original user query
        stage1_results: Individual model responses from Stage 1
        stage2_results: Rankings from Stage 2

    Returns:
        Dict with 'model' and 'response' keys. 'response' is always a string;
        it carries a fixed error message when the chairman query fails.
    """
    # Build comprehensive context for chairman
    stage1_text = "\n\n".join(
        f"Model: {result['model']}\nResponse: {result['response']}"
        for result in stage1_results
    )
    stage2_text = "\n\n".join(
        f"Model: {result['model']}\nRanking: {result['ranking']}"
        for result in stage2_results
    )
    chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
Original Question: {user_query}
STAGE 1 - Individual Responses:
{stage1_text}
STAGE 2 - Peer Rankings:
{stage2_text}
Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
- The individual responses and their insights
- The peer rankings and what they reveal about response quality
- Any patterns of agreement or disagreement
Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""
    messages = [{"role": "user", "content": chairman_prompt}]

    # Query the chairman model
    response = await query_model(CHAIRMAN_MODEL, messages)
    if response is None:
        # Fallback if chairman fails
        return {
            "model": CHAIRMAN_MODEL,
            "response": "Error: Unable to generate final synthesis."
        }
    return {
        "model": CHAIRMAN_MODEL,
        # 'content' is always present but may be None; never return None here.
        "response": response.get('content') or ''
    }
def parse_ranking_from_text(ranking_text: str) -> List[str]:
    """
    Parse the FINAL RANKING section from the model's response.

    The text after the LAST "FINAL RANKING:" marker is used, because the
    prompt asks for the ranking at the very end and a model may mention the
    marker earlier in its evaluation. If that section holds a numbered list
    ("1. Response A"), its labels are returned; otherwise every
    "Response X" label in the section is returned. Without any marker the
    whole text is scanned.

    Args:
        ranking_text: The full text response from the model

    Returns:
        List of response labels ("Response X") in ranked order, each label
        at most once (first occurrence wins). Empty when nothing is found or
        the text is empty/None.
    """
    import re

    if not ranking_text:
        return []

    # \b keeps ordinary words such as "Response Accuracy" from being read as
    # the label "Response A".
    label_pattern = r'Response [A-Z]\b'

    _, marker, section = ranking_text.rpartition("FINAL RANKING:")
    if marker:
        # Preferred form: number, period, optional space, "Response X"
        labels = re.findall(r'\d+\.\s*(Response [A-Z])\b', section)
        if not labels:
            # Fallback: all "Response X" patterns in the section, in order
            labels = re.findall(label_pattern, section)
    else:
        # Fallback: any "Response X" patterns in the whole text, in order
        labels = re.findall(label_pattern, ranking_text)

    # A label repeated by the model must not occupy several rank positions.
    return list(dict.fromkeys(labels))
def calculate_aggregate_rankings(
    stage2_results: List[Dict[str, Any]],
    label_to_model: Dict[str, str]
) -> List[Dict[str, Any]]:
    """
    Calculate aggregate rankings across all models.

    Args:
        stage2_results: Rankings from each model. An entry's
            'parsed_ranking' list is used when present; otherwise its
            'ranking' text is parsed.
        label_to_model: Mapping from anonymous labels to model names

    Returns:
        List of dicts with 'model', 'average_rank' (rounded to 2 decimals)
        and 'rankings_count', sorted best (lowest average) to worst
    """
    from collections import defaultdict

    # Track positions for each model
    model_positions = defaultdict(list)
    for ranking in stage2_results:
        # Stage 2 already stores the parsed labels; only parse the raw text
        # when that field is missing.
        parsed_ranking = ranking.get('parsed_ranking')
        if parsed_ranking is None:
            parsed_ranking = parse_ranking_from_text(ranking['ranking'])
        for position, label in enumerate(parsed_ranking, start=1):
            # Labels that map to no known model are ignored
            if label in label_to_model:
                model_positions[label_to_model[label]].append(position)

    # Calculate average position for each model (every entry has at least
    # one position because entries are created by the append above)
    aggregate = [
        {
            "model": model,
            "average_rank": round(sum(positions) / len(positions), 2),
            "rankings_count": len(positions)
        }
        for model, positions in model_positions.items()
    ]

    # Sort by average rank (lower is better)
    aggregate.sort(key=lambda x: x['average_rank'])
    return aggregate
async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]:
    """
    Drive one question through all three council stages.

    Args:
        user_query: The user's question

    Returns:
        Tuple of (stage1_results, stage2_results, stage3_result, metadata).
        When no model answers in Stage 1, the first two items are empty
        lists, the third is an error result and the metadata is empty.
    """
    # Stage 1: individual answers
    answers = await stage1_collect_responses(user_query)
    if not answers:
        # Nothing to rank or synthesize
        failure = {
            "model": "error",
            "response": "All models failed to respond. Please try again."
        }
        return [], [], failure, {}

    # Stage 2: peer rankings, then their aggregate
    peer_rankings, label_to_model = await stage2_collect_rankings(user_query, answers)
    aggregate = calculate_aggregate_rankings(peer_rankings, label_to_model)

    # Stage 3: chairman synthesis
    final_answer = await stage3_synthesize_final(user_query, answers, peer_rankings)

    metadata = {
        "label_to_model": label_to_model,
        "aggregate_rankings": aggregate
    }
    return answers, peer_rankings, final_answer, metadata

115
backend/main.py Normal file
View File

@@ -0,0 +1,115 @@
"""FastAPI backend for LLM Council."""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any
import uuid
from . import storage
from .council import run_full_council
# Application object; the endpoint functions in this module register
# themselves on it through the @app.get / @app.post decorators.
app = FastAPI(title="LLM Council API")
# Enable CORS for local development
app.add_middleware(
    CORSMiddleware,
    # Only these two localhost origins are allowed.
    allow_origins=["http://localhost:5173", "http://localhost:3000"],
    allow_credentials=True,
    # Any HTTP method and any request header is accepted from those origins.
    allow_methods=["*"],
    allow_headers=["*"],
)
class CreateConversationRequest(BaseModel):
    """Body of the create-conversation request; it declares no fields."""
class SendMessageRequest(BaseModel):
    """Request to send a message in a conversation."""
    # Text of the user's message; send_message stores it and passes it to
    # the council.
    content: str
class ConversationMetadata(BaseModel):
    """Conversation metadata for list view."""
    # Conversation identifier
    id: str
    # Creation timestamp, kept as a string
    created_at: str
    # Number of messages in the conversation
    message_count: int
class Conversation(BaseModel):
    """Full conversation with all messages."""
    # Conversation identifier
    id: str
    # Creation timestamp, kept as a string
    created_at: str
    # Messages as free-form dicts; this model does not constrain their keys
    messages: List[Dict[str, Any]]
@app.get("/")
async def root():
    """Health check: report that the service is up."""
    health = {"status": "ok", "service": "LLM Council API"}
    return health
@app.get("/api/conversations", response_model=List[ConversationMetadata])
async def list_conversations():
    """Return the metadata of every stored conversation (no messages)."""
    summaries = storage.list_conversations()
    return summaries
@app.post("/api/conversations", response_model=Conversation)
async def create_conversation(request: CreateConversationRequest):
    """Create and store an empty conversation under a fresh UUID."""
    new_id = str(uuid.uuid4())
    return storage.create_conversation(new_id)
@app.get("/api/conversations/{conversation_id}", response_model=Conversation)
async def get_conversation(conversation_id: str):
    """Return one conversation with all its messages, or 404 if unknown."""
    found = storage.get_conversation(conversation_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Conversation not found")
@app.post("/api/conversations/{conversation_id}/message")
async def send_message(conversation_id: str, request: SendMessageRequest):
    """
    Store a user message, run the 3-stage council on it and store the result.

    Responds with 404 when the conversation does not exist. Otherwise
    returns a dict with the 'stage1', 'stage2' and 'stage3' results plus the
    council 'metadata' (the metadata is returned but not stored).
    """
    # Reject unknown conversations before anything is written
    if storage.get_conversation(conversation_id) is None:
        raise HTTPException(status_code=404, detail="Conversation not found")

    user_text = request.content

    # Record the user's turn first, then consult the council
    storage.add_user_message(conversation_id, user_text)
    stage1, stage2, stage3, metadata = await run_full_council(user_text)

    # Record the assistant's turn with all three stages
    storage.add_assistant_message(conversation_id, stage1, stage2, stage3)

    return {
        "stage1": stage1,
        "stage2": stage2,
        "stage3": stage3,
        "metadata": metadata
    }
# Start a uvicorn server for this app when the module is executed directly.
if __name__ == "__main__":
    # Imported only on this code path.
    import uvicorn
    # Binds to all interfaces (0.0.0.0) on port 8001.
    uvicorn.run(app, host="0.0.0.0", port=8001)

79
backend/openrouter.py Normal file
View File

@@ -0,0 +1,79 @@
"""OpenRouter API client for making LLM requests."""
import httpx
from typing import List, Dict, Any, Optional
from .config import OPENROUTER_API_KEY, OPENROUTER_API_URL
async def query_model(
    model: str,
    messages: List[Dict[str, str]],
    timeout: float = 120.0
) -> Optional[Dict[str, Any]]:
    """
    Send one chat request for a single model to the OpenRouter API.

    Args:
        model: OpenRouter model identifier (e.g., "openai/gpt-4o")
        messages: List of message dicts with 'role' and 'content'
        timeout: Request timeout in seconds

    Returns:
        Dict with 'content' and 'reasoning_details' taken from the first
        choice's message (either value may be None), or None if anything
        went wrong; the error is printed, not raised.
    """
    request_headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    request_body = {"model": model, "messages": messages}

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            http_response = await client.post(
                OPENROUTER_API_URL,
                headers=request_headers,
                json=request_body,
            )
            # Turn HTTP error statuses into exceptions handled below
            http_response.raise_for_status()

            reply = http_response.json()['choices'][0]['message']
            return {
                field: reply.get(field)
                for field in ('content', 'reasoning_details')
            }
    except Exception as e:
        # Best effort: any failure is reported and mapped to None
        print(f"Error querying model {model}: {e}")
        return None
async def query_models_parallel(
    models: List[str],
    messages: List[Dict[str, str]]
) -> Dict[str, Optional[Dict[str, Any]]]:
    """
    Send the same messages to several models concurrently.

    Args:
        models: List of OpenRouter model identifiers
        messages: List of message dicts to send to each model

    Returns:
        Dict mapping model identifier to response dict (or None if failed)
    """
    import asyncio

    # One query per model, awaited together; gather keeps the input order
    pending = [query_model(name, messages) for name in models]
    results = await asyncio.gather(*pending)

    # Pair each model with the result at the same position
    return dict(zip(models, results))

154
backend/storage.py Normal file
View File

@@ -0,0 +1,154 @@
"""JSON-based storage for conversations."""
import json
import os
from datetime import datetime
from typing import List, Dict, Any, Optional
from pathlib import Path
from .config import DATA_DIR
def ensure_data_dir():
    """Create the data directory (and missing parents) if it is absent."""
    data_dir = Path(DATA_DIR)
    data_dir.mkdir(parents=True, exist_ok=True)
def get_conversation_path(conversation_id: str) -> str:
    """Return the path of the JSON file that holds the given conversation."""
    filename = f"{conversation_id}.json"
    return os.path.join(DATA_DIR, filename)
def create_conversation(conversation_id: str) -> Dict[str, Any]:
    """
    Create a new, empty conversation and write it to storage.

    Args:
        conversation_id: Unique identifier for the conversation

    Returns:
        New conversation dict with 'id', 'created_at' (ISO-format string)
        and an empty 'messages' list
    """
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is
    # kept because a timezone-aware timestamp would change the stored format.
    conversation = {
        "id": conversation_id,
        "created_at": datetime.utcnow().isoformat(),
        "messages": []
    }
    # Write through the shared helper, which also ensures the data directory
    # exists, rather than repeating the file-writing code here.
    save_conversation(conversation)
    return conversation
def get_conversation(conversation_id: str) -> Optional[Dict[str, Any]]:
    """
    Read a conversation from its JSON file.

    Args:
        conversation_id: Unique identifier for the conversation

    Returns:
        Conversation dict or None if not found
    """
    source = get_conversation_path(conversation_id)
    if os.path.exists(source):
        with open(source, 'r') as handle:
            return json.load(handle)
    return None
def save_conversation(conversation: Dict[str, Any]):
    """
    Save a conversation to storage.

    Args:
        conversation: Conversation dict to save; its 'id' selects the file

    Raises:
        TypeError / ValueError: if the conversation is not JSON-serializable;
            in that case the existing file is left untouched.
    """
    ensure_data_dir()
    path = get_conversation_path(conversation['id'])
    # Serialize before opening: open(..., 'w') truncates the file at once,
    # so a serialization error after that point would wipe the stored data.
    payload = json.dumps(conversation, indent=2)
    with open(path, 'w') as f:
        f.write(payload)
def list_conversations() -> List[Dict[str, Any]]:
    """
    List all conversations (metadata only).

    Files in the data directory that cannot be read, are not valid JSON or
    lack the expected keys are skipped with a printed message, so a single
    bad file does not break the whole listing.

    Returns:
        List of conversation metadata dicts ('id', 'created_at',
        'message_count'), newest first
    """
    ensure_data_dir()
    conversations = []
    for filename in os.listdir(DATA_DIR):
        if not filename.endswith('.json'):
            continue
        path = os.path.join(DATA_DIR, filename)
        try:
            with open(path, 'r') as f:
                data = json.load(f)
            # Return metadata only
            entry = {
                "id": data["id"],
                "created_at": data["created_at"],
                "message_count": len(data["messages"])
            }
        except (OSError, ValueError, KeyError, TypeError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors;
            # KeyError/TypeError cover JSON of an unexpected shape.
            print(f"Skipping unreadable conversation file {path}: {e}")
            continue
        conversations.append(entry)

    # Sort by creation time, newest first
    conversations.sort(key=lambda x: x["created_at"], reverse=True)
    return conversations
def add_user_message(conversation_id: str, content: str):
    """
    Append a user message to a stored conversation and save it.

    Args:
        conversation_id: Conversation identifier
        content: User message content

    Raises:
        ValueError: if the conversation does not exist
    """
    conversation = get_conversation(conversation_id)
    if conversation is None:
        raise ValueError(f"Conversation {conversation_id} not found")

    user_entry = {"role": "user", "content": content}
    conversation["messages"].append(user_entry)
    save_conversation(conversation)
def add_assistant_message(
    conversation_id: str,
    stage1: List[Dict[str, Any]],
    stage2: List[Dict[str, Any]],
    stage3: Dict[str, Any]
):
    """
    Append an assistant message holding all 3 stage results and save it.

    Args:
        conversation_id: Conversation identifier
        stage1: List of individual model responses
        stage2: List of model rankings
        stage3: Final synthesized response

    Raises:
        ValueError: if the conversation does not exist
    """
    conversation = get_conversation(conversation_id)
    if conversation is None:
        raise ValueError(f"Conversation {conversation_id} not found")

    assistant_entry = {
        "role": "assistant",
        "stage1": stage1,
        "stage2": stage2,
        "stage3": stage3,
    }
    conversation["messages"].append(assistant_entry)
    save_conversation(conversation)