当前位置: 首页 > news >正文

天津通用网站建设收费市场调查报告

天津通用网站建设收费,市场调查报告,基于html5的购物商城网站,android搭建wordpress一、相关的API的handler1、接收HTTP请求的handler2、往数据节点发送查询请求的action(TransportSearchAction)3、通过transportService把查询请求发送到指定的数据节点 二、数据节点收到请求的处理逻辑1、尝试从缓存中加载查询结果2、不通过缓存查询#xff0c;直接执行查询(1… 一、相关的API的handler1、接收HTTP请求的handler2、往数据节点发送查询请求的action(TransportSearchAction)3、通过transportService把查询请求发送到指定的数据节点 二、数据节点收到请求的处理逻辑1、尝试从缓存中加载查询结果2、不通过缓存查询直接执行查询(1)executeQuery和executeRank两种查询方式(2)、根据搜索上下文在查询之前添加各种查询搜集器(3) 执行查询操作遍历此索引在此数据节点所有的分片 这里只是HTTP发送查询请求到主节点主节点再转发到数据节点数据节点再到调用lucene.search实际查询数据之前如何处理的源码逻辑 一、相关的API的handler 在ActionModule.java registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder()));actions.register(SearchAction.INSTANCE, TransportSearchAction.class); 1、接收HTTP请求的handler 下面这个类RestSearchAction有长该省略的方法我都已经省略了首先通过routes请求到这个方法的prepareRequest(主要是组装searchRequest)这个方法内部会调用parseSearchSource(主要是组装searchSourceBuilder) public class RestSearchAction extends BaseRestHandler {Overridepublic ListRoute routes() {return List.of(new Route(GET, /_search),new Route(POST, /_search),new Route(GET, /{index}/_search),new Route(POST, /{index}/_search),Route.builder(GET, /{index}/{type}/_search).deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build(),Route.builder(POST, /{index}/{type}/_search).deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build());}//入口 Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {SearchRequest searchRequest;if (request.hasParam(min_compatible_shard_node)) {searchRequest new SearchRequest(Version.fromString(request.param(min_compatible_shard_node)));} else {searchRequest new SearchRequest();}/** 我们必须拉出对 source.sizesize 的调用因为 _update_by_query 和 _delete_by_query 使用相同的解析路径但在看到 size url 参数时设置了不同的变量* 请注意我们不能使用 searchRequest.sourcesize因为 searchRequest.source 现在是 null。我们不必防止它在 IntConsumer 中为 null因为它以后不能为 null。*///组装SearchRequestIntConsumer setSize size - searchRequest.source().size(size);request.withContentOrSourceParamParserOrNull(parser - parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize, searchUsageHolder));//请求发送return channel - {RestCancellableNodeClient cancelClient new RestCancellableNodeClient(client, request.getHttpChannel());cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestChunkedToXContentListener(channel));};}//组装searchRequestpublic static void parseSearchRequest(SearchRequest searchRequest,RestRequest request,Nullable XContentParser requestContentParser,NamedWriteableRegistry namedWriteableRegistry,IntConsumer setSize,Nullable SearchUsageHolder searchUsageHolder) throws IOException {//检查请求的 REST API 版本和参数是否兼容并在必要时记录警告日志。if (request.getRestApiVersion() RestApiVersion.V_7 request.hasParam(type)) {request.param(type);deprecationLogger.compatibleCritical(search_with_types, TYPES_DEPRECATION_MESSAGE);}//如果搜索请求的源为空创建一个新的 SearchSourceBuilder 作为源if (searchRequest.source() null) {searchRequest.source(new SearchSourceBuilder());}//将请求中的索引参数解析为一个索引数组并设置到搜索请求中。searchRequest.indices(Strings.splitStringByCommaToArray(request.param(index)));//如果提供了 requestContentParser则解析请求内容if (requestContentParser ! null) {//并根据是否提供了 searchUsageHolder 参数选择解析方式if (searchUsageHolder null) {searchRequest.source().parseXContent(requestContentParser, true);} else {searchRequest.source().parseXContent(requestContentParser, true, searchUsageHolder);}}//设置批量减少大小参数。final int batchedReduceSize request.paramAsInt(batched_reduce_size, searchRequest.getBatchedReduceSize());searchRequest.setBatchedReduceSize(batchedReduceSize);//如果请求中包含了 pre_filter_shard_size预过滤器分片大小 参数则设置搜索请求的 preFilterShardSize。if (request.hasParam(pre_filter_shard_size)) {searchRequest.setPreFilterShardSize(request.paramAsInt(pre_filter_shard_size, SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));}//如果请求中包含了 enable_fields_emulation 参数则忽略该参数从8.0版本开始已不再使用。if (request.hasParam(enable_fields_emulation)) {// this flag is a no-op from 8.0 on, we only want to consume it so its presence doesnt cause errorsrequest.paramAsBoolean(enable_fields_emulation, false);}//如果请求中包含了 max_concurrent_shard_requests(最大并发分片请求数) 参数则设置搜索请求的 maxConcurrentShardRequests。if (request.hasParam(max_concurrent_shard_requests)) {final int maxConcurrentShardRequests request.paramAsInt(max_concurrent_shard_requests,searchRequest.getMaxConcurrentShardRequests());searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);}//如果请求中包含了 allow_partial_search_results(允许部分搜索结果) 参数则设置搜索请求的 allowPartialSearchResults。if (request.hasParam(allow_partial_search_results)) {//仅当我们传递了参数以覆盖集群级默认值时才设置searchRequest.allowPartialSearchResults(request.paramAsBoolean(allow_partial_search_results, null));}//设置搜索类型参数。searchRequest.searchType(request.param(search_type));//调用 parseSearchSource 方法解析搜索源。parseSearchSource(searchRequest.source(), request, setSize);//设置请求缓存参数searchRequest.requestCache(request.paramAsBoolean(request_cache, searchRequest.requestCache()));//解析并设置滚动参数。String scroll request.param(scroll);if (scroll ! null) {searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, scroll)));}//设置路由参数。searchRequest.routing(request.param(routing));//设置首选项参数。searchRequest.preference(request.param(preference));//设置索引选项。searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));//验证搜索请求。validateSearchRequest(request, searchRequest);//如果搜索请求中有 pointInTimeBuilder则准备点在时间请求。if (searchRequest.pointInTimeBuilder() ! null) {preparePointInTime(searchRequest, request, namedWriteableRegistry);} else {//否则设置 ccsMinimizeRoundtrips 参数searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean(ccs_minimize_roundtrips, searchRequest.isCcsMinimizeRoundtrips()));}//如果请求中包含了 force_synthetic_source 参数则设置搜索请求的 forceSyntheticSource。if (request.paramAsBoolean(force_synthetic_source, false)) {searchRequest.setForceSyntheticSource(true);}}//组装searchSourceBuilderprivate static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request, IntConsumer setSize) {//RestRequest对象的URL参数转换为QueryBuilder对象QueryBuilder queryBuilder RestActions.urlParamsToQueryBuilder(request);//并将其设置为SearchSourceBuilder对象的查询条件if (queryBuilder ! null) {searchSourceBuilder.query(queryBuilder);}//如果RestRequest对象包含from参数则将其转换为整数并设置为SearchSourceBuilder对象的from属性if (request.hasParam(from)) {searchSourceBuilder.from(request.paramAsInt(from, 0));}if (request.hasParam(size)) {int size request.paramAsInt(size, SearchService.DEFAULT_SIZE);if (request.getRestApiVersion() RestApiVersion.V_7 size -1) {// we treat -1 as not-set, but deprecate it to be able to later remove this funny extra treatmentdeprecationLogger.compatibleCritical(search-api-size-1,Using search size of -1 is deprecated and will be removed in future versions. Instead, dont use the size parameter if you dont want to set it explicitly.);} else {setSize.accept(size);}}if (request.hasParam(explain)) {searchSourceBuilder.explain(request.paramAsBoolean(explain, null));}if (request.hasParam(version)) {searchSourceBuilder.version(request.paramAsBoolean(version, null));}if (request.hasParam(seq_no_primary_term)) {searchSourceBuilder.seqNoAndPrimaryTerm(request.paramAsBoolean(seq_no_primary_term, null));}if (request.hasParam(timeout)) {searchSourceBuilder.timeout(request.paramAsTime(timeout, null));}if (request.hasParam(terminate_after)) {int terminateAfter request.paramAsInt(terminate_after, SearchContext.DEFAULT_TERMINATE_AFTER);searchSourceBuilder.terminateAfter(terminateAfter);}StoredFieldsContext storedFieldsContext StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(),request);if (storedFieldsContext ! null) {searchSourceBuilder.storedFields(storedFieldsContext);}String sDocValueFields request.param(docvalue_fields);if (sDocValueFields ! null) {if (Strings.hasText(sDocValueFields)) {String[] sFields Strings.splitStringByCommaToArray(sDocValueFields);for (String field : sFields) {searchSourceBuilder.docValueField(field, null);}}}FetchSourceContext fetchSourceContext FetchSourceContext.parseFromRestRequest(request);if (fetchSourceContext ! null) {searchSourceBuilder.fetchSource(fetchSourceContext);}if (request.hasParam(track_scores)) {searchSourceBuilder.trackScores(request.paramAsBoolean(track_scores, false));}if (request.hasParam(track_total_hits)) {if (Booleans.isBoolean(request.param(track_total_hits))) {searchSourceBuilder.trackTotalHits(request.paramAsBoolean(track_total_hits, true));} else {searchSourceBuilder.trackTotalHitsUpTo(request.paramAsInt(track_total_hits, SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO));}}String sSorts request.param(sort);if (sSorts ! null) {String[] sorts Strings.splitStringByCommaToArray(sSorts);for (String sort : sorts) {int delimiter sort.lastIndexOf(:);if (delimiter ! -1) {String sortField sort.substring(0, delimiter);String reverse sort.substring(delimiter 1);if (asc.equals(reverse)) {searchSourceBuilder.sort(sortField, SortOrder.ASC);} else if (desc.equals(reverse)) {searchSourceBuilder.sort(sortField, SortOrder.DESC);}} else {searchSourceBuilder.sort(sort);}}}String sStats request.param(stats);if (sStats ! null) {searchSourceBuilder.stats(Arrays.asList(Strings.splitStringByCommaToArray(sStats)));}SuggestBuilder suggestBuilder parseSuggestUrlParameters(request);if (suggestBuilder ! null) {searchSourceBuilder.suggest(suggestBuilder);}} } 2、往数据节点发送查询请求的action(TransportSearchAction) 下面这个TransportSearchAction也有点长主要流程是doExecute-executeLocalSearch-executeSearch-接口SearchPhaseProvider的实现类AsyncSearchActionProvider public class TransportSearchAction extends HandledTransportActionSearchRequest, SearchResponse {//执行方法Overrideprotected void doExecute(Task task, SearchRequest searchRequest, ActionListenerSearchResponse listener) {executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new);}//主要功能是执行搜索请求并根据不同的情况选择执行本地搜索或远程搜索void executeRequest(SearchTask task,SearchRequest original,ActionListenerSearchResponse listener,FunctionActionListenerSearchResponse, SearchPhaseProvider searchPhaseProvider) {//获取相对开始时间戳和时间提供器final long relativeStartNanos System.nanoTime();final SearchTimeProvider timeProvider new SearchTimeProvider(original.getOrCreateAbsoluteStartMillis(),relativeStartNanos,System::nanoTime);//使用重写监听器对搜索请求进行重写并根据重写后的请求获取搜索上下文和远程集群索引。ActionListenerSearchRequest rewriteListener ActionListener.wrap(rewritten - {//搜索上下文final SearchContextId searchContext;//远程集群索引。final MapString, OriginalIndices remoteClusterIndices;if (ccsCheckCompatibility) {checkCCSVersionCompatibility(rewritten);}if (rewritten.pointInTimeBuilder() ! null) {//则获取搜索上下文的 ID并从中获取索引信息并将结果保存在 remoteClusterIndices 中。searchContext rewritten.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);remoteClusterIndices getIndicesFromSearchContexts(searchContext, rewritten.indicesOptions());} else {//将 searchContext 设置为 null并通过 remoteClusterService.groupIndices(rewritten.indicesOptions(), rewritten.indices()) 方法获取远程集群索引。searchContext null;remoteClusterIndices remoteClusterService.groupIndices(rewritten.indicesOptions(), rewritten.indices());}//从 remoteClusterIndices 中移除 RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY 对应的本地索引并将结果保存在 localIndices 中。OriginalIndices localIndices remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);//获取当前集群状态 clusterState。final ClusterState clusterState clusterService.state();//如果远程集群索引为空则执行本地搜索if (remoteClusterIndices.isEmpty()) {executeLocalSearch(task,timeProvider,rewritten,localIndices,clusterState,SearchResponse.Clusters.EMPTY,searchContext,searchPhaseProvider.apply(listener));} else {//如果远程集群索引不为空则根据是否需要最小化往返次数选择执行远程搜索或本地搜索。//省略目前不涉及到远程集群}}}, listener::onFailure);Rewriteable.rewriteAndFetch(original, searchService.getRewriteContext(timeProvider::absoluteStartMillis), rewriteListener);}void executeLocalSearch(Task task,SearchTimeProvider timeProvider,SearchRequest searchRequest,OriginalIndices localIndices,ClusterState clusterState,SearchResponse.Clusters clusterInfo,SearchContextId searchContext,SearchPhaseProvider searchPhaseProvider) {executeSearch((SearchTask) task,timeProvider,searchRequest,localIndices,Collections.emptyList(),(clusterName, nodeId) - null,clusterState,Collections.emptyMap(),clusterInfo,searchContext,searchPhaseProvider);}private void executeSearch(SearchTask task,SearchTimeProvider timeProvider,SearchRequest searchRequest,OriginalIndices localIndices,ListSearchShardIterator remoteShardIterators,BiFunctionString, String, DiscoveryNode remoteConnections,ClusterState clusterState,MapString, AliasFilter remoteAliasMap,SearchResponse.Clusters clusters,Nullable SearchContextId searchContext,SearchPhaseProvider searchPhaseProvider) {//检查全局集群阻塞状态是否允许读取操作clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);//检查搜索请求中是否定义了allowPartialSearchResults(允许部分搜索结果)参数如果没有则应用集群服务的默认设置。if (searchRequest.allowPartialSearchResults() null) {//默认允许部分搜索结果searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());}final ListSearchShardIterator localShardIterators;final MapString, AliasFilter aliasFilter;final String[] concreteLocalIndices;//根据搜索上下文的存在与否确定本地和远程的索引和别名过滤器。if (searchContext ! null) {assert searchRequest.pointInTimeBuilder() ! null;aliasFilter searchContext.aliasFilter();concreteLocalIndices localIndices null ? new String[0] : localIndices.indices();localShardIterators getLocalLocalShardsIteratorFromPointInTime(clusterState,localIndices,searchRequest.getLocalClusterAlias(),searchContext,searchRequest.pointInTimeBuilder().getKeepAlive(),searchRequest.allowPartialSearchResults());} else {//解析本地索引获取Index对象数组indices。final Index[] indices resolveLocalIndices(localIndices, clusterState, timeProvider);//将indices数组中的每个Index对象的名称提取出来并存储在concreteLocalIndices数组中。concreteLocalIndices Arrays.stream(indices).map(Index::getName).toArray(String[]::new);//解析索引名称表达式获取与搜索请求中的索引相关的索引和别名的集合indicesAndAliases。final SetString indicesAndAliases indexNameExpressionResolver.resolveExpressions(clusterState, searchRequest.indices());//构建索引别名过滤器aliasFilter buildIndexAliasFilters(clusterState, indicesAndAliases, indices);//将remoteAliasMap中的所有映射添加到aliasFilter中aliasFilter.putAll(remoteAliasMap);//取本地分片迭代器localShardIteratorslocalShardIterators getLocalShardsIterator(clusterState,searchRequest,searchRequest.getLocalClusterAlias(),indicesAndAliases,concreteLocalIndices);}//合并创建一个GroupShardsIteratorSearchShardIterator对象并赋值给shardIterators变量。final GroupShardsIteratorSearchShardIterator shardIterators mergeShardsIterators(localShardIterators, remoteShardIterators);//检查shardIterators的大小是否超过了集群设定的分片数量限制如果超过则抛出异常。failIfOverShardCountLimit(clusterService, shardIterators.size());//WaitForCheckpointsbuwei1if (searchRequest.getWaitForCheckpoints().isEmpty() false) {if (remoteShardIterators.isEmpty() false) {throw new IllegalArgumentException(Cannot use wait_for_checkpoints parameter with cross-cluster searches.);} else {validateAndResolveWaitForCheckpoint(clusterState, indexNameExpressionResolver, searchRequest, concreteLocalIndices);}}MapString, Float concreteIndexBoosts resolveIndexBoosts(searchRequest, clusterState);//shardIterators的大小调整搜索类型。adjustSearchType(searchRequest, shardIterators.size() 1);//获取集群的节点信息。final DiscoveryNodes nodes clusterState.nodes();//构建一个连接查询函数connectionLookup用于根据索引和节点名称获取连接对象。BiFunctionString, String, Transport.Connection connectionLookup buildConnectionLookup(searchRequest.getLocalClusterAlias(),nodes::get,remoteConnections,searchTransportService::getConnection);//创建一个异步搜索执行器asyncSearchExecutor用于执行异步搜索。final Executor asyncSearchExecutor asyncSearchExecutor(concreteLocalIndices);//根据条件判断是否需要预过滤搜索分片。final boolean preFilterSearchShards shouldPreFilterSearchShards(clusterState,searchRequest,concreteLocalIndices,localShardIterators.size() remoteShardIterators.size(),defaultPreFilterShardSize);//调用searchPhaseProvider的newSearchPhase方法开始执行搜索阶段//searchPhaseProvider的实现用的是AsyncSearchActionProvidersearchPhaseProvider.newSearchPhase(task,searchRequest,asyncSearchExecutor,shardIterators,timeProvider,connectionLookup,clusterState,Collections.unmodifiableMap(aliasFilter),concreteIndexBoosts,preFilterSearchShards,threadPool,clusters).start();}//一个接口interface SearchPhaseProvider {SearchPhase newSearchPhase(SearchTask task,SearchRequest searchRequest,Executor executor,GroupShardsIteratorSearchShardIterator shardIterators,SearchTimeProvider timeProvider,BiFunctionString, String, Transport.Connection connectionLookup,ClusterState clusterState,MapString, AliasFilter aliasFilter,MapString, Float concreteIndexBoosts,boolean preFilter,ThreadPool threadPool,SearchResponse.Clusters clusters);}//接口SearchPhaseProvider的一个实现类private class AsyncSearchActionProvider implements SearchPhaseProvider {private final ActionListenerSearchResponse listener;AsyncSearchActionProvider(ActionListenerSearchResponse listener) {this.listener listener;}Overridepublic SearchPhase newSearchPhase(SearchTask task,SearchRequest searchRequest,Executor executor,GroupShardsIteratorSearchShardIterator shardIterators,SearchTimeProvider timeProvider,BiFunctionString, String, Transport.Connection connectionLookup,ClusterState clusterState,MapString, AliasFilter aliasFilter,MapString, Float concreteIndexBoosts,boolean preFilter,ThreadPool threadPool,SearchResponse.Clusters clusters) {if (preFilter) {//省略} else {final QueryPhaseResultConsumer queryResultConsumer searchPhaseController.newSearchPhaseResults(executor,circuitBreaker,task::isCancelled,task.getProgressListener(),searchRequest,shardIterators.size(),exc - searchTransportService.cancelSearchTask(task, failed to merge result [ exc.getMessage() ]));//该阶段用于计算分布项频率以实现更准确的评分if (searchRequest.searchType() DFS_QUERY_THEN_FETCH) {//省略} else {//对所有分片执行查询assert searchRequest.searchType() QUERY_THEN_FETCH : searchRequest.searchType();return new SearchQueryThenFetchAsyncAction(logger,searchTransportService,connectionLookup,aliasFilter,concreteIndexBoosts,executor,queryResultConsumer,searchRequest,listener,shardIterators,timeProvider,clusterState,task,clusters);}}}} }其中searchType 有以下几种 public enum SearchType {/*** 与 {link QUERY_THEN_FETCH} 相同但初始散射阶段除外该阶段用于计算分布项频率以实现更准确的评分。*/DFS_QUERY_THEN_FETCH((byte) 0),/** 对所有分片执行查询但仅返回足够的信息而不是文档内容。然后对结果进行排序和排名并基于此* 仅要求相关分片提供实际文档内容。返回的命中数与大小中指定的命中数完全相同因为它们是唯一被提取的命中数。当索引有很多分片不是副本、分片 ID 组时这非常方便。*/QUERY_THEN_FETCH((byte) 1);// 2 used to be DFS_QUERY_AND_FETCH// 3 used to be QUERY_AND_FETCH/** * 默认搜索类型*/public static final SearchType DEFAULT QUERY_THEN_FETCH; }SearchQueryThenFetchAsyncAction的实现如下 class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncActionSearchPhaseResult {private final SearchProgressListener progressListener;// informations to track the best bottom top doc globally.private final int topDocsSize;private final int trackTotalHitsUpTo;private volatile BottomSortValuesCollector bottomSortCollector;SearchQueryThenFetchAsyncAction(final Logger logger,final SearchTransportService searchTransportService,final BiFunctionString, String, Transport.Connection nodeIdToConnection,final MapString, AliasFilter aliasFilter,final MapString, Float concreteIndexBoosts,final Executor executor,final QueryPhaseResultConsumer resultConsumer,final SearchRequest request,final ActionListenerSearchResponse listener,final GroupShardsIteratorSearchShardIterator shardsIts,final TransportSearchAction.SearchTimeProvider timeProvider,ClusterState clusterState,SearchTask task,SearchResponse.Clusters clusters) {super(query,logger,searchTransportService,nodeIdToConnection,aliasFilter,concreteIndexBoosts,executor,request,listener,shardsIts,timeProvider,clusterState,task,resultConsumer,request.getMaxConcurrentShardRequests(),clusters);//省略代码}//父类的performPhaseOnShard方法会调用这个方法protected void executePhaseOnShard(final SearchShardIterator shardIt,final SearchShardTarget shard,final SearchActionListenerSearchPhaseResult listener) {ShardSearchRequest request rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex));getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);}//省略代码 }其中上面的executeSearch方法中searchPhaseProvider.newSearchPhase().start()实际执行的是SearchQueryThenFetchAsyncAction的父类AbstractSearchAsyncAction中的start方法 abstract class AbstractSearchAsyncActionResult extends SearchPhaseResult extends SearchPhase implements SearchPhaseContext {/*** 这是搜索的主要入口点。此方法启动初始阶段的搜索执行。*/public final void start() {if (getNumShards() 0) {//省略return;}executePhase(this);}private void executePhase(SearchPhase phase) {try {phase.run();} catch (Exception e) {if (logger.isDebugEnabled()) {logger.debug(() - format(Failed to execute [%s] while moving to [%s] phase, request, phase.getName()), e);}onPhaseFailure(phase, , e);}}Overridepublic final void run() {//toSkipShardsIts中的每个SearchShardIterator对象调用skip()方法并断言其返回值为true然后调用skipShard()方法for (final SearchShardIterator iterator : toSkipShardsIts) {assert iterator.skip();skipShard(iterator);}//如果shardsIts的大小大于0if (shardsIts.size() 0) {//省略代码//如果请求中允许部分搜索结果为falseif (request.allowPartialSearchResults() false) {//省略代码}//对于shardsIts中的每个索引获取对应的SearchShardIterator对象shardRoutings然后执行performPhaseOnShard()方法。//这里会遍历每一个分片for (int i 0; i shardsIts.size(); i) {final SearchShardIterator shardRoutings shardsIts.get(i);assert shardRoutings.skip() false;assert shardIndexMap.containsKey(shardRoutings);int shardIndex shardIndexMap.get(shardRoutings);performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull());}}}protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) { if (shard null) {//该分片未分配给任何节点会触发onShardFailure方法处理该情况assert assertExecuteOnStartThread();SearchShardTarget unassignedShard new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias());onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));} else {//创建一个Runnable对象并执行executePhaseOnShard方法来在分片上执行搜索操作。final PendingExecutions pendingExecutions throttleConcurrentRequests? pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n - new PendingExecutions(maxConcurrentRequestsPerNode)): null;Runnable r () - {final Thread thread Thread.currentThread();try {executePhaseOnShard(shardIt, shard, new SearchActionListenerResult(shard, shardIndex) {//定义了一个SearchActionListener的匿名子类用于处理搜索操作的响应。Overridepublic void innerOnResponse(Result result) {try {在响应成功时会调用onShardResult方法处理搜索结果onShardResult(result, shardIt);} catch (Exception exc) {//在响应失败时会调用onShardFailure方法处理错误情况onShardFailure(shardIndex, shard, shardIt, exc);} finally {//无论成功还是失败最后都会调用executeNext方法执行下一个操作。executeNext(pendingExecutions, thread);}}Overridepublic void onFailure(Exception t) {try {onShardFailure(shardIndex, shard, shardIt, t);} finally {executeNext(pendingExecutions, thread);}}});} catch (final Exception e) {try {fork(() - onShardFailure(shardIndex, shard, shardIt, e));} finally {executeNext(pendingExecutions, thread);}}};//如果throttleConcurrentRequests为true则会使用pendingExecutions对象来限制并发请求的数量。否则直接执行r.run()方法。if (throttleConcurrentRequests) {pendingExecutions.tryRun(r);} else {r.run();}} }/*** 这个抽象方法由子类SearchQueryThenFetchAsyncAction实现* 将请求发送到实际分片。*/protected abstract void executePhaseOnShard(SearchShardIterator shardIt,SearchShardTarget shard,SearchActionListenerResult listener); }3、通过transportService把查询请求发送到指定的数据节点 在SearchTransportService类中 public static void registerRequestHandler(TransportService transportService, SearchService searchService) {transportService.registerRequestHandler(QUERY_ACTION_NAME,ThreadPool.Names.SAME,ShardSearchRequest::new,(request, channel, task) - searchService.executeQueryPhase(request,(SearchShardTask) task,new ChannelActionListener(channel))); }public void sendExecuteQuery(Transport.Connection connection,final ShardSearchRequest request,SearchTask task,final SearchActionListener? super SearchPhaseResult listener) {// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request// this used to be the QUERY_AND_FETCH which doesnt exist anymore.//我们对此进行了优化如果我们在搜索请求中只有一个分片则期望 QueryFetchSearchResult这曾经是不再存在的QUERY_AND_FETCH。final boolean fetchDocuments request.numberOfShards() 1 (request.source() null || request.source().rankBuilder() null);Writeable.ReaderSearchPhaseResult reader fetchDocuments ? QueryFetchSearchResult::new : in - new QuerySearchResult(in, true);final ActionListener? super SearchPhaseResult handler responseWrapper.apply(connection, listener);//上面根据QUERY_ACTION_NAME注册的实际调用的是 searchService.executeQueryPhasetransportService.sendChildRequest(connection,QUERY_ACTION_NAME,request,task,new ConnectionCountingHandler(handler, reader, clientConnections, connection.getNode().getId()));}二、数据节点收到请求的处理逻辑 public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListenerSearchPhaseResult listener) {assert request.canReturnNullResponseIfMatchNoDocs() false || request.numberOfShards() 1: empty responses require more than one shard;//根据request对象获取一个IndexShard对象final IndexShard shard getShard(request);//调用rewriteAndFetchShardRequest方法对shard和request进行重写和获取请求。rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, orig) - {// check if we can shortcut the query phase entirely.//检查我们是否可以完全缩短查询阶段if (orig.canReturnNullResponseIfMatchNoDocs()) {assert orig.scroll() null;final CanMatchShardResponse canMatchResp;try {//创建一个ShardSearchRequest对象的副本clone并调用canMatch方法进行匹配检查。ShardSearchRequest clone new ShardSearchRequest(orig);canMatchResp canMatch(clone, false);} catch (Exception exc) {l.onFailure(exc);return;}if (canMatchResp.canMatch() false) {l.onResponse(QuerySearchResult.nullInstance());return;}}//其中会执行executeQueryPhase方法的递归调用。ensureAfterSeqNoRefreshed(shard, orig, () - executeQueryPhase(orig, task), l);}));}/** 返回的 {link SearchPhaseResult} 的引用计数将通过此方法递增。调用方有责任确保在不再需要对象时正确递减引用计数。*/private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {//创建或获取ReaderContext对象。final ReaderContext readerContext createOrGetReaderContext(request);try (//创建SearchContext对象并设置相关参数。Releasable scope tracer.withScope(task);Releasable ignored readerContext.markAsUsed(getKeepAlive(request));SearchContext context createContext(readerContext, request, task, ResultsType.QUERY, true)) {//开始跟踪执行查询阶段。tracer.startTrace(executeQueryPhase, Map.of());final long afterQueryTime;//使用SearchOperationListenerExecutor执行加载或执行查询阶段的操作。try (SearchOperationListenerExecutor executor new SearchOperationListenerExecutor(context)) {loadOrExecuteQueryPhase(request, context);//检查查询结果是否具有搜索上下文并根据需要释放ReaderContext对象。if (context.queryResult().hasSearchContext() false readerContext.singleSession()) {freeReaderContext(readerContext.id());}afterQueryTime executor.success();} finally {//停止跟踪执行查询阶段。tracer.stopTrace();}//根据条件判断是否需要执行提取阶段if (request.numberOfShards() 1 (request.source() null || request.source().rankBuilder() null)) {//我们已经有了查询结果但我们可以同时运行 fetchcontext.addFetchResult();//如果需要执行提取阶段则将提取结果添加到SearchContext对象并调用executeFetchPhase方法执行提取阶段。return executeFetchPhase(readerContext, context, afterQueryTime);} else {//将RescoreDocIds对象传递给queryResult并返回context.queryResult()。//将 rescoreDocIds 传递给 queryResult以将它们发送到协调节点并在提取阶段接收它们。我们还将 rescoreDocIds 传递给 LegacyReaderContext以防搜索状态需要保留在数据节点中final RescoreDocIds rescoreDocIds context.rescoreDocIds();context.queryResult().setRescoreDocIds(rescoreDocIds);readerContext.setRescoreDocIds(rescoreDocIds);context.queryResult().incRef();return context.queryResult();}} catch (Exception e) {// execution exception can happen while loading the cache, strip itif (e instanceof ExecutionException) {e (e.getCause() null || e.getCause() instanceof Exception)? (Exception) e.getCause(): new ElasticsearchException(e.getCause());}logger.trace(Query phase failed, e);processFailure(readerContext, e);throw e;}} 其中 /*** 如果调用方可以处理 null 响应 {link QuerySearchResultnullInstance}* 则返回 true。默认值为 false因为协调器节点至少需要一个分片响应来构建全局响应。*/public boolean canReturnNullResponseIfMatchNoDocs() {return canReturnNullResponseIfMatchNoDocs;}1、尝试从缓存中加载查询结果 /*** 如果无法使用缓存请尝试从缓存加载查询结果或直接执行查询阶段。*/private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {final boolean canCache IndicesService.canCache(request, context);context.getSearchExecutionContext().freezeContext();if (canCache) {indicesService.loadIntoContext(request, context);} else {QueryPhase.execute(context);}}/*** 加载缓存结果根据需要通过执行查询阶段进行计算否则将缓存值反序列化为 {link SearchContextqueryResult 上下文的查询结果}。* load compute 的组合允许进行单个加载操作这将导致具有相同密钥的其他请求等待直到其加载并重用相同的缓存。*/public void loadIntoContext(ShardSearchRequest request, SearchContext context) throws Exception {assert canCache(request, context);//context 的搜索器searcher的目录阅读器DirectoryReaderfinal DirectoryReader directoryReader context.searcher().getDirectoryReader();//创建了一个布尔类型的数组 loadedFromCache并将其初始值设为 trueboolean[] loadedFromCache new boolean[] { true };//代码通过调用 request 的 cacheKey 方法生成一个缓存键cacheKey并使用该键将一些结果缓存到分片级别cacheShardLevelResult。BytesReference cacheKey request.cacheKey(requestCacheKeyDifferentiator);BytesReference bytesReference cacheShardLevelResult(context.indexShard(),context.getSearchExecutionContext().mappingCacheKey(),directoryReader,cacheKey,//代码执行了一些查询操作QueryPhase.execute并将查询结果写入到输出流out中同时将 loadedFromCache 的值设为 false。out - {QueryPhase.execute(context);context.queryResult().writeToNoId(out);loadedFromCache[0] false;});//loadedFromCache 的值如果为 true则表示结果已从缓存加载。//在这种情况下代码将缓存的查询结果恢复到上下文中并设置一些其他属性。if (loadedFromCache[0]) {// restore the cached query result into the contextfinal QuerySearchResult result context.queryResult();StreamInput in new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry);result.readFromWithId(context.id(), in);result.setSearchShardTarget(context.shardTarget());} else if (context.queryResult().searchTimedOut()) {//上下文的查询结果超时searchTimedOut则代码会执行一些操作来使缓存无效//这样做的原因是如果缓存了一个超时的查询结果不能简单地抛出异常来通知外部世界因为如果有多个请求等待计算缓存条目它们都会失败并抛出相同的异常。//相反代码会使缓存结果无效并返回超时结果给其他使用相同缓存键的搜索。同时在导致超时的线程中使结果无效。indicesRequestCache.invalidate(new IndexShardCacheEntity(context.indexShard()),context.getSearchExecutionContext().mappingCacheKey(),directoryReader,cacheKey);//如果启用了日志跟踪logger.isTraceEnabled()代码还会记录一条日志说明查询超时并且缓存条目被无效。if (logger.isTraceEnabled()) {logger.trace(Query timed out, invalidating cache entry for request on shard [{}]:\n {},request.shardId(),request.source());}}}/*** 缓存在分片级别计算的内容*/private BytesReference cacheShardLevelResult(IndexShard shard,//shard索引分片对象MappingLookup.CacheKey mappingCacheKey,//mappingCacheKey映射缓存键DirectoryReader reader, //reader目录阅读器对象BytesReference cacheKey, //cacheKey缓存键CheckedConsumerStreamOutput, IOException loader //一个带有StreamOutput参数的回调函数用于加载数据) throws Exception {//创建一个IndexShardCacheEntity对象用于表示索引分片的缓存实体IndexShardCacheEntity cacheEntity new IndexShardCacheEntity(shard);//创建一个CheckedSupplier对象用于生成缓存数据。CheckedSupplierBytesReference, IOException supplier () - {//这个对象内部使用BytesStreamOutput它允许指定预期的字节大小// 但默认使用16k作为页面大小。为了避免对小查询结果浪费太多内存将预期大小设置为512字节。final int expectedSizeInBytes 512;//在BytesStreamOutput中执行loader回调函数将数据写入输出流中try (BytesStreamOutput out new BytesStreamOutput(expectedSizeInBytes)) {loader.accept(out);//将输出流的字节表示返回作为缓存数据。return out.bytes();}};//通过调用indicesRequestCache.getOrCompute方法使用缓存实体、缓存数据生成器、映射缓存键、目录阅读器和缓存键作为参数获取或计算缓存数据return indicesRequestCache.getOrCompute(cacheEntity, supplier, mappingCacheKey, reader, cacheKey);}这里要知道supplier内部会执行loader.accept(out);而传过来的loader是如下 out - {QueryPhase.execute(context);context.queryResult().writeToNoId(out);loadedFromCache[0] false;}其实意味着如果执行了loader说明缓存中没有而是直接查询的继续往下 BytesReference getOrCompute(CacheEntity cacheEntity,CheckedSupplierBytesReference, IOException loader,MappingLookup.CacheKey mappingCacheKey,DirectoryReader reader,BytesReference cacheKey) throws Exception {final ESCacheHelper cacheHelper ElasticsearchDirectoryReader.getESReaderCacheHelper(reader);assert cacheHelper ! null;final Key key new Key(cacheEntity, mappingCacheKey, cacheHelper.getKey(), cacheKey);Loader cacheLoader new Loader(cacheEntity, loader);BytesReference value cache.computeIfAbsent(key, cacheLoader);if (cacheLoader.isLoaded()) {key.entity.onMiss();//看看这是否是我们第一次看到这个读取器并确保注册一个清理密钥CleanupKey cleanupKey new CleanupKey(cacheEntity, cacheHelper.getKey());if (registeredClosedListeners.containsKey(cleanupKey) false) {Boolean previous registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);if (previous null) {cacheHelper.addClosedListener(cleanupKey);}}} else {key.entity.onHit();}return value;}/***如果指定的键尚未与值关联或映射到 null则尝试使用给定的映射函数计算其值并将其输入到此映射中除非为 null。给定键的 load 方法最多调用一次。*在同一键上同时使用不同的 {link CacheLoader} 实现可能会导致仅调用第一个加载器函数* 而第二个加载器函数将返回第一个加载器函数提供的结果包括执行第一个加载器函数期间引发的任何异常*/public V computeIfAbsent(K key, CacheLoaderK, V loader) throws ExecutionException {//首先获取当前时间戳。long now now();// we have to eagerly evict expired entries or our putIfAbsent call below will fail//尝试从缓存中获取与给定键关联的值如果值已过期则会在获取前将其删除。V value get(key, now, true);if (value null) {//我们需要同步加载给定键的值;但是在调用 load 时按住段锁可能会导致由于依赖键加载而对另一个线程进行死锁;// 因此我们需要一种机制来确保最多调用一次 load但我们不会在按住段锁时调用 load;// 为此我们原子地在映射中放置一个可以加载值的 future然后在赢得竞赛的线程上从这个 future 中获取值以将 future 放入 segment map 中//首先获取与给定键关联的缓存段CacheSegment。CacheSegment segment getCacheSegment(key);CompletableFutureEntryK, V future;//创建一个CompletableFuture对象用于在加载完成后获取值。CompletableFutureEntryK, V completableFuture new CompletableFuture();//使用段锁将键和CompletableFuture对象放入段的映射Map中。try (ReleasableLock ignored segment.writeLock.acquire()) {if (segment.map null) {segment.map new HashMap();}future segment.map.putIfAbsent(key, completableFuture);}BiFunction? super EntryK, V, Throwable, ? extends V handler (ok, ex) - {//如果ok不为空if (ok ! null) {promote(ok, now);return ok.value;} else {//如果ok为空获取一个写锁 (segment.writeLock.acquire())并使用try-with-resources语句来确保锁被释放。try (ReleasableLock ignored segment.writeLock.acquire()) {//检查segment.map是否为空如果不为空则尝试从segment.map中获取与key对应的CompletableFutureEntryK, V对象CompletableFutureEntryK, V sanity segment.map null ? null : segment.map.get(key);if (sanity ! null sanity.isCompletedExceptionally()) {//如果sanity不为空且已经完成异常则从segment.map中移除key。segment.map.remove(key);if (segment.map.isEmpty()) {//如果segment.map为空则将其赋值为null。segment.map null;}}}return null;}};CompletableFutureV completableValue;//如果该键之前不存在映射则说明当前线程赢得了竞争需要执行加载操作。if (future null) {future completableFuture;completableValue future.handle(handler);V loaded;//调用加载器的load方法加载值并将其放入CompletableFuture对象中。try {loaded loader.load(key);} catch (Exception e) {future.completeExceptionally(e);throw new ExecutionException(e);}if (loaded null) {NullPointerException npe new NullPointerException(loader returned a null value);future.completeExceptionally(npe);throw new ExecutionException(npe);} else {//将加载的值包装成一个Entry对象并完成CompletableFuture对象。future.complete(new Entry(key, loaded, now));}} else {//说明该键存在映射直接调用completableValue future.handle(handler);}//通过completableValue.get()获取加载完成的值try {value completableValue.get();// check to ensure the future hasnt been completed with an exceptionif (future.isCompletedExceptionally()) {future.get(); // call get to force the exception to be thrown for other concurrent callersthrow new IllegalStateException(the future was completed exceptionally but no exception was thrown);}} catch (InterruptedException e) {throw new IllegalStateException(e);}}return value;}这里面在从缓存中没有得到指定的CacheSegment 则会调用loader.load(key) 里面实际调用的是QueryPhase.execute(context); 最后放入到缓存中再从completableValue 把得到的数据当方法结果返回 2、不通过缓存查询直接执行查询 这里就看一下QueryPhase.execute(context);的实现源码 /*** //搜索请求的查询阶段用于运行查询并从每个分片中获取有关匹配文档的信息*/ public class QueryPhase {if (searchContext.rankShardContext() null) {executeQuery(searchContext);} else {executeRank(searchContext);}}(1)executeQuery和executeRank两种查询方式 static void executeRank(SearchContext searchContext) throws QueryPhaseExecutionException {//获取排名的上下文信息和查询结果信息RankShardContext rankShardContext searchContext.rankShardContext();QuerySearchResult querySearchResult searchContext.queryResult();//然后根据条件判断是否需要执行组合布尔查询以获取总命中数或聚合结果,if (searchContext.trackTotalHitsUpTo() ! TRACK_TOTAL_HITS_DISABLED || searchContext.aggregations() ! null) {//需要的话则size0再执行executeQuery,来获取总命中数和聚合结果searchContext.size(0);QueryPhase.executeQuery(searchContext);} else {//将查询结果的topDocs设置为空即命中文档为空。searchContext.queryResult().topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),new DocValueFormat[0]);}ListTopDocs rrfRankResults new ArrayList();boolean searchTimedOut querySearchResult.searchTimedOut();long serviceTimeEWMA querySearchResult.serviceTimeEWMA();int nodeQueueSize querySearchResult.nodeQueueSize();//迭代rankShardContext.queries()中的每个排名查询来执行排名操作for (Query rankQuery : rankShardContext.queries()) {//如果搜索超时将中断排名操作并返回部分结果if (searchTimedOut) {break;}//对于每个排名查询创建一个RankSearchContext对象RankSearchContext rankSearchContext new RankSearchContext(searchContext, rankQuery, rankShardContext.windowSize());//并添加收集器和搜索操作QueryPhase.addCollectorsAndSearch(rankSearchContext);//然后将查询结果添加到rrfRankResults列表中并更新服务时间、节点队列大小和搜索超时的状态。QuerySearchResult rrfQuerySearchResult rankSearchContext.queryResult();rrfRankResults.add(rrfQuerySearchResult.topDocs().topDocs);serviceTimeEWMA rrfQuerySearchResult.serviceTimeEWMA();nodeQueueSize Math.max(nodeQueueSize, rrfQuerySearchResult.nodeQueueSize());searchTimedOut rrfQuerySearchResult.searchTimedOut();}//将排名结果通过rankShardContext.combine方法进行合并并将相关的值记录到querySearchResult中querySearchResult.setRankShardResult(rankShardContext.combine(rrfRankResults));//包括搜索超时状态、服务时间和节点队列大小。// record values relevant to all queriesquerySearchResult.searchTimedOut(searchTimedOut);querySearchResult.serviceTimeEWMA(serviceTimeEWMA);querySearchResult.nodeQueueSize(nodeQueueSize);}static void executeQuery(SearchContext searchContext) throws QueryPhaseExecutionException {//检查searchContext是否只有建议suggest操作如果是就执行建议阶段的操作并返回一个空的查询结果if (searchContext.hasOnlySuggest()) {SuggestPhase.execute(searchContext);searchContext.queryResult().topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),new DocValueFormat[0]);return;}if (LOGGER.isTraceEnabled()) {LOGGER.trace({}, new SearchContextSourcePrinter(searchContext));}// Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F// request, preProcess is called on the DFS phase, this is why we pre-process them// here to make sure it happens during the QUERY phase//聚合aggregation进行预处理操作AggregationPhase.preProcess(searchContext);//添加收集器collectors并执行搜索操作addCollectorsAndSearch(searchContext);//执行重新评分rescore阶段的操作RescorePhase.execute(searchContext);//再次执行建议阶段的操作。SuggestPhase.execute(searchContext);//执行聚合阶段的操作。AggregationPhase.execute(searchContext);//如果searchContext中包含性能分析器profiler则对查询阶段的性能结果进行分析。if (searchContext.getProfilers() ! null) {searchContext.queryResult().profileResults(searchContext.getProfilers().buildQueryPhaseResults());}}这两种最后还是要调用 QueryPhase.addCollectorsAndSearch进行查询只是executeRank 会多一层判断执行两遍addCollectorsAndSearch (2)、根据搜索上下文在查询之前添加各种查询搜集器 static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhaseExecutionException {//获取搜索器和索引阅读器对象。final ContextIndexSearcher searcher searchContext.searcher();final IndexReader reader searcher.getIndexReader();QuerySearchResult queryResult searchContext.queryResult();//设置查询结果的超时状态queryResult.searchTimedOut(false);try {//起始位置和大小。queryResult.from(searchContext.from());queryResult.size(searchContext.size());//重写查询并通过断言确认查询已经重写。Query query searchContext.rewrittenQuery();assert query searcher.rewrite(query); // already rewritten//如果是滚动查询final ScrollContext scrollContext searchContext.scrollContext();if (scrollContext ! null) {//如果是第一轮滚动查询不做任何优化if (scrollContext.totalHits null) {// first roundassert scrollContext.lastEmittedDoc null;// there is not much that we can optimize here since we want to collect all// documents in order to get the total number of hits//我们在这里可以优化的不多因为我们想收集所有文档以获得总点击数} else {//如果不是第一轮滚动查询根据排序条件判断是否可以提前终止查询并构建新的查询对象。final ScoreDoc after scrollContext.lastEmittedDoc;if (canEarlyTerminate(reader, searchContext.sort())) {// now this gets interesting: since the search sort is a prefix of the index sort, we can directly// skip to the desired doc//由于搜索排序是索引排序的前缀我们可以直接跳到所需的文档if (after ! null) {query new BooleanQuery.Builder().add(query, BooleanClause.Occur.MUST).add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER).build();}}}}//创建顶部文档收集器。// create the top docs collector last when the other collectors are knownfinal TopDocsCollectorManagerFactory topDocsFactory createTopDocsCollectorFactory(searchContext,searchContext.parsedPostFilter() ! null || searchContext.minimumScore() ! null);CollectorManagerCollector, Void collectorManager wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),topDocsFactory.collectorManager(),topDocsFactory.profilerName);//根据条件添加收集器//如果设置了terminate_after参数添加一个用于终止查询的收集器。if (searchContext.terminateAfter() ! SearchContext.DEFAULT_TERMINATE_AFTER) {// add terminate_after before the filter collectors// it will only be applied on documents accepted by these filter collectorsTerminateAfterCollector terminateAfterCollector new TerminateAfterCollector(searchContext.terminateAfter());final Collector collector collectorManager.newCollector();collectorManager wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),new SingleThreadCollectorManager(MultiCollector.wrap(terminateAfterCollector, collector)),REASON_SEARCH_TERMINATE_AFTER_COUNT,collector);}//如果存在后置过滤器添加一个用于过滤结果的收集器。if (searchContext.parsedPostFilter() ! null) {// add post filters before aggregations// it will only be applied to top hitsfinal Weight filterWeight searcher.createWeight(searcher.rewrite(searchContext.parsedPostFilter().query()),ScoreMode.COMPLETE_NO_SCORES,1f);final Collector collector collectorManager.newCollector();collectorManager wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),new SingleThreadCollectorManager(new FilteredCollector(collector, filterWeight)),REASON_SEARCH_POST_FILTER,collector);}//如果存在聚合操作添加一个用于聚合的收集器。if (searchContext.aggregations() ! null) {final Collector collector collectorManager.newCollector();final Collector aggsCollector searchContext.aggregations().getAggsCollectorManager().newCollector();collectorManager wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),new SingleThreadCollectorManager(MultiCollector.wrap(collector, aggsCollector)),REASON_SEARCH_MULTI,collector,aggsCollector);}//如果设置了最小分数添加一个用于过滤低分结果的收集器。if (searchContext.minimumScore() ! null) {final Collector collector collectorManager.newCollector();// apply the minimum score after multi collector so we filter aggs as wellcollectorManager wrapWithProfilerCollectorManagerIfNeeded(searchContext.getProfilers(),new SingleThreadCollectorManager(new MinimumScoreCollector(collector, searchContext.minimumScore())),REASON_SEARCH_MIN_SCORE,collector);}//根据超时设置添加查询超时检查的任务。final Runnable timeoutRunnable getTimeoutCheck(searchContext);if (timeoutRunnable ! null) {searcher.addQueryCancellation(timeoutRunnable);}try {//使用收集器管理器执行查询并更新查询结果。searchWithCollectorManager(searchContext, searcher, query, collectorManager, timeoutRunnable ! null);queryResult.topDocs(topDocsFactory.topDocsAndMaxScore(), topDocsFactory.sortValueFormats);//获取线程池执行器对象并根据类型更新查询结果的节点队列大小和服务时间指数加权移动平均值。ExecutorService executor searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor|| (executor instanceof EsThreadPoolExecutor false /* in case thread pool is mocked out in tests */): SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type executor.getClass();if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());}} finally {// Search phase has finished, no longer need to check for timeout// otherwise aggregation phase might get cancelled.//取消查询超时检查的任务if (timeoutRunnable ! null) {searcher.removeQueryCancellation(timeoutRunnable);}}} catch (Exception e) {//并处理异常情况。throw new QueryPhaseExecutionException(searchContext.shardTarget(), Failed to execute main query, e);}}会通过searchWithCollectorManager 来执行查询 private static void searchWithCollectorManager(SearchContext searchContext,ContextIndexSearcher searcher,Query query,CollectorManagerCollector, Void collectorManager,boolean timeoutSet) throws IOException {//如果profilers不为null则获取当前查询的分析器并将collectorManager设置为InternalProfileCollectorManager的getCollectorTree方法。if (searchContext.getProfilers() ! null) {searchContext.getProfilers().getCurrentQueryProfiler().setCollectorManager(((InternalProfileCollectorManager) collectorManager)::getCollectorTree);}//获取searchContext的查询结果对象queryResult。QuerySearchResult queryResult searchContext.queryResult();try {//使用searcher和collectorManager执行查询操作。searcher.search(query, collectorManager);} catch (TerminateAfterCollector.EarlyTerminationException e) {//如果查询被TerminateAfterCollector.EarlyTerminationException异常提前终止则将queryResult的terminatedEarly属性设置为true。queryResult.terminatedEarly(true);} catch (TimeExceededException e) {assert timeoutSet : TimeExceededException thrown even though timeout wasnt set;//如果查询超时且timeoutSet为true则检查searchContext的request是否允许部分搜索结果。if (searchContext.request().allowPartialSearchResults() false) {//如果不允许部分搜索结果则抛出QueryPhaseExecutionException异常指示查询超时。// Cant rethrow TimeExceededException because not serializablethrow new QueryPhaseExecutionException(searchContext.shardTarget(), Time exceeded);}//如果允许部分搜索结果则将queryResult的searchTimedOut属性设置为true。queryResult.searchTimedOut(true);}//如果searchContext的terminateAfter属性不等于SearchContext.DEFAULT_TERMINATE_AFTER且queryResult的terminatedEarly属性为null则将queryResult的terminatedEarly属性设置为false。if (searchContext.terminateAfter() ! SearchContext.DEFAULT_TERMINATE_AFTER queryResult.terminatedEarly() null) {queryResult.terminatedEarly(false);}}(3) 执行查询操作遍历此索引在此数据节点所有的分片 Overridepublic C extends Collector, T T search(Query query, CollectorManagerC, T collectorManager) throws IOException {//通过collectorManager创建一个收集器(Collector)的实例firstCollector。final C firstCollector collectorManager.newCollector();// Take advantage of the few extra rewrite rules of ConstantScoreQuery when score are not needed.//根据firstCollector的评分模式(scoreMode)判断是否需要评分,如果需要评分则使用rewrite方法对查询进行重写如果不需要评分则使用ConstantScoreQuery对查询进行重写。query firstCollector.scoreMode().needsScores() ? rewrite(query) : rewrite(new ConstantScoreQuery(query));//根据重写后的查询(query)、评分模式(scoreMode)和权重(Weight)创建一个权重对象(weight)。final Weight weight createWeight(query, firstCollector.scoreMode(), 1);//调用search方法传入权重对象、收集器管理器和第一个收集器执行搜索操作并返回结果。return search(weight, collectorManager, firstCollector);}/**每个元素表示一个分片的信息*LeafSlice的数量就代表了索引的分片数量。每个LeafSlice对象代表了一个分片的信息和上下文。* 如果一个索引在这个数据节点有5个分片则这个的长度为5*/private final LeafSlice[] leafSlices;/* *类似于 lucene 实现但它会等待所有线程 fisinsh 然后返回即使抛出错误也是如此。 在这种情况下将忽略其他异常并在所有线程完成后引发第一个异常 */private C extends Collector, T T search(Weight weight, CollectorManagerC, T collectorManager, C firstCollector) throws IOException {//如果queueSizeBasedExecutor为空或者leafSlices的长度小于等于1//LeafSlice关键字解释是IndexSearcher用来管理和表示索引搜索分片的类,如果小于等于1则此数据节点只有一个分片if (queueSizeBasedExecutor null || leafSlices.length 1) {//那么直接在leafContexts上执行搜索操作search(leafContexts, weight, firstCollector);//并通过collectorManager.reduce方法将结果收集起来返回。return collectorManager.reduce(Collections.singletonList(firstCollector));} else {//根据leafSlices的长度创建多个收集器final ListC collectors new ArrayList(leafSlices.length);collectors.add(firstCollector);final ScoreMode scoreMode firstCollector.scoreMode();//并使用collectorManager创建新的收集器。for (int i 1; i leafSlices.length; i) {final C collector collectorManager.newCollector();collectors.add(collector);if (scoreMode ! collector.scoreMode()) {throw new IllegalStateException(CollectorManager does not always produce collectors with the same score mode);}}//创建一个FutureTask列表listTasks用于异步执行搜索操作final ListFutureTaskC listTasks new ArrayList();//遍历leafSlices对每个leafSlices创建一个FutureTask并将其添加到listTasks中。for (int i 0; i leafSlices.length; i) {final LeafReaderContext[] leaves leafSlices[i].leaves;final C collector collectors.get(i);FutureTaskC task new FutureTask(() - {search(Arrays.asList(leaves), weight, collector);return collector;});listTasks.add(task);}//使用queueSizeBasedExecutor的invokeAll方法执行所有的listTasks等待它们的执行完成。queueSizeBasedExecutor.invokeAll(listTasks);RuntimeException exception null;final ListC collectedCollectors new ArrayList();//遍历listTasks获取每个任务的结果并将其添加到collectedCollectors列表中。for (FutureC future : listTasks) {try {collectedCollectors.add(future.get());// TODO: when there is an exception and we dont want partial results, it would be great// to cancel the queries / threads} catch (InterruptedException e) {//省略代码} catch (ExecutionException e) {//省略代码}}//通过collectorManager.reduce方法将所有收集器的结果进行组合并返回最终结果。return collectorManager.reduce(collectedCollectors);}}需要注意leafSlices 这个数组代表的此索引在此数据节点的所有分片信息 Overridepublic void search(ListLeafReaderContext leaves, Weight weight, Collector collector) throws IOException {collector.setWeight(weight);for (LeafReaderContext ctx : leaves) { // search each subreadersearchLeaf(ctx, weight, collector);}}1、主节点将查询请求路由(根据分片找到数据节点)到对应的数据节点执行查询请求因为数据节点无法知道查询请求是否仅针对某一个具体的分片。数据节点会在所有分片上执行查询操作并将结果进行合并、去重和处理以产生最终的结果。 2、因此即使主节点发送的查询请求只涉及一个分片但在实际查询过程中数据节点会遍历该数据节点上所有与该索引对应的分片以保证查询结果的完整性。 但是至少遍历多个分片用的是异步同时进行查询的方式 private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {//检查是否取消了搜索操作。cancellable.checkCancelled();//获取当前叶子节点的LeafCollector。用于收集匹配的文档。final LeafCollector leafCollector;try {//获取当前叶子节点的存活文档集合。leafCollector collector.getLeafCollector(ctx);} catch (CollectionTerminatedException e) {return;}//获取当前叶子节点的存活文档集合位集合表示哪些文档未被删除Bits liveDocs ctx.reader().getLiveDocs();//将存活文档集合转换为稀疏位集合BitSet liveDocsBitSet getSparseBitSetOrNull(liveDocs);//如果存活文档集合不是稀疏位集合那么使用BulkScorer进行评分操作。if (liveDocsBitSet null) {BulkScorer bulkScorer weight.bulkScorer(ctx);if (bulkScorer ! null) {if (cancellable.isEnabled()) {bulkScorer new CancellableBulkScorer(bulkScorer, cancellable::checkCancelled);}try {//使用BulkScorer对匹配的文档进行评分操作。bulkScorer.score(leafCollector, liveDocs);} catch (CollectionTerminatedException e) {// collection was terminated prematurely// continue with the following leaf}}} else {//如果存活文档集合为稀疏位集合//获取权重对象的Scorer用于计算每个候选文档的得分Scorer scorer weight.scorer(ctx);if (scorer ! null) {try {// 使用Scorer和稀疏位集合liveDocsBitSet对匹配的文档进行交集计算并进行评分操作。intersectScorerAndBitSet(scorer,liveDocsBitSet,leafCollector,this.cancellable.isEnabled() ? cancellable::checkCancelled : () - {});} catch (CollectionTerminatedException e) {// collection was terminated prematurely// continue with the following leaf}}}}liveDocsBitSet 的作用是在搜索阶段过滤掉已被删除的文档并仅处理存活文档。通过与其他集合的结合使用可以在执行搜索和评分操作时仅处理存活的有效文档可以大大减少内存占用。从而提升搜索性能和准确性。 其他集合例如Bits、BulkScorer和Scorer可能用于执行搜索和评分操作但往往与存活文档集合无直接关联 //在给定的Scorer、BitSet、LeafCollector和checkCancelled函数的基础上计算它们的交集并将结果收集到collector中。static void intersectScorerAndBitSet(Scorer scorer, BitSet acceptDocs, LeafCollector collector, Runnable checkCancelled)throws IOException {//将scorer设置为collector的scorer。collector.setScorer(scorer);// ConjunctionDISI uses the DocIdSetIterator#cost() to order the iterators, so if roleBits has the lowest cardinality it should// be used first://创建一个迭代器iterator通过将acceptDocs和scorer的迭代器传递给ConjunctionUtils.intersectIterators()方法来计算它们的交集。DocIdSetIterator iterator ConjunctionUtils.intersectIterators(Arrays.asList(new BitSetIterator(acceptDocs, acceptDocs.approximateCardinality()), scorer.iterator()));int seen 0;checkCancelled.run();//通过迭代器遍历交集中的文档对于每个文档如果满足一定条件则将其收集到collector中。for (int docId iterator.nextDoc(); docId DocIdSetIterator.NO_MORE_DOCS; docId iterator.nextDoc()) {//在每次遍历一定数量的文档后调用checkCancelled函数检查是否取消操作。if (seen % CHECK_CANCELLED_SCORER_INTERVAL 0) {checkCancelled.run();}collector.collect(docId);}//最后再次调用checkCancelled函数。checkCancelled.run();}
http://www.hkea.cn/news/14311489/

相关文章:

  • 长春火车站建在哪里局域网搭建wordpress慢
  • seo网站排名软件新的网站建设
  • 医院网站asp源码wordpress跳转后端IP
  • 哪些网站做任务可以赚钱的知言 wordpress
  • 网站建设开发流程变装WordPress
  • 网站开发硬件环境优化的基本意思
  • 课堂资源管理网站开发需求分析个人做多个网站备案
  • 网站建设企业网站制作平台传奇手游开服表网站
  • 成都网站建设 四川冠辰网站建设迈诺网站建设
  • qq群推广网站免费秒进不拦截网页的浏览器
  • 网站建设服务合同 付款方式建设团购网站费用
  • 咸阳网站开发哪家好网站推广的目标是什么
  • 微网站设计与开发竞赛网页设计与制作课程设计报告书
  • 网站建设找祥赢如何做聚合类网站
  • 如何搭建手机网站东莞网站建设周期
  • 二手车 东莞网站建设wordpress网易音乐
  • 公司论坛网站建设规划书wordpress增加下载量显示
  • seo视频教程百度网盘南宁seo推广服务
  • 网站做区块链然后往里面投钱织梦可以做大型网站吗
  • 网站建设方案书1500字湖北省建设厅行政审批网站
  • 阿里云服务器可以做彩票网站吗网络平台的推广方法
  • 南京搜必应网站优化百度山西授权代理
  • 企业级网站开发原理图谷歌排名查询
  • 外贸建站推广公司医院网站域名备案
  • 网站中数据查询如何做你有网站 我做房东 只收佣金的网站
  • 安徽外贸网站建设辛集网站建设
  • 做视频编辑哪个网站素材比较好wordpress时间调用
  • 永康网站设计建设网站那个公司好
  • 郑州网站优化平台高企达建设有限公司网站
  • 珠海网站建设找哪家工信部网站备案信息怎么查询