> ## Documentation Index
> Fetch the complete documentation index at: https://docs.sudoapp.dev/llms.txt
> Use this file to discover all available pages before exploring further.

# Streaming

> Real-time streaming responses with server-sent events

Streaming allows you to receive AI responses in real-time as they're generated, providing a better user experience for long-form content generation and interactive applications.

## Basic Streaming

### Simple Streaming Example

<CodeGroup dropdown>
  ```typescript TypeScript theme={null}
  import { Sudo } from "sudo-ai";

  const sudo = new Sudo({
    serverURL: "https://sudoapp.dev/api",
    apiKey: process.env.SUDO_API_KEY ?? "",
  });

  /**
   * Requests a streamed completion and echoes every content delta to stdout
   * as soon as it arrives.
   *
   * Any error thrown while creating or consuming the stream is logged with
   * console.error and not rethrown, so the returned promise always resolves.
   */
  async function simpleStreaming() {
    try {
      const stream = await sudo.router.createStreaming({
        model: "claude-sonnet-4-20250514",
        messages: [
          { role: "user", content: "Write a short story about a robot" }
        ]
      });

      console.log("AI Response:");
      for await (const event of stream) {
        const choices = event.data?.choices;
        if (!choices) {
          continue; // this event carries no choices
        }

        for (const { delta } of choices) {
          const text = delta?.content;
          if (text) {
            process.stdout.write(text);
          }
        }
      }
      console.log(); // terminate the streamed line
    } catch (error) {
      console.error("Streaming error:", error);
    }
  }

  simpleStreaming();
  ```

  ```python Python theme={null}
  import os
  from sudo import Sudo

  # The client is used as a context manager for the duration of the request
  with Sudo(
      server_url="https://sudoapp.dev/api",
      api_key=os.getenv("SUDO_API_KEY"),
  ) as client:
      # Ask for a streamed completion instead of a single response
      stream = client.router.create_streaming(
          model="claude-sonnet-4-20250514",
          messages=[
              {"role": "user", "content": "Write a short story about a robot"},
          ],
      )

      print("AI Response:")
      with stream as event_stream:
          for event in event_stream:
              payload = event.data
              if not (payload and payload.choices):
                  continue  # nothing printable in this event
              for choice in payload.choices:
                  delta = choice.delta
                  if delta and delta.content:
                      # flush so each piece shows up immediately
                      print(delta.content, end="", flush=True)
          print()  # terminate the streamed line
  ```
</CodeGroup>

## Stream Event Processing

### Detailed Event Handling

<CodeGroup dropdown>
  ```typescript TypeScript theme={null}
  import { Sudo } from "sudo-ai";

  const sudo = new Sudo({
    serverURL: "https://sudoapp.dev/api",
    apiKey: process.env.SUDO_API_KEY ?? "",
  });

  /**
   * Streams a completion, echoing text as it arrives while accumulating the
   * full text and the reported token usage.
   *
   * @returns `{ content, tokens }` once the stream is exhausted: `content` is
   *   every content delta concatenated, `tokens` is the `totalTokens` of the
   *   last chunk that carried usage (0 if none did). Resolves to `null` when
   *   any error is thrown; the error is logged, not rethrown.
   */
  async function detailedStreamProcessing() {
    try {
      const stream = await sudo.router.createStreaming({
        model: "gpt-4o",
        messages: [
          { role: "user", content: "Explain machine learning algorithms" }
        ],
        streamOptions: { includeUsage: true }  // request usage data in the stream
      });

      let text = "";
      let tokenCount = 0;

      for await (const event of stream) {
        const payload = event.data;
        if (!payload) {
          continue; // event without a data payload
        }

        for (const choice of payload.choices || []) {
          const piece = choice.delta?.content;
          if (piece) {
            process.stdout.write(piece);
            text += piece;
          }

          if (choice.finishReason) {
            console.log(`\n[Finished: ${choice.finishReason}]`);
          }
        }

        // Usage is handled after the choices of the same chunk
        const usage = payload.usage;
        if (usage) {
          tokenCount = usage.totalTokens;
          console.log(`\n[Tokens used: ${tokenCount}]`);
        }
      }

      return { content: text, tokens: tokenCount };
    } catch (error) {
      console.error("Streaming error:", error);
      return null;
    }
  }

  // Run the example; a null summary means the failure was already logged
  detailedStreamProcessing().then((summary) => {
    if (!summary) {
      return;
    }
    const { content, tokens } = summary;
    console.log(`\nCompleted. Total length: ${content.length} chars, ${tokens} tokens`);
  });
  ```

  ```python Python theme={null}
  import os
  import json
  from sudo import Sudo

  def detailed_stream_processing():
      """Stream a completion, echoing text as it arrives and collecting results.

      Exceptions are not caught here; they propagate to the caller.

      Returns:
          dict: ``"content"`` holds every content delta concatenated in arrival
          order; ``"tokens"`` holds the ``total_tokens`` of the last chunk that
          carried usage data (``0`` if no chunk did).
      """
      text_parts = []
      token_count = 0

      with Sudo(
          server_url="https://sudoapp.dev/api",
          api_key=os.getenv("SUDO_API_KEY"),
      ) as client:
          stream = client.router.create_streaming(
              model="gpt-4o",
              messages=[
                  {"role": "user", "content": "Explain machine learning algorithms"}
              ],
              stream_options={"include_usage": True},  # request usage data in the stream
          )

          with stream as event_stream:
              for event in event_stream:
                  payload = event.data
                  if not payload:
                      continue  # event without a data payload

                  for choice in payload.choices or ():
                      delta = choice.delta
                      if delta and delta.content:
                          print(delta.content, end="", flush=True)
                          text_parts.append(delta.content)

                      if choice.finish_reason:
                          print(f"\n[Finished: {choice.finish_reason}]")

                  # Usage is handled after the choices of the same chunk
                  if payload.usage:
                      token_count = payload.usage.total_tokens
                      print(f"\n[Tokens used: {token_count}]")

          return {"content": "".join(text_parts), "tokens": token_count}

  # Run the example, then summarise what was streamed
  summary = detailed_stream_processing()
  print(f"\nCompleted. Total length: {len(summary['content'])} chars, {summary['tokens']} tokens")
  ```
</CodeGroup>

### Handling Different Event Types

<CodeGroup dropdown>
  ```typescript TypeScript theme={null}
  import { Sudo } from "sudo-ai";

  const sudo = new Sudo({
    serverURL: "https://sudoapp.dev/api",
    apiKey: process.env.SUDO_API_KEY ?? "",
  });

  /**
   * Streams a completion and prints every kind of information a chunk can
   * carry here: model name, role, content, tool calls and finish reasons.
   *
   * Errors are logged with console.error and not rethrown.
   */
  async function comprehensiveStreamHandler() {
    try {
      const stream = await sudo.router.createStreaming({
        model: "gpt-4o",
        messages: [{ role: "user", content: "Write a poem about programming" }]
      });

      for await (const event of stream) {
        const payload = event.data;
        if (!payload) {
          continue;
        }

        if (payload.model) {
          console.log(`[Model: ${payload.model}]`);
        }

        if (!payload.choices) {
          continue;
        }

        for (const [index, choice] of payload.choices.entries()) {
          const delta = choice.delta;
          if (delta) {
            if (delta.role) {
              console.log(`\n[Role: ${delta.role}]`);
            }

            if (delta.content) {
              process.stdout.write(delta.content);
            }

            if (delta.toolCalls) {
              console.log(`\n[Tool calls: ${JSON.stringify(delta.toolCalls)}]`);
            }
          }

          // The finish reason sits on the choice itself, not on its delta
          if (choice.finishReason) {
            console.log(`\n[Choice ${index} finished: ${choice.finishReason}]`);
          }
        }
      }
    } catch (error) {
      console.error("Streaming error:", error);
    }
  }

  comprehensiveStreamHandler();
  ```

  ```python Python theme={null}
  import os
  from sudo import Sudo

  def comprehensive_stream_handler():
      """Stream a completion and print each kind of information a chunk carries.

      Printed items: model name, role, content, tool calls and finish reasons.
      Exceptions are not caught here; they propagate to the caller.
      """
      with Sudo(
          server_url="https://sudoapp.dev/api",
          api_key=os.getenv("SUDO_API_KEY"),
      ) as client:
          stream = client.router.create_streaming(
              model="gpt-4o",
              messages=[{"role": "user", "content": "Write a poem about programming"}],
          )

          with stream as event_stream:
              for event in event_stream:
                  payload = event.data
                  if not payload:
                      continue

                  # The payload may lack a model attribute altogether
                  model_name = getattr(payload, "model", None)
                  if model_name:
                      print(f"[Model: {model_name}]")

                  for index, choice in enumerate(payload.choices or ()):
                      delta = choice.delta
                      if delta:
                          if delta.role:
                              print(f"\n[Role: {delta.role}]")

                          if delta.content:
                              print(delta.content, end="", flush=True)

                          if delta.tool_calls:
                              print(f"\n[Tool calls: {delta.tool_calls}]")

                      # The finish reason sits on the choice, not on its delta
                      if choice.finish_reason:
                          print(f"\n[Choice {index} finished: {choice.finish_reason}]")

  comprehensive_stream_handler()
  ```
</CodeGroup>

## Async Streaming

### Basic Python Async Streaming

<CodeGroup dropdown>
  ```python Python theme={null}
  import asyncio
  import os
  from sudo import Sudo

  async def async_stream_chat():
      """Stream one completion with the async client and return the full text.

      Each content delta is printed as it arrives and also accumulated.
      Exceptions are not caught here; they propagate to the caller.

      Returns:
          str: All content deltas concatenated in arrival order.
      """
      async with Sudo(
          server_url="https://sudoapp.dev/api",
          api_key=os.getenv("SUDO_API_KEY"),
      ) as client:
          stream = await client.router.create_streaming_async(
              model="claude-sonnet-4-20250514",
              messages=[
                  {"role": "user", "content": "Explain the benefits of async programming"}
              ],
          )

          print("Async streaming response:")
          pieces = []

          async with stream as event_stream:
              async for event in event_stream:
                  payload = event.data
                  if not (payload and payload.choices):
                      continue  # nothing printable in this event
                  for choice in payload.choices:
                      delta = choice.delta
                      if delta and delta.content:
                          print(delta.content, end="", flush=True)
                          pieces.append(delta.content)

          print()  # terminate the streamed line
          return "".join(pieces)

  # Drive the coroutine to completion, then report how much text came back
  streamed_text = asyncio.run(async_stream_chat())
  print(f"Response length: {len(streamed_text)} characters")
  ```
</CodeGroup>

### Concurrent Streaming

<CodeGroup dropdown>
  ```typescript TypeScript theme={null}
  import { Sudo } from "sudo-ai";

  const sudo = new Sudo({
    serverURL: "https://sudoapp.dev/api",
    apiKey: process.env.SUDO_API_KEY ?? "",
  });

  /**
   * Streams the answer to a single query, prefixing every printed piece with
   * the query's id so interleaved output from concurrent queries stays
   * attributable.
   *
   * @param query - Prompt sent as the user message.
   * @param queryId - Number used in the log lines and the `[Q<id>]` prefix.
   * @returns The concatenated content, or "" when an error was thrown (the
   *   error is logged, not rethrown).
   */
  async function streamSingleQuery(query: string, queryId: number): Promise<string> {
    try {
      console.log(`\n[Query ${queryId}] Starting: ${query}`);

      const stream = await sudo.router.createStreaming({
        model: "gpt-4o-mini",
        messages: [{ role: "user", content: query }]
      });

      let answer = "";
      for await (const event of stream) {
        const choices = event.data?.choices;
        if (!choices) {
          continue;
        }

        for (const { delta } of choices) {
          const piece = delta?.content;
          if (!piece) {
            continue;
          }
          answer += piece;
          // Tag each piece with the query it belongs to
          process.stdout.write(`[Q${queryId}] ${piece}`);
        }
      }

      console.log(`\n[Query ${queryId}] Completed`);
      return answer;
    } catch (error) {
      console.error(`[Query ${queryId}] Error:`, error);
      return "";
    }
  }

  /**
   * Starts one stream per sample query at the same time, waits for all of
   * them, then prints the length of each response.
   *
   * Errors are logged with console.error and not rethrown.
   */
  async function streamMultipleQueries(): Promise<void> {
    const queries = [
      "What is TypeScript?",
      "Explain JavaScript",
      "What is machine learning?"
    ];

    try {
      // map() starts every stream before any of them is awaited
      const responses = await Promise.all(
        queries.map((query, index) => streamSingleQuery(query, index + 1))
      );

      console.log("\n\nAll queries completed!");
      for (const [index, response] of responses.entries()) {
        console.log(`Query ${index + 1} length: ${response.length} characters`);
      }
    } catch (error) {
      console.error("Concurrent streaming error:", error);
    }
  }

  streamMultipleQueries();
  ```

  ```python Python theme={null}
  import asyncio
  import os
  from sudo import Sudo

  async def stream_multiple_queries():
      """Run several streaming queries concurrently on one shared client.

      Every printed piece is prefixed with its query id; once all queries are
      done, the length of each response is printed. Exceptions are not caught.
      """

      async def stream_single_query(client, query, query_id):
          """Stream one query and return its concatenated content."""
          print(f"\n[Query {query_id}] Starting: {query}")

          stream = await client.router.create_streaming_async(
              model="gpt-4o-mini",
              messages=[{"role": "user", "content": query}],
          )

          pieces = []
          async with stream as event_stream:
              async for event in event_stream:
                  payload = event.data
                  if not (payload and payload.choices):
                      continue
                  for choice in payload.choices:
                      delta = choice.delta
                      if delta and delta.content:
                          pieces.append(delta.content)
                          # Tag each piece with the query it belongs to
                          print(f"[Q{query_id}] {delta.content}", end="", flush=True)

          print(f"\n[Query {query_id}] Completed")
          return "".join(pieces)

      queries = [
          "What is Python?",
          "Explain JavaScript",
          "What is machine learning?"
      ]

      async with Sudo(
          server_url="https://sudoapp.dev/api",
          api_key=os.getenv("SUDO_API_KEY"),
      ) as client:
          # gather() schedules every coroutine before waiting on any of them
          responses = await asyncio.gather(*(
              stream_single_query(client, query, number)
              for number, query in enumerate(queries, start=1)
          ))

          print("\n\nAll queries completed!")
          for number, response in enumerate(responses, start=1):
              print(f"Query {number} length: {len(response)} characters")

  asyncio.run(stream_multiple_queries())
  ```
</CodeGroup>

## Error Handling and Recovery

### Robust Streaming with Retry

<CodeGroup dropdown>
  ```typescript TypeScript theme={null}
  import { Sudo } from "sudo-ai";

  const sudo = new Sudo({
    serverURL: "https://sudoapp.dev/api",
    apiKey: process.env.SUDO_API_KEY ?? "",
  });

  /**
   * Waits for the iterator's next result, rejecting with a timeout error if it
   * does not settle within `timeoutMs`. The timer is always cleared.
   */
  function nextChunkWithTimeout<T>(
    iterator: AsyncIterator<T>,
    timeoutMs: number
  ): Promise<IteratorResult<T>> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const deadline = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(
        () => reject(new Error("Stream timeout - no data received")),
        timeoutMs
      );
    });
    return Promise.race([iterator.next(), deadline]).finally(() => {
      clearTimeout(timer);
    });
  }

  /**
   * Runs one streaming attempt: echoes content to stdout and resolves with the
   * text received up to the first finish reason (or the end of the stream).
   * Rejects if the request fails or no chunk arrives within `timeoutMs`.
   */
  async function streamOnceWithTimeout(query: string, timeoutMs: number): Promise<string> {
    const stream = await sudo.router.createStreaming({
      model: "gpt-4o",
      messages: [{ role: "user", content: query }],
      maxCompletionTokens: 150
    });

    // Iterate by hand so that every wait for a chunk can be raced against a
    // timer; a plain `for await` would block indefinitely on a stalled stream.
    const iterator = stream[Symbol.asyncIterator]();
    let response = "";

    try {
      while (true) {
        const step = await nextChunkWithTimeout(iterator, timeoutMs);
        if (step.done) {
          return response;
        }

        for (const choice of step.value.data?.choices ?? []) {
          if (choice.delta?.content) {
            const content = choice.delta.content;
            response += content;
            process.stdout.write(content);
          }

          if (choice.finishReason) {
            console.log(`\nStream completed with reason: ${choice.finishReason}`);
            return response;
          }
        }
      }
    } finally {
      // `for await` would close the iterator on early exit; do the same here.
      // Deliberately not awaited: after a timeout a next() call is still
      // pending and return() may queue behind it. Cleanup is best effort, so
      // a rejection here is ignored in favour of the primary outcome.
      iterator.return?.()?.catch(() => {});
    }
  }

  /**
   * Streams a completion, retrying with exponential backoff when an attempt
   * fails or stalls.
   *
   * Every retry issues a fresh request, so text already printed by a failed
   * attempt is printed again by the next one.
   *
   * @param query - Prompt sent as the user message.
   * @param maxRetries - Maximum number of attempts.
   * @param backoffMs - Delay before the second attempt; doubles afterwards.
   * @param timeoutMs - Longest wait for the next chunk before the attempt is
   *   treated as failed.
   * @returns The streamed text of the first successful attempt, or `null` when
   *   every attempt failed (or `maxRetries` is less than 1).
   */
  async function robustStreamingWithRetry(
    query: string, 
    maxRetries: number = 3,
    backoffMs: number = 1000,
    timeoutMs: number = 30000
  ): Promise<string | null> {
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        console.log(`Attempt ${attempt}: Starting stream for "${query}"`);
        return await streamOnceWithTimeout(query, timeoutMs);
      } catch (error) {
        console.error(`\nAttempt ${attempt} failed:`, error);
        
        if (attempt === maxRetries) {
          console.error("All retry attempts exhausted");
          return null;
        }
        
        // Exponential backoff
        const delay = backoffMs * Math.pow(2, attempt - 1);
        console.log(`Retrying in ${delay}ms...`);
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
    
    return null;
  }

  // Usage
  /**
   * Runs robustStreamingWithRetry on a sample prompt and reports the outcome.
   *
   * `null` is the failure sentinel, so an empty string (a stream that finished
   * without producing text) is still reported as a received response.
   */
  async function robustStreamingExample() {
    const response = await robustStreamingWithRetry(
      "Explain the theory of relativity in simple terms"
    );

    if (response !== null) {
      console.log(`\nSuccessfully received response: ${response.length} characters`);
    } else {
      console.log("Failed to get response after all retry attempts");
    }
  }

  robustStreamingExample();
  ```

  ```python Python theme={null}
  import os
  import asyncio
  import time
  from sudo import Sudo

  async def robust_streaming_with_retry(query, max_retries=3, backoff_seconds=1, timeout_seconds=30):
      """Stream a completion, retrying with exponential backoff on failure.

      Every retry issues a fresh request, so text already printed by a failed
      attempt is printed again by the next one.

      Args:
          query: Prompt sent as the user message.
          max_retries: Maximum number of attempts.
          backoff_seconds: Delay before the second attempt; doubles afterwards.
          timeout_seconds: Longest wait for the next chunk before the attempt
              is treated as failed.

      Returns:
          The text streamed by the first successful attempt (up to its first
          finish reason), or None when every attempt failed or ``max_retries``
          is less than 1.
      """
      
      for attempt in range(1, max_retries + 1):
          try:
              print(f"Attempt {attempt}: Starting stream for '{query}'")
              
              async with Sudo(
                  server_url="https://sudoapp.dev/api",
                  api_key=os.getenv("SUDO_API_KEY"),
              ) as client:
                  
                  stream = await client.router.create_streaming_async(
                      model="gpt-4o",
                      messages=[{"role": "user", "content": query}],
                      max_completion_tokens=150
                  )
                  
                  response = ""
                  
                  async with stream as event_stream:
                      # Iterate by hand so each wait for a chunk can be bounded;
                      # a plain `async for` would block forever on a stalled stream.
                      chunks = event_stream.__aiter__()
                      while True:
                          try:
                              chunk = await asyncio.wait_for(
                                  chunks.__anext__(), timeout=timeout_seconds
                              )
                          except StopAsyncIteration:
                              break  # stream ended without a finish reason
                          except asyncio.TimeoutError:
                              raise Exception("Stream timeout - no data received") from None
                          
                          if chunk.data and chunk.data.choices:
                              for choice in chunk.data.choices:
                                  if choice.delta and choice.delta.content:
                                      content = choice.delta.content
                                      response += content
                                      print(content, end="", flush=True)
                                  
                                  if choice.finish_reason:
                                      print(f"\nStream completed with reason: {choice.finish_reason}")
                                      return response
                  
                  return response
                  
          except Exception as e:
              print(f"\nAttempt {attempt} failed: {e}")
              
              if attempt == max_retries:
                  print("All retry attempts exhausted")
                  return None
              
              # Exponential backoff
              delay = backoff_seconds * (2 ** (attempt - 1))
              print(f"Retrying in {delay} seconds...")
              await asyncio.sleep(delay)
      
      return None

  # Usage
  response = asyncio.run(robust_streaming_with_retry(
      "Explain the theory of relativity in simple terms"
  ))

  # None is the failure sentinel; an empty string is still a received response
  if response is not None:
      print(f"\nSuccessfully received response: {len(response)} characters")
  else:
      print("Failed to get response after all retry attempts")
  ```
</CodeGroup>

## Best Practices

### Optimized Streaming Setup

<CodeGroup dropdown>
  ```typescript TypeScript theme={null}
  import { Sudo } from "sudo-ai";

  /**
   * Streams completions one at a time and reacts to SIGINT/SIGTERM.
   *
   * Limitations of this implementation:
   * - The in-flight stream is only tracked by reference; nothing here actively
   *   cancels it.
   * - The signal handler calls process.exit(0) right away, so after a signal
   *   the streaming loop normally never gets to observe the shutdown flag.
   *   That check takes effect when shutdown() is called instead.
   * - Every instance registers its own process-level signal listeners and
   *   never removes them.
   */
  class OptimizedStreaming {
    private sudo: Sudo;
    // Only tracked, never dereferenced, so `unknown` is sufficient.
    private currentStream: unknown = null;
    private isShuttingDown = false;

    constructor() {
      this.sudo = new Sudo({
        serverURL: "https://sudoapp.dev/api",
        apiKey: process.env.SUDO_API_KEY ?? "",
      });
      this.setupSignalHandlers();
    }

    /** Registers the SIGINT and SIGTERM listeners for this instance. */
    private setupSignalHandlers(): void {
      process.on('SIGINT', () => this.signalHandler('SIGINT'));
      process.on('SIGTERM', () => this.signalHandler('SIGTERM'));
    }

    /** Drops the reference to the in-flight stream, if there is one. */
    private releaseStream(): void {
      // Note: Actual stream cancellation depends on the specific implementation
      this.currentStream = null;
    }

    /**
     * Handles the first termination signal: logs it, marks the instance as
     * shutting down, releases the stream reference and exits with status 0.
     * Later signals are ignored.
     */
    private signalHandler(signal: string): void {
      if (this.isShuttingDown) {
        return;
      }

      console.log(`\nReceived ${signal}. Shutting down gracefully...`);
      this.isShuttingDown = true;
      this.releaseStream();

      process.exit(0);
    }

    /**
     * Streams the answer to `message`, echoing it to stdout.
     *
     * @param message - Prompt sent as the user message.
     * @param model - Model identifier passed to the request.
     * @returns The text received (possibly partial if shutdown() was called
     *   mid-stream), or `null` when shutting down or when an error was thrown.
     */
    async streamWithContext(message: string, model: string = "gpt-4o"): Promise<string | null> {
      if (this.isShuttingDown) {
        console.log("Cannot start stream: shutting down");
        return null;
      }

      try {
        const stream = await this.sudo.router.createStreaming({
          model,
          messages: [{ role: "user", content: message }],
          streamOptions: { includeUsage: true }
        });

        this.currentStream = stream;
        const responseParts: string[] = [];

        for await (const chunk of stream) {
          // Checked once per chunk, so a stalled stream is not interrupted
          if (this.isShuttingDown) {
            console.log("\nStreaming interrupted by shutdown signal");
            break;
          }

          for (const choice of chunk.data?.choices ?? []) {
            const content = choice.delta?.content;
            if (content) {
              responseParts.push(content);
              process.stdout.write(content);
            }
          }
        }

        console.log();
        return responseParts.join('');

      } catch (error) {
        if (error instanceof Error && error.name === 'AbortError') {
          console.log("\nStreaming interrupted by user");
          return null;
        }
        console.error(`\nStreaming error: ${error}`);
        return null;
      } finally {
        this.releaseStream();
      }
    }

    /**
     * Marks the instance as shutting down and releases the stream reference.
     * An active streamWithContext loop stops at its next chunk.
     */
    async shutdown(): Promise<void> {
      this.isShuttingDown = true;
      this.releaseStream();
    }
  }

  // Usage
  /**
   * Streams a sample prompt through OptimizedStreaming and prints the length
   * of the response when there is one.
   *
   * A falsy result (null or an empty string) prints nothing. Errors thrown by
   * the streaming call are logged with console.error and not rethrown.
   */
  async function optimizedStreamingExample() {
    const streamer = new OptimizedStreaming();

    try {
      const text = await streamer.streamWithContext(
        "Write a comprehensive guide to TypeScript decorators"
      );
      if (!text) {
        return;
      }
      console.log(`\nStreaming completed. Response length: ${text.length} characters`);
    } catch (error) {
      console.error("Streaming example error:", error);
    }
  }

  optimizedStreamingExample();
  ```

  ```python Python theme={null}
  import os
  import signal
  import sys
  from sudo import Sudo

  class OptimizedStreaming:
      """Run one streaming request at a time with SIGINT/SIGTERM cleanup.

      Creating an instance replaces the process-wide SIGINT and SIGTERM
      handlers with ``signal_handler``.
      """

      def __init__(self):
          # Client and stream of the request in progress; None when idle
          self.client = None
          self.current_stream = None
          self.setup_signal_handlers()
      
      def setup_signal_handlers(self):
          """Handle graceful shutdown"""
          signal.signal(signal.SIGINT, self.signal_handler)
          signal.signal(signal.SIGTERM, self.signal_handler)
      
      def signal_handler(self, sig, frame):
          """Close the active stream, if any, then exit with status 0.

          Args:
              sig: Signal number passed by the signal module (unused).
              frame: Interrupted stack frame (unused).

          Raises:
              SystemExit: Always, with exit code 0.
          """
          print("\nShutting down gracefully...")
          if self.current_stream:
              try:
                  # NOTE(review): assumes the stream object exposes close() -
                  # confirm against the SDK
                  self.current_stream.close()
              except Exception as e:
                  # Best effort: a failed close must not prevent the exit,
                  # but it should not disappear silently either
                  print(f"Error during stream cleanup: {e}")
          sys.exit(0)
      
      def stream_with_context(self, message, model="gpt-4o"):
          """Stream the answer to ``message``, echoing it to stdout.

          Args:
              message: Prompt sent as the user message.
              model: Model identifier passed to the request.

          Returns:
              The concatenated content, or None if an exception (or
              KeyboardInterrupt) was raised while streaming.
          """
          try:
              with Sudo(
                  server_url="https://sudoapp.dev/api",
                  api_key=os.getenv("SUDO_API_KEY"),
              ) as client:
                  self.client = client
                  
                  stream = client.router.create_streaming(
                      model=model,
                      messages=[{"role": "user", "content": message}],
                      stream_options={"include_usage": True}
                  )
                  
                  # Tracked so signal_handler can close it
                  self.current_stream = stream
                  response_parts = []
                  
                  with stream as event_stream:
                      for chunk in event_stream:
                          if chunk.data and chunk.data.choices:
                              for choice in chunk.data.choices:
                                  if choice.delta and choice.delta.content:
                                      content = choice.delta.content
                                      response_parts.append(content)
                                      print(content, end="", flush=True)
                  
                  print()
                  return ''.join(response_parts)
                  
          except KeyboardInterrupt:
              # Reached only if the default SIGINT handling is in effect; with
              # signal_handler installed, Ctrl-C surfaces as SystemExit instead
              print("\nStreaming interrupted by user")
              return None
          except Exception as e:
              print(f"\nStreaming error: {e}")
              return None
          finally:
              # Do not keep references to a finished stream or exited client
              self.current_stream = None
              self.client = None

  # Usage: constructing the streamer also installs its SIGINT/SIGTERM handlers
  streamer = OptimizedStreaming()
  prompt = "Write a comprehensive guide to Python decorators"
  result = streamer.stream_with_context(prompt)

  # None (failure) and an empty string both skip the summary line
  if result:
      print(f"\nStreaming completed. Response length: {len(result)} characters")
  ```
</CodeGroup>

## Next Steps

Now that you understand streaming:

1. **[Try Tool Calling](/sdk/sudo-sdk/tool-calling)** - Combine streaming with function calls
2. **[Structured Output](/sdk/sudo-sdk/structured-output)** - Generate structured data
3. **[CRUD Operations](/sdk/sudo-sdk/crud-operations)** - Manage stored completions

<Tip>
  Use streaming for long-form content generation and interactive applications. Remember to implement proper error handling and user interruption capabilities for the best user experience. Both TypeScript and Python SDKs provide excellent streaming support with robust error handling and async capabilities.
</Tip>
