利用SparkSQL(java版)将离线数据或实时流数据写入hive的用法及坑点
1. 通常利用SparkSQL将离线或实时流数据的SparkRDD数据写入Hive,一般有两种方法。第一种是利用org.apache.spark.sql.types.StructType和org.apache.spark.sql.types.DataTypes来映射拆分RDD的值;第二种方法是利用rdd和Java bean来反射的机制。下面对两种方法做代码举例
2. 利用org.apache.spark.sql.types.StructType和org.apache.spark.sql.types.DataTypes来映射拆分RDD的值
JavaRDD<Row> resultRdd = rdd.map(new Function<String[], Row>() {@Overridepublic Row call(String[] line) throws Exception {if (line != null && line.length > 0) {return helper.createRow(line);}return null;}});StructType structType = helper.createSchame();Dataset<Row> dataFrame = session.createDataFrame(resultRdd, structType);DataFrameWriter<Row> writer = dataFrame.coalesce(1).write().format(TableHelperInter.TABLE_FORMAT_TYPE).mode(SaveMode.Append);String tableName = hiveDataBaseName + "." + helper.getTableName();writer.insertInto(tableName);
这种方法的有点是写入简单,不必去考虑字段映射有误,但缺点是需要去写一个TableHelperInter,而且这种方式对字段的类型要求严格,在做字段类型和字段校验时比对时一旦字段过多会及其复杂,所以不推崇这种写法
3. 利用rdd和Java bean来反射
来一个完整的程序
public class SparkSQLTest {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("yarn").setAppName("SparkSQL_test");JavaSparkContext sc = new JavaSparkContext(conf);String line = "1102,jason,20,male,15927384023,developer,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20";String line2 = "1103,jason1,21,male,15927352023,developer1,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20";List<String> list = new ArrayList<String>();list.add(line);list.add(line2);JavaRDD<String> rdd = sc.parallelize(list);JavaRDD<Person> rddResult = rdd.map(new Function<String, Person>() {@Overridepublic Person call(String s) throws Exception {String[] message = s.split(",");Person person = new Person();person.setNo(message[0]);person.setName(message[1]);person.setAge(message[2]);person.setGender(message[3]);person.setPhone(message[4]);person.setJob(message[5]);person.setCol7(message[6]);person.setCol8(message[7]);person.setCol9(message[8]);person.setCol10(message[9]);person.setCol11(message[10]);person.setCol12(message[11]);person.setCol13(message[12]);person.setCol14(message[13]);person.setCol15(message[14]);person.setCol16(message[15]);person.setCol17(message[16]);person.setCol18(message[17]);person.setCol19(message[18]);person.setCol20(message[19]);person.setCreate_time_p(DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now()));return person;}});//这行代码必须在实例SparkSession不然会出错SparkSession.clearDefaultSession();SparkSession session = SparkSession.builder().config("hive.metastore.uris", "localhost:9083").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").config("hive.exec.dynamic.partition", true).config("spark.sql.sources.partitionColumnTypeInference.enabled", false).config("hive.exec.dynamic.partition.mode", "nonstrict").enableHiveSupport().getOrCreate();Dataset dataset = session.createDataFrame(rddResult,Person.class);dataset.registerTempTable("person_temp_table");session.sql("insert into qwrenzixing.person_table20 partition (create_time_p="+DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())+") select no,name,age,gender,phone,job,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20 from person_temp_table");}
}
这种方法比较简洁,为了避免去做繁琐的字段比对和校验。可以将字段类型以string写入hive。同时通过SparkSession操作SQL的方法是spark2.0后的。这里是将dataset写成一张临时表,再将临时表的值查询出来insert into到hive表中。但将DataSet通过SparkSQL写成一张临时表的操作,Spark原生提供了四个关于这种操作API
dataset.registerTempTable("temp_table");
dataset.createGlobalTempView("temp_table");
dataset.createOrReplaceTempView("temp_table");
dataset.createTempView("temp_table");
4. 关于这四个将DataSet写成一张临时表的作用和坑点
1>. dataset.registerTempTable("temp_table")
这个方法建议在离线,批处理中使用,在实时流式计算中会导致后续写入hive值与字段不匹配乱序的问题
2>. dataset.createGlobalTempView("temp_table")
这个方法是创建一个全局临时表,意思就是别的spark-submit也可以用,这种场景很少,而且无法用在实时流式计算中,因为创建一次表后不能再创建会包表已经存在的错误
3>. dataset.createOrReplaceTempView("temp_table");
这个其实比较好理解,如果存在就覆盖
4>. dataset.createTempView("temp_table");
这个方法当spark程序没有结束时不能重复创建
这里的创建临时表在spark程序结束后临时表不存在,所以spark streaming程序要特别注意用法
5. 关于Spark SQL的一个坑点
在mysql中insert into有两种方式
INSERT INTO table_name VALUES (value1, value2,....)
INSERT INTO table_name (column1, column2,...) VALUES (value1, value2,....)
要注意第二种写法在SparkSQL会报错,SparkSQL不支持这种写法,只支持第一种写法。这个是为什么其实也很好理解,每个人想法不一样。第一次使用要避免这个坑点
最后附上我在利用SparkSQL将kafka数据写入hive的重要环节的代码:
String tableName = hiveDataBaseName + ".test_data";Dataset dataFrame = session.createDataFrame(resultRdd, SJGJEntity.class);// createOrReplaceTempView API方式将数据写入hive 不存在值与字段名错乱的问题dataFrame.createOrReplaceTempView("temp_table");session.sql("insert into " + tableName + " partition(create_time_p=" + DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())+ ") select base_name,base_num,serviceCode,phoneno,called_phoneno,call_time,call_longth,lac,ci,xpoint,ypoint,imei,imsi,insert_time,call_address," + "source_table,mark_type,companyId,type,createKafkaTime from temp_table");
转载于:https://www.cnblogs.com/jiashengmei/p/11045887.html
利用SparkSQL(java版)将离线数据或实时流数据写入hive的用法及坑点相关推荐
- 什么是大数据「实时流计算」?深度解析它的4大应用及4个特点
导读:火灾已经爆发后才知道救火,交通已经阻塞后才知道疏通,羊毛已经被"羊毛党"薅光后才知道堵上漏洞,股价已经拉升后才知道后悔--为什么我们不能在这些事情发生之前,或者至少是刚刚发生 ...
- sparkStreaming:实时流数据详解
目录 一.概述 二.wordCount示例 三.初始化StreamingContext 四.DStreams(离散数据流) 五.输入DStream和接收器 Basic sources File Str ...
- java版gRPC实战之五:双向流
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <java版gRPC实战>全系列链接 用p ...
- java版gRPC实战之四:客户端流
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <java版gRPC实战>全系列链接 用p ...
- python同花顺股票实时数据_web实时股票数据展示
广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 所有这些都是实时发生的,并推送到仪表板供用户评估事物和行为. 最终,为了能够从任 ...
- 大数据准实时流式系统设计(一)——基于大数据框架设计
前段时间负责了公司一个新的项目,项目不属于直接面向用户的线上实时响应系统,要求做到尽快毫秒级或者秒级响应的准实时系统.结合以前学习的一些大数据理论方面和参与的准实时系统方面的经验,对准实时系统架构设计 ...
- 大数据-07-Spark之流数据
摘自 http://dblab.xmu.edu.cn/blog/1084-2/ 简介 DStream是Spark Streaming的编程模型,DStream的操作包括输入.转换和输出. Spark ...
- 咕咕数据美股实时行情数据
美股实时行情数据 所有美股实时交易行情数据,美股实时数据,支持代码筛选. 1. 产品功能 支持所有美股实时交易数据查询(国内数据延迟 15 分钟): 包含美股实时交易多项指标数据: 毫秒级查询性能: ...
- 大数据应用--实时路况数据
现在手机上装个导航软件,如高德地图,百度地图等等都有实时路况显示,导航和道路规划可以根据实时路况来实施,从而动态躲避拥堵,为出行节省时间,为了显示实时路况就必须有路况数据,今天来说下实时数据的获取方法 ...
最新文章
- Zookeeper源码分析:Leader角色初始化
- AAAI 2020 | 多模态基准指导的生成式多模态自动文摘
- private的误解
- 彻底弄懂dalvik字节码【三】
- unittest单元测试笔记
- C语言 实现登录注册功能
- 一个二线城市程序员的一年【坐标成都】
- Javascript倒计时页面跳转
- maven 下载源码
- mysql查询女生的成绩_MySQL 统计查询实现代码
- python 相关系数矩阵可视化_python seaborn heatmap可视化相关性矩阵实例
- 用计算机写作文教学反思,语文作文教学反思(精选6篇)
- 一个萌新的学习如何写BUG记录(学无止境,更无止境)
- PDF 格式的文件编辑难度非常大, 相比 DOCX 格式,它存在的意义是什么?
- 开机所有网卡全部down掉解决方法,ens33 down
- excel中引用power bi模型数据
- 安徽建筑大学计算机技术909数据结构
- HTML+JS TD
- 2018大学计算机答案,2018年大学计算机程序试题及答案
- 记录一次elasticsearch的写入优化(附带python客户端、golang客户端)
热门文章
- 入门系列之使用Sysdig监视您的Ubuntu 16.04系统
- IOS、Android html5页面输入的表情符号变成了乱码”???“
- repeater 控件嵌套
- LSA 安装及管理应用程序
- 将unicode编码的txt文件转为utf-8编码
- 现今主流计算机语言,现今主流的Python图形化界面主要有哪些
- linux服务器的搭建配置与应用,linux服务器的搭建与配置
- 东北师范大学计算机科学与技术学科评估,东北的大学最强十校,工科是真强,2所211大学无缘前十...
- 控件无法安装,windows已经阻止此软件因为无法验证发行者
- python各种文件数据的读取