Python SDK Reference¶
The Krayne SDK is a set of stateless functions for managing Ray clusters programmatically. All functions live in the `krayne.api` module.
```python
from krayne.api import (
    create_cluster,
    get_cluster,
    list_clusters,
    describe_cluster,
    scale_cluster,
    delete_cluster,
    managed_cluster,
    wait_until_ready,
)
```
Design principles¶
- Functional — free functions, not classes. No internal state to manage.
- Explicit dependencies — the `client` parameter makes the Kubernetes dependency visible and injectable.
- Immutable returns — all return types are frozen dataclasses.
- Testable — pass a mock `KubeClient` to test without a real cluster.
Functions¶
create_cluster¶
Create a new Ray cluster from a configuration object.
```python
def create_cluster(
    config: ClusterConfig,
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    wait: bool = False,
    timeout: int = 300,
) -> ClusterInfo
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `config` | `ClusterConfig` | — | Cluster configuration (required) |
| `client` | `KubeClient \| None` | `None` | Kubernetes client. Uses default kubeconfig if `None`. |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
| `wait` | `bool` | `False` | Block until the cluster is ready |
| `timeout` | `int` | `300` | Timeout in seconds when `wait=True` |

Returns: `ClusterInfo`

Raises:

- `ClusterAlreadyExistsError` — cluster name already in use
- `NamespaceNotFoundError` — namespace does not exist
- `ClusterTimeoutError` — cluster not ready within timeout (when `wait=True`)
- `KubeConnectionError` — cannot reach the Kubernetes API
Example:
```python
from krayne.api import create_cluster
from krayne.config import ClusterConfig, WorkerGroupConfig

config = ClusterConfig(
    name="training-run",
    namespace="ml-team",
    worker_groups=[
        WorkerGroupConfig(replicas=4, gpus=1, gpu_type="a100")
    ],
)

info = create_cluster(config, wait=True, timeout=600)
print(f"Dashboard: {info.dashboard_url}")
```
get_cluster¶
Retrieve summary information for a single cluster.
```python
def get_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterInfo
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |

Returns: `ClusterInfo`

Raises: `ClusterNotFoundError`, `KubeConnectionError`
list_clusters¶
List all Ray clusters in a namespace.
```python
def list_clusters(
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> list[ClusterInfo]
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |

Returns: `list[ClusterInfo]`

Raises: `KubeConnectionError`
describe_cluster¶
Get extended details for a cluster, including head node and worker group resource breakdowns.
```python
def describe_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterDetails
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |

Returns: `ClusterDetails`

Raises: `ClusterNotFoundError`, `KubeConnectionError`
scale_cluster¶
Scale a worker group's desired, minimum, or maximum replica count.
When autoscaling is enabled on the cluster, only the explicitly provided fields are patched. When autoscaling is disabled, all three fields are pinned to the `replicas` value.
```python
def scale_cluster(
    name: str,
    namespace: str,
    worker_group: str,
    replicas: int | None = None,
    *,
    min_replicas: int | None = None,
    max_replicas: int | None = None,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterInfo
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | — | Kubernetes namespace (required) |
| `worker_group` | `str` | — | Name of the worker group to scale (required) |
| `replicas` | `int \| None` | `None` | Target desired replica count |
| `min_replicas` | `int \| None` | `None` | Minimum replicas for autoscaling |
| `max_replicas` | `int \| None` | `None` | Maximum replicas for autoscaling |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |

At least one of `replicas`, `min_replicas`, or `max_replicas` must be provided.

Returns: `ClusterInfo`

Raises: `KrayneError` (worker group not found or no arguments), `ClusterNotFoundError`, `KubeConnectionError`
delete_cluster¶
Delete a Ray cluster.
```python
def delete_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> None
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |

Raises: `ClusterNotFoundError`, `KubeConnectionError`
managed_cluster¶
Context manager that creates a cluster, waits for readiness, optionally opens tunnels, and cleans up everything on exit.
```python
def managed_cluster(
    config: ClusterConfig,
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    timeout: int = 300,
    tunnel: bool = True,
) -> ContextManager[ManagedClusterResult]
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `config` | `ClusterConfig` | — | Cluster configuration (required) |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
| `timeout` | `int` | `300` | Timeout in seconds for cluster readiness |
| `tunnel` | `bool` | `True` | Open port-forward tunnels to cluster services |

Yields: `ManagedClusterResult` once the cluster is ready

Raises:

- `ClusterAlreadyExistsError` — cluster name already in use
- `NamespaceNotFoundError` — namespace does not exist
- `ClusterTimeoutError` — cluster not ready within timeout
- `KubeConnectionError` — cannot reach the Kubernetes API
The cluster is always deleted on exit, even if an exception occurs inside the `with` block. When `tunnel=True`, tunnels are closed before the cluster is deleted.
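The teardown guarantee boils down to a `try`/`finally` inside a generator-based context manager. A self-contained sketch of that ordering follows; the resource names and the `cleanup_log` list are stand-ins for illustration, not the SDK's internals:

```python
from contextlib import contextmanager

cleanup_log = []  # records teardown order for demonstration

@contextmanager
def managed_resource(open_tunnel: bool = True):
    # Stand-ins for cluster creation and tunnel setup.
    tunnel = "tunnel" if open_tunnel else None
    try:
        yield {"cluster": "experiment", "tunnel": tunnel}
    finally:
        # This block runs even if the `with` body raised.
        if tunnel is not None:
            cleanup_log.append("close tunnels")
        cleanup_log.append("delete cluster")

try:
    with managed_resource() as managed:
        raise RuntimeError("job failed")
except RuntimeError:
    pass

print(cleanup_log)  # tunnels are closed before the cluster is deleted
```

The `finally` clause is what makes cleanup unconditional: the exception propagates out of the `with` block only after both teardown steps have run, in the documented order.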
Example:
```python
import ray
from krayne.api import managed_cluster
from krayne.config import ClusterConfig, WorkerGroupConfig

config = ClusterConfig(
    name="experiment",
    worker_groups=[WorkerGroupConfig(replicas=2, gpus=1, gpu_type="a100")],
)

# Tunnels are opened by default
with managed_cluster(config, timeout=600) as managed:
    ray.init(managed.tunnel.client_url)   # ray://localhost:...
    print(managed.tunnel.dashboard_url)   # http://localhost:...
    print(managed.cluster.client_url)     # ray://10.0.0.1:10001
    # ... run distributed work ...
    ray.shutdown()
# Tunnels closed, then cluster deleted

# Use tunnel=False for in-cluster access (e.g. running inside the same K8s cluster)
with managed_cluster(config, tunnel=False) as managed:
    ray.init(managed.cluster.client_url)  # ray://10.0.0.1:10001
    assert managed.tunnel is None
```
wait_until_ready¶
Poll a cluster until it reaches the ready state or the timeout expires.
```python
def wait_until_ready(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    timeout: int = 300,
    _poll_interval: float = 2.0,
) -> ClusterInfo
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
| `timeout` | `int` | `300` | Maximum seconds to wait |

Returns: `ClusterInfo` once the cluster is ready

Raises: `ClusterTimeoutError`, `ClusterNotFoundError`, `KubeConnectionError`
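The polling behaviour amounts to a deadline loop. Here is a minimal, self-contained sketch with an injected status function; `wait_for` is a hypothetical name used only for this illustration, not part of the SDK:

```python
import time

def wait_for(get_status, timeout: float = 300.0, poll_interval: float = 0.01):
    """Poll get_status() until it returns 'ready' or the deadline passes."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_status() == "ready":
            return "ready"
        time.sleep(poll_interval)
    # Mirrors ClusterTimeoutError in the real function.
    raise TimeoutError("cluster not ready within timeout")

# Simulated status source: the cluster becomes ready on the third poll.
statuses = iter(["creating", "creating", "ready"])
print(wait_for(lambda: next(statuses), timeout=1.0))  # ready
```

Using `time.monotonic()` rather than `time.time()` keeps the deadline immune to wall-clock adjustments, which is the usual choice for timeout loops.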
get_cluster_services¶
Return the list of service names exposed on the cluster head node (e.g. `["dashboard", "client", "notebook", "ssh"]`).
```python
def get_cluster_services(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> list[str]
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |

Returns: List of service name strings

Raises: `ClusterNotFoundError`, `KubeConnectionError`
Return types¶
All return types are immutable frozen dataclasses defined in `krayne.api.types`.
ClusterInfo¶
Summary information about a Ray cluster.
```python
@dataclass(frozen=True)
class ClusterInfo:
    name: str                       # Cluster name
    namespace: str                  # Kubernetes namespace
    status: str                     # Cluster status (e.g. "ready", "creating")
    head_ip: str | None             # Head node pod/service IP
    dashboard_url: str | None       # Ray dashboard URL
    client_url: str | None          # Ray client URL
    notebook_url: str | None        # Jupyter notebook URL (if enabled)
    code_server_url: str | None     # Code Server URL (if enabled)
    ssh_url: str | None             # SSH URL (if enabled)
    num_workers: int                # Total worker replicas
    autoscaling_enabled: bool       # Whether autoscaling is enabled
    created_at: str                 # Creation timestamp
```
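Because the dataclasses are frozen, attribute assignment raises `dataclasses.FrozenInstanceError`; use `dataclasses.replace` to derive a modified copy. A quick demonstration with a stand-in type (not the real `ClusterInfo`):

```python
from dataclasses import FrozenInstanceError, dataclass, replace

@dataclass(frozen=True)
class Info:
    name: str
    status: str

info = Info(name="training-run", status="creating")

try:
    info.status = "ready"  # mutation is rejected on frozen dataclasses
except FrozenInstanceError:
    pass

# Derive a modified copy instead; the original is untouched.
updated = replace(info, status="ready")
print(info.status, updated.status)  # creating ready
```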
ClusterDetails¶
Extended cluster information with resource breakdown.
```python
@dataclass(frozen=True)
class ClusterDetails:
    info: ClusterInfo                      # Summary info
    head: HeadNodeInfo                     # Head node resources
    worker_groups: list[WorkerGroupInfo]   # Worker group details
    ray_version: str                       # Ray version
    python_version: str                    # Python version
```
HeadNodeInfo¶
Head node resource details.
```python
@dataclass(frozen=True)
class HeadNodeInfo:
    cpus: str      # CPU count
    memory: str    # Memory (e.g. "48Gi")
    gpus: int      # GPU count
    image: str     # Container image
```
WorkerGroupInfo¶
Worker group resource details.
```python
@dataclass(frozen=True)
class WorkerGroupInfo:
    name: str              # Worker group name
    replicas: int          # Desired number of replicas
    min_replicas: int      # Minimum replicas (autoscaling lower bound)
    max_replicas: int      # Maximum replicas (autoscaling upper bound)
    cpus: str              # CPUs per worker
    memory: str            # Memory per worker
    gpus: int              # GPUs per worker
    gpu_type: str | None   # GPU accelerator type
```
ManagedClusterResult¶
Aggregated result from managed_cluster, combining cluster info with an optional tunnel session.
```python
@dataclass(frozen=True)
class ManagedClusterResult:
    cluster: ClusterInfo            # Cluster information
    tunnel: TunnelSession | None    # Tunnel session (None if tunnel=False)
```
Access cluster URLs via `result.cluster` and tunnel URLs via `result.tunnel`:

```python
with managed_cluster(config) as managed:
    # Tunnel (localhost) URLs
    managed.tunnel.client_url      # ray://localhost:12346
    managed.tunnel.dashboard_url   # http://localhost:12345
    # In-cluster IPs
    managed.cluster.client_url     # ray://10.0.0.1:10001
    managed.cluster.dashboard_url  # http://10.0.0.1:8265
```
TunnelSession¶
Active tunnel session with local URLs for all forwarded services.
```python
@dataclass(frozen=True)
class TunnelSession:
    cluster_name: str           # Cluster name
    namespace: str              # Kubernetes namespace
    tunnels: list[TunnelInfo]   # List of active tunnels
```

Provides URL properties: `dashboard_url`, `client_url`, `notebook_url`, `code_server_url`, `ssh_url` — each returns the local URL for the corresponding service, or `None` if not tunneled.
KubeClient protocol¶
The `KubeClient` protocol defines the interface for Kubernetes operations. You can implement it for testing or custom backends.
```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class KubeClient(Protocol):
    def create_ray_cluster(self, manifest: dict) -> dict: ...
    def get_ray_cluster(self, name: str, namespace: str) -> dict: ...
    def list_ray_clusters(self, namespace: str) -> list[dict]: ...
    def patch_ray_cluster(self, name: str, namespace: str, patch: dict) -> dict: ...
    def delete_ray_cluster(self, name: str, namespace: str) -> None: ...
    def get_cluster_status(self, name: str, namespace: str) -> str: ...
    def list_pods(self, cluster_name: str, namespace: str) -> list[dict]: ...
    def get_head_node_port(self, cluster_name: str, namespace: str, port_name: str) -> int | None: ...
```
Any object that implements these methods satisfies the protocol — no inheritance required.
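Because the protocol is decorated with `@runtime_checkable`, `isinstance` checks work structurally. A toy illustration with a reduced two-method protocol (`MiniClient` and `FakeClient` are invented for this sketch, not part of the SDK):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class MiniClient(Protocol):
    def get_ray_cluster(self, name: str, namespace: str) -> dict: ...
    def delete_ray_cluster(self, name: str, namespace: str) -> None: ...

class FakeClient:  # note: does NOT inherit from MiniClient
    def get_ray_cluster(self, name: str, namespace: str) -> dict:
        return {"metadata": {"name": name, "namespace": namespace}}

    def delete_ray_cluster(self, name: str, namespace: str) -> None:
        pass

# Structural check: FakeClient satisfies the protocol by shape alone.
print(isinstance(FakeClient(), MiniClient))  # True
```

One caveat of `runtime_checkable`: `isinstance` only verifies that the named methods exist, not that their signatures match, so type checkers remain the authority on full conformance.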
Example mock for testing:
```python
from unittest.mock import MagicMock

from krayne.api import create_cluster
from krayne.config import ClusterConfig

mock_client = MagicMock()
mock_client.create_ray_cluster.return_value = {
    "metadata": {"name": "test", "namespace": "default", "creationTimestamp": "now"},
    "spec": {"workerGroupSpecs": [{"replicas": 1}]},
    "status": {"state": "ready", "head": {"podIP": "10.0.0.1"}},
}

config = ClusterConfig(name="test")
info = create_cluster(config, client=mock_client)
assert info.name == "test"
assert info.status == "ready"
```