第一种方式代码:

 1 import org.apache.spark.storage.StorageLevel
 2 import org.apache.spark.{HashPartitioner, SparkConf}
 3 import org.apache.spark.streaming.kafka.KafkaUtils
 4 import org.apache.spark.streaming.{Seconds, StreamingContext}
 5
 6 object KafkaWordCount {
 7   val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
 8     //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
 9     iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
10   }
11
12   def main(args: Array[String]) {
13     LoggerLevels.setStreamingLogLevels()
14     val Array(zkQuorum, group, topics, numThreads) = args
15     val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
16     val ssc = new StreamingContext(sparkConf, Seconds(5))
17     ssc.checkpoint("c://ck2")
18     //"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
19     //"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))"
20     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
21     val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
22     val words = data.map(_._2).flatMap(_.split(" "))
23     val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
24     wordCounts.print()//老师给的代码文件中没有这句话  必须要有一个Action,否则报错
25     //java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
26     ssc.start()
27     ssc.awaitTermination()
28   }
29 }

第二种方式代码:

 1 import kafka.serializer.StringDecoder
 2 import org.apache.log4j.{Level, Logger}
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.rdd.RDD
 5 import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
 6 import org.apache.spark.streaming.{Seconds, StreamingContext}
 7
 8
 9 object DirectKafkaWordCount {
10
11   /*  def dealLine(line: String): String = {
12       val list = line.split(',').toList
13   //    val list = AnalysisUtil.dealString(line, ',', '"')// 把dealString函数当做split即可
14       list.get(0).substring(0, 10) + "-" + list.get(26)
15     }*/
16
17   def processRdd(rdd: RDD[(String, String)]): Unit = {
18     val lines = rdd.map(_._2)
19     val words = lines.map(_.split(" "))
20     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
21     wordCounts.foreach(println)
22   }
23
24   def main(args: Array[String]) {
25     if (args.length < 3) {
26       System.err.println(
27         s"""
28            |Usage: DirectKafkaWordCount <brokers> <topics> <groupid>
29            |  <brokers> is a list of one or more Kafka brokers
30            |  <topics> is a list of one or more kafka topics to consume from
31            |  <groupid> is a consume group
32            |
33         """.stripMargin)
34       System.exit(1)
35     }
36
37     Logger.getLogger("org").setLevel(Level.WARN)
38
39     val Array(brokers, topics, groupId) = args
40
41     // Create context with 2 second batch interval
42     val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
43     sparkConf.setMaster("local[*]")
44     sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
45     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
46
47     val ssc = new StreamingContext(sparkConf, Seconds(2))
48
49     // Create direct kafka stream with brokers and topics
50     val topicsSet = topics.split(",").toSet
51     val kafkaParams = Map[String, String](
52       "metadata.broker.list" -> brokers,
53       "group.id" -> groupId,
54       "auto.offset.reset" -> "smallest"
55     )
56
57     val km = new KafkaManager(kafkaParams)
58
59     val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
60       ssc, kafkaParams, topicsSet)
61
62     messages.foreachRDD(rdd => {
63       if (!rdd.isEmpty()) {
64         // 先处理消息
65         processRdd(rdd)
66         // 再更新offsets
67         km.updateZKOffsets(rdd)
68       }
69     })
70
71     ssc.start()
72     ssc.awaitTermination()
73   }
74 }

关于第二种方式可以参考:

http://blog.csdn.net/ligt0610/article/details/47311771

转载于:https://www.cnblogs.com/DreamDrive/p/6810238.html

Kafka连接SparkStreaming的两种方式相关推荐

  1. 动态连接库的两种方式

    动态连接库的两种方式? 答案:调用一个DLL中的函数有两种方法: 1.载入时动态链接(load-time dynamic linking),模块非常明确调用某个导出函数,使得他们就像本地函数一样.这需 ...

  2. php mysql 连接方法 对比_Mysql实例php连接MySQL的两种方式对比

    <Mysql实例php连接MySQL的两种方式对比>要点: 本文介绍了Mysql实例php连接MySQL的两种方式对比,希望对您有用.如果有疑问,可以联系我们. MYSQL数据库记录一下P ...

  3. beeline连接hive的两种方式

    前提先要启动hiveserver2,不启动怎么连接呢. hive --service hiveserver2 & 方式1 先登录beeline,进入beeline的命令行环境,然后连接 [ro ...

  4. Kafka结合Spark-streaming 的两种连接方式(AWL与直连)

    kafka结合spark-streaming的用法及说明之前博客有些,这里就不赘述了. 这篇文章说下他们结合使用的两种连接方式.(AWL与直连) 先看一张图: 这是kafka与streaming结合的 ...

  5. Kafka创建Topic的两种方式

    创建topic的两种方式: 方法一: bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partition ...

  6. adb连接手机的两种方式

    首先,感恩原创:https://www.cnblogs.com/leo0621/p/9158698.html adb连接手机进行调试有两种方式,一种使用USB线,一种使用无线WiFi. 第一种 使用U ...

  7. C++连接mysql的两种方式(ADO连接和mysql api连接)

    一.ADO连接mysql 1.安装mysql-5.5.20-win32.msi和mysql-connector-odbc-5.3.4-win32.msi(一般两个安装程序要匹配,否则可能连接不上)   ...

  8. java代码怎样连接es,Elasticsearch 连接ES的两种方式

    1.创建客户端节点来连接: 其中client(true)将node指定为客户端节点,所以这个不能写漏掉,客户端节点是不持有数据的, Java代码   Node node = NodeBuilder.n ...

  9. 树莓派开机连接桌面的两种方式

    目录 1.利用HDMI线接显示屏 2.利用VNC Viewer远程无线连接树莓派桌面 (1)用笔记本电脑开启热点 (2)在SD卡里设置好网络配置 (3)将SD卡安到树莓派中,查IP地址 (4)利用远程 ...

  10. 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义

    注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...

最新文章

  1. expdp\impdp及exp\imp
  2. [转]web.xml 中的listener、 filter、servlet 加载顺序及其详解
  3. win7下安装 python2 和python3
  4. c语言简单的24点游戏,C语言解24点游戏程序
  5. java自定义标签遍历_自定义标签 - CarlDing的个人页面 - OSCHINA - 中文开源技术交流社区...
  6. 《南溪的目标检测学习笔记》——backbone的学习笔记
  7. 英文如何区分小括号和花括号
  8. 满月啦,Linux公众号!
  9. 一文了解地理数据和三维地理信息系统
  10. 试简述smtp通信的三个阶段的过程_对通信技术来说,物联网起了什么样的作用?...
  11. python numpy的shape函数
  12. 【c语言】数字金字塔
  13. 邮箱授权码正确,却连接失败
  14. 一个电子商务网站的设计及开发环境配置文档
  15. RSA整理--频谱路由分配算法
  16. 学计算机上海哪个学校好,上海的大学中哪几所学校计算机系比较好
  17. 工欲善其事必先利其器(Windows)
  18. 微信小程序模板消息还能群发?无限制推送?
  19. 比top更优秀的htop
  20. python 组合优化 回撤最小_Python进阶量化交易专栏场外篇23-Markowitz实现股票最优组合...

热门文章

  1. java 字符串数字验证_验证一个字符串是否由数字组成(Java)
  2. SVN commit failed: 'xxx' is not under version control
  3. 牛客练习赛29 F 算式子
  4. 179 Largest Number 把数组排成最大的数
  5. js关于两个字符串的加减乘除运算
  6. 匹配0-59 0-23 的正则表达式
  7. linux中ONBOOT=yes
  8. Java基础之访问文件与目录——获取与文件存储有关的信息(GetFileStores)
  9. JAVAIO流经验总结
  10. Exchange 2016通过IIS限制不允许访问OWA的域名或网段