1.概述

  目前,随着大数据的浪潮,Kafka 被越来越多的企业所认可,如今的Kafka已发展到0.10.x,其优秀的特性也带给我们解决实际业务的方案。对于数据分流来说,既可以分流到离线存储平台(HDFS),离线计算平台(Hive仓库),也可以分流实时流水计算(Storm,Spark)等,同样也可以分流到海量数据查询(HBase),或是及时查询(ElasticSearch)。而今天笔者给大家分享的就是Kafka 分流数据到 ElasticSearch。

2.内容

  我们知道,ElasticSearch是有其自己的套件的,简称ELK,即ElasticSearch,Logstash以及Kibana。ElasticSearch负责存储,Logstash负责收集数据来源,Kibana负责可视化数据,分工明确。想要分流Kafka中的消息数据,可以使用Logstash的插件直接消费,但是需要我们编写复杂的过滤条件,和特殊的映射处理,比如系统保留的`_uid`字段等需要我们额外的转化。今天我们使用另外一种方式来处理数据,使用Kafka的消费API和ES的存储API来处理分流数据。通过编写Kafka消费者,消费对应的业务数据,将消费的数据通过ES存储API,通过创建对应的索引的,存储到ES中。其流程如下图所示:

  上图可知,消费收集的数据,通过ES提供的存储接口进行存储。存储的数据,这里我们可以规划,做定时调度。最后,我们可以通过Kibana来可视化ES中的数据,对外提供业务调用接口,进行数据共享。

3.实现

  下面,我们开始进行实现细节处理,这里给大家提供实现的核心代码部分,实现代码如下所示:

3.1 定义ES格式

  我们以插件的形式进行消费,从Kafka到ES的数据流向,只需要定义插件格式,如下所示:

{"job": {"content": {"reader": {"name": "kafka","parameter": {"topic": "kafka_es_client_error","groupid": "es2","bootstrapServers": "k1:9094,k2:9094,k3:9094"},"threads": 6},"writer": {"name": "es","parameter": {"host": ["es1:9300,es2:9300,es3:9300"],"index": "client_error_%s","type": "client_error"}}}}
}

  这里处理消费存储的方式,将读和写的源分开,配置各自属性即可。

3.2 数据存储

  这里,我们通过每天建立索引进行存储,便于业务查询,实现细节如下所示:

public class EsProducer {private final static Logger LOG = LoggerFactory.getLogger(EsProducer.class);private final KafkaConsumer<String, String> consumer;private ExecutorService executorService;private Configuration conf = null;private static int counter = 0;public EsProducer() {String root = System.getProperty("user.dir") + "/conf/";String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");conf = Configuration.from(new File(root + path));Properties props = new Properties();props.put("bootstrap.servers", conf.getString("job.content.reader.parameter.bootstrapServers"));props.put("group.id", conf.getString("job.content.reader.parameter.groupid"));props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(conf.getString("job.content.reader.parameter.topic")));}public void execute() {executorService = Executors.newFixedThreadPool(conf.getInt("job.content.reader.threads"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);if (null != records) {executorService.submit(new KafkaConsumerThread(records, consumer));}}}public void shutdown() {try {if (consumer != null) {consumer.close();}if (executorService != null) {executorService.shutdown();}if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {LOG.error("Shutdown kafka consumer thread timeout.");}} catch (InterruptedException ignored) {Thread.currentThread().interrupt();}}class KafkaConsumerThread implements Runnable {private ConsumerRecords<String, String> records;public KafkaConsumerThread(ConsumerRecords<String, String> records, KafkaConsumer<String, String> consumer) {this.records = records;}@Overridepublic void run() {String index = conf.getString("job.content.writer.parameter.index");String type = conf.getString("job.content.writer.parameter.type");for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {JSONObject json = JSON.parseObject(record.value());List<Map<String, Object>> list = new ArrayList<>();Map<String, Object> map = new HashMap<>();index = String.format(index, CalendarUtils.timeSpan2EsDay(json.getLongValue("_tm") * 1000L));if (counter < 10) {LOG.info("Index : " + index);counter++;}for (String key : json.keySet()) {if ("_uid".equals(key)) {map.put("uid", json.get(key));} else {map.put(key, json.get(key));}list.add(map);}EsUtils.write2Es(index, type, list);}}}}}

  这里消费的数据源就处理好了,接下来,开始ES的存储,实现代码如下所示:

public class EsUtils {private static TransportClient client = null;static {if (client == null) {client = new PreBuiltTransportClient(Settings.EMPTY);}String root = System.getProperty("user.dir") + "/conf/";String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");Configuration conf = Configuration.from(new File(root + path));List<Object> hosts = conf.getList("job.content.writer.parameter.host");for (Object object : hosts) {try {client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(object.toString().split(":")[0]), Integer.parseInt(object.toString().split(":")[1])));} catch (Exception e) {e.printStackTrace();}}}public static void write2Es(String index, String type, List<Map<String, Object>> dataSets) {BulkRequestBuilder bulkRequest = client.prepareBulk();for (Map<String, Object> dataSet : dataSets) {bulkRequest.add(client.prepareIndex(index, type).setSource(dataSet));}bulkRequest.execute().actionGet();// if (client != null) {// client.close();// }}public static void close() {if (client != null) {client.close();}}
}

  这里,我们利用BulkRequestBuilder进行批量写入,减少频繁写入率。

4.调度

  存储在ES中的数据,如果不需要长期存储,比如:我们只需要存储及时查询数据一个月,对于一个月以前的数据需要清除掉。这里,我们可以编写脚本直接使用Crontab来进行简单调用即可,脚本如下所示:

#!/bin/sh
# <Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>echo "<Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>"
index_name=$1
daycolumn=$2
savedays=$3
format_day=$4if [ ! -n "$savedays" ]; thenecho "Oops. The args is not right,please input again...."exit 1
fiif [ ! -n "$format_day" ]; thenformat_day='%Y%m%d'
fisevendayago=`date -d "-${savedays} day " +${format_day}`curl -XDELETE "es1:9200/${index_name}/_query?pretty" -d "
{"query": {"filtered": {"filter": {"bool": {"must": {"range": {"${daycolumn}": {"from": null,"to": ${sevendayago},"include_lower": true,"include_upper": true}}}}}}}
}"

echo "Finished."

然后,在Crontab中进行定时调度即可。

5.总结

  这里,我们在进行数据写入ES的时候,需要注意,有些字段是ES保留字段,比如`_uid`,这里我们需要转化,不然写到ES的时候,会引发冲突导致异常,最终写入失败。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉

联系方式:
邮箱:smartloli.org@gmail.com
Twitter:https://twitter.com/smartloli
QQ群(Hadoop - 交流社区1):424769183
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢!

热爱生活,享受编程,与君共勉!

作者:哥不是小萝莉 [关于我][犒赏]

出处:http://www.cnblogs.com/smartloli/

转载请注明出处,谢谢合作!

Elasticsearch 与 Kafka 整合剖析相关推荐

  1. 【Kafka】Elasticsearch 与 Kafka 整合剖析

    1.概述 转载:https://www.cnblogs.com/smartloli/p/6978645.html 目前,随着大数据的浪潮,Kafka 被越来越多的企业所认可,如今的Kafka已发展到0 ...

  2. 当Elasticsearch遇见Kafka

    欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文由michelmu发表于云+社区专栏 Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种 ...

  3. Elasticsearch分布式一致性原理剖析(一)-节点篇

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: ES目前是最流行的开源分布式搜索引擎系统,其使用Lucene作为单机存储引擎并提供强大的搜索查询能力.学习其搜索原理, ...

  4. Elasticsearch分布式一致性原理剖析(三)-Data篇

    前言 "Elasticsearch分布式一致性原理剖析"系列将会对Elasticsearch的分布式一致性原理进行详细的剖析,介绍其实现方式.原理以及其存在的问题等(基于6.2版本 ...

  5. 【Kafka】测试Kafka整合Flume

    本文简单测试Kafka整合Flume,从而实现"日志 -> Flume -> Kafka". 操作环境: Kafka版本:1.0.1 Flume版本:1.6.0 测试前 ...

  6. spark第十篇:Spark与Kafka整合

    spark与kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8和spark-str ...

  7. 大数据———Flume与Kafka整合

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Flume 1.8.0 http://flume.apache.org/download.html Kafka 2.11 http: ...

  8. Spring和Elasticsearch全文搜索整合详解

    Spring和Elasticsearch全文搜索整合详解 一.概述 ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web ...

  9. 使用Elasticsearch,Kafka和Cassandra构建流式数据中心

    在过去的一年里,我遇到了一些软件公司讨论如何处理应用程序的数据(通常以日志和metrics的形式).在这些讨论中,我经常会听到挫折感,他们不得不用一组零碎的工具,随着时间的推移将这些数据汇总起来.这些 ...

最新文章

  1. 【OpenCV 4开发详解】图像仿射变换
  2. 新能源关键技术预见的研究
  3. 初看Windows Media Center
  4. Filtering microblogging messages for Social TV
  5. TCP三次握手的原理及***手段
  6. C#调用C++(opencv)中图片数据传递的问题
  7. 华景机器人怎么控制_【华景QQ机器人怎么用】华景QQ机器人好不好_使用技巧-ZOL软件百科...
  8. 用java输出图形_java基础-输出一个简单的图形。
  9. 想做好seo优化,关键词的选择可是重中之重!
  10. 独家专访VB100:趋势科技退出缘于新病毒检测失败
  11. FLUKE OTDR光纤测试仪OFP2-100-Q双光纤双向测试的方法
  12. 没有期刊申请清华博士_没有论文,也可申请麻省理工学院博士及奖学金
  13. Redis集群之脑裂:一次奇怪的数据丢失
  14. Detected applied migration not resolved locally:
  15. DOTA数据集应用于Yolo-v4(-Tiny)系列2——使用Pytorch框架的Yolov4(-Tiny)训练与推测
  16. java生成二维码,并在前端展示。
  17. echats 柱状图的点击事件及高亮
  18. 删除所有奇数顺序表c语言,如何删除列表中的所有奇数序数项?
  19. 【soft6星评论】中台只是一种说法,中小企业主们要擦亮眼睛
  20. 关于抽象类说法以下哪些是正确的?

热门文章

  1. 【Linux使用技巧】linux 死机了怎么办
  2. CISCO ACL的匹配数问题
  3. 一旦一个业务可以由一个人来全部完成而不涉及分工,就会产生单干的情况
  4. 使用CSS3实现超炫的Loading(加载)动画效果
  5. mssql,mysql,oracle中查询数据库表的比较
  6. android ant打包
  7. python file_python3之File文件方法
  8. 设计模式 — 行为型模式 — 备忘录模式
  9. 设计模式 — 行为型模式 — 解释器模式
  10. 设计模式 — 行为型模式 — 访问者模式