分为频道与模式发布订阅两类

1、相关数据结构

1.1  redisServer

  /* Pubsub */dict *pubsub_channels;  /* Map channels to list of subscribed clients */list *pubsub_patterns;  /* A list of pubsub_patterns */dict *pubsub_patterns_dict;  /* A dict of pubsub_patterns */int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is anxor of NOTIFY_... flags. */

1.2 client

 dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */

2、频道订阅及退订

2.1 订阅

频道订阅是通过命令subscribe来完成的。

void subscribeCommand(client *c) {int j;for (j = 1; j < c->argc; j++)pubsubSubscribeChannel(c,c->argv[j]);c->flags |= CLIENT_PUBSUB;
}

首先通过dictAdd将channle添加到client中的pubsub_channels,如果字典中原来有,添加不成功。没有时才添加成功。然后看server中的pubsub_channels中的是否有channel,没有则创建一个clients的空列表,将channel与clients的对应关系添加到字典中,同时将当前client添加到channel对应的clients列表中。

int pubsubSubscribeChannel(client *c, robj *channel) {dictEntry *de;list *clients = NULL;int retval = 0;/* Add the channel to the client -> channels hash table */if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {retval = 1;incrRefCount(channel);/* Add the client to the channel -> list of clients hash table */de = dictFind(server.pubsub_channels,channel);if (de == NULL) {clients = listCreate();dictAdd(server.pubsub_channels,channel,clients);incrRefCount(channel);} else {clients = dictGetVal(de);}listAddNodeTail(clients,c);}/* Notify the client */addReplyPubsubSubscribed(c,channel);return retval;
}

2.2 退订

通过命令unsubscribe来退订。不带参数,表示退订所有已订阅的通道。带参数表示只退订指定通道的

void unsubscribeCommand(client *c) {if (c->argc == 1) {pubsubUnsubscribeAllChannels(c,1);} else {int j;for (j = 1; j < c->argc; j++)pubsubUnsubscribeChannel(c,c->argv[j],1);}if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}

删除client中pubsub_channels中的channel,同时也会删除server中pubsub_channels中的channel中对应的client

int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {dictEntry *de;list *clients;listNode *ln;int retval = 0;/* Remove the channel from the client -> channels hash table */incrRefCount(channel); /* channel may be just a pointer to the same objectwe have in the hash tables. Protect it... */if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {retval = 1;/* Remove the client from the channel -> clients list hash table */de = dictFind(server.pubsub_channels,channel);serverAssertWithInfo(c,NULL,de != NULL);clients = dictGetVal(de);ln = listSearchKey(clients,c);serverAssertWithInfo(c,NULL,ln != NULL);listDelNode(clients,ln);if (listLength(clients) == 0) {/* Free the list and associated hash entry at all if this was* the latest client, so that it will be possible to abuse* Redis PUBSUB creating millions of channels. */dictDelete(server.pubsub_channels,channel);}}/* Notify the client */if (notify) addReplyPubsubUnsubscribed(c,channel);decrRefCount(channel); /* it is finally safe to release it */return retval;
}

3、模式订阅及退订

3.1 订阅

通过命令psubscribe来完成。

void psubscribeCommand(client *c) {int j;for (j = 1; j < c->argc; j++)pubsubSubscribePattern(c,c->argv[j]);c->flags |= CLIENT_PUBSUB;
}

看client中pubsub_patterns中是否有pattern,如果没有,则添加pubsub_patterns列表末尾。同时添加到sever中的pubsub_patterns列表末尾及pubsub_patterns_dict字典中。

int pubsubSubscribePattern(client *c, robj *pattern) {dictEntry *de;list *clients;int retval = 0;if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {retval = 1;pubsubPattern *pat;listAddNodeTail(c->pubsub_patterns,pattern);incrRefCount(pattern);pat = zmalloc(sizeof(*pat));pat->pattern = getDecodedObject(pattern);pat->client = c;listAddNodeTail(server.pubsub_patterns,pat);/* Add the client to the pattern -> list of clients hash table */de = dictFind(server.pubsub_patterns_dict,pattern);if (de == NULL) {clients = listCreate();dictAdd(server.pubsub_patterns_dict,pattern,clients);incrRefCount(pattern);} else {clients = dictGetVal(de);}listAddNodeTail(clients,c);}/* Notify the client */addReplyPubsubPatSubscribed(c,pattern);return retval;
}

3.2 退订

通过命令punsubscribe来退订。不带参数,表示退订所有已订阅的模式。带参数表示只退订指定模式的

void punsubscribeCommand(client *c) {if (c->argc == 1) {pubsubUnsubscribeAllPatterns(c,1);} else {int j;for (j = 1; j < c->argc; j++)pubsubUnsubscribePattern(c,c->argv[j],1);}if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}

删除client中pubsub_patterns对应的pattern。同时删除server中pubsub_patterns中的pattern,pubsub_patterns_dict中pattern对应的client。

int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {dictEntry *de;list *clients;listNode *ln;pubsubPattern pat;int retval = 0;incrRefCount(pattern); /* Protect the object. May be the same we remove */if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {retval = 1;listDelNode(c->pubsub_patterns,ln);pat.client = c;pat.pattern = pattern;ln = listSearchKey(server.pubsub_patterns,&pat);listDelNode(server.pubsub_patterns,ln);/* Remove the client from the pattern -> clients list hash table */de = dictFind(server.pubsub_patterns_dict,pattern);serverAssertWithInfo(c,NULL,de != NULL);clients = dictGetVal(de);ln = listSearchKey(clients,c);serverAssertWithInfo(c,NULL,ln != NULL);listDelNode(clients,ln);if (listLength(clients) == 0) {/* Free the list and associated hash entry at all if this was* the latest client. */dictDelete(server.pubsub_patterns_dict,pattern);}}/* Notify the client */if (notify) addReplyPubsubPatUnsubscribed(c,pattern);decrRefCount(pattern);return retval;
}

4、发布

通过命令publish来完成

void publishCommand(client *c) {int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);if (server.cluster_enabled)clusterPropagatePublish(c->argv[1],c->argv[2]);elseforceCommandPropagation(c,PROPAGATE_REPL);addReplyLongLong(c,receivers);
}

查找server中的pubsub_channelrs的channel,遍历客户端发布消息。遍历server.pubsub_patterns_dict,看模式是否与channel匹配,如果匹配,则遍历客户端发送消息

int pubsubPublishMessage(robj *channel, robj *message) {int receivers = 0;dictEntry *de;dictIterator *di;listNode *ln;listIter li;/* Send to clients listening for that channel */de = dictFind(server.pubsub_channels,channel);if (de) {list *list = dictGetVal(de);listNode *ln;listIter li;listRewind(list,&li);while ((ln = listNext(&li)) != NULL) {client *c = ln->value;addReplyPubsubMessage(c,channel,message);receivers++;}}/* Send to clients listening to matching channels */di = dictGetIterator(server.pubsub_patterns_dict);if (di) {channel = getDecodedObject(channel);while((de = dictNext(di)) != NULL) {robj *pattern = dictGetKey(de);list *clients = dictGetVal(de);if (!stringmatchlen((char*)pattern->ptr,sdslen(pattern->ptr),(char*)channel->ptr,sdslen(channel->ptr),0)) continue;listRewind(clients,&li);while ((ln = listNext(&li)) != NULL) {client *c = listNodeValue(ln);addReplyPubsubPatMessage(c,pattern,channel,message);receivers++;}}decrRefCount(channel);dictReleaseIterator(di);}return receivers;
}

redis中的发布订阅相关推荐

  1. Redis中的发布订阅模式

    列表的局限 前面我们说通过队列的rpush 和lpop 可以实现消息队列(队尾进队头出),但是消费者需要不停地调用lpop 查看List 中是否有等待处理的消息(比如写一个while 循环).为了减少 ...

  2. Redis 中的发布/订阅功能

    发布/ 订阅系统 是 Web 系统中比较常用的一个功能.简单点说就是 发布者发布消息,订阅者接受消息,这有点类似于我们的报纸/ 杂志社之类的: 虽然可以使用一个 list 列表结构结合 lpush 和 ...

  3. Redis中的发布与订阅

    redis中实现发布与订阅相对于zookeeper非常简单.直接使用publish和subscribe就行. subscrible news; 订阅news这个channel publish news ...

  4. Redis总结之发布订阅

    绪论 理论知识 先介绍一下发布与订阅的基础知识: Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. 发布者不是直接将消息发送给特定的接收者 ...

  5. 硬核 | Redis Pub/Sub 发布订阅与宅男有什么关系?

    "65 哥,如果你交了个漂亮小姐姐做女朋友,你会通过什么方式将这个消息广而告之给你的微信好友?" "那不得拍点女朋友的美照 + 亲密照弄一个九宫格图文消息在朋友圈发布大肆 ...

  6. Javascript中理解发布--订阅模式

    Javascript中理解发布--订阅模式 阅读目录 发布订阅模式介绍 如何实现发布--订阅模式? 发布---订阅模式的代码封装 如何取消订阅事件? 全局--发布订阅对象代码封装 理解模块间通信 回到 ...

  7. Redis中的发布与订阅的概念与以命令行的方式实现发布订阅举例

    场景 什么是发布与订阅 发布订阅是一种应用程序(系统)之间通讯,传递数据的技术手段,特别是在异构(不同语言)系统之间的作用非常明显. 发布订阅: 类似于微信中关注公众号/订阅号,公众号/订阅号发布的文 ...

  8. redis的观察者模式----------发布订阅功能

    2019独角兽企业重金招聘Python工程师标准>>> 众所周知,Java,C++等面向对象有一种常见的设计模式:观察者模式,redis这种机制叫做发布订阅功能. 以下假设已redi ...

  9. 事物与持久化_揭开Redis面纱,发布订阅、事务、安全、持久化

    一.Redis发布订阅 Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. 打开两个窗口:session1 和 session2 在sess ...

最新文章

  1. 企业的7种工作管理最佳实践
  2. linux awk 用一个或多个空格做分隔符
  3. java十年技术栈[总结复习用]
  4. UBuntu20.04下安装Matlab2015B
  5. aMDcpu不支持mysql_Oracle 11.2.0.1在AMD CPU 64位硬件,32位操作系统下的BUG 8670579
  6. Android学习之简单地使用碎片
  7. iOS面试题合集(77道)
  8. token与sessionId的区别——学习笔记
  9. find命令日常用法和文件名后缀
  10. Pytorch学习笔记调整学习率torch.optim.lr_scheduler._LRScheduler
  11. 计算机office基础知识题库,计算机一级MS Office基础考试题库
  12. 基于vue的仿网易云音乐播放器
  13. css背景图不失真_CSS如何实现这种背景效果?
  14. 用Hydra工具暴力破解Windows7管理员密码并访问它的共享服务
  15. 基础知识(五)Blend2015 具有强大的组合功能,即合并功能。
  16. 获取Word2vec训练得到的所有词与词向量
  17. Git内部原理之深入解析环境变量
  18. 12. 查询表product——查询库存商品中,最高单价、最低单价分别是多少
  19. uniapp配置全局样式
  20. 安装惠普打印机显示等待php,HP打印机安装过程中报错0x000006be的解决方法

热门文章

  1. 2017-03-02学习心得之Java代码
  2. python爬虫获取的网页数据为什么要加[0-[Python爬虫] 等待网页加载后再获取内容...
  3. 会计专业为什么要学python-一个来自35岁职场高管的忠告:Python为什么不得不学?...
  4. python小课骗局-Python小课怎么样啊?
  5. python 仪表盘-python仪表盘
  6. 如何自学python编程-零基础如何自学编程?
  7. 中国python之父是谁-Python之父:谈Python
  8. python映射类型-python映射类型的相关介绍
  9. 如何查看python是多少位的-请问一下该怎么查看python是32位还是64位?
  10. 零基础学python用什么书-零基础自学python3 好用的入门书籍推荐