前言

前面历经33篇内容的讲解,与ES的请求操作都是在Kibana平台上用Restful请求完成的,一直没发布Java或python的客户端代码,Restful才是运用、理解ES核心功能最直接的表达方式,但实际项目中肯定是以Java/python来完成ES请求的发起与数据处理的,前面理解了ES的核心功能,后面Java API的使用将会非常简单,剩余的未覆盖的功能API,自行查阅文档即可。

概要

本篇讲解Elasticsearch的客户端API开发的一些示例,以Java语言为主,介绍一些最常用,最核心的API。

代码示例

引入依赖

我们以maven项目为例,添加项目依赖

org.elasticsearch

elasticsearch

6.3.1

org.elasticsearch.client

transport

6.3.1

log4j

log4j

1.2.17

org.apache.logging.log4j

log4j-core

2.12.1

建立ES连接创建Settings对象,指定集群名称

创建TransportClient对象,手动指定IP、端口即可

Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();

TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

如果集群的节点数比较多,为每个node分别指定IP、Port可行性不高,我们可以使用集群节点自动探查的功能,代码如下:

// 将client.transport.sniff设置为true即可打开集群节点自动探查功能

Settings settings = Settings.builder().put("client.transport.sniff", true)..put("cluster.name", "elasticsearch").build();

// 只需要指定一个node就行

TransportClient client = new PreBuiltTransportClient(settings);

transport.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));

基本CRUD

最基本的CRUD代码,可以当作入门demo来写:

/**

* 创建员工信息(创建一个document)

* @param client

*/

private static void createEmployee(TransportClient client) throws Exception {

IndexResponse response = client.prepareIndex("company", "employee", "1")

.setSource(XContentFactory.jsonBuilder()

.startObject()

.field("name", "jack")

.field("age", 27)

.field("position", "technique")

.field("country", "china")

.field("join_date", "2017-01-01")

.field("salary", 10000)

.endObject())

.get();

System.out.println(response.getResult());

}

/**

* 获取员工信息

* @param client

* @throws Exception

*/

private static void getEmployee(TransportClient client) throws Exception {

GetResponse response = client.prepareGet("company", "employee", "1").get();

System.out.println(response.getSourceAsString());

}

/**

* 修改员工信息

* @param client

* @throws Exception

*/

private static void updateEmployee(TransportClient client) throws Exception {

UpdateResponse response = client.prepareUpdate("company", "employee", "1")

.setDoc(XContentFactory.jsonBuilder()

.startObject()

.field("position", "technique manager")

.endObject())

.get();

System.out.println(response.getResult());

}

/**

* 删除 员工信息

* @param client

* @throws Exception

*/

private static void deleteEmployee(TransportClient client) throws Exception {

DeleteResponse response = client.prepareDelete("company", "employee", "1").get();

System.out.println(response.getResult());

}

搜索

我们之前使用Restful的搜索,现在改用java实现,原有的Restful示例如下:

GET /company/employee/_search

{

"query": {

"bool": {

"must": [

{

"match": {

"position": "technique"

}

}

],

"filter": {

"range": {

"age": {

"gte": 30,

"lte": 40

}

}

}

}

},

"from": 0,

"size": 1

}

等同于这样的Java代码:

SearchResponse response = client.prepareSearch("company")

.setTypes("employee")

.setQuery(QueryBuilders.termQuery("position", "technique")) // Query

.setPostFilter(QueryBuilders.rangeQuery("age").from(30).to(40)) // Filter

.setFrom(0).setSize(60)

.get();

聚合查询

聚合查询稍微麻烦一些,请求的封装和响应报文的解析,都是根据实际返回的结构来做的,例如下面的查询:

需求:按照country国家来进行分组

在每个country分组内,再按照入职年限进行分组

最后计算每个分组内的平均薪资

Restful的请求如下:

GET /company/employee/_search

{

"size": 0,

"aggs": {

"group_by_country": {

"terms": {

"field": "country"

},

"aggs": {

"group_by_join_date": {

"date_histogram": {

"field": "join_date",

"interval": "year"

},

"aggs": {

"avg_salary": {

"avg": {

"field": "salary"

}

}

}

}

}

}

}

}

用Java编写的请求如下:

SearchResponse sr = node.client().prepareSearch()

.addAggregation(

AggregationBuilders.terms("by_country").field("country")

.subAggregation(AggregationBuilders.dateHistogram("by_year")

.field("dateOfBirth")

.dateHistogramInterval(DateHistogramInterval.YEAR)

.subAggregation(AggregationBuilders.avg("avg_children").field("children"))

)

)

.execute().actionGet();

对响应的处理,则需要一层一层获取数据:

Map aggrMap = searchResponse.getAggregations().asMap();

StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");

Iterator groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();

while(groupByCountryBucketIterator.hasNext()) {

Bucket groupByCountryBucket = groupByCountryBucketIterator.next();

System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount());

Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");

Iterator groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();

while(groupByJoinDateBucketIterator.hasNext()) {

org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();

System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount());

Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");

System.out.println(avgSalary.getValue());

}

}

client.close();

}

upsert请求

private static void upsert(TransportClient transport) {

try {

IndexRequest index = new IndexRequest("book_shop", "books", "2").source(

XContentFactory.jsonBuilder().startObject()

.field("name", "mysql从入门到删库跑路")

.field("tags", "mysql")

.field("price", 32.8)

.endObject());

UpdateRequest update = new UpdateRequest("book_shop", "books", "2")

.doc(XContentFactory.jsonBuilder()

.startObject().field("price", 31.8)

.endObject())

.upsert(index);

UpdateResponse response = transport.update(update).get();

System.out.println(response.getVersion());

} catch (IOException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

mget请求

public static void mget(TransportClient transport) {

MultiGetResponse res = transport.prepareMultiGet()

.add("book_shop", "books", "1")

.add("book_shop", "books", "2")

.get();

for (MultiGetItemResponse item : res.getResponses()) {

System.out.println(item.getResponse());

}

}

bulk请求

public static void bulk(TransportClient transport) {

try {

BulkRequestBuilder bulk = transport.prepareBulk();

bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(

XContentFactory.jsonBuilder().startObject()

.field("name", "设计模式从入门到拷贝代码")

.field("tags", "设计模式")

.field("price", 55.9)

.endObject()));

bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(

XContentFactory.jsonBuilder().startObject()

.field("name", "架构设计从入门到google搜索")

.field("tags", "架构设计")

.field("price", 68.9)

.endObject()));

bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc((XContentFactory.jsonBuilder()

.startObject().field("price", 32.8)

.endObject())));

BulkResponse bulkRes = bulk.get();

if (bulkRes.hasFailures()) {

System.out.println("Error...");

}

} catch (IOException e) {

e.printStackTrace();

}

}

scorll请求

public static void scorll(TransportClient client) {

SearchResponse bookShop = client.prepareSearch("book_shop").setScroll(new TimeValue(60000)).setSize(1).get();

int batchCnt = 0;

do {

// 循环读取scrollid信息,直到结果为空

for(SearchHit hit: bookShop.getHits().getHits()) {

System.out.println("batchCnt:" + ++batchCnt);

System.out.println(hit.getSourceAsString());

}

bookShop = client.prepareSearchScroll(bookShop.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();

} while (bookShop.getHits().getHits().length != 0);

}

搜索模板

public static void searchTemplates(TransportClient client) {

Map params = new HashMap<>(10);

params.put("from",0);

params.put("size",10);

params.put("tags","java");

SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)

.setScript("page_query_by_tags")

.setScriptType(ScriptType.STORED)

.setScriptParams(params)

.setRequest(new SearchRequest())

.get();

for(SearchHit hit:str.getResponse().getHits().getHits()) {

System.out.println(hit.getSourceAsString());

}

}

多条件组合查询

public static void otherSearch(TransportClient client) {

SearchResponse response1 = client.prepareSearch("book_shop").setQuery(QueryBuilders.termQuery("tags", "java")).get();

SearchResponse response2 = client.prepareSearch("book_shop").setQuery(QueryBuilders.multiMatchQuery("32.8", "price","tags")).get();

SearchResponse response3 = client.prepareSearch("book_shop").setQuery(QueryBuilders.commonTermsQuery("name", "入门")).get();

SearchResponse response4 = client.prepareSearch("book_shop").setQuery(QueryBuilders.prefixQuery("name", "java")).get();

System.out.println(response1.getHits().getHits()[0].getSourceAsString());

System.out.println(response2.getHits().getHits()[0].getSourceAsString());

System.out.println(response3.getHits().getHits()[0].getSourceAsString());

System.out.println(response4.getHits().getHits()[0].getSourceAsString());

// 多个条件组合

SearchResponse response5 = client.prepareSearch("book_shop").setQuery(QueryBuilders.boolQuery()

.must(QueryBuilders.termQuery("tags", "java"))

.mustNot(QueryBuilders.matchQuery("name", "跑路"))

.should(QueryBuilders.matchQuery("name", "入门"))

.filter(QueryBuilders.rangeQuery("price").gte(23).lte(55))).get();

System.out.println(response5.getHits().getHits()[0].getSourceAsString());

}

地理位置查询

public static void geo(TransportClient client) {

GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location").setCorners(23, 112, 21, 114);

List points = new ArrayList<>();

points.add(new GeoPoint(23,115));

points.add(new GeoPoint(25,113));

points.add(new GeoPoint(21,112));

GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location",points);

GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location").point(22.523375, 113.911231).distance(500, DistanceUnit.METERS);

SearchResponse response = client.prepareSearch("location").setQuery(query3).get();

for(SearchHit hit:response.getHits().getHits()) {

System.out.println(hit.getSourceAsString());

}

}

小结

专注Java高并发、分布式架构,更多技术干货分享与心得,请关注公众号:Java架构社区

es文本分析java代码_Elasticsearch系列---Java客户端代码Demo相关推荐

  1. 跳槽者、应届生必看JAVA面试题系列 - JAVA基础知识(四)

    一: 前言 莫等闲,白了少年头,空悲切. 二: 面试挑战   在文章开始前,首先安利下"面试挑战": 凡是满足下面的挑战条件的,如果一个月内没有拿到一个Offer的,免费提供简历封 ...

  2. java wsimport方式生成webservice客户端代码

    wsimport方法 1.在jdk安装目录bin下wsimport.exe,执行命令(修改红色部分的信息即可) wsimport -keep -d D:\1 -s D:\2 -p com.exampl ...

  3. JAVA之JUC系列 - JAVA内存模型

    Java内存模型(简称JMM),定义了线程本地内存和主内存之间的关系,理解JMM的特性,对深入理解Java多线程中内存的可见性会有很大帮助.下面我们从并发编程模型中关注的两个问题说起. 一. 并发编程 ...

  4. 一文整理总结常见Java面试题系列——Java集合篇(2022最新版)

    关于作者

  5. 如何用php向wsdl服务器发请求,知道服务器端Wsdl,不写服务端代码,仅写客户端代码能调用服务端的方法吗?...

    新手请教,望高手朋友不吝赐教,无比感谢中. 今天一大早开始研究Webservice,有一本不太专业的参考书,随便找了一个服务端的Wsdl,想调用其中的方法,结果搞了一天,无果,老报错,郁闷. 代码如下 ...

  6. java高并发系列 - 第1天:必须知道的几个概念

    java高并发系列-第1天:必须知道的几个概念 同步(Synchronous)和异步(Asynchronous) 同步和异步通常来形容一次方法调用,同步方法调用一旦开始,调用者必须等到方法调用返回后, ...

  7. Lossless Codec---APE代码解读系列(二)

    APE file 一些概念 APE代码解读系列(一) APE代码解读系列(三) 1. 先要了解APE compression level APE主要有5level, 分别是: CompressionL ...

  8. Java学习星球,Java学习路线

    目录 一.Java学习路线 二.学习计划 三.为何会有Java学习星球? 四.加入星球后,你可以得到什么? 五.如何加入Java学习星球? 六.打卡挑战 大家好,我是哪吒,一个靠着热情攀登至C站巅峰的 ...

  9. 差分隐私代码实现系列数据集及源码

    目录 差分隐私系列源码及数据集 关于差分隐私代码实现系列说明 发博客的目的 水平有限 时间有限 个人性格 后续安排 差分隐私系列源码及数据集 Programming Differential Priv ...

最新文章

  1. 3w最简单led灯电路图_led灯驱动电源电路图大全(六款模拟电路设计原理图详解)...
  2. bootstrap 树形表格渲染慢_bootstrap-table-treegrid数据量较大时渲染太久了
  3. Linux Kernel 4.20 生命周期已结束,建议迁移 5.0
  4. Codeforces Round #108 (Div. 2)
  5. touchesEnded不响应
  6. Java Double类hashCode()方法及示例
  7. 个推开发者服务进阶之路
  8. python面试题之补充缺失的代码
  9. 讨论:.NET 4各项技术的应用前景,徐汇区网站设计
  10. Linux Shell常用命令学习(1)
  11. 无边框对话框拖动改变大小的实现总结
  12. HPE服务器做raid5阵列
  13. Fiddler V5中文版
  14. BitComet(比特彗星) BT磁力链下载推荐
  15. 大一计算机引论知识点,计算机引论知识点2015.doc
  16. 微信api中转站(用python搭建flask服务器)
  17. 必读的android 文章- 收藏集 - 掘金
  18. kubelet源码分析(四)之 syncLoopIteration
  19. CAD VCL Multiplatform SDK 定制Crack
  20. python设置窗口位置_python中tkinter窗口位置

热门文章

  1. 下找到vue变量_Vue:npm run serve 到底做了什么?
  2. MySQL安装之没有配置向导
  3. STL15-map/multimap容器
  4. c语言编程判断素数的函数,【面试题】C语言:实现一个函数,判断一个数是不是素数。...
  5. Python类的构造方法__init__(self)和析构函数__del__详解
  6. mybatis 控制台打印执行的SQL语句
  7. p1、查询端口号占用,根据端口查看进程信息/p
  8. Icon+启动图尺寸
  9. zabbix v3.0安装部署【转】
  10. Activiti工作流的简单介绍