Streaming API¶
The streaming module provides real-time token generation with the Stream trait, enabling incremental text output, backpressure handling, and integration with web response streams.
Feature Gate
This module requires the streaming feature flag (which automatically enables async):
TokenStream¶
The primary streaming interface, implementing the futures::Stream trait for async iteration over generated tokens.
pub struct TokenStream {
// Internal state for token generation and buffering
}
impl TokenStream {
/// Get next token (async, waits for generation)
pub async fn next(&mut self) -> Option<Result<TokenData, MullamaError>>;
/// Try to get next token without waiting (non-blocking)
pub fn try_next(&mut self) -> Option<Result<TokenData, MullamaError>>;
/// Get stream configuration
pub fn config(&self) -> &StreamConfig;
/// Check if stream has completed
pub fn is_finished(&self) -> bool;
/// Get count of tokens generated so far
pub fn tokens_generated(&self) -> usize;
/// Cancel the stream (stops generation immediately)
pub fn cancel(&mut self);
}
// Implements the futures Stream trait for use with StreamExt combinators
impl Stream for TokenStream {
type Item = Result<TokenData, MullamaError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
TokenData¶
Individual token data emitted by the stream.
#[derive(Debug, Clone)]
pub struct TokenData {
pub text: String, // Decoded text for this token
pub token_id: TokenId, // Numeric token ID
pub is_final: bool, // Whether this is the last token (EOS or max reached)
pub logprob: f32, // Log probability of this token selection
}
StreamConfig¶
Configuration for streaming behavior, using a builder pattern.
pub struct StreamConfig {
// Internal fields
}
impl StreamConfig {
pub fn new() -> Self;
pub fn max_tokens(mut self, max: usize) -> Self;
pub fn temperature(mut self, temp: f32) -> Self;
pub fn top_k(mut self, k: u32) -> Self;
pub fn top_p(mut self, p: f32) -> Self;
pub fn stop_sequences(mut self, sequences: Vec<String>) -> Self;
pub fn stream_delay_ms(mut self, delay: u64) -> Self;
pub fn buffer_size(mut self, size: usize) -> Self;
pub fn timeout_ms(mut self, timeout: u64) -> Self;
}
Fields¶
| Name | Type | Default | Description |
|---|---|---|---|
max_tokens |
usize |
256 |
Maximum number of tokens to generate |
temperature |
f32 |
0.8 |
Sampling temperature (higher = more random) |
top_k |
u32 |
40 |
Top-K sampling parameter |
top_p |
f32 |
0.95 |
Nucleus sampling threshold |
stop_sequences |
Vec<String> |
[] |
Text sequences that trigger early stopping |
stream_delay_ms |
u64 |
0 |
Artificial delay between tokens (for testing/throttling) |
buffer_size |
usize |
32 |
Internal buffer size for backpressure management |
timeout_ms |
u64 |
30000 |
Maximum time for the entire stream (0 = no timeout) |
Example:
let config = StreamConfig::new()
.max_tokens(500)
.temperature(0.7)
.top_p(0.9)
.stop_sequences(vec!["\n\n".to_string(), "END".to_string()])
.buffer_size(64)
.timeout_ms(60000);
Creating Streams¶
Streams are created from AsyncModel or AsyncContext:
use mullama::{AsyncModel, StreamConfig};
let model = AsyncModel::load("model.gguf").await?;
let config = StreamConfig::new()
.max_tokens(200)
.temperature(0.7);
let stream = model.generate_stream("Tell me about Rust", config).await?;
Token-by-Token Iteration¶
Using while let with next()¶
The most common pattern for consuming a token stream:
use mullama::{AsyncModel, StreamConfig};
#[tokio::main]
async fn main() -> Result<(), mullama::MullamaError> {
let model = AsyncModel::load("model.gguf").await?;
let config = StreamConfig::new()
.max_tokens(200)
.temperature(0.7);
let mut stream = model.generate_stream("Tell me about AI", config).await?;
print!("AI: ");
while let Some(token_result) = stream.next().await {
let token_data = token_result?;
print!("{}", token_data.text);
// Flush stdout for real-time display
}
println!();
println!("Generated {} tokens", stream.tokens_generated());
Ok(())
}
Using StreamExt combinators¶
use tokio_stream::StreamExt;
let mut stream = model.generate_stream("Hello", config).await?;
// Collect all tokens into a string
let tokens: Vec<String> = stream
.map(|r| r.map(|t| t.text))
.collect::<Result<Vec<_>, _>>()
.await?;
let full_text = tokens.join("");
println!("Full response: {}", full_text);
Taking a limited number of tokens¶
use tokio_stream::StreamExt;
let mut stream = model.generate_stream("Count:", config).await?;
// Take only first 10 tokens
let first_10: Vec<_> = stream
.take(10)
.collect::<Vec<_>>()
.await;
Error Handling in Streams¶
Errors can occur during streaming for various reasons:
let mut stream = model.generate_stream("prompt", config).await?;
while let Some(token_result) = stream.next().await {
match token_result {
Ok(token_data) => {
print!("{}", token_data.text);
}
Err(MullamaError::StreamingError(msg)) => {
eprintln!("Stream error: {}", msg);
break;
}
Err(MullamaError::GenerationError(msg)) => {
eprintln!("Generation failed: {}", msg);
break;
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
break;
}
}
}
Backpressure¶
The stream uses an internal buffer to handle backpressure when the consumer is slower than the producer:
- Tokens are buffered up to
buffer_size - When the buffer is full, generation pauses until the consumer reads
- This prevents unbounded memory growth
// Configure small buffer for demonstration
let config = StreamConfig::new()
.max_tokens(1000)
.buffer_size(16); // Generation pauses when 16 tokens are buffered
let mut stream = model.generate_stream("Write a long essay", config).await?;
while let Some(token_result) = stream.next().await {
let token_data = token_result?;
print!("{}", token_data.text);
// Simulate slow processing -- backpressure kicks in automatically
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Cancellation and Early Stopping¶
Manual Cancellation¶
let mut stream = model.generate_stream("Count:", config).await?;
let mut count = 0;
while let Some(token_result) = stream.next().await {
let token_data = token_result?;
print!("{}", token_data.text);
count += 1;
if count >= 50 {
stream.cancel(); // Immediately stops generation
break;
}
}
println!("\nStopped after {} tokens", count);
Stop Sequences¶
Configured stop sequences automatically end the stream when generated:
let config = StreamConfig::new()
.max_tokens(500)
.stop_sequences(vec![
"\n\n".to_string(), // Stop on double newline
"```".to_string(), // Stop on code fence
"END".to_string(), // Stop on explicit marker
]);
let mut stream = model.generate_stream("Write a function:", config).await?;
// Stream automatically stops when any stop sequence is generated
Timeout¶
let config = StreamConfig::new()
.max_tokens(10000)
.timeout_ms(5000); // 5 second timeout
let mut stream = model.generate_stream("Write forever", config).await?;
// Stream will end with StreamingError after 5 seconds
Integration with Web Responses¶
Streams integrate naturally with web frameworks for Server-Sent Events (SSE):
use axum::{response::sse::{Event, Sse}, extract::State};
use futures::stream::Stream;
use std::convert::Infallible;
async fn stream_handler(
State(model): State<Arc<AsyncModel>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let config = StreamConfig::new()
.max_tokens(500)
.temperature(0.7);
let token_stream = model
.generate_stream("Hello", config)
.await
.unwrap();
let event_stream = token_stream.map(|result| {
match result {
Ok(token_data) => Ok(Event::default().data(token_data.text)),
Err(_) => Ok(Event::default().data("[ERROR]")),
}
});
Sse::new(event_stream)
}
Complete Example¶
use mullama::{AsyncModel, StreamConfig};
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() -> Result<(), mullama::MullamaError> {
let model = AsyncModel::load("model.gguf").await?;
// Interactive streaming generation with multiple prompts
let prompts = vec![
"What is Rust?",
"Explain ownership in one paragraph:",
"Write a haiku about programming:",
];
for prompt in prompts {
let config = StreamConfig::new()
.max_tokens(150)
.temperature(0.7)
.stop_sequences(vec!["\n\n".to_string()]);
println!("\n> {}", prompt);
print!(" ");
let mut stream = model.generate_stream(prompt, config).await?;
let mut total_tokens = 0;
while let Some(token_result) = stream.next().await {
match token_result {
Ok(token_data) => {
print!("{}", token_data.text);
total_tokens += 1;
}
Err(e) => {
eprintln!("\nStream error: {}", e);
break;
}
}
}
println!("\n [{} tokens generated]", total_tokens);
}
Ok(())
}