LangChain开发框架实战:构建智能问答系统

Posted by 虞天 on Saturday, July 13, 2024

前言:从理论到实践

在上一篇文章中,我们详细介绍了LangChain的核心概念和基础模块,包括Model I/O、Chain和Agent。现在,让我们将这些知识应用到实际项目中,构建一个完整的智能问答系统。

1、项目概述:智能问答系统架构

我们的智能问答系统采用RAG(Retrieval-Augmented Generation)架构:

用户问题 → 语义检索 → 相关文档 → 问答生成 → 答案
         ↑          ↓
     向量数据库 ← 文档处理

2、完整实现:智能问答系统

2.1 基础版本实现

import os
from typing import List, Dict, Any
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain.vectorstores import Chroma
from langchain.text_splitter import CharacterTextSplitter
from langchain.document_loaders import TextLoader, PyPDFLoader, Docx2txtLoader
from langchain.prompts import PromptTemplate

class SmartQASystem:
    """智能问答系统"""
    
    def __init__(self, model_config: Dict[str, Any] = None):
        # 默认配置
        default_config = {
            'model_name': 'qwen1.5-32b-chat-int4',
            'openai_api_base': 'http://20.20.136.251:8001/v1',
            'openai_api_key': 'q7r8s9t0-u1v2-w3x4-y5z6-a7b8c9d0e1f2',
            'temperature': 0.7,
            'max_tokens': 1000
        }
        
        if model_config:
            default_config.update(model_config)
        
        # 初始化模型
        self.llm = ChatOpenAI(
            model_name=default_config['model_name'],
            openai_api_base=default_config['openai_api_base'],
            openai_api_key=default_config['openai_api_key'],
            temperature=default_config['temperature'],
            max_tokens=default_config['max_tokens']
        )
        
        self.embeddings = OpenAIEmbeddings(
            openai_api_base=default_config['openai_api_base'],
            openai_api_key=default_config['openai_api_key']
        )
        
        self.vectorstore = None
        self.qa_chain = None
        self.text_splitter = CharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separator="\n"
        )
    
    def load_documents(self, directory: str) -> List[Any]:
        """加载目录下的所有文档"""
        all_documents = []
        supported_extensions = ['.pdf', '.docx', '.txt', '.md']
        
        for root, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                file_ext = os.path.splitext(file_path)[1].lower()
                
                if file_ext in supported_extensions:
                    try:
                        if file_ext == '.pdf':
                            loader = PyPDFLoader(file_path)
                        elif file_ext == '.docx':
                            loader = Docx2txtLoader(file_path)
                        else:
                            loader = TextLoader(file_path, encoding='utf-8')
                        
                        documents = loader.load()
                        all_documents.extend(documents)
                    except Exception as e:
                        print(f"加载文档失败: {file_path}, 错误: {str(e)}")
        
        print(f"总共加载了 {len(all_documents)} 个文档片段")
        return all_documents
    
    def create_vectorstore(self, documents: List[Any], persist_dir: str = "./chroma_db") -> Chroma:
        """创建向量数据库"""
        if not documents:
            raise ValueError("没有可处理的文档")
        
        # 分割文档
        texts = self.text_splitter.split_documents(documents)
        print(f"文档分割完成,共{len(texts)}个文本片段")
        
        # 创建向量数据库
        self.vectorstore = Chroma.from_documents(
            documents=texts,
            embedding=self.embeddings,
            persist_directory=persist_dir
        )
        
        self.vectorstore.persist()
        print(f"向量数据库已创建并保存到: {persist_dir}")
        return self.vectorstore
    
    def create_qa_chain(self, chain_type: str = "stuff") -> RetrievalQA:
        """创建问答链"""
        if not self.vectorstore:
            raise ValueError("请先创建向量数据库")
        
        # 自定义提示模板
        prompt_template = """基于以下上下文信息,回答用户的问题。如果你不知道答案,就说你不知道,不要编造答案。

        上下文:
        {context}

        问题:{question}

        请提供详细、准确的答案:"""
        
        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )
        
        # 创建检索器
        retriever = self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": 4}
        )
        
        # 创建问答链
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type=chain_type,
            retriever=retriever,
            chain_type_kwargs={"prompt": PROMPT},
            return_source_documents=True
        )
        
        print(f"问答链已创建,类型: {chain_type}")
        return self.qa_chain
    
    def ask_question(self, question: str) -> Dict[str, Any]:
        """提问并获取答案"""
        if not self.qa_chain:
            raise ValueError("请先创建问答链")
        
        try:
            result = self.qa_chain.invoke({"query": question})
            
            answer = result.get("result", "无法生成答案")
            source_docs = result.get("source_documents", [])
            
            response = {
                "question": question,
                "answer": answer,
                "sources": []
            }
            
            for i, doc in enumerate(source_docs[:3]):
                source_info = {
                    "source": doc.metadata.get("source", "未知"),
                    "page": doc.metadata.get("page", "未知"),
                    "content_preview": doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content
                }
                response["sources"].append(source_info)
            
            return response
            
        except Exception as e:
            print(f"提问失败: {str(e)}")
            return {
                "question": question,
                "answer": f"抱歉,处理问题时出现错误: {str(e)}",
                "sources": []
            }

# 使用示例
if __name__ == "__main__":
    # 1. 初始化系统
    qa_system = SmartQASystem()
    
    # 2. 加载文档
    documents = qa_system.load_documents("./documents")
    
    # 3. 创建向量数据库
    qa_system.create_vectorstore(documents)
    
    # 4. 创建问答链
    qa_system.create_qa_chain()
    
    # 5. 测试问答
    questions = [
        "LangChain是什么?",
        "如何创建向量数据库?",
        "Agent模块有什么作用?"
    ]
    
    for question in questions:
        print(f"\n问题: {question}")
        result = qa_system.ask_question(question)
        print(f"答案: {result['answer'][:200]}...")

2.2 高级功能:多轮对话支持

from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory

class AdvancedQASystem(SmartQASystem):
    """支持多轮对话的问答系统"""
    
    def __init__(self, model_config: Dict[str, Any] = None):
        super().__init__(model_config)
        
        self.conversation_memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        self.conversation_chain = None
    
    def create_conversation_chain(self):
        """创建对话式检索链"""
        if not self.vectorstore:
            raise ValueError("请先创建向量数据库")
        
        retriever = self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={"k": 5, "fetch_k": 10, "lambda_mult": 0.5}
        )
        
        self.conversation_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=retriever,
            memory=self.conversation_memory,
            verbose=True
        )
        
        return self.conversation_chain
    
    def chat(self, question: str) -> str:
        """多轮对话"""
        if not self.conversation_chain:
            self.create_conversation_chain()
        
        result = self.conversation_chain.invoke({"question": question})
        return result["answer"]

2.3 性能优化:缓存机制

from langchain.cache import InMemoryCache
from langchain.globals import set_llm_cache
from functools import lru_cache

class OptimizedQASystem(AdvancedQASystem):
    """带缓存的优化版问答系统"""
    
    def __init__(self, model_config: Dict[str, Any] = None):
        super().__init__(model_config)
        set_llm_cache(InMemoryCache())
    
    @lru_cache(maxsize=100)
    def get_cached_answer(self, question: str) -> Dict[str, Any]:
        """缓存常见问题的答案"""
        return super().ask_question(question)
    
    def ask_question_optimized(self, question: str, use_cache: bool = True) -> Dict[str, Any]:
        """优化版的提问方法"""
        if use_cache:
            cached_result = self.get_cached_answer(question)
            if cached_result:
                print("使用缓存答案")
                return cached_result
        
        return super().ask_question(question)

3、部署与使用

3.1 环境配置

创建 requirements.txt

langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.10
chromadb>=0.4.0
tiktoken>=0.5.0
openai>=1.0.0

3.2 快速启动脚本

创建 run_qa.py

#!/usr/bin/env python3
"""
智能问答系统启动脚本
"""

import sys
from pathlib import Path

# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.append(str(project_root))

from qa_system import SmartQASystem

def main():
    """主函数"""
    print("=" * 50)
    print("智能问答系统启动")
    print("=" * 50)
    
    try:
        # 初始化系统
        qa_system = SmartQASystem()
        
        # 检查文档目录
        docs_dir = project_root / "documents"
        if not docs_dir.exists():
            print(f"警告:文档目录不存在: {docs_dir}")
            print("请创建文档目录并添加PDF、TXT或DOCX文件")
            docs_dir.mkdir(parents=True, exist_ok=True)
            return
        
        # 加载文档
        print("正在加载文档...")
        documents = qa_system.load_documents(str(docs_dir))
        
        if not documents:
            print("没有找到可处理的文档")
            return
        
        # 创建向量数据库
        print("正在创建向量数据库...")
        qa_system.create_vectorstore(documents)
        
        # 创建问答链
        print("正在创建问答链...")
        qa_system.create_qa_chain()
        
        print("\n系统初始化完成!")
        print("输入 'quit' 或 'exit' 退出")
        print("-" * 50)
        
        # 交互模式
        while True:
            try:
                question = input("\n请输入问题: ").strip()
                
                if question.lower() in ['quit', 'exit', '退出']:
                    print("感谢使用,再见!")
                    break
                
                if not question:
                    continue
                
                # 获取答案
                result = qa_system.ask_question(question)
                
                # 显示答案
                print(f"\n答案: {result['answer']}")
                
                # 显示源文档
                if result['sources']:
                    print("\n参考来源:")
                    for i, source in enumerate(result['sources'], 1):
                        print(f"{i}. {source['source']}")
                        if source['page'] != "未知":
                            print(f"   页码: {source['page']}")
                
            except KeyboardInterrupt:
                print("\n\n程序被用户中断")
                break
            except Exception as e:
                print(f"发生错误: {str(e)}")
    
    except Exception as e:
        print(f"系统启动失败: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()

3.3 Docker部署

创建 Dockerfile

FROM python:3.10-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建文档目录
RUN mkdir -p ./documents ./chroma_db

# 启动命令
CMD ["python", "run_qa.py"]

4、总结与最佳实践

4.1 核心要点总结

  1. 文档处理:使用合适的加载器和分割器处理不同格式的文档
  2. 向量化:选择适合的嵌入模型将文本转换为向量
  3. 检索优化:调整检索参数(k值、搜索类型)平衡精度和速度
  4. 提示工程:设计清晰的提示模板提高答案质量
  5. 性能优化:使用缓存、批处理等技术提升系统响应速度

4.2 常见问题与解决方案

问题 解决方案
文档加载失败 检查文件格式、编码和权限
向量数据库创建慢 减少chunk_size,增加chunk_overlap
检索结果不准确 调整检索参数,优化嵌入模型
答案质量差 改进提示模板,增加上下文长度
内存占用高 使用持久化向量数据库,启用缓存

4.3 下一步学习建议

  1. 扩展功能:添加文件上传接口、用户认证、对话历史保存
  2. 性能优化:实现异步处理、分布式向量数据库、模型量化
  3. 监控运维:添加日志记录、性能监控、错误报警
  4. 领域适配:针对特定领域优化提示模板和检索策略

通过本文的实战项目,你已经掌握了使用LangChain构建智能问答系统的核心技能。接下来,可以根据具体业务需求,进一步优化和扩展系统功能。


本文是LangChain开发框架实践系列的第二篇,专注于实战项目开发。建议结合第一篇的基础知识学习,掌握完整的LangChain开发流程。

「真诚赞赏,手留余香」

YuTian Blog

真诚赞赏,手留余香

使用微信扫描二维码完成支付