sparkstreaming消费receive
package two
/**
* Created by zhoucw on 上午2:11.
*/
import java.util.HashMap
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
* <zkQuorum> is a list of one or more zookeeper servers that make quorum
* <group> is the name of kafka consumer group
* <topics> is a list of one or more kafka topics to consume from
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
* `$ bin/run-example \
* org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
* my-consumer-group topic1,topic2 1`
*/
object ReceiveKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
// scalastyle:o
转载于:https://www.cnblogs.com/heguoxiu/p/10149665.html
sparkstreaming消费receive相关推荐
- 使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)
接 使用idea编写SparkStreaming消费kafka中的数据[小案例](四) https://georgedage.blog.csdn.net/article/details/1035 ...
- 使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)
继续接 使用idea编写消费者,接收生产者的持续日志输出[小案例](三) https://georgedage.blog.csdn.net/article/details/103506165 ...
- SparkStreaming消费kafka数据时出现序列化问题 org.apache.kafka.common.serialization.StringDeserializer could not b
问题呈现 Invalid value org.apache. kafka.common.serialization.StringSerializer for configuration key.ser ...
- 大数据开发笔记(八):Sparkstreaming
一.Spark Streaming处理框架: Spark Streaming接收Kafka.Flume.HDFS等各种来源的实时输入数据,可以使用诸如map.reduce.join等高级函数进行复杂算 ...
- Sparkstreaming实时开发详解
一.Spark Streaming处理框架: Spark Streaming接收Kafka.Flume.HDFS等各种来源的实时输入数据,可以使用诸如map.reduce.join等高级函数进行复杂算 ...
- sparkStreaming连接kafka整合hbase和redis
sparkStreaming消费kafka数据,并将数据保存到redis和hbase当中去,实现实时 import org.apache.hadoop.hbase.client.{Admin, Con ...
- SparkStreaming面试题
1. SparkStreaming第一次运行不丢失数据 kafka参数auto.offset.reset设置为earliest从最初的偏移量开始消费数据. 2. SparkStreaming精准一次性 ...
- spark-streaming从入门到精通
1.spark streaming获取kafka的数据有两种形式:(现在基本都是用direct方式了) receiver 通过zookeeper来连接kafka队列来获取数据.如果要做到容错,就要启用 ...
- Kafka启动成功且运行程序无报错,无法消费数据,即外网无法连接Kafka的消费者或生产者
Kafka启动成功且运行程序无报错,无法消费数据,即外网无法连接Kafka的消费者或生产者 sparkStreaming消费kafka中的数据,得不到数据以及无报错信息,找错误如下 首先检查一下,Ka ...
- SparkStreaming读取Kafka的Json数据然后保存到MySQL
一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...
最新文章
- [ruby] wxRuby安装
- 产品策划系列:洞察需求(四)
- [原+转]CSS hack 小技巧 让你的CSS 兼容ff ie6.0 ie7.0
- 好程序员分享如何看待CSS中BEM的命名方式?
- ORACLE11g R2下载地址
- android 音频压缩 silk,有损音频压缩格式大比拼─MP3、Ogg、AAC、HE-AAC、HE-AACv2、Opus究竟谁才是王者?...
- 飞利浦 PHILIPS 电动牙刷HX6730 拆解
- latex normal是几号字_LaTeX 中英文字体字号设置
- windows启动时自动运行程序四种方法(登录或不登录都可以的)
- 【canvas画图】画一个彩虹
- Android Studio Text组件介绍
- java解析json天气api,免费天气API,全国天气 JSON API接口,可以获取五天的天气预报...
- cool edit工具介绍及使用
- 【优化模型】图论与TSP模型结合
- iOS——MVC设计模式
- 为了网络安全被束缚的无人机背后的故事!
- win10上elasticsearch-head显示集群健康值未连接问题
- Dubbo 3 易用性升级之 Dubbo 官网大改版
- React (四)— 复杂组件
- Alertmanager 使用阿里云电话告警。
热门文章
- 优秀!5部顶级数学纪录片,假期看剧涨知识必备!
- 如何成为一名卓越的数据科学家——开篇七剑
- 建模算法(八)——插值
- 1.不同角度的性能测试
- BigDecimal 基本使用 比较大小和加减乘除
- spring boot2整合dubbox全注解
- Nginx配置多域名代理
- C/C++ 跨平台交叉编译、静态库/动态库编译、MinGW、Cygwin、CodeBlocks使用原理及链接参数选项...
- 写一个程序,打印数字1到100,3的倍数打印“Fizz”来替换这个数,5的倍数打印“Buzz”,对于既是3的倍数又是5的倍数的数字打印“FizzBuzz”...
- 对于拼接进去的html原来绑定的jq事件失效