"""MCP Server for LLM Council - Proxies to FastAPI service."""

import os
import sys
from typing import Any

import httpx
from mcp.server.fastmcp import FastMCP

# FastAPI service URL (configurable via environment)
FASTAPI_URL = os.getenv("COUNCIL_FASTAPI_URL", "http://localhost:8001")

# Create MCP server instance
mcp = FastMCP("council")

# HTTP client for FastAPI communication (created lazily, reused across calls)
_http_client: httpx.AsyncClient | None = None


async def get_http_client() -> httpx.AsyncClient:
    """Get or create the shared HTTP client.

    A new client is created on first use or after the previous one was
    closed, so the underlying connection pool is reused between tool calls.
    """
    global _http_client
    if _http_client is None or _http_client.is_closed:
        _http_client = httpx.AsyncClient(
            base_url=FASTAPI_URL,
            timeout=httpx.Timeout(300.0)  # 5 min timeout for council queries
        )
    return _http_client


async def api_request(
    method: str,
    endpoint: str,
    json_data: dict | None = None
) -> dict | list:
    """Make a request to the FastAPI service.

    Args:
        method: HTTP method, either "GET" or "POST".
        endpoint: Path relative to FASTAPI_URL (e.g. "/api/conversations").
        json_data: JSON body for POST requests; defaults to an empty object.

    Returns:
        The decoded JSON response body (object or array).

    Raises:
        ValueError: If ``method`` is neither "GET" nor "POST".
        RuntimeError: On connection failure, timeout, or HTTP error status.
    """
    client = await get_http_client()
    try:
        if method == "GET":
            response = await client.get(endpoint)
        elif method == "POST":
            response = await client.post(endpoint, json=json_data or {})
        else:
            raise ValueError(f"Unsupported method: {method}")
        response.raise_for_status()
        return response.json()
    except httpx.ConnectError as exc:
        # Chain the cause (PEP 3134) so the original failure stays visible
        # in tracebacks instead of being silently discarded.
        raise RuntimeError(
            f"Cannot connect to Council FastAPI at {FASTAPI_URL}. "
            "Ensure the service is running with: cd llm-council && uv run python -m backend.main"
        ) from exc
    except httpx.TimeoutException as exc:
        raise RuntimeError(
            "Council query timed out. The 3-stage process may take several minutes."
        ) from exc
    except httpx.HTTPStatusError as exc:
        raise RuntimeError(
            f"HTTP error {exc.response.status_code}: {exc.response.text}"
        ) from exc


# ============================================================================
# HIGH-LEVEL COUNCIL TOOL
# ============================================================================

@mcp.tool()
async def council_query(
    query: str,
    conversation_id: str | None = None
) -> dict[str, Any]:
    """
    Run the complete LLM Council 3-stage deliberation process.

    Stage 1: Query all council models in parallel for individual responses.
    Stage 2: Each model anonymously ranks the other responses.
    Stage 3: Chairman model synthesizes a final answer based on all inputs.

    This process typically takes 1-3 minutes depending on model response times.

    Args:
        query: The user's question to deliberate on
        conversation_id: Optional existing conversation ID for multi-turn context

    Returns:
        Complete council response with all 3 stages and metadata including:
        - conversation_id: The conversation ID used
        - stage1: Individual model responses
        - stage2: Peer rankings with aggregate scores
        - stage3: Chairman's synthesized final answer
        - metadata: Label mappings and aggregate rankings
    """
    # Create conversation if not provided
    if not conversation_id:
        conv = await api_request("POST", "/api/conversations", {})
        conversation_id = conv["id"]

    # Send message and get full council response
    result = await api_request(
        "POST",
        f"/api/conversations/{conversation_id}/message",
        {"content": query}
    )

    return {
        "conversation_id": conversation_id,
        **result
    }


# ============================================================================
# INDIVIDUAL STAGE TOOLS
# ============================================================================

@mcp.tool()
async def council_stage1_collect(query: str) -> list[dict[str, str]]:
    """
    Stage 1: Collect individual responses from all council models.

    Queries all configured council models in parallel with the user's question.
    Each model responds independently without seeing other responses.

    Use this for granular control over the council process - you can analyze
    Stage 1 results before deciding whether to proceed to ranking.

    Args:
        query: The user's question to send to all council models

    Returns:
        List of responses, each containing 'model' name and 'response' text
    """
    result = await api_request(
        "POST",
        "/api/council/stage1",
        {"query": query}
    )
    return result


@mcp.tool()
async def council_stage2_rank(
    query: str,
    stage1_results: list[dict]
) -> dict[str, Any]:
    """
    Stage 2: Have each model rank the anonymized responses from Stage 1.

    Responses are anonymized as "Response A", "Response B", etc. to prevent
    models from playing favorites based on model identity.

    Args:
        query: The original user question
        stage1_results: Results from council_stage1_collect containing model responses

    Returns:
        Object containing:
        - rankings: List of {model, ranking (raw text), parsed_ranking (ordered list)}
        - label_to_model: Mapping of anonymous labels to model names
        - aggregate_rankings: Models sorted by average rank position
    """
    result = await api_request(
        "POST",
        "/api/council/stage2",
        {"query": query, "stage1_results": stage1_results}
    )
    return result


@mcp.tool()
async def council_stage3_synthesize(
    query: str,
    stage1_results: list[dict],
    stage2_results: list[dict]
) -> dict[str, str]:
    """
    Stage 3: Chairman model synthesizes final answer from all inputs.

    The chairman receives all individual responses and peer rankings,
    then produces a comprehensive final answer representing council consensus.

    Args:
        query: The original user question
        stage1_results: Individual model responses from Stage 1
        stage2_results: Peer rankings from Stage 2

    Returns:
        Chairman's synthesized response with 'model' and 'response' fields
    """
    result = await api_request(
        "POST",
        "/api/council/stage3",
        {
            "query": query,
            "stage1_results": stage1_results,
            "stage2_results": stage2_results
        }
    )
    return result


# ============================================================================
# CONVERSATION MANAGEMENT TOOLS
# ============================================================================

@mcp.tool()
async def council_conversation_create() -> dict[str, Any]:
    """
    Create a new council conversation.

    Returns:
        New conversation with id, created_at, title, and empty messages array
    """
    return await api_request("POST", "/api/conversations", {})


@mcp.tool()
async def council_conversation_list() -> list[dict[str, Any]]:
    """
    List all council conversations (metadata only).

    Returns conversations sorted by creation time, newest first.

    Returns:
        List of conversation metadata (id, created_at, title, message_count)
    """
    return await api_request("GET", "/api/conversations")


@mcp.tool()
async def council_conversation_get(conversation_id: str) -> dict[str, Any]:
    """
    Get a specific council conversation with all messages.

    Messages include both user queries and assistant responses with all 3 stages.

    Args:
        conversation_id: The conversation ID to retrieve

    Returns:
        Full conversation object including all messages
    """
    return await api_request("GET", f"/api/conversations/{conversation_id}")


# ============================================================================
# ENTRY POINT
# ============================================================================

def main():
    """Run the MCP server using stdio transport."""
    # Log to stderr (important for stdio transport - stdout is for protocol)
    print(f"Starting Council MCP server, proxying to {FASTAPI_URL}", file=sys.stderr)
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()