elasticsearch基本操作之--使用java操作elasticsearch
本文转载自: https://www.cnblogs.com/wenbronk/p/6386043.html 作者:wenbronk 转载请注明该声明。
/**
* 系统环境: vm12 下的centos 7.2
* 当前安装版本: elasticsearch-2.4.0.tar.gz
*/
es 查询共有4种查询类型
QUERY_AND_FETCH:
主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。
这种查询方式存在数据量和排序问题,主节点会汇总所有分片返回的数据这样数据量会比较大,二是各个分片上的规则可能不一致。
QUERY_THEN_FETCH:
主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。
这种方式解决了数据量问题但是排序问题依然存在而且是es的默认查询方式
DEF_QUERY_AND_FETCH: 和 DFS_QUERY_THEN_FETCH:
将各个分片的规则统一起来进行打分。解决了排序问题但是DFS_QUERY_AND_FETCH仍然存在数据量问题,DFS_QUERY_THEN_FETCH两种噢乖你问题都解决但是效率是最差的。
1, 获取client, 两种方式获取
@Beforepublic void before() throws Exception {Map<String, String> map = new HashMap<String, String>(); map.put("cluster.name", "elasticsearch_wenbronk"); Settings.Builder settings = Settings.builder().put(map); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300"))); }
@Beforepublic void before11() throws Exception {// 创建客户端, 使用的默认集群名, "elasticSearch" // client = TransportClient.builder().build() // .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));// 通过setting对象指定集群配置信息, 配置的集群名Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch_wenbronk") // 设置集群名 // .put("client.transport.sniff", true) // 开启嗅探 , 开启后会一直连接不上, 原因未知 // .put("network.host", "192.168.50.37").put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上 // .put("client.transport.nodes_sampler_interval", 5) //报错, // .put("client.transport.ping_timeout", 5) // 报错, ping等待时间, .build();client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));// 默认5s// 多久打开连接, 默认5sSystem.out.println("success connect");} PS: 官网给的2种方式都不能用, 需要合起来才能用, 浪费老子一下午...
其他参数的意义:
代码:
package com.wenbronk.javaes;import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit;import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkProcessor.Listener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.script.Script; import org.junit.Before; import org.junit.Test;import com.alibaba.fastjson.JSONObject;/*** 使用java API操作elasticSearch* * @author 231**/ public class JavaESTest {private TransportClient client;private IndexRequest source;/*** 获取连接, 第一种方式* @throws Exception*/ // @Beforepublic void before() throws Exception {Map<String, String> map = new HashMap<String, String>(); map.put("cluster.name", "elasticsearch_wenbronk"); Settings.Builder settings = Settings.builder().put(map); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300"))); } /*** 查看集群信息*/@Testpublic void testInfo() {List<DiscoveryNode> nodes = client.connectedNodes();for (DiscoveryNode node : nodes) {System.out.println(node.getHostAddress());}}/*** 组织json串, 方式1,直接拼接*/public String createJson1() {String json = "{" +"\"user\":\"kimchy\"," +"\"postDate\":\"2013-01-30\"," +"\"message\":\"trying out Elasticsearch\"" +"}";return json;}/*** 使用map创建json*/public Map<String, Object> createJson2() {Map<String,Object> json = new HashMap<String, Object>();json.put("user", "kimchy");json.put("postDate", new Date());json.put("message", "trying out elasticsearch");return json;}/*** 使用fastjson创建*/public JSONObject createJson3() {JSONObject json = new JSONObject();json.put("user", "kimchy");json.put("postDate", new Date());json.put("message", "trying out elasticsearch");return json;}/*** 使用es的帮助类*/public XContentBuilder createJson4() throws Exception {// 创建json对象, 其中一个创建json的方式XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("user", "kimchy").field("postDate", new Date()).field("message", "trying to out ElasticSearch").endObject();return source;}/*** 存入索引中* @throws Exception*/@Testpublic void test1() throws Exception {XContentBuilder source = createJson4();// 存json入索引中IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get(); // // 结果获取String index = response.getIndex();String type = response.getType();String id = response.getId();long version = response.getVersion();boolean created = response.isCreated();System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);}/*** get API 获取指定文档信息*/@Testpublic void testGet() { // GetResponse response = client.prepareGet("twitter", "tweet", "1") // .get();GetResponse response = client.prepareGet("twitter", "tweet", "1").setOperationThreaded(false) // 线程安全.get();System.out.println(response.getSourceAsString());}/*** 测试 delete api*/@Testpublic void testDelete() {DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();String index = response.getIndex();String type = response.getType();String id = response.getId();long version = response.getVersion();System.out.println(index + " : " + type + ": " + id + ": " + version);}/*** 测试更新 update API* 使用 updateRequest 对象* @throws Exception */@Testpublic void testUpdate() throws Exception {UpdateRequest updateRequest = new UpdateRequest();updateRequest.index("twitter");updateRequest.type("tweet");updateRequest.id("1");updateRequest.doc(XContentFactory.jsonBuilder().startObject()// 对没有的字段添加, 对已有的字段替换.field("gender", "male").field("message", "hello").endObject());UpdateResponse response = client.update(updateRequest).get();// 打印String index = response.getIndex();String type = response.getType();String id = response.getId();long version = response.getVersion();System.out.println(index + " : " + type + ": " + id + ": " + version);}/*** 测试update api, 使用client* @throws Exception */@Testpublic void testUpdate2() throws Exception {// 使用Script对象进行更新 // UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1") // .setScript(new Script("hits._source.gender = \"male\"")) // .get();// 使用XContFactory.jsonBuilder() 进行更新 // UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1") // .setDoc(XContentFactory.jsonBuilder() // .startObject() // .field("gender", "malelelele") // .endObject()).get();// 使用updateRequest对象及script // UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1") // .script(new Script("ctx._source.gender=\"male\"")); // UpdateResponse response = client.update(updateRequest).get();// 使用updateRequest对象及documents进行更新UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1").doc(XContentFactory.jsonBuilder().startObject().field("gender", "male").endObject())).get();System.out.println(response.getIndex());}/*** 测试update* 使用updateRequest* @throws Exception * @throws InterruptedException */@Testpublic void testUpdate3() throws InterruptedException, Exception {UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1").script(new Script("ctx._source.gender=\"male\""));UpdateResponse response = client.update(updateRequest).get();}/*** 测试upsert方法* @throws Exception * */@Testpublic void testUpsert() throws Exception {// 设置查询条件, 查找不到则添加生效IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2").source(XContentFactory.jsonBuilder().startObject().field("name", "214").field("gender", "gfrerq").endObject());// 设置更新, 查找到更新下面的设置UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2").doc(XContentFactory.jsonBuilder().startObject().field("user", "wenbronk").endObject()).upsert(indexRequest);client.update(upsert).get();}/*** 测试multi get api* 从不同的index, type, 和id中获取*/@Testpublic void testMultiGet() {MultiGetResponse multiGetResponse = client.prepareMultiGet().add("twitter", "tweet", "1").add("twitter", "tweet", "2", "3", "4").add("anothoer", "type", "foo").get();for (MultiGetItemResponse itemResponse : multiGetResponse) {GetResponse response = itemResponse.getResponse();if (response.isExists()) {String sourceAsString = response.getSourceAsString();System.out.println(sourceAsString);}}}/*** bulk 批量执行* 一次查询可以update 或 delete多个document*/@Testpublic void testBulk() throws Exception {BulkRequestBuilder bulkRequest = client.prepareBulk();bulkRequest.add(client.prepareIndex("twitter", "tweet", "1").setSource(XContentFactory.jsonBuilder().startObject().field("user", "kimchy").field("postDate", new Date()).field("message", "trying out Elasticsearch").endObject()));bulkRequest.add(client.prepareIndex("twitter", "tweet", "2").setSource(XContentFactory.jsonBuilder().startObject().field("user", "kimchy").field("postDate", new Date()).field("message", "another post").endObject()));BulkResponse response = bulkRequest.get();System.out.println(response.getHeaders());}/*** 使用bulk processor* @throws Exception */@Testpublic void testBulkProcessor() throws Exception {// 创建BulkPorcessor对象BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {// TODO Auto-generated method stub }// 执行出错时执行public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {// TODO Auto-generated method stub }public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {// TODO Auto-generated method stub }})// 1w次请求执行一次bulk.setBulkActions(10000)// 1gb的数据刷新一次bulk.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 固定5s必须刷新一次.setFlushInterval(TimeValue.timeValueSeconds(5))// 并发请求数量, 0不并发, 1并发允许执行.setConcurrentRequests(1)// 设置退避, 100ms后执行, 最大请求3次 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();// 添加单次请求bulkProcessor.add(new IndexRequest("twitter", "tweet", "1"));bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));// 关闭bulkProcessor.awaitClose(10, TimeUnit.MINUTES);// 或者 bulkProcessor.close();} }
tes2代码:
package com.wenbronk.javaes;import java.net.InetSocketAddress;import org.apache.lucene.queryparser.xml.FilterBuilderFactory; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortParseElement; import org.junit.Before; import org.junit.Test;/*** 使用java API操作elasticSearch* search API* @author 231**/ public class JavaESTest2 {private TransportClient client;/*** 获取client对象*/@Beforepublic void testBefore() {Builder builder = Settings.settingsBuilder();builder.put("cluster.name", "wenbronk_escluster"); // .put("client.transport.ignore_cluster_name", true);Settings settings = builder.build();org.elasticsearch.client.transport.TransportClient.Builder transportBuild = TransportClient.builder();TransportClient client1 = transportBuild.settings(settings).build();client = client1.addTransportAddress((new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300))));System.out.println("success connect to escluster");}/*** 测试查询*/@Testpublic void testSearch() { // SearchRequestBuilder searchRequestBuilder = client.prepareSearch("twitter", "tweet", "1"); // SearchResponse response = searchRequestBuilder.setTypes("type1", "type2") // .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) // .setQuery(QueryBuilders.termQuery("user", "test")) // .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1)) // .setFrom(0).setSize(2).setExplain(true) // .execute().actionGet();SearchResponse response = client.prepareSearch().execute().actionGet(); // SearchHits hits = response.getHits(); // for (SearchHit searchHit : hits) { // for(Iterator<SearchHitField> iterator = searchHit.iterator(); iterator.hasNext(); ) { // SearchHitField next = iterator.next(); // System.out.println(next.getValues()); // } // }System.out.println(response);}/*** 测试scroll api* 对大量数据的处理更有效*/@Testpublic void testScrolls() {QueryBuilder queryBuilder = QueryBuilders.termQuery("twitter", "tweet");SearchResponse response = client.prepareSearch("twitter").addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC).setScroll(new TimeValue(60000)).setQuery(queryBuilder).setSize(100).execute().actionGet();while(true) {for (SearchHit hit : response.getHits().getHits()) {System.out.println("i am coming");}SearchResponse response2 = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();if (response2.getHits().getHits().length == 0) {System.out.println("oh no=====");break;}}}/*** 测试multiSearch*/@Testpublic void testMultiSearch() {QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2).execute().actionGet();long nbHits = 0;for (MultiSearchResponse.Item item : multiResponse.getResponses()) {SearchResponse response = item.getResponse();nbHits = response.getHits().getTotalHits();SearchHit[] hits = response.getHits().getHits();System.out.println(nbHits);}}/*** 测试聚合查询*/@Testpublic void testAggregation() {SearchResponse response = client.prepareSearch().setQuery(QueryBuilders.matchAllQuery()) // 先使用query过滤掉一部分.addAggregation(AggregationBuilders.terms("term").field("user")).addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth").interval(DateHistogramInterval.YEAR)).execute().actionGet();Aggregation aggregation2 = response.getAggregations().get("term");Aggregation aggregation = response.getAggregations().get("agg2"); // SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet(); }/*** 测试terminate*/@Testpublic void testTerminateAfter() {SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();if (response.isTerminatedEarly()) {System.out.println("ternimate");}}/*** 过滤查询: 大于gt, 小于lt, 小于等于lte, 大于等于gte*/@Testpublic void testFilter() {SearchResponse response = client.prepareSearch("twitter") .setTypes("") .setQuery(QueryBuilders.matchAllQuery()) //查询所有 .setSearchType(SearchType.QUERY_THEN_FETCH) // .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19) // .includeLower(true).includeUpper(true)) // .setPostFilter(FilterBuilderFactory .rangeFilter("age").gte(18).lte(22)) .setExplain(true) //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面 .get(); }/*** 分组查询*/@Testpublic void testGroupBy() {client.prepareSearch("twitter").setTypes("tweet").setQuery(QueryBuilders.matchAllQuery()).setSearchType(SearchType.QUERY_THEN_FETCH).addAggregation(AggregationBuilders.terms("user").field("user").size(0) // 根据user进行分组// size(0) 也是10).get();}}
elasticsearch基本操作之--使用java操作elasticsearch相关推荐
- java操作elasticsearch实现query String
1.CommonTersQuery: 指定字段进行模糊查询 //commonTermsQuery @Test public void test35() throws UnknownHostExcept ...
- java操作elasticsearch实现前缀查询、wildcard、fuzzy模糊查询、ids查询
1.前缀查询(prefix) //prefix前缀查询 @Testpublic void test15() throws UnknownHostException {//1.指定es集群 cluste ...
- java操作elasticsearch实现批量添加数据(bulk)
java操作elasticsearch实现批量添加主要使用了bulk 代码如下: //bulk批量操作(批量添加) @Testpublic void test7() throws IOExceptio ...
- Java操作Elasticsearch的所有方法
使用Java操作Elasticsearch的所有方法 13.1 Elasticsearch简介 Elasticsearch是基于Lucene开发的一个分布式全文检索框架,向Elasticsearch中 ...
- (六)Java操作elasticSearch(2)
Java操作elasticSearch(2) 一.DSL查询文档: 0.DSL: 1.DSL查询分类 2.全文检索查询 3.精准查询 4.地理坐标查询 5.组合查询 二.搜索结果的处理: 0.搜索结果 ...
- elasticsearch基本操作 --- 使用java操作elasticsearch
随着大数据的兴起,面对越来越多的数据和越来越复杂的业务场景,系统对后端也提出了更高的要求,尤其是用户体验上,低延迟.快速响应已经成为检验后端程序是否高效很重要的标准,在后端的数据存储框架中,elast ...
- java操作ElasticSearch(es)进行增删查改操作
ElasticSearch(名称太长,后面简称ES)作为一个搜索引擎,目前可谓是如日中天,几乎和solr齐驾并驱.关于他能做什么,跟云计算有什么关系,在此不再描述.但是ES的官方文档,特别是关于jav ...
- java操作elasticsearch出现:NoNodeAvailableException[None of the configured nodes are available
使用java练习操作elasticsearch创建索引的时候报了个这个异常 抛出错误 :NoNodeAvailableException[None of the configured nodes ar ...
- Elasticsearch笔记五之java操作es
Java操作es集群步骤1:配置集群对象信息:2:创建客户端:3:查看集群信息 1:集群名称 默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错. 2:嗅探功能 ...
最新文章
- 真正拖垮年轻人的,是沉没成本
- HDOJ HDU 1106 排序 ACM 1106 IN HDU
- MapReduce编程实战之“调试”
- Permissions for id_rsa are too open
- win7(32位)U盘安装、卸载ubuntu(64位)双系统
- 使用axis公布weblogic(一个)
- centos 6.7 perl 版本 This is perl 5, version 22 安装DBI DBD
- 解决 Chrome最新版右键工具中的编码修改功能没有了的工具
- android icon换不掉图标文件夹,看烦了 Windows 原生文件夹图标?收下这套最全的更换图标教程...
- PowerPoint储存此文件时发生错误 出现错误的问题解决方法
- 夜神模拟器连接手柄无反应_夜神安卓模拟器怎么连接手柄 夜神模拟器连接手柄教程...
- android 播放多个音频文件,android – 如何同时播放多个ogg或mp3 ..?
- 狄克斯特拉算法——python实现
- c语言cast的用法,static_cast 用法
- vscode远程开发基础教程
- HTML粒子漩涡特效,使用HTML5 Canvas绘制经典漩涡粒子特效
- 转来的,我每次看都有收获
- 【转】幻想传说:全技能奥义、料理、物品、称号、交易品 获得条件(图文)...
- 第1个ARM裸板程序及引申(第004节_汇编与机器码)【修改机器码点亮led2]
- App Store Connect 更改已上架App主语言
热门文章
- 一款小清新的 SpringBoot+ Mybatis 前后端分离后台管理系统
- 计算机毕业设计Java校园绿化管理系统(源码+系统+mysql数据库+Lw文档)
- 计算机cmd入门,教大家5个装逼用的CMD命令,让人一看你就是个电脑高手
- 微型计算机的别名中不正确的是,XX计算机应用基础模拟题「附答案」
- NOJ [1527] 你的狗尾巴草呢
- 5G时代运营商内容运营策略初探
- Html的a标签onclick属性,关于通过js给a标签增加onclick属性
- linux设置为桥接后ipv4不见,centOS不显示ipv4地址的解决办法
- 设置计算机从u盘启动的英文,phoenix bios设置中英文图解(U盘启动+启动顺序)
- Studio One6最新版本升级 新增20项功能详解