Kafka之四:Kafka与Streaming集成


文章目录

  • Kafka之四:Kafka与Streaming集成
    • 1. 修改IEDA的maven配置
    • 2. 程序一
    • 3. 程序二:统计次数
    • 4. 提交任务可能遇到的错误

Kafka之一:Kafka简述
Kafka之二:Kafka集群的安装
Kafka之三:Kafka集群工作流程


  • spark官网
  • Kafka作为spark Streaming的一种输入源,当Kafka和Streaming集成时充当消费者角色。(请了解Kafka命令操作)

1. 修改IEDA的maven配置


按照官网配置修改maven配置文件pom.xml

org.apache.spark
spark-streaming-kafka-0-10_2.11
2.1.1


2. 程序一

(1) 官网实例:

    package com.spark.kafkaimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}object Test01 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("xxx")val ssc = new StreamingContext(conf,Seconds(5))val kafkaParams = Map[String,Object]("bootstrap.servers" -> "Cloud01:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "earliest")val topics = Array("first")val stream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))stream.map(record => (record.key,record.value)).print()ssc.start()ssc.awaitTermination()}}

(2)提交运行

  • a. 打包上传到主机上
  • b. 开启HDFS、zookeeper集群、spark集群、kafka集群(具体操作请参考对应文章)
    以某一台主机为例(提示:检测防火墙状态并关闭)
  • c. spark提交任务
  •   bin/spark-submit \--class com.spark.kafka.Test01 \/home/duck/Lesson-1.0-SNAPSHOT.jar
    

  • d. 启动kafka生产者,并发送消息
  • bin/kafka-console-producer.sh --broker-list Cloud01:9092 --topic first
  • e. 任务后台显示结果


3. 程序二:统计次数

(1)代码

package com.spark.kafkaimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object Test01 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("xxx")val ssc = new StreamingContext(conf,Seconds(5))val kafkaParams = Map[String,Object]("bootstrap.servers" -> "Cloud01:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "earliest")val topics = Array("first")val stream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))stream.foreachRDD(rdd=>{val unit:RDD[(String,Int)] = rdd.map(_.value()).flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)unit.foreach(println)})ssc.start()ssc.awaitTermination()}
}

(2)提交运行(详细步骤请参考程序一中的提交运行)

  • a. 打包上传
  • b. 开启HDFS、zookeeper集群、spark集群、kafka集群
  • c. spark提交任务
  • d. 生产者发送消息
  • e. 任务后台结果

4. 提交任务可能遇到的错误

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer

  • 原因:~/software/spark/jars目录下缺少两个jar包

  • 解决:导入jar包(提供jar包资源 提取码:rre8)

    找到本地jar包路径
    spark-streaming-kafka-0-10_2.11-2.1.1
    kafka-clients-0.10.0.1

然后将两个jar包放入~/software/spark/jars目录下


--->有问题请联系QQ1436281495^_^


Kafka之四:Kafka与Streaming集成相关推荐

  1. sparks streaming集成kafka注意要点

    2018-04-18: 转载请注明出处:落在地上的乐乐https://blog.csdn.net/qq_35946969/article/details/79995514 sparks streami ...

  2. 大数据Spark Structured Streaming集成 Kafka

    目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...

  3. kafka(组件分析 整合springboot集成 实战)

    kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...

  4. Kafka 入门和 Spring Boot 集成

    2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...

  5. Kafka学习之四 Kafka常用命令

    2019独角兽企业重金招聘Python工程师标准>>> Kafka学习之四 Kafka常用命令 Kafka常用命令 以下是kafka常用命令行总结: 1.查看topic的详细信息 . ...

  6. [TODO]Kafka及Kafka Streaming架构熟悉

    基本概念 名称 解释 Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 Topic Kafka根据topic对消息进行归类, ...

  7. 【檀越剑指大厂—kafka】kafka高阶篇

    一.认识 kafka 1.kafka 的定义? Kafka 传统定义:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域.发布/订阅:消 ...

  8. 聊聊 Kafka: Kafka 的基础架构

    一.我与快递小哥的故事 一个很正常的一个工作日,老周正在忙着啪啪啪的敲代码,办公司好像安静的只剩敲代码的声音.突然,我的电话铃声响起了,顿时打破了这种安静. 我:喂,哪位? 快递小哥:我是顺丰快递的, ...

  9. 【kafka】kafka 生态系统 Ecosystem

    1.概述 官网:kafka 生态系统 Ecosystem 下面是我们被告知在主发行版之外与Kafka集成的工具列表.我们还没有全部尝试过,所以它们可能不起作用! 当然,Clients在这里是单独列出的 ...

最新文章

  1. 软件需求管理用例方法二
  2. matlab多径误差包络,MIMO-OFDM系统添加多径信道仿真结果误差很大
  3. centos下docker1.7 上传文件到容器报错 Error: Path not specified
  4. 用Tableau制作滚动时间轴(上)
  5. java面试笔试大汇总(一)
  6. 提高SQL语句的性能
  7. Hibernate Query Language(HQL)。
  8. SilverLight Test
  9. 今天的圆圆的深圳4j
  10. 两杯咖啡只卖2.5?瑞幸价格出错被“薅羊毛” 官方道歉了...
  11. 单元格填充为0的html,Excel如何在数据信息表中把空单元格填写为数字“0”
  12. php mysql中华图书购物商城
  13. mysql修改表、字段、库的字符集
  14. Ubuntu20.04环境下 安装Tensorflow
  15. 双系统bios如何设置默认启动系统_BIOS(主板)常用功能:设置启动磁盘顺序,迁移系统必备...
  16. 静态网页制作前夕小记录
  17. vs2003远程调试方法
  18. AIX上通过FTP下载压缩包损坏
  19. http://www.cnblogs.com/xd502djj/p/3473516.html
  20. 【跨域】 关于跨域的一些知识整合

热门文章

  1. python爬虫项目之携程网、大众点评和马蜂窝贵州景点差评实战汇总
  2. Shellshock 破壳漏洞(CVE-2014-6271)
  3. 51Nod基础组(Python)
  4. 饿了么UI图片上传的实现
  5. 根据Excel表头的位置数转化为对应列名称
  6. Http--跨域请求
  7. Linux服务器搭建相关教程链接整理
  8. Co-training 初探快切入
  9. 游戏美术设计怎么入门?场景建模该如何学习?
  10. 一般信道容量的计算matlab,DMC信道容量迭代计算的matlab实现