Kafka连接SparkStreaming的两种方式
第一种方式代码:
1 import org.apache.spark.storage.StorageLevel 2 import org.apache.spark.{HashPartitioner, SparkConf} 3 import org.apache.spark.streaming.kafka.KafkaUtils 4 import org.apache.spark.streaming.{Seconds, StreamingContext} 5 6 object KafkaWordCount { 7 val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => { 8 //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x))) 9 iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) } 10 } 11 12 def main(args: Array[String]) { 13 LoggerLevels.setStreamingLogLevels() 14 val Array(zkQuorum, group, topics, numThreads) = args 15 val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") 16 val ssc = new StreamingContext(sparkConf, Seconds(5)) 17 ssc.checkpoint("c://ck2") 18 //"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18" 19 //"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))" 20 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 21 val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER) 22 val words = data.map(_._2).flatMap(_.split(" ")) 23 val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true) 24 wordCounts.print()//老师给的代码文件中没有这句话 必须要有一个Action,否则报错 25 //java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 26 ssc.start() 27 ssc.awaitTermination() 28 } 29 }
第二种方式代码:
1 import kafka.serializer.StringDecoder 2 import org.apache.log4j.{Level, Logger} 3 import org.apache.spark.SparkConf 4 import org.apache.spark.rdd.RDD 5 import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils} 6 import org.apache.spark.streaming.{Seconds, StreamingContext} 7 8 9 object DirectKafkaWordCount { 10 11 /* def dealLine(line: String): String = { 12 val list = line.split(',').toList 13 // val list = AnalysisUtil.dealString(line, ',', '"')// 把dealString函数当做split即可 14 list.get(0).substring(0, 10) + "-" + list.get(26) 15 }*/ 16 17 def processRdd(rdd: RDD[(String, String)]): Unit = { 18 val lines = rdd.map(_._2) 19 val words = lines.map(_.split(" ")) 20 val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) 21 wordCounts.foreach(println) 22 } 23 24 def main(args: Array[String]) { 25 if (args.length < 3) { 26 System.err.println( 27 s""" 28 |Usage: DirectKafkaWordCount <brokers> <topics> <groupid> 29 | <brokers> is a list of one or more Kafka brokers 30 | <topics> is a list of one or more kafka topics to consume from 31 | <groupid> is a consume group 32 | 33 """.stripMargin) 34 System.exit(1) 35 } 36 37 Logger.getLogger("org").setLevel(Level.WARN) 38 39 val Array(brokers, topics, groupId) = args 40 41 // Create context with 2 second batch interval 42 val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") 43 sparkConf.setMaster("local[*]") 44 sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5") 45 sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 46 47 val ssc = new StreamingContext(sparkConf, Seconds(2)) 48 49 // Create direct kafka stream with brokers and topics 50 val topicsSet = topics.split(",").toSet 51 val kafkaParams = Map[String, String]( 52 "metadata.broker.list" -> brokers, 53 "group.id" -> groupId, 54 "auto.offset.reset" -> "smallest" 55 ) 56 57 val km = new KafkaManager(kafkaParams) 58 59 val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder]( 60 ssc, kafkaParams, topicsSet) 61 62 messages.foreachRDD(rdd => { 63 if (!rdd.isEmpty()) { 64 // 先处理消息 65 processRdd(rdd) 66 // 再更新offsets 67 km.updateZKOffsets(rdd) 68 } 69 }) 70 71 ssc.start() 72 ssc.awaitTermination() 73 } 74 }
关于第二种方式可以参考:
http://blog.csdn.net/ligt0610/article/details/47311771
转载于:https://www.cnblogs.com/DreamDrive/p/6810238.html
Kafka连接SparkStreaming的两种方式相关推荐
- 动态连接库的两种方式
动态连接库的两种方式? 答案:调用一个DLL中的函数有两种方法: 1.载入时动态链接(load-time dynamic linking),模块非常明确调用某个导出函数,使得他们就像本地函数一样.这需 ...
- php mysql 连接方法 对比_Mysql实例php连接MySQL的两种方式对比
<Mysql实例php连接MySQL的两种方式对比>要点: 本文介绍了Mysql实例php连接MySQL的两种方式对比,希望对您有用.如果有疑问,可以联系我们. MYSQL数据库记录一下P ...
- beeline连接hive的两种方式
前提先要启动hiveserver2,不启动怎么连接呢. hive --service hiveserver2 & 方式1 先登录beeline,进入beeline的命令行环境,然后连接 [ro ...
- Kafka结合Spark-streaming 的两种连接方式(AWL与直连)
kafka结合spark-streaming的用法及说明之前博客有些,这里就不赘述了. 这篇文章说下他们结合使用的两种连接方式.(AWL与直连) 先看一张图: 这是kafka与streaming结合的 ...
- Kafka创建Topic的两种方式
创建topic的两种方式: 方法一: bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partition ...
- adb连接手机的两种方式
首先,感恩原创:https://www.cnblogs.com/leo0621/p/9158698.html adb连接手机进行调试有两种方式,一种使用USB线,一种使用无线WiFi. 第一种 使用U ...
- C++连接mysql的两种方式(ADO连接和mysql api连接)
一.ADO连接mysql 1.安装mysql-5.5.20-win32.msi和mysql-connector-odbc-5.3.4-win32.msi(一般两个安装程序要匹配,否则可能连接不上) ...
- java代码怎样连接es,Elasticsearch 连接ES的两种方式
1.创建客户端节点来连接: 其中client(true)将node指定为客户端节点,所以这个不能写漏掉,客户端节点是不持有数据的, Java代码 Node node = NodeBuilder.n ...
- 树莓派开机连接桌面的两种方式
目录 1.利用HDMI线接显示屏 2.利用VNC Viewer远程无线连接树莓派桌面 (1)用笔记本电脑开启热点 (2)在SD卡里设置好网络配置 (3)将SD卡安到树莓派中,查IP地址 (4)利用远程 ...
- 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义
注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...
最新文章
- expdp\impdp及exp\imp
- [转]web.xml 中的listener、 filter、servlet 加载顺序及其详解
- win7下安装 python2 和python3
- c语言简单的24点游戏,C语言解24点游戏程序
- java自定义标签遍历_自定义标签 - CarlDing的个人页面 - OSCHINA - 中文开源技术交流社区...
- 《南溪的目标检测学习笔记》——backbone的学习笔记
- 英文如何区分小括号和花括号
- 满月啦,Linux公众号!
- 一文了解地理数据和三维地理信息系统
- 试简述smtp通信的三个阶段的过程_对通信技术来说,物联网起了什么样的作用?...
- python numpy的shape函数
- 【c语言】数字金字塔
- 邮箱授权码正确,却连接失败
- 一个电子商务网站的设计及开发环境配置文档
- RSA整理--频谱路由分配算法
- 学计算机上海哪个学校好,上海的大学中哪几所学校计算机系比较好
- 工欲善其事必先利其器(Windows)
- 微信小程序模板消息还能群发?无限制推送?
- 比top更优秀的htop
- python 组合优化 回撤最小_Python进阶量化交易专栏场外篇23-Markowitz实现股票最优组合...