一、问题描述

WMB消息总线是58内部提供的消息队列服务。消息队列主要解决了应用间的耦合、异步处理事件、流量削峰填谷等问题,是系统架构不可缺少的组件。

现有的消费者客户端,通过注册回调函数来处理消息,

    function callback ($msg) {var_dump($msg);}$keyPath = "./testkey.key?clientid=2";ESBclient_consumer_loop('callback', $keyPath, 123456);

这种使用方式存在一些问题,包括 
1)不方便开发调试,需要消息生产者和消费者同时参与。而且多数情况下,生产者和消费者在不同的业务部门,增大了调试成本。 
2)消费端依次处理拉取来的消息,性能不高。 
3)由于PHP语言本身的特性,对多线程支持不好,消费端不能开启多线程模式。为了加快处理,只能多开进程。但是由于wmb服务端限制,只能开64个消费端,假设处理每个消息需要100ms,QPS上限就是640左右。 
4)消费者客户端和消息处理代码耦合在一起,而且与业务集群要分开部署,浪费机器。

二、引入代理

计算机领域有句话:“计算机科学中的任何问题都可以通过增加一个中间层来解决”。为了解决上面的问题,引入了消费队列代理。代理通过消费客户端拉取消息,然后分发到真正处理消息的接口上,回调接口可以部署在本机,也可以在其他机器。设计图如下:

主进程启动后,解析配置文件,为每个主题初始化Channel、创建puller和dispatcher进程,注册信号处理程序,执行swoole_event_wait()进入事件循环。

Channel继承自swoole_channel。swoole_channel是一个类似于Go的Channel,可用于用户态的高性能内存队列。Channel在swoole_channel基础上,增加了导入、导出数据到本地文件,在程序退出且共享内存中还有未处理的消息时,导出到本地文件,在程序下次启动时处理。

puller进程是wmb的消费者客户端,从wmb server端拉取消息,通过回调函数,把消息保存到Channel中。puller会定时检查Channel中的消息数量,如果超过dispatcher的处理能力,则暂缓拉取消息。decoder提供消息解码,据当前主题的配置把protobuf或者json格式的消息解码成PHP数组。filter提供消息过滤功能,根据正则匹配,把不关注的消息提前过滤掉。

dispatcher进程从Channel中读取消息,通过http调用,把消息传递给real consumer处理。http客户端使用的是swoole_http_client,基于事件的异步客户端,有较高的性能。dispatcher根据配置文件控制http调用的并发数和失败时的重试次数、重试间隔。encoder在发送前把消息编码,目前只支持json。

三、稳定性和扩展性

消息队列作为基础服务,对可用性的要求高一些。由于下游依赖的服务多,一旦出现异常,会影响大面积服务。代理在设计时,对下面问题做了考虑,保障代理的可用性:

  1. 如何保证代理稳定,是否是单点,是否可扩展?
  2. 如何保证代理重启,消息不丢失?
  3. 如何保证回调接口挂了,消息不丢失?
  4. 如何解决高吞吐量的问题?

首先wmb服务支持多消费者客户端,同样部署多个代理时,服务端会均衡各代理的负载。当单个代理出现问题时,消息会由其他代理处理。对于回调接口,可以通过域名配置到集群上,保证了整个系统没有单点。代理和集群可以通过增加机器来实现水平扩容,具有可扩展性。

Channel在共享内存的基础上,增加了导入、导出消息到本地文件的功能,即当程序退出时,会主动读取共享内存的内容,写入文件;在代理下次启动时,首先检查本地文件是否有上一次未处理的消息,如果有则写入到共享内存,由dispatcher优先处理。

只有当回调接口明确返回成功时,才会认为该消息处理完成,否则会根据配置重试。为了降低重试带给回调接口的压力,下一次请求至少在1s后开始,重试指定次数,直到请求成功或者到达次数限制。当然也可以配置成不重试,这样请求失败后,当前消息直接抛弃,继续处理下一条。

代理本身并不处理消息,它的主要工作在于接收和转发。puller和dispatcher都是基于epoll的异步通信进程,理论上代理的吞吐量由回调接口可支持的并发数和处理时间决定。

四、性能和吞吐量

在8核CPU、16G内存的虚拟机上压测代理,回调接口部署在另一台机器上,回调耗时设置成100ms。接入一个主题的情况时,数据如下:

QPS CPU占用率(puller) CPU占用率(dispatcher) 内存使用(puller) 内存使用(dispatcher) 网络吞吐量(kB/s)
800 7.3% 15.8% 19m 147m 1970
1600 7.8% 31.7% 19M 149m 3948
2400 10.9% 50.8% 19M 152m 5900
3200 13.9% 66.8% 19M 155m 7851

代理处理一个主题时需要2个进程,占用2个CPU核心。在8核虚拟机上,可以对同一个主题接入4次,即在开启4个消费端的情况下,QPS可以达到12800,远大于直接使用消费端时的640。

同时,代理可以水平扩展。在代理扩展到16台虚拟机时,每台虚拟机开启4个消费端,达到64个消费端的上限,此时可以处理的QPS理论上为204800。

五、易于接入

通过代理也简化了开发过程。如果需要接入一个主题,只需要简单的实现一个回调接口,而调试http形式的回调接口要简单的多,可由接入方独立完成。需要注意的是,回调接口要保证幂等性。

部署上更加方便,只需要在配置文件上加入新的配置项,包括新接入主题的id、客户端id、wmb key、回调接口等,重新启动即可。

运维上需要注意不要暴力操作。代理在退出时需要将还没处理的消息保存到本地文件中。

PHP的WMB队列消费代理的实现相关推荐

  1. 实现java内存队列消费事件-ConcurrentLinkedQueue

    import com.google.common.collect.Queues;public class Pusher implements Runnable {private Queue<St ...

  2. Python Redis Stream 消息队列 消费组

    项目有用到消息队列来消费不断新增的任务,本来看到Redis有Pub Sub就没准备用kafka了,后来看了下Redis 5.0新加的Stream,感觉刚好符合项目要求,看下文档就直接用上了,类似一个简 ...

  3. 【消息队列】kafka是如何保证消息不被重复消费的

    一.kafka自带的消费机制 kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offs ...

  4. java如何保证mq一定被消费_消费端如何保证消息队列MQ的有序消费

    消息无序产生的原因 消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端(Consumer) ...

  5. 通过 Lotus Domino Java 代理消费 Web 服务

    Web 服务是一种允许两台或更多的计算机在网络中交互的系统设计.这种服务的主要优点是,它是在多台不同操作系统的计算机和应用服务器之间发送对象的标准解决方法.例如,我们的公司使用 Web 服务从一台运行 ...

  6. 消息队列面试 - 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

    消息队列面试 - 如何保证消息不被重复消费? 面试题 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性? 面试官心理分析 其实这是很常见的一个问题,这俩问题基本可以连起来问.既然是消费消息, ...

  7. 消息中间件--RabbitMQ ---高级特性之消费端ACK与重回队列

    什么是消费端的ACK和重回队列? 消费端的手工ACK和NACK 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障 ...

  8. rocketmq 重复消费_消息队列 RocketMQ

    引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...

  9. 字节跳动面试官这样问消息队列:高可用、不重复消费、可靠传输、顺序消费、消息堆积,我整理了下

    写在前面 又到了年底跳槽高峰季,很多小伙伴出去面试时,不少面试官都会问到消息队列的问题,不少小伙伴回答的不是很完美,有些小伙伴是心里知道答案,嘴上却没有很好的表达出来,究其根本原因,还是对相关的知识点 ...

  10. 重启服务后Redisson队列一直阻塞 不消费过期数据

    目录 前言 解决方案 关闭程序时的异常 参考 前言 用 Redisson + Redis做了个延迟队列,但是我重启之后居然不消费到期的数据了,非要我再往队列新增一条才开始消费.blockingDequ ...

最新文章

  1. 用python画关系网络图-python networkx 包绘制复杂网络关系图
  2. FP、FN、TP、TN、精确率(Precision)、召回率(Recall)、准确率(Accuracy)评价指标详述
  3. AMD Cubemapgen for physically based rendering
  4. 手机modem开发(3)---Android Modem log分析
  5. php正则表达式叹号,js库前加一个感叹号(!)是什么意思??
  6. node.js(二)创建服务器
  7. libjpeg php,libjpeg62_turbo
  8. 专利学习笔记5:CPC客户端的安装方法
  9. 【第102期】游戏策划:在校生求职简历怎么写?
  10. 弱电工程施工规划实施
  11. 电脑重启只剩下c盘怎么办_我的电脑正在正常运行突然自动重启了,重启后发现系统只剩下C盘了,请教怎么恢复?...
  12. java ai寻路_AI自动寻路
  13. ps 简单的抠图操作
  14. 三极管自激振荡升压电路笔记
  15. 美团/饿了么外卖CPS联盟返利公众号小程序核心源码代码
  16. Java中Lambda表达式使用及详解
  17. 为Dynamics 365 USD设置打开调试面板的自定义快捷键
  18. SVN使用中的一个问题
  19. Ubuntu软件装机问题
  20. 面试官:说说二维码扫码登录是什么原理吗?

热门文章

  1. HTML,js,jQuery的1+S证书学习资料
  2. 魔兽世界单机(芒果3.3.5a)机器人操作命令大全
  3. Japanese Student Championship 2019 Qualification
  4. 国际足联还不考虑在中国办一届世界杯吗?
  5. 市场需求文档MRD书写范例
  6. 对标苹果开“旧机发布会”?罗永浩出任转转品牌推广大使
  7. k8s-v1.2.3部署mysql-8服务
  8. 数电设计--交通灯控制系统
  9. 如何在Tungsten Fabric上整合裸金属服务器(附配置验证过程)
  10. 实时时钟芯片DS1302