将RDD写入hbase
注意点:

依赖:

将lib目录下的hadoop开头jar包、hbase开头jar包添加至classpath

此外还有lib目录下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少会提示hbase RpcRetryingCaller: Call exception不断尝试重连hbase,不报错)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目录下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能会有相同名称的类,不要导错

连接集群:

spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper:

第一种是将hbase-site.xml文件加入classpath

第二种是在HBaseConfiguration实例中设置

如果不设置,默认连接的是localhost:2181会报错:connection refused

本文使用的是第二种方式。

hbase创建表:

虽然可以在spark应用中创建hbase表,但是不建议这样做,最好在hbase shell中创建表,spark写或读数据

使用saveAsHadoopDataset写入数据

package com.test
 
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 
object TestHBase {
 
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)
 
    val conf = HBaseConfiguration.create()
    //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
    conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
    //设置zookeeper连接端口,默认2181
    conf.set("hbase.zookeeper.property.clientPort", "2181")
 
    val tablename = "account"
    
    //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    
    val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
 
 
    val rdd = indataRDD.map(_.split(',')).map{arr=>{
      /*一个Put对象就是一行记录,在构造方法中指定主键
       * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
       * Put.add方法接收三个参数:列族,列名,数据
       */
      val put = new Put(Bytes.toBytes(arr(0).toInt))
      put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
      put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
      //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
      (new ImmutableBytesWritable, put) 
    }}
    
    rdd.saveAsHadoopDataset(jobConf)
    
    sc.stop()
  }
 
}

使用saveAsNewAPIHadoopDataset写入数据

package com.test
 
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
 
object TestHBase3 {
 
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)
    
    val tablename = "account"
    
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])  
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])  
 
    val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
    val rdd = indataRDD.map(_.split(',')).map{arr=>{
      val put = new Put(Bytes.toBytes(arr(0)))
      put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
      put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put) 
    }}
    
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
 
}

从hbase读取数据转化成RDD
本例基于官方提供的例子

package com.test
 
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io._
 
object TestHBase2 {
 
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)
    
    val tablename = "account"
    val conf = HBaseConfiguration.create()
    //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
    conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
    //设置zookeeper连接端口,默认2181
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, tablename)
 
    // 如果表不存在则创建表
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tablename)) {
      val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
      admin.createTable(tableDesc)
    }
 
    //读取数据并转化成rdd
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
 
    val count = hBaseRDD.count()
    println(count)
    hBaseRDD.foreach{case (_,result) =>{
      //获取行键
      val key = Bytes.toString(result.getRow)
      //通过列族和列名获取列
      val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
      val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
      println("Row key:"+key+" Name:"+name+" Age:"+age)
    }}
 
    sc.stop()
    admin.close()
  }
}

spark从hbase读取写入数据相关推荐

  1. 用多态来实现U盘,Mp3,移动硬盘和电脑的对接,读取写入数据。

    using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.T ...

  2. vb6 串口同时读取写入数据怎么避免冲突_实例:S7-200 SMART通过Modbus-RTU读取温湿度传感器数据...

    本实例我们介绍下西门子S7-200 SMART PLC如何通过Modbus-RTU协议读取温湿度传感器的数值.实例使用的硬件如下: S7-200 SMART CPU ST20: 温湿度传感器(支持Mo ...

  3. vb6 串口同时读取写入数据怎么避免冲突_分布式场景下的数据复制究竟怎么做...

    主从复制 集群中有一个主节点,写操作都必须经过主节点完成,读操作主从节点都可以处理. 同步复制 数据在副本上落盘才返回. 优点:保证在副本上的数据是最新数据. 缺点:延迟高,响应慢. 异步复制 数据不 ...

  4. Springboot + Easyexcel读取写入数据,多头行数,多sheet,复杂表头简单实现

    Springboot + Easyexcel 读取数据 简单读取excel文件 读取下图的 excel 数据 导入依赖,阿里的easyexcel插件 <dependency><gro ...

  5. 嵌入式C语言STM32在FLASH中读取写入数据

    STM32F4XX向指定FLASH地址读写 向FLASH中写入数据的主体思想就是先解锁,然后清标志位,然后找到要写入的地址,然后改变标志准备写入,然后在按已有的函数按地址一字节一字节的写入,最后要将F ...

  6. C# 操作地址 从内存中读取写入数据(初级)

    本示例以植物大战僵尸为例, 实现功能为 每1秒让阳光刷新为 9999.本示例使用的游戏版本为 [植物大战僵尸2010年度版], 使用的辅助查看内存地址的工具是  CE. 由于每次启动游戏, 游戏中阳光 ...

  7. HBase批量写入数据

    一.HBase安装 1.上传解压 2.修改环境变量 vi /etc/profile export HBASE_HOME=/home/hadoop/hbase export PATH=$PATH:$HB ...

  8. spark保存到mysql_Spark写入数据到MySQL

    以下是本人做的测试,如果有错误请及时指正,有问题欢迎一起讨论. 情景: 需要读取HDFS上的数据,处理之后,写入到MySQL数据库里面去. 实现: 1.版本 spark版本:1.2.1 MySQL版本 ...

  9. java spark读写hdfs_Spark读取HDFS数据输出到不同的文件

    最近有一个需求是这样的:原来的数据是存储在MySQL,然后通过Sqoop将MySQL的数据抽取到了HDFS集群上,抽取到HDFS上的数据都是纯数据,字段值之间以\t分隔,现在需要将这部分数据还原为js ...

最新文章

  1. 2019年企业云呈现五大技术发展趋势
  2. 剑指 offer 链表倒数的第k个数
  3. cisco无线网络实施方案
  4. 在unity 中,使用http请求,下载文件到可读可写路径
  5. 第一款Micropython图形化编辑器—Python Editor
  6. 大家不要催!雷军的螺丝刀已经准备好了...
  7. 【Java】Java StreamCorruptedException: invalid stream header: EFBFBDEF
  8. android 涂鸦之图片叠加,android图像处理系列之七--图片涂鸦,水印-图片叠加...
  9. windows 2003 server右键菜单没有共享选项的解决办法
  10. [渝粤教育] 信阳农林学院 鱼类学 参考 资料
  11. Win10专业版启用.NET FrameWork 3.5
  12. 01超精美渐变色动态背景完整示例【CSS动效实战(纯CSS与JS动效)】
  13. 生活记录:压抑暂时解脱
  14. 4.1 数据仓库基础与Apache Hive入门
  15. xp系统桌面没有计算机,在xp系统中,为什么桌面所有图标都消失?
  16. 数据可视化笔记1 数据可视化简介(简史、分类、功能、目标)
  17. 市场监管新规下Android接入的友盟Umeng移动统计/推送/分享SDK过程问题总结
  18. 闭关修炼,看了老大的博客,才发现自己是多么的技术低,原来我就达到06年的他
  19. 超声波测距仪编程_51单片机控制的超声波测距仪程序
  20. 人生需要规划——感受徐小平 (转自徐小平博客)

热门文章

  1. java tomcat jndi,Tomcat JNDI 资源
  2. c语言学生成绩删除功能,c语言学生成绩管理系统程序设计,有添加,查找,删除,输出,修改,排序等功能!!!...
  3. 为什么c语言要定义变量,C语言为什么要规定对所用到的变量要“先定义,后使用”...
  4. gen文件下有两个R.java_gen目录无法更新,或者gen目录下的R.JAVA文件无法生成
  5. 小程序引入的echarts过大如何解决_解决微信小程序引用echarts视图模糊的问题
  6. android 定制ui,AndroidSDK-UI定制
  7. 液晶弹性自由能计算_自由能方法应用(一)开放计算平台BRIDGE的介绍及使用案例...
  8. python文件实时同步_python文件自动同步备份v1.2【运维必备】2020/12/31
  9. linux查看文件列表内存地址ll,linux指令之文件查看 ls
  10. python读取pdf文档书签 bookmark_Python利用PyPDF2库获取PDF文件总页码实例