package two

/**
* Created by zhoucw on 上午2:11.
*/
import java.util.HashMap

import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
* <zkQuorum> is a list of one or more zookeeper servers that make quorum
* <group> is the name of kafka consumer group
* <topics> is a list of one or more kafka topics to consume from
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
* `$ bin/run-example \
* org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
* my-consumer-group topic1,topic2 1`
*/
object ReceiveKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}

val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
}
}

// scalastyle:o

转载于:https://www.cnblogs.com/heguoxiu/p/10149665.html

sparkstreaming消费receive相关推荐

  1. 使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)

    接    使用idea编写SparkStreaming消费kafka中的数据[小案例](四) https://georgedage.blog.csdn.net/article/details/1035 ...

  2. 使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)

    继续接     使用idea编写消费者,接收生产者的持续日志输出[小案例](三) https://georgedage.blog.csdn.net/article/details/103506165 ...

  3. SparkStreaming消费kafka数据时出现序列化问题 org.apache.kafka.common.serialization.StringDeserializer could not b

    问题呈现 Invalid value org.apache. kafka.common.serialization.StringSerializer for configuration key.ser ...

  4. 大数据开发笔记(八):Sparkstreaming

    一.Spark Streaming处理框架: Spark Streaming接收Kafka.Flume.HDFS等各种来源的实时输入数据,可以使用诸如map.reduce.join等高级函数进行复杂算 ...

  5. Sparkstreaming实时开发详解

    一.Spark Streaming处理框架: Spark Streaming接收Kafka.Flume.HDFS等各种来源的实时输入数据,可以使用诸如map.reduce.join等高级函数进行复杂算 ...

  6. sparkStreaming连接kafka整合hbase和redis

    sparkStreaming消费kafka数据,并将数据保存到redis和hbase当中去,实现实时 import org.apache.hadoop.hbase.client.{Admin, Con ...

  7. SparkStreaming面试题

    1. SparkStreaming第一次运行不丢失数据 kafka参数auto.offset.reset设置为earliest从最初的偏移量开始消费数据. 2. SparkStreaming精准一次性 ...

  8. spark-streaming从入门到精通

    1.spark streaming获取kafka的数据有两种形式:(现在基本都是用direct方式了) receiver 通过zookeeper来连接kafka队列来获取数据.如果要做到容错,就要启用 ...

  9. Kafka启动成功且运行程序无报错,无法消费数据,即外网无法连接Kafka的消费者或生产者

    Kafka启动成功且运行程序无报错,无法消费数据,即外网无法连接Kafka的消费者或生产者 sparkStreaming消费kafka中的数据,得不到数据以及无报错信息,找错误如下 首先检查一下,Ka ...

  10. SparkStreaming读取Kafka的Json数据然后保存到MySQL

    一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...

最新文章

  1. [ruby] wxRuby安装
  2. 产品策划系列:洞察需求(四)
  3. [原+转]CSS hack 小技巧 让你的CSS 兼容ff ie6.0 ie7.0
  4. 好程序员分享如何看待CSS中BEM的命名方式?
  5. ORACLE11g R2下载地址
  6. android 音频压缩 silk,有损音频压缩格式大比拼─MP3、Ogg、AAC、HE-AAC、HE-AACv2、Opus究竟谁才是王者?...
  7. 飞利浦 PHILIPS 电动牙刷HX6730 拆解
  8. latex normal是几号字_LaTeX 中英文字体字号设置
  9. windows启动时自动运行程序四种方法(登录或不登录都可以的)
  10. 【canvas画图】画一个彩虹
  11. Android Studio Text组件介绍
  12. java解析json天气api,免费天气API,全国天气 JSON API接口,可以获取五天的天气预报...
  13. cool edit工具介绍及使用
  14. 【优化模型】图论与TSP模型结合
  15. iOS——MVC设计模式
  16. 为了网络安全被束缚的无人机背后的故事!
  17. win10上elasticsearch-head显示集群健康值未连接问题
  18. Dubbo 3 易用性升级之 Dubbo 官网大改版
  19. React (四)— 复杂组件
  20. Alertmanager 使用阿里云电话告警。

热门文章

  1. 优秀!5部顶级数学纪录片,假期看剧涨知识必备!
  2. 如何成为一名卓越的数据科学家——开篇七剑
  3. 建模算法(八)——插值
  4. 1.不同角度的性能测试
  5. BigDecimal 基本使用 比较大小和加减乘除
  6. spring boot2整合dubbox全注解
  7. Nginx配置多域名代理
  8. C/C++ 跨平台交叉编译、静态库/动态库编译、MinGW、Cygwin、CodeBlocks使用原理及链接参数选项...
  9. 写一个程序,打印数字1到100,3的倍数打印“Fizz”来替换这个数,5的倍数打印“Buzz”,对于既是3的倍数又是5的倍数的数字打印“FizzBuzz”...
  10. 对于拼接进去的html原来绑定的jq事件失效