Krishna Kumar 9a6ac3fd2f Add OpenAI Deep Research MCP server
- FastMCP server with deep_research and deep_research_info tools
- OpenAI Responses API integration with background polling
- Configurable model via DEEP_RESEARCH_MODEL env var
- Default: o4-mini-deep-research (faster/cheaper)
- Optional FastAPI backend for standalone use
- Tested successfully: 80s query, 20 web searches, 4 citations
2025-12-30 16:00:37 -06:00
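The commit message lists the two tools the server exposes (deep_research and deep_research_info). As a rough sketch of how they could be exercised from the MCP Python client SDK over stdio (the script filename, API-key placeholder, and query below are assumptions for illustration, not part of this repository):

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def run_query() -> None:
    # Launch the server as a subprocess over stdio (filename assumed).
    params = StdioServerParameters(
        command="python",
        args=["deep_research_server.py"],
        env={"OPENAI_API_KEY": "sk-..."},  # placeholder key
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Check configuration first, then run a (potentially long) research query.
            info = await session.call_tool("deep_research_info", {})
            print(info)
            report = await session.call_tool(
                "deep_research",
                {"query": "Survey recent approaches to retrieval-augmented generation."},
            )
            print(report)


asyncio.run(run_query())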


"""MCP Server for OpenAI Deep Research - Direct API integration."""
import os
import sys
import time
import asyncio
from typing import Any
from mcp.server.fastmcp import FastMCP
from openai import AsyncOpenAI
# Hardcoded fallback model (cheaper/faster option)
DEFAULT_MODEL = "o4-mini-deep-research-2025-06-26"
# Available models for reference
AVAILABLE_MODELS = [
"o4-mini-deep-research-2025-06-26", # Faster, cheaper (DEFAULT)
"o3-deep-research-2025-06-26", # Thorough, ~$1+ per query
]
# Configuration from environment with hardcoded fallbacks
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
DEEP_RESEARCH_MODEL = os.getenv("DEEP_RESEARCH_MODEL") or DEFAULT_MODEL
POLL_INTERVAL = float(os.getenv("DEEP_RESEARCH_POLL_INTERVAL") or "5.0")
# Create MCP server instance
mcp = FastMCP("deep-research")
# OpenAI client (lazy init)
_client: AsyncOpenAI | None = None


def get_openai_client() -> AsyncOpenAI:
    """Get or create the OpenAI client."""
    global _client
    if _client is None:
        if not OPENAI_API_KEY:
            raise ValueError("OPENAI_API_KEY environment variable is required")
        _client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    return _client


def format_output(response) -> dict[str, Any]:
    """Extract structured output from completed response."""
    output = {
        "report_text": "",
        "citations": [],
        "web_searches": 0,
        "code_executions": 0,
    }
    if not response.output:
        return output

    for item in response.output:
        item_type = getattr(item, "type", None)

        # Count tool calls
        if item_type == "web_search_call":
            output["web_searches"] += 1
        elif item_type == "code_interpreter_call":
            output["code_executions"] += 1

        # Extract final message content
        if item_type == "message":
            for content in getattr(item, "content", []):
                if getattr(content, "type", None) == "output_text":
                    output["report_text"] = content.text
                    # Extract citations
                    for annotation in getattr(content, "annotations", []):
                        if hasattr(annotation, "url"):
                            output["citations"].append({
                                "title": getattr(annotation, "title", ""),
                                "url": annotation.url,
                            })
    return output


@mcp.tool()
async def deep_research(
    query: str,
    system_prompt: str | None = None,
    include_code_analysis: bool = True,
    max_wait_minutes: int = 15,
) -> dict[str, Any]:
    """
    Run OpenAI Deep Research on a query.

    This performs comprehensive web research using OpenAI's deep research models.
    The model is configured via DEEP_RESEARCH_MODEL env var (default: o4-mini).

    Deep research can take several minutes to complete as it:
    - Searches the web for relevant information
    - Optionally runs code to analyze data
    - Synthesizes findings into a structured report with citations

    Args:
        query: The research question or topic to investigate
        system_prompt: Optional instructions to guide the research focus
        include_code_analysis: Whether to allow code execution for data analysis
        max_wait_minutes: Maximum time to wait for completion (default: 15)

    Returns:
        Structured research report containing:
        - report_text: Full markdown report with findings
        - citations: List of sources with titles and URLs
        - web_searches: Number of web searches performed
        - code_executions: Number of code executions (if enabled)
        - elapsed_time: Total time taken in seconds
        - model: The model used for research
    """
    client = get_openai_client()
    start_time = time.time()

    # Build input messages
    input_messages = []
    if system_prompt:
        input_messages.append({
            "role": "developer",
            "content": [{"type": "input_text", "text": system_prompt}]
        })
    input_messages.append({
        "role": "user",
        "content": [{"type": "input_text", "text": query}]
    })

    # Build tools
    tools = [{"type": "web_search_preview"}]
    if include_code_analysis:
        tools.append({
            "type": "code_interpreter",
            "container": {"type": "auto", "file_ids": []}
        })

    try:
        # Start background research
        response = await client.responses.create(
            model=DEEP_RESEARCH_MODEL,
            input=input_messages,
            reasoning={"summary": "auto"},
            tools=tools,
            background=True,
        )

        # Poll for completion
        max_wait_seconds = max_wait_minutes * 60
        while True:
            response = await client.responses.retrieve(response.id)

            if response.status == "completed":
                output = format_output(response)
                return {
                    "status": "completed",
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": time.time() - start_time,
                    **output,
                }

            if response.status in ("failed", "cancelled"):
                return {
                    "status": response.status,
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": time.time() - start_time,
                    "error": f"Research {response.status}",
                    "report_text": "",
                    "citations": [],
                    "web_searches": 0,
                    "code_executions": 0,
                }

            elapsed = time.time() - start_time
            if elapsed >= max_wait_seconds:
                return {
                    "status": "timeout",
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": elapsed,
                    "error": f"Timeout after {max_wait_minutes} minutes",
                    "report_text": "",
                    "citations": [],
                    "web_searches": 0,
                    "code_executions": 0,
                }

            await asyncio.sleep(POLL_INTERVAL)

    except Exception as e:
        return {
            "status": "error",
            "model": DEEP_RESEARCH_MODEL,
            "elapsed_time": time.time() - start_time,
            "error": str(e),
            "report_text": "",
            "citations": [],
            "web_searches": 0,
            "code_executions": 0,
        }


@mcp.tool()
async def deep_research_info() -> dict[str, Any]:
    """
    Get information about the deep research configuration.

    Returns:
        Configuration info including model, API status, and pricing notes
    """
    has_api_key = bool(OPENAI_API_KEY)
    return {
        "model": DEEP_RESEARCH_MODEL,
        "default_model": DEFAULT_MODEL,
        "api_key_configured": has_api_key,
        "poll_interval_seconds": POLL_INTERVAL,
        "pricing_note": "Deep research costs vary. o4-mini is cheaper/faster, o3 is more thorough (~$1+ per query)",
        "available_models": AVAILABLE_MODELS,
    }


def main():
    """Run the MCP server using stdio transport."""
    print("Starting Deep Research MCP server", file=sys.stderr)
    print(f"  Model: {DEEP_RESEARCH_MODEL}", file=sys.stderr)
    print(f"  API Key configured: {bool(OPENAI_API_KEY)}", file=sys.stderr)
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()
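
Because format_output() only reads plain attributes off the response object, its extraction logic can be sanity-checked without calling the API. A minimal sketch using types.SimpleNamespace stand-ins (field values invented for illustration; assumes format_output is defined or imported from the module above):

from types import SimpleNamespace

fake_response = SimpleNamespace(output=[
    SimpleNamespace(type="web_search_call"),
    SimpleNamespace(
        type="message",
        content=[SimpleNamespace(
            type="output_text",
            text="Example findings...",
            annotations=[SimpleNamespace(title="Example source", url="https://example.com")],
        )],
    ),
])

print(format_output(fake_response))
# Expected shape:
# {'report_text': 'Example findings...',
#  'citations': [{'title': 'Example source', 'url': 'https://example.com'}],
#  'web_searches': 1, 'code_executions': 0}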