Agent框架集成多模态能力底层实现

该项目处理多模态RAG返回图片的完整流程:

架构概述

该项目采用分层架构处理多模态RAG:

  1. 前端接口层:通过schema.py中的ImageContentImageUrl模型支持base64和HTTPS两种图片URL格式
  2. RAG核心层rag.py中的RagClient提供统一的向量检索接口
  3. 多模态嵌入层multi_model.py中的AliyunEmbeddings使用阿里云DashScope的多模态嵌入API
  4. 数据存储层:使用Qdrant向量数据库存储图片和文本的嵌入向量

图片处理流程

1. 图片存储阶段

在feishu-crawler子项目中,图片处理流程如下:

  • 图片下载DownloadImageTransform从飞书下载图片到本地文件系统
  • 图片摘要生成GenerateImageSummaryTransform使用VLLM模型为图片生成文字描述
  • 多模态嵌入EmbedImageTransform调用MultiModelEmbedder生成图片+文字的联合嵌入向量
  • 向量存储:将base64编码的图片数据、文字描述和嵌入向量一起存入Qdrant

2. 图片检索阶段

  • 多模态查询:支持纯文本查询,通过阿里云qwen2.5-vl-embedding模型生成查询嵌入向量
  • 相似度搜索:在Qdrant中进行向量相似度搜索,返回最相关的文档(包括图片)
  • 结果返回:使用response_format="content_and_artifact"格式,同时返回序列化内容和原始文档对象

3. 图片返回格式

图片以两种形式返回:

  1. Base64数据URL:格式为data:image/{mime_type};base64,{base64_data}
  2. 元数据信息:包括图片描述、原始token等元数据

关键技术实现

多模态嵌入

1
2
3
4
5
6
# 使用阿里云DashScope的多模态嵌入API
response = dashscope.MultiModalEmbedding.call(
model="qwen2.5-vl-embedding",
input=[{"text": caption}, {"image": encoded_image}],
api_key=api_key
)

配置支持

config.py中支持多种嵌入提供者:

  • EMBEDDING_PROVIDER_ALIYUN_MM:阿里云多模态嵌入
  • EMBEDDING_PROVIDER_INFINI:Infini自有嵌入
  • EMBEDDING_PROVIDER_OPENAI:OpenAI兼容接口

检索工具集成

RAG工具配置为返回content_and_artifact格式,使得Agent可以同时获取:

  • 序列化的文本内容(供LLM理解)
  • 原始文档对象(包含图片base64数据等元数据)

优势特点

  1. 统一接口:文本和图片使用相同的RAG接口进行检索
  2. 多模态支持:支持文本到图片、图片到文本的跨模态检索
  3. 灵活的存储:图片以base64格式存储,便于直接在前端显示
  4. 可扩展架构:支持多种嵌入提供者和向量数据库

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
# 配置多模态RAG
config.rag.settings = [
RagSetting(
embedding_provider=EMBEDDING_PROVIDER_ALIYUN_MM,
embedding_model="qwen2.5-vl-embedding",
collection_name="multimodal_collection"
)
]

# 检索时自动处理图片
retriever = rag_client.as_retriever()
results = retriever.invoke("查找相关图片")
# results中包含base64编码的图片数据

这个设计使得项目能够有效地处理多模态RAG场景,特别是在飞书文档爬取和检索的应用中,能够很好地处理文档中的图片内容。

引言

多模态(Multimodal)AI 已成为智能体系统的核心能力之一。Infini Agent Framework 基于 LangGraph 构建,在架构层面原生支持多模态输入输出。本文将深入剖析框架中多模态能力的底层实现机制,涵盖消息格式定义、传输协议、工具集成等关键技术细节。

1. 多模态消息格式定义

1.1 核心数据模型

框架在 infini_agent_framework.langgraph.schema 模块中定义了多模态消息的核心数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pydantic import BaseModel, Field
from typing import Literal
import re

class TextContent(BaseModel):
"""文本内容模型,遵循 OpenAI 多模态 API 规范"""
type: Literal["text"] = Field(default="text", description="内容类型")
text: str = Field(..., description="文本内容")

class ImageUrl(BaseModel):
"""图像 URL 模型,支持 Base64 数据 URL 和 HTTPS URL"""
url: str = Field(..., description="图像 URL")

@field_validator("url")
@classmethod
def validate_url(cls, v: str) -> str:
# 验证 Base64 数据 URL 格式: data:image/{type};base64,{data}
base64_pattern = r"^data:image/[a-zA-Z0-9+/]+;base64,[A-Za-z0-9+/=]+$"
# 验证 HTTPS URL 格式
https_pattern = r"^https://.+"

if re.match(base64_pattern, v) or re.match(https_pattern, v):
return v
raise ValueError("URL 必须是 Base64 数据 URL 或 HTTPS URL")

class ImageContent(BaseModel):
"""图像内容模型"""
type: Literal["image_url"] = Field(default="image_url", description="内容类型")
image_url: ImageUrl = Field(..., description="图像 URL 内容")

@staticmethod
def from_base64(mime_type: str, base64_data: str) -> "ImageContent":
"""从 Base64 数据创建图像内容"""
return ImageContent(
image_url=ImageUrl(url=f"data:{mime_type};base64,{base64_data}")
)

1.2 消息容器

多模态消息通过 LangChain 的 HumanMessage 容器承载,支持混合内容类型:

1
2
3
4
5
6
7
8
9
10
11
12
from langchain_core.messages import HumanMessage

# 构建多模态消息
multimodal_message = HumanMessage(content=[
{"type": "text", "text": "请分析这张图片"},
{
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64,/9j/4AAQSkZJRg..."
}
}
])

这里需要多模态大模型来支持这种编码形式

2. 客户端多模态处理

2.1 图像文件处理工具

框架提供了 ImageUtil 工具类,负责图像文件的读取和编码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import base64
import os
from pathlib import Path
from typing import Tuple, Optional

class ImageUtil:
"""图像文件处理工具"""

MIME_TYPES = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
}

@staticmethod
def read_image_file(image_path: str) -> Tuple[Optional[str], Optional[str]]:
"""读取图像文件并转换为 Base64 编码"""
expanded_path = os.path.expanduser(image_path)
path = Path(expanded_path)

if not path.exists() or not path.is_file():
return None, None

# 获取 MIME 类型
suffix = path.suffix.lower()
mime_type = ImageUtil.MIME_TYPES.get(suffix)
if not mime_type:
return None, None

# 读取并编码
with open(path, "rb") as f:
image_data = base64.b64encode(f.read()).decode("utf-8")

return image_data, mime_type

2.2 客户端消息发送

Client 类提供了多模态消息发送接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from typing import List
from infini_agent_framework.langgraph.schema import ImageContent

class Client:
def send_message(
self,
message: str,
images: List[ImageContent] = [],
stream: bool = False
) -> dict:
"""发送多模态消息

Args:
message: 文本消息内容
images: 图像内容列表
stream: 是否流式响应
"""
# 构建消息内容
content = [{"type": "text", "text": message}]
for image in images:
content.append({
"type": "image_url",
"image_url": {"url": image.image_url.url}
})

# 构建请求体
request_data = {
"messages": [{"role": "user", "content": content}],
"stream": stream
}

# 发送 HTTP 请求
response = self._post("/chat/completions", json=request_data)
return response.json()

3. 服务器端消息转换

3.1 Agent Manager 消息转换

服务器端的 AgentManager 负责将外部 API 请求转换为 LangGraph 内部格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from typing import Union
from infini_agent_framework.server.agent_manager import (
AgentManagerRequestMessage,
AgentManagerMultiContent,
AgentManagerRequestMessageContentType
)

class AgentManager:
def _transform_request_message(
self,
message: AgentManagerRequestMessage
) -> Union[str, list]:
"""转换 Agent Manager 消息为 LangGraph 格式"""

# 单文本消息优化
if (len(message.multi_content) == 1 and
message.multi_content[0].type == AgentManagerRequestMessageContentType.TEXT):
return message.multi_content[0].content.content

# 多模态消息处理
content = []
for item in message.multi_content:
if item.type == AgentManagerRequestMessageContentType.TEXT:
content.append({
"type": "text",
"text": item.content.content
})
elif item.type == AgentManagerRequestMessageContentType.IMAGE_URL:
# 验证图像 URL
if item.content.url and item.content.url.startswith("data:"):
self._validate_base64_image(item.content.url)

content.append({
"type": "image_url",
"image_url": {"url": item.content.url}
})

return content

def _validate_base64_image(self, data_url: str):
"""验证 Base64 图像数据 URL"""
pattern = r"^data:image/((jpeg)|(png)|(bmp));base64,[A-Za-z0-9+/=]+$"
if not re.match(pattern, data_url):
raise ValueError(f"无效的 Base64 图像数据 URL: {data_url}")

3.2 消息验证中间件

框架通过 Pydantic 模型验证和自定义验证器确保多模态消息的安全性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pydantic import field_validator

class DefaultAgentChatRequest(BaseModel):
"""聊天请求模型,支持多模态内容"""

messages: list = Field(
...,
description="消息列表(支持多模态内容)",
min_length=1
)

@field_validator("messages")
@classmethod
def validate_multimodal_content(cls, v: list) -> list:
"""验证多模态消息内容"""
for message in v:
if isinstance(message.content, list):
for content_item in message.content:
if content_item.get("type") == "image_url":
url = content_item.get("image_url", {}).get("url", "")
if url.startswith("http://"):
raise ValueError("HTTP URL 不被允许,请使用 HTTPS")
if url.startswith("data:"):
# 验证 Base64 数据大小
base64_data = url.split(",", 1)[1] if "," in url else ""
if len(base64_data) > 10 * 1024 * 1024: # 10MB
raise ValueError("Base64 数据过大")
return v

4. MCP 工具集成机制

4.1 MCP 客户端实现

框架通过 McpToolClient 类管理多模态工具集成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_core.tools import BaseTool

class McpToolClient:
def __init__(self, config):
self.config = config
self.mcp_client: Optional[MultiServerMCPClient] = None
self.tools_by_server: Dict[str, List[BaseTool]] = {}

async def initialize(self):
"""初始化 MCP 客户端并加载工具"""
if not self.config.mcp.mcp_servers:
return

# 构建连接配置
connections = {}
for server_name, server_config in self.config.mcp.mcp_servers.items():
connections[server_name] = {
"transport": "streamable_http",
"url": server_config.url,
"headers": server_config.headers,
"timeout": server_config.timeout
}

# 创建 MCP 客户端
self.mcp_client = MultiServerMCPClient(connections=connections)

# 加载工具
await self._load_all_tools()

async def _load_server_tools(self, server_name: str):
"""加载指定服务器的工具"""
tools = await self.mcp_client.get_tools(server_name=server_name)

# 处理工具元数据
processed_tools = []
for tool in tools:
# 添加服务器名前缀避免冲突
if self.config.mcp.mcp_servers[server_name].wrap_tool_names:
tool.name = f"{server_name}.{tool.name}"

# 检查是否需要人工审核
if tool.metadata and tool.metadata.get("_meta", {}).get("mcp_tool_need_approval"):
tool = self._wrap_with_human_approval(tool)

processed_tools.append(tool)

self.tools_by_server[server_name] = processed_tools

4.2 多模态工具包装

框架提供了工具包装机制,支持人机协同和多模态处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from langchain_core.tools import tool

def create_image_analysis_tool():
"""创建图像分析工具"""

@tool
async def analyze_image(image_url: str, analysis_type: str = "general") -> str:
"""分析图像内容

Args:
image_url: 图像 URL(支持 Base64 数据 URL)
analysis_type: 分析类型(general/objects/text/faces)
"""
# 解析 URL 类型
if image_url.startswith("data:"):
# 处理 Base64 数据
mime_type, base64_data = image_url.split(";base64,")
mime_type = mime_type.replace("data:", "")
image_bytes = base64.b64decode(base64_data)
else:
# 处理远程 URL
import httpx
async with httpx.AsyncClient() as client:
response = await client.get(image_url)
image_bytes = response.content

# 调用图像分析服务
result = await _call_vision_api(image_bytes, analysis_type)
return result

return analyze_image

def _wrap_with_human_approval(tool: BaseTool) -> BaseTool:
"""为工具添加人工审核包装"""
from infini_agent_framework.langgraph.tools import add_human_in_the_loop

return add_human_in_the_loop(
tool,
allow_edit=True, # 允许编辑参数
)

5. 多模态 RAG 集成

5.1 多模态嵌入生成

框架扩展了 RAG 客户端以支持多模态嵌入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MultimodalEmbeddings:
"""多模态嵌入生成器"""

def __init__(self, config):
self.config = config
self.text_embedding = self._create_text_embedding()
self.image_embedding = self._create_image_embedding()

async def embed_documents(self, documents: List[Document]) -> List[List[float]]:
"""为多模态文档生成嵌入"""
embeddings = []
for doc in documents:
if doc.type == "text":
embedding = await self.text_embedding.aembed_query(doc.content)
elif doc.type == "image":
embedding = await self.image_embedding.embed_image(doc.content)
else:
raise ValueError(f"不支持的文档类型: {doc.type}")
embeddings.append(embedding)
return embeddings

def _create_image_embedding(self):
"""创建图像嵌入模型"""
# 使用 CLIP 或其他多模态模型
from transformers import CLIPProcessor, CLIPModel
import torch

model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

class ImageEmbeddingWrapper:
async def embed_image(self, image_data: bytes) -> List[float]:
# 处理图像并生成嵌入
inputs = processor(images=image_data, return_tensors="pt")
with torch.no_grad():
image_features = model.get_image_features(**inputs)
return image_features[0].tolist()

return ImageEmbeddingWrapper()

5.2 多模态检索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class MultimodalRetriever:
"""多模态检索器"""

def __init__(self, vector_store, text_embedding, image_embedding):
self.vector_store = vector_store
self.text_embedding = text_embedding
self.image_embedding = image_embedding

async def retrieve(
self,
query: Union[str, bytes],
query_type: str = "text",
top_k: int = 5
) -> List[Document]:
"""检索相关文档"""

# 生成查询嵌入
if query_type == "text":
query_embedding = await self.text_embedding.aembed_query(query)
elif query_type == "image":
query_embedding = await self.image_embedding.embed_image(query)
else:
raise ValueError(f"不支持的查询类型: {query_type}")

# 向量检索
results = self.vector_store.similarity_search_by_vector(
query_embedding,
k=top_k
)

return results

6. 性能优化策略

6.1 图像预处理流水线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ImagePreprocessor:
"""图像预处理流水线"""

def __init__(self, max_size: Tuple[int, int] = (1024, 1024)):
self.max_size = max_size

async def preprocess(self, image_data: bytes) -> bytes:
"""预处理图像数据"""
from PIL import Image
import io

# 解码图像
image = Image.open(io.BytesIO(image_data))

# 调整大小
image.thumbnail(self.max_size, Image.Resampling.LANCZOS)

# 转换为 WebP 格式(如果支持)
output = io.BytesIO()
image.save(output, format="WEBP", quality=85)

return output.getvalue()

6.2 异步批处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
from typing import List

class AsyncBatchProcessor:
"""异步批处理器"""

def __init__(self, batch_size: int = 10, max_concurrent: int = 5):
self.batch_size = batch_size
self.max_concurrent = max_concurrent

async def process_batch(
self,
items: List,
process_func: callable
) -> List:
"""批量处理项目"""
semaphore = asyncio.Semaphore(self.max_concurrent)

async def process_with_semaphore(item):
async with semaphore:
return await process_func(item)

# 分批处理
results = []
for i in range(0, len(items), self.batch_size):
batch = items[i:i + self.batch_size]
tasks = [process_with_semaphore(item) for item in batch]
batch_results = await asyncio.gather(*tasks)
results.extend(batch_results)

return results

7. 扩展开发指南

7.1 添加新的模态支持

# 1. 定义新的内容模型
from pydantic import BaseModel, Field
from typing import Literal

class VideoContent(BaseModel):
    """视频内容模型"""
    type: Literal["video_url"] = Field(default="video_url")
    video_url: VideoUrl = Field(..., description="视频 URL")
    
    class VideoUrl(BaseModel):
        url: str = Field(..., description="视频 URL")
        
        @field_validator("url")
        @classmethod
        def validate_video_url(cls, v: str):
            # 验证视频 URL 格式
            allowed_formats = [".mp4", ".webm", ".mov"]
            if any(v.endswith(fmt) for fmt in allowed_formats):
                return v
            raise ValueError(f"不支持的视频格式,支持: {allowed_formats}")

# 2. 扩展消息转换逻辑
def extend_message_transformer(original_transformer):
    """扩展消息转换器以支持视频"""
    def extended_transformer(message):
        content = original_transformer(message)
        
        if isinstance(content, list):
            for item in message.multi_content:
                if item.type == "video_url":
                    content.append({
                        "type": "video_url",
                        "video_url": {"url": item.content.url}
                    })
        
        return content
    
    return extended_transformer

#