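ElasticSearch is a distributed search engine built on Lucene, the full-text search engine library, and it can be used directly to build and run a distributed search system. Lucene itself is only a search library: it provides the basic APIs for search engine operations. Building a usable search engine system on top of it means implementing a great deal yourself against those APIs, which is a lot of work and requires a good grasp of how Lucene works internally.
ElasticSearch, by contrast, is a complete distributed search engine system. Its basic features include: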

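  • Full-text search
  • A plugin mechanism, so plugin functionality can be shared and reused
  • Distributed document storage
  • Distributed real-time indexing and search
  • Real-time analytics
  • Horizontal scaling, supporting search over large data volumes
  • A simple, easy-to-use RESTful API
  • High availability of data through replication
  • Integration with other systems
  • Support for both structured and unstructured data
  • Flexible schema design (mappings)
  • Client libraries for many programming languages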

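In my experience, ElasticSearch hides the underlying Lucene details so thoroughly that you barely notice Lucene is there. That saves you the cost of learning Lucene and keeps the learning curve fairly gentle. Solr is different: to build complex queries you still need some understanding of Lucene. ElasticSearch is also lighter and simpler to use in terms of distributed design. Solr's distributed sharding requires SolrCloud, which relies on ZooKeeper for configuration management and replication, and Solr has to be deployed in a web container, which makes it somewhat more complicated. (The SolrCloud versions I used were roughly 3.1 to 3.5, which are quite old; newer versions may well be more polished.)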

Basic Concepts

Let us first get familiar with some basic ElasticSearch concepts:

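Index

An index is a collection of documents, partitioned according to the actual business logic. Data that is relatively self-contained and has a similar structure or nature is usually stored as documents in the same index. For example, user-related information can form one index and transaction-related information another.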

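Type

A type is a logical partition inside an index. An index can define several types, which divide it logically into multiple collections, and each type contains a number of properties (fields). For example, for a mobile app we might create an index of user-related information and define three types inside it: basic information, device information, and behavior information. The basic-information type contains the user ID, ID document number, name, mobile phone number, age, and date of birth. The device-information type contains the device type, device name, app version, acquisition channel, OS version, IMEI, and MAC address. The behavior-information type contains the user ID, event ID, event type, time, viewed page code, and region code. That gives three types in one index. A type in ElasticSearch is quite similar to a column family in HBase.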

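Document

A document is the basic unit of an index, similar to a row in a relational database. It contains a set of fields together with an ID that uniquely identifies it; that ID can be used to update or delete the document.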

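Shards & Replicas

An index is a collection of many documents. It can be split into several parts, each a subset of the index, and each part is called a shard. Sharding makes an index easier to manage and lets it be stored across nodes, which is the basis for distributed storage and search. A replica is a redundant copy of a shard that keeps the shard available: when a shard's data cannot be read, its replica can be read instead, so search keeps working normally.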
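The ElasticSearch reference describes how a document is assigned to a shard: shard = hash(_routing) % number_of_primary_shards, where _routing defaults to the document's _id. The Java sketch below illustrates that formula only and rests on an explicit assumption: it uses String.hashCode() instead of the Murmur3 hash that ElasticSearch 2.x uses, so its shard numbers will not match a real cluster. It also shows why the number of primary shards is fixed once an index is created: changing the divisor would send existing IDs to different shards.

```java
/** Illustrative sketch of hash-based shard routing; not ElasticSearch's actual hash function. */
final class ShardRouting {
    private ShardRouting() {}

    /**
     * Picks a primary shard for a routing value (by default the document _id) using
     * shard = hash(routing) % numberOfPrimaryShards.
     * String.hashCode() stands in for the Murmur3 hash used by ElasticSearch 2.x, and
     * Math.floorMod keeps the result non-negative when the hash code is negative.
     */
    static int shardFor(String routing, int numberOfPrimaryShards) {
        if (numberOfPrimaryShards < 1) {
            throw new IllegalArgumentException("an index needs at least one primary shard");
        }
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }
}
```

For example, shardFor("CC49E748588490D41BFB89584007B0FA", 10) always returns the same value between 0 and 9, while the same ID with 5 primary shards may land on a different shard number.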

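Cluster Installation and Configuration

Installing and configuring an ElasticSearch cluster is very easy. To install it, run the following commands: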

wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.0.0/elasticsearch-2.0.0.zip
unzip elasticsearch-2.0.0.zip

Take one node of the cluster and configure it by editing config/elasticsearch.yml as follows:

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please see the documentation for further information on configuration options:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html>
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: dw_search_engine
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: esnode-01
#
# Add custom attributes to the node:
#
# node.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /data/dw_search_storage
#
# Path to log files:
#
path.logs: /tmp/es/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
# bootstrap.mlockall: true
#
# Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory
# available on the system and that the owner of the process is allowed to use this limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 10.10.2.62
#
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
# gateway.recover_after_nodes: 3
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html>
#
# --------------------------------- Discovery ----------------------------------
#
# Elasticsearch nodes will find each other via unicast, by default.
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["es-01", "es-02"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
#
# discovery.zen.minimum_master_nodes: 3
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html>
#
# ---------------------------------- Various -----------------------------------
#
# Disable starting multiple nodes on a single system:
#
# node.max_local_storage_nodes: 1
#
# Require explicit names when deleting indices:
#
# action.destructive_requires_name: true
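The comment on discovery.zen.minimum_master_nodes gives the split-brain rule: a majority of the master-eligible nodes, total / 2 + 1 with integer division. The small helper below only evaluates that formula, and the node counts in the test are examples. For the two hosts listed in discovery.zen.ping.unicast.hosts it yields 2, which means a two-node cluster cannot elect a master once one node is down. The commented-out value of 3 fits a cluster of four or five master-eligible nodes.

```java
/** Evaluates the split-brain rule from elasticsearch.yml: (master-eligible nodes / 2) + 1. */
final class MasterQuorum {
    private MasterQuorum() {}

    static int minimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1; // integer division rounds down
    }
}
```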

For the other nodes, keep the basic settings (including the storage directories) the same and change the following parameters as needed:

node.name
network.host
http.port

Finally, start ElasticSearch on each node with the following commands:

cd elasticsearch-2.0.0
bin/elasticsearch -d

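You can then open the web management interface, which requires the elasticsearch-head plugin (covered below). The interface looks like this:

(Figure: the elasticsearch-head web console, showing the cluster's nodes and shard allocation)

In the figure above we have already created an index; the console shows the status of each node and how the index's shards are distributed.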

RESTful API Basics

Using the RESTful API provided by ElasticSearch is a good way to keep other systems loosely coupled to the search system, especially for searching. Below we introduce the basic RESTful API operations.

Plugin management
Plugins are stored in elasticsearch-2.0.0/plugins/, and all plugin operations work on that directory.

Installing plugins

bin/plugin install analysis-icu
bin/plugin install mobz/elasticsearch-head

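Plugins can be installed from different sources. The first command above installs a core Elasticsearch plugin, which is provided by Elastic and downloaded from Elastic's own download site; the second is downloaded from GitHub automatically. Plugins can also be installed in other ways, for example from a specific location on the file system.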

Listing plugins

bin/plugin list


Removing plugins

bin/plugin remove analysis-icu

After a plugin is installed you can open it. For example, the elasticsearch-head plugin is available at:

http://10.10.2.62:9200/_plugin/head/


Creating an index

curl -XPUT 'http://10.10.2.62:9200/basis_device_info/'

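This creates an index named basis_device_info. We do not have to specify mappings for it: if none are given, ElasticSearch generates them dynamically as documents are indexed, so the index can accept new fields, and new types can be added to it on the fly. This is very flexible.
You can also specify basic index settings, such as the number of shards and replicas: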

curl -XPUT 'http://10.10.2.62:9200/basis_device_info/' -d '{"settings" : {"index" : {"number_of_shards" : 10,"number_of_replicas" : 1}}
}'

By default an index has 5 primary shards with 1 replica each. The settings above give basis_device_info 10 shards, each with 1 replica.
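To see what these two settings mean for the cluster: every primary shard gets number_of_replicas extra copies, so the index holds primaries * (1 + replicas) shard copies in total. The sketch below only does that arithmetic. Its per-node figure assumes a perfectly even allocation, which ElasticSearch aims for but does not guarantee. The node count of 2 in the test comes from the two discovery hosts configured earlier. ElasticSearch never places a replica on the same node as its primary, so on a single node the replicas stay unassigned.

```java
/** Shard-count arithmetic for index settings (illustrative only). */
final class ShardMath {
    private ShardMath() {}

    /** Total shard copies = primaries * (1 + replicas per primary). */
    static int totalShardCopies(int primaries, int replicas) {
        return primaries * (1 + replicas);
    }

    /** Average copies per node, rounded up; assumes a perfectly even allocation. */
    static int copiesPerNodeCeil(int primaries, int replicas, int nodes) {
        if (nodes < 1) {
            throw new IllegalArgumentException("need at least one node");
        }
        int total = totalShardCopies(primaries, replicas);
        return (total + nodes - 1) / nodes;
    }
}
```

With the settings above (10 primaries, 1 replica) there are 20 shard copies, about 10 per node on a two-node cluster; with the defaults (5 primaries, 1 replica) there are 10.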
Next, specify the schema, that is, the mappings, when creating the index:

curl -XPUT 'http://10.10.2.62:9200/basis_device_info/' -d '
{
  "mappings": {
    "user": {
      "_all": { "enabled": false },
      "properties": {
        "installid":   { "type": "string" },
        "appid":       { "type": "string" },
        "channel":     { "type": "string", "index": "analyzed" },
        "version":     { "type": "string" },
        "osversion":   { "type": "string" },
        "device_name": { "type": "string", "index": "analyzed" },
        "producer":    { "type": "string" },
        "device_type": { "type": "string" },
        "resolution":  { "type": "string", "index": "analyzed" },
        "screen_size": { "type": "string", "index": "analyzed" },
        "mac":         { "type": "string", "index": "not_analyzed" },
        "idfa":        { "type": "string" },
        "idfv":        { "type": "string", "index": "not_analyzed" },
        "imei":        { "type": "string", "index": "not_analyzed" },
        "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss", "index": "not_analyzed" }
      }
    }
  }
}'

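This creates the index basis_device_info with a type named user. With the mappings in place, we know the format of the data to be indexed. (If basis_device_info already exists from the previous commands, this request fails, so delete the index first.)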

Deleting an index

curl -XDELETE 'http://10.10.2.62:9200/basis_device_info/'

This deletes the index basis_device_info.

Indexing a document

curl -XPUT 'http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA' -d '
{
  "installid": "0000000L",
  "appid": "0",
  "udid": "CC49E748588490D41BFB89584007B0FA",
  "channel": "wulei1",
  "version": "3.1.2",
  "osversion": "8.1",
  "device_name": "iPhone Retina4 Simulator",
  "producer": "apple",
  "device_type": "1",
  "resolution": "640*1136",
  "screen_size": "320*568",
  "mac": "600308A20C5E",
  "idfa": "dbbbs-fdsfa-fafda-321saf",
  "idfv": "4283FAE1-19EB-4FA9-B739-8148F76BC8C3",
  "imei": "af-sfd0fdsa-fad-ff",
  "create_time": "2015-01-14 20:32:05"
}'

This indexes one document into the user type of the index created above. The document's _id is CC49E748588490D41BFB89584007B0FA, and its content, one user's device information, is expressed as JSON.

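Bulk indexing
Bulk indexing can be implemented in whatever programming or scripting language you are comfortable with, and ElasticSearch also provides client libraries. Below, we first convert a data file into the JSON format that ElasticSearch bulk indexing expects and write it out to a file, then index that file with curl; under the hood this uses the bulk API provided by ElasticSearch.
First, process the raw data to be indexed: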

package org.shirdrn.es;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;

import net.sf.json.JSONObject;

import com.google.common.base.Throwables;

/** Converts a tab-separated export into the two-lines-per-document bulk format. */
public class EsIndexingClient {

    public static void closeQuietly(Closeable... closeables) {
        if (closeables != null) {
            for (Closeable closeable : closeables) {
                try {
                    if (closeable != null) {
                        closeable.close();
                    }
                } catch (Exception e) {
                    // ignore close failures
                }
            }
        }
    }

    public static void main(String[] args) {
        String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
        String out = "C:\\Users\\yanjun\\Desktop\\basis_device_info.json";
        File in = new File(f);
        BufferedReader reader = null;
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter(out));
            reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
            String line = null;
            while ((line = reader.readLine()) != null) {
                // limit -1 keeps trailing empty columns, so a complete record has 16 fields
                String[] a = line.split("\t", -1);
                if (a.length == 16) {
                    String udid = a[2];
                    // Line 1: action metadata (target index, type and _id)
                    JSONObject c = new JSONObject();
                    c.put("_index", "basis_device_info");
                    c.put("_type", "user");
                    c.put("_id", udid);
                    JSONObject index = new JSONObject();
                    index.put("index", c);
                    // Line 2: the document source
                    JSONObject doc = new JSONObject();
                    doc.put("installid", a[0]);
                    doc.put("appid", a[1]);
                    doc.put("udid", a[2]);
                    doc.put("channel", a[3]);
                    doc.put("version", a[4]);
                    doc.put("osversion", a[5]);
                    doc.put("device_name", a[6]);
                    doc.put("producer", a[7]);
                    doc.put("device_type", a[8]);
                    doc.put("resolution", a[9]);
                    doc.put("screen_size", a[10]);
                    doc.put("mac", a[11]);
                    doc.put("idfa", a[12]);
                    doc.put("idfv", a[13]);
                    doc.put("imei", a[14]);
                    doc.put("create_time", a[15]);
                    writer.write(index.toString() + "\n");
                    writer.write(doc.toString() + "\n");
                }
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        } finally {
            closeQuietly(reader, writer);
        }
    }
}

Running the code produces the data file basis_device_info.json, which looks like this:

{"index":{"_index":"basis_device_info","_type":"user","_id":"1c207122a4b2c9632212ab86bac10f60"}}
{"installid":"00000002","appid":"0","udid":"1c207122a4b2c9632212ab86bac10f60","channel":"itings","version":"3.1.1","osversion":"4.1.2","device_name":"Lenovo P770","producer":"Lenovo","device_type":"0","resolution":"540*960","screen_size":"4.59","mac":"d4:22:3f:83:17:06","idfa":"","idfv":"","imei":"861166023335745","create_time":"2015-01-14 19:39:35"}
{"index":{"_index":"basis_device_info","_type":"user","_id":"FA6B1B98E6FF4E6994A1505A996F6102"}}
{"installid":"00000003","appid":"0","udid":"FA6B1B98E6FF4E6994A1505A996F6102","channel":"appstore","version":"3.1.1","osversion":"8.1.2","device_name":"iPhone 6Plus","producer":"apple","device_type":"1","resolution":"640*1136","screen_size":"320*568","mac":"020000000000","idfa":"84018625-A3C9-47A8-88D0-C57C12F80520","idfv":"9D1E2514-9DC8-47A8-ABD0-129FC0FB3171","imei":"","create_time":"2015-01-14 19:41:21"}
{"index":{"_index":"basis_device_info","_type":"user","_id":"8c5fe70b2408f184abcbe4f34b8f23c3"}}
{"installid":"00000004","appid":"0","udid":"8c5fe70b2408f184abcbe4f34b8f23c3","channel":"itings","version":"3.1.1.014","osversion":"4.2.2","device_name":"2014011","producer":"Xiaomi","device_type":"0","resolution":"720*1280","screen_size":"4.59","mac":"0c:1d:af:4f:48:9f","idfa":"","idfv":"","imei":"865763025472173","create_time":"2015-01-14 19:46:37"}

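The odd-numbered lines contain the indexing action metadata: the index name (_index), type (_type), and unique ID (_id). The even-numbered lines contain the actual documents to be indexed. The bulk API also requires the body to end with a newline, which the code above ensures.
Then run the following curl command to index the file in bulk: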


curl -s -XPOST http://10.10.2.62:9200/basis_device_info/_bulk --data-binary "@basis_device_info.json"
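Here the whole file goes out in a single _bulk request, which is fine for a small export. For a large file, one option is to split it into several bulk bodies while keeping each action line together with its source line. The plain-Java sketch below does that under two assumptions: every action is an index action with exactly one source line, as in the file above (delete actions have no source line), and the documents-per-chunk limit is arbitrary. Each chunk ends with a newline, as the bulk API requires.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits bulk lines into chunks of whole action/source pairs (sketch). */
final class BulkChunker {
    private BulkChunker() {}

    static List<String> chunk(List<String> lines, int maxDocsPerChunk) {
        if (lines.size() % 2 != 0) {
            // Index actions always come as two lines: action metadata + document source.
            throw new IllegalArgumentException("expected action/source line pairs");
        }
        if (maxDocsPerChunk < 1) {
            throw new IllegalArgumentException("maxDocsPerChunk must be positive");
        }
        List<String> chunks = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int docs = 0;
        for (int i = 0; i < lines.size(); i += 2) {
            current.append(lines.get(i)).append('\n')      // {"index":{...}}
                   .append(lines.get(i + 1)).append('\n'); // document source
            if (++docs == maxDocsPerChunk) {
                chunks.add(current.toString());
                current.setLength(0);
                docs = 0;
            }
        }
        if (docs > 0) {
            chunks.add(current.toString()); // remainder, still newline-terminated
        }
        return chunks;
    }
}
```

Each returned string can then be sent as its own POST to /basis_device_info/_bulk.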


Searching documents
Simple searches can be done with plain GET requests:

http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA
http://10.10.2.62:9200/basis_device_info/user/_search?q=channel:B-hicloud

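The first request looks up a document by its unique _id and returns zero or one document. The second passes the query as a GET parameter: _search and q are built-in ElasticSearch keywords, and the query is written as a field name followed by a search keyword. Results are returned as JSON.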
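When the q parameter is built in code, it should be URL-encoded: Lucene query syntax uses spaces, quotes and +, and a literal + in a URL query string is decoded as a space. A minimal sketch with the JDK's URLEncoder follows; the host, index and type are the ones used above, and the helper name searchUrl is hypothetical.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Builds a URI-search URL with an encoded q parameter (hypothetical helper). */
final class UriSearch {
    private UriSearch() {}

    static String searchUrl(String baseUrl, String index, String type, String luceneQuery) {
        try {
            // URLEncoder does form encoding: ':' -> %3A, ' ' -> '+', '+' -> %2B.
            String q = URLEncoder.encode(luceneQuery, "UTF-8");
            return baseUrl + "/" + index + "/" + type + "/_search?q=" + q;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

For example, searchUrl("http://10.10.2.62:9200", "basis_device_info", "user", "channel:B-hicloud") produces the second URL above with the colon encoded as %3A.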

Request body search
You can put the query in the request body, which supports more complex query conditions:

curl -XGET 'http://10.10.2.245:9200/basis_device_info/user/_search' -d '{"query" : {"term" : { "udid": "bc0af2ca66a96725b8b0e0056d4213b6" }}
}'

A sample result:

{"took":11,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":9.45967,"hits":[{"_index":"basis_device_info","_type":"user","_id":"bc0af2ca66a96725b8b0e0056d4213b6","_score":9.45967,"_source":{"installid":"00000FPq","appid":"0","udid":"bc0af2ca66a96725b8b0e0056d4213b6","channel":"B-hicloud","version":"3.1.1","osversion":"4.4.2","device_name":"H60-L02","producer":"HUAWEI","device_type":"0","resolution":"720*1184","screen_size":"4.64","mac":"ec:cb:30:c4:93:e3","idfa":"","idfv":"","imei":"864103021536104","create_time":"2015-01-18 01:29:16"}}]}}

Searching with Lucene query syntax
If you are familiar with Lucene queries, you can search with a query string that combines terms in complex ways, for example:

curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{"query": {"query_string": { "query": "(channel:baidu OR device_name:HUAWEI)" }}
}'

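The query string means: search the channel field for baidu and the device_name field for HUAWEI, then take the union of the two. This is effectively a boolean query, and its result is returned.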
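If the search terms come from user input, characters that are reserved in Lucene query syntax (such as - : ( ) " / *) change the meaning of the query string. The hypothetical helper below backslash-escapes the same characters that Lucene's classic QueryParser.escape handles, and joins field:term clauses with OR as in the example above. Inside a JSON request body, each backslash then has to be written as \\ again.

```java
/** Escapes Lucene query-syntax characters in a term (sketch mirroring QueryParser.escape). */
final class QueryStringEscaper {
    private QueryStringEscaper() {}

    // Characters escaped by Lucene's classic QueryParser.escape.
    private static final String RESERVED = "\\+-!():^[]\"{}~*?|&/";

    static String escape(String term) {
        StringBuilder sb = new StringBuilder(term.length());
        for (char c : term.toCharArray()) {
            if (RESERVED.indexOf(c) >= 0) {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.toString();
    }

    /** Joins {field, term} pairs with OR, e.g. (channel:baidu OR device_name:HUAWEI). */
    static String anyOf(String[][] fieldTerms) {
        StringBuilder sb = new StringBuilder("(");
        for (int i = 0; i < fieldTerms.length; i++) {
            if (i > 0) {
                sb.append(" OR ");
            }
            sb.append(fieldTerms[i][0]).append(':').append(escape(fieldTerms[i][1]));
        }
        return sb.append(')').toString();
    }
}
```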

Searching with multi_match
ElasticSearch can search several fields for the same keywords, for example:

curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{"query": {"multi_match" : {"query":    "HTC","fields": [ "channel", "device_name" ]}}
}'

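Any document in which the keyword HTC appears in either channel or device_name is returned, so the result is the union of the documents matched on each of the two fields.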

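Filtered search
You can also specify a filter when searching. For example, the following query filters results by a time range. (ElasticSearch 2.0 deprecates the filtered query in favor of a bool query with a filter clause, but the form below still works.)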

curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{"query": {"filtered": {"query": { "match_all": {} },"filter" : {"range" : {"create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }}}}}
}'
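The from/to values are parsed with the format declared for create_time in the mapping (yyyy-MM-dd HH:mm:ss), so they must be written in exactly that format. The sketch below is a hypothetical helper built on java.time. It produces the bounds of a whole day and the matching range fragment, using the same from/to keys as the query above.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

/** Builds inclusive day bounds in the create_time mapping format (sketch). */
final class DayRange {
    private DayRange() {}

    // Must match "format": "yyyy-MM-dd HH:mm:ss" in the basis_device_info mapping.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String[] bounds(LocalDate day) {
        String from = day.atStartOfDay().format(FORMAT);
        // Seconds are the finest unit in this format, so 23:59:59 closes the day.
        String to = day.atTime(LocalTime.of(23, 59, 59)).format(FORMAT);
        return new String[] {from, to};
    }

    /** Range fragment using the same from/to keys as the example query. */
    static String rangeJson(String field, LocalDate day) {
        String[] b = bounds(day);
        return "{\"range\":{\"" + field + "\":{\"from\":\"" + b[0]
                + "\",\"to\":\"" + b[1] + "\"}}}";
    }
}
```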

Paginated search
ElasticSearch supports paginated search through the size and from parameters in the request URL:

curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search?size=10&from=20' -d '
{"query": {"filtered": {"query": { "match_all": {} },"filter" : {"range" : {"create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }}}}}
}'

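This query searches by time range, skips the first 20 hits, and returns the next 10 (hits 21 to 30), which is the third page when each page holds 10 documents.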
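Because from is a zero-based offset into the sorted hits, with fixed-size pages it equals (page - 1) * size. Below is a tiny hypothetical helper that assumes pages are numbered from 1. It also computes from + size, the number of hits every shard has to collect before the results are merged, which is why very deep pages get expensive; the scroll API is the usual alternative for exporting large result sets.

```java
/** Converts 1-based page numbers to from/size values (hypothetical helper). */
final class Paging {
    private Paging() {}

    static int fromForPage(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        return (page - 1) * size; // from = number of hits to skip
    }

    /** Hits each shard must collect before the coordinating node merges them. */
    static int hitsPerShard(int page, int size) {
        return fromForPage(page, size) + size;
    }
}
```

For the request above, fromForPage(3, 10) gives from=20.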

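Java Client

If you are comfortable with Java and would rather not operate the ElasticSearch cluster through scripts or other means, you can use the Java client API provided by ElasticSearch, which gives you finer control. The Java client supports all common operations, such as updating indexes, indexing documents, searching documents, and deleting indexes, and it also offers features such as synchronous and asynchronous execution and explain queries. Let us look at some code.
If you manage your Java code with Maven, add the following dependency to pom.xml: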

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.0.0</version>
</dependency>

Create an ElasticSearch client as follows:

// create & configure client
Settings settings = Settings.settingsBuilder()
        .put("cluster.name", "dw_search_engine")
        .put("client.transport.sniff", true)
        .build();
final Client client = TransportClient.builder()
        .settings(settings)
        .build()
        .addTransportAddress(newAddress("es-01", 9300))
        .addTransportAddress(newAddress("es-02", 9300));

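Register the nodes of your ElasticSearch cluster with the Client object through addTransportAddress, as above. The client then sends index, update, delete, and search operations to those nodes, and because client.transport.sniff is enabled it also discovers the other nodes of the cluster automatically. The newAddress method used above is: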

private static InetSocketTransportAddress newAddress(String host, int port) throws UnknownHostException {
    return new InetSocketTransportAddress(InetAddress.getByName(host), port);
}
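Note that the transport client connects to the transport port, whose default range starts at 9300, not to the HTTP port 9200 set in elasticsearch.yml. If the node list lives in configuration instead of being hard-coded, something like the hypothetical parser below could turn a comma-separated host[:port] string into socket addresses, defaulting to 9300. It uses InetSocketAddress.createUnresolved so nothing is resolved at parse time; each entry could then be passed to newAddress(address.getHostString(), address.getPort()). IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Parses "host[:port],host[:port]" into unresolved addresses (hypothetical helper). */
final class TransportHosts {
    private TransportHosts() {}

    static final int DEFAULT_TRANSPORT_PORT = 9300;

    static List<InetSocketAddress> parse(String hosts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String raw : hosts.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate "es-01,,es-02"
            }
            // Splitting on the last ':' does not support IPv6 literals.
            int colon = entry.lastIndexOf(':');
            String host = colon < 0 ? entry : entry.substring(0, colon);
            int port = colon < 0 ? DEFAULT_TRANSPORT_PORT
                                 : Integer.parseInt(entry.substring(colon + 1));
            // No DNS lookup here; resolution happens when the address is used.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```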

Alternatively, these settings can be specified in the elasticsearch.yml configuration file, for example:

cluster.name: dw_search_engine
client.transport.sniff: true
client.transport.ping_timeout: 10s
client.transport.nodes_sampler_interval: 10s

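In that case the client has to read its settings from the configuration file when it is created; see the official documentation for details.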

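Preparation
For indexing, we read data from a local file, build documents in the format the index expects, and send indexing requests to the ElasticSearch cluster. The basic preparation code is: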

final String index = "basis_device_info";
final String type = "user";

// index documents
String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
File in = new File(f);

We read the file one record at a time and build a JSON document for each, represented by an XContentBuilder:

protected static XContentBuilder createSource(String[] a) throws IOException {
    return jsonBuilder()
            .startObject()
                .field("installid", a[0])
                .field("appid", a[1])
                .field("udid", a[2])
                .field("channel", a[3])
                .field("version", a[4])
                .field("osversion", a[5])
                .field("device_name", a[6])
                .field("producer", a[7])
                .field("device_type", a[8])
                .field("resolution", a[9])
                .field("screen_size", a[10])
                .field("mac", a[11])
                .field("idfa", a[12])
                .field("idfv", a[13])
                .field("imei", a[14])
                .field("create_time", a[15])
            .endObject();
}
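createSource depends on a fixed column order in the tab-separated input, and its callers only accept lines with exactly 16 columns. The plain-Java sketch below spells out that column layout, using the field names from createSource. It also shows why the code uses split("\t", -1): without the -1 limit, Java drops trailing empty strings, so a record whose last columns are empty would come back shorter than 16 fields and be skipped silently. The class name DeviceRecord is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Maps one tab-separated input line to field/value pairs (sketch of createSource's layout). */
final class DeviceRecord {
    private DeviceRecord() {}

    // Column order used by createSource(String[] a): a[0] .. a[15].
    static final String[] FIELDS = {
        "installid", "appid", "udid", "channel", "version", "osversion",
        "device_name", "producer", "device_type", "resolution", "screen_size",
        "mac", "idfa", "idfv", "imei", "create_time"
    };

    /** Returns null for malformed lines, mirroring the a.length == 16 check. */
    static Map<String, String> parse(String line) {
        // limit -1 keeps trailing empty columns, so the column count stays 16.
        String[] a = line.split("\t", -1);
        if (a.length != FIELDS.length) {
            return null;
        }
        Map<String, String> doc = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            doc.put(FIELDS[i], a[i]);
        }
        return doc;
    }
}
```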

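Below we go through the API feature by feature, explaining each one with code that shows how it is used.

Creating an index
You can create an index directly through the Java client library: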

protected static void createIndex(final Client client, String index) {
    Map<String, Object> indexSettings = Maps.newHashMap();
    indexSettings.put("number_of_shards", "4");
    indexSettings.put("number_of_replicas", "1");
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(index,
            Settings.settingsBuilder().put(indexSettings).build());
    CreateIndexResponse createIndexResponse =
            client.admin().indices().create(createIndexRequest).actionGet();
    System.out.println(createIndexResponse);
}

Creating mappings
Creating mappings through the Java client is a bit more involved, because the corresponding JSON has to be assembled:

protected static void createMappings(final Client client, String index)
        throws IOException, InterruptedException, ExecutionException {
    XContentBuilder basisInfoMapping = jsonBuilder()
            .startObject()
                .startObject("_all").field("enabled", false).endObject()
                .startObject("properties")
                    .startObject("id").field("type", "string").endObject()
                    .startObject("name").field("type", "string").field("index", "analyzed").endObject()
                    // the ElasticSearch type name is "integer"; "int" is rejected
                    .startObject("age").field("type", "integer").endObject()
                    .startObject("birthday").field("type", "date")
                        .field("format", "yyyy-MM-dd HH:mm:ss").field("index", "not_analyzed").endObject()
                .endObject()
            .endObject();
    XContentBuilder deviceInfoMapping = jsonBuilder()
            .startObject()
                .startObject("_all").field("enabled", false).endObject()
                .startObject("properties")
                    .startObject("udid").field("type", "string").endObject()
                    .startObject("device_name").field("type", "string").field("index", "analyzed").endObject()
                    .startObject("provider").field("type", "string").field("index", "analyzed").endObject()
                    .startObject("os_version").field("type", "string").endObject()
                .endObject()
            .endObject();
    // A PutMappingRequest holds a single type and source, so each type needs its own
    // request; chaining .type(...).source(...) twice would overwrite the first mapping.
    PutMappingRequest basicInfoRequest = Requests.putMappingRequest(index)
            .type("basic_info").source(basisInfoMapping);
    PutMappingResponse basicInfoResponse =
            client.admin().indices().putMapping(basicInfoRequest).get();
    System.out.println(basicInfoResponse);
    PutMappingRequest deviceInfoRequest = Requests.putMappingRequest(index)
            .type("device_info").source(deviceInfoMapping);
    PutMappingResponse deviceInfoResponse =
            client.admin().indices().putMapping(deviceInfoRequest).get();
    System.out.println(deviceInfoResponse);
}

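The code above adds two types, basic_info and device_info, to the given index (app_user_info in our case; the index must already exist, for example created with createIndex). You can view the resulting index information in the web console of the elasticsearch-head plugin.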

Indexing single documents
Read data from the file, build one document per record, and index it:

protected static void indexDocs(final Client client, final String index, final String type, File in) {
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            if (a.length == 16) {
                String udid = a[2];
                IndexResponse response = client.prepareIndex(index, type, udid)
                        .setSource(createSource(a))
                        .get();
                System.out.println(response.toString());
            }
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(reader);
    }
}

Bulk indexing
There are several ways to index in bulk. The first is to use the Bulk API directly and control the size of each batch ourselves:

protected static void indexBulk(final Client client, final String index, final String type, File in) {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    final int batchSize = 100;
    int counter = 0;
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            if (a.length == 16) {
                String udid = a[2];
                IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, type, udid)
                        .setSource(createSource(a));
                bulkRequest.add(indexRequestBuilder);
                if (++counter >= batchSize) {
                    System.out.println(!bulkRequest.get().hasFailures());
                    counter = 0;
                    bulkRequest = client.prepareBulk();
                }
            }
        }
        // Send the last partial batch only if it has actions;
        // ElasticSearch rejects an empty bulk request.
        if (counter > 0) {
            System.out.println(!bulkRequest.get().hasFailures());
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(reader);
    }
}
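The batching logic in indexBulk (collect requests, send them every batchSize documents, then send whatever is left) can be sketched independently of the ElasticSearch client, as below. The Consumer is a hypothetical stand-in for "build and execute one bulk request". The final flush is skipped when nothing is pending, because ElasticSearch rejects a bulk request that contains no actions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Collects items and hands them to a sink in fixed-size batches (sketch of indexBulk's flow). */
final class Batcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // e.g. "build and execute one bulk request"
    private List<T> pending = new ArrayList<>();

    Batcher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Sends the pending items; does nothing when the batch is empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(pending);
        pending = new ArrayList<>();
    }
}
```

With a batch size of 100, 250 documents produce batches of 100, 100 and 50, while 200 documents produce exactly two batches and no empty third request.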

Another way is to use the BulkProcessor provided by ElasticSearch. You only set a few parameters and it takes care of batching, which is more flexible:

protected static void indexUsingBulkProcessor(final Client client, final String index, final String type, File in)
        throws InterruptedException {
    String name = "device_info_processor";
    int bulkActions = 1000;                                           // flush after this many requests,
    ByteSizeValue bulkSize = new ByteSizeValue(100, ByteSizeUnit.MB); // or this much data,
    TimeValue flushInterval = TimeValue.timeValueSeconds(60);         // or this much time
    int concurrentRequests = 12;                                      // bulk requests in flight at once

    // create bulk processor
    final BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        public void beforeBulk(long id, BulkRequest req) {
            System.out.println("id=" + id + ", req=" + req);
        }

        public void afterBulk(long id, BulkRequest req, BulkResponse resp) {
            System.out.println("id=" + id + ", resp=" + resp);
        }

        public void afterBulk(long id, BulkRequest req, Throwable cause) {
            System.out.println("id=" + id + ", req=" + req + ", cause=" + cause);
        }
    })
    .setName(name)
    .setBulkActions(bulkActions)
    .setBulkSize(bulkSize)
    .setFlushInterval(flushInterval)
    .setConcurrentRequests(concurrentRequests)
    .build();

    // index documents
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] a = line.split("\t", -1);
            if (a.length == 16) {
                String udid = a[2];
                bulkProcessor.add(new IndexRequest(index, type, udid).source(createSource(a)));
            }
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(reader);
        // close bulk processor: flushes remaining requests and waits up to 60 seconds
        bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
    }
}

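You can implement a custom BulkProcessor.Listener, whose methods act as hooks. For example, if indexing some documents fails, the hook can handle the failure and retry; if indexing succeeds, it can notify other systems through a callback, and so on.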
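As one example of such a hook, a retry policy with exponential backoff could wrap a failed operation. The plain-Java sketch below is hypothetical: the operation is any Callable<Boolean> that reports success, and the sleeper is injected so the delays can be checked without actually waiting. In a real afterBulk callback you would re-submit only the failed items (BulkResponse exposes per-item results), which this sketch does not show.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

/** Retries an operation with exponential backoff (hypothetical helper, not an ElasticSearch API). */
final class Retry {
    private Retry() {}

    /**
     * Runs op until it returns true or maxAttempts is reached.
     * The delay before retry n (n = 1, 2, ...) is baseDelayMs * 2^(n-1).
     * Returns the number of attempts made; throws if every attempt fails.
     */
    static int withBackoff(Callable<Boolean> op, int maxAttempts, long baseDelayMs,
                           LongConsumer sleeper) throws Exception {
        long delay = baseDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (Boolean.TRUE.equals(op.call())) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                sleeper.accept(delay); // wait before the next attempt
                delay *= 2;
            }
        }
        throw new IllegalStateException("operation failed after " + maxAttempts + " attempts");
    }
}
```

In production the sleeper could be ms -> { try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }.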

Updating a document
To update some fields of a document, specify the document id and the new values of those fields:

protected static void updateDoc(final Client client, final String index, final String type)
        throws IOException, InterruptedException, ExecutionException {
    String id = "60e90ddcb1a61622028b8d92112a646c";
    UpdateRequest updateRequest = new UpdateRequest(index, type, id);
    updateRequest.doc(jsonBuilder()
            .startObject()
                .field("channel", "h-google")
                .field("appid", "1")
            .endObject());
    UpdateResponse response = client.update(updateRequest).get();
    System.out.println(response);
}

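If the document may not exist yet, use an upsert, which combines indexing and updating in one request: if the document exists, the partial document is applied as an update; if it does not, the upsert document is indexed instead.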

protected static void upsertDoc(final Client client, final String index, final String type)
        throws IOException, InterruptedException, ExecutionException {
    String id = "fdd5ff7f56b613f0acb2c20a1ebc35e4";
    // Indexed as-is if no document with this id exists yet
    IndexRequest indexRequest = new IndexRequest(index, type, id)
            .source(jsonBuilder()
                .startObject()
                    .field("installid", "00000BSe")
                    .field("appid", "0")
                    .field("udid", "fdd5ff7f56b613f0acb2c20a1ebc35e4")
                    .field("channel", "A-wandoujia")
                    .field("version", "3.1.1")
                    .field("resolution", "960*540")
                    .field("mac", "00:08:22:be:1b:b7")
                    .field("device_type", "0")
                    .field("device_name", "HTC")
                    .field("producer", "alps")
                    .field("create_time", "2015-01-17 17:15:36")
                .endObject());
    // Partial update applied if the document already exists
    UpdateRequest updateRequest = new UpdateRequest(index, type, id)
            .doc(jsonBuilder()
                .startObject()
                    .field("resolution", "540*960")
                    .field("channel", "h-baidu")
                    .field("version", "3.1.1")
                    .field("imei", "861622010000056")
                .endObject())
            .upsert(indexRequest);
    UpdateResponse response = client.update(updateRequest).get();
    System.out.println(response);
}
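The difference between a partial update and an upsert can be modelled with an in-memory map, as in the sketch below. It is a simplification of what ElasticSearch does, with no versions, scripts, or merging of nested objects. The partial document's top-level fields are merged into an existing document. If the document is missing, the upsert document is stored unchanged. A plain partial update of a missing document fails instead, which the sketch signals with an exception. The class UpsertModel is hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory model of partial update / upsert semantics (simplified sketch). */
final class UpsertModel {
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    Map<String, Object> get(String id) {
        return docs.get(id);
    }

    /**
     * Existing document: merge the partial doc's top-level fields into it.
     * Missing document: store the upsert document unchanged (the partial doc is not applied);
     * with no upsert document the update fails, like a missing-document error.
     */
    void update(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing != null) {
            existing.putAll(partialDoc);
        } else if (upsertDoc != null) {
            docs.put(id, new LinkedHashMap<>(upsertDoc));
        } else {
            throw new IllegalStateException("document [" + id + "] missing");
        }
    }
}
```

Calling update twice with the same arguments first inserts the upsert document and then applies the partial update, which is what upsertDoc above does on its first and second runs.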


Deleting a document
To delete a document, specify its id:

protected static void deleteDoc(final Client client, final String index, final String type) {
    String id = "60e90ddcb1a61622028b8d92112a646c";
    DeleteResponse response = client.prepareDelete(index, type, id).get();
    System.out.println(response);
}

Searching documents
To search, build the query you need, optionally set filters, and submit the search, for example:

protected static void searchDocs(final Client client, final String index, final String type) {
    SearchResponse response = client.prepareSearch(index)
            .setTypes(type)
            .setQuery(QueryBuilders.termQuery("device_name", "xiaomi"))
            .setPostFilter(QueryBuilders.rangeQuery("create_time")
                    .from("2015-01-16 00:00:00").to("2015-01-16 23:59:59"))
            .setFrom(30)
            .setSize(10)
            .setExplain(true)
            .execute()
            .actionGet();
    System.out.println(response);
}

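Queries can be built in many ways; for example, a boolean query combines clauses with AND, OR, and NOT relationships before the search is submitted. When executing a search you can set the starting offset and the number of result documents to return, which is how pagination is implemented.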
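To make the AND / OR / NOT relationships of a bool query concrete, the sketch below evaluates must, should and must_not clauses against in-memory documents. It is a simplified, scoring-free model that ignores filter clauses and minimum_should_match. All must clauses have to match and no must_not clause may match. The should clauses require at least one match only when there are no must clauses, which approximates ElasticSearch's default in query context. The field names come from this article; the term helper is a hypothetical exact-match predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Simplified, scoring-free model of bool query matching (illustrative only). */
final class BoolModel {
    final List<Predicate<Map<String, String>>> must = new ArrayList<>();
    final List<Predicate<Map<String, String>>> should = new ArrayList<>();
    final List<Predicate<Map<String, String>>> mustNot = new ArrayList<>();

    boolean matches(Map<String, String> doc) {
        for (Predicate<Map<String, String>> p : must) {
            if (!p.test(doc)) return false;      // AND: every must clause
        }
        for (Predicate<Map<String, String>> p : mustNot) {
            if (p.test(doc)) return false;       // NOT: no must_not clause
        }
        if (must.isEmpty() && !should.isEmpty()) {
            for (Predicate<Map<String, String>> p : should) {
                if (p.test(doc)) return true;    // OR: one should clause is enough
            }
            return false;
        }
        return true;
    }

    /** Exact-value clause, comparable to a term query on a not_analyzed field. */
    static Predicate<Map<String, String>> term(String field, String value) {
        return doc -> value.equals(doc.get(field));
    }
}
```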

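Other Topics

The classic software stack built around ElasticSearch is ELK (ElasticSearch, Logstash, Kibana). ElasticSearch provides real-time querying and analysis of data and is a very general-purpose search engine system; Logstash is a log management tool that collects and manages logs; and Kibana is a web-based front end that makes it easy to visualize the data stored in ElasticSearch. If you are interested, give it a try.
ElasticSearch has also been embraced by many open-source big data systems, and it can be integrated with other systems such as Hadoop and HBase, so it is used in a wide range of areas.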

References

  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/index.html
  • https://www.elastic.co/downloads/elasticsearch
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/_basic_concepts.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/setup.html
  • https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/index.html
  • https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/installation.html
  • https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/analysis-icu.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-create-index.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-delete-index.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/index-modules.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/mapping.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/mapping-params.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-search.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-uri-request.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-request-body.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs.html
  • https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/integrations.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs-bulk.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/modules-discovery.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/misc-cluster.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-filter-context.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-match-all-query.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-dis-max-query.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-bool-query.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/full-text-queries.html
  • https://github.com/mobz/elasticsearch-head
  • https://www.elastic.co/blog/found-java-clients-for-elasticsearch
  • https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/client.html
  • https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/node-client.html
  • https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/java-docs.html
  • https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/java-docs-bulk-processor.html
