代码场景:

1)设定的几种数据场景,遍历所有场景:依次统计满足每种场景条件下的数据,并把统计结果存入hive;

2)已有代码如下:

    case class IndoorOTTCalibrateBuildingVecotrLegend(oid: Int, minHeight: Int, maxHeight: Int, minGridIDCount: Int, maxGridIDCount: Int, heightType: Int) extends Serializable    //  实例化建筑物区间段:按照栅格的个数(面积)、楼的高度(商场等场景)来划分场景val buildingHeightLegends = List(IndoorOTTCalibrateBuildingVecotrLegend(1, 1, 30, 1, 21, BuildingCalibrateHeightType.HeightType1.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(2, 1, 30, 21, 45, BuildingCalibrateHeightType.HeightType2.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(3, 1, 30, 45, 100, BuildingCalibrateHeightType.HeightType3.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(4, 30, 50, 1, 21, BuildingCalibrateHeightType.HeightType4.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(5, 30, 50, 21, 45, BuildingCalibrateHeightType.HeightType5.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(6, 30, 50, 45, 100, BuildingCalibrateHeightType.HeightType6.toString.toInt),IndoorOTTCalibrateBuildingVecotrLegend(7, 50, 5000, 1, 100, BuildingCalibrateHeightType.HeightType7.toString.toInt))spark.sparkContext.parallelize(buildingHeightLegends).collect().foreach(buildingHeightLegend => {generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)})

备注:

在generateSampleBySenceType()函数内部包含有:

spark.sql(s"""|xxx
|where t10.heihgt>=${buildingHieghtLegend.MinHeight} and t10.height<${buildingHieghtLegend.MaxHeight}
|and t10.gridcount<=${buildingHieghtLegend.MinGridIDCount} and  t10.gridcount>${buildingHieghtLegend.MaxGridIDCount}|""".stripMargin)

如果把代码修改:

    val buildingHeightLegends_df = spark.sqlContext.createDataFrame(buildingHeightLegends)buildingHeightLegends_df.createOrReplaceTempView("temp_buildingheightlegends")sql(s"""|select * from temp_buildingheightlegends""".stripMargin).repartition(buildingHeightLegends.length).foreachPartition(rows => {for (row <- rows) {val buildingHeightLegend = new IndoorOTTCalibrateBuildingVecotrLegend(row.getAs[Int]("oid"),row.getAs[Int]("minheight"),row.getAs[Int]("maxheight"),row.getAs[Int]("mingrididcount"),row.getAs[Int]("maxgrididcount"),row.getAs[Int]("heighttype"))generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)}})

则会提示:generateSampleBySenceType()内部sql代码位置抛出SparkSession为NULL的异常。

修改方案:

把buildingHeightLegends注册为临时表temp_buildingHeightLegends,去掉外层的foreach,之后在generateSampleBySenceType()内部把temp_buildingHeightLegends与其他结果集合进行cross join:

测试代码如下:

-- 场景表
CREATE TABLE [dbo].[test_senceitems]([sencetype] [int] NULL,[minheight] [int] NULL,[maxheight] [int] NULL,[mingridcount] [int] NULL,[maxgridcount] [int] NULL
)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (1, 1, 30, 1, 21)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (2, 1, 30, 21, 45)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (3, 1, 30, 45, 100)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (4, 30, 50, 1, 21)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (5, 30, 50, 21, 45)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (6, 30, 50, 45, 100)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (7, 50, 5000, 1, 100)-- 业务过滤统计表
CREATE TABLE [dbo].[test_grid]([gridid] [nvarchar](50) NULL,[height] [int] NULL,[gridcount] [int] NULL
) INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g1', 8, 23)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g2', 3, 87)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g3', 4, 34)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g4', 30, 54)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g5', 32, 32)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g6', 32, 20)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g7', 120, 34)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g8', 89, 54)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g9', 9, 16)

替换generateSampleBySenceType()内部sql(s"""|""".stripMargin)代码类似如下:

select t10.*,t11.*
from test_grid t10
cross join test_senceitems t11
where t10.height>=t11.minheight and t10.height<t11.maxheight
and t10.gridcount>=t11.mingridcount and t10.gridcount<t11.maxgridcount

转载于:https://www.cnblogs.com/yy3b2007com/p/8505152.html

Spark:如何替换sc.parallelize(List(item1,item2)).collect().foreach(row={})为并行?相关推荐

  1. 批量导入数据到hive表中:假设我有60张主子表如何批量创建导入数据

    背景:根据业务需要需要把60张主子表批量入库到hive表. 创建测试数据: 1 def createBatchTestFile(): Unit = { 2 for (layer <- 0 to ...

  2. Spark编程指南(Python版)

    Spark编程指南 译者说在前面:最近在学习Spark相关的知识,在网上没有找到比较详细的中文教程,只找到了官网的教程.出于自己学习同时也造福其他初学者的目的,把这篇指南翻译成了中文,笔者水平有限,文 ...

  3. Spark编程指南V1.4.0(翻译)

    Spark编程指南V1.4.0 ·        简介 ·        接入Spark ·        Spark初始化 ·        使用Shell ·        在集群上部署代码 ·  ...

  4. Spark的Transformations算子(理解+实例)

    把每个Transformations算子都敲着练习几遍会理解的更深刻 Transformations算子之后要写action算子才会进行计算. 1. map(func) 描述:返回一个新的RDD,该R ...

  5. Spark之RDD实战篇

    Spark RDD创建.转换.行动算子.RDD的持久化: RDD编程 在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换.经过一系列的transformations定义RDD之 ...

  6. Spark入门阶段一之扫盲笔记

    介绍 spark是分布式并行数据处理框架 与mapreduce的区别: mapreduce通常将中间结果放在hdfs上,spark是基于内存并行大数据框架,中间结果放在内存,对于迭代数据spark效率 ...

  7. Spark RDD用法

    RDD简介 并行化集合 外部数据集 读取文本文件 读取其他数据格式 存储RDD RDD 操作 向Spark传递函数 理解闭包 本地模式 vs 集群模式 打印RDD元素 使用键值对 中间操作Transf ...

  8. spark知识点_RDD

    来自官网的Spark Programming Guide,包括个人理解的东西. 这里有一个疑惑点,pyspark是否支持Python内置函数(list.tuple.dictionary相关操作)?思考 ...

  9. PyCharm搭建Spark开发环境windows下安装pyspark

    目录 windows下安装pyspark PyCharm搭建Spark开发环境 windows下安装pyspark spark和hadoop版本版本之间有对应关系 安装jdk 安装hadoop 下载 ...

最新文章

  1. 【SAP-PM模块】维护业务处理流程
  2. 修改oralce字符集
  3. 计算机能力操作系统试题,全国计算机一级考试操作系统论述题及答案
  4. MapReduce简述、工作流程
  5. 安卓学习笔记34:默默工作的服务
  6. 加密+拜占庭将军_屡屡被提及拜占庭将军问题,究竟和比特币是什么关系?
  7. Codeforces Round #275 (Div. 2) D
  8. sql 2005中全文索引的使用
  9. ifv播放器android 版,ifv格式播放器
  10. 成分句法分析与依存句法分析
  11. excel中输入身份证号后三位变成0怎么办
  12. 正面管教php_我就这样走进正面管教
  13. 计算机课程给你最大的收获,计算机课程心得体会.doc
  14. 北大青鸟消防控制器组网_北大青鸟消防报警主机维修与调试
  15. 利用Java求解“鸡兔同笼问题”:鸡和兔在一个笼子里,共有腿100条,头40个,问鸡兔各有几只?
  16. php代码自动生成文档-phpDocumentor
  17. 低成本多串口ARM9工控主板解决方案
  18. 红旗7linux安装教程,红旗Linux7.0硬盘安装简明教程
  19. memory check error at 0x03D70F16 = 0x00, should be 0xFD.
  20. 对比学习系列(四)---BYOL

热门文章

  1. 不同外置模拟器用Android Studio连接指令
  2. dokuwiki 部署笔记
  3. Linux (redhat 6.5)服务器上安装Webmin
  4. 2018汇总数据结构算法篇
  5. 原 Ubuntu使用VNC运行基于Docker的桌面系统
  6. 用Android Studio调试Framework层代码
  7. 【问链财经-区块链基础知识系列】 第三十八课 以太坊智能合约账户全解析
  8. php accesscontrolalloworigin,php – Access-Control-Allow-Origin没有显示在codeigniter的响应头中...
  9. android socket 长连接_php socket如何实现长连接
  10. cadence 常见pcb电阻_经验分享|高频PCB设计中出现的干扰分析及对策