Basic Streaming
Simple Streaming Example
TypeScript
import { Sudo } from "sudo-ai";
// SDK client used by the example below. The API key is read from the
// SUDO_API_KEY environment variable and falls back to an empty string when
// that variable is unset.
const sudo = new Sudo({
serverURL: "https://sudoapp.dev/api",
apiKey: process.env.SUDO_API_KEY ?? "",
});
/**
 * Requests a streamed completion and echoes every content delta to stdout as
 * soon as it arrives.
 *
 * Any error raised while opening or consuming the stream is logged with
 * `console.error` and not rethrown.
 */
async function simpleStreaming() {
  try {
    const events = await sudo.router.createStreaming({
      model: "claude-sonnet-4-20250514",
      messages: [
        { role: "user", content: "Write a short story about a robot" }
      ]
    });

    console.log("AI Response:");

    for await (const event of events) {
      const choices = event.data?.choices;
      if (!choices) {
        continue;
      }
      for (const choice of choices) {
        const text = choice.delta?.content;
        if (text) {
          process.stdout.write(text);
        }
      }
    }

    // Terminate the streamed text with a newline.
    console.log();
  } catch (error) {
    console.error("Streaming error:", error);
  }
}
// Run the example. The promise is deliberately not awaited; `void` marks that
// as intentional.
void simpleStreaming();
Stream Event Processing
Detailed Event Handling
TypeScript
import { Sudo } from "sudo-ai";
// SDK client used by the example below. The API key is read from the
// SUDO_API_KEY environment variable and falls back to an empty string when
// that variable is unset.
const sudo = new Sudo({
serverURL: "https://sudoapp.dev/api",
apiKey: process.env.SUDO_API_KEY ?? "",
});
/**
 * Streams a completion while tracking three things per chunk: content deltas
 * (echoed to stdout and accumulated), finish reasons, and token usage.
 *
 * @returns An object with the accumulated `content` and the last reported
 *   `tokens` total (0 if no usage chunk was seen), or `null` when an error
 *   occurred. Errors are logged, not rethrown.
 */
async function detailedStreamProcessing() {
  try {
    const events = await sudo.router.createStreaming({
      model: "gpt-4o",
      messages: [
        { role: "user", content: "Explain machine learning algorithms" }
      ],
      // Request usage information as part of the stream.
      streamOptions: { includeUsage: true }
    });

    let collected = "";
    let tokenCount = 0;

    for await (const event of events) {
      const payload = event.data;
      if (!payload) {
        continue;
      }

      if (payload.choices) {
        for (const choice of payload.choices) {
          const piece = choice.delta?.content;
          if (piece) {
            process.stdout.write(piece);
            collected += piece;
          }

          if (choice.finishReason) {
            console.log(`\n[Finished: ${choice.finishReason}]`);
          }
        }
      }

      // Usage is only present on some chunks; keep the latest value seen.
      if (payload.usage) {
        tokenCount = payload.usage.totalTokens;
        console.log(`\n[Tokens used: ${tokenCount}]`);
      }
    }

    return { content: collected, tokens: tokenCount };
  } catch (error) {
    console.error("Streaming error:", error);
    return null;
  }
}
// Run the example and print a summary when it produced a result.
void detailedStreamProcessing().then((outcome) => {
  if (!outcome) {
    return;
  }
  const { content, tokens } = outcome;
  console.log(`\nCompleted. Total length: ${content.length} chars, ${tokens} tokens`);
});
Handling Different Event Types
TypeScript
import { Sudo } from "sudo-ai";
// SDK client used by the example below. The API key is read from the
// SUDO_API_KEY environment variable and falls back to an empty string when
// that variable is unset.
const sudo = new Sudo({
serverURL: "https://sudoapp.dev/api",
apiKey: process.env.SUDO_API_KEY ?? "",
});
/**
 * Streams a completion and reports every kind of information a chunk can
 * carry: the model name, and per choice the role, content, tool calls and
 * finish reason.
 *
 * Errors are logged with `console.error` and not rethrown.
 */
async function comprehensiveStreamHandler() {
  try {
    const events = await sudo.router.createStreaming({
      model: "gpt-4o",
      messages: [{ role: "user", content: "Write a poem about programming" }]
    });

    for await (const event of events) {
      const payload = event.data;
      if (!payload) {
        continue;
      }

      if (payload.model) {
        console.log(`[Model: ${payload.model}]`);
      }

      if (!payload.choices) {
        continue;
      }

      for (const [index, choice] of payload.choices.entries()) {
        const delta = choice.delta;
        if (delta) {
          if (delta.role) {
            console.log(`\n[Role: ${delta.role}]`);
          }
          if (delta.content) {
            process.stdout.write(delta.content);
          }
          if (delta.toolCalls) {
            console.log(`\n[Tool calls: ${JSON.stringify(delta.toolCalls)}]`);
          }
        }

        // The finish reason lives on the choice itself, not on the delta.
        if (choice.finishReason) {
          console.log(`\n[Choice ${index} finished: ${choice.finishReason}]`);
        }
      }
    }
  } catch (error) {
    console.error("Streaming error:", error);
  }
}
// Run the example. The promise is deliberately not awaited; `void` marks that
// as intentional.
void comprehensiveStreamHandler();
Async Streaming
Basic Python Async Streaming
Python
import asyncio
import os
from sudo import Sudo
async def async_stream_chat():
    """Stream one chat completion asynchronously, echoing text as it arrives.

    Returns the concatenation of every content delta received.
    """
    async with Sudo(
        server_url="https://sudoapp.dev/api",
        api_key=os.getenv("SUDO_API_KEY"),
    ) as client:
        stream = await client.router.create_streaming_async(
            model="claude-sonnet-4-20250514",
            messages=[
                {"role": "user", "content": "Explain the benefits of async programming"}
            ],
        )

        print("Async streaming response:")
        pieces = []

        async with stream as event_stream:
            async for event in event_stream:
                # Skip events that carry no choices.
                if not (event.data and event.data.choices):
                    continue
                for choice in event.data.choices:
                    if not (choice.delta and choice.delta.content):
                        continue
                    text = choice.delta.content
                    print(text, end="", flush=True)
                    pieces.append(text)

        print()  # Terminate the streamed text with a newline.
        return "".join(pieces)
# Run the coroutine to completion with asyncio.run, then report the length of
# the string it returned.
response = asyncio.run(async_stream_chat())
print(f"Response length: {len(response)} characters")
Concurrent Streaming
TypeScript
import { Sudo } from "sudo-ai";
// SDK client shared by the concurrent queries below. The API key is read from
// the SUDO_API_KEY environment variable and falls back to an empty string when
// that variable is unset.
const sudo = new Sudo({
serverURL: "https://sudoapp.dev/api",
apiKey: process.env.SUDO_API_KEY ?? "",
});
/**
 * Streams the answer to one query, echoing each content delta to stdout with a
 * `[Q<id>]` prefix so interleaved output from concurrent queries stays
 * attributable.
 *
 * @param query - The user message to send.
 * @param queryId - Identifier used in log prefixes.
 * @returns The accumulated response text, or an empty string if an error
 *   occurred (the error is logged, not rethrown).
 */
async function streamSingleQuery(query: string, queryId: number): Promise<string> {
  try {
    console.log(`\n[Query ${queryId}] Starting: ${query}`);

    const events = await sudo.router.createStreaming({
      model: "gpt-4o-mini",
      messages: [{ role: "user", content: query }]
    });

    let answer = "";
    for await (const event of events) {
      const choices = event.data?.choices;
      if (!choices) {
        continue;
      }
      for (const choice of choices) {
        const piece = choice.delta?.content;
        if (!piece) {
          continue;
        }
        answer += piece;
        process.stdout.write(`[Q${queryId}] ${piece}`);
      }
    }

    console.log(`\n[Query ${queryId}] Completed`);
    return answer;
  } catch (error) {
    console.error(`[Query ${queryId}] Error:`, error);
    return "";
  }
}
/**
 * Starts several streamed queries at once, waits for all of them, and prints
 * the length of each response.
 *
 * Errors are logged with `console.error` and not rethrown.
 */
async function streamMultipleQueries(): Promise<void> {
  try {
    const queries = [
      "What is TypeScript?",
      "Explain JavaScript",
      "What is machine learning?"
    ];

    // Every stream is started before any of them is awaited, so they run
    // concurrently; query IDs are 1-based.
    const responses = await Promise.all(
      queries.map((text, position) => streamSingleQuery(text, position + 1))
    );

    console.log("\n\nAll queries completed!");
    for (const [position, answer] of responses.entries()) {
      console.log(`Query ${position + 1} length: ${answer.length} characters`);
    }
  } catch (error) {
    console.error("Concurrent streaming error:", error);
  }
}
// Run the example. The promise is deliberately not awaited; `void` marks that
// as intentional.
void streamMultipleQueries();
Error Handling and Recovery
Robust Streaming with Retry
TypeScript
import { Sudo } from "sudo-ai";
// SDK client used by the retry example below. The API key is read from the
// SUDO_API_KEY environment variable and falls back to an empty string when
// that variable is unset.
const sudo = new Sudo({
serverURL: "https://sudoapp.dev/api",
apiKey: process.env.SUDO_API_KEY ?? "",
});
/**
 * Waits for the next item of `iterator`, rejecting if it does not arrive
 * within `timeoutMs` milliseconds.
 *
 * A non-finite or non-positive `timeoutMs` disables the timeout.
 */
function nextChunkWithTimeout<T>(
  iterator: AsyncIterator<T>,
  timeoutMs: number
): Promise<IteratorResult<T>> {
  if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
    return iterator.next();
  }
  return new Promise<IteratorResult<T>>((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new Error("Stream timeout - no data received"));
    }, timeoutMs);
    iterator.next().then(
      (result) => {
        clearTimeout(timer);
        resolve(result);
      },
      (error: unknown) => {
        clearTimeout(timer);
        reject(error);
      }
    );
  });
}

/**
 * Asks `iterator` to release its underlying resources without waiting for it.
 *
 * The call is fire-and-forget on purpose: an async generator queues `return()`
 * behind a pending `next()`, so awaiting it on a stalled stream would hang.
 */
function closeStreamQuietly(iterator: AsyncIterator<unknown>): void {
  try {
    if (iterator.return) {
      void Promise.resolve(iterator.return()).catch(() => undefined);
    }
  } catch {
    // Best-effort cleanup only; the caller's outcome is already decided.
  }
}

/**
 * Streams a completion for `query`, echoing content to stdout, and retries
 * with exponential backoff when an attempt fails or stalls.
 *
 * The original timeout check compared a timestamp taken at the top of the
 * loop body against the clock at the bottom of the same iteration, so it
 * could never fire, and a stalled stream blocked forever. Each chunk is now
 * raced against a timer instead.
 *
 * @param query - The user message to send.
 * @param maxRetries - Maximum number of attempts; values below 1 make no
 *   attempt and return `null`.
 * @param backoffMs - Base delay; attempt `n` waits `backoffMs * 2^(n-1)`
 *   before the next attempt.
 * @param timeoutMs - Maximum time to wait for each chunk before the attempt is
 *   treated as failed. Non-finite or non-positive values disable the timeout.
 * @returns The response text (possibly empty) on success, or `null` once all
 *   attempts have failed.
 */
async function robustStreamingWithRetry(
  query: string,
  maxRetries: number = 3,
  backoffMs: number = 1000,
  timeoutMs: number = 30000
): Promise<string | null> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      console.log(`Attempt ${attempt}: Starting stream for "${query}"`);
      const stream = await sudo.router.createStreaming({
        model: "gpt-4o",
        messages: [{ role: "user", content: query }],
        maxCompletionTokens: 150
      });

      // Drive the iterator by hand so each chunk can be raced against a timer.
      const iterator = stream[Symbol.asyncIterator]();
      let response = "";
      try {
        for (;;) {
          const step = await nextChunkWithTimeout(iterator, timeoutMs);
          if (step.done) {
            break;
          }
          const chunk = step.value;
          if (chunk.data?.choices) {
            for (const choice of chunk.data.choices) {
              if (choice.delta?.content) {
                const content = choice.delta.content;
                response += content;
                process.stdout.write(content);
              }
              if (choice.finishReason) {
                console.log(`\nStream completed with reason: ${choice.finishReason}`);
                return response;
              }
            }
          }
        }
        return response;
      } finally {
        // Covers early return, timeout and errors alike.
        closeStreamQuietly(iterator);
      }
    } catch (error) {
      console.error(`\nAttempt ${attempt} failed:`, error);
      if (attempt === maxRetries) {
        console.error("All retry attempts exhausted");
        return null;
      }
      // Exponential backoff
      const delay = backoffMs * Math.pow(2, attempt - 1);
      console.log(`Retrying in ${delay}ms...`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  return null;
}
// Usage
/**
 * Runs `robustStreamingWithRetry` with a sample prompt and reports the
 * outcome.
 *
 * Only `null` means every attempt failed. An empty string is a successful
 * (empty) response, so the check compares against `null` rather than relying
 * on truthiness.
 */
async function robustStreamingExample() {
  const response = await robustStreamingWithRetry(
    "Explain the theory of relativity in simple terms"
  );
  if (response !== null) {
    console.log(`\nSuccessfully received response: ${response.length} characters`);
  } else {
    console.log("Failed to get response after all retry attempts");
  }
}
// Run the example. The promise is deliberately not awaited; `void` marks that
// as intentional.
void robustStreamingExample();
Best Practices
Optimized Streaming Setup
TypeScript
import { Sudo } from "sudo-ai";
/**
 * Wraps a `Sudo` client with shutdown-aware streaming.
 *
 * The instance installs SIGINT/SIGTERM handlers on construction, remembers the
 * iterator of the in-flight stream, and closes that iterator when a shutdown
 * is requested, either by a signal or by calling {@link shutdown}.
 */
class OptimizedStreaming {
  private sudo: Sudo;
  /** Iterator of the stream currently being consumed, if any. */
  private currentStream: AsyncIterator<unknown> | null = null;
  private isShuttingDown = false;

  constructor() {
    this.sudo = new Sudo({
      serverURL: "https://sudoapp.dev/api",
      apiKey: process.env.SUDO_API_KEY ?? "",
    });
    this.setupSignalHandlers();
  }

  /** Registers the process signal handlers that trigger a shutdown. */
  private setupSignalHandlers(): void {
    process.on('SIGINT', () => this.signalHandler('SIGINT'));
    process.on('SIGTERM', () => this.signalHandler('SIGTERM'));
  }

  /**
   * Handles a termination signal: releases the active stream and exits.
   *
   * There is deliberately no "already shutting down" early return here. With
   * one, a signal arriving after `shutdown()` was called would be swallowed
   * and the process could no longer be interrupted. `process.exit` makes
   * re-entry impossible anyway.
   */
  private signalHandler(signal: string): void {
    console.log(`\nReceived ${signal}. Shutting down gracefully...`);
    this.isShuttingDown = true;
    this.releaseCurrentStream();
    process.exit(0);
  }

  /**
   * Forgets the active stream and asks its iterator to close.
   *
   * The `return()` call is not awaited: an async generator queues it behind a
   * pending `next()`, so awaiting could block shutdown indefinitely.
   */
  private releaseCurrentStream(): void {
    const iterator = this.currentStream;
    this.currentStream = null;
    if (!iterator || !iterator.return) {
      return;
    }
    try {
      void Promise.resolve(iterator.return()).catch((error: unknown) => {
        console.error("Error during stream cleanup:", error);
      });
    } catch (error) {
      console.error("Error during stream cleanup:", error);
    }
  }

  /**
   * Streams a completion for `message`, echoing content to stdout.
   *
   * @param message - The user message to send.
   * @param model - Model identifier to request.
   * @returns The text received (possibly partial if a shutdown interrupted the
   *   stream), or `null` if a shutdown was already in progress or an error
   *   occurred.
   */
  async streamWithContext(message: string, model: string = "gpt-4o"): Promise<string | null> {
    if (this.isShuttingDown) {
      console.log("Cannot start stream: shutting down");
      return null;
    }
    try {
      const stream = await this.sudo.router.createStreaming({
        model,
        messages: [{ role: "user", content: message }],
        streamOptions: { includeUsage: true }
      });

      // Keep a handle on the iterator so a shutdown can close it, and iterate
      // that same iterator through a thin iterable wrapper.
      const iterator = stream[Symbol.asyncIterator]();
      this.currentStream = iterator;
      const iterable = { [Symbol.asyncIterator]: () => iterator };

      const responseParts: string[] = [];
      for await (const chunk of iterable) {
        if (this.isShuttingDown) {
          console.log("\nStreaming interrupted by shutdown signal");
          break;
        }
        if (chunk.data?.choices) {
          for (const choice of chunk.data.choices) {
            if (choice.delta?.content) {
              const content = choice.delta.content;
              responseParts.push(content);
              process.stdout.write(content);
            }
          }
        }
      }
      console.log();
      return responseParts.join('');
    } catch (error) {
      if (error instanceof Error && error.name === 'AbortError') {
        console.log("\nStreaming interrupted by user");
        return null;
      }
      console.error(`\nStreaming error: ${error}`);
      return null;
    } finally {
      this.currentStream = null;
    }
  }

  /**
   * Requests a graceful shutdown: refuses new streams and closes the active
   * one. Does not exit the process.
   */
  async shutdown(): Promise<void> {
    this.isShuttingDown = true;
    this.releaseCurrentStream();
  }
}
// Usage
/**
 * Creates an `OptimizedStreaming` instance, streams one sample prompt through
 * it, and prints the response length when a non-empty result comes back.
 *
 * Any error that escapes is logged with `console.error`.
 */
async function optimizedStreamingExample() {
  const streamer = new OptimizedStreaming();
  try {
    const text = await streamer.streamWithContext(
      "Write a comprehensive guide to TypeScript decorators"
    );
    if (!text) {
      return;
    }
    console.log(`\nStreaming completed. Response length: ${text.length} characters`);
  } catch (error) {
    console.error("Streaming example error:", error);
  }
}
// Run the example. The promise is deliberately not awaited; `void` marks that
// as intentional.
void optimizedStreamingExample();
Next Steps
Now that you understand streaming:
- Try Tool Calling - Combine streaming with function calls
- Structured Output - Generate structured data
- CRUD Operations - Manage stored completions
Use streaming for long-form content generation and interactive applications. Remember to implement proper error handling and user interruption capabilities for the best user experience. Both TypeScript and Python SDKs provide excellent streaming support with robust error handling and async capabilities.