Ray 集群与 AI:分布式计算架构

Ray 集群与 AI:分布式计算架构

Ray 集群与 AI:分布式计算架构

更新于 2025 年 12 月 11 日

2025 年 12 月更新: OpenAI 使用 Ray 协调 ChatGPT 训练。Ray 加入 PyTorch Foundation,验证了企业级采用的可行性。该框架可从笔记本电脑扩展到数千个 GPU 的集群。实现每秒数百万任务处理,延迟低于毫秒级——比 Spark 在 AI 模式下快一个数量级。原生异构计算支持 CPU/GPU 混合工作负载。

OpenAI 使用 Ray 协调 ChatGPT 及其他模型的训练。¹ 该框架可从笔记本电脑扩展到包含数千个 GPU 的集群,处理分布式计算的复杂性——否则每个项目都需要定制基础设施。Ray 在 2025 年的采用率急剧增长,随着该框架加入 PyTorch Foundation 以及 Anyscale 筹集资金支持企业部署,其价值得到了验证。² 理解 Ray 的架构和部署模式有助于组织构建可从实验扩展到生产的分布式 AI 基础设施。

Ray 为分布式 AI 工作负载(训练、调优、推理和数据处理)提供统一框架,在抽象集群管理复杂性的同时保持对资源分配的细粒度控制。对于从单 GPU 实验转向多节点生产系统的组织,Ray 提供了通往可扩展 AI 基础设施的最直接路径。

为什么选择 Ray 作为 AI 基础设施

Ray 诞生于加州大学伯克利分校的 RISELab,旨在解决传统框架(如 Apache Spark)难以处理的分布式 AI 工作负载的特定挑战:

AI 工作负载需求

异构计算: AI 流水线混合了 CPU 密集型数据处理与 GPU 加速的训练和推理。Ray 原生支持异构资源分配,根据工作负载需求在 CPU 和 GPU 之间调度任务。³

细粒度并行: 深度学习训练需要协调跨工作节点的梯度、管理模型状态并优雅地处理故障。Ray 的任务和 Actor 抽象提供了这些模式所需的原语。

有状态计算: 与 MapReduce 风格的框架不同,AI 工作负载通常需要跨迭代维护状态。Ray Actor 在调用之间保持状态,支持参数服务器和强化学习智能体等模式。

低延迟: 实时推理和超参数搜索需要微秒级任务调度。Ray 实现每秒数百万任务处理,延迟低于毫秒级——对于这些模式比 Spark 快一个数量级。⁴

与替代方案的比较

Apache Spark: 针对批量数据处理进行了优化,使用 SQL 和 DataFrames。擅长 ETL、特征工程和结构化数据分析。不太适合 GPU 工作负载、有状态计算和低延迟需求。⁵

Dask: Python 原生分布式计算,具有 DataFrame 和数组 API。比 Spark 更轻量,但缺乏 Ray 的 Actor 模型和机器学习专用库。

Horovod: 专注于分布式深度学习训练。对于多样化的 AI 工作负载不如 Ray 灵活,但对于纯训练场景更简单。

Ray 的优势: 单一框架处理数据处理、训练、超参数调优和推理服务。团队避免了管理多个系统及其之间的集成复杂性。

生产环境采用

主要组织在生产环境中运行 Ray:⁶

  • OpenAI: ChatGPT 训练协调
  • Uber: 分布式机器学习平台
  • Instacart: 机器学习基础设施骨干
  • Shopify: 生产环境机器学习工作负载
  • 蚂蚁集团: 大规模金融机器学习

Ray 集群官方支持最多 2,000 个节点,实现前沿模型训练和高容量推理所需的规模。

Ray 架构

核心抽象

Ray 为分布式计算提供两个基本抽象:

任务: 远程执行的无状态函数。Ray 自动在可用工作节点之间调度任务、处理故障并返回结果。

import ray

ray.init()

@ray.remote
def process_batch(data):
    # 在任意可用工作节点上运行
    return transform(data)

# 并行执行
futures = [process_batch.remote(batch) for batch in batches]
results = ray.get(futures)

Actor: 分布在集群中的有状态对象。每个 Actor 在方法调用之间维护状态,支持模型服务、参数服务器和游戏环境等模式。

@ray.remote
class ModelServer:
    def __init__(self, model_path):
        self.model = load_model(model_path)

    def predict(self, input_data):
        return self.model(input_data)

# 创建 Actor 实例
server = ModelServer.remote("model.pt")
# 远程调用方法
prediction = ray.get(server.predict.remote(data))

集群架构

Ray 集群由头节点和工作节点组成:

头节点: 运行管理集群状态的全局控制存储(GCS)、跟踪数据位置的对象目录以及协调任务放置的调度器。

工作节点: 执行任务和 Actor。每个工作节点运行一个 Raylet 守护进程,处理本地调度和对象管理。

对象存储: 分布式共享内存,实现同一节点上任务之间的零拷贝数据传递和跨节点的高效序列化。

资源管理

Ray 显式管理异构资源:

@ray.remote(num_cpus=4, num_gpus=1)
def train_step(model, data):
    # 保证 4 个 CPU 和 1 个 GPU
    return model.train(data)

自定义资源实现细粒度调度:

# 请求特定硬件
@ray.remote(resources={"TPU": 1})
def tpu_inference(data):
    return run_on_tpu(data)

Ray AI 库

Ray 包含针对常见 AI 工作负载的专用库:

Ray Train

支持 PyTorch、TensorFlow 和其他框架的分布式训练抽象:

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    # 标准 PyTorch 训练代码
    model = MyModel()
    for epoch in range(10):
        train_epoch(model)

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(
        num_workers=8,
        use_gpu=True
    ),
)
result = trainer.fit()

主要功能:⁷ - 只需修改 2 行代码即可从单 GPU 扩展到多节点集群 - 自动分布式数据加载和梯度同步 - 检查点管理和故障恢复 - 与 PyTorch Lightning、Hugging Face Transformers 和 DeepSpeed 集成

Ray Tune

分布式超参数优化:

from ray import tune
from ray.tune.schedulers import ASHAScheduler

def training_function(config):
    model = build_model(config["learning_rate"], config["batch_size"])
    for epoch in range(100):
        loss = train_epoch(model)
        tune.report(loss=loss)

analysis = tune.run(
    training_function,
    config={
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "batch_size": tune.choice([32, 64, 128])
    },
    scheduler=ASHAScheduler(metric="loss", mode="min"),
    num_samples=100,
)

Ray Tune 提供: - 跨集群并行试验执行 - 使用 ASHA、基于种群的训练和其他调度器进行早停 - 与 Optuna、HyperOpt 和其他优化库集成 - 检查点和恢复

Ray Serve

具有自动扩缩容的生产模型服务:

from ray import serve

@serve.deployment(num_replicas=2, ray_actor_options={"num_gpus": 1})
class LLMDeployment:
    def __init__(self):
        self.model = load_llm()

    async def __call__(self, request):
        prompt = await request.json()
        return self.model.generate(prompt["text"])

serve.run(LLMDeployment.bind())

Ray Serve 功能:⁸ - 基于请求率的自动扩缩容 - 零停机部署 - 多模型组合和请求路由 - 用于 LLM 服务的 OpenAI 兼容 API - 通过前缀感知路由减少 60% 的首 token 响应时间⁹

Ray Data

分布式数据加载和预处理:

import ray

ds = ray.data.read_parquet("s3://bucket/data/")
ds = ds.map(preprocess)
ds = ds.filter(lambda x: x["label"] > 0)

# 转换为 PyTorch DataLoader
train_loader = ds.iter_torch_batches(batch_size=32)

Ray Data 提供: - 大型数据集的流式执行 - GPU 加速预处理 - 与 Ray Train 集成用于分布式训练 - 原生支持图像、文本和表格数据

使用 KubeRay 进行 Kubernetes 部署

生产环境的 Ray 部署通常使用官方 Kubernetes operator KubeRay 在 Kubernetes 上运行:¹⁰

KubeRay 组件

RayCluster CRD: 定义集群配置,包括头节点和工作节点规格、自动扩缩容策略和资源需求。

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: ml-cluster
spec:
  headGroupSpec:
    rayStartParams:
      dashboard-host: '0.0.0.0'
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray:2.52.0-py310-gpu
          resources:
            limits:
              nvidia.com/gpu: 1
  workerGroupSpecs:
  - replicas: 4
    minReplicas: 2
    maxReplicas: 10
    groupName: gpu-workers
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray:2.52.0-py310-gpu
          resources:
            limits:
              nvidia.com/gpu: 1

RayJob CRD: 向自动配置的集群提交作业。KubeRay 创建集群、运行作业,并可选择在完成时删除集群。

RayService CRD: 托管的 Ray Serve 部署,具有零停机升级和健康检查。

生产环境最佳实践

容器镜像: 将依赖项预先构建到发布的 Docker 镜像中,而不是在运行时安装。这确保了可重现性和更快的启动速度。¹¹

自动扩缩容: 同时启用 Ray 自动扩缩容(在集群内扩展工作节点)和 Kubernetes 自动扩缩容(扩展集群节点):

spec:
  enableInTreeAutoscaling: true
  autoscalerOptions:
    upscalingMode: Default
    idleTimeoutSeconds: 60

存储: 使用持久卷存储检查点,使用共享存储存储大型数据集:

volumes:
- name: shared-storage
  persistentVolumeClaim:
    claimName: ml-data-pvc

监控: 与 Prometheus 和 Grafana 集成以实现可观测性:

metadata:
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "8080"

特定云平台部署

GKE: GKE 上的 Ray 插件提供托管 Ray 集群,具有自动配置和与 Google Cloud 服务的集成。¹²

EKS: 使用 Cluster Autoscaler 或 Karpenter 部署 KubeRay 以实现节点扩展。与 FSx for Lustre 集成提供高性能共享存储。

AKS: Microsoft 和 Anyscale 提供 Anyscale on Azure 作为可从 Azure Portal 访问的第一方服务。¹³

Anyscale 托管 Ray

Anyscale 是由 Ray 创建者创立的公司,提供托管 Ray 部署:

Anyscale 平台

托管集群: 生产级 Ray 集群,具有自动扩缩容、容错和监控功能,无需基础设施管理。

RayTurbo 运行时: 专有性能改进,与开源 Ray 相比提供更高的弹性、更快的性能和更低的成本。¹⁴

企业功能: 基于角色的访问控制、审计日志、VPC 对等和合规认证。

云合作伙伴

CoreWeave: Anyscale BYOC(自带云)通过 CoreWeave Kubernetes Service 直接部署在 CoreWeave 客户账户中。¹⁵

Azure: 第一方 Anyscale 服务可在 Azure Portal 中使用,具有原生集成。

AWS: Anyscale 在 AWS 上运行,与现有 AWS 服务集成。

何时使用 Anyscale

**考虑使用 Anysc

[内容因翻译需要被截断]

Request a Quote_

Tell us about your project and we'll respond within 72 hours.

> TRANSMISSION_COMPLETE

Request Received_

Thank you for your inquiry. Our team will review your request and respond within 72 hours.

QUEUED FOR PROCESSING