该模型基于rocketmq 也可以扩展到其他类似任务处理。 使用策略模式 将任务管理 单独 抽出来一个模块。真实应用程序只需实现任务具体处理逻辑 返回成功失败
/* Interface Comment
implement it at rocketexe
*/
type taskDealMsg interface {
DealMsg(msgStruct InputMsg, msgDealCount int) int
}

task-manage 线程

  1. 负责接收消息,将消息存储到 对应客户的task-list 链表中
  2. 周期性处理任务,每次处理遍历客户map,从客户map的task-list 中选取一个头部task。将挑选的task 交由 task-deal 处理线程池,,实现公平调度
    接收task-deal 处理线程的处理结果,是否需要失败重试,如果需要则将task 从当前位置移动至list末尾。如果处理成功从客户map 所属的task-list中删除节点。
  3. 接收退出信号,持久化task-list。由于只有task-deal 线程 执行完成后 才会 删除task节点。即使正在处理消息的task-deal 线程 被强制退出, 也可以保证不丢失消息(消息持久化磁盘)

task-deal 线程池

  1. 负责接收task-manage 传递进来的任务,进行处理
  2. 负责反馈处理结果给task-manage 线程

任务管理策略模式

  1. ms 模式
    上述 task-manage 线程在 周期性处理任务过程中 ,每次遍历map 客户时 一定会选择一个该客户的task list 的一个task处理。(task-list 为空时 删除map中客户)。当可用task-deal gorutines 为0 时,会因等待有可用的task-deal gorutines 阻塞 而不消费新的消息。
    该模式的可能会导致客户间相互影响,当某个时刻 只有某个客户的大量task ,占用 了所有 task-deal gorutines。且每个任务处理周期很长时,其他客户的消息得不到消费。
    ms 意为着以可以处理的最大max-gorutines 为主的一种strategy模式,适合任务处理较块的场景 如缓存推送。没有限制单客户占用的task-deal gorutines数量。

  2. rs 模式
    与ms 模式相比,限制了单个客户的占用task-deal gorutines。限制条件为:当前客户正在处理的任务数 不能 大于 可用的 闲置task-deal gorutines 数。
    上述task-manage 线程 周期性处理任务过程中,每次处理过程 遍历客户map,从客户map的task-list 中选取一个头部task 交由 task-deal 处理线程池,每次迭代处理 每个客户 最多 会被选取一个task进行处理:
    如果当前客户的正在处理任务数 多于 剩余闲置的 task-deal 线程数,则迭代遍历任务时不选取当前客户的task,继续处理选择下一个客户的task。避免上述某个时刻 某个客户占满了 task-deal 线程池且占用时间较久导致其他客户无法被服务。也就是说,下一个客户永远有闲置的线程可以用.直到达到最大并发数。达到最大并发数时,可用task-deal gorutines 为0 时,会因等待有可用的task-deal gorutines 阻塞 而不消费新的消息。
    当 最大并发数未满足,而某个客户却达到了 自身可用的 task-deal gorutines时,会继续接收消息,(并不会妨碍 其他客户的服务),但因此 也可能收到该客户自身的新消息,此时只能继续缓存消息,将该消息 加入 客户的task-list 中。但是因为内存限制不可能无限制加入。如果task-list 达到ReconsumeListLen (ReconsumeListLen)长度时,会通知rocketmq-server 延迟消费(ReconsumerLater), 因此称为Reconsume strategy,该模式较适合 预取 等时间较长的任务。经测试 150w 占用内存300M.

  3. 其他未实现模式
    在rs 模式的基础上不做Reconsume Later 限制,只要是消息就收取,list达到一定长度后 加入数据库(sqlite),另开线程 从sqlite 读取,并发送消息给 taskmanage 线程 删除。该模式在不支持 Reconsume 的消息中间件中必须使用。
    本次实现任务管理策略的过程中 使用了 策略模式的设计模式 且 使用channel 进行数据流传输 非常方便扩展。
    源码 https://github.com/zengxiaobai/task-manage

一个通用的任务管理模型-golang相关推荐

  1. java dao修改语句_一个通用的DAO模型实现增删改查

    首先三个架包: mysql-connector-java-jar commons-dbcp-1.4jar commons-pool-1.5.5jar 导进去: (从上往下一次调用,实现功能) ---- ...

  2. BTA | 张犁 :为什么需要一个通用区块链资产平台?

    在张犁看来,区块链被认为是一个传递资产价值的网络,但直到目前为止,区块链的主要方向还是加密货币,并没有对现实世界中的(物理)资产.服务.虚拟数字资产提出一个通用的解决方案.然后,他对 ERC20 和 ...

  3. 【人工智能 Open AI】设计一个SQL Where DSL模型,使用 golang 代码来实现DSL的翻译成SQL。

    设计一个SQL Where DSL模型,使用 golang 代码来实现DSL的翻译成SQL. 下面是使用golang代码实现DSL模型的示例: package mainimport ("fm ...

  4. 独家 | 哪个更好:一个通用模型还是多个专用模型?

    作者:Samuele Mazzanti翻译:欧阳锦 校对:赵茹萱本文约3900字,建议阅读10分钟本文通过实验验证了一个通用模型优于多个专用模型的有效性的结论. 比较专门针对不同群体训练多个 ML 模 ...

  5. 3模型大小_Github推荐一个国内牛人开发的超轻量级通用人脸检测模型

    Ultra-Light-Fast-Generic-Face-Detector-1MB 1MB轻量级通用人脸检测模型 作者表示该模型设计是为了边缘计算设备以及低功耗设备(如arm)设计的实时超轻量级通用 ...

  6. 写一个通用数据访问组件

    出处:http://www.csharp-corner.com willsound(翻译) 我收到过好多Email来问我如何用一个通用的数据提供者(data provider)在不失自然数据提供者(n ...

  7. 干货!如何设计实现一个通用的分布式事务框架?

    来源:https://www.bytesoft.org/ 一个TCC事务框架需要解决的当然是分布式事务的管理.关于TCC事务机制的介绍,可以参考TCC事务机制简介. TCC事务模型虽然说起来简单,然而 ...

  8. UDSMProt:蛋白质分类通用深度序列模型

    今天给大家介绍由德国弗劳恩霍夫·海因里希·赫兹研究所的研究人员发表在Bioinformatics上的一篇文章.该文章针对大多数蛋白质分类的最先进方法都是为单个分类任务量身定制,并且依赖手工制作特征的问 ...

  9. 数学之美 系列十六 (下)- 不要把所有的鸡蛋放在一个篮子里 最大熵模型

    数学之美 系列十六 (下)- 不要把所有的鸡蛋放在一个篮子里 最大熵模型 我们上次谈到用最大熵模型可以将各种信息综合在一起.我们留下一个问题没有回答,就是如何构造最大熵模型.我们已经所有的最大熵模型都 ...

最新文章

  1. (转)java.lang.OutOfMemoryError: Java heap space错误及处理办法(收集整理、转)
  2. c++层次遍历_动画:二叉树遍历的多种姿势
  3. 【SSL】keytool复制证书
  4. [转]C++中sleep()函数的使用
  5. .NET访问PI数据库
  6. Nginx的Gzip和sendfile的共存问题
  7. c语言中规定,程序中各函数之间,C语言题库-函数_(参考).doc
  8. gRPC in ASP.NET Core 3.x - gRPC 简介(2)
  9. sublime text安装插件出现问题
  10. 第一章:Shiro简介
  11. wpf之默认窗口模板研究
  12. svn 同步 linux,linux SVN 中 配置钩子 实现 线上项目同步
  13. Symfony 2.0 认识Request, Response, Session, Cookie
  14. 安装VMware时提示无效驱动器:D:\的解决办法
  15. C语言数组 :用户输入一个数, 我要用这个数当数组的长度。怎么办呢
  16. VC新潮流,Tiger DAO VC以DAO形式入侵
  17. STM32工程模板简单套用教程(Keil MDK)
  18. catlan数和超级catlan数(施罗德数)
  19. 口碑问答营销推广如何做?广告联盟同样需要
  20. LabVIEW使用NIPM安装软件报错

热门文章

  1. 使用drbd进行磁盘扩容,小磁盘扩容大磁盘后大小未变的问题解决方法
  2. 【Android源码剖析】(API 19)[View-----MeasureSpec]
  3. mysql localhost和127.0.0.1的区别
  4. C#/Net代码精简优化技巧(3)
  5. ['1', '2', '3'].map(parseInt) what why ?
  6. cocos2dx 运动+旋转动画 CCSequence CCAnimation CCAnimate CCMoveTo CCCallFuncN
  7. mvc 普通上传, 图片转二进制上传
  8. Oracle里面的用户smsdb无法登录 LOCKED(TIMED)
  9. Linux - 手册(manual)使用 详解
  10. 最长公共子序列的C++实现---附二维指针的使用方法