Add iOS JSON capture utility
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -18,4 +18,4 @@ data/
|
||||
# Frontend
|
||||
frontend/node_modules/
|
||||
frontend/dist/
|
||||
frontend/.vite/
|
||||
frontend/.vite/.railway/
|
||||
|
||||
72
backend/capture.py
Normal file
72
backend/capture.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""JSON capture utilities for iOS development."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Any
|
||||
|
||||
# Output directory for captured JSON
|
||||
CAPTURE_DIR = os.path.join(os.path.dirname(__file__), "..", "ios_json_samples")
|
||||
|
||||
|
||||
def _ensure_capture_dir():
|
||||
"""Ensure the capture directory exists."""
|
||||
os.makedirs(CAPTURE_DIR, exist_ok=True)
|
||||
|
||||
|
||||
def _save_json(filename: str, data: Any):
|
||||
"""Save data as JSON file."""
|
||||
_ensure_capture_dir()
|
||||
filepath = os.path.join(CAPTURE_DIR, filename)
|
||||
with open(filepath, "w") as f:
|
||||
json.dump(data, f, indent=2, default=str)
|
||||
print(f"[Capture] Saved: {filepath}")
|
||||
|
||||
|
||||
def capture_conversation_list(conversations: List[Dict[str, Any]]):
|
||||
"""Capture conversation list response."""
|
||||
_save_json("conversation_list.json", conversations)
|
||||
|
||||
|
||||
def capture_conversation(conversation: Dict[str, Any]):
|
||||
"""Capture single conversation response."""
|
||||
_save_json(f"conversation_{conversation.get('id', 'unknown')[:8]}.json", conversation)
|
||||
|
||||
|
||||
def capture_full_response(stage1: List[Dict], stage2: List[Dict], stage3: Dict, metadata: Dict):
|
||||
"""Capture full council response."""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
_save_json(f"full_response_{timestamp}.json", {
|
||||
"stage1": stage1,
|
||||
"stage2": stage2,
|
||||
"stage3": stage3,
|
||||
"metadata": metadata
|
||||
})
|
||||
|
||||
|
||||
def capture_sse_events(events: List[Dict[str, Any]]):
|
||||
"""Capture SSE stream events."""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
_save_json(f"sse_events_{timestamp}.json", events)
|
||||
|
||||
|
||||
def capture_stage1(results: List[Dict[str, Any]]):
|
||||
"""Capture Stage 1 results."""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
_save_json(f"stage1_{timestamp}.json", results)
|
||||
|
||||
|
||||
def capture_stage2(rankings: List[Dict], label_to_model: Dict, aggregate_rankings: List[Dict]):
|
||||
"""Capture Stage 2 results."""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
_save_json(f"stage2_{timestamp}.json", {
|
||||
"rankings": rankings,
|
||||
"label_to_model": label_to_model,
|
||||
"aggregate_rankings": aggregate_rankings
|
||||
})
|
||||
|
||||
|
||||
def capture_stage3(result: Dict[str, Any]):
|
||||
"""Capture Stage 3 result."""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
_save_json(f"stage3_{timestamp}.json", result)
|
||||
Reference in New Issue
Block a user