多GPU并行AI推理
Hugging Face 平台拥有许多数据集和预训练模型,使使用和训练最先进的机器学习模型变得越来越容易。然而,扩展 AI 任务可能很困难,因为 AI 数据集通常很大(100 GB 到 TB),使用 Hugging Face Transformer 进行模型推理有时计算成本很高。
Dask 是一个用于分布式计算的 Python 库,它可以通过将数据集分解为可管理的块来处理核外计算(处理内存中放不下的数据)。这使得执行以下操作变得容易:
- 使用模仿 Pandas 的易于使用的 API 高效地加载和预处理 TB 级数据集
- 并行模型推理(可选择多节点 GPU 推理)
在这篇文章中,我们展示了一个来自 FineWeb 数据集的数据处理示例,使用 FineWeb-Edu 分类器来识别具有高教育价值的网页。我们将展示:
- 如何使用 pandas 在本地处理 100 行
- 如何使用 Dask 在云端的多个 GPU 上扩展到 2.11 亿行
1、使用 Pandas 处理 100 行
FineWeb 数据集包含来自 Common Crawl 的 15 万亿个英文网络数据标记,Common Crawl 是一个非营利组织,它托管每月更新的公共网络爬虫数据集。该数据集通常用于各种任务,例如大型语言模型训练、分类、内容过滤和跨各个部门的信息检索。
在笔记本电脑上使用 pandas 下载和读取单个文件可能需要 1 分钟以上:
import pandas as pd
df = pd.read_parquet(
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
)
接下来,我们将使用 HF FineWeb-Edu 分类器来判断数据集中网页的教育价值。网页的等级从 0 到 5,其中 0 表示不具教育意义,5 表示教育意义高。我们可以使用 pandas 对较小的 100 行数据子集执行此操作,这在配备 GPU 的 M1 Mac 上大约需要 10 秒。
from transformers import pipeline
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device
)
results = pipe(
texts.to_list(),
batch_size=25, # Choose batch size based on data size and hardware
padding="longest",
truncation=True,
function_to_apply="none"
)
return pd.Series([r["score"] for r in results])
df = df[:100]
min_edu_score = 3
df["edu-classifier-score"] = compute_scores(df.text)
df = df[df["edu-classifier-score"] >= min_edu_score]
请注意,我们还添加了一个步骤来检查 compute_scores 函数中的可用硬件,因为当我们在下一步使用 Dask 扩展时,它将被分发。这样可以轻松地从在单台机器上进行本地测试(在 CPU 上,或者您可能拥有一台带有 Apple Silicon GPU 的 MacBook)转变为分布在多台机器上(例如 NVIDIA GPU)。
2、使用 Dask 扩展到 2.11 亿行
整个 2024 年 2 月/3 月抓取的数据在磁盘上为 432 GB,在内存中为 ~715 GB,分布在 250 个 Parquet 文件中。即使在具有足够内存来容纳整个数据集的机器上,连续执行此操作也会非常慢。
要进行扩展,我们可以使用 Dask DataFrame,它通过并行化 pandas 来帮助您处理大型表格数据。它与 pandas API 非常相似,可以轻松地从在单个数据集上进行测试转变为扩展到完整数据集。 Dask 与 Hugging Face 数据集的默认格式 Parquet 配合良好,可实现丰富的数据类型、高效的列过滤和压缩。
import dask.dataframe as dd
df = dd.read_parquet(
# Load the full dataset lazily with Dask
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)
我们将使用 map_partitions 在 Dask DataFrame 上并行应用 compute_scores 函数进行文本分类,该函数将我们的函数并行应用于更大的 Dask DataFrame 中的每个 pandas DataFrame。meta 参数特定于 Dask,指示输出的数据结构(列名和数据类型)。
from transformers import pipeline
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device,
)
results = pipe(
texts.to_list(),
batch_size=768,
padding="longest",
truncation=True,
function_to_apply="none",
)
return pd.Series([r["score"] for r in results])
min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]
请注意,我们为本示例选择了合适的 batch_size,但您可能需要根据自己工作流程中使用的硬件、数据和模型对其进行自定义(请参阅有关管道批处理的 HF 文档)。
现在我们已经确定了我们感兴趣的数据集的行,我们可以保存结果以供其他下游分析使用。 Dask DataFrame 自动支持对 Parquet 的分布式写入。 但是,由于 Hugging Face 使用提交来跟踪数据集更改,因此我们需要一个自定义函数来允许与 Hugging Face 存储并行写入 Dask DataFrame。
from dask_hf import to_parquet
to_parquet(
df,
"hf://datasets/<your-hf-user>/<data-dir>" # Update with your dataset location
)
将来,我们将创建与 Dask 和 huggingface_hub 的直接集成,但现在,您可以复制并粘贴此自定义函数以供自己使用。在此示例中,我们将其保存到名为 dask_hf.py 的文件中,该文件是上面 import 子句中引用的文件。
3、多 GPU 并行模型推理
有多种方法可以在各种硬件上部署 Dask。在这里,我们将使用 Coiled 在云上部署 Dask,以便我们可以根据需要启动虚拟机,然后在完成后清理它们。
cluster = coiled.Cluster(
region="us-east-1", # Same region as data
n_workers=100,
spot_policy="spot_with_fallback", # Use spot instances, if available
worker_vm_types="g5.xlarge", # NVIDIA A10 Tensor Core GPU
worker_options={"nthreads": 1},
)
client = cluster.get_client()
Coiled 的底层处理:
- 为云虚拟机配置 GPU 硬件。在本例中,AWS 上的 g5.xlarge 实例。
- 设置适当的 NVIDIA 驱动程序、CUDA 运行时等。
- 使用包同步自动在云虚拟机上安装与本地相同的包。这包括工作目录中的 Python 文件,因此我们可以直接从远程集群上的 dask_hf.py 导入。
工作流程大约需要 5 个小时才能完成,我们的 GPU 硬件利用率很高。
综上所述,以下是完整的工作流程:
import dask.dataframe as dd
from transformers import pipeline
from dask_hf import to_parquet
import os
import coiled
cluster = coiled.Cluster(
region="us-east-1",
n_workers=100,
spot_policy="spot_with_fallback",
worker_vm_types="g5.xlarge",
worker_options={"nthreads": 1},
)
client = cluster.get_client()
cluster.send_private_envs(
{"HF_TOKEN": "<your-hf-token>"} # Send credentials over encrypted connection
)
df = dd.read_parquet(
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device
)
results = pipe(
texts.to_list(),
batch_size=768,
padding="longest",
truncation=True,
function_to_apply="none"
)
return pd.Series([r["score"] for r in results])
min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]
to_parquet(
df,
"hf://datasets/<your-hf-user>/<data-dir>" # Replace with your HF user and directory
)
4、结束语
Hugging Face + Dask 是一个强大的组合。在此示例中,我们使用 Dask + Coiled 在云端的多个 GPU 上并行运行工作流,将分类任务从 100 行扩展到 2.11 亿行。
此相同类型的工作流可用于其他用例,例如:
- 过滤基因组数据以选择感兴趣的基因
- 从非结构化文本中提取信息并将其转换为结构化数据集
- 清理从互联网或 Common Crawl 抓取的文本数据
- 运行多模态模型推理以分析大型音频、图像或视频数据集
原文链接:Scaling AI-Based Data Processing with Hugging Face + Dask
BimAnt翻译整理,转载请标明出处