Trident是基于Storm进行实时留处理的高级抽象,提供了对实时流4的聚集,投影,过滤等操作,从而大大减少了开发Storm程序的工作量。Trident还提供了针对数据库或则其他持久化存储的有状态的,增量的更新操作的原语。

若我们要开发一个对文本中的词频进行统计的程序,使用Storm框架的话我们需要开发三个Storm组件:

1.一个Spout负责收集文本信息并分段,做为sentence字段发送给下游的Bolt

2.一个Bolt将将每段文本粉刺,将分词结果以word字段发送给下游的Bolt

3.一个Bolt对词频进行统计,把统计结果记录在count字段并存储

如果使用Trident我们可以使用一下代码完成上述操作:

1 FixedBatchSpout spout = new FixedBatchSpout(new Fields("setence"),3,2 new Values("the cow jump over the moon"),3 new Values("the man went to the store and bought some candy"),4 new Values("four score and seven years ago"),5 new Values("how many apples can you eat"));6 spout.setCycle(true);7 TridentTopology topology = newTridentTopology();8 TridentState workcount = topology.newStream("spout",spout)9 .each(new Fields("setence"),new Split(),new Fields("word"))10 .groupBy(new Fields("word"))11 .persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("count"))12 .parallelismHint(6);

上述这段代码会被Trident框架转为为使用Storm开发时的三个步骤

代码的前两行使用FixedBatchSpout不断循环生成参数里列出的四个句子,第7行声明了TridentTopology对象,并在第8行的newStream方法中引用了FixedBatchSpout。Trident是按批处理数据的,FixedBatchSpout生成的数据是按照下图的方式一批一批的发送到下一个处理单元的,后续处理单元也是按照这种方式把数据发送到其他节点。

在上述的第9行使用Split对文本分词,并发分词结果存储到Word字段中Split的定义如下:

1 public classSplit extends BaseFunction {2

3 @Override4 public voidexecute(TridentTuple tuple, TridentCollector collector) {5 for(String word: tuple.getString(0).split(" ")) {6 if(word.length() > 0) {7 collector.emit(newValues(word));8 }9 }10 }11

12 }

在each方法宏也可以实现过滤,如只统计单词长度超过10个字母长度的单词的过滤可以定义如下:

.each(new Fields("word"), newBaseFilter() {publicboolean isKeep(TridentTuple tuple) {return tuple.getString(0).length() > 10;

}

})

代码gropuBy(new Fields("word"))对word进行聚集操作,并在其后使用Count对象进行计数。之后将得到的结果储存到内存中。Trident不仅支持将结果存储到内存中,也支持将结果存储到其他的介质,如数据库,Memcached。如要将最终结果以key-value的方式存储到Memcached,可以使用下面的方式:

persistentAggregate(new Memcached.transactional(local),new Count(),new Fields("count"))

实时任务的关键问题是如何处理对数据更新的幂等问题,任务可能失败或则重启,因此更新操作可能被重复执行。以上述为例,发送到Count的数据可能因为节点的重启或则网络故障导致的其他原因致使被重复发送,从而引起数据的重复统计,为了避免这个问题Trident提供了事物支持,由于数据是按批发送到Count节点的,Trident对每批单词都分配一个Transaction id。上面的代码中,每完成一批单词的统计,就将这批数据的统计结果连同Transaction id一起存储到Memcached中。数据更新的时候,Trident会比较Memcached中的Transaction id和新到达数据的Transaction id,如果同一批数据被重复发送,其Transaction id就会等于Memcached存储的Transaction id,新数据将会被忽略。另外每批数据的Transaction id是有严格的顺序的Transaction id 为2的数据没有处理完的情况下,绝对不会处理Transaction id为3的数据。

有时,一个任务有多个数据源,每一个数据源都是以TridentState的形式出现在任务定义中的,比如上面提到的wordcount任务生成的数据就可以被其他的任务所使用,可以使用stateQuery方法引用别的TridentState,stateQuery的定义如下:

Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields)

Trident的数据模型称作"TridentTuple"---带名字的Values列表。在Topology中,tuple是在顺序的操作集合中增量生成的。Operation通常包含一组输入字段和提交的功能字段。Operation的输入字段通常是将tuple中的一个子集作为操作集合的输入,而功能字段则是命名提交的字段。

例如,声明一个名为"students"的Stream,可能包含名字,性别,学号,分数等字段。添加一个按分数过滤的过滤器ScoreFilter,使得tuples只过滤分数大于60的学生,定义一个分数过滤器,当选择输入字段的时候Trident会自动过滤出一个子集,该操作十分的高效。

classScoreFilter extends BaseFilter{publicboolean isKeep(TridentTuple tuple) {return tuple.getInteger(0) >= 60;

}

}

如果我们相对字段进行计算,并且提交给TridentTuple,可以模拟一下计算。

classAddAndSubFuction extends BaseFunction{public voidexecute(TridentTuple tuple, TridentCollector collector) {int res1 = tuple.getInteger(0);int res2 = tuple.getInteger(1);int sub = res1 > res2 ? res1 - res2 : res2 -res1;

collector.emit(new Values(res1+res2,sub));

}

}

此函数接收两个整数作为参数,并计算两个数的和以及差,作为两个新的Fields提交。

storm mysql trident_Storm Trident详解相关推荐

  1. storm mysql trident_storm trident实战 trident state

    一.认识storm trident trident可以理解为storm批处理的高级抽象,提供了分组.分区.聚合.函数等操作,提供一致性和恰好一次处理的语义. 1)元祖被作为batch处理 2)每个ba ...

  2. storm mysql trident_Storm Trident状态

    Trident中有对状态数据进行读取和写入操作的一流抽象工具.状态既可以保存在拓扑内部,比如保存在内容中并由HDFS存储,也可以通过外部存储(比如Memcached或Cassandra)存储在数据库中 ...

  3. Mysql加锁过程详解(3)-关于mysql 幻读理解

    Mysql加锁过程详解(1)-基本知识 Mysql加锁过程详解(2)-关于mysql 幻读理解 Mysql加锁过程详解(3)-关于mysql 幻读理解 Mysql加锁过程详解(4)-select fo ...

  4. mysql status改变_mysql 配置详解mysql SHOW STATUS 详解

    1. back_log 指定MySQL可能的连接数量.当MySQL主线程在很短的时间内得到非常多的连接请求,该参数就起作用,之后主线程花些时间(尽管很短)检查连接并且启动一个新线程. back_log ...

  5. MySQL 表分区详解MyiSam引擎和InnoDb 区别(实测)

    MySQL 表分区详解MyiSam引擎和InnoDb 区别(实测) 一.什么是表分区 通俗地讲表分区是将一大表,根据条件分割成若干个小表.mysql5.1开始支持数据表分区了. 如:某用户表的记录超过 ...

  6. Mysql存储引擎详解(MyISAM与InnoDB的区别)

    Mysql存储引擎详解(MyISAM与InnoDB的区别) 存储引擎     MySQL中的数据用各种不同的技术存储在文件(或者内存)中.这些技术中的每一种技术都使用不同的存储机制.索引技巧.锁定水平 ...

  7. MySQL的Limit详解(转载)

    MySQL的Limit详解 问题:数据库查询语句,如何只返回一部分数据? Top子句 TOP 子句用于规定要返回的记录的数目.对于拥有数千条记录的大型表来说,TOP 子句是非常有用的. 在SQL Se ...

  8. Mysql加锁过程详解(2)-关于mysql 幻读理解

    Mysql加锁过程详解(1)-基本知识 Mysql加锁过程详解(2)-关于mysql 幻读理解 Mysql加锁过程详解(3)-关于mysql 幻读理解 Mysql加锁过程详解(4)-select fo ...

  9. 数据库mysql_row_MYSQL数据库mysql found_row()使用详解

    <MYSQL数据库mysql found_row()使用详解>要点: 本文介绍了MYSQL数据库mysql found_row()使用详解,希望对您有用.如果有疑问,可以联系我们. mys ...

最新文章

  1. 谷歌简单粗暴“复制-粘贴”数据增广,刷新COCO目标检测与实例分割新高度
  2. 你的电池再充几次电就报废?机器学习帮你预测电池寿命
  3. .net多线程 Thread
  4. 文件操作模式扩展、游标操作
  5. 三、Java Web中出现的一些乱码问题总结(详解)
  6. 微机个人笔记-随机存取存储器(RAM)
  7. 集合的划分(信息学奥赛一本通-T1315)
  8. 特征筛选4——斯皮尔曼相关系数筛选特征(单变量筛选)
  9. 苹果锤完Facebook 又轮到谷歌了
  10. Python中的快捷键和注释方式
  11. 关于 动态分流系统 ABTestingGateway 的想法
  12. C# 表格跨行和跨列应用实例
  13. 适合编写C语言代码的编程软件有哪些?大学生赶紧行动起来!
  14. Qt下汉字转拼音,包含二级汉字
  15. win7系统服务器管理器在哪里找,win7打开服务管理器
  16. 以实例说明,网吧组网方案详解!(转)
  17. try的动词用法_try的用法
  18. 到底什么是IaaS、PaaS、SaaS?
  19. 迷宫问题的求解(广度和深度优先搜索)
  20. gitee 链接报错

热门文章

  1. php+laravel+百度智能云人脸识别详解
  2. 中国非正常死亡最高的行业 TOP 10
  3. vue项目中动态创建模块以满足客户定制化需求的解决方案
  4. 【新年快乐】嗷,我的2020年,就这样悄咪咪地溜走了
  5. Monthly expense(二分)
  6. unity 人物走动声音_Unity3D实现人物走动 教程
  7. Windows7系统使用技巧(如何让你的win7用的更酷)
  8. 二叉树先序遍历(递归+迭代)——java
  9. Bootstrap的响应式导航栏
  10. 2022区块链应用全景及未来展望