java storm tick ack_关于Storm Tick
转载自kqdongnanf-博客园;Email:[email protected]
1. tick的功能
Apache Storm中内置了一种定时机制——tick,它能够让任何bolt的所有task每隔一段时间(精确到秒级,用户可以自定义)收到一个来自__systemd的__tick stream的tick tuple,bolt收到这样的tuple后可以根据业务需求完成相应的处理。
Tick功能从Apache Storm 0.8.0版本开始支持,本文在Apache Storm 0.9.1上测试。
2. 在代码中使用tick及其作用
在代码中如需使用tick,可以参照下面的方式:
2.1. 为bolt设置tick
若希望某个bolt每隔一段时间做一些操作,那么可以将bolt继承BaseBasicBolt/BaseRichBolt,并重写getComponentConfiguration()方法。在方法中设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,单位是秒。
getComponentConfiguration()是backtype.storm.topology.IComponent接口中定义的方法,在此方法的实现中可以定义以”Topology.*”开头的此bolt特定的Config。
这样设置之后,此bolt的所有task都会每隔一段时间收到一个来自__systemd的__tick stream的tick tuple,因此execute()方法可以实现如下:
2.2. 为Topology全局设置tick
若希望Topology中的每个bolt都每隔一段时间做一些操作,那么可以定义一个Topology全局的tick,同样是设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:
2.3. tick设置的优先级
与Linux中的环境变量的优先级类似,storm中的tick也有优先级,即全局tick的作用域是全局bolt,但对每个bolt其优先级低于此bolt定义的tick。
这个参数的名字TOPOLOGY_TICK_TUPLE_FREQ_SECS具有一定的迷惑性,一眼看上去应该是Topology全局的,但实际上每个bolt也可以自己定义。
2.4. tick的精确度
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精确到秒级的。例如某bolt设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS为10s,理论上说bolt的每个task应该每个10s收到一个tick tuple。
实际测试发现,这个时间间隔的精确性是很高的,一般延迟(而不是提前)时间在1ms左右。测试环境:3台虚拟机做supervisor,每台配置:4Cpu、16G内存、千兆网卡。
3. storm tick的实现原理
在bolt中的getComponentConfiguration()定义了该bolt的特定的配置后,storm框架会在TopologyBuilder.setBolt()方法中调用bolt的getComponentConfiguration()方法,从而设置该bolt的配置。
调用路径为:TopologyBuilder.setBolt()-> TopologyBuilder.initCommon()-> getComponentConfiguration()
4. 附件
测试使用的代码:
package storm.starter;
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class MyTickTestTopology {
public static class WordCount extends BaseBasicBolt {
Map counts = new HashMap();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
System.out.println("################################WorldCount bolt: "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
}
else{
collector.emit(new Values("a", 1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
@Override
public Map getComponentConfiguration() {
Config conf = new Config();
conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,10);
return conf;
}
}
public static class TickTest extends BaseBasicBolt{
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// 收到的tuple是tick tuple
if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
System.out.println("################################TickTest bolt: "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
}
// 收到的tuple时正常的tuple
else{
collector.emit(new Values("a"));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("test"));
}
@Override
public Map getComponentConfiguration() {
Config conf = new Config();
conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,20);
return conf;
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 3);
builder.setBolt("count", new WordCount(), 3).shuffleGrouping("spout");
builder.setBolt("tickTest", new TickTest(), 3).shuffleGrouping("count");
Config conf = new Config();
conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
// Thread.sleep(10000);
// cluster.shutdown();
}
}
}
java storm tick ack_关于Storm Tick相关推荐
- storm源码之storm代码结构【译】
说明:本文翻译自Storm在GitHub上的官方Wiki中提供的Storm代码结构描述一节Structure of the codebase,希望对正在基于Storm进行源码级学习和研究的朋友有所帮助 ...
- kfaka storm写入mysql_flume+kafka+storm+mysql架构设计
序言 前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考.这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql如果有需要测 ...
- Storm专题一、Storm DRPC 分布式计算
本文译自:https://storm.incubator.apache.org/documentation/Distributed-RPC.html Storm里面引入DRPC主要是利用storm的实 ...
- 大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度
大数据技术之_17_Storm学习 一 Storm 概述 1.1 离线计算是什么? 1.2 流式计算是什么? 1.3 Storm 是什么? 1.4 Storm 与 Hadoop 的区别 1.5 Sto ...
- 【Twitter Storm系列之三】 storm简单实例分析
实例来自书籍<Oreilly.Getting.Started.with.Storm.Aug.2012> 先讲下我们这次所需涉及到的概念:Topology.Spout.Blot Topolo ...
- kfaka storm写入mysql_基于Storm+Kafka+Zookeeper锁+Memcached+mysql架构全方位系统Storm项目案例实战...
基于Storm+Kafka+Zookeeper锁+Memcached+mysql架构全方位系统Storm项目案例实战 适应人群 有一定Storm基础.Kafka基础.Memcached基础.Zooke ...
- python storm连接mysql_Python ORM Storm 源码修改
安装 storm : pip install storm 目标:修改 Storm 源代码以支持自动重连 文件:python安装目录/site-packages/storm/database.py 在4 ...
- Linux时间子系统之Tick广播层(Tick Broadcast)
在分析Tick模拟层的时候曾经提到过,当系统中没有别的进程需要处理的时候,会将当前CPU切换到NO_HZ状态,不会每一个Tick都收到定时中断,从而达到节电的目的.但此时,当前CPU上的定时事件设备还 ...
- Linux时间子系统之Tick模拟层(Tick Sched)
在分析高分辨率定时器的时候曾经提到过,一旦切换到高精度模式后,原来的Tick层就失去作用了,高分辨率定时器层将"接管"对底层定时事件设备的控制.这时,也就意味着,系统中原有的Tic ...
最新文章
- 元学习—Meta Learning的兴起
- python软件下载安装win10-Python Win10版本下载
- wxWidgets:运行时类型信息 (RTTI)
- 原生支付url参数错误_小程序支付
- python局部变量含义_Python的变量
- 国家开放大学本科计算机应用基础,【(精华版)最新国家开放大学电大本科《计算机应用基础》网络课网考形考作业一及三试题答案】.docx...
- MiniO 磁盘缓存快速入门
- 技术圈儿002---高并发整体可用性:一文详解降级、限流和熔断
- ActiveMQ从入门到精通(二)
- python可视化拖拽编程平台_PythonEditor可视化拼插编辑器:编程不用写代码,拖拖拽拽就可以!...
- 《STM32从零开始学习历程》——SPI读取FLASH ID
- 匿名mahony互补滤波代码详解
- mac charles4.0.2免费破解版安装
- oracle中imp命令详解,Oracle使用imp命令导入数据详解
- 西伯利亚曾经叫鲜卑利亚
- 个人公众号成长记 - 你为什么要做公众号呢?
- 数据科学入门与细分数据领域盘点
- 小米(社招)测试开发面经-小米手机管家
- 300iq Contest 3简要题解
- C# .NET想要另存一个项目,sln文件丢了怎么办
热门文章
- 全员降薪 65% 五个月,核心高管不领工资:再明星的教育机构也没躲过去啊......
- Android的log机制,androidtv开发总结
- 【C#】AutoMapper 使用手册
- css 并排放置两个div
- 悟空CRM (基于jfinal+vue+ElementUI的前后端分离CRM系统)
- java硬币翻转问题_java – 硬币翻转程序
- [团队] 在Unity项目中使用FMOD来管理你的音效
- 20194311姜晨昊Exp6-MSF应用基础
- 基于 Agora SDK 实现 iOS 端的多人视频互动
- 来自2020年应届生毕业后的第一段实习经历以及对未毕业同学们的一些友好提示