enode框架系列step by step文章系列索引:

上一篇文章,简单介绍了enode框架中消息队列的设计思路,本文介绍一下enode框架中关系消息的重试机制的设计思路。

对于一个EDA架构为基础的框架,核心就是消息驱动,然后基于最终一致性的原则。所以,非常重要的一点是,如果消息一次执行不成功,那该怎么办?我能想到的对策就是消息的重试。我发现,这篇文章比较难写,因为感觉要把复杂的事情清晰的表达出来,感觉确实不容易。说到重试,那什么是消息的重试呢?怎么重试呢?我这里提到的重试是指,一个消息,从消息队列取出来后,要处理,但是处理失败了,然后要重新尝试再处理该消息;怎么重试?这个问题比较复杂,不能用简单的一两句话来说明。

上面说到,如果消息处理失败要再重试,其实是一个比较粗的回答。因为比如一个消息在处理的时候总共有5个步骤,如果前2步都成功,但是第3步失败了,那重试的时候,前2步还需要再执行吗?我的想法是,在能办到的情况下,就不要再做前2步操作了,而是直接从第3步开始重试。所以说,这种做法相当于是“哪里跌倒,哪里继续”;

那么怎么重试呢?

经过分析,我们发现整个enode框架中需要重试的点是非常多的,比如command产生的event要发送到队列时,如果失败那需要重试;比如event持久化时失败了,也需要重试,等等。所以,显而易见,我们应该设计一个可以被重用的重试服务,提供对某些特定的重试场景的支持。

我们先来想一下,我们希望有什么样的重试功能。以“event持久化时失败”为例,如果这一步失败,我们希望立马对这个步骤重试几次,比如3次,如果3次内成功了,那就成功了,继续往下做下面的逻辑;如果还是失败了呢?我们难道就放弃了吗?实际上,我们不能放弃,因为一般如果事件持久化失败很有可能是由于网络问题或eventstore有什么问题,而且如果我们就这样放弃了,那很可能整个业务逻辑的流程就被中断了,这样就无法做到数据的最终一致性了。所以,因为这种暂时的IO问题导致的失败,我们不能随便就放弃重试,应该在尝试几次重试仍失败时采取必要的手段,可以在IO恢复时,能自动再处理该消息;但是我们又不能使用当前线程无限制的重试下去,因为这样就导致没办法处理其他的消息了;所以我们自然就能想到:我们应该在消息重试几次仍失败时,将该消息放入一个专门的重试队列,然后有另外一个独立的线程会定时从该队列取出要重试的消息,然后重试这些消息;这样,当IO恢复时,这些消息就能很快被成功处理了;

另外一个问题,那这种专门的重试队列需要支持消息持久化吗?不用,我们只需要内存队列就行了,因为当一个消息还没被完全成功处理前,是不会从message store删除的;所以,就算机器重启了,该消息还是能在该机器重启后被处理的;而当该机器没重启时,该专门重试的内存队列会不断地以独立的线程定时重试该消息;

那这种专门的重试队列需要多少个呢?理论上我们可以为每个需要重试的点都设计一个重试队列来支持,但是这样一方面过于复杂,而且线程多了还会影响系统的性能;所以我们需要权衡一下,只对同一个阶段中要做的所有的事情设计一个重试队列,该阶段中这些要做的事情中有任何一步失败,就都放到该阶段对应的重试队列里;

还有一个问题,如果一个消息在某一次重试时成功了,但是我们希望在成功后继续对该消息做后续的步骤,该如何实现呢?这个问题初想想感觉比较麻烦,因为我们可能已经没有了该消息的一些上下文环境。最重要的是,我们如何知道该消息重试成功后接下来该做什么呢?而且就算知道接下来要做什么了,但是要是我们在做这个下一步的步骤时,要是又失败了呢?是不是也要重试呢?所以,我们发现这里很关键。

经过我的一些思考,我发现,如果一个消息在某个阶段要被处理多个步骤,且有些步骤之间有条件依赖,比如只有在第2步处理的结果是成功时,我们才有必要做后面的3步;正常情况,如果一切顺利,那就是一步步从上往下的去做;但是因为考虑到任何一步可能都会出问题,而且我们希望在任何一步失败然后重试成功后,能继续后续的步骤。所以,基于这些特征,我觉得我们可以设计一种类似回调函数的机制,当某个逻辑执行成功后,执行回调函数,我们可以在回调函数中存放接下来要做的逻辑;显然,我觉得我们需要某种递归的数据结构;为了支持上面这种类似回调函数的需求,我设计了如下的一个数据结构:

/// 一个数据结构,封装了一段要执行的逻辑以及一些相关的上下文信息///

public classActionInfo

{/// 表示某个Action的名字///

public string Name { get; private set; }/// 表示某个Action,封装了一段逻辑///

public Func Action { get; private set; }/// 表示Action执行时所需要的数据信息///

public object Data { get; private set; }/// 表示Action执行成功后,要执行的下一个Action的信息,这里体现出递归///

public ActionInfo Next { get; private set; }public ActionInfo(string name, Func action, objectdata, ActionInfo next)

{if (action == null)

{throw new ArgumentNullException("action");

}

Name=name;

Action=action;

Data=data;

Next=next;

}

}

从上面的代码,我们可以清晰的看到,我们设计了一个简单的数据结构,用来包含要执行的逻辑,该逻辑执行时所需要的参数信息,以及该逻辑执行成功后要做的下一个逻辑;通过上面这个数据结构,我们已经为实现上面的重试需求做好了数据结构方面的准备;

接下来,我们需要想想,如何设计一个重试服务。经过上面的分析,我们只要,我们的重试服务需要两个主要功能:1)对某段逻辑连续重试指定次数;2)将某段逻辑放入重试队列定时重试;对于第一个功能需求,比较简单,直接设计一个递归函数即可,代码如下:

public bool TryAction(string actionName, Func action, intmaxRetryCount)

{return TryRecursively(actionName, (x, y, z) => action(), 0, maxRetryCount);

}private bool TryRecursively(string actionName, Func action, int retriedCount, intmaxRetryCount)

{var success = false;try{

success=action(actionName, retriedCount, maxRetryCount);if (retriedCount > 0)

{

_logger.InfoFormat("Retried action {0} for {1} times.", actionName, retriedCount);

}

}catch(Exception ex)

{

_logger.Error(string.Format("Exception raised when tring action {0}, retrid count {1}.", actionName, retriedCount), ex);

}if(success)

{return true;

}else if (retriedCount

{return TryRecursively(actionName, action, retriedCount + 1, maxRetryCount);

}else{return false;

}

}

调用的代码示例如下:

if (_retryService.TryAction("TrySendEvent", () => TrySendEvent(eventStream), 3))

{

FinishExecution(command, queue);

}

简单说明一下:

当我们要重试时,我们首先调用retryService的TrtAction方法,该方法就是用来支持“对某段逻辑的指定次数的连续重试”。该方法的第一个参数是一个字符串,表示要执行的逻辑的名称,这个名称没什么实际用途,只是帮助我们区分当前在执行的逻辑是哪段逻辑,该名称会在记录日志时使用,方便我们后续通过日志分析到底是哪里出错了,或者重试过了;然后第二个参数表示要重试的某个委托;当然,因为我们要知道该委托内部的逻辑是否处理成功,所以需要一个布尔类型的返回值;最后一个参数则是指定需要连续重试多少次,上面的示例代码表示:先执行指定逻辑,如果失败,则连续重试3次;所以,如果每次都失败,相当于总共会执行4次;上面的代码应该不难理解,就不多分析了;

接下来分析一下第一个需求“将某段逻辑放入重试队列定时重试”:

当连续重试还是失败后,我们就会放入内存队列,然后定时重试了。那么如何定时呢?一般用定时器即可;那定时多少呢?这个目前我也是拍脑袋的,目前设定为5秒。为什么是5秒呢?主要是两个考虑:1)为了不希望太频繁的重试,因为太频繁的重试会占用更多的系统资源,导致会影响框架中正常的消息处理性能;2)因为这种定时的重试对实时性一般不会很高,就是说,比如当IO恢复后,我们一般不会要求马上就能重试,过个几秒甚至几分钟后再重试,也能接受。实际上,如果没有这种自动定时的重试机制,我们可能只能等到机器重启后才能再次被重试了,相比之下,已经非常自动和及时了。

所依,总结一下,我们需要:1)定时器,用于定时执行;2)ActionInfo包装要重试的逻辑的相关信息;3)内存队列,用于存放ActionInfo;所以,代码如下:

public classDefaultRetryService : IRetryService

{private const long DefaultPeriod = 5000;private BlockingCollection _retryQueue = new BlockingCollection(new ConcurrentQueue());privateTimer _timer;privateILogger _logger;private bool_looping;publicDefaultRetryService(ILoggerFactory loggerFactory)

{

_logger=loggerFactory.Create(GetType().Name);

_timer= new Timer(Loop, null, 0, DefaultPeriod);

}public void Initialize(longperiod)

{

_timer.Change(0, period);

}public voidRetryInQueue(ActionInfo actionInfo)

{

_retryQueue.Add(actionInfo);

}private void Loop(objectdata)

{try{if (!_looping)

{

_looping= true;

RetryAction();

_looping= false;

}

}catch(Exception ex)

{

_logger.Error("Exception raised when retring action.", ex);

_looping= false;

}

}private voidRetryAction()

{var actionInfo =_retryQueue.Take();if (actionInfo != null)

{var success = false;try{

success=actionInfo.Action(actionInfo.Data);

_logger.InfoFormat("Executed action {0} from queue.", actionInfo.Name);

}catch(Exception ex)

{

_logger.Error(string.Format("Exception raised when executing action {0}.", actionInfo.Name), ex);

}finally{if(success)

{if (actionInfo.Next != null)

{

_retryQueue.Add(actionInfo.Next);

}

}else{

_retryQueue.Add(actionInfo);

}

}

}

}

}

经过上面的分析后,相信大家看代码都应该能理解了。需要注意的点:

我用了BlockingCollection,这是一个支持并发且支持阻塞的基于publish-consumer模式的集合,而且这里,该集合内部封装了ConcurrentQueue,所以,他也是一个队列;这样设计的好处是,在队列中没有元素的时候,线程会被卡住,从而不会浪费资源;只有当队列中有元素时,才会在当天timer周期到来时,能够从队列取出要重试的ActionInfo,然后进行重试操作。

Timer的周期默认设置为5秒,那么,我们为了避免同一时刻,有两个ActionInfo在被同时处理,我加了一个标记位_looping,当当前有ActionIno正在被处理时,则该标记位为true,否则为false。通过该标记位,我们能确保队列中的元素会一个个按顺序被处理,这样就不会混乱,导致莫名其妙的bug出现;

从上面的RetryAction方法中,我们可以看出,当当前的ActionInfo处理成功后,如果下一个ActionInfo存在(Next属性不等于空),则把下一个ActionInfo放入重试队列,等待被处理;通过这样的设计,我们能够以非常统一的方式重试用户希望重试的ActionInfo以及这些ActionInfo重试成功后的回调ActionInfo。另外,如果当前ActionInfo执行失败,则仍然将当前ActionInfo再放回队列,继续重试;

下面我们看一个简单的调用示例吧:

private void CommitAggregate(AggregateRoot dirtyAggregate, ICommand command, IMessageQueuequeue)

{var eventStream =BuildEvents(dirtyAggregate, command);if (_retryService.TryAction("TrySendEvent", () => TrySendEvent(eventStream), 3))

{

FinishExecution(command, queue);

}else{

_retryService.RetryInQueue(newActionInfo("TrySendEvent",

(obj)=> TrySendEvent(obj asEventStream),

eventStream,newActionInfo("SendEventSuccessAction",

(obj)=>{var data = obj asdynamic;var currentCommand = data.Command asICommand;var currentQueue = data.Queue as IMessageQueue;

FinishExecution(currentCommand, currentQueue);return true;

},new { Command = command, Queue =queue },null)));

}

}

说明:

上面的代码是在一个command执行完成后对于产生的事件,框架要提交该聚合根产生的事件;通过BuildEvents方法获取聚合根上产生的事件,然后我们接下来是尝试将该事件发送到一个事件队列,但是因为该事件队列在消息入队时会持久化消息,也就是会有IO操作,所以就可能失败,所以我们先尝试执行一次,如果失败则立马连续尝试重试3次,如果这4次中任意一次成功了,则做成功的逻辑,上例是调用FinishExecution方法;如果这4次都失败,则进入else的逻辑,即放入队列定时重试,但是我们希望在放入队列重试时如果某一次重试成功了也需要保证能调用FinishExecution方法,所以也定义了一个回调的ActionInfo。最后,为了尽量让每个ActionInfo所需要的参数信息语义明确,避免语言层面的闭包等复杂难理解的问题,我们尽量将ActionInfo中的Action所需要的参数信息明确的设置到ActionInfo上,而不是从外层的函数中拿,从外层的函数中拿,要是再多线程时,容易出现问题,而且也容易引起代码修改导致的难以检查出来的闭包问题;当然,这里,大家可以看到我使用了匿名对象,我是偷懒了,如果希望性能更高,则可以显示定义一个类来封装需要的参数信息;

总结:

本文通过代码加思路的方式大概介绍了enode框架中关于消息重试的设计思路。但是我没有介绍enode中到底哪些点会用到重试机制,有很多,至少五六个地方吧。但我觉得这不是重点了,重点是上面我分析的一些思路,具体需要重试的场景是偏业务性质了,涉及到enode框架中从command开始处理到最后event被发布到query端的整个过程中的每个关键的环节。我觉得通过本文的分析,可以帮助想看代码的朋友更容易理解enode中关于重试方面的代码,这样就够了;关于重试方面,还有一个点没有说,就是command的重试,关于这一点,和本文提到的重试有点不同,我准备专门写一篇文章介绍一下吧。

IM消息重试机制Java实现_消息的重试机制的设计思路相关推荐

  1. 消息队列的使用场景_消息队列MQ的特点、选型及应用场景

    一.什么是消息队列 消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列. 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可 ...

  2. pbft共识机制 java实现_区块链开发:共识机制PBFT #C09

    拜占庭将军问题是指系统中除了网络延迟.系统宕机等问题外还存在恶意节点,会进行"精神分裂式"投票. BFT(Byzantine Fault Tolerance)系统是指能够容忍拜占庭 ...

  3. java实现踢下线用户_浅谈踢人下线的设计思路!(附代码实现方案)

    前言 前两天写了一篇文章,主要讲了下java中如何实现踢人下线,原文连接:java中如何踢人下线?封禁某个帐号后使其会话当即掉线!前端 原本只是简单阐述一下踢人下线的业务场景和实现方案,没想到引出那么 ...

  4. java orm设计_大搜车orm框架设计思路

    orm基本概念 ORM,即Object-Relational Mapping(对象关系映射),它的作用是在关系型数据库和业务实体对象之间作一个映射,这样,我们在具体的操作业务对象的时候,就不需要再去和 ...

  5. 7系统内部系统组件禁止休眠_海康监控系统平台设计思路(二)

    设计思路 系统设计过程中充分考虑了各个子系统的信息共享要求,对各子系统进行结构化和标准化设计,通过系统间的各种联动方式将其整合成一个有机的整体,使之成为一套整体的.全方位的综合安防系统,达到人防.物防 ...

  6. beanstalkd java使用_消息队列 beanstalkd 介绍

    首先 好东西 http://kr.github.com/beanstalkd/ 其次 真的是好东西 支持 java , python ,perl,ruby,erlang 和我不知道的 语言 官方的原文 ...

  7. ios个推透传消息json接收不到_消息通讯——Websocket

    说起即时通讯大家应该都听过,像各种聊天软件用到的即时通讯技术是最多的.另外开发过程中实现消息推送最传统的做法就是轮询,即按照特定时间间隔由浏览器对服务器发送请求,以获取最新消息,这种传统模式带来很明显 ...

  8. Java毕业设计_代驾到家服务app的设计与实现

    代驾到家服务app的设计与实现 代驾到家服务app的设计与实现mysql数据库创建语句 代驾到家服务app的设计与实现oracle数据库创建语句 代驾到家服务app的设计与实现sqlserver数据库 ...

  9. Java毕业设计_基于javaEE的论坛的设计和实现

    基于javaEE的论坛的设计和实现 基于javaEE的论坛的设计和实现mysql数据库创建语句 基于javaEE的论坛的设计和实现oracle数据库创建语句 基于javaEE的论坛的设计和实现sqls ...

  10. 分数统计设计java程序_(windows综合程序)设计一个学生平时成绩统计软件 最后的Java作业...

    1.(windows综合程序)设计一个学生平时成绩统计软件.要求: (1) 录入课程名称(进入系统时录入).学生姓名.学号.成绩.日期(自动生成日期并在界面显示),除第一次外其他次数输入只需要录入学号 ...

最新文章

  1. Angular CDK Overlay 弹出覆盖物
  2. java中native的用法[转]
  3. Opengl-阴影(分为定向光的和点光源的)
  4. 【渝粤题库】陕西师范大学189101 消费者行为学Ⅰ 作业(高起专)
  5. xgboost算法_工业大数据:分析算法
  6. 【hibernate merge】session1.merge(T entity)方法的含义和update方法的区别
  7. django,form表单,数据库增删改查
  8. 做柱状图加数据标签_Origin绘图:如何优雅的绘制堆叠柱状图
  9. 以服务器时间为基准显示到某一时间的倒计时
  10. 超级楼梯[HDU2041]
  11. Git学习05 --分支管理02
  12. php mysql表单验证登录_使用PHP和MySql简单身份验证 1
  13. 大数据分析对企业管理的影响
  14. python在冒号处显示语法错误_python for常见语法错误
  15. IPv6基础介绍--IPv6路由基础--DHCPv6原理与配置——总结
  16. 渗透测试常用文件传输方法-Linux篇(如何向Linux服务器中上传文件?) (゚益゚メ) 渗透测试
  17. 2018焦作ICPC E. Resistors in Parallel(打表+大数)
  18. 【Python】Pandas Excel file format cannot be determined, you must specify an engine manually.报错【已解决】
  19. 软件项目管理 6.10.成本预算
  20. 新版chrome浏览器字体编码插件Charset v0.5.5

热门文章

  1. python分布式爬虫_python-分布式爬虫
  2. 高德地图输入地址获取经纬度_不打开地图也能获取地址-利用百度地图API和Python实现...
  3. oracle 11g 01017,oracle 11G OEM 出现问题 ORA-01017: inv
  4. nyoj461 Fibonacci数列(4)解通项公式
  5. mysql银行储蓄额度格式_mysql创建表用于银行储蓄系统
  6. mysql中的utf-8_永远不要在MySQL中使用UTF-8
  7. 变速恒频风电机组的优缺点_风电消防安全解决方案解析
  8. 机器学习- 吴恩达Andrew Ng Coursera学习总结合集,编程作业技巧合集
  9. SwitchyOmega规则列表地址
  10. 极客大学架构师训练营 性能优化 进程 线程 锁 存储 分布式数据库 第14课 听课总结