- FastMCP server with deep_research and deep_research_info tools
- OpenAI Responses API integration with background polling
- Configurable model via DEEP_RESEARCH_MODEL env var
- Default: o4-mini-deep-research (faster/cheaper)
- Optional FastAPI backend for standalone use
- Tested successfully: 80s query, 20 web searches, 4 citations
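- Run directly with `python <this file>` to serve over stdio (see `main()` below)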
"""MCP Server for OpenAI Deep Research - Direct API integration."""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import asyncio
|
|
from typing import Any
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from openai import AsyncOpenAI
|
|
|
|
# Hardcoded fallback model (cheaper/faster option)
|
|
DEFAULT_MODEL = "o4-mini-deep-research-2025-06-26"
|
|
|
|
# Available models for reference
|
|
AVAILABLE_MODELS = [
|
|
"o4-mini-deep-research-2025-06-26", # Faster, cheaper (DEFAULT)
|
|
"o3-deep-research-2025-06-26", # Thorough, ~$1+ per query
|
|
]
|
|
|
|
# Configuration from environment with hardcoded fallbacks
|
|
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
|
|
DEEP_RESEARCH_MODEL = os.getenv("DEEP_RESEARCH_MODEL") or DEFAULT_MODEL
|
|
POLL_INTERVAL = float(os.getenv("DEEP_RESEARCH_POLL_INTERVAL") or "5.0")
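
# Example (assumed) shell configuration; the variable names match the
# os.getenv() calls above, and the values are placeholders:
#   export OPENAI_API_KEY="sk-..."
#   export DEEP_RESEARCH_MODEL="o3-deep-research-2025-06-26"
#   export DEEP_RESEARCH_POLL_INTERVAL="10"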

# Create MCP server instance
mcp = FastMCP("deep-research")

# OpenAI client (lazy init)
_client: AsyncOpenAI | None = None


def get_openai_client() -> AsyncOpenAI:
    """Get or create the OpenAI client."""
    global _client
    if _client is None:
        if not OPENAI_API_KEY:
            raise ValueError("OPENAI_API_KEY environment variable is required")
        _client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    return _client


def format_output(response) -> dict[str, Any]:
    """Extract structured output from a completed response."""
    output: dict[str, Any] = {
        "report_text": "",
        "citations": [],
        "web_searches": 0,
        "code_executions": 0,
    }

    if not response.output:
        return output

    for item in response.output:
        item_type = getattr(item, "type", None)

        # Count tool calls
        if item_type == "web_search_call":
            output["web_searches"] += 1
        elif item_type == "code_interpreter_call":
            output["code_executions"] += 1

        # Extract final message content
        if item_type == "message":
            for content in getattr(item, "content", []):
                if getattr(content, "type", None) == "output_text":
                    output["report_text"] = content.text

                    # Extract citations attached to the text
                    for annotation in getattr(content, "annotations", []):
                        if hasattr(annotation, "url"):
                            output["citations"].append({
                                "title": getattr(annotation, "title", ""),
                                "url": annotation.url,
                            })

    return output
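
# For reference, a format_output() result for a completed run might look like
# this (illustrative values only, not taken from a real response):
# {
#     "report_text": "## Findings\n...",
#     "citations": [{"title": "Example source", "url": "https://example.com"}],
#     "web_searches": 20,
#     "code_executions": 0,
# }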


@mcp.tool()
async def deep_research(
    query: str,
    system_prompt: str | None = None,
    include_code_analysis: bool = True,
    max_wait_minutes: int = 15,
) -> dict[str, Any]:
    """
    Run OpenAI Deep Research on a query.

    This performs comprehensive web research using OpenAI's deep research
    models. The model is configured via the DEEP_RESEARCH_MODEL env var
    (default: o4-mini-deep-research).

    Deep research can take several minutes to complete as it:
    - Searches the web for relevant information
    - Optionally runs code to analyze data
    - Synthesizes findings into a structured report with citations

    Args:
        query: The research question or topic to investigate
        system_prompt: Optional instructions to guide the research focus
        include_code_analysis: Whether to allow code execution for data analysis
        max_wait_minutes: Maximum time to wait for completion (default: 15)

    Returns:
        Structured research report containing:
        - report_text: Full markdown report with findings
        - citations: List of sources with titles and URLs
        - web_searches: Number of web searches performed
        - code_executions: Number of code executions (if enabled)
        - elapsed_time: Total time taken in seconds
        - model: The model used for research
    """
    client = get_openai_client()
    start_time = time.time()

    # Build input messages
    input_messages = []
    if system_prompt:
        input_messages.append({
            "role": "developer",
            "content": [{"type": "input_text", "text": system_prompt}],
        })
    input_messages.append({
        "role": "user",
        "content": [{"type": "input_text", "text": query}],
    })

    # Build tools
    tools: list[dict[str, Any]] = [{"type": "web_search_preview"}]
    if include_code_analysis:
        tools.append({
            "type": "code_interpreter",
            "container": {"type": "auto", "file_ids": []},
        })
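
    # Note: OpenAI's deep research models appear to require at least one web
    # search (or MCP) tool to be attached; code_interpreter is optional and
    # only adds data-analysis capability, hence the unconditional entry above.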

    try:
        # Start background research
        response = await client.responses.create(
            model=DEEP_RESEARCH_MODEL,
            input=input_messages,
            reasoning={"summary": "auto"},
            tools=tools,
            background=True,
        )
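
        # With background=True the call returns immediately with a queued
        # response object; the final report only appears once polling shows
        # status == "completed".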

        # Poll for completion
        max_wait_seconds = max_wait_minutes * 60
        while True:
            response = await client.responses.retrieve(response.id)

            if response.status == "completed":
                output = format_output(response)
                return {
                    "status": "completed",
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": time.time() - start_time,
                    **output,
                }

            if response.status in ("failed", "cancelled"):
                return {
                    "status": response.status,
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": time.time() - start_time,
                    "error": f"Research {response.status}",
                    "report_text": "",
                    "citations": [],
                    "web_searches": 0,
                    "code_executions": 0,
                }

            elapsed = time.time() - start_time
            if elapsed >= max_wait_seconds:
                return {
                    "status": "timeout",
                    "model": DEEP_RESEARCH_MODEL,
                    "elapsed_time": elapsed,
                    "error": f"Timeout after {max_wait_minutes} minutes",
                    "report_text": "",
                    "citations": [],
                    "web_searches": 0,
                    "code_executions": 0,
                }

            await asyncio.sleep(POLL_INTERVAL)

    except Exception as e:
        return {
            "status": "error",
            "model": DEEP_RESEARCH_MODEL,
            "elapsed_time": time.time() - start_time,
            "error": str(e),
            "report_text": "",
            "citations": [],
            "web_searches": 0,
            "code_executions": 0,
        }
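
# Note: on timeout the background job keeps running server-side. If the
# installed openai-python version supports cancelling background responses,
# a caller could stop a timed-out job explicitly, e.g. (assumed API):
#   await client.responses.cancel(response.id)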


@mcp.tool()
async def deep_research_info() -> dict[str, Any]:
    """
    Get information about the deep research configuration.

    Returns:
        Configuration info including model, API status, and pricing notes
    """
    has_api_key = bool(OPENAI_API_KEY)

    return {
        "model": DEEP_RESEARCH_MODEL,
        "default_model": DEFAULT_MODEL,
        "api_key_configured": has_api_key,
        "poll_interval_seconds": POLL_INTERVAL,
        "pricing_note": (
            "Deep research costs vary. o4-mini is cheaper/faster; "
            "o3 is more thorough (~$1+ per query)."
        ),
        "available_models": AVAILABLE_MODELS,
    }


def main():
    """Run the MCP server using stdio transport."""
    print("Starting Deep Research MCP server", file=sys.stderr)
    print(f"  Model: {DEEP_RESEARCH_MODEL}", file=sys.stderr)
    print(f"  API key configured: {bool(OPENAI_API_KEY)}", file=sys.stderr)
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()
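
# Example client registration for an MCP host (illustrative; the key names
# follow the common claude_desktop_config.json convention and the file path
# is a placeholder):
# {
#   "mcpServers": {
#     "deep-research": {
#       "command": "python",
#       "args": ["/path/to/deep_research_server.py"],
#       "env": {"OPENAI_API_KEY": "sk-..."}
#     }
#   }
# }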