MCPcopy Index your code
hub / github.com/OpenDCAI/DataFlow

github.com/OpenDCAI/DataFlow @v1.0.10

repository ↗ · DeepWiki ↗ · release v1.0.10 ↗ · + Follow
2,827 symbols 11,032 edges 399 files 699 documented · 25%
README

DataFlow

大模型数据生成、清洗与准备,一站式搞定

issue resolution issue resolution

PyPI version PyPI - Python Version PyPI - Downloads Downloads

Colab Docker Documents Arxiv Ask DeepWiki

Discord Online wechat

OpenDCAI%2FDataFlow | Trendshift

可视化、低代码流水线,支持跨领域和用例的灵活编排。💪

将原始数据转化为高质量的 LLM 训练数据集。🔧

🎉 以更低的成本获得更智能的 LLM —— 在 GitHub 上给我们点个Star ⭐ 以获取最新更新。

初学者友好学习资源(持续更新): [🎬 视频教程] [📚 文字教程]

简体中文 | English

📰 0. 新闻

  • [2026-02-02] 🖥️ DataFlow WebUI 正式发布! 通过一条命令 dataflow webui 即可启动可视化流水线构建器,在直观的网页界面中构建并运行 DataFlow 流水线。👉 WebUI 文档

  • [2026-01-20] 🌟 DataFlow Awesome Works 上线! 新增板块用于展示基于 DataFlow 的开源项目与研究工作,欢迎提交 Pull Request 分享你的成果!👉 Awesome Works

  • [2025-12-19] 🎉 DataFlow 技术报告正式发布! 欢迎阅读并引用我们的 arXiv 论文:https://arxiv.org/abs/2512.16676

  • [2025-11-20] 🤖 DataFlow 全新 Data Agents 发布! 现在即可体验,并通过 Bilibili 教程快速上手:https://space.bilibili.com/3546929239689711/lists/6761342?type=season

  • [2025-06-28] 🎉 DataFlow 正式开源发布! 我们全新发布的以数据为中心的系统DataFlow已开源 —— 敬请关注后续更新!

🔍 1. 什么是 DataFlow?

DataFlow 是一个专门为从嘈杂来源(PDF、纯文本、低质量 QA)中生成、精炼、评估和过滤高质量 AI 数据而设计的数据准备和训练系统,从而通过针对性训练(预训练、指令微调 SFT、强化学习训练 RL)或 RAG 系统,提升大语言模型 (LLM) 在医疗、金融、法律和学术研究等特定领域的性能。

通过“基于算子 (operator-based)”的设计,DataFlow 将整个数据清洗工作流转化为可重现、可重用且可共享的“流水线 (pipeline)”,为以数据为中心的 AI 社区提供核心基础设施。此外,我们开发了智能化的 DataFlow-agent,能够根据需求通过重新组合现有算子或创建新算子来动态组装新的“流水线”。

df_overview_final_300

🔍 2. 核心特性

✅ 2.1 开箱即用的数据合成与清洗流水线

  • 高质量训练数据生成
  • 文本、数学和代码数据生成(参见 DataFlow-Instruct-10K 结果)
  • 通过 AgenticRAG 和 Text2SQL 等工具进行数据生成

  • 结构化数据提取

  • 大规模 PDF → QA 转换
  • 大规模书籍 PDF → 视觉 QA 转换

  • 科学数据工作流管理

  • Text2SQL 工作流管理(被 ICDE 2026 录用)
  • 数学数据工作流(被 KDD 2026 录用)

⚙️ 2.2 灵活的自定义流水线编排

  • 10+ 核心算子定义了交互模式和设计原则
  • 100+ 特定流水线算子可供重用或参考
  • 全面支持创建自定义算子 —— 插件式设计,可通过 GitHub 或 PyPI 轻松封装和分发

🧠 2.3 可复现、可复用且可分发的以数据为中心的 AI 系统

  • 数据治理算法被封装为算子流水线,实现了不同数据治理策略的可复现性和公平比较(❤️ 研究友好)
  • 轻松更换底层大模型复用原有管线,快速分析模型性能与数据质量之间的关系
  • 基于 Python 和 Git 生态系统,方便分发、管理和追溯高质量、用户定义的数据治理算子和流水线(❤️ 企业友好)

🛠️ 3. DataFlow 套件

DataFlow 套件提供了基本的算力设施,以配合 DataFlow 主仓库实现 LLM 数据准备的自动化和规模化。它由四个紧密集成的层组成:

  • DataFlow-WebUI – 一个直观的可视化界面,用于通过拖拽式算子工作流构建和管理复杂的数据流水线。
  • DataFlow-Agent – 一个由 AI 驱动的助手,根据用户的高层意图动态组合、执行和优化算子及流水线。
  • DataFlow-Ecosystem – 一个标准化的算子注册模块化分发层。它允许特定领域的模块(例如 DataFlow-MM、DataFlow-AI4S)在统一的抽象下贡献可扩展库。
  • RayOrch – 一个基于 Ray 构建的高性能编排层,为大规模数据任务提供分布式计算调度和资源管理。

这些组件共同构成了一个统一、可扩展的环境,将原始数据转化为模型就绪的智能信息。

✅ 4. 为什么使用 DataFlow?

数据生成和清洗对于高质量模型至关重要,但对于企业和个人而言,这些任务往往耗时、耗力且成本高昂。DataFlow 提供了一站式解决方案,高效应对这些挑战。 与 Nemo-Curator 和 Data-Juicer 等系统相比,DataFlow 提供:

  • 更强的数据合成模块支持 – 无缝集成文本、代码和数学数据生成流水线,用于高质量训练数据集。
  • 类 PyTorch 的编程管理 – 清晰的 流水线 (Pipeline) → 算子 (Operator) → 提示词 (Prompt) 分层结构,用于工作流控制。
  • 原则化和多类别的算子分类 – 算子被系统地组织到诸如生成、评估、过滤和精炼等多个功能类别中,形成了科学的多维分类法,反映了数据准备的不同阶段,并支持精确的算子选择与组合。
  • 用户友好设计,易于调试和上手 – 简化的工作流模式降低了学习曲线,加速了实验进程。

🔧 5. 算子如何工作?

DataFlow 算子的设计秉持简单与清晰的原则。

算子接收结构化输入(JSON, JSONL, CSV),经过智能化处理后产生高质量输出。 每个算子封装了一个特定的数据处理任务,提供简洁且一致的 API,易于理解和集成。类 PyTorch 的设计使其直观且开箱即用,让您可以快速构建、组合和自定义流水线,无需处理复杂的模板代码。

更多详情请参考 算子文档。下面是一个演示如何调用 PromptedGenerator 算子的极简示例:

示例输入数据 (json/jsonl 样式):

// input.json
[
  {"problem": "What is 17 + 25?"},
  {"problem": "If x = 3, compute 2x^2 + 1."}
]

算子调用代码:

from dataflow.operators.core_text import PromptedGenerator
from dataflow.utils.storage import FileStorage
from dataflow.serving import APILLMServing_request

# 将输入文件设置到全局存储类
storage = FileStorage(first_entry_file_name="./input.json",)

# 配置 LLM 服务(例如 OpenAI API)
# api key 需要通过 `export DF_API_KEY=sk-xxx` 设置
llm_serving = APILLMServing_request(
    api_url="https://api.openai.com/v1/chat/completions",
)

prompted_generator = PromptedGenerator(
    llm_serving=llm_serving,  # 预配置的 LLM 后端
    system_prompt="Please solve this math problem."
)

prompted_generator.run(
    storage=self.storage.step(),  # 数据管理(细节省略)
    input_key="problem",          # 从该列读取
    output_key="solution"         # 写入该列
)

运行后,算子会将生成的结果追加到 output_key 中。例如,输出数据 (json/jsonl 样式) 变为:

// dataflow_step1.json
[
    {"problem":"What is 17 + 25?","solution":"42"},
    {"problem":"If x = 3, compute 2x^2 + 1.","solution":"19"}
]

🛠️ 6. 流水线 (点击展开)

🔧 6.1 开箱即用的流水线

DataFlow 目前包含的流水线如下: - 📝 文本处理流水线(Text Pipeline):从大规模纯文本(多为网络爬取)中挖掘问答对,用于监督微调和强化学习训练。 - dataflow_text_pipeline - [HuggingFace🤗 示例数据]

⚙️ 6.2 灵活的算子流水线

在此框架下,算子被分为基础算子、通用算子、特定领域算子和评估算子等,支持数据处理和评估功能。详情请参考 文档

🤖 6.3 智能体引导的流水线

⚡ 7. 快速开始

🛠️ 7.1 环境配置与安装

DataFlow 支持 Python >= 3.10 环境,已在 Windows、Linux 和 MacOS 上的 Python 3.10, 3.11, 3.12 版本通过测试。

请使用以下命令进行环境配置与安装👇

我们推荐使用 uv 安装 DataFlow 以加速下载。

pip install uv
uv pip install open-dataflow

如果您想使用自己的 GPU 进行本地推理,请使用:

pip install uv
uv pip install open-dataflow[vllm]

安装完成后,可以使用以下命令检查 DataFlow 是否安装正确:

dataflow -v

如果安装成功,您将看到:

open-dataflow codebase version: 1.0.0
        Checking for updates...
        Local version:  1.0.0
        PyPI newest version:  1.0.0
You are using the latest version: 1.0.0.

🐳 7.2 Docker 安装(备选)

我们也提供了 Dockerfile 以便部署,并提供 预构建的 Docker 镜像 以便立即使用。

选项 1:使用预构建 Docker 镜像

您可以直接拉取并使用我们的预构建 Docker 镜像:

# 拉取预构建镜像
docker pull molyheci/dataflow:cu124

# 运行带有 GPU 支持的容器
docker run --gpus all -it molyheci/dataflow:cu124

# 在容器内验证安装
dataflow -v
选项 2:从 Dockerfile 构建

或者,您可以根据提供的 Dockerfile 构建 Docker 镜像:

# 克隆仓库 (HTTPS)
git clone https://github.com/OpenDCAI/DataFlow.git
# 或使用 SSH
# git clone git@github.com:OpenDCAI/DataFlow.git

cd DataFlow

# 构建 Docker 镜像
docker build -t dataflow:custom .

# 运行容器
docker run --gpus all -it dataflow:custom

# 在容器内验证安装
dataflow -v

注意: Docker 镜像包含 CUDA 12.4.1 支持,并预装了 vLLM 以实现 GPU 加速。请确保您已安装 NVIDIA Container Toolkit 以使用 GPU 功能。

🚀 7.3 通过 Google Colab 快速开始

您可以直接在 Google Colab 上开始您的第一个 DataFlow 翻译项目。 按照提供的指南,您可以无缝地从简单的翻译示例扩展到更复杂的 DataFlow 流水线。

👉 通过 Google Colab 开启 DataFlow

📖 7.4 参考项目文档

有关详细的使用说明入门指南,请访问我们的 DataFlow 文档

🖥️ 7.5 DataFlow-WebUI

DataFlow 提供了一个基于 Web 的 UI (WebUI),用于可视化流水线的构建与执行。

安装 DataFlow 主仓库后,只需运行以下命令即可启动 DataFlow-WebUI

dataflow webui

这将自动下载并启动最新的 DataFlow-WebUI 并在浏览器中打开(如果未自动打开,请访问 http://localhost:8000/)。

📚 7.5.1 WebUI 文档

🛠️ 7.5.2 开发仓库

🧪 8. 实验结果

8.1 DataFlow-Instruct-10k

DataFlow-Instruct-10K 是一个由 DataFlow 框架生成的统一多领域指令数据集。它通过跨越数学推理、代码和通用文本指令的多个自动化数据准备流水线构建。每个流水线遵循“生成-评估-过滤-精炼”工作流,以合成和策划高质量的“指令-响应”对。最终的数据集包含约 10K 个样本,为指令微调提供高质量监督,使基座模型能够以更少的训练样本达到全量训练指令模型的性能水平。

有关详细的实验设置,请访问我们的 DataFlow 技术报告

模型 数学平均 (Math-Avg) 代码平均 (Code-Avg) 知识平均 (Knowledge-Avg)
Qwen2-7B 系列
Base 20.1 66.3 76.2
+ Infinity-Instruct-10K 29.0 67.8 76.2
+ Infinity-Instruct-1M 27.9 68.2 76.2
+ DataFlow-Instruct-10K 32.4 66.2 76.1
Qwen2.5-7B 系列
Base 37.1 76.5 76.0
+ Infinity-Instruct-10K 22.6 77.6 75.8
+ Infinity-Instruct-1M 33.3 78.0 75.8
+ DataFlow-Instruct-10K 46.7 78.6 76.2

🛠️ 8.2 其他流水线结果 (点击展开)

8.2.1 文本流水线

8.2.1.1 预训练数据过滤流水线

从 SlimPajama-627B 语料库中,我们提取了一个 100B-token 子集,并应用了多个 DataFlow 文本预训练过滤器。我们使用 Megatron-DeepSpeed 框架从头开始训练了一个 Qwen2.5-0.5B 模型,训练量为 30B tokens,结果如下:

方法 ARC-C ARC-E MMLU HellaSwag WinoGrande Gaokao-MathQA 平均
Random-30B 25.26 43.94 27.03 37.02 50.99 27.35 35.26
Qurating-30B 25.00 43.14 27.50 37.03 50.67 26.78 35.02
FineWeb-Edu-30B 26.45 45.41 27.41 38.06 50.43 25.64 35.57
DataFlow-30B 25.51 45.58 27.42 37.58 50.67 27.35 35.69
8.2.1.2 SFT 数据过滤与合成流水线

为了研究小规模 SFT 数据质量,我们使用 LLaMA-Factory 在 WizardLM 和 Alpaca 数据集上对

Core symbols most depended-on inside this repo

get
called by 424
dataflow/utils/registry.py
step
called by 397
dataflow/utils/storage.py
run
called by 360
dataflow/statics/scaffold/{{cookiecutter.repo_name}}/{{cookiecutter.package_name}}/operators/core/my_prompted_generator.py
write
called by 247
dataflow/utils/storage.py
get_logger
called by 245
dataflow/logger.py
read
called by 211
dataflow/utils/storage.py
update
called by 104
dataflow/operators/general_text/eval/task2vec/utils.py
generate_from_input
called by 89
dataflow/core/llm_serving.py

Shape

Method 1,994
Class 527
Function 303
Route 3

Languages

Python100%

Modules by API surface

dataflow/prompts/agenticrag.py107 symbols
dataflow/operators/general_text/filter/rule_based_filter.py84 symbols
dataflow/utils/storage.py62 symbols
dataflow/prompts/text2sql.py52 symbols
dataflow/operators/text2sql/eval/sql_component_classifier.py47 symbols
dataflow/operators/text_pt/eval/Qurating/modeling/modeling_flash_llama.py44 symbols
test/cpu_only/test_prompt_restrict.py38 symbols
dataflow/operators/code/eval/python_executor.py38 symbols
dataflow/operators/conversations/generate/func_call_generators.py37 symbols
dataflow/prompts/general_text.py34 symbols
dataflow/prompts/func_call.py33 symbols
dataflow/operators/general_text/eval/task2vec/task_similarity.py33 symbols

Dependencies from manifests, versioned

google-cloud-aiplatform1.55 · 1×
pyarrow20.0.0 · 1×
vendi-score0.0.3 · 1×

For agents

$ claude mcp add DataFlow \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact