Kafka之四:Kafka与Streaming集成
Kafka之四:Kafka与Streaming集成
文章目录
- Kafka之四:Kafka与Streaming集成
- 1. 修改IEDA的maven配置
- 2. 程序一
- 3. 程序二:统计次数
- 4. 提交任务可能遇到的错误
Kafka之一:Kafka简述
Kafka之二:Kafka集群的安装
Kafka之三:Kafka集群工作流程
- spark官网
- Kafka作为spark Streaming的一种输入源,当Kafka和Streaming集成时充当消费者角色。(请了解Kafka命令操作)
1. 修改IEDA的maven配置
按照官网配置修改maven配置文件pom.xml
org.apache.spark
spark-streaming-kafka-0-10_2.11
2.1.1
2. 程序一
(1) 官网实例:
package com.spark.kafkaimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}object Test01 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("xxx")val ssc = new StreamingContext(conf,Seconds(5))val kafkaParams = Map[String,Object]("bootstrap.servers" -> "Cloud01:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "earliest")val topics = Array("first")val stream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))stream.map(record => (record.key,record.value)).print()ssc.start()ssc.awaitTermination()}}
(2)提交运行
- a. 打包上传到主机上
- b. 开启HDFS、zookeeper集群、spark集群、kafka集群(具体操作请参考对应文章)
以某一台主机为例(提示:检测防火墙状态并关闭)
- c. spark提交任务
bin/spark-submit \--class com.spark.kafka.Test01 \/home/duck/Lesson-1.0-SNAPSHOT.jar
- d. 启动kafka生产者,并发送消息
bin/kafka-console-producer.sh --broker-list Cloud01:9092 --topic first
- e. 任务后台显示结果
3. 程序二:统计次数
(1)代码
package com.spark.kafkaimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object Test01 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("xxx")val ssc = new StreamingContext(conf,Seconds(5))val kafkaParams = Map[String,Object]("bootstrap.servers" -> "Cloud01:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "earliest")val topics = Array("first")val stream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))stream.foreachRDD(rdd=>{val unit:RDD[(String,Int)] = rdd.map(_.value()).flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)unit.foreach(println)})ssc.start()ssc.awaitTermination()}
}
(2)提交运行(详细步骤请参考程序一中的提交运行)
- a. 打包上传
- b. 开启HDFS、zookeeper集群、spark集群、kafka集群
- c. spark提交任务
- d. 生产者发送消息
- e. 任务后台结果
4. 提交任务可能遇到的错误
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
原因:~/software/spark/jars目录下缺少两个jar包
解决:导入jar包(提供jar包资源 提取码:rre8)
找到本地jar包路径
spark-streaming-kafka-0-10_2.11-2.1.1
kafka-clients-0.10.0.1
然后将两个jar包放入~/software/spark/jars目录下
--->有问题请联系QQ1436281495^_^
Kafka之四:Kafka与Streaming集成相关推荐
- sparks streaming集成kafka注意要点
2018-04-18: 转载请注明出处:落在地上的乐乐https://blog.csdn.net/qq_35946969/article/details/79995514 sparks streami ...
- 大数据Spark Structured Streaming集成 Kafka
目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...
- kafka(组件分析 整合springboot集成 实战)
kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...
- Kafka 入门和 Spring Boot 集成
2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...
- Kafka学习之四 Kafka常用命令
2019独角兽企业重金招聘Python工程师标准>>> Kafka学习之四 Kafka常用命令 Kafka常用命令 以下是kafka常用命令行总结: 1.查看topic的详细信息 . ...
- [TODO]Kafka及Kafka Streaming架构熟悉
基本概念 名称 解释 Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 Topic Kafka根据topic对消息进行归类, ...
- 【檀越剑指大厂—kafka】kafka高阶篇
一.认识 kafka 1.kafka 的定义? Kafka 传统定义:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域.发布/订阅:消 ...
- 聊聊 Kafka: Kafka 的基础架构
一.我与快递小哥的故事 一个很正常的一个工作日,老周正在忙着啪啪啪的敲代码,办公司好像安静的只剩敲代码的声音.突然,我的电话铃声响起了,顿时打破了这种安静. 我:喂,哪位? 快递小哥:我是顺丰快递的, ...
- 【kafka】kafka 生态系统 Ecosystem
1.概述 官网:kafka 生态系统 Ecosystem 下面是我们被告知在主发行版之外与Kafka集成的工具列表.我们还没有全部尝试过,所以它们可能不起作用! 当然,Clients在这里是单独列出的 ...
最新文章
- 软件需求管理用例方法二
- matlab多径误差包络,MIMO-OFDM系统添加多径信道仿真结果误差很大
- centos下docker1.7 上传文件到容器报错 Error: Path not specified
- 用Tableau制作滚动时间轴(上)
- java面试笔试大汇总(一)
- 提高SQL语句的性能
- Hibernate Query Language(HQL)。
- SilverLight Test
- 今天的圆圆的深圳4j
- 两杯咖啡只卖2.5?瑞幸价格出错被“薅羊毛” 官方道歉了...
- 单元格填充为0的html,Excel如何在数据信息表中把空单元格填写为数字“0”
- php mysql中华图书购物商城
- mysql修改表、字段、库的字符集
- Ubuntu20.04环境下 安装Tensorflow
- 双系统bios如何设置默认启动系统_BIOS(主板)常用功能:设置启动磁盘顺序,迁移系统必备...
- 静态网页制作前夕小记录
- vs2003远程调试方法
- AIX上通过FTP下载压缩包损坏
- http://www.cnblogs.com/xd502djj/p/3473516.html
- 【跨域】 关于跨域的一些知识整合