kafka实际应用--读取数据,并用java实现业务逻辑“行转列"

  • 一、业务需求
  • 二、业务实现
    • 2.1 kafka中创建topic: event_attendees_raw
    • 2.2 创建 flume 读取数据的配置文件
    • 2.3 flume根目录下执行flume配置文件
    • 2.4 复制 event_attendees.csv 至flume指定读取文件的目录
    • 2.5 查看topic:event_attendees_raw 消息队列数量
    • 2.6 创建一个新的topic,用于存放行转列的数据event_attendees
    • 2.7 java代码实现行转列的业务逻辑
    • 2.8 查看topic:event_attendees 消息队列
    • 2.9 创建消费者查看处理过之后行转列的数据
    • 2.10 将kafka topic:"event_attendees"数据存储到hbase中

一、业务需求

现有一个表数据 event_attendees.csv ,表中截取前5行数据,内容如下:

表头数据:

event yes maybe invited no

代表含义为:

  • event: 一个事件的名称代号,可以理解一个人的身份ID,去参加聚会,其余四个字段代表他去不去参加聚会的状态
  • yes: 同意去
  • maybe: 可能会去
  • invited: 受邀请的
  • no: 不去参加

业务需求: 将每行的事件event ,与其对应的状态一 一对应,在加上相应的状态描述,最后将处理后的数据存入到kafka中

1159822043,1975964455 252302513 4226086795 3805886383 1420484491 3831921392 3973364512,2733420590 517546982 1350834692 532087573 583146976 3079807774 1324909047,1723091036 3795873583 4109144917 3560622906 3106484834 2925436522 2284506787 2484438140 3148037960 2142928184 1682878505 486528429 3474278726 2108616219 3589560411 3637870501 1240238615 1317109108 1225824766 2934840191 2245748965 4059548655 1646990930 2361664293 3134324567 2976828530 766986159 1903653283 3090522859 827508055 140395236 2179473237 1316219101 910840851 1177300918 90902339 4099434853 2056657287 717285491 3384129768 4102613628 681694749 3536183215 1017072761 1059775837 1184903017 434306588 903024682 1971107587 3461437762 196870175 2831104766 766089257 2264643432 2868116197 25717625 595482504 985448353 4089810567 1590796286 3920433273 1826725698 3845833055 1674430344 2364895843 1127212779 481590583 1262260593 899673047 4193404875,3575574655 1077296663

上文展示的是第二行数据,现在想要将数据拆分处理后如下展示:

1159822043,1975964455,yes
1159822043,252302513,yes
1159822043,4226086795,yes
...

event_attendees.csv表数据链接:https://pan.baidu.com/s/1pY8xz0BBMMLhRbo38PXe8Q
提取码:euhn

二、业务实现

2.1 kafka中创建topic: event_attendees_raw

kafka根目录下,创建topic “event_attendees_raw”,用于存放使用 flume 读取 event_attendees.csv 的数据

bin/kafka-topics.sh --create --zookeeper 192.168.206.129:2181 --topic event_attendees_raw --partitions 1 --replication-factor 1

2.2 创建 flume 读取数据的配置文件

vi event_attendees-flume-kafka.conf

编辑内容如下:

event_attendees.sources = eventAttendeesSource
event_attendees.channels = eventAttendeesChannel
event_attendees.sinks = eventAttendeesSinkevent_attendees.sources.eventAttendeesSource.type = spooldir
event_attendees.sources.eventAttendeesSource.spoolDir = /opt/dataFile/flumeFile/event_attendeesevent_attendees.sources.eventAttendeesSource.deserializer = LINE
event_attendees.sources.eventAttendeesSource.deserializer.maxLineLength = 60000
event_attendees.sources.eventAttendeesSource.includePattern = event_attendees_[0-9]{4}-[0-9]{2}-[0-9]{2}.csvevent_attendees.channels.eventAttendeesChannel.type = file
event_attendees.channels.eventAttendeesChannel.checkpointDir = /opt/dataFile/flumeFile/checkpoint/event_attendeesevent_attendees.channels.eventAttendeesChannel.dataDir = /opt/dataFile/flumeFile/data/event_attendeesevent_attendees.sinks.eventAttendeesSink.type = org.apache.flume.sink.kafka.KafkaSink
event_attendees.sinks.eventAttendeesSink.batchSize = 640
event_attendees.sinks.eventAttendeesSink.brokerList = 192.168.206.129:9092
event_attendees.sinks.eventAttendeesSink.topic = event_attendees_rawevent_attendees.sources.eventAttendeesSource.channels = eventAttendeesChannel
event_attendees.sinks.eventAttendeesSink.channel = eventAttendeesChannel

2.3 flume根目录下执行flume配置文件

./bin/flume-ng agent --name event_attendees --conf conf/ --conf-file conf/job/event_attendees-flume-kafka.conf -Dflume.root.logger=INFO,console

2.4 复制 event_attendees.csv 至flume指定读取文件的目录

install event_attendees.csv /opt/dataFile/flumeFile/event_attendees/event_attendees_2020-08-24.csv

若以上都设置正确,此时 flume 即可开始读取数据

2.5 查看topic:event_attendees_raw 消息队列数量

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop-single:9092 --topic event_attendees_raw -time -1 -offsets 1


再查看event_attendees.csv表数据的总行数

wc -l event_attendees.csv


我这里数据略有些偏差,可以忽略不影响,不能相差很大,否则无法保证数据准确性

2.6 创建一个新的topic,用于存放行转列的数据event_attendees

bin/kafka-topics.sh --create --zookeeper 192.168.206.129:2181 --topic event_attendees --partitions 1 --replication-factor 1

2.7 java代码实现行转列的业务逻辑

需添加的Maven依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>2.0.0</version>
</dependency>

创建java类 MyEventAttendees ,业务代码内容如下:

package cn.kgc.events;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class MyEventAttendees {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.206.129:9092");      //ip地址prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"myEventAttendees");     //kafka application config的ID,随便写,执行一次需更改ID,不然有缓存可能会报错prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());     //keyprop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());    //valueStreamsBuilder builder = new StreamsBuilder();//TODO 处理掉首行数据,   !v.toString().startsWith("event,"))//TODO event,yes,maybe,invited,nofinal KStream<Object, Object> event_attendees_raw =builder.stream("event_attendees_raw").filter((k, v) -> (!v.toString().startsWith("event,") &&v.toString().split(",").length == 5));event_attendees_raw.flatMap((k,v) ->{       //  假设 1,2,3,4,5 代表5个字段System.out.println(k + " " + v);     // 展开后为  null     1,2,3,4,5List<KeyValue<String,String>> keyValues = new ArrayList<>();String[] split = v.toString().split(",");//[1,2,3,4,5]   将每一行读取到的字段再进行切割,储存到字符串类型的数组中String event = split[0];  //eventString[] yess = split[1].split(" ");  //yesString[] maybes = split[2].split(" ");  //maybeString[] inviteds = split[3].split(" ");  //invitedString[] nos = split[4].split(" ");     //no//对于[2,3,4,5]四个对象操作for (String yes : yess) {KeyValue<String,String> keyValue =new KeyValue<>(null,event+","+yes+",yes");keyValues.add(keyValue);    //遍历每一个字符串数组,将取出的数据放到外层最大的列表keyValues中}for (String maybe : maybes) {KeyValue<String,String> keyValue =new KeyValue<>(null,event+","+maybe+",maybe");keyValues.add(keyValue);}for (String invited : inviteds) {KeyValue<String,String> keyValue =new KeyValue<>(null,event+","+invited+",invited");keyValues.add(keyValue);}for (String no : nos) {KeyValue<String,String> keyValue =new KeyValue<>(null,event+","+no+",no");keyValues.add(keyValue);}return keyValues;   //最后返回这个列表}).to("event_attendees");   //将处理过的数据储存到 topic: "event_attendees" 中//拓步结构Topology topo = builder.build();KafkaStreams streams = new KafkaStreams(topo, prop);CountDownLatch countDownLatch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("test01"){   //多线程运行@Overridepublic void run() {streams.close();countDownLatch.countDown();}});try {streams.start();countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.exit(0);     //执行完成退出}
}

2.8 查看topic:event_attendees 消息队列

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop-single:9092 --topic event_attendees -time -1 -offsets 1

显示如下信息表示数据存储成功

2.9 创建消费者查看处理过之后行转列的数据

kafka-console-consumer.sh --bootstrap-server 192.168.206.129:9092 --topic event_attendees --from-beginning

查询到数据结构如图所示说明数据处理成功(部分截图):

2.10 将kafka topic:"event_attendees"数据存储到hbase中

①hbase中创建命名空间:

create_namespace 'events_db'

②以该命名空间建表:

create 'events_db:event_attendee', 'euat'

③通过java API将kafka中数据导入habse:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;/*** @ClassName: EventAttendeeshb* @Description: TODO  将kafka topic event_attendees数据导入到 hbase  events_db:event_attendee 表中* TODO 在hbse中建表   create_namespace 'events_db'*                    create 'events_db:event_attendee', 'euat'*     event  yes  maybe  invited  no*       0    1      2      3      4*   rowkey:calculated hash-code of event_id + user_id*  euat:  event_id, user_id, attend_type* @author: 我玩的很开心* @date: 2020/9/7  15:15*/public class EventAttendeeshb {public static void main(String[] args) {//kafkaProperties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.206.129:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");prop.put(ConsumerConfig.GROUP_ID_CONFIG, "users123456");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);consumer.subscribe(Collections.singletonList("event_attendees"));//hbaseConfiguration config = HBaseConfiguration.create();config.set("hbase.rootdir","hdfs://192.168.206.129:9000/hbase");config.set("hbase.zookeeper.quorum","192.168.206.129");config.set("hbase.zookeeper.property.clientPort","2181");try {Connection connection = ConnectionFactory.createConnection(config);Table table = connection.getTable(TableName.valueOf("events_db:event_attendee"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);List<Put> putList = new ArrayList<>();for (ConsumerRecord<String, String> record : records) {System.out.println(record);String[] infos = record.value().split(",",-1);if(infos.length == 3){Put put = new Put(Bytes.toBytes((infos[0]+infos[1]).hashCode()));if(infos[0] != null){put.addColumn("euat".getBytes(),"event_id".getBytes(),infos[0].getBytes());}else {put.addColumn("euat".getBytes(),"event_id".getBytes(),null);}if(infos[1] != null){put.addColumn("euat".getBytes(),"user_id".getBytes(),infos[1].getBytes());}else{put.addColumn("euat".getBytes(),"user_id".getBytes(),null);}if(infos[2] != null){put.addColumn("euat".getBytes(),"attend_type".getBytes(),infos[2].getBytes());}else{put.addColumn("euat".getBytes(),"attend_type".getBytes(),null);}putList.add(put);}}table.put(putList);table.close();}} catch (IOException e) {e.printStackTrace();}}}

④查看hbse中 events_db:event_attendee 表数据:

scan 'events_db:event_attendee'

kafka实际应用—>读取数据,并用java实现业务逻辑“行转列”相关推荐

  1. 分层:数据访问层、业务逻辑层、视图层

    分层:开发模式     数据访问层 业务逻辑层:调用数据访问层 视图层:调用业务逻辑层 数据库表 1.创建项目 2.创建包:     com.zking.util         com.zking. ...

  2. ASP.NET2.0数据操作之创建业务逻辑层

    导言 本教程的第一节所描述的数据访问层(Data Access Layer,以下简称为DAL)已经清晰地将表示逻辑与数据访问逻辑区分开了.不过,即使DAL将数据访问的细节从表示层中分离出来了,可它却不 ...

  3. Java 的业务逻辑验证框架 fluent-validator

    背景 在互联网行业中,基于 Java 开发的业务类系统,不管是服务端还是客户端,业务逻辑代码的更新往往是非常频繁的,这源于功能的快速迭代特性.在一般公司内部,特别是使用 Java web 技术构建的平 ...

  4. 从Oracle读取数据并用python处理过程记录(构建BARRA因子遇到的问题)

    第一次用pycharm运行python语句,第一次从Oracle读数据,出现了无数的bug..把过程记录一下以提升效率 1.数据读取并处理成dataframe格式:用逐行查询的方式 import cx ...

  5. java多线程按行读取文件_“java”中多线程按行读取txt且每个线程读的内容不能重复,这么求“demo”?...

    展开全部 你把原来程序中直接读的地62616964757a686964616fe4b893e5b19e31333365646234方,改成调用上面的函数,由该函数统一读行.这样,不管是你有 N 个线程 ...

  6. python读取excel【二】,循环行与列对应数据

    1. 我们需要取到数据如下: 年龄: 19 身高: 152cm 所对应的体重信息,组成一组信息 年龄为:19 身高是 152 cm的人的标准体重是46kg 年龄为:19 身高 153cm 的人的标准体 ...

  7. python某行某列读取数据_使用scrpython从某行的第一列提取数据

    在您的情况下,解决方案是:td:nth-child(1) Selects every element that is the first child of its parent >>> ...

  8. 实用 | 从Apache Kafka到Apache Spark安全读取数据

    引言 随着在CDH平台上物联网(IoT)使用案例的不断增加,针对这些工作负载的安全性显得至关重要.本篇博文对如何以安全的方式在Spark中使用来自Kafka的数据,以及针对物联网(IoT)使用案例的两 ...

  9. java怎么读取数据?

    Java属于入门容易,天花板却极高的编程语言.java怎么读取数据?对于java工程师来说技术的不断发展,需要不断学习java进阶知识.为了帮助大家巩固基础,本文解答了java怎么读取数据?等相关问题 ...

最新文章

  1. 揭示生命的奥秘——生物信息学
  2. OFDM专题之输入的复信号从何而来?
  3. OSPF中 hello报文的 内容
  4. Cocos2d-x列表嵌套裁剪bug
  5. Android --- 怎么设置 EditText 控件中光标默认位置,当 EditText 里有文字的时候,光标跑到了最前面
  6. django2.2连接mysql遇到的坑(亲测)
  7. 手工收集awr报告_oracle手工生成AWR报告方法记录
  8. 点云上的卷积神经网络及其部分应用
  9. 小米武大共建人工智能实验室,先期提供1000万研发经费
  10. python求解典型相关系数_三大相关系数: pearson, spearman, kendall(python示例实现)...
  11. python threading join_浅谈Python中threading join和setDaemon用法及区别说明
  12. 使用airdrop在iphone与ubuntu之间共享文件
  13. 软件开发人员如何做出好看的UI界面
  14. ROMS简单应用——绘制区域温度图
  15. 英特尔400系列服务器芯片组,驱动支持列表再立功 Intel 400系列芯片组曝光
  16. 青云mysql_青云分布式数据库RadonDB 深度兼容MySQL
  17. 【刷爆LeetCode】七月算法集训(14)栈
  18. 菁英杯计算机能力大赛试题,“第四届全国大学生计算机系统能力培养大赛(龙芯杯)”决赛结果...
  19. django项目 网易云音乐
  20. mediasoup json 通信协议2--room.js和router.cpp信令

热门文章

  1. 数据可视化分析教学课件——FineBI实验册节选====库存与账款分析
  2. Lattice CrossLink NX系列FPGA使用分享
  3. 安装matlab201*版本过程中文转英文(附matlab英文版安装教程)
  4. 温度压力测试软件什么好,温度压力测试
  5. 软考中级哪个通过率高且简单?
  6. 现代opengl 设计 3D模型文件导入显示
  7. ipadpro玩吃鸡用什么耳机,适合玩游戏用的低延迟蓝牙耳机推荐
  8. EndNote20如何下载并安装中文参考文献格式GBT7714
  9. Linux之虚拟机修改时间
  10. STM32心率滤波器实现