Spark Streaming整合logstash + Kafka wordCount
1、安装logstash,直接解压即可
测试logstash是否可以正常运行
bin/logstash -e 'input { stdin { } } output { stdout {codec => rubydebug } }'
只获取消息
bin/logstash -e 'input { stdin { } } output { stdout {codec => plain { format => "%{message}" } } }'
2、编写logstash配置文件
2、1在logstash目录下创建conf目录
2、2在conf目录下创建文件logstash.conf,内容如下
input {
file {
type => "logs"
path => "/home/hadoop/logs/*.log"
discover_interval => 10
start_position => "beginning"
}
}output {
kafka {
codec => plain {
format => "%{message}"
}
topic_id => "spark"
}
}
logstash input: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
logstash output: https://www.elastic.co/guide/en/logstash/current/output-plugins.html
3、启动logstash采集数据
bin/logstash -f conf/logstash.conf
4、代码
package bigdata.sparkimport org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}/*** Created by Administrator on 2017/4/28.*/
object SparkStreamDemo {def main(args: Array[String]) {val conf = new SparkConf()conf.setAppName("spark_streaming")conf.setMaster("local[*]")val sc = new SparkContext(conf)sc.setCheckpointDir("D:/checkpoints")sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc, Seconds(5))val topics = Map("spark" -> 2)val lines = KafkaUtils.createStream(ssc, "m1:2181,m2:2181,m3:2181", "spark", topics).map(_._2)val ds1 = lines.flatMap(_.split(" ")).map((_, 1))val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => {Some(x.sum + y.getOrElse(0))})ds2.print()ssc.start()ssc.awaitTermination()}
}
转载于:https://www.cnblogs.com/heml/p/6796131.html
Spark Streaming整合logstash + Kafka wordCount相关推荐
- spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access
问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...
- Spark Streaming整合flume实战
Spark Streaming对接Flume有两种方式 Poll:Spark Streaming从flume 中拉取数据 Push:Flume将消息Push推给Spark Streaming 1.安装 ...
- DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36
前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...
- Spark Streaming实时流处理学习
目录 1.初识实时流处理 2.分布式日志收集框架Flume 3.分布式发布订阅消息系统Kafka 4.实战环境搭建 5.Spark Streaming入门 6.Spark Streaming核心概念与 ...
- Spark Streaming 2.0 读取Kafka 0.10 小例子
环境版本: Scala 2.11.8; Kafka 0.10.0.1; Spark 2.0.0 如需Scala 2.10.5; Spark 1.6.0; Kafka 0.10.0.1版本请看这篇:Fl ...
- kafka spark java_spark streaming中维护kafka偏移量到外部介质
spark streaming中维护kafka偏移量到外部介质 以kafka偏移量维护到redis为例. redis存储格式 使用的数据结构为string,其中key为topic:partition, ...
- 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题
问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...
- 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义
注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...
- Spark Streaming笔记整理(二):案例、SSC、数据源与自定义Receiver
[TOC] 实时WordCount案例 主要是监听网络端口中的数据,并实时进行wc的计算. Java版 测试代码如下: package cn.xpleaf.bigdata.spark.java.str ...
最新文章
- linux驱动之i2c子系统mpu6050设备驱动
- azure多功能成像好用吗_如何使用Azure功能处理高吞吐量消息
- 【安装】Mysql在Linux上安装
- 预测股票价格 模型_建立有马模型来预测股票价格
- flash动画制作作品_flash施工动画制作应该展现哪些内容
- python str函数isdigit、isdecimal、isnumeric的区别
- [转载] python随笔2(列表的增删改查)
- python单元测试框架作用_Python单元测试框架:Pytest简介
- UNIX环境高级编程之第4章:文件和目录-习题
- 缓存与IO(很经典)
- 软件工程人才的社会需求现状与发展分析
- mysql percent_MySQL PERCENT_RANK 函数
- No.005<日常><工具表>《数学符号与希腊字母表》
- 19款国产手机无一幸免:15分钟破解人脸识别,打印眼镜让刷脸形同虚设 ?
- 解决win10搜索框无法搜索本地应用或无任何反应
- Kafka处理服务器发来的消息并与数据库交互——具体流程
- 《杜拉拉升职记》职场36计
- 【BZOJ4716】假摔 二分+暴力
- XYT-EDFA光纤放大器-纤亿通谈超远距离传输神器!
- JAVALM美食推荐网计算机毕业设计Mybatis+系统+数据库+调试部署
热门文章
- 创建虚拟环境和新建工程目录
- arm opcode hook
- 谷歌移动应用强调设计元素:向极简风格转型
- python a and b_python-尽管Numpy建议a.b,为什么a.dot(b)比a @ b更...
- Pytorch RNN(详解RNN+torch.nn.RNN()实现)
- linux用户管理练习题
- 1w存银行一年多少利息_100万存银行一年利息多少?能赚多少钱?
- 求1+2+3+...+n
- AUTOSAR从入门到精通100讲(十)-DoIP协议介绍
- 人工智能AI实战100讲(七)-原理+代码实战 | 双目视觉中的极线校正