整合kafka和storm例子网上很多,自行查找

问题描述:

kafka是之前早就搭建好的,新建的storm集群要消费kafka的主题,由于kafka中已经记录了很多消息,storm消费时从最开始消费

问题解决:

下面是摘自官网的一段话:

How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures

As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by setting KafkaConfig.startOffsetTime as follows:

kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)

kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messsages that are being written to the topic)

A Unix timestamp aka seconds since the epoch (e.g. via System.currentTimeMillis()): see How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? in the Kafka FAQ

As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information under the ZooKeeper path SpoutConfig.zkRoot+ "/" + SpoutConfig.id. In the case of failures it recovers from the last written offset in ZooKeeper.

Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.

This means that when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above.

这段话的包含的内容大概有,通过SpoutConfig对象的startOffsetTime字段设置消费进度,默认值是kafka.api.OffsetRequest.EarliestTime(),也就是从最早的消息开始消费,如果想从最新的消息开始消费需要手动设置成kafka.api.OffsetRequest.LatestTime()。另外还有一个问题是,这个字段只会在第一次消费消息时起作用,之后消费的offset是从zookeeper中记录的offset开始的(存放消费记录的地方是SpoutConfig对象的zkroot字段,未验证)

如果想要当前的topology的消费进度接着上一个topology的消费进度继续消费,那么不要修改SpoutConfig对象的id。换言之,如果你第一次已经从最早的消息开始消费了,那么如果不换id的话,它就要从最早的消息一直消费到最新的消息,这个时候如果想要跳过中间的消息直接从最新的消息开始消费,那么修改SpoutConfig对象的id就可以了

下面是SpoutConfig对象的一些字段的含义,其实是继承的KafkaConfig的字段,可看源码

public int fetchSizeBytes = 1024 * 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小

public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间

public int fetchMaxWait = 10000; //当服务器没有新消息时,消费者会等待这些时间

public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小

public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化

public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起

public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset

public long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息

public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime

public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次metrics

HDP 3.1.0 集成 Sqoop2 踩坑问题记录

HDP 3.1.0 集成 Sqoop2 踩坑问题记录 本文原始地址:https://sitoi.cn/posts/65261.html 问题一 $ sqoop:000> start job -n ...

lubuntu踩坑全记录

为了降低系统占用,毕业之后一直用lubuntu不用ubuntu...操作其实差不多,就是lubuntu有一些小坑坑:P 本文是我的踩坑全记录.长期更新. 调分辨率  升级命令lubuntu不出登录页面 ...

Windows Server 2012搭建SQL Server Always On踩坑全记录

Windows Server 2012搭建SQL Server Always On踩坑全记录 环境信息: Windows Server 2012 R2 Sql Server 2012 整个搭建集群的过 ...

Solr 7 部署与使用踩坑全记录

前言 Solr 是一种可供企业使用的.基于 Lucene 的搜索服务器,它支持层面搜索.命中醒目显示和多种输出格式.在这篇文章中,我将介绍 Solr 的部署和使用的基本操作,希望能让初次使用的朋友们少 ...

JavaScript两数相加(踩坑)记录

Adding two numbers concatenates them instead of calculating the sum JavaScript里两个变量 var a = 2: var b ...

【bug记录】OS Lab3 踩坑记

OS Lab3 踩坑记 Lab3在之前Lab2的基础上,增加了进程建立.调度和中断异常处理.其中测试包括进程建立以及进程调度部分. 由于是第一次做bug记录,而且是调试完bug后再做的记录,所以导致记 ...

unionId突然不能获取的踩坑记录

昨天(2016-2-2日),突然发现系统的一个微信接口使用不了了.后来经查发现,是在网页授权获取用户基本信息的时候,unionid获取失败导致的. 在网页授权获取用户基本信息的介绍中(http://m ...

【踩坑记录】记一次MySQL主从复制延迟的坑

最近开发中遇到的一个MySQL主从延迟的坑,记录并总结,避免再次犯同样的错误. 情景 一个活动信息需要审批,审批之后才能生效.因为之后活动要编辑,编辑后也可能触发审批,审批中展示的是编辑前的活动内容, ...

CentOS7.4安装MySQL踩坑记录

CentOS7.4安装MySQL踩坑记录 time: 2018.3.19 CentOS7.4安装MySQL时网上的文档虽然多但是不靠谱的也多, 可能因为版本与时间的问题, 所以记录下自己踩坑的过程, ...

随机推荐

JAVA设计模式之1-单例模式

设计模式是什么? 设计模式是一种思路,是在前辈们的软件工程中总结出来的套路,并且这些套路已经经过很多项目的测试,是比较成熟的思路,所以现在来总结一下常见的设计模式. 最简单最常用的就是单例模式: 一般 ...

factory service provide自定义服务

1.factory factory , 就是你提供一个方法, 该方法返回一个对象的实例, 对于 AngularJS 的 factory 来说, 就是先定义一个对象, 给这个对象添加属性和方法, 然后返 ...

转:linux的源码查看, c++语法 查看网站

http://linux.die.net/ http://www.cplusplus.com/

Java JDBC批处理插入数据操作(转)

在此笔记里,我们将看到我们如何可以使用像Statement和PreparedStatement JDBC API来批量在任何数据库中插入数据.此外,我们将努力探索一些场景,如在内存不足时正常运行,以及 ...

python大规模爬取京东

python大规模爬取京东 主要工具 scrapy BeautifulSoup requests 分析步骤 打开京东首页,输入裤子将会看到页面跳转到了这里,这就是我们要分析的起点 我们可以看到这个页面 ...

MYSQL 面试查询系列问题

表结构: `student`('id'.'name'.'code'.'age'.'sex')学生表 `teacher`('id'.'name')教师表 `course`('id'.'name'.'te ...

Object is not a function

如图报了一个这样的错,百度好多都说是函数名和html元素重名的问题.可是这个问题我想我这里是不存在的 可以看到就一个绑定事件,而且id名不是关键字 报错是在$.ajax这一行,索性就把submit-i ...

Nginx模块 ngx_http_limit_req_module 限制请求速率

The ngx_http_limit_req_module module (0.7.21) is used to limit the request processing rate per a def ...

phpmyadmin 安装

首先,安装mysql $ sudo apt-get install mysql-server$ sudo apt-get install mysql-client安装时输出root用户的密码在安装ph ...

iOS UI基础-10.0 QQ聊天布局之键盘及文本使用

要实现的效果:   这里只说用到的几个知识点 1.图片包含文字 在设置文字的Frame的时候,使用背景(按钮)的尺寸,文字使用了内边距 背景图片,使用拉伸 /** * 返回一张可以随意拉伸不变形的图片 ...

storm mysql spout_storm kafkaSpout 踩坑问题记录! offset问题!相关推荐

  1. storm kafkaSpout 踩坑问题记录! offset问题!

    整合kafka和storm例子网上很多,自行查找 问题描述: kafka是之前早就搭建好的,新建的storm集群要消费kafka的主题,由于kafka中已经记录了很多消息,storm消费时从最开始消费 ...

  2. 轻量应用服务器MySQL远程连接踩坑

    不算是给阿里云打广告吧,因为被阿里云的"云服务器ECS" 和 "轻量应用服务器"搞的很蛋疼.很多年前,阿里云的学生机"云翼计划"默认就只有& ...

  3. Jetson Nano配置踩坑全记录

    Jetson Nano配置踩坑全记录 Jetson Nano相关参数:JetPack 4.6,cuda 10.2, SD卡内存:512G 一.Jetson Nano系统镜像烧录 在Nvidia官网下载 ...

  4. vue在微信里面的兼容问题_详解Vue微信公众号开发踩坑全记录

    本文介绍了Vue微信公众号开发踩坑全记录,分享给大家,也给自己留个笔记. 需求 微信授权登录(基于公众号的登录方案) 接入JS-SDK实现图片上传,分享等功能 现状及难点 采用的Vue框架,前后端分 ...

  5. 容器化部署(k8s)任务调度平台xxl-job(部署过程及踩坑问题记录)

    文章预览: 1 部署过程(下方ip代表服务器的ip哈) 1.1 制作服务打包镜像DockerFile 1.2 制作执行脚本run.sh 1.3 jar包上上传 1.4 kuboard创建----配置信 ...

  6. Linux 环境下安装 MySQL,各种踩坑、疑难杂症 | 原力计划

    作者 | 红颜祸水nvn 来源 | CSDN博客,责编 | 夕颜 头图 | CSDN 下载自视觉中国 出品 | CSDN(ID:CSDNnews) 本文中,作者总结了在使用Linux CentOS 6 ...

  7. linux ubuntu安装pytorch(深度学习环境搭建记录,无sudo权限)踩坑全记录

    一些牢骚:深度学习没怎么学习几次,搭建环境已经把我搞秃了哈哈哈. 之前在网上找到的搭建环境的步骤,我没有root权限,很多操作都不行(比如运行.run文件,cuda 和cudnn的安装和修改也需要ro ...

  8. php kafka storm,php的kafka踩坑(二)

    接上一篇文章,上次没有解决的一个问题就是在做一个队列的时候,存在多消费者消费到同一个消息的情况,今天终于解决了这个问题,问题的本质是因为运维给我创建的topic是有问题的,他创建的分区数量是0,我今天 ...

  9. HyperLPR 开源车牌识别系统搭建踩坑全记录

    (介绍略) 一个开源的车牌识别系统. github地址:https://github.com/szad670401/HyperLPR 一.下载 git clone https://github.com ...

最新文章

  1. c++内存中字节对齐问题详解
  2. Mapreduce基本工作流程
  3. install pyinstaller
  4. 使用outlet在SAP Spartacus中添加自定义UI
  5. 自控matlab设计,自动控制原理课程设计--基于MATLAB软件的自动控制系统仿真
  6. JAVA Linux 排查CPU 过高的方法
  7. 云熙板式家具参数化拆单软件免锁版_数控开料机拆单软件如何选择?
  8. ORACLE批量更新四种方法比较
  9. 2009年2月26日经济报道:奥巴马2010年3.5万亿美元预算。
  10. html期末作品,走完HTML和CSS,进军期末
  11. 每日算法系列【LeetCode 1006】笨阶乘
  12. 分布式 NewSQL 数据库TiDB 3.1.0 版本发布,修复多项问题
  13. unity自动生成敌人_敌人的自动生成 - Unity脚本编程 — Project 2:慕课英雄 MOOC HERO(第三人称射击简易版) | Coursera...
  14. [LeetCode刷题] 476. 数字的补数--Java实现
  15. 【路由协议】和【应用协议】
  16. 低功耗MCU的选择方法
  17. 马赛克颗粒感天空Canvasjs特效
  18. 一文带你了解typeScript
  19. isotope自动布局
  20. PUSH消息是什么?

热门文章

  1. protobuf_name_conflict问题解决
  2. 驻场开发跟人力外包有什么区别?
  3. proguard学习
  4. 生命不息,折腾不止—小黑升win10
  5. 2.9 Nginx一致性HASH算法
  6. 【数据结构和算法】2谈谈算法
  7. python序列化模块struct_Python进阶-XII serialize(序列化)、序列化模块
  8. 和韩雪冬学到的网页设计点滴
  9. React 界面样式设计
  10. 阿里云和腾讯云免费SSL证书 专题