前言

在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用

本篇,我们结合一个实际的简单的应用场景,来说说apache druid如何在实际项目中进行使用

业务场景

如下所示,是一个很常见的数据分析的业务,通常来说,很多实时或准实时的数据(这里理解为外部数据源)需要通过kafka进行中转,即发送到kafka中,

apache druid提供了导入外部数据源的功能,可以接收来自kafka指定topic的数据,然后支持数据分析,将kafka的数据导入到apache druid之后,再通过程序(后台应用)进行数据读取,根据实际的业务需求读取从kafka中摄取的数据进行逻辑处理

最后,应用程序将数据处理之后,进行写库,或者作为大屏展示的数据进行输出

以此为基础,可以将这个过程应用到很多与之相关的场景中,比如源数据是来自大数据引擎的处理结果,或者是python程序爬虫得到的结果…

下面我们来对这个过程从操作到代码实现做一个完整的演示

前置准备

  • docker或者linux环境下搭建的zookeeper和kafka,并提前创建一个topic
  • 启动apache druid服务

做一下kafaka的数据测试,验证topic可以正常的收发消息

1、apache druid 控制台连接kafka

loada data 选择kafka

填写kafka的连接信息即可

然后一路next等待解析,解析完毕,通过顶部的query查看左侧是否出现下面的这个自定义的库名

上面的意思是,将kafka中某个topic中的数据解析到apache druid的库中,然后就可以通过apache druid对导入的数据进行管理和分析了

我们不妨使用sql查询一下,可以看到刚刚我们做测试的数据都展示出来了

2、编写程序,定时向kafka推送消息

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;public class KfkaTest {public static void main(String[] args) {AtomicLong atomicLong = new AtomicLong(1);Runnable runnable = new Runnable() {public void run() {//定时向kafka推送消息long l = atomicLong.incrementAndGet();pushMessage(l);}};ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间service.scheduleAtFixedRate(runnable, 10, 1, TimeUnit.SECONDS);}public static void pushMessage(long num) {Properties properties = new Properties();properties.put("bootstrap.servers", "IP:9092");properties.put("acks", "all");properties.put("retries", "3");properties.put("batch.size", "16384");properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);//key和value的序列化properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//构造生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);ObjectMapper objectMapper = new ObjectMapper();Map<String, Object> map = new HashMap<>();map.put("name", "gaoliang:" + num);map.put("age", 19);map.put("city", "深圳");String val = null;try {val = objectMapper.writeValueAsString(map);System.out.println(val);} catch (JsonProcessingException e) {e.printStackTrace();}producer.send(new ProducerRecord<>("study1", "congge ", val));//关闭连接资源producer.close();}}

3、通过程序读取apache druid 的数据

关于这一点,方式就很灵活了,是将读取到的数据做何种处理呢?那就要看业务的具体需求了,比如可以直接通过接口将读取到的最新数据返回给页面做展示呢?还是将数据进行逻辑处理之后入库呢?还是交给其他的服务进一步使用呢?通常来说,进行读取之后,写库和展示的应用场景比较多

下面来演示下,如何在程序中读取apache druid的数据,想必这个是大家关心的

直接在pom文件中添加如下依赖

     <dependency><groupId>org.apache.calcite.avatica</groupId><artifactId>avatica-core</artifactId><version>1.15.0</version></dependency>

apache druid官方提供了jdbc的方式对数据进行查询的连接方式,下面直接上代码了

import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;public class DruidTest {private static final String DRUID_URL = "jdbc:avatica:remote:url=http://IP:8888/druid/v2/sql/avatica/";private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();/*** 打开连接* @param* @return* @throws SQLException*/public static AvaticaConnection connection() throws SQLException {Properties properties = new Properties();AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);threadLocal.set(connection);return connection;}/*** 关闭连接* @throws SQLException*/public static void closeConnection() throws SQLException{System.out.println("关闭线程:"+threadLocal.get());AvaticaConnection conn = threadLocal.get();if(conn != null){conn.close();threadLocal.remove();}}/*** 根据sql查询结果* @param* @param sql* @return* @throws SQLException*/public static ResultSet executeQuery (String sql) throws SQLException{AvaticaStatement statement = connection().createStatement();ResultSet resultSet = statement.executeQuery(sql);return resultSet;}public static void main(String[] args) {try {String sql = "SELECT * FROM \"study1\" limit 10";for (int i = 0; i < 5; i++) {ResultSet resultSet = executeQuery(sql);System.out.println("开始连接"+i + ";   连接线程:"+threadLocal.get());while(resultSet.next()){String name = resultSet.getString("name");System.out.println(name + "   ;   "+ name);}closeConnection();}} catch (SQLException throwables) {throwables.printStackTrace();}}}

这时候不妨往kafka的study1的topic中再推一条消息

界面上查询,可以看到,数据已经过来了

再次运行程序,也能成功读取到

以上,通过程序结合控制台讲述了如何利用java程序连接kafka和apache druid的一种简单的业务场景,本篇的处理较为简单,未涉及到具体的功能层面的整合,主要是为使用apache druid进行进一步的深入使用做一个铺垫,希望对看到的同学有用!

apache druid 与kafka整合使用相关推荐

  1. 【Druid】(八)Apache Druid 核心插件 Kafka Indexing Service SLS Indexing Service

    文章目录 一.前言 二.与 Kafka 集群交互 三.使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据 四.关于 SLS Indexing Se ...

  2. 如何使用 Druid 和 Kafka 构造 Kappa 架构完成流量分析

    NTT 是一家全球电信公司,总部设在日本东京.在<财富>世界 500 强中,NTT 是世界第四大电信公司.NTT 通信 (NTT Com) 是 NTT 的子公司,其全球 IP 网络 (GI ...

  3. Apache Druid(一)简介

    翻译自 Apache Druid Apache Druid(正在孵化)是一个开源的分布式数据存储.德鲁伊的核心设计结合了OLAP /分析数据库,时间序列数据库和搜索系统的思想,为广泛的用例创建了一个统 ...

  4. Flume与Kafka整合案例详解

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Zookeeper 3.4.5   Flume 1.6.0   Kafka 2.1.0   flume笔记 直接贴配置文件 [roo ...

  5. 【Kafka】测试Kafka整合Flume

    本文简单测试Kafka整合Flume,从而实现"日志 -> Flume -> Kafka". 操作环境: Kafka版本:1.0.1 Flume版本:1.6.0 测试前 ...

  6. spark第十篇:Spark与Kafka整合

    spark与kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8和spark-str ...

  7. 大数据———Flume与Kafka整合

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Flume 1.8.0 http://flume.apache.org/download.html Kafka 2.11 http: ...

  8. 【Druid】(四)Apache Druid 部署和配置(单机版 / Docker 容器版 / Kubernetes 集群版)

    文章目录 一.Apache Druid 部署 1.1 单机版 1.1.1 Jar 包下载 1.1.2 Druid 的安装部署 1.2 Docker 容器版 1.2.1 下载 1.2.2 配置 Dock ...

  9. 基于 Apache Druid 的实时分析平台在爱奇艺的实践

    - 导读 - 最近几年大数据技术在各行各业得到广泛应用,为企业的运营决策和各种业务提供支持.随着数据的增长,业务对数据时效性的要求,给企业的大数据分析带来了巨大挑战.针对海量数据的实时分析需求,近年来 ...

最新文章

  1. 谷歌无人车之父刚推出的“无人驾驶入门”课,到底能学到啥?
  2. 在mysql中创建表的命令行_如何在命令行创建一个MySQL数据库
  3. Java反射在JVM的实现
  4. 计算机usb2.0失效,Win10电脑USB2.0-CRW没有驱动程序的解决方法
  5. android实现推送方式解决方案,Android实现推送方式解决方案系列教程
  6. 关于Jquery EasyUI中的DataGrid服务器端分页随记
  7. ltp-ddt的makefile结构
  8. [msi]获取msi安装包的ProductCode
  9. java day57【 Spring 概述 、 IoC 的概念和作用、使用 spring 的 IOC 解决程序耦合 】...
  10. 20200816每日一句
  11. Ubuntu安装微软Onedrive教程
  12. 鸿蒙生死印是谁的,逆天邪神:鸿蒙印的器灵还存在,或许云澈将知道些关于远古的秘密...
  13. Web漏洞-Xss跨站
  14. 配置 Eureka Server 集群
  15. 中国丙烯酸酯橡胶行业研究与投资预测报告(2022版)
  16. Postgresql进程卡住无法退出原因和解决方法
  17. Android混淆技术综述
  18. netty tcp空闲设置
  19. ...startWebLogic.sh: line 202:21293 已杀死
  20. Unity Joystick手势操作

热门文章

  1. 脆弱的是生命 不脆弱的是精神 雅安 挺住!
  2. 25+ 个单色背景的网页设计实例
  3. 网络中的最基本的服务器DNS的相关知识的介绍
  4. 如何妥善处理WebBrowser对Javascript的错误问题,阻止JS弹出框,提高用户体验(原创)...
  5. 问答一:回答高中生关于前端的疑问
  6. 四个修改Docker默认存储位置的方法
  7. Lua学习笔记5:类及继承的实现
  8. 如何手动实现C语言中的字符串操作
  9. WF4 常用类第二篇
  10. sink的简历(2011-6-20),寻工作一份