llm-council/mcp_server/server.py
Krishna Kumar 93a70bb195 Remove council_query tool to force stage-by-stage usage
The all-in-one council_query tool returned everything as a single blob,
preventing the iOS app from displaying individual model responses as
separate content blocks. By removing it, the agent must now use:

- council_stage1_collect (individual responses)
- council_stage2_rank (peer rankings)
- council_stage3_synthesize (final synthesis)

Each stage call generates a separate tool_result, enabling granular
display in the UI.
2025-12-16 15:18:21 -06:00
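
Illustrative flow (a minimal sketch, not code from this repo): the agent chains the three stage tools, feeding each stage's output into the next. The run_council wrapper below is hypothetical, as is the assumption that Stage 3 takes the rankings list from the Stage 2 result object.

async def run_council(query: str) -> dict[str, str]:
    stage1 = await council_stage1_collect(query)        # Stage 1: individual responses
    stage2 = await council_stage2_rank(query, stage1)   # Stage 2: peer rankings
    # Assumption: Stage 3 expects the rankings list from the Stage 2 result object.
    return await council_stage3_synthesize(query, stage1, stage2["rankings"])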


"""MCP Server for LLM Council - Proxies to FastAPI service."""
import os
import sys
import httpx
from typing import Any
from mcp.server.fastmcp import FastMCP
# FastAPI service URL (configurable via environment)
FASTAPI_URL = os.getenv("COUNCIL_FASTAPI_URL", "http://localhost:8001")
# Create MCP server instance
mcp = FastMCP("council")
# HTTP client for FastAPI communication
_http_client: httpx.AsyncClient | None = None
async def get_http_client() -> httpx.AsyncClient:
"""Get or create the HTTP client."""
global _http_client
if _http_client is None or _http_client.is_closed:
_http_client = httpx.AsyncClient(
base_url=FASTAPI_URL,
timeout=httpx.Timeout(300.0) # 5 min timeout for council queries
)
return _http_client
async def api_request(
method: str,
endpoint: str,
json_data: dict | None = None
) -> dict | list:
"""Make a request to the FastAPI service."""
client = await get_http_client()
try:
if method == "GET":
response = await client.get(endpoint)
elif method == "POST":
response = await client.post(endpoint, json=json_data or {})
else:
raise ValueError(f"Unsupported method: {method}")
response.raise_for_status()
return response.json()
except httpx.ConnectError:
raise RuntimeError(
f"Cannot connect to Council FastAPI at {FASTAPI_URL}. "
"Ensure the service is running with: cd llm-council && uv run python -m backend.main"
)
except httpx.TimeoutException:
raise RuntimeError(
"Council query timed out. The 3-stage process may take several minutes."
)
except httpx.HTTPStatusError as e:
raise RuntimeError(f"HTTP error {e.response.status_code}: {e.response.text}")
# ============================================================================
# INDIVIDUAL STAGE TOOLS (council_query removed to force stage-by-stage usage)
# ============================================================================

@mcp.tool()
async def council_stage1_collect(query: str) -> list[dict[str, str]]:
    """
    Stage 1: Collect individual responses from all council models.

    Queries all configured council models in parallel with the user's question.
    Each model responds independently without seeing other responses.

    Use this for granular control over the council process - you can analyze
    Stage 1 results before deciding whether to proceed to ranking.

    Args:
        query: The user's question to send to all council models

    Returns:
        List of responses, each containing 'model' name and 'response' text
    """
    result = await api_request(
        "POST",
        "/api/council/stage1",
        {"query": query}
    )
    return result


@mcp.tool()
async def council_stage2_rank(
    query: str,
    stage1_results: list[dict]
) -> dict[str, Any]:
    """
    Stage 2: Have each model rank the anonymized responses from Stage 1.

    Responses are anonymized as "Response A", "Response B", etc. to prevent
    models from playing favorites based on model identity.

    Args:
        query: The original user question
        stage1_results: Results from council_stage1_collect containing model responses

    Returns:
        Object containing:
        - rankings: List of {model, ranking (raw text), parsed_ranking (ordered list)}
        - label_to_model: Mapping of anonymous labels to model names
        - aggregate_rankings: Models sorted by average rank position
    """
    result = await api_request(
        "POST",
        "/api/council/stage2",
        {"query": query, "stage1_results": stage1_results}
    )
    return result


@mcp.tool()
async def council_stage3_synthesize(
    query: str,
    stage1_results: list[dict],
    stage2_results: list[dict]
) -> dict[str, str]:
    """
    Stage 3: Chairman model synthesizes final answer from all inputs.

    The chairman receives all individual responses and peer rankings,
    then produces a comprehensive final answer representing council consensus.

    Args:
        query: The original user question
        stage1_results: Individual model responses from Stage 1
        stage2_results: Peer rankings from Stage 2

    Returns:
        Chairman's synthesized response with 'model' and 'response' fields
    """
    result = await api_request(
        "POST",
        "/api/council/stage3",
        {
            "query": query,
            "stage1_results": stage1_results,
            "stage2_results": stage2_results
        }
    )
    return result


# ============================================================================
# CONVERSATION MANAGEMENT TOOLS
# ============================================================================

@mcp.tool()
async def council_conversation_create() -> dict[str, Any]:
    """
    Create a new council conversation.

    Returns:
        New conversation with id, created_at, title, and empty messages array
    """
    return await api_request("POST", "/api/conversations", {})


@mcp.tool()
async def council_conversation_list() -> list[dict[str, Any]]:
    """
    List all council conversations (metadata only).

    Returns conversations sorted by creation time, newest first.

    Returns:
        List of conversation metadata (id, created_at, title, message_count)
    """
    return await api_request("GET", "/api/conversations")


@mcp.tool()
async def council_conversation_get(conversation_id: str) -> dict[str, Any]:
    """
    Get a specific council conversation with all messages.

    Messages include both user queries and assistant responses with all 3 stages.

    Args:
        conversation_id: The conversation ID to retrieve

    Returns:
        Full conversation object including all messages
    """
    return await api_request("GET", f"/api/conversations/{conversation_id}")


# ============================================================================
# ENTRY POINT
# ============================================================================

def main():
    """Run the MCP server using stdio transport."""
    # Log to stderr (important for stdio transport - stdout is for protocol)
    print(f"Starting Council MCP server, proxying to {FASTAPI_URL}", file=sys.stderr)
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()
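
To exercise the server end to end, an MCP client launches it over stdio and calls the stage tools by name. A minimal sketch using the Python MCP SDK's stdio client (the launch command, path, and query text are assumptions, not taken from this repo):

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Assumed launch command; adjust to how the server is actually started.
params = StdioServerParameters(command="python", args=["mcp_server/server.py"])

async def demo() -> None:
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Stage 1 only; an agent would follow with stage2/stage3 calls.
            result = await session.call_tool(
                "council_stage1_collect",
                arguments={"query": "Example question"},
            )
            print(result)

asyncio.run(demo())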