import org.apache.commons.lang3.StringUtils // assumed dependency; use org.apache.commons.lang.StringUtils if the cluster only ships commons-lang 2.x
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.sql.SparkSession

import scala.collection.mutable

object HbaseUtils {
  /**
   * Scan an HBase table for the given columns, optionally bounded by start/stop rowkeys.
   *
   * @param spark     active SparkSession
   * @param tableName HBase table to read
   * @param columns   columns to fetch, each as "family:qualifier"; empty means all columns
   * @param startRow  optional start rowkey (inclusive)
   * @param endRow    optional stop rowkey (exclusive)
   * @return RDD of (ImmutableBytesWritable, Result)
   */
  def scan(spark: SparkSession, tableName: String, columns: mutable.ArrayBuffer[String],
           startRow: String = null, endRow: String = null) = {
    val scanConf = HBaseConfiguration.create()
    scanConf.set(TableInputFormat.INPUT_TABLE, tableName)
    // SCAN_COLUMNS expects a space-separated list of "family:qualifier";
    // only restrict the scan when columns were actually requested
    if (columns.nonEmpty) {
      scanConf.set(TableInputFormat.SCAN_COLUMNS, columns.mkString(" "))
    }
    if (StringUtils.isNotEmpty(startRow)) {
      scanConf.set(TableInputFormat.SCAN_ROW_START, startRow)
    }
    if (StringUtils.isNotEmpty(endRow)) {
      scanConf.set(TableInputFormat.SCAN_ROW_STOP, endRow)
    }
    // generous timeouts for long-running full-table scans
    scanConf.set("mapreduce.task.timeout", "1200000")
    scanConf.set("hbase.client.scanner.timeout.period", "600000")
    scanConf.set("hbase.rpc.timeout", "600000")
    spark.sparkContext.newAPIHadoopRDD(scanConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
  }
}
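For reference, a minimal usage sketch of HbaseUtils.scan. The table name "demo_table" and the column "f1:age" are placeholders (not part of the job below), and an existing SparkSession named spark is assumed:

import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable

// Hypothetical example: dump rowkey plus one column from a table named "demo_table"
val cols = mutable.ArrayBuffer("f1:age")
val rdd = HbaseUtils.scan(spark, "demo_table", cols)
rdd.map { case (_, result) =>
  val rowkey = Bytes.toString(result.getRow)
  val age = Bytes.toString(result.getValue("f1".getBytes, "age".getBytes))
  s"$rowkey,$age"
}.take(10).foreach(println)

The job below follows the same pattern, only with a much longer column list.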
import org.apache.commons.lang3.StringUtils // assumed dependency, see note above
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split}

import scala.collection.mutable

object MakeBaseInfo {
  val BaseInfo: Array[String] = Array("uid", "age", "height", "weight", "role", "vbadge",
    "has_photos", "video_verified", "is_human_face", "has_description", "ip_location", "has_avatar",
    "vip")
  val Follow: Array[String] = Array("followed_num", "followed_with_time", "follower_num")
  val StatisFeature: Array[String] = Array("click", "clicked", "show", "send_session",
    "receive_session", "desc_len", "online_click", "online_clicked", "online_show", "online_showed",
    "nearby_click", "nearby_clicked", "nearby_show", "nearby_showed", "newbie_click", "newbie_clicked",
    "newbie_show", "newbie_showed", "social_stay_time", "visit_count", "visited_count")
  val TagInfo: Array[String] = Array(
    "tag_1_1", "tag_1_2", "tag_1_3", "tag_1_4",
    "tag_2_1", "tag_2_2", "tag_2_3", "tag_2_4", "tag_2_5", "tag_2_6", "tag_2_7", "tag_2_8", "tag_2_9", "tag_2_10", "tag_2_11", "tag_2_12",
    "tag_3_1", "tag_3_2", "tag_3_3", "tag_3_4",
    "tag_4_1", "tag_4_2", "tag_4_3", "tag_4_4", "tag_4_5", "tag_4_6", "tag_4_7", "tag_4_8", "tag_4_9", "tag_4_10", "tag_4_11", "tag_4_12",
    "tag_5_1", "tag_5_2", "tag_5_3", "tag_5_4", "tag_5_5")
  val humanFeature: Array[String] = Array("max_ratio", "max_beauty", "is_human_body")
  def main(args: Array[String]): Unit = {
    val spark = getSparkSql
    import spark.implicits._
    val online_feature = getBaseUsersInfo(spark).toDF("uid", "item")
    // "item" is the comma-joined feature string built in getBaseUsersInfo; split it once
    // and expand it back into named columns, in the same order the features were appended
    // (base profile fields, follow counts, statistic counters, tags)
    val featureNames = BaseInfo.drop(1) ++ Array("followed_num", "follower_num") ++ StatisFeature ++ TagInfo
    val item = split(col("item"), ",")
    val online_feature_csv = online_feature.select(
      col("uid") +: featureNames.toSeq.zipWithIndex.map { case (name, idx) => item.getItem(idx).as(name) }: _*)
    online_feature_csv.write.format("csv").option("header", "true").save("/data/wangtao/test/")
    spark.stop()
  }

  def getSparkSql: SparkSession = {
    val JobName = "aaa"
    val parallelism = "100"
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf().setAppName(JobName)
    sparkConf.set("spark.default.parallelism", parallelism)
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.hadoop.validateOutputSpecs", "false")
    SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  }
  private def getBaseUsersInfo(spark: SparkSession) = {
    // Build the "family:qualifier" column list for the scan
    val columns = mutable.ArrayBuffer[String]()
    for (field <- BaseInfo ++ Follow ++ TagInfo ++ StatisFeature) {
      columns.append("f1:%s".format(field))
    }
    HbaseUtils.scan(spark, "online_social_feature", columns)
      .map {
        case (_, result) =>
          // the rowkey is the uid
          val uid = Bytes.toString(result.getRow)
          // collect the feature values in a fixed order (must match featureNames in main)
          val features = mutable.ArrayBuffer[String]()
          // Base profile and follow fields, each with its own default when the cell is missing/empty
          val baseDefaults = Seq(
            "age" -> "None", "height" -> "None", "weight" -> "None", "role" -> "-1",
            "vbadge" -> "0", "has_photos" -> "0", "video_verified" -> "0",
            "is_human_face" -> "None", "has_description" -> "0", "ip_location" -> "None",
            "has_avatar" -> "None", "vip" -> "None",
            "followed_num" -> "0", "follower_num" -> "0")
          for ((field, default) <- baseDefaults) {
            takeRowValue(result, features, "f1", field, default)
          }
          // Statistic counters default to "None", tag columns default to "0"
          for (field <- StatisFeature) takeRowValue(result, features, "f1", field, "None")
          for (field <- TagInfo) takeRowValue(result, features, "f1", field, "0")
          (uid, features.mkString(","))
      }
  }
  def takeRowValue(result: Result, features: mutable.ArrayBuffer[String],
                   cf: String, field: String, default: String): Unit = {
    // Read one cell; fall back to the given default when the cell is missing or empty
    var value = Bytes.toString(result.getValue(cf.getBytes, field.getBytes))
    if (fieldValueIsEmpty(value)) {
      value = default
    }
    features.append(value)
  }

  def fieldValueIsEmpty(value: String): Boolean = {
    value == null || StringUtils.isEmpty(value) || value == "NULL" || value == "null" || value == "None"
  }
}
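As a quick sanity check after the job finishes, the written CSV can be read back with Spark. A minimal sketch, assuming a spark-shell session (or the SparkSession created above) and the output path hard-coded in main:

val check = spark.read.option("header", "true").csv("/data/wangtao/test/")
check.printSchema()
check.select("uid", "age", "vip", "tag_5_5").show(5, truncate = false)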
