在上一篇博文中遗留了一个问题,就是只能处理DataFrame 的一行一列,虽然给出一个折中的办法处理多个列,但是对于字段多的DataFrame却略显臃肿,经过我的研究,实现了从一个列族、一个列到一个列族、多个列扩展。

此文章再此记录实现方法

实现思路:

保存为HFile的关键是下面这个方法saveAsNewAPIHadoopFile(save_path,      classOf[ImmutableBytesWritable],      classOf[KeyValue],      classOf[HFileOutputFormat2],      job.getConfiguration)

要使用这个方法就要保证最后的结果数据需要是RDD[(ImmutableBytesWritable, KeyValue)]类型的,所以这就是我们努力前进的方向。在这个过程中有几个问题需要解决

1. 如何一次处理DataFrame 的众多字段val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame

.map(row => {        var kvlist: Seq[KeyValue] = List()        var rowkey: Array[Byte] = null

var cn: Array[Byte] = null

var v: Array[Byte] = null

var kv: KeyValue = null

val cf: Array[Byte] = clounmFamily.getBytes //列族

rowkey = Bytes.toBytes(row.getAs[String]("key")) //key

for (i

cn = columnsName(i).getBytes() //列的名称

v = Bytes.toBytes(row.getAs[String](columnsName(i))) //列的值

//将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key

kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value

//

kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)

}

(new ImmutableBytesWritable(rowkey), kvlist)

})上述代码中通过map取出每一行row,用一个for循环通过所有字段的名称(去除掉“key”这个字段)对每个字段进行封装处理,每处理完一个字段加入kvlist。

在此处有个地方需要注意的是,我们要保证 kvlist 里面的数据整体有序(升序),这里的有序由字段名称排序和加入 kvlist 的位置来保证,kvlist 通过 :+ 将后一个数据放在List的后面,至于字段名称排序在后面说明。

至于此处为什么要去除掉key,这是因为我默认DataFrame第一个字段就是key,因为需要对所有字段名称进行排序,如果不把key拿出来后续不知道key在哪里了,如果按照正常走下去,key值也会被当成value被保存一次,这显然不符合我们的要求,当然有兴趣的同学可以自己实现更全面的方法。

2. 如何对DataFrame 的所有字段名排序var columnsName: Array[String] = resultDataFrame.columns //获取列名 第一个为key

columnsName = columnsName.drop(1).sorted //把key去掉  因为要排序通过resultDataFrame.columns获取所有列名,通过drop(1)删掉“key”,(序号从1开始)

通过sorted 对列名进行排序,默认就是升序的

通过上面方法处理后数据类型是

RDD[(ImmutableBytesWritable, Seq[KeyValue])]

这显然不是我们需要的,但是距离

RDD[(ImmutableBytesWritable, KeyValue)]

已然不远矣

3. 如何将value的Seq[KeyValue] 穿换成 KeyValueval result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {

s.iterator

})这点其实很简单,但是脑子当时短路还纠结很久,直接用flatMapValues这个方法即可,最后处理出来的就是我们的目标RDD[(ImmutableBytesWritable, KeyValue)]

4. 目标路径已经存在怎么办/**

* 删除hdfs下的文件

*

* @param url 需要删除的路径

*/

def delete_hdfspath(url: String) {

val hdfs: FileSystem = FileSystem.get(new Configuration)

val path: Path = new Path(url)    if (hdfs.exists(path)) {

val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)

hdfs.delete(path, true)

}

}存在就删除呗,新建个方法delete_hdfspath将路径删除即可

5. 如何生成 HFile 和 load 数据到Hbase

执行方法saveAsNewAPIHadoopFile()生成HFile

注意:此处要对key进行排序(升序)//保存数据

result

.sortBy(x => x._1, true) //要保持 整体有序

.saveAsNewAPIHadoopFile(save_path,

classOf[ImmutableBytesWritable],

classOf[KeyValue],

classOf[HFileOutputFormat2],

job.getConfiguration)

load 数据到Hbase

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile iptv:spark_test

过程中出现的问题DataFrame 字段名称没有排序处理18/10/15 14:19:32 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 2.0 (TID 3, iptve2e03): java.io.IOException: Added a key not lexically larger than previous.

Current cell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:area_code/1539584366048/Put/vlen=5/seqid=0,

lastCell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:dict_id/1539584366048/Put/vlen=2/seqid=0

上面的意思是当前列名cf_info:area_code比前一个列名cf_info:dict_id小,这就是为什么需要对列名排序的原因,同时还要把key删除掉,因为不删除会出现cf_info:key这个列

完整代码

依赖:sbtlibraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

关键代码import java.text.SimpleDateFormatimport java.util.{Calendar, Date}import com.iptv.domain.DatePatternimport com.iptv.job.JobBaseimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs.{FileSystem, Path}import org.apache.hadoop.fs.permission.{FsAction, FsPermission}import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2import org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.mapreduce.Jobimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.functions.{concat, lit}import org.apache.spark.sql.{DataFrame, SQLContext}import org.apache.spark.{SparkConf, SparkContext}  /**

* 将DataFrame 保存为 HFile

*

* @param resultDataFrame 需要保存为HFile的 DataFrame,DataFrame的第一个字段必须为"key"

* @param clounmFamily 列族名称(必须在Hbase中存在,否则在load数据的时候会失败)

* @param save_path HFile的保存路径

*/

def saveASHfFile(resultDataFrame: DataFrame, clounmFamily: String, save_path: String): Unit = {

val conf: Configuration = HBaseConfiguration.create()

lazy val job = Job.getInstance(conf)

job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) //设置MapOutput Key Value 的数据类型

job.setMapOutputValueClass(classOf[KeyValue])    var columnsName: Array[String] = resultDataFrame.columns //获取列名 第一个为key

columnsName = columnsName.drop(1).sorted //把key去掉  因为要排序

val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame

.map(row => {        var kvlist: Seq[KeyValue] = List()        var rowkey: Array[Byte] = null

var cn: Array[Byte] = null

var v: Array[Byte] = null

var kv: KeyValue = null

val cf: Array[Byte] = clounmFamily.getBytes //列族

rowkey = Bytes.toBytes(row.getAs[String]("key")) //key

for (i

cn = columnsName(i).getBytes() //列的名称

v = Bytes.toBytes(row.getAs[String](columnsName(i))) //列的值

//将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key

kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value

//

kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)

}

(new ImmutableBytesWritable(rowkey), kvlist)

})

delete_hdfspath(save_path) //删除save_path 原来的数据

//RDD[(ImmutableBytesWritable, Seq[KeyValue])] 转换成 RDD[(ImmutableBytesWritable, KeyValue)]

val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {

s.iterator

})    //保存数据

result

.sortBy(x => x._1, true) //要保持 整体有序

.saveAsNewAPIHadoopFile(save_path,

classOf[ImmutableBytesWritable],

classOf[KeyValue],

classOf[HFileOutputFormat2],

job.getConfiguration)

}  /**

* 删除hdfs下的文件

* @param url 需要删除的路径

*/

def delete_hdfspath(url: String) {

val hdfs: FileSystem = FileSystem.get(new Configuration)

val path: Path = new Path(url)    if (hdfs.exists(path)) {

val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)

hdfs.delete(path, true)

}

}

使用示例package com.iptv.job.basedataimport com.iptv.job.JobBaseimport org.apache.spark.sql.functions.{concat, lit}import org.apache.spark.sql.{DataFrame, SQLContext}import org.apache.spark.{SparkConf, SparkContext}/**

* @author 利伊奥克儿-lillcol

*         2018/10/14-11:08

*

*/object TestHFile extends JobBase {  var hdfsPath: String = ""

var proPath: String = ""

var DATE: String = ""

val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)

val sc: SparkContext = new SparkContext(sparkConf)

val sqlContext: SQLContext = getSQLContext(sc)  import sqlContext.implicits._

def main(args: Array[String]): Unit = {

hdfsPath = args(0)

proPath = args(1)    //HFile保存路径

val save_path: String = hdfsPath + "zzzHFile"

//获取测试DataFrame

val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)

val resultDataFrame: DataFrame = dim_sys_city_dict

.select(concat($"city_id", lit("_"), $"city_name", lit("_"), $"city_code").as("key"), $"*")    //注:resultDataFrame 里面的 key 要放在第一位,因为后面需要对字段名排序

saveASHfFile(resultDataFrame, "cf_info", save_path)

}

}

上述读取mysql数据为DataFrame的放大可以参考

Spark:读取mysql数据作为DataFrame

此为个人工作过程中的总结,转载请标出处!!!!!

作者:利伊奥克儿

链接:https://www.jianshu.com/p/f19df831534b

java创建hbase多个列族_Spark:DataFrame写HFile (Hbase)一个列族、一个列扩展一个列族、多个列...相关推荐

  1. 列式存储的分布式数据库——HBase Shell与SQL实战操作(HBase Master高可用实现)

    文章目录 一.前言 二.命令行操作(hbase shell) 1)连接HBase 2)创建表(create) 3)添加数据(put) 4)查询数据(scan ) 5)获取单行数据(get) 6)禁用/ ...

  2. hbase中列簇和列_为什么不建议在hbase中使用过多的列簇

    我们知道,hbase表可以设置一个至多个列簇(column families),但是为什么说越少的列簇越好呢? 官网原文: HBase currently does not do well with ...

  3. phoenix创建索引报错“ Mutable secondary indexes must have the hbase.regionserver.wal.codec property”

    phoenix 创建hbase表索引时异常,报错如下 Error: ERROR 1029 (42Y88): Mutable secondary indexes must have the hbase. ...

  4. java创建型_Java创建型模式

    Java创建型模式 在软件工程中,创建型模式是处理对象创建的设计模式,试图根据实际情况使用合适的方式创建对象.基本的对象创建方式可能会导致设计上的问题,或增加设计的复杂度.创建型模式通过以某种方式控制 ...

  5. java 创建用户界面_建立图形用户界面 JAVA实验

    实验 7 建立图形用户界面 一.实验目的 了解图形用户界面基本组件窗口.按钮.文本框.选择框.滚动条等的使用方法,了解如何使用布局管理器对组件进行管理,以及如何使用 Java 的事件处理机制. 二.实 ...

  6. R语言使用多个数据类型不同的向量数据创建一个dataframe数据对象、使用[]操作符和列名称访问dataframe指定数据列的数据(dataframe column data)

    R语言使用多个数据类型不同的向量数据创建一个dataframe数据对象.使用[]操作符和列名称访问dataframe指定数据列的数据(dataframe column data) 目录 R语言使用多个 ...

  7. java刷新透视表数据源,Java 创建、刷新Excel透视表/设置透视表行折叠、展开

    Java 创建.刷新Excel透视表/设置透视表行折叠.展开 透视表是依据已有数据源来创建的交互式表格,我们可在excel中创建透视表,也可编辑已有透视表.本文以创建透视表.刷新透视表以及设置透视表的 ...

  8. Java 创建型模式

    Java 创建型模式 持续更新- 创建者模式的主要关注点是'怎样创建对象?',它的主要特点是'将对象的创建和使用分离'. 这样可以降低系统的耦合度,使用者不需要关注对象的创建细节. 创建型模式分为: ...

  9. 使用java创建pdf 并返回流给前端

    使用java创建pdf 并返回流给前端 业务需求,页面点击打印预览文件,然后点击打印调用系统(浏览器)打印,经网上查阅资料,自己整理后记录下来 首先是自己整理的帮助类 import cn.bt.com ...

最新文章

  1. mysql数据库常见进阶使用
  2. python对象不接受参数什么意思___new\=TypeError:object()不接受参数
  3. 加快windows上对大文件,以及很多很多小文件进行不同磁盘拷贝的速度——windows上的最快拷贝软件FastCopy
  4. Ajax 基础——未完待续
  5. redis如何解决秒杀超卖java_Spring Boot + redis解决商品秒杀库存超卖,看这篇文章就够了...
  6. spring的aop名词解释
  7. C# - 多线程(基础)
  8. 51 -算法 -斐波拉奇数列 -LeetCode 70 -递推
  9. NBA 投篮数据可视化,4行代码就能实现!
  10. 华为防火墙管理员角色和级别详解
  11. Windows autoKeras的下载与安装连接
  12. 计算机c盘只能新建文件夹,1.C盘只能新建文件夹不能新建文件(用户权限问题)...
  13. 小米多主题思路分析-重定向资源篇
  14. 计算机软件毕业设计项目源码大全
  15. cad线性标注命令_CAD线性标注快捷键是什么,怎么使用
  16. JAVA基础知识点大全之一
  17. 微信开通状态检测工具
  18. 二进制安装Kubernetes(k8s) v1.25.0 IPv4/IPv6双栈
  19. 焕然一新的 Vue 3 中文文档要来了
  20. Oracle:ORA-00054 资源正忙

热门文章

  1. Vue项目中如何实现用户登录及token验证?
  2. php mysql难不难_PHP操作mysql数据库
  3. java 删除文件夹和文件_如何创建无法删除的文件夹?
  4. winapi编程获取文件版本信息的代码_.Net调用WinAPI轻松实现POS小票并口打印
  5. pb数据窗口怎么调用视图_大数据架构如何做到流批一体?
  6. windows10中如何在d盘新建kaoshi.log文件_命令行备份Windows 10驱动amp;设备管理器中安装驱动。...
  7. cad lisp 微盘 程序_使用CAD无法避免的3个坑,你知道怎么绕开吗|AutoCAD断舍离
  8. python 30个小代码_30个Python常用极简代码,拿走就用
  9. bearer token_四,接口认证方式:Bearer Token
  10. matlab中平方根法,平方根法和改进的平方根法解线性方程组(Matlab程序)