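ElasticSearch 2.0.0 Cluster Installation, Configuration, and API Usage in Practice
ElasticSearch is a distributed search engine built on the full-text search library Lucene, and it can be used directly to build and run a distributed search system. Lucene itself is only a search framework: it provides the basic APIs for search-engine operations, but turning it into a usable search system means building everything else on top of those APIs yourself, which is a lot of work and requires a solid grasp of Lucene's internals.
ElasticSearch is a complete distributed search engine system. Its basic features include:
- Full-text search
- A plugin mechanism, so plugin functionality can be shared and reused
- Distributed file storage
- Distributed real-time indexing and search
- Real-time statistics and analytics
- Horizontal scalability, supporting search over large data sets
- A simple, easy-to-use RESTful API
- High availability of data through replication
- Integration with other systems
- Support for structured and unstructured data
- Flexible schema design (Mappings)
- Client libraries for many programming languages
In my experience, ElasticSearch does its best to hide the underlying Lucene details, to the point that you hardly notice Lucene at all. That saves you the cost of learning Lucene and makes for a gentle learning curve. Solr is different: to construct complex queries you still need some understanding of Lucene. On the distributed side, ElasticSearch is also lighter and simpler to use. Solr's distributed sharding requires SolrCloud, which relies on ZooKeeper for configuration management and replication, and Solr has to be deployed in a web container, so it is somewhat more complicated overall. (The SolrCloud versions I used were roughly 3.1 to 3.5, which is quite old; it may well be more polished now.)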
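Basic Concepts
Let's go over some of the basic concepts used in ElasticSearch:
Index
An index is a collection of documents, partitioned according to your business logic. Data that is relatively independent and shares a similar structure or nature is usually stored together as the documents of one index. For example, user information can form one index and transaction information another.
Type
A type is a logical partition inside an index. An index can define multiple types, which divide it logically into several collections, and each type contains a number of properties (fields). For example, for a mobile client App we might create an index of user information and define several types inside it: basic information, device information, and behavior information. The basic information type holds the user ID, identity document number, name, mobile number, age, and date of birth. The device information type holds the device type, device name, App version, channel source, OS version, IMEI, and MAC address. The behavior information type holds the user ID, event ID, event type, time, page code, and region code. That gives three types in one index. A type in ElasticSearch is conceptually quite similar to a column family in HBase.
Document
A document is the basic unit of an index. It resembles a row in a relational database: it holds a set of property values together with an ID that uniquely identifies them. That ID can be used to update or delete the document.
Shards & Replicas
An index is a collection of many documents. It can be split into multiple pieces, each a subset of the index, and each piece is called a shard. Splitting an index this way makes it easier to manage and lets it be stored across nodes, which is what enables distributed storage and search. A replica is a redundant copy of a shard kept for availability: when the data of a shard cannot be read, its replica can be read instead, so search keeps working.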
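The way documents are spread over shards can be sketched in a few lines. This is only an illustration under stated assumptions: Elasticsearch uses its own internal hash function on the routing value (the document ID by default), whereas the sketch below substitutes String.hashCode() to stay dependency-free. The class and method names are hypothetical.

```java
class ShardRoutingSketch {

    // Picks a primary shard for a routing value (by default the document ID).
    // ASSUMPTION: String.hashCode() stands in for the hash Elasticsearch uses internally.
    static int primaryShardFor(String routing, int numberOfPrimaryShards) {
        if (numberOfPrimaryShards < 1) {
            throw new IllegalArgumentException("need at least one primary shard");
        }
        // floorMod keeps the result in [0, numberOfPrimaryShards) even for negative hashes
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    // Every primary shard has `replicas` extra copies, so the cluster stores
    // primaries * (1 + replicas) shard copies in total.
    static int totalShardCopies(int primaries, int replicas) {
        return primaries * (1 + replicas);
    }

    public static void main(String[] args) {
        String[] ids = {
            "1c207122a4b2c9632212ab86bac10f60",
            "FA6B1B98E6FF4E6994A1505A996F6102"
        };
        for (String id : ids) {
            System.out.println(id + " -> shard " + primaryShardFor(id, 5));
        }
        System.out.println("shard copies for 10 primaries, 1 replica: " + totalShardCopies(10, 1));
    }
}
```

Because the shard is derived from the routing value and the number of primary shards, the same ID always lands on the same shard, which is also why the number of primary shards cannot simply be changed after an index has been created.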
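Cluster Installation and Configuration
Installing and configuring an ElasticSearch cluster is straightforward. To install, run the following commands: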
wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.0.0/elasticsearch-2.0.0.zip
unzip elasticsearch-2.0.0.zip
Pick one node of the cluster and configure it by editing config/elasticsearch.yml as follows:
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please see the documentation for further information on configuration options:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html>
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: dw_search_engine
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: esnode-01
#
# Add custom attributes to the node:
#
# node.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /data/dw_search_storage
#
# Path to log files:
#
path.logs: /tmp/es/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
# bootstrap.mlockall: true
#
# Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory
# available on the system and that the owner of the process is allowed to use this limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 10.10.2.62
#
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
# gateway.recover_after_nodes: 3
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html>
#
# --------------------------------- Discovery ----------------------------------
#
# Elasticsearch nodes will find each other via unicast, by default.
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["es-01", "es-02"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
#
# discovery.zen.minimum_master_nodes: 3
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html>
#
# ---------------------------------- Various -----------------------------------
#
# Disable starting multiple nodes on a single system:
#
# node.max_local_storage_nodes: 1
#
# Require explicit names when deleting indices:
#
# action.destructive_requires_name: true
For the other nodes, keep the base storage directories the same and change the following parameters as needed:
node.name
network.host
http.port
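The configuration template above leaves discovery.zen.minimum_master_nodes commented out, and its comment gives the quorum formula (total number of nodes / 2 + 1). As a small worked example of that formula, the following sketch computes the value for a few cluster sizes. The class and method names are hypothetical, and the formula is applied with integer division.

```java
class QuorumSketch {

    // Quorum from the config template comment: total number of nodes / 2 + 1,
    // using integer division.
    static int minimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1;
    }

    public static void main(String[] args) {
        for (int nodes = 1; nodes <= 5; nodes++) {
            System.out.println(nodes + " node(s) -> minimum_master_nodes: " + minimumMasterNodes(nodes));
        }
    }
}
```

With two master-eligible nodes the quorum is 2, so losing either node leaves the cluster without a master; three nodes is the smallest size that tolerates one failure.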
Finally, start ElasticSearch on each node by running:
cd elasticsearch-2.0.0
bin/elasticsearch -d
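You can then open the web management UI, which requires the elasticsearch-head plugin (installation is covered below).
[Figure: elasticsearch-head web management UI]
In the figure, an index has already been created, and the UI shows the state of each node and its shards.
RESTful API Basics
The RESTful API that ElasticSearch provides is a good way to keep other systems decoupled from the search system, especially for search requests. The basic operations are described below.
Plugin Management
Plugins live in elasticsearch-2.0.0/plugins/, and all plugin operations work against that directory.
Installing plugins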
bin/plugin install analysis-icu
bin/plugin install mobz/elasticsearch-head
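Plugins can be installed from different locations. The first command above installs a core Elasticsearch plugin, which is provided by Elasticsearch and is downloaded from Elasticsearch itself. The second is downloaded and installed automatically from GitHub. There are other options as well, such as installing from a specific location on the file system.
Listing plugins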
bin/plugin list
Removing plugins
bin/plugin remove analysis-icu
Once a plugin is installed you can check it. For the elasticsearch-head plugin, for example, open:
http://10.10.2.62:9200/_plugin/head/
Creating an Index
curl -XPUT 'http://10.10.2.62:9200/basis_device_info/'
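This creates an index named basis_device_info. You do not have to specify Mappings for an index; they can be generated automatically at indexing time, so an index created without explicit Mappings accepts documents of any shape. By the same token, new types can be added to an index automatically, which is very flexible.
You can also specify basic index settings, such as the number of shards and replicas:
curl -XPUT 'http://10.10.2.62:9200/basis_device_info/' -d '{
  "settings": {
    "index": {
      "number_of_shards": 10,
      "number_of_replicas": 1
    }
  }
}'
By default an index has 5 primary shards with 1 replica each. The settings above give basis_device_info 10 shards, each with 1 replica.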
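For callers that would rather issue this request from Java than from curl, here is a minimal sketch using only the JDK's HttpURLConnection. It is not the official client; the class name, method names, and the idea of passing the base URL as a command-line argument are assumptions made for illustration.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class CreateIndexSketch {

    // Builds the same settings body that the curl example sends.
    static String settingsBody(int shards, int replicas) {
        return "{\"settings\":{\"index\":{\"number_of_shards\":" + shards
                + ",\"number_of_replicas\":" + replicas + "}}}";
    }

    // Sends PUT <baseUrl>/<index>/ with the settings body and returns the HTTP status code.
    static int createIndex(String baseUrl, String index, int shards, int replicas) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(baseUrl + "/" + index + "/").openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(settingsBody(shards, replicas).getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(settingsBody(10, 1));
        // Only talks to a cluster when a base URL such as http://10.10.2.62:9200 is given.
        if (args.length == 1) {
            System.out.println("HTTP " + createIndex(args[0], "basis_device_info", 10, 1));
        }
    }
}
```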
The schema can also be specified at creation time by configuring mappings:
curl -XPUT 'http://10.10.2.62:9200/basis_device_info/' -d '
{
  "mappings": {
    "user": {
      "_all": { "enabled": false },
      "properties": {
        "installid": { "type": "string" },
        "appid": { "type": "string" },
        "channel": { "type": "string", "index": "analyzed" },
        "version": { "type": "string" },
        "osversion": { "type": "string" },
        "device_name": { "type": "string", "index": "analyzed" },
        "producer": { "type": "string" },
        "device_type": { "type": "string" },
        "resolution": { "type": "string", "index": "analyzed" },
        "screen_size": { "type": "string", "index": "analyzed" },
        "mac": { "type": "string", "index": "not_analyzed" },
        "idfa": { "type": "string" },
        "idfv": { "type": "string", "index": "not_analyzed" },
        "imei": { "type": "string", "index": "not_analyzed" },
        "create_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss",
          "index": "not_analyzed"
        }
      }
    }
  }
}'
This creates the index basis_device_info with a type named user. With the mappings in place, the format of the data to be indexed is known up front.
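Since the mapping declares create_time as a date with the format yyyy-MM-dd HH:mm:ss, it can be useful to check values on the client side before indexing them. The sketch below does that with java.time. It assumes that, for this particular pattern, the java.time pattern letters mean the same thing as in the date format used by Elasticsearch; the class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class CreateTimeFormatSketch {

    // Same pattern string as the "format" of create_time in the mapping.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns true when the value can be parsed with the mapping's date pattern.
    static boolean isValidCreateTime(String value) {
        try {
            LocalDateTime.parse(value, FORMAT);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidCreateTime("2015-01-14 20:32:05"));
        System.out.println(isValidCreateTime("2015-01-14T20:32:05"));
    }
}
```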
Deleting an Index
curl -XDELETE 'http://10.10.2.62:9200/basis_device_info/'
This deletes the index basis_device_info.
Indexing a Document
curl -XPUT 'http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA' -d '{
  "installid": "0000000L",
  "appid": "0",
  "udid": "CC49E748588490D41BFB89584007B0FA",
  "channel": "wulei1",
  "version": "3.1.2",
  "osversion": "8.1",
  "device_name": "iPhone Retina4 Simulator",
  "producer": "apple",
  "device_type": "1",
  "resolution": "640*1136",
  "screen_size": "320*568",
  "mac": "600308A20C5E",
  "idfa": "dbbbs-fdsfa-fafda-321saf",
  "idfv": "4283FAE1-19EB-4FA9-B739-8148F76BC8C3",
  "imei": "af-sfd0fdsa-fad-ff",
  "create_time": "2015-01-14 20:32:05"
}'
This indexes one document into the type user of the index created earlier. The document _id is CC49E748588490D41BFB89584007B0FA, and the body is one user's device information in JSON.
Bulk Indexing
Bulk indexing can be done from whatever programming or scripting language you are comfortable with, and ElasticSearch also provides client libraries. Here we first convert a data file into the JSON format that ElasticSearch bulk indexing expects and write it to a file, then use curl to index it in bulk. Under the hood this uses the bulk API that ElasticSearch provides.
First, process the raw data to be indexed:
package org.shirdrn.es;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;

import net.sf.json.JSONObject;

import com.google.common.base.Throwables;

public class EsIndexingClient {

    public static void closeQuietly(Closeable... closeables) {
        if (closeables != null) {
            for (Closeable closeable : closeables) {
                try {
                    closeable.close();
                } catch (Exception e) {
                    // ignored on purpose; this also covers a null closeable
                }
            }
        }
    }

    public static void main(String[] args) {
        String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
        String out = "C:\\Users\\yanjun\\Desktop\\basis_device_info.json";
        File in = new File(f);
        BufferedReader reader = null;
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter(out));
            reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
            String line = null;
            while ((line = reader.readLine()) != null) {
                // limit -1 keeps trailing empty columns
                String[] a = line.split("\t", -1);
                if (a.length == 16) {
                    String udid = a[2];

                    // action line: {"index": {"_index": ..., "_type": ..., "_id": ...}}
                    JSONObject c = new JSONObject();
                    c.put("_index", "basis_device_info");
                    c.put("_type", "user");
                    c.put("_id", udid);
                    JSONObject index = new JSONObject();
                    index.put("index", c);

                    // document line
                    JSONObject doc = new JSONObject();
                    doc.put("installid", a[0]);
                    doc.put("appid", a[1]);
                    doc.put("udid", a[2]);
                    doc.put("channel", a[3]);
                    doc.put("version", a[4]);
                    doc.put("osversion", a[5]);
                    doc.put("device_name", a[6]);
                    doc.put("producer", a[7]);
                    doc.put("device_type", a[8]);
                    doc.put("resolution", a[9]);
                    doc.put("screen_size", a[10]);
                    doc.put("mac", a[11]);
                    doc.put("idfa", a[12]);
                    doc.put("idfv", a[13]);
                    doc.put("imei", a[14]);
                    doc.put("create_time", a[15]);

                    writer.write(index.toString() + "\n");
                    writer.write(doc.toString() + "\n");
                }
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        } finally {
            closeQuietly(reader, writer);
        }
    }
}
Running the code produces the file basis_device_info.json. A sample of its format:
{"index":{"_index":"basis_device_info","_type":"user","_id":"1c207122a4b2c9632212ab86bac10f60"}}
{"installid":"00000002","appid":"0","udid":"1c207122a4b2c9632212ab86bac10f60","channel":"itings","version":"3.1.1","osversion":"4.1.2","device_name":"Lenovo P770","producer":"Lenovo","device_type":"0","resolution":"540*960","screen_size":"4.59","mac":"d4:22:3f:83:17:06","idfa":"","idfv":"","imei":"861166023335745","create_time":"2015-01-14 19:39:35"}
{"index":{"_index":"basis_device_info","_type":"user","_id":"FA6B1B98E6FF4E6994A1505A996F6102"}}
{"installid":"00000003","appid":"0","udid":"FA6B1B98E6FF4E6994A1505A996F6102","channel":"appstore","version":"3.1.1","osversion":"8.1.2","device_name":"iPhone 6Plus","producer":"apple","device_type":"1","resolution":"640*1136","screen_size":"320*568","mac":"020000000000","idfa":"84018625-A3C9-47A8-88D0-C57C12F80520","idfv":"9D1E2514-9DC8-47A8-ABD0-129FC0FB3171","imei":"","create_time":"2015-01-14 19:41:21"}
{"index":{"_index":"basis_device_info","_type":"user","_id":"8c5fe70b2408f184abcbe4f34b8f23c3"}}
{"installid":"00000004","appid":"0","udid":"8c5fe70b2408f184abcbe4f34b8f23c3","channel":"itings","version":"3.1.1.014","osversion":"4.2.2","device_name":"2014011","producer":"Xiaomi","device_type":"0","resolution":"720*1280","screen_size":"4.59","mac":"0c:1d:af:4f:48:9f","idfa":"","idfv":"","imei":"865763025472173","create_time":"2015-01-14 19:46:37"}
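Odd-numbered lines carry the indexing action metadata, including the index name (_index), type (_type), and unique ID (_id). Even-numbered lines carry the actual document to be indexed.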
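The pairing of action line and document line can be captured in a small helper. The following is a sketch with hypothetical names, written without a JSON library, so it assumes that index, type, and id contain no characters that need JSON escaping. It also enforces two properties of the bulk format: each JSON object sits on a single line, and every line, including the last, ends with a newline.

```java
class BulkBodySketch {

    // Action (metadata) line for an "index" operation.
    // ASSUMPTION: the arguments contain no characters that need JSON escaping.
    static String actionLine(String index, String type, String id) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}";
    }

    // One bulk entry: the action line, then the document, each terminated by a newline.
    static String bulkEntry(String index, String type, String id, String docJson) {
        if (docJson.indexOf('\n') >= 0) {
            throw new IllegalArgumentException("document must be a single line of JSON");
        }
        return actionLine(index, type, id) + "\n" + docJson + "\n";
    }

    public static void main(String[] args) {
        System.out.print(bulkEntry("basis_device_info", "user",
                "1c207122a4b2c9632212ab86bac10f60", "{\"appid\":\"0\",\"producer\":\"Lenovo\"}"));
    }
}
```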
Then run the bulk index with curl:
curl -s -XPOST http://10.10.2.62:9200/basis_device_info/_bulk --data-binary "@basis_device_info.json"
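Searching Documents
Simple searches can be issued as GET requests:
http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA
http://10.10.2.62:9200/basis_device_info/user/_search?q=channel:B-hicloud
The first request looks a document up by its unique _id and returns zero or one document. The second passes GET parameters, where _search and q are keywords built into ElasticSearch: it searches by field name and search term, and the result comes back as JSON.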
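When the search term comes from user input, the q parameter should be URL-encoded before it is appended to the request. A minimal sketch with hypothetical class and method names follows. It uses java.net.URLEncoder, which applies form encoding (a space becomes +), and that is generally accepted in query strings.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class UriSearchSketch {

    // Builds <baseUrl>/<index>/<type>/_search?q=<field>:<term> with the q value URL-encoded.
    static String searchUrl(String baseUrl, String index, String type, String field, String term) {
        try {
            String q = URLEncoder.encode(field + ":" + term, "UTF-8");
            return baseUrl + "/" + index + "/" + type + "/_search?q=" + q;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is required to be available on every Java platform
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(searchUrl("http://10.10.2.62:9200", "basis_device_info", "user",
                "channel", "B-hicloud"));
    }
}
```

The colon between field and term is encoded as %3A; the server decodes it back before parsing the query.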
Request Body Search
More complex query conditions can be expressed by putting them in the request body:
curl -XGET 'http://10.10.2.245:9200/basis_device_info/user/_search' -d '{
  "query": {
    "term": { "udid": "bc0af2ca66a96725b8b0e0056d4213b6" }
  }
}'
A sample result:
{"took":11,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":9.45967,"hits":[{"_index":"basis_device_info","_type":"user","_id":"bc0af2ca66a96725b8b0e0056d4213b6","_score":9.45967,"_source":{"installid":"00000FPq","appid":"0","udid":"bc0af2ca66a96725b8b0e0056d4213b6","channel":"B-hicloud","version":"3.1.1","osversion":"4.4.2","device_name":"H60-L02","producer":"HUAWEI","device_type":"0","resolution":"720*1184","screen_size":"4.64","mac":"ec:cb:30:c4:93:e3","idfa":"","idfv":"","imei":"864103021536104","create_time":"2015-01-18 01:29:16"}}]}}
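Searching with Lucene Query Syntax
If you are familiar with Lucene queries, you can search with a query string that combines several terms:
curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{
  "query": {
    "query_string": { "query": "(channel:baidu OR device_name:HUAWEI)" }
  }
}'
The query string means: search for baidu in the channel field and for HUAWEI in the device_name field, then take the union of the two result sets. This is in effect a boolean query.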
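Query strings of this shape are easy to assemble programmatically. The sketch below joins field:term pairs with OR; the class and method names are hypothetical, and it deliberately does not escape Lucene special characters, so it assumes plain alphanumeric terms like the ones in the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class QueryStringSketch {

    // Joins field:term pairs into "(f1:t1 OR f2:t2 ...)".
    // ASSUMPTION: terms contain no Lucene special characters; no escaping is done here.
    static String anyOf(Map<String, String> fieldToTerm) {
        StringJoiner joiner = new StringJoiner(" OR ", "(", ")");
        for (Map.Entry<String, String> entry : fieldToTerm.entrySet()) {
            joiner.add(entry.getKey() + ":" + entry.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the clauses in insertion order
        Map<String, String> clauses = new LinkedHashMap<>();
        clauses.put("channel", "baidu");
        clauses.put("device_name", "HUAWEI");
        System.out.println(anyOf(clauses)); // prints (channel:baidu OR device_name:HUAWEI)
    }
}
```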
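Searching with multi_match
ElasticSearch can search for a given term across multiple fields:
curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{
  "query": {
    "multi_match": {
      "query": "HTC",
      "fields": [ "channel", "device_name" ]
    }
  }
}'
A document is returned if the term HTC appears in either channel or device_name, so the result is the union of the documents matched on each field.
Searching with a Filter
A search can also specify a Filter. The following query filters by a time range. Note that the filtered query is deprecated as of 2.0 in favor of a bool query with a filter clause, although it still works:
curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{
  "query": {
    "filtered": {
      "query": { "match_all": {} },
      "filter": {
        "range": {
          "create_time": { "from": "2015-01-16 00:00:00", "to": "2015-01-16 23:59:59" }
        }
      }
    }
  }
}'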
Paginated Search
ElasticSearch supports pagination through the size and from parameters in the request URL:
curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search?size=10&from=20' -d '
{
  "query": {
    "filtered": {
      "query": { "match_all": {} },
      "filter": {
        "range": {
          "create_time": { "from": "2015-01-16 00:00:00", "to": "2015-01-16 23:59:59" }
        }
      }
    }
  }
}'
This searches by time range, skips the first 20 matching documents (from=20), and returns the next 10 (size=10), which corresponds to the third page at 10 documents per page.
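The relation between a page number and the from offset is simple arithmetic, sketched here with hypothetical names and 1-based page numbers: from = (page - 1) * size.

```java
class PagingSketch {

    // Offset of the first hit on a 1-based page: from = (page - 1) * size.
    static int fromOffset(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        return (page - 1) * size;
    }

    // Query-string fragment in the same order as the curl example.
    static String pagingParams(int page, int size) {
        return "size=" + size + "&from=" + fromOffset(page, size);
    }

    public static void main(String[] args) {
        System.out.println(pagingParams(3, 10)); // prints size=10&from=20
    }
}
```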
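Java Client
If you are comfortable with Java and would rather not drive the ElasticSearch cluster from scripts or other tools, the Java client API gives you more control. It supports all the common operations, such as updating indexes, indexing documents, searching documents, and deleting indexes, along with other features such as synchronous and asynchronous modes and explain for queries. The code below walks through them.
If you manage your Java code with Maven, add the following dependency to pom.xml:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.0.0</version>
</dependency>
Create an ElasticSearch client as follows: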
// create & configure client
Settings settings = Settings.settingsBuilder()
        .put("cluster.name", "dw_search_engine")
        .put("client.transport.sniff", true)
        .build();
final Client client = TransportClient.builder().settings(settings).build()
        .addTransportAddress(newAddress("es-01", 9300))
        .addTransportAddress(newAddress("es-02", 9300));
Use addTransportAddress to register the nodes of your ElasticSearch cluster with the Client object, so that the client is aware of them when it indexes, updates, deletes, or searches documents. The newAddress method used above is:
private static InetSocketTransportAddress newAddress(String host, int port) throws UnknownHostException {
    return new InetSocketTransportAddress(InetAddress.getByName(host), port);
}
Alternatively, the relevant settings can be specified in the configuration file elasticsearch.yml, for example:
cluster.name: dw_search_engine
client.transport.sniff: true
client.transport.ping_timeout: 10s
client.transport.nodes_sampler_interval: 10s
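In that case the client has to be created by reading the settings from the configuration file; see the official documentation for details.
Preparation
For indexing, we read data from a local file, build documents in the required format, and ask the ElasticSearch cluster to index them. The following code does some basic setup:
final String index = "basis_device_info";
final String type = "user";

// index documents
String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
File in = new File(f);
Each line read from the file becomes one JSON document, represented with an XContentBuilder: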
// requires: import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
protected static XContentBuilder createSource(String[] a) throws IOException {
    return jsonBuilder()
            .startObject()
                .field("installid", a[0])
                .field("appid", a[1])
                .field("udid", a[2])
                .field("channel", a[3])
                .field("version", a[4])
                .field("osversion", a[5])
                .field("device_name", a[6])
                .field("producer", a[7])
                .field("device_type", a[8])
                .field("resolution", a[9])
                .field("screen_size", a[10])
                .field("mac", a[11])
                .field("idfa", a[12])
                .field("idfv", a[13])
                .field("imei", a[14])
                .field("create_time", a[15])
            .endObject();
}
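The column order that createSource relies on can be made explicit with a small, dependency-free sketch. It pairs the 16 tab-separated columns with their field names and shows why the code in this article calls split with a limit of -1: without it, Java drops trailing empty columns and the length check against 16 fails for records whose last columns are empty. The class and method names are hypothetical, and the sample line in main is made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TsvRecordSketch {

    // Field names in column order, as used by createSource(String[]).
    static final String[] FIELDS = {
        "installid", "appid", "udid", "channel", "version", "osversion",
        "device_name", "producer", "device_type", "resolution", "screen_size",
        "mac", "idfa", "idfv", "imei", "create_time"
    };

    // Returns the record as an ordered field map, or null if the column count is wrong.
    static Map<String, String> parse(String line) {
        // limit -1 keeps trailing empty columns; the default limit would drop them
        String[] a = line.split("\t", -1);
        if (a.length != FIELDS.length) {
            return null;
        }
        Map<String, String> doc = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            doc.put(FIELDS[i], a[i]);
        }
        return doc;
    }

    public static void main(String[] args) {
        // Made-up sample: 12 filled columns followed by 4 empty ones
        String line = "1\t0\tu1\tc\tv\to\td\tp\t0\tr\ts\tm\t\t\t\t";
        System.out.println("columns with default split: " + line.split("\t").length);
        System.out.println("columns with limit -1:      " + line.split("\t", -1).length);
        System.out.println(parse(line));
    }
}
```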
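The sections below go through the API feature by feature, with code showing how each is used.
Creating an Index
An index can be created directly through the Java client library: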
protected static void createIndex(final Client client, String index) {
    // both values are strings, so a String-valued map is sufficient
    Map<String, String> indexSettings = Maps.newHashMap();
    indexSettings.put("number_of_shards", "4");
    indexSettings.put("number_of_replicas", "1");
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(index,
            Settings.settingsBuilder().put(indexSettings).build());
    CreateIndexResponse createIndexResponse =
            client.admin().indices().create(createIndexRequest).actionGet();
    System.out.println(createIndexResponse);
}
Creating Mappings
Creating Mappings through the Java client is a little more involved, because the corresponding JSON has to be assembled:
protected static void createMappings(final Client client, String index)
        throws IOException, InterruptedException, ExecutionException {
    XContentBuilder basisInfoMapping = jsonBuilder()
            .startObject()
                .startObject("_all").field("enabled", "false").endObject()
                .startObject("properties")
                    .startObject("id").field("type", "string").endObject()
                    .startObject("name").field("type", "string").field("index", "analyzed").endObject()
                    // the numeric type is named "integer", not "int"
                    .startObject("age").field("type", "integer").endObject()
                    .startObject("birthday").field("type", "date").field("format", "yyyy-MM-dd HH:mm:ss").field("index", "not_analyzed").endObject()
                .endObject()
            .endObject();
    XContentBuilder deviceInfoMapping = jsonBuilder()
            .startObject()
                .startObject("_all").field("enabled", "false").endObject()
                .startObject("properties")
                    .startObject("udid").field("type", "string").endObject()
                    .startObject("device_name").field("type", "string").field("index", "analyzed").endObject()
                    .startObject("privoder").field("type", "string").field("index", "analyzed").endObject()
                    .startObject("os_version").field("type", "string").endObject()
                .endObject()
            .endObject();
    // One request per type: calling type()/source() twice on the same request
    // would overwrite the first type and source with the second.
    PutMappingRequest basicInfoRequest =
            Requests.putMappingRequest(index).type("basic_info").source(basisInfoMapping);
    PutMappingRequest deviceInfoRequest =
            Requests.putMappingRequest(index).type("device_info").source(deviceInfoMapping);
    System.out.println(basicInfoRequest.indicesOptions());
    PutMappingResponse basicInfoResponse = client.admin().indices().putMapping(basicInfoRequest).get();
    System.out.println(basicInfoResponse);
    PutMappingResponse deviceInfoResponse = client.admin().indices().putMapping(deviceInfoRequest).get();
    System.out.println(deviceInfoResponse);
}
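Run against an index named app_user_info, the code above is meant to give that index two types, basic_info and device_info. The resulting index can be inspected in the elasticsearch-head web UI.
Indexing a Single Document
Read data from the file, build one document per record, and index it: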
protected static void indexDocs(final Client client, final String index, final String type, File in) {
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            if (a.length == 16) {
                String udid = a[2];
                // one request per document, using udid as the document id
                IndexResponse response =
                        client.prepareIndex(index, type, udid).setSource(createSource(a)).get();
                System.out.println(response.toString());
            }
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(reader);
    }
}
Bulk Indexing
There are several ways to index in bulk. The first uses the Bulk API directly, and we control the size of each batch ourselves:
protected static void indexBulk(final Client client, final String index, final String type, File in) {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    final int batchSize = 100;
    int counter = 0;
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            if (a.length == 16) {
                String udid = a[2];
                IndexRequestBuilder indexRequestBuilder =
                        client.prepareIndex(index, type, udid).setSource(createSource(a));
                bulkRequest.add(indexRequestBuilder);
                if (++counter >= batchSize) {
                    // send a full batch and start a new one
                    System.out.println(!bulkRequest.get().hasFailures());
                    counter = 0;
                    bulkRequest = client.prepareBulk();
                }
            }
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        // send the last, partially filled batch; skip it when empty,
        // because a bulk request without actions fails validation
        if (counter > 0) {
            System.out.println(!bulkRequest.get().hasFailures());
        }
        closeQuietly(reader);
    }
}
The other way uses the Bulk Processor that ElasticSearch provides. You only set a few parameters and it takes care of batching, which is more flexible:
protected static void indexUsingBulkProcessor(final Client client, final String index, final String type, File in)
        throws InterruptedException {
    String name = "device_info_processor";
    int bulkActions = 1000;
    ByteSizeValue bulkSize = new ByteSizeValue(100, ByteSizeUnit.MB);
    TimeValue flushInterval = TimeValue.timeValueSeconds(60);
    int concurrentRequests = 12;

    // create bulk processor
    final BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {

        @Override
        public void afterBulk(long id, BulkRequest req, BulkResponse resp) {
            System.out.println("id=" + id + ", resp=" + resp);
        }

        @Override
        public void afterBulk(long id, BulkRequest req, Throwable cause) {
            System.out.println("id=" + id + ", req=" + req + ", cause=" + cause);
        }

        @Override
        public void beforeBulk(long id, BulkRequest req) {
            System.out.println("id=" + id + ", req=" + req);
        }
    })
    .setName(name)
    .setBulkActions(bulkActions)
    .setBulkSize(bulkSize)
    .setFlushInterval(flushInterval)
    .setConcurrentRequests(concurrentRequests)
    .build();

    // index documents
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            if (a.length == 16) {
                String udid = a[2];
                bulkProcessor.add(new IndexRequest(index, type, udid).source(createSource(a)));
            }
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(reader);
        // close bulk processor, waiting for in-flight requests to finish
        bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
    }
}
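A custom BulkProcessor.Listener provides hooks. For example, if indexing a document fails, the hook method can handle the failure and implement a retry. Or, when indexing succeeds, it can fire a callback to another system or service.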
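The retry idea can be illustrated independently of the ElasticSearch API. The helper below is a generic, hypothetical sketch, not part of the client library: it repeats an action until it reports success or a maximum number of attempts is reached. In a real listener the action would resubmit the failed requests; here a counter simulates two failures followed by a success.

```java
import java.util.function.BooleanSupplier;

class RetrySketch {

    // Runs the action until it returns true or maxAttempts is reached.
    // Returns the number of attempts used, or -1 if every attempt failed.
    static int retry(BooleanSupplier action, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (action.getAsBoolean()) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Simulated operation: fails twice, then succeeds on the third call.
        int[] calls = {0};
        int attempts = retry(() -> ++calls[0] >= 3, 5);
        System.out.println("succeeded after " + attempts + " attempt(s)");
    }
}
```

A production version would normally wait between attempts and retry only the items that actually failed, rather than the whole batch.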
Updating a Document
Updating some fields of a document requires the document id and the new values of the fields to change:
protected static void updateDoc(final Client client, final String index, final String type)
        throws IOException, InterruptedException, ExecutionException {
    String id = "60e90ddcb1a61622028b8d92112a646c";
    UpdateRequest updateRequest = new UpdateRequest(index, type, id);
    // partial document: only these fields are changed
    updateRequest.doc(jsonBuilder()
            .startObject()
                .field("channel", "h-google")
                .field("appid", "1")
            .endObject());
    UpdateResponse response = client.update(updateRequest).get();
    System.out.println(response);
}
If the document does not exist at update time, it would have to be indexed first and then updated. The upsert operation combines the two: the document in the IndexRequest is inserted when no document with that id exists, otherwise the partial update is applied:
protected static void upsertDoc(final Client client, final String index, final String type)
        throws IOException, InterruptedException, ExecutionException {
    String id = "fdd5ff7f56b613f0acb2c20a1ebc35e4";
    // full document, used only when the id does not exist yet
    IndexRequest indexRequest = new IndexRequest(index, type, id).source(jsonBuilder()
            .startObject()
                .field("installid", "00000BSe")
                .field("appid", "0")
                .field("udid", "fdd5ff7f56b613f0acb2c20a1ebc35e4")
                .field("channel", "A-wandoujia")
                .field("version", "3.1.1")
                .field("resolution", "960*540")
                .field("mac", "00:08:22:be:1b:b7")
                .field("device_type", "0")
                .field("device_name", "HTC")
                .field("producer", "alps")
                .field("create_time", "2015-01-17 17:15:36")
            .endObject());
    // partial document, applied when the id already exists
    UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(jsonBuilder()
            .startObject()
                .field("resolution", "540*960")
                .field("channel", "h-baidu")
                .field("version", "3.1.1")
                .field("imei", "861622010000056")
            .endObject())
            .upsert(indexRequest);
    UpdateResponse response = client.update(updateRequest).get();
    System.out.println(response);
}
Deleting a Document
Deleting a document requires its id:
protected static void deleteDoc(final Client client, final String index, final String type) {
    String id = "60e90ddcb1a61622028b8d92112a646c";
    DeleteResponse response = client.prepareDelete(index, type, id).get();
    System.out.println(response);
}
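Searching Documents
To search, construct the query you need, optionally set filters and other options, and submit the search:
protected static void searchDocs(final Client client, final String index, final String type) {
    SearchResponse response = client.prepareSearch(index)
            .setTypes(type)
            .setQuery(QueryBuilders.termQuery("device_name", "xiaomi"))
            .setPostFilter(QueryBuilders.rangeQuery("create_time")
                    .from("2015-01-16 00:00:00")
                    .to("2015-01-16 23:59:59"))
            .setFrom(30)       // skip the first 30 hits
            .setSize(10)       // return 10 hits
            .setExplain(true)
            .execute()
            .actionGet();
    System.out.println(response);
}
There are many ways to construct a query, for example a boolean query that combines clauses with AND, OR, and NOT. When executing a search you can set the starting offset and the number of hits to return, which is how pagination is implemented.
Other Topics
The classic software stack around ElasticSearch is ELK (ElasticSearch, Logstash, Kibana). ElasticSearch provides real-time querying and analysis of data and is a very general-purpose search engine. Logstash is a log management tool that collects and processes logs. Kibana is a web-based front end that makes it easy to visualize the data in ElasticSearch. If you are interested, try it out and see how it works in practice.
ElasticSearch has also been embraced by many open-source big data systems. For example, Cloudera's CDH has integrated ElasticSearch as a search system, and ElasticSearch can be integrated with other systems such as Hadoop and HBase, so it is used in a wide range of areas.
References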
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/index.html
- https://www.elastic.co/downloads/elasticsearch
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/_basic_concepts.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/setup.html
- https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/index.html
- https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/installation.html
- https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/analysis-icu.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-create-index.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-delete-index.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/index-modules.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/mapping.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/mapping-params.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-search.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-uri-request.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-request-body.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs.html
- https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/integrations.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs-bulk.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/modules-discovery.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/misc-cluster.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-filter-context.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-match-all-query.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-dis-max-query.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-bool-query.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/full-text-queries.html
- https://github.com/mobz/elasticsearch-head
- https://www.elastic.co/blog/found-java-clients-for-elasticsearch
- https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/client.html
- https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/node-client.html
- https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/java-docs.html
- https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/java-docs-bulk-processor.html