文章目录

  • Jest Github地址
  • 搭建源码环境
  • Jest配置ES集群
  • Jest 配置ES集群,确保应用高可用的原理探究
    • 初始化 JestClient
    • NodeChecker 源码分析
    • 发起请求的过程
  • 遇到的问题


Jest Github地址

直接访问 https://github.com/searchbox-io/Jest ,把源码拉下来


搭建源码环境

我拉了个5.3.4的版本,最新版本为6.3.1 ,大同小异

test 这个module是我自己写的测试集群代码,GitHub上是没有这个的 .


Jest配置ES集群

单例Client ,有个属性JestClient ,需要初始化。

package com.artisan.test;import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;import java.util.Arrays;
import java.util.concurrent.TimeUnit;public class Client {// volatile修饰,确保内存可见private volatile static Client client = null;private static JestClient jestClient;/*** 私有构造函数*/private Client() {initJestClient(); // 初始化JestClient}/*** 懒汉模式* double Check* @return*/public static Client getInstance() {if (client == null) {synchronized (Client.class) {if (client == null) {client = new Client();}}}return client;}/*** 获取JestClient* @return*/public static JestClient getJestClient() {return jestClient;}private void initJestClient() {// 初始化的集群节点String[] serverUris = new String[]{"http://127.0.0.1:9200", "http://127.0.0.1:8200"};JestClientFactory factory = new JestClientFactory();// 设置HttpClientConfigfactory.setHttpClientConfig(new HttpClientConfig.Builder(Arrays.asList(serverUris)).discoveryEnabled(true) // 节点发现,确保访问的节点都是存活的节点,达到高可用.discoveryFrequency(2000, TimeUnit.MILLISECONDS) // NodeChecker的执行频率,默认10S.gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create()).multiThreaded(true).readTimeout(10000).build());// 返回jestClientjestClient = factory.getObject();}
}

测试类

package com.artisan.test;import io.searchbox.client.JestResult;
import io.searchbox.core.Get;import java.io.IOException;public class JestClientTest {/*** 构造函数*/public JestClientTest() {Client.getInstance();// 初始化Client}private static void getDocumentMyStroe(String id) {Get get = new Get.Builder("my_store", id).type("product").build();JestResult result ;try {result = Client.getJestClient().execute(get);if (result != null) System.out.println(id + ":" + result.getJsonObject());} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {Thread.sleep(5000);// 先让NodeChecker运行,获取存活的节点,主线程这里先休眠5秒for (int i = 0; i < Integer.MAX_VALUE; i++) {Thread.sleep(2000);getDocumentMyStroe("998");}}
}

Jest 配置ES集群,确保应用高可用的原理探究

来看看关键点.discoveryEnabled(true) 都干了啥?

初始化 JestClient

到 JestClientFactory#getObject() 方法 中看下 ,大致说下整个方法的逻辑:

public JestClient getObject() {// 初始化 JestHttpClientJestHttpClient client = new JestHttpClient();if (httpClientConfig == null) {log.debug("There is no configuration to create http client. Going to create simple client with default values");httpClientConfig = new HttpClientConfig.Builder("http://localhost:9200").build();}client.setRequestCompressionEnabled(httpClientConfig.isRequestCompressionEnabled());// 初始化的es集群节点client.setServers(httpClientConfig.getServerList());// 设置HttpClient、AsyncClient final HttpClientConnectionManager connectionManager = getConnectionManager();final NHttpClientConnectionManager asyncConnectionManager = getAsyncConnectionManager();client.setHttpClient(createHttpClient(connectionManager));client.setAsyncClient(createAsyncHttpClient(asyncConnectionManager));// 设置自定义的GsonGson gson = httpClientConfig.getGson();if (gson == null) {log.info("Using default GSON instance");} else {log.info("Using custom GSON instance");client.setGson(gson);}// 创建NodeChecker并启动Node Discovery// set discovery (should be set after setting the httpClient on jestClient)if (httpClientConfig.isDiscoveryEnabled()) {log.info("Node Discovery enabled...");if (!Strings.isNullOrEmpty(httpClientConfig.getDiscoveryFilter())) {log.info("Node Discovery filtering nodes on \"{}\"", httpClientConfig.getDiscoveryFilter());}NodeChecker nodeChecker = createNodeChecker(client, httpClientConfig);client.setNodeChecker(nodeChecker);nodeChecker.startAsync();nodeChecker.awaitRunning();} else {log.info("Node Discovery disabled...");}//  如果maxConnectionIdleTime大于0则会创建IdleConnectionReaper,进行Idle connection reaping  (空闲线程回收)// schedule idle connection reaping if configuredif (httpClientConfig.getMaxConnectionIdleTime() > 0) {log.info("Idle connection reaping enabled...");IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager, asyncConnectionManager));client.setIdleConnectionReaper(reaper);reaper.startAsync();reaper.awaitRunning();} else {log.info("Idle connection reaping disabled...");}Set<HttpHost> preemptiveAuthTargetHosts = httpClientConfig.getPreemptiveAuthTargetHosts();if (!preemptiveAuthTargetHosts.isEmpty()) {log.info("Authentication cache set for preemptive authentication");client.setHttpClientContextTemplate(createPreemptiveAuthContext(preemptiveAuthTargetHosts));}return client;}

重点看下 discoveryEnable 设置为true的情况下,Jest的处理逻辑


NodeChecker 源码分析

NodeChecker继承了com.google.common.util.concurrent.AbstractScheduledService

它的构造器根据clientConfig的discoveryFrequency及discoveryFrequencyTimeUnit了fixedDelayScheduler来执行node checker;

public NodeChecker(JestClient jestClient, ClientConfig clientConfig) {// 构建action ,可以根据前面HttpClientConfig#discoveryFilter(String discoveryFilter) 添加Nodeaction = new NodesInfo.Builder().withHttp().addNode(clientConfig.getDiscoveryFilter()).build();this.client = jestClient;this.defaultScheme = clientConfig.getDefaultSchemeForDiscoveredNodes();// 根据discoveryFrequency(2000, TimeUnit.MILLISECONDS) 实例化一个定时任务出来 使用的Google Guava的包 this.scheduler = Scheduler.newFixedDelaySchedule(0l,clientConfig.getDiscoveryFrequency(),clientConfig.getDiscoveryFrequencyTimeUnit());// 初始化的根节点 this.bootstrapServerList = ImmutableSet.copyOf(clientConfig.getServerList());// 实例化 discoveredServerList  为空,后续使用 this.discoveredServerList = new LinkedHashSet<String>();}

实现了runOneIteration方法,该方法主要是发送NodesInfo请求 GET /_nodes/_all/http

 @Overrideprotected void runOneIteration() throws Exception {JestResult result;try {result = client.execute(action);} catch (CouldNotConnectException cnce) {// Can't connect to this node, remove it from the listlog.error("Connect exception executing NodesInfo!", cnce);removeNodeAndUpdateServers(cnce.getHost());return;// do not elevate the exception since that will stop the scheduled calls.// throw new RuntimeException("Error executing NodesInfo!", e);} catch (Exception e) {log.error("Error executing NodesInfo!", e);client.setServers(bootstrapServerList);return;// do not elevate the exception since that will stop the scheduled calls.// throw new RuntimeException("Error executing NodesInfo!", e);}  if (result.isSucceeded()) {LinkedHashSet<String> httpHosts = new LinkedHashSet<String>();JsonObject jsonMap = result.getJsonObject();JsonObject nodes = (JsonObject) jsonMap.get("nodes");if (nodes != null) {for (Entry<String, JsonElement> entry : nodes.entrySet()) {JsonObject host = entry.getValue().getAsJsonObject();JsonElement addressElement = null;if (host.has("version")) {int majorVersion = Integer.parseInt(Splitter.on('.').splitToList(host.get("version").getAsString()).get(0));if (majorVersion >= 5) {JsonObject http = host.getAsJsonObject("http");if (http != null && http.has(PUBLISH_ADDRESS_KEY_V5))addressElement = http.get(PUBLISH_ADDRESS_KEY_V5);}}if (addressElement == null) {// get as a JsonElement first as some nodes in the cluster may not have an http_addressif (host.has(PUBLISH_ADDRESS_KEY)) addressElement = host.get(PUBLISH_ADDRESS_KEY);}if (addressElement != null && !addressElement.isJsonNull()) {String httpAddress = getHttpAddress(addressElement.getAsString());if(httpAddress != null) httpHosts.add(httpAddress);}}}if (log.isDebugEnabled()) {log.debug("Discovered {} HTTP hosts: {}", httpHosts.size(), Joiner.on(',').join(httpHosts));}discoveredServerList = httpHosts;client.setServers(discoveredServerList);} else {log.warn("NodesInfo request resulted in error: {}", result.getErrorMessage());client.setServers(bootstrapServerList);}}
  • 请求成功的话 解析body,如果nodes下面有version,取第一位,判断大于等于5的话则取http节点下面的PUBLISH_ADDRESS_KEY_V5[publish_address]属性值,封装成http后添加到discoveredServerList ,供请求获取URL使用。(里面都是存活的节点),如果没有取到,则取PUBLISH_ADDRESS_KEY[http_address]属性值,封装成http后添加到discoveredServerList。
  • 请求抛出CouldNotConnectException则调用removeNodeAndUpdateServers方法移除该host;如果抛出其他的Exception则将client的servers重置为bootstrapServerList


发起请求的过程

执行的execute方法。Client.getJestClient 返回的是 JestClient接口

看下 JestHttpClient#execute

 /*** @throws IOException in case of a problem or the connection was aborted during request,*                     or in case of a problem while reading the response stream* @throws CouldNotConnectException if an {@link HttpHostConnectException} is encountered*/@Overridepublic <T extends JestResult> T execute(Action<T> clientRequest) throws IOException {return execute(clientRequest, null);}

继续

public <T extends JestResult> T execute(Action<T> clientRequest, RequestConfig requestConfig) throws IOException {// 获取 HttpUriRequest HttpUriRequest request = prepareRequest(clientRequest, requestConfig);CloseableHttpResponse response = null;try {response = executeRequest(request);return deserializeResponse(response, request, clientRequest);} catch (HttpHostConnectException ex) {throw new CouldNotConnectException(ex.getHost().toURI(), ex);} finally {if (response != null) {try {response.close();} catch (IOException ex) {log.error("Exception occurred while closing response stream.", ex);}}}}

重点来了

 HttpUriRequest request = prepareRequest(clientRequest, requestConfig);

继续跟到prepareRequest

   protected <T extends JestResult> HttpUriRequest prepareRequest(final Action<T> clientRequest, final RequestConfig requestConfig) {String elasticSearchRestUrl = getRequestURL(getNextServer(), clientRequest.getURI());HttpUriRequest request = constructHttpMethod(clientRequest.getRestMethodName(), elasticSearchRestUrl, clientRequest.getData(gson), requestConfig);log.debug("Request method={} url={}", clientRequest.getRestMethodName(), elasticSearchRestUrl);// add headers added to actionfor (Entry<String, Object> header : clientRequest.getHeaders().entrySet()) {request.addHeader(header.getKey(), header.getValue().toString());}return request;}

重点: getNextServer()

 /*** @throws io.searchbox.client.config.exception.NoServerConfiguredException*/protected String getNextServer() {return serverPoolReference.get().getNextServer();}

继续

总结一下:

  • JestHttpClient继承了AbstractJestClient,它的execute及executeAsync方法都调用了prepareRequest来构造HttpUriRequest;
  • prepareRequest方法会先调用getNextServer方法来获取要请求的elasticSearchServer的地址;
  • 而getNextServer方法则是调用的serverPoolReference.get().getNextServer()
  • 看看 serverPoolReference 是个啥?
 private final AtomicReference<ServerPool> serverPoolReference =new AtomicReference<ServerPool>(new ServerPool(ImmutableSet.<String>of()));
  • 再看看刚才NodeChecker 处理完成后调用的 client.setServers(discoveredServerList);

到 AbstractJestClient 类中看下 setServers方法

AbstractJestClient有一个serverPoolReference属性,AtomicReference,其泛型为ServerPool;setServers方法则是创建新的ServerPool,然后更新serverPoolReference

ServerPool有个AtomicInteger类型的nextServerIndex,getNextServer方法则是通过nextServerIndex.getAndIncrement() % serversRing.size()来确定取的serversRing这个List的index,其实现的是Round Robin策略;极端情况下出现IndexOutOfBoundsException的话,则会重置nextServerIndex为0,然后继续按Round Robin策略取下一个server

是不是就对上了? NodeChecker负责更新,execute则从里面取,所里取出来的都是 存活的节点。 这样就做到了动态的发现。

节点上线后,自动发送到该节点,节点挂掉后,能自动移除。 全称无需干预。

再说一点, NodeChecker有个执行频率, 确保这个执行完了以后,再请求ES。 举个例子,比如3个节点,你启动应用的时候,正好有一个节点是挂掉的,而且正常的业务请求正好请求到了这个坏的节点上,是不是就挂了。 如果NodeChecker执行完以后,那取出的节点肯定是都是存活的。


遇到的问题

说下背景, 老项目 升级 , 以前是 单个ES节点,所以 没有配置 集群,且Jest版本为Jdk1.7

初始化JestClient如下

  JestClientFactory factory = new JestClientFactory();factory.setHttpClientConfig(new  HttpClientConfi.Builder("http://127.0.0.1:9200").gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create()).multiThreaded(true).readTimeout(10000).build());jestClient = factory.getObject();

配置连接集群的地址,最重要的一行代码,增加 .discoveryEnabled(true)

用的是2.4.0的版本, 升级到了5.3.4以后,去debug jest的源码的时候,打上的断点,总和是源码对不起来 … 结果是 IDEA 发布的Tomcat工程路径中 老的2.4.0的jar包还在原来的目录下面,导致Tomcat加载了2.4.0 jar包中的类,删除老的jar包,重新编译测试,通过。

做了几件事儿

  1. 升级JDK到1.8
  2. Jest 升级到 5.3.4
  3. 依赖的Guava升级到了19.0

感兴趣的同学,用我上面提供的测试代码测试即可。


Elasticsearch-Jest 配置ES集群源码解读相关推荐

  1. ElasticSearch(二):ES集群安装

    安装Elasticsearch 创建普通用户 ES不能使用root用户来启动,必须使用普通用户来安装启动.这里我们创建一个普通用户以及定义一些常规目录用于存放我们的数据文件以及安装包等. 创建一个es ...

  2. Elasticsearch——Windows下ES集群部署 Linux下ES单节点、集群部署

    1.开篇 在之前的两篇文章中,说白了就是在windows下部署的ES单节点的环境. 这篇文章主要是说一下windows下部署ES集群.Linux下单节点部署. 单台 Elasticsearch 服务器 ...

  3. ElasticSearch 5. 搭建ES集群

    Elasticsearch集群 1. why? 提高负载能力 提高存储容量上限 实现高可用 提高并发处理能力 - 2. 数据分片(Shard) ​ es集群把数据拆分成多份,每一份存储到不同节点(no ...

  4. Nacos源码系列——第三章(全网最经典的Nacos集群源码主线剖析)

    上两个章节讲述了Nacos在单机模式下的服务注册,发现等源码剖析过程,实战当中 其实单机是远远不够的,那么Nacos是如何在集群模式下是如何保证节点状态同步,以及服 务变动,新增数据同步的过程的!   ...

  5. python连接es数据库_Python Elasticsearch API操作ES集群

    环境Centos 7.4 Python 2.7 Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1 Elasitcsearch6.3.2 知识点调用Python ...

  6. Seatunnel提交任务到Flink集群源码解析

    一:首先查看seatunnel提交任务到flink集群的时候的shell脚本start-seatunnel-flink-13-connector-v2.sh,查看最后会调用一个类FlinkStarte ...

  7. 25000linux集群源码,一文看懂 Redis5 搭建集群

    1.简要说明 2018年十月 Redis 发布了稳定版本的 5.0 版本,推出了各种新特性,其中一点是放弃 Ruby的集群方式,改为 使用 C语言编写的 redis-cli的方式,是集群的构建方式复杂 ...

  8. CentOS6.4+rabbitmq集群——源码安装

    1.实验环境 192.168.56.101r1.com 192.168.56.102r2.com 2.修改主机名 # cat /etc/hosts 127.0.0.1 localhost localh ...

  9. linux搭建es集群

    准备 安装docker. 安装好Docker Compose. 注意:运行内存最好8g以上,es运行会占用很多内存(2-3g) 方式1: 单机多节点. 参考官网的方式创建(docker-compose ...

最新文章

  1. 从键盘上输入一个正整数n,请按照以下五行杨辉三角形的显示方式, 输出杨辉三角形的前n行。请采用循环控制语句来实现。...
  2. Oracle分页查询格式(八)
  3. 多数据源切换(拦截器)
  4. python之web框架(3):WSGI之web应用完善
  5. ARM 内核寄存器 和 基本汇编语言讲解
  6. 带CAN唤醒能力的TJA1043
  7. alpha测试什么意思,和Beta测试有何区别?
  8. 基本类型包装及数学工具类的使用
  9. Slove the {Failed to load unit 'HGCM' (VERR_INVALID_PARAMETER)}
  10. 令人惋惜:Sigfox撑不下去了!这回,可不能怪NB-IoT、LoRa......
  11. Pyhton opencv 图片裁剪
  12. Linux那些事儿之我是U盘(5)外面的世界很精彩
  13. Web(七)CSS页面布局-网页布局页面的制作
  14. pythoncad标注教程_CAD 2014二维三维建模渲染标注基础与提升视频教程
  15. 数据分析的终极目标-预测第1辑
  16. P4编程环境安装(ubuntu16.04,p4c+bmv2+mininet+PI+tutorial)
  17. 数学建模|预测方法:马尔科夫预测
  18. 2010计算机系助学金,2010-2011学年计算机系国家助学金学生名单公示
  19. Pyqt5设置背景图片
  20. 记微服务架构实战学习笔记

热门文章

  1. SOCKET/串口通信粘包问题处理,附带详细代码
  2. C 语言链表其他实现
  3. c语言编写程序x的y次方,C语言变为编程y = x-x立方/ 3! + x五次方力量/ 5! -x7th power / 7!...
  4. python访问数据库oracle_python连接oracle数据库
  5. 107. Leetcode 123. 买卖股票的最佳时机 III (动态规划-股票交易)
  6. Leetcode 153. 寻找旋转排序数组中的最小值 (每日一题 20211014)
  7. tableau可视化数据分析60讲(十七)-tableau常用可视化视图(凹凸图甘特图直方图)
  8. 深度学习核心技术精讲100篇(五十四)-阿里文娱多模态视频分类算法中的特征改进
  9. 深度学习核心技术精讲100篇(六)-keras 实战系列之知识蒸馏(Knowledge Distilling)
  10. linux下面子目录绑定域名的方法,.htaccess绑定子域名到子目录方法