Storm0.7.0实现了一个新特性——事务性拓扑,这一特性使消息在语义上确保你可以安全的方式重发消息,并保证它们只会被处理一次。在不支持事务性拓扑的情况下,你无法在准确性,可扩展性,以空错性上得到保证的前提下完成计算。

NOTE:事务性拓扑是一个构建于标准Storm spoutbolt之上的抽象概念。

设计

在事务性拓扑中,Storm以并行和顺序处理混合的方式处理元组。spout并行分批创建供bolt处理的元组(译者注:下文将这种分批创建、分批处理的元组称做批次)。其中一些bolt作为提交者以严格有序的方式提交处理过的批次。这意味着如果你有每批五个元组的两个批次,将有两个元组被bolt并行处理,但是直到提交者成功提交了第一个元组之后,才会提交第二个元组。

NOTE: 使用事务性拓扑时,数据源要能够重发批次,有时候甚至要重复多次。因此确认你的数据源——你连接到的那个spout——具备这个能力。 这个过程可以被描述为两个阶段: 处理阶段 纯并行阶段,许多批次同时处理。 提交阶段 严格有序阶段,直到批次一成功提交之后,才会提交批次二。 这两个阶段合起来称为一个Storm事务。

NOTE: Storm使用zookeeper储存事务元数据,默认情况下就是拓扑使用的那个zookeeper。你可以修改以下两个配置参数键指定其它的zookeeper——transactional.zookeeper.servers和transactional.zookeeper.port。

接下来就看看如何在一个事务性拓扑中实现spout

Spout

一个事务性拓扑的spout与标准spout完全不同。

public class TestTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata>{

正如你在这个类定义中看到的,TestTransactionalSpout继承了带范型的BaseTransactionalSpout。指定的范型类型的对象是事务元数据集合。


协调者Coordinator
下面是本例的协调者实现。

public static class TestTransactionalSpoutCoordinator implements ITransactionalSpout.Coordinator<TransactionMetadata> {TransactionMetadata lastTransactionMetadata;public TestTransactionalSpoutCoordinator () {}@Overridepublic TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {//处理代码}@Overridepublic boolean isReady() {return true;}@Overridepublic void close() {}
}

值得一提的是,在整个拓扑中只会有一个提交者实例

第一个方法是isReady。在initializeTransaction之前调用它确认数据源已就绪并可读取。此方法应当相应的返回truefalse

最后,执行initializeTransaction。正如你看到的,它接收txidprevMetadata作为参数。第一个参数是Storm生成的事务ID,作为批次的惟一性标识。prevMetadata是协调器生成的前一个事务元数据对象。

在这个例子中,首先确认有多少tweets可读。只要确认了这一点,就创建一个TransactionMetadata对象。元数据对象一经返回,Storm把它跟txid一起保存在zookeeper。这样就确保了一旦发生故障,Storm可以利用分发器(译者注:Emitter,见下文)重新发送批次。

Emitter

创建事务性spout的最后一步是实现分发器(Emitter)。实现如下:

public static class TestTransactionalSpoutEmitter implements ITransactionalSpout.Emitter<TransactionMetadata> {public TestTransactionalSpoutEmitter() {}@Overridepublic void emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {//处理代码/**分发器从数据源读取数据并从数据流组发送数据。分发器应当问题能够为相同的事务id和事务元数据发送相同的批次。这样,如果在处理批次的过程中发生了故障,Storm就能够利用分发器重复相同的事务id和事务元数据,并确保批次已经重复过了.*/}@Overridepublic void cleanupBefore(BigInteger txid) {}@Overridepublic void close() {}</pre>
<pre>
}

在这里emitBatch是个重要方法。

Bolts

首先看一下这个拓扑中的标准bolt

public class TestSplitterBolt implements IBasicBolt{private static final long serialVersionUID = 1L;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("users", new Fields("txid","id","test"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {//业务代码}@Overridepublic void cleanup(){}
}

TestSplitterBolt接收元组。HashtagSplitterBolt的实现。

public class HashtagSplitterBolt implements IBasicBolt{private static final long serialVersionUID = 1L;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("hashtags", new Fields("txid","tweet_id","hashtag"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {}@Oerridepublic void execute(Tuple input, BasicOutputCollector collector) {//业务代码}@Overridepublic void cleanup(){}
}

现在看看TestHashTagJoinBolt的实现。首先要注意的是它是一个BaseBatchBolt。这意味着,execute方法会操作接收到的元组,但是不会分发新的元组。批次完成时,Storm会调用finishBatch方法。

public void execute(Tuple tuple) {//业务代码
}

在批次处理完成时,调用finishBatch方法。

@Override
public void finishBatch() {//后续处理代码
}

提交者bolts

我们已经学习了,批次通过协调器和分发器怎样在拓扑中传递。在拓扑中,这些批次中的元组以并行的,没有特定次序的方式处理。

在这里向数据库保存提交的最后一个事务ID。为什么要这样做?记住,如果事务失败了,Storm将会尽可能多的重复必要的次数。如果你不确定已经处理了这个事务,你就会多算,事务拓扑也就没有用了。所以请记住:保存最后提交的事务ID,并在提交前检查。

分区的事务Spouts
对一个spout来说,从一个分区集合中读取批次是很普通的。通过实现IPartitionedTransactionalSpout,Storm提供了一些工具用来管理每个分区的状态并保证重播的能力。
下面我们修改TestTransactionalSpout,使它可以处理数据分区。
首先,继承BasePartitionedTransactionalSpout,它实现了IPartitionedTransactionalSpout

public class TestPartitionedTransactionalSpout extendsBasePartitionedTransactionalSpout<TransactionMetadata> {
...
}

然后告诉Storm谁是你的协调器。

public static class TestPartitionedTransactionalCoordinator implements Coordinator {@Overridepublic int numPartitions() {return 4;}@Overridepublic boolean isReady() {return true;}@Overridepublic void close() {}
}

在这个例子里,协调器很简单。numPartitions方法,告诉Storm一共有多少分区。而且你要注意,不要返回任何元数据。对于IPartitionedTransactionalSpout,元数据由分发器直接管理。
下面是分发器的实现:

public static class TestPartitionedTransactionalEmitterimplements Emitter<TransactionMetadata> {@Overridepublic TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,BatchOutputCollector collector, int partition,TransactionMetadata lastPartitioonMeta) {//业务处理代码}@Overridepublic void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,int partition, TransactionMetadata partitionMeta) {//业务处理代码}@Overridepublic void close() {}
}

这里有两个重要的方法,emitPartitionBatchNew,和emitPartitionBatch。对于emitPartitionBatchNew,从Storm接收分区参数,该参数决定应该从哪个分区读取批次。在这个方法中,决定获取哪些数据,生成相应的元数据对象,调用emitPartitionBatch,返回元数据对象,并且元数据对象会在方法返回时立即保存到zookeeper。
Storm会为每一个分区发送相同的事务ID,表示一个事务贯穿了所有数据分区。通过emitPartitionBatch读取分区中的数据,并向拓扑分发批次。如果批次处理失败了,Storm将会调用emitPartitionBatch利用保存下来的元数据重复这个批次。

模糊的事务性拓扑

到目前为止,你可能已经学会了如何让拥有相同事务ID的批次在出错时重播。但是在有些场景下这样做可能就不太合适了。然后会发生什么呢?

事实证明,你仍然可以实现在语义上精确的事务,不过这需要更多的开发工作,你要记录由Storm重复的事务之前的状态。既然能在不同时刻为相同的事务ID得到不同的元组,你就需要把事务重置到之前的状态,并从那里继续。另外,在之前的一个事务被取消时,每个并行处理的事务都要被取消。这是为了确保你没有丢失任何数据。

你的spout可以实现IOpaquePartitionedTransactionalSpout,而且正如你看到的,协调器和分发器也很简单。

public static class TestOpaquePartitionedTransactionalSpoutCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator {@Overridepublic boolean isReady() {return true;}
}public static class TestOpaquePartitionedTransactionalSpoutEmitterimplements IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {@Overridepublic TransactionMetadata emitPartitionBatch(TransactionAttempt tx,BatchOutputCollector collector, int partion,TransactionMetadata lastPartitonMeta) {//处理代码return null;}private void emitMessage(TransactionAttempt tx, BatchOutputCollector collector,int partition, TransactionMetadata partitionMeta) {//处理代码}@Overridepublic int numPartitions() {return 4;}@Overridepublic void close() {}
}

最有趣的方法是emitPartitionBatch,它获取之前提交的元数据。你要用它生成批次。这个批次不需要与之前的那个一致,你可能根本无法创建完全一样的批次。剩余的工作由提交器bolts借助之前的状态完成。

转载于:https://blog.51cto.com/tener/1736754

Storm - 事务管理相关推荐

  1. Spring之事务管理配置

    1. 基于注解的事务配置 1. 在需要添加事务的方法上加上@Transactional注解 2. Spring的配置文件中配置事务管理器 1 <!-- 添加事务管理器组件DataSourceTr ...

  2. Spring事务管理只对出现运行期异常进行回滚

    使用spring难免要用到spring的事务管理,要用事务管理又会很自然的选择声明式的事务管理,在spring的文档中说道,spring声明式事务管理默认对非检查型异常和运行时异常进行事务回滚,而对检 ...

  3. mybatis源码分析之事务管理器

    2019独角兽企业重金招聘Python工程师标准>>> 上一篇:mybatis源码分析之Configuration 主要分析了构建SqlSessionFactory的过程中配置文件的 ...

  4. Spring事务管理 与 SpringAOP

    1,Spring事务的核心接口 Spring事务管理的实现有许多细节,如果对整个接口框架有个大体了解会非常有利于我们理解事务,下面通过讲解Spring的事务接口来了解Spring实现事务的具体策略.  ...

  5. Spring事务管理3----声明式事务管理(1)

     声明式事务管理(1)基于    基于 tx/aop  这种事务管理相比编程式事务管理来说对业务层基本没有改动,通过  TransactionProxyFactoryBean 创建业务层的代理,通过A ...

  6. 什么是事务的传播_这么漂亮的Spring事务管理详解,你不来看看?

    事务概念回顾 什么是事务? 事务是逻辑上的一组操作,要么都执行,要么都不执行. 事物的特性(ACID): 原子性: 事务是最小的执行单位,不允许分割.事务的原子性确保动作要么全部完成,要么完全不起作用 ...

  7. SpringMVC+MyBatis 事务管理一

    前言 spring事务管理包含两种情况,编程式事务.声明式事务.而声明式事务又包括基于注解@Transactional和tx+aop的方式.那么本文先分析编程式注解事务和基于注解的声明式事务. 编程式 ...

  8. Spring中的事务管理详解

    在这里主要介绍Spring对事务管理的一些理论知识,实战方面参考上一篇博文: http://www.cnblogs.com/longshiyVip/p/5061547.html 1. 事务简介: 事务 ...

  9. Spring 使用注解方式进行事务管理

    2019独角兽企业重金招聘Python工程师标准>>> 大家在使用spring的注解式事务管理时,对事务的传播行为和隔离级别可能有点不知所措,下边就详细的介绍下以备方便查阅. 事物注 ...

  10. (转)使用Spring配置文件实现事务管理

    http://blog.csdn.net/yerenyuan_pku/article/details/52886207 前面我们讲解了使用Spring注解方式来管理事务,现在我们就来学习使用Sprin ...

最新文章

  1. DICOM医学图像处理:Dcmtk与fo-dicom保存文件的不同设计模式之“同步VS异步”+“单线程VS多线程”...
  2. Angular CLI版本问题(Your global Angular CLI version (12.2.7) is greater than your local version (9.0.3))
  3. C 图像处理 颜色相关宏定义
  4. Redash 9安装与配置(基于Docker方式)
  5. 如何进行正确的SQL性能优化
  6. _mount_vendor
  7. python技巧 pdf-求教使用python库提取pdf的方法?
  8. 如何将Windows 7 RC升级到RTM(最终版本)
  9. RK3288 USB触摸屏无法使用,需要添加PID和VID
  10. 全志 起家产品 A31S四核:昂达V819mini平板试玩
  11. 【保研记录】2020年信工所二室(第三批)预推免面试经验分享
  12. java digester map_Digester学习笔记
  13. 性质:自反、反自反、对称、反对称、传递、互斥
  14. 千帆竞发待东风――乱弹网络游戏广告
  15. linux 连接打印机
  16. 景深决定照相机什么特性_什么是景深?
  17. 2022中元节前后几天不出门?前三天后三天不能出门是真的吗?
  18. 【转】图解领带的打法10种
  19. A - 非提的救赎 Fzu-2190 (单调栈)
  20. wifi android透传源代码,【终极版】ESP8266远程控制wifi透传模块带调试app

热门文章

  1. linux文件类型elf,Linux下ELF文件的格式(1)
  2. 通过 SQL Server 视图访问另一个IP地址数据库服务器表的方法
  3. 博乐助手连接服务器出错,Game Center无法连接服务器 Game Center无法连接服务器解决方法...
  4. HiWork告诉你:拿什么来拯救你,我的时间!
  5. 【JZOJ B组】【NOI2002】贪吃的九头龙
  6. 数据类型和运算符(使用Python的AI编程2部1单元2课)
  7. PageOffice常用功能之-OA系统中的文档在线编辑及流转
  8. python爬取酷狗音乐的mv地址_python爬取酷狗音乐排行榜
  9. 2020年最全最好用的在线文档盘点,建议收藏
  10. Python Pandas缺失值处理