1、安装logstash,直接解压即可

测试logstash是否可以正常运行

bin/logstash -e 'input { stdin { } } output { stdout {codec => rubydebug } }'

只获取消息

bin/logstash -e 'input { stdin { } } output { stdout {codec => plain { format => "%{message}" } } }'

2、编写logstash配置文件
2、1在logstash目录下创建conf目录
2、2在conf目录下创建文件logstash.conf,内容如下

input {
file {
type => "logs"
path => "/home/hadoop/logs/*.log"
discover_interval => 10
start_position => "beginning"
}
}output {
kafka {
codec => plain {
format => "%{message}"
}
topic_id => "spark"
}
}

logstash input: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
logstash output: https://www.elastic.co/guide/en/logstash/current/output-plugins.html

3、启动logstash采集数据

bin/logstash -f conf/logstash.conf

4、代码

package bigdata.sparkimport org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}/*** Created by Administrator on 2017/4/28.*/
object SparkStreamDemo {def main(args: Array[String]) {val conf = new SparkConf()conf.setAppName("spark_streaming")conf.setMaster("local[*]")val sc = new SparkContext(conf)sc.setCheckpointDir("D:/checkpoints")sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc, Seconds(5))val topics = Map("spark" -> 2)val lines = KafkaUtils.createStream(ssc, "m1:2181,m2:2181,m3:2181", "spark", topics).map(_._2)val ds1 = lines.flatMap(_.split(" ")).map((_, 1))val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => {Some(x.sum + y.getOrElse(0))})ds2.print()ssc.start()ssc.awaitTermination()}
}

  

转载于:https://www.cnblogs.com/heml/p/6796131.html

Spark Streaming整合logstash + Kafka wordCount相关推荐

  1. spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access

    问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...

  2. Spark Streaming整合flume实战

    Spark Streaming对接Flume有两种方式 Poll:Spark Streaming从flume 中拉取数据 Push:Flume将消息Push推给Spark Streaming 1.安装 ...

  3. DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

    前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...

  4. Spark Streaming实时流处理学习

    目录 1.初识实时流处理 2.分布式日志收集框架Flume 3.分布式发布订阅消息系统Kafka 4.实战环境搭建 5.Spark Streaming入门 6.Spark Streaming核心概念与 ...

  5. Spark Streaming 2.0 读取Kafka 0.10 小例子

    环境版本: Scala 2.11.8; Kafka 0.10.0.1; Spark 2.0.0 如需Scala 2.10.5; Spark 1.6.0; Kafka 0.10.0.1版本请看这篇:Fl ...

  6. kafka spark java_spark streaming中维护kafka偏移量到外部介质

    spark streaming中维护kafka偏移量到外部介质 以kafka偏移量维护到redis为例. redis存储格式 使用的数据结构为string,其中key为topic:partition, ...

  7. 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题

    问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...

  8. 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义

    注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...

  9. Spark Streaming笔记整理(二):案例、SSC、数据源与自定义Receiver

    [TOC] 实时WordCount案例 主要是监听网络端口中的数据,并实时进行wc的计算. Java版 测试代码如下: package cn.xpleaf.bigdata.spark.java.str ...

最新文章

  1. linux驱动之i2c子系统mpu6050设备驱动
  2. azure多功能成像好用吗_如何使用Azure功能处理高吞吐量消息
  3. 【安装】Mysql在Linux上安装
  4. 预测股票价格 模型_建立有马模型来预测股票价格
  5. flash动画制作作品_flash施工动画制作应该展现哪些内容
  6. python str函数isdigit、isdecimal、isnumeric的区别
  7. [转载] python随笔2(列表的增删改查)
  8. python单元测试框架作用_Python单元测试框架:Pytest简介
  9. UNIX环境高级编程之第4章:文件和目录-习题
  10. 缓存与IO(很经典)
  11. 软件工程人才的社会需求现状与发展分析
  12. mysql percent_MySQL PERCENT_RANK 函数
  13. No.005<日常><工具表>《数学符号与希腊字母表》
  14. 19款国产手机无一幸免:15分钟破解人脸识别,打印眼镜让刷脸形同虚设 ?
  15. 解决win10搜索框无法搜索本地应用或无任何反应
  16. Kafka处理服务器发来的消息并与数据库交互——具体流程
  17. 《杜拉拉升职记》职场36计
  18. 【BZOJ4716】假摔 二分+暴力
  19. XYT-EDFA光纤放大器-纤亿通谈超远距离传输神器!
  20. JAVALM美食推荐网计算机毕业设计Mybatis+系统+数据库+调试部署

热门文章

  1. 创建虚拟环境和新建工程目录
  2. arm opcode hook
  3. 谷歌移动应用强调设计元素:向极简风格转型
  4. python a and b_python-尽管Numpy建议a.b,为什么a.dot(b)比a @ b更...
  5. Pytorch RNN(详解RNN+torch.nn.RNN()实现)
  6. linux用户管理练习题
  7. 1w存银行一年多少利息_100万存银行一年利息多少?能赚多少钱?
  8. 求1+2+3+...+n
  9. AUTOSAR从入门到精通100讲(十)-DoIP协议介绍
  10. 人工智能AI实战100讲(七)-原理+代码实战 | 双目视觉中的极线校正