用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分。

这里的log分:

(1)spark本身运行的log

(2)代码里面业务产生的log

spark on yarn模式,如果你的hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登录上每台机器上,一个个查看,如果通过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到所有的log,那么问题来了?

能不能将这个job运行所有的log统一收集到某一个目录里面呢? 如果收集到一起的话排查log就非常方便了。

答案是很遗憾,在sparkstreaming里面没法做到,因为sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把所有的log收集到一个目录里面,所以其他的非sparkstreaming程序,比如MR,Spark 运行完后,如果开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话我们查看log就非常方便了。

现在的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是可以的,我们使用log4j收集日志然后异步发送至kafka里面,最后再通过logstash收集kafka里面的日志进入es即可,这样一条龙服务打通之后,出现任何异常都可以非常快和方便的在es中排查问题,效率大大提升。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。

下面会介绍下如何使用:

streaming项目中的log4j使用的是apache log4j

        <dependency>            <groupId>log4j</groupId>            <artifactId>log4j</artifactId>            <version>1.2.17</version>        </dependency>

sparkstreaming项目可以单独提交某个job的log4j文件,这样就能定制每个job的log输出格式,如果提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。
看下我们log4j文件的内容:

log4j.rootLogger=WARN,console,kafka

#log4j.logger.com.demo.kafka=DEBUG,kafka# appender kafkalog4j.appender.kafka=kafka.producer.KafkaLog4jAppenderlog4j.appender.kafka.topic=kp_diag_log# multiple brokers are separated by comma ",".log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092log4j.appender.kafka.compressionType=nonelog4j.appender.kafka.syncSend=falselog4j.appender.kafka.layout=org.apache.log4j.PatternLayout#log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%nlog4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n

# appender consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n#log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

最后看下提交脚本:



jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'`

echo $jars

#nohup /opt/bigdata/spark/bin/spark-submit  --class  com.bigdata.xuele.streaming.SparkStreamingKmd  --master yarn    --deploy-mode cluster --executor-cores 3  --driver-memory 4g   --executor-memory 4g  --num-executors 10  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml"   --jars  $jars    kpdiag-stream-1.0.0-SNAPSHOT.jar  &> streaming.log  &

nohup /opt/bigdata/spark/bin/spark-submit    --class  com.bigdata.xuele.streaming.SparkStreamingKmd  --master yarn  --deploy-mode cluster \ --files "/home/spark/x_spark_job/log4j.properties" \ --executor-cores 3   --driver-memory 3g   --executor-memory 3g  --num-executors 12    --jars  $jars  \ --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"   \ --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar  \ --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar   \ --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar  \ kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &

注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程中,会从本地加载jar包,此外log4j.properties文件以及参数里面--jars 后面的依赖jar 可以在提交机器上放一份即可,不需要每台机器上都存放。

提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出:
执行命令:

kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log

收集到的log内容如下:

[2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0

[2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0

[2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 题目id:b07e88feff464659ab5a351bf1e68ee0在redis不存在

至此,我们的log就统一收集成功了,后续我们可以把log从kafka导入到es中,就可以任意分析和查询了。

这里需要注意一点,sparkstreaming运行时候,系统本身也有大量的log,如果把这个系统log也收集到kafka里面本身的量是非常大的,而且好多信息不重要,其实
我们只需要关注业务重点log即可,主要是WARN+ERROR级别的,调试的时候可以把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR即可
这样排查问题时候也容易而且了避免了大量log的产生从应用本身性能的影响。

[b][color=green][size=large]
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
[/size][/color][/b]
[img]http://dl2.iteye.com/upload/attachment/0104/9948/3214000f-5633-3c17-a3d7-83ebda9aebff.jpg[/img]

如何收集SparkSteaming运行日志实时进入kafka中相关推荐

  1. 自定义Flume拦截器,并将收集的日志存储到Kafka中(案例)

    1.引入POM文件 如果想调用Flume,需要引入flume相关的jar包依赖,jar包依赖如下: <?xml version="1.0" encoding="UT ...

  2. flink实时消费kafka中oracle的DML数据写入mysql

    1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...

  3. 使用MongoShake实现MongoDB数据实时导入Kafka

    一.MongoShake简介 MongoShake是阿里云以Golang语言编写的通用平台型服务工具,它通过读取MongoDB的Oplog操作日志来复制MongoDB的数据以实现特定需求. Mongo ...

  4. Flume实时采集mysql数据到kafka中并输出

    环境说明 centos7 flume1.9.0(flume-ng-sql-source插件版本1.5.3) jdk1.8 kafka 2.1.1 zookeeper(这个我用的kafka内置的zk) ...

  5. Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据

    Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据 一.引入flink相关依赖 二.properties保存连接kafka的配置 三.构建flink实时消费环境 ...

  6. Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

    1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集 ...

  7. 聊聊Kafka中值得我们学习的优秀设计

    我们今天来聊一聊Kafka中优秀的设计,希望可以提高你的设计能力.写代码能力! 一.Kafka基础 消息系统的作用 应该大部份小伙伴都清楚,用机油装箱举个例子 所以消息系统就是如上图我们所说的仓库,能 ...

  8. kafka中文文档(0.10.0)

    kafka中文文档(0.10.0) 作者:链上研发-老杨叔叔 时间:2016-07-22 版本:Apache Kafka 0.10.0 (2016年5月底发布) .目录 kafka中文文档0100 目 ...

  9. Netty、Kafka中的零拷贝技术到底有多牛?

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 来源:rrd.me/ggFBd 零拷贝,从字面意思理解就是数据不需 ...

最新文章

  1. 用python管理自己的密码
  2. Oracle11g密码忘记处理方法
  3. 用li列表模拟table式的表
  4. Attach()和Detach()函数
  5. cfile清空文件内容_电脑C盘文件夹哪些可以删除?教你如何快速清理,旧电脑还能用3年...
  6. Cannot connect to database because the database client
  7. 程序员自我提高情绪10招
  8. scrapy 整合 djangoitem,摆脱保存数据时SQL报错的困扰
  9. LINUX SAN 500G重新分区、格式化,后成功恢复数据90%
  10. 负载均衡技术沙龙2期圆满结束(现场图文、PPT)
  11. 多线程之阻塞队列ArrayBlockingQueue,BlockingQueue
  12. mysql笛卡尔积效率_SQL优化 MySQL版 -分析explain SQL执行计划与笛卡尔积
  13. python爬取微信群聊内容_群聊变赌场 微信QQ的这种“新玩法”应远离
  14. Manjaro Linux 魔兽世界 使用黑盒工坊安装插件
  15. OS=Windows and the assembly descriptor contains a *nix-specific root-relative-reference (starting wi
  16. 如何在一个小时内加密你的全部数字生活?
  17. ansible从入门到放弃
  18. 标准差详解-一文搞懂标准差的含义
  19. ubuntu LibreOffice writer的基本操作
  20. drupal_Drupal中的Gutenberg编辑器入门

热门文章

  1. [几何] BZOJ 4246 两个人的星座
  2. js 周期性定时器
  3. 华师大副校长任友群:互联网+校园新挑战
  4. POJ3322Bloxorz I
  5. VS1003调试例程
  6. 刘泽云《计量经济学实验教程》笔记
  7. python独立样本t检验 图_SPSS实操两个独立样本均数比较的t检验
  8. html css标记文本,HTML图像标记和CSS核心基础和文本相关样式
  9. 黄反词测试,接口监控
  10. 指令,机器指令,指令周期,机器周期的辨析