Spark:如何替换sc.parallelize(List(item1,item2)).collect().foreach(row={})为并行?
代码场景:
1)设定的几种数据场景,遍历所有场景:依次统计满足每种场景条件下的数据,并把统计结果存入hive;
2)已有代码如下:
case class IndoorOTTCalibrateBuildingVecotrLegend(oid: Int, minHeight: Int, maxHeight: Int, minGridIDCount: Int, maxGridIDCount: Int, heightType: Int) extends Serializable // 实例化建筑物区间段:按照栅格的个数(面积)、楼的高度(商场等场景)来划分场景val buildingHeightLegends = List(IndoorOTTCalibrateBuildingVecotrLegend(1, 1, 30, 1, 21, BuildingCalibrateHeightType.HeightType1.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(2, 1, 30, 21, 45, BuildingCalibrateHeightType.HeightType2.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(3, 1, 30, 45, 100, BuildingCalibrateHeightType.HeightType3.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(4, 30, 50, 1, 21, BuildingCalibrateHeightType.HeightType4.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(5, 30, 50, 21, 45, BuildingCalibrateHeightType.HeightType5.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(6, 30, 50, 45, 100, BuildingCalibrateHeightType.HeightType6.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(7, 50, 5000, 1, 100, BuildingCalibrateHeightType.HeightType7.toString.toInt))spark.sparkContext.parallelize(buildingHeightLegends).collect().foreach(buildingHeightLegend => {generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)})
备注:
在generateSampleBySenceType()函数内部包含有:
spark.sql(s"""|xxx |where t10.heihgt>=${buildingHieghtLegend.MinHeight} and t10.height<${buildingHieghtLegend.MaxHeight} |and t10.gridcount<=${buildingHieghtLegend.MinGridIDCount} and t10.gridcount>${buildingHieghtLegend.MaxGridIDCount}|""".stripMargin)
如果把代码修改:
val buildingHeightLegends_df = spark.sqlContext.createDataFrame(buildingHeightLegends)buildingHeightLegends_df.createOrReplaceTempView("temp_buildingheightlegends")sql(s"""|select * from temp_buildingheightlegends""".stripMargin).repartition(buildingHeightLegends.length).foreachPartition(rows => {for (row <- rows) {val buildingHeightLegend = new IndoorOTTCalibrateBuildingVecotrLegend(row.getAs[Int]("oid"),row.getAs[Int]("minheight"),row.getAs[Int]("maxheight"),row.getAs[Int]("mingrididcount"),row.getAs[Int]("maxgrididcount"),row.getAs[Int]("heighttype"))generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)}})
则会提示:generateSampleBySenceType()内部sql代码位置抛出SparkSession为NULL的异常。
修改方案:
把buildingHeightLegends注册为临时表temp_buildingHeightLegends,去掉外层的foreach,之后在generateSampleBySenceType()内部把temp_buildingHeightLegends与其他结果集合进行cross join:
测试代码如下:
-- 场景表 CREATE TABLE [dbo].[test_senceitems]([sencetype] [int] NULL,[minheight] [int] NULL,[maxheight] [int] NULL,[mingridcount] [int] NULL,[maxgridcount] [int] NULL ) INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (1, 1, 30, 1, 21) INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (2, 1, 30, 21, 45) INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (3, 1, 30, 45, 100) INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (4, 30, 50, 1, 21) INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (5, 30, 50, 21, 45) INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (6, 30, 50, 45, 100) INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (7, 50, 5000, 1, 100)-- 业务过滤统计表 CREATE TABLE [dbo].[test_grid]([gridid] [nvarchar](50) NULL,[height] [int] NULL,[gridcount] [int] NULL ) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g1', 8, 23) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g2', 3, 87) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g3', 4, 34) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g4', 30, 54) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g5', 32, 32) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g6', 32, 20) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g7', 120, 34) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g8', 89, 54) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g9', 9, 16)
替换generateSampleBySenceType()内部sql(s"""|""".stripMargin)代码类似如下:
select t10.*,t11.* from test_grid t10 cross join test_senceitems t11 where t10.height>=t11.minheight and t10.height<t11.maxheight and t10.gridcount>=t11.mingridcount and t10.gridcount<t11.maxgridcount
转载于:https://www.cnblogs.com/yy3b2007com/p/8505152.html
Spark:如何替换sc.parallelize(List(item1,item2)).collect().foreach(row={})为并行?相关推荐
- 批量导入数据到hive表中:假设我有60张主子表如何批量创建导入数据
背景:根据业务需要需要把60张主子表批量入库到hive表. 创建测试数据: 1 def createBatchTestFile(): Unit = { 2 for (layer <- 0 to ...
- Spark编程指南(Python版)
Spark编程指南 译者说在前面:最近在学习Spark相关的知识,在网上没有找到比较详细的中文教程,只找到了官网的教程.出于自己学习同时也造福其他初学者的目的,把这篇指南翻译成了中文,笔者水平有限,文 ...
- Spark编程指南V1.4.0(翻译)
Spark编程指南V1.4.0 · 简介 · 接入Spark · Spark初始化 · 使用Shell · 在集群上部署代码 · ...
- Spark的Transformations算子(理解+实例)
把每个Transformations算子都敲着练习几遍会理解的更深刻 Transformations算子之后要写action算子才会进行计算. 1. map(func) 描述:返回一个新的RDD,该R ...
- Spark之RDD实战篇
Spark RDD创建.转换.行动算子.RDD的持久化: RDD编程 在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换.经过一系列的transformations定义RDD之 ...
- Spark入门阶段一之扫盲笔记
介绍 spark是分布式并行数据处理框架 与mapreduce的区别: mapreduce通常将中间结果放在hdfs上,spark是基于内存并行大数据框架,中间结果放在内存,对于迭代数据spark效率 ...
- Spark RDD用法
RDD简介 并行化集合 外部数据集 读取文本文件 读取其他数据格式 存储RDD RDD 操作 向Spark传递函数 理解闭包 本地模式 vs 集群模式 打印RDD元素 使用键值对 中间操作Transf ...
- spark知识点_RDD
来自官网的Spark Programming Guide,包括个人理解的东西. 这里有一个疑惑点,pyspark是否支持Python内置函数(list.tuple.dictionary相关操作)?思考 ...
- PyCharm搭建Spark开发环境windows下安装pyspark
目录 windows下安装pyspark PyCharm搭建Spark开发环境 windows下安装pyspark spark和hadoop版本版本之间有对应关系 安装jdk 安装hadoop 下载 ...
最新文章
- 【SAP-PM模块】维护业务处理流程
- 修改oralce字符集
- 计算机能力操作系统试题,全国计算机一级考试操作系统论述题及答案
- MapReduce简述、工作流程
- 安卓学习笔记34:默默工作的服务
- 加密+拜占庭将军_屡屡被提及拜占庭将军问题,究竟和比特币是什么关系?
- Codeforces Round #275 (Div. 2) D
- sql 2005中全文索引的使用
- ifv播放器android 版,ifv格式播放器
- 成分句法分析与依存句法分析
- excel中输入身份证号后三位变成0怎么办
- 正面管教php_我就这样走进正面管教
- 计算机课程给你最大的收获,计算机课程心得体会.doc
- 北大青鸟消防控制器组网_北大青鸟消防报警主机维修与调试
- 利用Java求解“鸡兔同笼问题”:鸡和兔在一个笼子里,共有腿100条,头40个,问鸡兔各有几只?
- php代码自动生成文档-phpDocumentor
- 低成本多串口ARM9工控主板解决方案
- 红旗7linux安装教程,红旗Linux7.0硬盘安装简明教程
- memory check error at 0x03D70F16 = 0x00, should be 0xFD.
- 对比学习系列(四)---BYOL
热门文章
- 不同外置模拟器用Android Studio连接指令
- dokuwiki 部署笔记
- Linux (redhat 6.5)服务器上安装Webmin
- 2018汇总数据结构算法篇
- 原 Ubuntu使用VNC运行基于Docker的桌面系统
- 用Android Studio调试Framework层代码
- 【问链财经-区块链基础知识系列】 第三十八课 以太坊智能合约账户全解析
- php accesscontrolalloworigin,php – Access-Control-Allow-Origin没有显示在codeigniter的响应头中...
- android socket 长连接_php socket如何实现长连接
- cadence 常见pcb电阻_经验分享|高频PCB设计中出现的干扰分析及对策