apache druid 与kafka整合使用
前言
在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用
本篇,我们结合一个实际的简单的应用场景,来说说apache druid如何在实际项目中进行使用
业务场景
如下所示,是一个很常见的数据分析的业务,通常来说,很多实时或准实时的数据(这里理解为外部数据源)需要通过kafka进行中转,即发送到kafka中,
apache druid提供了导入外部数据源的功能,可以接收来自kafka指定topic的数据,然后支持数据分析,将kafka的数据导入到apache druid之后,再通过程序(后台应用)进行数据读取,根据实际的业务需求读取从kafka中摄取的数据进行逻辑处理
最后,应用程序将数据处理之后,进行写库,或者作为大屏展示的数据进行输出
以此为基础,可以将这个过程应用到很多与之相关的场景中,比如源数据是来自大数据引擎的处理结果,或者是python程序爬虫得到的结果…
下面我们来对这个过程从操作到代码实现做一个完整的演示
前置准备
- docker或者linux环境下搭建的zookeeper和kafka,并提前创建一个topic
- 启动apache druid服务
做一下kafaka的数据测试,验证topic可以正常的收发消息
1、apache druid 控制台连接kafka
loada data 选择kafka
填写kafka的连接信息即可
然后一路next等待解析,解析完毕,通过顶部的query查看左侧是否出现下面的这个自定义的库名
上面的意思是,将kafka中某个topic中的数据解析到apache druid的库中,然后就可以通过apache druid对导入的数据进行管理和分析了
我们不妨使用sql查询一下,可以看到刚刚我们做测试的数据都展示出来了
2、编写程序,定时向kafka推送消息
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;public class KfkaTest {public static void main(String[] args) {AtomicLong atomicLong = new AtomicLong(1);Runnable runnable = new Runnable() {public void run() {//定时向kafka推送消息long l = atomicLong.incrementAndGet();pushMessage(l);}};ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间service.scheduleAtFixedRate(runnable, 10, 1, TimeUnit.SECONDS);}public static void pushMessage(long num) {Properties properties = new Properties();properties.put("bootstrap.servers", "IP:9092");properties.put("acks", "all");properties.put("retries", "3");properties.put("batch.size", "16384");properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);//key和value的序列化properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//构造生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);ObjectMapper objectMapper = new ObjectMapper();Map<String, Object> map = new HashMap<>();map.put("name", "gaoliang:" + num);map.put("age", 19);map.put("city", "深圳");String val = null;try {val = objectMapper.writeValueAsString(map);System.out.println(val);} catch (JsonProcessingException e) {e.printStackTrace();}producer.send(new ProducerRecord<>("study1", "congge ", val));//关闭连接资源producer.close();}}
3、通过程序读取apache druid 的数据
关于这一点,方式就很灵活了,是将读取到的数据做何种处理呢?那就要看业务的具体需求了,比如可以直接通过接口将读取到的最新数据返回给页面做展示呢?还是将数据进行逻辑处理之后入库呢?还是交给其他的服务进一步使用呢?通常来说,进行读取之后,写库和展示的应用场景比较多
下面来演示下,如何在程序中读取apache druid的数据,想必这个是大家关心的
直接在pom文件中添加如下依赖
<dependency><groupId>org.apache.calcite.avatica</groupId><artifactId>avatica-core</artifactId><version>1.15.0</version></dependency>
apache druid官方提供了jdbc的方式对数据进行查询的连接方式,下面直接上代码了
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;public class DruidTest {private static final String DRUID_URL = "jdbc:avatica:remote:url=http://IP:8888/druid/v2/sql/avatica/";private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();/*** 打开连接* @param* @return* @throws SQLException*/public static AvaticaConnection connection() throws SQLException {Properties properties = new Properties();AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);threadLocal.set(connection);return connection;}/*** 关闭连接* @throws SQLException*/public static void closeConnection() throws SQLException{System.out.println("关闭线程:"+threadLocal.get());AvaticaConnection conn = threadLocal.get();if(conn != null){conn.close();threadLocal.remove();}}/*** 根据sql查询结果* @param* @param sql* @return* @throws SQLException*/public static ResultSet executeQuery (String sql) throws SQLException{AvaticaStatement statement = connection().createStatement();ResultSet resultSet = statement.executeQuery(sql);return resultSet;}public static void main(String[] args) {try {String sql = "SELECT * FROM \"study1\" limit 10";for (int i = 0; i < 5; i++) {ResultSet resultSet = executeQuery(sql);System.out.println("开始连接"+i + "; 连接线程:"+threadLocal.get());while(resultSet.next()){String name = resultSet.getString("name");System.out.println(name + " ; "+ name);}closeConnection();}} catch (SQLException throwables) {throwables.printStackTrace();}}}
这时候不妨往kafka的study1的topic中再推一条消息
界面上查询,可以看到,数据已经过来了
再次运行程序,也能成功读取到
以上,通过程序结合控制台讲述了如何利用java程序连接kafka和apache druid的一种简单的业务场景,本篇的处理较为简单,未涉及到具体的功能层面的整合,主要是为使用apache druid进行进一步的深入使用做一个铺垫,希望对看到的同学有用!
apache druid 与kafka整合使用相关推荐
- 【Druid】(八)Apache Druid 核心插件 Kafka Indexing Service SLS Indexing Service
文章目录 一.前言 二.与 Kafka 集群交互 三.使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据 四.关于 SLS Indexing Se ...
- 如何使用 Druid 和 Kafka 构造 Kappa 架构完成流量分析
NTT 是一家全球电信公司,总部设在日本东京.在<财富>世界 500 强中,NTT 是世界第四大电信公司.NTT 通信 (NTT Com) 是 NTT 的子公司,其全球 IP 网络 (GI ...
- Apache Druid(一)简介
翻译自 Apache Druid Apache Druid(正在孵化)是一个开源的分布式数据存储.德鲁伊的核心设计结合了OLAP /分析数据库,时间序列数据库和搜索系统的思想,为广泛的用例创建了一个统 ...
- Flume与Kafka整合案例详解
环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Zookeeper 3.4.5 Flume 1.6.0 Kafka 2.1.0 flume笔记 直接贴配置文件 [roo ...
- 【Kafka】测试Kafka整合Flume
本文简单测试Kafka整合Flume,从而实现"日志 -> Flume -> Kafka". 操作环境: Kafka版本:1.0.1 Flume版本:1.6.0 测试前 ...
- spark第十篇:Spark与Kafka整合
spark与kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8和spark-str ...
- 大数据———Flume与Kafka整合
环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Flume 1.8.0 http://flume.apache.org/download.html Kafka 2.11 http: ...
- 【Druid】(四)Apache Druid 部署和配置(单机版 / Docker 容器版 / Kubernetes 集群版)
文章目录 一.Apache Druid 部署 1.1 单机版 1.1.1 Jar 包下载 1.1.2 Druid 的安装部署 1.2 Docker 容器版 1.2.1 下载 1.2.2 配置 Dock ...
- 基于 Apache Druid 的实时分析平台在爱奇艺的实践
- 导读 - 最近几年大数据技术在各行各业得到广泛应用,为企业的运营决策和各种业务提供支持.随着数据的增长,业务对数据时效性的要求,给企业的大数据分析带来了巨大挑战.针对海量数据的实时分析需求,近年来 ...
最新文章
- 谷歌无人车之父刚推出的“无人驾驶入门”课,到底能学到啥?
- 在mysql中创建表的命令行_如何在命令行创建一个MySQL数据库
- Java反射在JVM的实现
- 计算机usb2.0失效,Win10电脑USB2.0-CRW没有驱动程序的解决方法
- android实现推送方式解决方案,Android实现推送方式解决方案系列教程
- 关于Jquery EasyUI中的DataGrid服务器端分页随记
- ltp-ddt的makefile结构
- [msi]获取msi安装包的ProductCode
- java day57【 Spring 概述 、 IoC 的概念和作用、使用 spring 的 IOC 解决程序耦合 】...
- 20200816每日一句
- Ubuntu安装微软Onedrive教程
- 鸿蒙生死印是谁的,逆天邪神:鸿蒙印的器灵还存在,或许云澈将知道些关于远古的秘密...
- Web漏洞-Xss跨站
- 配置 Eureka Server 集群
- 中国丙烯酸酯橡胶行业研究与投资预测报告(2022版)
- Postgresql进程卡住无法退出原因和解决方法
- Android混淆技术综述
- netty tcp空闲设置
- ...startWebLogic.sh: line 202:21293 已杀死
- Unity Joystick手势操作