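RAGFlow Workflow Orchestration System: In-Depth Architecture Analysis
Overview
RAGFlow's workflow orchestration system is a visual AI workflow platform built on React Flow. It supports drag-and-drop component orchestration, real-time execution, and complex conditional logic. This document analyzes the front end and the back end in detail and doubles as a development guide for both.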
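1. Overall system architecture
```mermaid
graph TB
    subgraph "Frontend (React/TypeScript)"
        A[React Flow canvas] --> B[Node component library]
        B --> C[Configuration form system]
        C --> D[Zustand state management]
        D --> E[API service layer]
        E --> F[SSE real-time communication]
    end
    subgraph "Backend (Python/Flask)"
        G[Canvas API] --> H[Workflow execution engine]
        H --> I[Component registry]
        I --> J[Component base classes]
        J --> K[67 component implementations]
        K --> L[LLM integration layer]
    end
    subgraph "Data Layer"
        M[Workflow DSL JSON]
        N[Component parameter configuration]
        O[Execution state storage]
        P[Message history]
    end
    F --> G
    H --> M
    I --> N
    H --> O
    F --> P
    subgraph "External Services"
        Q[LLM providers]
        R[Search engines]
        S[Databases]
        T[Third-party APIs]
    end
    L --> Q
    K --> R
    K --> S
    K --> T
```
2. Front-end architecture in detail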
2.1 Tech stack
- React Flow: @xyflow/react, the core of the visual node editor
- State management: Zustand + Immer for reactive state
- UI framework: Ant Design + Radix UI + Tailwind CSS
- Forms: React Hook Form + custom validation
- Real-time communication: Server-Sent Events (SSE)
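2.2 Directory structure
```
web/src/pages/flow/
├── index.tsx          # Main workflow page
├── canvas/            # React Flow canvas components
│   ├── node/          # Node component implementations
│   ├── edge/          # Edge components
│   └── sidebar/       # Component panel
├── form/              # Node configuration forms
│   ├── begin-form/    # Begin node form
│   ├── generate-form/ # Generate node form
│   └── ...            # Forms for the 67 component types
├── hooks/             # Custom hooks
├── store.ts           # Zustand state management
├── constant.tsx       # Component definition constants
└── interface.ts       # TypeScript interfaces
```
2.3 Node system architecture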
```mermaid
classDiagram
    class Operator {
        <<enumeration>>
        +Begin
        +Generate
        +Retrieval
        +Categorize
        +Switch
        +...67_operators
    }
    class RAGFlowNodeType {
        +id: string
        +type: string
        +position: Position
        +data: NodeData
    }
    class Position {
        +x: number
        +y: number
    }
    class NodeData {
        +label: Operator
        +name: string
        +form: FormData
    }
    class FormData {
        +[key: string]: any
    }
    class RagNode {
        +handles: Handle[]
        +render(): JSX.Element
        +onNodeClick(): void
    }
    class NodeConfiguration {
        +nodeTypes: NodeTypes
        +FormMap: ComponentMap
        +OperatorMap: OperatorRecord
    }
    Operator --> RAGFlowNodeType
    RAGFlowNodeType --> NodeData
    RAGFlowNodeType --> Position
    NodeData --> FormData
    RagNode --> RAGFlowNodeType
    NodeConfiguration --> Operator
```
2.4 Node development workflow
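Step 1: Define the operator type
```typescript
// constant.tsx: add the new member to the existing Operator enum
export enum Operator {
  // ...existing operators
  MyNewOperator = 'MyNewOperator',
}
```
Step 2: Create the node component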
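```typescript
// canvas/node/my-new-node.tsx
import { Handle, Position, type Node, type NodeProps } from '@xyflow/react';
import { Operator } from '../../constant';
import type { IMyNewNodeForm } from '../../form/my-new-form';
import { NodeHeader } from './node-header'; // existing node header component (path assumed)
import styles from '../../index.less';

// In @xyflow/react v12, NodeProps is parameterized by the full Node type, not just its data
type MyNewNodeType = Node<
  {
    label: Operator.MyNewOperator;
    name: string;
    form: IMyNewNodeForm;
  },
  'myNewNode'
>;

export function MyNewNode({ id, data, isConnectable, selected }: NodeProps<MyNewNodeType>) {
  return (
    // The .selected modifier matches the &.selected rule in index.less
    <section className={`${styles.ragNode} ${selected ? styles.selected : ''}`}>
      <Handle type="target" position={Position.Left} isConnectable={isConnectable} />
      <Handle type="source" position={Position.Right} isConnectable={isConnectable} />
      <NodeHeader id={id} name={data.name} label={data.label} />
      <div className="node-body">
        {/* Custom node content */}
        <span className="text-sm text-gray-600">
          {data.form.description || 'Custom node description'}
        </span>
      </div>
    </section>
  );
}
```
Step 3: Create the configuration form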
```typescript
// form/my-new-form/index.tsx
import { Form, Input, Switch } from 'antd';
import type { IOperatorForm } from '../../interface';
import JsonEditor from '@/components/json-editor'; // hypothetical JSON editor component

export interface IMyNewNodeForm {
  description: string;
  enabled: boolean;
  parameters: Record<string, any>;
}

const MyNewForm = ({ onValuesChange, form }: IOperatorForm) => {
  return (
    <Form form={form} onValuesChange={onValuesChange} layout="vertical">
      <Form.Item
        name="description"
        label="Description"
        rules={[{ required: true, message: 'Please enter a description' }]}
      >
        <Input.TextArea rows={3} placeholder="Enter a node description" />
      </Form.Item>
      <Form.Item name="enabled" label="Enabled" valuePropName="checked">
        <Switch />
      </Form.Item>
      <Form.Item name="parameters" label="Parameters">
        <JsonEditor />
      </Form.Item>
    </Form>
  );
};

export default MyNewForm;
```
Step 4: Register the component
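```typescript
// constant.tsx: add entries to the existing mapping tables
export const NodeMap = {
  // ...existing entries
  [Operator.MyNewOperator]: 'myNewNode',
};

export const nodeTypes: NodeTypes = {
  // ...existing entries
  myNewNode: MyNewNode,
};

export const FormMap = {
  // ...existing entries
  [Operator.MyNewOperator]: MyNewForm,
};

// Record<Operator, string> requires an entry for every Operator member
export const operatorIcon: Record<Operator, string> = {
  // ...existing entries
  [Operator.MyNewOperator]: 'my-new-icon.svg',
};

export const OperatorDescription = {
  // ...existing entries
  [Operator.MyNewOperator]: 'Custom operator node that runs specific business logic',
};
```
2.5 Data flow management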
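The "validate the connection" step in the sequence diagram below can be illustrated with a small reachability check. This is a minimal Python sketch, not RAGFlow's front-end code. It assumes edges are (source, target) ID pairs and applies a hypothetical rule set: reject self-loops, duplicate edges, and any edge that would close a cycle, which matches the "prevent invalid connections and circular dependencies" goal listed in the summary. A workflow that is meant to loop back on itself would need a looser rule.

```python
from collections import defaultdict


def would_create_cycle(edges, source, target):
    """Return True if adding source -> target would close a cycle.

    A cycle appears exactly when source is already reachable from target.
    """
    graph = defaultdict(list)
    for s, t in edges:
        graph[s].append(t)
    stack, seen = [target], set()
    while stack:
        node = stack.pop()
        if node == source:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(graph[node])
    return False


def is_valid_connection(edges, source, target):
    """Hypothetical rule set: no self-loops, no duplicate edges, no cycles."""
    if source == target:
        return False
    if (source, target) in set(edges):
        return False
    return not would_create_cycle(edges, source, target)


edges = [("begin", "retrieval"), ("retrieval", "generate")]
print(is_valid_connection(edges, "generate", "answer"))  # True
print(is_valid_connection(edges, "generate", "begin"))   # False: closes a cycle
```

The walk starts at the proposed target and follows existing edges. If it ever reaches the proposed source, the new edge would complete a loop.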
```mermaid
sequenceDiagram
    participant User
    participant Canvas
    participant NodeForm
    participant ZustandStore
    participant APIService
    User->>Canvas: Drag a node onto the canvas
    Canvas->>ZustandStore: addNode(nodeData)
    ZustandStore->>Canvas: Update node state
    User->>NodeForm: Configure node parameters
    NodeForm->>ZustandStore: updateNodeForm(nodeId, formData)
    ZustandStore->>Canvas: Trigger re-render
    User->>Canvas: Connect nodes
    Canvas->>ZustandStore: addEdge(edgeData)
    ZustandStore->>Canvas: Validate the connection
    User->>Canvas: Run the workflow
    Canvas->>APIService: runCanvas(dslJson)
    APIService->>ZustandStore: Update execution state in real time
```
3. Backend architecture in detail
3.1 Component system architecture
```mermaid
classDiagram
    class ComponentBase {
        <<abstract>>
        +component_id: str
        +_param: ComponentParamBase
        +_canvas: Canvas
        +_upstream: StringList
        +_downstream: StringList
        +_run() DataFrame*
        +get_input() DataFrame
        +output(start_at: int) DataFrame
    }
    class ComponentParamBase {
        +_vars: Dict
        +_req_vars: Set[str]
        +_deprecated: Dict
        +update(d: Dict)
        +to_dict() Dict
        +get_value(k: str)
    }
    class Generate {
        +llm_id: str
        +prompt: str
        +cite: bool
        +_run() DataFrame
    }
    class Retrieval {
        +kb_ids: StringList
        +similarity_threshold: float
        +keywords_similarity_weight: float
        +_run() DataFrame
    }
    class Begin {
        +prologue: str
        +_run() DataFrame
    }
    class Answer {
        +_run() DataFrame
    }
    class Switch {
        +conditions: ConditionList
        +_run() DataFrame
    }
    ComponentBase <|-- Generate
    ComponentBase <|-- Retrieval
    ComponentBase <|-- Begin
    ComponentBase <|-- Answer
    ComponentBase <|-- Switch
    ComponentParamBase <|-- GenerateParam
    ComponentParamBase <|-- RetrievalParam
```
3.2 Component registration system
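The component_class factory in this section is just getattr on the agent.component package. Any class that the package's __init__.py imports can therefore be resolved by name. The sketch below runs the same lookup without RAGFlow installed by registering an in-memory stand-in module. The module name fake_component, the placeholder classes, and the ValueError wrapping are hypothetical additions for illustration.

```python
import importlib
import sys
import types

# Hypothetical stand-in for the agent.component package
fake = types.ModuleType("fake_component")


class Generate:  # placeholder component class
    pass


class GenerateParam:  # placeholder parameter class
    pass


fake.Generate = Generate
fake.GenerateParam = GenerateParam
sys.modules["fake_component"] = fake  # import_module checks sys.modules first


def component_class(class_name, package="fake_component"):
    """Resolve a class by name from the package namespace."""
    module = importlib.import_module(package)
    try:
        return getattr(module, class_name)
    except AttributeError:
        raise ValueError(f"Unknown component: {class_name}") from None


# Mirrors the usage example: a component name and its "<Name>Param" class
name = "Generate"
cls, param_cls = component_class(name), component_class(name + "Param")
print(cls.__name__, param_cls.__name__)
```

Because registration is only an import, a new component that is missing from __init__.py fails at lookup time with an AttributeError (here converted to a ValueError), not when the server starts.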
```python
# agent/component/__init__.py
import importlib


def component_class(class_name: str):
    """Factory that loads a component class dynamically by name."""
    m = importlib.import_module("agent.component")
    c = getattr(m, class_name)
    return c


# Usage
component_class("Generate")       # returns the Generate class
component_class("GenerateParam")  # returns the GenerateParam class
```
3.3 Backend component development workflow
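This sketch assumes the canvas builds each parameter object in three steps. It constructs the object (defaults come from __init__), applies the DSL params dict, and then calls check(). The class below uses a hypothetical ParamBase stand-in with simplified update() and check_empty() methods; it is not RAGFlow's ComponentParamBase. It also shows why check_empty must receive the field's value rather than a list of field names.

```python
class ParamBase:
    """Hypothetical stand-in for ComponentParamBase, for illustration only."""

    def update(self, conf):
        # Copy known keys from the DSL "params" dict onto the instance
        for key, value in conf.items():
            if not hasattr(self, key):
                raise KeyError(f"Unknown parameter: {key}")
            setattr(self, key, value)
        return self

    @staticmethod
    def check_empty(value, descr):
        if not value:
            raise ValueError(f"{descr} does not support empty value.")


class MyNewComponentParam(ParamBase):
    def __init__(self):
        self.description = ""
        self.enabled = True
        self.parameters = {}

    def check(self):
        # Pass the value itself: a literal list like ["description"] is always truthy
        self.check_empty(self.description, "Description")


param = MyNewComponentParam().update({"description": "demo", "enabled": False})
param.check()  # passes: description is set

try:
    MyNewComponentParam().check()  # defaults only: description is ""
except ValueError as err:
    print(err)
```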
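Step 1: Define the parameter class
```python
# agent/component/my_new_component.py
import pandas as pd  # used by the component class in step 2

from agent.component.base import ComponentBase, ComponentParamBase


class MyNewComponentParam(ComponentParamBase):
    """
    Parameter definition for the custom component.
    """

    def __init__(self):
        super().__init__()
        self.description = ""
        self.enabled = True
        self.parameters = {}

    def check(self):
        """Validate the parameters."""
        # check_empty takes the field's value; a literal list such as ["description"]
        # is always truthy and would never trigger the check
        self.check_empty(self.description, "Description")
        self.check_valid_value(self.enabled, "Enabled", [True, False])
```
Step 2: Implement the component class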
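```python
# Same file as step 1 (pd and ComponentBase are imported there)
class MyNewComponent(ComponentBase):
    """
    Custom component implementation.
    """
    component_name = "MyNewComponent"

    def _run(self, history, **kwargs):
        """
        Component execution logic.

        Args:
            history: conversation history
            **kwargs: extra arguments
        Returns:
            DataFrame: the component's output
        """
        # Read the output of the upstream components
        input_df = self.get_input()
        # Run the custom logic
        result_content = self.process_data(input_df)
        # Return the standard DataFrame shape
        return pd.DataFrame([{
            "content": result_content,
            "component_id": self._id,
            "reference": []
        }])

    def process_data(self, input_df):
        """Concrete data-processing logic."""
        # Guard against an empty upstream output
        content = input_df["content"].iloc[0] if not input_df.empty else ""
        if self._param.enabled:
            # Logic applied when the component is enabled
            return f"Processed: {content}"
        # Otherwise pass the upstream content through unchanged
        return content
```
Step 3: Register the component with the system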
```python
# In agent/component/__init__.py, make sure the component can be imported
from .my_new_component import MyNewComponent, MyNewComponentParam

__all__ = [
    "MyNewComponent",
    "MyNewComponentParam",
    # ... other components
]
```
3.4 Execution engine architecture
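The "build dependency graph" and "prepare execution queue" steps in the diagram below can be illustrated with Kahn's topological sort over the DSL's downstream lists. This is a sketch under stated assumptions, not Canvas.run itself. It assumes every ID listed in downstream is also a key of components, and it produces one static order. Switch and Categorize, by contrast, pick their downstream branch at run time, so a fixed order only describes the unconditional part of a graph.

```python
from collections import deque


def execution_order(components):
    """Kahn's algorithm over the DSL's downstream lists.

    components: {component_id: {"downstream": [...], ...}}
    Returns component IDs such that every component comes after its upstreams.
    """
    indegree = {cid: 0 for cid in components}
    for cpn in components.values():
        for down in cpn["downstream"]:
            indegree[down] += 1
    # Start from components with no upstream (sorted for a deterministic order)
    queue = deque(sorted(cid for cid, deg in indegree.items() if deg == 0))
    order = []
    while queue:
        cid = queue.popleft()
        order.append(cid)
        for down in components[cid]["downstream"]:
            indegree[down] -= 1
            if indegree[down] == 0:
                queue.append(down)
    if len(order) != len(components):
        raise ValueError("Cycle detected: no static execution order exists")
    return order


# Hypothetical diamond: two retrieval branches feed one Generate
dsl_components = {
    "begin": {"downstream": ["retrieval_0", "search_0"]},
    "retrieval_0": {"downstream": ["generate_0"]},
    "search_0": {"downstream": ["generate_0"]},
    "generate_0": {"downstream": ["answer_0"]},
    "answer_0": {"downstream": []},
}
print(execution_order(dsl_components))
```

Here generate_0 enters the queue only after both of its upstream branches have been dequeued.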
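```mermaid
graph TB
    subgraph "Canvas execution engine"
        A["Canvas.run()"] --> B[Parse DSL JSON]
        B --> C[Initialize components]
        C --> D[Build dependency graph]
        D --> E[Prepare execution queue]
        E --> F[Execute components in order]
        F --> G[Handle special components]
        G --> H[Update execution path]
        H --> I[Stream results back]
    end
    subgraph "Component execution flow"
        J[Check dependencies] --> K[Fetch input data]
        K --> L[Run component logic]
        L --> M[Validate output format]
        M --> N[Update component state]
        N --> O[Notify downstream components]
    end
    subgraph "Special component handling"
        P[Switch component] --> Q[Evaluate conditions]
        Q --> R[Dynamic routing]
        S[Categorize component] --> T[Classification-based branching]
        U[Iteration component] --> V[Loop control]
    end
    F --> J
    G --> P
    G --> S
    G --> U
```
4. Front-end/back-end data protocol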
4.1 DSL JSON structure
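The DSL carries both the React Flow graph and the components map that the engine runs, so the components map can be derived from the graph. Below is a minimal sketch of that derivation. It assumes, following the NodeData class in 2.3, that data.label holds the component type name and data.form holds its parameters. The function name build_components is hypothetical, and this is not RAGFlow's own conversion code.

```python
def build_components(nodes, edges):
    """Derive DSL components from React Flow-style nodes and edges.

    nodes: [{"id": ..., "data": {"label": <component_name>, "form": {...}}}]
    edges: [{"source": ..., "target": ...}]
    """
    components = {
        node["id"]: {
            "obj": {
                "component_name": node["data"]["label"],
                "params": dict(node["data"].get("form") or {}),
            },
            "downstream": [],
            "upstream": [],
        }
        for node in nodes
    }
    for edge in edges:
        src, dst = edge["source"], edge["target"]
        # Record each connection once, even if the graph repeats an edge
        if dst not in components[src]["downstream"]:
            components[src]["downstream"].append(dst)
        if src not in components[dst]["upstream"]:
            components[dst]["upstream"].append(src)
    return components


nodes = [
    {"id": "begin", "data": {"label": "Begin", "form": {"prologue": "Hi! How can I help?"}}},
    {"id": "generate_0", "data": {"label": "Generate", "form": {"cite": True}}},
    {"id": "answer_0", "data": {"label": "Answer", "form": {}}},
]
edges = [
    {"source": "begin", "target": "generate_0"},
    {"source": "generate_0", "target": "answer_0"},
]
components = build_components(nodes, edges)
print(components["generate_0"]["upstream"], components["generate_0"]["downstream"])
```

Filling upstream and downstream from the same edge keeps the two lists mirror images of each other, which is the invariant the interface below implies.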
```typescript
interface WorkflowDSL {
  // Component definitions
  components: {
    [componentId: string]: {
      obj: {
        component_name: string;       // Component type name
        params: Record<string, any>;  // Component parameters
      };
      downstream: string[];           // IDs of downstream components
      upstream: string[];             // IDs of upstream components
      parent_id?: string;             // Parent component ID (used by iterations)
    };
  };
  // Execution state
  history: Message[];                 // Conversation history
  path: string[][];                   // Execution path
  answer: AnswerComponent[];          // Components waiting for an answer
  // Visualization data
  graph: {
    nodes: ReactFlowNode[];           // Front-end node data
    edges: ReactFlowEdge[];           // Front-end edge data
  };
  // References and messages
  reference: Reference[];             // Document references
  messages: Message[];                // Message log
  globals: Record<string, any>;       // Global variables
}
```
4.2 API data formats
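Workflow execution request
```typescript
interface RunWorkflowRequest {
  canvas_id: string;       // Workflow ID
  message: string;         // User input message
  message_id: string;      // Unique message ID
  stream: boolean;         // Whether to stream the response
  [key: string]: unknown;  // Other parameters (TypeScript has no **kwargs)
}
```
Streaming response format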
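```typescript
interface StreamResponse {
  code: number;                 // Response status code
  data: {
    answer: string;             // Answer content so far
    running_status?: boolean;   // Whether execution is still in progress
    reference?: Reference[];    // Cited documents
    path?: string[];            // Current execution path
  };
}
```
5. Real-time communication architecture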
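Each update in the sequence below travels as one Server-Sent Events message: one or more data: lines followed by a blank line. Below is a small Python helper that produces this framing for a JSON payload. It is hypothetical and not taken from RAGFlow.

```python
import json


def sse_frame(payload, event=None):
    """Encode one Server-Sent Events message.

    Every line of the data gets its own "data:" field, and a blank line ends
    the message, as the SSE wire format requires.
    """
    body = json.dumps(payload, ensure_ascii=False)
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.extend(f"data: {line}" for line in body.splitlines() or [""])
    return "\n".join(lines) + "\n\n"


frame = sse_frame({"code": 0, "data": {"answer": "Hi", "running_status": True}})
print(repr(frame))
```

Without an indent argument, json.dumps emits a single line, so each message carries exactly one data: field. The trailing blank line is what tells the client that the message is complete.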
```mermaid
sequenceDiagram
    participant Frontend
    participant FlaskAPI
    participant Canvas
    participant Component
    Frontend->>FlaskAPI: POST /v1/canvas/completion
    FlaskAPI->>Canvas: canvas.run(stream=True)
    loop Component execution
        Canvas->>Component: component._run()
        Component->>Canvas: return DataFrame
        Canvas->>FlaskAPI: yield streaming_data
        FlaskAPI->>Frontend: SSE: data: {"answer": "...", "running_status": true}
        Frontend->>Frontend: Update UI state
    end
    Canvas->>FlaskAPI: yield final_result
    FlaskAPI->>Frontend: SSE: data: {"answer": "final", "reference": [...]}
    Frontend->>Frontend: Show final result
```
5.1 SSE implementation details
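Backend streaming response
```python
import json

from flask import Response, request
from flask_login import login_required


# canvas_app is the Flask blueprint that serves the canvas API
@canvas_app.route("/v1/canvas/completion", methods=["POST"])
@login_required
def completion():
    req = request.json
    canvas_id = req.get("canvas_id")
    # load_canvas is a placeholder for restoring the Canvas object from its stored DSL
    canvas = load_canvas(canvas_id)
    # "stream" is passed explicitly below, so drop it from the forwarded kwargs
    # to avoid "got multiple values for keyword argument 'stream'"
    run_kwargs = {k: v for k, v in req.items() if k != "stream"}

    def sse():
        for ans in canvas.run(stream=True, **run_kwargs):
            if ans.get("running_status"):
                # Intermediate status update
                payload = {
                    "code": 0,
                    "data": {
                        "answer": ans["content"],
                        "running_status": True,
                        "component_id": ans.get("component_id"),
                    },
                }
            else:
                # Final result
                payload = {
                    "code": 0,
                    "data": {
                        "answer": ans["content"],
                        "reference": ans.get("reference", []),
                        "path": ans.get("path", []),
                    },
                }
            # Serialize outside the f-string: a multi-line expression inside a
            # single-line f-string is a SyntaxError before Python 3.12
            yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    return Response(sse(), mimetype="text/event-stream")
```
Front-end SSE consumption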
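Network chunks do not line up with SSE messages: one read can hold half an event, or several. EventSourceParserStream handles that buffering, which is why each reader.read() yields a complete event. The Python sketch below illustrates the same idea with a simplified, hypothetical decoder; it is not that library's implementation. It collects only data: fields and does not handle bare carriage-return line endings.

```python
import json


class SSEDecoder:
    """Incremental SSE decoder: feed text chunks of any size, get complete events.

    Simplified sketch: collects only data: fields, normalizes CRLF to LF, and
    ignores event:, id:, retry:, and comment lines.
    """

    def __init__(self):
        self._buffer = ""

    def feed(self, chunk):
        # Normalize after appending so a CRLF split across two chunks is still caught
        self._buffer = (self._buffer + chunk).replace("\r\n", "\n")
        events = []
        # A blank line (two consecutive newlines) terminates each event
        while "\n\n" in self._buffer:
            block, self._buffer = self._buffer.split("\n\n", 1)
            data_lines = []
            for line in block.split("\n"):
                if line.startswith("data:"):
                    value = line[len("data:"):]
                    # The SSE format strips one leading space after the colon
                    data_lines.append(value[1:] if value.startswith(" ") else value)
            if data_lines:
                events.append("\n".join(data_lines))
        return events


decoder = SSEDecoder()
# The first event's text and its terminating blank line arrive in three separate chunks
chunks = ['data: {"answer": "Hel', 'lo"}\n', '\ndata: {"answer": "Hello!"}\n\n']
events = [json.loads(e) for chunk in chunks for e in decoder.feed(chunk)]
print(events)
```

Incomplete text simply stays in the buffer until a later chunk supplies the blank line, so JSON.parse (or json.loads here) only ever sees whole payloads.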
```typescript
import { EventSourceParserStream } from 'eventsource-parser/stream';

// getToken and the set* state setters are assumed to come from the surrounding module or store
export const useSendMessageWithSse = () => {
  const send = async (params: RunWorkflowRequest) => {
    const response = await fetch('/v1/canvas/completion', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${getToken()}`,
      },
      body: JSON.stringify(params),
    });
    if (!response.ok || !response.body) {
      setExecutionStatus('error');
      return;
    }
    const reader = response.body
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new EventSourceParserStream())
      .getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // Each value is one complete SSE event; its data field holds the JSON payload
      const data = JSON.parse(value.data);
      // Update execution state
      if (data.data.running_status) {
        setExecutionStatus('running');
        setCurrentAnswer(data.data.answer);
      } else {
        setExecutionStatus('completed');
        setFinalAnswer(data.data.answer);
        setReferences(data.data.reference ?? []);
      }
    }
  };
  return { send };
};
```
6. Node styles and configuration system
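6.1 Node style architecture
```mermaid
graph TB
    subgraph "Style system"
        A[Base node styles] --> B[Theme adaptation]
        B --> C[State styles]
        C --> D[Icon system]
        D --> E[Animations]
    end
    subgraph "Configuration system"
        F[Form component library] --> G[Validation rules]
        G --> H[Conditional display]
        H --> I[Data binding]
        I --> J[Live preview]
    end
    subgraph "Interaction system"
        K[Click to edit] --> L[Drag to arrange]
        L --> M[Context menu]
        M --> N[Keyboard shortcuts]
        N --> O[Undo and redo]
    end
    A --> F
    F --> K
```
6.2 Node style customization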
CSS class structure
```less
// web/src/pages/flow/index.less
.ragNode {
  min-width: 200px;
  background: white;
  border: 2px solid #e5e7eb;
  border-radius: 8px;
  box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);

  &.selected {
    border-color: #3b82f6;
    box-shadow: 0 0 0 3px rgba(59, 130, 246, 0.1);
  }

  &.error {
    border-color: #ef4444;
    background-color: #fef2f2;
  }

  &.running {
    border-color: #f59e0b;
    // Assumes a `pulse` @keyframes rule is defined elsewhere (e.g. in global styles)
    animation: pulse 2s infinite;
  }

  .node-header {
    display: flex;
    align-items: center;
    padding: 12px;
    border-bottom: 1px solid #f3f4f6;

    .node-icon {
      width: 20px;
      height: 20px;
      margin-right: 8px;
    }

    .node-title {
      font-weight: 500;
      color: #374151;
    }
  }

  .node-body {
    padding: 12px;

    .node-description {
      font-size: 12px;
      color: #6b7280;
      margin-bottom: 8px;
    }

    .node-status {
      display: flex;
      align-items: center;
      font-size: 11px;

      &.success { color: #10b981; }
      &.error { color: #ef4444; }
      &.running { color: #f59e0b; }
    }
  }
}
```
Theme adaptation system
```typescript
// web/src/components/theme-provider.tsx
// useTheme() is assumed to return the active theme name ('light' | 'dark')
const useNodeTheme = () => {
  const { theme } = useTheme();
  return {
    nodeStyles: {
      backgroundColor: theme === 'dark' ? '#1f2937' : '#ffffff',
      borderColor: theme === 'dark' ? '#374151' : '#e5e7eb',
      textColor: theme === 'dark' ? '#f9fafb' : '#374151',
    },
    selectedStyles: {
      borderColor: theme === 'dark' ? '#60a5fa' : '#3b82f6',
      shadowColor: theme === 'dark' ? 'rgba(96, 165, 250, 0.2)' : 'rgba(59, 130, 246, 0.1)',
    },
  };
};
```
6.3 Configuration panel architecture
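Dynamic form system
```typescript
// web/src/components/dynamic-form/index.tsx
import React from 'react';
import { Form, Input, Select, Slider, Switch } from 'antd';

// ValidationRule and FormSection are assumed to be defined elsewhere
interface DynamicFormProps {
  schema: FormSchema;
  values: Record<string, any>;
  onChange: (values: Record<string, any>) => void;
}

interface FormSchema {
  fields: FormField[];
  layout: 'vertical' | 'horizontal';
  sections?: FormSection[];
}

// Hypothetical shape: show the field only when another field has a given value
interface ConditionalRule {
  field: string;
  equals: unknown;
}

interface FormField {
  name: string;
  type: 'input' | 'select' | 'textarea' | 'switch' | 'slider' | 'custom';
  label: string;
  required?: boolean;
  dependencies?: string[];
  validation?: ValidationRule[];
  conditional?: ConditionalRule;
  componentProps?: any;
}

// Fields without a condition are always visible
const isFieldVisible = (field: FormField, values: Record<string, any>) =>
  !field.conditional || values[field.conditional.field] === field.conditional.equals;

const DynamicForm: React.FC<DynamicFormProps> = ({ schema, values, onChange }) => {
  const [form] = Form.useForm();

  const renderField = (field: FormField) => {
    switch (field.type) {
      case 'input':
        return <Input {...field.componentProps} />;
      case 'select':
        return <Select {...field.componentProps} />;
      case 'textarea':
        return <Input.TextArea {...field.componentProps} />;
      case 'switch':
        return <Switch {...field.componentProps} />;
      case 'slider':
        return <Slider {...field.componentProps} />;
      case 'custom':
        return field.componentProps?.component;
      default:
        return <Input />;
    }
  };

  // antd calls onValuesChange(changedValues, allValues); pass the full set upward
  const handleValuesChange = (_: unknown, allValues: Record<string, any>) => onChange(allValues);

  return (
    <Form
      form={form}
      layout={schema.layout}
      initialValues={values}
      onValuesChange={handleValuesChange}
    >
      {schema.fields.map((field) => (
        <Form.Item
          key={field.name}
          name={field.name}
          label={field.label}
          required={field.required}
          rules={field.validation}
          hidden={!isFieldVisible(field, values)}
        >
          {renderField(field)}
        </Form.Item>
      ))}
    </Form>
  );
};
```
Configuration panel component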
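```typescript
// web/src/pages/flow/components/node-config-panel.tsx
import React, { useCallback, useState } from 'react';
import { Button, Form, Space } from 'antd';
import { CloseOutlined } from '@ant-design/icons';
// RAGFlowNodeType and FormMap come from the flow page's interface.ts and constant.tsx

interface NodeConfigPanelProps {
  node: RAGFlowNodeType;
  onClose: () => void;
  onSave: (nodeId: string, formData: any) => void;
}

const NodeConfigPanel: React.FC<NodeConfigPanelProps> = ({ node, onClose, onSave }) => {
  const [form] = Form.useForm();
  const [formValues, setFormValues] = useState(node.data.form);
  // Pick the form component for this node type
  const FormComponent = FormMap[node.data.label];

  // Save form data as it changes
  const handleValuesChange = useCallback(
    (changedValues: any, allValues: any) => {
      setFormValues(allValues);
      onSave(node.id, allValues);
    },
    [node.id, onSave],
  );

  // Validate, save, then close; form.submit() alone does nothing useful without an onFinish handler
  const handleConfirm = async () => {
    try {
      const values = await form.validateFields();
      onSave(node.id, values);
      onClose();
    } catch {
      // Validation errors are already shown inline by antd
    }
  };

  return (
    <div className="node-config-panel">
      <div className="panel-header">
        <h3>{node.data.name} settings</h3>
        <Button type="text" icon={<CloseOutlined />} onClick={onClose} />
      </div>
      <div className="panel-body">
        {FormComponent && (
          <FormComponent
            form={form}
            onValuesChange={handleValuesChange}
            initialValues={formValues}
          />
        )}
      </div>
      <div className="panel-footer">
        <Space>
          <Button onClick={onClose}>Cancel</Button>
          <Button type="primary" onClick={handleConfirm}>OK</Button>
        </Space>
      </div>
    </div>
  );
};
```
7. Workflow execution and debugging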
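7.1 Execution state management
```typescript
// web/src/stores/flow-execution-store.ts
import { create } from 'zustand';

// Reference and ExecutionError are assumed to be defined elsewhere in the app
interface ExecutionState {
  status: 'idle' | 'running' | 'completed' | 'error';
  currentStep: string | null;
  executedSteps: string[];
  stepResults: Record<string, any>;
  streamingContent: string;
  finalResult: string;
  references: Reference[];
  errors: ExecutionError[];
}

interface ExecutionActions {
  startExecution: (workflowId: string) => void;
  updateCurrentStep: (stepId: string, content: string) => void;
  completeExecution: (result: string, references: Reference[]) => void;
  handleError: (error: ExecutionError) => void;
}

const useExecutionStore = create<ExecutionState & ExecutionActions>((set) => ({
  // State
  status: 'idle',
  currentStep: null,
  executedSteps: [],
  stepResults: {},
  streamingContent: '',
  finalResult: '',
  references: [],
  errors: [],
  // Actions
  startExecution: (_workflowId: string) => {
    // Clear everything left over from the previous run
    set({
      status: 'running',
      currentStep: null,
      executedSteps: [],
      stepResults: {},
      streamingContent: '',
      finalResult: '',
      references: [],
      errors: [],
    });
  },
  updateCurrentStep: (stepId: string, content: string) => {
    set((state) => ({
      currentStep: stepId,
      streamingContent: content,
      // Record each step once, in first-seen order
      executedSteps: [...new Set([...state.executedSteps, stepId])],
    }));
  },
  completeExecution: (result: string, references: Reference[]) => {
    set({
      status: 'completed',
      finalResult: result,
      references,
      currentStep: null,
    });
  },
  handleError: (error: ExecutionError) => {
    set((state) => ({
      status: 'error',
      errors: [...state.errors, error],
    }));
  },
}));
```
7.2 Debugging features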
Single-step debugging
```typescript
// web/src/hooks/use-debug-execution.ts
import { useState } from 'react';

// `api` and `currentCanvasId` are assumed to be provided by the surrounding module
const useDebugExecution = () => {
  const [debugMode, setDebugMode] = useState(false);
  const [breakpoints, setBreakpoints] = useState<Set<string>>(new Set());
  const [currentBreakpoint, setCurrentBreakpoint] = useState<string | null>(null);

  // Run a single component in isolation with the given inputs
  const debugSingleComponent = async (componentId: string, inputs: any) => {
    try {
      const response = await api.debugComponent({
        component_id: componentId,
        inputs: inputs,
        canvas_id: currentCanvasId,
      });
      return response.data;
    } catch (error) {
      console.error('Debug component failed:', error);
      throw error;
    }
  };

  // Add or remove a breakpoint on a component
  const toggleBreakpoint = (componentId: string) => {
    setBreakpoints((prev) => {
      // Copy the Set so React sees a new reference
      const newBreakpoints = new Set(prev);
      if (newBreakpoints.has(componentId)) {
        newBreakpoints.delete(componentId);
      } else {
        newBreakpoints.add(componentId);
      }
      return newBreakpoints;
    });
  };

  const continueExecution = () => {
    setCurrentBreakpoint(null);
    // Resume the workflow from the paused component (not shown)
  };

  return {
    debugMode,
    setDebugMode,
    breakpoints,
    currentBreakpoint,
    debugSingleComponent,
    toggleBreakpoint,
    continueExecution,
  };
};
```
Execution log system
```typescript
// web/src/components/execution-logger.tsx
import React, { useCallback, useMemo, useState } from 'react';
import { Select } from 'antd';
import { v4 as uuid } from 'uuid';

const { Option } = Select;

interface ExecutionLog {
  id: string;
  timestamp: Date;
  componentId: string;
  componentName: string;
  level: 'info' | 'warning' | 'error';
  message: string;
  data?: any;
}

const ExecutionLogger: React.FC = () => {
  const [logs, setLogs] = useState<ExecutionLog[]>([]);
  const [filter, setFilter] = useState<'all' | 'info' | 'warning' | 'error'>('all');

  // Append a log entry; in practice this would be exposed through a store or
  // context so the executor can call it
  const addLog = useCallback((log: Omit<ExecutionLog, 'id' | 'timestamp'>) => {
    setLogs((prev) => [
      ...prev,
      {
        ...log,
        id: uuid(),
        timestamp: new Date(),
      },
    ]);
  }, []);

  const filteredLogs = useMemo(() => {
    return filter === 'all' ? logs : logs.filter((log) => log.level === filter);
  }, [logs, filter]);

  return (
    <div className="execution-logger">
      <div className="logger-header">
        <h4>Execution log</h4>
        <Select value={filter} onChange={setFilter}>
          <Option value="all">All</Option>
          <Option value="info">Info</Option>
          <Option value="warning">Warning</Option>
          <Option value="error">Error</Option>
        </Select>
      </div>
      <div className="logger-content">
        {filteredLogs.map((log) => (
          <div key={log.id} className={`log-entry ${log.level}`}>
            <span className="log-timestamp">{log.timestamp.toLocaleTimeString()}</span>
            <span className="log-component">{log.componentName}</span>
            <span className="log-message">{log.message}</span>
            {log.data && (
              <details className="log-data">
                <summary>Details</summary>
                <pre>{JSON.stringify(log.data, null, 2)}</pre>
              </details>
            )}
          </div>
        ))}
      </div>
    </div>
  );
};
```
8. Performance optimization and best practices
8.1 Front-end performance optimization
Virtualizing large workflows
```typescript
// web/src/components/virtual-canvas.tsx
import React from 'react';
import { ReactFlow } from '@xyflow/react';

// useFlowStore and nodeTypes come from the flow page (store.ts and constant.tsx)
const VirtualCanvas = React.memo(() => {
  const { nodes, edges } = useFlowStore();

  // With onlyRenderVisibleElements set, React Flow skips rendering off-screen nodes and edges.
  // Filtering the nodes array by hand would also remove those nodes (and their edges) from
  // the graph, and would require converting flow coordinates to screen space on every pan/zoom.
  return (
    <ReactFlow
      nodes={nodes}
      edges={edges}
      nodeTypes={nodeTypes}
      onlyRenderVisibleElements
    />
  );
});
```
Lazy-loading nodes
```typescript
// web/src/components/lazy-node.tsx
import React, { Suspense, useEffect, useRef, useState } from 'react';
import type { NodeProps } from '@xyflow/react';

// NodeSkeleton is an assumed lightweight placeholder component
const LazyNode = React.lazy(() => import('./heavy-node-component'));

const NodeWrapper: React.FC<NodeProps> = (props) => {
  const [shouldLoad, setShouldLoad] = useState(false);
  const nodeRef = useRef<HTMLDivElement>(null);

  useEffect(() => {
    // Load the heavy component once at least 10% of the node is visible, then stop observing
    const observer = new IntersectionObserver(
      (entries) => {
        if (entries[0].isIntersecting) {
          setShouldLoad(true);
          observer.disconnect();
        }
      },
      { threshold: 0.1 },
    );
    if (nodeRef.current) {
      observer.observe(nodeRef.current);
    }
    return () => observer.disconnect();
  }, []);

  return (
    <div ref={nodeRef}>
      {shouldLoad ? (
        <Suspense fallback={<NodeSkeleton />}>
          <LazyNode {...props} />
        </Suspense>
      ) : (
        <NodeSkeleton />
      )}
    </div>
  );
};
```
8.2 Backend performance optimization
Parallel component execution
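asyncio.gather only overlaps work while each coroutine is awaiting. Parallel execution therefore pays off for I/O-bound components such as LLM, search, or HTTP calls; a component that blocks synchronously still runs one after another. The demo below uses a hypothetical run_component stand-in that sleeps instead of calling a service, and it records when each call starts and finishes.

```python
import asyncio


async def run_component(comp_id, log, delay=0.01):
    """Hypothetical stand-in for an I/O-bound component (e.g. an LLM or search call)."""
    log.append(f"start {comp_id}")
    await asyncio.sleep(delay)  # yields to the event loop while "waiting on I/O"
    log.append(f"end {comp_id}")
    return f"{comp_id} done"


async def run_level(group):
    """Run one group of independent components concurrently."""
    log = []
    results = await asyncio.gather(*(run_component(c, log) for c in group))
    # gather returns results in argument order, regardless of finish order
    return dict(zip(group, results)), log


results, log = asyncio.run(run_level(["retrieval_a", "retrieval_b"]))
print(log)
```

Both components start before either finishes, which is the overlap that run_parallel below relies on.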
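```python
# agent/canvas.py
import asyncio


class Canvas:
    async def run_parallel(self, **kwargs):
        """Run independent component branches concurrently."""
        # Dependency graph: component ID -> set of upstream component IDs
        dependency_graph = self.build_dependency_graph()
        # Groups ("levels") whose dependencies are all satisfied by earlier groups
        parallel_groups = self.find_parallel_groups(dependency_graph)
        for group in parallel_groups:
            if len(group) > 1:
                # Run the group concurrently; run_component must be a coroutine
                results = await asyncio.gather(
                    *(self.run_component(comp_id) for comp_id in group)
                )
                # Store each component's output
                for comp_id, result in zip(group, results):
                    self.components[comp_id]["result"] = result
            else:
                # A single component runs on its own
                comp_id = group[0]
                self.components[comp_id]["result"] = await self.run_component(comp_id)

    @staticmethod
    def find_parallel_groups(dependency_graph):
        """Split components into levels (Kahn's algorithm); each level can run in parallel.

        A component is placed only after every upstream component has been placed,
        so no group can run before its dependencies.
        """
        remaining = {cid: set(deps) for cid, deps in dependency_graph.items()}
        groups = []
        while remaining:
            ready = sorted(cid for cid, deps in remaining.items() if not deps)
            if not ready:
                raise ValueError("Cycle detected in the component graph")
            groups.append(ready)
            for cid in ready:
                del remaining[cid]
            for deps in remaining.values():
                deps.difference_update(ready)
        return groups
```
Result caching system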
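A cache key has to be identical for equal inputs regardless of dict ordering, and different whenever any input changes. Below is a standalone sketch of the hashing approach used in the code that follows, with json.dumps(sort_keys=True) doing the normalization. The parameter names are borrowed from the Retrieval component in 3.1, and the values are made up.

```python
import hashlib
import json


def cache_key(params, history, upstream):
    """Fingerprint every input that affects a component's output."""
    blob = json.dumps(
        {"params": params, "history": history, "upstream": upstream},
        sort_keys=True,       # key order (at every nesting level) must not change the key
        ensure_ascii=False,
        default=str,          # fall back to str() for values JSON cannot encode
    )
    # MD5 serves as a fast fingerprint here, not as a security measure
    return hashlib.md5(blob.encode("utf-8")).hexdigest()


history = [["user", "hi"]]
k1 = cache_key({"similarity_threshold": 0.2, "keywords_similarity_weight": 0.3}, history, ["doc"])
k2 = cache_key({"keywords_similarity_weight": 0.3, "similarity_threshold": 0.2}, history, ["doc"])
k3 = cache_key({"similarity_threshold": 0.4, "keywords_similarity_weight": 0.3}, history, ["doc"])
print(k1 == k2, k1 == k3)
```

Caching is only safe for deterministic components. An LLM call sampled with a non-zero temperature can return different text for the same inputs, so caching it changes behavior rather than just speeding it up.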
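```python
# agent/component/base.py
import hashlib
import json


class ComponentBase:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cache = {}  # cache_key -> result; unbounded, one per component instance
        self._cache_enabled = True

    def _run(self, history, **kwargs):
        # Under this scheme, subclasses implement _execute() instead of overriding _run()
        if not self._cache_enabled:
            return self._execute(history, **kwargs)
        # Build the cache key
        cache_key = self._generate_cache_key(history, **kwargs)
        # Return a cached result if there is one
        if cache_key in self._cache:
            return self._cache[cache_key]
        # Otherwise execute and cache the result
        result = self._execute(history, **kwargs)
        self._cache[cache_key] = result
        return result

    def _generate_cache_key(self, history, **kwargs):
        """Build a cache key from every input that affects the output."""
        cache_input = {
            "params": self._param.to_dict(),
            "history": str(history),
            "upstream_outputs": self.get_input().to_dict(),
            "kwargs": kwargs,
        }
        # default=str covers values JSON cannot encode (e.g. timestamps in upstream outputs)
        cache_str = json.dumps(cache_input, sort_keys=True, default=str)
        return hashlib.md5(cache_str.encode()).hexdigest()
```
9. Extension development guide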
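9.1 Adding a new node type
Complete development checklist
Backend component
- [ ] Create the parameter class MyComponentParam
- [ ] Implement the component class MyComponent
- [ ] Export both from __init__.py
- [ ] Write unit tests
Front-end node
- [ ] Define the operator enum member Operator.MyComponent
- [ ] Create the node component MyComponentNode
- [ ] Create the configuration form MyComponentForm
- [ ] Update the mappings and icon
Styling and interaction
- [ ] Design the node style
- [ ] Implement connection rules
- [ ] Add context-menu entries
- [ ] Configure tooltips
Testing and documentation
- [ ] Front-end component tests
- [ ] Backend component tests
- [ ] Integration tests
- [ ] Usage documentation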
9.2 Custom theme development
```typescript
// web/src/themes/custom-theme.ts
export const customTheme = {
  colors: {
    primary: '#6366f1',
    secondary: '#8b5cf6',
    success: '#10b981',
    warning: '#f59e0b',
    error: '#ef4444',
    background: '#ffffff',
    surface: '#f9fafb',
    text: '#1f2937',
    textSecondary: '#6b7280',
  },
  nodeStyles: {
    defaultNode: {
      backgroundColor: '#ffffff',
      borderColor: '#e5e7eb',
      borderRadius: '8px',
      borderWidth: '2px',
      boxShadow: '0 4px 6px -1px rgba(0, 0, 0, 0.1)',
    },
    selectedNode: {
      borderColor: '#6366f1',
      boxShadow: '0 0 0 3px rgba(99, 102, 241, 0.1)',
    },
    runningNode: {
      borderColor: '#f59e0b',
      animation: 'pulse 2s infinite',
    },
    errorNode: {
      borderColor: '#ef4444',
      backgroundColor: '#fef2f2',
    },
  },
  edgeStyles: {
    default: {
      stroke: '#6b7280',
      strokeWidth: 2,
    },
    selected: {
      stroke: '#6366f1',
      strokeWidth: 3,
    },
    animated: {
      strokeDasharray: '5,5',
      animation: 'dash 1s linear infinite',
    },
  },
};
```
9.3 Plugin system design
```typescript
// web/src/plugins/plugin-system.ts
import type React from 'react';
import type { Edge, Node, ReactFlowInstance } from '@xyflow/react';

// ToolbarItem and ContextMenuItem are app-specific types assumed to be defined elsewhere
interface FlowPlugin {
  name: string;
  version: string;
  description: string;
  // Node extensions
  nodeTypes?: Record<string, React.ComponentType>;
  // Toolbar extensions
  toolbarItems?: ToolbarItem[];
  // Context-menu extensions
  contextMenuItems?: ContextMenuItem[];
  // Lifecycle hooks
  onInstall?: () => void;
  onUninstall?: () => void;
  onCanvasMount?: (canvas: ReactFlowInstance) => void;
  onCanvasUnmount?: () => void;
  // Event handlers
  onNodeCreate?: (node: Node) => void;
  onNodeUpdate?: (node: Node) => void;
  onNodeDelete?: (nodeId: string) => void;
  onEdgeCreate?: (edge: Edge) => void;
  onEdgeDelete?: (edgeId: string) => void;
}

class PluginManager {
  private plugins: Map<string, FlowPlugin> = new Map();

  registerPlugin(plugin: FlowPlugin) {
    this.plugins.set(plugin.name, plugin);
    plugin.onInstall?.();
  }

  unregisterPlugin(pluginName: string) {
    const plugin = this.plugins.get(pluginName);
    if (plugin) {
      plugin.onUninstall?.();
      this.plugins.delete(pluginName);
    }
  }

  // Merge node types from all plugins; later plugins override earlier ones on name clashes
  getNodeTypes(): Record<string, React.ComponentType> {
    const nodeTypes: Record<string, React.ComponentType> = {};
    for (const plugin of this.plugins.values()) {
      if (plugin.nodeTypes) {
        Object.assign(nodeTypes, plugin.nodeTypes);
      }
    }
    return nodeTypes;
  }

  getToolbarItems(): ToolbarItem[] {
    const items: ToolbarItem[] = [];
    for (const plugin of this.plugins.values()) {
      if (plugin.toolbarItems) {
        items.push(...plugin.toolbarItems);
      }
    }
    return items;
  }
}

export const pluginManager = new PluginManager();
```
10. Summary
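RAGFlow's workflow orchestration system is a highly modular, extensible platform with the following characteristics:
Architectural strengths
- Front-end/back-end separation: a clear division of labor between React Flow and the Python Canvas
- Component-based design: 67 built-in components that are easy to extend and maintain
- Real-time communication: SSE streaming keeps the interface responsive while a workflow runs
- Type safety: TypeScript and Python type hints help maintain code quality
Development strengths
- Low barrier to entry: visual orchestration makes AI applications easier to build
- Highly customizable: custom nodes, styles, and themes are supported
- Rich debugging: single-step debugging, a log system, and performance monitoring
- Good developer experience: hot reload, type hints, and error handling
Technical highlights
- Smart connection validation: prevents invalid connections and circular dependencies
- Dynamic forms: configuration UIs are generated from the node type
- Parallel execution: independent component branches are identified and run concurrently
- Caching: avoids repeated computation to speed up execution
Overall, the system gives AI application developers a platform that is easy to use through visual orchestration while keeping enough technical depth to extend.
This analysis is based on RAGFlow v0.19.1 and covers the core architecture and development practices of the workflow orchestration system.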