今天学习了Redis中比較高大上的名词,“公布订阅模式”。公布订阅模式这个词在我最開始接触听说的时候是在JMS(Java Message Service)java消息服务中听说的。这个名次用通俗的一点话说。就是我订阅了这类消息,当仅仅有这类的消息进行广播发送的时候。我才会。其它的消息直接过滤,保证了一个高效的传输效率。以下切入正题。学习一下Redis是怎样实现这个公布订阅模式的。先看看里面的简单的API构造;

/*-----------------------------------------------------------------------------* Pubsub low level API*----------------------------------------------------------------------------*/
void freePubsubPattern(void *p) /* 释放公布订阅的模式 */
int listMatchPubsubPattern(void *a, void *b) /* 公布订阅模式是否匹配 */
int clientSubscriptionsCount(redisClient *c) /* 返回client的所订阅的数量,包含channels + patterns管道和模式 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client订阅一个Channel管道 */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消订阅Client中的Channel */
int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Clientclient订阅一种模式 */
int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Clientclient取消订阅pattern模式 */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* client取消自身订阅的全部Channel */
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* client取消订阅全部的pattern模式 */
int pubsubPublishMessage(robj *channel, robj *message) /* 为全部订阅了Channel的Client发送消息message *//* ------------PUB/SUB API ---------------- */
void subscribeCommand(redisClient *c) /* 订阅Channel的命令 */
void unsubscribeCommand(redisClient *c) /* 取消订阅Channel的命令 */
void psubscribeCommand(redisClient *c) /* 订阅模式命令 */
void punsubscribeCommand(redisClient *c) /* 取消订阅模式命令 */
void publishCommand(redisClient *c) /* 公布消息命令 */
void pubsubCommand(redisClient *c) /* 公布订阅命令 */

在这里面出现了高频的词Pattern(模式)和Channel(频道,叫管道比較别扭)。也就是说,兴许全部的关于公布订阅的东东都是基于这2者展开进行的。如今大致解说一下在Redis中是怎样实现此中模式的:

1.在RedisClient 内部维护了一个pubsub_channels的Channel列表。记录了此client所订阅的频道

2.在Server服务端。相同维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每个Channel相应着一批订阅了此频道的Client,也就是Channel-->list of Clients

3.当一个Client publish一个message的时候。会先去服务端的pubsub_channels找对应的Channel,遍历里面的Client。然后发送通知,即完毕了整个公布订阅模式。

我们能够简单的看一下Redis订阅一个Channel的方法实现;

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or* 0 if the client was already subscribed to that channel. */
/* Client订阅一个Channel管道 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {struct dictEntry *de;list *clients = NULL;int retval = 0;/* Add the channel to the client -> channels hash table *///在Client的字典pubsub_channels中加入Channelif (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {retval = 1;incrRefCount(channel);/* Add the client to the channel -> list of clients hash table *///加入Clietn到server中的pubsub_channels,相应的列表中de = dictFind(server.pubsub_channels,channel);if (de == NULL) {//假设此频道的Client列表为空,则创建新列表并加入clients = listCreate();dictAdd(server.pubsub_channels,channel,clients);incrRefCount(channel);} else {//否则,获取这个频道的客户端列表。在尾部加入新的客户端clients = dictGetVal(de);}listAddNodeTail(clients,c);}/* Notify the client *///加入给回复客户端addReply(c,shared.mbulkhdr[3]);addReply(c,shared.subscribebulk);addReplyBulk(c,channel);addReplyLongLong(c,clientSubscriptionsCount(c));return retval;
}

加入操作主要分2部,Client自身的内部维护的pubsub_channels的加入。是一个dict字典对象,然后,是server端维护的pubsub_channels中的client列表的加入。在进行Channel频道的删除的时候,也是运行的这2步骤操作:

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or* 0 if the client was not subscribed to the specified channel. */
/* 取消订阅Client中的Channel */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {struct dictEntry *de;list *clients;listNode *ln;int retval = 0;/* Remove the channel from the client -> channels hash table */incrRefCount(channel); /* channel may be just a pointer to the same objectwe have in the hash tables. Protect it... *///字典删除Client中pubsub_channels中的Channelif (dictDelete(c->pubsub_channels,channel) == DICT_OK) {retval = 1;/* Remove the client from the channel -> clients list hash table *///再移除Channel相应的Client列表de = dictFind(server.pubsub_channels,channel);redisAssertWithInfo(c,NULL,de != NULL);clients = dictGetVal(de);ln = listSearchKey(clients,c);redisAssertWithInfo(c,NULL,ln != NULL);listDelNode(clients,ln);if (listLength(clients) == 0) {/* Free the list and associated hash entry at all if this was* the latest client, so that it will be possible to abuse* Redis PUBSUB creating millions of channels. */dictDelete(server.pubsub_channels,channel);}}/* Notify the client */if (notify) {addReply(c,shared.mbulkhdr[3]);addReply(c,shared.unsubscribebulk);addReplyBulk(c,channel);addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));}decrRefCount(channel); /* it is finally safe to release it */return retval;
}

里面还有相应的模式的订阅和取消订阅的操作,原理和channel全然一致。二者的差别在于,pattern是用来匹配的Channel的,这个是什么意思呢。在后面会做出答案,接着看。最后看一个最最核心的方法,客户端发步消息方法:

/* Publish a message */
/* 为全部订阅了Channel的Client发送消息message */
int pubsubPublishMessage(robj *channel, robj *message) {int receivers = 0;struct dictEntry *de;listNode *ln;listIter li;/* Send to clients listening for that channel *///找到Channel所相应的dictEntryde = dictFind(server.pubsub_channels,channel);if (de) {//获取此Channel相应的客户单列表list *list = dictGetVal(de);listNode *ln;listIter li;listRewind(list,&li);while ((ln = listNext(&li)) != NULL) {//依次取出List中的客户单,加入消息回复redisClient *c = ln->value;addReply(c,shared.mbulkhdr[3]);addReply(c,shared.messagebulk);addReplyBulk(c,channel);//加入消息回复addReplyBulk(c,message);receivers++;}}/* Send to clients listening to matching channels *//* 发送给尝试匹配该Channel的客户端消息 */if (listLength(server.pubsub_patterns)) {listRewind(server.pubsub_patterns,&li);channel = getDecodedObject(channel);while ((ln = listNext(&li)) != NULL) {pubsubPattern *pat = ln->value;//客户端的模式假设匹配了Channel。也会发送消息if (stringmatchlen((char*)pat->pattern->ptr,sdslen(pat->pattern->ptr),(char*)channel->ptr,sdslen(channel->ptr),0)) {addReply(pat->client,shared.mbulkhdr[4]);addReply(pat->client,shared.pmessagebulk);addReplyBulk(pat->client,pat->pattern);addReplyBulk(pat->client,channel);addReplyBulk(pat->client,message);receivers++;}}decrRefCount(channel);}return receivers;
}

pattern的作用就在上面体现了,假设某种pattern匹配了Channel频道,则模式的客户端也会接收消息。在server->pubsub_patterns中,pubsub_patterns是一个list列表,里面的每个pattern仅仅相应一个Client,就是上面的pat->client,这一点和Channel还是有本质的差别的。

讲完公布订阅模式的基本操作后。顺便把与此相关的notify通知类也稍稍讲讲,通知仅仅有3个方法。

/* ----------------- API ------------------- */
int keyspaceEventsStringToFlags(char *classes) /* 键值字符类型转为相应的Class类型 */
sds keyspaceEventsFlagsToString(int flags) /* 通过输入的flag值类,转为字符类型*/
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* 公布通知方法,分为2类,keySpace的通知。keyEvent的通知 */

涉及到string To flag 和flag To String 的转换,也不知道这个会在哪里用到;

/* Turn a string representing notification classes into an integer* representing notification classes flags xored.** The function returns -1 if the input contains characters not mapping to* any class. */
/* 键值字符类型转为相应的Class类型 */
int keyspaceEventsStringToFlags(char *classes) {char *p = classes;int c, flags = 0;while((c = *p++) != '\0') {switch(c) {case 'A': flags |= REDIS_NOTIFY_ALL; break;case 'g': flags |= REDIS_NOTIFY_GENERIC; break;case '$': flags |= REDIS_NOTIFY_STRING; break;case 'l': flags |= REDIS_NOTIFY_LIST; break;case 's': flags |= REDIS_NOTIFY_SET; break;case 'h': flags |= REDIS_NOTIFY_HASH; break;case 'z': flags |= REDIS_NOTIFY_ZSET; break;case 'x': flags |= REDIS_NOTIFY_EXPIRED; break;case 'e': flags |= REDIS_NOTIFY_EVICTED; break;case 'K': flags |= REDIS_NOTIFY_KEYSPACE; break;case 'E': flags |= REDIS_NOTIFY_KEYEVENT; break;default: return -1;}}return flags;
}

应该是响应键盘输入的类型和Redis类型之间的转换。在notify的方法另一个event事件的通知方法:

/* The API provided to the rest of the Redis core is a simple function:** notifyKeyspaceEvent(char *event, robj *key, int dbid);** 'event' is a C string representing the event name.* 'key' is a Redis object representing the key name.* 'dbid' is the database ID where the key lives.  */
/* 公布通知方法,分为2类,keySpace的通知,keyEvent的通知 */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {sds chan;robj *chanobj, *eventobj;int len = -1;char buf[24];/* If notifications for this class of events are off, return ASAP. */if (!(server.notify_keyspace_events & type)) return;eventobj = createStringObject(event,strlen(event));//2种的通知形式,略有区别/* __keyspace@<db>__:<key> <event> notifications. */if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {chan = sdsnewlen("__keyspace@",11);len = ll2string(buf,sizeof(buf),dbid);chan = sdscatlen(chan, buf, len);chan = sdscatlen(chan, "__:", 3);chan = sdscatsds(chan, key->ptr);chanobj = createObject(REDIS_STRING, chan);//上述几步操作,组件格式字符串。最后公布消息。以下keyEvent的通知同理pubsubPublishMessage(chanobj, eventobj);decrRefCount(chanobj);}/* __keyevente@<db>__:<event> <key> notifications. */if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {chan = sdsnewlen("__keyevent@",11);if (len == -1) len = ll2string(buf,sizeof(buf),dbid);chan = sdscatlen(chan, buf, len);chan = sdscatlen(chan, "__:", 3);chan = sdscatsds(chan, eventobj->ptr);chanobj = createObject(REDIS_STRING, chan);pubsubPublishMessage(chanobj, key);decrRefCount(chanobj);}decrRefCount(eventobj);
}

有keySpace和keyEvent的2种事件通知。

详细怎么用。等后面碰到的时候在看看。

Redis源代码分析(三十)--- pubsub公布订阅模式相关推荐

  1. Redis源代码分析(十)--- testhelp.h小测试框架和redis-check-aof.c 日志检测

    周期分析struct结构体redis代码.最后,越多越发现很多的代码其实大同小异.于struct有袋1,2不分析文件,关于set集合的一些东西,就放在下次分析好了,在选择下个分析的对象时,我考虑了一下 ...

  2. Nouveau源代码分析(三):NVIDIA设备初始化之nouveau_drm_probe

    Nouveau源代码分析(三) 向DRM注冊了Nouveau驱动之后,内核中的PCI模块就会扫描全部没有相应驱动的设备,然后和nouveau_drm_pci_table对比. 对于匹配的设备,PCI模 ...

  3. redis源代码分析 – event library - Dicky - 开源中国社区

    redis源代码分析 – event library - Dicky - 开源中国社区 redis源代码分析 – event library

  4. Redis进阶实践之十八 使用管道模式提高Redis查询的速度

    Redis进阶实践之十八 使用管道模式提高Redis查询的速度 原文:Redis进阶实践之十八 使用管道模式提高Redis查询的速度 一.引言 学习redis 也有一段时间了,该接触的也差不多了.后来 ...

  5. 【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)

    [RabbitMQ]基础三:发布与订阅模式(Publish/Subscribe) 1. 订阅模式 2. 发布与订阅模式说明 3. 代码示例 3.1 生产者 3.2 消费者 3.3 测试 4. 总结 1 ...

  6. Android 中View的绘制机制源代码分析 三

    到眼下为止,measure过程已经解说完了,今天開始我们就来学习layout过程.只是在学习layout过程之前.大家有没有发现我换了编辑器,哈哈.最终下定决心从Html编辑器切换为markdown编 ...

  7. Redis源代码分析(十一年)--- memtest内存测试

    今天,我们继续redis源代码test下测试在封装中的其它文件.今天读数memtest档,翻译了,那是,memory test 存储器测试工具..可是里面的提及了非常多东西,也给我涨了非常多见识,网上 ...

  8. redis进阶之实现消息队列发布/订阅模式使用(七)

    Redis发布订阅 Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息.微信. 微博.关注系统! Redis 客户端可以订阅任意数量的频道. ...

  9. Redis 高级特性(2)—— 发布 订阅模式

    Redis 高级特性 -- 发布订阅 1. 发布-订阅介绍 "发布-订阅"模式包含两种角色,分别为发布者和订阅者.订阅者可以订阅一个或者若干个频道(channel),而发布者可以向 ...

  10. 【原创】Kakfa utils源代码分析(三)

    Kafka utils包最后一篇~~~ 十五.ShutdownableThread.scala 可关闭的线程抽象类! 继承自Thread同时还接收一个boolean变量isInterruptible表 ...

最新文章

  1. 对 Azure 虚拟网络网关的改进
  2. win7休眠设置在哪里_电脑休眠好不好?
  3. Angular 下的 directive (part 2)
  4. java基础----Base64算法的使用
  5. Android P DP1:WiFi-RTT、刘海、多摄像头、GIF动画、NNAPI 1.1
  6. java socket 二次发送_发过2次帖子,都没有了,再发。JAVA中SOCKET通信中的数据压缩问题...
  7. UI设计和UX设计有什么区别?
  8. 关于我对区块链和比特币的看法
  9. LayaAir 快捷键设置与资源命名规则
  10. 如何点击单选框 radio 后面的文字,选中单选框
  11. 详解Java的交互式编程环境:jshell
  12. 阿里云mysql1227_Navicat连接阿里云Mysql遇到的的坑
  13. 计算机组成原理字发生器,计算机组成原理实验2.7时序发生器赖晓铮剖析.ppt
  14. 用计算机软件绘制思维导图和手绘思维导图,原来手绘思维导图的好处这么多,你还在用软件画导图吗?...
  15. [足式机器人]Part3机构运动微分几何学分析与综合Ch02-3 平面机构离散运动鞍点综合——【读书笔记】
  16. Vert.x(vertx) 实现TCP服务
  17. 用墨刀做出的交互动效,10个优秀作品欣赏
  18. 广东公立二本计算机专业比较好,广东2a大学计算机专业比较好排名
  19. 关于主机的思维导图_「停课不停学」思维导图—初中语文全部知识点总结,高清可打印...
  20. Matlab数学建模笔记

热门文章

  1. Atitit 每个人都应该实施的互联网金融战略 attilax总结
  2. Atitit.eclise的ide特性-------abt 编译
  3. Atitit js版本es5 es6新特性
  4. paip.python错误解决18
  5. paip.得到程序运行实际命令
  6. paip.ASP 开发调试大总结
  7. 张志峰:华尔街归来11年
  8. 蚂蚁金服招聘-高级数据技术工程师、大数据研发工程师/专家
  9. 【图像去噪】基于matlab即插即用法图像去噪【含Matlab源码 152期】
  10. SPSS遇到缺失值怎么办?删除还是替换?【SPSS 067期】