前言

在openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)一文中浅析了openedge-hub模块的启动过程,openedge-hub为每一个连接的请求创建一个实体——session,这个实体负责创建后会话的处理(这部分代码位于openedge/openedge-hub/session/session_handle.go),比如说处理连接、发布、发布确认、订阅、取消订阅、断开连接等请求,下面分别进行解析。

预处理

当接收到来自客户端的连接请求后,openedge-hub底层依赖的gomqtt库为其创建了一个连接,openedge-hub在其之上封装了session及其session相关的操作。
在创建了session后,还需要进行如下处理如下代码:

pkt, err = s.conn.Receive()if err != nil {if !s.tomb.Alive() {return}s.log.WithError(err).Warnf("failed to reveive message")s.close(true)return}if _, ok := pkt.(*packet.Connect); !ok && s.authorizer == nil {s.log.Errorf("only connect packet allowed before auth")s.close(true)return}

这段代码是位于session_handle中的Handle方法中,也是后续所有种类packet处理逻辑的入口,主要是通过阻塞接收请求,并看一下这个会话有没有授权,代码中也有显示,只有Connect类型的请求才可以不需要授权,其他必须要求授权。

Connect处理

处理连接的入口是Handle方法的如下部分:

func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Connect:s.log.Debugln("received:", p.Type())err = s.onConnect(p)···}}
}

再继续看onConnect方法:

func (s *session) onConnect(p *packet.Connect) error {···authorizer := s.manager.auth.AuthenticateAccount(p.Username, p.Password)if authorizer == nil {s.sendConnack(packet.BadUsernameOrPassword)return fmt.Errorf("username (%s) or password not permitted", p.Username)}s.authorizer = authorizerp.Will != nil {//处理一下will中的publish是否允许,与onPublish逻辑差不多,后面讲···}var err errors.clientID = p.ClientIDs.clean = p.CleanSessionif p.ClientID == "" {s.id = common.PrefixTmp + uuid.Generate().String()s.clean = true} else {s.id = common.PrefixSess + p.ClientID}err = s.manager.register(s)···err = s.sendConnack(packet.ConnectionAccepted)if err != nil {return err}err = s.manager.rules.StartRule(s.id)···return nil
}

这段代码分为三个部分,第一部分进行验证授权,第二个部分是向sessionManager和ruleManager注册(ruleManager注册的过程包括在sessionManager中),第三部分就是向客户端返回连接成功并开启rule。

第一部分:验证授权

在sessionManager进行初始化的时候,曾经创建过一个Auth对象,这个对象保存了用户名、密码、以及对应每个用户的发布、订阅的权限,AuthenticateAccount方法就是根据用户名、密码验证是不是正确,并返回相应的authorizer:

// AuthenticateAccount auth client account, then return authorizer if pass
func (a *Auth) AuthenticateAccount(username, password string) *Authorizer {_account, ok := a.accounts[username]if ok && len(password) > 0 && strings.Compare(encodePassword(password), _account.Password) == 0 {return _account.Authorizer}return nil
}

authorizer中存储了该用户发布、订阅的权限,后面用户发布、订阅的时候都要用这个对象进行发布、订阅的权限验证。
顺便这里有一个p.will的处理,这个处理就是在连接的时候会发送可能订阅的一些内容,这里也需要这个authorizer进行权限的验证。

第二部分:sessionManager注册

这里为session赋予id,就是一个前缀加上客户端id(如果没有客户端id就随机生成),然后向sessionManager进行注册:

// Called by session during onConnect
func (m *Manager) register(sess *session) error {···m.sessions.Set(sess.id, sess)//Blank: 使得消息能够返回客户端的关键return m.rules.AddRuleSess(sess.id, !sess.clean, sess.publish, sess.republish)
}// AddRuleSess adds a new rule for session during running
func (m *Manager) AddRuleSess(id string, persistent bool, publish, republish common.Publish) error {···m.rules.Set(id, newRuleSess(id, persistent, m.broker, m.trieq0, publish, republish))return nil
}func newRuleSess(id string, p bool, b broker, r *router.Trie, publish, republish common.Publish) base {return newRuleBase(id, p, b, r, publish, republish)
}

sessionManager注册就是向sessionManager中保存session的实例,之后调用ruleManager添加一个rule(可以看到这里最终调用的还是newRuleBase方法,与之前ruleManager创建RuleMsgQ0和RuleTopic的方法一样),这个rule的id就是session的id,这里要注意两个参数,publish、republish方法,这两个参数最终其实是传递到这个Rule的msgchan中:

func (s *session) publish(msg common.Message) {pub := new(packet.Publish)pub.Message.QOS = packet.QOS(msg.TargetQOS)pub.Message.Topic = msg.TargetTopicpub.Message.Payload = msg.Payloadpub.Message.Retain = msg.Retainif msg.TargetQOS == 1 {pid := s.pids.Set(&msg)pub.ID = packet.ID(pid)}if err := s.send(pub, true); err != nil {s.close(true)}
}

上面的方法就是return m.rules.AddRuleSess(sess.id, !sess.clean, sess.publish, sess.republish)语句中的publish参数,结合上一篇博客,很容易想到,因为RuleMsgQ0在接收到broker的消息后,会在Trie树中找到对应的sinksub ,sinksub中会进行封装,这里就把消息的TargetTopic赋值了,如果是在RuleTopic中的消息,TargetTopic会变成相应sinksub的TargetTopic(也就是进行转发的路径),如果不是在RuleTopic的话,那么TargetTopic就会与message的Topic相同:

// Flow flows message
func (s *sinksub) Flow(msg common.Message) {// set target topicif s.ttopic != "" {msg.TargetTopic = s.ttopic} else {msg.TargetTopic = msg.Topic}···
}

不论怎样,最终到达publish方法的Message的TargetTopic就是要发送的Topic,比如说定义了/a的消息发送到/a/b中,那么最终到达publish方法中的Message的TargetTopic就是/a/b,publish方法最终会err := s.send(pub, true)将消息发送到客户端。

第三部分:开启rule

首先到达这一步时会先向客户端发送连接通过的信息,之后开启这个客户端的rule,其实与开启RuleMsgQ0和RuleTopic的逻辑是相同的,其实都是开启msgchan,等待msgchan中channel的另一端发送数据,当接收到消息后会调用msgchan的process方法:

// ProcessingQ1 processing message with QOS=1
func (c *msgchan) goProcessingQ1() error {···
loop:for {select {case <-c.msgtomb.Dying():break loopcase msg := <-c.msgq1:c.process(msg)}}···
}func (c *msgchan) process(msg *common.Message) {if msg.QOS == 0 {c.publish(*msg)return}···
}

可以看到接收到消息后就进入了process方法,process方法又会调用c.publish方法,这个方法就是刚刚创建rule时传入的那个publish方法。

publish处理

当Handle方法接收到Publish类型的请求时,那么就进入了Publish的处理:

func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Connect:s.log.Debugln("received:", p.Type())err = s.onConnect(p)···}}
}

接下来进入onConnect方法:

func (s *session) onPublish(p *packet.Publish) error {if _, ok := s.permittedPublishTopics[p.Message.Topic]; !ok {// TODO: remove?if !common.PubTopicValidate(p.Message.Topic) {return fmt.Errorf("publish topic (%s) invalid", p.Message.Topic)}if !s.authorizer.Authorize(auth.Publish, p.Message.Topic) {return fmt.Errorf("publish topic (%s) not permitted", p.Message.Topic)}s.permittedPublishTopics[p.Message.Topic] = struct{}{}}···msg := common.NewMessage(uint32(p.Message.QOS), p.Message.Topic, p.Message.Payload, s.clientID)···s.manager.flow(msg)return nil
}

首先进行鉴定这个主题的消息是否有权限进行发布,session中有一个属性permittedPublishTopics map[string]struct{}就是用来存储已经授权可以发布的主题,如果这个主题是第一次发布,那么就进入第一个if-else中。
在这里面先通过PubTopicValidate方法鉴定要发布主题的名称是不是合法,看看其具体逻辑:


const (// topic validate fieldsMaxSlashCount          = 8MaxTopicNameLen        = 255// wildcard topic fieldsTopicSeparator    = "/"SingleWildCard    = "+"MultipleWildCard  = "#"SysCmdPrefix      = "$"
)
// PubTopicValidate validate MQTT publish topic
func PubTopicValidate(topic string) bool {if topic == "" {return false}if len(topic) > MaxTopicNameLen || strings.Contains(topic, "\u0000") ||strings.Count(topic, TopicSeparator) > MaxSlashCount {return false}if ContainsWildcard(topic) {return false}if isSysTopic(topic) {return false}return true
}

从上面的代码中可以看到一些规定:

  1. topic不能为空(“”)
  2. 长度不能超过255
  3. 使用“/“字符不能超过8次
  4. 不能包含“\u0000”字符
  5. 不能包含“+”字符
  6. 不能包含“$”字符

验证过发布主题的名称后,通过之前connect中为session添加的authorizer验证该主题是否具有发布的权限,如果具有发布的权限那么在permittedPublishTopics中记录下来,这样下次发布该主题的消息就不用再重新认证了。
验证过后,就将构建一个Message,然后将Message使用s.manager.flow(调用session对应的sessionManager的flow方法)把进行消息发布:

// Flow flows message to broker
func (b *Broker) Flow(msg *common.Message) {···select {case b.msgQ0Chan <- msg:case <-b.tomb.Dying():b.log.Debugf("flow message (qos=0) failed since broker closed")}
}

这个方法是之前在构造sessionManager的时候赋予sessionManager的,也就是s.manager.flow执行的方法,其目的就是向broker的channel(msgQ0Chan)发送消息,之后由RuleMsgQ0来进行消息的消费:

func (s *sink) goRoutingQ0() error {var msg *common.Messagefor {select {case <-s.tomb.Dying():return nilcase msg = <-s.broker.MsgQ0Chan():matches := s.trieq0.MatchUnique(msg.Topic)for _, sub := range matches {sub.Flow(*msg)}}}
}

这个方法的具体逻辑在上一篇openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)有详细说到,这里简单的说一下就是RuleMsgQ0在这个方法中读取消息后,遍历在Trie树中注册的sinksub,因为sinksub中拥有对应session的rule的msgchan,然后通过这个msgchan把相应消息发送到对应的sinksub,如果sinksub对应的是RuleTopic的话会依据配置文件中的subscription来修改消息,并将消息发送到broker中,继续循环上述的过程,如果sinksub对应的是session的rule的话,那么对应session的msgchan读取这个消息,并调用publish方法将消息发送回客户端。

Subscribe处理

当Handle方法接收到Subscribe类型的请求时,那么就进入了Subscribe的处理:

func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Subscribe:s.log.Debugf("received: %s, subs: %v", p.Type(), p.Subscriptions)err = s.onSubscribe(p)···}}
}

下面进入onSubscribe方法中:

func (s *session) onSubscribe(p *packet.Subscribe) error {ack := packet.NewSuback()rv := s.genSubAck(p.Subscriptions)for i, sub := range p.Subscriptions {if rv[i] == packet.QOSFailure {s.log.Errorf("failed to subscribe topic (%s)", sub.Topic)continue}if _, ok := s.subs[sub.Topic]; !ok {err := s.manager.rules.AddSinkSub(s.id, s.id, uint32(sub.QOS), sub.Topic, uint32(sub.QOS), "")···s.log.Infof("topic (%s) subscribed", sub.Topic)s.subs[sub.Topic] = sub···} else {if s.subs[sub.Topic].QOS != sub.QOS {//重新向Trie树中添加sinksub,之后移除原有的sinksub···}}}ack.ID = p.IDack.ReturnCodes = rverr := s.send(ack, true)if err != nil {return err}return s.sendRetainMessage(p)
}

这里构建了一个rv,这个主要是验证订阅的每一个subscription合法性,这里就不进入源码看了,直接翻译过来.

订阅时主题合法性检验:

  1. topic不能为“”
  2. 主题名称长度不能超过255
  3. 主题名称不能包含“\u0000”
  4. 主题名称中“/”不能超过8个
  5. 主题名称中的“#”只能位于最后以“/”后
  6. “#”字符不与其他字符混用,比如/ab#c、/c#、/#a都是错误的,/…/#正确
  7. “+”字符不与其他字符混用,比如/ab+c、/c+、/+c都是错误的,/…/+/…正确

在genSubAck方法中对于不符合上述规定的subscription都设置rv[index]=QOSFailure,之后对于通过合法性检验的subscription向RuleManager中添加sinkSub,这样就把sinkSub添加到RuleManager中(具体添加过程,在上一篇博客中详细讲到),这样在RuleMsgQ0从broker的channel中读取到数据后就可以根据订阅的主题找到对应的sinksub,并且发送消息了。

Pingreq处理

当Handle方法接收到Pingreq类型的请求时,那么就进入了Pingreq的处理:

func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Pingreq:s.log.Debugln("received:", p.Type())err = s.onPingreq(p)···}}
}

进入到onPingreq方法:

func (s *session) onPingreq(p *packet.Pingreq) error {return s.send(packet.NewPingresp(), true)
}

其实就是发送一个包,表明能够ping到。

Unsubscribe处理

当Handle方法接收到Pingreq类型的请求时,那么就进入了Pingreq的处理:

func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···switch p := pkt.(type) {case *packet.Unsubscribe:s.log.Debugf("received: %s, topics: %v", p.Type(), p.Topics)err = s.onUnsubscribe(p)···}}
}

下面进入到onUnsubscribe方法中:

func (s *session) onUnsubscribe(p *packet.Unsubscribe) error {···for _, topic := range p.Topics {if _, ok := s.subs[topic]; ok {err := s.manager.rules.RemoveSinkSub(s.id, topic)}···}···return s.send(ack, true)
}

其实就是从RuleManager的Trie树中剔除掉当初session(其实应该是session对应的rule)设置的sinksub。

Disconnect处理

当Handle方法接收到Disconnect类型的请求时,那么就进入了Disconnect的处理:

func (s *session) Handle() {···for {pkt, err = s.conn.Receive()···case *packet.Disconnect:s.log.Debugln("received:", p.Type())s.close(false)return···}}
}

下面进入到close方法中:

// Close closes this session, only called by session manager
func (s *session) close(will bool) {s.once.Do(func() {s.tomb.Kill(nil)···s.manager.remove(s.id)if will {s.sendWillMessage()}s.conn.Close()···})
}// Called by session when error raises
func (m *Manager) remove(id string) {m.sessions.Remove(id)err := m.rules.RemoveRule(id)if err != nil {m.log.WithError(err).Debugf("failed to remove rule")}
}

其实就是移除sessionManager中保存的session、ruleManager中保存的rule,并且移除掉Trie树中所有的rule相关的sinksub(是在err := m.rules.RemoveRule(id)中执行的),最后关掉连接,关掉协程。

非法消息类型处理

当之前消息类型处理中出现异常或者消息类型不合法(即不是规定的类型),那么就会关掉这个session的连接:

func (s *session) Handle() {···if err != nil {s.log.Errorf(err.Error())s.close(true)break}
}

openedge-hub模块请求处理源码浅析——百度BIE边缘侧openedge项目源码阅读(2)相关推荐

  1. openedge-hub模块启动源码浅析——百度BIE边缘侧openedge项目源码阅读(1)

    前言 因为最近项目需要用到边缘计算,结合百度的openedge进行开发,openedge目前主要功能为结合docker容器实现边缘计算,具体内容官网很多,其架构中,openedge-hub作为所有模块 ...

  2. openedge-function模块浅析——百度BIE边缘侧openedge项目源码阅读(3)

    前言 中断了一段时间,发现前面分析的hub模块的源码拉错分枝了(对,我就是个菜鸡),不过大致流程差不多,有时间改一下.这次分析openedge-function模块,openedge-function ...

  3. java外挂源码_2.7 万 Star!Github 项目源码辅助阅读神器

    [导语]:一款用于将 Github 项目代码以树形格式展示的浏览器插件. 简介 大家平时逛 GitHub 是否会觉得查看源代码的体验十分糟糕?项目文件需要一层层点击,返回也要一层层返回.这样不直观,也 ...

  4. 音乐外链生成源码php,百度网盘音乐外链源码 | 小楼昨夜又东风

    百度网盘音乐外链方法,以前一直用破博客的,后来用的人多了,导致虚拟主机CUP占用过大,暂时撤掉外链了.不过作者提供了源码,原文链接:http://www.poboke.com/study/the-so ...

  5. C# QRcode 二维码生成与读取实例 付完整项目源码

    [实例简介]zxing示例 其中包含了windowsphone以及 qrcode lib 以及winform的demo 文件:590m.com/f/25127180-496915780-5854ad( ...

  6. Java中资源文件获取源码浅析

    Java中资源文件获取源码浅析 文章目录 Java中资源文件获取源码浅析 JDK11 Class.getResource(String) Class.getClassLoader().getResou ...

  7. Android项目源码分享

    ├─android web应用 │      jqmDemo_static.zip │      jqmMobileDemo-master.zip │      jqmMobileDemo1_1-ma ...

  8. Python模拟二维码登录百度

    模拟二维码登录百度 写在前面 准备工作 二维码地址 登录状态 获取gid 登录参数 代码部分 二维码展示 获取cookie 完整代码 写在后面 写在前面 前段时间写了利用BDUSS到达百度首页,这一次 ...

  9. 开源项目源码分析(Kickstarter-iOS )(一)

    开源项目源码分析(Kickstarter-iOS )(一) 1.Kickstarter开源项目简介 2. Kickstarter项目结构 2.1 Makefile 文件 2.2 Git submodu ...

最新文章

  1. MATLAB的图像文件怎么标字母,用matlab对图片进行字符识别,只要能识别字母就行…十万火急!!请各位大侠们多多帮忙…...
  2. 涂鸦智能 dubbo-go 亿级流量的实践与探索
  3. 301缓存重定向?301 Moved Permanently (from disk cache)
  4. 移动端页面0.5px border的实现
  5. 单目视觉定位测距的两种方式(转载)
  6. 2019上半年系统集成项目管理工程师下午案例分析真题与答案解析
  7. 关于.dll' could not be found 的问题以及解决方案
  8. 叶片制成切片的结构示意图_更集成的发动机!洞悉UTC 3D打印整体式半叶片
  9. AdRotator,广告轮播 #1 -- 透过XML档来作设定。
  10. Excel如何批量根据身份证号码查询出地址
  11. 百度地图经纬度获取标点与城市编码
  12. 《又到毕业季》MATLAB GUI 基础控件与交互
  13. 机器学习-凸优化理论-课堂笔记
  14. grads右侧的色标图注画的命令
  15. 谈谈promise,谈谈微任务
  16. Sonatype Nexus安装
  17. MAC地址的介绍(单播、广播、组播、数据收发)
  18. 基于CommonsCollections4的Gadget分析
  19. 两种方法教你在postman设置请求里带动态token
  20. com.alibaba.fastjson.JSONException

热门文章

  1. c语言坦克大战程序设计,用纯C语言实现坦克大战
  2. 使用Google reCAPTCHA进行人机验证
  3. 卡方分布(Chi-Squared Distribution)
  4. 在Mac上怎么使用Charles进行抓包
  5. 窗口根据屏幕分辨率自动调整大小
  6. 19.猜数字的游戏:随机生成数字与输入数字进行比较
  7. HR唠家常式的套路题
  8. pytorch Load部分weights
  9. android真机测试什么不同,android真机测试闪退
  10. Hp服务器系统盘被热拔插会怎么样,被骗十几年 原来这些设备不能热插拔!