本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者亲绘大数据生态圈思维导图…持续更新,欢迎star!

前言

上一篇文章已经为大家介绍了 MySQL 在用户画像的标签数据存储中的具体应用场景,本篇我们来谈谈 HBase 的使用!

原著作者:赵宏田
来源:《用户画像方法论与工程化解决方案》

HBase存储

1. HBase简介

HBase是一个高性能、列存储、可伸缩、实时读写的分布式存储系统,同样运行在HDFS之上。与Hive不同的是,HBase能够在数据库上实时运行,而不是跑MapReduce任务,适合进行大数据的实时查询

画像系统中每天在Hive里跑出的结果集数据可同步到 HBase数据库 ,用于线上实时应用的场景。

下面介绍几个基本概念:

  • row key:用来表示唯一一行记录的键,HBase的数据是按照 row key 的字典顺序进行全局排列的。访问HBase中的行只有3种方式:
  • 通过单个row key访问
  • 通过row key的正则访问
  • 全表扫描

由于HBase通过 rowkey 对数据进行检索,而rowkey 由于长度限制的因素不能将很多查询条件拼接在 rowkey 中,因此 HBase 无法像关系数据库那样根据多种条件对数据进行筛选。一般地,HBase需建立二级索引来满足根据复杂条件查询数据的需求。

Rowkey设计时需要遵循三大原则:

  • 唯一性原则:rowkey需要保证唯一性,不存在重复的情况。在画像中一般使用用户id作为rowkey
  • 长度原则:rowkey的长度一般为10-100bytes
  • 散列原则:rowkey的散列分布有利于数据均衡分布在每个RegionServer,可实现负载均衡

– columns family:指列簇,HBase中的每个列都归属于某个列簇。列簇是表的schema的一部分,必须在使用表之前定义。划分columns family的原则如下:

  • 是否具有相似的数据格式
  • 是否具有相似的访问类型

常用的增删改查命令如下:

1)创建一个表,指定表名和列簇名:

create  '<table name>','<column family>'

2)扫描表中数据,并显示其中的10条记录:

scan  '<table name>',{LIMIT=>10}

3)使用get命令读取数据:

get  '<table name>','row1'

4)插入数据:

put  '<table name>','row1','<colfamily:colname>','<value>'

5)更新数据:

put  '<table name>','row ','Column family:column name','new value'

6)在删除表之前先将其禁用,然后删除:

disable  '<table name>'
drop  '<table name>'

下面通过一个案例来介绍HBase在画像系统中的应用场景和工程化实现方式。

2. 应用场景

渠道运营人员为促进未注册的新安装用户注册、下单,计划通过App首页弹窗(如下图所示)发放红包或优惠券的方式进行引导。在该场景中可通过画像系统实现对应功能。

业务逻辑上,渠道运营人员通过组合用户标签(如“未注册用户”和“安装距今天数”小于××天)筛选出对应的用户群,然后选择将对应人群推送到“广告系统”,这样每天画像系统的ETL调度完成后对应人群数据就被推送到HBase数据库进行存储。满足条件的新用户来访App时,由在线接口读取HBase数据库,在查询到该用户时为其推送该弹窗。

下面通过某工程案例来讲解HBase在该触达用户场景中的应用方式。

3. 工程化案例

运营人员在画像系统中根据业务规则定义组合用户标签筛选出用户群,并将该人群上线到广告系统中


        在业务人员配置好规则后,下面我们来看在数据调度层面是如何运行的。

用户标签数据经过ETL将每个用户身上的标签聚合后插入到目标表中,如dw.userprofile_userlabel_map_all。聚合后数据存储为每个用户id,以及他身上对应的标签集合,数据格式如图所示:


        接下来需要将 Hive 中的数据导入HBase,便于线上接口实时调用库中数据。

HBase的服务器体系结构遵循主从服务器架构(如图所示),同一时刻只有一个HMaster处于活跃状态,当活跃的Master挂掉后,Backup HMaster自动接管整个HBase集群。在同步数据前,首先需要判断HBase的当前活跃节点是哪台机器。


        执行如下脚本:

# 判断活跃节点
global activenode
for node in ("10.xxx.xx.xxx","10.xxx.xx.xxx"):   # 两台机器作为Master,判断哪台HMaster处于活跃状态command = "curl http://"+ str(node) + ":9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"status = os.popen(command).read()print("HBase Master status: ".format(status))if ("active" in status):activenode = node

执行完毕后,可通过返回的“State”字段判断当前节点状态(活跃为“active”,不活跃为“standby”),如图所示。

        为避免数据都写入一个region,造成HBase的数据倾斜问题。在当前HMaster活跃的节点上,创建预分区表:

create 'userprofile_labels', { NAME => "f", BLOCKCACHE => "true" , BLOOMFILTER => "ROWCOL" , COMPRESSION => 'snappy', IN_MEMORY => 'true' }, {NUMREGIONS => 10,SPLITALGO => 'HexStringSplit'}

将待同步的数据写入HFile,HFile中的数据以 key-value 键值对方式存储,然后将 HFile 数据使用 BulkLoad 批量写入 HBase 集群中。Scala脚本执行如下:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.HBase.client.ConnectionFactory
import org.apache.hadoop.HBase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.HBase.io.ImmutableBytesWritable
import org.apache.hadoop.HBase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.HBase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSessionobject Hive2HBase {def main(args: Array[String]): Unit = {// 传入日期参数 和 当前活跃的master节点val data_date = args(0)val node = args(1)   //当前活跃的节点ipval spark = SparkSession.builder().appName("Hive2HBase").config("spark.serializer","org.apache.spark.serializer.KryoSerializer").config("spark.storage.memoryFraction", "0.1").config("spark.shuffle.memoryFraction", "0.7").config("spark.memory.useLegacyMode", "true").enableHiveSupport().getOrCreate()//创建HBase的配置val conf = HBaseConfiguration.create()conf.set("HBase.zookeeper.quorum", "10.xxx.xxx.xxx,10.xxx.xxx.xxx")conf.set("HBase.zookeeper.property.clientPort", "8020")//为了预防hfile文件数过多无法进行导入,设置参数值conf.setInt("HBase.hregion.max.filesize", 10737418240)conf.setInt("HBase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 3200)val Data = spark.sql(s"select userid,userlabels from dw.userprofile_usergroup_labels_all where data_date='${data_date}'")val dataRdd = Data.rdd.flatMap(row => {    val rowkey = row.getAs[String]("userid".toLowerCase)val tagsmap = row.getAs[Map[String, Object]]("userlabels".toLowerCase)val sbkey = new StringBuffer()  // 对MAP结构转化 a->b  'a':'b'val sbvalue = new StringBuffer()for ((key, value) <- tagsmap){sbkey.append(key + ":")val labelght = if (value == ""){"-999999"} else {value}sbvalue.append(labelght + ":")}val item = sbkey.substring(0,sbkey.length -1)val score = sbvalue.substring(0,sbvalue.length -1)Array((rowkey,("f","i",item)),(rowkey,("f","s",score)))})// 将rdd转换成HFile需要的格式val rdds = dataRdd.filter(x=>x._1 != null).sortBy(x=>(x._1,x._2._1, x._2._2)).map(x => {//KeyValue的实例为valueval rowKey = Bytes.toBytes(x._1)val family = Bytes.toBytes(x._2._1)val colum = Bytes.toBytes(x._2._2)val value = Bytes.toBytes(x._2._3.toString)(new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, colum, value))})//文件保存在hdfs的位置val locatedir = "hdfs://" + node.toString + ":8020/user/bulkload/hfile/usergroup_HBase_" + data_date//在locatedir生成的Hfile文件rdds.saveAsNewAPIHadoopFile(locatedir,classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat2],conf)//HFile导入到HBaseval load = new LoadIncrementalHFiles(conf)//HBase的表名val tableName = "userprofile_labels"//创建HBase的链接,利用默认的配置文件,读取HBase的master地址val conn = ConnectionFactory.createConnection(conf)//根据表名获取表val table = conn.getTable(TableName.valueOf(tableName))try {//获取HBase表的region分布val regionLocation = conn.getregionLocation(TableName.valueOf(tableName))//创建一个hadoop的mapreduce的jobval job = Job.getInstance(conf)//设置job名称,任意命名job.setJobName("Hive2HBase")//输出文件的内容KeyValuejob.setMapOutputValueClass(classOf[KeyValue])//设置文件输出key, outkey要用ImmutableBytesWritablejob.setMapOutputKeyClass(classOf[ImmutableBytesWritable])//配置HFileOutputFormat2的信息HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocation)//开始导入load.doBulkLoad(new Path(locatedir), conn.getAdmin, table, regionLocation)} finally {table.close()conn.close()}spark.close()}
}

提交Spark任务,将HFile中数据bulkload到HBase中。执行完成后,可以在HBase中看到该数据已经写入“userprofile_labels”中


        在线接口在查询HBase中数据时,由于HBase无法像关系数据库那样根据多种条件对数据进行筛选(类似SQL语言中的where筛选条件)。一般地HBase需建立二级索引来满足根据复杂条件查询数据的需求,本案中选用 Elasticsearch 存储HBase索引数据

在组合标签查询对应的用户人群场景中,首先通过组合标签的条件在 Elasticsearch 中查询对应的索引数据,然后通过索引数据去 HBase中批量获取 rowkey 对应的数据(Elasticsearch中的documentid和HBase中的rowkey都设计为用户id


        为了避免从 Hive 向 HBase 灌入数据时缺失,在向HBase数据同步完成后,还需要校验HBase和Hive中数据量是否一致,如出现较大的波动则发送告警信息。

下面通过Python脚本来看该HBase状态表数据校验逻辑:

# 查询Hive中数据
def check_Hive_data(data_date):r = os.popen("Hive -S -e\"select count(1) from dw.userprofile_usergroup_labels_all where data_date='"+data_date+"'\"")Hive_userid_count = r.read()r.close()Hive_count = str(int(Hive_userid_count) print "Hive_result: " + str(Hive_count)print "Hive select finished!"# 查询HBase中数据
def check_HBase_data(data_date):r = os.popen("HBase  org.apache.hadoop.HBase.mapreduce.RowCounter 'userprofile_labels'\" 2>&1 |grep ROWS")HBase_count = r.read().strip()[5:]r.close()print "HBase result: " + str(HBase_count)print "HBase select finished!"# 连接 DB,将查询结果插入表
db = MySQLdb.connect(host="xx.xx.xx.xx",port=3306,user="username", passwd="password", db="xxx", charset="utf8")
cursor = db.cursor()
cursor.execute("INSERT INTO service_monitor(date, service_type, Hive_count, HBase_count) VALUES('"+datestr_+"', 'advertisement', "+str(Hive_userid_count)+","+str(HBase_count)+")")
db.commit()

本案例中将 userid 作为 rowkey 存入HBase,一方面在组合标签的场景中可以支持条件查询多用户人群,另一方面可以支持单个用户标签的查询,例如查看某 id 用户身上的标签,以便运营人员决定是否对其进行运营操作。

HBase在离线数仓环境的服务架构如图所示

小结

本篇文章主要介绍了在用户画像的业务场景下,HBase存储相关数据的真实应用场景!下一篇为大家介绍 Elasticsearch ,敬请期待!

用户画像 | 标签数据存储之HBase真实应用相关推荐

  1. 用户画像 | 标签数据存储之MySQL真实应用

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

  2. 用户画像 | 标签数据存储之Elasticsearch真实应用

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

  3. 用户画像 | 标签数据存储之Hive真实应用

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

  4. 用户画像标签数据存储之Elasticsearch存储

    目录 0. 相关文章链接 1. Elasticsearch简介 2. 应用场景 3. 工程化案例 4. 用户画像标签数据存储总结 注:此博文为根据 赵宏田 老师的 用户画像·方法论与工程化解决方案 一 ...

  5. 用户画像标签数据存储之MySQL存储

    目录 0. 相关文章链接 1. 元数据管理 2. 监控预警数据 2.1. 标签计算数据监控 2.2. 服务层同步数据监控 2.3. 结果集存储 注:此博文为根据 赵宏田 老师的 用户画像·方法论与工程 ...

  6. 用户画像标签数据存储之Hive存储

    目录 0. 相关文章链接 1. Hive数据仓库 2. 分区存储 3. 标签汇聚 4. ID-Mapping 注:此博文为根据 赵宏田 老师的 用户画像·方法论与工程化解决方案 一书读后笔记而来,仅供 ...

  7. 用户画像标签数据开发之组合标签计算

    目录 0. 相关文章链接 1. 什么是组合标签计算 2. 应用场景 3. 数据计算 注:此博文为根据 赵宏田 老师的 用户画像·方法论与工程化解决方案 一书读后笔记而来,仅供学习使用 0. 相关文章链 ...

  8. 用户画像标签数据开发之标签相似度计算

    目录 0. 相关文章链接 1. 什么是标签相似度计算 2. 案例场景 3. 数据开发 注:此博文为根据 赵宏田 老师的 用户画像·方法论与工程化解决方案 一书读后笔记而来,仅供学习使用 0. 相关文章 ...

  9. 用户画像标签数据开发之用户特征库开发

    目录 0. 相关文章链接 1. 什么是用户特征库 2. 特征库规划 3. 数据开发 4. 其他特征库规划 注:此博文为根据 赵宏田 老师的 用户画像·方法论与工程化解决方案 一书读后笔记而来,仅供学习 ...

最新文章

  1. php 开发restful api,用PHP创建RESTful API?
  2. HashSet中的add()方法( 三 )(详尽版)
  3. C语言开发windows桌面程序,开发 windows 桌面软件,现在主流用什么技术?
  4. 【DIY】RGB光立方
  5. Boost:GPU上的2D图像中绘制最终的随机“walk”,并使用OpenCV进行显示
  6. Python爬虫 搜索并下载图片
  7. jp在java中无法编译_JPanal上加图片的问题!
  8. 互联网人才流向说明了什么
  9. 适用于特殊类型自然语言分类的自适应特征谱神经网络
  10. CISO的真正挑战:密码管理、IoT安全合规性
  11. 如何调整标题字体大小_软网推荐:找回调整Windows 10字号功能
  12. 什么是 开发环境、测试环境、生产环境、UAT环境、仿真环境
  13. 中职计算机考证的软件
  14. 联想服务器系统初始化失败怎么回事,win10重置初始化失败怎么解决
  15. 大数据基础课02 从萌芽到爆发,大数据经历了哪些发展?
  16. 在Spyder 中安装第三方包
  17. 免费无版权可商用图标、图片素材
  18. 网络能看到计算机 但是进不去,共享文件夹 在网络邻居看到别人的电脑 进不去...
  19. [环境配置]Win10 这台计算机中已经安装了 .NET Framework 4.5.2 或版本更高的更新
  20. 树莓派远程4G遥控车教程(三)-摄像头安装及实现局域网实时监控

热门文章

  1. Windows的字体LOGFONT
  2. android相机添加一层图片,android 在照片上加标签(贴纸标签相机)源码
  3. 内网渗透思路06之一次完整的渗透测试
  4. sqlserver卸载及重装到其他盘
  5. 无网络linux本地yum源,Linux无网配置CentOS本地yum源
  6. trello_从Gmail侧边栏快速添加Trello卡
  7. 淮海工学院软件测试技术实验三功能测试,软件测试——实验1.doc
  8. (3)UVM验证平台搭建之介绍
  9. 肾缺血再灌注动物模型 大小鼠肾缺血再灌注IR模型
  10. RNN架构解析——认识RNN模型