a bit more progressive update and single turn

karpathy
2025-11-22 15:24:47 -08:00
parent 827bfd3d3e
commit 87b4a178ec
5 changed files with 283 additions and 39 deletions

View File

@@ -2,12 +2,15 @@
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Dict, Any
import uuid
import json
import asyncio
from . import storage
-from .council import run_full_council, generate_conversation_title
+from .council import run_full_council, generate_conversation_title, stage1_collect_responses, stage2_collect_rankings, stage3_synthesize_final, calculate_aggregate_rankings

app = FastAPI(title="LLM Council API")
@@ -120,6 +123,77 @@ async def send_message(conversation_id: str, request: SendMessageRequest):
    }


@app.post("/api/conversations/{conversation_id}/message/stream")
async def send_message_stream(conversation_id: str, request: SendMessageRequest):
    """
    Send a message and stream the 3-stage council process.
    Returns Server-Sent Events as each stage completes.
    """
    # Check if the conversation exists
    conversation = storage.get_conversation(conversation_id)
    if conversation is None:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Check if this is the first message
    is_first_message = len(conversation["messages"]) == 0

    async def event_generator():
        try:
            # Add user message
            storage.add_user_message(conversation_id, request.content)

            # Start title generation in parallel (don't await yet)
            title_task = None
            if is_first_message:
                title_task = asyncio.create_task(generate_conversation_title(request.content))

            # Stage 1: collect responses
            yield f"data: {json.dumps({'type': 'stage1_start'})}\n\n"
            stage1_results = await stage1_collect_responses(request.content)
            yield f"data: {json.dumps({'type': 'stage1_complete', 'data': stage1_results})}\n\n"

            # Stage 2: collect rankings
            yield f"data: {json.dumps({'type': 'stage2_start'})}\n\n"
            stage2_results, label_to_model = await stage2_collect_rankings(request.content, stage1_results)
            aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)
            yield f"data: {json.dumps({'type': 'stage2_complete', 'data': stage2_results, 'metadata': {'label_to_model': label_to_model, 'aggregate_rankings': aggregate_rankings}})}\n\n"

            # Stage 3: synthesize final answer
            yield f"data: {json.dumps({'type': 'stage3_start'})}\n\n"
            stage3_result = await stage3_synthesize_final(request.content, stage1_results, stage2_results)
            yield f"data: {json.dumps({'type': 'stage3_complete', 'data': stage3_result})}\n\n"

            # Wait for title generation if it was started
            if title_task:
                title = await title_task
                storage.update_conversation_title(conversation_id, title)
                yield f"data: {json.dumps({'type': 'title_complete', 'data': {'title': title}})}\n\n"

            # Save the complete assistant message
            storage.add_assistant_message(
                conversation_id,
                stage1_results,
                stage2_results,
                stage3_result,
            )

            # Send completion event
            yield f"data: {json.dumps({'type': 'complete'})}\n\n"
        except Exception as e:
            # Send error event
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)
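
For manual testing, the stream is easy to watch from a small script. A minimal sketch (not part of this commit): it assumes httpx is installed, the backend above is running on localhost:8001, and the conversation id placeholder is replaced with one created via the conversations API.

import asyncio
import json

import httpx  # assumed dependency, used only for this sketch


async def watch_stream(conversation_id: str, content: str) -> None:
    # POST to the new streaming endpoint and print each SSE event type as it arrives.
    url = f"http://localhost:8001/api/conversations/{conversation_id}/message/stream"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json={"content": content}) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[len("data: "):])
                    print(event["type"])  # stage1_start, stage1_complete, ..., complete


asyncio.run(watch_stream("<conversation-id>", "Hello, council"))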

View File

@@ -69,33 +69,114 @@ function App() {
        messages: [...prev.messages, userMessage],
      }));

-      // Send message and get council response
-      const response = await api.sendMessage(currentConversationId, content);

-      // Add assistant message to UI
+      // Create a partial assistant message that will be updated progressively
      const assistantMessage = {
        role: 'assistant',
-        stage1: response.stage1,
-        stage2: response.stage2,
-        stage3: response.stage3,
-        metadata: response.metadata,
+        stage1: null,
+        stage2: null,
+        stage3: null,
+        metadata: null,
+        loading: {
+          stage1: false,
+          stage2: false,
+          stage3: false,
+        },
      };

      // Add the partial assistant message
      setCurrentConversation((prev) => ({
        ...prev,
        messages: [...prev.messages, assistantMessage],
      }));

      // Reload conversations list to update message count
      await loadConversations();

+      // Send message with streaming
+      await api.sendMessageStream(currentConversationId, content, (eventType, event) => {
+        switch (eventType) {
+          case 'stage1_start':
+            setCurrentConversation((prev) => {
+              const messages = [...prev.messages];
+              const lastMsg = messages[messages.length - 1];
+              lastMsg.loading.stage1 = true;
+              return { ...prev, messages };
+            });
+            break;
+          case 'stage1_complete':
+            setCurrentConversation((prev) => {
+              const messages = [...prev.messages];
+              const lastMsg = messages[messages.length - 1];
+              lastMsg.stage1 = event.data;
+              lastMsg.loading.stage1 = false;
+              return { ...prev, messages };
+            });
+            break;
+          case 'stage2_start':
+            setCurrentConversation((prev) => {
+              const messages = [...prev.messages];
+              const lastMsg = messages[messages.length - 1];
+              lastMsg.loading.stage2 = true;
+              return { ...prev, messages };
+            });
+            break;
+          case 'stage2_complete':
+            setCurrentConversation((prev) => {
+              const messages = [...prev.messages];
+              const lastMsg = messages[messages.length - 1];
+              lastMsg.stage2 = event.data;
+              lastMsg.metadata = event.metadata;
+              lastMsg.loading.stage2 = false;
+              return { ...prev, messages };
+            });
+            break;
+          case 'stage3_start':
+            setCurrentConversation((prev) => {
+              const messages = [...prev.messages];
+              const lastMsg = messages[messages.length - 1];
+              lastMsg.loading.stage3 = true;
+              return { ...prev, messages };
+            });
+            break;
+          case 'stage3_complete':
+            setCurrentConversation((prev) => {
+              const messages = [...prev.messages];
+              const lastMsg = messages[messages.length - 1];
+              lastMsg.stage3 = event.data;
+              lastMsg.loading.stage3 = false;
+              return { ...prev, messages };
+            });
+            break;
+          case 'title_complete':
+            // Reload conversations to get updated title
+            loadConversations();
+            break;
+          case 'complete':
+            // Stream complete, reload conversations list
+            loadConversations();
+            setIsLoading(false);
+            break;
+          case 'error':
+            console.error('Stream error:', event.message);
+            setIsLoading(false);
+            break;
+          default:
+            console.log('Unknown event type:', eventType);
+        }
+      });
    } catch (error) {
      console.error('Failed to send message:', error);
-      // Remove optimistic user message on error
+      // Remove optimistic messages on error
      setCurrentConversation((prev) => ({
        ...prev,
-        messages: prev.messages.slice(0, -1),
+        messages: prev.messages.slice(0, -2),
      }));
    } finally {
      setIsLoading(false);
    }
  };
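
Stripped of React, the switch above is just an event reducer: each SSE event patches the last, partially built assistant message in place. The same accumulation logic as a minimal Python sketch (not part of the commit; field names mirror the events emitted by the streaming endpoint above, and the payloads are illustrative):

def apply_council_event(message: dict, event: dict) -> dict:
    # Patch a partially built assistant message with one council SSE event.
    etype = event["type"]
    if etype in ("stage1_start", "stage2_start", "stage3_start"):
        message["loading"][etype.removesuffix("_start")] = True
    elif etype in ("stage1_complete", "stage2_complete", "stage3_complete"):
        stage = etype.removesuffix("_complete")
        message[stage] = event["data"]
        message["loading"][stage] = False
        if stage == "stage2":
            message["metadata"] = event.get("metadata")
    return message

message = {"stage1": None, "stage2": None, "stage3": None, "metadata": None,
           "loading": {"stage1": False, "stage2": False, "stage3": False}}
apply_council_event(message, {"type": "stage1_start"})
apply_council_event(message, {"type": "stage1_complete", "data": {"example": "..."}})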

View File

@@ -65,4 +65,51 @@ export const api = {
    }
    return response.json();
  },

  /**
   * Send a message and receive streaming updates.
   * @param {string} conversationId - The conversation ID
   * @param {string} content - The message content
   * @param {function} onEvent - Callback invoked per event: (eventType, event) => void
   * @returns {Promise<void>}
   */
  async sendMessageStream(conversationId, content, onEvent) {
    const response = await fetch(
      `${API_BASE}/api/conversations/${conversationId}/message/stream`,
      {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ content }),
      }
    );

    if (!response.ok) {
      throw new Error('Failed to send message');
    }

    // Read the SSE stream incrementally. A `data: ...` line can be split
    // across chunks, so keep the trailing partial line in a buffer between reads.
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // the last element may be an incomplete line

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          try {
            const event = JSON.parse(data);
            onEvent(event.type, event);
          } catch (e) {
            console.error('Failed to parse SSE event:', e);
          }
        }
      }
    }
  },
};
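
One subtlety in the reader loop: response.body delivers arbitrary byte chunks, so a single `data: ...` line can arrive split across two reads, which is why the code above buffers the trailing partial line between chunks. The same pattern in isolation, as a small Python sketch you can run directly:

import json

def iter_sse_data(chunks):
    # Yield parsed `data:` payloads from text chunks that may split lines anywhere.
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        lines = buffer.split("\n")
        buffer = lines.pop()  # the last element may be a partial line; keep it
        for line in lines:
            if line.startswith("data: "):
                yield json.loads(line[len("data: "):])

# An event split across two chunks still parses as one event:
print(list(iter_sse_data(['data: {"type": "stage', '1_start"}\n\n'])))
# -> [{'type': 'stage1_start'}]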

View File

@@ -71,6 +71,20 @@
  font-size: 14px;
}

.stage-loading {
  display: flex;
  align-items: center;
  gap: 12px;
  padding: 16px;
  margin: 12px 0;
  background: #f9fafb;
  border-radius: 8px;
  border: 1px solid #e0e0e0;
  color: #666;
  font-size: 14px;
  font-style: italic;
}

.spinner {
  width: 20px;
  height: 20px;
View File

@@ -71,13 +71,39 @@ export default function ChatInterface({
            ) : (
              <div className="assistant-message">
                <div className="message-label">LLM Council</div>
-                <Stage1 responses={msg.stage1} />
-                <Stage2
-                  rankings={msg.stage2}
-                  labelToModel={msg.metadata?.label_to_model}
-                  aggregateRankings={msg.metadata?.aggregate_rankings}
-                />
-                <Stage3 finalResponse={msg.stage3} />
+                {/* Stage 1 */}
+                {msg.loading?.stage1 && (
+                  <div className="stage-loading">
+                    <div className="spinner"></div>
+                    <span>Running Stage 1: Collecting individual responses...</span>
+                  </div>
+                )}
+                {msg.stage1 && <Stage1 responses={msg.stage1} />}
+                {/* Stage 2 */}
+                {msg.loading?.stage2 && (
+                  <div className="stage-loading">
+                    <div className="spinner"></div>
+                    <span>Running Stage 2: Peer rankings...</span>
+                  </div>
+                )}
+                {msg.stage2 && (
+                  <Stage2
+                    rankings={msg.stage2}
+                    labelToModel={msg.metadata?.label_to_model}
+                    aggregateRankings={msg.metadata?.aggregate_rankings}
+                  />
+                )}
+                {/* Stage 3 */}
+                {msg.loading?.stage3 && (
+                  <div className="stage-loading">
+                    <div className="spinner"></div>
+                    <span>Running Stage 3: Final synthesis...</span>
+                  </div>
+                )}
+                {msg.stage3 && <Stage3 finalResponse={msg.stage3} />}
              </div>
            )}
          </div>
@@ -94,24 +120,26 @@ export default function ChatInterface({
        <div ref={messagesEndRef} />
      </div>

-      <form className="input-form" onSubmit={handleSubmit}>
-        <textarea
-          className="message-input"
-          placeholder="Ask your question... (Shift+Enter for new line, Enter to send)"
-          value={input}
-          onChange={(e) => setInput(e.target.value)}
-          onKeyDown={handleKeyDown}
-          disabled={isLoading}
-          rows={3}
-        />
-        <button
-          type="submit"
-          className="send-button"
-          disabled={!input.trim() || isLoading}
-        >
-          Send
-        </button>
-      </form>
+      {conversation.messages.length === 0 && (
+        <form className="input-form" onSubmit={handleSubmit}>
+          <textarea
+            className="message-input"
+            placeholder="Ask your question... (Shift+Enter for new line, Enter to send)"
+            value={input}
+            onChange={(e) => setInput(e.target.value)}
+            onKeyDown={handleKeyDown}
+            disabled={isLoading}
+            rows={3}
+          />
+          <button
+            type="submit"
+            className="send-button"
+            disabled={!input.trim() || isLoading}
+          >
+            Send
+          </button>
+        </form>
+      )}
    </div>
  );
}