This article takes a look at Elasticsearch's NodesSniffer.

NodesSniffer

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/NodesSniffer.java

/**
 * Responsible for sniffing the http hosts
 */
public interface NodesSniffer {
    /**
     * Returns the sniffed Elasticsearch nodes.
     */
    List<Node> sniff() throws IOException;
}
  • The NodesSniffer interface defines a sniff method for obtaining the sniffed Elasticsearch nodes; it has one implementation class, ElasticsearchNodesSniffer. The contract is small enough that a custom implementation fits in a few lines, as sketched below.
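To show how small the contract is, here is a minimal sketch of a custom implementation, assuming the elasticsearch low-level rest-client and sniffer jars are on the classpath; StaticNodesSniffer is a hypothetical class name used only for illustration:

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.http.HttpHost;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.sniff.NodesSniffer;

/**
 * A NodesSniffer that always returns a fixed set of hosts.
 * Handy for tests or environments where the node list is known up front.
 */
public class StaticNodesSniffer implements NodesSniffer {

    private final List<Node> nodes;

    public StaticNodesSniffer(HttpHost... hosts) {
        // wrap each HttpHost in a Node; no metadata (name, version, roles) is attached
        this.nodes = Collections.unmodifiableList(
                Arrays.stream(hosts).map(Node::new).collect(Collectors.toList()));
    }

    @Override
    public List<Node> sniff() throws IOException {
        return nodes;
    }
}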

ElasticsearchNodesSniffer

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/ElasticsearchNodesSniffer.java

public final class ElasticsearchNodesSniffer implements NodesSniffer {
    private static final Log logger = LogFactory.getLog(ElasticsearchNodesSniffer.class);

    public static final long DEFAULT_SNIFF_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(1);

    private final RestClient restClient;
    private final Request request;
    private final Scheme scheme;
    private final JsonFactory jsonFactory = new JsonFactory();

    public ElasticsearchNodesSniffer(RestClient restClient) {
        this(restClient, DEFAULT_SNIFF_REQUEST_TIMEOUT, ElasticsearchNodesSniffer.Scheme.HTTP);
    }

    public ElasticsearchNodesSniffer(RestClient restClient, long sniffRequestTimeoutMillis, Scheme scheme) {
        this.restClient = Objects.requireNonNull(restClient, "restClient cannot be null");
        if (sniffRequestTimeoutMillis < 0) {
            throw new IllegalArgumentException("sniffRequestTimeoutMillis must be greater than 0");
        }
        this.request = new Request("GET", "/_nodes/http");
        request.addParameter("timeout", sniffRequestTimeoutMillis + "ms");
        this.scheme = Objects.requireNonNull(scheme, "scheme cannot be null");
    }

    /**
     * Calls the elasticsearch nodes info api, parses the response and returns all the found http hosts
     */
    @Override
    public List<Node> sniff() throws IOException {
        Response response = restClient.performRequest(request);
        return readHosts(response.getEntity(), scheme, jsonFactory);
    }

    static List<Node> readHosts(HttpEntity entity, Scheme scheme, JsonFactory jsonFactory) throws IOException {
        try (InputStream inputStream = entity.getContent()) {
            JsonParser parser = jsonFactory.createParser(inputStream);
            if (parser.nextToken() != JsonToken.START_OBJECT) {
                throw new IOException("expected data to start with an object");
            }
            List<Node> nodes = new ArrayList<>();
            while (parser.nextToken() != JsonToken.END_OBJECT) {
                if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                    if ("nodes".equals(parser.getCurrentName())) {
                        while (parser.nextToken() != JsonToken.END_OBJECT) {
                            JsonToken token = parser.nextToken();
                            assert token == JsonToken.START_OBJECT;
                            String nodeId = parser.getCurrentName();
                            Node node = readNode(nodeId, parser, scheme);
                            if (node != null) {
                                nodes.add(node);
                            }
                        }
                    } else {
                        parser.skipChildren();
                    }
                }
            }
            return nodes;
        }
    }

    //......

    public enum Scheme {
        HTTP("http"), HTTPS("https");

        private final String name;

        Scheme(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }
}
  • ElasticsearchNodesSniffer's constructor takes three parameters: restClient, sniffRequestTimeoutMillis, and scheme, where sniffRequestTimeoutMillis defaults to 1 second and scheme defaults to HTTP; the constructor creates the GET /_nodes/http request; the sniff method executes this request via restClient.performRequest and then calls readHosts to parse the response, which in turn calls the readNode method to parse the nodes section. A usage sketch follows below.
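The following usage sketch shows how an ElasticsearchNodesSniffer is typically plugged into a Sniffer; the host localhost:9200, the 10-second sniff request timeout, and the 60-second sniff interval are illustrative values, not library defaults:

import java.util.concurrent.TimeUnit;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
import org.elasticsearch.client.sniff.NodesSniffer;
import org.elasticsearch.client.sniff.Sniffer;

public class SnifferExample {
    public static void main(String[] args) throws Exception {
        // low-level REST client pointed at a single seed node
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();

        // sniff the cluster over HTTP, with a 10 second timeout on GET /_nodes/http
        NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
                restClient, TimeUnit.SECONDS.toMillis(10), ElasticsearchNodesSniffer.Scheme.HTTP);

        // Sniffer periodically calls nodesSniffer.sniff() and updates the RestClient's node list
        Sniffer sniffer = Sniffer.builder(restClient)
                .setNodesSniffer(nodesSniffer)
                .setSniffIntervalMillis(60_000)
                .build();

        // ... use restClient ...

        sniffer.close();
        restClient.close();
    }
}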

GET /_nodes/http example

{
  "_nodes" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "cluster_name" : "docker-cluster",
  "nodes" : {
    "d7w2wdw7Q7SqERe5_fxZYA" : {
      "name" : "d7w2wdw",
      "transport_address" : "172.17.0.2:9300",
      "host" : "172.17.0.2",
      "ip" : "172.17.0.2",
      "version" : "6.6.2",
      "build_flavor" : "oss",
      "build_type" : "tar",
      "build_hash" : "3bd3e59",
      "roles" : [
        "master",
        "data",
        "ingest"
      ],
      "http" : {
        "bound_address" : [
          "0.0.0.0:9200"
        ],
        "publish_address" : "192.168.99.100:9200",
        "max_content_length_in_bytes" : 104857600
      }
    }
  }
}
  • The http section here carries the publish_address information, which is what the sniffer turns into the node's HttpHost; a small parsing sketch follows below.
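readNode turns this publish_address string into an HttpHost by prefixing the configured scheme and letting java.net.URI do the parsing; the following small sketch replays that step with the address from the sample response above:

import java.net.URI;

import org.apache.http.HttpHost;

public class PublishAddressDemo {
    public static void main(String[] args) {
        // value taken from the "http.publish_address" field of the nodes info response
        String publishAddress = "192.168.99.100:9200";
        String scheme = "http";

        // same trick as ElasticsearchNodesSniffer.readNode: prepend the scheme and let URI do the parsing
        URI uri = URI.create(scheme + "://" + publishAddress);
        HttpHost publishedHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());

        System.out.println(publishedHost); // prints http://192.168.99.100:9200
    }
}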

readNode

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/ElasticsearchNodesSniffer.java

public final class ElasticsearchNodesSniffer implements NodesSniffer {
    //......

    private static Node readNode(String nodeId, JsonParser parser, Scheme scheme) throws IOException {
        HttpHost publishedHost = null;
        /*
         * We sniff the bound hosts so we can look up the node based on any
         * address on which it is listening. This is useful in Elasticsearch's
         * test framework where we sometimes publish ipv6 addresses but the
         * tests contact the node on ipv4.
         */
        Set<HttpHost> boundHosts = new HashSet<>();
        String name = null;
        String version = null;
        /*
         * Multi-valued attributes come with key = `real_key.index` and we
         * unflip them after reading them because we can't rely on the order
         * that they arive.
         */
        final Map<String, String> protoAttributes = new HashMap<String, String>();

        boolean sawRoles = false;
        boolean master = false;
        boolean data = false;
        boolean ingest = false;

        String fieldName = null;
        while (parser.nextToken() != JsonToken.END_OBJECT) {
            if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
                fieldName = parser.getCurrentName();
            } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                if ("http".equals(fieldName)) {
                    while (parser.nextToken() != JsonToken.END_OBJECT) {
                        if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) {
                            URI publishAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
                            publishedHost = new HttpHost(publishAddressAsURI.getHost(), publishAddressAsURI.getPort(),
                                    publishAddressAsURI.getScheme());
                        } else if (parser.currentToken() == JsonToken.START_ARRAY && "bound_address".equals(parser.getCurrentName())) {
                            while (parser.nextToken() != JsonToken.END_ARRAY) {
                                URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
                                boundHosts.add(new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(),
                                        boundAddressAsURI.getScheme()));
                            }
                        } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                            parser.skipChildren();
                        }
                    }
                } else if ("attributes".equals(fieldName)) {
                    while (parser.nextToken() != JsonToken.END_OBJECT) {
                        if (parser.getCurrentToken() == JsonToken.VALUE_STRING) {
                            String oldValue = protoAttributes.put(parser.getCurrentName(), parser.getValueAsString());
                            if (oldValue != null) {
                                throw new IOException("repeated attribute key [" + parser.getCurrentName() + "]");
                            }
                        } else {
                            parser.skipChildren();
                        }
                    }
                } else {
                    parser.skipChildren();
                }
            } else if (parser.currentToken() == JsonToken.START_ARRAY) {
                if ("roles".equals(fieldName)) {
                    sawRoles = true;
                    while (parser.nextToken() != JsonToken.END_ARRAY) {
                        switch (parser.getText()) {
                            case "master":
                                master = true;
                                break;
                            case "data":
                                data = true;
                                break;
                            case "ingest":
                                ingest = true;
                                break;
                            default:
                                logger.warn("unknown role [" + parser.getText() + "] on node [" + nodeId + "]");
                        }
                    }
                } else {
                    parser.skipChildren();
                }
            } else if (parser.currentToken().isScalarValue()) {
                if ("version".equals(fieldName)) {
                    version = parser.getText();
                } else if ("name".equals(fieldName)) {
                    name = parser.getText();
                }
            }
        }
        //http section is not present if http is not enabled on the node, ignore such nodes
        if (publishedHost == null) {
            logger.debug("skipping node [" + nodeId + "] with http disabled");
            return null;
        }

        Map<String, List<String>> realAttributes = new HashMap<>(protoAttributes.size());
        List<String> keys = new ArrayList<>(protoAttributes.keySet());
        for (String key : keys) {
            if (key.endsWith(".0")) {
                String realKey = key.substring(0, key.length() - 2);
                List<String> values = new ArrayList<>();
                int i = 0;
                while (true) {
                    String value = protoAttributes.remove(realKey + "." + i);
                    if (value == null) {
                        break;
                    }
                    values.add(value);
                    i++;
                }
                realAttributes.put(realKey, unmodifiableList(values));
            }
        }
        for (Map.Entry<String, String> entry : protoAttributes.entrySet()) {
            realAttributes.put(entry.getKey(), singletonList(entry.getValue()));
        }

        if (version.startsWith("2.")) {
            /*
             * 2.x doesn't send roles, instead we try to read them from
             * attributes.
             */
            boolean clientAttribute = v2RoleAttributeValue(realAttributes, "client", false);
            Boolean masterAttribute = v2RoleAttributeValue(realAttributes, "master", null);
            Boolean dataAttribute = v2RoleAttributeValue(realAttributes, "data", null);
            master = masterAttribute == null ? false == clientAttribute : masterAttribute;
            data = dataAttribute == null ? false == clientAttribute : dataAttribute;
        } else {
            assert sawRoles : "didn't see roles for [" + nodeId + "]";
        }

        assert boundHosts.contains(publishedHost) :
                "[" + nodeId + "] doesn't make sense! publishedHost should be in boundHosts";
        logger.trace("adding node [" + nodeId + "]");
        return new Node(publishedHost, boundHosts, name, version, new Roles(master, data, ingest),
                unmodifiableMap(realAttributes));
    }

    /**
     * Returns {@code defaultValue} if the attribute didn't come back,
     * {@code true} or {@code false} if it did come back as
     * either of those, or throws an IOException if the attribute
     * came back in a strange way.
     */
    private static Boolean v2RoleAttributeValue(Map<String, List<String>> attributes,
            String name, Boolean defaultValue) throws IOException {
        List<String> valueList = attributes.remove(name);
        if (valueList == null) {
            return defaultValue;
        }
        if (valueList.size() != 1) {
            throw new IOException("expected only a single attribute value for [" + name + "] but got " + valueList);
        }
        switch (valueList.get(0)) {
            case "true":
                return true;
            case "false":
                return false;
            default:
                throw new IOException("expected [" + name + "] to be either [true] or [false] but was ["
                        + valueList.get(0) + "]");
        }
    }

    //......
}
  • The readNode method parses each entry in the nodes section; it reads the http, attributes, roles, and version fields, applies special handling for 2.x versions (whose roles are derived from attributes), and finally builds and returns a Node instance from publishedHost, boundHosts, name, version, master/data/ingest, and realAttributes. A standalone sketch of the attribute unflattening step follows below.
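To make the attribute handling easier to follow, here is a standalone sketch of just the unflattening step (mirroring the loop in readNode), using made-up attribute keys rack, zone.0 and zone.1:

import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableList;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AttributeUnflattenDemo {
    public static void main(String[] args) {
        // flattened attributes as readNode collects them: multi-valued keys arrive as key.0, key.1, ...
        Map<String, String> protoAttributes = new HashMap<>();
        protoAttributes.put("rack", "r1");
        protoAttributes.put("zone.0", "us-east-1a");
        protoAttributes.put("zone.1", "us-east-1b");

        Map<String, List<String>> realAttributes = new HashMap<>(protoAttributes.size());
        for (String key : new ArrayList<>(protoAttributes.keySet())) {
            if (key.endsWith(".0")) {
                // collect key.0, key.1, ... back into a single multi-valued attribute
                String realKey = key.substring(0, key.length() - 2);
                List<String> values = new ArrayList<>();
                int i = 0;
                while (true) {
                    String value = protoAttributes.remove(realKey + "." + i);
                    if (value == null) {
                        break;
                    }
                    values.add(value);
                    i++;
                }
                realAttributes.put(realKey, unmodifiableList(values));
            }
        }
        // whatever is left is single-valued
        for (Map.Entry<String, String> entry : protoAttributes.entrySet()) {
            realAttributes.put(entry.getKey(), singletonList(entry.getValue()));
        }

        System.out.println(realAttributes); // e.g. {rack=[r1], zone=[us-east-1a, us-east-1b]}
    }
}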

Summary

  • The NodesSniffer interface defines a sniff method for obtaining the sniffed Elasticsearch nodes; it has one implementation class, ElasticsearchNodesSniffer
  • ElasticsearchNodesSniffer's constructor takes three parameters: restClient, sniffRequestTimeoutMillis, and scheme, where sniffRequestTimeoutMillis defaults to 1 second and scheme defaults to HTTP; the constructor creates the GET /_nodes/http request; the sniff method executes this request via restClient.performRequest and then calls readHosts to parse the response, which in turn calls readNode to parse the nodes section
  • The readNode method parses each entry in the nodes section; it reads the http, attributes, roles, and version fields, applies special handling for 2.x versions (whose roles are derived from attributes), and finally builds and returns a Node instance from publishedHost, boundHosts, name, version, master/data/ingest, and realAttributes

doc

  • NodesSniffer

Reposted from: https://juejin.im/post/5ce01749e51d45109618dc3b
