架构总览
相关源文件
本页面内容基于以下源文件生成:
- py-src/data_formulator/app.py
- py-src/data_formulator/agents/data_agent.py
- src/lib/agents-chart/index.ts
- py-src/data_formulator/datalake/workspace.py
- py-src/data_formulator/workspace_factory.py
- py-src/data_formulator/agents/init.py
- py-src/data_formulator/agents/web_utils.py
- py-src/data_formulator/agents/agent_utils.py
- py-src/data_formulator/agents/client_utils.py
- py-src/data_formulator/agents/semantic_types.py
- src/lib/agents-chart/core/index.ts
- src/lib/agents-chart/gofish/index.ts
- src/lib/agents-chart/chartjs/index.ts
- src/lib/agents-chart/echarts/index.ts
- src/lib/agents-chart/vegalite/index.ts
- src/lib/agents-chart/test-data/index.ts
- src/lib/agents-chart/gofish/templates/index.ts
- src/lib/agents-chart/chartjs/templates/index.ts
- src/lib/agents-chart/echarts/templates/index.ts
- src/lib/agents-chart/vegalite/templates/index.ts
Data Formulator 是一个基于 AI 的数据可视化工具,采用 Python Flask 后端与 TypeScript 前端的分层架构。系统通过多 Agent 协作实现从自然语言到可视化图表的端到端转换,支持多种图表后端(Vega-Lite、ECharts、Chart.js、GoFish)和多种数据存储后端(本地文件系统、Azure Blob)。
系统架构总览
系统采用前后端分离架构,后端基于 Flask 实现 RESTful API,前端通过 TypeScript 实现图表语义层。核心设计理念是将 AI 推理、数据转换、图表渲染三个关注点分离,通过 Workspace 抽象实现数据持久层的可插拔性。
正在加载图表渲染器...
架构要点说明:
-
Blueprint 延迟加载:
app.py通过_register_blueprints()实现按需加载,避免启动时加载所有 AI/ML 依赖(py-src/data_formulator/app.py:93-122) -
Agent 层级协作:
DataAgent作为主控 Agent 实现 observe-think-act 循环,内部委托DataRecAgent执行具体的数据转换和图表推荐(py-src/data_formulator/agents/data_agent.py:131-156) -
图表后端可插拔:
agents-chart模块提供统一的语义层 API,支持 Vega-Lite、ECharts、Chart.js、GoFish 四种渲染后端(src/lib/agents-chart/index.ts:12-18) -
Workspace 工厂模式:通过
workspace_factory.get_workspace()根据配置动态选择本地或 Azure Blob 存储,实现存储层的透明切换(py-src/data_formulator/workspace_factory.py:70-128) -
安全边界:
web_utils.py实现了完整的 SSRF 防护,包括私有 IP 阻断、协议白名单、重定向验证(py-src/data_formulator/agents/web_utils.py:56-110)
核心模块详解
Flask 应用入口
职责边界:负责应用生命周期管理、Blueprint 注册、日志配置,不处理具体业务逻辑。
入口 API:
configure_logging():配置日志级别,抑制第三方库冗余输出(py-src/data_formulator/app.py:70-88)_register_blueprints():延迟加载并注册四个 Blueprint(py-src/data_formulator/app.py:93-125)
关键数据结构:
python1# CustomJSONEncoder 处理 numpy 类型和 bytes 序列化 2class CustomJSONEncoder(json.JSONEncoder): 3 def default(self, obj): 4 if isinstance(obj, np.int64): 5 return int(obj) 6 if isinstance(obj, (bytes, bytearray)): 7 return base64.b64encode(obj).decode('ascii')
(py-src/data_formulator/app.py:37-43)
关键调用链:
configure_logging()设置data_formulator.agents为 INFO 级别_register_blueprints()按顺序加载tables_bp→agent_bp→session_bp→demo_stream_bp- 启动后台 ISS 位置收集器(演示功能)
错误处理:使用 _blueprints_registered 全局标志防止重复注册(py-src/data_formulator/app.py:99-102)
DataAgent 自主探索引擎
职责边界:实现 observe-think-act 自主循环,决定何时可视化、何时澄清、何时结束;不直接执行代码,委托给 DataRecAgent。
入口 API:
run():主循环入口,返回 Generator 流式产出事件(py-src/data_formulator/agents/data_agent.py:161-340)
关键数据结构:
python1# 事件类型定义 2{ 3 "type": "action" | "result" | "clarify" | "completion" | "error", 4 "iteration": int, 5 "thought": str, # Agent 思考过程 6 "question": str, # 可视化问题 7 "content": dict, # 结果数据 8 "trajectory": list, # 对话历史(用于恢复) 9}
(py-src/data_formulator/agents/data_agent.py:171-180)
关键调用链:
正在加载图表渲染器...
错误处理与边界条件:
- LLM 返回无法解析时返回
None,产出error事件(py-src/data_formulator/agents/data_agent.py:401-413) - 代码执行失败时追加错误观察,让 Agent 决定是否重试(py-src/data_formulator/agents/data_agent.py:261-267)
- 达到
max_iterations时强制产出completion事件(py-src/data_formulator/agents/data_agent.py:331-340)
图表语义层
职责边界:提供目标无关的语义类型推断、布局决策、模板匹配;不依赖 React/Redux/UI 层。
入口 API:
assembleVegaLite(input)/assembleECharts(input)/assembleChartjs(input)/assembleGoFish(input):各后端的组装函数(src/lib/agents-chart/index.ts:19-23)
关键数据结构:
typescript1// 输入结构(推断自模块架构) 2interface ChartAssemblyInput { 3 data: { values: any[] }; 4 semantic_types: Record<string, string>; // 字段名 -> 语义类型 5 chart_spec: { 6 chartType: string; 7 encodings: Record<string, string>; 8 config?: object; 9 }; 10 canvas_size?: { width: number; height: number }; 11}
(src/lib/agents-chart/index.ts:35-39)
模块组织:
core/:目标无关的类型定义、语义类型推断、布局决策vegalite//echarts//chartjs//gofish/:各后端的模板定义和规格实例化- 每个后端提供
TemplateDefs/GetTemplateDef/GetTemplateChannels三个注册函数(src/lib/agents-chart/index.ts:26-29)
调用链:Python 端 DataAgent._create_chart() 调用 assemble_vegailte_chart()(Python 包装),最终调用 TypeScript 编译后的语义层。
Workspace 存储抽象
职责边界:提供统一的数据表读写接口,支持本地文件系统和 Azure Blob 两种后端;处理临时文件清理和缓存管理。
入口 API:
get_workspace(identity_id):工厂方法,根据配置返回对应实现(py-src/data_formulator/workspace_factory.py:70-128)get_data_formulator_home():解析主目录路径(py-src/data_formulator/datalake/workspace.py:51-73)
关键数据结构:
python1# 配置来源优先级 2CLI_ARGS = { 3 'workspace_backend': 'local' | 'azure_blob', 4 'data_dir': str | None, 5 'azure_blob_connection_string': str | None, 6 'azure_blob_account_url': str | None, 7 'azure_blob_container': str, 8 'cache_max_mb': int, 9 'global_cache_max_mb': int, 10}
(py-src/data_formulator/workspace_factory.py:77-84)
关键调用链:
get_workspace()读取 Flaskcurrent_app.config['CLI_ARGS']- 若
backend == 'azure_blob',调用_build_azure_container_client() - 返回
CachedAzureBlobWorkspace或本地Workspace
错误处理与边界条件:
- Azure Blob 配置缺失时抛出
ValueError,明确提示需要的环境变量(py-src/data_formulator/workspace_factory.py:62-67) cleanup_stale_temp_files()处理崩溃恢复,清理超过 24 小时的临时文件(py-src/data_formulator/datalake/workspace.py:85-127)
数据流与调用链
以下时序图展示了从用户请求到图表生成的完整数据流:
正在加载图表渲染器...
数据流要点:
-
流式响应:
DataAgent.run()使用 Python Generator 实现流式输出,前端通过 SSE(Server-Sent Events)接收实时进度(py-src/data_formulator/agents/data_agent.py:168) -
对话历史传递:
trajectory列表在每次 LLM 调用后追加 assistant 响应和 user 观察,形成完整的上下文链(py-src/data_formulator/agents/data_agent.py:209-212) -
修复循环:
_execute_visualize()内部实现代码修复循环,最多尝试max_repair_attempts次(py-src/data_formulator/agents/data_agent.py:454-473) -
跨语言调用:Python 端通过
assemble_vegailte_chart()调用 TypeScript 编译后的图表语义层,返回 Vega-Lite spec 后再转换为 base64 图片(py-src/data_formulator/agents/data_agent.py:516-520)
核心设计决策与取舍
| 决策点 | 选择 | 理由 | 已知限制 |
|---|---|---|---|
| Agent 循环模式 | observe-think-act | 允许 Agent 根据执行结果动态调整策略,支持澄清和多步探索 | 循环次数受 max_iterations 限制,复杂任务可能中断 |
| LLM 集成方式 | LiteLLM 统一封装 | 支持 OpenAI、Azure、Ollama 等多后端,配置灵活 | 增加一层抽象,调试时需追踪 LiteLLM 内部行为 |
| 图表后端数量 | 4 种(Vega-Lite、ECharts、Chart.js、GoFish) | 覆盖声明式和命令式两种范式,适应不同场景 | 维护成本高,模板定义需同步更新 |
| 存储后端选择 | 工厂模式 + 配置驱动 | 本地开发用文件系统,生产环境用 Azure Blob,无需改代码 | Azure Blob 需要额外配置凭证和缓存策略 |
| Blueprint 加载时机 | 延迟加载 | 避免启动时加载所有 AI/ML 依赖,加快冷启动 | 首次请求时可能有延迟 |
| SSRF 防护策略 | 多层验证(协议、IP、重定向) | 防止 Agent 访问内网资源,符合安全合规要求 | 可能误拦截某些合法的内部数据源 |
| 临时文件清理 | 后台定时清理 + 崩溃恢复 | 处理异常退出遗留的临时文件 | 24 小时延迟可能占用磁盘空间 |
| 日志级别控制 | Agent 模块 INFO,第三方库 WARNING | 平衡调试需求和日志噪音 | 生产环境可能需要更细粒度控制 |
关键架构模式:
-
Generator 模式:
DataAgent.run()返回 Generator 而非一次性结果,支持流式 UI 更新和早期中断(py-src/data_formulator/agents/data_agent.py:161-168) -
委托模式:
DataAgent将数据转换和图表推荐委托给DataRecAgent,保持职责单一(py-src/data_formulator/agents/data_agent.py:150-155) -
策略模式:图表语义层通过统一的
assemble*()接口支持多种后端,运行时根据配置选择(src/lib/agents-chart/index.ts:19-23) -
工厂模式:
workspace_factory.get_workspace()根据配置动态创建存储实例(py-src/data_formulator/workspace_factory.py:70-128)
技术选型
| 技术 | 用途 | 选型理由 | 替代方案 |
|---|---|---|---|
| Flask | Web 框架 | 轻量级,Blueprint 机制支持模块化,适合中小型 API 服务 | FastAPI(更现代但需迁移成本) |
| LiteLLM | LLM 统一网关 | 支持 100+ 模型,统一 API,简化多模型切换 | 直接使用 OpenAI SDK(仅支持 OpenAI) |
| Vega-Lite | 声明式图表 | 语法简洁,适合 AI 生成,支持交互 | D3.js(学习曲线陡峭) |
| ECharts | 交互式图表 | 功能丰富,中文文档完善,适合复杂可视化 | Highcharts(商业许可) |
| Chart.js | 简单图表 | 轻量,适合基础图表,社区活跃 | Chartist(功能较少) |
| Azure Blob | 云存储 | 与 Azure 生态集成,支持 Managed Identity | AWS S3(需额外配置) |
| Parquet | 数据存储格式 | 列式存储,压缩率高,适合分析场景 | CSV(无压缩,解析慢) |
| TypeScript | 前端类型系统 | 编译时类型检查,减少运行时错误 | JavaScript(无类型保护) |
模块依赖关系
正在加载图表渲染器...
依赖要点:
-
单向依赖:
app.py依赖各 Blueprint,Blueprint 依赖 Agent 层,Agent 层依赖 Client 和 Workspace,形成清晰的层级结构 -
跨语言边界:Python 后端通过
agents-chart调用 TypeScript 编译后的图表语义层,依赖方向为 Python → TypeScript -
外部依赖隔离:LiteLLM、Azure SDK、Pandas 等外部库仅在特定模块中使用,不污染核心业务逻辑
关键配置与启动流程
配置优先级
Workspace 后端选择(py-src/data_formulator/workspace_factory.py:77-84):
| 配置项 | CLI 参数 | 环境变量 | 默认值 |
|---|---|---|---|
| 后端类型 | --workspace-backend | WORKSPACE_BACKEND | local |
| Azure 连接串 | --azure-blob-connection-string | AZURE_BLOB_CONNECTION_STRING | 无 |
| Azure 账户 URL | --azure-blob-account-url | AZURE_BLOB_ACCOUNT_URL | 无 |
| Azure 容器名 | --azure-blob-container | AZURE_BLOB_CONTAINER | data-formulator |
| 缓存上限 | --cache-max-mb | DF_CACHE_MAX_MB | 1024 |
| 全局缓存上限 | --global-cache-max-mb | DF_GLOBAL_CACHE_MAX_MB | 10240 |
主目录解析(py-src/data_formulator/datalake/workspace.py:51-73):
- Flask
app.config['CLI_ARGS']['data_dir'](CLI--data-dir) - 环境变量
DATA_FORMULATOR_HOME - 默认
~/.data_formulator
启动流程
python1# 1. 配置日志 2configure_logging() # app.py:70-88 3 4# 2. 注册 Blueprint(延迟加载) 5_register_blueprints() # app.py:93-125 6# 加载顺序:tables_bp → agent_bp → session_bp → demo_stream_bp 7 8# 3. 启动后台任务 9start_iss_collector() # app.py:125,演示功能 10 11# 4. 请求到达时 12# agent_routes 调用 workspace_factory.get_workspace(identity_id) 13# 返回本地或 Azure Workspace 实例
日志配置
级别控制(py-src/data_formulator/app.py:72-84):
- 全局级别:
ERROR - Agent 模块:
INFO(记录 LLM 调用耗时) - 第三方库(httpx、litellm、openai):
WARNING(抑制冗余日志)
安全配置
SSRF 防护(py-src/data_formulator/agents/web_utils.py:81-108):
- 协议白名单:仅允许 HTTP/HTTPS
- IP 黑名单:私有 IP、环回地址、链路本地地址、云元数据端点(169.254.169.254)
- 重定向验证:每个重定向目标都经过 SSRF 检查
- 超时限制:默认 30 秒,最大 60 秒
