欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

《Golang流媒体实战》系列的链接

  1. 体验开源项目lal
  2. 回源
  3. 转推和录制
  4. lalserver的启动源码阅读
  5. Golang流媒体实战之五:lal推流服务源码阅读
  6. Golang流媒体实战之六:lal拉流服务源码阅读

本篇概览

  • 本文是《Golang流媒体实战》系列的第六篇,经过前面两篇的源码阅读后,咱们逐渐进入深入学习的状态,本篇继续阅读关键代码:拉流服务
  • 为了高效准确的阅读拉流服务源码,本篇继续使用日志结合源码的阅读方式,具体改动后面会详细说明,总的来说就是了解lal在拉流场景是如何响应每个命令,以及如何将推流端发来的流媒体数据给到拉流端

直接跳过一部分源码

  • 在拉流场景,lal与客户端的握手和chunk传输都是通用的RTMP协议,在本文这部分代码就直接跳过了,因为前文已有详细的说明

开始阅读

  • 拉流服务的入口依旧在server_session.go#RunLoop(),握手成功后由ServerSession.runReadLoop处理拉流客户端发来的消息
func (s *ServerSession) RunLoop() (err error) {if err = s.handshake(); err != nil {_ = s.dispose(err)return err}err = s.runReadLoop()_ = s.dispose(err)return err
}
  • 跳过处理chunk的代码chunk_composer.go#RunLoop,直接来到处理message的server_session.go#doMsg方法,如下所示,面对着各种消息类型的处理逻辑,又让人犯愁了:在拉流的时候,真实的消息顺序究竟是怎样的呢?
func (s *ServerSession) doMsg(stream *Stream) error {if err := s.writeAcknowledgementIfNeeded(stream); err != nil {return err}//log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header)switch stream.header.MsgTypeId {case base.RtmpTypeIdWinAckSize:return s.doWinAckSize(stream)case base.RtmpTypeIdSetChunkSize:// noop// 因为底层的 chunk composer 已经处理过了,这里就不用处理case base.RtmpTypeIdCommandMessageAmf0:return s.doCommandMessage(stream)case base.RtmpTypeIdCommandMessageAmf3:return s.doCommandAmf3Message(stream)case base.RtmpTypeIdMetadata:return s.doDataMessageAmf0(stream)case base.RtmpTypeIdAck:return s.doAck(stream)case base.RtmpTypeIdUserControl:s.doUserControl(stream)case base.RtmpTypeIdAudio:fallthroughcase base.RtmpTypeIdVideo:if s.sessionStat.BaseType() != base.SessionBaseTypePubStr {return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)}s.avObserver.OnReadRtmpAvMsg(stream.toAvMsg())default:Log.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey(), stream.header.MsgTypeId, stream.toDebugString())}return nil
}
  • 此刻去看下真实日志应该是个不错的方法,但是,此时lal还在处理推流请求,有大量推流相关的日志也在源源不断的输出
  • 于是,为了只看拉流先关日志,对代码做少量修改,如下图所示,修改后只有拉流才会输出日志
  • 第二处改动如下,在处理amf0消息的时候,如果不是推流,就把命令打印出来
  • 修改完毕再重新运行lal、推流、拉流,就能获取到修改后的日志了,用关键字pull log过滤后的日志内容如下
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:3 MsgLen:196 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0},  - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [connect] - server_session.go:345
INFO [RTMPPUBSUB4] < R connect('live'). tcUrl=rtmp://127.0.0.1:1935/live - server_session.go:413
INFO [RTMPPUBSUB4] > W Window Acknowledgement Size 5000000. - server_session.go:417
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:2 MsgLen:4 MsgTypeId:5 MsgStreamId:0 TimestampAbs:0},  - server_session.go:216
INFO [RTMPPUBSUB4] < R Window Acknowledgement Size: 5000000 - server_session.go:257
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:3 MsgLen:25 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0},  - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [createStream] - server_session.go:345
INFO [RTMPPUBSUB4] < R createStream(). - server_session.go:444
INFO [RTMPPUBSUB4] > W _result(). - server_session.go:445
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:8 MsgLen:38 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0},  - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [getStreamLength] - server_session.go:345
2023/04/08 10:09:06.774588 DEBUG [RTMPPUBSUB4] read command message, ignore it. cmd=getStreamLength, header={Csid:8 MsgLen:38 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=4096, rpos=27, wpos=38, hex=00000000  05 02 00 07 74 65 73 74  31 31 31                 |....test111|- server_session.go:366
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:8 MsgLen:36 MsgTypeId:20 MsgStreamId:1 TimestampAbs:0},  - server_session.go:216
INFO pull log, SessionId [RTMPPUBSUB4], cmd [play] - server_session.go:345
INFO [RTMPPUBSUB4] < R play('test111'). - server_session.go:507
INFO [RTMPPUBSUB4] > W onStatus('NetStream.Play.Start'). - server_session.go:517
2023/04/08 10:09:06.774929 DEBUG [GROUP2] [RTMPPUBSUB4] add SubSession into group. - group__out_sub.go:20
INFO pull log, SessionId [RTMPPUBSUB4], sub msg header {Csid:2 MsgLen:10 MsgTypeId:4 MsgStreamId:0 TimestampAbs:1},  - server_session.go:216
  • 通过上述日志,可以看出拉流场景,lal收到的命令依次如下
connect
->
server bandwidth
->
createStream
->
getStreamLength
->
play
->
control message
  • 有了这个实际顺序,阅读源码理时就不会迷失方向了,接下来先要搞清楚一个问题:下图是刚才新增的代码,s.sesssionStat.BaseType()代表的是当前会话的类型,那么问题来了,这个会话类型是何时确定的呢?

会话类型是何时确定的

  • 当lal的1935端口收到一个远程TCP连接的时候(推流或拉流都会建立TCP连接),会调用server.go#handleTcpConnect,里面会用NewServerSession穿件Session对象,即会话对象,如下图红色箭头
  • 在NewServerSession方法中,调用base.NewBasicSessionStat的时候,指定了sessionType等于base.SessionTypeRtmpServerSession(注意,这时候还只知道是个TCP连接,并不清楚具体是推流还是拉流)
  • 展开base.NewBasicSessionStat方法,看到了设置会话类型的代码,如下图,此时会话类型是PUBSUB,很中性,很合理,毕竟现在还不知是推流还是拉流
  • 建立TCP连接后,就会陆陆续续收到拉流端侧发来的各种命令,其中有一个amf0命令名为play,看名字也知道是播放的命令,处理该命令的方法是server_session.go#doPlay,下图是其部分源码,红色箭头可见此时会话的类型被正式设置成了SUB
  • 代码读到这里,我突然想到:举一反三,推流的会话类型是啥时确定的呢?应该是在收到明确的推流命令时吧
  • 打开代码,果然,在处理publish命令的时候,将推流的会话类型设置为PUB
  • 终于把会话类型的问题弄明白了,接下来学习每个命令的响应

server bandwidth(5)

  • 消息类型等于5的时候,lal的处理逻辑是doWinAckSize方法,这里只是对成员变量做了设置
func (s *ServerSession) doWinAckSize(stream *Stream) error {if stream.msg.Len() < 4 {return base.NewErrRtmpShortBuffer(4, int(stream.msg.Len()), "ServerSession::doWinAckSize")}s.peerWinAckSize = int(bele.BeUint32(stream.msg.buff.Bytes()))Log.Infof("[%s] < R Window Acknowledgement Size: %d", s.UniqueKey(), s.peerWinAckSize)return nil
}

createStream

  • createStream命令的处理也很简单,没有业务逻辑,只是对客户端的回复
func (s *ServerSession) doCreateStream(tid int, stream *Stream) error {Log.Infof("[%s] < R createStream().", s.UniqueKey())Log.Infof("[%s] > W _result().", s.UniqueKey())if err := s.packer.writeCreateStreamResult(s.conn, tid); err != nil {return err}return nil
}

getStreamLength

  • 接下来的命令是getStreamLength,顾名思义,客户端想知道媒体流的长度
  • 在直播场景下,媒体流没有长度,于是,面对getStreamLength命令,lal不予理会

play

  • 拉流场景中,play算是最重要的命令了,前面在分析如何设置会话类型的时候,已经对play有一些了解,接下来要细看这部分
  • play命令的处理逻辑如下,先从命令提取了流名,然后回复两个控制命令StreamIsRecorded和StreamBegin,告诉端侧播放即将开始,紧接着就是状态同步命令NetStream.Play.Start,然后设置超时时间(推流是写超时,拉流失读超时),接着是前面看过一次的代码:设置会话类型为SUB,最后是对观察者的回调
func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {if err = stream.msg.readNull(); err != nil {return err}s.streamNameWithRawQuery, err = stream.msg.readStringWithType()if err != nil {return err}ss := strings.Split(s.streamNameWithRawQuery, "?")s.streamName = ss[0]if len(ss) == 2 {s.rawQuery = ss[1]}s.url = fmt.Sprintf("%s/%s", s.tcUrl, s.streamNameWithRawQuery)Log.Infof("[%s] < R play('%s').", s.UniqueKey(), s.streamNameWithRawQuery)// TODO chef: start duration resetif err := s.packer.writeStreamIsRecorded(s.conn, Msid1); err != nil {return err}if err := s.packer.writeStreamBegin(s.conn, Msid1); err != nil {return err}Log.Infof("[%s] > W onStatus('NetStream.Play.Start').", s.UniqueKey())if err := s.packer.writeOnStatusPlay(s.conn, Msid1); err != nil {return err}// 回复完信令后修改 connection 的属性s.modConnProps()s.sessionStat.SetBaseType(base.SessionBaseTypeSubStr)err = s.observer.OnNewRtmpSubSession(s)if err != nil {s.DisposeByObserverFlag = true}return err
}
  • 对上述代码,有一处不理解的地方,就是根据会话类型修改连接超时时长的代码(modConnProps方法内部),这段代码执行完毕后才会设置会话类型,所以modConnProps方法中的会话类型应该是不准的,那么超时的设置也就有问题了,也许是我对代码的理解还不够深入吧
  • 再来看看刚刚提到的观察者的回调,对应的是server_manager__.go#OnNewRtmpSubSession方法,主要是先鉴权,再把会话加入Group
func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) error {sm.mutex.Lock()defer sm.mutex.Unlock()info := base.Session2SubStartInfo(session)if err := sm.option.Authentication.OnSubStart(info); err != nil {return err}group := sm.getOrCreateGroup(session.AppName(), session.StreamName())group.AddRtmpSubSession(session)info.HasInSession = group.HasInSession()info.HasOutSession = group.HasOutSession()sm.option.NotifyHandler.OnSubStart(info)return nil
}
  • 至此,play命令的主要操作就算看完了,lal接下来收到的是Control Message(0x04)

Control Message

  • 响应Control Message的方法是doUserControl,只是个ping的响应
func (s *ServerSession) doUserControl(stream *Stream) error {// TODO(chef): 检查buff长度有效性 202301userControlType := bele.BeUint16(stream.msg.buff.Bytes())if userControlType == uint16(base.RtmpUserControlPingRequest) {stream.msg.buff.Skip(2)timestamp := bele.BeUint32(stream.msg.buff.Bytes())return s.packer.writePingResponse(s.conn, timestamp)}return nil
}
  • 代码读到此,lal处理拉流客户端命令的逻辑算是看完了,可见主要是RTMP协议的实现、会话对象维护、还有就是根据流名加入Group
  • 其实到现在咱们还只是看了lal与拉流客户端正式建立联系的代码,真正的流传输还没看到,这也是接下来的任务:拉流动作的具体实现代码

拉流动作

  • 如果您看过了前文的推流代码,此刻应该是胸有成竹了,关键代码前面已经看过,现在无非是从拉流的视角再去温习一遍而已
  • 拉流对应的具体动作,其实是推流的逻辑触发的,简单的说:lal收到推流端发来的媒体流数据时,就会将数据写入拉流的TCP连接中
  • 咱们来看代码
  • lal收到推流端发来的媒体流消息时,会执行group__core_streaming.go#broadcastByRtmpMsg,下面是其中的一段代码,遍历该流名的group下的所有拉流会话,逐一处理,这部分代码中,针对刚刚加入的会话有特别处理,首先要把媒体流的meta信息给拉流端,其次要将缓存的关键帧推给拉流侧,这样拉流侧就能快速播放了,而无需等到推流端推来的关键帧(一个GOP可能长达数秒,不用缓存的话可能要等数秒才有关键帧,图像才能显示)
 for session := range group.rtmpSubSessionSet {if session.IsFresh {// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送if group.rtmpGopCache.MetadataEnsureWithoutSetDataFrame != nil {Log.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey())_ = session.Write(group.rtmpGopCache.MetadataEnsureWithoutSetDataFrame)}if group.rtmpGopCache.VideoSeqHeader != nil {Log.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey())_ = session.Write(group.rtmpGopCache.VideoSeqHeader)}if group.rtmpGopCache.AacSeqHeader != nil {Log.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey())_ = session.Write(group.rtmpGopCache.AacSeqHeader)}gopCount := group.rtmpGopCache.GetGopCount()if gopCount > 0 {// GOP缓存中肯定包含了关键帧session.ShouldWaitVideoKeyFrame = falseLog.Debugf("[%s] [%s] write gop cache. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)}for i := 0; i < gopCount; i++ {for _, item := range group.rtmpGopCache.GetGopDataAt(i) {_ = session.Write(item)}}// 有新加入的sub session(本次循环的第一个新加入的sub session),把rtmp buf writer中的缓存数据全部广播发送给老的sub session// 从而确保新加入的sub session不会发送这部分脏的数据// 注意,此处可能被调用多次,但是只有第一次会实际flush缓存数据if group.rtmpMergeWriter != nil {group.rtmpMergeWriter.Flush()}session.IsFresh = false}if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() {// 有sub session在等待关键帧,并且当前是关键帧// 把rtmp buf writer中的缓存数据全部广播发送给老的sub session// 并且修改这个sub session的标志// 让rtmp buf writer来发送这个关键帧if group.rtmpMergeWriter != nil {group.rtmpMergeWriter.Flush()}session.ShouldWaitVideoKeyFrame = false}}
  • 然后才是关键代码,就是这段
 if len(group.rtmpSubSessionSet) > 0 {if group.rtmpMergeWriter == nil {group.write2RtmpSubSessions(lazyRtmpChunkDivider.GetEnsureWithoutSdf())} else {group.rtmpMergeWriter.Write(lazyRtmpChunkDivider.GetEnsureWithoutSdf())}}
  • 真正执行的是write2RtmpSubSessions方法,如下所示,遍历所有拉流的session,把流媒体消息通过TCP连接写入(session.Write方法)
func (group *Group) write2RtmpSubSessions(b []byte) {for session := range group.rtmpSubSessionSet {if session.IsFresh || session.ShouldWaitVideoKeyFrame {continue}_ = session.Write(b)}
}
  • 至此,拉流源码阅读完成,除了对基础知识的掌握,相信您对lal作者的源码风格也逐渐熟悉了吧:简洁明了,关键位置有注释,这样的代码读起来真是一种享受,接下来的学习之旅,一定有有更多精彩等着我们

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

Golang流媒体实战之六:lal拉流服务源码阅读相关推荐

  1. Golang流媒体实战之五:lal推流服务源码阅读

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <Golang流媒体实战>系列的链接 体验 ...

  2. 三. 微服务源码阅读-Hystrix 源码

    3. Hystrix 源码 1. 断路器开关 @SpringBootApplication(scanBasePackages = {"len.hgy"}) //注册到eureka ...

  3. Golang流媒体实战之一:体验开源项目lal

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 关于<Golang流媒体实战> 因为工作需 ...

  4. 开源流媒体解决方案,流媒体服务器,推拉流,直播平台,SRS,WebRTC,移动端流媒体,网络会议,优秀博客资源等分享

    开源流媒体解决方案,流媒体服务器,推拉流,直播平台,SRS,WebRTC,移动端流媒体,网络会议,优秀博客资源等分享 一.优秀的流媒体博客资源 1.1 EasyNVR:专注于安防视频互联网化的技术 1 ...

  5. SRS流媒体服务器——WebRTC推拉流演示

    SRS流媒体服务器--WebRTC推拉流 目录 WebRTC推拉流配置 WebRTC拉流演示 WebRTC推流演示 SRS官方WebRTC文档:https://github.com/ossrs/srs ...

  6. golang string 转换 uint64_Golang 的 #x27;print#x27; 源码解读

    和其他编程语言一样,Golang也有很多类型的输出方式,这些输出都是通过fmt包来实现的,以下是主要输出方式的源码,路径在fmt/print.go下,源码随简单,理解起来也很容易,阅读理解一遍后记忆更 ...

  7. 源码 状态机_[源码阅读] 阿里SOFA服务注册中心MetaServer(1)

    [源码阅读] 阿里SOFA服务注册中心MetaServer(1) 0x00 摘要 0x01 服务注册中心 1.1 服务注册中心简介 1.2 SOFARegistry 总体架构 1.3 为什么要分层 0 ...

  8. 《一步一步看源码:Nacos》框架源码系列之一(其1,配置服务源码)

    Nacos源码 ​ 因为最近项目在做容器化处理,容器化后涉及到不同进程对同一个文件的读写,考虑到可能会存在同一文件的配置文件,可能会把彼此覆盖掉,所以这里学习一下Nacos源码. 整体结构图 ​ 这边 ...

  9. srs可以用java开发吗,为SRS流媒体服务器添加HLS加密功能(附源码)

    #为SRS流媒体服务器添加HLS加密功能(附源码)# 之前测试使用过nginx的HLS加密功能,会使用到一个叫做nginx-rtmp-module的插件,但此插件很久不更新了,网上搜索到一个中国制造的 ...

最新文章

  1. 【神经网络】(1) 简单网络,实例:气温预测,附python完整代码和数据集
  2. 字节跳动_掌握Java字节码
  3. 如何在Eclipse中添加新建包,java文件,工程工具栏按钮
  4. CAS单点登录3--服务端登录页个性化
  5. pandas - 案例(美国2012年总统候选人政治献金数据分析)
  6. Android开源项目--分类汇总
  7. python编程计算器_python怎么编写计算器程序
  8. 步步为营!高手教你如何有效使用深度学习解决实际问题
  9. mysql主从搭建教程
  10. 【PMP认证考试之个人总结】第 3 章 项目整合管理
  11. [hadoop读书笔记] 第五章 MapReduce工作机制
  12. 论文笔记_CV_AD_Visual Perception for Autonomous Driving
  13. 振动试验条件及试验标准
  14. 数据包络分析法(DEA)_1
  15. 80坐标系(3度带)转经纬度
  16. 一套简单的基本生活财富自由方案
  17. win10查看无线密码
  18. 小葵花妈妈课堂开课了:《AsyncTask源码分析》
  19. Java打印表格 Console/控制台
  20. 论语 灵公篇(笔记)

热门文章

  1. python中可变参数args传入函数时储存的类型是_[转载]Python中函数的参数定义和可变参数*args与**args...
  2. 华为手机哪几款是鸿蒙系统,华为手机全部型号,华为鸿蒙系统即将发布!升级名单已经确定,目前有3种升级方式...
  3. jQuery表单验证气泡提示插件
  4. 黏住用户,百度的难题
  5. linux常用命令使用
  6. echarts 柱状图柱子改成圆柱体_Origin做多因子柱状图
  7. 腾讯地图教你快速实现轨迹回放
  8. HDU1276:士兵队列训练问题
  9. 学习笔记-FRIDA脚本系列(三)
  10. c语言入门之项目2.2——个人所得税计算器