kafka 四:(设计模式)Kafka数据上传至Hbase

  • datamove目录
    • AttendeesHandler类
    • IParseHandler接口
    • UserFriendHandler类
  • work
    • EventAttendeesWorker类
    • IWriter接口
    • KafkaParentWorker类
  • writer
    • HBaseWriter类
    • IWriter接口

工程所需的依赖和建立maven工程请参考同栏目的其它博客

详细的代码请免积分下载,连接如下:
xxxxxxxxxxxxxxxxxxx

datamove目录

AttendeesHandler类

package nj.zb.kb05.hbase;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import javax.rmi.PortableRemoteObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;/*** 将 kafka中 topic  event_attendees 数据导入到  hbase  event_attendee表中* @author* @Des* @date 2020/5/27*/
public class EventAttendeeshb {public static void main(String[] args) {// HDFS基础配置Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.141:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");prop.put(ConsumerConfig.GROUP_ID_CONFIG,"eventattendees1");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);consumer.subscribe(Collections.singletonList("event_attendees"));// HBASE基础配置final Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","192.168.153.141");conf.set("hbase.zookeeper.property.clientPort","2181");conf.set("hbase.rootdir","hdfs://192.168.153.141:9000/hbase");try {final Connection connection = ConnectionFactory.createConnection(conf);final Table table = connection.getTable(TableName.valueOf("events_db:event_attendee"));while (true){List<Put> datas = new ArrayList<>();final ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.println(record.value());final String[] info = record.value().toString().split(",");Put put = new Put(Bytes.toBytes((info[0]+info[1]+info[2]).hashCode()));put.addColumn("euat".getBytes(),"eventid".getBytes(),info[0].getBytes());put.addColumn("euat".getBytes(),"friendid".getBytes(),info[1].getBytes());put.addColumn("euat".getBytes(),"stat".getBytes(),info[2].getBytes());datas.add(put);}
//                consumer.commitAsync();table.put(datas);
//                table.close();}} catch (IOException e) {e.printStackTrace();}}
}

IParseHandler接口

package nj.zb.kb05.designer.datamove;import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.List;// 负责处理数据,将数据转换成List<Put>形式
public interface IParseHandler {public List<Put> parse(ConsumerRecords<String, String> records);
}

UserFriendHandler类

package nj.zb.kb05.designer.datamove;import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.List;/*** @author* @Des* @date 2020/5/28*/
public class UuserFriendHandler implements IParseHandler {@Overridepublic List<Put> parse(ConsumerRecords<String, String> records) {return null;}
}

work

EventAttendeesWorker类

package nj.zb.kb05.designer.work;import nj.zb.kb05.designer.writer.IWriter;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections;/*** @author* @Des* @date 2020/5/28*/
public class EventAttendeesWorker extends KafkaParentWorker {private IWriter writer;public EventAttendeesWorker(String topic, long time, String groupId, IWriter writer) {super(topic, time, groupId);this.writer = writer;}public EventAttendeesWorker(String topic,IWriter writer) {super(topic);this.writer = writer;}@Overridepublic void pushData(String tableName) {KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(super.prop);consumer.subscribe(Collections.singletonList(this.getTopicName()));while (true){final ConsumerRecords<String, String> records = consumer.poll(this.getPoolTime());writer.write(records,tableName);}}
}

IWriter接口

package nj.zb.kb05.designer.work;public interface IWorker {public void pushData(String tableName);
}

KafkaParentWorker类

package nj.zb.kb05.designer.work;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;/*** @author* @Des* @date 2020/5/28*/
public abstract class KafkaParentWorker implements IWorker {// HDFS基础配置protected Properties prop = new Properties();private String topicName =null;private long poolTime = 0;public KafkaParentWorker(String topic, long time, String groupId){prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.141:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");prop.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);this.topicName = topic;this.poolTime = time;}public KafkaParentWorker(String topic){this(topic, 100, "grout1");}public String getTopicName() {return topicName;}public void setTopicName(String topicName) {this.topicName = topicName;}public long getPoolTime() {return poolTime;}public void setPoolTime(long poolTime) {this.poolTime = poolTime;}
}

writer

HBaseWriter类

package nj.zb.kb05.designer.writer;import nj.zb.kb05.designer.datamove.IParseHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;/*** @author* @Des* @date 2020/5/28*/
public class HBaseWriter implements IWriter {private IParseHandler parseHandler;private Connection connection;public HBaseWriter(IParseHandler handler){this.parseHandler = handler;Configuration config = HBaseConfiguration.create();config.set("hbase.rootdir","hdfs://192.168.153.141:9000/hbase");config.set("hbase.zookeeper.quorum","192.168.153.141");config.set("hbase.zookeeper.property.clientPort","2181");try {Connection connection = ConnectionFactory.createConnection(config);} catch (IOException e) {e.printStackTrace();}}@Overridepublic int write(ConsumerRecords<String, String> records, String tableName) {try {final Table table = connection.getTable(TableName.valueOf(tableName) );// 下午好好思考,如果,tablename有很多个的时候,使用 if elseif else 结构是否合适List<Put> datas = parseHandler.parse(records);
//            table.put(datas);table.close();} catch (IOException e) {e.printStackTrace();}return 0;}
}

IWriter接口

package nj.zb.kb05.designer.writer;import org.apache.kafka.clients.consumer.ConsumerRecords;/*** @author* @Des* @date 2020/5/28*/
public interface IWriter {public int write(ConsumerRecords<String, String> records, String tableName);
}

kafka 四:(设计模式)Kafka数据上传至Hbase相关推荐

  1. STM32--ESP8266物联网WIFI模块(贝壳物联)--温湿度数据上传服务器显示

    本文适用于STM32F103C8T6等MCU,其他MCU可以移植,完整资源见文末链接 一.简介 随着移动物联网的发展,各场景下对于物联控制.数据上传.远程控制的诉求也越来越多,基于此乐鑫科技推出了便宜 ...

  2. 【AllJoyn专题】基于AllJoyn和Yeelink的传感器数据上传与指令下行的研究

    接触高通物联网框架AllJoyn不太久,但确是被深深地吸引了.在我看来,促进我深入学习的原因有三点:一.AllJoyn开源,对开源的软硬件总会有种莫名的喜爱,虽然或许不会都深入下去:二.顺应潮流,物联 ...

  3. m5310模组数据上传至onenet_硬核干货!基于M5310-A的NB-IoT水表通信模块软件业务逻辑分享...

    根据不同的应用场景需求,目前NB-IoT水表主要有以下几种方案: 图1 几种常见NB水表方案 接下来将从NB-IoT水表上电开机.模组初始化.入网判断.业务逻辑四个环节来详细讲述,以下业务流程仅供参考 ...

  4. # nest笔记四:文件的上传与下载

    nest笔记四:文件的上传与下载 nest是基于express之上的,所以,其文件上传和下载的功能,实际上就是express的功能. 下载 文件下载有两种,一个是sendFile,一个是downloa ...

  5. 【Hadoop】HDFS操作、数据上传与下载原理解析、高级特性及底层原理

    HDFS操作.数据上传与下载原理解析.高级特性及底层原理 1 HDFS操作 1.1 Web Console网页工具 1.2 命令行 1.2.1 普通的操作命令 1.2.2 管理员命令 1.3 Java ...

  6. NodeMCU(ESP-12E)+阿里云实现数据上传和控制继电器开锁

    源码及工具下载:https://github.com/RL-Y/NodeMCU-aliyun.git Arduino15:提取码:ythf :链接: https://pan.baidu.com/s/1 ...

  7. 通过GPRS将GPS数据上传到服务器

    文章目录 一.目的 二.使用的器件 1. GPRS模块和物联网卡 2. GPS模块 3. MCU 三.电路连接 四.程序设计 五.程序代码 一.目的   将GPS获取到的位置信息,通过GPRS将数据上 ...

  8. 正点原子STM32f103ZE精英开发板实现基于ESP8266 WIFI模块温湿度数据上传至乐联网平台

    文章目录 一.准备工作 二.实现流程 1.AT指令 2.接入乐联网平台 3.代码实现 三.数据可视化分析 一.准备工作 1.准备一块正点原子STM32f103ZE精英开发板 2.在某宝上购买好正点原子 ...

  9. 新大陆物联网-Android实现网关功能-连接云平台并上传传感器数据-获取执行器指令并执行-Android网关开发-通信-数据上传云平台-JAVA原理讲解-免费云平台使用-竞赛2022国赛真题

    目录 一.任务要求 二.开发环境 三.网关上线 四.数据上传与命令下发 五.JSON命令解析思路 六.总结 一.任务要求 我们将要实现的效果是:Android开发平板与Lora板进行有线串口通信,解析 ...

最新文章

  1. Linux下对MySQL数据库的常见操作【创建】【删除】【导入数据库】
  2. 分针网——Javascript不同浏览器差异及兼容方法
  3. 遗传算法与直接搜索工具箱学习笔记 -----从直接搜索算法开始
  4. spark 中的RDD编程:基于Java api
  5. matlab 刻度非均匀控制,MATLAB 出一张好看的图
  6. PHP 编写和使用web服务 第一节
  7. php ip获取邮政编码,php获取指定(访客)IP所有信息(地址、邮政编码、国家、经纬度等)的方法...
  8. [python]json.loads 几种错误 解决方案
  9. java 确定对象的引用_JVM学习笔记之了解对象存活判断和4种引用【三】
  10. iPhone:你知道这 13 年我是怎么过的吗?
  11. 基础学习day06---面向对象二---static,类的初始化和调用顺序、单例模式
  12. win10 python ffmpeg推流到b站
  13. C,java,Python,这些名字背后的江湖!
  14. 中文分词之维特比算法详解
  15. page_to_phys()和virt_to_phys()
  16. 算法设计与分析基础 第六章谜题
  17. 数学小课堂:数学思维(从逻辑出发想问题)
  18. linux如何配置ipv6DNS,linuxipv6dns服务器配置.doc
  19. n的阶乘c语言输出为负数,为什么 n 为20 阶乘为负数
  20. 数据争用(data race) 和竞态条件(race condition)

热门文章

  1. DataStory X Kungfu | DemoDay亮点大揭秘
  2. VC++ 操作Word
  3. JQ(一)--JQ简介
  4. PS给照片添加镜头梦幻光斑动态图片效果
  5. Super Jumper:一个2DOpenGL ES游戏
  6. 理清「万维网」和「互联网」的概念和区别
  7. DevStack环境搭建
  8. 跟着团子学SAP PS-前台篇-WBS元素介绍及相关操作 CJ20N
  9. 管理经济学简答题、计算题与案例分析题
  10. 市面最经典的中文版需求分析说明书模板 详细讲解各目录含义 分离需求说明和需求分析