ipfs pubsub代码解读
Pubsub: Publish-subscribe发布订阅模式
运行环境
版本:go-ipfs@v0.4.23 go-libp2p-pubsub@v0.0.3
本文运行两个节点,一个在ubuntu,另外一个在windows,下文用ipfs1代表ubuntu端的ipfs,用ipfs2代表windows端的ipfs。两个节点将彼此的地址添加到彼此的bootstrap中,形成由2个节点组成的测试网。
两者的peerID分别为:
ipfs1: QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o
ipfs2: Qmco9fPhEC9aYsFxY3ZekoUMZucmNa9soWDXh6xgr6FsJy
两个节点启动daemon的时候都添加参数:–enable-pubsub-experiment
$ ipfs daemon --enable-pubsub-experiment
两个节点都将bitswap/dht/namesys/pubsub的log等级设置为debug
$ ipfs log level bitswap debug & ipfs log level dht debug & ipfs log level namesys debug & ipfs log level pubsub debug
ipfs pubsub命令实操
$ ipfs pubsub
USAGEipfs pubsub - An experimental publish-subscribe system on ipfs.ipfs pubsubipfs pubsub allows you to publish messages to a given topic, and also tosubscribe to new messages on a given topic.This is an experimental feature. It is not intended in its current stateto be used in a production environment.To use, the daemon must be run with '--enable-pubsub-experiment'.SUBCOMMANDSipfs pubsub ls - List subscribed topics by name.ipfs pubsub peers [<topic>] - List peers we are currently pubsubbing with.ipfs pubsub pub <topic> <data>... - Publish a message to a given pubsub topic.ipfs pubsub sub <topic> - Subscribe to messages on a given topic.For more information about each command, use:'ipfs pubsub <subcmd> --help'
首先ipfs2订阅"phone"这个topic
$ ipfs pubsub sub phone
接着ipfs1 publish", topic为"phone",内容为"Moring"
$ ipfs pubsub pub phone "Moring"
最后看ipfs2的终端
$ ipfs pubsub sub phone
Moring
在ipfs2中另外开一个终端,查询节点当前订阅的topic
$ ipfs pubsub ls
phone
ipfs1查找订阅"phone"这个topic的peer,如果topic为空,默认是查找所有的topic
ipfs pubsub peers phone
QmUB36eFCLEN4PvwSQaJ2tsEBr9epTm5h1rATuY11baZ6o
代码讲解
ipfs config文件中关于pubsub的默认配置
"Pubsub": {"DisableSigning": false,"Router": "","StrictSignatureVerification": false}
DisableSigning为false,表示本节点publish消息时需要签名,StrictSignatureVerification为false,表明接受不带签名的publish消息。
Router有3种,默认使用的是FloodSubRouter,RandomSubRouter目前没有被ipfs使用。
FloodSubRouter
GossipSubRouter
RandomSubRouter
顾名思义,Flood是洪泛的意思,即向所有订阅节点发布(publish);gossip是私语的意思,即向6个订阅节点发布;random是随机的意思,即随机向6个订阅节点发布。
值得注意的是,FloodSubRouter是基础协议,GossipSubRouter和RandomSubRouter都支持FloodSubRouter。
显然,GossipSubRouter最好用,也最复杂,接下来主要讲解GossipSubRouter
GossipSubRouter
GossipSubRouter结构:
type GossipSubRouter struct {p *PubSubpeers map[peer.ID]protocol.ID // peer protocolsmesh map[string]map[peer.ID]struct{} // topic meshesfanout map[string]map[peer.ID]struct{} // topic fanoutlastpub map[string]int64 // last publish time for fanout topicsgossip map[peer.ID][]*pb.ControlIHave // pending gossipcontrol map[peer.ID]*pb.ControlMessage // pending control messagesmcache *MessageCache
}
GossipSub以topic为单位维护两个网络,分别为mesh和fanout,两者都包含订阅该topic的节点,如果自身订阅了一个topic,则mesh[topic]不为空,fanout为空;如果没有订阅,则mesh[topic]为空。
publish过程
publish的时候,首先会判断自己有没有订阅这个topic,即判断mesh[topic]是否为空,如果不为空,就向mesh[topic]中的节点发送publish消息;如果为空,就判断fanout[topic]是否为空,如果不为空,就向fanout[topic]中的节点发送publish消息;如果为空,就从pubsub.topic[topic]中向fanout[topic]补充节点至多6个节点,最后向这些节点发送publish消息。
其它节点收到publish消息后,会首先判断是否已经收到过这个publish消息,如果已经收到就将其丢弃;如果之前没收到,首先校验消息(如果publish消息附带签名,就检验签名),除了推送给本地的订阅了topic的对象,还会将该消息原封不动publish出去,过程同上,以此完成消息传递过程,这是一个典型的gossip过程。
为了回收空间,对于一个topic,如果1分钟不publish,就会delete fanout[topic]。
go-libp2p-pubsub/gossipsub.go
// overlay parametersGossipSubD = 6GossipSubDlo = 4GossipSubDhi = 12// gossip parametersGossipSubHistoryLength = 5GossipSubHistoryGossip = 3// heartbeat intervalGossipSubHeartbeatInitialDelay = 100 * time.MillisecondGossipSubHeartbeatInterval = 1 * time.Second// fanout ttlGossipSubFanoutTTL = 60 * time.Second
Subscribe过程
当Subscribe一个topic时,首先会判断自己有没有订阅这个topic,即判断pubsub.myTopics[topic]是否为空,如果不为空,表明自己已经订阅,直接return; 如果为空,则join gossip mesh,首先mesh[topic]是否为空,如果不为空,表明之前已经join了,直接return; 如果为空,就判断fanout[topic]是否为空,如果不为空,表明自己在1min内publish过这个topic,则将fanout[topic]移出到mesh[topic],并delete fanout[topic]。 如果为空,从pubsub.topics[topic]补充6个节点到mesh[topic],并且向这些补充进来的节点发送iGraft消息,告诉它我把它加入到mesh[topic],其它节点收到iGraft消息后,也会把我加入到mesh[topic]中。
取消订阅时,首先会判断自己有没有订阅这个topic,即判断mesh[topic]是否为空,如果为空,表明自己没有订阅,直接return;如果不为空,表明自己已经订阅,则删除mesh[topic],并且向那些被剔除的节点发送iPrune消息,告诉它我把它从mesh[topic]网络剔除了,其它节点收到iPrune消息后,也会把我从mesh[topic]剔除。
消息交换&网络控制
为了确保网络节点在线,gossip每1S就会发送一个心跳包,以维护网络。心跳包的另外一个功能是交换信息,如iHave、iGraft和iPrune。
其它节点收到心跳包后,会判断iHave是否有自己想要的消息,如果有,那么向其发送iWant消息。
gossipSub每1S都会检查mesh[topic]的节点个数,如果少于6个节点则从pubsub.topics[topic]中补充,并且向这些补充进来的节点发送iGraft消息,告诉它我把它加入到mesh[topic],其它节点收到iGraft消息后,也会把我加入到mesh[topic]中;
如果多于6个节点,就会随机剔除多余的节点,并且向那些被剔除的节点发送iPrune消息,告诉它我把它从mesh[topic]网络剔除了,其它节点收到iPrune消息后,也会把我从mesh[topic]剔除。
RPC消息
pubsub使用RPC通信
// pubsub.go
type RPC struct {pb.RPC// unexported on purpose, not sending this over the wirefrom peer.ID
}// rpc.pb.go
type RPC struct {Subscriptions []*RPC_SubOpts `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"`Publish []*Message `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"`Control *ControlMessage `protobuf:"bytes,3,opt,name=control" json:"control,omitempty"`
}type RPC_SubOpts struct {Subscribe *bool `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"`Topicid *string `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"`
}type Message struct {From []byte `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`TopicIDs []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"`Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
}type ControlMessage struct {Ihave []*ControlIHave `protobuf:"bytes,1,rep,name=ihave" json:"ihave,omitempty"`Iwant []*ControlIWant `protobuf:"bytes,2,rep,name=iwant" json:"iwant,omitempty"`Graft []*ControlGraft `protobuf:"bytes,3,rep,name=graft" json:"graft,omitempty"`Prune []*ControlPrune `protobuf:"bytes,4,rep,name=prune" json:"prune,omitempty"`
}type ControlIHave struct {TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`MessageIDs []string `protobuf:"bytes,2,rep,name=messageIDs" json:"messageIDs,omitempty"`
}type ControlIWant struct {MessageIDs []string `protobuf:"bytes,1,rep,name=messageIDs" json:"messageIDs,omitempty"`
}type ControlGraft struct {TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}type ControlPrune struct {TopicID *string `protobuf:"bytes,1,opt,name=topicID" json:"topicID,omitempty"`
}
RPC_SubOpts封装的是订阅信息,即是否订阅某一topic。当有新peer连接的时候,ipfs1会和新peer握手(Helo),握手内容是Subscriptions,即当前订阅的topic。其它节点收到ipfs1发过来的hello(rpc)后,会把其peerID加入到pubsub.topics[topic]中,以便后续publish或者Subscribe该topic的时候,找得到订阅的peer。
Message封装的是publish消息,其中From是publisher;Data是内容数据;Seqno是序列号,用于标识版本,每次publish,Seqno加1(pubsub.counter++);TopicIDs是topic集;Signature是签名。
ControlMessage封装的是控制消息,分别是IWant, IHave、IGraft和IPrune, 其中后3者以topic为单位。
ipfs pubsub代码解读相关推荐
- IPFS pubsub功能的使用
什么是 pubsub? 发布订阅模式(Publish–subscribe pattern),最早是由苹果公司在 Mac OS 引入. 消息的发送者(publishers)不直接将消息发送给接收者(su ...
- 200行代码解读TDEngine背后的定时器
作者 | beyondma来源 | CSDN博客 导读:最近几周,本文作者几篇有关陶建辉老师最新的创业项目-TdEngine代码解读文章出人意料地引起了巨大的反响,原以为C语言已经是昨日黄花,不过从读 ...
- 装逼一步到位!GauGAN代码解读来了
↑↑↑关注后"星标"Datawhale 每日干货 & 每月组队学习,不错过 Datawhale干货 作者:游璐颖,福州大学,Datawhale成员 AI神笔马良 如何装逼一 ...
- Unet论文解读代码解读
论文地址:http://www.arxiv.org/pdf/1505.04597.pdf 论文解读 网络 架构: a.U-net建立在FCN的网络架构上,作者修改并扩大了这个网络框架,使其能够使用很少 ...
- Lossless Codec---APE代码解读系列(二)
APE file 一些概念 APE代码解读系列(一) APE代码解读系列(三) 1. 先要了解APE compression level APE主要有5level, 分别是: CompressionL ...
- RT-Thread 学习笔记(五)—— RTGUI代码解读
---恢复内容开始--- RT-Thread 版本:2.1.0 RTGUI相关代码解读,仅为自己学习记录,若有错误之处,请告知maoxudong0813@163.com,不胜感激! GUI流程: ma ...
- vins 解读_代码解读 | VINS 视觉前端
AI 人工智能 代码解读 | VINS 视觉前端 本文作者是计算机视觉life公众号成员蔡量力,由于格式问题部分内容显示可能有问题,更好的阅读体验,请查看原文链接:代码解读 | VINS 视觉前端 v ...
- BERT:代码解读、实体关系抽取实战
目录 前言 一.BERT的主要亮点 1. 双向Transformers 2.句子级别的应用 3.能够解决的任务 二.BERT代码解读 1. 数据预处理 1.1 InputExample类 1.2 In ...
- shfflenetv2代码解读
shufflenetv2代码解读 目录 shufflenetv2代码解读 概述 shufflenetv2网络结构图 shufflenetv2架构参数 shufflenetv2代码细节分析 概述 shu ...
最新文章
- excel文件无法打印提示内存不足_三星打印机无法扫描文件?来看看她的详细解决办法...
- python如何在官网下载1005无标题,如何安装python cairo?
- linux配置chrony时间同步
- 《Linux内核设计与实现》读书笔记(十二)- 内存管理
- Python定时任务调度——APScheduler
- [html] 说说你对WEB标准和W3C的理解与认识?
- 蓝桥杯 第七届 JAVA B组 凑算式
- 自动生成mybatis代码
- autosar架构_(1)Testing-Autosar架构及模块描述
- IIC原理超详细讲解---值得一看
- ARM嵌入式系统中的体系结构
- cmd命令行激活win7
- zcu102_14_Zynq在Standalone下使用uGUI
- linux网站权限恢复,RMAN异机恢复——备份集权限问题
- JavaScrip笔记心得(持续更新)
- Python手撸机器学习系列(十五):简单神经网络
- java基于微信小程序的校园二手闲置商品交易平台 uinapp 计算机毕业设计
- 大数据的分布式数据库技术的对比
- pku 1265 Area
- 北京时间与UTC时间转换
热门文章
- unreal4特性介绍
- Unreal4+Qt+Plugins(unrealcv)安装教程
- 日记记事本java实训,黑马学习日记 GUI实现简单记事本功能
- .jnlp 文件打开方式
- 仿钉钉考勤统计页面的日历组件,通过日历展示每日考勤打卡情况,支持在日历上打两种不同类型的点,大致适配各种分辨率效果图
- HTML简单动画制作
- 独家爆料:创宇云与小鸟云的故事
- 青少年计算机等级测试内容,青少年人工智能技术水平测试一级等级考试介绍
- exponential backoff algorithm
- 对物联网的感悟_物联网心得体会总结