"""MCP Server for OpenAI Deep Research - Direct API integration.""" import os import sys import time import asyncio from typing import Any from mcp.server.fastmcp import FastMCP from openai import AsyncOpenAI # Hardcoded fallback model (cheaper/faster option) DEFAULT_MODEL = "o4-mini-deep-research-2025-06-26" # Available models for reference AVAILABLE_MODELS = [ "o4-mini-deep-research-2025-06-26", # Faster, cheaper (DEFAULT) "o3-deep-research-2025-06-26", # Thorough, ~$1+ per query ] # Configuration from environment with hardcoded fallbacks OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") DEEP_RESEARCH_MODEL = os.getenv("DEEP_RESEARCH_MODEL") or DEFAULT_MODEL POLL_INTERVAL = float(os.getenv("DEEP_RESEARCH_POLL_INTERVAL") or "5.0") # Create MCP server instance mcp = FastMCP("deep-research") # OpenAI client (lazy init) _client: AsyncOpenAI | None = None def get_openai_client() -> AsyncOpenAI: """Get or create the OpenAI client.""" global _client if _client is None: if not OPENAI_API_KEY: raise ValueError("OPENAI_API_KEY environment variable is required") _client = AsyncOpenAI(api_key=OPENAI_API_KEY) return _client def format_output(response) -> dict[str, Any]: """Extract structured output from completed response.""" output = { "report_text": "", "citations": [], "web_searches": 0, "code_executions": 0, } if not response.output: return output for item in response.output: item_type = getattr(item, "type", None) # Count tool calls if item_type == "web_search_call": output["web_searches"] += 1 elif item_type == "code_interpreter_call": output["code_executions"] += 1 # Extract final message content if item_type == "message": for content in getattr(item, "content", []): if getattr(content, "type", None) == "output_text": output["report_text"] = content.text # Extract citations for annotation in getattr(content, "annotations", []): if hasattr(annotation, "url"): output["citations"].append({ "title": getattr(annotation, "title", ""), "url": annotation.url, }) return output @mcp.tool() async def deep_research( query: str, system_prompt: str | None = None, include_code_analysis: bool = True, max_wait_minutes: int = 15, ) -> dict[str, Any]: """ Run OpenAI Deep Research on a query. This performs comprehensive web research using OpenAI's deep research models. The model is configured via DEEP_RESEARCH_MODEL env var (default: o4-mini). 


@mcp.tool()
async def deep_research(
    query: str,
    system_prompt: str | None = None,
    include_code_analysis: bool = True,
    max_wait_minutes: int = 15,
) -> dict[str, Any]:
    """
    Run OpenAI Deep Research on a query.

    This performs comprehensive web research using OpenAI's deep research
    models. The model is configured via the DEEP_RESEARCH_MODEL env var
    (default: o4-mini).

    Deep research can take several minutes to complete as it:
    - Searches the web for relevant information
    - Optionally runs code to analyze data
    - Synthesizes findings into a structured report with citations

    Args:
        query: The research question or topic to investigate
        system_prompt: Optional instructions to guide the research focus
        include_code_analysis: Whether to allow code execution for data analysis
        max_wait_minutes: Maximum time to wait for completion (default: 15)

    Returns:
        Structured research report containing:
        - report_text: Full markdown report with findings
        - citations: List of sources with titles and URLs
        - web_searches: Number of web searches performed
        - code_executions: Number of code executions (if enabled)
        - elapsed_time: Total time taken in seconds
        - model: The model used for research
    """
    client = get_openai_client()
    start_time = time.time()

    # Build input messages
    input_messages = []
    if system_prompt:
        input_messages.append({
            "role": "developer",
            "content": [{"type": "input_text", "text": system_prompt}],
        })
    input_messages.append({
        "role": "user",
        "content": [{"type": "input_text", "text": query}],
    })

    # Build tools
    tools = [{"type": "web_search_preview"}]
    if include_code_analysis:
        tools.append({
            "type": "code_interpreter",
            "container": {"type": "auto", "file_ids": []},
        })

    try:
        # Start background research
        response = await client.responses.create(
            model=DEEP_RESEARCH_MODEL,
            input=input_messages,
            reasoning={"summary": "auto"},
            tools=tools,
            background=True,
        )

        # Poll for completion
        max_wait_seconds = max_wait_minutes * 60
        while True:
            response = await client.responses.retrieve(response.id)

            if response.status == "completed":
                output = format_output(response)
                return {
                    "status": "completed",
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": time.time() - start_time,
                    **output,
                }

            # Terminal states that will never reach "completed"
            if response.status in ("failed", "cancelled", "incomplete"):
                return {
                    "status": response.status,
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": time.time() - start_time,
                    "error": f"Research {response.status}",
                    "report_text": "",
                    "citations": [],
                    "web_searches": 0,
                    "code_executions": 0,
                }

            elapsed = time.time() - start_time
            if elapsed >= max_wait_seconds:
                # Note: the background job keeps running server-side after
                # this client-side timeout.
                return {
                    "status": "timeout",
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": elapsed,
                    "error": f"Timeout after {max_wait_minutes} minutes",
                    "report_text": "",
                    "citations": [],
                    "web_searches": 0,
                    "code_executions": 0,
                }

            await asyncio.sleep(POLL_INTERVAL)

    except Exception as e:
        return {
            "status": "error",
            "model": DEEP_RESEARCH_MODEL,
            "elapsed_time": time.time() - start_time,
            "error": str(e),
            "report_text": "",
            "citations": [],
            "web_searches": 0,
            "code_executions": 0,
        }


@mcp.tool()
async def deep_research_info() -> dict[str, Any]:
    """
    Get information about the deep research configuration.

    Returns:
        Configuration info including model, API status, and pricing notes
    """
    has_api_key = bool(OPENAI_API_KEY)

    return {
        "model": DEEP_RESEARCH_MODEL,
        "default_model": DEFAULT_MODEL,
        "api_key_configured": has_api_key,
        "poll_interval_seconds": POLL_INTERVAL,
        "pricing_note": (
            "Deep research costs vary: o4-mini is cheaper/faster; "
            "o3 is more thorough (~$1+ per query)."
        ),
        "available_models": AVAILABLE_MODELS,
    }


def main():
    """Run the MCP server using stdio transport."""
    print("Starting Deep Research MCP server", file=sys.stderr)
    print(f"  Model: {DEEP_RESEARCH_MODEL}", file=sys.stderr)
    print(f"  API key configured: {bool(OPENAI_API_KEY)}", file=sys.stderr)
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()
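

# Example usage (file name and key are hypothetical; adjust to your setup):
#
#   export OPENAI_API_KEY=sk-...
#   export DEEP_RESEARCH_MODEL=o3-deep-research-2025-06-26   # optional override
#   export DEEP_RESEARCH_POLL_INTERVAL=10                    # optional, seconds
#   python deep_research_server.py
#
# Or register the server with an MCP client such as Claude Desktop
# (paths are hypothetical):
#
#   {
#     "mcpServers": {
#       "deep-research": {
#         "command": "python",
#         "args": ["/path/to/deep_research_server.py"],
#         "env": {"OPENAI_API_KEY": "sk-..."}
#       }
#     }
#   }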