openedge-hub模块请求处理源码浅析——百度BIE边缘侧openedge项目源码阅读(2)
前言
在openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)一文中浅析了openedge-hub模块的启动过程,openedge-hub为每一个连接的请求创建一个实体——session,这个实体负责创建后会话的处理(这部分代码位于openedge/openedge-hub/session/session_handle.go),比如说处理连接、发布、发布确认、订阅、取消订阅、断开连接等请求,下面分别进行解析。
预处理
当接收到来自客户端的连接请求后,openedge-hub底层依赖的gomqtt库为其创建了一个连接,openedge-hub在其之上封装了session及其session相关的操作。
在创建了session后,还需要进行如下处理如下代码:
pkt, err = s.conn.Receive()if err != nil {if !s.tomb.Alive() {return}s.log.WithError(err).Warnf("failed to reveive message")s.close(true)return}if _, ok := pkt.(*packet.Connect); !ok && s.authorizer == nil {s.log.Errorf("only connect packet allowed before auth")s.close(true)return}
这段代码是位于session_handle中的Handle方法中,也是后续所有种类packet处理逻辑的入口,主要是通过阻塞接收请求,并看一下这个会话有没有授权,代码中也有显示,只有Connect类型的请求才可以不需要授权,其他必须要求授权。
Connect处理
处理连接的入口是Handle方法的如下部分:
func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Connect:s.log.Debugln("received:", p.Type())err = s.onConnect(p)···}}
}
再继续看onConnect方法:
func (s *session) onConnect(p *packet.Connect) error {···authorizer := s.manager.auth.AuthenticateAccount(p.Username, p.Password)if authorizer == nil {s.sendConnack(packet.BadUsernameOrPassword)return fmt.Errorf("username (%s) or password not permitted", p.Username)}s.authorizer = authorizerp.Will != nil {//处理一下will中的publish是否允许,与onPublish逻辑差不多,后面讲···}var err errors.clientID = p.ClientIDs.clean = p.CleanSessionif p.ClientID == "" {s.id = common.PrefixTmp + uuid.Generate().String()s.clean = true} else {s.id = common.PrefixSess + p.ClientID}err = s.manager.register(s)···err = s.sendConnack(packet.ConnectionAccepted)if err != nil {return err}err = s.manager.rules.StartRule(s.id)···return nil
}
这段代码分为三个部分,第一部分进行验证授权,第二个部分是向sessionManager和ruleManager注册(ruleManager注册的过程包括在sessionManager中),第三部分就是向客户端返回连接成功并开启rule。
第一部分:验证授权
在sessionManager进行初始化的时候,曾经创建过一个Auth对象,这个对象保存了用户名、密码、以及对应每个用户的发布、订阅的权限,AuthenticateAccount方法就是根据用户名、密码验证是不是正确,并返回相应的authorizer:
// AuthenticateAccount auth client account, then return authorizer if pass
func (a *Auth) AuthenticateAccount(username, password string) *Authorizer {_account, ok := a.accounts[username]if ok && len(password) > 0 && strings.Compare(encodePassword(password), _account.Password) == 0 {return _account.Authorizer}return nil
}
authorizer中存储了该用户发布、订阅的权限,后面用户发布、订阅的时候都要用这个对象进行发布、订阅的权限验证。
顺便这里有一个p.will的处理,这个处理就是在连接的时候会发送可能订阅的一些内容,这里也需要这个authorizer进行权限的验证。
第二部分:sessionManager注册
这里为session赋予id,就是一个前缀加上客户端id(如果没有客户端id就随机生成),然后向sessionManager进行注册:
// Called by session during onConnect
func (m *Manager) register(sess *session) error {···m.sessions.Set(sess.id, sess)//Blank: 使得消息能够返回客户端的关键return m.rules.AddRuleSess(sess.id, !sess.clean, sess.publish, sess.republish)
}// AddRuleSess adds a new rule for session during running
func (m *Manager) AddRuleSess(id string, persistent bool, publish, republish common.Publish) error {···m.rules.Set(id, newRuleSess(id, persistent, m.broker, m.trieq0, publish, republish))return nil
}func newRuleSess(id string, p bool, b broker, r *router.Trie, publish, republish common.Publish) base {return newRuleBase(id, p, b, r, publish, republish)
}
sessionManager注册就是向sessionManager中保存session的实例,之后调用ruleManager添加一个rule(可以看到这里最终调用的还是newRuleBase方法,与之前ruleManager创建RuleMsgQ0和RuleTopic的方法一样),这个rule的id就是session的id,这里要注意两个参数,publish、republish方法,这两个参数最终其实是传递到这个Rule的msgchan中:
func (s *session) publish(msg common.Message) {pub := new(packet.Publish)pub.Message.QOS = packet.QOS(msg.TargetQOS)pub.Message.Topic = msg.TargetTopicpub.Message.Payload = msg.Payloadpub.Message.Retain = msg.Retainif msg.TargetQOS == 1 {pid := s.pids.Set(&msg)pub.ID = packet.ID(pid)}if err := s.send(pub, true); err != nil {s.close(true)}
}
上面的方法就是return m.rules.AddRuleSess(sess.id, !sess.clean, sess.publish, sess.republish)
语句中的publish参数,结合上一篇博客,很容易想到,因为RuleMsgQ0在接收到broker的消息后,会在Trie树中找到对应的sinksub ,sinksub中会进行封装,这里就把消息的TargetTopic赋值了,如果是在RuleTopic中的消息,TargetTopic会变成相应sinksub的TargetTopic(也就是进行转发的路径),如果不是在RuleTopic的话,那么TargetTopic就会与message的Topic相同:
// Flow flows message
func (s *sinksub) Flow(msg common.Message) {// set target topicif s.ttopic != "" {msg.TargetTopic = s.ttopic} else {msg.TargetTopic = msg.Topic}···
}
不论怎样,最终到达publish方法的Message的TargetTopic就是要发送的Topic,比如说定义了/a的消息发送到/a/b中,那么最终到达publish方法中的Message的TargetTopic就是/a/b,publish方法最终会err := s.send(pub, true)
将消息发送到客户端。
第三部分:开启rule
首先到达这一步时会先向客户端发送连接通过的信息,之后开启这个客户端的rule,其实与开启RuleMsgQ0和RuleTopic的逻辑是相同的,其实都是开启msgchan,等待msgchan中channel的另一端发送数据,当接收到消息后会调用msgchan的process方法:
// ProcessingQ1 processing message with QOS=1
func (c *msgchan) goProcessingQ1() error {···
loop:for {select {case <-c.msgtomb.Dying():break loopcase msg := <-c.msgq1:c.process(msg)}}···
}func (c *msgchan) process(msg *common.Message) {if msg.QOS == 0 {c.publish(*msg)return}···
}
可以看到接收到消息后就进入了process方法,process方法又会调用c.publish方法,这个方法就是刚刚创建rule时传入的那个publish方法。
publish处理
当Handle方法接收到Publish类型的请求时,那么就进入了Publish的处理:
func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Connect:s.log.Debugln("received:", p.Type())err = s.onConnect(p)···}}
}
接下来进入onConnect方法:
func (s *session) onPublish(p *packet.Publish) error {if _, ok := s.permittedPublishTopics[p.Message.Topic]; !ok {// TODO: remove?if !common.PubTopicValidate(p.Message.Topic) {return fmt.Errorf("publish topic (%s) invalid", p.Message.Topic)}if !s.authorizer.Authorize(auth.Publish, p.Message.Topic) {return fmt.Errorf("publish topic (%s) not permitted", p.Message.Topic)}s.permittedPublishTopics[p.Message.Topic] = struct{}{}}···msg := common.NewMessage(uint32(p.Message.QOS), p.Message.Topic, p.Message.Payload, s.clientID)···s.manager.flow(msg)return nil
}
首先进行鉴定这个主题的消息是否有权限进行发布,session中有一个属性permittedPublishTopics map[string]struct{}
就是用来存储已经授权可以发布的主题,如果这个主题是第一次发布,那么就进入第一个if-else中。
在这里面先通过PubTopicValidate方法鉴定要发布主题的名称是不是合法,看看其具体逻辑:
const (// topic validate fieldsMaxSlashCount = 8MaxTopicNameLen = 255// wildcard topic fieldsTopicSeparator = "/"SingleWildCard = "+"MultipleWildCard = "#"SysCmdPrefix = "$"
)
// PubTopicValidate validate MQTT publish topic
func PubTopicValidate(topic string) bool {if topic == "" {return false}if len(topic) > MaxTopicNameLen || strings.Contains(topic, "\u0000") ||strings.Count(topic, TopicSeparator) > MaxSlashCount {return false}if ContainsWildcard(topic) {return false}if isSysTopic(topic) {return false}return true
}
从上面的代码中可以看到一些规定:
- topic不能为空(“”)
- 长度不能超过255
- 使用“/“字符不能超过8次
- 不能包含“\u0000”字符
- 不能包含“+”字符
- 不能包含“$”字符
验证过发布主题的名称后,通过之前connect中为session添加的authorizer验证该主题是否具有发布的权限,如果具有发布的权限那么在permittedPublishTopics中记录下来,这样下次发布该主题的消息就不用再重新认证了。
验证过后,就将构建一个Message,然后将Message使用s.manager.flow
(调用session对应的sessionManager的flow方法)把进行消息发布:
// Flow flows message to broker
func (b *Broker) Flow(msg *common.Message) {···select {case b.msgQ0Chan <- msg:case <-b.tomb.Dying():b.log.Debugf("flow message (qos=0) failed since broker closed")}
}
这个方法是之前在构造sessionManager的时候赋予sessionManager的,也就是s.manager.flow
执行的方法,其目的就是向broker的channel(msgQ0Chan)发送消息,之后由RuleMsgQ0来进行消息的消费:
func (s *sink) goRoutingQ0() error {var msg *common.Messagefor {select {case <-s.tomb.Dying():return nilcase msg = <-s.broker.MsgQ0Chan():matches := s.trieq0.MatchUnique(msg.Topic)for _, sub := range matches {sub.Flow(*msg)}}}
}
这个方法的具体逻辑在上一篇openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)有详细说到,这里简单的说一下就是RuleMsgQ0在这个方法中读取消息后,遍历在Trie树中注册的sinksub,因为sinksub中拥有对应session的rule的msgchan,然后通过这个msgchan把相应消息发送到对应的sinksub,如果sinksub对应的是RuleTopic的话会依据配置文件中的subscription来修改消息,并将消息发送到broker中,继续循环上述的过程,如果sinksub对应的是session的rule的话,那么对应session的msgchan读取这个消息,并调用publish方法将消息发送回客户端。
Subscribe处理
当Handle方法接收到Subscribe类型的请求时,那么就进入了Subscribe的处理:
func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Subscribe:s.log.Debugf("received: %s, subs: %v", p.Type(), p.Subscriptions)err = s.onSubscribe(p)···}}
}
下面进入onSubscribe方法中:
func (s *session) onSubscribe(p *packet.Subscribe) error {ack := packet.NewSuback()rv := s.genSubAck(p.Subscriptions)for i, sub := range p.Subscriptions {if rv[i] == packet.QOSFailure {s.log.Errorf("failed to subscribe topic (%s)", sub.Topic)continue}if _, ok := s.subs[sub.Topic]; !ok {err := s.manager.rules.AddSinkSub(s.id, s.id, uint32(sub.QOS), sub.Topic, uint32(sub.QOS), "")···s.log.Infof("topic (%s) subscribed", sub.Topic)s.subs[sub.Topic] = sub···} else {if s.subs[sub.Topic].QOS != sub.QOS {//重新向Trie树中添加sinksub,之后移除原有的sinksub···}}}ack.ID = p.IDack.ReturnCodes = rverr := s.send(ack, true)if err != nil {return err}return s.sendRetainMessage(p)
}
这里构建了一个rv,这个主要是验证订阅的每一个subscription合法性,这里就不进入源码看了,直接翻译过来.
订阅时主题合法性检验:
- topic不能为“”
- 主题名称长度不能超过255
- 主题名称不能包含“\u0000”
- 主题名称中“/”不能超过8个
- 主题名称中的“#”只能位于最后以“/”后
- “#”字符不与其他字符混用,比如/ab#c、/c#、/#a都是错误的,/…/#正确
- “+”字符不与其他字符混用,比如/ab+c、/c+、/+c都是错误的,/…/+/…正确
在genSubAck方法中对于不符合上述规定的subscription都设置rv[index]=QOSFailure
,之后对于通过合法性检验的subscription向RuleManager中添加sinkSub,这样就把sinkSub添加到RuleManager中(具体添加过程,在上一篇博客中详细讲到),这样在RuleMsgQ0从broker的channel中读取到数据后就可以根据订阅的主题找到对应的sinksub,并且发送消息了。
Pingreq处理
当Handle方法接收到Pingreq类型的请求时,那么就进入了Pingreq的处理:
func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Pingreq:s.log.Debugln("received:", p.Type())err = s.onPingreq(p)···}}
}
进入到onPingreq方法:
func (s *session) onPingreq(p *packet.Pingreq) error {return s.send(packet.NewPingresp(), true)
}
其实就是发送一个包,表明能够ping到。
Unsubscribe处理
当Handle方法接收到Pingreq类型的请求时,那么就进入了Pingreq的处理:
func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Unsubscribe:s.log.Debugf("received: %s, topics: %v", p.Type(), p.Topics)err = s.onUnsubscribe(p)···}}
}
下面进入到onUnsubscribe方法中:
func (s *session) onUnsubscribe(p *packet.Unsubscribe) error {···for _, topic := range p.Topics {if _, ok := s.subs[topic]; ok {err := s.manager.rules.RemoveSinkSub(s.id, topic)}···}···return s.send(ack, true)
}
其实就是从RuleManager的Trie树中剔除掉当初session(其实应该是session对应的rule)设置的sinksub。
Disconnect处理
当Handle方法接收到Disconnect类型的请求时,那么就进入了Disconnect的处理:
func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···case *packet.Disconnect:s.log.Debugln("received:", p.Type())s.close(false)return···}}
}
下面进入到close方法中:
// Close closes this session, only called by session manager
func (s *session) close(will bool) {s.once.Do(func() {s.tomb.Kill(nil)···s.manager.remove(s.id)if will {s.sendWillMessage()}s.conn.Close()···})
}// Called by session when error raises
func (m *Manager) remove(id string) {m.sessions.Remove(id)err := m.rules.RemoveRule(id)if err != nil {m.log.WithError(err).Debugf("failed to remove rule")}
}
其实就是移除sessionManager中保存的session、ruleManager中保存的rule,并且移除掉Trie树中所有的rule相关的sinksub(是在err := m.rules.RemoveRule(id)
中执行的),最后关掉连接,关掉协程。
非法消息类型处理
当之前消息类型处理中出现异常或者消息类型不合法(即不是规定的类型),那么就会关掉这个session的连接:
func (s *session) Handle() {···if err != nil {s.log.Errorf(err.Error())s.close(true)break}
}
openedge-hub模块请求处理源码浅析——百度BIE边缘侧openedge项目源码阅读(2)相关推荐
- openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)
前言 因为最近项目需要用到边缘计算,结合百度的openedge进行开发,openedge目前主要功能为结合docker容器实现边缘计算,具体内容官网很多,其架构中,openedge-hub作为所有模块 ...
- openedge-function模块浅析——百度BIE边缘侧openedge项目源码阅读(3)
前言 中断了一段时间,发现前面分析的hub模块的源码拉错分枝了(对,我就是个菜鸡),不过大致流程差不多,有时间改一下.这次分析openedge-function模块,openedge-function ...
- java外挂源码_2.7 万 Star!Github 项目源码辅助阅读神器
[导语]:一款用于将 Github 项目代码以树形格式展示的浏览器插件. 简介 大家平时逛 GitHub 是否会觉得查看源代码的体验十分糟糕?项目文件需要一层层点击,返回也要一层层返回.这样不直观,也 ...
- 音乐外链生成源码php,百度网盘音乐外链源码 | 小楼昨夜又东风
百度网盘音乐外链方法,以前一直用破博客的,后来用的人多了,导致虚拟主机CUP占用过大,暂时撤掉外链了.不过作者提供了源码,原文链接:http://www.poboke.com/study/the-so ...
- C# QRcode 二维码生成与读取实例 付完整项目源码
[实例简介]zxing示例 其中包含了windowsphone以及 qrcode lib 以及winform的demo 文件:590m.com/f/25127180-496915780-5854ad( ...
- Java中资源文件获取源码浅析
Java中资源文件获取源码浅析 文章目录 Java中资源文件获取源码浅析 JDK11 Class.getResource(String) Class.getClassLoader().getResou ...
- Android项目源码分享
├─android web应用 │ jqmDemo_static.zip │ jqmMobileDemo-master.zip │ jqmMobileDemo1_1-ma ...
- Python模拟二维码登录百度
模拟二维码登录百度 写在前面 准备工作 二维码地址 登录状态 获取gid 登录参数 代码部分 二维码展示 获取cookie 完整代码 写在后面 写在前面 前段时间写了利用BDUSS到达百度首页,这一次 ...
- 开源项目源码分析(Kickstarter-iOS )(一)
开源项目源码分析(Kickstarter-iOS )(一) 1.Kickstarter开源项目简介 2. Kickstarter项目结构 2.1 Makefile 文件 2.2 Git submodu ...
最新文章
- MATLAB的图像文件怎么标字母,用matlab对图片进行字符识别,只要能识别字母就行…十万火急!!请各位大侠们多多帮忙…...
- 涂鸦智能 dubbo-go 亿级流量的实践与探索
- 301缓存重定向?301 Moved Permanently (from disk cache)
- 移动端页面0.5px border的实现
- 单目视觉定位测距的两种方式(转载)
- 2019上半年系统集成项目管理工程师下午案例分析真题与答案解析
- 关于.dll' could not be found 的问题以及解决方案
- 叶片制成切片的结构示意图_更集成的发动机!洞悉UTC 3D打印整体式半叶片
- AdRotator,广告轮播 #1 -- 透过XML档来作设定。
- Excel如何批量根据身份证号码查询出地址
- 百度地图经纬度获取标点与城市编码
- 《又到毕业季》MATLAB GUI 基础控件与交互
- 机器学习-凸优化理论-课堂笔记
- grads右侧的色标图注画的命令
- 谈谈promise,谈谈微任务
- Sonatype Nexus安装
- MAC地址的介绍(单播、广播、组播、数据收发)
- 基于CommonsCollections4的Gadget分析
- 两种方法教你在postman设置请求里带动态token
- com.alibaba.fastjson.JSONException