1. Overview

Reposted with additions: an analysis of the Elasticsearch communication module

Elasticsearch is a distributed, real-time search framework built on Lucene. It accepts HTTP requests from users directly, and the nodes of a cluster also communicate with each other.

Introduction to the communication module

All communication-related configuration in Elasticsearch is handled by the NetworkModule class. Its configuration falls into three main parts:

  1. HttpServerTransport: responsible for accepting requests sent by users and dispatching them.
  2. Transport: responsible for inter-node communication within the cluster; this is essentially Elasticsearch's RPC layer.
  3. TransportInterceptor: intercepts transport-level requests, doing extra processing before a request is sent or after it is received. Elasticsearch itself uses this sparingly; at the moment it is mainly an interface provided so that future extensions are easier.

Since item 3 is rarely used inside Elasticsearch, I will not cover it here and will focus on items 1 and 2.

Elasticsearch is a highly extensible system: every feature is modularized and service-oriented, and a Plugin interface makes each feature easy to extend in a pluggable way. The plugin interface for networking is NetworkPlugin.

NetworkPlugin provides three methods for obtaining and configuring HttpServerTransport, Transport, and TransportInterceptor respectively.

/**
 * Plugin for extending network and transport related classes
 */
public interface NetworkPlugin {

    /**
     * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
     * transport (inter-node) requests. This must not return <code>null</code>
     *
     * @param namedWriteableRegistry registry of all named writeables registered
     * @param threadContext a {@link ThreadContext} of the current nodes or clients {@link ThreadPool} that can be used to set additional
     *                      headers in the interceptors
     */
    default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
                                                                ThreadContext threadContext) {
        return Collections.emptyList();
    }

    /**
     * Returns a map of {@link Transport} suppliers.
     * See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
     */
    default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                           CircuitBreakerService circuitBreakerService,
                                                           NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        return Collections.emptyMap();
    }

    /**
     * Returns a map of {@link HttpServerTransport} suppliers.
     * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
     */
    default Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                         PageCacheRecycler pageCacheRecycler,
                                                                         CircuitBreakerService circuitBreakerService,
                                                                         NamedXContentRegistry xContentRegistry,
                                                                         NetworkService networkService,
                                                                         HttpServerTransport.Dispatcher dispatcher,
                                                                         ClusterSettings clusterSettings) {
        return Collections.emptyMap();
    }
}

In its constructor, NetworkModule iterates over all network plugins and caches what they register in memory.

/**
 * Creates a network module that custom networking classes can be plugged into.
 *
 * @param settings The settings for the node
 * @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
 */
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                     BigArrays bigArrays,
                     PageCacheRecycler pageCacheRecycler,
                     CircuitBreakerService circuitBreakerService,
                     NamedWriteableRegistry namedWriteableRegistry,
                     NamedXContentRegistry xContentRegistry,
                     NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
                     ClusterSettings clusterSettings) {
    this.settings = settings;
    this.transportClient = transportClient;
    for (NetworkPlugin plugin : plugins) {
        Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
            pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
        // HttpServerTransport
        if (transportClient == false) {
            for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                registerHttpTransport(entry.getKey(), entry.getValue());
            }
        }
        // Transport
        Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
            circuitBreakerService, namedWriteableRegistry, networkService);
        for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
            registerTransport(entry.getKey(), entry.getValue());
        }
        // TransportInterceptor
        List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
            threadPool.getThreadContext());
        for (TransportInterceptor interceptor : transportInterceptors) {
            registerTransportInterceptor(interceptor);
        }
    }
    logger.info("NetworkModule initialization complete"); // log line added by the blog author
}

Netty Network Plugin

Elasticsearch's low-level communication is built on Netty, a high-performance asynchronous I/O framework.

Netty's performance is excellent: under the hood it uses kqueue or epoll for efficient I/O multiplexing, and zero-copy buffer techniques to improve CPU efficiency.

Elasticsearch plugs the Netty implementation into its own system as a plugin:


public class Netty4Plugin extends Plugin implements NetworkPlugin {

    public static final String NETTY_TRANSPORT_NAME = "netty4";
    public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";

    @Override
    public List<Setting<?>> getSettings() {
        return Arrays.asList(
            Netty4HttpServerTransport.SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS,
            Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT,
            Netty4HttpServerTransport.SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE,
            Netty4Transport.WORKER_COUNT,
            Netty4Transport.NETTY_RECEIVE_PREDICTOR_SIZE,
            Netty4Transport.NETTY_RECEIVE_PREDICTOR_MIN,
            Netty4Transport.NETTY_RECEIVE_PREDICTOR_MAX,
            Netty4Transport.NETTY_BOSS_COUNT);
    }

    @Override
    public Settings additionalSettings() {
        return Settings.builder()
            // here we set the netty4 transport and http transport as the default. This is a set once setting
            // ie. if another plugin does that as well the server will fail - only one default network can exist!
            .put(NetworkModule.HTTP_DEFAULT_TYPE_SETTING.getKey(), NETTY_HTTP_TRANSPORT_NAME)
            .put(NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING.getKey(), NETTY_TRANSPORT_NAME)
            .build();
    }

    @Override
    public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                          CircuitBreakerService circuitBreakerService,
                                                          NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
            networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService));
    }

    @Override
    public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                        PageCacheRecycler pageCacheRecycler,
                                                                        CircuitBreakerService circuitBreakerService,
                                                                        NamedXContentRegistry xContentRegistry,
                                                                        NetworkService networkService,
                                                                        HttpServerTransport.Dispatcher dispatcher,
                                                                        ClusterSettings clusterSettings) {
        // create the Netty4HttpServerTransport
        return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
            () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
                clusterSettings));
    }
}

For more detail, see these articles:

【ElasticSearch】Es 源码之 HttpServerTransport 源码解读

【ElasticSearch】Es 源码之 Netty4HttpServerTransport 源码解读

As the code above shows, the Netty plugin is made up of two important parts:

  1. Netty4HttpServerTransport
  2. Netty4Transport

Netty4HttpServerTransport

Netty4HttpServerTransport is the plugin's implementation of HttpServerTransport. It extends AbstractLifecycleComponent and implements the HttpServerTransport interface, so it shares the lifecycle of the system as a whole: it is started when the node starts and closed when the node shuts down.

public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
                                 NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings) {
    super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings);
    // set the number of available processors Netty uses to size its resources (e.g. thread pools)
    Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));

    this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
    this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
    this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
    this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
    this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
    // worker thread count
    this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
    // read timeout
    this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());

    ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
    // fixed-size receive buffer allocator
    recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());

    logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " +
            "receive_predictor[{}], max_composite_buffer_components[{}], pipelining_max_events[{}]",
        maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength, receivePredictor, maxCompositeBufferComponents,
        pipeliningMaxEvents);
}

The doStart method

@Override
protected void doStart() {
    boolean success = false;
    try {
        // create the ServerBootstrap
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
            HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));

        // NettyAllocator will return the channel type designed to work with the configured allocator;
        // the default channel type is org.elasticsearch.transport.CopyBytesServerSocketChannel
        serverBootstrap.channel(NettyAllocator.getServerChannelType());

        // Set the allocators for both the server channel and the child channels created
        serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());      // io.netty.buffer.PooledByteBufAllocator
        serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); // io.netty.buffer.PooledByteBufAllocator

        // install the HttpChannelHandler
        serverBootstrap.childHandler(configureServerChannelHandler());
        // install the exception handler
        serverBootstrap.handler(new ServerChannelExceptionHandler(this));

        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));    // defaults to true
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); // defaults to true

        // configure TCP keep-alive options
        if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) {
            // Netty logs a warning if it can't set the option, so try this only on supported platforms
            if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
                if (SETTING_HTTP_TCP_KEEP_IDLE.get(settings) >= 0) {
                    final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
                    if (keepIdleOption != null) {
                        serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), SETTING_HTTP_TCP_KEEP_IDLE.get(settings));
                    }
                }
                if (SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings) >= 0) {
                    final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
                    if (keepIntervalOption != null) {
                        serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption),
                            SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings));
                    }
                }
                if (SETTING_HTTP_TCP_KEEP_COUNT.get(settings) >= 0) {
                    final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
                    if (keepCountOption != null) {
                        serverBootstrap.childOption(NioChannelOption.of(keepCountOption), SETTING_HTTP_TCP_KEEP_COUNT.get(settings));
                    }
                }
            }
        }

        // send buffer size (defaults to -1b, i.e. not set)
        final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
        if (tcpSendBufferSize.getBytes() > 0) {
            serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
        }

        // receive buffer size (defaults to -1b, i.e. not set)
        final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
        if (tcpReceiveBufferSize.getBytes() > 0) {
            serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
        }

        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

        // address reuse
        final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
        serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

        // bind the server
        bindServer();
        success = true;
    } finally {
        if (success == false) {
            doStop(); // otherwise we leak threads since we never moved to started
        }
    }
}

The initChannel method

@Override
protected void initChannel(Channel ch) throws Exception {
    Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
    ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);

    // read timeout handler
    ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));

    // HTTP request decoder
    final HttpRequestDecoder decoder = new HttpRequestDecoder(
        handlingSettings.getMaxInitialLineLength(),
        handlingSettings.getMaxHeaderSize(),
        handlingSettings.getMaxChunkSize());
    decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
    ch.pipeline().addLast("decoder", decoder);

    // HTTP content decompressor
    ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
    // HTTP response encoder
    ch.pipeline().addLast("encoder", new HttpResponseEncoder());

    final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
    aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    ch.pipeline().addLast("aggregator", aggregator);

    if (handlingSettings.isCompression()) {
        // HTTP content compressor
        ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
    }
    if (handlingSettings.isCorsEnabled()) {
        // CORS handling
        ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
    }

    // HTTP pipelining: make sure responses are served strictly in the order of their corresponding requests
    ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
    ch.pipeline().addLast("handler", requestHandler);
    transport.serverAcceptedChannel(nettyHttpChannel);
}

When it starts, Netty4HttpServerTransport creates a Netty server and listens on port 9200.

When the server receives a request, the HttpChannelHandler callback is invoked. HttpChannelHandler's main job is to decode the incoming request and dispatch it to the appropriate module. Because Netty operates at the TCP/IP level rather than the HTTP level, every incoming request must first be parsed into an HTTP request before being handed to Netty4HttpRequestHandler. Netty4HttpRequestHandler then wraps the Netty request once more into Elasticsearch's own RestRequest and RestChannel, and uses the Dispatcher to dispatch the parsed request.

The main purpose of re-wrapping the Netty HTTP request is to decouple the rest of the system from Netty: if a better communication framework appears later, refactoring the communication module will be much easier.
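A minimal sketch of this wrapping idea is shown below. The names SimpleRestRequest and NettyRestRequestAdapter are hypothetical (this is not the actual Netty4HttpRequest implementation), and the snippet assumes Netty 4 is on the classpath; the point is only that the rest of the system sees a framework-neutral request type, so replacing Netty later would mean writing a new adapter and nothing else.

import io.netty.handler.codec.http.FullHttpRequest;

// framework-neutral view of an HTTP request (hypothetical, for illustration)
interface SimpleRestRequest {
    String method();
    String uri();
    String header(String name);
}

final class NettyRestRequestAdapter implements SimpleRestRequest {
    private final FullHttpRequest nettyRequest; // the only place that knows about Netty types

    NettyRestRequestAdapter(FullHttpRequest nettyRequest) {
        this.nettyRequest = nettyRequest;
    }

    @Override public String method()            { return nettyRequest.method().name(); }
    @Override public String uri()               { return nettyRequest.uri(); }
    @Override public String header(String name) { return nettyRequest.headers().get(name); }
}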

Dispatcher(RestController.java)

See also: 【ElasticSearch】Es 源码之 RestController 源码解读

RestController belongs to ActionModule. ActionModule's main task is to register the functions that each operation (action) needs to execute, establishing the mapping between action names and functions.

actions.register(SearchAction.INSTANCE, TransportSearchAction.class);

For example, the search action calls TransportSearchAction to perform the search.

The main task of RestController (the Dispatcher) is to parse the URL of the request forwarded by Netty4HttpRequestHandler, find the corresponding action, and execute it.

RestController also optimizes path resolution: it uses a trie (prefix tree) to speed up lookups.

For example:

GET /_cluster/state/metadata

The action function corresponding to this URI is stored in memory roughly like this:

public class TrieNode {
    private transient String key;
    private transient T value;              // the registered handler (callback)
    private final String wildcard;
    private transient String namedWildcard;
    private Map<String, TrieNode> children;

    private TrieNode(String key, T value, String wildcard) {
        this.key = key;
        this.wildcard = wildcard;
        this.value = value;
        this.children = emptyMap();
        if (isNamedWildcard(key)) {
            namedWildcard = key.substring(key.indexOf('{') + 1, key.indexOf('}'));
        } else {
            namedWildcard = null;
        }
    }

    // ... insert/retrieve methods omitted
}

Each node holds a key and a value (the callback), plus a hash map of its children. The wildcard field is used to tell whether the key is {*}, meaning that this path segment may be any value.

PathTrie first splits the requested URL on '/', then walks down level by level until it finds the matching function. Its advantage is that it finds the matching function as quickly as possible among the many registered URLs.

GET /_cluster/state/metadata

Looking up this URL takes [_cluster, state, metadata].size() = 3 steps. Many excellent URL routers are implemented with tries, for example this high-performance Go router:

https://github.com/julienschmidt/httprouter
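Below is a self-contained, simplified sketch of the PathTrie idea. It is not the real org.elasticsearch.common.path.PathTrie and leaves out the {named} wildcard handling described above; the registered handler is just a string label here. Each path segment becomes one trie level, so looking up /_cluster/state/metadata costs one map lookup per segment.

import java.util.HashMap;
import java.util.Map;

public final class SimplePathTrie<T> {
    private final Map<String, SimplePathTrie<T>> children = new HashMap<>();
    private T handler; // the callback registered for the path that ends at this node

    public void insert(String path, T handler) {
        SimplePathTrie<T> node = this;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) continue;
            node = node.children.computeIfAbsent(segment, s -> new SimplePathTrie<>());
        }
        node.handler = handler;
    }

    public T retrieve(String path) {
        SimplePathTrie<T> node = this;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) continue;
            node = node.children.get(segment);
            if (node == null) {
                return null; // no route registered for this path
            }
        }
        return node.handler;
    }

    public static void main(String[] args) {
        SimplePathTrie<String> trie = new SimplePathTrie<>();
        trie.insert("/_cluster/state/metadata", "cluster state metadata handler");
        trie.insert("/_cluster/health", "cluster health handler");
        // three segments, therefore three map lookups
        System.out.println(trie.retrieve("/_cluster/state/metadata"));
    }
}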

Netty4Transport


Netty4Transport is effectively Elasticsearch's RPC (remote procedure call) layer. When Elasticsearch starts, it also starts a Netty server listening on a separate port, 9300, to handle requests from other nodes, and it initializes a Netty client for sending requests to other nodes.

Creating a separate Netty server decouples RPC handling from HttpServerTransport and allows the serialization of RPC messages to be optimized independently. For RPC serialization, Elasticsearch does not use HTTP or JSON; it defines its own binary format. All sending and receiving happens at the TCP/IP level, which avoids HTTP-level parsing, and messages are further compressed to improve transfer efficiency.
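To make the benefit concrete, here is a self-contained toy illustration of a length-prefixed binary frame (this is not the actual Elasticsearch wire format): the receiver reads a fixed header, knows exactly how many bytes follow, and never has to parse HTTP or JSON text.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public final class SimpleFrame {

    // frame layout: [int length][long requestId][byte status][payload bytes]
    public static byte[] encode(long requestId, byte status, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(Long.BYTES + Byte.BYTES + payload.length); // number of bytes that follow this field
        out.writeLong(requestId);                               // lets the sender match a response to its request
        out.writeByte(status);                                  // e.g. a request/response flag
        out.write(payload);                                     // already-serialized (possibly compressed) body
        return bytes.toByteArray();
    }

    public static void decode(byte[] frame) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        int length = in.readInt();
        long requestId = in.readLong();
        byte status = in.readByte();
        byte[] payload = new byte[length - Long.BYTES - Byte.BYTES];
        in.readFully(payload);
        System.out.printf("requestId=%d status=%d payloadBytes=%d%n", requestId, status, payload.length);
    }

    public static void main(String[] args) throws IOException {
        byte[] frame = encode(42L, (byte) 1, "hello node2".getBytes(StandardCharsets.UTF_8));
        decode(frame); // no text parsing anywhere on the read path
    }
}

The doStart method of Netty4Transport below shows how the client bootstrap and the server bootstraps are started: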

@Override
protected void doStart() {
    boolean success = false;
    try {
        // start the client side
        bootstrap = createBootstrap();
        if (NetworkService.NETWORK_SERVER.get(settings)) {
            for (ProfileSettings profileSettings : profileSettings) {
                // start the server side
                createServerBootstrap(profileSettings);
                bindServer(profileSettings);
            }
        }
        super.doStart();
        success = true;
    } finally {
        if (success == false) {
            doStop();
        }
    }
}

Netty4Transport uses the same Netty4MessageChannelHandler callback on both the client and the server side. When the client sends a request to a remote node, it marks the message header as a request, so the callback decides whether an incoming message is a request from a remote node or the result of a previously sent request by checking TransportStatus.isRequest(status).
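As a toy illustration of that header flag (the names below are hypothetical and do not reflect the actual TransportStatus bit layout), a single status byte can carry bit flags, and the receiver simply branches on whether the request bit is set:

public final class StatusFlagsDemo {

    // hypothetical flag layout, for illustration only
    static final byte REQUEST    = 1;      // set when the message is a request, clear for a response
    static final byte ERROR      = 1 << 1; // set when a response carries an exception
    static final byte COMPRESSED = 1 << 2; // set when the payload is compressed

    static boolean isRequest(byte status) {
        return (status & REQUEST) != 0;
    }

    public static void main(String[] args) {
        byte status = (byte) (REQUEST | COMPRESSED);
        System.out.println(isRequest(status) ? "dispatch to a request handler" : "dispatch to the waiting response handler");
    }
}

The real dispatch happens in InboundHandler#messageReceived: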

// org.elasticsearch.transport.InboundHandler#messageReceived
private void messageReceived(TcpChannel channel, InboundMessage message) throws IOException {
    final InetSocketAddress remoteAddress = channel.getRemoteAddress();
    final Header header = message.getHeader();
    assert header.needsToReadVariableHeader() == false;
    ThreadContext threadContext = threadPool.getThreadContext();
    try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
        // Place the context with the headers from the message
        threadContext.setHeaders(header.getHeaders());
        threadContext.putTransient("_remote_address", remoteAddress);
        if (header.isRequest()) {
            handleRequest(channel, header, message);
        } else {
            // Responses do not support short circuiting currently
            assert message.isShortCircuit() == false;
            final StreamInput streamInput = namedWriteableStream(message.openOrGetStreamInput());
            assertRemoteVersion(streamInput, header.getVersion());
            final TransportResponseHandler<?> handler;
            long requestId = header.getRequestId();
            if (header.isHandshake()) {
                handler = handshaker.removeHandlerForHandshake(requestId);
            } else {
                TransportResponseHandler<? extends TransportResponse> theHandler =
                    responseHandlers.onResponseReceived(requestId, messageListener);
                if (theHandler == null && header.isError()) {
                    handler = handshaker.removeHandlerForHandshake(requestId);
                } else {
                    handler = theHandler;
                }
            }
            // ignore if its null, the service logs it
            if (handler != null) {
                if (header.isError()) {
                    handlerResponseError(streamInput, handler);
                } else {
                    handleResponse(remoteAddress, streamInput, handler);
                }
                // Check the entire message has been read
                final int nextByte = streamInput.read();
                // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
                if (nextByte != -1) {
                    throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
                        + handler + "], error [" + header.isError() + "]; resetting");
                }
            }
        }
    }
}

Transport defines two kinds of callback handlers: TransportRequestHandler and TransportResponseHandler.

public interface TransportRequestHandler<T extends TransportRequest> {

    /**
     * Override this method if access to the Task parameter is needed
     */
    default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception {
        messageReceived(request, channel);
    }

    void messageReceived(T request, TransportChannel channel) throws Exception;
}

public interface TransportResponseHandler<T extends TransportResponse> extends Writeable.Reader<T> {

    /**
     * @deprecated Implement {@link #read(StreamInput)} instead.
     */
    @Deprecated
    default T newInstance() {
        throw new UnsupportedOperationException();
    }

    /**
     * deserializes a new instance of the return type from the stream.
     * called by the infra when de-serializing the response.
     *
     * @return the deserialized response.
     */
    @SuppressWarnings("deprecation")
    @Override
    default T read(StreamInput in) throws IOException {
        T instance = newInstance();
        instance.readFrom(in);
        return instance;
    }

    void handleResponse(T response);

    void handleException(TransportException exp);

    String executor();
}

TransportRequestHandler is the callback that handles requests sent by remote nodes; it performs the corresponding operation based on the incoming request.

TransportResponseHandler is the callback for results that come back after a request has been executed on a remote node; its main job is to aggregate these responses and return them to the user or redistribute them to other nodes.
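The sketch below is a self-contained analogy, not Elasticsearch code, for this division of labour: the receiving node runs the request handler, and the sending node runs the response handler once the answer (or an error) comes back. The real interfaces shown above additionally deal with serialization (StreamInput/Writeable) and with choosing an executor.

import java.util.concurrent.CompletableFuture;

public final class HandlerRolesDemo {

    interface RequestHandler<Req, Resp> {
        Resp messageReceived(Req request) throws Exception;
    }

    interface ResponseHandler<Resp> {
        void handleResponse(Resp response);
        void handleException(Exception e);
    }

    public static void main(String[] args) {
        // "Remote" node side: knows how to answer a ping request.
        RequestHandler<String, String> remoteSide = request -> "pong (in reply to " + request + ")";

        // "Local" node side: sent the request, now reacts to whatever comes back.
        ResponseHandler<String> localSide = new ResponseHandler<String>() {
            @Override public void handleResponse(String response) { System.out.println("got: " + response); }
            @Override public void handleException(Exception e) { e.printStackTrace(); }
        };

        // A stand-in for the transport layer in between: deliver the request asynchronously,
        // then route either the response or the failure back to the response handler.
        CompletableFuture
            .supplyAsync(() -> {
                try {
                    return remoteSide.messageReceived("ping");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            })
            .whenComplete((response, error) -> {
                if (error != null) {
                    localSide.handleException(new Exception(error));
                } else {
                    localSide.handleResponse(response);
                }
            })
            .join();
    }
}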

Below is an example that walks through the whole flow of getting a document.

Suppose a user wants to get the blog post whose ID is 123. They send a request like GET /website/blog/123 to Elasticsearch node 1.


After node 1 (node1) receives the request, it is handled by HttpServerTransport, whose HttpChannelHandler callback resolves, via the PathTrie, that the action for this request is TransportGetAction.

This action is responsible for fetching a single document. It first looks in the cluster state to find which node holds the index shard for /website/blog/123. If the shard is local, it reads the document from Lucene asynchronously and returns the result. If the shard lives on a remote node 2 (node2), it uses the Transport layer to send the request to node2's port 9300; node2 then invokes the registered ShardTransportHandler callback, searches its local Lucene for the result, and returns it to node1. node1 serializes the result into JSON and returns it to the user.
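As a toy illustration of the first step (not Elasticsearch's exact formula, which uses a Murmur3 hash of the routing value modulo the number of primary shards), the document ID deterministically maps to a shard, so any node can compute which node owns /website/blog/123 and decide between a local read and a forwarded transport request:

import java.util.Map;

public final class ShardRoutingDemo {

    public static void main(String[] args) {
        int numberOfPrimaryShards = 5;
        // which node owns which primary shard (normally read from the cluster state)
        Map<Integer, String> shardToNode = Map.of(0, "node1", 1, "node2", 2, "node1", 3, "node2", 4, "node1");

        String documentId = "123";
        int shard = Math.floorMod(documentId.hashCode(), numberOfPrimaryShards); // toy hash, not Murmur3
        String owner = shardToNode.get(shard);

        System.out.printf("document %s -> shard %d -> %s%n", documentId, shard, owner);
        System.out.println(owner.equals("node1")
            ? "shard is local: read from Lucene asynchronously"
            : "shard is remote: forward the request over the transport layer (port 9300)");
    }
}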

How Elasticsearch uses Netty

Elasticsearch's approach to using Netty is quite elegant. Because Netty follows a single-threaded event-loop model, Elasticsearch uses thread pools to handle callbacks asynchronously wherever possible, but it does not blindly hand every callback off to an extra thread.

For example, purely in-memory operations such as reading the cluster state are executed on the calling thread, which reduces pressure on the thread pools, avoids extra overhead, and keeps the code simpler. Elasticsearch also sizes its thread pools according to the type of operation: the search pool, for instance, has many threads and can scale automatically. Its control over threads is fine-grained: it does not naively run everything on a single thread from start to finish, but executes work step by step wherever possible.
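A self-contained illustration of this pattern follows (the pool name and sizes here are arbitrary, not Elasticsearch's actual thread-pool configuration): cheap in-memory work runs on the calling thread, while expensive work is handed to a dedicated, appropriately sized pool so that it never blocks the network (event-loop) threads.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class ExecutorChoiceDemo {

    // a dedicated pool sized for one kind of work (Elasticsearch names its pools, e.g. "search", "get", "write")
    private static final ExecutorService SEARCH_POOL = Executors.newFixedThreadPool(4);

    static void handle(boolean cheapInMemoryWork, Runnable work) {
        if (cheapInMemoryWork) {
            work.run();                // e.g. a cluster-state read: no hand-off, no extra thread
        } else {
            SEARCH_POOL.execute(work); // e.g. a Lucene search: keep it off the network (event-loop) threads
        }
    }

    public static void main(String[] args) throws InterruptedException {
        handle(true,  () -> System.out.println("cluster-state read on " + Thread.currentThread().getName()));
        handle(false, () -> System.out.println("search executed on " + Thread.currentThread().getName()));
        SEARCH_POOL.shutdown();
        SEARCH_POOL.awaitTermination(5, TimeUnit.SECONDS);
    }
}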
