# - FastMCP server with deep_research and deep_research_info tools
# - OpenAI Responses API integration with background polling
# - Configurable model via DEEP_RESEARCH_MODEL env var
# - Default: o4-mini-deep-research (faster/cheaper)
# - Optional FastAPI backend for standalone use
# - Tested successfully: 80s query, 20 web searches, 4 citations
# (194 lines, 5.9 KiB, Python)
"""OpenAI Responses API client for Deep Research."""

import asyncio
import time
from typing import Any

from openai import AsyncOpenAI

from .config import OPENAI_API_KEY, DEEP_RESEARCH_MODEL, POLL_INTERVAL_SECONDS


class DeepResearchClient:
    """Client for OpenAI Deep Research via the Responses API.

    Starts research tasks in background mode, polls them to completion,
    and extracts the final report text plus URL citations.
    """

    def __init__(self):
        """Initialize the async OpenAI client.

        Raises:
            ValueError: If the OPENAI_API_KEY environment variable is unset.
        """
        if not OPENAI_API_KEY:
            raise ValueError("OPENAI_API_KEY environment variable is required")
        self.client = AsyncOpenAI(api_key=OPENAI_API_KEY)
        self.model = DEEP_RESEARCH_MODEL

    async def start_research(
        self,
        query: str,
        system_prompt: str | None = None,
        include_code_analysis: bool = True,
    ) -> dict[str, Any]:
        """
        Start a deep research task in background mode.

        Args:
            query: The research query
            system_prompt: Optional system/developer prompt
            include_code_analysis: Whether to include code_interpreter tool

        Returns:
            Dict with the response ``id`` (for polling), initial ``status``,
            ``model`` name, and a client-side ``created_at`` timestamp.
        """
        # Build input messages: an optional developer prompt precedes the
        # user query; content uses the Responses API "input_text" parts.
        input_messages: list[dict[str, Any]] = []
        if system_prompt:
            input_messages.append({
                "role": "developer",
                "content": [{"type": "input_text", "text": system_prompt}]
            })
        input_messages.append({
            "role": "user",
            "content": [{"type": "input_text", "text": query}]
        })

        # Web search is always enabled; the code interpreter is optional.
        tools: list[dict[str, Any]] = [{"type": "web_search_preview"}]
        if include_code_analysis:
            tools.append({
                "type": "code_interpreter",
                "container": {"type": "auto", "file_ids": []}
            })

        # Start background research; deep-research runs are long, so the
        # request returns immediately with a pollable response id.
        response = await self.client.responses.create(
            model=self.model,
            input=input_messages,
            reasoning={"summary": "auto"},
            tools=tools,
            background=True,  # Run in background for long tasks
        )

        return {
            "id": response.id,
            "status": response.status,
            "model": self.model,
            "created_at": time.time(),
        }

    async def poll_research(self, response_id: str) -> dict[str, Any]:
        """
        Poll for research completion status.

        Args:
            response_id: The response ID from start_research

        Returns:
            Status dict; when status is "completed" it also carries the
            formatted "output" (report text, citations, tool-call counts).
        """
        response = await self.client.responses.retrieve(response_id)

        result: dict[str, Any] = {
            "id": response.id,
            "status": response.status,
            "model": self.model,
        }

        if response.status == "completed":
            result["output"] = self._format_output(response)

        return result

    async def wait_for_completion(
        self,
        response_id: str,
        max_wait_minutes: int = 15,
        poll_interval: float = POLL_INTERVAL_SECONDS,
    ) -> dict[str, Any]:
        """
        Wait for research to complete, polling at intervals.

        Args:
            response_id: The response ID from start_research
            max_wait_minutes: Maximum minutes to wait
            poll_interval: Seconds between polls

        Returns:
            Final result with report and citations; on a terminal failure
            an "error" message is attached, and on client-side timeout the
            status is overridden to "timeout".
        """
        start_time = time.time()
        max_wait_seconds = max_wait_minutes * 60

        while True:
            result = await self.poll_research(response_id)

            if result["status"] == "completed":
                return result

            # Terminal statuses that will never become "completed".
            # "incomplete" is also terminal per the Responses API; without
            # it we would keep polling a dead run until the timeout below.
            if result["status"] in ("failed", "cancelled", "incomplete"):
                return {
                    **result,
                    "error": f"Research {result['status']}",
                }

            # Client-side deadline so callers are never blocked forever.
            elapsed = time.time() - start_time
            if elapsed >= max_wait_seconds:
                return {
                    **result,
                    "error": f"Timeout after {max_wait_minutes} minutes",
                    "status": "timeout",
                }

            await asyncio.sleep(poll_interval)

    def _format_output(self, response) -> dict[str, Any]:
        """
        Extract structured output from completed response.

        Returns:
            Dict with report_text, citations, and metadata (counts of web
            searches and code-interpreter executions).
        """
        output: dict[str, Any] = {
            "report_text": "",
            "citations": [],
            "web_searches": 0,
            "code_executions": 0,
        }

        if not response.output:
            return output

        # Process the output array: count tool calls and pull the report
        # text plus URL annotations out of message items.
        for item in response.output:
            item_type = getattr(item, "type", None)

            # Count tool calls
            if item_type == "web_search_call":
                output["web_searches"] += 1
            elif item_type == "code_interpreter_call":
                output["code_executions"] += 1

            # Extract final message content (if several messages are
            # present, the last output_text wins).
            if item_type == "message":
                for content in getattr(item, "content", []):
                    if getattr(content, "type", None) == "output_text":
                        output["report_text"] = content.text

                        # Extract annotations/citations; only annotations
                        # carrying a URL are treated as citations.
                        for annotation in getattr(content, "annotations", []):
                            if hasattr(annotation, "url"):
                                output["citations"].append({
                                    "title": getattr(annotation, "title", ""),
                                    "url": annotation.url,
                                    "start_index": getattr(annotation, "start_index", 0),
                                    "end_index": getattr(annotation, "end_index", 0),
                                })

        return output


# Lazily-created process-wide singleton.
_client: DeepResearchClient | None = None


def get_client() -> DeepResearchClient:
    """Return the shared DeepResearchClient, constructing it on first use."""
    global _client
    if _client is not None:
        return _client
    _client = DeepResearchClient()
    return _client