Ray 集群与 AI:分布式计算架构
更新于 2025 年 12 月 11 日
2025 年 12 月更新: OpenAI 使用 Ray 协调 ChatGPT 训练。Ray 加入 PyTorch Foundation,验证了企业级采用的可行性。该框架可从笔记本电脑扩展到数千个 GPU 的集群。实现每秒数百万任务处理,延迟低于毫秒级——比 Spark 在 AI 模式下快一个数量级。原生异构计算支持 CPU/GPU 混合工作负载。
OpenAI 使用 Ray 协调 ChatGPT 及其他模型的训练。¹ 该框架可从笔记本电脑扩展到包含数千个 GPU 的集群,处理分布式计算的复杂性——否则每个项目都需要定制基础设施。Ray 在 2025 年的采用率急剧增长,随着该框架加入 PyTorch Foundation 以及 Anyscale 筹集资金支持企业部署,其价值得到了验证。² 理解 Ray 的架构和部署模式有助于组织构建可从实验扩展到生产的分布式 AI 基础设施。
Ray 为分布式 AI 工作负载(训练、调优、推理和数据处理)提供统一框架,在抽象集群管理复杂性的同时保持对资源分配的细粒度控制。对于从单 GPU 实验转向多节点生产系统的组织,Ray 提供了通往可扩展 AI 基础设施的最直接路径。
为什么选择 Ray 作为 AI 基础设施
Ray 诞生于加州大学伯克利分校的 RISELab,旨在解决传统框架(如 Apache Spark)难以处理的分布式 AI 工作负载的特定挑战:
AI 工作负载需求
异构计算: AI 流水线混合了 CPU 密集型数据处理与 GPU 加速的训练和推理。Ray 原生支持异构资源分配,根据工作负载需求在 CPU 和 GPU 之间调度任务。³
细粒度并行: 深度学习训练需要协调跨工作节点的梯度、管理模型状态并优雅地处理故障。Ray 的任务和 Actor 抽象提供了这些模式所需的原语。
有状态计算: 与 MapReduce 风格的框架不同,AI 工作负载通常需要跨迭代维护状态。Ray Actor 在调用之间保持状态,支持参数服务器和强化学习智能体等模式。
低延迟: 实时推理和超参数搜索需要微秒级任务调度。Ray 实现每秒数百万任务处理,延迟低于毫秒级——对于这些模式比 Spark 快一个数量级。⁴
与替代方案的比较
Apache Spark: 针对批量数据处理进行了优化,使用 SQL 和 DataFrames。擅长 ETL、特征工程和结构化数据分析。不太适合 GPU 工作负载、有状态计算和低延迟需求。⁵
Dask: Python 原生分布式计算,具有 DataFrame 和数组 API。比 Spark 更轻量,但缺乏 Ray 的 Actor 模型和机器学习专用库。
Horovod: 专注于分布式深度学习训练。对于多样化的 AI 工作负载不如 Ray 灵活,但对于纯训练场景更简单。
Ray 的优势: 单一框架处理数据处理、训练、超参数调优和推理服务。团队避免了管理多个系统及其之间的集成复杂性。
生产环境采用
主要组织在生产环境中运行 Ray:⁶
- OpenAI: ChatGPT 训练协调
- Uber: 分布式机器学习平台
- Instacart: 机器学习基础设施骨干
- Shopify: 生产环境机器学习工作负载
- 蚂蚁集团: 大规模金融机器学习
Ray 集群官方支持最多 2,000 个节点,实现前沿模型训练和高容量推理所需的规模。
Ray 架构
核心抽象
Ray 为分布式计算提供两个基本抽象:
任务: 远程执行的无状态函数。Ray 自动在可用工作节点之间调度任务、处理故障并返回结果。
import ray
ray.init()
@ray.remote
def process_batch(data):
# 在任意可用工作节点上运行
return transform(data)
# 并行执行
futures = [process_batch.remote(batch) for batch in batches]
results = ray.get(futures)
Actor: 分布在集群中的有状态对象。每个 Actor 在方法调用之间维护状态,支持模型服务、参数服务器和游戏环境等模式。
@ray.remote
class ModelServer:
def __init__(self, model_path):
self.model = load_model(model_path)
def predict(self, input_data):
return self.model(input_data)
# 创建 Actor 实例
server = ModelServer.remote("model.pt")
# 远程调用方法
prediction = ray.get(server.predict.remote(data))
集群架构
Ray 集群由头节点和工作节点组成:
头节点: 运行管理集群状态的全局控制存储(GCS)、跟踪数据位置的对象目录以及协调任务放置的调度器。
工作节点: 执行任务和 Actor。每个工作节点运行一个 Raylet 守护进程,处理本地调度和对象管理。
对象存储: 分布式共享内存,实现同一节点上任务之间的零拷贝数据传递和跨节点的高效序列化。
资源管理
Ray 显式管理异构资源:
@ray.remote(num_cpus=4, num_gpus=1)
def train_step(model, data):
# 保证 4 个 CPU 和 1 个 GPU
return model.train(data)
自定义资源实现细粒度调度:
# 请求特定硬件
@ray.remote(resources={"TPU": 1})
def tpu_inference(data):
return run_on_tpu(data)
Ray AI 库
Ray 包含针对常见 AI 工作负载的专用库:
Ray Train
支持 PyTorch、TensorFlow 和其他框架的分布式训练抽象:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
def train_func():
# 标准 PyTorch 训练代码
model = MyModel()
for epoch in range(10):
train_epoch(model)
trainer = TorchTrainer(
train_loop_per_worker=train_func,
scaling_config=ScalingConfig(
num_workers=8,
use_gpu=True
),
)
result = trainer.fit()
主要功能:⁷ - 只需修改 2 行代码即可从单 GPU 扩展到多节点集群 - 自动分布式数据加载和梯度同步 - 检查点管理和故障恢复 - 与 PyTorch Lightning、Hugging Face Transformers 和 DeepSpeed 集成
Ray Tune
分布式超参数优化:
from ray import tune
from ray.tune.schedulers import ASHAScheduler
def training_function(config):
model = build_model(config["learning_rate"], config["batch_size"])
for epoch in range(100):
loss = train_epoch(model)
tune.report(loss=loss)
analysis = tune.run(
training_function,
config={
"learning_rate": tune.loguniform(1e-4, 1e-1),
"batch_size": tune.choice([32, 64, 128])
},
scheduler=ASHAScheduler(metric="loss", mode="min"),
num_samples=100,
)
Ray Tune 提供: - 跨集群并行试验执行 - 使用 ASHA、基于种群的训练和其他调度器进行早停 - 与 Optuna、HyperOpt 和其他优化库集成 - 检查点和恢复
Ray Serve
具有自动扩缩容的生产模型服务:
from ray import serve
@serve.deployment(num_replicas=2, ray_actor_options={"num_gpus": 1})
class LLMDeployment:
def __init__(self):
self.model = load_llm()
async def __call__(self, request):
prompt = await request.json()
return self.model.generate(prompt["text"])
serve.run(LLMDeployment.bind())
Ray Serve 功能:⁸ - 基于请求率的自动扩缩容 - 零停机部署 - 多模型组合和请求路由 - 用于 LLM 服务的 OpenAI 兼容 API - 通过前缀感知路由减少 60% 的首 token 响应时间⁹
Ray Data
分布式数据加载和预处理:
import ray
ds = ray.data.read_parquet("s3://bucket/data/")
ds = ds.map(preprocess)
ds = ds.filter(lambda x: x["label"] > 0)
# 转换为 PyTorch DataLoader
train_loader = ds.iter_torch_batches(batch_size=32)
Ray Data 提供: - 大型数据集的流式执行 - GPU 加速预处理 - 与 Ray Train 集成用于分布式训练 - 原生支持图像、文本和表格数据
使用 KubeRay 进行 Kubernetes 部署
生产环境的 Ray 部署通常使用官方 Kubernetes operator KubeRay 在 Kubernetes 上运行:¹⁰
KubeRay 组件
RayCluster CRD: 定义集群配置,包括头节点和工作节点规格、自动扩缩容策略和资源需求。
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: ml-cluster
spec:
headGroupSpec:
rayStartParams:
dashboard-host: '0.0.0.0'
template:
spec:
containers:
- name: ray-head
image: rayproject/ray:2.52.0-py310-gpu
resources:
limits:
nvidia.com/gpu: 1
workerGroupSpecs:
- replicas: 4
minReplicas: 2
maxReplicas: 10
groupName: gpu-workers
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray:2.52.0-py310-gpu
resources:
limits:
nvidia.com/gpu: 1
RayJob CRD: 向自动配置的集群提交作业。KubeRay 创建集群、运行作业,并可选择在完成时删除集群。
RayService CRD: 托管的 Ray Serve 部署,具有零停机升级和健康检查。
生产环境最佳实践
容器镜像: 将依赖项预先构建到发布的 Docker 镜像中,而不是在运行时安装。这确保了可重现性和更快的启动速度。¹¹
自动扩缩容: 同时启用 Ray 自动扩缩容(在集群内扩展工作节点)和 Kubernetes 自动扩缩容(扩展集群节点):
spec:
enableInTreeAutoscaling: true
autoscalerOptions:
upscalingMode: Default
idleTimeoutSeconds: 60
存储: 使用持久卷存储检查点,使用共享存储存储大型数据集:
volumes:
- name: shared-storage
persistentVolumeClaim:
claimName: ml-data-pvc
监控: 与 Prometheus 和 Grafana 集成以实现可观测性:
metadata:
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8080"
特定云平台部署
GKE: GKE 上的 Ray 插件提供托管 Ray 集群,具有自动配置和与 Google Cloud 服务的集成。¹²
EKS: 使用 Cluster Autoscaler 或 Karpenter 部署 KubeRay 以实现节点扩展。与 FSx for Lustre 集成提供高性能共享存储。
AKS: Microsoft 和 Anyscale 提供 Anyscale on Azure 作为可从 Azure Portal 访问的第一方服务。¹³
Anyscale 托管 Ray
Anyscale 是由 Ray 创建者创立的公司,提供托管 Ray 部署:
Anyscale 平台
托管集群: 生产级 Ray 集群,具有自动扩缩容、容错和监控功能,无需基础设施管理。
RayTurbo 运行时: 专有性能改进,与开源 Ray 相比提供更高的弹性、更快的性能和更低的成本。¹⁴
企业功能: 基于角色的访问控制、审计日志、VPC 对等和合规认证。
云合作伙伴
CoreWeave: Anyscale BYOC(自带云)通过 CoreWeave Kubernetes Service 直接部署在 CoreWeave 客户账户中。¹⁵
Azure: 第一方 Anyscale 服务可在 Azure Portal 中使用,具有原生集成。
AWS: Anyscale 在 AWS 上运行,与现有 AWS 服务集成。
何时使用 Anyscale
**考虑使用 Anysc
[内容因翻译需要被截断]