多GPU并行AI推理

Hugging Face 平台拥有许多数据集和预训练模型,使使用和训练最先进的机器学习模型变得越来越容易。然而,扩展 AI 任务可能很困难,因为 AI 数据集通常很大(100 GB 到 TB),使用 Hugging Face Transformer 进行模型推理有时计算成本很高。

Dask 是一个用于分布式计算的 Python 库,它可以通过将数据集分解为可管理的块来处理核外计算(处理内存中放不下的数据)。这使得执行以下操作变得容易:

  • 使用模仿 Pandas 的易于使用的 API 高效地加载和预处理 TB 级数据集
  • 并行模型推理(可选择多节点 GPU 推理)

在这篇文章中,我们展示了一个来自 FineWeb 数据集的数据处理示例,使用 FineWeb-Edu 分类器来识别具有高教育价值的网页。我们将展示:

  • 如何使用 pandas 在本地处理 100 行
  • 如何使用 Dask 在云端的多个 GPU 上扩展到 2.11 亿行

1、使用 Pandas 处理 100 行

FineWeb 数据集包含来自 Common Crawl 的 15 万亿个英文网络数据标记,Common Crawl 是一个非营利组织,它托管每月更新的公共网络爬虫数据集。该数据集通常用于各种任务,例如大型语言模型训练、分类、内容过滤和跨各个部门的信息检索。

在笔记本电脑上使用 pandas 下载和读取单个文件可能需要 1 分钟以上:

import pandas as pd

df = pd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
)

接下来,我们将使用 HF FineWeb-Edu 分类器来判断数据集中网页的教育价值。网页的等级从 0 到 5,其中 0 表示不具教育意义,5 表示教育意义高。我们可以使用 pandas 对较小的 100 行数据子集执行此操作,这在配备 GPU 的 M1 Mac 上大约需要 10 秒。

from transformers import pipeline

def compute_scores(texts):
    import torch

    # Select which hardware to use
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device
    )
    results = pipe(
        texts.to_list(),
        batch_size=25,                    # Choose batch size based on data size and hardware
        padding="longest",
        truncation=True,
        function_to_apply="none"
    )
    
    return pd.Series([r["score"] for r in results])

df = df[:100]
min_edu_score = 3
df["edu-classifier-score"] = compute_scores(df.text)
df = df[df["edu-classifier-score"] >= min_edu_score]

请注意,我们还添加了一个步骤来检查 compute_scores 函数中的可用硬件,因为当我们在下一步使用 Dask 扩展时,它将被分发。这样可以轻松地从在单台机器上进行本地测试(在 CPU 上,或者您可能拥有一台带有 Apple Silicon GPU 的 MacBook)转变为分布在多台机器上(例如 NVIDIA GPU)。

2、使用 Dask 扩展到 2.11 亿行

整个 2024 年 2 月/3 月抓取的数据在磁盘上为 432 GB,在内存中为 ~715 GB,分布在 250 个 Parquet 文件中。即使在具有足够内存来容纳整个数据集的机器上,连续执行此操作也会非常慢。

要进行扩展,我们可以使用 Dask DataFrame,它通过并行化 pandas 来帮助您处理大型表格数据。它与 pandas API 非常相似,可以轻松地从在单个数据集上进行测试转变为扩展到完整数据集。 Dask 与 Hugging Face 数据集的默认格式 Parquet 配合良好,可实现丰富的数据类型、高效的列过滤和压缩。

import dask.dataframe as dd

df = dd.read_parquet(
    # Load the full dataset lazily with Dask
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet" 
)

我们将使用 map_partitions 在 Dask DataFrame 上并行应用 compute_scores 函数进行文本分类,该函数将我们的函数并行应用于更大的 Dask DataFrame 中的每个 pandas DataFrame。meta 参数特定于 Dask,指示输出的数据结构(列名和数据类型)。

from transformers import pipeline

def compute_scores(texts):
    import torch

    # Select which hardware to use
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device,
    )
    results = pipe(
        texts.to_list(),
        batch_size=768,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )

    return pd.Series([r["score"] for r in results])

min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]

请注意,我们为本示例选择了合适的 batch_size,但您可能需要根据自己工作流程中使用的硬件、数据和模型对其进行自定义(请参阅有关管道批处理的 HF 文档)。

现在我们已经确定了我们感兴趣的数据集的行,我们可以保存结果以供其他下游分析使用。 Dask DataFrame 自动支持对 Parquet 的分布式写入。 但是,由于 Hugging Face 使用提交来跟踪数据集更改,因此我们需要一个自定义函数来允许与 Hugging Face 存储并行写入 Dask DataFrame。

from dask_hf import to_parquet

to_parquet(
    df,
    "hf://datasets/<your-hf-user>/<data-dir>"  # Update with your dataset location
)

将来,我们将创建与 Dask 和 huggingface_hub 的直接集成,但现在,您可以复制并粘贴此自定义函数以供自己使用。在此示例中,我们将其保存到名为 dask_hf.py 的文件中,该文件是上面 import 子句中引用的文件。

3、多 GPU 并行模型推理

有多种方法可以在各种硬件上部署 Dask。在这里,我们将使用 Coiled 在云上部署 Dask,以便我们可以根据需要启动虚拟机,然后在完成后清理它们。

cluster = coiled.Cluster(
    region="us-east-1",                 # Same region as data
    n_workers=100,                      
    spot_policy="spot_with_fallback",   # Use spot instances, if available
    worker_vm_types="g5.xlarge",        # NVIDIA A10 Tensor Core GPU
    worker_options={"nthreads": 1},
)
client = cluster.get_client()

Coiled 的底层处理:

  • 为云虚拟机配置 GPU 硬件。在本例中,AWS 上的 g5.xlarge 实例。
  • 设置适当的 NVIDIA 驱动程序、CUDA 运行时等。
  • 使用包同步自动在云虚拟机上安装与本地相同的包。这包括工作目录中的 Python 文件,因此我们可以直接从远程集群上的 dask_hf.py 导入。

工作流程大约需要 5 个小时才能完成,我们的 GPU 硬件利用率很高。

GPU 利用率和内存使用率都接近其最大容量,这意味着我们正在充分利用可用的硬件

综上所述,以下是完整的工作流程:

import dask.dataframe as dd
from transformers import pipeline
from dask_hf import to_parquet
import os
import coiled

cluster = coiled.Cluster(
    region="us-east-1",
    n_workers=100,
    spot_policy="spot_with_fallback",
    worker_vm_types="g5.xlarge",
    worker_options={"nthreads": 1},
)
client = cluster.get_client()
cluster.send_private_envs(
    {"HF_TOKEN": "<your-hf-token>"}             #  Send credentials over encrypted connection
)

df = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)

def compute_scores(texts):
    import torch

    # Select which hardware to use
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device
    )
    results = pipe(
        texts.to_list(),
        batch_size=768,
        padding="longest",
        truncation=True,
        function_to_apply="none"
    )

    return pd.Series([r["score"] for r in results])

min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]

to_parquet(
    df,
    "hf://datasets/<your-hf-user>/<data-dir>"               # Replace with your HF user and directory
)

4、结束语

Hugging Face + Dask 是一个强大的组合。在此示例中,我们使用 Dask + Coiled 在云端的多个 GPU 上并行运行工作流,将分类任务从 100 行扩展到 2.11 亿行。

此相同类型的工作流可用于其他用例,例如:

  • 过滤基因组数据以选择感兴趣的基因
  • 从非结构化文本中提取信息并将其转换为结构化数据集
  • 清理从互联网或 Common Crawl 抓取的文本数据
  • 运行多模态模型推理以分析大型音频、图像或视频数据集

原文链接:Scaling AI-Based Data Processing with Hugging Face + Dask

BimAnt翻译整理,转载请标明出处