前言:从理论到实践
在上一篇文章中,我们详细介绍了LangChain的核心概念和基础模块,包括Model I/O、Chain和Agent。现在,让我们将这些知识应用到实际项目中,构建一个完整的智能问答系统。
1、项目概述:智能问答系统架构
我们的智能问答系统采用RAG(Retrieval-Augmented Generation)架构:
用户问题 → 语义检索 → 相关文档 → 问答生成 → 答案
↑ ↓
向量数据库 ← 文档处理
2、完整实现:智能问答系统
2.1 基础版本实现
import os
from typing import List, Dict, Any
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain.vectorstores import Chroma
from langchain.text_splitter import CharacterTextSplitter
from langchain.document_loaders import TextLoader, PyPDFLoader, Docx2txtLoader
from langchain.prompts import PromptTemplate
class SmartQASystem:
"""智能问答系统"""
def __init__(self, model_config: Dict[str, Any] = None):
# 默认配置
default_config = {
'model_name': 'qwen1.5-32b-chat-int4',
'openai_api_base': 'http://20.20.136.251:8001/v1',
'openai_api_key': 'q7r8s9t0-u1v2-w3x4-y5z6-a7b8c9d0e1f2',
'temperature': 0.7,
'max_tokens': 1000
}
if model_config:
default_config.update(model_config)
# 初始化模型
self.llm = ChatOpenAI(
model_name=default_config['model_name'],
openai_api_base=default_config['openai_api_base'],
openai_api_key=default_config['openai_api_key'],
temperature=default_config['temperature'],
max_tokens=default_config['max_tokens']
)
self.embeddings = OpenAIEmbeddings(
openai_api_base=default_config['openai_api_base'],
openai_api_key=default_config['openai_api_key']
)
self.vectorstore = None
self.qa_chain = None
self.text_splitter = CharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separator="\n"
)
def load_documents(self, directory: str) -> List[Any]:
"""加载目录下的所有文档"""
all_documents = []
supported_extensions = ['.pdf', '.docx', '.txt', '.md']
for root, _, files in os.walk(directory):
for file in files:
file_path = os.path.join(root, file)
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext in supported_extensions:
try:
if file_ext == '.pdf':
loader = PyPDFLoader(file_path)
elif file_ext == '.docx':
loader = Docx2txtLoader(file_path)
else:
loader = TextLoader(file_path, encoding='utf-8')
documents = loader.load()
all_documents.extend(documents)
except Exception as e:
print(f"加载文档失败: {file_path}, 错误: {str(e)}")
print(f"总共加载了 {len(all_documents)} 个文档片段")
return all_documents
def create_vectorstore(self, documents: List[Any], persist_dir: str = "./chroma_db") -> Chroma:
"""创建向量数据库"""
if not documents:
raise ValueError("没有可处理的文档")
# 分割文档
texts = self.text_splitter.split_documents(documents)
print(f"文档分割完成,共{len(texts)}个文本片段")
# 创建向量数据库
self.vectorstore = Chroma.from_documents(
documents=texts,
embedding=self.embeddings,
persist_directory=persist_dir
)
self.vectorstore.persist()
print(f"向量数据库已创建并保存到: {persist_dir}")
return self.vectorstore
def create_qa_chain(self, chain_type: str = "stuff") -> RetrievalQA:
"""创建问答链"""
if not self.vectorstore:
raise ValueError("请先创建向量数据库")
# 自定义提示模板
prompt_template = """基于以下上下文信息,回答用户的问题。如果你不知道答案,就说你不知道,不要编造答案。
上下文:
{context}
问题:{question}
请提供详细、准确的答案:"""
PROMPT = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
# 创建检索器
retriever = self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)
# 创建问答链
self.qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type=chain_type,
retriever=retriever,
chain_type_kwargs={"prompt": PROMPT},
return_source_documents=True
)
print(f"问答链已创建,类型: {chain_type}")
return self.qa_chain
def ask_question(self, question: str) -> Dict[str, Any]:
"""提问并获取答案"""
if not self.qa_chain:
raise ValueError("请先创建问答链")
try:
result = self.qa_chain.invoke({"query": question})
answer = result.get("result", "无法生成答案")
source_docs = result.get("source_documents", [])
response = {
"question": question,
"answer": answer,
"sources": []
}
for i, doc in enumerate(source_docs[:3]):
source_info = {
"source": doc.metadata.get("source", "未知"),
"page": doc.metadata.get("page", "未知"),
"content_preview": doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content
}
response["sources"].append(source_info)
return response
except Exception as e:
print(f"提问失败: {str(e)}")
return {
"question": question,
"answer": f"抱歉,处理问题时出现错误: {str(e)}",
"sources": []
}
# 使用示例
if __name__ == "__main__":
# 1. 初始化系统
qa_system = SmartQASystem()
# 2. 加载文档
documents = qa_system.load_documents("./documents")
# 3. 创建向量数据库
qa_system.create_vectorstore(documents)
# 4. 创建问答链
qa_system.create_qa_chain()
# 5. 测试问答
questions = [
"LangChain是什么?",
"如何创建向量数据库?",
"Agent模块有什么作用?"
]
for question in questions:
print(f"\n问题: {question}")
result = qa_system.ask_question(question)
print(f"答案: {result['answer'][:200]}...")
2.2 高级功能:多轮对话支持
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
class AdvancedQASystem(SmartQASystem):
"""支持多轮对话的问答系统"""
def __init__(self, model_config: Dict[str, Any] = None):
super().__init__(model_config)
self.conversation_memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
self.conversation_chain = None
def create_conversation_chain(self):
"""创建对话式检索链"""
if not self.vectorstore:
raise ValueError("请先创建向量数据库")
retriever = self.vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 10, "lambda_mult": 0.5}
)
self.conversation_chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=retriever,
memory=self.conversation_memory,
verbose=True
)
return self.conversation_chain
def chat(self, question: str) -> str:
"""多轮对话"""
if not self.conversation_chain:
self.create_conversation_chain()
result = self.conversation_chain.invoke({"question": question})
return result["answer"]
2.3 性能优化:缓存机制
from langchain.cache import InMemoryCache
from langchain.globals import set_llm_cache
from functools import lru_cache
class OptimizedQASystem(AdvancedQASystem):
"""带缓存的优化版问答系统"""
def __init__(self, model_config: Dict[str, Any] = None):
super().__init__(model_config)
set_llm_cache(InMemoryCache())
@lru_cache(maxsize=100)
def get_cached_answer(self, question: str) -> Dict[str, Any]:
"""缓存常见问题的答案"""
return super().ask_question(question)
def ask_question_optimized(self, question: str, use_cache: bool = True) -> Dict[str, Any]:
"""优化版的提问方法"""
if use_cache:
cached_result = self.get_cached_answer(question)
if cached_result:
print("使用缓存答案")
return cached_result
return super().ask_question(question)
3、部署与使用
3.1 环境配置
创建 requirements.txt:
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.10
chromadb>=0.4.0
tiktoken>=0.5.0
openai>=1.0.0
3.2 快速启动脚本
创建 run_qa.py:
#!/usr/bin/env python3
"""
智能问答系统启动脚本
"""
import sys
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.append(str(project_root))
from qa_system import SmartQASystem
def main():
"""主函数"""
print("=" * 50)
print("智能问答系统启动")
print("=" * 50)
try:
# 初始化系统
qa_system = SmartQASystem()
# 检查文档目录
docs_dir = project_root / "documents"
if not docs_dir.exists():
print(f"警告:文档目录不存在: {docs_dir}")
print("请创建文档目录并添加PDF、TXT或DOCX文件")
docs_dir.mkdir(parents=True, exist_ok=True)
return
# 加载文档
print("正在加载文档...")
documents = qa_system.load_documents(str(docs_dir))
if not documents:
print("没有找到可处理的文档")
return
# 创建向量数据库
print("正在创建向量数据库...")
qa_system.create_vectorstore(documents)
# 创建问答链
print("正在创建问答链...")
qa_system.create_qa_chain()
print("\n系统初始化完成!")
print("输入 'quit' 或 'exit' 退出")
print("-" * 50)
# 交互模式
while True:
try:
question = input("\n请输入问题: ").strip()
if question.lower() in ['quit', 'exit', '退出']:
print("感谢使用,再见!")
break
if not question:
continue
# 获取答案
result = qa_system.ask_question(question)
# 显示答案
print(f"\n答案: {result['answer']}")
# 显示源文档
if result['sources']:
print("\n参考来源:")
for i, source in enumerate(result['sources'], 1):
print(f"{i}. {source['source']}")
if source['page'] != "未知":
print(f" 页码: {source['page']}")
except KeyboardInterrupt:
print("\n\n程序被用户中断")
break
except Exception as e:
print(f"发生错误: {str(e)}")
except Exception as e:
print(f"系统启动失败: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
3.3 Docker部署
创建 Dockerfile:
FROM python:3.10-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建文档目录
RUN mkdir -p ./documents ./chroma_db
# 启动命令
CMD ["python", "run_qa.py"]
4、总结与最佳实践
4.1 核心要点总结
- 文档处理:使用合适的加载器和分割器处理不同格式的文档
- 向量化:选择适合的嵌入模型将文本转换为向量
- 检索优化:调整检索参数(k值、搜索类型)平衡精度和速度
- 提示工程:设计清晰的提示模板提高答案质量
- 性能优化:使用缓存、批处理等技术提升系统响应速度
4.2 常见问题与解决方案
| 问题 | 解决方案 |
|---|---|
| 文档加载失败 | 检查文件格式、编码和权限 |
| 向量数据库创建慢 | 减少chunk_size,增加chunk_overlap |
| 检索结果不准确 | 调整检索参数,优化嵌入模型 |
| 答案质量差 | 改进提示模板,增加上下文长度 |
| 内存占用高 | 使用持久化向量数据库,启用缓存 |
4.3 下一步学习建议
- 扩展功能:添加文件上传接口、用户认证、对话历史保存
- 性能优化:实现异步处理、分布式向量数据库、模型量化
- 监控运维:添加日志记录、性能监控、错误报警
- 领域适配:针对特定领域优化提示模板和检索策略
通过本文的实战项目,你已经掌握了使用LangChain构建智能问答系统的核心技能。接下来,可以根据具体业务需求,进一步优化和扩展系统功能。
本文是LangChain开发框架实践系列的第二篇,专注于实战项目开发。建议结合第一篇的基础知识学习,掌握完整的LangChain开发流程。
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付