一个消费者组可以消费多个topic,以前写过一篇一个消费者消费一个topic的,这次的是一个消费者组通过直连方式消费多个topic,做了小测试,结果是正确的,通过查看zookeeper的客户端,zookeeper记录了偏移量

package day04

/*
消费多个topic
*/
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import scala.collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderDemoYY1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("yy").setMaster("local[*]")
val ssc = new StreamingContext(conf,Duration(5000))
//消费3个topic
val topic1 = "wc"
val topic2 ="wc1"
val topic3 ="wc2"
//组名
val groupid ="GPMMVV"
//zookeeper地址
val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
//brokerList
val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
//把消费的分区放到Set集合中,可以在第一次读取时作为参数传入
val topics = Set(topic1,topic2,topic3)
//ListBuffer时有序的,按下标有序
val topicsList = ListBuffer[String](topic1,topic2,topic3)
//设置kafka的参数
val kafkaParams = Map(
"metadata.broker.list"->brokerList,
"groupid"->groupid,
"auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
//默认时从头开始读的
)

//new ListBuffer用来存放ZKGroupTopicDirs, 用来保存偏移量的地址
//因为有多个topic,对应的也就有多个ZKGroupTopicDirs
var zkGTList:ListBuffer[ZKGroupTopicDirs] =new ListBuffer[ZKGroupTopicDirs]()
//根据topicList 新建 ZKGroupTopicDirs 添加到zkGTList
for(tp <- topicsList){
val topicDirs = new ZKGroupTopicDirs(groupid,tp)
zkGTList += topicDirs
}
//新建zkClient,用来获取偏移量和更新偏移量
val zkClient = new ZkClient(zkQuorum)
//新建一个InputDStream,要是var,因为有两种情况,消费过? 没有消费过? 根据情况赋值
var kafkaDStream :InputDStream[(String,String)] = null
//创建一个Map,(key,value)-》( 对应的时Topic和分区 ,偏移量)
var fromOffset = Map[TopicAndPartition,Long]()

//获取每个topic是否被消费过
var childrens:ListBuffer[Int] =new ListBuffer[Int]()
var flag = false //有topic被消费过则为true
for (topicDir <- zkGTList){ //循环存放偏移量的
//通过zkClient.countChidren来获取每个topic对应的分区中的偏移量ZKGroupTopicDirs的对象
val child: Int = zkClient.countChildren(topicDir.consumerOffsetDir)
childrens +www.mhylpt.com= child
if(child>0){
flag = true
}
}

if(flag){//消费过
for(z <- 0 until topics.size){ //根据topicsList的的下表获取相应的child和ZKGroupTopicDirs
val child = childrens(z)
val gpDirs = zkGTList(z)
val topicn = topicsList(z)
for(i <- 0 until child)www.mcyllpt.com/{
//循环child, 根据使用zkClient.readData方法,u获取topic的每个分区的偏移量
val offset = zkClient.readData[String](gpDirs.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(www.michenggw.com/ topicn,i)
fromOffset += tp -> offset.toLong
}
}
//返回的而结果是 kafka的key,默认是null, value是kafka中的值
val messageHandler =www.gcyl159.com/ (mmd:MessageAndMetadata[String,String])=www.gcyl152.com>{
(mmd.key(),mmd.message())
}
//创建kafkaDStream
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](
ssc,kafkaParams,fromOffset,messageHandler
)
}else{//以前没有读取过
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
ssc,kafkaParams,topics
)
}

/*val children1 = zkClient.countChildren(zKGroupTopicDirs1.consumerOffsetDir)
val children2 = zkClient.countChildren(zKGroupTopicDirs2.consumerOffsetDir)
if(children1>0 || children2>0){
if(children1>0){
for (i <- 0 until children1){
val offset = zkClient.readData[String](zKGroupTopicDirs1.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(topic1,i)
fromOffset += tp ->offset.toLong
}
}
if(children2>0){
for (i <- 0 until children1){
val offset = zkClient.readData[String](zKGroupTopicDirs2.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(topic2,i)
fromOffset += tp ->offset.toLong
}
}
val messageHandler =(mmd:MessageAndMetadata[String,String])=>{
(mmd.key(),mmd.message())
}
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,
kafkaParams,fromOffset,messageHandler)
}else{
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
}*/

var offsetRanges = Array[OffsetRange]www.hjpt521.com() //用来记录更新的每个topic的分区偏移量

kafkaDStream.foreachRDD(kafkaRDD=>{
//kafkaRDD是一个KafkaRDD,可以转换成HasOffsetRanges对象,从而获取offsetRanges
offsetRanges= kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaRDD.foreach(println)www.365soke.com //打印

for(o <- offsetRanges){
val topicNN: String = o.topic //获取topic
val offset: Long = o.untilOffset //获取偏移量
val partition: Int = o.partition //获取分区
val i = topicsList.indexOf(topicNN) //通过topicList查找topic的下标,找到与之对应的ZKGroupTopicDirs
val gpDir = zkGTList(i)
//通过ZkUtils更新偏移量
ZkUtils.updatePersistentPath(zkClient,gpDir.consumerOffsetDir+"/"+partition,offset.toString)
/*if(topicNN.equals(topic1)){
ZkUtils.updatePersistentPath(zkClient,zKGroupTopicDirs1.consumerOffsetDir+"/"+partition,offset.toString)
}else if(topicNN.equals(topic2)){
ZkUtils.updatePersistentPath(zkClient,zKGroupTopicDirs2.consumerOffsetDir+"/"+partition,offset.toString)
}*/
}
})

ssc.start()
ssc.awaitTermination(www.dfgjyl.cn)

可以通过zookeeper的客户端,在/consumers中查看偏移量,
我的3个topic中,其中wc和wc1只有1个分区,可以通过下图可看出wc1的0分区偏移量13

转载于:https://www.cnblogs.com/qwangxiao/p/9971006.html

kafka直连方式消费多个topic相关推荐

  1. kafka同一个group 消费两个topic吗_MQ: 一张图读懂kafka工作原理

    1.关于kafka Kafka是由Apache软件基金会开发的一个开源消息队列,由Scala和Java编写. 相关文章参考: MQ: 消息队列常见应用场景及主流消息队列ActiveMQ.RabbitM ...

  2. .net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(二)

    依据Partition和Consumer的Rebalance策略,找到Kafka.Client Rebalance代码块,还原本地环境,跟踪调试,发现自定义Consumer Group 的Consum ...

  3. Kafka消费者订阅方式

    Kafka消费者订阅方式 1.指定主题消费 2.指定分区消费 3.取消订阅 4.总结 Kafka为消费者提供了三种类型的订阅消费方式:订阅主题集合.正则表达式订阅主题.订阅指定主题的分区集合.三种方式 ...

  4. kafka怎么保证数据消费一次且仅消费一次?使用消息队列如何保证幂等性?

    精确一次处理语义(exactly onece semantic–EOS),Kafka的EOS主要体现在3个方面: 1)幂等producer 保证单个分区的只会发送一次,不会出现重复消息 2)事务(tr ...

  5. spring整合kafka项目生产和消费测试结果记录(一)

    使用spring+springMVC+mybatis+kafka做了两个web项目,一个是生产者,一个是消费者. 通过JMeter测试工具模拟100个用户并发访问生产者项目,发送json数据给生产者的 ...

  6. Dubbo——初识RPC、Dubbo框架、使用直连方式实现Dubbo

    文章目录: 1.RPC & 软件架构 1.1 单一应用架构 1.2 分布式微服务架构 1.3 RPC 2.Dubbo概述 2.1基本架构 2.2 dubbo支持的协议 3.直连方式实现dubb ...

  7. zookeeper kafka迁移后logstash消费不了Kafka消息

    zookeeper和kafka安装在同3台机器上,组成一个集群,对外提供服务 因为种种原因,需要将kafka和zookeeper 做整体迁移 首先 申请3台机器部署好zookeeper服务,加到原来的 ...

  8. 用JDBC直连方式访问SQL Server 2005详解

    用JDBC直连方式访问SQL Server 2005详解 1.安装JDK,配置其环境变量:(笔者所用版本为1.6版) (1)从官方网http://java.sun.com/jdk下载安装文件. (2) ...

  9. Kafka消费者群组消费不到消息解决办法

    测试环境发Kafka消息,不能消费,我让测试重启一下brokekr,再发消息,发现能正常消费了.

最新文章

  1. 基于Foursquare, 我们还能做什么 ?
  2. 全球及中国皮裤行业消费需求及未来产销前景预测报告2022-2027年
  3. OpenSSL 与 SSL 数字证书概念贴
  4. 《MySQL——幻读与next-key lock与间隙锁带来的死锁》
  5. MySQL安装错误: unknown option '--skip-federated'
  6. ionic2.0关于表单的验证
  7. 数据结构 二叉搜索树BST的实现与应用
  8. 消息中间件学习总结(12)——Kafka与RocketMQ的多Topic对性能稳定性的影响比较分析
  9. 微软实习期的我,纠正了一位高级开发人员的错误......
  10. 当select查询为空
  11. python制作照片_python3一键排版证件照(一寸照、二寸照),附源代码
  12. 2015-5-10分享pdf
  13. python离线语音识别_python语音识别模块
  14. 《缠中说禅108课》15:没有趋势,没有背驰
  15. 用HTML+CSS做一个漂亮简单的个人网页——樱木花道篮球3个页面 学生个人网页设计作品 学生个人网页模板 简单个人主页
  16. 小程序搜索框搜索、查询(模糊搜索、关键字查询)
  17. 白马非马----继承 (转)
  18. 自步对比学习(Self-paced Contrastive Learning with Hybrid Memory for Domain Adaptive Object Re-ID)
  19. STM32F411核心板固件库开发(二) USART配置
  20. [Python] 一行代码让你明白什么叫艺术

热门文章

  1. PWN-PRACTICE-BUUCTF-17
  2. PWN-PRACTICE-BUUCTF-2
  3. 【牛客 - 303B第十五届浙江大学宁波理工学院程序设计大赛(同步赛)】Fibonacci and Counting(Fib数性质,gcd辗转相除法性质)
  4. div内容居中和布局居中样式总结
  5. 计算机英语第六单元,计算机专业英语第六版第十单元课后汉译英,We do use other forms....这个do...
  6. java 自动启动监听_Spring Boot 启动事件和监听器,太强大了!
  7. codesys 串口通讯实例_常见的PLC程序实例,车库自动门的PLC控制!
  8. combox 增加请选择_娱乐测试:选择四种花束中的一种,测试你对婚姻的看法
  9. java 基本类型 引用类型_Java中的基本类型和引用类型变量的区别
  10. Mysql处理海量数据时的一些优化查询速度方法