使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)
继续接 使用idea编写消费者,接收生产者的持续日志输出【小案例】(三)
https://georgedage.blog.csdn.net/article/details/103506165
使用spark-Streaming进行流式接收处理。
记得添加pom
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.3.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.3.1</version></dependency>
代码如下:
package com.kafkaimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object KafkaConsumerStreamingDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[3]").setAppName("kcs")val sc = new SparkContext(conf)sc.setLogLevel("error")val topic = List("george")val map = Map("bootstrap.servers" -> "henu2:9092","group.id" -> "george","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val ssc = new StreamingContext(sc,Seconds(5))val ds = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic, map))val mapDs = ds.map(_.value())mapDs.print()ssc.start()ssc.awaitTermination()}
}
生产者:
https://georgedage.blog.csdn.net/article/details/103504598
代码如下:
package com.kafkaimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import scala.collection.mutable.Map
import scala.util.Randomobject KafkaProducerDemo {def main(args: Array[String]): Unit = {val props = new Properties()props.setProperty("bootstrap.servers","henu2:9092")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")val kp = new KafkaProducer[String,String](props)val provinces = List[String]("henan", "beijing", "guangzhou", "hebei")val cities = Map[String, List[String]]()cities.put("henan", List[String]("kaifeng", "zhengzhou"))cities.put("beijing", List[String]("daxing", "haidian"))cities.put("guangzhou", List[String]("zhuhai", "zhongshan"))cities.put("hebei", List[String]("shijiazhuang", "handan"))val random = new Random()while (true) {val timestamp = System.currentTimeMillis()val userId = random.nextInt(1000)val adId = random.nextInt(50)val proIndex = random.nextInt(4)val privince = provinces(proIndex)val cIndex = random.nextInt(2)val city = cities.getOrElse(privince, List(""))(cIndex)val log = timestamp + " " + userId + " " + adId + " " + privince + " " + citykp.send(new ProducerRecord[String,String]("george",log))Thread.sleep(1000)}}
}
然后先启动生产者,在启动消费者
结果展示:
使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)相关推荐
- 使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)
接 使用idea编写SparkStreaming消费kafka中的数据[小案例](四) https://georgedage.blog.csdn.net/article/details/1035 ...
- linux 中kafka发送数据,C++ 向kafka中发送数据
kafka是一个分布式流处理的平台,通过kafka我们可以发布和订阅流式记录.有关kafka的介绍可以参考官网或者这篇文章https://juejin.im/post/6844903495670169 ...
- Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中
文章目录 案例:实时处理电商订单信息 需求一:统计商城实时订单实收金额 需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice) Redis Sink 自定义 Red ...
- Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数
1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集 ...
- SparkStreaming读取Kafka的Json数据然后保存到MySQL
一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...
- Kafka中产生数据积压的原因以及解决方案
Kafka中产生数据积压的原因以及解决方案 1.kafka中数据积压的原因 kafka作为消息队列,其中数据积压也是经常遇到的问题之一.我们都知道,数据积压的直接原因,一定是系统中的某个部分出现了性能 ...
- 查看使用linkedIn Camus 把Kafka中的数据导入HDFS中生成的.deflate文件
在使用Camus好不容易把kafka中的数据导入了HDFS,但是直接download后打开,显示的会是乱码.经查询,带.deflate后缀的文件是使用DEFLATE算法压缩过的,所以要查看,只需使用h ...
- flink实时消费kafka中oracle的DML数据写入mysql
1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...
- 昨晚,我们的消费者居然停止消费kafka集群数据了
以下文章来源方志朋的博客,回复"666"获面试宝典 图片来源:伪装者 来源 | https://juejin.im/post/6874957625998606344 笔者所在的是一 ...
最新文章
- nginx php站点配置文件,php网站修改默认访问文件的nginx配置
- Nature解析中国AI现状,2030年能引领全球吗?
- GoogleReader的Likes操作数据如何获取?
- 图文并茂的带你彻底理解悲观锁与乐观锁
- Eclipse中怎样使用ERMaster进行单表设计并导出为DDL
- 广东金融学院java实验报告_《大学计算机Ⅰ》实验报告实验三
- matlab不同调制方式下性能比较,用不同调制方式实现跳/扩频混合通信的抗干扰性能...
- 电商小程序 -- 商品多规格选择弹框
- 中秋佳节共团圆,送3本Python书
- ERP原理:第一节 ERP的总体结构
- 测试手机功耗软件,借助软件测试手机基本峰值功耗
- laravel seeder 数据填充
- android 音频转mp3格式,音频 (六)- 安卓 ndk 将 pcm 转换为 mp3
- 采样频率和带宽的关系_示波器关键参数---带宽
- 【卡尔曼滤波介绍与原理解析】
- 【HTML基础-1】HTML标签简介及常用标签
- 操作系统存储器管理管理试验
- cesium--添加模型
- 收藏多个不错的画架构图工具
- 超级条理清晰代码混淆(直接搬用即可)
热门文章
- SyntaxError: Missing parentheses in call to 'print'
- CodeForces - 1333D Challenges in school №41(构造+模拟)
- CodeForces - 467C George and Job(二维dp)
- CodeForces - 932D Tree(树上倍增,好题)
- POJ - 1273 Drainage Ditches(最大流)
- CodeForces - 353E Antichain(贪心+思维)
- [luoguP4705]玩游戏
- 朴素容斥原理[ZJOI2016][bzoj4455]小星星
- 【Boost】以boost::function和boost:bind取代虚函数
- Win32多线程编程(1) — 基础概念篇