Skip to content

fightBoxing/daft-platform

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Daft 多模态数据处理平台

基于 Daft 引擎构建的多模态数据处理低代码平台,提供 Web SQL 工作台、Schema 浏览器、函数文档、AI UDF 等功能。

功能概览

模块 说明
SQL 工作台 Web 端 SQL 编辑器,支持自动补全、语法高亮、执行历史
Schema 浏览器 Catalog → Schema → Table 三级树形元数据浏览
函数文档 219+ 内置函数分类查看、搜索、自动补全
AI UDF 16 个预置 AI 函数:图像识别、OCR、情感分析、PII 脱敏等
元数据服务 支持 Apache Gravitino(分布式)和 SQLite(本地降级)双后端
DAG 编排 可视化数据处理流程编排(实验性)

项目结构

daft-platform/                      # 项目根目录
├── README.md
├── requirements.txt
├── Makefile                        # make dev / make test / make run
├── .gitignore
└── daft_platform/                  # Python 包
    ├── app.py                      # FastAPI 应用入口
    ├── config.py                   # 配置管理(环境变量)
    ├── models/                     # Pydantic 请求/响应模型
    │   ├── sql.py                  # SQL 执行相关
    │   ├── datasource.py           # 数据源/元数据相关
    │   └── function.py             # 函数注册表相关
    ├── routers/                    # API 路由层
    │   ├── sql.py                  # /api/sql/*
    │   ├── datasource.py           # /api/datasource/*
    │   ├── function.py             # /api/function/*
    │   └── settings.py             # /api/settings/*
    ├── services/                   # 业务逻辑层
    │   ├── session_manager.py      # Daft Session 单例管理
    │   ├── sql_executor.py         # SQL 执行 + 错误翻译
    │   ├── sql_history.py          # 执行历史(SQLite)
    │   ├── function_registry.py    # 函数注册表
    │   ├── builtin_udfs.py         # 16 个预置 AI UDF
    │   ├── metadata_service.py     # 元数据抽象接口 + 工厂函数
    │   ├── gravitino_metadata.py   # Gravitino REST API 实现
    │   └── local_metadata.py       # SQLite 本地降级实现
    ├── templates/                  # Jinja2 前端页面
    │   ├── index.html              # SQL 工作台主页
    │   └── dag.html                # DAG 编排页
    ├── static/                     # 静态资源(JS/CSS)
    ├── tests/                      # 单元测试(pytest)
    ├── test_data/                  # 测试用图片(dog_*.jpg)
    ├── test_data.csv               # 测试用 CSV
    ├── deploy/                     # 部署配置
    │   └── gravitino-pod.yaml      # Gravitino K8s 部署
    └── data/                       # 运行时数据(.gitignore)

技术栈

  • 后端: Python 3.10+, FastAPI, Uvicorn, Pydantic v2
  • 计算引擎: Daft (Rust + Python)
  • 元数据: Apache Gravitino / SQLite (降级)
  • 前端: Jinja2 模板 + 原生 JavaScript
  • 测试: pytest

快速开始

1. 前置条件

  • Python 3.10+
  • Daft 引擎(需从源码编译)
# 编译安装 Daft 引擎
git clone https://github.com/Eventual-Inc/Daft.git && cd Daft
make .venv && make build
source .venv/bin/activate

2. 克隆本项目并安装依赖

git clone https://github.com/fightBoxing/daft-platform.git
cd daft-platform

pip install -r requirements.txt

# 可选:AI UDF 额外依赖
pip install easyocr          # OCR 文字识别
pip install imagehash        # 图像感知哈希
pip install face_recognition  # 人脸检测(或 opencv-python)

3. 启动服务

# 开发模式(自动重载)
make dev

# 生产模式
make run

# 或直接用 uvicorn
uvicorn daft_platform.app:app --host 0.0.0.0 --port 8080

启动后访问:


配置

所有配置通过环境变量管理,前缀 DAFT_PLATFORM_

环境变量 默认值 说明
DAFT_PLATFORM_DEBUG true 调试模式
DAFT_PLATFORM_MAX_PREVIEW_ROWS 50 预览最大行数
DAFT_PLATFORM_MAX_RESULT_ROWS 10000 查询结果最大行数
DAFT_PLATFORM_SQL_TIMEOUT_SECONDS 300 SQL 执行超时(秒)
DAFT_PLATFORM_GRAVITINO_ENABLED true 是否启用 Gravitino 元数据服务
DAFT_PLATFORM_GRAVITINO_URI http://localhost:30090 Gravitino 服务地址
DAFT_PLATFORM_GRAVITINO_METALAKE daft_platform Gravitino metalake 名称
DAFT_PLATFORM_GRAVITINO_TIMEOUT 3 Gravitino 连接超时(秒)

AI / LLM 配置

AI UDF(如 classify_imagetext_summary)需要 LLM API:

环境变量 说明
OPENAI_API_KEY OpenAI / 兼容 API 密钥
OPENAI_BASE_URL API 地址(默认 https://api.openai.com/v1
OPENAI_MODEL 模型名(默认 gpt-4o-mini

也可在 Web 界面右上角 ⚙️ 设置中配置,保存到本地数据库。

元数据服务降级

启动时自动检测 Gravitino 可用性:

  • Gravitino 可用 → 使用 Gravitino REST API 管理元数据
  • Gravitino 不可用 → 自动降级到本地 SQLite(零外部依赖)

禁用 Gravitino 直接使用 SQLite:

export DAFT_PLATFORM_GRAVITINO_ENABLED=false

部署 Gravitino(可选)

# K8s 部署
kubectl apply -f daft_platform/deploy/gravitino-pod.yaml

# Docker 部署
docker run -d --name gravitino -p 30090:8090 apache/gravitino:latest

API 接口

SQL 执行

# 执行 SQL
curl -X POST http://localhost:8080/api/sql/execute \
  -H 'Content-Type: application/json' \
  -d '{"sql": "SELECT * FROM my_table LIMIT 10", "limit": 1000}'

# SQL 安全验证
curl -X POST http://localhost:8080/api/sql/validate \
  -H 'Content-Type: application/json' \
  -d '{"sql": "SELECT 1"}'

# 查看执行历史
curl http://localhost:8080/api/sql/history?limit=10

数据源管理

# 注册 JSON 数据表
curl -X POST http://localhost:8080/api/datasource/register \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "users",
    "source_type": "json",
    "data": {"id": [1, 2, 3], "name": ["Alice", "Bob", "Carol"]}
  }'

# 注册文件表(CSV/Parquet/JSON)
curl -X POST http://localhost:8080/api/datasource/register \
  -H 'Content-Type: application/json' \
  -d '{"name": "sales", "source_type": "file", "path": "/data/sales.parquet"}'

# 注册 HuggingFace 数据集
curl -X POST http://localhost:8080/api/datasource/register \
  -H 'Content-Type: application/json' \
  -d '{"name": "mnist", "source_type": "huggingface", "dataset": "mnist", "limit": 1000}'

# 注册 Glob 路径(批量文件)
curl -X POST http://localhost:8080/api/datasource/register \
  -H 'Content-Type: application/json' \
  -d '{"name": "images", "source_type": "glob", "glob_pattern": "/data/photos/**/*.jpg"}'

# 查看元数据树
curl http://localhost:8080/api/datasource/tree

函数查询

# 列出所有函数
curl http://localhost:8080/api/function/list

# 按分类筛选
curl http://localhost:8080/api/function/list?category=ai

# 搜索函数
curl http://localhost:8080/api/function/search?q=image

# SQL 自动补全
curl http://localhost:8080/api/function/completions?prefix=image_

SQL 语法指南

Daft SQL 兼容标准 SQL 语法,额外支持多模态数据处理函数。

基础查询

-- 注册测试数据后查询
SELECT * FROM test_data WHERE score > 85 ORDER BY score DESC;

-- 聚合统计
SELECT category, COUNT(*) AS cnt, AVG(score) AS avg_score
FROM test_data
GROUP BY category
ORDER BY avg_score DESC;

-- CTE(公用表表达式)
WITH top_students AS (
    SELECT * FROM test_data WHERE score > 90
)
SELECT category, COUNT(*) FROM top_students GROUP BY category;

-- 窗口函数
SELECT name, score,
       RANK() OVER (ORDER BY score DESC) AS rank,
       ROW_NUMBER() OVER (PARTITION BY category ORDER BY score DESC) AS cat_rank
FROM test_data;

字符串函数

SELECT lower(name), upper(city) FROM test_data;
SELECT * FROM test_data WHERE contains(city, '');
SELECT * FROM test_data WHERE starts_with(name, '');
SELECT length(name) AS name_len FROM test_data;
SELECT replace(city, '北京', 'Beijing') FROM test_data;
SELECT split(name, '') FROM test_data;  -- 按字符拆分

数值函数

SELECT abs(score - 85) AS diff FROM test_data;
SELECT round(score, 0) AS rounded FROM test_data;
SELECT ceil(score), floor(score) FROM test_data;
SELECT clip(score, 80, 95) AS clipped FROM test_data;

时间函数

SELECT current_date(), current_timestamp();
SELECT year(to_date('2024-03-15', '%Y-%m-%d'));
SELECT date_trunc('month', current_timestamp());

列表函数

SELECT explode(split(name, '')) FROM test_data;  -- 展开为多行
SELECT list_count(split(city, '')) FROM test_data;

多模态数据处理 SQL

以下为 Daft 特色的多模态数据处理能力,可在 SQL 中直接使用。

图像处理

-- 1. 注册图片数据集(Glob 方式扫描目录)
--    在 Web 界面中:数据源 → 注册 → Glob → 路径填 /path/to/images/**/*.jpg

-- 2. 解码图片并调整尺寸
SELECT path, image_decode(path) AS img FROM images;
SELECT path, image_resize(image_decode(path), 224, 224) AS resized FROM images;

-- 3. 图像裁剪
SELECT path, image_crop(image_decode(path), 0, 0, 100, 100) AS cropped FROM images;

AI 视觉识别(需配置 LLM API Key)

-- 图像内容识别:调用 Vision API 返回中文描述
SELECT path, classify_image(path) AS description FROM images;

-- 图像元信息:宽高、格式、文件大小
SELECT path, image_info(path) AS info FROM images;

-- 主色调提取:返回 HEX 颜色值
SELECT path, dominant_color(path) AS color FROM images;

-- 图像感知哈希:用于去重/相似度比较
SELECT path, image_phash(path) AS phash FROM images;

-- 人脸检测:返回人脸数量
SELECT path, face_count(path) AS faces FROM images;

-- EXIF 元数据:相机型号、GPS、拍摄时间等
SELECT path, exif_extract(path) AS exif FROM images;

-- OCR 文字识别(支持中英文)
SELECT path, ocr_extract(path) AS text FROM document_images;

完整图像分析流水线示例

-- 假设已注册 images 表(Glob: /data/photos/**/*.jpg)

-- 分析每张图片:识别内容 + 提取信息 + 主色调 + 人脸
SELECT
    path,
    classify_image(path)  AS description,
    image_info(path)      AS info,
    dominant_color(path)   AS color,
    face_count(path)       AS faces,
    image_phash(path)      AS phash
FROM images
LIMIT 20;

-- 筛选包含人脸的图片
SELECT path, face_count(path) AS faces
FROM images
WHERE face_count(path) > 0;

-- 图像去重(基于感知哈希)
SELECT phash, COUNT(*) AS cnt, MIN(path) AS sample
FROM (SELECT path, image_phash(path) AS phash FROM images)
GROUP BY phash
HAVING cnt > 1;

文本分析

-- 情感分析:返回 positive / negative / neutral
SELECT name, sentiment(name) AS mood FROM test_data;

-- 语言检测:返回 zh / en
SELECT name, detect_language(name) AS lang FROM test_data;

-- 关键词提取(TF 频率统计)
SELECT extract_keywords(description) AS keywords FROM articles;

-- 文本摘要(调用 LLM,50 字以内)
SELECT text_summary(content) AS summary FROM articles;

-- PII 脱敏(自动识别手机号/身份证/邮箱/银行卡)
SELECT mask_pii('联系方式: 13812345678, 邮箱: test@example.com') AS masked;
-- 输出: "联系方式: 138****5678, 邮箱: te***@example.com"

-- 文本相似度(Jaccard bigram)
SELECT text_similarity('你好世界', '你好中国') AS similarity;

音频 & 文件处理

-- 音频时长(秒,支持 mp3/wav/flac)
SELECT path, audio_duration(path) AS duration_sec FROM audio_files;

-- 文件 MD5 哈希
SELECT path, file_md5(path) AS md5 FROM files;

-- 文件转 Base64
SELECT path, to_base64(path) AS b64 FROM files;

向量距离计算

-- 余弦相似度(用于 Embedding 比较)
SELECT cosine_similarity(emb1, emb2) AS sim FROM embedding_pairs;

-- 欧氏距离
SELECT euclidean_distance(emb1, emb2) AS dist FROM embedding_pairs;

-- 点积
SELECT dot_product(emb1, emb2) AS dp FROM embedding_pairs;

URL & 网络

-- 从 URL 下载内容
SELECT url_download(url_col) AS content FROM web_sources;

-- 解析 URL 各部分
SELECT url_parse(url_col) AS parsed FROM web_sources;

-- 猜测文件 MIME 类型
SELECT guess_mime_type(path) AS mime FROM files;

内置 AI UDF 完整列表

函数名 输入 输出 说明 额外依赖
classify_image(path) 文件路径 String LLM Vision API 图像识别 LLM API Key
image_info(path) 文件路径 Struct 宽高/格式/文件大小 Pillow
dominant_color(path) 文件路径 String 主色调 HEX 值 Pillow
image_phash(path) 文件路径 String 感知哈希(去重用) imagehash / 降级 MD5
face_count(path) 文件路径 Int32 人脸数量 face_recognition / OpenCV
exif_extract(path) 文件路径 String EXIF 元数据 JSON Pillow
ocr_extract(path) 文件路径 String OCR 文字识别 easyocr / pytesseract
sentiment(text) 文本 String 情感分析
detect_language(text) 文本 String 语言检测 (zh/en)
extract_keywords(text) 文本 String 关键词提取
text_summary(text) 文本 String LLM 文本摘要 LLM API Key
mask_pii(text) 文本 String PII 脱敏
text_similarity(a, b) 两段文本 Float64 Jaccard 相似度
audio_duration(path) 文件路径 Float64 音频时长(秒) mutagen / wave
file_md5(path) 文件路径 String 文件 MD5 哈希
to_base64(path) 文件路径 String 文件转 Base64

标记 "无" 的函数无需额外安装,开箱即用。


测试

# 运行全部测试
make test

# 运行单个测试文件
python -m pytest daft_platform/tests/test_sql_executor.py -v

# 运行单个测试方法
python -m pytest daft_platform/tests/test_routers.py::test_register_and_list_table -v

测试覆盖:

测试文件 覆盖模块
test_models.py Pydantic 模型验证、health 端点
test_sql_executor.py SQL 安全验证、错误翻译
test_sql_history.py 执行历史 CRUD
test_function_registry.py 函数列表/搜索/补全
test_metadata_service.py LocalMetadataService 完整 CRUD
test_session_manager.py Session 单例、表注册
test_routers.py API 路由集成测试

安全机制

SQL 安全验证

执行前自动拦截危险操作:

DROP TABLE    → ❌ 拦截
DELETE FROM   → ❌ 拦截
TRUNCATE      → ❌ 拦截
ALTER TABLE   → ❌ 拦截
SELECT / WITH → ✅ 允许

错误翻译

Daft 引擎的 Rust 层错误自动翻译为中文友好提示:

原始错误 翻译 建议
column 'foo' not found 列 'foo' 不存在于当前表中 请检查列名拼写
table 'bar' not found 表 'bar' 不存在 请先注册该表
Type mismatch 数据类型不匹配 使用 CAST() 转换
Division by zero 除零错误 添加 CASE WHEN 判断
Out of memory 内存不足 添加 LIMIT 限制

已知问题

详见 KNOWN_ISSUES.md,包含开发过程中的踩坑记录和解决方案。

License

本项目基于 Daft 引擎开发,遵循 Apache 2.0 License。

About

Multimodal data processing platform built on Daft engine — SQL workbench, AI UDFs, metadata management

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors