Storm - Trident
[align=center][size=large]Trident[/size][/align]
一、Storm 保证性
1.数据一定会发送
通过 ack / fail 方法确认,若失败,则提供重新发送的机制
2.数据一定只会统计一次
数据发送后有一个唯一性的标识,通过判断此标识,若存在,则不处理
3.数据一定会按照顺序进行处理
数据发送后有一个唯一性的标识,按照标识编号的顺序进行处理
二、Storm 保证性实现
1.逐个发送,逐个处理
如果这样处理,则原有的并行处理会变成穿行处理,不可取
2.批量发送,批量处理
如果这样处理,则如果当前这批数据处理完毕但未发送,则无法处理下一批数据,且这一批数据之间的处理顺序是并发的在进行的
3.分成两个步骤
一个处理数据,一个发送数据;
数据处理完毕,则继续处理下一批数据;数据是否发送到下一个缓解,由发送数据的步骤决定
采用此方式
三、Trident
1.Spout
package com.study.storm.trident.wordcount;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;
/** * @description * 数据来源 * 模拟批量数据发送 * <br/> * @remark * Storm 的保证及实现 * 1.数据一定被发送 * 通过 ack() 、 fail() 的确认机制,若发送失败,则重新发送 * 2.数据只被处理一次 * 数据发送时带有唯一的编号,判断此编号是否被处理过,若是,则忽略,不处理 * 3.数据被按照一定的顺序处理 * 数据发送时带有唯一的编号,按照编号的顺序进行处理,若数据不是按照顺序到达,则等待 * * <br/> * * Trident 处理批量数据 * */public class SentenceSpout extends BaseRichSpout {
/** * */ private static final long serialVersionUID = 2122598284858356171L;
private SpoutOutputCollector collector = null ;
/** * 模拟批量数据发送 * key : name * value : sentence */ private Values [] valuesArray = new Values[] { new Values("a","111111111111"), new Values("b","222222222222"), new Values("c","333333333333"), new Values("d","444444444444"), new Values("e","555555555555"), new Values("f","666666666666"), new Values("g","777777777777"), new Values("h","888888888888") };
@SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector ; }
// 发送的顺序,即数据组合的下标,标识数据发送到哪个位置 private int index = 0 ;
@Override public void nextTuple() {
if(index >= valuesArray.length){ return ; } index = index == valuesArray.length ? 0 : index++ ; this.collector.emit(valuesArray[index]); }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("name","sentence")); }
}
简化实现
package com.study.storm.trident.wordcount;
import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.generated.StormTopology;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;import storm.trident.Stream;import storm.trident.TridentTopology;import storm.trident.testing.FixedBatchSpout;
public class TridentTopologyDemo {
public static void main(String[] args) {
// 相当于原有的 Spout 实现 @SuppressWarnings("unchecked") FixedBatchSpout tridentSpout = new FixedBatchSpout(new Fields("name","sentence"), 1, new Values("a","111111111111"), new Values("b","222222222222"), new Values("c","333333333333"), new Values("d","444444444444"), new Values("e","555555555555"), new Values("f","666666666666"), new Values("g","777777777777"), new Values("h","888888888888")); // 是否循环发送,false 不 tridentSpout.setCycle(false);
TridentTopology topology = new TridentTopology(); /** * 1.本地过滤器设置 */ // 设置数据源 Stream initStream = topology.newStream("tridentSpout", tridentSpout); // 设置过滤器 -- 过滤name : d 的数据 initStream = initStream.each(new Fields("name"),new RemovePartDataFilter()); // 添加函数,输出字母对应的位置 initStream = initStream.each(new Fields("name"),new NameIndexFunction() ,new Fields("indexNum"));
// 设置过滤器 -- 拦截数据并打印 Stream filterPrintStream = initStream.each(new Fields("name","sentence"), new PrintFilter());
//--提交Topology给集群运行 Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("MyTopology", conf, topology.build());
//--运行10秒钟后杀死Topology关闭集群 Utils.sleep(1000 * 10); cluster.killTopology("MyTopology"); cluster.shutdown(); }
}
package com.study.storm.trident.wordcount;
import java.util.Iterator;
import backtype.storm.tuple.Fields;import storm.trident.operation.BaseFilter;import storm.trident.tuple.TridentTuple;
/** * @description * 打印:key 与 value ,fields 与 fields 对应传输的内容 */public class PrintFilter extends BaseFilter {
/** * */ private static final long serialVersionUID = 4393484291178519442L;
@Override public boolean isKeep(TridentTuple tuple) { Fields fields = tuple.getFields(); Iterator<String> iterator = fields.iterator(); while(iterator.hasNext()){ String key = iterator.next(); Object valueByField = tuple.getValueByField(key); System.out.println("fields : "+ key + " values : "+valueByField); }
return true; }
}
package com.study.storm.trident.wordcount;
import storm.trident.operation.BaseFilter;import storm.trident.tuple.TridentTuple;
/** * 过滤name = d 的数据 * return false 过滤 * return true 继续传递 */public class RemovePartDataFilter extends BaseFilter {
/** * */ private static final long serialVersionUID = 8639858690618579558L;
@Override public boolean isKeep(TridentTuple tuple) { String stringByField = tuple.getStringByField("name"); return !stringByField.equals("d"); }
}
package com.study.storm.trident.wordcount;
import java.util.HashMap;import java.util.Map;
import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;
public class NameIndexFunction extends BaseFunction {
/** * */ private static final long serialVersionUID = 9085021905838331812L;
static Map<String,Integer> indexMap = new HashMap<String,Integer>(); static { indexMap.put("a", 1); indexMap.put("b", 2); indexMap.put("c", 3); indexMap.put("d", 4); indexMap.put("e", 5); indexMap.put("f", 6); indexMap.put("g", 7); indexMap.put("h", 8); indexMap.put("i", 9); }
@Override public void execute(TridentTuple tuple, TridentCollector collector) { String name = tuple.getStringByField("name"); collector.emit(new Values(indexMap.get(name))); }
}
Storm - Trident相关推荐
- Storm Trident API
在Storm Trident中有五种操作类型 Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯的改变数据流向,不会 ...
- Storm Trident拓扑中的错误处理
这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...
- Storm Trident示例shuffleparallelismHint
本例包括Storm Trident中shuffle与parallelismHint的使用. 代码当中包括注释 maven <dependency><groupId>org.ap ...
- Storm Trident简介
转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...
- [Trident] Storm Trident 教程,state详解、trident api详解及实例
英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...
- storm trident mysql_Trident-MySQL
使用事物TridentTopology 持久化数据到MySQL1.构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays; ...
- Storm Trident 详细介绍
一.概要 1.1 Storm(简介) Storm是一个实时的可靠地分布式流计算框架. 具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息( ...
- storm trident
Trident是在storm基础上,一个以实时计算为目标的高度抽象. 它在提供处理大吞吐量数据能力(每秒百万次消息)的同时,也提供了低延时分布式查询和有状态流式处理的能力. 如果你对Pig和Casca ...
- storm trident mysql,storm_Trident
简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascadi ...
最新文章
- mobilefacenet
- python读取excel一列-Python从Excel中读取日期一列的方法
- 计算机操作系统课后题答案第三章,计算机操作系统教程习题与实验指导(第3版)...
- json 在后天怎么接_长相显老怎么办?以同岁的马伊琍和刘敏涛为例,解析显年轻的技巧...
- 【EOJ Monthly 2019.02 - F】方差(数学,前缀和,积的前缀和)
- my40_MySQL锁概述之意向锁
- AOE网与关键路径简介
- [AWDwR4] No JQuery call matches [:html, #cart]
- 32. My Experiences in the Factories 我在工厂的经历
- css中的clip:rect() 只能在绝对定位的元素上使用
- Windows编程入门
- 【Spring基础】CGLIB动态代理实现原理
- linux终端无法输入大写字母,linux不能打大写字母
- Flash制作大雪纷飞效果动画
- 【题解】2019,7.14 模拟赛(阿鲁巴)
- 1063 Set Similarity (25 分) java 题解
- 安装SQL server出现“服务没有及时响应启动或控制请求”
- win10系统提示:“重新启动以修复驱动器错误”解决办法
- 【航线运输驾驶员理论考试】人的行为能力
- Windows AIK+Imagex+DISM+U盘PE+U盘安装win7+PE