继续接     使用idea编写消费者,接收生产者的持续日志输出【小案例】(三)

https://georgedage.blog.csdn.net/article/details/103506165

使用spark-Streaming进行流式接收处理。

记得添加pom

        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.3.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.3.1</version></dependency>

代码如下:

package com.kafkaimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object KafkaConsumerStreamingDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[3]").setAppName("kcs")val sc = new SparkContext(conf)sc.setLogLevel("error")val topic = List("george")val map = Map("bootstrap.servers" -> "henu2:9092","group.id" -> "george","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val ssc = new StreamingContext(sc,Seconds(5))val ds = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic, map))val mapDs = ds.map(_.value())mapDs.print()ssc.start()ssc.awaitTermination()}
}

生产者:

https://georgedage.blog.csdn.net/article/details/103504598

代码如下:

package com.kafkaimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import scala.collection.mutable.Map
import scala.util.Randomobject KafkaProducerDemo {def main(args: Array[String]): Unit = {val props = new Properties()props.setProperty("bootstrap.servers","henu2:9092")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")val kp = new KafkaProducer[String,String](props)val provinces = List[String]("henan", "beijing", "guangzhou", "hebei")val cities = Map[String, List[String]]()cities.put("henan", List[String]("kaifeng", "zhengzhou"))cities.put("beijing", List[String]("daxing", "haidian"))cities.put("guangzhou", List[String]("zhuhai", "zhongshan"))cities.put("hebei", List[String]("shijiazhuang", "handan"))val random = new Random()while (true) {val timestamp = System.currentTimeMillis()val userId = random.nextInt(1000)val adId = random.nextInt(50)val proIndex = random.nextInt(4)val privince = provinces(proIndex)val cIndex = random.nextInt(2)val city = cities.getOrElse(privince, List(""))(cIndex)val log = timestamp + " " + userId + " " + adId + " " + privince + " " + citykp.send(new ProducerRecord[String,String]("george",log))Thread.sleep(1000)}}
}

然后先启动生产者,在启动消费者

结果展示:

使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)相关推荐

  1. 使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)

    接    使用idea编写SparkStreaming消费kafka中的数据[小案例](四) https://georgedage.blog.csdn.net/article/details/1035 ...

  2. linux 中kafka发送数据,C++ 向kafka中发送数据

    kafka是一个分布式流处理的平台,通过kafka我们可以发布和订阅流式记录.有关kafka的介绍可以参考官网或者这篇文章https://juejin.im/post/6844903495670169 ...

  3. Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中

    文章目录 案例:实时处理电商订单信息 需求一:统计商城实时订单实收金额 需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice) Redis Sink 自定义 Red ...

  4. Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

    1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集 ...

  5. SparkStreaming读取Kafka的Json数据然后保存到MySQL

    一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...

  6. Kafka中产生数据积压的原因以及解决方案

    Kafka中产生数据积压的原因以及解决方案 1.kafka中数据积压的原因 kafka作为消息队列,其中数据积压也是经常遇到的问题之一.我们都知道,数据积压的直接原因,一定是系统中的某个部分出现了性能 ...

  7. 查看使用linkedIn Camus 把Kafka中的数据导入HDFS中生成的.deflate文件

    在使用Camus好不容易把kafka中的数据导入了HDFS,但是直接download后打开,显示的会是乱码.经查询,带.deflate后缀的文件是使用DEFLATE算法压缩过的,所以要查看,只需使用h ...

  8. flink实时消费kafka中oracle的DML数据写入mysql

    1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...

  9. 昨晚,我们的消费者居然停止消费kafka集群数据了

    以下文章来源方志朋的博客,回复"666"获面试宝典 图片来源:伪装者 来源 | https://juejin.im/post/6874957625998606344 笔者所在的是一 ...

最新文章

  1. nginx php站点配置文件,php网站修改默认访问文件的nginx配置
  2. Nature解析中国AI现状,2030年能引领全球吗?
  3. GoogleReader的Likes操作数据如何获取?
  4. 图文并茂的带你彻底理解悲观锁与乐观锁
  5. Eclipse中怎样使用ERMaster进行单表设计并导出为DDL
  6. 广东金融学院java实验报告_《大学计算机Ⅰ》实验报告实验三
  7. matlab不同调制方式下性能比较,用不同调制方式实现跳/扩频混合通信的抗干扰性能...
  8. 电商小程序 -- 商品多规格选择弹框
  9. 中秋佳节共团圆,送3本Python书
  10. ERP原理:第一节 ERP的总体结构
  11. 测试手机功耗软件,借助软件测试手机基本峰值功耗
  12. laravel seeder 数据填充
  13. android 音频转mp3格式,音频 (六)- 安卓 ndk 将 pcm 转换为 mp3
  14. 采样频率和带宽的关系_示波器关键参数---带宽
  15. 【卡尔曼滤波介绍与原理解析】
  16. 【HTML基础-1】HTML标签简介及常用标签
  17. 操作系统存储器管理管理试验
  18. cesium--添加模型
  19. 收藏多个不错的画架构图工具
  20. 超级条理清晰代码混淆(直接搬用即可)

热门文章

  1. SyntaxError: Missing parentheses in call to 'print'
  2. CodeForces - 1333D Challenges in school №41(构造+模拟)
  3. CodeForces - 467C George and Job(二维dp)
  4. CodeForces - 932D Tree(树上倍增,好题)
  5. POJ - 1273 Drainage Ditches(最大流)
  6. CodeForces - 353E Antichain(贪心+思维)
  7. [luoguP4705]玩游戏
  8. 朴素容斥原理[ZJOI2016][bzoj4455]小星星
  9. 【Boost】以boost::function和boost:bind取代虚函数
  10. Win32多线程编程(1) — 基础概念篇