kafka 四:(设计模式)Kafka数据上传至Hbase
kafka 四:(设计模式)Kafka数据上传至Hbase
- datamove目录
- AttendeesHandler类
- IParseHandler接口
- UserFriendHandler类
- work
- EventAttendeesWorker类
- IWriter接口
- KafkaParentWorker类
- writer
- HBaseWriter类
- IWriter接口
工程所需的依赖和建立maven工程请参考同栏目的其它博客
详细的代码请免积分下载,连接如下:
xxxxxxxxxxxxxxxxxxx
datamove目录
AttendeesHandler类
package nj.zb.kb05.hbase;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import javax.rmi.PortableRemoteObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;/*** 将 kafka中 topic event_attendees 数据导入到 hbase event_attendee表中* @author* @Des* @date 2020/5/27*/
public class EventAttendeeshb {public static void main(String[] args) {// HDFS基础配置Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.141:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");prop.put(ConsumerConfig.GROUP_ID_CONFIG,"eventattendees1");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);consumer.subscribe(Collections.singletonList("event_attendees"));// HBASE基础配置final Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","192.168.153.141");conf.set("hbase.zookeeper.property.clientPort","2181");conf.set("hbase.rootdir","hdfs://192.168.153.141:9000/hbase");try {final Connection connection = ConnectionFactory.createConnection(conf);final Table table = connection.getTable(TableName.valueOf("events_db:event_attendee"));while (true){List<Put> datas = new ArrayList<>();final ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.println(record.value());final String[] info = record.value().toString().split(",");Put put = new Put(Bytes.toBytes((info[0]+info[1]+info[2]).hashCode()));put.addColumn("euat".getBytes(),"eventid".getBytes(),info[0].getBytes());put.addColumn("euat".getBytes(),"friendid".getBytes(),info[1].getBytes());put.addColumn("euat".getBytes(),"stat".getBytes(),info[2].getBytes());datas.add(put);}
// consumer.commitAsync();table.put(datas);
// table.close();}} catch (IOException e) {e.printStackTrace();}}
}
IParseHandler接口
package nj.zb.kb05.designer.datamove;import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.List;// 负责处理数据,将数据转换成List<Put>形式
public interface IParseHandler {public List<Put> parse(ConsumerRecords<String, String> records);
}
UserFriendHandler类
package nj.zb.kb05.designer.datamove;import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.List;/*** @author* @Des* @date 2020/5/28*/
public class UuserFriendHandler implements IParseHandler {@Overridepublic List<Put> parse(ConsumerRecords<String, String> records) {return null;}
}
work
EventAttendeesWorker类
package nj.zb.kb05.designer.work;import nj.zb.kb05.designer.writer.IWriter;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections;/*** @author* @Des* @date 2020/5/28*/
public class EventAttendeesWorker extends KafkaParentWorker {private IWriter writer;public EventAttendeesWorker(String topic, long time, String groupId, IWriter writer) {super(topic, time, groupId);this.writer = writer;}public EventAttendeesWorker(String topic,IWriter writer) {super(topic);this.writer = writer;}@Overridepublic void pushData(String tableName) {KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(super.prop);consumer.subscribe(Collections.singletonList(this.getTopicName()));while (true){final ConsumerRecords<String, String> records = consumer.poll(this.getPoolTime());writer.write(records,tableName);}}
}
IWriter接口
package nj.zb.kb05.designer.work;public interface IWorker {public void pushData(String tableName);
}
KafkaParentWorker类
package nj.zb.kb05.designer.work;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;/*** @author* @Des* @date 2020/5/28*/
public abstract class KafkaParentWorker implements IWorker {// HDFS基础配置protected Properties prop = new Properties();private String topicName =null;private long poolTime = 0;public KafkaParentWorker(String topic, long time, String groupId){prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.141:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");prop.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);this.topicName = topic;this.poolTime = time;}public KafkaParentWorker(String topic){this(topic, 100, "grout1");}public String getTopicName() {return topicName;}public void setTopicName(String topicName) {this.topicName = topicName;}public long getPoolTime() {return poolTime;}public void setPoolTime(long poolTime) {this.poolTime = poolTime;}
}
writer
HBaseWriter类
package nj.zb.kb05.designer.writer;import nj.zb.kb05.designer.datamove.IParseHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;/*** @author* @Des* @date 2020/5/28*/
public class HBaseWriter implements IWriter {private IParseHandler parseHandler;private Connection connection;public HBaseWriter(IParseHandler handler){this.parseHandler = handler;Configuration config = HBaseConfiguration.create();config.set("hbase.rootdir","hdfs://192.168.153.141:9000/hbase");config.set("hbase.zookeeper.quorum","192.168.153.141");config.set("hbase.zookeeper.property.clientPort","2181");try {Connection connection = ConnectionFactory.createConnection(config);} catch (IOException e) {e.printStackTrace();}}@Overridepublic int write(ConsumerRecords<String, String> records, String tableName) {try {final Table table = connection.getTable(TableName.valueOf(tableName) );// 下午好好思考,如果,tablename有很多个的时候,使用 if elseif else 结构是否合适List<Put> datas = parseHandler.parse(records);
// table.put(datas);table.close();} catch (IOException e) {e.printStackTrace();}return 0;}
}
IWriter接口
package nj.zb.kb05.designer.writer;import org.apache.kafka.clients.consumer.ConsumerRecords;/*** @author* @Des* @date 2020/5/28*/
public interface IWriter {public int write(ConsumerRecords<String, String> records, String tableName);
}
kafka 四:(设计模式)Kafka数据上传至Hbase相关推荐
- STM32--ESP8266物联网WIFI模块(贝壳物联)--温湿度数据上传服务器显示
本文适用于STM32F103C8T6等MCU,其他MCU可以移植,完整资源见文末链接 一.简介 随着移动物联网的发展,各场景下对于物联控制.数据上传.远程控制的诉求也越来越多,基于此乐鑫科技推出了便宜 ...
- 【AllJoyn专题】基于AllJoyn和Yeelink的传感器数据上传与指令下行的研究
接触高通物联网框架AllJoyn不太久,但确是被深深地吸引了.在我看来,促进我深入学习的原因有三点:一.AllJoyn开源,对开源的软硬件总会有种莫名的喜爱,虽然或许不会都深入下去:二.顺应潮流,物联 ...
- m5310模组数据上传至onenet_硬核干货!基于M5310-A的NB-IoT水表通信模块软件业务逻辑分享...
根据不同的应用场景需求,目前NB-IoT水表主要有以下几种方案: 图1 几种常见NB水表方案 接下来将从NB-IoT水表上电开机.模组初始化.入网判断.业务逻辑四个环节来详细讲述,以下业务流程仅供参考 ...
- # nest笔记四:文件的上传与下载
nest笔记四:文件的上传与下载 nest是基于express之上的,所以,其文件上传和下载的功能,实际上就是express的功能. 下载 文件下载有两种,一个是sendFile,一个是downloa ...
- 【Hadoop】HDFS操作、数据上传与下载原理解析、高级特性及底层原理
HDFS操作.数据上传与下载原理解析.高级特性及底层原理 1 HDFS操作 1.1 Web Console网页工具 1.2 命令行 1.2.1 普通的操作命令 1.2.2 管理员命令 1.3 Java ...
- NodeMCU(ESP-12E)+阿里云实现数据上传和控制继电器开锁
源码及工具下载:https://github.com/RL-Y/NodeMCU-aliyun.git Arduino15:提取码:ythf :链接: https://pan.baidu.com/s/1 ...
- 通过GPRS将GPS数据上传到服务器
文章目录 一.目的 二.使用的器件 1. GPRS模块和物联网卡 2. GPS模块 3. MCU 三.电路连接 四.程序设计 五.程序代码 一.目的 将GPS获取到的位置信息,通过GPRS将数据上 ...
- 正点原子STM32f103ZE精英开发板实现基于ESP8266 WIFI模块温湿度数据上传至乐联网平台
文章目录 一.准备工作 二.实现流程 1.AT指令 2.接入乐联网平台 3.代码实现 三.数据可视化分析 一.准备工作 1.准备一块正点原子STM32f103ZE精英开发板 2.在某宝上购买好正点原子 ...
- 新大陆物联网-Android实现网关功能-连接云平台并上传传感器数据-获取执行器指令并执行-Android网关开发-通信-数据上传云平台-JAVA原理讲解-免费云平台使用-竞赛2022国赛真题
目录 一.任务要求 二.开发环境 三.网关上线 四.数据上传与命令下发 五.JSON命令解析思路 六.总结 一.任务要求 我们将要实现的效果是:Android开发平板与Lora板进行有线串口通信,解析 ...
最新文章
- Linux下对MySQL数据库的常见操作【创建】【删除】【导入数据库】
- 分针网——Javascript不同浏览器差异及兼容方法
- 遗传算法与直接搜索工具箱学习笔记 -----从直接搜索算法开始
- spark 中的RDD编程:基于Java api
- matlab 刻度非均匀控制,MATLAB 出一张好看的图
- PHP 编写和使用web服务 第一节
- php ip获取邮政编码,php获取指定(访客)IP所有信息(地址、邮政编码、国家、经纬度等)的方法...
- [python]json.loads 几种错误 解决方案
- java 确定对象的引用_JVM学习笔记之了解对象存活判断和4种引用【三】
- iPhone:你知道这 13 年我是怎么过的吗?
- 基础学习day06---面向对象二---static,类的初始化和调用顺序、单例模式
- win10 python ffmpeg推流到b站
- C,java,Python,这些名字背后的江湖!
- 中文分词之维特比算法详解
- page_to_phys()和virt_to_phys()
- 算法设计与分析基础 第六章谜题
- 数学小课堂:数学思维(从逻辑出发想问题)
- linux如何配置ipv6DNS,linuxipv6dns服务器配置.doc
- n的阶乘c语言输出为负数,为什么 n 为20 阶乘为负数
- 数据争用(data race) 和竞态条件(race condition)