扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

简介

ES提供了两种java的API对数据进行ES集群操作:TransportClient,Java REST Client。但是有以下几点需要注意:

  1. 计划在7中删除TransportClient客户端,并在8中完全删除它。
  2. Java REST Client客户端目前支持更常用的API,但仍有很多需要添加的API。
  3. 任何缺失的API都可以通过使用JSON请求和响应体的低级Java REST客户端来实现。

TransportClient即将过时,虽然RestHighLevelClient还不完善,还需要增加新API,但是RestLowLevelClient非常完善,满足我们的API需求。因此本文主要介绍如何使用Java REST Client,以及Low Level REST Client和High Level REST Client的使用。

ES  java api官网地址:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-search.html

注意:在使用 High-level-rest-client 的方式创建 client 的时候务必注意版本的情况,我这里使用的是6.3.2版本的,不同版本之间创建 client 的方式的差别还是比较大的

High Level 与Low Level REST Client

Java REST客户端有两种风格:

  1. Java低级别REST客户端(Java Low Level REST Client):Elasticsearch的官方low-level客户端。 它允许通过http与Elasticsearch集群进行通信。 不会对请求进行编码和响应解码。 它与所有Elasticsearch版本兼容。
  2. Java高级REST客户端(Java High Level REST Client):Elasticsearch的官方high-level客户端。 基于low-level客户端,它公开了API特定的方法,并负责处理。

Java High Level REST Client

Java高级别REST客户端(The Java High Level REST Client),内部仍然是基于低级客户端。它提供了更多的API,接受请求对象作为参数并返回响应对象,由客户端自己处理编码和解码。每个API都可以同步或异步调用。 同步方法返回一个响应对象,而异步方法的名称以async后缀结尾,需要一个监听器参数,一旦收到响应或错误,就会被通知(由低级客户端管理的线程池)。高级客户端依赖于Elasticsearch core项目。 它接受与TransportClient相同的请求参数并返回相同的响应对象。

依赖

高级客户端依赖于以下部件及其传递依赖关系:

  • org.elasticsearch.client:elasticsearch-rest-client
  • org.elasticsearch:elasticsearch

初始化

RestHighLevelClient实例需要低级客户端构建器来构建,如下所示:

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http"),new HttpHost("localhost", 9200, "http")));

高级客户端将在内部创建低级客户端,用来执行基于提供的构建器的请求,并管理其生命周期。当不再需要时,需要关闭高级客户端实例,以便它所使用的所有资源以及底层的http客户端实例及其线程得到正确释放。可以通过close方法来完成,该方法将关闭内部的RestClient实例。

Java Low Level REST Client

ES的官方low-level客户端。 它允许通过http与Elasticsearch集群进行通信。 不会对请求进行编码和响应解码。 它与所有ES版本兼容。

依赖

低级客户端在内部使用Apache Http Async Client发送http请求。 它依赖于以下部件,即the async http client及其自身的传递依赖:

org.apache.httpcomponents:httpasyncclient
org.apache.httpcomponents:httpcore-nio
org.apache.httpcomponents:httpclient
org.apache.httpcomponents:httpcore
commons-codec:commons-codec
commons-logging:commons-logging

Shading

为了避免版本冲突,依赖需要shaded和打包到一个单独的jar文件中。(该操作也被称作"uber JAR"或"fat JAR",是一种可执行的Jar包。FatJar和普通的jar不同在于它包含了依赖的jar包。)对依赖进行隐藏需要取其内容(资源文件和java类文件),然后在放到jar文件之前会对一些包进行重命名。

初始化

RestClient实例可以通过RestClientBuilder类创建,通过RestClient 的 builder(HttpHost ...)静态方法创建。 唯一需要的参数是客户端将与之通信的一个或多个主机,如下所示:

RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"),new HttpHost("localhost", 9200, "http")).build();

RestClient类是线程安全的,理想情况下与使用它的应用程序具有相同的生命周期。当不再需要时关闭它是非常重要的,这样它所使用的所有资源以及底层http客户端实例及其线程都可以得到释放。

restClient.close();

执行请求

一旦创建了RestClient,就可以调用performRequest或performRequestAsync方法来发送请求。

performRequest方法:是同步的,直接返回响应,这意味着客户端将被阻塞并等待响应返回。

performRequestAsync方法:返回void,并接受一个ResponseListener作为参数,这意味着它们是异步执行的。当请求完成或失败时,监听器将被通知。

代码

对ES集群数据的曾删改查

第一步:导入meavn依赖

 <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>6.3.2</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>6.3.2</version></dependency>
public class EsUtil {private static final Logger LOGGER = LoggerFactory.getLogger(EsUtil.class);/*** 获取client连接*/public  static RestHighLevelClient getHighLevelClient(String host,int port,String schema) {/* //单个ip创建方式HttpHost  httpHost= new HttpHost(host, port, schema);RestClientBuilder builder = RestClient.builder(httpHost);RestHighLevelClient  client = new RestHighLevelClient(builder);return client;*/String[] ipArr = host.split(",");HttpHost[] httpHosts = new HttpHost[ipArr.length];for (int i = 0; i < ipArr.length; i++) {httpHosts[i] = new HttpHost(ipArr[i], port, schema);}RestClientBuilder builder = RestClient.builder(httpHosts);RestHighLevelClient  client = new RestHighLevelClient(builder);return  client;}/*** 创建索引* @param index* @return*/public static boolean createIndex(String index,RestHighLevelClient client) {//index名必须全小写,否则报错CreateIndexRequest request = new CreateIndexRequest(index);try {CreateIndexResponse indexResponse = client.indices().create(request);if (indexResponse.isAcknowledged()) {LOGGER.info("创建索引成功");} else {LOGGER.info("创建索引失败");}return indexResponse.isAcknowledged();} catch (IOException e) {e.printStackTrace();}return false;}/*** 查询所有数据* @return*/public static String queryAll(String indexName,String esType,RestHighLevelClient client) {try {HttpEntity entity = new NStringEntity("{ \"query\": { \"match_all\": {}}}",ContentType.APPLICATION_JSON);String endPoint = "/" + indexName + "/" + esType + "/_search";Response response = EsUtil.getLowLevelClient(client).performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);return EntityUtils.toString(response.getEntity());} catch (IOException e) {e.printStackTrace();}return "查询数据出错";}/*** 根据条件查询* @return*/public static String queryByMatch(String indexName,String esType,String fieldKey,String fieldValue,RestHighLevelClient client){try {String endPoint = "/" + indexName + "/" + esType + "/_search";IndexRequest indexRequest = new IndexRequest();XContentBuilder builder;try {builder = JsonXContent.contentBuilder().startObject().startObject("query").startObject("match").field( fieldKey+".keyword", fieldValue).endObject().endObject().endObject();indexRequest.source(builder);} catch (IOException e) {e.printStackTrace();}String source = indexRequest.source().utf8ToString();HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);Response response = EsUtil.getLowLevelClient(client).performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);return EntityUtils.toString(response.getEntity());} catch (IOException e) {e.printStackTrace();}return "查询数据出错";}/*** 插入数据* @param index* @param type* @param object* @return*/public static String addData(RestHighLevelClient client, String index, String type, String id, Map object) {String dataId;IndexRequest indexRequest = new IndexRequest(index, type, id);try {indexRequest.source(object, XContentType.JSON);IndexResponse indexResponse = client.index(indexRequest);return indexResponse.getId();} catch (Exception e) {e.printStackTrace();}return "";}/*** 检查索引是否存在* @param index* @return* @throws IOException*/public static boolean checkIndexExist(RestHighLevelClient client,String index) {try {Response response = client.getLowLevelClient().performRequest("HEAD", index);boolean exist = response.getStatusLine().getReasonPhrase().equals("OK");return exist;} catch (IOException e) {e.printStackTrace();}return false;}/*** 获取低水平客户端* @return*/public static RestClient getLowLevelClient(RestHighLevelClient client) {return client.getLowLevelClient();}/*** 关闭连接*/public static void close(RestHighLevelClient client) {if (client != null) {try {client.close();} catch (IOException e) {e.printStackTrace();}}}
}

测试:

public class EsUtilTest {
public static void main(String[] args) {String host = "ip1,ip2,ip3";int port = 9200;String schema = "http";//获取客户端RestHighLevelClient client = EsUtil.getHighLevelClient(host, port, schema);boolean flog = EsUtil.checkIndexExist(client, "es_index");System.out.println(flog);HashMap<String, String> map = new HashMap<>();map.put("name","wzh");map.put("city","beijing");map.put("sex","men");//插入数据EsUtil.addData(client,"es_index","user","51",map);//查询数据String s = EsUtil.queryByMatch("es_flink2", "user", "city","beijing", client);System.out.println("结果数据"+s);EsUtil.close}
}

Bulk API 批处理

注意:务必注意es版本的情况,Bulk API的测试,这里使用的是6.0.0版本的,不同版本之间api的差别还是比较大的,再次强调,你值得注意。

Bulk request

之前的文档说明过,bulk接口是批量index/update/delete操作
在API中,只需要一个bulk request就可以完成一批请求。

BulkRequest request = new BulkRequest();
request.add(new IndexRequest("posts", "doc", "1")  .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")  .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")  .source(XContentType.JSON,"field", "baz"));
  • 注意,Bulk API只接受JSON和SMILE格式.其他格式的数据将会报错。
  • 不同类型的request可以写在同一个bulk request里。

bulk processor

BulkProcessor 简化bulk API的使用,并且使整个批量操作透明化。
BulkProcessor 的执行需要三部分组成:

  1. RestHighLevelClient :执行bulk请求并拿到响应对象。
  2. BulkProcessor.Listener:在执行bulk request之前、之后和当bulk response发生错误时调用。
  3. ThreadPool:bulk request在这个线程池中执行操作,这使得每个请求不会被挡住,在其他请求正在执行时,也可以接收新的请求。

代码

public class EsBulkUtil {private static final Logger LOGGER = LoggerFactory.getLogger(EsBulkUtil.class);private static  RestHighLevelClient  client;private static  BulkProcessor bulkProcessor;public EsBulkUtil() {}/*** 获取client连接*/public  void  initHighLevelClient(String host, int port, String schema) {if (client == null) {synchronized (EsBulkUtil.class) {if (client == null) {LOGGER.info("es create connection");String[] ipArr = host.split(",");HttpHost[] httpHosts = new HttpHost[ipArr.length];for (int i = 0; i < ipArr.length; i++) {httpHosts[i] = new HttpHost(ipArr[i], port, schema);}RestClientBuilder builder = RestClient.builder(httpHosts);this.client = new RestHighLevelClient(builder);LOGGER.info("es create connection success");}}}}/*** 获取BulkProcessor操作对象* @param* @return*/public void initBulkProcessor() {Settings settings = Settings.builder().build();ThreadPool threadPool = new ThreadPool(settings);BulkProcessor.Listener listener = new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long executionId, BulkRequest request) {//重写beforeBulk,在每次bulk request发出前执行,在这个方法里面可以知道在本次批量操作中有多少操作数int numberOfActions = request.numberOfActions();LOGGER.debug("Executing bulk [{" + executionId + "}] with {" + numberOfActions + "} requests");}@Overridepublic void afterBulk(long executionId, BulkRequest request, BulkResponse response) {//重写afterBulk方法,每次批量请求结束后执行,可以在这里知道是否有错误发生if (response.hasFailures()) {LOGGER.warn("Bulk [{" + executionId + "}] executed with failures");} else {LOGGER.debug("Bulk [{" + executionId + "}] completed in {" + response.getTook().getMillis() + "} milliseconds");}}@Overridepublic void afterBulk(long executionId, BulkRequest request, Throwable failure) {//重写方法,如果发生错误就会调用。LOGGER.error("Failed to execute bulk", failure);failure.printStackTrace();}};BulkProcessor.Builder BulkProcessorBuilder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);//1w次请求执行一次bulkBulkProcessorBuilder.setBulkActions(100000);//64MB的数据刷新一次bulkBulkProcessorBuilder.setBulkSize(new ByteSizeValue(64L, ByteSizeUnit.MB));//并发请求数量,0不并发,1并发允许执行BulkProcessorBuilder.setConcurrentRequests(0);//固定60s刷新一次BulkProcessorBuilder.setFlushInterval(TimeValue.timeValueSeconds(60L));//设置退避,1s后执行,最大请求3次BulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));//在这里调用build()方法构造bulkProcessor,在底层实际上是用了bulk的异步操作bulkProcessor = BulkProcessorBuilder.build();}/*** 插入数据* @return*/public  void  bulkAdd(String indexName, String typeName, String indexId, Map<String, String> map) {try {IndexRequest indexRequest = new IndexRequest(indexName, typeName, indexId).source(map);UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, indexId).doc(map).upsert(indexRequest);bulkProcessor.add(updateRequest);} catch (Exception e) {e.printStackTrace();}}/*** 删除数据* @return*/public  void  bulkDelete(String indexName, String typeName, String indexId) {try {bulkProcessor.add(new DeleteRequest(indexName, typeName, indexId));} catch (Exception e) {e.printStackTrace();}}/*** 刷新bulkProcessor* @param bulkProcessor*/public void flush(BulkProcessor bulkProcessor) {try {bulkProcessor.flush();} catch (Exception e) {e.printStackTrace();}}/*** 关闭连接*/public static void close() {if (client != null) {try {bulkProcessor.flush();bulkProcessor.close();client.close();} catch (IOException e) {e.printStackTrace();}}}
}

测试:

public class test {public static void main(String[] args) {String host = "ip1,ip2,ip3";int port = 9200;String schema = "http";EsBulkUtil esBulkUtil = new EsBulkUtil();//初始化clientesBulkUtil.initHighLevelClient(host,port,schema);//初始化bulkProcessoresBulkUtil.initBulkProcessor();HashMap<String, String> map = new HashMap<>();map.put("name","wzh1");map.put("city","beijing1");map.put("sex","men1");//插入数据esBulkUtil.bulkAdd("es_flink2","user","1",map);esBulkUtil.flush();esBulkUtil.close();}
}

扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

es (Elasticsearch)--Java REST Client解析相关推荐

  1. Elasticsearch RestHighLevelClient 已标记为被弃用 它的替代方案 Elasticsearch Java API Client 的基础教程及迁移方案

    在Elasticsearch7.15版本之后,Elasticsearch官方将它的高级客户端RestHighLevelClient标记为弃用状态.同时推出了全新的Java API客户端Elastics ...

  2. SpringBoot整合最新Elasticsearch Java API Client 7.16教程

    文章目录 前言 一.Elasticsearch和Kibana 7.16版本安装 二.pom.xml文件引入依赖 三.代码实例 总结 前言 最新在学习SpringBoot整合es的一些知识,浏览了网上的 ...

  3. Elasticsearch8.0版本中Elasticsearch Java API Client客户端的基本使用方法

    关于Elasticsearch Java API Client客户端如何连接以及如何对索引和文档进行基本的增删改查操作请查看我的上一篇博文:Elasticsearch RestHighLevelCli ...

  4. Elasticsearch Java 操作client

    0.题记 之前Elasticsearch的应用比较多,但大多集中在关系型.非关系型数据库与Elasticsearch之间的同步.以上内容完成了Elasticsearch所需要的基础数据量的供给.但想要 ...

  5. Springboot整合ES8(Java API Client)

    在 Elasticsearch7.15版本之后,Elasticsearch官方将它的高级客户端 RestHighLevelClient标记为弃用状态.同时推出了全新的 Java API客户端 Elas ...

  6. 干货 | Elasticsearch Java 客户端演进历史和选型指南

    1.Elasticsearch java 客户端为什么要选型? Elasticsearch 官方提供了很多版本的 Java 客户端,包含但不限于: Transport 客户端 Java REST 客户 ...

  7. ElasticSearch Java High level Rest Client 官方文档中文翻译(一)

    ElasticSearch Java High level Rest Client 官方文档中文翻译 一 纯粹记录自己在看官网的es rest high level api 时的翻译笔记,可以对照着官 ...

  8. Elasticsearch java api操作(一)(Java Low Level Rest Client)

    一.说明: 一.Elasticsearch提供了两个JAVA REST Client版本: 1.java low level rest client: 低级别的rest客户端,通过http与集群交互, ...

  9. Elasticsearch Java API 6.2(java client)

    前言 本节描述了Elasticsearch提供的Java API,所有的Elasticsearch操作都使用客户端对象执行,所有操作本质上都是完全异步的(要么接收监听器,要么未来返回). 此外,客户端 ...

最新文章

  1. COM:微生物促进植物的氮获得
  2. 从一道面试题说去 2
  3. JavaScript之实例练习(正反选、二级联动)
  4. 学生信息管理系统问题集锦(三)
  5. 2020年8月4日王者服务器维修,2020年8月4日小鸡正确的答案
  6. 网络爬虫——原理简介
  7. Slidworks2018基础到实战设计视频教程 产品建模 渲染 钣金设计
  8. Ubuntu 16.04 使用校园网客户端上网
  9. SAP 批导长文本字段自动和手动换行
  10. Element表格内容不垂直居中和内容换行显示
  11. 恒大帝景220平文华东路
  12. Algorithm 4th environment setup
  13. dnf打团正在连接服务器进不去是吗鬼,DNF打团速成职业注意事项解读 不再做手残辅助...
  14. Redis实现分布式限流(学习笔记
  15. C++ template —— tuple(十三)
  16. ncurses库的安装与入门
  17. 软件测试模型 — V模型
  18. 2022年血糖仪行业现状
  19. python培训网校
  20. thhinkphp5前后端分离微信公众号支付

热门文章

  1. 药物靶点信息数据库有哪些?都有哪些特点?
  2. Nginx 网站使用 acme配置 https证书访问步骤
  3. ArduCopter —— ArduPilot—— 飞行模式
  4. java quartz管理,SpringBoot中使用Quartz管理定时任务的方法
  5. Neville算法MATLAB实现
  6. 重玩 40 年前的经典游戏小蜜蜂,这次通关了源码
  7. Linux 系统裁剪--制作一个最小化的Linux iso镜像
  8. Linux终端L2层发包程序
  9. 单片机课程设计--网络时钟
  10. 洛谷OJ:P1379 八数码难题(双向搜索)