Streaming Generation¶
In production applications, users expect to see text appear as it is generated -- not after the entire response is complete. Streaming generation bridges the gap between the token-by-token internal generation loop and the user-facing interface, delivering tokens in real time through a thread-safe pipeline.
1. Architecture¶
The streaming pipeline has four stages: the model produces logits, the generator selects tokens, a thread-safe buffer queues them, and a processor delivers them to the user interface.
flowchart LR
    subgraph "Generation Thread"
        MODEL["Model Forward Pass"]
        SAMP["Token Sampling"]
        ENC["Token Encoding"]
    end
    subgraph "Thread-Safe Buffer"
        BUF["TokenBuffer<br/>(Mutex + Condition)"]
    end
    subgraph "Consumer Thread"
        PROC["StreamProcessor"]
        CB["Callback / Iterator"]
        UI["User Interface"]
    end
    MODEL --> SAMP --> ENC --> BUF --> PROC --> CB --> UI

The key design constraint is that generation and consumption run on separate threads. The model forward pass is compute-intensive and should not be blocked by UI rendering; conversely, the UI should not stall waiting for the next forward pass. The TokenBuffer decouples these two rates using a mutex-protected queue with condition-variable signalling.
2. StreamingGenerator API¶
The StreamingGenerator wraps a TextGenerator and adds streaming capabilities:
pub const StreamingGenerator = struct {
    generator: *TextGenerator,
    tokenizer: *SimpleTokenizer,
    config: StreamingConfig,
    allocator: Allocator,
    buffer: TokenBuffer,
    generation_thread: ?Thread,
    status: StreamStatus,
    is_streaming: bool,
    pub fn init(generator: *TextGenerator, tokenizer: *SimpleTokenizer,
        allocator: Allocator) !StreamingGenerator { ... }
    pub fn deinit(self: *StreamingGenerator) void { ... }
    pub fn setConfig(self: *StreamingGenerator, config: StreamingConfig) !void { ... }
    pub fn startStreaming(self: *StreamingGenerator, prompt: []const u8) !void { ... }
    pub fn stopStreaming(self: *StreamingGenerator) void { ... }
    pub fn nextChunk(self: *StreamingGenerator) ?TokenChunk { ... }
    pub fn streamWithCallback(self: *StreamingGenerator, prompt: []const u8,
        callback: StreamCallback, error_callback: ?ErrorCallback,
        user_data: ?*anyopaque) !void { ... }
    pub fn collectAll(self: *StreamingGenerator, prompt: []const u8) ![]TokenChunk { ... }
    pub fn getStatus(self: StreamingGenerator) StreamStatus { ... }
};
2.1 Callback-Based Streaming¶
The streamWithCallback method is the primary API for real-time delivery. The caller provides a function pointer that is invoked for each token:
pub const StreamCallback = *const fn (
    chunk: TokenChunk,
    status: StreamStatus,
    user_data: ?*anyopaque,
) void;
Example usage:
fn onToken(chunk: TokenChunk, status: StreamStatus, _: ?*anyopaque) void {
    if (chunk.text) |text| {
        std.io.getStdOut().writer().writeAll(text) catch {};
    }
    // Check streaming throughput
    if (status.average_tps > 0) {
        std.debug.print("[{d:.1} t/s]\n", .{status.average_tps});
    }
}
try streaming_gen.streamWithCallback("Explain quantum computing", onToken, null, null);
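The last argument, user_data, lets the caller pass its own state into the callback without globals. The following is a minimal sketch of that pattern, not the library's implementation: `Transcript`, `countingCallback`, and `deliver` are hypothetical names, and the types are cut-down stand-ins for the ones documented above.

```zig
const std = @import("std");

// Simplified stand-ins for the documented types (assumed shapes).
const TokenChunk = struct { text: ?[]const u8 };
const StreamStatus = struct { tokens_generated: u32 };
const StreamCallback = *const fn (TokenChunk, StreamStatus, ?*anyopaque) void;

// Hypothetical caller-owned state threaded through `user_data`.
const Transcript = struct {
    bytes: usize = 0,
    chunks: u32 = 0,
};

fn countingCallback(chunk: TokenChunk, _: StreamStatus, user_data: ?*anyopaque) void {
    // Recover the typed pointer; the caller guarantees it points at a Transcript.
    const raw = user_data orelse return;
    const transcript: *Transcript = @ptrCast(@alignCast(raw));
    transcript.chunks += 1;
    if (chunk.text) |text| transcript.bytes += text.len;
}

// Stand-in for the generator: invokes the callback once per chunk.
fn deliver(chunks: []const TokenChunk, cb: StreamCallback, user_data: ?*anyopaque) void {
    for (chunks, 0..) |chunk, i| {
        cb(chunk, StreamStatus{ .tokens_generated = @intCast(i + 1) }, user_data);
    }
}
```

With the real API the call would presumably look like `streamWithCallback(prompt, countingCallback, null, &transcript)`. The pointed-to state must outlive the stream, because the callback may run after the call site has moved on.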
2.2 Iterator-Based Streaming¶
For pull-based consumers, StreamingIterator provides a standard iterator interface:
pub const StreamingIterator = struct {
    streaming_gen: *StreamingGenerator,
    finished: bool,
    pub fn next(self: *StreamingIterator) ?TokenChunk { ... }
};
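Because next returns an optional, the iterator fits Zig's `while (it.next()) |chunk|` loop. The sketch below shows that loop shape with a hypothetical `SliceIterator` over a pre-filled slice, standing in for StreamingIterator so that no model is needed. The latching `finished` flag is an assumption about how the real iterator behaves after exhaustion.

```zig
const std = @import("std");

// Simplified stand-in for TokenChunk; only the text field is modelled.
const TokenChunk = struct { text: ?[]const u8 };

// Hypothetical iterator over a slice, mirroring the `next`/`finished` shape.
const SliceIterator = struct {
    chunks: []const TokenChunk,
    index: usize = 0,
    finished: bool = false,

    pub fn next(self: *SliceIterator) ?TokenChunk {
        if (self.finished) return null;
        if (self.index >= self.chunks.len) {
            self.finished = true; // latch so later calls keep returning null
            return null;
        }
        const chunk = self.chunks[self.index];
        self.index += 1;
        return chunk;
    }
};

// Drain the iterator and return the total number of text bytes seen.
fn drain(it: *SliceIterator) usize {
    var total: usize = 0;
    while (it.next()) |chunk| {
        // Special tokens may carry no text, so unwrap before use.
        if (chunk.text) |text| total += text.len;
    }
    return total;
}
```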
2.3 Batch Collection¶
collectAll buffers all tokens and returns them as a slice -- useful for testing and batch processing where streaming delivery is not needed:
const chunks = try streaming_gen.collectAll("Summarise this article");
defer allocator.free(chunks);
for (chunks) |chunk| {
    // Process each token
}
3. StreamStatus¶
StreamStatus tracks the state of an ongoing stream, updated after each token is produced:
pub const StreamStatus = struct {
    tokens_generated: u32,
    current_tps: f32,         // Instantaneous tokens per second
    average_tps: f32,         // Average tokens per second
    start_time: i64,          // Stream start timestamp (ms)
    current_time: i64,        // Last update timestamp (ms)
    is_active: bool,          // Whether stream is still producing
    stop_reason: ?StopReason,
};
The status transitions through the following states:
stateDiagram-v2
    [*] --> Active : startStreaming()
    Active --> Active : token generated
    Active --> Paused : pause()
    Paused --> Active : resume()
    Active --> Complete : stop condition met
    Active --> Error : generation error
    Paused --> Complete : stopStreaming()
    Complete --> [*]
    Error --> [*]

Throughput Metrics
current_tps measures the instantaneous rate between the last two tokens. average_tps is the total tokens divided by total elapsed time. The instantaneous metric is more useful for detecting performance regressions in real time; the average is more stable for benchmarking.
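The two rates can be derived from the millisecond timestamps that StreamStatus already carries. The sketch below is one plausible update step, not the library's code: `Status` is a subset of the documented fields, `recordToken` is a hypothetical helper, and current_time is assumed to start equal to start_time, so the first token's gap is measured from the start of the stream.

```zig
const std = @import("std");

// Subset of the StreamStatus fields; timestamps are in milliseconds.
const Status = struct {
    tokens_generated: u32 = 0,
    current_tps: f32 = 0,
    average_tps: f32 = 0,
    start_time: i64,
    current_time: i64,
};

// Hypothetical update step: record one token produced at `now_ms`.
fn recordToken(s: *Status, now_ms: i64) void {
    const gap_ms = now_ms - s.current_time; // time since the previous token
    const elapsed_ms = now_ms - s.start_time; // time since the stream began
    s.tokens_generated += 1;
    s.current_time = now_ms;
    // Skip the update on zero or negative intervals (coarse or skewed clocks).
    if (gap_ms > 0) s.current_tps = 1000.0 / @as(f32, @floatFromInt(gap_ms));
    if (elapsed_ms > 0) {
        const n: f32 = @floatFromInt(s.tokens_generated);
        s.average_tps = n * 1000.0 / @as(f32, @floatFromInt(elapsed_ms));
    }
}
```

For example, two tokens arriving at 100 ms and 150 ms give a current_tps of 1000 / 50 = 20 and an average_tps of 2 × 1000 / 150 ≈ 13.3.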
4. Thread Safety¶
The TokenBuffer is the critical synchronisation point between the generation thread and the consumer thread.
const TokenBuffer = struct {
    chunks: ArrayList(TokenChunk),
    mutex: Mutex,
    condition: Condition,
    closed: bool,
    allocator: Allocator,
    pub fn push(self: *TokenBuffer, chunk: TokenChunk) !void { ... }
    pub fn pop(self: *TokenBuffer, timeout_ms: u32) ?TokenChunk { ... }
    pub fn close(self: *TokenBuffer) void { ... }
    pub fn isEmpty(self: *TokenBuffer) bool { ... }
    pub fn size(self: *TokenBuffer) usize { ... }
};
Producer-Consumer Protocol
Producer (generation thread):
- Acquire mutex.
- Append TokenChunk to the queue.
- Signal the condition variable.
- Release mutex.
Consumer (application thread):
- Acquire mutex.
- While the queue is empty and not closed:
  - Wait on the condition variable with a timeout.
- If the queue is non-empty, remove and return the front element.
- Release mutex.
Shutdown:
- Set closed = true.
- Broadcast the condition variable (wakes all waiters).
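The protocol above can be sketched with std.Thread.Mutex and std.Thread.Condition. This is an illustrative variant rather than the documented TokenBuffer: `BoundedBuffer` is a hypothetical type that uses a fixed 64-slot ring instead of an ArrayList, rejects pushes when full, and takes the mutex in close so that a waiter cannot miss the wakeup.

```zig
const std = @import("std");

const TokenChunk = struct { token_id: u32 };

// Hypothetical fixed-capacity buffer; the locking protocol is the point here.
const BoundedBuffer = struct {
    items: [64]TokenChunk = undefined,
    head: usize = 0,
    len: usize = 0,
    closed: bool = false,
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},

    pub fn push(self: *BoundedBuffer, chunk: TokenChunk) error{ BufferFull, BufferClosed }!void {
        self.mutex.lock();
        defer self.mutex.unlock();
        if (self.closed) return error.BufferClosed;
        if (self.len == self.items.len) return error.BufferFull;
        self.items[(self.head + self.len) % self.items.len] = chunk;
        self.len += 1;
        self.condition.signal(); // wake one waiting consumer
    }

    pub fn pop(self: *BoundedBuffer, timeout_ms: u32) ?TokenChunk {
        self.mutex.lock();
        defer self.mutex.unlock();
        const timeout_ns = @as(u64, timeout_ms) * std.time.ns_per_ms;
        // Loop to tolerate spurious wakeups; stop waiting once a wait times out.
        // Each retry restarts the timer, so the total wait can exceed timeout_ms.
        while (self.len == 0 and !self.closed) {
            self.condition.timedWait(&self.mutex, timeout_ns) catch break;
        }
        if (self.len == 0) return null; // timed out, or closed and drained
        const chunk = self.items[self.head];
        self.head = (self.head + 1) % self.items.len;
        self.len -= 1;
        return chunk;
    }

    pub fn close(self: *BoundedBuffer) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.closed = true;
        self.condition.broadcast(); // wake every waiter so it can observe `closed`
    }
};

// Generation-thread stand-in: push `count` tokens, then signal shutdown.
fn producer(buf: *BoundedBuffer, count: u32) void {
    var i: u32 = 0;
    while (i < count) : (i += 1) {
        buf.push(.{ .token_id = i }) catch break;
    }
    buf.close();
}
```

A consumer can then run `while (buf.pop(timeout_ms)) |chunk| { ... }`. A null result means either a timeout or a closed, drained buffer, so a caller that needs to tell the two apart has to check the closed state separately.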
Timeout Handling
The pop method accepts a timeout_ms parameter. If no token arrives within the timeout, it returns null rather than blocking indefinitely. This prevents the consumer from hanging if generation stalls or errors out. The default timeout is 5000 ms; adjust via StreamingConfig.
5. Chunk Management¶
5.1 TokenChunk¶
Each chunk carries a single token plus metadata:
pub const TokenChunk = struct {
    token_id: TokenId,
    text: ?[]const u8,          // Decoded text (may be null for special tokens)
    log_prob: f32,              // Log probability of this token
    cumulative_log_prob: f32,   // Running total log probability
    position: u32,              // Position in the output sequence
    timestamp: i64,             // When this token was generated (ms)
};
5.2 StreamingConfig¶
pub const StreamingConfig = struct {
    buffer_size: usize = 64,            // Maximum buffered chunks
    timeout_ms: u32 = 5000,             // Consumer timeout
    flush_on_newline: bool = true,      // Flush at line breaks
    flush_on_sentence_end: bool = true,
    min_chunk_size: usize = 1,          // Minimum tokens per delivery
    max_chunk_size: usize = 32,         // Force flush above this
    detailed_stats: bool = false,
};
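Since every field has a default, a caller only needs to name the fields it wants to change. The sketch below copies the struct from above and adds a hypothetical `validate` helper and a `machine_consumer` preset. The checks are assumptions about what a sensible configuration looks like; setConfig may enforce different rules.

```zig
const std = @import("std");

// Copy of the StreamingConfig fields listed above.
const StreamingConfig = struct {
    buffer_size: usize = 64,
    timeout_ms: u32 = 5000,
    flush_on_newline: bool = true,
    flush_on_sentence_end: bool = true,
    min_chunk_size: usize = 1,
    max_chunk_size: usize = 32,
    detailed_stats: bool = false,
};

// Hypothetical sanity check, not part of the documented API.
fn validate(c: StreamingConfig) error{InvalidConfig}!void {
    if (c.buffer_size == 0 or c.min_chunk_size == 0) return error.InvalidConfig;
    if (c.min_chunk_size > c.max_chunk_size) return error.InvalidConfig;
}

// Only the overridden fields are spelled out; the rest keep their defaults.
const machine_consumer = StreamingConfig{
    .flush_on_newline = false,
    .flush_on_sentence_end = false,
    .min_chunk_size = 16,
};
```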
5.3 Adaptive Chunk Sizing¶
When min_chunk_size > 1, the stream processor accumulates tokens before delivering them to the callback. This reduces per-token callback overhead at the cost of slightly increased perceived latency.
| Chunk Size | Callback Overhead | Perceived Latency | Use Case |
|---|---|---|---|
| 1 | High | Lowest | Interactive chat |
| 4--8 | Moderate | Low | Web API responses |
| 16--32 | Low | Moderate | Batch post-processing |
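To make the trade-off in the table concrete, the sketch below counts callback invocations for a fixed number of tokens under different values of min_chunk_size. `Accumulator` and `deliveriesFor` are hypothetical, and the policy shown (deliver once min_chunk_size tokens are pending, or earlier at a natural break, with a final flush at end of stream) is one plausible reading of the configuration. It does not model max_chunk_size.

```zig
const std = @import("std");

// Hypothetical accumulator; only `min_chunk_size` mirrors StreamingConfig.
const Accumulator = struct {
    min_chunk_size: usize,
    pending: usize = 0, // tokens buffered since the last delivery
    deliveries: usize = 0, // callback invocations so far

    // Add one token; `at_break` stands for a newline or sentence end.
    pub fn add(self: *Accumulator, at_break: bool) void {
        self.pending += 1;
        if (self.pending >= self.min_chunk_size or at_break) self.flush();
    }

    pub fn flush(self: *Accumulator) void {
        if (self.pending == 0) return;
        self.deliveries += 1; // a real processor would invoke the callback here
        self.pending = 0;
    }
};

// Number of callback invocations needed to deliver `token_count` tokens.
fn deliveriesFor(token_count: usize, min_chunk_size: usize) usize {
    var acc = Accumulator{ .min_chunk_size = min_chunk_size };
    var i: usize = 0;
    while (i < token_count) : (i += 1) acc.add(false);
    acc.flush(); // end of stream: deliver whatever is left
    return acc.deliveries;
}
```

Under this policy, 100 tokens cost 100 callbacks at chunk size 1, 13 at chunk size 8, and 4 at chunk size 32. The first visible text, however, is held back until the first chunk fills.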
6. Natural Break Detection¶
The streaming processor can insert brief pauses at natural language boundaries to improve the reading experience:
if (self.config.flush_on_sentence_end and chunk.text != null) {
    const text = chunk.text.?;
    if (text.len > 0) {
        const last_char = text[text.len - 1];
        if (last_char == '.' or last_char == '!' or last_char == '?') {
            std.time.sleep(10_000_000); // 10ms pause at sentence end
        }
    }
}
Break detection heuristics:
| Pattern | Type | Pause |
|---|---|---|
| . ! ? | Sentence ending | 10 ms |
| \n\n | Paragraph break | 20 ms |
| \n (with flush_on_newline) | Line break | Immediate flush |
| Code fence boundary | Code block | Immediate flush |
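The heuristics in the table can be expressed as a small classifier over the tail of each chunk's text. This is a sketch under assumptions: `BreakKind`, `classify`, and `pauseMs` are hypothetical names, and treating a chunk that ends in three backticks as a code fence boundary is a guess at how that row is detected.

```zig
const std = @import("std");

// Hypothetical classification of the table rows.
const BreakKind = enum { none, sentence_end, paragraph, line, code_fence };

fn classify(text: []const u8) BreakKind {
    if (text.len == 0) return .none;
    // Check the more specific patterns first: "\n\n" also ends with "\n".
    // "\x60" is the backtick character, so this matches three backticks.
    if (std.mem.endsWith(u8, text, "\x60\x60\x60")) return .code_fence;
    if (std.mem.endsWith(u8, text, "\n\n")) return .paragraph;
    if (text[text.len - 1] == '\n') return .line;
    return switch (text[text.len - 1]) {
        '.', '!', '?' => .sentence_end,
        else => .none,
    };
}

// Pause in milliseconds; 0 means no pause (flush immediately where applicable).
fn pauseMs(kind: BreakKind) u64 {
    return switch (kind) {
        .sentence_end => 10,
        .paragraph => 20,
        .none, .line, .code_fence => 0,
    };
}
```

A single trailing character is a coarse signal: abbreviations such as "e.g." and decimal numbers split across tokens will also register as sentence endings.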
Disabling Pauses
For maximum throughput (e.g., when the consumer is another program, not a human reader), set flush_on_sentence_end = false and flush_on_newline = false.
7. First-Token Latency vs Total Throughput¶
Streaming introduces a fundamental tension between two performance metrics:
Latency Metrics
- First-token latency (time to first token, TTFT): Time from prompt submission to the first generated token appearing. Dominated by the prompt processing (prefill) phase.
- Total throughput: Tokens generated per second over the entire response. Dominated by the per-token generation (decode) phase.
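Both metrics can be computed from the prompt submission time and the per-token timestamps. The sketch below separates TTFT from the decode-phase rate. `Latency` and `measure` are hypothetical names, and measuring the rate over the decode phase only (rather than over the whole request) is a choice made here for illustration.

```zig
const std = @import("std");

const Latency = struct {
    ttft_ms: i64, // prompt submission to first token
    decode_tps: f32, // tokens per second after the first token
};

// `submit_ms` is when the prompt was submitted; `token_ms` holds one
// timestamp per generated token, in order. Returns null for an empty stream.
fn measure(submit_ms: i64, token_ms: []const i64) ?Latency {
    if (token_ms.len == 0) return null;
    const first = token_ms[0];
    const last = token_ms[token_ms.len - 1];
    var tps: f32 = 0;
    if (token_ms.len > 1 and last > first) {
        // N tokens span N - 1 inter-token intervals.
        const intervals: f32 = @floatFromInt(token_ms.len - 1);
        tps = intervals * 1000.0 / @as(f32, @floatFromInt(last - first));
    }
    return Latency{ .ttft_ms = first - submit_ms, .decode_tps = tps };
}
```

With tokens arriving at 500, 550, 600, and 650 ms after submission, TTFT is 500 ms and the decode rate is 3 × 1000 / 150 = 20 tokens per second, which shows how a long prefill can dominate perceived latency even when decoding is fast.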
gantt
    title Streaming Generation Timeline
    dateFormat X
    axisFormat %s
    section Prefill
    Process Prompt :0, 500
    section Decode
    Token 1 :500, 550
    Token 2 :550, 600
    Token 3 :600, 650
    Token N :650, 700
    section User Perception
    Waiting (TTFT) :crit, 0, 500
    Reading tokens :active, 500, 700

| Metric | Optimisation Strategy |
|---|---|
| Reduce TTFT | Prompt caching, smaller prefill batch |
| Increase throughput | KV caching, quantisation, batching, SIMD kernels, speculative decoding |
| Both | Model parallelism, continuous batching |
TTFT in Practice
For a 7B model on CPU, typical TTFT ranges from 200 ms (short prompt, cached) to several seconds (long prompt, cold start). Users generally perceive responses as "instant" when TTFT is below 500 ms.
8. Error Handling¶
The ErrorCallback type allows the consumer to react to generation failures without terminating the stream.
Common error scenarios:
| Error | Cause | Recovery |
|---|---|---|
| StreamTimeout | No token within timeout period | Retry or abort |
| BufferClosed | Generation completed or stopped | Read remaining tokens |
| AlreadyStreaming | Called startStreaming twice | Stop first, then restart |
| OutOfMemory | Token buffer allocation failed | Reduce buffer size or free memory |
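The recovery column can be encoded as an exhaustive switch, so that adding a new error forces the handler to be updated. The error names below come from the table. The `StreamError` set name, the `Recovery` enum, and `recoveryFor` are hypothetical and only illustrate the mapping.

```zig
const std = @import("std");

// Error names taken from the table; the set name itself is an assumption.
const StreamError = error{ StreamTimeout, BufferClosed, AlreadyStreaming, OutOfMemory };

// Hypothetical actions corresponding to the Recovery column.
const Recovery = enum { retry_or_abort, drain_remaining, stop_then_restart, shrink_buffer };

fn recoveryFor(err: StreamError) Recovery {
    // No `else` branch: the compiler rejects the switch if an error is unhandled.
    return switch (err) {
        error.StreamTimeout => .retry_or_abort,
        error.BufferClosed => .drain_remaining,
        error.AlreadyStreaming => .stop_then_restart,
        error.OutOfMemory => .shrink_buffer,
    };
}
```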
References¶
- Gerganov, G. "llama.cpp -- Inference of LLaMA model in C/C++." GitHub, 2023.
- Kwon, W. et al. "Efficient Memory Management for Large Language Model Serving with PagedAttention." SOSP, 2023.
- Yu, G. et al. "Orca: A Distributed Serving System for Transformer-Based Language Generation." OSDI, 2022.