我有一个spark流作业,它每5秒从Kafka读取一次,对传入的数据进行一些转换,然后写入文件系统。

这不一定是一个流媒体工作,实际上,我只想每天运行一次,将消息排放到文件系统中。不过,我不知道如何停止这项工作。

如果我将超时传递给streamingContext.awaittemination,它不会停止进程,它所做的一切都会导致进程在迭代流时产生错误(请参阅下面的错误)

我想做的最好的方法是什么

这是针对Python上的Spark 1.6的

编辑:

多亏了@marios,解决方案是:ssc.start()

ssc.awaitTermination(10)

ssc.stop()

在停止之前运行脚本10秒。

简化代码:conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite','true')

sc = SparkContext(conf=conf)

ssc = StreamingContext(sc, 5)

stream = KafkaUtils.createStream(

ssc,

kafkaParams["zookeeper.connect"],

"vehicle-data-importer",

topicPartitions,

kafkaParams)

stream.saveAsTextFiles('stream-output/kafka-vehicle-data')

ssc.start()

ssc.awaitTermination(10)

错误:16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)

16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers

16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200

16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms

py4j.Py4JException: Cannot obtain a new communication channel

at py4j.CallbackClient.sendCommand(CallbackClient.java:232)

at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111)

at com.sun.proxy.$Proxy14.call(Unknown Source)

at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)

at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)

at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)

at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at scala.Option.orElse(Option.scala:257)

at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)

at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)

at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)

at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)

at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)

at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)

at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)

at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)

at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)

at scala.util.Try$.apply(Try.scala:161)

at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)

at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)

at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)

at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB)

16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB)

spark 终止 运行_如何在数据源运行ou时停止spark流相关推荐

  1. spark应用程序转换_打包并提交运行Spark应用程序jar包

    基于eclipse的Spark IDE可在 http://scala-ide.org/ 下载. 以WordCount为例: package com.lxw.test import org.apache ...

  2. 两个条件一个为false就运行_设置一个自动运行网格条件单

    如何设置一个能够自动运行策略的网格条件单呢? 首先我们可以选择一个标的(哪些是适合的,这个做相信在手动网格的朋友一定再清楚不过,咱们这里先跳过,下次再讲) 1. 选择好标的以后,输入代码 2. 设置好 ...

  3. java怎么在记事本里写过运行_[置顶] 如何运行用记事本写的java程序

    今天用记事本写了一个java程序,测试能运行,现在把它分解成几个步骤,利于大家理解: 1. 新建一个记事本,后缀名是  .java  :然后在里面写一段java的代码,如图: 2.把写好的java文件 ...

  4. python可以在linux运行_服务器(Linux)上运行python总结

    跑实验换了几次服务器了,每次遇到相似问题都要重新百度,而且每次百度搜索出的顺序都不一样,又得重新找半天,这次把遇到的问题都总结一下. 1.准备 PuTTY和FileZilla FileZilla使用F ...

  5. java提升权限运行_提升代码的运行权限,实现模拟管理员身份的功能

    SPSecurity.RunWithElevatedPrivileges(delegate() { // implementation details omitted }); 可以提升代码的运行权限, ...

  6. python代码怎么运行_使用Joblib并行运行Python代码

    微信公众号:测度空间 对于大多数问题,并行计算确实可以提高计算速度. 随着PC计算能力的提高,我们可以通过在PC中运行并行代码来简单地提升计算速度.Joblib就是这样一个可以简单地将Python代码 ...

  7. python脱离环境运行_脱离Python环境运行的问题所使用的工具介绍

    如果你对脱离Python环境运行中存在不解之处时,就可以浏览以下的文章对如何脱离Python环境运行的相关实际操作,希望你在浏览完下面的文章对其有个相关的了解,以下就是文章的具体描述. 关于脱离Pyt ...

  8. java++记录+运行_记录java+testng运行selenium(三)---xml、ini、excel、日志等配置

    一: ini文件 ini目前只用处存储浏览类型及需要打开的url,ini文件放在configs文件夹下面. 读取ini代码如下: 1 packagetoolskit.documents;2 3 imp ...

  9. python制作动图、怎么运行_用Python2.7运行下面这个代码,但是出现了问题,请问如何可以解决,使之生成图像?...

    Process finished with exit code -1073741795 (0xC000001D) 这是什么意思,如何可以解决这个问题. import numpy as np from ...

最新文章

  1. 在建工程直接费用化_计入在建工程的成本怎么算
  2. 一起学react day1
  3. for else语句小tips : RUNOOB python练习题36
  4. aix解锁oracle用户,aix用户被锁定的解决办法
  5. 一个25岁董事长给程序员的18条忠告
  6. HTML如何实现利表自动求和,使用模板标记在html模板中求和
  7. javascript原生事件总结
  8. Hibernate初次搭建与第一个测试例子
  9. GitHub 的“封神”之路!
  10. Hibernate笔记①--myeclipse制动配置hibernate
  11. java链式语法_javaScript链式调用原理以及加法实现
  12. Linux下安装Scala
  13. 从技术原理洞悉摄像头破解及防范
  14. 小米Pad进入开发者模式
  15. 郑州计算机安全协会安全员考试成绩,安全员C证考试结果查询
  16. poi读取Excel文档(.xls .xlsx)包含合并单元格
  17. html/css横向竖向导航栏的绘制
  18. 大整数乘法(Karatsuba算法的字符串形式的C++实现)
  19. Two classes have the same XML type name 排错
  20. 关键词:MAU,DAU,DAU/MAU

热门文章

  1. 2022G3锅炉水处理国家题库及答案
  2. JAVA java学习(9)——————java常用开发工具介绍
  3. JavaScript - 语言进阶
  4. Windows10升级21H1黑屏解决办法
  5. 重磅!2023 IEEE Fellow名单出炉:唐立新、宗成庆、朱军、姬水旺、刘威等入选
  6. 几本关于用户体验的书籍
  7. 可视化利器Tensorboard
  8. 虚拟创业云|宝妈和大学生兼职和手机网赚兼职的任务平台大全
  9. Vue + vite 切换 favicon图标
  10. 工程伦理--3.1 遭遇伦理困境