SRS4.0源代码分析之RTMP拉流处理
目标:
上一节分析了SRS针对推流客户端的处理逻辑,这里接下来分析针对拉流客户端的处理逻辑。
SRS拉流端处理逻辑简单说就是SrsRtmpConn::do_playing()协程从SrsLiveConsumer对象中不断读取数据并转发给拉流客户端的过程。另外,在这个过程中,如果服务器处于edge模式,还有一个回源拉流的处理过程需要重点分析。
内容:
1、拉流端整体处理流程
srs_error_t SrsRtmpConn::stream_service_cycle()
{rtmp->identify_client(&info->type); // 对于推流端,每个推流端通过fetch_or_create函数生成一个对应的SrsLiveSource对象// 对于拉流端,// 1)如果本机已经有对应推流端的SrsLiveSource对象,则根据拉流地址字符串(/live/stream)直接获取// 2)如果本机没有对应推流端的SrsLiveSource对象,这里则会创建一个SrsLiveSource对象// 并在后续的SrsRtmpConn::playing()函数中,根据edge模式有一个回源拉流的操作_srs_sources->fetch_or_create(req, server, &source); switch (info->type) {case SrsRtmpConnPlay:rtmp->start_play(info->res->stream_id); // 发送拉流响应报文http_hooks_on_play(); // 执行HTTP回调处理,对外通知客户端拉流开始playing(source); // 拉流处理逻辑http_hooks_on_stop(); //执行HTTP回调处理,对外通知客户端拉流结束return err;}
}
1、如果是源站,支持源站集群,且本地没有当前需要的拉流,则向配置的协同源站查询,最终将查询结果通知拉流客户端
2、为每个拉流端创建一个SrsLiveConsumer消费者对象,并与推流端SrsLiveSource对象绑定,将source中缓存的meta和GOP数据发送到消费者队列
3、创建并启动一个拉流端接收协程,此协程主要工作是接收拉流客户端的播放控制命令(暂停、继续)
4、真正的拉流协程do_playing()就阻塞在条件变量mw_wait,等待有新数据时被唤醒。
本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,srs,推流拉流)↓↓↓↓↓↓见下面↓↓文章底部↓↓
srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{// 这里判断本机是源站模式,且支持源集群,且当前source对象并没有客户端推流,才进入此分支// 这个分支基本上包括了源站集群的全部处理逻辑if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {// 从配置文件中读取其它协同工作源站的地址vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);// 向每个协同工作源站查询是否存在指定的流(/live/stream)for (int i = 0; i < (int)coworkers.size(); i++) {// 向配置的协同源站发起HTTP查询请求SrsHttpHooks::discover_co_workers(url, host, port);// 根据协同工作源站返回的信息生成新URLstring rurl = srs_generate_rtmp_url(host, port, req->host, req->vhost, );if (host.empty() || port == 0) { continue; } // 如果是无效URL,则继续查询// 使用新生成的URL向客户端返回重定向响应rtmp->redirect(req, rurl, accepted); return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");}}// 为每个拉流端创建一个SrsLiveConsumer消费者对象,并与SrsLiveSource对象绑定// 这个函数内部包括了edge模式下,edge站点回源拉流的详细处理,这是一个非常重要的特性,需要单独分析// 这里我们暂时认为,通过此函数的处理,拉流端需要的数据流,已经被推送到了本站点source->create_consumer(consumer); // 这里每次创建一个新的消费者对象时,都会首先将source中缓存的meta和GOP数据发送到消费者队列source->consumer_dumps(consumer); // 创建并启动一个拉流端接收协程,此协程主要工作是接收客户端的播放控制命令(暂停、继续)SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, );trd.start(); do_playing(source, consumer, &trd); // 进入拉流处理逻辑trd.stop(); // 拉流结束return err;
}
拉流端接收协程SrsQueueRecvThread trd的处理逻辑和推流端接收协程完全一致(在srs_app_recv_thread.cpp文件中)
1、调用SrsRtmpServer::recv_message()->SrsProtocol::recv_message()接收一个完整的RTMP数据包。
2、调用ISrsMessageConsumer::consume()虚接口向外复制数据,此时实际调用的接口是SrsQueueRecvThread::consume()。
3、最终数据包被保存到SrsQueueRecvThread对象的std::vector<SrsCommonMessage*>queue队列,并调用SrsLiveConsumer::wakeup()接口,唤醒条件变量mw_wait,因为真正的拉流协程do_playing()就阻塞在条件变量mw_wait,等待有新数据时被唤醒。
srs_error_t SrsRecvThread::do_cycle()
{rtmp->recv_message(&msg); // 此函数用于得到一个Message结构pumper->consume(msg); // 对于推流端,pumper对象类型为SrsPublishRecvThread// 对于拉流端,pumper对象类型为SrsQueueRecvThread
}srs_error_t SrsQueueRecvThread::consume(SrsCommonMessage* msg)
{queue.push_back(msg); // 接收拉流客户端发送来的播放控制命令并放入队列// 另一个协程do_playing()通过rtrd->pump()从队列中取报文_consumer->wakeup(); // 唤醒消费者所在的协程,这里具体就是do_playing()协程// do_playing()协程唤醒后,会调用SrsLiveConsumer::dump_packets// 获取数据并向客户端发送。
}void SrsLiveConsumer::wakeup()
{if (mw_waiting) {srs_cond_signal(mw_wait);mw_waiting = false;}
}
SrsRtmpConn::do_playing()协程的处理逻辑:
1、调用SrsQueueRecvThread::empty()检查拉流端接收协程的队列中是否有等待处理的控制命令报文,如果有,则调用process_play_control_msg()处理控制命令。如果协程处理出错,则退出。
2、调用SrsLiveConsumer::wait()将本线程阻塞在条件变量mw_wait上,等待接收到新数据或播放端控制命令。
3、协程被唤醒后,调用SrsLiveConsumer::dump_packets()取数据,并通过SrsRtmpServer::send_and_free_messages()函数发送给拉流客户端。
srs_error_t SrsRtmpConn::do_playing()
{while (true) { if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "rtmp: thread quit");} while (!rtrd->empty()) { // 接收协程缓存队列不为空,表示接收到播放端发送的控制命令SrsCommonMessage* msg = rtrd->pump();process_play_control_msg(consumer, msg); // 处理播放端控制指令}// 如果协程运行状态出错,则退出协程if ((err = rtrd->error_code()) != srs_success) {return srs_error_wrap(err, "rtmp: recv thread");}// 调用SrsLiveConsumer::wait()阻塞在条件变量mw_wait上,等待被唤醒// 1、推流端接收到推流数据,并调用SrsLiveConsumer::enqueue()时,会唤醒此条件变量// 2、拉流端接收到用户发送的播放控制命令时,调用SrsLiveConsumer::wakeup()唤醒此条件变量consumer->wait(mw_msgs, mw_sleep); // wait for message to incoming// 从SrsLiveConsumer内部队列中取出数据,// 并通过SrsRtmpServer::send_and_free_messages()函数发送给拉流客户端consumer->dump_packets(&msgs, count); if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: send %d messages", count);}}
}
拉流端播放控制命令的处理逻辑
srs_error_t SrsRtmpConn::process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg)
{if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {return err;} // 判断报文必须是控制命令,否则不处理rtmp->decode_message(msg, &pkt); // 如果是close命令,直接返回错误,结束协程SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(pkt);if (close) {return srs_error_new(ERROR_CONTROL_RTMP_CLOSE, "rtmp: close stream");}// 如果是call命令,发送一个响应报文SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);if (call) {SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);rtmp->send_and_free_packet(res, 0);}// 如果是pause命令,向客户端发送响应报文,并在本地SrsLiveConsumer对象中设置pause标志SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);if (pause) {if ((err = rtmp->on_play_client_pause(info->res->stream_id, pause->is_pause)) != srs_success) {return srs_error_wrap(err, "rtmp: pause");}if ((err = consumer->on_play_client_pause(pause->is_pause)) != srs_success) {return srs_error_wrap(err, "rtmp: pause");}return err;}
}
2、Edge模式下回源拉流的处理逻辑
当客户端连接到SRS的Edge站点并请求拉流,而Edge站点还没有缓存用户所需的拉流数据时,SRS会触发回源拉流操作,具体处理逻辑如下:
1)Edge站点根据拉流URL创建一个SrsLiveSource对象
srs_error_t SrsRtmpConn::stream_service_cycle()
{......// 对于拉流端,如果本机没有对应推流端的SrsLiveSource对象,这里则会创建一个SrsLiveSource对象// 并在后续的SrsRtmpConn::playing()函数中,根据edge模式有一个回源拉流的操作_srs_sources->fetch_or_create(req, server, &source); ......
}
2)Edge站点为每条拉流客户端创建一个SrsLiveConsumer消费者对象
srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{......// 为每个拉流端创建一个SrsLiveConsumer消费者对象,并与SrsLiveSource对象绑定// 这个函数内部包括了edge模式下,edge站点回源拉流的详细处理,这是一个非常重要的特性source->create_consumer(consumer); ......
}
3)为拉流端创建消费者对象,如果是edge模式,调用SrsPlayEdge::on_client_play()函数,用于启动一个SrsEdgeIngester协程执行回源拉流。
srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer)
{consumer = new SrsLiveConsumer(this);consumers.push_back(consumer);// for edge, when play edge stream, check the stateif (_srs_config->get_vhost_is_edge(req->vhost)) {// notice edge to start for the first client.if ((err = play_edge->on_client_play()) != srs_success) {return srs_error_wrap(err, "play edge");}}return err;
}srs_error_t SrsPlayEdge::on_client_play()
{srs_error_t err = srs_success;// 只有SrsPlayEdge对象的状态是SrsEdgeStateInit状态时,// 才启动一个单独的SrsEdgeIngester协程用于回源拉流if (state == SrsEdgeStateInit) {state = SrsEdgeStatePlay; // 此状态标志防止回源拉流操作被重复执行err = ingester->start();}return err;
}
4)SrsEdgeIngester协程回源拉流的具体执行逻辑
srs_error_t SrsEdgeIngester::cycle()
{while (true) {......if ((err = do_cycle()) != srs_success) {srs_warn("EdgeIngester: Ignore error, %s", srs_error_desc(err).c_str());srs_freep(err);}srs_usleep(SRS_EDGE_INGESTER_CIMS);}return err;
}srs_error_t SrsEdgeIngester::do_cycle()
{......while (true) {...... // 创建SrsEdgeRtmpUpstream对象,其中包含了RTMP客户端SDK// 并通过SrsEdgeRtmpUpstream::connect()函数,选择一个源站,发起连接请求upstream = new SrsEdgeRtmpUpstream(redirect);source->on_source_id_changed(_srs_context->get_id());err = upstream->connect(req, lb));err = edge->on_ingest_play();// 连接成功,则设置SrsPlayEdge对象为已连接状态// set to larger timeout to read av data from origin.upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);err = ingest(redirect); // 此函数内部循环获取拉流数据// 执行到这里表示拉流结束// 如果错误码是ERROR_CONTROL_REDIRECT,表示需要重新选择一个源站继续拉流if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {int port;string server;upstream->selected(server, port);string url = req->get_stream_url();srs_error_reset(err);continue;}// 走到这里,表示拉流结束,结束当前协程if (srs_is_client_gracefully_close(err)) {srs_error_reset(err);}break;}return err;
}
SrsEdgeIngester::ingest()内部调用SrsEdgeRtmpUpstream::recv_message()
->SrsBasicRtmpClient::recv_message()
->SrsRtmpClient::recv_message()
->SrsProtocol::recv_message()
得到一个完整的RTMP Message报文,并通过SrsEdgeIngester::process_publish_message() 处理
srs_error_t SrsEdgeIngester::ingest(string& redirect)
{...... while (true) {...... // read from client.SrsCommonMessage* msg = NULL;if ((err = upstream->recv_message(&msg)) != srs_success) {return srs_error_wrap(err, "recv message");} if ((err = process_publish_message(msg, redirect)) != srs_success) {return srs_error_wrap(err, "process message");}}return err;
}
SrsEdgeIngester::process_publish_message()处理RTMP Message报文的逻辑是:
1)如果是音视频报文、聚合报文或音视频元数据,直接调用SrsLiveSource的相关方法处理
2)如果是CALL Message报文,则分析报文是否是重定向(redirect)请求,并处理。
srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{...... // process audio packetif (msg->header.is_audio()) { source->on_audio(msg); }// process video packetif (msg->header.is_video()) { source->on_video(msg); }// process aggregate packetif (msg->header.is_aggregate()) {source->on_aggregate(msg);return srs_success;}// process onMetaDataif (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {SrsPacket* pkt = NULL;upstream->decode_message(msg, &pkt);SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);source->on_meta_data(msg, metadata);return err;}// call messages, for example, reject, redirect.if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {SrsPacket* pkt = NULL;upstream->decode_message(msg, &pkt);// RTMP 302 redirectSrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);SrsAmf0Any* prop = NULL;SrsAmf0Object* evt = call->arguments->to_object();if ((prop = evt->ensure_property_string("level")) == NULL) {return srs_success;} else if (prop->to_str() != StatusLevelError) {return srs_success;}if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) {return srs_success;}SrsAmf0Object* ex = prop->to_object();// The redirect is tcUrl while redirect2 is RTMP URL.// https://github.com/ossrs/srs/issues/1575#issuecomment-574999798if ((prop = ex->ensure_property_string("redirect2")) == NULL) {if ((prop = ex->ensure_property_string("redirect")) == NULL) {return srs_success;}}redirect = prop->to_str();return srs_error_new(ERROR_CONTROL_REDIRECT, "RTMP 302 redirect to %s", redirect.c_str());}return err;
}
总结:
通过前一节和本节的分析分析,我们了解了SRS4.0 RTMP服务模块推拉流的整体逻辑:
1)每个推流客户端对应一个SrsLiveSource对象,每个拉流客户端对应一个SrsLiveConsumer对象。推流端接收协程SrsRecvThread::cycle()将接收到的RTMP数据包通过SrsLiveSource对象最终复制到SrsLiveConsumer对象的数据队列中进行缓存。
2)拉流端处理协程SrsRtmpConn::do_playing()从SrsLiveConsumer对象中不断读取数据并转发给拉流客户端。
3)如果SRS服务器工作在edge模式,
a) SRS将接收到的客户推流数据将通过SrsPublishEdge对象推向源站。
b) 如果本站点还没有缓存用户所需的拉流数据时,SRS会通过SrsPlayEdge::on_client_play()触发回源拉流操作。
补充讨论关于RTMP推拉流的延时问题:
通过学习SRS流媒体服务器推流端和拉流端的代码逻辑,可以知道:
1)在没有拉流客户端时,推流端只能在SrsLiveSource对象中通过SrsMetaCache和SrsGopCache缓存音视频元数据和最多一组GOP视频。
2)有拉流客户端接入时,数据很快会通过批量写的方式发送到拉流客户端
3)但是,如果使用VLC之类的播放工具拉流,会发现播放的视频画面存在8~10秒的延时
出现上述原因,主要是因为VLC有比较大的播放缓存,如果使用ffplay配合nobuffer参数,可以将延时降低到小于1秒。
原文链接:SRS4.0源代码分析之RTMP拉流处理 - 资料 - 我爱音视频网 - 构建全国最权威的音视频技术交流分享论坛
本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,srs,推流拉流)↓↓↓↓↓↓见下面↓↓文章底部↓↓
SRS4.0源代码分析之RTMP拉流处理相关推荐
- 5、SRS4.0源代码分析之RTMP拉流处理
目标: 上一节分析了SRS针对推流客户端的处理逻辑,这里接下来分析针对拉流客户端的处理逻辑. SRS拉流端处理逻辑简单说就是SrsRtmpConn::do_playing()协程从SrsLiveCon ...
- 4、SRS4.0源代码分析之RTMP推流处理
目标: 本章我们将分析SRS4.0 RTMP服务模块与推流相关的代码处理逻辑. 内容: 根据上节内容可知,SRS4.0针对RTMP推流客户端的处理逻辑,主要在协程SrsRtmpConn ...
- 10、SRS4.0源代码分析之WebRTC推流端处理
目标: 上一节分析了SRS4.0中WebRTC模块的总体架构和软件处理流程.接下来分析SRS4.0 WebRTC模块针对客户端推流连接上各种协议报文的软件处理逻辑. 内容: WebRTC模块在启动过程 ...
- 13、SRS4.0源代码分析之GB28181实验环境搭建
前言 严格的说SRS4.0正式发布版本中已经去掉了GB28181相关的代码(主要时因为该特性还有一些Bug需要修复),本文目的是记录之前学习和使用SRS GB28181推流处理的一些心得. 内容 一. ...
- vlc播放器或者web实现rtmp拉流
最简单的拉流莫过于接着第三方播放器了,我们可以利用VLC播放器实现rtmp拉流. 当安装完vlc播放器并且客户端已经在推流了(推流地址为rtmp://127.0.0.1:1935/live/123), ...
- js调用vlc_vlc播放器或者web实现rtmp拉流
最简单的拉流莫过于接着第三方播放器了,我们可以利用VLC播放器实现rtmp拉流. 当安装完vlc播放器并且客户端已经在推流了(推流地址为rtmp://127.0.0.1:1935/live/123), ...
- 区块链教程Fabric1.0源代码分析scc(系统链码)
区块链教程Fabric1.0源代码分析scc(系统链码),2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实上,正是初期泡沫的渐退,让人们更 ...
- 区块链教程Fabric1.0源代码分析Peer peer channel命令及子命令实现
区块链教程Fabric1.0源代码分析Peer peer channel命令及子命令实现,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实 ...
- 区块链教程Fabric1.0源代码分析Tx(Transaction 交易)一
区块链教程Fabric1.0源代码分析Tx(Transaction 交易)一,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实上,正是初期 ...
最新文章
- 计算机的学生该怎么做?
- 这 10 个简单的面试题,却隐藏大坑,大厂的套路防不胜防
- 【干货】超全!华为交换机端口vlan详解~
- 【译】LXC and LXD: Explaining Linux Containers
- 为什么那么好的女孩子还单身?
- 见识过世界的强大,才能拥有掌握世界的力量
- c语言程序设计编程解读,【答题】C语言程序设计问题与解释实验
- C语言进阶——全局变量
- Winform中ComcoBox控件设置选定项
- AT88SC104 加密认证过程
- Unity与UE4引擎源码内使用到的第三方库的比较
- matlab电磁场与微波技术相关仿真的代码,几乎覆盖电磁和微波领域
- NGS 分析流程 (一)
- 威联通QnapClub软件源汉化加速
- 计算机乘法原理 移位,原码乘法,原码乘法原理详解
- Python进行表格拆分
- android有道翻译代码,Android使用有道翻译API实如今线翻译功能(示例代码)
- 手把手教你编写一个音乐播放器
- 巴纳姆效应心理 学对号入座 营销中惯用的营销心理学
- ICCV 2019 | VIPL实验室5篇录取论文详解
热门文章
- 适配 iphone 微信h5页面
- 怎么调用onenet平台的API从而读取我们的设备数据和下发命令,做到控制开关
- 用arduino从零开始做一个《儿童算术智能出题机》——NO.1硬件篇(MAX7219、矩阵键盘、GD3800D、3D打印)
- 在R语言下配置企业微信机器人
- 【每天一个 Linux 命令】网络相关命令(ifconfig、route、ping、traceroute、netstat、ss、telnet、rcp、scp)
- 关系型数据库理论基础阐释
- sheepdog简介
- mac系统双开应用(QQ、微信)
- android移动应用开发 基于adobe air 下载,基于Adobe AIR的下一代移动应用开发
- 植物大战僵尸2 服务器维护时间,植物大战僵尸2PVP商店多久更新一次