spark-streaming连接flume时报错org.jboss.netty.channel.ChannelException: Failed to bind to: /IP:PORT
本文转自http://blog.csdn.net/aaa1117a8w5s6d/article/details/42875867,所有权力归原作者所有。
注:该文虽然解决了Spark Streaming程序运行时报出的Failed bind错误,但是通过作者的解释以及这种解决办法来看,集群中可以用来获取Flume的端口被限定住了,这样不仅不符合常规的逻辑,增加了Spark程序编写的难度,而且也集群的部署变得生硬,不灵活。不知道会不会对以后要尝试的多个端口并行接收数据造成什么更麻烦的影响。慢慢地,一步一步来~~~
http://bbs.csdn.net/topics/390971594?page=1#post-398808154
上面是我当时提问用的,折磨了我好几天,后来发现问题了,分析如下:
连接flume是通过
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(jssc, args[0], Integer.parseInt(args[1]));
- FlumeUtils {
- /**
- * Create a input stream from a Flume source.
- * @param ssc StreamingContext object
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream (
- ssc: StreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
- inputStream
- }
在跟进FlumeInputDStream内部:
- class FlumeInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
- host: String,
- port: Int,
- storageLevel: StorageLevel
- ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
- override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel)
- }
- }
在跟进FlumeReceiver
- class FlumeReceiver(
- host: String,
- port: Int,
- storageLevel: StorageLevel
- ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
- lazy val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- lazy val server = new NettyServer(responder, new <span style="color:#ff0000;">InetSocketAddress</span>(host, port))
- def onStart() {
- server.start()
- logInfo("Flume receiver started")
- }
- def onStop() {
- server.close()
- logInfo("Flume receiver stopped")
- }
- override def preferredLocation = Some(host)
- }
这就是建立端口连接的地方了,我们会发现InetSocketAddress这个类,他属于Jdk的jar包rt.jar
现在把这个jar包也放入调用,命令里:
spark-submit --class com.kingsoft.spark.SparkFlumeTest --master yarn-cluster --executor-memory 10G --num-executors 50 --jars /home/hadoop/spark-streaming-flume_2.10-1.1.0.jar,/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/avro/avro-ipc-1.7.5-cdh5.1.0.jar,/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.1.0.jar,/home/hadoop/fastjson-1.1.41.jar,/home/hadoop/rt.jar/home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar 10.4.22.16 58006
发现可以了,究其原因:主要是我们集群的JAVA_HOME没有设置的缘故。
PS:养成看源码的习惯,还是不错的。。
如果上面的解释有问题,还请路过的大神指点,谢谢
----------------------------------------------------------------------------------------------------
不是上面的问题,问题还在解决中~~~
----------------------------------------------------------------------------------------------------
折磨了我好几天的事情终于解决了,看看自己之前的排错信息,很是可笑啊,这件事还是自己对yarn和spark不了解所致:
详情可以看这篇文章:
http://m.blog.csdn.net/blog/gengqi88/39089349
从日志看是端口没有启动。查看下yarn container 上启动的worker,同样报没有绑定端口的异常。
查看 yarn 上job的启动的节点,在配置接收flume 数据的节点上,并没有worker的启动。问题正好出现这这里,由于yarn 上发布container 是有RM 根据集群的资源使用情况进行分配的,事先并不值得哪个节点上有启动到container,也就无法值得那个节点上有spark的worker了。更谈不上事先设置在启动spark stream 程序中的host 是否能真正有worker再运行了。这个或许是spark在yarn模式下的一个BUG 吧。
解决方法是,将host 修改为0.0.0.0 进行运行。等spark 程序启动后,查看在哪个节点上启动了接收flume流数据的端口,在将该主机和端口配置到flume的配置文件中,启动flume,就可以实现数据得传输了。
意思就是把程序提交到yarn集群后,RM会根据资源情况分配哪些container来执行这个程序,比如一个集群有1——10台node,在机器1上提交application到yarn,yarn的RM分配机器2和机器3上的container来执行该application,那么命令如下:
spark-submit --class com.kingsoft.spark.SparkFlumeTest --master yarn --deploy-mode cluster --jars /home/hadoop/spark-streaming-flume_2.10-1.0.1.jar,/home/hadoop/avro-ipc-1.7.5-cdh5.1.0.jar,/home/hadoop/flume-ng-sdk-1.5.0.1.jar,/home/hadoop/fastjson-1.1.41.jar /home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar 10.4.22.16 58006 (10.4.22.16是机器1的IP)
但是在机器1上并没有运行application的程序,所以无法打开58006端口,这个是spark的BUG。解决办法是调用如下命令:
spark-submit --class com.kingsoft.spark.SparkFlumeTest --master yarn --deploy-mode cluster --jars /home/hadoop/spark-streaming-flume_2.10-1.0.1.jar,/home/hadoop/avro-ipc-1.7.5-cdh5.1.0.jar,/home/hadoop/flume-ng-sdk-1.5.0.1.jar,/home/hadoop/fastjson-1.1.41.jar /home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar 0.0.0.0 58006
用0.0.0.0代替,不能使用127.0.0.1,否则flume连不上对应机器。(我也不知道为什么连不上)当application运行时,在到各个机器上去查看哪台机器的58006端口在listen。netstat -anp | grep 58006
这时把flume的配置文件改成该地址,重启就OK了。
啊,多么痛的领悟啊。
---------------------------------------------------------------------------------
启动命令中可以设置--num-executors 10,意思就是启动10个executors,在输出的日志中也会看到如下信息:
15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app002041.hz01.ksyun.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022054.hz01.ksyun.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022045.hz01.ksyun.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022018.ksc.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app002023.hz01.ksyun.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022016.ksc.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022055.hz01.ksyun.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app002024.hz01.ksyun.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app002026.hz01.ksyun.com:8041 15/01/22 15:32:50 INFO AMRMClientImpl: Received new token for : hz01-cs-app022042.hz01.ksyun.com:8041
就是在上面10个节点上会有一个节点启动flume监听端口,那么我们就需要挨个到对应的机器上去检查哪个被启动了。使用以下脚本,在调用时传入机器列表,就可以快速定位:
- for arg in $* ; do
- re=`ssh -o strictHostKeyChecking=no $arg -t "sudo netstat -anp | grep 58006 | grep LISTEN |wc -l"`
- if [[ $re > "1" ]];then
- echo $arg
- fi
- done
其中的58006是你对应启动的那个端口。前提是这个脚本所在的机器能够ssh到对应的机器列表中。
spark-streaming连接flume时报错org.jboss.netty.channel.ChannelException: Failed to bind to: /IP:PORT相关推荐
- 测试连接redis时报错redis.clients.jedis.exceptions.JedisConnectionException: Failed to create socket.
解决方法 如果关注Redis的问题,有这么几个方面: 配置文件中应该注释69行的127地址:# bind 127.0.0.1 配置文件中应该修改修改88行为no:protected-mode no 然 ...
- Spark Streaming整合flume实战
Spark Streaming对接Flume有两种方式 Poll:Spark Streaming从flume 中拉取数据 Push:Flume将消息Push推给Spark Streaming 1.安装 ...
- mysql第二天无法连接_MySQL第二天早上第一次连接超时报错,解决方法com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:...
http://zeusami.iteye.com/blog/1112827 MySQL第二天早上第一次连接超时报错,解决方法com.mysql.jdbc.exceptions.jdbc4.Commun ...
- 连接mysql时报错:The driver has not received any packets from the server.
连接mysql时报错. 前几天可以打开的数据库也不能打开,报java.net.ConnectException: Connection refused: connect.错误. 报错原因:mysql服 ...
- 【linux】在CentOS7上更改端口号时报错:Job for sshd.service failed because the control process exited with error
1.问题描述 在在CentOS7上更改端口号时报错: Job for sshd.service failed because the control process exited with error ...
- Spark Streaming和Flume集成指南V1.4.1
Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务.这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据.这里有两个方法. ...
- 解决ubuntu中连接mysql时报错:Access denied for user ‘root‘@‘localhost‘
在ubuntu安装好mysql后无脑的跟着别人的博客配置了远程访问的权限,然后在连接时报错: itcast@itcast-virtual-machine:/usr/share/mysql$ mysql ...
- spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access
问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...
- 无线打印 打印服务器错误,HP LaserJet Pro 400 M401 系列打印机 - 网络连接打印时报错79 ERROR...
1.查看打印机中是否安装了附加设备,例如:内存条,字库条等.将附加设备取下测试打印是否报错,是否由某个附加设备导致. 2.打印hp laserjet m401配置页,查看是否出现79ERROR报错. ...
最新文章
- 人工神经网络背后的数学原理!
- Kong APIGW — Plugins — 监控告警、日志审计
- BUAA_OO第三单元作业总结——JML
- 11G Oracle RAC添加新表空间时数据文件误放置到本地文件系统的修正
- std::map用法总结
- [Redux/Mobx] Redux怎么添加新的中间件?
- input全选和取消全选
- ThinkPHP_5对数据库的CURL操作
- MVC4发布到IIS7报404错误
- 文件服务器复杂权限,运用技巧:如何提高文件服务器权限?
- hive Beeline plus HiveServer2简单使用
- 再也不学AJAX了!(一)AJAX概述
- Matlab读取音频文件并进行分析
- 常见的几种多媒体设计框架
- 那些年你错过的SOA
- Nmap使用技巧总结
- 鼠标滑过卡片的上浮效果
- spring事务管理总结 .
- 英语老师教计算机,英语教案-小学计算机老师教案?
- 【STM32H7教程】第51章 STM32H7的LTDC应用之LCD汉字显示和2D图形显示
热门文章
- asp.net(c#)网页跳转七种方法小结
- 深度学习 2 机器学习 神经网络 卷积神经网络
- 资源放送丨《高并发Oracle OLTP系统的故障案例分享》PPT视频
- SQL优化之一则MySQL中的DELETE、UPDATE 子查询的锁机制失效案例
- DB2 Vs MySQL系列 | MySQL与DB2的数据类型对比
- 运行npm install命令的时候会发生什么?
- 华为云媒体査勇:华为云在视频AI转码领域的技术实践
- 电商千万级交易的金手指:分布式事务管理
- Spark优化之小文件是否需要合并?
- 【论文笔记】一种有效攻击BERT等模型的方法