Skip to content

Models

The model gateway routes completion requests to a configured provider adapter.

arcana.models.gateway.ModelGateway

ModelGateway(
    connections,
    *,
    providers=None,
    retry=None,
    pricing=None,
    on_cost=None,
    unhealthy_cooldown=_UNHEALTHY_COOLDOWN,
)

Single entry point the Agent uses to talk to any model.

Routes provider/model_id strings to the correct adapter, pools adapter instances per connection, retries transient failures with exponential backoff, and, when an on_cost sink is configured, emits a CostEvent per completed call.

Token usage is recorded on the session regardless. Per-call cost is emitted only if on_cost is provided at construction; without it, no CostEvent fires.

Usage::

async with ModelGateway(connections=ConnectionStore()) as gw:
    response = await gw.complete("ollama/hermes-3", request)

# Or with a cost sink:
def record(event: CostEvent) -> None:
    ...

gw = ModelGateway(connections=store, on_cost=record)
Source code in packages/arcana-core/arcana/models/gateway.py
def __init__(
    self,
    connections: ConnectionStore,
    *,
    providers: ProviderRegistry | None = None,
    retry: RetryPolicy | None = None,
    pricing: PricingTable | None = None,
    on_cost: Callable[[CostEvent], Any] | None = None,
    unhealthy_cooldown: float = _UNHEALTHY_COOLDOWN,
) -> None:
    """Store the gateway's collaborators and start with an empty adapter cache.

    ``connections`` is the store consulted when resolving model strings.
    ``providers``, ``retry`` and ``pricing`` fall back to ``DEFAULT_PROVIDERS``,
    a fresh ``RetryPolicy()`` and ``DEFAULT_PRICING`` respectively when the
    argument is omitted (or otherwise falsy).  ``on_cost`` is an optional
    callback taking a ``CostEvent``; ``unhealthy_cooldown`` is the value
    later handed to the cache entry's ``is_open`` check.
    """
    # Values taken from the caller exactly as given.
    self._connections = connections
    self._on_cost = on_cost
    self._unhealthy_cooldown = unhealthy_cooldown

    # Optional collaborators: any falsy argument selects the default.
    self._providers = providers if providers else DEFAULT_PROVIDERS
    self._retry = retry if retry else RetryPolicy()
    self._pricing = pricing if pricing else DEFAULT_PRICING

    # Per-gateway adapter cache and its locks, both keyed by string.
    self._cache: dict[str, _CacheEntry] = {}
    self._cache_locks: dict[str, asyncio.Lock] = {}

complete async

complete(model, request)

Dispatch a completion request, retrying transient errors with backoff.

Source code in packages/arcana-core/arcana/models/gateway.py
async def complete(self, model: str, request: CompletionRequest) -> CompletionResponse:
    """Dispatch a completion request, retrying transient errors with backoff.

    ``model`` is resolved to a connection and a cached adapter entry, and the
    request is copied with ``model_id`` set to the connection's model id
    before it is sent.  Up to ``max_retries + 1`` attempts are made, all
    inside a single ``model.complete`` tracing span.

    Raises ``ModelUnavailableError`` without calling the adapter when the
    cache entry reports that it is still in cooldown.  Errors matching
    ``_RETRYABLE`` are retried; the last one is re-raised once the attempts
    or the retry policy's ``total_timeout`` are used up.  Any other
    ``ModelError`` is recorded on the span and re-raised immediately.
    """
    conn = self.resolve(model)
    entry = await self._get_cache_entry(conn)

    # Fail fast while the entry is still within its unhealthy cooldown.
    if entry.is_open(self._unhealthy_cooldown):
        raise ModelUnavailableError(f"Connection {model!r} is in cooldown after repeated failures.")

    # Send a copy carrying the resolved model id; the caller's request is left untouched.
    req = replace(request, model_id=conn.model_id)
    # Missing metadata or a missing key both yield "" (treated as "no session" below).
    session_id = (req.metadata or {}).get("session_id", "")

    with get_tracer().start_as_current_span("model.complete") as span:
        span.set_attribute("arcana.model", model)
        if session_id:
            span.set_attribute("arcana.session_id", session_id)

        last_exc: Exception | None = None
        # `start` feeds the total-timeout budget; `attempt_start` feeds per-attempt latency.
        start = time.monotonic()
        for attempt in range(self._retry.max_retries + 1):
            attempt_start = time.monotonic()
            try:
                response = await entry.adapter.complete(req)
                entry.mark_healthy()
                latency_ms = int((time.monotonic() - attempt_start) * 1000)
                span.set_attribute("arcana.input_tokens", response.input_tokens)
                span.set_attribute("arcana.output_tokens", response.output_tokens)
                span.set_attribute("arcana.attempts", attempt + 1)
                await self._emit_cost(model, response, conn, req.metadata)
                # Successful call record: real token counts, 1-based attempt number, success flag True.
                _emit_model_call(
                    session_id, model, latency_ms, response.input_tokens, response.output_tokens, attempt + 1, True
                )
                return response
            except _RETRYABLE as exc:
                # Failed attempt record: zero tokens, success flag False, plus the error text.
                latency_ms = int((time.monotonic() - attempt_start) * 1000)
                _emit_model_call(session_id, model, latency_ms, 0, 0, attempt + 1, False, str(exc))
                last_exc = exc
                # No sleep after the final attempt; fall through to the raise below.
                if attempt == self._retry.max_retries:
                    break
                # The exception's `retry_after` attribute, if it has one, is passed to the backoff policy.
                delay = self._retry.backoff(attempt, retry_after=getattr(exc, "retry_after", None))
                if self._retry.total_timeout is not None:
                    elapsed = time.monotonic() - start
                    remaining = self._retry.total_timeout - elapsed
                    # Budget exhausted: stop retrying instead of sleeping again.
                    if remaining <= 0:
                        break
                    # Never sleep past the end of the overall budget.
                    delay = min(delay, remaining)
                await asyncio.sleep(delay)
            except ModelError as exc:
                # Reached only for ModelErrors the _RETRYABLE clause above did not catch: no retry.
                span.record_exception(exc)
                raise

        # Retries exhausted (attempt limit or total timeout): surface the last retryable error.
        if last_exc is not None:
            span.record_exception(last_exc)
        # Only an unavailable-style failure marks the entry unhealthy.
        if isinstance(last_exc, ModelUnavailableError):
            entry.mark_unhealthy()
        # NOTE(review): if max_retries were negative the loop would never run and last_exc would
        # still be None here, making this raise a TypeError. Presumably RetryPolicy guarantees
        # max_retries >= 0; confirm.
        raise last_exc  # type: ignore[misc]

stream async

stream(model, request)

Stream a response as ModelChunk deltas.

Retry applies only before the first token arrives — mid-stream failures are surfaced immediately since output cannot be cleanly replayed. Emits one CostEvent after the stream completes.

Source code in packages/arcana-core/arcana/models/gateway.py
async def stream(self, model: str, request: CompletionRequest) -> AsyncGenerator[ModelChunk, None]:
    """Stream a response as ``ModelChunk`` deltas.

    Retry applies only before the first token arrives — mid-stream failures
    are surfaced immediately since output cannot be cleanly replayed.
    Emits one ``CostEvent`` after the stream completes.
    """
    conn = self.resolve(model)
    entry = await self._get_cache_entry(conn)

    if entry.is_open(self._unhealthy_cooldown):
        raise ModelUnavailableError(f"Connection {model!r} is in cooldown after repeated failures.")

    req = replace(request, model_id=conn.model_id)
    async with aclosing(self._retry_stream(model, conn, entry, req)) as gen:
        async for chunk in gen:
            yield chunk

health async

health(model=None)

Check health for one model string or all cached adapters.

A successful check resets the unhealthy flag so the connection is allowed back into the request path without waiting for cooldown.

Source code in packages/arcana-core/arcana/models/gateway.py
async def health(self, model: str | None = None) -> dict[str, ModelHealth]:
    """Check health for one model string or all cached adapters.

    With ``model`` given, that model string is resolved, its adapter is
    checked, and a one-item mapping ``{model: result}`` is returned.
    Without it, every adapter currently in the cache is checked in turn and
    the results are keyed by cache key.

    A successful check resets the unhealthy flag so the connection is
    allowed back into the request path without waiting for cooldown.
    """
    if model is not None:
        conn = self.resolve(model)
        entry = await self._get_cache_entry(conn)
        result = await entry.adapter.health_check()
        if result.healthy:
            entry.mark_healthy()
        return {model: result}

    results: dict[str, ModelHealth] = {}
    # Iterate over a snapshot: every ``await`` below hands control back to the
    # event loop, and a task that adds or removes a cache entry in the meantime
    # would otherwise make the live dict iterator raise RuntimeError.
    # Entries added after the snapshot are simply not part of this pass.
    for key, entry in list(self._cache.items()):
        result = await entry.adapter.health_check()
        if result.healthy:
            entry.mark_healthy()
        results[key] = result
    return results

resolve

resolve(model)

Parse provider/model_id or provider:connection_name/model_id and return a ModelConnection.

When a connection name is given, looks it up by name in ConnectionStore and raises ValueError if it doesn't exist. Without a name, looks for a stored connection for the provider first, then falls back to ProviderRegistry defaults so out-of-the-box usage requires no config file.

Source code in packages/arcana-core/arcana/models/gateway.py
def resolve(self, model: str) -> ModelConnection:
    """Turn a model string into the ``ModelConnection`` that should serve it.

    Accepts ``provider/model_id`` and ``provider:connection_name/model_id``.

    With a connection name, the connection must exist in ``ConnectionStore``;
    ``ValueError`` is raised if it does not.  Without one, a stored connection
    for the provider is preferred, and ``ProviderRegistry`` builds a default
    connection otherwise, so no config file is needed out of the box.  A stored
    connection is always returned as a copy with ``model_id`` replaced.
    """
    provider, named, model_id = self._parse_model_string(model)
    pinned = {"model_id": model_id}

    if named is None:
        # Unnamed: prefer a stored connection for the registered provider.
        registered = self._providers.get(provider)
        stored = None
        if registered is not None:
            stored = self._connections.get_by_provider(registered.provider)
        if stored is None:
            return self._providers.build_default_connection(provider, model_id)
        return stored.model_copy(update=pinned)

    # Named: the connection has to exist; there is no fallback.
    stored = self._connections.get_by_name(named)
    if stored is None:
        raise ValueError(
            f"No connection named {named!r} found. "
            f"Add it with `arcana connect model` or check your connections file."
        )
    return stored.model_copy(update=pinned)

aclose async

aclose()

Close all cached adapters. Called automatically by the context manager.

Source code in packages/arcana-core/arcana/models/gateway.py
async def aclose(self) -> None:
    """Close all cached adapters. Called automatically by the context manager.

    Every cached adapter is closed even if closing one of them fails, and
    the cache is emptied either way.  If any adapter raised while closing,
    the first such exception is re-raised after all adapters have been
    attempted.
    """
    # Detach the entries before the first await: the cache is empty from here
    # on, so it never holds adapters that are closed or being closed, and
    # nothing that touches the cache while we are suspended can break this loop.
    entries = list(self._cache.values())
    self._cache.clear()
    self._cache_locks.clear()

    first_error: Exception | None = None
    for entry in entries:
        try:
            await entry.adapter.aclose()
        except Exception as exc:
            # Keep going so one bad adapter does not leak the rest.
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error

arcana.models.connection_store.ConnectionStore

ConnectionStore(path=None)

Reads ModelConnection records from disk and credentials from the OS keyring.

Connections are loaded lazily on first access; call reload() to invalidate the cache if the file changes at runtime.

Usage::

store = ConnectionStore()
conn = store.get_by_provider(ModelProvider.ANTHROPIC)
key  = store.get_api_key(conn.id)
Source code in packages/arcana-core/arcana/models/connection_store.py
def __init__(self, path: Path | None = None) -> None:
    """Remember where the connections file lives; nothing is read yet.

    ``path`` overrides the location returned by ``_default_path()``.
    """
    # None means "not loaded yet"; connections are loaded lazily on first access.
    self._connections: list[ModelConnection] | None = None
    self._path = path if path else _default_path()

upsert

upsert(conn)

Insert conn, or replace the existing connection with the same name (the replaced record keeps its existing id).

Source code in packages/arcana-core/arcana/models/connection_store.py
def upsert(self, conn: ModelConnection) -> None:
    """Save ``conn``, replacing the first stored connection with the same name.

    On replacement the stored record's ``id`` is carried over onto the new
    connection.  If no stored connection has that name, ``conn`` is appended.
    The full list is then written back through ``_save``.
    """
    records = list(self._load())  # work on a copy of whatever _load() returned
    for position, existing in enumerate(records):
        if existing.name == conn.name:
            # Same name: swap in the new data but keep the stored id.
            records[position] = conn.model_copy(update={"id": existing.id})
            break
    else:
        # No name matched, so this is a brand-new connection.
        records.append(conn)
    self._save(records)