Streaming allows you to receive AI responses in real-time as they’re generated, providing a better user experience for long-form content generation and interactive applications.

Basic Streaming

Simple Streaming Example

TypeScript
import { Sudo } from "sudo-ai";

const sudo = new Sudo({
  serverURL: "https://sudoapp.dev/api",
  apiKey: process.env.SUDO_API_KEY ?? "",
});

async function simpleStreaming() {
  try {
    // Create a streaming response
    const stream = await sudo.router.createStreaming({
      model: "claude-sonnet-4-20250514",
      messages: [
        { role: "user", content: "Write a short story about a robot" }
      ]
    });
    
    // Process the stream
    console.log("AI Response:");
    for await (const chunk of stream) {
      if (chunk.data?.choices) {
        for (const choice of chunk.data.choices) {
          if (choice.delta?.content) {
            process.stdout.write(choice.delta.content);
          }
        }
      }
    }
    console.log(); // New line at the end
  } catch (error) {
    console.error("Streaming error:", error);
  }
}

simpleStreaming();

Stream Event Processing

Detailed Event Handling

TypeScript
import { Sudo } from "sudo-ai";

const sudo = new Sudo({
  serverURL: "https://sudoapp.dev/api",
  apiKey: process.env.SUDO_API_KEY ?? "",
});

async function detailedStreamProcessing() {
  try {
    const stream = await sudo.router.createStreaming({
      model: "gpt-4o",
      messages: [
        { role: "user", content: "Explain machine learning algorithms" }
      ],
      streamOptions: { includeUsage: true }  // Include usage information
    });
    
    let fullResponse = "";
    let totalTokens = 0;
    
    for await (const chunk of stream) {
      // Check for data
      if (chunk.data) {
        // Process choices
        if (chunk.data.choices) {
          for (const choice of chunk.data.choices) {
            // Content delta
            if (choice.delta?.content) {
              const content = choice.delta.content;
              process.stdout.write(content);
              fullResponse += content;
            }
            
            // Finish reason
            if (choice.finishReason) {
              console.log(`\n[Finished: ${choice.finishReason}]`);
            }
          }
        }
        
        // Usage information (if available)
        if (chunk.data.usage) {
          totalTokens = chunk.data.usage.totalTokens;
          console.log(`\n[Tokens used: ${totalTokens}]`);
        }
      }
    }
    
    return {
      content: fullResponse,
      tokens: totalTokens
    };
  } catch (error) {
    console.error("Streaming error:", error);
    return null;
  }
}

detailedStreamProcessing().then(result => {
  if (result) {
    console.log(`\nCompleted. Total length: ${result.content.length} chars, ${result.tokens} tokens`);
  }
});

Handling Different Event Types

TypeScript
import { Sudo } from "sudo-ai";

const sudo = new Sudo({
  serverURL: "https://sudoapp.dev/api",
  apiKey: process.env.SUDO_API_KEY ?? "",
});

async function comprehensiveStreamHandler() {
  try {
    const stream = await sudo.router.createStreaming({
      model: "gpt-4o",
      messages: [{ role: "user", content: "Write a poem about programming" }]
    });
    
    for await (const chunk of stream) {
      if (chunk.data) {
        // Model information
        if (chunk.data.model) {
          console.log(`[Model: ${chunk.data.model}]`);
        }
        
        // Process each choice
        if (chunk.data.choices) {
          chunk.data.choices.forEach((choice, i) => {
            if (choice.delta) {
              // Role information
              if (choice.delta.role) {
                console.log(`\n[Role: ${choice.delta.role}]`);
              }
              
              // Content
              if (choice.delta.content) {
                process.stdout.write(choice.delta.content);
              }
              
              // Tool calls (if any)
              if (choice.delta.toolCalls) {
                console.log(`\n[Tool calls: ${JSON.stringify(choice.delta.toolCalls)}]`);
              }
            }
            
            // Finish information
            if (choice.finishReason) {
              console.log(`\n[Choice ${i} finished: ${choice.finishReason}]`);
            }
          });
        }
      }
    }
  } catch (error) {
    console.error("Streaming error:", error);
  }
}

comprehensiveStreamHandler();

Async Streaming

Basic Python Async Streaming

Python
import asyncio
import os
from sudo import Sudo

async def async_stream_chat():
    async with Sudo(
        server_url="https://sudoapp.dev/api",
        api_key=os.getenv("SUDO_API_KEY"),
    ) as client:
        stream = await client.router.create_streaming_async(
            model="claude-sonnet-4-20250514",
            messages=[
                {"role": "user", "content": "Explain the benefits of async programming"}
            ]
        )
        
        print("Async streaming response:")
        full_response = ""
        
        async with stream as event_stream:
            async for chunk in event_stream:
                if chunk.data and chunk.data.choices:
                    for choice in chunk.data.choices:
                        if choice.delta and choice.delta.content:
                            content = choice.delta.content
                            print(content, end="", flush=True)
                            full_response += content
        
        print()  # New line
        return full_response

# Run async streaming
response = asyncio.run(async_stream_chat())
print(f"Response length: {len(response)} characters")

Concurrent Streaming

TypeScript
import { Sudo } from "sudo-ai";

const sudo = new Sudo({
  serverURL: "https://sudoapp.dev/api",
  apiKey: process.env.SUDO_API_KEY ?? "",
});

async function streamSingleQuery(query: string, queryId: number): Promise<string> {
  try {
    console.log(`\n[Query ${queryId}] Starting: ${query}`);
    
    const stream = await sudo.router.createStreaming({
      model: "gpt-4o-mini",
      messages: [{ role: "user", content: query }]
    });
    
    let response = "";
    for await (const chunk of stream) {
      if (chunk.data?.choices) {
        for (const choice of chunk.data.choices) {
          if (choice.delta?.content) {
            const content = choice.delta.content;
            response += content;
            // Print with query ID prefix
            process.stdout.write(`[Q${queryId}] ${content}`);
          }
        }
      }
    }
    
    console.log(`\n[Query ${queryId}] Completed`);
    return response;
  } catch (error) {
    console.error(`[Query ${queryId}] Error:`, error);
    return "";
  }
}

async function streamMultipleQueries(): Promise<void> {
  try {
    const queries = [
      "What is TypeScript?",
      "Explain JavaScript",
      "What is machine learning?"
    ];
    
    // Start all streams concurrently
    const promises = queries.map((query, index) => 
      streamSingleQuery(query, index + 1)
    );
    
    const responses = await Promise.all(promises);
    
    console.log("\n\nAll queries completed!");
    responses.forEach((response, index) => {
      console.log(`Query ${index + 1} length: ${response.length} characters`);
    });
  } catch (error) {
    console.error("Concurrent streaming error:", error);
  }
}

streamMultipleQueries();

Error Handling and Recovery

Robust Streaming with Retry

TypeScript
import { Sudo } from "sudo-ai";

const sudo = new Sudo({
  serverURL: "https://sudoapp.dev/api",
  apiKey: process.env.SUDO_API_KEY ?? "",
});

async function robustStreamingWithRetry(
  query: string, 
  maxRetries: number = 3,
  backoffMs: number = 1000
): Promise<string | null> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      console.log(`Attempt ${attempt}: Starting stream for "${query}"`);
      
      const stream = await sudo.router.createStreaming({
        model: "gpt-4o",
        messages: [{ role: "user", content: query }],
        maxCompletionTokens: 150
      });
      
      let response = "";
      let lastChunkTime = Date.now();
      const timeoutMs = 30000; // 30 second timeout
      
      for await (const chunk of stream) {
        lastChunkTime = Date.now();
        
        if (chunk.data?.choices) {
          for (const choice of chunk.data.choices) {
            if (choice.delta?.content) {
              const content = choice.delta.content;
              response += content;
              process.stdout.write(content);
            }
            
            if (choice.finishReason) {
              console.log(`\nStream completed with reason: ${choice.finishReason}`);
              return response;
            }
          }
        }
        
        // Check for timeout
        if (Date.now() - lastChunkTime > timeoutMs) {
          throw new Error("Stream timeout - no data received");
        }
      }
      
      return response;
      
    } catch (error) {
      console.error(`\nAttempt ${attempt} failed:`, error);
      
      if (attempt === maxRetries) {
        console.error("All retry attempts exhausted");
        return null;
      }
      
      // Exponential backoff
      const delay = backoffMs * Math.pow(2, attempt - 1);
      console.log(`Retrying in ${delay}ms...`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  
  return null;
}

// Usage
async function robustStreamingExample() {
  const response = await robustStreamingWithRetry(
    "Explain the theory of relativity in simple terms"
  );

  if (response) {
    console.log(`\nSuccessfully received response: ${response.length} characters`);
  } else {
    console.log("Failed to get response after all retry attempts");
  }
}

robustStreamingExample();

Best Practices

Optimized Streaming Setup

TypeScript
import { Sudo } from "sudo-ai";

class OptimizedStreaming {
  private sudo: Sudo;
  private currentStream: any = null;
  private isShuttingDown = false;

  constructor() {
    this.sudo = new Sudo({
      serverURL: "https://sudoapp.dev/api",
      apiKey: process.env.SUDO_API_KEY ?? "",
    });
    this.setupSignalHandlers();
  }

  private setupSignalHandlers(): void {
    // Handle graceful shutdown
    process.on('SIGINT', () => this.signalHandler('SIGINT'));
    process.on('SIGTERM', () => this.signalHandler('SIGTERM'));
  }

  private signalHandler(signal: string): void {
    if (this.isShuttingDown) return;
    
    console.log(`\nReceived ${signal}. Shutting down gracefully...`);
    this.isShuttingDown = true;
    
    if (this.currentStream) {
      try {
        // Note: Actual stream cancellation depends on the specific implementation
        this.currentStream = null;
      } catch (error) {
        console.error("Error during stream cleanup:", error);
      }
    }
    
    process.exit(0);
  }

  async streamWithContext(message: string, model: string = "gpt-4o"): Promise<string | null> {
    if (this.isShuttingDown) {
      console.log("Cannot start stream: shutting down");
      return null;
    }

    try {
      const stream = await this.sudo.router.createStreaming({
        model,
        messages: [{ role: "user", content: message }],
        streamOptions: { includeUsage: true }
      });

      this.currentStream = stream;
      const responseParts: string[] = [];

      for await (const chunk of stream) {
        if (this.isShuttingDown) {
          console.log("\nStreaming interrupted by shutdown signal");
          break;
        }

        if (chunk.data?.choices) {
          for (const choice of chunk.data.choices) {
            if (choice.delta?.content) {
              const content = choice.delta.content;
              responseParts.push(content);
              process.stdout.write(content);
            }
          }
        }
      }

      console.log();
      return responseParts.join('');

    } catch (error) {
      if (error instanceof Error && error.name === 'AbortError') {
        console.log("\nStreaming interrupted by user");
        return null;
      }
      console.error(`\nStreaming error: ${error}`);
      return null;
    } finally {
      this.currentStream = null;
    }
  }

  // Graceful shutdown method
  async shutdown(): Promise<void> {
    this.isShuttingDown = true;
    if (this.currentStream) {
      this.currentStream = null;
    }
  }
}

// Usage
async function optimizedStreamingExample() {
  const streamer = new OptimizedStreaming();
  
  try {
    const result = await streamer.streamWithContext(
      "Write a comprehensive guide to TypeScript decorators"
    );

    if (result) {
      console.log(`\nStreaming completed. Response length: ${result.length} characters`);
    }
  } catch (error) {
    console.error("Streaming example error:", error);
  }
}

optimizedStreamingExample();

Next Steps

Now that you understand streaming:
  1. Try Tool Calling - Combine streaming with function calls
  2. Structured Output - Generate structured data
  3. CRUD Operations - Manage stored completions
Use streaming for long-form content generation and interactive applications. Remember to implement proper error handling and user interruption capabilities for the best user experience. Both TypeScript and Python SDKs provide excellent streaming support with robust error handling and async capabilities.