基于Langchain的txt文本向量库搭建与检索

这里的源码主要来自于Langchain-ChatGLM中的向量库部分,做了一些代码上的修改和封装,以适用于基于问题包含数据库表描述的txt文件(文件名为库表名,文件内容为库表中的字段及描述)对数据库表进行快速检索。

中文分词类

splitter.py

from langchain.text_splitter import CharacterTextSplitter
import re
from typing import List


class ChineseTextSplitter(CharacterTextSplitter):
    def __init__(self, pdf: bool = False, sentence_size: int = 100, **kwargs):
        super().__init__(**kwargs)
        self.pdf = pdf
        self.sentence_size = sentence_size

    def split_text1(self, text: str) -> List[str]:
        if self.pdf:
            text = re.sub(r"n{3,}", "n", text)
            text = re.sub('s', ' ', text)
            text = text.replace("nn", "")
        sent_sep_pattern = re.compile('([﹒﹔﹖﹗。!?]["’”」』]{0,2}|(?=["‘“「『]{1,2}|$))')  # del :;
        sent_list = []
        for ele in sent_sep_pattern.split(text):
            if sent_sep_pattern.match(ele) and sent_list:
                sent_list[-1] += ele
            elif ele:
                sent_list.append(ele)
        return sent_list

    def split_text(self, text: str) -> List[str]:   ##此处需要进一步优化逻辑
        if self.pdf:
            text = re.sub(r"n{3,}", r"n", text)
            text = re.sub('s', " ", text)
            text = re.sub("nn", "", text)

        text = re.sub(r'([;;!?。!??])([^”’])', r"1n2", text)  # 单字符断句符
        text = re.sub(r'(.{6})([^"’”」』])', r"1n2", text)  # 英文省略号
        text = re.sub(r'(…{2})([^"’”」』])', r"1n2", text)  # 中文省略号
        text = re.sub(r'([;;!?。!??]["’”」』]{0,2})([^;;!?,。!??])', r'1n2', text)
        # 如果双引号前有终止符,那么双引号才是句子的终点,把分句符n放到双引号后,注意前面的几句都小心保留了双引号
        text = text.rstrip()  # 段尾如果有多余的n就去掉它
        # 很多规则中会考虑分号;,但是这里我把它忽略不计,破折号、英文双引号等同样忽略,需要的再做些简单调整即可。
        ls = [i for i in text.split("n") if i]
        for ele in ls:
            if len(ele) > self.sentence_size:
                ele1 = re.sub(r'([,,]["’”」』]{0,2})([^,,])', r'1n2', ele)
                ele1_ls = ele1.split("n")
                for ele_ele1 in ele1_ls:
                    if len(ele_ele1) > self.sentence_size:
                        ele_ele2 = re.sub(r'([n]{1,}| {2,}["’”」』]{0,2})([^s])', r'1n2', ele_ele1)
                        ele2_ls = ele_ele2.split("n")
                        for ele_ele2 in ele2_ls:
                            if len(ele_ele2) > self.sentence_size:
                                ele_ele3 = re.sub('( ["’”」』]{0,2})([^ ])', r'1n2', ele_ele2)
                                ele2_id = ele2_ls.index(ele_ele2)
                                ele2_ls = ele2_ls[:ele2_id] + [i for i in ele_ele3.split("n") if i] + ele2_ls[
                                                                                                       ele2_id + 1:]
                        ele_id = ele1_ls.index(ele_ele1)
                        ele1_ls = ele1_ls[:ele_id] + [i for i in ele2_ls if i] + ele1_ls[ele_id + 1:]

                id = ls.index(ele)
                ls = ls[:id] + [i for i in ele1_ls if i] + ls[id + 1:]
        return ls

faiss向量库类

myfaiss.py

from langchain.vectorstores import FAISS
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.faiss import dependable_faiss_import
from typing import Any, Callable, List, Dict
from langchain.docstore.base import Docstore
from langchain.docstore.document import Document
import numpy as np
import copy
import os


class MyFAISS(FAISS, VectorStore):
    def __init__(
            self,
            embedding_function: Callable,
            index: Any,
            docstore: Docstore,
            index_to_docstore_id: Dict[int, str],
            normalize_L2: bool = False,
    ):
        super().__init__(embedding_function=embedding_function,
                         index=index,
                         docstore=docstore,
                         index_to_docstore_id=index_to_docstore_id,
                         normalize_L2=normalize_L2)

    def seperate_list(self, ls: List[int]) -> List[List[int]]:
        lists = []
        ls1 = [ls[0]]
        source1 = self.index_to_docstore_source(ls[0])
        for i in range(1, len(ls)):
            if ls[i - 1] + 1 == ls[i] and self.index_to_docstore_source(ls[i]) == source1:
                ls1.append(ls[i])
            else:
                lists.append(ls1)
                ls1 = [ls[i]]
                source1 = self.index_to_docstore_source(ls[i])
        lists.append(ls1)
        return lists

    def similarity_search_with_score_by_vector(
            self, embedding: List[float], k: int = 4
    ) -> List[Document]:
        faiss = dependable_faiss_import()
        # (1,1024)
        vector = np.array([embedding], dtype=np.float32)
        # 默认FALSE
        if self._normalize_L2:
            faiss.normalize_L2(vector)
        # shape均为(1, k)
        scores, indices = self.index.search(vector, k)
        docs = []
        id_set = set()
        # 存储关键句
        keysentences = []
        # 遍历找到的k个最近相关文档的索引
        # top-k是第一次的筛选条件,score是第二次的筛选条件
        for j, i in enumerate(indices[0]):
            if i in self.index_to_docstore_id:
                _id = self.index_to_docstore_id[i]
            # 执行接下来的操作
            else:
                continue
            # index→id→content
            doc = self.docstore.search(_id)
            doc.metadata["score"] = int(scores[0][j])
            docs.append(doc)
            # 其实存的都是index
            id_set.add(i)
        docs.sort(key=lambda doc: doc.metadata['score'])
        return docs

嵌入检索类

embedder.py

from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.document_loaders import TextLoader
from embeddings.splitter import ChineseTextSplitter
from embeddings.myfaiss import MyFAISS
import os
import torch
from config import *

def torch_gc():
    if torch.cuda.is_available():
        # with torch.cuda.device(DEVICE):
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
    elif torch.backends.mps.is_available():
        try:
            from torch.mps import empty_cache
            empty_cache()
        except Exception as e:
            print(e)
            print("如果您使用的是 macOS 建议将 pytorch 版本升级至 2.0.0 或更高版本,以支持及时清理 torch 产生的内存占用。")

class Embedder:
    def __init__(self, config):
        self.model = HuggingFaceEmbeddings(model_name=
            "/NLP/LLM/pretrained_model/WordEmbeddings/"+config.emb_model,
            model_kwargs={'device': 'cuda'})
        self.config = config
        self.create_vector_score()
        self.vector_store = MyFAISS.load_local(self.config.db_vs_path, self.model)

    def load_file(self, filepath):
        # 对文件分词
        if filepath.lower().endswith(".txt"):
            loader = TextLoader(filepath, autodetect_encoding=True)
            textsplitter = ChineseTextSplitter(pdf=False, sentence_size=self.config.sentence_size)
            docs = loader.load_and_split(textsplitter)
        else:
            raise Exception("{}文件不是txt格式".format(filepath))
        return docs
    
    def txt2vector_store(self, filepaths):
        # 批量建立知识库
        docs = []
        for filepath in filepaths:
            try:
                docs += self.load_file(filepath)
            except Exception as e:
                raise Exception("{}文件加载失败".format(filepath))
        print("文件加载完毕,正在生成向量库")
        vector_store = MyFAISS.from_documents(docs, self.model)
        torch_gc()
        vector_store.save_local(self.config.db_vs_path)

    def create_vector_score(self):
        if "index.faiss" not in os.listdir(self.config.db_vs_path):
            filepaths = os.listdir(self.config.db_doc_path)
            filepaths = [os.path.join(self.config.db_doc_path, filepath) for filepath in filepaths]
            self.txt2vector_store(filepaths)
        print("向量库已建立成功")

    def get_topk_db(self, query):
        related_dbs_with_score = self.vector_store.similarity_search_with_score(query, k=self.config.sim_k)
        topk_db = [{'匹配句': db_data.page_content, 
                    '数据库': os.path.basename(db_data.metadata['source'])[:-4], 
                    '得分': db_data.metadata['score']} 
                   for db_data in related_dbs_with_score]
        return topk_db

测试代码

Config是用来传参的类,这里略去定义

if __name__ == '__main__':
    Conf = Config()
    configs = Conf.get_config()
    embedder = Embedder(configs)
    query = "公司哪个月的出勤率是最高的?"
    topk_db = embedder.get_topk_db(query)
    print(topk_db)

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>