es文本分析java代码_Elasticsearch系列---Java客户端代码Demo
前言
前面历经33篇内容的讲解,与ES的请求操作都是在Kibana平台上用Restful请求完成的,一直没发布Java或python的客户端代码,Restful才是运用、理解ES核心功能最直接的表达方式,但实际项目中肯定是以Java/python来完成ES请求的发起与数据处理的,前面理解了ES的核心功能,后面Java API的使用将会非常简单,剩余的未覆盖的功能API,自行查阅文档即可。
概要
本篇讲解Elasticsearch的客户端API开发的一些示例,以Java语言为主,介绍一些最常用,最核心的API。
代码示例
引入依赖
我们以maven项目为例,添加项目依赖
org.elasticsearch
elasticsearch
6.3.1
org.elasticsearch.client
transport
6.3.1
log4j
log4j
1.2.17
org.apache.logging.log4j
log4j-core
2.12.1
建立ES连接创建Settings对象,指定集群名称
创建TransportClient对象,手动指定IP、端口即可
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
如果集群的节点数比较多,为每个node分别指定IP、Port可行性不高,我们可以使用集群节点自动探查的功能,代码如下:
// 将client.transport.sniff设置为true即可打开集群节点自动探查功能
Settings settings = Settings.builder().put("client.transport.sniff", true)..put("cluster.name", "elasticsearch").build();
// 只需要指定一个node就行
TransportClient client = new PreBuiltTransportClient(settings);
transport.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));
基本CRUD
最基本的CRUD代码,可以当作入门demo来写:
/**
* 创建员工信息(创建一个document)
* @param client
*/
private static void createEmployee(TransportClient client) throws Exception {
IndexResponse response = client.prepareIndex("company", "employee", "1")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("name", "jack")
.field("age", 27)
.field("position", "technique")
.field("country", "china")
.field("join_date", "2017-01-01")
.field("salary", 10000)
.endObject())
.get();
System.out.println(response.getResult());
}
/**
* 获取员工信息
* @param client
* @throws Exception
*/
private static void getEmployee(TransportClient client) throws Exception {
GetResponse response = client.prepareGet("company", "employee", "1").get();
System.out.println(response.getSourceAsString());
}
/**
* 修改员工信息
* @param client
* @throws Exception
*/
private static void updateEmployee(TransportClient client) throws Exception {
UpdateResponse response = client.prepareUpdate("company", "employee", "1")
.setDoc(XContentFactory.jsonBuilder()
.startObject()
.field("position", "technique manager")
.endObject())
.get();
System.out.println(response.getResult());
}
/**
* 删除 员工信息
* @param client
* @throws Exception
*/
private static void deleteEmployee(TransportClient client) throws Exception {
DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
System.out.println(response.getResult());
}
搜索
我们之前使用Restful的搜索,现在改用java实现,原有的Restful示例如下:
GET /company/employee/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"position": "technique"
}
}
],
"filter": {
"range": {
"age": {
"gte": 30,
"lte": 40
}
}
}
}
},
"from": 0,
"size": 1
}
等同于这样的Java代码:
SearchResponse response = client.prepareSearch("company")
.setTypes("employee")
.setQuery(QueryBuilders.termQuery("position", "technique")) // Query
.setPostFilter(QueryBuilders.rangeQuery("age").from(30).to(40)) // Filter
.setFrom(0).setSize(60)
.get();
聚合查询
聚合查询稍微麻烦一些,请求的封装和响应报文的解析,都是根据实际返回的结构来做的,例如下面的查询:
需求:按照country国家来进行分组
在每个country分组内,再按照入职年限进行分组
最后计算每个分组内的平均薪资
Restful的请求如下:
GET /company/employee/_search
{
"size": 0,
"aggs": {
"group_by_country": {
"terms": {
"field": "country"
},
"aggs": {
"group_by_join_date": {
"date_histogram": {
"field": "join_date",
"interval": "year"
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
}
}
}
}
}
用Java编写的请求如下:
SearchResponse sr = node.client().prepareSearch()
.addAggregation(
AggregationBuilders.terms("by_country").field("country")
.subAggregation(AggregationBuilders.dateHistogram("by_year")
.field("dateOfBirth")
.dateHistogramInterval(DateHistogramInterval.YEAR)
.subAggregation(AggregationBuilders.avg("avg_children").field("children"))
)
)
.execute().actionGet();
对响应的处理,则需要一层一层获取数据:
Map aggrMap = searchResponse.getAggregations().asMap();
StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
Iterator groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();
while(groupByCountryBucketIterator.hasNext()) {
Bucket groupByCountryBucket = groupByCountryBucketIterator.next();
System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount());
Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");
Iterator groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();
while(groupByJoinDateBucketIterator.hasNext()) {
org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();
System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount());
Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");
System.out.println(avgSalary.getValue());
}
}
client.close();
}
upsert请求
private static void upsert(TransportClient transport) {
try {
IndexRequest index = new IndexRequest("book_shop", "books", "2").source(
XContentFactory.jsonBuilder().startObject()
.field("name", "mysql从入门到删库跑路")
.field("tags", "mysql")
.field("price", 32.8)
.endObject());
UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
.doc(XContentFactory.jsonBuilder()
.startObject().field("price", 31.8)
.endObject())
.upsert(index);
UpdateResponse response = transport.update(update).get();
System.out.println(response.getVersion());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
mget请求
public static void mget(TransportClient transport) {
MultiGetResponse res = transport.prepareMultiGet()
.add("book_shop", "books", "1")
.add("book_shop", "books", "2")
.get();
for (MultiGetItemResponse item : res.getResponses()) {
System.out.println(item.getResponse());
}
}
bulk请求
public static void bulk(TransportClient transport) {
try {
BulkRequestBuilder bulk = transport.prepareBulk();
bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(
XContentFactory.jsonBuilder().startObject()
.field("name", "设计模式从入门到拷贝代码")
.field("tags", "设计模式")
.field("price", 55.9)
.endObject()));
bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(
XContentFactory.jsonBuilder().startObject()
.field("name", "架构设计从入门到google搜索")
.field("tags", "架构设计")
.field("price", 68.9)
.endObject()));
bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc((XContentFactory.jsonBuilder()
.startObject().field("price", 32.8)
.endObject())));
BulkResponse bulkRes = bulk.get();
if (bulkRes.hasFailures()) {
System.out.println("Error...");
}
} catch (IOException e) {
e.printStackTrace();
}
}
scorll请求
public static void scorll(TransportClient client) {
SearchResponse bookShop = client.prepareSearch("book_shop").setScroll(new TimeValue(60000)).setSize(1).get();
int batchCnt = 0;
do {
// 循环读取scrollid信息,直到结果为空
for(SearchHit hit: bookShop.getHits().getHits()) {
System.out.println("batchCnt:" + ++batchCnt);
System.out.println(hit.getSourceAsString());
}
bookShop = client.prepareSearchScroll(bookShop.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
} while (bookShop.getHits().getHits().length != 0);
}
搜索模板
public static void searchTemplates(TransportClient client) {
Map params = new HashMap<>(10);
params.put("from",0);
params.put("size",10);
params.put("tags","java");
SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
.setScript("page_query_by_tags")
.setScriptType(ScriptType.STORED)
.setScriptParams(params)
.setRequest(new SearchRequest())
.get();
for(SearchHit hit:str.getResponse().getHits().getHits()) {
System.out.println(hit.getSourceAsString());
}
}
多条件组合查询
public static void otherSearch(TransportClient client) {
SearchResponse response1 = client.prepareSearch("book_shop").setQuery(QueryBuilders.termQuery("tags", "java")).get();
SearchResponse response2 = client.prepareSearch("book_shop").setQuery(QueryBuilders.multiMatchQuery("32.8", "price","tags")).get();
SearchResponse response3 = client.prepareSearch("book_shop").setQuery(QueryBuilders.commonTermsQuery("name", "入门")).get();
SearchResponse response4 = client.prepareSearch("book_shop").setQuery(QueryBuilders.prefixQuery("name", "java")).get();
System.out.println(response1.getHits().getHits()[0].getSourceAsString());
System.out.println(response2.getHits().getHits()[0].getSourceAsString());
System.out.println(response3.getHits().getHits()[0].getSourceAsString());
System.out.println(response4.getHits().getHits()[0].getSourceAsString());
// 多个条件组合
SearchResponse response5 = client.prepareSearch("book_shop").setQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("tags", "java"))
.mustNot(QueryBuilders.matchQuery("name", "跑路"))
.should(QueryBuilders.matchQuery("name", "入门"))
.filter(QueryBuilders.rangeQuery("price").gte(23).lte(55))).get();
System.out.println(response5.getHits().getHits()[0].getSourceAsString());
}
地理位置查询
public static void geo(TransportClient client) {
GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location").setCorners(23, 112, 21, 114);
List points = new ArrayList<>();
points.add(new GeoPoint(23,115));
points.add(new GeoPoint(25,113));
points.add(new GeoPoint(21,112));
GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location",points);
GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location").point(22.523375, 113.911231).distance(500, DistanceUnit.METERS);
SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
for(SearchHit hit:response.getHits().getHits()) {
System.out.println(hit.getSourceAsString());
}
}
小结
专注Java高并发、分布式架构,更多技术干货分享与心得,请关注公众号:Java架构社区
es文本分析java代码_Elasticsearch系列---Java客户端代码Demo相关推荐
- 跳槽者、应届生必看JAVA面试题系列 - JAVA基础知识(四)
一: 前言 莫等闲,白了少年头,空悲切. 二: 面试挑战 在文章开始前,首先安利下"面试挑战": 凡是满足下面的挑战条件的,如果一个月内没有拿到一个Offer的,免费提供简历封 ...
- java wsimport方式生成webservice客户端代码
wsimport方法 1.在jdk安装目录bin下wsimport.exe,执行命令(修改红色部分的信息即可) wsimport -keep -d D:\1 -s D:\2 -p com.exampl ...
- JAVA之JUC系列 - JAVA内存模型
Java内存模型(简称JMM),定义了线程本地内存和主内存之间的关系,理解JMM的特性,对深入理解Java多线程中内存的可见性会有很大帮助.下面我们从并发编程模型中关注的两个问题说起. 一. 并发编程 ...
- 一文整理总结常见Java面试题系列——Java集合篇(2022最新版)
关于作者
- 如何用php向wsdl服务器发请求,知道服务器端Wsdl,不写服务端代码,仅写客户端代码能调用服务端的方法吗?...
新手请教,望高手朋友不吝赐教,无比感谢中. 今天一大早开始研究Webservice,有一本不太专业的参考书,随便找了一个服务端的Wsdl,想调用其中的方法,结果搞了一天,无果,老报错,郁闷. 代码如下 ...
- java高并发系列 - 第1天:必须知道的几个概念
java高并发系列-第1天:必须知道的几个概念 同步(Synchronous)和异步(Asynchronous) 同步和异步通常来形容一次方法调用,同步方法调用一旦开始,调用者必须等到方法调用返回后, ...
- Lossless Codec---APE代码解读系列(二)
APE file 一些概念 APE代码解读系列(一) APE代码解读系列(三) 1. 先要了解APE compression level APE主要有5level, 分别是: CompressionL ...
- Java学习星球,Java学习路线
目录 一.Java学习路线 二.学习计划 三.为何会有Java学习星球? 四.加入星球后,你可以得到什么? 五.如何加入Java学习星球? 六.打卡挑战 大家好,我是哪吒,一个靠着热情攀登至C站巅峰的 ...
- 差分隐私代码实现系列数据集及源码
目录 差分隐私系列源码及数据集 关于差分隐私代码实现系列说明 发博客的目的 水平有限 时间有限 个人性格 后续安排 差分隐私系列源码及数据集 Programming Differential Priv ...
最新文章
- 3w最简单led灯电路图_led灯驱动电源电路图大全(六款模拟电路设计原理图详解)...
- bootstrap 树形表格渲染慢_bootstrap-table-treegrid数据量较大时渲染太久了
- Linux Kernel 4.20 生命周期已结束,建议迁移 5.0
- Codeforces Round #108 (Div. 2)
- touchesEnded不响应
- Java Double类hashCode()方法及示例
- 个推开发者服务进阶之路
- python面试题之补充缺失的代码
- 讨论:.NET 4各项技术的应用前景,徐汇区网站设计
- Linux Shell常用命令学习(1)
- 无边框对话框拖动改变大小的实现总结
- HPE服务器做raid5阵列
- Fiddler V5中文版
- BitComet(比特彗星) BT磁力链下载推荐
- 大一计算机引论知识点,计算机引论知识点2015.doc
- 微信api中转站(用python搭建flask服务器)
- 必读的android 文章- 收藏集 - 掘金
- kubelet源码分析(四)之 syncLoopIteration
- CAD VCL Multiplatform SDK 定制Crack
- python设置窗口位置_python中tkinter窗口位置
热门文章
- 下找到vue变量_Vue:npm run serve 到底做了什么?
- MySQL安装之没有配置向导
- STL15-map/multimap容器
- c语言编程判断素数的函数,【面试题】C语言:实现一个函数,判断一个数是不是素数。...
- Python类的构造方法__init__(self)和析构函数__del__详解
- mybatis 控制台打印执行的SQL语句
- p1、查询端口号占用,根据端口查看进程信息/p
- Icon+启动图尺寸
- zabbix v3.0安装部署【转】
- Activiti工作流的简单介绍