聊聊Elasticsearch的NodesSniffer
序
本文主要研究一下Elasticsearch的NodesSniffer
NodesSniffer
elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/NodesSniffer.java
/*** Responsible for sniffing the http hosts*/
public interface NodesSniffer {/*** Returns the sniffed Elasticsearch nodes.*/List<Node> sniff() throws IOException;
}
复制代码
- NodesSniffer接口定义了sniff方法用于获取sniffed Elasticsearch nodes,它有一个实现类为ElasticsearchNodesSniffer
ElasticsearchNodesSniffer
elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/ElasticsearchNodesSniffer.java
public final class ElasticsearchNodesSniffer implements NodesSniffer {private static final Log logger = LogFactory.getLog(ElasticsearchNodesSniffer.class);public static final long DEFAULT_SNIFF_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(1);private final RestClient restClient;private final Request request;private final Scheme scheme;private final JsonFactory jsonFactory = new JsonFactory();public ElasticsearchNodesSniffer(RestClient restClient) {this(restClient, DEFAULT_SNIFF_REQUEST_TIMEOUT, ElasticsearchNodesSniffer.Scheme.HTTP);}public ElasticsearchNodesSniffer(RestClient restClient, long sniffRequestTimeoutMillis, Scheme scheme) {this.restClient = Objects.requireNonNull(restClient, "restClient cannot be null");if (sniffRequestTimeoutMillis < 0) {throw new IllegalArgumentException("sniffRequestTimeoutMillis must be greater than 0");}this.request = new Request("GET", "/_nodes/http");request.addParameter("timeout", sniffRequestTimeoutMillis + "ms");this.scheme = Objects.requireNonNull(scheme, "scheme cannot be null");}/*** Calls the elasticsearch nodes info api, parses the response and returns all the found http hosts*/@Overridepublic List<Node> sniff() throws IOException {Response response = restClient.performRequest(request);return readHosts(response.getEntity(), scheme, jsonFactory);}static List<Node> readHosts(HttpEntity entity, Scheme scheme, JsonFactory jsonFactory) throws IOException {try (InputStream inputStream = entity.getContent()) {JsonParser parser = jsonFactory.createParser(inputStream);if (parser.nextToken() != JsonToken.START_OBJECT) {throw new IOException("expected data to start with an object");}List<Node> nodes = new ArrayList<>();while (parser.nextToken() != JsonToken.END_OBJECT) {if (parser.getCurrentToken() == JsonToken.START_OBJECT) {if ("nodes".equals(parser.getCurrentName())) {while (parser.nextToken() != JsonToken.END_OBJECT) {JsonToken token = parser.nextToken();assert token == JsonToken.START_OBJECT;String nodeId = parser.getCurrentName();Node node = readNode(nodeId, parser, scheme);if (node != null) {nodes.add(node);}}} else {parser.skipChildren();}}}return nodes;}}//......public enum Scheme {HTTP("http"), HTTPS("https");private final String name;Scheme(String name) {this.name = name;}@Overridepublic String toString() {return name;}}}
复制代码
- ElasticsearchNodesSniffer的构造器需要restClient、sniffRequestTimeoutMillis、scheme三个参数,其中sniffRequestTimeoutMillis默认为1秒,scheme默认为HTTP;它的构造器创建了GET /_nodes/http的request;sniff方法使用restClient.performRequest来执行这个GET /_nodes/http的request,之后调用readHosts来解析response,其中调用了readNode方法来解析nodes部分
GET /_nodes/http实例
{"_nodes" : {"total" : 1,"successful" : 1,"failed" : 0},"cluster_name" : "docker-cluster","nodes" : {"d7w2wdw7Q7SqERe5_fxZYA" : {"name" : "d7w2wdw","transport_address" : "172.17.0.2:9300","host" : "172.17.0.2","ip" : "172.17.0.2","version" : "6.6.2","build_flavor" : "oss","build_type" : "tar","build_hash" : "3bd3e59","roles" : ["master","data","ingest"],"http" : {"bound_address" : ["0.0.0.0:9200"],"publish_address" : "192.168.99.100:9200","max_content_length_in_bytes" : 104857600}}}
}
复制代码
- 这里http部分有publish_address信息
readNode
elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/ElasticsearchNodesSniffer.java
public final class ElasticsearchNodesSniffer implements NodesSniffer {//......private static Node readNode(String nodeId, JsonParser parser, Scheme scheme) throws IOException {HttpHost publishedHost = null;/** We sniff the bound hosts so we can look up the node based on any* address on which it is listening. This is useful in Elasticsearch's* test framework where we sometimes publish ipv6 addresses but the* tests contact the node on ipv4.*/Set<HttpHost> boundHosts = new HashSet<>();String name = null;String version = null;/** Multi-valued attributes come with key = `real_key.index` and we* unflip them after reading them because we can't rely on the order* that they arive.*/final Map<String, String> protoAttributes = new HashMap<String, String>();boolean sawRoles = false;boolean master = false;boolean data = false;boolean ingest = false;String fieldName = null;while (parser.nextToken() != JsonToken.END_OBJECT) {if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {fieldName = parser.getCurrentName();} else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {if ("http".equals(fieldName)) {while (parser.nextToken() != JsonToken.END_OBJECT) {if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) {URI publishAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());publishedHost = new HttpHost(publishAddressAsURI.getHost(), publishAddressAsURI.getPort(),publishAddressAsURI.getScheme());} else if (parser.currentToken() == JsonToken.START_ARRAY && "bound_address".equals(parser.getCurrentName())) {while (parser.nextToken() != JsonToken.END_ARRAY) {URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());boundHosts.add(new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(),boundAddressAsURI.getScheme()));}} else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {parser.skipChildren();}}} else if ("attributes".equals(fieldName)) {while (parser.nextToken() != JsonToken.END_OBJECT) {if (parser.getCurrentToken() == JsonToken.VALUE_STRING) {String oldValue = protoAttributes.put(parser.getCurrentName(), parser.getValueAsString());if (oldValue != null) {throw new IOException("repeated attribute key [" + parser.getCurrentName() + "]");}} else {parser.skipChildren();}}} else {parser.skipChildren();}} else if (parser.currentToken() == JsonToken.START_ARRAY) {if ("roles".equals(fieldName)) {sawRoles = true;while (parser.nextToken() != JsonToken.END_ARRAY) {switch (parser.getText()) {case "master":master = true;break;case "data":data = true;break;case "ingest":ingest = true;break;default:logger.warn("unknown role [" + parser.getText() + "] on node [" + nodeId + "]");}}} else {parser.skipChildren();}} else if (parser.currentToken().isScalarValue()) {if ("version".equals(fieldName)) {version = parser.getText();} else if ("name".equals(fieldName)) {name = parser.getText();}}}//http section is not present if http is not enabled on the node, ignore such nodesif (publishedHost == null) {logger.debug("skipping node [" + nodeId + "] with http disabled");return null;}Map<String, List<String>> realAttributes = new HashMap<>(protoAttributes.size());List<String> keys = new ArrayList<>(protoAttributes.keySet());for (String key : keys) {if (key.endsWith(".0")) {String realKey = key.substring(0, key.length() - 2);List<String> values = new ArrayList<>();int i = 0;while (true) {String value = protoAttributes.remove(realKey + "." + i);if (value == null) {break;}values.add(value);i++;}realAttributes.put(realKey, unmodifiableList(values));}}for (Map.Entry<String, String> entry : protoAttributes.entrySet()) {realAttributes.put(entry.getKey(), singletonList(entry.getValue()));}if (version.startsWith("2.")) {/** 2.x doesn't send roles, instead we try to read them from* attributes.*/boolean clientAttribute = v2RoleAttributeValue(realAttributes, "client", false);Boolean masterAttribute = v2RoleAttributeValue(realAttributes, "master", null);Boolean dataAttribute = v2RoleAttributeValue(realAttributes, "data", null);master = masterAttribute == null ? false == clientAttribute : masterAttribute;data = dataAttribute == null ? false == clientAttribute : dataAttribute;} else {assert sawRoles : "didn't see roles for [" + nodeId + "]";}assert boundHosts.contains(publishedHost) :"[" + nodeId + "] doesn't make sense! publishedHost should be in boundHosts";logger.trace("adding node [" + nodeId + "]");return new Node(publishedHost, boundHosts, name, version, new Roles(master, data, ingest),unmodifiableMap(realAttributes));}/*** Returns {@code defaultValue} if the attribute didn't come back,* {@code true} or {@code false} if it did come back as* either of those, or throws an IOException if the attribute* came back in a strange way.*/private static Boolean v2RoleAttributeValue(Map<String, List<String>> attributes,String name, Boolean defaultValue) throws IOException {List<String> valueList = attributes.remove(name);if (valueList == null) {return defaultValue;}if (valueList.size() != 1) {throw new IOException("expected only a single attribute value for [" + name + "] but got "+ valueList);}switch (valueList.get(0)) {case "true":return true;case "false":return false;default:throw new IOException("expected [" + name + "] to be either [true] or [false] but was ["+ valueList.get(0) + "]");}}//......
}
复制代码
- readNode方法用于解析nodes部分的数据,它会解析http、attributes、roles、version;然后会对2.x版本的进行特殊处理,最后使用publishedHost、boundHosts、name、version、master、data、ingest、realAttributes构建Node实例并返回
小结
- NodesSniffer接口定义了sniff方法用于获取sniffed Elasticsearch nodes,它有一个实现类为ElasticsearchNodesSniffer
- ElasticsearchNodesSniffer的构造器需要restClient、sniffRequestTimeoutMillis、scheme三个参数,其中sniffRequestTimeoutMillis默认为1秒,scheme默认为HTTP;它的构造器创建了GET /_nodes/http的request;sniff方法使用restClient.performRequest来执行这个GET /_nodes/http的request,之后调用readHosts来解析response,其中调用了readNode方法来解析nodes部分
- readNode方法用于解析nodes部分的数据,它会解析http、attributes、roles、version;然后会对2.x版本的进行特殊处理,最后使用publishedHost、boundHosts、name、version、master、data、ingest、realAttributes构建Node实例并返回
doc
- NodesSniffer
转载于:https://juejin.im/post/5ce01749e51d45109618dc3b
聊聊Elasticsearch的NodesSniffer相关推荐
- 聊聊Elasticsearch的ExponentiallyWeightedMovingAverage
序 本文主要研究一下Elasticsearch的ExponentiallyWeightedMovingAverage ExponentiallyWeightedMovingAverage elasti ...
- 聊聊elasticsearch的RoutingService
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下elasticsearch的RoutingService RoutingService elasticsearch ...
- 聊聊Elasticsearch的TimedRunnable
序 本文主要研究一下Elasticsearch的TimedRunnable TimedRunnable elasticsearch-7.0.1/server/src/main/java/org/ela ...
- 聊聊Elasticsearch的Iterables
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下Elasticsearch的Iterables Iterables elasticsearch-7.0.1/ser ...
- 聊聊Elasticsearch RestClient的RequestLogger
序 本文主要研究一下Elasticsearch RestClient的RequestLogger RequestLogger elasticsearch-7.0.1/client/rest/src/m ...
- 聊聊Elasticsearch的CachedSupplier
序 本文主要研究一下Elasticsearch的CachedSupplier CachedSupplier elasticsearch-7.0.1/server/src/main/java/org/e ...
- 聊聊Elasticsearch的BootstrapCheck
序 本文主要研究一下Elasticsearch的BootstrapCheck BootstrapCheck elasticsearch-7.0.1/server/src/main/java/org/e ...
- 聊聊Elasticsearch的RunOnce
序 本文主要研究一下Elasticsearch的RunOnce RunOnce elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/c ...
- 转发和重定向简介及与之相关的(URL)参数(parameter)、属性(attribute)问题探讨
转发和重定向简介及与之相关的(URL)参数(parameter).属性(attribute)问题探讨 蜀中孤鹰 2019-07-15 15:55:49 101 收藏 最后发布:2019-07-15 1 ...
最新文章
- 广告基本知识-在线广告的市场
- 哈佛研究人员开发基于折纸造型高精度微型手术机器人
- Python+Django+Ansible Playbook自动化运维项目实战(二)
- yum mysql 启动失败_Linux下MySQL数据库yum升级后无法启动解决办法 | 系统运维
- 去掉文化管理系统广告
- ubuntu: no module named _sqlite
- 基于FPGA的异构计算在多媒体中的应用
- 回归分析中的正则化问题
- 上传 jar 包到 nexus3、上传本地 jar 包到 maven 私服
- printf,sprintf,vsprintf 区别【转】
- SSM框架-添加信息及图片上传到本地MultipartResolver-foreknow_cms
- 构建之法读书笔记05
- 浪潮配置ipim_浪潮服务器管理口IP设置_IPMI设置
- 详解示波器的三个主要参数:采样率,存储深度,带宽
- 虚拟化技术介绍 hypervisor简介
- 香橙派 Orangepi Zero2 外壳天线改装
- 用Python机器人监听微信群聊
- 【无标题】神马TV(前端apk文件_后端苹果cms v10)
- 代码与程序的区别与联系
- 三轴点胶机程序 用台达AS228T和威纶触摸屏编写。 注意软件是用台达新款软件ISPSOFT