Pubsub: Publish-subscribe发布订阅模式

运行环境

版本:go-ipfs@v0.4.23 go-libp2p-pubsub@v0.0.3

本文运行两个节点,一个在ubuntu,另外一个在windows,下文用ipfs1代表ubuntu端的ipfs,用ipfs2代表windows端的ipfs。两个节点将彼此的地址添加到彼此的bootstrap中,形成由2个节点组成的测试网。

两者的peerID分别为:

ipfs1: QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o
ipfs2: Qmco9fPhEC9aYsFxY3ZekoUMZucmNa9soWDXh6xgr6FsJy

两个节点启动daemon的时候都添加参数:–enable-pubsub-experiment

$ ipfs daemon --enable-pubsub-experiment

两个节点都将bitswap/dht/namesys/pubsub的log等级设置为debug

$ ipfs log level bitswap debug & ipfs log level dht debug & ipfs log level namesys debug & ipfs log level pubsub debug

ipfs pubsub命令实操

$ ipfs pubsub
USAGEipfs pubsub - An experimental publish-subscribe system on ipfs.ipfs pubsubipfs pubsub allows you to publish messages to a given topic, and also tosubscribe to new messages on a given topic.This is an experimental feature. It is not intended in its current stateto be used in a production environment.To use, the daemon must be run with '--enable-pubsub-experiment'.SUBCOMMANDSipfs pubsub ls                    - List subscribed topics by name.ipfs pubsub peers [<topic>]       - List peers we are currently pubsubbing with.ipfs pubsub pub <topic> <data>... - Publish a message to a given pubsub topic.ipfs pubsub sub <topic>           - Subscribe to messages on a given topic.For more information about each command, use:'ipfs pubsub <subcmd> --help'

首先ipfs2订阅"phone"这个topic

$ ipfs pubsub sub phone

接着ipfs1 publish", topic为"phone",内容为"Moring"

$ ipfs pubsub pub phone "Moring"

最后看ipfs2的终端

$ ipfs pubsub sub phone
Moring

在ipfs2中另外开一个终端,查询节点当前订阅的topic

$ ipfs pubsub ls
phone

ipfs1查找订阅"phone"这个topic的peer,如果topic为空,默认是查找所有的topic

ipfs pubsub peers phone
QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o

代码讲解

ipfs config文件中关于pubsub的默认配置

"Pubsub": {"DisableSigning": false,"Router": "","StrictSignatureVerification": false}

DisableSigning为false,表示本节点publish消息时需要签名,StrictSignatureVerification为false,表明接受不带签名的publish消息。

Router有3种,默认使用的是FloodSubRouter,RandomSubRouter目前没有被ipfs使用。

FloodSubRouter
GossipSubRouter
RandomSubRouter

顾名思义,Flood是洪泛的意思,即向所有订阅节点发布(publish);gossip是私语的意思,即向6个订阅节点发布;random是随机的意思,即随机向6个订阅节点发布。
值得注意的是,FloodSubRouter是基础协议,GossipSubRouter和RandomSubRouter都支持FloodSubRouter。

显然,GossipSubRouter最好用,也最复杂,接下来主要讲解GossipSubRouter

GossipSubRouter

GossipSubRouter结构:

type GossipSubRouter struct {p       *PubSubpeers   map[peer.ID]protocol.ID         // peer protocolsmesh    map[string]map[peer.ID]struct{} // topic meshesfanout  map[string]map[peer.ID]struct{} // topic fanoutlastpub map[string]int64                // last publish time for fanout topicsgossip  map[peer.ID][]*pb.ControlIHave  // pending gossipcontrol map[peer.ID]*pb.ControlMessage  // pending control messagesmcache  *MessageCache
}

GossipSub以topic为单位维护两个网络,分别为mesh和fanout,两者都包含订阅该topic的节点,如果自身订阅了一个topic,则mesh[topic]不为空,fanout为空;如果没有订阅,则mesh[topic]为空。

publish过程

publish的时候,首先会判断自己有没有订阅这个topic,即判断mesh[topic]是否为空,如果不为空,就向mesh[topic]中的节点发送publish消息;如果为空,就判断fanout[topic]是否为空,如果不为空,就向fanout[topic]中的节点发送publish消息;如果为空,就从pubsub.topic[topic]中向fanout[topic]补充节点至多6个节点,最后向这些节点发送publish消息。

其它节点收到publish消息后,会首先判断是否已经收到过这个publish消息,如果已经收到就将其丢弃;如果之前没收到,首先校验消息(如果publish消息附带签名,就检验签名),除了推送给本地的订阅了topic的对象,还会将该消息原封不动publish出去,过程同上,以此完成消息传递过程,这是一个典型的gossip过程。

为了回收空间,对于一个topic,如果1分钟不publish,就会delete fanout[topic]。

go-libp2p-pubsub/gossipsub.go

 // overlay parametersGossipSubD   = 6GossipSubDlo = 4GossipSubDhi = 12// gossip parametersGossipSubHistoryLength = 5GossipSubHistoryGossip = 3// heartbeat intervalGossipSubHeartbeatInitialDelay = 100 * time.MillisecondGossipSubHeartbeatInterval     = 1 * time.Second// fanout ttlGossipSubFanoutTTL = 60 * time.Second

Subscribe过程

当Subscribe一个topic时,首先会判断自己有没有订阅这个topic,即判断pubsub.myTopics[topic]是否为空,如果不为空,表明自己已经订阅,直接return; 如果为空,则join gossip mesh,首先mesh[topic]是否为空,如果不为空,表明之前已经join了,直接return; 如果为空,就判断fanout[topic]是否为空,如果不为空,表明自己在1min内publish过这个topic,则将fanout[topic]移出到mesh[topic],并delete fanout[topic]。 如果为空,从pubsub.topics[topic]补充6个节点到mesh[topic],并且向这些补充进来的节点发送iGraft消息,告诉它我把它加入到mesh[topic],其它节点收到iGraft消息后,也会把我加入到mesh[topic]中。

取消订阅时,首先会判断自己有没有订阅这个topic,即判断mesh[topic]是否为空,如果为空,表明自己没有订阅,直接return;如果不为空,表明自己已经订阅,则删除mesh[topic],并且向那些被剔除的节点发送iPrune消息,告诉它我把它从mesh[topic]网络剔除了,其它节点收到iPrune消息后,也会把我从mesh[topic]剔除。

消息交换&网络控制

为了确保网络节点在线,gossip每1S就会发送一个心跳包,以维护网络。心跳包的另外一个功能是交换信息,如iHave、iGraft和iPrune。

其它节点收到心跳包后,会判断iHave是否有自己想要的消息,如果有,那么向其发送iWant消息。

gossipSub每1S都会检查mesh[topic]的节点个数,如果少于6个节点则从pubsub.topics[topic]中补充,并且向这些补充进来的节点发送iGraft消息,告诉它我把它加入到mesh[topic],其它节点收到iGraft消息后,也会把我加入到mesh[topic]中;

如果多于6个节点,就会随机剔除多余的节点,并且向那些被剔除的节点发送iPrune消息,告诉它我把它从mesh[topic]网络剔除了,其它节点收到iPrune消息后,也会把我从mesh[topic]剔除。

RPC消息

pubsub使用RPC通信

// pubsub.go
type RPC struct {pb.RPC// unexported on purpose, not sending this over the wirefrom peer.ID
}// rpc.pb.go
type RPC struct {Subscriptions        []*RPC_SubOpts  `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"`Publish              []*Message      `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"`Control              *ControlMessage `protobuf:"bytes,3,opt,name=control" json:"control,omitempty"`
}type RPC_SubOpts struct {Subscribe            *bool    `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"`Topicid              *string  `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"`
}type Message struct {From                 []byte   `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`Data                 []byte   `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`Seqno                []byte   `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`TopicIDs             []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"`Signature            []byte   `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
}type ControlMessage struct {Ihave                []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"`Iwant                []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"`Graft                []*ControlGraft `protobuf:"bytes,3,rep,name=graft" json:"graft,omitempty"`Prune                []*ControlPrune `protobuf:"bytes,4,rep,name=prune" json:"prune,omitempty"`
}type ControlIHave struct {TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`MessageIDs           []string `protobuf:"bytes,2,rep,name=messageIDs" json:"messageIDs,omitempty"`
}type ControlIWant struct {MessageIDs           []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"`
}type ControlGraft struct {TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}type ControlPrune struct {TopicID              *string  `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}

RPC_SubOpts封装的是订阅信息,即是否订阅某一topic。当有新peer连接的时候,ipfs1会和新peer握手(Helo),握手内容是Subscriptions,即当前订阅的topic。其它节点收到ipfs1发过来的hello(rpc)后,会把其peerID加入到pubsub.topics[topic]中,以便后续publish或者Subscribe该topic的时候,找得到订阅的peer。

Message封装的是publish消息,其中From是publisher;Data是内容数据;Seqno是序列号,用于标识版本,每次publish,Seqno加1(pubsub.counter++);TopicIDs是topic集;Signature是签名。

ControlMessage封装的是控制消息,分别是IWant, IHave、IGraft和IPrune, 其中后3者以topic为单位。

ipfs pubsub代码解读相关推荐

  1. IPFS pubsub功能的使用

    什么是 pubsub? 发布订阅模式(Publish–subscribe pattern),最早是由苹果公司在 Mac OS 引入. 消息的发送者(publishers)不直接将消息发送给接收者(su ...

  2. 200行代码解读TDEngine背后的定时器

    作者 | beyondma来源 | CSDN博客 导读:最近几周,本文作者几篇有关陶建辉老师最新的创业项目-TdEngine代码解读文章出人意料地引起了巨大的反响,原以为C语言已经是昨日黄花,不过从读 ...

  3. 装逼一步到位!GauGAN代码解读来了

    ↑↑↑关注后"星标"Datawhale 每日干货 & 每月组队学习,不错过 Datawhale干货 作者:游璐颖,福州大学,Datawhale成员 AI神笔马良 如何装逼一 ...

  4. Unet论文解读代码解读

    论文地址:http://www.arxiv.org/pdf/1505.04597.pdf 论文解读 网络 架构: a.U-net建立在FCN的网络架构上,作者修改并扩大了这个网络框架,使其能够使用很少 ...

  5. Lossless Codec---APE代码解读系列(二)

    APE file 一些概念 APE代码解读系列(一) APE代码解读系列(三) 1. 先要了解APE compression level APE主要有5level, 分别是: CompressionL ...

  6. RT-Thread 学习笔记(五)—— RTGUI代码解读

    ---恢复内容开始--- RT-Thread 版本:2.1.0 RTGUI相关代码解读,仅为自己学习记录,若有错误之处,请告知maoxudong0813@163.com,不胜感激! GUI流程: ma ...

  7. vins 解读_代码解读 | VINS 视觉前端

    AI 人工智能 代码解读 | VINS 视觉前端 本文作者是计算机视觉life公众号成员蔡量力,由于格式问题部分内容显示可能有问题,更好的阅读体验,请查看原文链接:代码解读 | VINS 视觉前端 v ...

  8. BERT:代码解读、实体关系抽取实战

    目录 前言 一.BERT的主要亮点 1. 双向Transformers 2.句子级别的应用 3.能够解决的任务 二.BERT代码解读 1. 数据预处理 1.1 InputExample类 1.2 In ...

  9. shfflenetv2代码解读

    shufflenetv2代码解读 目录 shufflenetv2代码解读 概述 shufflenetv2网络结构图 shufflenetv2架构参数 shufflenetv2代码细节分析 概述 shu ...

最新文章

  1. excel文件无法打印提示内存不足_三星打印机无法扫描文件?来看看她的详细解决办法...
  2. python如何在官网下载1005无标题,如何安装python cairo?
  3. linux配置chrony时间同步
  4. 《Linux内核设计与实现》读书笔记(十二)- 内存管理
  5. Python定时任务调度——APScheduler
  6. [html] 说说你对WEB标准和W3C的理解与认识?
  7. 蓝桥杯 第七届 JAVA B组 凑算式
  8. 自动生成mybatis代码
  9. autosar架构_(1)Testing-Autosar架构及模块描述
  10. IIC原理超详细讲解---值得一看
  11. ARM嵌入式系统中的体系结构
  12. cmd命令行激活win7
  13. zcu102_14_Zynq在Standalone下使用uGUI
  14. linux网站权限恢复,RMAN异机恢复——备份集权限问题
  15. JavaScrip笔记心得(持续更新)
  16. Python手撸机器学习系列(十五):简单神经网络
  17. java基于微信小程序的校园二手闲置商品交易平台 uinapp 计算机毕业设计
  18. 大数据的分布式数据库技术的对比
  19. pku 1265 Area
  20. 北京时间与UTC时间转换

热门文章

  1. unreal4特性介绍
  2. Unreal4+Qt+Plugins(unrealcv)安装教程
  3. 日记记事本java实训,黑马学习日记  GUI实现简单记事本功能
  4. .jnlp 文件打开方式
  5. 仿钉钉考勤统计页面的日历组件,通过日历展示每日考勤打卡情况,支持在日历上打两种不同类型的点,大致适配各种分辨率效果图
  6. HTML简单动画制作
  7. 独家爆料:创宇云与小鸟云的故事
  8. 青少年计算机等级测试内容,青少年人工智能技术水平测试一级等级考试介绍
  9. exponential backoff algorithm
  10. 对物联网的感悟_物联网心得体会总结