spark 终止 运行_如何在数据源运行ou时停止spark流
我有一个spark流作业,它每5秒从Kafka读取一次,对传入的数据进行一些转换,然后写入文件系统。
这不一定是一个流媒体工作,实际上,我只想每天运行一次,将消息排放到文件系统中。不过,我不知道如何停止这项工作。
如果我将超时传递给streamingContext.awaittemination,它不会停止进程,它所做的一切都会导致进程在迭代流时产生错误(请参阅下面的错误)
我想做的最好的方法是什么
这是针对Python上的Spark 1.6的
编辑:
多亏了@marios,解决方案是:ssc.start()
ssc.awaitTermination(10)
ssc.stop()
在停止之前运行脚本10秒。
简化代码:conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite','true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)
stream = KafkaUtils.createStream(
ssc,
kafkaParams["zookeeper.connect"],
"vehicle-data-importer",
topicPartitions,
kafkaParams)
stream.saveAsTextFiles('stream-output/kafka-vehicle-data')
ssc.start()
ssc.awaitTermination(10)
错误:16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:232)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111)
at com.sun.proxy.$Proxy14.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB)
16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB)
spark 终止 运行_如何在数据源运行ou时停止spark流相关推荐
- spark应用程序转换_打包并提交运行Spark应用程序jar包
基于eclipse的Spark IDE可在 http://scala-ide.org/ 下载. 以WordCount为例: package com.lxw.test import org.apache ...
- 两个条件一个为false就运行_设置一个自动运行网格条件单
如何设置一个能够自动运行策略的网格条件单呢? 首先我们可以选择一个标的(哪些是适合的,这个做相信在手动网格的朋友一定再清楚不过,咱们这里先跳过,下次再讲) 1. 选择好标的以后,输入代码 2. 设置好 ...
- java怎么在记事本里写过运行_[置顶] 如何运行用记事本写的java程序
今天用记事本写了一个java程序,测试能运行,现在把它分解成几个步骤,利于大家理解: 1. 新建一个记事本,后缀名是 .java :然后在里面写一段java的代码,如图: 2.把写好的java文件 ...
- python可以在linux运行_服务器(Linux)上运行python总结
跑实验换了几次服务器了,每次遇到相似问题都要重新百度,而且每次百度搜索出的顺序都不一样,又得重新找半天,这次把遇到的问题都总结一下. 1.准备 PuTTY和FileZilla FileZilla使用F ...
- java提升权限运行_提升代码的运行权限,实现模拟管理员身份的功能
SPSecurity.RunWithElevatedPrivileges(delegate() { // implementation details omitted }); 可以提升代码的运行权限, ...
- python代码怎么运行_使用Joblib并行运行Python代码
微信公众号:测度空间 对于大多数问题,并行计算确实可以提高计算速度. 随着PC计算能力的提高,我们可以通过在PC中运行并行代码来简单地提升计算速度.Joblib就是这样一个可以简单地将Python代码 ...
- python脱离环境运行_脱离Python环境运行的问题所使用的工具介绍
如果你对脱离Python环境运行中存在不解之处时,就可以浏览以下的文章对如何脱离Python环境运行的相关实际操作,希望你在浏览完下面的文章对其有个相关的了解,以下就是文章的具体描述. 关于脱离Pyt ...
- java++记录+运行_记录java+testng运行selenium(三)---xml、ini、excel、日志等配置
一: ini文件 ini目前只用处存储浏览类型及需要打开的url,ini文件放在configs文件夹下面. 读取ini代码如下: 1 packagetoolskit.documents;2 3 imp ...
- python制作动图、怎么运行_用Python2.7运行下面这个代码,但是出现了问题,请问如何可以解决,使之生成图像?...
Process finished with exit code -1073741795 (0xC000001D) 这是什么意思,如何可以解决这个问题. import numpy as np from ...
最新文章
- 在建工程直接费用化_计入在建工程的成本怎么算
- 一起学react day1
- for else语句小tips : RUNOOB python练习题36
- aix解锁oracle用户,aix用户被锁定的解决办法
- 一个25岁董事长给程序员的18条忠告
- HTML如何实现利表自动求和,使用模板标记在html模板中求和
- javascript原生事件总结
- Hibernate初次搭建与第一个测试例子
- GitHub 的“封神”之路!
- Hibernate笔记①--myeclipse制动配置hibernate
- java链式语法_javaScript链式调用原理以及加法实现
- Linux下安装Scala
- 从技术原理洞悉摄像头破解及防范
- 小米Pad进入开发者模式
- 郑州计算机安全协会安全员考试成绩,安全员C证考试结果查询
- poi读取Excel文档(.xls .xlsx)包含合并单元格
- html/css横向竖向导航栏的绘制
- 大整数乘法(Karatsuba算法的字符串形式的C++实现)
- Two classes have the same XML type name 排错
- 关键词:MAU,DAU,DAU/MAU