Appearance
RAGFlow 节点开发完整指南
目录
1. 开发环境准备
1.1 环境要求
bash
# 后端环境
Python 3.10+
uv (Python包管理器)
Docker & Docker Compose
# 前端环境
Node.js 18+
npm 或 yarn1.2 项目启动
bash
# 1. 克隆项目
git clone https://github.com/infiniflow/ragflow.git
cd ragflow
# 2. 启动后端依赖服务
docker compose -f docker/docker-compose-base.yml up -d
# 3. 安装后端依赖
uv sync --python 3.10 --all-extras
source .venv/bin/activate
export PYTHONPATH=$(pwd)
# 4. 启动后端服务
bash docker/launch_backend_service.sh
# 5. 启动前端服务
cd web
npm install
npm run dev2. 后端组件开发
2.1 创建参数类 (Step 1)
创建文件:agent/component/my_custom_component.py
python
from agent.component.base import ComponentParamBase
import re
class MyCustomComponentParam(ComponentParamBase):
"""
自定义组件参数类
定义组件所需的所有配置参数
"""
def __init__(self):
super().__init__()
# 必填参数
self.input_text = "" # 输入文本
self.processing_mode = "simple" # 处理模式: simple/advanced
# 可选参数
self.max_length = 1000 # 最大长度限制
self.enable_cache = True # 是否启用缓存
self.custom_rules = [] # 自定义规则列表
# 高级配置
self.timeout = 30 # 超时时间(秒)
self.retry_count = 3 # 重试次数
def check(self):
"""
参数验证方法
在组件执行前会自动调用此方法验证参数
"""
# 检查必填参数
self.check_empty(["input_text"], "输入文本不能为空")
# 检查枚举值
self.check_valid_value(
self.processing_mode,
"处理模式",
["simple", "advanced", "expert"]
)
# 检查数值范围
if self.max_length <= 0 or self.max_length > 10000:
raise ValueError("最大长度必须在1-10000之间")
if self.timeout <= 0 or self.timeout > 300:
raise ValueError("超时时间必须在1-300秒之间")
# 检查自定义规则格式
for rule in self.custom_rules:
if not isinstance(rule, dict) or 'pattern' not in rule:
raise ValueError("自定义规则格式错误,必须包含pattern字段")
def get_openai_message(self):
"""
为LLM调用生成消息格式 (可选)
如果组件需要调用LLM,实现此方法
"""
return [{"role": "user", "content": self.input_text}]2.2 实现组件类 (Step 2)
在同一文件中继续添加:
python
from agent.component.base import ComponentBase
import pandas as pd
import time
import hashlib
import json
class MyCustomComponent(ComponentBase):
"""
自定义组件实现类
核心业务逻辑在这里实现
"""
component_name = "MyCustomComponent" # 组件名称,必须与类名一致
def _run(self, history, **kwargs):
"""
组件核心执行方法
Args:
history: 对话历史记录
**kwargs: 额外参数
Returns:
pd.DataFrame: 包含执行结果的DataFrame
"""
# 1. 获取上游组件的输入数据
input_df = self.get_input()
# 2. 如果没有上游输入,使用参数中的默认文本
if input_df.empty:
input_text = self._param.input_text
else:
# 合并所有上游组件的输出
input_text = " ".join(input_df["content"].tolist())
# 3. 根据处理模式执行不同逻辑
if self._param.processing_mode == "simple":
result = self._simple_process(input_text)
elif self._param.processing_mode == "advanced":
result = self._advanced_process(input_text)
else: # expert
result = self._expert_process(input_text)
# 4. 应用自定义规则
result = self._apply_custom_rules(result)
# 5. 返回标准格式的DataFrame
return pd.DataFrame([{
"content": result, # 主要输出内容
"component_id": self._id, # 组件ID
"processing_mode": self._param.processing_mode, # 附加信息
"input_length": len(input_text), # 输入长度
"output_length": len(result), # 输出长度
"reference": [] # 引用信息(如果有)
}])
def _simple_process(self, text):
"""简单处理模式:文本清理和基础转换"""
# 去除多余空白
text = re.sub(r'\s+', ' ', text.strip())
# 长度限制
if len(text) > self._param.max_length:
text = text[:self._param.max_length] + "..."
return f"[简单处理] {text}"
def _advanced_process(self, text):
"""高级处理模式:包含更复杂的文本分析"""
# 统计信息
word_count = len(text.split())
char_count = len(text)
# 提取关键信息
sentences = text.split('。')
key_sentences = [s for s in sentences if len(s) > 10][:3]
# 生成摘要
summary = "关键信息:" + ";".join(key_sentences)
result = f"""[高级处理结果]
原文长度:{char_count}字符,{word_count}词
关键摘要:{summary}
处理时间:{time.strftime('%Y-%m-%d %H:%M:%S')}"""
return result
def _expert_process(self, text):
"""专家处理模式:调用外部服务或LLM"""
try:
# 模拟调用LLM进行深度分析
# 在实际项目中,这里会调用具体的LLM API
analysis_prompt = f"""
请对以下文本进行深度分析:
1. 主题识别
2. 情感分析
3. 关键实体提取
4. 总结要点
文本内容:
{text}
"""
# 这里应该调用实际的LLM服务
# result = self._call_llm(analysis_prompt)
# 模拟LLM响应
result = f"""[专家分析结果]
主题:文本分析与处理
情感:中性
关键实体:文本、处理、分析
要点总结:这是一段需要进行深度分析的文本内容
处理模式:专家级
文本特征哈希:{hashlib.md5(text.encode()).hexdigest()[:8]}"""
return result
except Exception as e:
# 错误处理:降级到高级处理模式
return f"[专家处理异常,降级处理] {self._advanced_process(text)}\n错误信息:{str(e)}"
def _apply_custom_rules(self, text):
"""应用用户自定义的处理规则"""
for rule in self._param.custom_rules:
pattern = rule.get('pattern', '')
replacement = rule.get('replacement', '')
if pattern and isinstance(pattern, str):
try:
text = re.sub(pattern, replacement, text)
except re.error:
# 忽略无效的正则表达式
continue
return text
def _call_llm(self, prompt):
"""
调用LLM服务的通用方法
在实际项目中连接到具体的LLM API
"""
# 这里是LLM调用的占位符
# 实际实现中会调用OpenAI、Claude等API
pass2.3 注册组件 (Step 3)
编辑 agent/component/__init__.py:
python
# 在文件末尾添加新组件的导入
from .my_custom_component import MyCustomComponent, MyCustomComponentParam
# 更新 __all__ 列表
__all__ = [
# ... 现有组件
"MyCustomComponent",
"MyCustomComponentParam",
]2.4 编写单元测试 (Step 4)
创建文件:test/test_my_custom_component.py
python
import pytest
import pandas as pd
from unittest.mock import Mock, patch
from agent.component.my_custom_component import MyCustomComponent, MyCustomComponentParam
class TestMyCustomComponent:
def setup_method(self):
"""测试前的准备工作"""
self.canvas_mock = Mock()
self.component_id = "test_component_001"
# 创建参数实例
self.param = MyCustomComponentParam()
self.param.input_text = "这是一个测试文本,用于验证组件功能"
self.param.processing_mode = "simple"
self.param.max_length = 100
# 创建组件实例
self.component = MyCustomComponent(
canvas=self.canvas_mock,
component_id=self.component_id,
param=self.param
)
def test_param_validation_success(self):
"""测试参数验证成功的情况"""
# 应该不抛出异常
self.param.check()
def test_param_validation_empty_text(self):
"""测试空文本验证失败"""
self.param.input_text = ""
with pytest.raises(ValueError, match="输入文本不能为空"):
self.param.check()
def test_param_validation_invalid_mode(self):
"""测试无效处理模式"""
self.param.processing_mode = "invalid_mode"
with pytest.raises(ValueError, match="处理模式"):
self.param.check()
def test_simple_processing(self):
"""测试简单处理模式"""
# Mock get_input 返回空DataFrame
self.component.get_input = Mock(return_value=pd.DataFrame())
result_df = self.component._run(history=[])
assert not result_df.empty
assert "简单处理" in result_df.iloc[0]["content"]
assert result_df.iloc[0]["component_id"] == self.component_id
assert result_df.iloc[0]["processing_mode"] == "simple"
def test_advanced_processing(self):
"""测试高级处理模式"""
self.param.processing_mode = "advanced"
self.component.get_input = Mock(return_value=pd.DataFrame())
result_df = self.component._run(history=[])
assert "高级处理结果" in result_df.iloc[0]["content"]
assert "原文长度" in result_df.iloc[0]["content"]
def test_expert_processing(self):
"""测试专家处理模式"""
self.param.processing_mode = "expert"
self.component.get_input = Mock(return_value=pd.DataFrame())
result_df = self.component._run(history=[])
assert "专家分析结果" in result_df.iloc[0]["content"]
def test_with_upstream_input(self):
"""测试处理上游组件输入"""
upstream_data = pd.DataFrame([
{"content": "上游组件输出1", "component_id": "upstream1"},
{"content": "上游组件输出2", "component_id": "upstream2"}
])
self.component.get_input = Mock(return_value=upstream_data)
result_df = self.component._run(history=[])
# 验证合并了上游输入
assert not result_df.empty
result_content = result_df.iloc[0]["content"]
assert "上游组件输出1 上游组件输出2" in result_content or "简单处理" in result_content
def test_custom_rules_application(self):
"""测试自定义规则应用"""
self.param.custom_rules = [
{"pattern": r"测试", "replacement": "验证"}
]
self.component.get_input = Mock(return_value=pd.DataFrame())
result_df = self.component._run(history=[])
# 验证规则被应用
assert "验证" in result_df.iloc[0]["content"]
def test_max_length_limit(self):
"""测试长度限制"""
self.param.input_text = "a" * 200 # 超过max_length(100)的文本
self.param.max_length = 50
self.component.get_input = Mock(return_value=pd.DataFrame())
result_df = self.component._run(history=[])
# 验证长度被限制
content = result_df.iloc[0]["content"]
assert len(content) <= 100 # 包含处理标识的总长度
assert "..." in content
# 运行测试
if __name__ == "__main__":
pytest.main([__file__, "-v"])3. 前端节点开发
3.1 定义操作符枚举 (Step 1)
编辑 web/src/pages/flow/constant.tsx:
typescript
// 1. 添加新的操作符枚举
export enum Operator {
// ... 现有操作符
MyCustomComponent = 'MyCustomComponent',
}
// 2. 添加节点类型映射
export const NodeMap = {
// ... 现有映射
[Operator.MyCustomComponent]: 'myCustomNode',
};
// 3. 添加操作符描述
export const OperatorDescription = {
// ... 现有描述
[Operator.MyCustomComponent]: '自定义文本处理组件,支持多种处理模式和自定义规则',
};
// 4. 添加操作符图标
export const operatorIcon: Record<Operator, string> = {
// ... 现有图标
[Operator.MyCustomComponent]: 'my-custom-icon.svg',
};
// 5. 添加操作符分类
export const SidebarGroupMap: Record<SidebarGroupKey, Operator[]> = {
// ... 现有分类
textProcessing: [
// ... 现有组件
Operator.MyCustomComponent,
],
};3.2 创建节点组件 (Step 2)
创建文件:web/src/pages/flow/canvas/node/my-custom-node.tsx
typescript
import { Handle, Position, NodeProps } from '@xyflow/react';
import { Flex } from 'antd';
import React from 'react';
import NodeHeader from '../node-header';
import { Operator } from '../../constant';
import styles from '../index.less';
// 定义节点数据接口
interface IMyCustomNode {
id: string;
data: {
label: Operator.MyCustomComponent;
name: string;
form: IMyCustomComponentForm;
};
}
// 定义表单数据接口
export interface IMyCustomComponentForm {
input_text: string;
processing_mode: 'simple' | 'advanced' | 'expert';
max_length: number;
enable_cache: boolean;
custom_rules: Array<{
pattern: string;
replacement: string;
}>;
timeout: number;
retry_count: number;
}
// 节点组件实现
export function MyCustomNode({
id,
data,
isConnectable,
selected
}: NodeProps<IMyCustomNode>) {
return (
<section
className={`${styles.ragNode} ${selected ? styles.selectedNode : ''}`}
>
{/* 输入连接点 */}
<Handle
type="target"
position={Position.Left}
isConnectable={isConnectable}
className={styles.handle}
/>
{/* 输出连接点 */}
<Handle
type="source"
position={Position.Right}
isConnectable={isConnectable}
className={styles.handle}
/>
{/* 节点头部 */}
<NodeHeader
id={id}
name={data.name}
label={data.label}
/>
{/* 节点内容 */}
<div className={styles.nodeBody}>
<Flex vertical gap={4}>
{/* 处理模式显示 */}
<div className={styles.nodeInfo}>
<span className={styles.nodeInfoLabel}>模式:</span>
<span className={styles.nodeInfoValue}>
{getModeDisplayName(data.form.processing_mode)}
</span>
</div>
{/* 输入文本预览 */}
{data.form.input_text && (
<div className={styles.nodeInfo}>
<span className={styles.nodeInfoLabel}>输入:</span>
<span
className={styles.nodeInfoValue}
title={data.form.input_text}
>
{data.form.input_text.length > 20
? `${data.form.input_text.substring(0, 20)}...`
: data.form.input_text
}
</span>
</div>
)}
{/* 配置状态指示器 */}
<div className={styles.nodeStatus}>
{data.form.custom_rules.length > 0 && (
<span className={styles.statusBadge}>
{data.form.custom_rules.length} 规则
</span>
)}
{data.form.enable_cache && (
<span className={styles.statusBadge}>缓存</span>
)}
</div>
</Flex>
</div>
</section>
);
}
// 处理模式显示名称映射
function getModeDisplayName(mode: string): string {
const modeMap = {
simple: '简单',
advanced: '高级',
expert: '专家'
};
return modeMap[mode as keyof typeof modeMap] || mode;
}
export default MyCustomNode;3.3 创建配置表单 (Step 3)
创建文件:web/src/pages/flow/form/my-custom-form/index.tsx
typescript
import {
Form,
Input,
Select,
Switch,
InputNumber,
Button,
Space,
Card,
Divider
} from 'antd';
import { PlusOutlined, DeleteOutlined } from '@ant-design/icons';
import React, { useCallback } from 'react';
import { IOperatorForm } from '../../interface';
import { IMyCustomComponentForm } from '../../canvas/node/my-custom-node';
const { TextArea } = Input;
const { Option } = Select;
interface MyCustomFormProps extends IOperatorForm {
initialValues?: Partial<IMyCustomComponentForm>;
}
const MyCustomForm: React.FC<MyCustomFormProps> = ({
onValuesChange,
form,
initialValues
}) => {
// 添加自定义规则
const addCustomRule = useCallback(() => {
const rules = form.getFieldValue('custom_rules') || [];
const newRules = [...rules, { pattern: '', replacement: '' }];
form.setFieldsValue({ custom_rules: newRules });
onValuesChange({ custom_rules: newRules }, form.getFieldsValue());
}, [form, onValuesChange]);
// 删除自定义规则
const removeCustomRule = useCallback((index: number) => {
const rules = form.getFieldValue('custom_rules') || [];
const newRules = rules.filter((_: any, i: number) => i !== index);
form.setFieldsValue({ custom_rules: newRules });
onValuesChange({ custom_rules: newRules }, form.getFieldsValue());
}, [form, onValuesChange]);
return (
<Form
form={form}
layout="vertical"
onValuesChange={onValuesChange}
initialValues={{
input_text: '',
processing_mode: 'simple',
max_length: 1000,
enable_cache: true,
custom_rules: [],
timeout: 30,
retry_count: 3,
...initialValues
}}
>
{/* 基础配置 */}
<Card title="基础配置" size="small">
<Form.Item
name="input_text"
label="输入文本"
tooltip="当没有上游组件输入时使用此文本"
rules={[
{ required: true, message: '请输入文本内容' },
{ min: 1, max: 5000, message: '文本长度应在1-5000字符之间' }
]}
>
<TextArea
rows={4}
placeholder="请输入要处理的文本内容..."
showCount
maxLength={5000}
/>
</Form.Item>
<Form.Item
name="processing_mode"
label="处理模式"
tooltip="选择不同的文本处理方式"
rules={[{ required: true, message: '请选择处理模式' }]}
>
<Select placeholder="请选择处理模式">
<Option value="simple">简单模式 - 基础文本清理</Option>
<Option value="advanced">高级模式 - 文本分析统计</Option>
<Option value="expert">专家模式 - AI深度分析</Option>
</Select>
</Form.Item>
</Card>
<Divider />
{/* 高级配置 */}
<Card title="高级配置" size="small">
<Form.Item
name="max_length"
label="最大长度"
tooltip="限制输出文本的最大字符数"
rules={[
{ required: true, message: '请设置最大长度' },
{ type: 'number', min: 1, max: 10000, message: '长度应在1-10000之间' }
]}
>
<InputNumber
min={1}
max={10000}
style={{ width: '100%' }}
placeholder="输入最大字符数"
/>
</Form.Item>
<Form.Item
name="enable_cache"
label="启用缓存"
tooltip="缓存相同输入的处理结果,提高性能"
valuePropName="checked"
>
<Switch checkedChildren="开启" unCheckedChildren="关闭" />
</Form.Item>
<Form.Item
name="timeout"
label="超时时间(秒)"
tooltip="组件执行的最大等待时间"
rules={[
{ required: true, message: '请设置超时时间' },
{ type: 'number', min: 1, max: 300, message: '超时时间应在1-300秒之间' }
]}
>
<InputNumber
min={1}
max={300}
style={{ width: '100%' }}
placeholder="输入超时时间"
/>
</Form.Item>
<Form.Item
name="retry_count"
label="重试次数"
tooltip="执行失败时的重试次数"
rules={[
{ required: true, message: '请设置重试次数' },
{ type: 'number', min: 0, max: 10, message: '重试次数应在0-10之间' }
]}
>
<InputNumber
min={0}
max={10}
style={{ width: '100%' }}
placeholder="输入重试次数"
/>
</Form.Item>
</Card>
<Divider />
{/* 自定义规则 */}
<Card
title="自定义规则"
size="small"
extra={
<Button
type="link"
icon={<PlusOutlined />}
onClick={addCustomRule}
size="small"
>
添加规则
</Button>
}
>
<Form.List name="custom_rules">
{(fields, { add, remove }) => (
<>
{fields.map(({ key, name, ...restField }) => (
<Card key={key} size="small" style={{ marginBottom: 8 }}>
<Space direction="vertical" style={{ width: '100%' }}>
<Form.Item
{...restField}
name={[name, 'pattern']}
label="匹配模式(正则表达式)"
rules={[
{ required: true, message: '请输入匹配模式' }
]}
>
<Input placeholder="例如: \d{4}-\d{2}-\d{2}" />
</Form.Item>
<Form.Item
{...restField}
name={[name, 'replacement']}
label="替换内容"
rules={[
{ required: true, message: '请输入替换内容' }
]}
>
<Input placeholder="替换后的内容" />
</Form.Item>
<Button
type="link"
danger
icon={<DeleteOutlined />}
onClick={() => removeCustomRule(name)}
size="small"
>
删除规则
</Button>
</Space>
</Card>
))}
{fields.length === 0 && (
<div style={{
textAlign: 'center',
color: '#999',
padding: '20px 0'
}}>
暂无自定义规则,点击上方"添加规则"按钮添加
</div>
)}
</>
)}
</Form.List>
</Card>
</Form>
);
};
export default MyCustomForm;3.4 注册前端组件 (Step 4)
编辑相关配置文件:
typescript
// 1. 更新 web/src/pages/flow/constant.tsx 中的 nodeTypes
export const nodeTypes: NodeTypes = {
// ... 现有节点类型
myCustomNode: MyCustomNode,
};
// 2. 更新表单映射
const FormMap = {
// ... 现有表单映射
[Operator.MyCustomComponent]: MyCustomForm,
};
// 3. 添加节点图标 (将图标文件放到 web/src/assets/svg/)
export const operatorIcon: Record<Operator, string> = {
// ... 现有图标
[Operator.MyCustomComponent]: 'my-custom-icon.svg',
};4. 测试与调试
4.1 后端单元测试
bash
# 运行特定组件的测试
pytest test/test_my_custom_component.py -v
# 运行所有组件测试
pytest test/ -k "component" -v4.2 前端组件测试
创建文件:web/src/pages/flow/form/my-custom-form/__tests__/index.test.tsx
typescript
import React from 'react';
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import { Form } from 'antd';
import '@testing-library/jest-dom';
import MyCustomForm from '../index';
describe('MyCustomForm', () => {
let form: any;
let mockOnValuesChange: jest.Mock;
beforeEach(() => {
form = Form.useForm()[0];
mockOnValuesChange = jest.fn();
});
test('renders form with all fields', () => {
render(
<MyCustomForm
form={form}
onValuesChange={mockOnValuesChange}
/>
);
expect(screen.getByLabelText(/输入文本/)).toBeInTheDocument();
expect(screen.getByLabelText(/处理模式/)).toBeInTheDocument();
expect(screen.getByLabelText(/最大长度/)).toBeInTheDocument();
expect(screen.getByLabelText(/启用缓存/)).toBeInTheDocument();
});
test('validates required fields', async () => {
render(
<MyCustomForm
form={form}
onValuesChange={mockOnValuesChange}
/>
);
// 尝试提交空表单
fireEvent.click(screen.getByText('确定')); // 假设有提交按钮
await waitFor(() => {
expect(screen.getByText(/请输入文本内容/)).toBeInTheDocument();
});
});
test('adds and removes custom rules', async () => {
render(
<MyCustomForm
form={form}
onValuesChange={mockOnValuesChange}
/>
);
// 添加规则
fireEvent.click(screen.getByText(/添加规则/));
await waitFor(() => {
expect(screen.getByPlaceholderText(/匹配模式/)).toBeInTheDocument();
});
// 删除规则
fireEvent.click(screen.getByText(/删除规则/));
await waitFor(() => {
expect(screen.queryByPlaceholderText(/匹配模式/)).not.toBeInTheDocument();
});
});
});4.3 集成测试
创建完整的工作流测试:
typescript
// web/src/pages/flow/__tests__/integration.test.tsx
describe('MyCustomComponent Integration', () => {
test('creates and configures custom component in workflow', async () => {
// 1. 渲染工作流编辑器
// 2. 从侧边栏拖拽自定义组件到画布
// 3. 配置组件参数
// 4. 连接上下游组件
// 5. 验证工作流可以正常保存和执行
});
});5. 部署与发布
5.1 版本管理
在组件文件中添加版本信息:
python
# agent/component/my_custom_component.py
class MyCustomComponent(ComponentBase):
component_name = "MyCustomComponent"
version = "1.0.0" # 版本号
description = "自定义文本处理组件" # 描述
author = "Your Name" # 作者
# ... 其他代码5.2 文档生成
创建组件文档:docs/components/my-custom-component.md
markdown
# MyCustomComponent 自定义组件
## 概述
自定义文本处理组件,支持多种处理模式和自定义规则配置。
## 参数说明
| 参数名 | 类型 | 必填 | 默认值 | 说明 |
|--------|------|------|--------|------|
| input_text | string | 是 | - | 输入文本 |
| processing_mode | enum | 是 | simple | 处理模式 |
| max_length | number | 否 | 1000 | 最大长度 |
| enable_cache | boolean | 否 | true | 是否启用缓存 |
## 使用示例
1. 拖拽组件到画布
2. 配置输入文本和处理模式
3. 连接上下游组件
4. 运行工作流
## 版本历史
- v1.0.0: 初始版本5.3 打包发布
bash
# 构建前端
cd web
npm run build
# 构建后端Docker镜像
docker build -t ragflow:custom-component .
# 推送到镜像仓库
docker tag ragflow:custom-component your-registry/ragflow:custom-component
docker push your-registry/ragflow:custom-component6. 实战案例:JSON解析组件
让我们通过一个完整的实战案例来演示整个开发流程:
6.1 需求分析
开发一个JSON解析组件,功能包括:
- 解析JSON字符串
- 提取指定字段
- 数据类型转换
- 错误处理
6.2 后端实现
python
# agent/component/json_parser.py
import json
import pandas as pd
from typing import Any, Dict, List
from agent.component.base import ComponentBase, ComponentParamBase
class JsonParserParam(ComponentParamBase):
def __init__(self):
super().__init__()
self.json_path = "" # JSONPath表达式
self.extract_fields = [] # 要提取的字段列表
self.default_value = None # 默认值
self.strict_mode = False # 严格模式
def check(self):
self.check_empty(["json_path"], "JSONPath路径不能为空")
class JsonParser(ComponentBase):
component_name = "JsonParser"
def _run(self, history, **kwargs):
input_df = self.get_input()
results = []
for _, row in input_df.iterrows():
try:
# 解析JSON
json_data = json.loads(row["content"])
# 提取数据
extracted = self._extract_data(json_data)
results.append({
"content": json.dumps(extracted, ensure_ascii=False),
"component_id": self._id,
"extracted_fields": list(extracted.keys()),
"reference": []
})
except json.JSONDecodeError as e:
if self._param.strict_mode:
raise e
else:
# 返回默认值
results.append({
"content": json.dumps(self._param.default_value or {}),
"component_id": self._id,
"error": str(e),
"reference": []
})
return pd.DataFrame(results)
def _extract_data(self, json_data: Dict[str, Any]) -> Dict[str, Any]:
"""提取指定字段的数据"""
if not self._param.extract_fields:
return json_data
extracted = {}
for field in self._param.extract_fields:
# 支持嵌套字段访问,如 "user.name"
value = self._get_nested_value(json_data, field)
extracted[field] = value
return extracted
def _get_nested_value(self, data: Dict, path: str) -> Any:
"""获取嵌套字段的值"""
keys = path.split('.')
current = data
for key in keys:
if isinstance(current, dict) and key in current:
current = current[key]
else:
return self._param.default_value
return current6.3 前端实现
typescript
// web/src/pages/flow/canvas/node/json-parser-node.tsx
export interface IJsonParserForm {
json_path: string;
extract_fields: string[];
default_value: any;
strict_mode: boolean;
}
export function JsonParserNode({ id, data, isConnectable, selected }: NodeProps) {
return (
<section className={`${styles.ragNode} ${selected ? styles.selectedNode : ''}`}>
<Handle type="target" position={Position.Left} isConnectable={isConnectable} />
<Handle type="source" position={Position.Right} isConnectable={isConnectable} />
<NodeHeader id={id} name={data.name} label={data.label} />
<div className={styles.nodeBody}>
<div className={styles.nodeInfo}>
<span>提取字段: {data.form.extract_fields.length || 0}</span>
</div>
{data.form.strict_mode && (
<span className={styles.statusBadge}>严格模式</span>
)}
</div>
</section>
);
}typescript
// web/src/pages/flow/form/json-parser-form/index.tsx
const JsonParserForm: React.FC<IOperatorForm> = ({ onValuesChange, form }) => {
return (
<Form form={form} layout="vertical" onValuesChange={onValuesChange}>
<Form.Item
name="json_path"
label="JSONPath路径"
rules={[{ required: true, message: '请输入JSONPath' }]}
>
<Input placeholder="例如: $.data.items[*].name" />
</Form.Item>
<Form.Item name="extract_fields" label="提取字段">
<Select
mode="tags"
placeholder="输入字段名,支持嵌套访问如user.name"
style={{ width: '100%' }}
/>
</Form.Item>
<Form.Item name="strict_mode" label="严格模式" valuePropName="checked">
<Switch />
</Form.Item>
<Form.Item name="default_value" label="默认值">
<TextArea rows={3} placeholder="JSON格式的默认值" />
</Form.Item>
</Form>
);
};6.4 测试验证
python
# test/test_json_parser.py
def test_json_extraction():
param = JsonParserParam()
param.json_path = "$.data"
param.extract_fields = ["name", "age", "email"]
component = JsonParser(Mock(), "test", param)
# 模拟输入数据
input_data = pd.DataFrame([{
"content": '{"data": {"name": "张三", "age": 25, "email": "zhangsan@example.com", "address": "北京"}}'
}])
component.get_input = Mock(return_value=input_data)
result_df = component._run([])
# 验证结果
assert not result_df.empty
result = json.loads(result_df.iloc[0]["content"])
assert result["name"] == "张三"
assert result["age"] == 25
assert result["email"] == "zhangsan@example.com"
assert "address" not in result # 未在extract_fields中,应该被过滤6.5 部署上线
- 提交代码到版本控制系统
- 运行完整测试套件
- 构建并推送Docker镜像
- 更新部署环境
- 验证功能正常
总结
通过以上完整的开发流程,你可以:
- 快速创建新的工作流组件
- 保证质量通过完善的测试体系
- 易于维护通过清晰的代码结构和文档
- 灵活扩展支持各种复杂的业务需求
关键要点
- 后端优先:先实现后端逻辑,确保核心功能正确
- 类型安全:使用TypeScript和Python类型注解
- 测试驱动:编写充分的单元测试和集成测试
- 用户体验:前端界面要直观易用
- 文档完善:提供清晰的使用说明和API文档
最佳实践
- 参数验证要严格
- 错误处理要完善
- 代码要模块化
- 性能要优化
- 安全要考虑
这个开发指南涵盖了从环境准备到部署上线的完整流程,可以作为团队开发新组件的标准化参考文档。