点击上方“LiveVideoStack”关注我们

作者 | 王朋闯

本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新。

大话ion系列(一)

大话ion系列(二)

大话ion系列(三)

大话ion系列(四)

八、QOS之Buffer和NACK

1. buffer简介

大家都知道webrtc有jitterbuffer,ion-sfu里也有buffer,抗丢包40%的秘诀就在这里。

主要作用有:

  • 缓存rtp包,收到nack后,重传rtp包

  • 计算rtcp-nack,发送给客户端

  • 计算rtcp-twcc,发送给客户端

  • 计算rtcp-rr/sr/pli/等,发送给客户端

2.buffer结构

看了上边buffer的功能,基本buffer内的数据结构都是为功能服务的。

//待处理的包
type  pendingPackets struct {arrivalTime int64//到达时间packet      []byte//包数据
}//扩展的包结构体
type ExtPacket struct {Head     bool//是否是第一个包Cycle    uint32//SN转了多少轮Arrival  int64//到达时间Packet   rtp.Packet//包Payload  interface{}//包payloadKeyFrame bool//是否关键帧
}// Buffer contains all packets
type Buffer struct {sync.Mutexbucket     *Bucket//定制的rtp包ringbuffernacker     *nackQueue//nack计算队列videoPool  *sync.Pool//视频包临时缓存audioPool  *sync.Pool//视频包临时缓存codecType  webrtc.RTPCodecTypeextPackets deque.Deque//扩展包缓存pPackets   []pendingPackets//待处理包,用于缓存一些来不及处理的包closeOnce  sync.OncemediaSSRC  uint32//媒体源clockRate  uint32//时钟频率maxBitrate uint64//最大码率lastReport int64//上次报告时间twccExt    uint8//rtp扩展头twcc的id,sdp里有audioExt   uint8//rtp扩展头audiolevel的id,sdp里有bound      boolclosed     atomicBoolmime       string//媒体类型,如video/h264等// 是否开启remb nack twcc audiolevelremb       boolnack       booltwcc       boolaudioLevel boolminPacketProbe     intlastPacketRead     intmaxTemporalLayer   int32bitrate            uint64//存储码率bitrateHelper      uint64//用来计算码率lastSRNTPTime      uint64//最后一次SR的NTP时间lastSRRTPTime      uint32//最后一次SR的RTP时间lastSRRecv         int64//1970年1月1日0时0分0秒起到现在的总纳秒数baseSN             uint16//用来组装RRcycles             uint32//SN回环次数lastRtcpPacketTime int64//上一次rtcplastRtcpSrTime     int64// Time the lastRTCP SR was received. Required for DLSR computation.lastTransit        uint32//用来计算jittermaxSeqNo           uint16//收到最大的SNstats Stats//状态统计latestTimestamp     uint32// latestreceived RTP timestamp on packetlatestTimestampTime int64  // Time of the latest timestamp (innanos since unix epoch)// callbacksonClose      func()onAudioLevel func(level uint8)feedbackCB   func([]rtcp.Packet)feedbackTWCC func(sn uint16, timeNS int64, marker bool)// loggerlogger logr.Logger
}

3.buffer创建

Pion/webrtc支持自定义BufferFactory,设置好之后,pion/webrtc的组件会使用自定义buffer。

比如pion/srtp是实际收发srtp和srtcp包的类,它们也会使用自定义buffer。

首先来看一下ion-sfu是在哪里设置自定义buffer的:

func NewWebRTCTransportConfig(c Config)WebRTCTransportConfig {//这个SettingEngine是pion里很重要的设置类,可以控制pion/webrtc很多行为和参数,比如ice-lite等se :=webrtc.SettingEngine{}se.DisableMediaEngineCopy(true)....//这里把自定义的BufferFactory给配置进去了//意思是pion/srtp会使用这个buffer来传包se.BufferFactory =c.BufferFactory.GetOrNew
}

srtp和srtcp流向是这样的:

客户端---srtp--->srtp.ReadStreamSRTP------->SFU
客户端<---srtcp---srtp.ReadStreamSRTCP<------SFU

当包到达pion/srtp时,就会触发ReadStreamSRTP.init函数和ReadStreamSRTCP.init函数。

  • ReadStreamSRTP.init调用自定义的BufferFactory.GetOrNew函数,new了一个buffer

  • ReadStreamSRTCP.init调用自定义的BufferFactory.GetOrNew函数,new了一个rtcpReader

之后收发rtp和rtcp包,就会流经这个buffer和rtcpReader:

https://github.com/pion/srtp/blob/3c34651fa0c6de900bdc91062e7ccb5992409643/stream_srtp.go#L53

func (r *ReadStreamSRTP) init(childstreamSession, ssrc uint32) error {sessionSRTP, ok :=child.(*SessionSRTP)
......ifr.session.bufferFactory != nil {//这里就是调用自定义的BufferFactory.GetOrNew函数了,new了一个bufferr.buffer = r.session.bufferFactory(packetio.RTPBufferPacket,ssrc)} else {
.......}return nil
}
srtp.(*ReadStreamSRTP).init--->session.bufferFactory(其实是buffer.BufferFactory.GetOrNew)--->buffer.NewBuffer
srtp.(*ReadStreamSRTCP).init--->session.bufferFactory(其实是buffer.BufferFactory.GetOrNew)--->buffer.NewNewRTCPReader

为什么这么搞呢?

仔细想想,如果控制了rtp和rtcp的buffer,是不是计算twcc、nack、stats等就很方便了,在buffer写入包的同时,就可以通过设置的回调函数搞各种复杂计算。

4.buffer收发包流程

收发rtp包流程图简单总结:

srtp.(*ReadStreamSRTP).write--->buffer.(*Buffer).Write--->buffer.(*Buffer).ReadExtended--->DownTrack.WriteRTP--->DownTrack.writeSimpleRTP/writeSimulcastRTP

贴一下代码:

func (r *ReadStreamSRTP) write(buf []byte) (n int, err error) {//这里就把包写入了自定义buffern, err = r.buffer.Write(buf)if errors.Is(err,packetio.ErrFull) {// Silently dropdata when the buffer is full.return len(buf), nil}return n, err
}

downtrack收发rtcp包流程图:

srtp.(*ReadStreamSRTCP).write--->buffer.(*RTCPReader).Write--->DownTrack.Bind里rr.OnPacket


5.bucket存储rtp

如图,bucket是一个定制的ringbuffer,是用来存储rtp包的:

包含一个数组,step是索引,每来一个包,先把包长度存入2字节,再把包存入一个MTU;step递增,以此类推,达到最大再从0循环(从0到maxSteps)。

buf: [2][MTU][2][MTU][2][MTU][2][MTU]...[MTU]0       1       2       3      maxSteps|       |      |       |  ... |step-------------------------------><---------------------------------|

rtp包写入bucket,并被WebRTCReceiver用来查找+重传包的过程。

buffer.(*Buffer).Write-->buffer.Buffer.calc-->buffer.bucket.AddPacket-->buffer.bucket.GetPacket<---WebRTCReceiver.RetransmitPackets<---DownTrack.handleRTCP

看下代码细节:

const maxPktSize = 1500//一般MTU的大小type Bucket struct {buf []byte//一块buffer,可以存多个包src *[]byte//存储原始buffer指针init     bool//是否初始化step     int//递增计数headSN   uint16//头部snmaxSteps int//最大计数,一般是(总buffer大小/mtu大小)来计算
}//创建bucket,存储包
funcNewBucket(buf *[]byte) *Bucket{return &Bucket{src:      buf,buf:      *buf,maxSteps: int(math.Floor(float64(len(*buf))/float64(maxPktSize))) -1,}
}//塞包
func (b *Bucket) AddPacket(pkt []byte, sn uint16, latest bool) ([]byte, error) {if !b.init {//如果没有初始化headSNb.headSN = sn - 1b.init = true}//如果不是最后一个包,即乱序if !latest {return b.set(sn, pkt)//存储or覆盖}//如果是最后一个包diff := sn -b.headSNb.headSN = sn//计算stepfor i := uint16(1); i < diff;i++ {b.step++if b.step >=b.maxSteps {b.step = 0}}return b.push(pkt), nil
}//查找序号sn的包并写入buf
func (b *Bucket) GetPacket(buf []byte, sn uint16) (i int, err error) {p := b.get(sn)if p == nil{err = errPacketNotFoundreturn}i = len(p)if cap(buf) < i {err = errBufferTooSmallreturn}if len(buf) < i {buf = buf[:i]}copy(buf, p)return
}//存包
func (b *Bucket) push(pkt []byte) []byte {//先写入2字节长度binary.BigEndian.PutUint16(b.buf[b.step*maxPktSize:],uint16(len(pkt)))off := b.step*maxPktSize+ 2//再写入包长度copy(b.buf[off:],pkt)b.step++//递增if b.step > b.maxSteps{b.step = 0//归零}return b.buf[off : off+len(pkt)]//返回包数据
}//查找包数据
func (b *Bucket) get(sn uint16) []byte {pos := b.step - int(b.headSN-sn+1)if pos < 0 {if pos*-1 > b.maxSteps+1 {return nil}pos = b.maxSteps +pos + 1}off := pos * maxPktSizeif off > len(b.buf) {return nil}if binary.BigEndian.Uint16(b.buf[off+4:off+6]) != sn{return nil}sz := int(binary.BigEndian.Uint16(b.buf[off : off+2]))return b.buf[off+2 : off+2+sz]
}//写入包数据
func (b *Bucket) set(sn uint16, pkt []byte) ([]byte, error) {if b.headSN-sn >=uint16(b.maxSteps+1) {return nil,errPacketTooOld}pos := b.step - int(b.headSN-sn+1)if pos < 0 {pos = b.maxSteps +pos + 1}off := pos *maxPktSizeif off > len(b.buf) ||off < 0 {return nil,errPacketTooOld}// 如果已经存在则不写入if binary.BigEndian.Uint16(b.buf[off+4:off+6]) == sn{return nil,errRTXPacket}binary.BigEndian.PutUint16(b.buf[off:], uint16(len(pkt)))copy(b.buf[off+2:], pkt)return b.buf[off+2 : off+2+len(pkt)], nil
}

6.  nackQueue计算nack

nack数组,用来存储nack信息并计算rtcp-nack。

[9316][9317]...[N]|sn

代码细节:

const maxNackTimes = 3   // 每个nack包发送的最大次数,防止客户端一直重传加重拥塞
const maxNackCache = 100// 最大缓存个数type nack struct {sn     uint32//rtp序列号nacked uint8//发送的次数
}type nackQueue struct {nacks []nack//nack数组kfSN  uint32//askKeyframeSN 要求发送PLI
}//创建nackQueue
funcnewNACKQueue() *nackQueue{return &nackQueue{nacks: make([]nack, 0, maxNackCache+1),}
}//删除
func (n *nackQueue) remove(extSN uint32) {i := sort.Search(len(n.nacks), func(iint) bool { return n.nacks[i].sn >= extSN })if i >= len(n.nacks) ||n.nacks[i].sn != extSN {return}copy(n.nacks[i:],n.nacks[i+1:])n.nacks = n.nacks[:len(n.nacks)-1]
}//插入,extSN从大到小,查找效率高
func (n *nackQueue) push(extSN uint32) {//找到<=数组中sn的位置,一般是0i := sort.Search(len(n.nacks), func(iint) bool { return n.nacks[i].sn >= extSN })if i < len(n.nacks) &&n.nacks[i].sn == extSN {return}nck := nack{sn:     extSN,nacked: 0,}if i == len(n.nacks) {//如果是0,直接appendn.nacks = append(n.nacks, nck)} else {//否则复制元素,到最前边n.nacks = append(n.nacks[:i+1], n.nacks[i:]...)n.nacks[i] = nck}//如果nack达到最大,删除最前一个if len(n.nacks) >=maxNackCache {copy(n.nacks,n.nacks[1:])}
}//生成nack
func (n *nackQueue) pairs(headSN uint32)([]rtcp.NackPair, bool) {if len(n.nacks) ==0 {return nil, false}i := 0askKF := falsevar np rtcp.NackPairvar nps []rtcp.NackPairfor _, nck := rangen.nacks {if nck.nacked >=maxNackTimes {//如果nack重发>=3次if nck.sn >n.kfSN {//如果sn>上次请求关键帧SNn.kfSN = nck.sn//记录下来askKF = true//返回请求PLI}continue}//跳过比headSN大3的if nck.sn >=headSN-2 {//如:9316>=9320-2 不成立,跳过n.nacks[i] = ncki++continue}//过来的是3个包//这个值是个经验值://如果太大,返回rtcp-nack包会delay太久,导致客户端重发包太迟,画面延迟//如果太小,比如2,则可能乱序的概率会大,因为等的越短,乱序包到来的概率越小n.nacks[i] = nack{sn:     nck.sn,nacked: nck.nacked +1,//计数器+1}i++//如果是np.PacketID==0,是第一个包,需要初始化np.PacketIDnp.LostPacketsif np.PacketID ==0 || uint16(nck.sn) > np.PacketID+16 {if np.PacketID !=0 {nps = append(nps, np)}np.PacketID = uint16(nck.sn)np.LostPackets = 0continue}//如果是后续包,计算LostPacketsnp.LostPackets |= 1 <<(uint16(nck.sn) - np.PacketID - 1)}if np.PacketID != 0 {nps = append(nps, np)//追加到后边}n.nacks = n.nacks[:i]//去掉已经算过的包return nps, askKF
}

7. 总结

本文介绍了Qos中两个基础部分:

  • 使用bucket缓存rtp包,收到nack后,重传rtp包

  • 使用nackQueue,存储信息并计算rtcp-nack,发送给客户端


作者简介:

王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。

特别说明:

本文发布于知乎,已获得作者授权转载。


讲师招募

LiveVideoStackCon 2022 音视频技术大会 上海站,正在面向社会公开招募讲师,无论你所处的公司大小,title高低,老鸟还是菜鸟,只要你的内容对技术人有帮助,其他都是次要的。欢迎通过 speaker@livevideostack.com 提交个人资料及议题描述,我们将会在24小时内给予反馈。

喜欢我们的内容就点个“在看”吧!

大话ion系列(五)相关推荐

  1. 大话ion系列(三)

    点击上方"LiveVideoStack"关注我们 作者 | 王朋闯 本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新. 大话ion系 ...

  2. 大话ion系列(四)

    点击上方"LiveVideoStack"关注我们 作者 | 王朋闯 本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新. 大话ion系 ...

  3. 大话ion系列(二)

    点击上方"LiveVideoStack"关注我们 作者 | 王朋闯 本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新. 大话ion系 ...

  4. 大话ion系列(一)

    点击上方"LiveVideoStack"关注我们 作者 | 王朋闯 本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新. 一.为什么用 ...

  5. python解复杂方程_Python数据处理篇之Sympy系列(五)---解方程

    前言 sympy不仅在符号运算方面强大,在解方程方面也是很强大. 本章节学习对应官网的:Solvers 官方教程 (一)求解多元一次方程-solve() 1.说明: 解多元一次方程可以使用solve( ...

  6. 《ASP.NET Core In Action》读书笔记系列五 ASP.NET Core 解决方案结构解析1

    <ASP.NET Core In Action>读书笔记系列五 ASP.NET Core 解决方案结构解析1 参考文章: (1)<ASP.NET Core In Action> ...

  7. Silverlight Blend动画设计系列五:故事板(StoryBoards)和动画(Animations)

    原文:Silverlight & Blend动画设计系列五:故事板(StoryBoards)和动画(Animations) 正如你所看到的,Blend是一个非常强大的节约时间的设计工具,在Bl ...

  8. 盘点2013年那些最优秀的网页设计作品【系列五】

    这个系列的文章向大家展示2013年最优秀的国外网页设计作品,这些都是过去的一年在图片,纹理,导航等等各个方面的优秀网站.2013年,网页设计领域出现了几个新的流行趋势,最热门的就是响应式设计(Resp ...

  9. 数学之美系列五 -- 简单之美:布尔代数和搜索引擎的索引

    数学之美系列五 -- 简单之美:布尔代数和搜索引擎的索引 [建立一个搜索引擎大致需要做这样几件事:自动下载尽可能多的网页:建立快速有效的索引:根据相关性对网页进行公平准确的排序.我们在介绍 Googl ...

最新文章

  1. flask中使用Flask-SQLALCHEMY-------一个简单的例子
  2. Windows下删除Kafka中某个Group
  3. python中and与or的执行顺序-关联子查询的执行顺序是什么
  4. sidecar_Spring Cloud Sidecar –节点初始化
  5. Struts2学习笔记《二》
  6. Redis:08---字符串对象
  7. Modernizr使用指南
  8. XSS挖漏洞 - CSS编码和反斜杠的三个技巧
  9. 【生活相关】一(坚持)
  10. linux上设置jar包加载顺序,SpringBoot配置加载顺序
  11. hash冲突的4种解决方案
  12. python 柱状图和折线图放在一起_python中用matplotlib画折线图、柱状图、散点图
  13. 2021年茶艺师(初级)考试资料及茶艺师(初级)模拟试题
  14. 【转】在 26 岁时写给 18 岁的自己--Livid
  15. java8 stream collectors.joining
  16. flowplayer 在线视频
  17. 华为模拟器实现wlan 三层旁挂式组网
  18. 媒体查询--自适应屏幕大小
  19. 今天我在CSDN开通我的技术博客,与非技术博客分开
  20. 互联网公司招聘--奇虎360--研发工程师--2016年笔试题

热门文章

  1. 关于static变量的定义及性质的深层介绍
  2. Shiro之从数据库初始化角色权限信息
  3. 0001-Hello world(第一弹)
  4. redis StackExchange 主备 实现 demo
  5. struts2 依赖注入boolean类型的属性时报错
  6. HTML5本地存储之Web Storage篇
  7. 我的北京游戏开发总结【三】
  8. DEV GridControl ID相同的行显示相同的颜色(当ID的值不确定时)
  9. ReentrantReadWriteLock可重入读写锁分析
  10. 洛谷 - P3379 【模板】最近公共祖先(LCA)(RMQ求LCA/Tarjan求LCA)