目标:
上一节分析了SRS针对推流客户端的处理逻辑,这里接下来分析针对拉流客户端的处理逻辑。

SRS拉流端处理逻辑简单说就是SrsRtmpConn::do_playing()协程从SrsLiveConsumer对象中不断读取数据并转发给拉流客户端的过程。另外,在这个过程中,如果服务器处于edge模式,还有一个回源拉流的处理过程需要重点分析。

内容:

1、拉流端整体处理流程

拉流客户端与服务器之间的协议交互流程:

SrsRtmpConn::stream_service_cycle()函数中通过SrsRtmpServer::identify_client()识别客户端类型,如果是拉流客户端,则进入SrsRtmpConn::playing()处理

srs_error_t SrsRtmpConn::stream_service_cycle()
{rtmp->identify_client(&info->type); // 对于推流端,每个推流端通过fetch_or_create函数生成一个对应的SrsLiveSource对象// 对于拉流端,// 1)如果本机已经有对应推流端的SrsLiveSource对象,则根据拉流地址字符串(/live/stream)直接获取// 2)如果本机没有对应推流端的SrsLiveSource对象,这里则会创建一个SrsLiveSource对象//   并在后续的SrsRtmpConn::playing()函数中,根据edge模式有一个回源拉流的操作_srs_sources->fetch_or_create(req, server, &source); switch (info->type) {case SrsRtmpConnPlay:rtmp->start_play(info->res->stream_id); // 发送拉流响应报文http_hooks_on_play(); // 执行HTTP回调处理,对外通知客户端拉流开始playing(source);  // 拉流处理逻辑http_hooks_on_stop();  //执行HTTP回调处理,对外通知客户端拉流结束return err;}
}

SrsRtmpConn::playing()处理的逻辑:

1、如果是源站,支持源站集群,且本地没有当前需要的拉流,则向配置的协同源站查询,最终将查询结果通知拉流客户端

2、为每个拉流端创建一个SrsLiveConsumer消费者对象,并与推流端SrsLiveSource对象绑定,将source中缓存的meta和GOP数据发送到消费者队列

3、创建并启动一个拉流端接收协程,此协程主要工作是接收拉流客户端的播放控制命令(暂停、继续)

4、真正的拉流协程do_playing()就阻塞在条件变量mw_wait,等待有新数据时被唤醒。

本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,srs,推流拉流)↓↓↓↓↓↓见下面↓↓文章底部↓↓

srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{// 这里判断本机是源站模式,且支持源集群,且当前source对象并没有客户端推流,才进入此分支// 这个分支基本上包括了源站集群的全部处理逻辑if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {// 从配置文件中读取其它协同工作源站的地址vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);// 向每个协同工作源站查询是否存在指定的流(/live/stream)for (int i = 0; i < (int)coworkers.size(); i++) {// 向配置的协同源站发起HTTP查询请求SrsHttpHooks::discover_co_workers(url, host, port);// 根据协同工作源站返回的信息生成新URLstring rurl = srs_generate_rtmp_url(host, port, req->host, req->vhost, );if (host.empty() || port == 0) { continue; } // 如果是无效URL,则继续查询// 使用新生成的URL向客户端返回重定向响应rtmp->redirect(req, rurl, accepted);  return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");}}// 为每个拉流端创建一个SrsLiveConsumer消费者对象,并与SrsLiveSource对象绑定// 这个函数内部包括了edge模式下,edge站点回源拉流的详细处理,这是一个非常重要的特性,需要单独分析// 这里我们暂时认为,通过此函数的处理,拉流端需要的数据流,已经被推送到了本站点source->create_consumer(consumer);  // 这里每次创建一个新的消费者对象时,都会首先将source中缓存的meta和GOP数据发送到消费者队列source->consumer_dumps(consumer); // 创建并启动一个拉流端接收协程,此协程主要工作是接收客户端的播放控制命令(暂停、继续)SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, );trd.start();   do_playing(source, consumer, &trd); // 进入拉流处理逻辑trd.stop(); // 拉流结束return err;
}

拉流端接收协程SrsQueueRecvThread trd的处理逻辑和推流端接收协程完全一致(在srs_app_recv_thread.cpp文件中)

1、调用SrsRtmpServer::recv_message()->SrsProtocol::recv_message()接收一个完整的RTMP数据包。

2、调用ISrsMessageConsumer::consume()虚接口向外复制数据,此时实际调用的接口是SrsQueueRecvThread::consume()。

3、最终数据包被保存到SrsQueueRecvThread对象的std::vector<SrsCommonMessage*>queue队列,并调用SrsLiveConsumer::wakeup()接口,唤醒条件变量mw_wait,因为真正的拉流协程do_playing()就阻塞在条件变量mw_wait,等待有新数据时被唤醒。

srs_error_t SrsRecvThread::do_cycle()
{rtmp->recv_message(&msg); // 此函数用于得到一个Message结构pumper->consume(msg);  // 对于推流端,pumper对象类型为SrsPublishRecvThread// 对于拉流端,pumper对象类型为SrsQueueRecvThread
}srs_error_t SrsQueueRecvThread::consume(SrsCommonMessage* msg)
{queue.push_back(msg); // 接收拉流客户端发送来的播放控制命令并放入队列// 另一个协程do_playing()通过rtrd->pump()从队列中取报文_consumer->wakeup(); // 唤醒消费者所在的协程,这里具体就是do_playing()协程// do_playing()协程唤醒后,会调用SrsLiveConsumer::dump_packets// 获取数据并向客户端发送。
}void SrsLiveConsumer::wakeup()
{if (mw_waiting) {srs_cond_signal(mw_wait);mw_waiting = false;}
}

SrsRtmpConn::do_playing()协程的处理逻辑:

1、调用SrsQueueRecvThread::empty()检查拉流端接收协程的队列中是否有等待处理的控制命令报文,如果有,则调用process_play_control_msg()处理控制命令。如果协程处理出错,则退出。

2、调用SrsLiveConsumer::wait()将本线程阻塞在条件变量mw_wait上,等待接收到新数据或播放端控制命令。

3、协程被唤醒后,调用SrsLiveConsumer::dump_packets()取数据,并通过SrsRtmpServer::send_and_free_messages()函数发送给拉流客户端。

srs_error_t SrsRtmpConn::do_playing()
{while (true) { if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "rtmp: thread quit");} while (!rtrd->empty()) { // 接收协程缓存队列不为空,表示接收到播放端发送的控制命令SrsCommonMessage* msg = rtrd->pump();process_play_control_msg(consumer, msg);  // 处理播放端控制指令}// 如果协程运行状态出错,则退出协程if ((err = rtrd->error_code()) != srs_success) {return srs_error_wrap(err, "rtmp: recv thread");}// 调用SrsLiveConsumer::wait()阻塞在条件变量mw_wait上,等待被唤醒// 1、推流端接收到推流数据,并调用SrsLiveConsumer::enqueue()时,会唤醒此条件变量// 2、拉流端接收到用户发送的播放控制命令时,调用SrsLiveConsumer::wakeup()唤醒此条件变量consumer->wait(mw_msgs, mw_sleep);  // wait for message to incoming// 从SrsLiveConsumer内部队列中取出数据,// 并通过SrsRtmpServer::send_and_free_messages()函数发送给拉流客户端consumer->dump_packets(&msgs, count);  if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: send %d messages", count);}}
}

拉流端播放控制命令的处理逻辑

srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg)
{if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {return err;} // 判断报文必须是控制命令,否则不处理rtmp->decode_message(msg, &pkt);  // 如果是close命令,直接返回错误,结束协程SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(pkt);if (close) {return srs_error_new(ERROR_CONTROL_RTMP_CLOSE, "rtmp: close stream");}// 如果是call命令,发送一个响应报文SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);if (call) {SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);rtmp->send_and_free_packet(res, 0);}// 如果是pause命令,向客户端发送响应报文,并在本地SrsLiveConsumer对象中设置pause标志SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);if (pause) {if ((err = rtmp->on_play_client_pause(info->res->stream_id, pause->is_pause)) != srs_success) {return srs_error_wrap(err, "rtmp: pause");}if ((err = consumer->on_play_client_pause(pause->is_pause)) != srs_success) {return srs_error_wrap(err, "rtmp: pause");}return err;}
}

2、Edge模式下回源拉流的处理逻辑

当客户端连接到SRS的Edge站点并请求拉流,而Edge站点还没有缓存用户所需的拉流数据时,SRS会触发回源拉流操作,具体处理逻辑如下:
1)Edge站点根据拉流URL创建一个SrsLiveSource对象

srs_error_t SrsRtmpConn::stream_service_cycle()
{......// 对于拉流端,如果本机没有对应推流端的SrsLiveSource对象,这里则会创建一个SrsLiveSource对象// 并在后续的SrsRtmpConn::playing()函数中,根据edge模式有一个回源拉流的操作_srs_sources->fetch_or_create(req, server, &source); ......
}

2)Edge站点为每条拉流客户端创建一个SrsLiveConsumer消费者对象

srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{......// 为每个拉流端创建一个SrsLiveConsumer消费者对象,并与SrsLiveSource对象绑定// 这个函数内部包括了edge模式下,edge站点回源拉流的详细处理,这是一个非常重要的特性source->create_consumer(consumer);  ......
}

3)为拉流端创建消费者对象,如果是edge模式,调用SrsPlayEdge::on_client_play()函数,用于启动一个SrsEdgeIngester协程执行回源拉流。

srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer)
{consumer = new SrsLiveConsumer(this);consumers.push_back(consumer);// for edge, when play edge stream, check the stateif (_srs_config->get_vhost_is_edge(req->vhost)) {// notice edge to start for the first client.if ((err = play_edge->on_client_play()) != srs_success) {return srs_error_wrap(err, "play edge");}}return err;
}srs_error_t SrsPlayEdge::on_client_play()
{srs_error_t err = srs_success;// 只有SrsPlayEdge对象的状态是SrsEdgeStateInit状态时,// 才启动一个单独的SrsEdgeIngester协程用于回源拉流if (state == SrsEdgeStateInit) {state = SrsEdgeStatePlay; // 此状态标志防止回源拉流操作被重复执行err = ingester->start();}return err;
}

4)SrsEdgeIngester协程回源拉流的具体执行逻辑

srs_error_t SrsEdgeIngester::cycle()
{while (true) {......if ((err = do_cycle()) != srs_success) {srs_warn("EdgeIngester: Ignore error, %s", srs_error_desc(err).c_str());srs_freep(err);}srs_usleep(SRS_EDGE_INGESTER_CIMS);}return err;
}srs_error_t SrsEdgeIngester::do_cycle()
{......while (true) {......        // 创建SrsEdgeRtmpUpstream对象,其中包含了RTMP客户端SDK// 并通过SrsEdgeRtmpUpstream::connect()函数,选择一个源站,发起连接请求upstream = new SrsEdgeRtmpUpstream(redirect);source->on_source_id_changed(_srs_context->get_id());err = upstream->connect(req, lb));err = edge->on_ingest_play();// 连接成功,则设置SrsPlayEdge对象为已连接状态// set to larger timeout to read av data from origin.upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);err = ingest(redirect); // 此函数内部循环获取拉流数据// 执行到这里表示拉流结束// 如果错误码是ERROR_CONTROL_REDIRECT,表示需要重新选择一个源站继续拉流if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {int port;string server;upstream->selected(server, port);string url = req->get_stream_url();srs_error_reset(err);continue;}// 走到这里,表示拉流结束,结束当前协程if (srs_is_client_gracefully_close(err)) {srs_error_reset(err);}break;}return err;
}

SrsEdgeIngester::ingest()内部调用SrsEdgeRtmpUpstream::recv_message()

->SrsBasicRtmpClient::recv_message()

->SrsRtmpClient::recv_message()

->SrsProtocol::recv_message()

得到一个完整的RTMP Message报文,并通过SrsEdgeIngester::process_publish_message() 处理

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{......        while (true) {......                // read from client.SrsCommonMessage* msg = NULL;if ((err = upstream->recv_message(&msg)) != srs_success) {return srs_error_wrap(err, "recv message");}        if ((err = process_publish_message(msg, redirect)) != srs_success) {return srs_error_wrap(err, "process message");}}return err;
}

SrsEdgeIngester::process_publish_message()处理RTMP Message报文的逻辑是:

1)如果是音视频报文、聚合报文或音视频元数据,直接调用SrsLiveSource的相关方法处理

2)如果是CALL Message报文,则分析报文是否是重定向(redirect)请求,并处理。

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{......    // process audio packetif (msg->header.is_audio()) { source->on_audio(msg); }// process video packetif (msg->header.is_video()) { source->on_video(msg); }// process aggregate packetif (msg->header.is_aggregate()) {source->on_aggregate(msg);return srs_success;}// process onMetaDataif (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {SrsPacket* pkt = NULL;upstream->decode_message(msg, &pkt);SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);source->on_meta_data(msg, metadata);return err;}// call messages, for example, reject, redirect.if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {SrsPacket* pkt = NULL;upstream->decode_message(msg, &pkt);// RTMP 302 redirectSrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);SrsAmf0Any* prop = NULL;SrsAmf0Object* evt = call->arguments->to_object();if ((prop = evt->ensure_property_string("level")) == NULL) {return srs_success;} else if (prop->to_str() != StatusLevelError) {return srs_success;}if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) {return srs_success;}SrsAmf0Object* ex = prop->to_object();// The redirect is tcUrl while redirect2 is RTMP URL.// https://github.com/ossrs/srs/issues/1575#issuecomment-574999798if ((prop = ex->ensure_property_string("redirect2")) == NULL) {if ((prop = ex->ensure_property_string("redirect")) == NULL) {return srs_success;}}redirect = prop->to_str();return srs_error_new(ERROR_CONTROL_REDIRECT, "RTMP 302 redirect to %s", redirect.c_str());}return err;
}

总结:

通过前一节和本节的分析分析,我们了解了SRS4.0 RTMP服务模块推拉流的整体逻辑:

1)每个推流客户端对应一个SrsLiveSource对象,每个拉流客户端对应一个SrsLiveConsumer对象。推流端接收协程SrsRecvThread::cycle()将接收到的RTMP数据包通过SrsLiveSource对象最终复制到SrsLiveConsumer对象的数据队列中进行缓存。

2)拉流端处理协程SrsRtmpConn::do_playing()从SrsLiveConsumer对象中不断读取数据并转发给拉流客户端。

3)如果SRS服务器工作在edge模式,

a) SRS将接收到的客户推流数据将通过SrsPublishEdge对象推向源站。

b) 如果本站点还没有缓存用户所需的拉流数据时,SRS会通过SrsPlayEdge::on_client_play()触发回源拉流操作。

补充讨论关于RTMP推拉流的延时问题:
通过学习SRS流媒体服务器推流端和拉流端的代码逻辑,可以知道:
1)在没有拉流客户端时,推流端只能在SrsLiveSource对象中通过SrsMetaCache和SrsGopCache缓存音视频元数据和最多一组GOP视频。
2)有拉流客户端接入时,数据很快会通过批量写的方式发送到拉流客户端
3)但是,如果使用VLC之类的播放工具拉流,会发现播放的视频画面存在8~10秒的延时
出现上述原因,主要是因为VLC有比较大的播放缓存,如果使用ffplay配合nobuffer参数,可以将延时降低到小于1秒。
原文链接:SRS4.0源代码分析之RTMP拉流处理 - 资料 - 我爱音视频网 - 构建全国最权威的音视频技术交流分享论坛

本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,srs,推流拉流)↓↓↓↓↓↓见下面↓↓文章底部↓↓

SRS4.0源代码分析之RTMP拉流处理相关推荐

  1. 5、SRS4.0源代码分析之RTMP拉流处理

    目标: 上一节分析了SRS针对推流客户端的处理逻辑,这里接下来分析针对拉流客户端的处理逻辑. SRS拉流端处理逻辑简单说就是SrsRtmpConn::do_playing()协程从SrsLiveCon ...

  2. 4、SRS4.0源代码分析之RTMP推流处理

    目标:     本章我们将分析SRS4.0 RTMP服务模块与推流相关的代码处理逻辑. 内容:     根据上节内容可知,SRS4.0针对RTMP推流客户端的处理逻辑,主要在协程SrsRtmpConn ...

  3. 10、SRS4.0源代码分析之WebRTC推流端处理

    目标: 上一节分析了SRS4.0中WebRTC模块的总体架构和软件处理流程.接下来分析SRS4.0 WebRTC模块针对客户端推流连接上各种协议报文的软件处理逻辑. 内容: WebRTC模块在启动过程 ...

  4. 13、SRS4.0源代码分析之GB28181实验环境搭建

    前言 严格的说SRS4.0正式发布版本中已经去掉了GB28181相关的代码(主要时因为该特性还有一些Bug需要修复),本文目的是记录之前学习和使用SRS GB28181推流处理的一些心得. 内容 一. ...

  5. vlc播放器或者web实现rtmp拉流

    最简单的拉流莫过于接着第三方播放器了,我们可以利用VLC播放器实现rtmp拉流. 当安装完vlc播放器并且客户端已经在推流了(推流地址为rtmp://127.0.0.1:1935/live/123), ...

  6. js调用vlc_vlc播放器或者web实现rtmp拉流

    最简单的拉流莫过于接着第三方播放器了,我们可以利用VLC播放器实现rtmp拉流. 当安装完vlc播放器并且客户端已经在推流了(推流地址为rtmp://127.0.0.1:1935/live/123), ...

  7. 区块链教程Fabric1.0源代码分析scc(系统链码)

    区块链教程Fabric1.0源代码分析scc(系统链码),2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实上,正是初期泡沫的渐退,让人们更 ...

  8. 区块链教程Fabric1.0源代码分析Peer peer channel命令及子命令实现

    区块链教程Fabric1.0源代码分析Peer peer channel命令及子命令实现,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实 ...

  9. 区块链教程Fabric1.0源代码分析Tx(Transaction 交易)一

    区块链教程Fabric1.0源代码分析Tx(Transaction 交易)一,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实上,正是初期 ...

最新文章

  1. 计算机的学生该怎么做?
  2. 这 10 个简单的面试题,却隐藏大坑,大厂的套路防不胜防
  3. 【干货】超全!华为交换机端口vlan详解~
  4. 【译】LXC and LXD: Explaining Linux Containers
  5. 为什么那么好的女孩子还单身?
  6. 见识过世界的强大,才能拥有掌握世界的力量
  7. c语言程序设计编程解读,【答题】C语言程序设计问题与解释实验
  8. C语言进阶——全局变量
  9. Winform中ComcoBox控件设置选定项
  10. AT88SC104 加密认证过程
  11. Unity与UE4引擎源码内使用到的第三方库的比较
  12. matlab电磁场与微波技术相关仿真的代码,几乎覆盖电磁和微波领域
  13. NGS 分析流程 (一)
  14. 威联通QnapClub软件源汉化加速
  15. 计算机乘法原理 移位,原码乘法,原码乘法原理详解
  16. Python进行表格拆分
  17. android有道翻译代码,Android使用有道翻译API实如今线翻译功能(示例代码)
  18. 手把手教你编写一个音乐播放器
  19. 巴纳姆效应心理 学对号入座 营销中惯用的营销心理学
  20. ICCV 2019 | VIPL实验室5篇录取论文详解

热门文章

  1. 适配 iphone 微信h5页面
  2. 怎么调用onenet平台的API从而读取我们的设备数据和下发命令,做到控制开关
  3. 用arduino从零开始做一个《儿童算术智能出题机》——NO.1硬件篇(MAX7219、矩阵键盘、GD3800D、3D打印)
  4. 在R语言下配置企业微信机器人
  5. 【每天一个 Linux 命令】网络相关命令(ifconfig、route、ping、traceroute、netstat、ss、telnet、rcp、scp)
  6. 关系型数据库理论基础阐释
  7. sheepdog简介
  8. mac系统双开应用(QQ、微信)
  9. android移动应用开发 基于adobe air 下载,基于Adobe AIR的下一代移动应用开发
  10. 植物大战僵尸2 服务器维护时间,植物大战僵尸2PVP商店多久更新一次