PySpark写入数据到Hbase的辛酸经历
Pyspark + Hbase
环境配置:
Python:3.7.4
Spark:2.4.4
Hbase:2.2.3
前言:
首先,本人建议使用scala
来做有关spark
的开发,这是和前辈讨论他们给的建议,或者你们可以参考一下这篇文章.
这个真的折磨了我好久(中途还接到需求,断断续续弄了好久,多久就不告诉你们了,免得你们笑我菜),真的辛酸。里面我的做法肯定有很多漏洞,而且还没完全解决,我会持续更新。另外,各位大神,如果友好的建议,评论区提点一下,万分感谢。
另外,本篇可能会有点长,不喜勿喷。。。。
数据格式:
一天一个压缩包(ZIP),然后里面都是很多个TXT文件,分隔符是 “|” ,每个TXT文件大概是9000条数据这样。
前言
我不知道大家有没有这种感觉,反正我是有,就是随着版本的迭代升级,大数据的组件对Python
越来越不友好,学习java
和scala
对于大数据开发来说真的很重要。对于我这种只会python
的菜鸡还真的是刺激。。。
使用spark写入hbase有两个方法,一个是使用hortonworks的开源框架shc
,另一个是使用RDD自带的方法saveAsNewAPIHadoopDataset
存入到hbase中。
hortonworks的开源框架shc
先说说这个。
首先你需要下载个shc的依赖包shc-core-1.1.1-2.1-s_2.11.jar
:spark hbase connect.
版本的选择自己判断把,这个链接往前翻能找到其他版本。
如果你想自行编译这个包,也可以,从这里找到源码下载自己打包:github源码网址.
我已经忘记我怎么找到的了,应该是这一个没错。
代码:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *conf = SparkConf()
config = (("spark.executor.memory", "8g"),("spark.executor.cores", "4"))
conf.setAll(config)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()def read_txt(sc):# input = sc.textFile("file:///home/data/demodata").map(lambda x: x.split('|'))# a_rdd = sc.textFile("file:///home/data/20200302").map(lambda x: x.split('|'))# df = input.toDF()df = spark.read.csv("file:///home/data/demodata", sep="|")return dfdef write_to_hbase1(df):rename_df = df. \withColumnRenamed("_1", "PLA_INFO").\withColumnRenamed("_2", "DATE_TIME").\withColumnRenamed("_3", "KKJ").\withColumnRenamed("_4", "KKW").\withColumnRenamed("_5", "SPEED").\withColumnRenamed("_6", "SPEED_EX").\withColumnRenamed("_7", "ALTITUDE").\withColumnRenamed("_8", "STATUS").\withColumnRenamed("_9", "COLOUR").\withColumnRenamed("_10", "MILEAGE").\withColumnRenamed("_11", "WAY")id_df = rename_df.withColumn("V_ID", monotonically_increasing_id())id_df = id_df.withColumn("V_ID", id_df.V_ID.cast(StringType()))dep = "org.apache.spark.sql.execution.datasources.hbase"# 创建schemacatalog = """{"table":{"namespace":"default", "name":"test", "tableCoder":"PrimitiveType"},"rowkey":"key","columns":{"V_ID":{"cf":"rowkey", "col":"key", "type":"string"},"PLATE":{"cf":"car_info", "col":"PLA_INFO", "type":"string"},"DATE_TIME":{"cf":"car_info", "col":"DATE_TIME", "type":"int"},"KKJD":{"cf":"car_info", "col":"KKJ","type":"int"},"KKWD":{"cf":"car_info", "col":"KKW","type":"string"},"SPEED":{"cf":"car_info", "col":"SPEED", "type":"string"},"SPEED_EX":{"cf":"car_info", "col":"SPEED_EX","type":"string"},"ALTITUDE":{"cf":"car_info", "col":"ALTITUDE","type":"string"},"STATUS":{"cf":"car_info", "col":"STATUS","type":"string"},"COLOUR":{"cf":"car_info", "col":"COLOUR","type":"string"},"MILEAGE":{"cf":"car_info", "col":"MILEAGE","type":"string"},"WAY":{"cf":"car_info", "col":"WAY","type":"string"}}} """id_df.write.options(catalog=catalog).format(dep).save()print('***************************************************************')print('******************* 写 入 成 功 ******************')print('***************************************************************')def main():sc = spark.sparkContextdf = read_txt(sc)write_to_hbase1(df)sc.stop()
if __name__ == '__main__':main()
然后报错了。。。。
给个文字版吧,主要是这一句:
java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
简单来说就是在保存的时候,发生了jar包冲突的错,冲突的是org.json4s.jackson.JsonMethods
对应的是spark里的json4s-core_2.11-3.5.3.jar
。
我用上面的网址下载源码,下载的对用的包,pom里只引用了一个包,应该是引用里面又引用了json4s这个包,我在包里搜索了一下还真的有。。我就不截图了,其实也就是说把这个包去掉重新编译就行了。于是乎,我让我同事帮忙在pom里加了个忽略json4s
包的代码(因为没成功,我就删了,没能贴出来),但是打包的时候一堆错误,所以我放弃了。
网上遇到我上面这个错的都是建议把shc jar包里的剔除了再重新编译就可以了,建议大家试试,应该是可以成功的。
saveAsNewAPIHadoopDataset
这个我直接上代码吧。
代码:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *conf = SparkConf()
config = (("spark.executor.memory", "8g"),("spark.executor.cores", "4"))
conf.setAll(config)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()def read_txt(sc):# input = sc.textFile("file:///home/data/demodata").map(lambda x: x.split('|'))# a_rdd = sc.textFile("file:///home/data/20200302").map(lambda x: x.split('|'))# df = input.toDF()df = spark.read.csv("file:///home/data/demodata", sep="|")return dfdef write_to_hbase2(df):"""将Spark DataFrame 数据写入Hbase"""rename_df = df. \withColumnRenamed("_1", "PLA_INFO").\withColumnRenamed("_2", "DATE_TIME").\withColumnRenamed("_3", "KKJ").\withColumnRenamed("_4", "KKW").\withColumnRenamed("_5", "SPEED").\withColumnRenamed("_6", "SPEED_EX").\withColumnRenamed("_7", "ALTITUDE").\withColumnRenamed("_8", "STATUS").\withColumnRenamed("_9", "COLOUR").\withColumnRenamed("_10", "MILEAGE").\withColumnRenamed("_11", "WAY")id_df = rename_df.withColumn("V_ID", monotonically_increasing_id())id_df = id_df.withColumn("V_ID", id_df.V_ID.cast(StringType()))host = "192.168.0.111"table = "test"keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"conf = {"hbase.zookeeper.quorum": host, "hbase.mapred.outputtable": table,"zookeeper.znode.parent": "/hbase-unsecure","mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}# DataFrame转换为rdd写入hbasecol_list = id_df.columnsprint(col_list)rdd_data = id_df.rdd.map(lambda x:(x["V_ID"],((i,x[i]) for i in col_list[1:])))rdd_data = rdd_data.flatMapValues(lambda x:x)rrd2 = rdd_data.map(lambda x:(x[0],[x[0],'info',x[1][0],x[1][1]]))print(rrd2.take(10))rrd2.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)def main():sc = spark.sparkContextdf = read_txt(sc)write_to_hbase2(df)sc.stop()
if __name__ == '__main__':main()
这个方法有个值得注意的地方,如果你在配置的代码中没有配置zookeeper.znode.parent
,你就会遇到以下错误:
20/04/29 09:28:14 WARN ConnectionImplementation: Retrieve cluster id failed
java.util.concurrent.ExecutionException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/hbaseid
然后你会一头雾水,因为这个问题你可能在使用hbase shell
的时候应该遇到过,而且你应该已经解决了,然后再往下拉,你就又看到一句:
Caused by: java.io.IOException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase
如果你百度的这些错的话,你能看到的都不是说spark连接Hbase遇到这个问题。。这就很无语。。
其实这里简单来说就是,找不到hbase在zookeeper的路径(不知道我这样表述有没有问题)。所以你只要在配置里指定好zookeeper.znode.parent
就可以了。
这个配置我试过,在scala的代码里不写,也是报一样的错,但是很奇怪的是网上的代码基本都不写这个配置。。可能是我环境配的有问题吧。
好的,配置完之后不报这个错了,又报另一个错:
20/05/07 15:37:18 ERROR Utils: Aborting task
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.add([B[B[B)Lorg/apache/hadoop/hbase/client/Put;
这里简单来讲就是org.apache.hadoop.hbase.client
里put.add有问题。。。
那么我上网查过,反正意思就是2.0版本之后这个方法改了
解决办法看看这几篇:
https://blog.csdn.net/sinat_37992109/article/details/98735188.
https://www.cnblogs.com/cssdongl/p/7347167.html.
https://blog.csdn.net/cssdongl/article/details/77750515.
简单来讲就是要不呢,你就换版本(spark降低到2.0版本以下),要不呢,你就重新编译spark-exmaple的jar包(怎么又重新编译。。)
我觉得太过于麻烦。。所以我还是写scala
吧
由于文章太长,所以我再分一篇把。。。
PySpark写入数据到Hbase的辛酸经历相关推荐
- CDH集成了Kerberos后写入数据到HBase遇到的几个问题
目录 环境和场景 问题一:Zookeeper认证错误 解决方法 问题二:HBase权限错误 解决方法 环境和场景 环境:CDH6.3.1+Kerberos 场景:数据经Flink处理后写入到Hbase ...
- 读取TXT文件写入数据到Hbase
背景: 接到需求,需要每天把大概一亿条的数据导入到Hbase里,对标同事的mongoDB(大概半个小时内导完) 环境配置: Python:3.7.4 Hbase:2.2.3 数据格式: 一天一个压缩包 ...
- Spark写入数据到Hbase(下)解决scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps
环境配置: Scala:2.11.12(看上篇,原本是2.12.1) Spark:2.4.4 Hbase:2.2.3 前言: 上一篇没报错,后来改动了一下,就报错了,这个错是版本的问题,而且让我十分疑 ...
- [喵咪大数据]Hive+Hbase关联
在之前的章节中我们已经一同学习的Hive和HBase相关的知识,但是Hive和HBase都存在各自的问题,Hive实时性不强单条写入数据慢,HBase查询能力差不具备复杂查询的能力,但是Hive和HB ...
- spark从hbase读取写入数据
将RDD写入hbase 注意点: 依赖: 将lib目录下的hadoop开头jar包.hbase开头jar包添加至classpath 此外还有lib目录下的:zookeeper-3.4.6.jar.me ...
- hbase原理与实践_JAVA连接HBase客户端及HBase写入数据和读取数据原理解析
JAVA连接HBase客户端 接着上篇文章进行代码的实践,从JAVA 客户端对 HBase的客户端进行一系列操作 工具类:HbaseUtil 静态代码块一次性创建连接对象 并赋值 返回连接对象 Con ...
- 大数据查询——HBase读写设计与实践--转
背景介绍 本项目主要解决 check 和 opinion2 张历史数据表(历史数据是指当业务发生过程中的完整中间流程和结果数据)的在线查询.原实现基于 Oracle 提供存储查询服务,随着数据量的不断 ...
- 一条数据的HBase之旅,简明HBase入门教程-开篇
常见的HBase新手问题: 什么样的数据适合用HBase来存储? 既然HBase也是一个数据库,能否用它将现有系统中昂贵的Oracle替换掉? 存放于HBase中的数据记录,为何不直接存放于HDFS之 ...
- 一条数据的HBase之旅,简明HBase入门教程-Write全流程
如果将上篇内容理解为一个冗长的"铺垫",那么,从本文开始,剧情才开始正式展开.本文基于提供的样例数据,介绍了写数据的接口,RowKey定义,数据在客户端的组装,数据路由,打包分发, ...
最新文章
- linux输出重定向%3e退出,Linux学习笔记——第二章:Linux的用户接口与文本编辑器...
- Android SQLite数据库增删改查操作
- 16 分频 32 分频是啥意思_Verilog 数字分频器的设计及验证
- 【渝粤题库】国家开放大学2021春1078复变函数题目
- 安装mysql8._安装MySQL8(附详细图文)
- java enummap_Java EnumMap get()方法与示例
- 彭文华:详解数字化转型的破局之道(附直播视频)
- 对于成功的GDUUU商人来说
- MySQL 异常有这一篇就够了!
- 谁给小鹏P5的勇气?
- 做Tiktok如何选择地区?
- 文字处理(WORD/WP)中,布局与绘制必然分开
- extern dllInport用法
- 【UVA-10891】Game of Sum【区间DP】
- vbs画动态爱心代码_求助,求一个vbs画心形的代码(程序小白的求助)
- 模糊综合评价模型原理及matlab实现
- 尔雅 科学通史(吴国盛) 个人笔记及课后习题 2018 第五章 欧洲科技文明的起源
- python做一个浏览器_用python做一个简单的浏览器
- python贪吃蛇设计思路_Python制作AI贪吃蛇
- 字典(python学习)
热门文章
- php自动加nofollow,WordPress自动外部链接加上nofollow标签且新窗口打开
- word把选择答案弄到题目里_word2003考试选择题及答案
- PHP比java好学?_php好学还是java好学,学php有前途吗
- python海龟画图(2)五星红旗
- 获得安卓手机的相关信息
- 逆向 qq 消息撤回
- SpringBoot:WebSocket实现消息撤回、图片撤回
- windows8.1 ssd优化
- 网络安全工程师的学习路线
- 非root账号安装nginx1.22.0