"""OpenAI Responses API client for Deep Research.""" import asyncio import time from typing import Any from openai import AsyncOpenAI from .config import OPENAI_API_KEY, DEEP_RESEARCH_MODEL, POLL_INTERVAL_SECONDS class DeepResearchClient: """Client for OpenAI Deep Research via Responses API.""" def __init__(self): if not OPENAI_API_KEY: raise ValueError("OPENAI_API_KEY environment variable is required") self.client = AsyncOpenAI(api_key=OPENAI_API_KEY) self.model = DEEP_RESEARCH_MODEL async def start_research( self, query: str, system_prompt: str | None = None, include_code_analysis: bool = True, ) -> dict[str, Any]: """ Start a deep research task in background mode. Args: query: The research query system_prompt: Optional system/developer prompt include_code_analysis: Whether to include code_interpreter tool Returns: Response object with id for polling """ # Build input messages input_messages = [] if system_prompt: input_messages.append({ "role": "developer", "content": [{"type": "input_text", "text": system_prompt}] }) input_messages.append({ "role": "user", "content": [{"type": "input_text", "text": query}] }) # Build tools list tools = [{"type": "web_search_preview"}] if include_code_analysis: tools.append({ "type": "code_interpreter", "container": {"type": "auto", "file_ids": []} }) # Start background research response = await self.client.responses.create( model=self.model, input=input_messages, reasoning={"summary": "auto"}, tools=tools, background=True, # Run in background for long tasks ) return { "id": response.id, "status": response.status, "model": self.model, "created_at": time.time(), } async def poll_research(self, response_id: str) -> dict[str, Any]: """ Poll for research completion status. Args: response_id: The response ID from start_research Returns: Status dict with completion info """ response = await self.client.responses.retrieve(response_id) result = { "id": response.id, "status": response.status, "model": self.model, } if response.status == "completed": result["output"] = self._format_output(response) return result async def wait_for_completion( self, response_id: str, max_wait_minutes: int = 15, poll_interval: float = POLL_INTERVAL_SECONDS, ) -> dict[str, Any]: """ Wait for research to complete, polling at intervals. Args: response_id: The response ID from start_research max_wait_minutes: Maximum minutes to wait poll_interval: Seconds between polls Returns: Final result with report and citations """ start_time = time.time() max_wait_seconds = max_wait_minutes * 60 while True: result = await self.poll_research(response_id) if result["status"] == "completed": return result if result["status"] in ("failed", "cancelled"): return { **result, "error": f"Research {result['status']}", } elapsed = time.time() - start_time if elapsed >= max_wait_seconds: return { **result, "error": f"Timeout after {max_wait_minutes} minutes", "status": "timeout", } await asyncio.sleep(poll_interval) def _format_output(self, response) -> dict[str, Any]: """ Extract structured output from completed response. Returns: Dict with report_text, citations, and metadata """ output = { "report_text": "", "citations": [], "web_searches": 0, "code_executions": 0, } if not response.output: return output # Process output array for item in response.output: item_type = getattr(item, "type", None) # Count tool calls if item_type == "web_search_call": output["web_searches"] += 1 elif item_type == "code_interpreter_call": output["code_executions"] += 1 # Extract final message content if item_type == "message": for content in getattr(item, "content", []): if getattr(content, "type", None) == "output_text": output["report_text"] = content.text # Extract annotations/citations for annotation in getattr(content, "annotations", []): if hasattr(annotation, "url"): output["citations"].append({ "title": getattr(annotation, "title", ""), "url": annotation.url, "start_index": getattr(annotation, "start_index", 0), "end_index": getattr(annotation, "end_index", 0), }) return output # Singleton instance _client: DeepResearchClient | None = None def get_client() -> DeepResearchClient: """Get or create the singleton client instance.""" global _client if _client is None: _client = DeepResearchClient() return _client