要创建一个KafkaSpout对象,必须要传入一个SpoutConfig对象,KafkaSpout的构造函数定义如下:

public KafkaSpout(SpoutConfig spoutConf) {_spoutConfig = spoutConf;
}

SpoutConfig继承KafkaConfig,并实现Serializable,由于在KafkaConfig中所有的属性字段都是public的因此在SpoutConfig中可以直接引用,SpoutConfig类的定义如下其中核心字段添加了注释。

public class SpoutConfig extends KafkaConfig implements Serializable {//记录zookeeper的地址列表public List<String> zkServers = null;//zookeeper端口号public Integer zkPort = null;//该参数是Consumer消费的meta信息,保存在zk的路径,自己指定public String zkRoot = null;//唯一idpublic String id = null;//向zookeeper记录offset的间隔时间public long stateUpdateIntervalMs = 2000;public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {super(hosts, topic);this.zkRoot = zkRoot;this.id = id;}
}

KafkaConfig的定义如下:

public class KafkaConfig implements Serializable {//用以获取Kafka broker和partition的信息public final BrokerHosts hosts;//读消息的topicpublic final String topic;//消息者所用的client idpublic final String clientId;//每次从kafka读取的byte数public int fetchSizeBytes = 1024 * 1024;//Consumer连接kafka server超时时间public int socketTimeoutMs = 10000;//当服务器没有新消息时,消费者会等待这些时间public int fetchMaxWait = 10000;//consumer段的缓冲区大小public int bufferSizeBytes = 1024 * 1024;//数据发送的序列化和反序列化定义的Schemepublic MultiScheme scheme = new RawMultiScheme();//是否强制从kafka中offset最小开始读数据,和startOffsetTime,一起用,默认情况下,为false,一旦startOffsetTime被设置,就要置为true  public boolean forceFromStart = false;//从何offset时间开始读,默认为最旧的offsetpublic long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//每次kafka会读取一批offset存放在list中,当zk offset比当前本地保存的commitOffse相减大于这个值时,重新设置commitOffset为当前zk offsetpublic long maxOffsetBehind = Long.MAX_VALUE;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTimepublic boolean useStartOffsetTimeIfOffsetOutOfRange = true;//多长时间统计一次metricspublic int metricsTimeBucketSizeInSecs = 60;public KafkaConfig(BrokerHosts hosts, String topic) {this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());}public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {this.hosts = hosts;this.topic = topic;this.clientId = clientId;}
}

影响初始读取进度的配置

在一个topology上线后,KafkaSpout从何处开始读消息呢,有几个配置影响读消息的位置,先罗列如下:

  • SpoutConfig中的id字段:如果想让一个topology从另一个topology之前的处理进度处读取数据,他们需要有相同的id
  • KafkaConfig的forceFromStart字段:如果该字段为true,在topology上线后会忽略之前id相同的topology的进度,重新从最早的消息处读取
  • KafkaConfig的startOffsetTime字段:默认为kafka.api.OffsetRequest.EarliestTime()开始读,也就是从Kafka中最早的消息开始处理。也可以设成kafka.api.OffsetRequest.LatestOffset,也就是最晚的消息开始读。也可以自己指定具体的值
  • KafkaConfig的maxOffsetBehind字段:这个字段对于KafkaSpout的多个处理流程都有影响。当提交一个新topology时,如果没有forceFromStart, 当KafkaSpout对某个partition的处理进度落后startOffsetTime对应的offset多于此值时,KafkaSpout会丢弃中间的消息,从而强制赶上目标进度.比如,如果startOffsetTime设成了lastestTime,那么如果进度落后超过maxOffsetBehind,KafkaSpout会直接从latestTime对应的offset开始处理。如果设成了froceFromStart,则在提交新任务时,始终会从EarliestTime开始读。
  • KafkaSpout的userStartOffsetTimeIfOffsetOutOfRange字段:如果设成true,那么当fetch消息时出错,且FetchResponse显示的出错原因是OFFSET_OUT_OF_RANGE,那么就会尝试从KafkaSpout指定的startOffsetTime对应的消息开始读。例如,如果有一批消息因为超过了保存期限被Kafka删除,并且zk里记录的消息在这批被删除的消息里。如果KafkaSpout试图从zk的记录继续读,那么就会出现OFFSET_OUT_OF_RANGE的错误,从而触发这个配置

转载于:https://www.cnblogs.com/senlinyang/p/8289926.html

Storm-kafka源码分析之Config相关类相关推荐

  1. Kafka 源码分析之网络层(二)

    上一篇介绍了概述和网络层模型实现<Kafka 源码分析之网络层(一)>,本编主要介绍在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送. PS:丰富的 ...

  2. Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)

    文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...

  3. kafka源码分析-consumer的分区策略

    kafka源码分析-consumer的分区策略 1.AbstractPartitionAssignor 2.RangeAssignor 3.RoundRobinAssignor 4.StickyAss ...

  4. spring源码分析之BeanDefinition相关

    目录 前言: BeanDefinition的家族系列 1.BeanDefintion的UML类图 2.BeanDefintion家族类详解 2.1.通用接口 2.2.BeanDefintion接口 2 ...

  5. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  6. Kafka 源码分析之网络层(一)

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据.小编会给大家带来几期 Kafka 相关的源码分析文章.这一系列文章是基于kafka 0.9.1版本,今天 ...

  7. apache kafka源码分析-Producer分析---转载

    原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读 1.Kafka提供了Producer类作为java producer的api,此类有几种发 ...

  8. CI源码分析(一)—config配置文件模块

    (一) 使用方式 (a) 系统级配置 语言.字符编码.session.cookie等配置项 文件位置: application/config/config.php 加载方式: 自动加载 调用方式: $ ...

  9. 远哥Amoeba源码分析之:核心类说明

    同事喜欢叫我远哥,所以我把这个笔记称为远哥系列,今天有兴趣分析一下Amoeba的源码,并且在此记录一下,并且随时更新. Amoeba目前一共有三个项目产品,分别是: Amoeba for MySQL: ...

最新文章

  1. zipfile java 解压速率,使用java.util.ZipFile在同一层次中解压缩zipfile
  2. 80热敏打印机打印TxPrnMod.dll
  3. 昨晚,B站崩了!看了网友们的评论,我差点笑死...
  4. AI理论知识整理(13)-标准基
  5. c++类名字查找与类的作用域
  6. java.lang.UnsupportedClassVersionError: Bad version number in .class file 解决方法
  7. 零基础小白10分钟用Python搭建小说网站!网友:我可以!
  8. fastjson 判断是否包含_Fastjson, Gson, org.json.JSON三者对于JSONObject及JSONArray的判断
  9. zabbix通过ODBC监控sybase举例
  10. Linux-系统编程-知识点概述
  11. java List操作
  12. linux etcfstab文件,Linux中/etc/fstab文件详解
  13. Ubuntu下Opencv安装与使用
  14. EOF和BOF的区别
  15. 如何查询电脑系统和服务器地址,如何查询电脑系统和服务器地址
  16. 图像分割评测指标MIOU之python代码详解
  17. Win11系统鼠标右键无法打开一直转圈解决方法
  18. 51单片机stc15w204s串口通信发数据接收数据串口中断发中文字符串完美运行软件延时发送一字节函数全注释
  19. 一键生成轮播图,轮播图插件
  20. 《麦肯锡方法》读书笔记14

热门文章

  1. python中两个矩阵之间的点乘_Python基础--数据分析库--Numpy
  2. 网站优化中哪些设置会影响蜘蛛的抓取?对网站SEO产生什么影响?
  3. SEO优化可以从这几个方面着手
  4. html怎样添加日历控件,向日历控件中添加自定义内容
  5. mac上投屏android_全平台Win/Mac全设备Android/iOS 免费无线投屏神器
  6. 用python爬取网站数据期末作业_python实战第一周作业:爬取一页商品数据
  7. 开发日记-20190517 关键词 函数式编程(一)
  8. 开发日记-20190511 关键词 onStart()和onResume()存在的原因(猜测篇)
  9. 罗辑思维 - 当代的学习方法
  10. 一些带dga域名的恶意软件