价格

架构总览

相关源文件

本页面内容基于以下源文件生成:

Data Formulator 是一个基于 AI 的数据可视化工具,采用 Python Flask 后端与 TypeScript 前端的分层架构。系统通过多 Agent 协作实现从自然语言到可视化图表的端到端转换,支持多种图表后端(Vega-Lite、ECharts、Chart.js、GoFish)和多种数据存储后端(本地文件系统、Azure Blob)。

系统架构总览

系统采用前后端分离架构,后端基于 Flask 实现 RESTful API,前端通过 TypeScript 实现图表语义层。核心设计理念是将 AI 推理、数据转换、图表渲染三个关注点分离,通过 Workspace 抽象实现数据持久层的可插拔性。

正在加载图表渲染器...

架构要点说明

  1. Blueprint 延迟加载app.py 通过 _register_blueprints() 实现按需加载,避免启动时加载所有 AI/ML 依赖(py-src/data_formulator/app.py:93-122

  2. Agent 层级协作DataAgent 作为主控 Agent 实现 observe-think-act 循环,内部委托 DataRecAgent 执行具体的数据转换和图表推荐(py-src/data_formulator/agents/data_agent.py:131-156

  3. 图表后端可插拔agents-chart 模块提供统一的语义层 API,支持 Vega-Lite、ECharts、Chart.js、GoFish 四种渲染后端(src/lib/agents-chart/index.ts:12-18

  4. Workspace 工厂模式:通过 workspace_factory.get_workspace() 根据配置动态选择本地或 Azure Blob 存储,实现存储层的透明切换(py-src/data_formulator/workspace_factory.py:70-128

  5. 安全边界web_utils.py 实现了完整的 SSRF 防护,包括私有 IP 阻断、协议白名单、重定向验证(py-src/data_formulator/agents/web_utils.py:56-110

核心模块详解

Flask 应用入口

职责边界:负责应用生命周期管理、Blueprint 注册、日志配置,不处理具体业务逻辑。

入口 API

关键数据结构

python
1# CustomJSONEncoder 处理 numpy 类型和 bytes 序列化
2class CustomJSONEncoder(json.JSONEncoder):
3    def default(self, obj):
4        if isinstance(obj, np.int64):
5            return int(obj)
6        if isinstance(obj, (bytes, bytearray)):
7            return base64.b64encode(obj).decode('ascii')

py-src/data_formulator/app.py:37-43

关键调用链

  1. configure_logging() 设置 data_formulator.agents 为 INFO 级别
  2. _register_blueprints() 按顺序加载 tables_bpagent_bpsession_bpdemo_stream_bp
  3. 启动后台 ISS 位置收集器(演示功能)

错误处理:使用 _blueprints_registered 全局标志防止重复注册(py-src/data_formulator/app.py:99-102

DataAgent 自主探索引擎

职责边界:实现 observe-think-act 自主循环,决定何时可视化、何时澄清、何时结束;不直接执行代码,委托给 DataRecAgent

入口 API

关键数据结构

python
1# 事件类型定义
2{
3    "type": "action" | "result" | "clarify" | "completion" | "error",
4    "iteration": int,
5    "thought": str,      # Agent 思考过程
6    "question": str,     # 可视化问题
7    "content": dict,     # 结果数据
8    "trajectory": list,  # 对话历史(用于恢复)
9}

py-src/data_formulator/agents/data_agent.py:171-180

关键调用链

正在加载图表渲染器...

错误处理与边界条件

图表语义层

职责边界:提供目标无关的语义类型推断、布局决策、模板匹配;不依赖 React/Redux/UI 层。

入口 API

关键数据结构

typescript
1// 输入结构(推断自模块架构)
2interface ChartAssemblyInput {
3  data: { values: any[] };
4  semantic_types: Record<string, string>; // 字段名 -> 语义类型
5  chart_spec: {
6    chartType: string;
7    encodings: Record<string, string>;
8    config?: object;
9  };
10  canvas_size?: { width: number; height: number };
11}

src/lib/agents-chart/index.ts:35-39

模块组织

  • core/:目标无关的类型定义、语义类型推断、布局决策
  • vegalite/ / echarts/ / chartjs/ / gofish/:各后端的模板定义和规格实例化
  • 每个后端提供 TemplateDefs / GetTemplateDef / GetTemplateChannels 三个注册函数(src/lib/agents-chart/index.ts:26-29

调用链:Python 端 DataAgent._create_chart() 调用 assemble_vegailte_chart()(Python 包装),最终调用 TypeScript 编译后的语义层。

Workspace 存储抽象

职责边界:提供统一的数据表读写接口,支持本地文件系统和 Azure Blob 两种后端;处理临时文件清理和缓存管理。

入口 API

关键数据结构

python
1# 配置来源优先级
2CLI_ARGS = {
3    'workspace_backend': 'local' | 'azure_blob',
4    'data_dir': str | None,
5    'azure_blob_connection_string': str | None,
6    'azure_blob_account_url': str | None,
7    'azure_blob_container': str,
8    'cache_max_mb': int,
9    'global_cache_max_mb': int,
10}

py-src/data_formulator/workspace_factory.py:77-84

关键调用链

  1. get_workspace() 读取 Flask current_app.config['CLI_ARGS']
  2. backend == 'azure_blob',调用 _build_azure_container_client()
  3. 返回 CachedAzureBlobWorkspace 或本地 Workspace

错误处理与边界条件

数据流与调用链

以下时序图展示了从用户请求到图表生成的完整数据流:

正在加载图表渲染器...

数据流要点

  1. 流式响应DataAgent.run() 使用 Python Generator 实现流式输出,前端通过 SSE(Server-Sent Events)接收实时进度(py-src/data_formulator/agents/data_agent.py:168

  2. 对话历史传递trajectory 列表在每次 LLM 调用后追加 assistant 响应和 user 观察,形成完整的上下文链(py-src/data_formulator/agents/data_agent.py:209-212

  3. 修复循环_execute_visualize() 内部实现代码修复循环,最多尝试 max_repair_attempts 次(py-src/data_formulator/agents/data_agent.py:454-473

  4. 跨语言调用:Python 端通过 assemble_vegailte_chart() 调用 TypeScript 编译后的图表语义层,返回 Vega-Lite spec 后再转换为 base64 图片(py-src/data_formulator/agents/data_agent.py:516-520

核心设计决策与取舍

决策点选择理由已知限制
Agent 循环模式observe-think-act允许 Agent 根据执行结果动态调整策略,支持澄清和多步探索循环次数受 max_iterations 限制,复杂任务可能中断
LLM 集成方式LiteLLM 统一封装支持 OpenAI、Azure、Ollama 等多后端,配置灵活增加一层抽象,调试时需追踪 LiteLLM 内部行为
图表后端数量4 种(Vega-Lite、ECharts、Chart.js、GoFish)覆盖声明式和命令式两种范式,适应不同场景维护成本高,模板定义需同步更新
存储后端选择工厂模式 + 配置驱动本地开发用文件系统,生产环境用 Azure Blob,无需改代码Azure Blob 需要额外配置凭证和缓存策略
Blueprint 加载时机延迟加载避免启动时加载所有 AI/ML 依赖,加快冷启动首次请求时可能有延迟
SSRF 防护策略多层验证(协议、IP、重定向)防止 Agent 访问内网资源,符合安全合规要求可能误拦截某些合法的内部数据源
临时文件清理后台定时清理 + 崩溃恢复处理异常退出遗留的临时文件24 小时延迟可能占用磁盘空间
日志级别控制Agent 模块 INFO,第三方库 WARNING平衡调试需求和日志噪音生产环境可能需要更细粒度控制

关键架构模式

  1. Generator 模式DataAgent.run() 返回 Generator 而非一次性结果,支持流式 UI 更新和早期中断(py-src/data_formulator/agents/data_agent.py:161-168

  2. 委托模式DataAgent 将数据转换和图表推荐委托给 DataRecAgent,保持职责单一(py-src/data_formulator/agents/data_agent.py:150-155

  3. 策略模式:图表语义层通过统一的 assemble*() 接口支持多种后端,运行时根据配置选择(src/lib/agents-chart/index.ts:19-23

  4. 工厂模式workspace_factory.get_workspace() 根据配置动态创建存储实例(py-src/data_formulator/workspace_factory.py:70-128

技术选型

技术用途选型理由替代方案
FlaskWeb 框架轻量级,Blueprint 机制支持模块化,适合中小型 API 服务FastAPI(更现代但需迁移成本)
LiteLLMLLM 统一网关支持 100+ 模型,统一 API,简化多模型切换直接使用 OpenAI SDK(仅支持 OpenAI)
Vega-Lite声明式图表语法简洁,适合 AI 生成,支持交互D3.js(学习曲线陡峭)
ECharts交互式图表功能丰富,中文文档完善,适合复杂可视化Highcharts(商业许可)
Chart.js简单图表轻量,适合基础图表,社区活跃Chartist(功能较少)
Azure Blob云存储与 Azure 生态集成,支持 Managed IdentityAWS S3(需额外配置)
Parquet数据存储格式列式存储,压缩率高,适合分析场景CSV(无压缩,解析慢)
TypeScript前端类型系统编译时类型检查,减少运行时错误JavaScript(无类型保护)

模块依赖关系

正在加载图表渲染器...

依赖要点

  1. 单向依赖app.py 依赖各 Blueprint,Blueprint 依赖 Agent 层,Agent 层依赖 Client 和 Workspace,形成清晰的层级结构

  2. 跨语言边界:Python 后端通过 agents-chart 调用 TypeScript 编译后的图表语义层,依赖方向为 Python → TypeScript

  3. 外部依赖隔离:LiteLLM、Azure SDK、Pandas 等外部库仅在特定模块中使用,不污染核心业务逻辑

关键配置与启动流程

配置优先级

Workspace 后端选择py-src/data_formulator/workspace_factory.py:77-84):

配置项CLI 参数环境变量默认值
后端类型--workspace-backendWORKSPACE_BACKENDlocal
Azure 连接串--azure-blob-connection-stringAZURE_BLOB_CONNECTION_STRING
Azure 账户 URL--azure-blob-account-urlAZURE_BLOB_ACCOUNT_URL
Azure 容器名--azure-blob-containerAZURE_BLOB_CONTAINERdata-formulator
缓存上限--cache-max-mbDF_CACHE_MAX_MB1024
全局缓存上限--global-cache-max-mbDF_GLOBAL_CACHE_MAX_MB10240

主目录解析py-src/data_formulator/datalake/workspace.py:51-73):

  1. Flask app.config['CLI_ARGS']['data_dir'](CLI --data-dir
  2. 环境变量 DATA_FORMULATOR_HOME
  3. 默认 ~/.data_formulator

启动流程

python
1# 1. 配置日志
2configure_logging()  # app.py:70-88
3
4# 2. 注册 Blueprint(延迟加载)
5_register_blueprints()  # app.py:93-125
6# 加载顺序:tables_bp → agent_bp → session_bp → demo_stream_bp
7
8# 3. 启动后台任务
9start_iss_collector()  # app.py:125,演示功能
10
11# 4. 请求到达时
12# agent_routes 调用 workspace_factory.get_workspace(identity_id)
13# 返回本地或 Azure Workspace 实例

日志配置

级别控制py-src/data_formulator/app.py:72-84):

  • 全局级别:ERROR
  • Agent 模块:INFO(记录 LLM 调用耗时)
  • 第三方库(httpx、litellm、openai):WARNING(抑制冗余日志)

安全配置

SSRF 防护py-src/data_formulator/agents/web_utils.py:81-108):

  • 协议白名单:仅允许 HTTP/HTTPS
  • IP 黑名单:私有 IP、环回地址、链路本地地址、云元数据端点(169.254.169.254)
  • 重定向验证:每个重定向目标都经过 SSRF 检查
  • 超时限制:默认 30 秒,最大 60 秒