使用 saveAsHadoopDataset 写入数据

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

//import org.apache.hadoop.mapreduce.Job

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SparkSession

/**

* Created by blockchain on 18-9-9 下午3:45 in Beijing.

*/

object SparkHBaseRDD {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()

val sc = spark.sparkContext

val tablename = "SparkHBase"

val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置

hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181

hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

//初始化job,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的

val jobConf = new JobConf(hbaseConf)

jobConf.setOutputFormat(classOf[TableOutputFormat])

val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14"))

val rdd = indataRDD.map(_.split(',')).map{ arr=>

/*一个Put对象就是一行记录,在构造方法中指定主键

* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换

* Put.addColumn 方法接收三个参数:列族,列名,数据*/

val put = new Put(Bytes.toBytes(arr(0)))

put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))

put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))

(new ImmutableBytesWritable, put)

}

rdd.saveAsHadoopDataset(jobConf)

spark.stop()

}

}

使用 newAPIHadoopRDD 读取数据

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

//import org.apache.hadoop.mapreduce.Job

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SparkSession

/**

* Created by blockchain on 18-9-9 下午3:45 in Beijing.

*/

object SparkHBaseRDD {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()

val sc = spark.sparkContext

val tablename = "SparkHBase"

val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置

hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181

hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)

// 如果表不存在则创建表

val admin = new HBaseAdmin(hbaseConf)

if (!admin.isTableAvailable(tablename)) {

val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))

admin.createTable(tableDesc)

}

//读取数据并转化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],

classOf[ImmutableBytesWritable],

classOf[Result])

hBaseRDD.foreach{ case (_ ,result) =>

//获取行键

val key = Bytes.toString(result.getRow)

//通过列族和列名获取列

val name = Bytes.toString(result.getValue("cf1".getBytes,"name".getBytes))

val age = Bytes.toString(result.getValue("cf1".getBytes,"age".getBytes))

println("Row key:"+key+" Name:"+name+" Age:"+age)

}

admin.close()

spark.stop()

}

}

Spark DataFrame 通过 Phoenix 读写 HBase

需要添加的依赖如下:

org.apache.phoenix

phoenix-core

${phoenix.version}

org.apache.phoenix

phoenix-spark

${phoenix.version}

下面老规矩,直接上代码。

package com.ai.spark

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.{SaveMode, SparkSession}

/**

* Created by blockchain on 18-9-9 下午8:33 in Beijing.

*/

object SparkHBaseDataFrame {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate()

val url = s"jdbc:phoenix:localhost:2181"

val dbtable = "PHOENIXTEST"

//spark 读取 phoenix 返回 DataFrame 的 第一种方式

val rdf = spark.read

.format("jdbc")

.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")

.option("url", url)

.option("dbtable", dbtable)

.load()

rdf.printSchema()

//spark 读取 phoenix 返回 DataFrame 的 第二种方式

val df = spark.read

.format("org.apache.phoenix.spark")

.options(Map("table" -> dbtable, "zkUrl" -> url))

.load()

df.printSchema()

//spark DataFrame 写入 phoenix,需要先建好表

df.write

.format("org.apache.phoenix.spark")

.mode(SaveMode.Overwrite)

.options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url))

.save()

spark.stop()

}

}

参考链接:

spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)相关推荐

  1. SQL Server 2008 数据库同步的两种方式 (发布、订阅)

    参考转载: SQL Server 2008 数据库同步的两种方式 (发布.订阅) 使用Sqlserver事务发布实现数据同步 转载于:https://www.cnblogs.com/YangBinCh ...

  2. spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator

    1.基于UserDefinedAggregateFunction实现平均数的计算 package com.bigdata.wb.sparkimport org.apache.spark.sql.Row ...

  3. spark代码连接hive_Spark SQL入门到实战之(7)spark连接hive(spark-shell和eclipse两种方式)...

    1.在服务器(虚拟机)spark-shell连接hive 1.1 将hive-site.xml拷贝到spark/conf里 cp /opt/apache-hive-2.3.2-bin/conf/hiv ...

  4. python import sql脚本_13-模块介绍-import两种方式-py文件的两种用途-模块搜索路径-项目开发的目录规范...

    1.模块的介绍与使用模块import 1.1.模块的介绍 1.1.1.什么是模块? 模块就是一组功能的集合体,我们的程序可以导入模块来复用模块里的功能.在python中,模块的使用方式都是一样的,但其 ...

  5. 用CSV文件读写数据的两种方式(转)

    导读:有时候我们需要对收集的数据做统计,并在页面提供显示以及下载.除了对传统的excel存取之外,对CSV文件的存取也很重要.本文列出了这两种操作的详细代码. 代码: <?php $file = ...

  6. 利用SQL语句自动生成序号的两种方式

    1.首先,我们来介绍第一种方式: ◆查询的SQL语句如下: select row_number() over (order by name) as rowid, sysobjects.[name] f ...

  7. spark的Web UI查看DAG的两种方式

    提交spark任务后,master:8088->ApplicationMaster 然后会跳转到spark的WEB UI界面. 第一种查看DAG的方式是: Jobs->Descriptio ...

  8. java和sql计算两点经纬度距离的两种方式

    一.java public class GeoUtil {      private static final double EARTH_RADIUS = 6371393; // 平均半径,单位:m ...

  9. 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义

    注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...

最新文章

  1. 第 4 章 Hypertable
  2. spring_在运行时更新代码(已Spring解密)
  3. SVN用户验证,调错
  4. 好久没有处理过故障了
  5. 记与公司内网微博的谈话
  6. php接口 接受ios或android端图片; php接收NSData数据
  7. Minor GC,Major GC,Full GC -- hotspot VM GC讲解
  8. P4318,bzoj2440-完全平方数【二分答案,莫比乌斯函数,容斥】
  9. 【转】在树莓派上实现人脸识别
  10. UVA 11237 - Halloween treats(鸽笼原理)
  11. 为什么算法工程师也叫调参狗?
  12. python实现高斯消元法求线性方程组的解
  13. 松翰单片机--SN8F5702学习笔记(一)uart寄存器
  14. 解决vue中双击事件会触发两次单击事件问题
  15. HTML自定义滚动条附效果图和完整源码
  16. eclipse导入idea项目教程
  17. 洛谷 P1238 走迷宫【搜索】【DFS】
  18. 票房拐点之后的影院并购,继续“小而频”还是大洗牌
  19. 接口测试平台代码实现54:首页重构-2
  20. 弘辽科技:淘宝卖家打造爆款商品的六大技巧!

热门文章

  1. Django 模板系统2
  2. day2-元组 列表-赋值和深浅拷贝
  3. css pointer-event
  4. linux笔记-硬链接和符号链接
  5. windows下MBCS和UNICODE编码的转换
  6. python中使用什么导入模块-python—模块导入和类
  7. POSIX多线程API函数
  8. 支持向量机SVM的python实现
  9. RuntimeError: DataLoader worker (pid(s) 13512, 280, 21040) exited unexpectedly
  10. LaTeX引用多篇bibtex格式文献