转载自kqdongnanf-博客园;Email:kqdongnanf@yahoo.com。

1. tick的功能

Apache Storm中内置了一种定时机制——tick,它能够让任何bolt的所有task每隔一段时间(精确到秒级,用户可以自定义)收到一个来自__systemd的__tick stream的tick tuple,bolt收到这样的tuple后可以根据业务需求完成相应的处理。

Tick功能从Apache Storm 0.8.0版本开始支持,本文在Apache Storm 0.9.1上测试。

2. 在代码中使用tick及其作用

在代码中如需使用tick,可以参照下面的方式:

2.1. 为bolt设置tick

若希望某个bolt每隔一段时间做一些操作,那么可以将bolt继承BaseBasicBolt/BaseRichBolt,并重写getComponentConfiguration()方法。在方法中设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,单位是秒。

getComponentConfiguration()是backtype.storm.topology.IComponent接口中定义的方法,在此方法的实现中可以定义以”Topology.*”开头的此bolt特定的Config。

这样设置之后,此bolt的所有task都会每隔一段时间收到一个来自__systemd的__tick stream的tick tuple,因此execute()方法可以实现如下:

2.2. 为Topology全局设置tick

若希望Topology中的每个bolt都每隔一段时间做一些操作,那么可以定义一个Topology全局的tick,同样是设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:

2.3. tick设置的优先级

与Linux中的环境变量的优先级类似,storm中的tick也有优先级,即全局tick的作用域是全局bolt,但对每个bolt其优先级低于此bolt定义的tick。

这个参数的名字TOPOLOGY_TICK_TUPLE_FREQ_SECS具有一定的迷惑性,一眼看上去应该是Topology全局的,但实际上每个bolt也可以自己定义。

2.4. tick的精确度

Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精确到秒级的。例如某bolt设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS为10s,理论上说bolt的每个task应该每个10s收到一个tick tuple。
实际测试发现,这个时间间隔的精确性是很高的,一般延迟(而不是提前)时间在1ms左右。测试环境:3台虚拟机做supervisor,每台配置:4Cpu、16G内存、千兆网卡。

3. storm tick的实现原理

在bolt中的getComponentConfiguration()定义了该bolt的特定的配置后,storm框架会在TopologyBuilder.setBolt()方法中调用bolt的getComponentConfiguration()方法,从而设置该bolt的配置。

调用路径为:TopologyBuilder.setBolt()-> TopologyBuilder.initCommon()-> getComponentConfiguration()

4. 附件

测试使用的代码:

package storm.starter;import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;public class MyTickTestTopology {public static class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){System.out.println("################################WorldCount bolt: "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));}else{collector.emit(new Values("a", 1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Config conf = new Config();conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,10);return conf;}}public static class TickTest extends BaseBasicBolt{@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {// 收到的tuple是tick tupleif (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){System.out.println("################################TickTest bolt: "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));}// 收到的tuple时正常的tupleelse{collector.emit(new Values("a"));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("test"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Config conf = new Config();conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,20);return conf;}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new RandomSentenceSpout(), 3);builder.setBolt("count", new WordCount(), 3).shuffleGrouping("spout");builder.setBolt("tickTest", new TickTest(), 3).shuffleGrouping("count");Config conf = new Config();conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);conf.setDebug(false);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", conf, builder.createTopology());//      Thread.sleep(10000);
//      cluster.shutdown();}}
}

关于Storm Tick相关推荐

  1. Storm tick 功能

    1. tick的功能 Apache Storm中内置了一种定时机制--tick,它能够让任何bolt的所有task每隔一段时间(精确到秒级,用户可以自定义)收到一个来自__systemd的__tick ...

  2. java storm tick ack_关于Storm Tick

    转载自kqdongnanf-博客园:Email:[email protected] 1. tick的功能 Apache Storm中内置了一种定时机制--tick,它能够让任何bolt的所有task每 ...

  3. Storm Bolt之定时机制Tick应用

    Storm中有一种内置的定时机制Storm Bolt之Tick,可以在任何bolt的task每个一定时间(支持通过用户自定义配置)收到来自System Id的tick tuple.Bolt在收到这样的 ...

  4. 在archlinux上搭建twitter storm cluster

    本文详细描述如何在archlinux上搭建twitter storm cluster,转载请注明出处,谢谢. 有关archlinux基本系统安装,请参照archlinux简明安装指南一文,下面以上述为 ...

  5. Storm 01之 Storm基本概念及第一个demo

    2.1 Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies :[tə'pɑ:lədʒɪ]拓扑结构 Streams Spouts:[spaʊt]喷出; 喷射; 滔 ...

  6. Storm概念学习系列之storm-starter项目(完整版)(博主推荐)

    这是书籍<从零开始学Storm>赵必厦 2014年出版的配套代码! storm-starter项目包含使用storm的各种各样的例子.项目托管在GitHub上面,其网址为: http:// ...

  7. Storm的本地运行模式示例

    以word count为例,本地化运行模式(不需要安装zookeeper.storm集群),maven工程, pom.xml文件如下: <project xmlns="http://m ...

  8. centos7下安装storm步骤

      前言 真是后知后觉,最近忙也要学习,把以前丢的都要拾起来.原理懂不懂也把环境搭起来学习.   环境  centos7 jdk 1.8 zookeeper 3.4.13 storm 1.2.2 安装 ...

  9. Storm编程入门API系列之Storm的可靠性的ACK消息确认机制

    概念,见博客 Storm概念学习系列之storm的可靠性  什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...

最新文章

  1. HDFS读写过程解析
  2. java使用localstorage_sessionStorage和localStorage的使用
  3. 【NLP】中文BERT上分新技巧,多粒度信息来帮忙
  4. js实现获取当前周,过去和未来周的时间段日期
  5. 【HDU - 1080】Human Gene Functions(dp,可编辑距离类问题)
  6. c语言实现线性表的算法,数据结构算法代码实现——线性表的定义(一)
  7. 华睿相机sdk 开发_索尼发布相机远程操作SDK(软件开发工具包)
  8. pytorch 深入理解 tensor.scatter_ ()用法
  9. 管理感悟:一偷懒,必出错
  10. matlab 频率分辨率,功率谱、频率分辨率、频谱泄漏与窗函数
  11. SOUI使用过程知识点小结1
  12. java 光通信_超通俗易懂科普:什么是光通信?
  13. 银联 php hex2bin,银联支付
  14. CF1380D.Berserk And Fireball 【2000】你值得学习的【思维】+【模拟】+【贪心】
  15. Input length must be multiple of 8 when decrypting with padded cipher
  16. Oracle官网用户名密码
  17. 使用eBPF将网络功能Offload到网卡
  18. chmod -R xxx 3位数字权限对照表
  19. leetcode 1534. Count Good Triplets(python)
  20. 怎样清除手机上的微信小程序的缓存?

热门文章

  1. php 做积分策略,积分策略
  2. 玩转Google开源C++单元测试框架Google Test系列(gtest)之八 - 打造自己的单元测试框架
  3. c++ reference counting引用计数原理
  4. 为什么将0.1f改为0会使性能降低10倍?
  5. 多媒体技术 PI 第二期:OSS 圆桌预告
  6. Netflix 付费用户达2亿、苹果VR眼镜、抖音电子钱包、虚幻引擎用于电影制作等|Decode the Week...
  7. NETINT刘迅思:底层软件开发向上层应用靠拢
  8. MSU发布2018年视频压缩评比报告
  9. NodeJS开发c++扩展模块
  10. 安装Google Object Detection API