Skip to content

Async API

The async module provides non-blocking wrappers around Mullama's core operations, enabling integration with Tokio-based async applications. It includes async model loading, non-blocking inference, model pooling, and advanced runtime management.

Feature Gates

  • async -- Enables AsyncModel, AsyncContext, AsyncConfig, ModelInfo
  • tokio-runtime -- Enables MullamaRuntime, ModelPool, TaskManager, RuntimeMetrics
mullama = { version = "0.3", features = ["async"] }
# Or for full runtime management:
mullama = { version = "0.3", features = ["async", "tokio-runtime"] }

AsyncModel

Async-first model interface with full Tokio integration. Wraps the synchronous Model and offloads CPU-intensive work to a blocking thread pool.

pub struct AsyncModel {
    inner: Arc<Model>,
    // Runtime management fields
}

impl AsyncModel {
    /// Load model asynchronously (non-blocking file I/O + parsing)
    pub async fn load(path: impl AsRef<Path>) -> Result<Self, MullamaError>;

    /// Load with full configuration
    pub async fn load_with_config(config: AsyncConfig) -> Result<Self, MullamaError>;

    /// Generate text asynchronously
    pub async fn generate(
        &self,
        prompt: &str,
        max_tokens: usize,
    ) -> Result<String, MullamaError>;

    /// Generate with streaming output
    pub async fn generate_stream(
        &self,
        prompt: &str,
        config: StreamConfig,
    ) -> Result<TokenStream, MullamaError>;

    /// Tokenize asynchronously
    pub async fn tokenize_async(
        &self,
        text: &str,
        add_bos: bool,
    ) -> Result<Vec<TokenId>, MullamaError>;

    /// Get model metadata
    pub async fn model_info(&self) -> Result<ModelInfo, MullamaError>;

    /// Check if model is ready for inference
    pub async fn is_ready(&self) -> bool;

    /// Clone for concurrent use (Arc-based, cheap)
    pub fn clone(&self) -> Self;
}

Thread Safety: AsyncModel implements Send + Sync and is designed for concurrent use across multiple async tasks.

Example

use mullama::AsyncModel;

#[tokio::main]
async fn main() -> Result<(), mullama::MullamaError> {
    // Load model (non-blocking)
    let model = AsyncModel::load("model.gguf").await?;

    // Generate text
    let response = model.generate("Tell me about Rust", 200).await?;
    println!("{}", response);

    // Model info
    let info = model.model_info().await?;
    println!("Model: {} ({} params)", info.name, info.n_params);

    Ok(())
}

Node.js / Python Equivalents

  • Node.js: const model = await Model.load("model.gguf"); const result = await model.generate("prompt", 200);
  • Python: model = await Model.load("model.gguf"); result = await model.generate("prompt", 200)

AsyncContext

Async context management for non-blocking inference operations. Each AsyncContext manages its own synchronous Context internally via spawn_blocking.

pub struct AsyncContext {
    // Internal synchronous context managed via spawn_blocking
}

impl AsyncContext {
    /// Create async context from model
    pub async fn new(model: Arc<AsyncModel>) -> Result<Self, MullamaError>;

    /// Generate with async streaming
    pub async fn generate_stream(
        &mut self,
        tokens: &[TokenId],
    ) -> Result<TokenStream, MullamaError>;

    /// Process batch asynchronously
    pub async fn process_batch(
        &mut self,
        batches: Vec<Batch>,
    ) -> Result<Vec<Vec<TokenId>>, MullamaError>;

    /// Save context state to file
    pub async fn save_state(
        &self,
        path: impl AsRef<Path>,
    ) -> Result<(), MullamaError>;

    /// Load context state from file
    pub async fn load_state(
        &mut self,
        path: impl AsRef<Path>,
    ) -> Result<(), MullamaError>;
}

AsyncConfig

Configuration for async model loading and behavior.

#[derive(Debug, Clone)]
pub struct AsyncConfig {
    pub model_path: String,
    pub gpu_layers: i32,
    pub context_size: u32,
    pub threads: i32,
    pub flash_attention: bool,
    pub max_concurrent: usize,
    pub timeout_ms: u64,
}

Fields

Name Type Default Description
model_path String "" Path to the GGUF model file
gpu_layers i32 0 Number of layers to offload to GPU
context_size u32 2048 Context window size in tokens
threads i32 CPU count Number of threads for inference
flash_attention bool false Enable flash attention
max_concurrent usize 4 Maximum concurrent inference requests
timeout_ms u64 30000 Request timeout in milliseconds

Example:

use mullama::{AsyncModel, AsyncConfig};

let config = AsyncConfig {
    model_path: "model.gguf".to_string(),
    gpu_layers: 32,
    context_size: 4096,
    threads: 8,
    flash_attention: true,
    max_concurrent: 4,
    timeout_ms: 30000,
};

let model = AsyncModel::load_with_config(config).await?;

Cancellation

Async operations support cancellation via Tokio's CancellationToken:

use tokio_util::sync::CancellationToken;
use tokio::select;

let model = AsyncModel::load("model.gguf").await?;
let cancel = CancellationToken::new();

let cancel_clone = cancel.clone();
let generate_handle = tokio::spawn(async move {
    select! {
        result = model.generate("Write a long essay", 5000) => result,
        _ = cancel_clone.cancelled() => {
            Err(MullamaError::OperationFailed("Cancelled".to_string()))
        }
    }
});

// Cancel after 5 seconds
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
cancel.cancel();

ModelInfo

Metadata about a loaded model, available through async access.

#[derive(Debug, Clone)]
pub struct ModelInfo {
    pub name: String,
    pub architecture: String,
    pub n_params: u64,
    pub n_vocab: usize,
    pub n_embd: usize,
    pub n_layer: usize,
    pub n_head: usize,
    pub n_ctx_train: usize,
    pub size_bytes: u64,
}

MullamaRuntime

Advanced Tokio runtime management with builder pattern. Provides a managed runtime specifically configured for LLM inference workloads.

Feature Gate

Requires the tokio-runtime feature flag.

pub struct MullamaRuntime {
    // Tokio runtime and management state
}

impl MullamaRuntime {
    /// Create new runtime builder
    pub fn new() -> MullamaRuntimeBuilder;

    /// Spawn async task on runtime
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static;

    /// Spawn blocking task (for CPU-intensive inference)
    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static;

    /// Get runtime performance metrics
    pub async fn metrics(&self) -> RuntimeMetrics;

    /// Graceful shutdown with timeout
    pub async fn shutdown(self);

    /// Check if runtime is still running
    pub fn is_running(&self) -> bool;
}

MullamaRuntimeBuilder

let runtime = MullamaRuntime::new()
    .worker_threads(4)
    .max_blocking_threads(8)
    .thread_stack_size(4 * 1024 * 1024)
    .thread_name("mullama-worker")
    .enable_io()
    .enable_time()
    .build()?;
Method Parameter Description
worker_threads(n) usize Number of async worker threads
max_blocking_threads(n) usize Maximum blocking thread pool size
thread_stack_size(n) usize Stack size per thread in bytes
thread_name(s) impl Into<String> Thread name prefix for debugging
enable_io() -- Enable I/O driver (network, files)
enable_time() -- Enable time driver (timers, delays)
enable_all() -- Enable all drivers
build() -- Build the runtime

ModelPool

Manages multiple model/context instances for concurrent request handling. Implements connection pooling semantics.

pub struct ModelPool {
    // Pool of model instances
}

impl ModelPool {
    /// Create a pool with N model instances
    pub async fn new(
        config: AsyncConfig,
        pool_size: usize,
    ) -> Result<Self, MullamaError>;

    /// Acquire a model instance (blocks if all in use)
    pub async fn acquire(&self) -> Result<PooledModel, MullamaError>;

    /// Get current pool statistics
    pub fn stats(&self) -> PoolStats;

    /// Resize the pool dynamically
    pub async fn resize(&self, new_size: usize) -> Result<(), MullamaError>;
}

Example:

use mullama::{AsyncConfig, ModelPool};

let config = AsyncConfig {
    model_path: "model.gguf".to_string(),
    gpu_layers: 32,
    ..Default::default()
};

// Create pool with 4 model instances
let pool = ModelPool::new(config, 4).await?;

// Acquire and use (model returned to pool when PooledModel is dropped)
let model = pool.acquire().await?;
let result = model.generate("Hello", 100).await?;
drop(model); // Explicitly return to pool (or let it drop naturally)

// Check pool stats
let stats = pool.stats();
println!("Available: {}, In use: {}", stats.available, stats.in_use);

TaskManager

Coordinates async tasks with lifecycle management and health monitoring.

pub struct TaskManager {
    // Task tracking state
}

impl TaskManager {
    /// Create new task manager
    pub fn new(runtime: &Arc<MullamaRuntime>) -> Self;

    /// Spawn a generation worker
    pub async fn spawn_generation_worker(&mut self) -> Result<(), MullamaError>;

    /// Spawn a metrics collection worker
    pub async fn spawn_metrics_collector(&mut self) -> Result<(), MullamaError>;

    /// Spawn a custom named worker
    pub async fn spawn_worker<F>(
        &mut self,
        name: &str,
        worker: F,
    ) -> Result<(), MullamaError>
    where
        F: Future<Output = Result<(), MullamaError>> + Send + 'static;

    /// Stop all managed workers
    pub async fn stop_all(&mut self) -> Result<(), MullamaError>;

    /// Get status of all workers
    pub fn worker_status(&self) -> HashMap<String, WorkerStatus>;
}

RuntimeMetrics

Performance monitoring data for the async runtime.

#[derive(Debug, Clone)]
pub struct RuntimeMetrics {
    pub active_tasks: usize,
    pub completed_tasks: u64,
    pub worker_threads: usize,
    pub blocking_threads: usize,
    pub total_generations: u64,
    pub avg_generation_ms: f64,
    pub uptime_seconds: u64,
}

Async/Sync Duality

The core API is synchronous for simplicity. Async wrappers use spawn_blocking internally to avoid blocking the Tokio runtime:

// Synchronous (core API -- no feature required)
let model = Model::load("model.gguf")?;
let tokens = model.tokenize("Hello", true, false)?;

// Asynchronous (requires "async" feature)
let model = AsyncModel::load("model.gguf").await?;
let tokens = model.tokenize_async("Hello", true).await?;

Both approaches use the same underlying llama.cpp operations. The async wrappers offload CPU-intensive work (model loading, token generation) to a blocking thread pool so they do not block async worker threads.

Complete Example

use mullama::{AsyncModel, AsyncConfig, MullamaRuntime, StreamConfig};
use std::sync::Arc;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), mullama::MullamaError> {
    // Create runtime
    let runtime = Arc::new(
        MullamaRuntime::new()
            .worker_threads(4)
            .enable_all()
            .build()?
    );

    // Load model with configuration
    let model = AsyncModel::load_with_config(AsyncConfig {
        model_path: "model.gguf".to_string(),
        gpu_layers: 32,
        context_size: 4096,
        threads: 8,
        flash_attention: true,
        max_concurrent: 4,
        timeout_ms: 30000,
    }).await?;

    // Simple generation
    let response = model.generate("What is Rust?", 200).await?;
    println!("Response: {}", response);

    // Streaming generation
    let config = StreamConfig::new()
        .max_tokens(200)
        .temperature(0.7);

    let mut stream = model.generate_stream("Tell me a story", config).await?;

    print!("Story: ");
    while let Some(token_result) = stream.next().await {
        let token_data = token_result?;
        print!("{}", token_data.text);
    }
    println!();

    // Runtime metrics
    let metrics = runtime.metrics().await;
    println!("Completed tasks: {}", metrics.completed_tasks);

    Ok(())
}