langfuse交互data和task交互原理

1. run_experiment 方法核心流程

根据 Langfuse.run_experiment() 方法的实现,data 和 task 的交互过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def run_experiment(
self,
*,
name: str,
run_name: Optional[str] = None,
description: Optional[str] = None,
data: ExperimentData, # 实验数据
task: TaskFunction, # 任务函数
evaluators: List[EvaluatorFunction] = [],
run_evaluators: List[RunEvaluatorFunction] = [],
max_concurrency: int = 50,
metadata: Optional[Dict[str, Any]] = None,
) -> ExperimentResult:

2. Data 和 Task 的交互过程

2.1 数据准备阶段

  • data: 可以是两种类型:
    • 字典列表:包含 input, expected_output, metadata
    • Langfuse DatasetItem 对象:来自 dataset.items

2.2 任务执行阶段

_process_experiment_item() 方法中:

1
2
3
4
5
6
7
8
9
10
async def _process_experiment_item(
self,
item: ExperimentItem, # 单个数据项
task: Callable, # 任务函数
evaluators: List[Callable], # 评估器
experiment_name: str,
experiment_run_name: str,
experiment_description: Optional[str],
experiment_metadata: Dict[str, Any],
) -> ExperimentItemResult:

关键交互步骤:

  1. 任务执行

    1
    output = await _run_task(task, item)
    • 调用 _run_task() 辅助函数
    • item 作为参数传递给 task 函数
    • 支持同步和异步任务函数
  2. 追踪记录

    1
    2
    3
    4
    5
    6
    with self.start_as_current_span(name=span_name) as span:
    span.update(
    input=input_data, # 从 item 中提取
    output=output, # task 函数的返回值
    metadata=final_metadata, # 合并的元数据
    )

3. 评估分数生成机制

3.1 评估器执行

在任务执行后,调用评估器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 运行评估器
evaluations = []
for evaluator in evaluators:
try:
expected_output = None
if isinstance(item, dict):
expected_output = item.get("expected_output")
elif hasattr(item, "expected_output"):
expected_output = item.expected_output

eval_results = await _run_evaluator(
evaluator,
input=input_data, # 输入数据
output=output, # 任务输出
expected_output=expected_output, # 期望输出
metadata=eval_metadata, # 元数据
)
evaluations.extend(eval_results)

3.2 评估分数存储

每个评估结果被转换为分数并存储:

1
2
3
4
5
6
7
8
9
10
11
# 存储评估结果为分数
for evaluation in eval_results:
self.create_score(
trace_id=trace_id, # 追踪ID
name=evaluation.name, # 评估名称
value=evaluation.value, # 评估值
comment=evaluation.comment, # 评估注释
metadata=evaluation.metadata, # 评估元数据
config_id=evaluation.config_id, # 配置ID
data_type=evaluation.data_type, # 数据类型
)

4. 完整的交互流程

sequenceDiagram
    participant D as Data Items
    participant T as Task Function
    participant E as Evaluators
    participant S as Score System
    participant L as Langfuse Tracing

    loop For each data item
        D->>T: item (input, expected_output, metadata)
        T->>T: Process item
        T->>L: Create trace span
        T->>L: Record input/output
        T->>E: Return output
        
        E->>E: Evaluate output
        E->>S: Generate evaluation results
        S->>L: Store scores with trace_id
    end
    
    Note over D,S: All executions are traced
and linked for analysis

5. 关键数据结构

5.1 评估结果结构

Evaluation 类:

1
2
3
4
5
6
7
class Evaluation(TypedDict, total=False):
name: str # 评估名称 (必需)
value: Union[float, str] # 评估值 (必需)
comment: Optional[str] # 评估注释
metadata: Optional[Any] # 评估元数据
config_id: Optional[str] # 配置ID
data_type: Optional[ScoreDataType] # 数据类型

5.2 实验项结果

ExperimentItemResult 类:

1
2
3
4
5
6
class ExperimentItemResult(TypedDict, total=False):
item: ExperimentItem # 原始数据项
output: Any # 任务输出
evaluations: List[Evaluation] # 评估结果列表
trace_id: Optional[str] # 追踪ID
dataset_run_id: Optional[str] # 数据集运行ID

6. 评估分数生成示例

假设有以下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 数据项
data_item = {
"input": "What is the capital of France?",
"expected_output": "Paris",
"metadata": {"difficulty": "easy"}
}

# 任务函数
def answer_question(*, item, **kwargs):
return "The capital of France is Paris."

# 评估器
def accuracy_evaluator(*, input, output, expected_output=None, **kwargs):
if expected_output and expected_output.lower() in output.lower():
return {
"name": "accuracy",
"value": 1.0,
"comment": "Correct answer",
"data_type": "NUMERIC"
}
return {
"name": "accuracy",
"value": 0.0,
"comment": "Incorrect answer",
"data_type": "NUMERIC"
}

交互过程:

  1. data_item 传递给 answer_question 任务函数
  2. 任务返回输出:“The capital of France is Paris.”
  3. accuracy_evaluator 接收:
    • input: “What is the capital of France?”
    • output: “The capital of France is Paris.”
    • expected_output: “Paris”
  4. 评估器检查 “Paris” 是否在输出中,返回准确率分数 1.0
  5. 分数被存储到 Langfuse,关联到对应的追踪ID

7. 总结

Data 和 Task 的交互机制:

  • Data 提供输入:每个数据项包含输入、期望输出和元数据
  • Task 处理数据:接收数据项,执行处理逻辑,返回输出
  • Evaluators 评估输出:基于任务输出和期望输出生成评估分数
  • Scores 存储结果:评估分数被存储并与追踪记录关联

评估分数生成:

  • 通过评估器函数计算
  • 基于任务输出与期望输出的比较
  • 支持多种数据类型(NUMERIC, BOOLEAN, CATEGORICAL)
  • 自动关联到对应的追踪记录

这种设计使得 Langfuse 能够自动追踪实验执行过程,生成详细的评估指标,并提供完整的可观测性。