- Add content_blocks array to council_query return with model responses and synthesis result for rendering as UI cards
- Add MODEL_DISPLAY_NAMES mapping and get_display_name helper for user-friendly model names in the UI
- Update Python version to 3.13 for compatibility
- Refactor FastAPI endpoints for cleaner error handling
"""MCP Server for LLM Council - Proxies to FastAPI service."""
|
|
|
|
import os
|
|
import sys
|
|
import httpx
|
|
from typing import Any
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
# FastAPI service URL (configurable via environment)
|
|
FASTAPI_URL = os.getenv("COUNCIL_FASTAPI_URL", "http://localhost:8001")
|
|
|
|
# Create MCP server instance
|
|
mcp = FastMCP("council")
|
|
|
|
# Model display name mapping for human-readable names
|
|
MODEL_DISPLAY_NAMES = {
|
|
"openai/gpt-4o": "GPT-4o",
|
|
"openai/gpt-4o-mini": "GPT-4o Mini",
|
|
"openai/gpt-4-turbo": "GPT-4 Turbo",
|
|
"openai/o1": "o1",
|
|
"openai/o1-mini": "o1 Mini",
|
|
"openai/o1-preview": "o1 Preview",
|
|
"anthropic/claude-3.5-sonnet": "Claude 3.5 Sonnet",
|
|
"anthropic/claude-3-opus": "Claude 3 Opus",
|
|
"anthropic/claude-3-haiku": "Claude 3 Haiku",
|
|
"google/gemini-pro": "Gemini Pro",
|
|
"google/gemini-pro-1.5": "Gemini Pro 1.5",
|
|
"google/gemini-2.0-flash-001": "Gemini 2.0 Flash",
|
|
"google/gemini-2.0-flash-thinking-exp": "Gemini 2.0 Flash Thinking",
|
|
"meta-llama/llama-3.1-405b-instruct": "Llama 3.1 405B",
|
|
"meta-llama/llama-3.1-70b-instruct": "Llama 3.1 70B",
|
|
"mistralai/mistral-large": "Mistral Large",
|
|
"deepseek/deepseek-chat": "DeepSeek Chat",
|
|
"deepseek/deepseek-r1": "DeepSeek R1",
|
|
}
|
|
|
|
|
|
def get_display_name(model_id: str) -> str:
|
|
"""Get human-readable display name for a model ID."""
|
|
if model_id in MODEL_DISPLAY_NAMES:
|
|
return MODEL_DISPLAY_NAMES[model_id]
|
|
# Fallback: extract the model name from the ID
|
|
parts = model_id.split("/")
|
|
if len(parts) > 1:
|
|
return parts[-1].replace("-", " ").title()
|
|
return model_id
|
|
|
|
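
# Illustrative fallback example (hypothetical model ID, not in the mapping above):
#   get_display_name("example/new-model-v2") -> "New Model V2"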

# HTTP client for FastAPI communication
_http_client: httpx.AsyncClient | None = None


async def get_http_client() -> httpx.AsyncClient:
    """Get or create the HTTP client."""
    global _http_client
    if _http_client is None or _http_client.is_closed:
        _http_client = httpx.AsyncClient(
            base_url=FASTAPI_URL,
            timeout=httpx.Timeout(300.0)  # 5 min timeout for council queries
        )
    return _http_client


async def api_request(
    method: str,
    endpoint: str,
    json_data: dict | None = None
) -> dict | list:
    """Make a request to the FastAPI service."""
    client = await get_http_client()
    try:
        if method == "GET":
            response = await client.get(endpoint)
        elif method == "POST":
            response = await client.post(endpoint, json=json_data or {})
        else:
            raise ValueError(f"Unsupported method: {method}")

        response.raise_for_status()
        return response.json()
    except httpx.ConnectError:
        raise RuntimeError(
            f"Cannot connect to Council FastAPI at {FASTAPI_URL}. "
            "Ensure the service is running with: cd llm-council && uv run python -m backend.main"
        )
    except httpx.TimeoutException:
        raise RuntimeError(
            "Council query timed out. The 3-stage process may take several minutes."
        )
    except httpx.HTTPStatusError as e:
        raise RuntimeError(f"HTTP error {e.response.status_code}: {e.response.text}")
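
# Usage sketch (mirrors the tool implementations below):
#   conversations = await api_request("GET", "/api/conversations")
#   new_conv = await api_request("POST", "/api/conversations", {})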

# ============================================================================
# HIGH-LEVEL COUNCIL TOOL
# ============================================================================

@mcp.tool()
async def council_query(
    query: str,
    conversation_id: str | None = None
) -> dict[str, Any]:
    """
    Run the complete LLM Council 3-stage deliberation process.

    Stage 1: Query all council models in parallel for individual responses.
    Stage 2: Each model anonymously ranks the other responses.
    Stage 3: Chairman model synthesizes a final answer based on all inputs.

    This process typically takes 1-3 minutes depending on model response times.

    Args:
        query: The user's question to deliberate on
        conversation_id: Optional existing conversation ID for multi-turn context

    Returns:
        Complete council response with all 3 stages and metadata including:
        - conversation_id: The conversation ID used
        - content_blocks: Structured blocks for each model response (for UI rendering)
        - stage1: Individual model responses
        - stage2: Peer rankings with aggregate scores
        - stage3: Chairman's synthesized final answer
        - metadata: Label mappings and aggregate rankings
    """
    # Create conversation if not provided
    if not conversation_id:
        conv = await api_request("POST", "/api/conversations", {})
        conversation_id = conv["id"]

    # Send message and get full council response
    result = await api_request(
        "POST",
        f"/api/conversations/{conversation_id}/message",
        {"content": query}
    )

    # Build content_blocks for structured UI rendering
    content_blocks = []

    # Add Stage 1 responses as individual blocks
    stage1_results = result.get("stage1", [])
    for resp in stage1_results:
        model_id = resp.get("model", "unknown")
        content_blocks.append({
            "type": "council_response",
            "model": model_id,
            "model_display_name": get_display_name(model_id),
            "response": resp.get("response", ""),
            "stage": 1
        })

    # Add Stage 3 synthesis block
    stage3_result = result.get("stage3", {})
    if stage3_result:
        model_id = stage3_result.get("model", "unknown")
        content_blocks.append({
            "type": "council_synthesis",
            "model": model_id,
            "model_display_name": get_display_name(model_id),
            "response": stage3_result.get("response", "")
        })

    return {
        "conversation_id": conversation_id,
        "content_blocks": content_blocks,
        **result
    }
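
# Illustrative content_blocks entry (values are placeholders; the shape matches the code above):
#   {"type": "council_response", "model": "openai/gpt-4o", "model_display_name": "GPT-4o",
#    "response": "...", "stage": 1}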

# ============================================================================
# INDIVIDUAL STAGE TOOLS
# ============================================================================

@mcp.tool()
async def council_stage1_collect(query: str) -> list[dict[str, str]]:
    """
    Stage 1: Collect individual responses from all council models.

    Queries all configured council models in parallel with the user's question.
    Each model responds independently without seeing other responses.

    Use this for granular control over the council process - you can analyze
    Stage 1 results before deciding whether to proceed to ranking.

    Args:
        query: The user's question to send to all council models

    Returns:
        List of responses, each containing 'model' name and 'response' text
    """
    result = await api_request(
        "POST",
        "/api/council/stage1",
        {"query": query}
    )
    return result


@mcp.tool()
async def council_stage2_rank(
    query: str,
    stage1_results: list[dict]
) -> dict[str, Any]:
    """
    Stage 2: Have each model rank the anonymized responses from Stage 1.

    Responses are anonymized as "Response A", "Response B", etc. to prevent
    models from playing favorites based on model identity.

    Args:
        query: The original user question
        stage1_results: Results from council_stage1_collect containing model responses

    Returns:
        Object containing:
        - rankings: List of {model, ranking (raw text), parsed_ranking (ordered list)}
        - label_to_model: Mapping of anonymous labels to model names
        - aggregate_rankings: Models sorted by average rank position
    """
    result = await api_request(
        "POST",
        "/api/council/stage2",
        {"query": query, "stage1_results": stage1_results}
    )
    return result


@mcp.tool()
async def council_stage3_synthesize(
    query: str,
    stage1_results: list[dict],
    stage2_results: list[dict]
) -> dict[str, str]:
    """
    Stage 3: Chairman model synthesizes final answer from all inputs.

    The chairman receives all individual responses and peer rankings,
    then produces a comprehensive final answer representing council consensus.

    Args:
        query: The original user question
        stage1_results: Individual model responses from Stage 1
        stage2_results: Peer rankings from Stage 2

    Returns:
        Chairman's synthesized response with 'model' and 'response' fields
    """
    result = await api_request(
        "POST",
        "/api/council/stage3",
        {
            "query": query,
            "stage1_results": stage1_results,
            "stage2_results": stage2_results
        }
    )
    return result
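
# Sketch of chaining the stage tools manually (assumes Stage 3 takes the 'rankings'
# list from the Stage 2 result, per the docstrings above; the backend may accept the
# full Stage 2 object instead):
#   responses = await council_stage1_collect(query)
#   ranked = await council_stage2_rank(query, responses)
#   final = await council_stage3_synthesize(query, responses, ranked["rankings"])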

# ============================================================================
# CONVERSATION MANAGEMENT TOOLS
# ============================================================================

@mcp.tool()
async def council_conversation_create() -> dict[str, Any]:
    """
    Create a new council conversation.

    Returns:
        New conversation with id, created_at, title, and empty messages array
    """
    return await api_request("POST", "/api/conversations", {})


@mcp.tool()
async def council_conversation_list() -> list[dict[str, Any]]:
    """
    List all council conversations (metadata only).

    Returns conversations sorted by creation time, newest first.

    Returns:
        List of conversation metadata (id, created_at, title, message_count)
    """
    return await api_request("GET", "/api/conversations")


@mcp.tool()
async def council_conversation_get(conversation_id: str) -> dict[str, Any]:
    """
    Get a specific council conversation with all messages.

    Messages include both user queries and assistant responses with all 3 stages.

    Args:
        conversation_id: The conversation ID to retrieve

    Returns:
        Full conversation object including all messages
    """
    return await api_request("GET", f"/api/conversations/{conversation_id}")


# ============================================================================
# ENTRY POINT
# ============================================================================

def main():
    """Run the MCP server using stdio transport."""
    # Log to stderr (important for stdio transport - stdout is for protocol)
    print(f"Starting Council MCP server, proxying to {FASTAPI_URL}", file=sys.stderr)
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()