
Python SDK Reference

The Krayne SDK is a set of stateless functions for managing Ray clusters programmatically. All functions live in the krayne.api module.

from krayne.api import (
    create_cluster,
    get_cluster,
    list_clusters,
    describe_cluster,
    scale_cluster,
    delete_cluster,
    managed_cluster,
    wait_until_ready,
    get_cluster_services,
)

Design principles

  • Functional — free functions, not classes. No internal state to manage.
  • Explicit dependencies — the client parameter makes the Kubernetes dependency visible and injectable.
  • Immutable returns — all return types are frozen dataclasses.
  • Testable — pass a mock KubeClient to test without a real cluster.

Functions

create_cluster

Create a new Ray cluster from a configuration object.

def create_cluster(
    config: ClusterConfig,
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    wait: bool = False,
    timeout: int = 300,
) -> ClusterInfo

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| config | ClusterConfig | | Cluster configuration (required) |
| client | KubeClient \| None | None | Kubernetes client. Uses default kubeconfig if None. |
| kubeconfig | str \| None | None | Path to kubeconfig file |
| wait | bool | False | Block until the cluster is ready |
| timeout | int | 300 | Timeout in seconds when wait=True |

Returns: ClusterInfo

Raises:

  • ClusterAlreadyExistsError — cluster name already in use
  • NamespaceNotFoundError — namespace does not exist
  • ClusterTimeoutError — cluster not ready within timeout (when wait=True)
  • KubeConnectionError — cannot reach the Kubernetes API

Example:

from krayne.api import create_cluster
from krayne.config import ClusterConfig, WorkerGroupConfig

config = ClusterConfig(
    name="training-run",
    namespace="ml-team",
    worker_groups=[
        WorkerGroupConfig(replicas=4, gpus=1, gpu_type="a100")
    ],
)
info = create_cluster(config, wait=True, timeout=600)
print(f"Dashboard: {info.dashboard_url}")

get_cluster

Retrieve summary information for a single cluster.

def get_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterInfo

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Returns: ClusterInfo

Raises: ClusterNotFoundError, KubeConnectionError


list_clusters

List all Ray clusters in a namespace.

def list_clusters(
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> list[ClusterInfo]

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Returns: list[ClusterInfo]

Raises: KubeConnectionError


describe_cluster

Get extended details for a cluster, including head node and worker group resource breakdowns.

def describe_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterDetails

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Returns: ClusterDetails

Raises: ClusterNotFoundError, KubeConnectionError


scale_cluster

Scale a worker group's desired, minimum, or maximum replica count.

When autoscaling is enabled on the cluster, only the explicitly provided fields are patched. When autoscaling is disabled, all three fields are pinned to the replicas value.

def scale_cluster(
    name: str,
    namespace: str,
    worker_group: str,
    replicas: int | None = None,
    *,
    min_replicas: int | None = None,
    max_replicas: int | None = None,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterInfo

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | | Cluster name (required) |
| namespace | str | | Kubernetes namespace (required) |
| worker_group | str | | Name of the worker group to scale (required) |
| replicas | int \| None | None | Target desired replica count |
| min_replicas | int \| None | None | Minimum replicas for autoscaling |
| max_replicas | int \| None | None | Maximum replicas for autoscaling |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

At least one of replicas, min_replicas, or max_replicas must be provided.

Returns: ClusterInfo

Raises: KrayneError (worker group not found or no arguments), ClusterNotFoundError, KubeConnectionError


delete_cluster

Delete a Ray cluster.

def delete_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> None

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Raises: ClusterNotFoundError, KubeConnectionError


managed_cluster

Context manager that creates a cluster, waits for readiness, optionally opens tunnels, and cleans up everything on exit.

def managed_cluster(
    config: ClusterConfig,
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    timeout: int = 300,
    tunnel: bool = True,
) -> ContextManager[ManagedClusterResult]

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| config | ClusterConfig | | Cluster configuration (required) |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |
| timeout | int | 300 | Timeout in seconds for cluster readiness |
| tunnel | bool | True | Open port-forward tunnels to cluster services |

Yields: ManagedClusterResult once the cluster is ready

Raises:

  • ClusterAlreadyExistsError — cluster name already in use
  • NamespaceNotFoundError — namespace does not exist
  • ClusterTimeoutError — cluster not ready within timeout
  • KubeConnectionError — cannot reach the Kubernetes API

The cluster is always deleted on exit, even if an exception occurs inside the with block. When tunnel=True, tunnels are closed before the cluster is deleted.

Example:

import ray
from krayne.api import managed_cluster
from krayne.config import ClusterConfig, WorkerGroupConfig

config = ClusterConfig(
    name="experiment",
    worker_groups=[WorkerGroupConfig(replicas=2, gpus=1, gpu_type="a100")],
)

# Tunnels are opened by default
with managed_cluster(config, timeout=600) as managed:
    ray.init(managed.tunnel.client_url)     # ray://localhost:...
    print(managed.tunnel.dashboard_url)     # http://localhost:...
    print(managed.cluster.client_url)       # ray://10.0.0.1:10001
    # ... run distributed work ...
    ray.shutdown()
# Tunnels closed, then cluster deleted

# Use tunnel=False for in-cluster access (e.g. running inside the same K8s cluster)
with managed_cluster(config, tunnel=False) as managed:
    ray.init(managed.cluster.client_url)    # ray://10.0.0.1:10001
    assert managed.tunnel is None

wait_until_ready

Poll a cluster until it reaches the ready state or the timeout expires.

def wait_until_ready(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    timeout: int = 300,
    _poll_interval: float = 2.0,
) -> ClusterInfo

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |
| timeout | int | 300 | Maximum seconds to wait |

Returns: ClusterInfo once the cluster is ready

Raises: ClusterTimeoutError, ClusterNotFoundError, KubeConnectionError


get_cluster_services

Return the list of service names exposed on the cluster head node (e.g. ["dashboard", "client", "notebook", "ssh"]).

def get_cluster_services(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> list[str]

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Returns: List of service name strings

Raises: ClusterNotFoundError, KubeConnectionError


Return types

All return types are immutable frozen dataclasses defined in krayne.api.types.

ClusterInfo

Summary information about a Ray cluster.

@dataclass(frozen=True)
class ClusterInfo:
    name: str                    # Cluster name
    namespace: str               # Kubernetes namespace
    status: str                  # Cluster status (e.g. "ready", "creating")
    head_ip: str | None          # Head node pod/service IP
    dashboard_url: str | None    # Ray dashboard URL
    client_url: str | None       # Ray client URL
    notebook_url: str | None     # Jupyter notebook URL (if enabled)
    code_server_url: str | None  # Code Server URL (if enabled)
    ssh_url: str | None          # SSH URL (if enabled)
    num_workers: int             # Total worker replicas
    autoscaling_enabled: bool    # Whether autoscaling is enabled
    created_at: str              # Creation timestamp

ClusterDetails

Extended cluster information with resource breakdown.

@dataclass(frozen=True)
class ClusterDetails:
    info: ClusterInfo                    # Summary info
    head: HeadNodeInfo                   # Head node resources
    worker_groups: list[WorkerGroupInfo] # Worker group details
    ray_version: str                     # Ray version
    python_version: str                  # Python version

HeadNodeInfo

Head node resource details.

@dataclass(frozen=True)
class HeadNodeInfo:
    cpus: str       # CPU count
    memory: str     # Memory (e.g. "48Gi")
    gpus: int       # GPU count
    image: str      # Container image

WorkerGroupInfo

Worker group resource details.

@dataclass(frozen=True)
class WorkerGroupInfo:
    name: str            # Worker group name
    replicas: int        # Desired number of replicas
    min_replicas: int    # Minimum replicas (autoscaling lower bound)
    max_replicas: int    # Maximum replicas (autoscaling upper bound)
    cpus: str            # CPUs per worker
    memory: str          # Memory per worker
    gpus: int            # GPUs per worker
    gpu_type: str | None # GPU accelerator type

ManagedClusterResult

Aggregated result from managed_cluster, combining cluster info with an optional tunnel session.

@dataclass(frozen=True)
class ManagedClusterResult:
    cluster: ClusterInfo              # Cluster information
    tunnel: TunnelSession | None      # Tunnel session (None if tunnel=False)

Access cluster URLs via result.cluster and tunnel URLs via result.tunnel:

with managed_cluster(config) as managed:
    # Tunnel (localhost) URLs
    managed.tunnel.client_url       # ray://localhost:12346
    managed.tunnel.dashboard_url    # http://localhost:12345

    # In-cluster IPs
    managed.cluster.client_url      # ray://10.0.0.1:10001
    managed.cluster.dashboard_url   # http://10.0.0.1:8265

TunnelSession

Active tunnel session with local URLs for all forwarded services.

@dataclass(frozen=True)
class TunnelSession:
    cluster_name: str          # Cluster name
    namespace: str             # Kubernetes namespace
    tunnels: list[TunnelInfo]  # List of active tunnels

Provides URL properties: dashboard_url, client_url, notebook_url, code_server_url, ssh_url — each returns the local URL for the corresponding service, or None if not tunneled.


KubeClient protocol

The KubeClient protocol defines the interface for Kubernetes operations. You can implement it for testing or custom backends.

from typing import Protocol, runtime_checkable

@runtime_checkable
class KubeClient(Protocol):
    def create_ray_cluster(self, manifest: dict) -> dict: ...
    def get_ray_cluster(self, name: str, namespace: str) -> dict: ...
    def list_ray_clusters(self, namespace: str) -> list[dict]: ...
    def patch_ray_cluster(self, name: str, namespace: str, patch: dict) -> dict: ...
    def delete_ray_cluster(self, name: str, namespace: str) -> None: ...
    def get_cluster_status(self, name: str, namespace: str) -> str: ...
    def list_pods(self, cluster_name: str, namespace: str) -> list[dict]: ...
    def get_head_node_port(self, cluster_name: str, namespace: str, port_name: str) -> int | None: ...

Any object that implements these methods satisfies the protocol — no inheritance required.

Example mock for testing:

from unittest.mock import MagicMock
from krayne.api import create_cluster
from krayne.config import ClusterConfig

mock_client = MagicMock()
mock_client.create_ray_cluster.return_value = {
    "metadata": {"name": "test", "namespace": "default", "creationTimestamp": "now"},
    "spec": {"workerGroupSpecs": [{"replicas": 1}]},
    "status": {"state": "ready", "head": {"podIP": "10.0.0.1"}},
}

config = ClusterConfig(name="test")
info = create_cluster(config, client=mock_client)
assert info.name == "test"
assert info.status == "ready"