Python SDK Reference

The Krayne SDK is a set of stateless functions for managing Ray clusters programmatically. All functions live in the krayne.api module.

from krayne.api import (
    create_cluster,
    get_cluster,
    list_clusters,
    describe_cluster,
    scale_cluster,
    delete_cluster,
    wait_until_ready,
    get_cluster_services,
    open_tunnel,
)

Design principles

  • Functional — free functions, not classes. No internal state to manage.
  • Explicit dependencies — the client parameter makes the Kubernetes dependency visible and injectable.
  • Immutable returns — all return types are frozen dataclasses.
  • Testable — pass a mock KubeClient to test without a real cluster.

Functions

create_cluster

Create a new Ray cluster from a configuration object.

def create_cluster(
    config: ClusterConfig,
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    wait: bool = False,
    timeout: int = 300,
) -> ClusterInfo

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| config | ClusterConfig | | Cluster configuration (required) |
| client | KubeClient \| None | None | Kubernetes client. Uses default kubeconfig if None. |
| kubeconfig | str \| None | None | Path to kubeconfig file |
| wait | bool | False | Block until the cluster is ready |
| timeout | int | 300 | Timeout in seconds when wait=True |

Returns: ClusterInfo

Raises:

  • ClusterAlreadyExistsError — cluster name already in use
  • NamespaceNotFoundError — namespace does not exist
  • ClusterTimeoutError — cluster not ready within timeout (when wait=True)
  • KubeConnectionError — cannot reach the Kubernetes API

Example:

from krayne.api import create_cluster
from krayne.config import ClusterConfig, WorkerGroupConfig

config = ClusterConfig(
    name="training-run",
    namespace="ml-team",
    worker_groups=[
        WorkerGroupConfig(replicas=4, gpus=1)
    ],
)
info = create_cluster(config, wait=True, timeout=600)
print(f"Dashboard: {info.dashboard_url}")

get_cluster

Retrieve summary information for a single cluster.

def get_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterInfo

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Returns: ClusterInfo

Raises: ClusterNotFoundError, KubeConnectionError


list_clusters

List all Ray clusters in a namespace.

def list_clusters(
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> list[ClusterInfo]

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Returns: list[ClusterInfo]

Raises: KubeConnectionError


describe_cluster

Get extended details for a cluster, including head node and worker group resource breakdowns.

def describe_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterDetails

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Returns: ClusterDetails

Raises: ClusterNotFoundError, KubeConnectionError


scale_cluster

Scale a worker group's desired, minimum, or maximum replica count.

When autoscaling is enabled on the cluster, only the explicitly provided fields are patched. When autoscaling is disabled, all three fields (desired, minimum, and maximum replicas) are pinned to the replicas value.

def scale_cluster(
    name: str,
    namespace: str,
    worker_group: str,
    replicas: int | None = None,
    *,
    min_replicas: int | None = None,
    max_replicas: int | None = None,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterInfo

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | | Cluster name (required) |
| namespace | str | | Kubernetes namespace (required) |
| worker_group | str | | Name of the worker group to scale (required) |
| replicas | int \| None | None | Target desired replica count |
| min_replicas | int \| None | None | Minimum replicas for autoscaling |
| max_replicas | int \| None | None | Maximum replicas for autoscaling |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

At least one of replicas, min_replicas, or max_replicas must be provided.

Returns: ClusterInfo

Raises: KrayneError (worker group not found or no arguments), ClusterNotFoundError, KubeConnectionError
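
The patching rule described above can be sketched as a small pure function. This is an illustration only, not krayne's implementation: build_scale_patch is a hypothetical helper, the camelCase keys assume the KubeRay worker-group field names (replicas, minReplicas, maxReplicas), and ValueError stands in for KrayneError. The source does not say what happens when autoscaling is disabled and replicas is omitted, so the sketch simply rejects that case.

```python
from typing import Dict, Optional


def build_scale_patch(
    autoscaling_enabled: bool,
    replicas: Optional[int] = None,
    min_replicas: Optional[int] = None,
    max_replicas: Optional[int] = None,
) -> Dict[str, int]:
    """Return the worker-group fields a scale request would change (hypothetical helper)."""
    if replicas is None and min_replicas is None and max_replicas is None:
        # The SDK documents KrayneError for this case; ValueError stands in here.
        raise ValueError("provide at least one of replicas, min_replicas, max_replicas")

    if autoscaling_enabled:
        # Autoscaling on: patch only the fields the caller passed explicitly.
        requested = {
            "replicas": replicas,
            "minReplicas": min_replicas,
            "maxReplicas": max_replicas,
        }
        return {key: value for key, value in requested.items() if value is not None}

    if replicas is None:
        # Assumption: the documented behavior for this case is not specified.
        raise ValueError("replicas is required when autoscaling is disabled")

    # Autoscaling off: pin all three fields to the replicas value.
    return {"replicas": replicas, "minReplicas": replicas, "maxReplicas": replicas}


print(build_scale_patch(True, max_replicas=8))
# {'maxReplicas': 8}
print(build_scale_patch(False, replicas=3))
# {'replicas': 3, 'minReplicas': 3, 'maxReplicas': 3}
```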


delete_cluster

Delete a Ray cluster.

def delete_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> None

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Raises: ClusterNotFoundError, KubeConnectionError


wait_until_ready

Poll a cluster until it reaches the ready state or the timeout expires.

def wait_until_ready(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    timeout: int = 300,
    _poll_interval: float = 2.0,
) -> ClusterInfo

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |
| timeout | int | 300 | Maximum seconds to wait |

Returns: ClusterInfo once the cluster is ready

Raises: ClusterTimeoutError, ClusterNotFoundError, KubeConnectionError
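
The poll-until-ready behavior can be pictured as a loop with a deadline. The sketch below is a generic version under stated assumptions: poll_until_ready and get_status are hypothetical names, the built-in TimeoutError stands in for ClusterTimeoutError, and "ready" is taken from the status example in ClusterInfo. It is not krayne's actual loop.

```python
import time
from typing import Callable


def poll_until_ready(
    get_status: Callable[[], str],
    timeout: float = 300,
    poll_interval: float = 2.0,
) -> str:
    """Call get_status repeatedly until it returns "ready" or the timeout expires."""
    # A monotonic clock is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == "ready":
            return status
        if time.monotonic() >= deadline:
            # Stand-in for ClusterTimeoutError.
            raise TimeoutError(f"not ready after {timeout}s (last status: {status!r})")
        time.sleep(poll_interval)


# Simulated status sequence instead of a real cluster lookup.
statuses = iter(["creating", "creating", "ready"])
print(poll_until_ready(lambda: next(statuses), timeout=5, poll_interval=0))  # ready
```

Passing the status lookup as a callable keeps the loop testable without a cluster, in the same spirit as the injectable client parameter.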


get_cluster_services

Return the list of service names exposed on the cluster head node (e.g. ["dashboard", "client", "notebook", "ssh"]).

def get_cluster_services(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> list[str]

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Returns: List of service name strings

Raises: ClusterNotFoundError, KubeConnectionError


open_tunnel

Context manager that opens port-forward tunnels to all detected services on a cluster and closes them on exit. Use it from your laptop to make ray.init (and the dashboard, notebook, etc.) reachable on localhost.

@contextmanager
def open_tunnel(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> Iterator[TunnelSession]

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | | Cluster name (required) |
| namespace | str | "default" | Kubernetes namespace |
| client | KubeClient \| None | None | Kubernetes client |
| kubeconfig | str \| None | None | Path to kubeconfig file |

Yields: TunnelSession — exposes client_url, dashboard_url, notebook_url, code_server_url, ssh_url as localhost URLs. Properties for services that aren't tunneled return None.

Tunnels are always closed on exit, even if an exception is raised inside the with block.

Raises: ClusterNotFoundError, KubeConnectionError
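
The close-on-exit guarantee is what a try/finally inside a generator-based context manager provides. The sketch below shows that pattern with stand-ins: FakeTunnel and open_fake_tunnels are hypothetical and open no real port-forwards. It illustrates the general technique, not how open_tunnel is written.

```python
from contextlib import contextmanager
from typing import Iterator, List


class FakeTunnel:
    """Hypothetical stand-in for one port-forward."""

    def __init__(self, service: str) -> None:
        self.service = service
        self.is_open = True

    def close(self) -> None:
        self.is_open = False


@contextmanager
def open_fake_tunnels(services: List[str]) -> Iterator[List[FakeTunnel]]:
    tunnels: List[FakeTunnel] = []
    try:
        for service in services:
            tunnels.append(FakeTunnel(service))
        yield tunnels
    finally:
        # Runs on normal exit and when the with-body raises.
        for tunnel in tunnels:
            tunnel.close()


captured: List[FakeTunnel] = []
try:
    with open_fake_tunnels(["dashboard", "client"]) as tunnels:
        captured.extend(tunnels)
        raise RuntimeError("simulated failure inside the with block")
except RuntimeError:
    pass

print([tunnel.is_open for tunnel in captured])  # [False, False]
```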

Prefer krayne submit for running jobs

To run a Python script as a Ray job, use the krayne submit CLI. It ensures a tunnel exists and submits the job via ray job submit, so the driver runs inside the cluster and your local Python version does not matter. Use open_tunnel directly when you need raw tunnel access, for example to browse the dashboard, call JobSubmissionClient programmatically, or run an SDK workflow whose Python version is already pinned.

Example — dashboard / JobSubmissionClient (no version match required):

from krayne.api import open_tunnel
from ray.job_submission import JobSubmissionClient

with open_tunnel("my-cluster") as session:
    print(session.dashboard_url)            # http://localhost:...
    client = JobSubmissionClient(session.dashboard_url)
    job_id = client.submit_job(
        entrypoint="python demo.py",
        runtime_env={"working_dir": "."},
    )
    print(f"{session.dashboard_url}/#/jobs/{job_id}")
# tunnels closed when the block exits

Example — Ray Client (ray.init("ray://…"), advanced):

Strict Python and Ray version match required

ray.init("ray://…") performs a major.minor.patch Python check and an exact Ray-version check against the cluster image at handshake time. A single patch difference (e.g. 3.12.6 vs 3.12.9) is rejected. This is a known Ray pain point, not specific to krayne. Only use this path if your local interpreter is pinned to match rayproject/ray:<ver>-pyXY; otherwise prefer krayne submit or JobSubmissionClient (above).

import ray
from krayne.api import open_tunnel

with open_tunnel("my-cluster") as session:
    ray.init(session.client_url)            # ray://localhost:...

    @ray.remote
    def hello(i: int) -> str:
        return f"Hello from worker {i}"

    print(ray.get([hello.remote(i) for i in range(4)]))
    ray.shutdown()
# tunnels closed when the block exits
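
Because the Ray Client handshake rejects any Python or Ray version difference, it can help to compare versions before calling ray.init. The helper below is a hypothetical pre-flight check, not part of krayne. It assumes you can get the cluster's versions as full strings, for example from the ray_version and python_version fields of ClusterDetails, though the source does not say whether python_version includes the patch component. Locally, ray.__version__ and sys.version_info supply the other side.

```python
import sys
from typing import List


def find_version_mismatches(
    local_python: str,
    cluster_python: str,
    local_ray: str,
    cluster_ray: str,
) -> List[str]:
    """Return a human-readable list of mismatches; an empty list means compatible."""
    problems: List[str] = []
    # Ray Client compares major.minor.patch, so the strings must match exactly.
    if local_python != cluster_python:
        problems.append(f"Python {local_python} (local) != {cluster_python} (cluster)")
    if local_ray != cluster_ray:
        problems.append(f"Ray {local_ray} (local) != {cluster_ray} (cluster)")
    return problems


# The local interpreter version as a major.minor.patch string.
local_python = ".".join(str(part) for part in sys.version_info[:3])

# Example values only; the patch-level Python difference is reported.
print(find_version_mismatches("3.12.6", "3.12.9", "2.40.0", "2.40.0"))
# ['Python 3.12.6 (local) != 3.12.9 (cluster)']
```

If the list is non-empty, falling back to krayne submit or JobSubmissionClient avoids the handshake check entirely.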

Return types

All return types are frozen (immutable) dataclasses defined in krayne.api.types.
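
In practice, a frozen dataclass rejects attribute assignment, and dataclasses.replace builds a modified copy instead. The sketch below uses ClusterInfoStub, a trimmed stand-in defined here so the example runs without krayne. The real ClusterInfo has more fields.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class ClusterInfoStub:
    """Trimmed stand-in for ClusterInfo (illustration only)."""

    name: str
    status: str
    num_workers: int


info = ClusterInfoStub(name="training-run", status="creating", num_workers=4)

try:
    info.status = "ready"  # assignment on a frozen dataclass raises
except FrozenInstanceError:
    print("cannot assign to a frozen dataclass")

# replace() returns a new instance and leaves the original untouched.
updated = replace(info, status="ready")
print(info.status, updated.status)  # creating ready
```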

ClusterInfo

Summary information about a Ray cluster.

@dataclass(frozen=True)
class ClusterInfo:
    name: str                    # Cluster name
    namespace: str               # Kubernetes namespace
    status: str                  # Cluster status (e.g. "ready", "creating")
    head_ip: str | None          # Head node pod/service IP
    dashboard_url: str | None    # Ray dashboard URL
    client_url: str | None       # Ray client URL
    notebook_url: str | None     # Jupyter notebook URL (if enabled)
    code_server_url: str | None  # Code Server URL (if enabled)
    ssh_url: str | None          # SSH URL (if enabled)
    num_workers: int             # Total worker replicas
    autoscaling_enabled: bool    # Whether autoscaling is enabled
    created_at: str              # Creation timestamp

ClusterDetails

Extended cluster information with resource breakdown.

@dataclass(frozen=True)
class ClusterDetails:
    info: ClusterInfo                    # Summary info
    head: HeadNodeInfo                   # Head node resources
    worker_groups: list[WorkerGroupInfo] # Worker group details
    ray_version: str                     # Ray version
    python_version: str                  # Python version

HeadNodeInfo

Head node resource details.

@dataclass(frozen=True)
class HeadNodeInfo:
    cpus: str            # CPU count (e.g. "1", "500m")
    memory: str          # Memory (e.g. "4Gi")
    gpus: int            # GPU count parsed from the head pod's nvidia.com/gpu requests
    image: str           # Container image
    runs_tasks: bool     # Derived from rayStartParams["num-cpus"] != 0 — when False, head is a control plane

WorkerGroupInfo

Worker group resource details.

@dataclass(frozen=True)
class WorkerGroupInfo:
    name: str            # Worker group name
    replicas: int        # Desired number of replicas
    min_replicas: int    # Minimum replicas (autoscaling lower bound)
    max_replicas: int    # Maximum replicas (autoscaling upper bound)
    cpus: str            # CPUs per worker
    memory: str          # Memory per worker
    gpus: int            # GPUs per worker
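
The cpus and memory fields on HeadNodeInfo and WorkerGroupInfo are Kubernetes quantity strings, so arithmetic such as summing cores across worker groups needs a conversion first. The helpers below are hypothetical and cover only the forms shown in the examples above: plain or millicore CPU values and binary memory suffixes. Decimal suffixes such as "M" or "G" and fractional values such as "1.5Gi" are deliberately left out.

```python
def cpu_to_cores(quantity: str) -> float:
    """Convert a CPU quantity such as "500m" or "1" to a number of cores."""
    if quantity.endswith("m"):
        # "m" means millicores: 1000m equals one core.
        return int(quantity[:-1]) / 1000
    return float(quantity)


# Binary (power-of-two) memory suffixes used by Kubernetes.
_BINARY_SUFFIXES = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def memory_to_bytes(quantity: str) -> int:
    """Convert a memory quantity such as "4Gi" to bytes (binary suffixes only)."""
    for suffix, factor in _BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    # No suffix: the value is already in bytes.
    return int(quantity)


print(cpu_to_cores("500m"))    # 0.5
print(cpu_to_cores("1"))       # 1.0
print(memory_to_bytes("4Gi"))  # 4294967296
```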

TunnelSession

Active tunnel session with local URLs for all forwarded services.

@dataclass(frozen=True)
class TunnelSession:
    cluster_name: str          # Cluster name
    namespace: str             # Kubernetes namespace
    tunnels: list[TunnelInfo]  # List of active tunnels

Provides URL properties: dashboard_url, client_url, notebook_url, code_server_url, ssh_url — each returns the local URL for the corresponding service, or None if not tunneled.
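
One way such URL properties could be derived from the tunnels list is sketched below. Everything here is an assumption made for illustration: the fields of TunnelInfo are not documented in this reference, so TunnelInfoStub invents service and local_port, and the port numbers are example values. Only the http://localhost:... and ray://localhost:... URL shapes come from the examples earlier on this page.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class TunnelInfoStub:
    service: str     # hypothetical field: service name, e.g. "dashboard"
    local_port: int  # hypothetical field: port bound on localhost


@dataclass(frozen=True)
class TunnelSessionStub:
    cluster_name: str
    namespace: str
    tunnels: List[TunnelInfoStub]

    def _local_url(self, service: str, scheme: str) -> Optional[str]:
        # Return the URL of the first tunnel for this service, or None.
        for tunnel in self.tunnels:
            if tunnel.service == service:
                return f"{scheme}://localhost:{tunnel.local_port}"
        return None

    @property
    def dashboard_url(self) -> Optional[str]:
        return self._local_url("dashboard", "http")

    @property
    def client_url(self) -> Optional[str]:
        return self._local_url("client", "ray")

    @property
    def notebook_url(self) -> Optional[str]:
        return self._local_url("notebook", "http")


session = TunnelSessionStub(
    cluster_name="my-cluster",
    namespace="default",
    tunnels=[TunnelInfoStub("dashboard", 8265), TunnelInfoStub("client", 10001)],
)
print(session.dashboard_url)  # http://localhost:8265
print(session.client_url)     # ray://localhost:10001
print(session.notebook_url)   # None
```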


KubeClient protocol

The KubeClient protocol defines the interface for Kubernetes operations. You can implement it for testing or custom backends.

from typing import Protocol, runtime_checkable

@runtime_checkable
class KubeClient(Protocol):
    def create_ray_cluster(self, manifest: dict) -> dict: ...
    def get_ray_cluster(self, name: str, namespace: str) -> dict: ...
    def list_ray_clusters(self, namespace: str) -> list[dict]: ...
    def patch_ray_cluster(self, name: str, namespace: str, patch: dict) -> dict: ...
    def delete_ray_cluster(self, name: str, namespace: str) -> None: ...
    def get_cluster_status(self, name: str, namespace: str) -> str: ...
    def list_pods(self, cluster_name: str, namespace: str) -> list[dict]: ...
    def get_head_node_port(self, cluster_name: str, namespace: str, port_name: str) -> int | None: ...
    def list_namespaces(self) -> list[str]: ...

Any object that implements these methods satisfies the protocol — no inheritance required.

Example mock for testing:

from unittest.mock import MagicMock
from krayne.api import create_cluster
from krayne.config import ClusterConfig

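# MagicMock accepts any attribute access, so it can stand in for a KubeClient.
mock_client = MagicMock()
# Canned response returned when create_cluster calls create_ray_cluster.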
mock_client.create_ray_cluster.return_value = {
    "metadata": {"name": "test", "namespace": "default", "creationTimestamp": "now"},
    "spec": {"workerGroupSpecs": [{"replicas": 1}]},
    "status": {"state": "ready", "head": {"podIP": "10.0.0.1"}},
}

config = ClusterConfig(name="test")
info = create_cluster(config, client=mock_client)
assert info.name == "test"
assert info.status == "ready"
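
As an alternative to MagicMock, a hand-written in-memory fake makes the structural typing explicit. The sketch below is self-contained and does not import krayne: KubeClientLike is a local copy of three of the protocol's methods, and InMemoryKube is a hypothetical fake. A fake passed to the real SDK would presumably need all nine methods. Note that isinstance against a runtime_checkable protocol only checks that the methods exist, not their signatures.

```python
from typing import Dict, Protocol, Tuple, runtime_checkable


@runtime_checkable
class KubeClientLike(Protocol):
    """Local copy of three KubeClient methods so the sketch runs without krayne."""

    def create_ray_cluster(self, manifest: dict) -> dict: ...
    def get_ray_cluster(self, name: str, namespace: str) -> dict: ...
    def delete_ray_cluster(self, name: str, namespace: str) -> None: ...


class InMemoryKube:
    """Hypothetical fake: stores manifests in a dict keyed by (namespace, name)."""

    def __init__(self) -> None:
        self._clusters: Dict[Tuple[str, str], dict] = {}

    def create_ray_cluster(self, manifest: dict) -> dict:
        meta = manifest["metadata"]
        self._clusters[(meta["namespace"], meta["name"])] = manifest
        return manifest

    def get_ray_cluster(self, name: str, namespace: str) -> dict:
        return self._clusters[(namespace, name)]

    def delete_ray_cluster(self, name: str, namespace: str) -> None:
        del self._clusters[(namespace, name)]


fake = InMemoryKube()
fake.create_ray_cluster({"metadata": {"name": "test", "namespace": "default"}})

# No inheritance: InMemoryKube matches because it has the required methods.
print(isinstance(fake, KubeClientLike))      # True
print(isinstance(object(), KubeClientLike))  # False
print(fake.get_ray_cluster("test", "default")["metadata"]["name"])  # test
```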