Transferring Data from Kafka to HBase

  • View the messages in each topic
  • Create the tables the topics need in HBase
  • Write a Java program to transfer the data
  • Rewrite the transfer program using a design pattern

Viewing the messages in each topic

  • Check the number of messages in each topic to be transferred (a Java-based cross-check is sketched after these commands)
//event_attendees
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic event_attendees --time -1 --offsets 1
//events
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic events --time -1 --offsets 1
//train
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic train --time -1 --offsets 1
//user_friends
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic user_friends --time -1 --offsets 1
//users
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic users --time -1 --offsets 1
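
GetOffsetShell with --time -1 prints the latest offset of every partition, so a topic's message count is the sum over its partitions of latest minus earliest offset. The same check can be done from Java; the following is a minimal sketch (the class name TopicMessageCount is hypothetical, the broker address is the one used elsewhere in this article):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicMessageCount {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.100:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            for (String topic : new String[]{"event_attendees", "events", "train", "user_friends", "users"}) {
                // collect all partitions of the topic
                List<TopicPartition> partitions = new ArrayList<>();
                for (PartitionInfo info : consumer.partitionsFor(topic)) {
                    partitions.add(new TopicPartition(topic, info.partition()));
                }
                // message count = sum of (end offset - beginning offset) per partition
                Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
                Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
                long total = 0;
                for (TopicPartition tp : partitions) {
                    total += end.get(tp) - begin.get(tp);
                }
                System.out.println(topic + ": " + total);
            }
        }
    }
}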

Creating the tables the topics need in HBase

  • List the existing namespaces
hbase(main):001:0> list_namespace
  • Create a namespace
hbase(main):001:0> create_namespace 'events_db'
  • Create the tables
//events
hbase(main):002:0> create 'events_db:events','schedule','location','creator','remark'
//users
hbase(main):003:0> create 'events_db:users','profile','region','registration'
//event_attendee
hbase(main):004:0> create 'events_db:event_attendee','euat'
//train
hbase(main):005:0> create 'events_db:train','eu'
//user_friend
hbase(main):006:0> create 'events_db:user_friend','uf'
  • List the tables in the namespace
hbase(main):007:0> list_namespace_tables 'events_db'
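
The same namespace and tables can also be created programmatically. A minimal sketch, assuming the HBase 2.x client API (TableDescriptorBuilder / ColumnFamilyDescriptorBuilder; the class name CreateEventsTables is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

import java.io.IOException;

public class CreateEventsTables {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.136.100");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // create the events_db namespace
            admin.createNamespace(NamespaceDescriptor.create("events_db").build());
            // one table per topic, with the column families used later by the handlers
            createTable(admin, "events_db:events", "schedule", "location", "creator", "remark");
            createTable(admin, "events_db:users", "profile", "region", "registration");
            createTable(admin, "events_db:event_attendee", "euat");
            createTable(admin, "events_db:train", "eu");
            createTable(admin, "events_db:user_friend", "uf");
        }
    }

    private static void createTable(Admin admin, String name, String... families) throws IOException {
        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(name));
        for (String family : families) {
            builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
        }
        admin.createTable(builder.build());
    }
}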

Writing a Java program to transfer the data

  • user_friends (the other topics follow the same pattern)
package nj.zb.kb09.kafkatohbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class UserFriendTOHbase {
    public static void main(String[] args) {
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.100:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // session timeout: 30 seconds
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // disable auto-commit (note: offsets are never committed in this program,
        // so every run starts again from the earliest offset)
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "userFriend");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("user_friends"));

        // HBase configuration and connection
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.136.100:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.136.100");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table userFriendTable = connection.getTable(TableName.valueOf("events_db:user_friend"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<Put>();
                for (ConsumerRecord<String, String> p : poll) {
                    System.out.println(p.value());
                    String[] split = p.value().split(",");
                    // row key: hash of userid + friendid
                    Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
                    put.addColumn("uf".getBytes(), "userid".getBytes(), split[0].getBytes());
                    put.addColumn("uf".getBytes(), "friendid".getBytes(), split[1].getBytes());
                    datas.add(put);
                }
                userFriendTable.put(datas);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
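
Because enable.auto.commit is false and the program never commits offsets, every run re-reads user_friends from the earliest offset. A minimal sketch of committing manually after each successful batch, reusing the variables from the program above, would change the loop to:

while (true) {
    ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
    ArrayList<Put> datas = new ArrayList<Put>();
    for (ConsumerRecord<String, String> p : poll) {
        // ... build the Put objects exactly as above ...
    }
    userFriendTable.put(datas);
    // commit the offsets of the batch just written, so a restart does not re-insert it
    consumer.commitSync();
}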

Rewriting the transfer program using a design pattern

  • IWorker
package nj.zb.kb09.kafkahbaseUserfriendgj;

public interface IWorker {
    public void fillData();
}
  • IWriter
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.IOException;

public interface IWriter {
    public int write(ConsumerRecords<String, String> records, String tableName) throws IOException;
}
  • IParseRecord
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.List;

/*
 * Assembler:
 * converts the records consumed from Kafka into a List<Put> that HBase can store.
 */
public interface IParseRecord {
    public List<Put> parse(ConsumerRecords<String, String> records);
}
  • ParentWorker
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public abstract class ParentWorker implements IWorker {
    protected Properties prop;

    public ParentWorker(String groupName) {
        prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.100:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // session timeout: 30 seconds
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // disable auto-commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }
}
  • HbaseWorker
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;

public class HbaseWorker extends ParentWorker {
    private IWriter writer;
    private String topic;
    private String target;

    public HbaseWorker(IWriter writer, String groupName, String topic, String targetTable) {
        super(groupName);
        this.topic = topic;
        this.target = targetTable;
        this.writer = writer;
    }

    @Override
    public void fillData() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton(this.topic));
        try {
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                int rowNum = writer.write(poll, this.target);
                System.out.println("rows written: " + rowNum);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • HbaseWriter
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.IOException;
import java.util.List;

public class HbaseWriter implements IWriter {
    private Connection connection;
    private IParseRecord parseRecord;

    public HbaseWriter(IParseRecord parseRecord) {
        this.parseRecord = parseRecord;
        // HBase configuration and connection
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.136.100:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.136.100");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public int write(ConsumerRecords<String, String> records, String tableName) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Put> datas = parseRecord.parse(records);
        table.put(datas);
        // release the Table instance after each batch
        table.close();
        return datas.size();
    }
}
  • UserFriendHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class UserFriendHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        // use a fresh list per batch so earlier Puts are not written again
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> p : records) {
            System.out.println(p.value());
            String[] split = p.value().split(",");
            // row key: hash of userid + friendid
            Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
            put.addColumn("uf".getBytes(), "userid".getBytes(), split[0].getBytes());
            put.addColumn("uf".getBytes(), "friendid".getBytes(), split[1].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
  • EventAttendHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class EventAttendHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        // use a fresh list per batch so earlier Puts are not written again
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> p : records) {
            System.out.println(p.value());
            String[] split = p.value().split(",");
            // row key: hash of eventid + userid + state
            Put put = new Put(Bytes.toBytes((split[0] + split[1] + split[2]).hashCode()));
            put.addColumn("euat".getBytes(), "eventid".getBytes(), split[0].getBytes());
            put.addColumn("euat".getBytes(), "userid".getBytes(), split[1].getBytes());
            put.addColumn("euat".getBytes(), "state".getBytes(), split[2].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
  • EventsHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class EventsHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            String[] split = record.value().split(",");
            System.out.println(record);
            // row key: hash of the event id
            Put put = new Put(Bytes.toBytes(split[0].hashCode()));
            put.addColumn("schedule".getBytes(), "start_time".getBytes(), split[2].getBytes());
            put.addColumn("location".getBytes(), "city".getBytes(), split[3].getBytes());
            put.addColumn("location".getBytes(), "state".getBytes(), split[4].getBytes());
            put.addColumn("location".getBytes(), "zip".getBytes(), split[5].getBytes());
            put.addColumn("location".getBytes(), "country".getBytes(), split[6].getBytes());
            put.addColumn("location".getBytes(), "lat".getBytes(), split[7].getBytes());
            put.addColumn("location".getBytes(), "lng".getBytes(), split[8].getBytes());
            put.addColumn("creator".getBytes(), "user_id".getBytes(), split[1].getBytes());
            put.addColumn("remark".getBytes(), "common_words".getBytes(), split[9].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
  • TrainHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class TrainHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            String[] split = record.value().split(",");
            System.out.println(record);
            // row key: hash of user + event
            Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
            put.addColumn("eu".getBytes(), "user".getBytes(), split[0].getBytes());
            put.addColumn("eu".getBytes(), "event".getBytes(), split[1].getBytes());
            put.addColumn("eu".getBytes(), "invited".getBytes(), split[2].getBytes());
            put.addColumn("eu".getBytes(), "timestamp".getBytes(), split[3].getBytes());
            put.addColumn("eu".getBytes(), "interested".getBytes(), split[4].getBytes());
            put.addColumn("eu".getBytes(), "not_interested".getBytes(), split[5].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
  • UsersHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class UsersHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            String[] split = record.value().split(",");
            // skip records with an empty user id
            if (split[0].trim().length() == 0) {
                continue;
            }
            System.out.println(record);
            // row key: hash of the user id
            Put put = new Put(Bytes.toBytes(split[0].hashCode()));
            put.addColumn("profile".getBytes(), "birthyear".getBytes(), split[2].getBytes());
            put.addColumn("profile".getBytes(), "gender".getBytes(), split[3].getBytes());
            put.addColumn("region".getBytes(), "locale".getBytes(), split[1].getBytes());
            // optional fields: only present when the record has enough columns
            if (split.length > 5) {
                put.addColumn("region".getBytes(), "location".getBytes(), split[5].getBytes());
            }
            if (split.length > 6) {
                put.addColumn("region".getBytes(), "timezone".getBytes(), split[6].getBytes());
            }
            if (split.length > 4) {
                put.addColumn("registration".getBytes(), "joinedAt".getBytes(), split[4].getBytes());
            }
            datas.add(put);
        }
        return datas;
    }
}
  • ALLTohbase2
package nj.zb.kb09.kafkahbaseUserfriendgj;

public class ALLTohbase2 {
    public static void main(String[] args) {
        // user_friend
        /*
        IParseRecord record = new UserFriendHandler();
        IWriter writer = new HbaseWriter(record);
        IWorker worker = new HbaseWorker(writer, "userfriend", "user_friends", "events_db:user_friend");
        */
        // equivalent to:
        // IWorker worker = new HbaseWorker(new HbaseWriter(new UserFriendHandler()), "userfriend", "user_friends", "events_db:user_friend");
        // worker.fillData();

        // event_attendee
        // new HbaseWorker(new HbaseWriter(new EventAttendHandler()), "eventattend1", "event_attendees", "events_db:event_attendee").fillData();

        // users
        // new HbaseWorker(new HbaseWriter(new UsersHandler()), "users1", "users", "events_db:users").fillData();

        // events
        new HbaseWorker(new HbaseWriter(new EventsHandler()), "events1", "events", "events_db:events").fillData();

        // train
        // new HbaseWorker(new HbaseWriter(new TrainHandler()), "train1", "train", "events_db:train").fillData();
    }
}

Note: with this setup, each run transfers only one topic; the current worker has to finish (or be stopped) before another topic's data can be transferred to HBase. A threaded variant that moves all topics in parallel is sketched below.
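
One way to lift that restriction is to give each worker its own thread. A minimal sketch, reusing the classes above (the class name AllToHbaseParallel is hypothetical; each consumer keeps its own group id, so the topics are read independently):

public class AllToHbaseParallel {
    public static void main(String[] args) {
        IWorker[] workers = {
                new HbaseWorker(new HbaseWriter(new UserFriendHandler()), "userfriend", "user_friends", "events_db:user_friend"),
                new HbaseWorker(new HbaseWriter(new EventAttendHandler()), "eventattend1", "event_attendees", "events_db:event_attendee"),
                new HbaseWorker(new HbaseWriter(new UsersHandler()), "users1", "users", "events_db:users"),
                new HbaseWorker(new HbaseWriter(new EventsHandler()), "events1", "events", "events_db:events"),
                new HbaseWorker(new HbaseWriter(new TrainHandler()), "train1", "train", "events_db:train")
        };
        for (IWorker worker : workers) {
            // fillData() loops forever, so each worker gets its own thread
            new Thread(worker::fillData).start();
        }
    }
}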

  • Check the row count of each table
//events
hbase(main):010:0> count 'events_db:events',INTERVAL => 5000000,CACHE => 5000000
//event_attendee
hbase(main):011:0> count 'events_db:event_attendee',INTERVAL => 5000000,CACHE => 5000000
//train
hbase(main):012:0> count 'events_db:train',INTERVAL => 5000000,CACHE => 5000000
//user_friend
hbase(main):013:0> count 'events_db:user_friend',INTERVAL => 5000000,CACHE => 5000000
//users
hbase(main):014:0> count 'events_db:users',INTERVAL => 5000000,CACHE => 5000000
