spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)
使用 saveAsHadoopDataset 写入数据
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
//import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
* Created by blockchain on 18-9-9 下午3:45 in Beijing.
*/
object SparkHBaseRDD {
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()
val sc = spark.sparkContext
val tablename = "SparkHBase"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
//初始化job,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
val jobConf = new JobConf(hbaseConf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14"))
val rdd = indataRDD.map(_.split(',')).map{ arr=>
/*一个Put对象就是一行记录,在构造方法中指定主键
* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
* Put.addColumn 方法接收三个参数:列族,列名,数据*/
val put = new Put(Bytes.toBytes(arr(0)))
put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))
(new ImmutableBytesWritable, put)
}
rdd.saveAsHadoopDataset(jobConf)
spark.stop()
}
}
使用 newAPIHadoopRDD 读取数据
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
//import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
* Created by blockchain on 18-9-9 下午3:45 in Beijing.
*/
object SparkHBaseRDD {
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()
val sc = spark.sparkContext
val tablename = "SparkHBase"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181
hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)
// 如果表不存在则创建表
val admin = new HBaseAdmin(hbaseConf)
if (!admin.isTableAvailable(tablename)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
admin.createTable(tableDesc)
}
//读取数据并转化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的
val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
hBaseRDD.foreach{ case (_ ,result) =>
//获取行键
val key = Bytes.toString(result.getRow)
//通过列族和列名获取列
val name = Bytes.toString(result.getValue("cf1".getBytes,"name".getBytes))
val age = Bytes.toString(result.getValue("cf1".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Age:"+age)
}
admin.close()
spark.stop()
}
}
Spark DataFrame 通过 Phoenix 读写 HBase
需要添加的依赖如下:
org.apache.phoenix
phoenix-core
${phoenix.version}
org.apache.phoenix
phoenix-spark
${phoenix.version}
下面老规矩,直接上代码。
package com.ai.spark
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* Created by blockchain on 18-9-9 下午8:33 in Beijing.
*/
object SparkHBaseDataFrame {
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate()
val url = s"jdbc:phoenix:localhost:2181"
val dbtable = "PHOENIXTEST"
//spark 读取 phoenix 返回 DataFrame 的 第一种方式
val rdf = spark.read
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("url", url)
.option("dbtable", dbtable)
.load()
rdf.printSchema()
//spark 读取 phoenix 返回 DataFrame 的 第二种方式
val df = spark.read
.format("org.apache.phoenix.spark")
.options(Map("table" -> dbtable, "zkUrl" -> url))
.load()
df.printSchema()
//spark DataFrame 写入 phoenix,需要先建好表
df.write
.format("org.apache.phoenix.spark")
.mode(SaveMode.Overwrite)
.options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url))
.save()
spark.stop()
}
}
参考链接:
spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)相关推荐
- SQL Server 2008 数据库同步的两种方式 (发布、订阅)
参考转载: SQL Server 2008 数据库同步的两种方式 (发布.订阅) 使用Sqlserver事务发布实现数据同步 转载于:https://www.cnblogs.com/YangBinCh ...
- spark中自定义UDAF函数实现的两种方式---UserDefinedAggregateFunction和Aggregator
1.基于UserDefinedAggregateFunction实现平均数的计算 package com.bigdata.wb.sparkimport org.apache.spark.sql.Row ...
- spark代码连接hive_Spark SQL入门到实战之(7)spark连接hive(spark-shell和eclipse两种方式)...
1.在服务器(虚拟机)spark-shell连接hive 1.1 将hive-site.xml拷贝到spark/conf里 cp /opt/apache-hive-2.3.2-bin/conf/hiv ...
- python import sql脚本_13-模块介绍-import两种方式-py文件的两种用途-模块搜索路径-项目开发的目录规范...
1.模块的介绍与使用模块import 1.1.模块的介绍 1.1.1.什么是模块? 模块就是一组功能的集合体,我们的程序可以导入模块来复用模块里的功能.在python中,模块的使用方式都是一样的,但其 ...
- 用CSV文件读写数据的两种方式(转)
导读:有时候我们需要对收集的数据做统计,并在页面提供显示以及下载.除了对传统的excel存取之外,对CSV文件的存取也很重要.本文列出了这两种操作的详细代码. 代码: <?php $file = ...
- 利用SQL语句自动生成序号的两种方式
1.首先,我们来介绍第一种方式: ◆查询的SQL语句如下: select row_number() over (order by name) as rowid, sysobjects.[name] f ...
- spark的Web UI查看DAG的两种方式
提交spark任务后,master:8088->ApplicationMaster 然后会跳转到spark的WEB UI界面. 第一种查看DAG的方式是: Jobs->Descriptio ...
- java和sql计算两点经纬度距离的两种方式
一.java public class GeoUtil { private static final double EARTH_RADIUS = 6371393; // 平均半径,单位:m ...
- 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义
注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...
最新文章
- 第 4 章 Hypertable
- spring_在运行时更新代码(已Spring解密)
- SVN用户验证,调错
- 好久没有处理过故障了
- 记与公司内网微博的谈话
- php接口 接受ios或android端图片; php接收NSData数据
- Minor GC,Major GC,Full GC -- hotspot VM GC讲解
- P4318,bzoj2440-完全平方数【二分答案,莫比乌斯函数,容斥】
- 【转】在树莓派上实现人脸识别
- UVA 11237 - Halloween treats(鸽笼原理)
- 为什么算法工程师也叫调参狗?
- python实现高斯消元法求线性方程组的解
- 松翰单片机--SN8F5702学习笔记(一)uart寄存器
- 解决vue中双击事件会触发两次单击事件问题
- HTML自定义滚动条附效果图和完整源码
- eclipse导入idea项目教程
- 洛谷 P1238 走迷宫【搜索】【DFS】
- 票房拐点之后的影院并购,继续“小而频”还是大洗牌
- 接口测试平台代码实现54:首页重构-2
- 弘辽科技:淘宝卖家打造爆款商品的六大技巧!