2019独角兽企业重金招聘Python工程师标准>>>

在《 modb 开发之需求和总体设计 》中,第一个要实现的功能点就是 “支持多消费者单生产者”。下面就讲解下设计这个功能的。

【功能的具体化】
      首先,因为经我改造后的 rabbitmq-c 客户端程序是基于 libevent 的,所以天然可以做到单线程中同时处理多 TCP 连接。理论上,可以对外宣称”该客户端程序支持任意数量生产者和消费者的组合“,而不用担心多线程切换的开销。唯一可能拖慢整个系统的地方,是在 Consumer 针对 RabbitMQ 服务器应答进行回调处理的过程中,即回调处理函数中不能,或者说不应该,进行耗时的操作。

a. 处理应答的回调函数中可能需要实现的处理逻辑有哪些?

  1. 获取本地应用发来的 rabbitmq msg,再按目的地(routing_key)进行纯转发动作。对应模型为【1P + 1C + 内部转发 Queue】,运行在单独一个线程中。其中 P 代表 Producer,C 代表 Consumer,均是 RabbitMQ 里的概念。
  2. 获取外部应用发来的 rabbitmq msg,提取消息中含有的 sql 语句并执行,执行成功后再发送 rabbitmq msg 通知本地应用数据库更新成功。对应模型为【1P + nC + 内部 sql 处理 Queue】,运行于两个线程中,一个用于 rabbitmq 相关消息处理和转发,另一用于 sql 处理。
  3. 需要支持 json 或/和 XML 解析。

b. 采用单线模型还是多线程模型(每个线程中都有独自的 event_base)?
      对于功能 1,单线程足矣;对于功能 2,目前看来至少需要2个线程,即 1P + nC 使用一个线程,sql 处理使用单独一个线程。

c. 1P + 1C 和 1P + nC 是否可以或者应该在一个线程中处理?
      按照前文的说明,一个线程中可以处理任意多 TCP 连接(当前改造后的 rabbitmq-c 的基础前提是:一个 TCP 连接上仅使用一个 channel,所以每个 Producer 和 Consumer 都需要独占一个 TCP 连接),所以关键问题是是否应该这么做。对于 Consumer 来说,因为可读事件是会被即时检测的,所以对于两种业务模型的差别就在于回调函数的实现:前者仅需关注转发目的地,后者需要额外的线程来执行 sql 语句。
      一种可能的执行流程如下:Consumer 在回调函数处理中将待执行 sql 语句提取出来后放入内部 sql 处理 Queue 中,待 sql 处理线程从中提取并完成 sql 执行后,将结果 push 进另外一个内部 sql_result Queue 中,而 Producer 会通过定时检测的方式获取 sql_result Queue 中的结果,并发送到用于通知本地业务的 exchange 上。
      上述流程没有详细说明的细节:Consumer 从 RabbitMQ 服务器消费消息时采用的是”自动应答“模式还是”手动应答“模式,这将会对消息的可靠性产生影响,间接影响到数据的一致性。因为在”自动应答“模式下,消息一旦被消费就会被 RabbitMQ 服务器删除掉,而一旦在 sql 执行过程中出现异常,则会导致数据不一致,需要自行实现某种机制保证数据的一致性。若在”手动应答“模式下,则可能的一种实现方式为,针对从 RabbitMQ 消费消息的应答,需要等待 sql 执行完成后才能进行。一则会拖慢整个事件驱动的轮转,二则当 sql 执行失败后,即使使用 REJECT 信令告之 RabbitMQ 服务器无法正确处理,该消息仍然会被删除掉,依然会有数据不一致的问题存在。如果使用了 带有 requeue 属性的 REJECT 信令呢?结果还是一样的,因为只有唯一一个 Consumer 去消费该消息,当再次重新消费该消息时,之前出现的 sql 异常可能还会出现。看来只有华山一条路了,自己实现某种纠错机制,保证数据的一致性。(关于纠错的问题,后续博客再说)

【实现中遇到的挫折】

a. rabbitmq 线程与 sql 执行线程之间的跨线程通信手段
      如何在 sql 线程执行完成后,告之 Producer 去发送通知消息给其他业务。手段可能有很多种,但是考虑到 rabbitmq 线程是基于 libevent 运转的,这就需要一点小技巧了,后续有单独博客进行说明。

b. 消息传递 Queue 的使用
      在最初的设计中,【1P + 1C + 内部转发 Queue】模型里使用的内部转发 Queue 是基于 Producer 和 Consumer 所使用的 conn 连接的,即每一个 conn 上都有这样一个 Queue 的存在。当开始调试【1P + 1C】功能时,发现当 Consumer 拿到 msg 后只能将其存放在自身 conn 上的 Queue 中,而 Producer 无法直接获得这些 msg (当然可以借助外部处理来 pop 和 push ,但似乎不是个好方案),所以决定还是采用全局 msgQ(转发 msg 用)的方式来处理。
      在改为全局 Queue 后又发现一个新问题,即 Consumer 对 msg 的接收具有即时性,但 Producer 对 msg 的发送却无法做到。原因在于,可以针对 Consumer 对应 conn 的 sockfd 监听可读和超时事件,但针对 Producer 对应 conn 的 sockfd 却不能去监听可写事件。这就导致了一个问题,即 Producer 只能按照定时器指定的时间发送 msg 。换句话说,即使设置的定时时间再短,也达不到即时发送的效果。那么如何解决这个问题呢?留个思考给大家把~~

(在只使用可读、可写、超时事件的情况下应该是无解的,估计可以利用信号事件解决)

最后附上一张【1P + 1C】的结构图:

=== 未完待续 ===

转载于:https://my.oschina.net/moooofly/blog/184231

【原创】modb 功能设计之“支持多消费者单生产者”相关推荐

  1. 【原创】modb 功能设计之“支持部分MySQL客户端协议”-1

          最初的想法是,rabbitmq 客户端从 queue 消费到了包含 sql 语句的消息后,需要提取并分析该 sql 后,通过 MySQL 协议再要求数据库执行该 sql 语句.这就要求我这 ...

  2. 【原创】modb 功能设计之“跨线程通信”

    2019独角兽企业重金招聘Python工程师标准>>> 在< modb 功能设计之"支持多消费者单生产者" >中曾提到,需要解决"rabbi ...

  3. 操作系统(二十五)吸烟者问题-单生产者多消费者问题

    2.3.8 吸烟者问题-单生产者多消费者问题 假设一个系统有三个抽烟者进程和一个供应者进程.每个抽烟者不停地卷烟并抽掉它,但是要卷起并抽掉一支烟,抽烟者需要有三种材料:烟草.纸和胶水.三个抽烟者中,第 ...

  4. 并发无锁队列学习(单生产者单消费者模型)

    1.引言 本文介绍单生产者单消费者模型的队列.依据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种. 单生产者单消费者模型的队列操作过程是不须要进行加锁的.生产 ...

  5. RabbitMQ入门学习系列(二),单生产者消费者

    友情提示 我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题.可以直接在公众号<爱码农爱生活 >留言.必定会再次 ...

  6. 基于jeecgboot的支持online表单审批的功能正式发布

    基于jeecgboot的flowable流程支持online表单审批线上发布一个完整的版本,利用原有online表单功能,通过选择现有online表单数据进行审批申请与操作,基本支持各自表单包括主从表 ...

  7. CPFair:推荐系统的个性化消费者和生产者公平重新排序

    摘要 最近,人们越来越意识到,当机器学习 (ML) 算法用于自动化选择时,他们可能会不公平地对待/影响个人,具有法律.道德或经济后果.推荐系统是此类 ML 系统的重要示例,帮助用户做出高风险的判断. ...

  8. 用三个线程实现生产者消费者模型,其中一个线程作为生产者,二个线程作为消费者,生产者随机生产一个时间戳或者字符串,消费者消费这个时间戳,并不能重复消费,并将其打印出来

    题目要求: 用三个线程实现生产者消费者模型,其中一个线程作为生产者,二个线程作为消费者,生产者随机生产一个时间戳或者字符串,消费者消费这个时间戳,并不能重复消费,并将其打印出来.(这是一道百度面试的算 ...

  9. java并发:初探消费者和生产者模式

    消费者和生产者模式 用继承Thread方式,用wait和notifyAll方法实现. 消费者和生产者模式的特点 1. 什么时候生产:仓库没有满的时候,生产者这可以生产,消费者也可以消费,仓库满的时候停 ...

最新文章

  1. java-结合c3p0封装的db 事务 类
  2. golang strings Replace 字符串替换
  3. Centos进入紧急模式解决方法
  4. JAVA中一维数组的作用,JAVA中一维数组和二维数组的定义
  5. 服务器双网卡冗余备份技术的实现
  6. android界面元素识别,Android 10不能使用uiautomatorviewer定位元素的终极解决方法
  7. 从java读取Excel继续说大道至简 .
  8. Aliyun mysql配置 远程访问 10038
  9. 《UnityAPI.Transform变换》(Yanlz+Unity+SteamVR+云技术+5G+AI+VR云游戏+Transform+eulerAngles+LookAt+立钻哥哥++OK++)
  10. c51 c语言 16位二进制转换为bcd码,16位二进制转换为BCD码的C51汇编程序.doc
  11. UVA 10451 Ancient Village Sports UVA 11909 Soya Milk UVA 11152 Colourful Flowers
  12. 自己使用的jquery公用common.js
  13. 10G至40G互连方案-40G QSFP+ PSM4单模光模块
  14. 零基础成为3D游戏建模师需要多久?
  15. 什么是 ETL ?什么是 ELT ?
  16. Python中range()函数的用法
  17. 你真的会使用github吗?
  18. 企业微信外部群和内部群有什么区别?如何创建外部群
  19. Kali Linux安装
  20. CLC(ClearCarry Flag)

热门文章

  1. android 跨进程点击方式总结
  2. 关于数据库的水平切分和垂直切分的一些概念(转)
  3. WCF学习之旅—WCF概述(四)
  4. Jquery导航条淡进淡出相册(动态无刷新加载数据库数据)
  5. MySQL中处理Null时要注意两大陷阱
  6. 满足StrataFlash嵌入式存储器要求的LDO应用电路
  7. RabbitMq(十四)消息的事务支持及代码演示
  8. RocketMQ源码解析-Producer消息发送
  9. 深入理解K8S网络原理上
  10. 总结一下切换git地址 重合代码的过程