
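Preface

Microsoft's GraphRAG currently feels more like a demo and does not cope well with large amounts of data, so I connected Milvus to it. After reading this post, you should be able to plug in other databases in much the same way.

Note: this post only covers wiring Milvus into the search stage. I will write a separate post on integrating it into indexing, or into the whole pipeline, when I have time.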

Connecting to the database

In graphrag\query\cli.py, go to the run_local_search function and find this call:

store_entity_semantic_embeddings(
    entities=entities, vectorstore=description_embedding_store
)

Comment it out and add the following in its place:

    if vector_store_type == VectorStoreType.Milvus:
        # Custom implementation
        store_text_semantic_embeddings(
            Texts=text_units,
            vectorstore=description_embedding_store,
            final_documents=final_documents,
        )
    else:
        store_entity_semantic_embeddings(
            entities=entities, vectorstore=description_embedding_store
        )

Here vector_store_type is GraphRAG's choice of vector database. The enum lives in graphrag\vector_stores\typing.py, and we need to add Milvus = 'milvus' to it by hand:

class VectorStoreType(str, Enum):
    """The supported vector store types."""

    LanceDB = "lancedb"
    AzureAISearch = "azure_ai_search"
    Milvus = "milvus"

In the same file, modify get_vector_store by adding a case VectorStoreType.Milvus branch.

MilvusVectorStore is a custom class that implements the Milvus interface. It is covered later in this post and has to be imported into typing.py.

    @classmethod
    def get_vector_store(
        cls, vector_store_type: VectorStoreType | str, kwargs: dict
    ) -> LanceDBVectorStore | AzureAISearch | MilvusVectorStore:
        """Get the vector store type from a string."""
        match vector_store_type:
            case VectorStoreType.LanceDB:
                return LanceDBVectorStore(**kwargs)
            case VectorStoreType.AzureAISearch:
                return AzureAISearch(**kwargs)
            case VectorStoreType.Milvus:
                return MilvusVectorStore(**kwargs)
            case _:
                if vector_store_type in cls.vector_store_types:
                    return cls.vector_store_types[vector_store_type](**kwargs)
                msg = f"Unknown vector store type: {vector_store_type}"
                raise ValueError(msg)

        

Next comes store_text_semantic_embeddings, which is modelled on store_entity_semantic_embeddings and lives in graphrag\query\input\loaders\dfs.py:

def store_text_semantic_embeddings(
    Texts: list[TextUnit],
    vectorstore: BaseVectorStore,
    final_documents: pd.DataFrame,
) -> BaseVectorStore:
    """Store text unit embeddings in a vectorstore."""
    documents = []
    for Text in Texts:
        matching_rows = final_documents[final_documents['id'] == Text.document_ids[0]]
        # Store the document title if there is one; otherwise fall back to
        # the document ids that graphrag generated for this text chunk
        if not matching_rows.empty:
            document_title = matching_rows['title'].values[0]
        else:
            document_title = Text.document_ids
        # Besides the title, keep the ids of the entities extracted from the chunk
        attributes_dict = {'document_title': document_title, "entity_ids": Text.entity_ids}
        if Text.attributes:
            attributes_dict.update({**Text.attributes})
        documents.append(
            VectorStoreDocument(
                id=Text.id,
                text=Text.text,
                vector=Text.text_embedding,
                attributes=attributes_dict,
            )
        )
    vectorstore.load_documents(documents=documents)  # load the text chunks into Milvus
    return vectorstore
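The title lookup with its fallback can be tried in isolation. The sketch below is only an illustration under stated assumptions: TextUnitStub is a hypothetical stand-in for graphrag's TextUnit, and a plain dict (titles_by_doc_id) plays the role of the final_documents table.

```python
from dataclasses import dataclass, field


@dataclass
class TextUnitStub:
    """Hypothetical stand-in for graphrag's TextUnit."""

    id: str
    text: str
    document_ids: list[str]
    entity_ids: list[str] = field(default_factory=list)


def build_attributes(unit, titles_by_doc_id):
    """Return the metadata stored next to a text chunk."""
    first_doc_id = unit.document_ids[0]
    # Prefer the human-readable title; fall back to the raw document ids.
    title = titles_by_doc_id.get(first_doc_id, unit.document_ids)
    return {"document_title": title, "entity_ids": unit.entity_ids}


titles = {"doc-1": "report.txt"}
known = TextUnitStub("t1", "some text", ["doc-1"], ["e1", "e2"])
unknown = TextUnitStub("t2", "other text", ["doc-9"])

print(build_attributes(known, titles))
print(build_attributes(unknown, titles))
```

Note that the title is a string when the lookup succeeds and a list of ids when it fails, so whatever consumes document_title has to accept both shapes.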

The full function then looks like this:

from graphrag.query.input.loaders.dfs import (
    store_entity_semantic_embeddings,
    store_text_semantic_embeddings,
)


def run_local_search(
    data_dir: str | None,
    root_dir: str | None,
    community_level: int,
    response_type: str,
    query: str,
    method_type: str | None = None,  # forwarded to search_engine.search(), e.g. 'text_match'
):
    """Run a local search with the given query."""
    data_dir, root_dir, config = _configure_paths_and_settings(data_dir, root_dir)
    data_path = Path(data_dir)

    final_documents = pd.read_parquet(data_path / "create_final_documents.parquet")
    final_text_units = pd.read_parquet(data_path / "create_final_text_units.parquet")
    final_community_reports = pd.read_parquet(
        data_path / "create_final_community_reports.parquet"
    )
    final_relationships = pd.read_parquet(
        data_path / "create_final_relationships.parquet"
    )
    final_nodes = pd.read_parquet(data_path / "create_final_nodes.parquet")
    final_entities = pd.read_parquet(data_path / "create_final_entities.parquet")
    final_covariates_path = data_path / "create_final_covariates.parquet"
    final_covariates = (
        pd.read_parquet(final_covariates_path)
        if final_covariates_path.exists()
        else None
    )

    # Defaults to {} unless configured otherwise
    vector_store_args = (
        config.embeddings.vector_store if config.embeddings.vector_store else {}
    )
    # Database type; defaults to VectorStoreType.LanceDB
    vector_store_type = vector_store_args.get("type", VectorStoreType.LanceDB)
    # Initialise the database; LanceDB by default
    description_embedding_store = __get_embedding_description_store(
        vector_store_type=vector_store_type,
        config_args=vector_store_args,
    )

    # Load the entities
    entities = read_indexer_entities(final_nodes, final_entities, community_level)
    # Covariates default to an empty list
    covariates = (
        read_indexer_covariates(final_covariates)
        if final_covariates is not None
        else []
    )
    reports = read_indexer_reports(final_community_reports, final_nodes, community_level)
    text_units = read_indexer_text_units(final_text_units)
    relationships = read_indexer_relationships(final_relationships)
    covariates = {"claims": covariates}

    if vector_store_type == VectorStoreType.Milvus:
        # Custom implementation: store the text chunks in Milvus
        store_text_semantic_embeddings(
            Texts=text_units,
            vectorstore=description_embedding_store,
            final_documents=final_documents,
        )
    else:
        store_entity_semantic_embeddings(
            entities=entities, vectorstore=description_embedding_store
        )

    search_engine = get_local_search_engine(
        config,
        reports=reports,
        text_units=text_units,
        entities=entities,
        relationships=relationships,
        covariates=covariates,
        description_embedding_store=description_embedding_store,
        response_type=response_type,
    )

    result = search_engine.search(query=query, method_type=method_type)
    reporter.success(f"Local Search Response: {result.response}")
    return result

Then create a milvus.py file in graphrag.vector_stores and implement a MilvusVectorStore class in it:

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from pymilvus import MilvusClient
from tqdm import tqdm
from datetime import datetime
from pymilvus import AnnSearchRequest
from typing import Any
from .base import (
    BaseVectorStore,
    VectorStoreDocument,
    VectorStoreSearchResult,
)
from graphrag.model.types import TextEmbedder
import json
from xinference.client import Client  # optional; any embedding client will do

client = Client("http://0.0.0.0:9997")
list_models_run = client.list_models()
model_uid = list_models_run['bge-m3']['id']
embedding_client = client.get_model(model_uid)


class MilvusVectorStore(BaseVectorStore):
    def __init__(self, url: str = '0.0.0.0', collection_name: str = 'data_store', recrate: bool = False, key_word_flag: bool = True):
        self.key_word_flag = key_word_flag
        connections.connect(host=url, port="19530")
        self.has_collection = utility.has_collection(collection_name)  # does the collection already exist?
        print(f'has_collection={self.has_collection}')
        if recrate and self.has_collection:
            s = input(f'Are you sure you want to delete {collection_name}? (yes/no) \n ')
            if s == 'yes':
                self.delete_collection(collection_name)
                print(f'Deleted {collection_name}')
        if not recrate and self.has_collection:  # reuse the existing collection
            self.collection = Collection(name=collection_name)
        else:
            schema = self.get_schema()
            self.collection = Collection(name=collection_name, schema=schema)

    def get_schema(self):
        id = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True)  # primary key
        graph_id = FieldSchema(name="graph_id", dtype=DataType.VARCHAR, max_length=128)
        text = FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=58192)
        file_name = FieldSchema(name="file_name", dtype=DataType.VARCHAR, max_length=512)
        # Vector field: dim is the number of elements per vector (dim=2 would mean
        # two-element vectors); set it to the length of your own embeddings
        text_embedding = FieldSchema(name="text_embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
        # n_tokens = FieldSchema(name="n_tokens", dtype=DataType.INT64)
        if self.key_word_flag:
            key_word = FieldSchema(name="key_word", dtype=DataType.VARCHAR, max_length=8192)
            key_word_embedding = FieldSchema(name="key_word_embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
            schema = CollectionSchema(
                fields=[id, graph_id, text, file_name, text_embedding, key_word, key_word_embedding],
                description="Text and text-embedding storage",
            )
        else:
            schema = CollectionSchema(
                fields=[id, graph_id, text, file_name, text_embedding],
                description="Text and text-embedding storage",
            )
        return schema

    def change_collection(self, collection_name):
        schema = self.get_schema()
        self.collection = Collection(name=collection_name, schema=schema)

    def delete_collection(self, collection_name):
        utility.drop_collection(collection_name)

    def release_collection(self):
        # self.collection.release_collection(collection_name)
        self.collection.release()

    def list_collections(self):
        collections_list = utility.list_collections()
        return collections_list

    def create_index(self, metric_type='L2', index_name='L2'):
        # utility.drop_collection(collection_name=collection_name)
        # self.collection = Collection(name=collection_name, schema=schema)
        index_params = {
            "index_type": "AUTOINDEX",
            "metric_type": metric_type,
            "params": {},
        }
        self.collection.create_index(
            field_name="text_embedding",
            index_params=index_params,
            index_name='text_embedding',
        )
        if self.key_word_flag:
            self.collection.create_index(
                field_name="key_word_embedding",
                index_params=index_params,
                index_name='key_word_embedding',
            )
        self.collection.load()  # load_fields=['id', "text_embedding"]

    def drop_index(self):
        self.collection.release()
        self.collection.drop_index()

    def insert_data(self, data_dict: dict):
        # text_id_list, text_list, file_name_list, text_embedding_list, key_word_list, key_word_embedding_list
        start = datetime.now()
        self.collection.insert(data_dict)
        # if self.key_word_flag:
        #     for id, text, file_name, text_embedding, key_word, key_word_embedding in zip(text_id_list, text_list, file_name_list, text_embedding_list, key_word_list, key_word_embedding_list):
        #         self.collection.insert([[id], [text], [file_name], [text_embedding], [key_word], [key_word_embedding]])
        # else:
        #     for id, text, file_name, text_embedding in zip(text_id_list, text_list, file_name_list, text_embedding_list):
        #         self.collection.insert([[id], [text], [file_name], [text_embedding]])
        end = datetime.now()
        print(f'Insert took {end - start}')

    def search(self, query_embedding, top_k=10, metric_type='L2'):
        search_params = {
            "metric_type": metric_type,
            "params": {"level": 2},
        }
        results = self.collection.search(
            [query_embedding],
            anns_field="text_embedding",
            param=search_params,
            limit=top_k,
            output_fields=['graph_id', "text", "file_name", 'text_embedding'],
        )[0]
        return results

    def hybrid_search(self, query_dense_embedding, query_sparse_embedding, rerank, top_k=10, metric_type='L2'):
        dense_search_params = {
            "index_type": "AUTOINDEX",
            "metric_type": metric_type,
            "params": {},
        }
        # dense_req = self.collection.search([query_dense_embedding],
        #     anns_field="text_embedding",
        #     param=dense_search_params,
        #     limit=top_k,
        #     output_fields=["text", "file_name"])
        dense_req = AnnSearchRequest(
            [query_dense_embedding], "text_embedding", dense_search_params, limit=top_k
        )
        sparse_search_params = {
            "index_type": "AUTOINDEX",
            "metric_type": metric_type,
            "params": {},
        }
        # sparse_req = self.collection.search([query_sparse_embedding],
        #     anns_field="text_embedding",
        #     param=sparse_search_params,
        #     limit=top_k,
        #     output_fields=["text", "file_name"])
        sparse_req = AnnSearchRequest(
            [query_sparse_embedding], "key_word_embedding", sparse_search_params, limit=top_k
        )
        res = self.collection.hybrid_search(
            [dense_req, sparse_req],
            rerank=rerank,
            limit=top_k,
            output_fields=["text", "file_name"],
        )[0]
        return res

    def reranker_init(self, model_name_or_path, device="cpu"):
        # Imported here so the reranker extra (pymilvus[model]) is only needed when used
        from pymilvus.model.reranker import BGERerankFunction

        self.reranker = BGERerankFunction(
            model_name=model_name_or_path,  # Specify the model name. Defaults to `BAAI/bge-reranker-v2-m3`.
            device=device,  # Specify the device to use, e.g., 'cpu' or 'cuda:0'
        )

    def rerank(self, query, search_result, top_k, rerank_client=None):
        documents_list = [i.entity.get('text') for i in search_result]
        # Use an external reranker (one not bundled with Milvus) if one is passed in
        if rerank_client:
            response = rerank_client.rerank(
                query=query,
                documents=documents_list,
                top_n=top_k,
            )
            rerank_results = response['results']
            results = []
            for i in rerank_results:
                index = i['index']
                results.append(search_result[index])
        else:
            results = self.reranker(
                query=query,
                documents=documents_list,
                top_k=top_k,
            )
        return results

    def filter_by_id(self, include_ids: list[str] | list[int]) -> Any:
        """Build a query filter to filter documents by id."""
        if len(include_ids) == 0:
            self.query_filter = None
        else:
            if isinstance(include_ids[0], str):
                id_filter = ", ".join([f"'{id}'" for id in include_ids])
                self.query_filter = f"id in ({id_filter})"
            else:
                self.query_filter = (
                    f"id in ({', '.join([str(id) for id in include_ids])})"
                )
        return self.query_filter

    def connect(self, url: str = '0.0.0.0', collection_name: str = 'data_store', recrate: bool = False, key_word_flag: bool = False, **kwargs: Any) -> Any:
        self.key_word_flag = key_word_flag
        connections.connect(host=url, port="19530")
        has_collection = utility.has_collection(collection_name)  # does the collection already exist?
        if recrate and has_collection:
            s = input(f'Are you sure you want to delete {collection_name}? (yes/no) \n ')
            if s == 'yes':
                self.delete_collection(collection_name)
                print(f'Deleted {collection_name}')
        if not recrate and has_collection:  # reuse the existing collection
            self.collection = Collection(name=collection_name)
        else:
            schema = self.get_schema()
            self.collection = Collection(name=collection_name, schema=schema)
        self.create_index()

    def load_documents(self, documents: list[VectorStoreDocument], overwrite: bool = True) -> None:
        """Load documents into vector storage."""
        documents = [
            document
            for document in documents
            if document.vector is not None
        ]
        if self.has_collection:
            s = input('Do you want to insert data? (yes/no) \n ')
            if s == 'yes':
                batch = 100
                documents_len = len(documents)
                # Milvus cannot take too much data in a single insert, so insert in chunks
                insert_len = max(1, documents_len // batch)
                data_list = list()
                start = datetime.now()
                print('Inserting data ...')
                for document in documents:
                    attributes = document.attributes
                    title = attributes.get('document_title')
                    # document_title is a str when a title was found, otherwise a list of document ids
                    file_name = title if isinstance(title, str) else title[0]
                    temp_dict = {
                        "graph_id": document.id,
                        "text": document.text,
                        "text_embedding": document.vector,
                        "file_name": file_name,
                    }
                    data_list.append(temp_dict)
                    if len(data_list) >= insert_len:
                        self.collection.insert(data_list)
                        data_list = []
                if data_list:  # flush whatever is left over
                    self.collection.insert(data_list)
                end = datetime.now()
                print(f'Insert took {end - start}')

    def similarity_search_by_text(self, text: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any) -> list[VectorStoreSearchResult]:
        """Perform a similarity search using a given input text."""
        query_embedding = embedding_client.create_embedding([text])['data'][0]['embedding']
        if query_embedding:
            search_result = self.similarity_search_by_vector(query_embedding, k)
            return search_result
        return []

    def similarity_search_by_vector(self, query_embedding: list[float], k: int = 10, **kwargs: Any) -> list[VectorStoreSearchResult]:
        docs = self.search(query_embedding=query_embedding, top_k=k)
        result = []
        for doc in docs:
            file_name = doc.entity.get('file_name')
            attributes = {'document_title': file_name, 'entity': []}
            score = abs(float(doc.score))
            temp = VectorStoreSearchResult(
                document=VectorStoreDocument(
                    id=doc.entity.get('graph_id'),
                    text=doc.entity.get('text'),
                    vector=doc.entity.get('text_embedding'),
                    attributes=attributes,
                ),
                score=score,
            )
            result.append(temp)
        # return [
        #     VectorStoreSearchResult(
        #         document=VectorStoreDocument(
        #             id=doc["id"],
        #             text=doc["text"],
        #             vector=doc["vector"],
        #             attributes=json.loads(doc["attributes"]),
        #         ),
        #         score=1 - abs(float(doc["_distance"])),
        #     )
        #     for doc in docs
        # ]
        return result

    def similarity_search_by_query(self, query: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any) -> list[VectorStoreSearchResult]:
        # Placeholder: not implemented yet
        raise NotImplementedError

    def similarity_search_by_hybrid(self, query: str, text_embedder: TextEmbedder, k: int = 10, oversample_scaler: int = 10, **kwargs: Any) -> list[VectorStoreSearchResult]:
        # Placeholder: not implemented yet
        raise NotImplementedError
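The load_documents method inserts in chunks because a single huge insert is a problem for Milvus. A simpler way to express that idea is fixed-size slicing, sketched below. This is an alternative illustration rather than the code used in this post: iter_batches and FakeCollection are hypothetical names, and FakeCollection only records the size of each call instead of talking to Milvus.

```python
def iter_batches(rows, batch_size=100):
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


class FakeCollection:
    """Hypothetical stand-in that records the size of each insert call."""

    def __init__(self):
        self.insert_sizes = []

    def insert(self, rows):
        self.insert_sizes.append(len(rows))


rows = [{"graph_id": str(i), "text": f"chunk {i}"} for i in range(250)]
collection = FakeCollection()
for batch in iter_batches(rows, batch_size=100):
    collection.insert(batch)

print(collection.insert_sizes)  # [100, 100, 50]
```

With fixed-size slices the number of rows per request stays bounded no matter how many documents there are, and the last partial batch needs no special handling.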

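Modifying the search code

Open graphrag\query\structured_search\local_search\mixed_context.py.

Alternatively, start from the run_local_search function in graphrag\query\cli.py and jump to the definition of get_local_search_engine,

then follow LocalSearchMixedContext in that function's return statement to reach the class implementation.

In build_context, find the call to map_query_to_entities and jump to its implementation in graphrag\query\context_builder\entity_extraction.py. There, locate: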

                matched = get_entity_by_key(
                    entities=all_entities,  # all entities
                    key=embedding_vectorstore_key,
                    value=result.document.id,
                )

Change it to:

                entity_ids = result.document.attributes.get('entity_ids')
                if entity_ids:
                    for entity_id in entity_ids:
                        matched = get_entity_by_key(
                            entities=all_entities,  # all entities
                            key=embedding_vectorstore_key,
                            value=entity_id,
                        )
                        if matched:
                            matched_entities.append(matched)

If you want to keep GraphRAG's original search path as well, you can do what I did:

        for result in search_results:
            if method_type == 'text_match':
                entity_ids = result.document.attributes.get('entity_ids')
                if entity_ids:
                    for entity_id in entity_ids:
                        matched = get_entity_by_key(
                            entities=all_entities,  # all entities
                            key=embedding_vectorstore_key,
                            value=entity_id,
                        )
                        if matched:
                            matched_entities.append(matched)
            else:
                matched = get_entity_by_key(
                    entities=all_entities,  # all entities
                    key=embedding_vectorstore_key,
                    value=result.document.id,
                )
                if matched:
                    matched_entities.append(matched)
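The difference between the two branches is what a search hit represents: in the original path a hit is an entity, while in text_match mode a hit is a text chunk that carries the ids of the entities extracted from it. The following sketch shows both paths on toy data. EntityStub, ResultStub and match_entities are hypothetical names introduced for illustration, and a dict lookup stands in for get_entity_by_key.

```python
from dataclasses import dataclass, field


@dataclass
class EntityStub:
    """Hypothetical stand-in for graphrag's Entity."""

    id: str
    title: str


@dataclass
class ResultStub:
    """Hypothetical search hit: a document id plus its stored attributes."""

    document_id: str
    attributes: dict = field(default_factory=dict)


def match_entities(results, entities_by_id, method_type=None):
    """Map search hits to entities, either directly or via stored entity ids."""
    matched = []
    for result in results:
        if method_type == "text_match":
            # The hit is a text chunk: resolve every entity id stored with it.
            candidate_ids = result.attributes.get("entity_ids") or []
        else:
            # The hit is itself an entity.
            candidate_ids = [result.document_id]
        for entity_id in candidate_ids:
            entity = entities_by_id.get(entity_id)
            if entity is not None:
                matched.append(entity)
    return matched


entities = {"e1": EntityStub("e1", "Milvus"), "e2": EntityStub("e2", "GraphRAG")}
chunk_hits = [
    ResultStub("t1", {"entity_ids": ["e1", "e2"]}),
    ResultStub("t2", {"entity_ids": ["e1"]}),
]
entity_hits = [ResultStub("e2")]

print([e.title for e in match_entities(chunk_hits, entities, "text_match")])
print([e.title for e in match_entities(entity_hits, entities)])
```

As in the snippet above, nothing is deduplicated here, so an entity mentioned in several matching chunks appears more than once.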

Add a parameter to switch between the two paths, or switch on vector_store_type instead. Finally, change the return statement of map_query_to_entities so that it also returns search_results:

def map_query_to_entities(
    query: str,
    text_embedding_vectorstore: BaseVectorStore,
    text_embedder: BaseTextEmbedding,
    all_entities: list[Entity],
    embedding_vectorstore_key: str = EntityVectorStoreKey.ID,
    include_entity_names: list[str] | None = None,
    exclude_entity_names: list[str] | None = None,
    k: int = 10,
    oversample_scaler: int = 2,
    method_type: str | None = None,
) -> tuple[list[Entity], list]:
    """Extract entities that match a given query using semantic similarity of text embeddings of query and entity descriptions."""
    if include_entity_names is None:
        include_entity_names = []
    if exclude_entity_names is None:
        exclude_entity_names = []
    matched_entities = []
    search_results = []  # stays empty when the query is empty
    if query != "":
        # get entities with highest semantic similarity to query
        # oversample to account for excluded entities
        # see the lancedb file in graphrag's vector_stores directory
        print('Preparing embedding')
        start_time = datetime.now()  # needs: from datetime import datetime
        # returns the most similar vectors
        search_results = text_embedding_vectorstore.similarity_search_by_text(
            text=query,
            text_embedder=lambda t: text_embedder.embed(t),
            k=k * oversample_scaler,
        )
        end_time = datetime.now()
        print(f'Took {end_time - start_time}')
        for result in search_results:
            if method_type == 'text_match':
                entity_ids = result.document.attributes.get('entity_ids')
                if entity_ids:
                    for entity_id in entity_ids:
                        matched = get_entity_by_key(
                            entities=all_entities,  # all entities
                            key=embedding_vectorstore_key,
                            value=entity_id,
                        )
                        if matched:
                            matched_entities.append(matched)
            else:
                matched = get_entity_by_key(
                    entities=all_entities,  # all entities
                    key=embedding_vectorstore_key,
                    value=result.document.id,
                )
                if matched:
                    matched_entities.append(matched)
    else:
        all_entities.sort(key=lambda x: x.rank if x.rank else 0, reverse=True)
        matched_entities = all_entities[:k]

    # filter out excluded entities
    # exclude_entity_names defaults to []
    if exclude_entity_names:
        matched_entities = [
            entity
            for entity in matched_entities
            if entity.title not in exclude_entity_names
        ]

    # add entities in the include_entity list
    included_entities = []
    # include_entity_names defaults to []
    for entity_name in include_entity_names:
        included_entities.extend(get_entity_by_name(all_entities, entity_name))
    return included_entities + matched_entities, search_results  # search_results was not returned originally

Do not forget to update the call in the build_context function of graphrag\query\structured_search\local_search\mixed_context.py, because map_query_to_entities now returns two values instead of one:

        selected_entities, search_results = map_query_to_entities(
            query=query,
            text_embedding_vectorstore=self.entity_text_embeddings,
            text_embedder=self.text_embedder,
            all_entities=list(self.entities.values()),
            embedding_vectorstore_key=self.embedding_vectorstore_key,
            include_entity_names=include_entity_names,
            exclude_entity_names=exclude_entity_names,
            k=top_k_mapped_entities,
            oversample_scaler=20,  # originally 2
            method_type=method_type,
        )

Near the end of build_context, find the call to self._build_text_unit_context and pass search_results in as a new argument:

        text_unit_context, text_unit_context_data, document_id_context = self._build_text_unit_context(
            selected_entities=selected_entities,
            max_tokens=text_unit_tokens,
            return_candidate_context=return_candidate_context,
            search_results=search_results,
            method_type=method_type,
        )

Jump to the implementation of that function, which is also in mixed_context.py, and modify or replace the following code:

            for index, entity in enumerate(selected_entities):
                if entity.text_unit_ids:
                    for text_id in entity.text_unit_ids:
                        if (
                            text_id not in [unit.id for unit in selected_text_units]
                            and text_id in self.text_units
                        ):
                            selected_unit = self.text_units[text_id]
                            num_relationships = count_relationships(
                                selected_unit, entity, self.relationships
                            )
                            if selected_unit.attributes is None:
                                selected_unit.attributes = {}
                            selected_unit.attributes["entity_order"] = index
                            selected_unit.attributes["num_relationships"] = (
                                num_relationships
                            )
                            selected_text_units.append(selected_unit)

            # sort selected text units by ascending order of entity order and descending order of number of relationships
            selected_text_units.sort(
                key=lambda x: (
                    x.attributes["entity_order"],  # type: ignore
                    -x.attributes["num_relationships"],  # type: ignore
                )
            )

            for unit in selected_text_units:
                del unit.attributes["entity_order"]  # type: ignore
                del unit.attributes["num_relationships"]  # type: ignore

My advice is to keep the original code path. In my case I changed it to:

        if method_type == 'text_match':
            for index, Text in enumerate(search_results):
                text_id = Text.document.id
                if (
                    text_id not in [unit.id for unit in selected_text_units]
                    and text_id in self.text_units
                ):
                    selected_unit = self.text_units[text_id]
                    if selected_unit.attributes is None:
                        selected_unit.attributes = {
                            'document_title': Text.document.attributes['document_title']
                        }
                    selected_text_units.append(selected_unit)
        else:
            for index, entity in enumerate(selected_entities):
                if entity.text_unit_ids:
                    for text_id in entity.text_unit_ids:
                        if (
                            text_id not in [unit.id for unit in selected_text_units]
                            and text_id in self.text_units
                        ):
                            selected_unit = self.text_units[text_id]
                            num_relationships = count_relationships(
                                selected_unit, entity, self.relationships
                            )
                            if selected_unit.attributes is None:
                                selected_unit.attributes = {}
                            selected_unit.attributes["entity_order"] = index
                            selected_unit.attributes["num_relationships"] = (
                                num_relationships
                            )
                            selected_text_units.append(selected_unit)

            # sort selected text units by ascending order of entity order and descending order of number of relationships
            selected_text_units.sort(
                key=lambda x: (
                    x.attributes["entity_order"],  # type: ignore
                    -x.attributes["num_relationships"],  # type: ignore
                )
            )

            for unit in selected_text_units:
                del unit.attributes["entity_order"]  # type: ignore
                del unit.attributes["num_relationships"]  # type: ignore
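In text_match mode the text units are taken directly from the search hits, in ranking order, and repeated or unknown ids are skipped. The sketch below isolates that selection step. It is an illustration with hypothetical names (select_text_units, plain strings as text units), and it tracks seen ids in a set instead of rescanning the selected list on every iteration.

```python
def select_text_units(hit_ids, text_units_by_id):
    """Return known text units in the order of the search hits, without repeats."""
    selected = []
    seen = set()
    for text_id in hit_ids:
        if text_id in seen or text_id not in text_units_by_id:
            continue
        seen.add(text_id)
        selected.append(text_units_by_id[text_id])
    return selected


text_units = {"t1": "first chunk", "t2": "second chunk", "t3": "third chunk"}
hits = ["t2", "t1", "t2", "missing"]
print(select_text_units(hits, text_units))  # ['second chunk', 'first chunk']
```

Because the hits already arrive sorted by similarity, no extra sort is needed in this branch, unlike the entity-based path, which sorts by entity order and relationship count.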

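With that, the vector database has been swapped out. Feeding in the graph data and the rest takes even more code, so I will get to it when I have time.

That was already quite a lot of code.

Likes and bookmarks are welcome.

They encourage the author to update faster.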

 
