Python SDK Reference
The Krayne SDK is a set of stateless functions for managing Ray clusters programmatically. All functions live in the krayne.api module.
```python
from krayne.api import (
    create_cluster,
    get_cluster,
    list_clusters,
    describe_cluster,
    scale_cluster,
    delete_cluster,
    wait_until_ready,
    get_cluster_services,
    open_tunnel,
)
```
Design principles
- Functional: free functions, not classes. There is no internal state to manage.
- Explicit dependencies: the `client` parameter makes the Kubernetes dependency visible and injectable.
- Immutable returns: all return types are frozen dataclasses.
- Testable: pass a mock `KubeClient` to test without a real cluster.
Functions
create_cluster
Create a new Ray cluster from a configuration object.
```python
def create_cluster(
    config: ClusterConfig,
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    wait: bool = False,
    timeout: int = 300,
) -> ClusterInfo
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `config` | `ClusterConfig` | — | Cluster configuration (required) |
| `client` | `KubeClient \| None` | `None` | Kubernetes client. Uses the default kubeconfig if `None`. |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
| `wait` | `bool` | `False` | Block until the cluster is ready |
| `timeout` | `int` | `300` | Timeout in seconds when `wait=True` |
Returns: ClusterInfo
Raises:
- `ClusterAlreadyExistsError`: the cluster name is already in use
- `NamespaceNotFoundError`: the namespace does not exist
- `ClusterTimeoutError`: the cluster was not ready within `timeout` (when `wait=True`)
- `KubeConnectionError`: the Kubernetes API cannot be reached
Example:
```python
from krayne.api import create_cluster
from krayne.config import ClusterConfig, WorkerGroupConfig

config = ClusterConfig(
    name="training-run",
    namespace="ml-team",
    worker_groups=[
        WorkerGroupConfig(replicas=4, gpus=1),
    ],
)

info = create_cluster(config, wait=True, timeout=600)
print(f"Dashboard: {info.dashboard_url}")
```
get_cluster
Retrieve summary information for a single cluster.
```python
def get_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterInfo
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
Returns: ClusterInfo
Raises: ClusterNotFoundError, KubeConnectionError
list_clusters
List all Ray clusters in a namespace.
```python
def list_clusters(
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> list[ClusterInfo]
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
Returns: list[ClusterInfo]
Raises: KubeConnectionError
describe_cluster
Get extended details for a cluster, including head node and worker group resource breakdowns.
```python
def describe_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterDetails
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
Returns: ClusterDetails
Raises: ClusterNotFoundError, KubeConnectionError
scale_cluster
Scale a worker group's desired, minimum, or maximum replica count.
When autoscaling is enabled on the cluster, only the explicitly provided fields are patched. When autoscaling is disabled, the desired, minimum, and maximum counts are all pinned to the `replicas` value.
```python
def scale_cluster(
    name: str,
    namespace: str,
    worker_group: str,
    replicas: int | None = None,
    *,
    min_replicas: int | None = None,
    max_replicas: int | None = None,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> ClusterInfo
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | — | Kubernetes namespace (required) |
| `worker_group` | `str` | — | Name of the worker group to scale (required) |
| `replicas` | `int \| None` | `None` | Target desired replica count |
| `min_replicas` | `int \| None` | `None` | Minimum replicas for autoscaling |
| `max_replicas` | `int \| None` | `None` | Maximum replicas for autoscaling |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
At least one of replicas, min_replicas, or max_replicas must be provided.
Returns: ClusterInfo
Raises: KrayneError (worker group not found, or no scaling argument provided), ClusterNotFoundError, KubeConnectionError
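The autoscaling rules above can be summarized as a small pure function. The sketch below is a hypothetical helper, `resolve_scale_fields`, not part of `krayne.api`. It assumes the patch targets KubeRay's `replicas`, `minReplicas`, and `maxReplicas` worker-group fields, uses `ValueError` as a stand-in for `KrayneError`, and assumes that a min/max-only request is rejected when autoscaling is off (this reference does not say).

```python
from __future__ import annotations


def resolve_scale_fields(
    autoscaling_enabled: bool,
    replicas: int | None = None,
    min_replicas: int | None = None,
    max_replicas: int | None = None,
) -> dict[str, int]:
    """Hypothetical helper: which worker-group fields a scale request would set."""
    if replicas is None and min_replicas is None and max_replicas is None:
        # Documented rule: at least one value must be provided.
        # (scale_cluster raises KrayneError; ValueError is a stand-in here.)
        raise ValueError("provide replicas, min_replicas, or max_replicas")
    if autoscaling_enabled:
        # Autoscaling on: patch only the fields that were passed explicitly.
        requested = {
            "replicas": replicas,
            "minReplicas": min_replicas,
            "maxReplicas": max_replicas,
        }
        return {key: value for key, value in requested.items() if value is not None}
    if replicas is None:
        # Assumption: with autoscaling off there is no value to pin to,
        # so a min/max-only request is rejected.
        raise ValueError("replicas is required when autoscaling is disabled")
    # Autoscaling off: pin desired, minimum, and maximum to the same value.
    return {"replicas": replicas, "minReplicas": replicas, "maxReplicas": replicas}
```

For example, `resolve_scale_fields(True, max_replicas=8)` touches only the upper bound, while `resolve_scale_fields(False, replicas=3)` sets all three fields to 3.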
delete_cluster
Delete a Ray cluster.
```python
def delete_cluster(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> None
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
Raises: ClusterNotFoundError, KubeConnectionError
wait_until_ready
Poll a cluster until it reaches the ready state or the timeout expires.
```python
def wait_until_ready(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
    timeout: int = 300,
    _poll_interval: float = 2.0,
) -> ClusterInfo
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
| `timeout` | `int` | `300` | Maximum seconds to wait |
Returns: ClusterInfo once the cluster is ready
Raises: ClusterTimeoutError, ClusterNotFoundError, KubeConnectionError
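Conceptually, `wait_until_ready` is a poll loop with a deadline. The following is a minimal sketch of such a loop, not the SDK's code. `poll_until_ready` and its `get_status`, `sleep`, and `clock` parameters are hypothetical, `"ready"` is taken as the terminal status (per the `ClusterInfo.status` examples), and the built-in `TimeoutError` stands in for `ClusterTimeoutError`. Injecting the clock and sleep functions makes the loop testable without real waiting.

```python
from __future__ import annotations

import time
from typing import Callable


def poll_until_ready(
    get_status: Callable[[], str],
    timeout: float = 300,
    poll_interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call get_status() until it returns "ready" or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == "ready":
            return status
        if clock() >= deadline:
            # Stand-in for krayne's ClusterTimeoutError.
            raise TimeoutError(f"not ready after {timeout}s (last status: {status!r})")
        # Sleep one interval, but never past the deadline.
        sleep(min(poll_interval, max(0.0, deadline - clock())))
```

A monotonic clock is used so that wall-clock adjustments cannot shorten or extend the wait.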
get_cluster_services
Return the list of service names exposed on the cluster head node (e.g. ["dashboard", "client", "notebook", "ssh"]).
```python
def get_cluster_services(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> list[str]
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
Returns: List of service name strings
Raises: ClusterNotFoundError, KubeConnectionError
open_tunnel
Context manager that opens port-forward tunnels to all detected services on a cluster and closes them on exit. Run it on your local machine to make the cluster's services (Ray Client for `ray.init`, the dashboard, the notebook, and so on) reachable on `localhost`.
```python
@contextmanager
def open_tunnel(
    name: str,
    namespace: str = "default",
    *,
    client: KubeClient | None = None,
    kubeconfig: str | None = None,
) -> Iterator[TunnelSession]
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | — | Cluster name (required) |
| `namespace` | `str` | `"default"` | Kubernetes namespace |
| `client` | `KubeClient \| None` | `None` | Kubernetes client |
| `kubeconfig` | `str \| None` | `None` | Path to kubeconfig file |
Yields: TunnelSession — exposes client_url, dashboard_url, notebook_url, code_server_url, ssh_url as localhost URLs. Properties for services that aren't tunneled return None.
Tunnels are always closed on exit, even if an exception is raised inside the with block.
Raises: ClusterNotFoundError, KubeConnectionError
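The exit guarantee follows the standard `contextlib.contextmanager` pattern of wrapping the `yield` in `try`/`finally`. This sketch uses a hypothetical `FakeTunnel` class in place of real port-forward processes; it illustrates the pattern and is not krayne's implementation.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Iterator


class FakeTunnel:
    """Hypothetical stand-in for one port-forward process."""

    def __init__(self, service: str) -> None:
        self.service = service
        self.closed = False

    def close(self) -> None:
        self.closed = True


@contextmanager
def open_fake_tunnels(services: list[str]) -> Iterator[list[FakeTunnel]]:
    opened: list[FakeTunnel] = []
    try:
        for service in services:
            # If opening a later tunnel failed, the finally block would
            # still close the ones opened so far.
            opened.append(FakeTunnel(service))
        yield opened
    finally:
        # Runs on normal exit and when the with-body raises.
        for tunnel in opened:
            tunnel.close()
```

Because cleanup sits in `finally`, an exception raised inside the `with` block still propagates to the caller, but only after every tunnel has been closed.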
Prefer krayne submit for running jobs
To run a Python script as a Ray job, the recommended path is the krayne submit CLI — it ensures a tunnel exists and submits the job via ray job submit, so the driver runs inside the cluster and your local Python version is irrelevant. Reach for open_tunnel directly when you need raw tunnel access (e.g. to browse the dashboard, hit JobSubmissionClient programmatically, or as part of an SDK workflow that already has a pinned Python).
Example — dashboard / JobSubmissionClient (no version match required):
```python
from krayne.api import open_tunnel
from ray.job_submission import JobSubmissionClient

with open_tunnel("my-cluster") as session:
    print(session.dashboard_url)  # http://localhost:...
    # Named job_client to avoid confusion with the SDK's `client` (KubeClient) parameter.
    job_client = JobSubmissionClient(session.dashboard_url)
    job_id = job_client.submit_job(
        entrypoint="python demo.py",
        runtime_env={"working_dir": "."},
    )
    print(f"{session.dashboard_url}/#/jobs/{job_id}")
# tunnel closed when the block exits
```
Example — Ray Client (ray.init("ray://…"), advanced):
Strict Python and Ray version match required
ray.init("ray://…") performs a major.minor.patch Python check and an exact Ray-version check against the cluster image at handshake time. A single patch difference (e.g. 3.12.6 vs 3.12.9) is rejected. This is a known Ray pain point, not specific to krayne. Only use this path if your local interpreter is pinned to match rayproject/ray:<ver>-pyXY; otherwise prefer krayne submit or JobSubmissionClient (above).
```python
import ray
from krayne.api import open_tunnel

with open_tunnel("my-cluster") as session:
    ray.init(session.client_url)  # ray://localhost:...

    @ray.remote
    def hello(i: int) -> str:
        return f"Hello from worker {i}"

    print(ray.get([hello.remote(i) for i in range(4)]))
    ray.shutdown()
# tunnels closed when the block exits
```
Return types
All return types are frozen (immutable) dataclasses defined in `krayne.api.types`.
ClusterInfo
Summary information about a Ray cluster.
```python
@dataclass(frozen=True)
class ClusterInfo:
    name: str                     # Cluster name
    namespace: str                # Kubernetes namespace
    status: str                   # Cluster status (e.g. "ready", "creating")
    head_ip: str | None           # Head node pod/service IP
    dashboard_url: str | None     # Ray dashboard URL
    client_url: str | None        # Ray client URL
    notebook_url: str | None      # Jupyter notebook URL (if enabled)
    code_server_url: str | None   # Code Server URL (if enabled)
    ssh_url: str | None           # SSH URL (if enabled)
    num_workers: int              # Total worker replicas
    autoscaling_enabled: bool     # Whether autoscaling is enabled
    created_at: str               # Creation timestamp
```
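Because `ClusterInfo` is frozen, assigning to a field raises `dataclasses.FrozenInstanceError`; to get a modified value, build a copy with `dataclasses.replace`. The sketch below redeclares the class with the fields listed above so it runs on its own; the field values are illustrative.

```python
from __future__ import annotations

import dataclasses
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusterInfo:
    # Field list copied from the declaration above.
    name: str
    namespace: str
    status: str
    head_ip: str | None
    dashboard_url: str | None
    client_url: str | None
    notebook_url: str | None
    code_server_url: str | None
    ssh_url: str | None
    num_workers: int
    autoscaling_enabled: bool
    created_at: str


info = ClusterInfo(
    name="training-run", namespace="ml-team", status="creating",
    head_ip=None, dashboard_url=None, client_url=None, notebook_url=None,
    code_server_url=None, ssh_url=None, num_workers=4,
    autoscaling_enabled=False, created_at="2024-01-01T00:00:00Z",
)

try:
    info.status = "ready"  # frozen: in-place assignment raises
except dataclasses.FrozenInstanceError:
    pass

# Derive an updated copy instead; `info` itself is unchanged.
updated = dataclasses.replace(info, status="ready")
```

Immutability means a `ClusterInfo` you hold is a snapshot; call `get_cluster` again to observe new state.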
ClusterDetails
Extended cluster information with resource breakdown.
```python
@dataclass(frozen=True)
class ClusterDetails:
    info: ClusterInfo                      # Summary info
    head: HeadNodeInfo                     # Head node resources
    worker_groups: list[WorkerGroupInfo]   # Worker group details
    ray_version: str                       # Ray version
    python_version: str                    # Python version
```
HeadNodeInfo
Head node resource details.
```python
@dataclass(frozen=True)
class HeadNodeInfo:
    cpus: str          # CPU count (e.g. "1", "500m")
    memory: str        # Memory (e.g. "4Gi")
    gpus: int          # GPU count parsed from the head pod's nvidia.com/gpu requests
    image: str         # Container image
    runs_tasks: bool   # Derived from rayStartParams["num-cpus"] != 0; when False, the head is a control plane only
```
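The `gpus` and `runs_tasks` comments describe derivations from the RayCluster head group spec. Here is a minimal sketch of both, assuming the KubeRay layout (`rayStartParams` as a string-to-string map, resources under `template.spec.containers[].resources.requests`) and that the first container is the Ray container. The helper names are hypothetical, and treating a missing `num-cpus` as "runs tasks" is an assumption.

```python
from __future__ import annotations


def head_runs_tasks(head_group_spec: dict) -> bool:
    """Hypothetical: mirror the documented rule rayStartParams["num-cpus"] != 0."""
    num_cpus = head_group_spec.get("rayStartParams", {}).get("num-cpus")
    if num_cpus is None:
        # Assumption: without an override the head advertises CPUs and runs tasks.
        return True
    # KubeRay stores rayStartParams values as strings, e.g. "0".
    return float(num_cpus) != 0


def head_gpus(head_group_spec: dict) -> int:
    """Hypothetical: read nvidia.com/gpu from the first container's requests."""
    containers = head_group_spec.get("template", {}).get("spec", {}).get("containers", [])
    if not containers:
        return 0
    requests = containers[0].get("resources", {}).get("requests", {})
    # Kubernetes quantities may be strings ("1") or numbers; int() accepts both.
    return int(requests.get("nvidia.com/gpu", 0))
```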
WorkerGroupInfo
Worker group resource details.
@dataclass(frozen=True)
class WorkerGroupInfo:
name: str # Worker group name
replicas: int # Desired number of replicas
min_replicas: int # Minimum replicas (autoscaling lower bound)
max_replicas: int # Maximum replicas (autoscaling upper bound)
cpus: str # CPUs per worker
memory: str # Memory per worker
gpus: int # GPUs per worker
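`describe_cluster` reports GPUs per worker and replica counts per group, so cluster-wide GPU capacity is simple arithmetic: head GPUs plus the sum of `gpus * replicas` over the worker groups. The sketch below redeclares `HeadNodeInfo` and `WorkerGroupInfo` from this page; `total_gpus`, the group names, and the sample values are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class HeadNodeInfo:
    cpus: str
    memory: str
    gpus: int
    image: str
    runs_tasks: bool


@dataclass(frozen=True)
class WorkerGroupInfo:
    name: str
    replicas: int
    min_replicas: int
    max_replicas: int
    cpus: str
    memory: str
    gpus: int


def total_gpus(head: HeadNodeInfo, groups: list[WorkerGroupInfo], at_max: bool = False) -> int:
    """GPUs across the cluster at desired (or, with at_max, maximum) replica counts."""
    worker_gpus = sum(g.gpus * (g.max_replicas if at_max else g.replicas) for g in groups)
    return head.gpus + worker_gpus


head = HeadNodeInfo(cpus="1", memory="4Gi", gpus=0, image="example/ray:latest", runs_tasks=False)
groups = [
    WorkerGroupInfo("gpu-workers", replicas=4, min_replicas=1, max_replicas=8, cpus="8", memory="32Gi", gpus=1),
    WorkerGroupInfo("cpu-workers", replicas=2, min_replicas=0, max_replicas=4, cpus="4", memory="16Gi", gpus=0),
]
desired = total_gpus(head, groups)              # 4 GPUs at desired replicas
ceiling = total_gpus(head, groups, at_max=True)  # 8 GPUs if every group reaches max_replicas
```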
TunnelSession
Active tunnel session with local URLs for all forwarded services.
@dataclass(frozen=True)
class TunnelSession:
cluster_name: str # Cluster name
namespace: str # Kubernetes namespace
tunnels: list[TunnelInfo] # List of active tunnels
`TunnelSession` also provides URL properties: `dashboard_url`, `client_url`, `notebook_url`, `code_server_url`, and `ssh_url`. Each returns the local URL for the corresponding service, or `None` if that service is not tunneled.
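One plausible way to build such properties is a lookup over `tunnels` keyed by service name. This sketch is hypothetical: the reference does not list `TunnelInfo`'s fields, so `service` and `local_port` are assumed names, and the scheme choice (`ray://` for the client, `http://` otherwise) follows the example comments on this page. The port numbers are illustrative. Only three of the five properties are shown; `code_server_url` and `ssh_url` would follow the same pattern with whatever scheme each service needs.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class TunnelInfo:
    # Hypothetical fields: the reference does not document TunnelInfo's attributes.
    service: str     # e.g. "dashboard", "client"
    local_port: int  # port bound on localhost


# Assumed schemes: Ray Client uses ray://, the other web services plain http://.
_SCHEMES = {"client": "ray"}


@dataclass(frozen=True)
class TunnelSession:
    cluster_name: str
    namespace: str
    tunnels: list[TunnelInfo]

    def _url(self, service: str) -> str | None:
        for tunnel in self.tunnels:
            if tunnel.service == service:
                scheme = _SCHEMES.get(service, "http")
                return f"{scheme}://localhost:{tunnel.local_port}"
        return None  # service not tunneled

    @property
    def dashboard_url(self) -> str | None:
        return self._url("dashboard")

    @property
    def client_url(self) -> str | None:
        return self._url("client")

    @property
    def notebook_url(self) -> str | None:
        return self._url("notebook")
```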
KubeClient protocol
The KubeClient protocol defines the interface for Kubernetes operations. You can implement it for testing or custom backends.
```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class KubeClient(Protocol):
    def create_ray_cluster(self, manifest: dict) -> dict: ...
    def get_ray_cluster(self, name: str, namespace: str) -> dict: ...
    def list_ray_clusters(self, namespace: str) -> list[dict]: ...
    def patch_ray_cluster(self, name: str, namespace: str, patch: dict) -> dict: ...
    def delete_ray_cluster(self, name: str, namespace: str) -> None: ...
    def get_cluster_status(self, name: str, namespace: str) -> str: ...
    def list_pods(self, cluster_name: str, namespace: str) -> list[dict]: ...
    def get_head_node_port(self, cluster_name: str, namespace: str, port_name: str) -> int | None: ...
    def list_namespaces(self) -> list[str]: ...
```
Any object that implements these methods satisfies the protocol — no inheritance required.
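As an alternative to `MagicMock`, a small in-memory fake gives tests real state to assert against. The sketch below copies the protocol from this page and implements it structurally, without inheriting from it. `InMemoryKubeClient` is hypothetical, and its simplified merge patch, `KeyError` on missing clusters, and empty pod list are assumptions, not the behavior of krayne's real client.

```python
from __future__ import annotations

import copy
from typing import Protocol, runtime_checkable


@runtime_checkable
class KubeClient(Protocol):
    # Method list copied from the protocol above.
    def create_ray_cluster(self, manifest: dict) -> dict: ...
    def get_ray_cluster(self, name: str, namespace: str) -> dict: ...
    def list_ray_clusters(self, namespace: str) -> list[dict]: ...
    def patch_ray_cluster(self, name: str, namespace: str, patch: dict) -> dict: ...
    def delete_ray_cluster(self, name: str, namespace: str) -> None: ...
    def get_cluster_status(self, name: str, namespace: str) -> str: ...
    def list_pods(self, cluster_name: str, namespace: str) -> list[dict]: ...
    def get_head_node_port(self, cluster_name: str, namespace: str, port_name: str) -> int | None: ...
    def list_namespaces(self) -> list[str]: ...


class InMemoryKubeClient:
    """Hypothetical fake that keeps RayCluster dicts in memory (no inheritance)."""

    def __init__(self) -> None:
        self._clusters: dict[tuple[str, str], dict] = {}

    def create_ray_cluster(self, manifest: dict) -> dict:
        meta = manifest["metadata"]
        key = (meta["name"], meta.get("namespace", "default"))
        self._clusters[key] = copy.deepcopy(manifest)
        return copy.deepcopy(manifest)

    def get_ray_cluster(self, name: str, namespace: str) -> dict:
        # Raises KeyError for unknown clusters; a real client raises its own error type.
        return copy.deepcopy(self._clusters[(name, namespace)])

    def list_ray_clusters(self, namespace: str) -> list[dict]:
        return [copy.deepcopy(m) for (_, ns), m in self._clusters.items() if ns == namespace]

    def patch_ray_cluster(self, name: str, namespace: str, patch: dict) -> dict:
        def merge(target: dict, changes: dict) -> None:
            # Simplified merge: nested dicts merge recursively, anything else replaces.
            for key, value in changes.items():
                if isinstance(value, dict) and isinstance(target.get(key), dict):
                    merge(target[key], value)
                else:
                    target[key] = copy.deepcopy(value)

        merge(self._clusters[(name, namespace)], patch)
        return self.get_ray_cluster(name, namespace)

    def delete_ray_cluster(self, name: str, namespace: str) -> None:
        del self._clusters[(name, namespace)]

    def get_cluster_status(self, name: str, namespace: str) -> str:
        return self._clusters[(name, namespace)].get("status", {}).get("state", "unknown")

    def list_pods(self, cluster_name: str, namespace: str) -> list[dict]:
        return []  # the fake never schedules pods

    def get_head_node_port(self, cluster_name: str, namespace: str, port_name: str) -> int | None:
        return None  # no services exposed

    def list_namespaces(self) -> list[str]:
        return sorted({"default"} | {ns for _, ns in self._clusters})
```

Whether SDK functions accept the fake's return values depends on which manifest fields krayne reads; the mock dictionary in the example below (with `metadata`, `spec.workerGroupSpecs`, and `status`) suggests a minimal shape.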
Example mock for testing:
```python
from unittest.mock import MagicMock

from krayne.api import create_cluster
from krayne.config import ClusterConfig

mock_client = MagicMock()
mock_client.create_ray_cluster.return_value = {
    "metadata": {"name": "test", "namespace": "default", "creationTimestamp": "now"},
    "spec": {"workerGroupSpecs": [{"replicas": 1}]},
    "status": {"state": "ready", "head": {"podIP": "10.0.0.1"}},
}

config = ClusterConfig(name="test")
info = create_cluster(config, client=mock_client)

assert info.name == "test"
assert info.status == "ready"
```
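The mock above returns a dictionary shaped like a RayCluster object. To make it clearer which keys plausibly feed which `ClusterInfo` fields, here is a hypothetical mapping over a subset of those fields. It is a guess for illustration, not krayne's parser; `summarize_manifest` is an invented name and the fallback values are assumptions.

```python
from __future__ import annotations


def summarize_manifest(manifest: dict) -> dict:
    """Hypothetical mapping from a RayCluster dict to a few ClusterInfo fields."""
    metadata = manifest.get("metadata", {})
    status = manifest.get("status", {})
    groups = manifest.get("spec", {}).get("workerGroupSpecs", [])
    return {
        "name": metadata.get("name"),
        "namespace": metadata.get("namespace", "default"),
        "status": status.get("state", "unknown"),
        "head_ip": status.get("head", {}).get("podIP"),
        # num_workers: total worker replicas summed across all worker groups.
        "num_workers": sum(group.get("replicas", 0) for group in groups),
        "created_at": metadata.get("creationTimestamp", ""),
    }
```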