Reading HBase data with Spark

The listing has two parts: an `HbaseUtils` helper that wraps an HBase `TableInputFormat` scan into a Spark RDD, and a `MakeBaseInfo` job that scans the `online_social_feature` table, fills missing cells with per-field defaults, and writes the joined features out as CSV.
```scala
import org.apache.commons.lang3.StringUtils // for StringUtils.isNotEmpty below
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.sql.SparkSession

import scala.collection.mutable

object HbaseUtils {
  /**
   * Scan an HBase table, optionally restricted to the given columns and to a
   * [startRow, endRow) rowkey range.
   *
   * @param spark     active SparkSession
   * @param tableName HBase table to read
   * @param columns   columns to fetch, each as "family:qualifier"; empty means all columns
   * @param startRow  scan start rowkey (inclusive), null/empty for no lower bound
   * @param endRow    scan stop rowkey (exclusive), null/empty for no upper bound
   * @return RDD[(ImmutableBytesWritable, Result)]
   */
  def scan(spark: SparkSession, tableName: String, columns: mutable.ArrayBuffer[String],
           startRow: String = null, endRow: String = null) = {
    val scanConf = HBaseConfiguration.create()
    scanConf.set(TableInputFormat.INPUT_TABLE, tableName)
    // SCAN_COLUMNS expects a space-separated list of "family:qualifier" entries;
    // only set it when specific columns were requested.
    if (columns.nonEmpty) {
      scanConf.set(TableInputFormat.SCAN_COLUMNS, columns.mkString(" "))
    }
    if (StringUtils.isNotEmpty(startRow)) {
      scanConf.set(TableInputFormat.SCAN_ROW_START, startRow)
    }
    if (StringUtils.isNotEmpty(endRow)) {
      scanConf.set(TableInputFormat.SCAN_ROW_STOP, endRow)
    }
    // Loosen the MapReduce/HBase timeouts so long scans are not killed mid-way.
    scanConf.set("mapreduce.task.timeout", "1200000")
    scanConf.set("hbase.client.scanner.timeout.period", "600000")
    scanConf.set("hbase.rpc.timeout", "600000")
    spark.sparkContext.newAPIHadoopRDD(scanConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
  }
}
```
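A minimal usage sketch, assuming an active `spark: SparkSession`; the table name `user_profile`, the columns, and the rowkey bounds here are placeholders for illustration only:

```scala
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable

// Hypothetical table/column names, shown only to illustrate the scan API.
val columns = mutable.ArrayBuffer("f1:age", "f1:height")
val rdd = HbaseUtils.scan(spark, "user_profile", columns, startRow = "u_0000", endRow = "u_1000")
val ages = rdd.map { case (_, result) =>
  val rowKey = Bytes.toString(result.getRow)                                  // rowkey of the record
  val age = Bytes.toString(result.getValue("f1".getBytes, "age".getBytes))    // null if the cell is absent
  (rowKey, age)
}
ages.take(10).foreach(println)
```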
The `MakeBaseInfo` job below builds on `HbaseUtils.scan` to export the `online_social_feature` table:

```scala
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split}

import scala.collection.mutable

object MakeBaseInfo {
  // Profile fields; uid is the rowkey and is handled separately.
  val BaseInfo: Array[String] = Array("uid", "age", "height", "weight", "role", "vbadge",
    "has_photos", "video_verified", "is_human_face", "has_description", "ip_location", "has_avatar",
    "vip")
  // followed_with_time is requested from HBase but not written to the CSV.
  val Follow: Array[String] = Array("followed_num", "followed_with_time", "follower_num")
  val StatisFeature: Array[String] = Array("click", "clicked", "show", "send_session",
    "receive_session", "desc_len", "online_click", "online_clicked", "online_show", "online_showed",
    "nearby_click", "nearby_clicked", "nearby_show", "nearby_showed", "newbie_click", "newbie_clicked",
    "newbie_show", "newbie_showed", "social_stay_time", "visit_count", "visited_count")
  val TagInfo: Array[String] = Array(
    "tag_1_1", "tag_1_2", "tag_1_3", "tag_1_4",
    "tag_2_1", "tag_2_2", "tag_2_3", "tag_2_4", "tag_2_5", "tag_2_6", "tag_2_7", "tag_2_8", "tag_2_9", "tag_2_10", "tag_2_11", "tag_2_12",
    "tag_3_1", "tag_3_2", "tag_3_3", "tag_3_4",
    "tag_4_1", "tag_4_2", "tag_4_3", "tag_4_4", "tag_4_5", "tag_4_6", "tag_4_7", "tag_4_8", "tag_4_9", "tag_4_10", "tag_4_11", "tag_4_12",
    "tag_5_1", "tag_5_2", "tag_5_3", "tag_5_4", "tag_5_5")
  // Defined for reference; not scanned by this job.
  val humanFeature: Array[String] = Array("max_ratio", "max_beauty", "is_human_body")
  def main(args: Array[String]): Unit = {
    val spark = getSparkSql
    import spark.implicits._

    // One row per uid; "item" is the comma-joined feature string built in getBaseUsersInfo.
    val online_feature = getBaseUsersInfo(spark).toDF("uid", "item")

    // Split "item" into one column per feature. The order here must match the order in
    // which getBaseUsersInfo appends values: BaseInfo (without uid), followed_num,
    // follower_num, StatisFeature, then TagInfo.
    val featureNames = BaseInfo.drop(1) ++ Array("followed_num", "follower_num") ++ StatisFeature ++ TagInfo
    val featureCols = featureNames.zipWithIndex.map { case (name, idx) =>
      split(col("item"), ",").getItem(idx).as(name)
    }
    val online_feature_csv = online_feature.select((col("uid") +: featureCols): _*)

    online_feature_csv.write.format("csv").option("header", "true").save("/data/wangtao/test/")
    spark.stop()
  }
  def getSparkSql: SparkSession = {
    val jobName = "aaa"
    val parallelism = "100"
    // Silence most Spark/Hadoop logging.
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf().setAppName(jobName)
    sparkConf.set("spark.default.parallelism", parallelism)
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.hadoop.validateOutputSpecs", "false")
    SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  }
  private def getBaseUsersInfo(spark: SparkSession) = {
    // Request every feature column from column family f1.
    val columns = mutable.ArrayBuffer[String]()
    for (field <- BaseInfo) {
      columns.append("f1:%s".format(field))
    }
    for (field <- Follow) {
      columns.append("f1:%s".format(field))
    }
    for (field <- TagInfo) {
      columns.append("f1:%s".format(field))
    }
    for (field <- StatisFeature) {
      columns.append("f1:%s".format(field))
    }
    HbaseUtils.scan(spark, "online_social_feature", columns)
      .map {
        case (_, result) =>
          // The rowkey is the uid; individual cells are read by column family and qualifier.
          val uid = Bytes.toString(result.getRow)
          // Collect the feature values, falling back to a per-field default when a cell is missing.
          val features = mutable.ArrayBuffer[String]()
- takeRowValue(result, features, "f1", "age", "None")
- takeRowValue(result, features, "f1", "height", "None")
- takeRowValue(result, features, "f1", "weight", "None")
- takeRowValue(result, features, "f1", "role", "-1")
- takeRowValue(result, features, "f1", "vbadge", "0")
- takeRowValue(result, features, "f1", "has_photos", "0")
- takeRowValue(result, features, "f1", "video_verified", "0")
- takeRowValue(result, features, "f1", "is_human_face", "None")
- takeRowValue(result, features, "f1", "has_description", "0")
- takeRowValue(result, features, "f1", "ip_location", "None")
- takeRowValue(result, features, "f1", "has_avatar", "None")
- takeRowValue(result, features, "f1", "vip", "None")
- takeRowValue(result, features, "f1", "followed_num", "0")
- takeRowValue(result, features, "f1", "follower_num", "0")
- takeRowValue(result, features, "f1", "click", "None")
- takeRowValue(result, features, "f1", "clicked", "None")
- takeRowValue(result, features, "f1", "show", "None")
- takeRowValue(result, features, "f1", "send_session", "None")
- takeRowValue(result, features, "f1", "receive_session", "None")
- takeRowValue(result, features, "f1", "desc_len", "None")
- takeRowValue(result, features, "f1", "online_click", "None")
- takeRowValue(result, features, "f1", "online_clicked", "None")
- takeRowValue(result, features, "f1", "online_show", "None")
- takeRowValue(result, features, "f1", "online_showed", "None")
- takeRowValue(result, features, "f1", "nearby_click", "None")
- takeRowValue(result, features, "f1", "nearby_clicked", "None")
- takeRowValue(result, features, "f1", "nearby_show", "None")
- takeRowValue(result, features, "f1", "nearby_showed", "None")
- takeRowValue(result, features, "f1", "newbie_click", "None")
- takeRowValue(result, features, "f1", "newbie_clicked", "None")
- takeRowValue(result, features, "f1", "newbie_show", "None")
- takeRowValue(result, features, "f1", "newbie_showed", "None")
- takeRowValue(result, features, "f1", "social_stay_time", "None")
- takeRowValue(result, features, "f1", "visit_count", "None")
- takeRowValue(result, features, "f1", "visited_count", "None")
- takeRowValue(result, features, "f1", "tag_1_1", "0")
- takeRowValue(result, features, "f1", "tag_1_2", "0")
- takeRowValue(result, features, "f1", "tag_1_3", "0")
- takeRowValue(result, features, "f1", "tag_1_4", "0")
- takeRowValue(result, features, "f1", "tag_2_1", "0")
- takeRowValue(result, features, "f1", "tag_2_2", "0")
- takeRowValue(result, features, "f1", "tag_2_3", "0")
- takeRowValue(result, features, "f1", "tag_2_4", "0")
- takeRowValue(result, features, "f1", "tag_2_5", "0")
- takeRowValue(result, features, "f1", "tag_2_6", "0")
- takeRowValue(result, features, "f1", "tag_2_7", "0")
- takeRowValue(result, features, "f1", "tag_2_8", "0")
- takeRowValue(result, features, "f1", "tag_2_9", "0")
- takeRowValue(result, features, "f1", "tag_2_10", "0")
- takeRowValue(result, features, "f1", "tag_2_11", "0")
- takeRowValue(result, features, "f1", "tag_2_12", "0")
- takeRowValue(result, features, "f1", "tag_3_1", "0")
- takeRowValue(result, features, "f1", "tag_3_2", "0")
- takeRowValue(result, features, "f1", "tag_3_3", "0")
- takeRowValue(result, features, "f1", "tag_3_4", "0")
- takeRowValue(result, features, "f1", "tag_4_1", "0")
- takeRowValue(result, features, "f1", "tag_4_2", "0")
- takeRowValue(result, features, "f1", "tag_4_3", "0")
- takeRowValue(result, features, "f1", "tag_4_4", "0")
- takeRowValue(result, features, "f1", "tag_4_5", "0")
- takeRowValue(result, features, "f1", "tag_4_6", "0")
- takeRowValue(result, features, "f1", "tag_4_7", "0")
- takeRowValue(result, features, "f1", "tag_4_8", "0")
- takeRowValue(result, features, "f1", "tag_4_9", "0")
- takeRowValue(result, features, "f1", "tag_4_10", "0")
- takeRowValue(result, features, "f1", "tag_4_11", "0")
- takeRowValue(result, features, "f1", "tag_4_12", "0")
- takeRowValue(result, features, "f1", "tag_5_1", "0")
- takeRowValue(result, features, "f1", "tag_5_2", "0")
- takeRowValue(result, features, "f1", "tag_5_3", "0")
- takeRowValue(result, features, "f1", "tag_5_4", "0")
- takeRowValue(result, features, "f1", "tag_5_5", "0")
- // uid + "," + features.mkString(",")
- (uid,features.mkString(","))
- }
- }
  // Read one cell (cf:field) from the Result, falling back to the given default
  // when the cell is missing or holds an empty/placeholder value.
  def takeRowValue(result: Result, features: mutable.ArrayBuffer[String],
                   cf: String, field: String, default: String): Unit = {
    var value = Bytes.toString(result.getValue(cf.getBytes, field.getBytes))
    if (fieldValueIsEmpty(value)) {
      value = default
    }
    features.append(value)
  }

  def fieldValueIsEmpty(value: String): Boolean = {
    StringUtils.isEmpty(value) || value == "NULL" || value == "null" || value == "None"
  }
}
```
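As a quick sanity check after the job finishes, the CSV directory written above can be read back with the header enabled; this sketch again assumes an active `spark: SparkSession`:

```scala
// Read the CSV written by MakeBaseInfo back in and inspect the schema and a few rows.
val check = spark.read.option("header", "true").csv("/data/wangtao/test/")
check.printSchema()
check.select("uid", "age", "vip", "tag_1_1").show(5, truncate = false)
```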