1.概述

转载:https://www.cnblogs.com/smartloli/p/6978645.html

目前,随着大数据的浪潮,Kafka 被越来越多的企业所认可,如今的Kafka已发展到0.10.x,其优秀的特性也带给我们解决实际业务的方案。对于数据分流来说,既可以分流到离线存储平台(HDFS),离线计算平台(Hive仓库),也可以分流实时流水计算(Storm,Spark)等,同样也可以分流到海量数据查询(HBase),或是及时查询(ElasticSearch)。而今天笔者给大家分享的就是Kafka 分流数据到 ElasticSearch。

2.内容

我们知道,ElasticSearch是有其自己的套件的,简称ELK,即ElasticSearch,Logstash以及Kibana。ElasticSearch负责存储,Logstash负责收集数据来源,Kibana负责可视化数据,分工明确。想要分流Kafka中的消息数据,可以使用Logstash的插件直接消费,但是需要我们编写复杂的过滤条件,和特殊的映射处理,比如系统保留的_uid字段等需要我们额外的转化。今天我们使用另外一种方式来处理数据,使用Kafka的消费API和ES的存储API来处理分流数据。通过编写Kafka消费者,消费对应的业务数据,将消费的数据通过ES存储API,通过创建对应的索引的,存储到ES中。其流程如下图所示:


上图可知,消费收集的数据,通过ES提供的存储接口进行存储。存储的数据,这里我们可以规划,做定时调度。最后,我们可以通过Kibana来可视化ES中的数据,对外提供业务调用接口,进行数据共享。

3.实现

下面,我们开始进行实现细节处理,这里给大家提供实现的核心代码部分,实现代码如下所示:

3.1 定义ES格式

我们以插件的形式进行消费,从Kafka到ES的数据流向,只需要定义插件格式,如下所示:

{"job": {"content": {"reader": {"name": "kafka","parameter": {"topic": "kafka_es_client_error","groupid": "es2","bootstrapServers": "k1:9094,k2:9094,k3:9094"},"threads": 6},"writer": {"name": "es","parameter": {"host": ["es1:9300,es2:9300,es3:9300"],"index": "client_error_%s","type": "client_error"}}}}
}

这里处理消费存储的方式,将读和写的源分开,配置各自属性即可。

3.2 数据存储

这里,我们通过每天建立索引进行存储,便于业务查询,实现细节如下所示:

public class EsProducer {private final static Logger LOG = LoggerFactory.getLogger(EsProducer.class);private final KafkaConsumer<String, String> consumer;private ExecutorService executorService;private Configuration conf = null;private static int counter = 0;public EsProducer() {String root = System.getProperty("user.dir") + "/conf/";String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");conf = Configuration.from(new File(root + path));Properties props = new Properties();props.put("bootstrap.servers", conf.getString("job.content.reader.parameter.bootstrapServers"));props.put("group.id", conf.getString("job.content.reader.parameter.groupid"));props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(conf.getString("job.content.reader.parameter.topic")));}public void execute() {executorService = Executors.newFixedThreadPool(conf.getInt("job.content.reader.threads"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);if (null != records) {executorService.submit(new KafkaConsumerThread(records, consumer));}}}public void shutdown() {try {if (consumer != null) {consumer.close();}if (executorService != null) {executorService.shutdown();}if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {LOG.error("Shutdown kafka consumer thread timeout.");}} catch (InterruptedException ignored) {Thread.currentThread().interrupt();}}class KafkaConsumerThread implements Runnable {private ConsumerRecords<String, String> records;public KafkaConsumerThread(ConsumerRecords<String, String> records, KafkaConsumer<String, String> consumer) {this.records = records;}@Overridepublic void run() {String index = conf.getString("job.content.writer.parameter.index");String type = conf.getString("job.content.writer.parameter.type");for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {JSONObject json = JSON.parseObject(record.value());List<Map<String, Object>> list = new ArrayList<>();Map<String, Object> map = new HashMap<>();index = String.format(index, CalendarUtils.timeSpan2EsDay(json.getLongValue("_tm") * 1000L));if (counter < 10) {LOG.info("Index : " + index);counter++;}for (String key : json.keySet()) {if ("_uid".equals(key)) {map.put("uid", json.get(key));} else {map.put(key, json.get(key));}list.add(map);}EsUtils.write2Es(index, type, list);}}}}}

这里消费的数据源就处理好了,接下来,开始ES的存储,实现代码如下所示:

public class EsUtils {private static TransportClient client = null;static {if (client == null) {client = new PreBuiltTransportClient(Settings.EMPTY);}String root = System.getProperty("user.dir") + "/conf/";String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");Configuration conf = Configuration.from(new File(root + path));List<Object> hosts = conf.getList("job.content.writer.parameter.host");for (Object object : hosts) {try {client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(object.toString().split(":")[0]), Integer.parseInt(object.toString().split(":")[1])));} catch (Exception e) {e.printStackTrace();}}}public static void write2Es(String index, String type, List<Map<String, Object>> dataSets) {BulkRequestBuilder bulkRequest = client.prepareBulk();for (Map<String, Object> dataSet : dataSets) {bulkRequest.add(client.prepareIndex(index, type).setSource(dataSet));}bulkRequest.execute().actionGet();// if (client != null) {// client.close();// }}public static void close() {if (client != null) {client.close();}}
}

这里,我们利用BulkRequestBuilder进行批量写入,减少频繁写入率。

4.调度

存储在ES中的数据,如果不需要长期存储,比如:我们只需要存储及时查询数据一个月,对于一个月以前的数据需要清除掉。这里,我们可以编写脚本直接使用Crontab来进行简单调用即可,脚本如下所示:

#!/bin/sh
# <Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>echo "<Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>"index_name=$1
daycolumn=$2
savedays=$3
format_day=$4if [ ! -n "$savedays" ]; thenecho "Oops. The args is not right,please input again...."exit 1
fiif [ ! -n "$format_day" ]; thenformat_day='%Y%m%d'
fisevendayago=`date -d "-${savedays} day " +${format_day}`curl -XDELETE "es1:9200/${index_name}/_query?pretty" -d "
{"query": {"filtered": {"filter": {"bool": {"must": {"range": {"${daycolumn}": {"from": null,"to": ${sevendayago},"include_lower": true,"include_upper": true}}}}}}}
}"echo "Finished."

然后,在Crontab中进行定时调度即可。

5.总结

这里,我们在进行数据写入ES的时候,需要注意,有些字段是ES保留字段,比如_uid,这里我们需要转化,不然写到ES的时候,会引发冲突导致异常,最终写入失败。

【Kafka】Elasticsearch 与 Kafka 整合剖析相关推荐

  1. 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明

    目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...

  2. 当Elasticsearch遇见Kafka

    欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文由michelmu发表于云+社区专栏 Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种 ...

  3. kafka专题:kafka单机和集群安装详情,Spring Boot如何整合Kafka

    文章目录 1. kafka单机安装 1.1 server.properties核心配置 2. kafka集群安装 2.1 kafka集群可视化界面 3. springboot如何整合kafka 4. ...

  4. 【Kafka】测试Kafka整合Flume

    本文简单测试Kafka整合Flume,从而实现"日志 -> Flume -> Kafka". 操作环境: Kafka版本:1.0.1 Flume版本:1.6.0 测试前 ...

  5. Apache Kafka(七)- Kafka ElasticSearch Comsumer

    Kafka ElasticSearch Consumer 对于Kafka Consumer,我们会写一个例子用于消费Kafka 数据传输到ElasticSearch. 1. 构造ElasticSear ...

  6. filebeat+redis+logstash+elasticsearch filebeat+kafka+zookeeper+logstash+elasticsearch

    收集日志的工具 日志易(收费) splunk(国外,按流量收费) 介绍 发展史:使用java语言,在luncen的基础上做二次封装,提供restful接口 搜索的原理:倒排索引 特点:水平扩展方便.提 ...

  7. linux查看kafka队列消息,Kafka消息队列-从开始到上线

    运行环境 操作系统:Windows 10 : Linux发行版:CentOS Linux release 7.6.1810 (Core) JDK版本:1.8.0_221 说在前面 kafka作为开源的 ...

  8. 【Kafka】从kafka中读取最新数据

    [Kafka]从kafka中读取最新数据 一.死循环无限拉取kafka数据 1.1 整体框架剖析 1.2 测试 二.@KafkaListener注解 实现监听kafka数据 三.参考资料 前情提要:我 ...

  9. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  10. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

最新文章

  1. SAP PM 初级系列22 - IW38可以批量处理维修工单
  2. TCP之三次握手四次挥手 1
  3. 树的高度从零还是一开始数_数据结构与算法之1——树与二叉树
  4. sobol敏感性分析 matlab代码
  5. 简单的 php 防注入、防跨站 函数
  6. 前端用Sass实现星级评定效果,简单快捷实现星级切换。
  7. STL源码剖析 map
  8. bool python 运算_python中的布尔操作
  9. linux修改可执行程序,反汇编,修改,然后重新组装一个Linux可执行文件
  10. Netty工作笔记0085---TCP粘包拆包内容梳理
  11. Python 机器学习:多元线性回归
  12. 【操作系统/OS笔记12】同步互斥的三种实现方法:禁用硬件中断、基于软件的解决方案、更高级的抽象
  13. ModalPopupExtender使用技巧( operate ModalPopupExtender by JavaScript)
  14. 推荐一个可以减少开发量50%的插件!
  15. 【大咖有约】58同城孙玄:58同城从MongoDB到MySQL迁移之路
  16. HCIP-Cloud Service Solutions Architect
  17. caxa发生文件读写异常_文件和异常
  18. 一小时看懂Ruby代码基本逻辑(自定义metasploit模块)
  19. 用C语言短除法求最大公因数用,用短除法求最大公因数
  20. cocos2d-x 从win32到android移植的全套解决方案

热门文章

  1. 杨笠代言电脑遭投诉抵制,网友吵翻!英特尔回应了...
  2. 一加9系列全网预约量破200万:3月24日见!
  3. 马斯克称自己可能染上中度新冠肺炎
  4. “富二代”京东健康狂奔,这一次能否赢了阿里?
  5. 网友调侃特斯拉股价要冲向火星 马斯克:疯狂的时代
  6. 组织来了!特斯拉中国车友俱乐部开启官方认证
  7. 会玩!今年天猫双11可以买房了 还是特价 网友:满300减40吗?
  8. 花了10块钱,我在朋友圈成为了富豪...
  9. 星巴克、喜茶们左右围守 瑞幸的大师故事还能讲多久
  10. 货物被偷把沃尔玛逼急了 将在逾1000家门店安装AI相机