项目纪实–大型数据平台系统构建

背景:18年入职这家轻松的国企,在19年难得接(抢)到一个有意思的项目,开始定义还比较简单:写一个CMS用于近期某XX项目中发布数据,开始是找到别人被别婉拒后我主动给接了过来。本身没什么难点,且背景我在前期早已了解过一些,所以答应接过来。原来各项目/产品中总是需要用到些采集处理好的各类数据,这些数据存在于各数据库各表中(多来源多库多结构表),之前的解决方式是各个项目/产品需要数据就问我们要,如果DB里面已有我们就直接把数据位置(哪个库哪个表)和连接配置告诉他们让他们自己去拿,库里面没有的话就写生产流程让它有再给。这样做的坏处相信写过些项目的程序员应该都能知道,来个数据类需求就给一份数据库连接配置出去随他们怎么用。再加上还有个采集监控看板直接每天通过高频查库统计结果数据(很显然这里有两种可替代更优方案:1.从源数据生产过程日志统计.2.从入库管道统计)
更多技术老旧,历史遗留等各种原因积弊就不多说了,本文目的也不在此。总之,反应到具体使用体验上面是各种死锁和慢查询一堆,反应慢体验差…
显然这个任务如果还是如之前各项目那般掏数据库也能解决需求,但前面提到的原来各项目中出现的问题也都会出现。我可不愿重蹈覆辙。
那么是时候做出改变了,虽然还不明确后面将要实现的是什么(反正不是一开始说的CMS)以及如何实现,但面对各项目/产品中数据消费类需求场景,从根本上统一并彻底解决的方向及目标基本是想象出来了:一个系统解决所有数据类需求,高性能可扩展高灵活配置热更新易维护等。

***
(1) TRANSACTION:
TRANSACTION 268814508, ACTIVE 0 sec inserting
mysql tables in use 1, locked 1
LOCK WAIT 4 lock struct(s), heap size 1136, 4 row lock(s), undo log entries 1
MySQL thread id 2615055, OS thread handle 139826487912192, query id 170694941 127.0.0.1 xwcj update
insert into web_article (name,category,leadtitle,title,subtitle,abstract,author,editor,image_list,image_desc,content,htmlcontent,keywords,url,referer,publish_date,fetch_date) values ('XX网','图片',NULL,'“离婚冷静期”的前后两个30天,你注意到了吗?',NULL,'XX新闻是国内主流新闻网站中工网旗下综合新闻资讯门户,也是XX网核心主频道,每天24小时为广大网民滚动报道国内、国际及社会新闻。','XX新闻','高冲','2020-05/20200530/XX网/54ce9478f38b08d0ba5cc15682fe542169a435c5.jpg',NULL,'5月30日,《中华人民共和国民法典》颁布后的首个周六,北京市朝阳区民政局门前人头攒动,前来办理婚姻登记的人在办事大厅外排起了长队。据工作人员透露,5月30日当天有300多对前来办理婚姻登记的个人,“结婚的多,离婚的也不少,排在前面的几对全是办离
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 516 page no 691642 n bits 288 index FTS_DOC_ID_INDEX of table `xwcj`.`web_article` trx id 268814508 lock_mode X insert intention waiting
*** (2) TRANSACTION:
TRANSACTION 268814497, ACTIVE 0 sec inserting
mysql tables in use 1, locked 1
6 lock struct(s), heap size 1136, 6 row lock(s), undo log entries 1
MySQL thread id 2615252, OS thread handle 139826764535552, query id 170694920 127.0.0.1 xwcj update
insert into web_article (name,category,leadtitle,title,subtitle,abstract,author,editor,image_list,image_desc,content,htmlcontent,keywords,url,referer,publish_date,fetch_date) values ('xx网','国内新闻',NULL,'中国颁发第二届全国创新争先奖  10团队和286个人获奖',NULL,'全国创新争先奖由中国科协、人力资源和社会保障部、科技部、国务院国资委于2017年共同设立,评选颁奖周期为3年。今年的第二届评奖还特别设立“疫情防控”和“脱贫攻坚”专题,表彰为疫情防控和脱贫攻坚作出重要贡献的科技工作者。','cnews','翟璐','2020-05/20200530/xx网/a58e814ba0a3cc019a41967cffed140b408d27a3.jpg,2020-05/20200530/xx网/3457e1bfaa7d68e493c78d74aede2e721c7d797b.jpg,2020-05/20200530/xx网/a46598144454a84002577a28a76f1a00b938481a.jpg','5月30日,第二届全国创新争先奖表彰奖励大会在北京
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 516 page no 691642 n bits 288 index FTS_DOC_ID_INDEX of table `xwcj`.`web_article` trx id 268814497 lock_mode X
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 516 page no 691642 n bits 288 index FTS_DOC_ID_INDEX of table `xwcj`.`web_article` trx id 268814497 lock_mode X insert intention waiting
*** WE ROLL BACK TRANSACTION (1)
-- *一些慢查询的情况*

因为在接手前已经了解过足够多细节和问题,所以能猜出它大致想要做出个什么东西(内容管理/数据分发系统/平台)及主要目的(更高效可靠的分发最好能兼容多差异项目且灵活配置热更新且解救数据库于水深火热)最差能接受的结果(简单加个redis缓存层也比原来掏数据库强多了),最理想能达成的实现效果以及我在哪些方面能做得更出彩些以实现项目最终实际效能的可能更大突破。显然如果仅仅是写一个CMS满足此XX项目需求也是可以完成任务,但后面如果又来了一个B项目C项目有其它的数据类需求呢?而且遇到项目中的需求变更呢?虽相比于直接掏数据库,CMS对项目需求可能解决的稍高效方便些,但依然是每来一个项目需求就又要重新改代码改配置部署测试等重复一遍,与之前直接写SQL掏数据库的开发流程并没有提高太多,只是项目内的优化而不是全局性质的优化并没有持久的长远提高整体效率。那么如果是一个高效可靠具有成熟的分发平台呢?这个理想中的数据平台后面可以统一支撑解决所有项目/产品中这些数据类的需求? 所有项目/产品都作为平台租户将租户注册频道订阅退订等生命周期管理及数据消费及统计等核心功能统一托管在平台之上呢?
以上为背景交代,后面开始讲开发过程及其中一些思路

开发环节

需求分析

数据如何消费即客户端拿到数据后的处理转换使用等过程这里并不会提及。只讲作为平台提供服务涉及相关。首先,我们的基础数据大致有4~5个大类,个大类下面又有少则几百多则几千级别的小类。而各个项目/产品对这些各类数据的需求是差异非常大的,有的项目(客户)对各大类里面个小类都有数据需求,还有的项目只需要某一大类里面部分小类…总之千人千面定制化很多还时常有使用过程中的后续变更,具体就不一一举例了。
对于上面这种场景,很容易就能抽象出一种用户(租户)对频道的多对多订阅消费关系。每个有数据需求的项目/产品都可以对应为一个租户,对应的小类则可称为频道。同属一个大类的频道则可称为在同一频道组内,对应的频道组也可用于批量订阅。类似的场景可联想到很多,比如通信里面的单播(点对点),组播,广播;ROS里面的Publisher/Subscriber(依赖的是MQ)…所以具体实现我们也有很多现成的工具可供选择,比如:redis里面的pubsub,kafka里面的Topic,各种MQ(rabbitMq,zeroMq…)里面的topic/pub-sub…为了方便和直观,后面所有对应的对象和关系都只用频道/用户/订阅来替代说明。

原来的对接方式,有数据需求的用户那边直接写SQL取数据,对于某些高产频道每天数据生产增量更新的频率比较高数据量也比较多,统计排查慢查询时有一种情况是这样:对应订阅很多这种高产频道的用户每天多频次用SQL取对应频道全量数据越取越多越来越慢…(为什么不是通过表内id增量SQL取,因为频道数据不仅是插入新数据还有很多已有数据的更新)。用户那边也不是写完SQL测过就完事了,后面用需求还可能会变更(往往订阅更多的频道或者更新),而我们的频道也会变的,有些频道因为数据来源的原因相当于没数据(失效删除),还有的是更名,还有迁移到其它频道组的可能,总之频道数据提供方和用户消费端都会有各种变化。而原来的开发方式决定了:1.显式的工作量,无论是提供方数据结构有变化还是用户消费端需求变化往往就面临两端都要进行一系列变更测试维护等,如果是提供方的变化还会导致变化涉及到的所有用户消费端的更改。2.即便当时测试通过,还有变更埋下的可能潜在隐患,另外还有一些冷门项目后期无人/少人维护,提供数据变更后可能都没人注意到…
各用户对数据需求的记录及管理(变更),取数据的方式以及过程中一致性效率可靠性的问题,本来就不应该是用户端的问题,这些由服务端就可以处理好,也本来就该是由服务端处理的。意识到这些后,需求基本就确定了:要用什么样的API提供哪些服务,这些服务分别需要做到什么程度(哪些是最主要的要高可用,哪些非关键),关键api(核心功能)中要处理好其中涉及的哪些问题(一致性,高效率),后台需要记录和维护哪些数据,后台可能扩展哪些功能接口以提升平台效能。
关于最终实现效果,不妨先有这么些比较理想化的想象(后期基本都实现了):
1.用户总是以增量的方式取到最新的数据。
2.用户取数据的方式简单可靠,对于最大流量最高频使用接口最高效的理想情况是登录后仅通过Get方法请求固定接口不用带任何其它参数即请求得到所需要的数据。(所有频道数据更新和所有用户消费数据等事务的状态全部由服务端记录维护。其中经常用到信号这一机制来解耦实现:即频道更新和用户事务(消费)异常都是利用信号通知)
3.多用户取多频道中大量数据是高效且可靠的(服务要高可用高并发,同时要可应对用户取数据过程中网络中断客户端处理异常服务端发送异常等导致的一致性等问题…)。
4.用户每次需求变更不需要在客户端做任何代码修改及变更维护等(所有用户对数据频道的订阅关系由服务端统一管理,并实时更新影响到到分发接口业务逻辑中)。
5.用户在客户端的消费数据频率及其他设置(几种输出格式及储存层可配置选择)是由客户端自己决定,所以各用户之间消费逻辑完全隔离互不可见,服务端各频道数据更新频率及可能的各频道之间的具体差异化由服务端信号控制,单独解耦出信号接口,更灵活自由。
6.频道内数据来源由持久存储数据库主动同步至高效内存数据库(redis),每次同步(更新)数据的同时给对应信号接口发一个信号。信号内容及控制粒度暂定频道级别(通知哪些频道有更新)。
7.6中提到的信号驱动机制仅在生产端(前半部分流程),事实上在此数据模型中使用Event-Driven-Programming(基于事件信号signal)来实现全流程的主动推送也是可以的。后面会讲到项目的实际实现是在频道数据同步更新(持久数据库主动同步到Redis)这一步用了Event-Driven-Programming的信号来触发频道激活。那么由频道激活触发主动推送至用户端显然也是顺理成章不难实现。具体为什么没有EDP一直到底实现实时主动推送后面会讲,这里只是提供一种思路讨论,也从一个方向说明数据模型的灵活兼容有效性。
8.以上提到所有功能细节对客户端无成本,客户端从此不需要写SQL仅需用账号无脑调API就就获得比以前更优质(永无慢查询)的服务。

从零到一,后台框架雏形

前面说过,几乎所有业务直接击中mysql它会扛不住导致的问题太多,所以用个缓存Redis业务层不直接从mysql表里面取数据分发是很自然的选择。虽然初始数据还是在Mysql里面,后来也进行了热点/非热点分表(近3天,历史,当时也初略统计下,近3天的新数据不到10G)。这样一来高频率大流量的最主要的业务读逻辑就全分到Redis中了。那么还有用户和频道信息以及他们之间的多对多订阅关系之类的这些元数据,这类数据的特征是:体量小(主要就几张表,频道表,用户表,外键关系表等,最大也就频道表几千条记录)读多写少更新也少但安全性反而要求更高的的非热点数据,就目前体量以及读写更新的性质显然关系型Mysql就足以适用,支撑的主要也就是用户和频道的CURD和订阅管理信号等,这部分逻辑后端实际上仅用ORM全部解决,甚至不需要写一句SQL。而实际生产使用中对应的业务流程在前端SPA的支撑下仅需要在浏览器上勾勾选选点点订阅退订之类的就足以轻松应对(几乎所有新需求建设:新建用户订阅频道及后期变更维护等),想想之前的灾难现场…(这里的关键思想是:把元数据单独隔离出来类似于配置化的记录维护,实时灵活便捷的热更新又无碍于生产数据的实时分发更新等大流量性能瓶颈业务。)

分发方案及其中一致性如何保证----几种设计方案及它们各自优劣

方案一:Kafka Topic

Kafka是很成熟可靠的分布式流式日志处理组件,在很多场景中都有很成功应用实践。对于多对多的订阅分发显然也是一个很适合的场景…
简单概括下优缺点:

优点

1.高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。(高效的存取基于多partition实例,零拷贝,mmap直接内存映射,本地磁盘顺序批量操作等技术)
2.可扩展性:kafka集群支持热扩展。
3.持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。(从mmap内存映射用类buffer机制segment存满1G持久化到磁盘一次)
4.容错性:允许集群中节点失败(基于replica副本,若副本数量为n,则允许n-1个节点失败)。对于消息个体有ack机制可保证不丢失,但不保证不重复消费。
5.高并发:支持数千个客户端同时读写(多partition)。
6.可配置的多等级一致性ack机制(多等级0,1,-1,权衡点:一致性与效率天然负相关)。

缺点

1.空间占用相对大,相对使用redis而言,一个Topic多个partition占用的空间(多consumer场景为了增加吞吐量是通过加partition的数量来横向扩展,空间换时间)。
2.环境厚重,依赖繁多,对jvm的依赖,对zookeeper的依赖(其实不那么依赖,非必要)…无疑增加了复杂度降低了可用性维护性…
3.kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的,虽然使用了buffer机制来减少IO提高性能,但总体还是比不上直接使用内存的Redis。
4.不是传统典型的MQ,不支持事务(没有TTL,DL等)、可通过ack保证不丢失,但其他如重试、不重复、顺序等不是很好支持。

Kakfa Broker Leader的选举机制:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。原有的controller挂了则会重新选举注册新的conctroller。

ack确认机制三个等级,0(at most once):生产者最多仅发送一次,不管成功失败;1(at least once):仅需确认到一个分区副本成功写入;-1(all ):需所有节点确认成功接收。

----具体的Kafka底层原理剖析,常见应用场景及高性能高可用方案等这里也不进一步多说了,涉及另一个话题,有空的话再单开一篇。

不是说Kafka这个组件本身不够好所以不符合项目要求,而是项目目前本身业务模式与它不太匹配,要求还不够必须使用它的程度且使用它会带来新的问题也需要花成本处理好,仅是由于订阅分发的主场景。假如引入使用kafka的话虽然能用Topic解决订阅分发但也会引入一系列新的问题,比如:可用性降低,整体架构的复杂度提高,一致性问题,可能的消息丢失(生产者丢失,消息队列丢失,消费者丢失)。订阅关系管理及持久化的问题及可能的失误及其导致的消息消费问题(记得前面讲的场景中频道及订阅消费关系时常变更吗?后来的生产架构中没有用Kafka且用一个灵活独立的元数据管理方案将这个问题处理的很好)。
虽然对应这些常见问题它们也都有对应方案来解决,比如:持久化机制和双端(ack)确认机制解决消息丢失,多点集群满足高可用,使用zk解决一致性等。但这么一来就逐渐偏离了软件架构的本质。
我们常常为了解决一个问题而引入一个组件,然后引入组建的使用又带来了更多新的问题,然后为了解决这些新问题又不得不引入更多的组件,最后项目越来臃肿至深陷泥潭无法自拔…具体到工作层面上常表现为:工作量增多,工期延后,推进困难等。而且由于国内的开发团队氛围及管理特性,架构师往往身居高位带有一定领导者性质还不用写代码,所以最后问题反而归咎在实际写代码的实现者身上,诸如:能力不足(kpi),沟通理解不到位等。最后解决方式也往往只能是单纯的堆时间堆人(还不一定能做好)。这其实是特别讽刺的一种事情,但却经常或者说一直在发生:在架构设计环节犯下的错误埋下的坑,却要后面的实现者执行者来承担,而且由于对问题定位不清晰导致难以解决问题本身。
其实很多软件架构者(其中甚至不乏一些高级架构师或者大公司里的高等级牛Title的人)都会犯这类错误。关于如何避免这类错误,很多软件工程学领域的书籍(比如《人月神话》,《重构…》)都对这些问题进行过深入讨论并有些实践后积累的真知灼见非常值得学习。这里仅尝试总结出几点建议供参考:1.搞清楚你真正面对的问题并解决问题本身。2.项目的架构设计者应该参与到其设计架构方案的具体实现工作中以切身了解到可能的不合理和待改进之处并直面问题及时反馈改进。3.奥卡姆剃刀。
扯了这么一堆,回到项目中来。我们面对的问题究竟是什么?:对已存在于缓存中固定格式的的数据进行满足一致性的订阅分发。那么首先消息组件或者MQ不等于一致性的分发,反过来说对于分发和一致性来说MQ也并非是必要的。虽然引入它也能解决,但会带来更多诸如开发成本、使用中危险情况审查及处理、资源消耗、维护性、可用性等待解决问题,这些都是高成本且值得考量,且更重要的是这样的做法本身违反了软件架构设计的本质:控制混乱,简化复杂,监督生长。
这么一权衡过后,首先被想到(Kafka Topic)方案也最先被否决了。

软件架构的本质:
控制混乱:使用结构和秩序控制混乱。
简化复杂:使用分解、分离、分类简化复杂。
监督生长:监督软件的生长方式,防止畸形发展;

方案二:Redis pubsub

Redis是很流行的缓存/NoSQL数据库解决方案,但可能很多人不知道的是,它其实还有很多其它功能模块,其中就有pubsub订阅分发模块,实际上它也是高效且可靠的,尤其是在sentinel cluster方案加持下,足以应对大多数大流量高并发的场景且同时保证一致性高可用。(Redis集群方案中的强一致性需要客户端参与处理实现)
且使用起来也较为简便,尤其是对于本项目实际场景已有限定条件:需要进行分发消费生产数据已经在Redis缓存中以固定格式提供。
关于Redis的原理剖析,优缺点,应用场景及高性能高可用方案等这里也不细讲了,有空单开一篇。

对于消费者客户端:

subscribe, psubscribe 订阅通道channel
消费者客户端通过SUBSCRIBE命令执行订阅通道(channel),客户端可以多次执行该命令, 也可以一次订阅多个通道,多个客户端可以订阅相同的通道。PSUBSCRIBE与它基本相同,区别是支持glob模式匹配。需要注意的是假如SUBSCRIBE与PSUBSCRIBE订阅的channel有交集,重复的channel中消息会重复接收。
unsubscribe, punsubscribe 相反的退订,使用方法类似
UNSUBSCRIBE命令取消订阅指定的通道.可以指定一个或者多个取消的订阅通道名称,也可以不带任何参数,此时将取消所有的订阅的通道(不包括glob通道).该命令的响应包括三部分, 依次是:命令名称(字符串unsubscribe),取消的订阅通道名称,总共订阅的通道数(包含glob通道)
PUNSUBSCRIBE命令取消订阅指定的glob模式通道.可以指定一个或者多个取消的glob模式的订阅通道名称,也可以不带任何参数,此时将取消所有的glob模式订阅的通道(不包括非glob通道).该命令的响应包括三部分, 依次是:命令名称(字符串punsubscribe),取消的glob模式的订阅通道名称,总共订阅的通道数(包含非glob通道)。

消费者模拟仿真

如果项目采用此方案应用,那么客户端情况大概会如下:
用户客户端每次开启在消费数据之前初始化时都需要执行一遍订阅所有频道的逻辑,然后才能接收数据进行消费,过程中有任何异常退出或网络问题等都需要将redis client初始化再订阅频道等逻辑从头开始再执行一遍。且有一种情况就是对于一个静默一段时间的redis客户端连接,redis服务端可能觉得它没用而断开又无人知情也不会报错,反映到消费者客户端就是可能一直被动的接受消息然后一段时间后却没消息了也没有任何异常报错(redis连接已经无效了),对此种情况一般是用一个心跳检测+重连的机制来应对。可能的代码逻辑:
首先是初始化,建立连接订阅频道之类的逻辑,这里先粗略写一下python客户端与redis sentinel集群建立连接的方式(__init__方法中),其它两种Redis类(或StrictRedis类),ConnectionPool连接池(可保持长连接)连接方式就省略。

from redis.sentinel import Sentinel
class consumer():def __init__(self):# Sentinel(config.RedisConfig.HOST_PORT, password=config.RedisConfig.PASSWORD)self._sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)# self._sentinel.master_for(config.RedisConfig.MASTER)self.conn = self._sentinel.master_for('mymaster', socket_timeout=0.1)def sub(self, channels):ps = self.conn.pubsub()ps.subscribe('my-first-channel', 'my-second-channel', ...)  # 订阅一个或多个频道# ps.psubscribe('my-*', ...)  # glob模式订阅ps.subscribe(**{'my-channel': my_handler})  # 订阅频道channel且同时注册其回调函数为my_handler,推荐使用方式def consume(self):# Get message and hander it, Not recommended.# get_message() uses the system’s ‘select’ module to quickly poll the connection’s socket.while True:item = ps.get_message()print(item) # {'channel': b'my-first-channel', 'data': b'some data', 'pattern': None, 'type': 'message'}def my_handler(message):print('MY HANDLER: ', message['data'])

而订阅的逻辑则如sub函数中所示,可以一次订阅订阅多个频道也可以只订阅一个还有可选的psubscribe glob匹配模式订阅。退订unsubscribe(或者glob模式退订 p.punsubscribe('my-*'))的使用也类似,需要注意的是p.unsubscribe()不带参数的话是一次退订所有频道。
Unsubscribing works just like subscribing. If no arguments are passed to [p]unsubscribe, all channels or patterns will be unsubscribed from.
– https://pypi.org/project/redis/
至于消费的逻辑一般是通过get_message()函数,consume中的写法其实很不OOP,也不够优雅容错。更好的写法是构造一个消息处理函数handler,然后在订阅频道是将其注册为回调函数。如sub函数中的ps.subscribe(**{'my-channel': my_handler}),需要注意的是已经注册了回调函数的频道(channel)中消息再通过get_message函数接受会返回为空,即便你在handler中return something,because the message was handled by our handler.

对于生产者客户端:

PUBLISH命令在指定的通道上发布消息.只能在一个通道上发布消息,不能在多个通道上同时发布消息.该命令的响应包括通知的接收者个数,需要注意的是,这里的接收者数目大于等于订阅该通道的客户端数目(因为一个客户端的glob通道和非glob通道同时匹配发布通道的话,则视为两个接收者).而在接收端,收到的响应包括三部分,依次是 :message或者pmessage字符串(取决于是否为glob匹配),匹配的通道名称,发布的消息内容.
PUBSUB命令执行状态查询.支持若干子命令.需要注意的是,该命令不能在客户端进入订阅后执行。如:pubsub channels查看所有活动通道

生产者模拟仿真

对应生产者服务端的逻辑就简单些,主要就是往各对应频道里面发布数据,简要模拟单点redis情况代码如下:

r = redis.Redis(host="localhost", port=6379,password="123456")
# if redis cluster, use redis.sentinel.Sentinel
r.publish('my-first-channel', 'some data')
r.publish('my-second-channel', 'more data')

客户端将会接收到的消息大致如下(type, channel, data):

1) "message"
2) "my-first-ch
3) "some data"

用pubsub模式来构建的话,逻辑其实都挺简单的,这就是它最大的优点。而缺点也不仅是前文中所说的客户端经常重复建立redis连接并执行订阅一系列逻辑。

其中以下几点需注意,也可以视作缺点

1.客户端执行订阅以后,除了可以继续订阅(SUBSCRIBE或者PSUBSCRIBE),取消订阅(UNSUBSCRIBE或者PUNSUBSCRIBE), PING命令和结束连接(QUIT)外, 不能执行其他操作,客户端将阻塞直到订阅通道上发布消息的到来。(客户端操作不够灵活,比如我接受消息过程中需要订阅一个新的频道…)
2.发布的消息在Redis系统中不存储.因此,必须先执行订阅,再等待消息发布. 但是,相反的顺序则不支持。(假如某频道发布消息时,某客户端恰好正在重新连接不就消息丢失了?…此种模式对所有消费者客户端全部实时在线的要求非常高)
3.订阅的通道名称支持glob模式匹配.如果客户端同时订阅了glob模式的通道和非glob模式的通道,并且名称存在交集,则对于一个发布的消息,该执行订阅的客户端接收到两个消息。(重复消费的问题)
4.前面提到过的,客户端redis连接每次建立需重新执行一遍所有的订阅逻辑。

模拟仿真:可能面临的弊端/问题及其解决
1.连接失效,前面提到过的用户消费者所持有的redis连接可能在一段时间后被redis服务端丢弃而悄无声息的失效了,相当于实时连接中断导致消费中断。那么很自然的就有这么一个心跳检测重连的机制来应对它。具体模拟代码如下:

 def _ping(self):while True:time.sleep(60) # 60 秒if not self.conn.ping():  # 连接失效print("Connection failed!")# 重新建立连接并返回给 self.conndel self._sentinelself._sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)self.conn = self._sentinel.master_for('mymaster', socket_timeout=0.1)

然后另起一个线程专门负责这个心跳检测重连函数,或者也可以在类初始化__init__()时处理这个逻辑:

def keep_alive(self):"""保持客户端长连接"""ka_thread = threading.Thread(target=self._ping)ka_thread.start()

2.状态同步,用户消费数据可能失败,失败的可能原因很多,比如用户客户端异常中止退出,比如消费函数内部阻塞或偶发性错误,比如消费逻辑中所依赖组件偶发性异常导致消息成功接收却实际在消费过程中丢失,不一而足。在软件开发过程中,我们不能假设每一次运算或操作都能按照理想情况被计算机准确无误地执行,那是新手或学生才会做的假设。所以更常见且可靠的是,我们会在一开始就尽可能的考虑到可能在哪些环节会发生的异常或错误,并准备好应对方案以对其进行妥善处理。 —哪本书上应该说过类似的话。
好了,现在我们知道这个需要解决的问题是什么了:某个用户在某一次数据消费过程中可能出现的异常。很常规解决就是将用户每一次的消费状态同步至服务端以使消费异常时下一次重新发送对应的数据再次消费。那具体是如何同步呢(问题1)?是每次消费后都要再多发一个请求告知服务端此次消费状态吗(问题2)?服务端面对这么多用户的多次状态又是如何记录维护和影响下一次数据发布的具体内容的呢(问题3)?(问题1,2:)首先我们假设一个用户订阅了channel0,channel1,channel2这三个频道,在今天的某一次消费事务中出现异常导致消费失败。为方便记录我们可以将每个用户的每一次消费事务对应一个ID,每个用户事务必要关键内容和ID都在服务端作保存,而为了节约资源提高效率客户端默认是消费成功不用返回通知的,仅在出现异常事务时才将事务ID告知服务端,而服务端发过来的事务在用户端也可以通过判断事务ID避免重复消费(参考Mysql中的GTID机制)。(问题3:)显然如果服务端对于任何用户可能的某个事物异常是知情的,仅每个用户在服务端类似于维护一个状态池,每次收到消费反馈通知即更新对应用户的状态池。如果前一次异常仅需要(检查状态池后)在下次事务中将之前失败的事务内容再添加进去一起发送给用户就完了。redis pubsub是一个实时连续的发布模型,并不是象mysql那样有明显的事务机制。思来想去,要记录维护这么一个用户状态池,我们需要人为附加很多工作来达成,具体比如:对于每个用户同时在服务端维护一个用户副本复制所有的频道订阅关系且对于每天所有应该的消费数据进行事务ID处理后记录,具体的事物粒度根据实际权衡抉择,每个频道中的每个消息都对应一个事务ID是一种方案,但在某些特大数据量的场景中可能不太友好,有种折衷的做法是固定时间段或者固定流量(消息数量)一个事务(ID)。同时在发布的每一个消息中内置一个所属事务ID,并在异常反馈时带上。一种理想可能是这样,一个频道内的每个消息(item)都有一个自增ID,服务端往频道中每发布一条消息数据时原来频道内最大消息ID自增一。在发布数据内容本地已存在的情况下所有已发送消息都要在服务端另外有一份包含ID的副本备份(redis的pubsub是没有本地存储的)。用户客户端对于所有消费数据都要有一个本地的ID记录以便在异常发生过后重新消费。

听上去好像很有道理逻辑自洽,在某些场景下确实也不失为一种可选方案。可指摘改进的可能是所有数据内容都在服务端保存两份的蠢做法(一份原始数据,一份含消息ID)。实际上也是可解决的:按频道分桶仅存一份含递增ID的数据,每递增存(确认机制保证存入)的同时异步pubsub进行发布。
这是个可选方案(甚至是个好方案),即使是在采用了redis sentinel cluster高可用方案的基础上(单点pubsub进行分发,多点持久化储存),没有使用它主要原因总结为以下几点:1.不必要维护的长连接。2.实时分发中的峰值处理。3.订阅关系由客户端自己维护每次退出需重新初始化。4.还有一些场景条件限制的原因。(数据源及格式等。)
具体:用户端不管是初始化还是redis client连接失效时总是要重复建立连接订阅频道这一系列逻辑过程。客户端异常或不在线时的消息丢失需解决,重复消费需解决。还有我们目前并没有足够集群资源部署redis sentinel cluster,也没有必要。除非是类似于即时通讯IM场景下需要用过横向扩展cluster来保证所有用户消费者都即时接收消息。因为各频道内数据生产是不固定的,这就意味着有峰值现象(虽然削峰处理也不难但又引入成本):可能某一时间内多频道内突然生产极大量数据进行分发,对于单机或者仅是主从双机的redis是否能扛得住就有个资源瓶颈方面的问题(即便机器资源通过扩展解决了还有网络带宽呢)。
但如果我们牺牲一部分不必要的实时性要求的话,就能仅通过简单的双机方案解决问题且避开峰值状况下的资源瓶颈。非高实时性要求的场景下维持多用户的长连接且解决流量峰值等问题是费力不讨好的事情。这也是我放弃pubsub这个方案的原因之一。

补充:

Parsers
Parser classes provide a way to control how responses from the Redis server are parsed. redis-py ships with two parser classes, the PythonParser and the HiredisParser. By default, redis-py will attempt to use the HiredisParser if you have the hiredis module installed and will fallback to the PythonParser otherwise.

Hiredis is a C library maintained by the core Redis team. Pieter Noordhuis was kind enough to create Python bindings. Using Hiredis can provide up to a 10x speed improvement in parsing responses from the Redis server. The performance increase is most noticeable when retrieving many pieces of data, such as from LRANGE or SMEMBERS operations.

Hiredis is available on PyPI, and can be installed via pip just like redis-py.

$ pip install hiredis
hiredis是一个更高效的解析器,如果安装了它redis-py默认会使用它(否则会使用PythonParser)。它比PythonParser快十等级倍。
使redis client从redis中接收解析消息更快

方案三:Redis 自定义开发

如同经济学里面的蒙代尔三角(货币政策独立性,固定汇率和完全的资本自由流动不可同时满足),世界上往往没有一种万全的方案能解决某个场景下的所有问题。在计算机软件世界里的分布式系统场景下,对应的CAP不可同时满足也是如此。一致性(consistency)往往是与可用性(Availability)和分区容错性(Partition tolerance)是矛盾的,且即便是在抉择放弃了某一项之后(比如选择CA放弃P),其他项往往还要侵占一定资源才能达成(比如C还要牺牲一定效率来达成,更强的一致性往往意味着更低的效率,可以在很多经典案例中发现这种现象,如kafka中的多等级ACK机制)。
具体到此项目实际中,每个用户客户端都有一个本地实际的消费状态,而在服务端相当于也记录了每个用户的这么一个消费状态。那么服务端记录维护的所有用户的消费状态必然是有可能出现与某个用户客户端实际状态不一致的情况。我们先假设是单点中心式的服务端,那么多用户客户端在数据消费过程中是需要同时利用一种机制来同步状态到服务端来达到最终一致性。

CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

种种原因,最后选择的是自定义方案。并不是说上面那两种方案不好(没有银弹,刚好适合能解决问题的方案就是好方案,如果又是低成本且高可用高灵活可扩展那就更好了),如果这个项目在数据源结构都没有受限定从头开发的的话,或者使用场景条件有一定改变,又或者流量及实时性要求再高一些,可能最终选择会是上面其它方案之一,方案二可能性更大(奥卡姆剃刀:如非必要,毋增实体。多引入一个组件就意味着增加复杂度和降低可用性及维护性。)

生活中没有很多如果,理想终究只存在于虚无缥缈。谈起技术,往往坐而论道滔滔不绝,言必说专精某项深入底层解决领域问题并以此为傲…延伸到具体开发中即常有的理想主义和拿着锤子找钉子…但须知现实如夜履沟壑,不如意十之八九且往往无处争辩。世界是多元(割裂)且动态均衡(永远局部失衡)的,沉迷于任务场景执着于真实事务逻辑的理想主义者的突然见到现实中不一样的的荒唐与讽刺可能会手足无措难以接受。职场中也是如此,当然有些好的环境能达成较好的逻辑真实与权责均衡以成事为目标,但现实更多环境中是割裂的:总有人负责决策有人负责执行有人负责加班有人负责背锅有人负责摘桃…执行者做事能被满足理想的任务所需条件是幸运的,业务能根据真实的背景条件按照应有的较优逻辑运行也是幸运的,成果被按照本来的实现者贡献者署名并分配利益是幸运的(世界是割裂的,功劳不唐捐其实是句有毒废话,真有合理的价值分配就不会有如此剧烈的贫富分化和经济危机了)。但往往还有在位者其它维度的利益角逐权责分配势力纷争等是底层执行者不能左右也不得不接受的。就如同软件架构,你必须考虑所有可能情况(尤其是其中的坏情况)。诸多讽刺又无奈的现实…典型的技术人开发者往往专注于业务逻辑(因为认为最直接助于成事),却往往忽视外部环境条件,这些是开发者不能左右而是主要由公司文化和管理氛围决定的。经历了足够之后,开发者就应当对这些情况都有足够的心理准备,也(希望)能一定程度上应对各种意义下坏情况的发生。

总之项目背景生产数据是以已经限定的HashMap格式缓存在Redis中的,key value均为生产意义内容(消息ID递增存作offset的机制也用不上了:更新时递增ID插入同时删除旧ID相同key数据)。缓存更新的逻辑是定时由MySQL表中搜索新数据向Redis中更新。(可扩展主要是在这里Redis,不管是集群高可用还是分区扩容)

那么先从一个最基础的逻辑一步步优化吧:
一个频道对应Redis里面一个hash,key为不重复的url。每个用户对应建立一个状态池,具体结构也先假设为HashMap结构:key为订阅的频道名,value为对应频道里已消费过的数据条目item keys(这里多个key用分隔符连接为字符串作为value存储,每次须在用户程序运行时中进行解析拆分后计算处理,可能没利用好redis中计算向数据转移的的高效特性)。用户每次事务消费就更新用户状态池,且这个更新状态池的逻辑是经过ACK机制来容错(想想MQ方案,仅需ACK解决客户端丢失,消息服务端丢失和MQ丢失从根本上避免了)。这样下次消费就可以根据状态池比对类似差集计算从而实现可靠的不重复消费。这里主要就是自实现ACK机制以及状态池的粒度控制。很自然的:粒度越高,成本越高,效率越低。
典型的一次消费过程伪代码如下:

def consume(self):# 获取当前用户状态池的状态status_pool = self.get_status()res = []for channel_name in self.channels:# 关键:根据用户状态从对应频道中取新数据data = self.get_channel_data(channel_name, status_pool[channel_name])res.append(data)return res

以上为用户主动消费的函数逻辑,可以看到关键为:用户每次是拿着状态去频道中取数据(消费)所以用户能不丢失不重复,且每次消费后需对应更新用户状态池,这个更新状态的逻辑可以由用户后面主动地请求触发,可以放进get_channel_data函数中默认执行,也可以默认附带在当前消费函数逻辑中,这里简单举例附带进消费逻辑中的话伪代码可能如下:

def consume(self):# 获取当前用户状态池的状态status_pool = self.get_status()res = []try:for channel_name in self.channels:# 关键:根据用户状态从对应频道中取新数据data = self.get_channel_data(channel_name, status_pool[channel_name])res.append(data)return resexcept Exception:passfinally:self.update_status(data)  # 更新用户状态

return之前finally会执行更新状态池。

ACK机制粒度:
从最严格最细粒度也是效率最低的,用户客户端每次消费后一定时间内需返回内容确认每个频道内部所有具体异常消息条目。或者上述内容在每次消费请求时post带上。超时无请求也默认消费失败。
到最宽粒度的用户每次请求数据默认消费成功,仅在失败时下次请求带上前次失败的频道ID(或更大粒度的请求ID:认为上次请求的所有内容都失败了)。
权衡选择处理:倾向于使用高效宽粒度的ACK确认机制。

以及最终,具体我是如何设计实现一个高效的强一致性分发数据模型

整体架构方案中亮点主要在于(高灵活的元数据管理及生产关系)数据模型以及和其中具体的自实现ack机制(来保证一致性)。
项目具体实现可能换个场景或改个条件就不适用,所以这些细节层面的东西相对于本文主要讨论的模型及架构方案来说没有太大意义,虽然其中涉及的编程技巧和性能优化等各方面知识点也还有点意思。这里简单概述下典型的一次消费的流程思路:客户端一次消费默认成功(不返回)并在服务端默认更新状态池,有消费异常才回复一个反馈请求并对应更新状态池。(还有一种异常情况客户端没有返回异常但超过时间也没有进行预期中的下一次正常请求,可能挂了。可理解为超时反馈,服务端也应有对应状态更新操作。)
服务端先将user key池更新(默认消费成功),如果后面收到异常信号(请求)再多更新一遍(基于刚才默认成功的更新上)。
这样就是高效(发出即及时处理更新key池)且可靠(乐观,不成功再根据信号多更新一次)。服务端除了ACK维护状态池,消费接口有条目数量限制以实现自动削峰。

总结与扩展

好的模型应该是足够简单灵活的,它提供了抽象对象间必要的关联,但不应该对业务逻辑使用的方法以及方法的具体实现细节产生不必要的限制。
主动(拉取)与被动(推送),实时与非实时,乐观(默认消费成功,异常才反馈更新)与悲观(每次都更新),高效(乐观+粗粒度)与严谨(悲观+细粒度),强一致性与弱一致性,这些细节,在具体项目中我们如何实现的都无碍于数据模型的定义及生效。换句话说,无论我们如何权衡抉择实现这些细节,合格的模型都应该能够高效可靠的适应支撑业务最终具体达成。
原因,具体实现细节就不多讲了
真的是这样的吗?我们不妨来假设是在另种场景中另一种实现方式:实时推送用户端被动接收的方式

关键词:长链接,实时推送,削峰,ACK…
(未完待续…)
好像没什么赞,等有人看再续吧…

项目纪实--如何搭建一个高可用强一致性灵活元数据管理的数据平台实现高效可靠的数据分发等功能相关推荐

  1. 搭建一个高可用的镜像仓库,这是我见过最详细、最简单的教程

    作者 | 小碗汤 来源 | 我的小碗汤 今天分享一篇搭建一个高可用镜像仓库的教程.详细中夹杂着简单~. Harbor 部署架构图 harbor 使用 helm 部署在 k8s 集群中,通过 ingre ...

  2. 汇总-13台虚拟机搭建一个高可用负载均衡集群架构

    要求 用13台虚拟机搭建一个高可用负载均衡集群架构出来,并运行三个站点,具体需求如下. 设计你认为合理的架构,用visio把架构图画出来 搭建lnmp.tomcat+jdk环境 三个站点分别为:dis ...

  3. 2 小时快速搭建一个高可用的 IM 系统

    知道的越多,不知道的就越多,业余的像一棵小草! 编辑:业余草 来源:https://www.xttblog.com/?p=4994 本文快速搭建的 IM 系统也是使用 Go 语言来快速实现的,这里先和 ...

  4. Spring Cloud Eureka(三)实现一个高可用的注册中心

    Spring Cloud Eureka(三)实现一个高可用的注册中心 实现一个高可用的注册中心 在微服务结构这样的分布式环境中,我们需要充分考虑发生故障的情况,所以在生产环境中必须为服务的各个组件进行 ...

  5. 一次性搭建Hadoop高可用集群

    前期准备: 1.修改Linux主机名 2.修改IP 3.修改主机名和IP的映射关系 /etc/hosts 注意:如果你们公司是租用的服务器或是使用的云主机(如华为用主机.阿里云主机等) /etc/ho ...

  6. 项目 - 基于Docker Swarm的高可用Web集群

    目录 项目名称:基于Docker Swarm的高可用Web集群 项目环境:Docker 20.10.3,CentOS 8.2 (8台 1核1G),Ansible 2.9.17,Keepalived,N ...

  7. 03_项目-基于Docker Swarm的高可用Web集群

    文章目录 项目名称:基于Docker Swarm的高可用Web集群 网络拓扑图 数据流程图 项目环境:Docker 20.10.3,CentOS 8.2(8台 1核1G),Ansible 2.9.17 ...

  8. etcd集群搭建(高可用)

    一.etcd介绍: ETCD 是一个高可用的分布式键值数据库,可用于服务发现.ETCD 采用 raft 一致性算法,基于 Go 语言实现.etcd作为一个高可用键值存储系统,天生就是为集群化而设计的. ...

  9. Windows环境下安装HBase(Hadoop3.1.3、HBase2.2.5)与Linux环境下搭建HBase高可用集群

    Windows环境下安装HBase 下载HBase 官网: https://hbase.apache.org/downloads.html 不同版本集合:https://archive.apache. ...

最新文章

  1. 什么是用户智能,它与数据有什么关系?
  2. 科大星云诗社动态20210812
  3. boost::uuid模块实现不同形式随机生成器的基准的测试程序
  4. postgresql目录
  5. ASP.NET WebHooks RC 1发布
  6. 【动态规划笔记】背包问题:搬寝室
  7. java反射——构造方法
  8. Microsoft Office 2016(ProPlus/Visio/Project) VOL 简体中文版
  9. Protel99SE WIN10系统下无法添加封装库的解决方法
  10. 使用wamp3.0.6安装LimeSurvey时报“参数默认值只能为NULL”错误的解决办法
  11. UTC和GMT时间区别
  12. linux下修改tomcat默认主页
  13. 2020年如何利用外链提升网站排名和权重?
  14. 激活mircrosoft office2013
  15. Unity Predefined assemblies/assembly definition files
  16. win7系统备份怎么做的几种方法
  17. 【宇麦科技】盘点2022:企业网络安全的七大成功要素
  18. CSS中如何让背景颜色半透明
  19. 2019电商生意经(三):内容化,跨界与反击
  20. Renice INC:葡萄酒常用的保鲜方法

热门文章

  1. 利用Proteus仿真STM32实现按键控制LED灯设计
  2. PMI-ACP练习题(15)
  3. java面向对象一些基本练习题4
  4. 思科高级配置(配置标准ACL)
  5. 德丰杰全球创始人兼董事长Tim Draper让人吃惊的投资
  6. zoj2587 Marlon's String
  7. python 输出数据到Excel表格
  8. C++ Primer Plus(第六版)--学习杂记(第六章)
  9. mysql前台启动linux,mysql 在linux下的启动
  10. 如何扛住游戏流量高峰?Evil Dead 主创这样说