Agent框架集成多模态能力底层实现
该项目处理多模态RAG返回图片的完整流程:
架构概述
该项目采用分层架构处理多模态RAG:
- 前端接口层:通过
schema.py中的ImageContent和ImageUrl模型支持base64和HTTPS两种图片URL格式
- RAG核心层:
rag.py中的RagClient提供统一的向量检索接口
- 多模态嵌入层:
multi_model.py中的AliyunEmbeddings使用阿里云DashScope的多模态嵌入API
- 数据存储层:使用Qdrant向量数据库存储图片和文本的嵌入向量
图片处理流程
1. 图片存储阶段
在feishu-crawler子项目中,图片处理流程如下:
- 图片下载:
DownloadImageTransform从飞书下载图片到本地文件系统
- 图片摘要生成:
GenerateImageSummaryTransform使用VLLM模型为图片生成文字描述
- 多模态嵌入:
EmbedImageTransform调用MultiModelEmbedder生成图片+文字的联合嵌入向量
- 向量存储:将base64编码的图片数据、文字描述和嵌入向量一起存入Qdrant
2. 图片检索阶段
- 多模态查询:支持纯文本查询,通过阿里云
qwen2.5-vl-embedding模型生成查询嵌入向量
- 相似度搜索:在Qdrant中进行向量相似度搜索,返回最相关的文档(包括图片)
- 结果返回:使用
response_format="content_and_artifact"格式,同时返回序列化内容和原始文档对象
3. 图片返回格式
图片以两种形式返回:
- Base64数据URL:格式为
data:image/{mime_type};base64,{base64_data}
- 元数据信息:包括图片描述、原始token等元数据
关键技术实现
多模态嵌入
1 2 3 4 5 6
| response = dashscope.MultiModalEmbedding.call( model="qwen2.5-vl-embedding", input=[{"text": caption}, {"image": encoded_image}], api_key=api_key )
|
配置支持
在config.py中支持多种嵌入提供者:
EMBEDDING_PROVIDER_ALIYUN_MM:阿里云多模态嵌入
EMBEDDING_PROVIDER_INFINI:Infini自有嵌入
EMBEDDING_PROVIDER_OPENAI:OpenAI兼容接口
检索工具集成
RAG工具配置为返回content_and_artifact格式,使得Agent可以同时获取:
- 序列化的文本内容(供LLM理解)
- 原始文档对象(包含图片base64数据等元数据)
优势特点
- 统一接口:文本和图片使用相同的RAG接口进行检索
- 多模态支持:支持文本到图片、图片到文本的跨模态检索
- 灵活的存储:图片以base64格式存储,便于直接在前端显示
- 可扩展架构:支持多种嵌入提供者和向量数据库
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13
| config.rag.settings = [ RagSetting( embedding_provider=EMBEDDING_PROVIDER_ALIYUN_MM, embedding_model="qwen2.5-vl-embedding", collection_name="multimodal_collection" ) ]
retriever = rag_client.as_retriever() results = retriever.invoke("查找相关图片")
|
这个设计使得项目能够有效地处理多模态RAG场景,特别是在飞书文档爬取和检索的应用中,能够很好地处理文档中的图片内容。
引言
多模态(Multimodal)AI 已成为智能体系统的核心能力之一。Infini Agent Framework 基于 LangGraph 构建,在架构层面原生支持多模态输入输出。本文将深入剖析框架中多模态能力的底层实现机制,涵盖消息格式定义、传输协议、工具集成等关键技术细节。
1. 多模态消息格式定义
1.1 核心数据模型
框架在 infini_agent_framework.langgraph.schema 模块中定义了多模态消息的核心数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| from pydantic import BaseModel, Field from typing import Literal import re
class TextContent(BaseModel): """文本内容模型,遵循 OpenAI 多模态 API 规范""" type: Literal["text"] = Field(default="text", description="内容类型") text: str = Field(..., description="文本内容")
class ImageUrl(BaseModel): """图像 URL 模型,支持 Base64 数据 URL 和 HTTPS URL""" url: str = Field(..., description="图像 URL") @field_validator("url") @classmethod def validate_url(cls, v: str) -> str: base64_pattern = r"^data:image/[a-zA-Z0-9+/]+;base64,[A-Za-z0-9+/=]+$" https_pattern = r"^https://.+" if re.match(base64_pattern, v) or re.match(https_pattern, v): return v raise ValueError("URL 必须是 Base64 数据 URL 或 HTTPS URL")
class ImageContent(BaseModel): """图像内容模型""" type: Literal["image_url"] = Field(default="image_url", description="内容类型") image_url: ImageUrl = Field(..., description="图像 URL 内容") @staticmethod def from_base64(mime_type: str, base64_data: str) -> "ImageContent": """从 Base64 数据创建图像内容""" return ImageContent( image_url=ImageUrl(url=f"data:{mime_type};base64,{base64_data}") )
|
1.2 消息容器
多模态消息通过 LangChain 的 HumanMessage 容器承载,支持混合内容类型:
1 2 3 4 5 6 7 8 9 10 11 12
| from langchain_core.messages import HumanMessage
multimodal_message = HumanMessage(content=[ {"type": "text", "text": "请分析这张图片"}, { "type": "image_url", "image_url": { "url": "data:image/jpeg;base64,/9j/4AAQSkZJRg..." } } ])
|
这里需要多模态大模型来支持这种编码形式
2. 客户端多模态处理
2.1 图像文件处理工具
框架提供了 ImageUtil 工具类,负责图像文件的读取和编码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import base64 import os from pathlib import Path from typing import Tuple, Optional
class ImageUtil: """图像文件处理工具""" MIME_TYPES = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", } @staticmethod def read_image_file(image_path: str) -> Tuple[Optional[str], Optional[str]]: """读取图像文件并转换为 Base64 编码""" expanded_path = os.path.expanduser(image_path) path = Path(expanded_path) if not path.exists() or not path.is_file(): return None, None suffix = path.suffix.lower() mime_type = ImageUtil.MIME_TYPES.get(suffix) if not mime_type: return None, None with open(path, "rb") as f: image_data = base64.b64encode(f.read()).decode("utf-8") return image_data, mime_type
|
2.2 客户端消息发送
Client 类提供了多模态消息发送接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| from typing import List from infini_agent_framework.langgraph.schema import ImageContent
class Client: def send_message( self, message: str, images: List[ImageContent] = [], stream: bool = False ) -> dict: """发送多模态消息 Args: message: 文本消息内容 images: 图像内容列表 stream: 是否流式响应 """ content = [{"type": "text", "text": message}] for image in images: content.append({ "type": "image_url", "image_url": {"url": image.image_url.url} }) request_data = { "messages": [{"role": "user", "content": content}], "stream": stream } response = self._post("/chat/completions", json=request_data) return response.json()
|
3. 服务器端消息转换
3.1 Agent Manager 消息转换
服务器端的 AgentManager 负责将外部 API 请求转换为 LangGraph 内部格式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| from typing import Union from infini_agent_framework.server.agent_manager import ( AgentManagerRequestMessage, AgentManagerMultiContent, AgentManagerRequestMessageContentType )
class AgentManager: def _transform_request_message( self, message: AgentManagerRequestMessage ) -> Union[str, list]: """转换 Agent Manager 消息为 LangGraph 格式""" if (len(message.multi_content) == 1 and message.multi_content[0].type == AgentManagerRequestMessageContentType.TEXT): return message.multi_content[0].content.content content = [] for item in message.multi_content: if item.type == AgentManagerRequestMessageContentType.TEXT: content.append({ "type": "text", "text": item.content.content }) elif item.type == AgentManagerRequestMessageContentType.IMAGE_URL: if item.content.url and item.content.url.startswith("data:"): self._validate_base64_image(item.content.url) content.append({ "type": "image_url", "image_url": {"url": item.content.url} }) return content def _validate_base64_image(self, data_url: str): """验证 Base64 图像数据 URL""" pattern = r"^data:image/((jpeg)|(png)|(bmp));base64,[A-Za-z0-9+/=]+$" if not re.match(pattern, data_url): raise ValueError(f"无效的 Base64 图像数据 URL: {data_url}")
|
3.2 消息验证中间件
框架通过 Pydantic 模型验证和自定义验证器确保多模态消息的安全性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| from pydantic import field_validator
class DefaultAgentChatRequest(BaseModel): """聊天请求模型,支持多模态内容""" messages: list = Field( ..., description="消息列表(支持多模态内容)", min_length=1 ) @field_validator("messages") @classmethod def validate_multimodal_content(cls, v: list) -> list: """验证多模态消息内容""" for message in v: if isinstance(message.content, list): for content_item in message.content: if content_item.get("type") == "image_url": url = content_item.get("image_url", {}).get("url", "") if url.startswith("http://"): raise ValueError("HTTP URL 不被允许,请使用 HTTPS") if url.startswith("data:"): base64_data = url.split(",", 1)[1] if "," in url else "" if len(base64_data) > 10 * 1024 * 1024: raise ValueError("Base64 数据过大") return v
|
4. MCP 工具集成机制
4.1 MCP 客户端实现
框架通过 McpToolClient 类管理多模态工具集成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| from langchain_mcp_adapters.client import MultiServerMCPClient from langchain_core.tools import BaseTool
class McpToolClient: def __init__(self, config): self.config = config self.mcp_client: Optional[MultiServerMCPClient] = None self.tools_by_server: Dict[str, List[BaseTool]] = {} async def initialize(self): """初始化 MCP 客户端并加载工具""" if not self.config.mcp.mcp_servers: return connections = {} for server_name, server_config in self.config.mcp.mcp_servers.items(): connections[server_name] = { "transport": "streamable_http", "url": server_config.url, "headers": server_config.headers, "timeout": server_config.timeout } self.mcp_client = MultiServerMCPClient(connections=connections) await self._load_all_tools() async def _load_server_tools(self, server_name: str): """加载指定服务器的工具""" tools = await self.mcp_client.get_tools(server_name=server_name) processed_tools = [] for tool in tools: if self.config.mcp.mcp_servers[server_name].wrap_tool_names: tool.name = f"{server_name}.{tool.name}" if tool.metadata and tool.metadata.get("_meta", {}).get("mcp_tool_need_approval"): tool = self._wrap_with_human_approval(tool) processed_tools.append(tool) self.tools_by_server[server_name] = processed_tools
|
4.2 多模态工具包装
框架提供了工具包装机制,支持人机协同和多模态处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| from langchain_core.tools import tool
def create_image_analysis_tool(): """创建图像分析工具""" @tool async def analyze_image(image_url: str, analysis_type: str = "general") -> str: """分析图像内容 Args: image_url: 图像 URL(支持 Base64 数据 URL) analysis_type: 分析类型(general/objects/text/faces) """ if image_url.startswith("data:"): mime_type, base64_data = image_url.split(";base64,") mime_type = mime_type.replace("data:", "") image_bytes = base64.b64decode(base64_data) else: import httpx async with httpx.AsyncClient() as client: response = await client.get(image_url) image_bytes = response.content result = await _call_vision_api(image_bytes, analysis_type) return result return analyze_image
def _wrap_with_human_approval(tool: BaseTool) -> BaseTool: """为工具添加人工审核包装""" from infini_agent_framework.langgraph.tools import add_human_in_the_loop return add_human_in_the_loop( tool, allow_edit=True, )
|
5. 多模态 RAG 集成
5.1 多模态嵌入生成
框架扩展了 RAG 客户端以支持多模态嵌入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| class MultimodalEmbeddings: """多模态嵌入生成器""" def __init__(self, config): self.config = config self.text_embedding = self._create_text_embedding() self.image_embedding = self._create_image_embedding() async def embed_documents(self, documents: List[Document]) -> List[List[float]]: """为多模态文档生成嵌入""" embeddings = [] for doc in documents: if doc.type == "text": embedding = await self.text_embedding.aembed_query(doc.content) elif doc.type == "image": embedding = await self.image_embedding.embed_image(doc.content) else: raise ValueError(f"不支持的文档类型: {doc.type}") embeddings.append(embedding) return embeddings def _create_image_embedding(self): """创建图像嵌入模型""" from transformers import CLIPProcessor, CLIPModel import torch model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") class ImageEmbeddingWrapper: async def embed_image(self, image_data: bytes) -> List[float]: inputs = processor(images=image_data, return_tensors="pt") with torch.no_grad(): image_features = model.get_image_features(**inputs) return image_features[0].tolist() return ImageEmbeddingWrapper()
|
5.2 多模态检索
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| class MultimodalRetriever: """多模态检索器""" def __init__(self, vector_store, text_embedding, image_embedding): self.vector_store = vector_store self.text_embedding = text_embedding self.image_embedding = image_embedding async def retrieve( self, query: Union[str, bytes], query_type: str = "text", top_k: int = 5 ) -> List[Document]: """检索相关文档""" if query_type == "text": query_embedding = await self.text_embedding.aembed_query(query) elif query_type == "image": query_embedding = await self.image_embedding.embed_image(query) else: raise ValueError(f"不支持的查询类型: {query_type}") results = self.vector_store.similarity_search_by_vector( query_embedding, k=top_k ) return results
|
6. 性能优化策略
6.1 图像预处理流水线
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| class ImagePreprocessor: """图像预处理流水线""" def __init__(self, max_size: Tuple[int, int] = (1024, 1024)): self.max_size = max_size async def preprocess(self, image_data: bytes) -> bytes: """预处理图像数据""" from PIL import Image import io image = Image.open(io.BytesIO(image_data)) image.thumbnail(self.max_size, Image.Resampling.LANCZOS) output = io.BytesIO() image.save(output, format="WEBP", quality=85) return output.getvalue()
|
6.2 异步批处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import asyncio from typing import List
class AsyncBatchProcessor: """异步批处理器""" def __init__(self, batch_size: int = 10, max_concurrent: int = 5): self.batch_size = batch_size self.max_concurrent = max_concurrent async def process_batch( self, items: List, process_func: callable ) -> List: """批量处理项目""" semaphore = asyncio.Semaphore(self.max_concurrent) async def process_with_semaphore(item): async with semaphore: return await process_func(item) results = [] for i in range(0, len(items), self.batch_size): batch = items[i:i + self.batch_size] tasks = [process_with_semaphore(item) for item in batch] batch_results = await asyncio.gather(*tasks) results.extend(batch_results) return results
|
7. 扩展开发指南
7.1 添加新的模态支持
# 1. 定义新的内容模型
from pydantic import BaseModel, Field
from typing import Literal
class VideoContent(BaseModel):
"""视频内容模型"""
type: Literal["video_url"] = Field(default="video_url")
video_url: VideoUrl = Field(..., description="视频 URL")
class VideoUrl(BaseModel):
url: str = Field(..., description="视频 URL")
@field_validator("url")
@classmethod
def validate_video_url(cls, v: str):
# 验证视频 URL 格式
allowed_formats = [".mp4", ".webm", ".mov"]
if any(v.endswith(fmt) for fmt in allowed_formats):
return v
raise ValueError(f"不支持的视频格式,支持: {allowed_formats}")
# 2. 扩展消息转换逻辑
def extend_message_transformer(original_transformer):
"""扩展消息转换器以支持视频"""
def extended_transformer(message):
content = original_transformer(message)
if isinstance(content, list):
for item in message.multi_content:
if item.type == "video_url":
content.append({
"type": "video_url",
"video_url": {"url": item.content.url}
})
return content
return extended_transformer
#