根据上篇文章,rtmp 推流处理publishing 。do_publishing 处理SrsLiveSource及传入收发SrsPublishRecvThread协程。

srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
{srs_error_t err = srs_success;SrsRequest* req = info->req;if (_srs_config->get_refer_enabled(req->vhost)) {if ((err = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != srs_success) {return srs_error_wrap(err, "rtmp: referer check");}}if ((err = http_hooks_on_publish()) != srs_success) {return srs_error_wrap(err, "rtmp: callback on publish");}// TODO: FIXME: Should refine the state of publishing.if ((err = acquire_publish(source)) == srs_success) {// use isolate thread to recv,// @see: https://github.com/ossrs/srs/issues/237SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());err = do_publishing(source, &rtrd);rtrd.stop();}// whatever the acquire publish, always release publish.// when the acquire error in the midlle-way, the publish state changed,// but failed, so we must cleanup it.// @see https://github.com/ossrs/srs/issues/474// @remark when stream is busy, should never release it.if (srs_error_code(err) != ERROR_SYSTEM_STREAM_BUSY) {release_publish(source);}http_hooks_on_unpublish();return err;
}

do_publishing  :协程执行,处理数据统计。

srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
{srs_error_t err = srs_success;SrsRequest* req = info->req;SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();SrsAutoFree(SrsPithyPrint, pprint);// update the statistic when source disconveried.SrsStatistic* stat = SrsStatistic::instance();if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {return srs_error_wrap(err, "rtmp: stat client");}// start isolate recv thread.// TODO: FIXME: Pass the callback here.if ((err = rtrd->start()) != srs_success) {return srs_error_wrap(err, "rtmp: receive thread");}// initialize the publish timeout.publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);// set the sock options.set_sock_options();if (true) {bool mr = _srs_config->get_mr_enabled(req->vhost);srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d",mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);}int64_t nb_msgs = 0;uint64_t nb_frames = 0;while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "rtmp: thread quit");}pprint->elapse();// cond wait for timeout.if (nb_msgs == 0) {// when not got msgs, wait for a larger timeout.// @see https://github.com/ossrs/srs/issues/441rtrd->wait(publish_1stpkt_timeout);} else {rtrd->wait(publish_normal_timeout);}// check the thread error code.if ((err = rtrd->error_code()) != srs_success) {return srs_error_wrap(err, "rtmp: receive thread");}// when not got any messages, timeout.if (rtrd->nb_msgs() <= nb_msgs) {return srs_error_new(ERROR_SOCKET_TIMEOUT, "rtmp: publish timeout %dms, nb_msgs=%d",nb_msgs? srsu2msi(publish_normal_timeout) : srsu2msi(publish_1stpkt_timeout), (int)nb_msgs);}nb_msgs = rtrd->nb_msgs();// Update the stat for video fps.// @remark https://github.com/ossrs/srs/issues/851SrsStatistic* stat = SrsStatistic::instance();if ((err = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != srs_success) {return srs_error_wrap(err, "rtmp: stat video frames");}nb_frames = rtrd->nb_video_frames();// reportableif (pprint->can_print()) {kbps->sample();bool mr = _srs_config->get_mr_enabled(req->vhost);srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d",(int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout));}}return err;
}

SrsPublishRecvThread::start   执行:trd.start 。trd指向SrsRecvThread 。

srs_error_t SrsPublishRecvThread::start()
{srs_error_t err = srs_success;if ((err = trd.start()) != srs_success) {err = srs_error_wrap(err, "publish recv thread");}ncid = cid = trd.cid();return err;
}

SrsRecvThread 执行SrsSTCoroutine。SrsSTCoroutine传入this对象cycle()

srs_error_t SrsRecvThread::start()
{srs_error_t err = srs_success;srs_freep(trd);trd = new SrsSTCoroutine("recv", this, _parent_cid);//change stack size to 256K, fix crash when call some 3rd-part api.((SrsSTCoroutine*)trd)->set_stack_size(1 << 18);if ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "recv thread");}return err;
}

SrsRecvThread::cycle

srs_error_t SrsRecvThread::cycle()
{srs_error_t err = srs_success;// the multiple messages writev improve performance large,// but the timeout recv will cause 33% sys call performance,// to use isolate thread to recv, can improve about 33% performance.rtmp->set_recv_timeout(SRS_UTIME_NO_TIMEOUT);pumper->on_start();if ((err = do_cycle()) != srs_success) {err = srs_error_wrap(err, "recv thread");}// reset the timeout to pulse mode.rtmp->set_recv_timeout(timeout);pumper->on_stop();return err;
}

do_cycle  执行。

rtmp->recv_message ,rtmp 收数据。

pumper->consume     数据推流到。pumper->consume   指向。 SrsPublishRecvThread::consume(SrsCommonMessage* msg)

srs_error_t SrsRecvThread::do_cycle()
{srs_error_t err = srs_success;while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "recv thread");}// When the pumper is interrupted, wait then retry.if (pumper->interrupted()) {srs_usleep(timeout);continue;}SrsCommonMessage* msg = NULL;// Process the received message.if ((err = rtmp->recv_message(&msg)) == srs_success) {err = pumper->consume(msg);}if (err != srs_success) {// Interrupt the receive thread for any error.trd->interrupt();// Notify the pumper to quit for error.pumper->interrupt(err);return srs_error_wrap(err, "recv thread");}}return err;
}

SrsPublishRecvThread::consume 。执行_conn->handle_publish_message 函数。

_conn->handle_publish_message  指向SrsRtmpConn::handle_publish_message。

srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
{srs_error_t err = srs_success;// when cid changed, change it.if (ncid.compare(cid)) {_srs_context->set_id(ncid);cid = ncid;}_nb_msgs++;if (msg->header.is_video()) {video_frames++;}// log to show the time of recv thread.srs_verbose("recv thread now=%" PRId64 "us, got msg time=%" PRId64 "ms, size=%d",srs_update_system_time(), msg->header.timestamp, msg->size);// the rtmp connection will handle this messageerr = _conn->handle_publish_message(_source, msg);// must always free it,// the source will copy it if need to use.srs_freep(msg);if (err != srs_success) {return srs_error_wrap(err, "handle publish message");}// Yield to another coroutines.// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777463768if (++nn_msgs_for_yield_ >= 15) {nn_msgs_for_yield_ = 0;srs_thread_yield();}return err;
}

handle_publish_message 处理。

srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
{srs_error_t err = srs_success;// process publish event.if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {SrsPacket* pkt = NULL;if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {return srs_error_wrap(err, "rtmp: decode message");}SrsAutoFree(SrsPacket, pkt);// for flash, any packet is republish.if (info->type == SrsRtmpConnFlashPublish) {// flash unpublish.// TODO: maybe need to support republish.srs_trace("flash flash publish finished.");return srs_error_new(ERROR_CONTROL_REPUBLISH, "rtmp: republish");}// for fmle, drop others except the fmle start packet.if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);if ((err = rtmp->fmle_unpublish(info->res->stream_id, unpublish->transaction_id)) != srs_success) {return srs_error_wrap(err, "rtmp: republish");}return srs_error_new(ERROR_CONTROL_REPUBLISH, "rtmp: republish");}srs_trace("fmle ignore AMF0/AMF3 command message.");return err;}// video, audio, data messageif ((err = process_publish_message(source, msg)) != srs_success) {return srs_error_wrap(err, "rtmp: consume message");}return err;
}

process_publish_message ,处理source->on_audio 及source->on_video 函数。

srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
{srs_error_t err = srs_success;// for edge, directly proxy message to origin.if (info->edge) {if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: proxy publish");}return err;}// process audio packetif (msg->header.is_audio()) {if ((err = source->on_audio(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: consume audio");}return err;}// process video packetif (msg->header.is_video()) {if ((err = source->on_video(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: consume video");}return err;}// process aggregate packetif (msg->header.is_aggregate()) {if ((err = source->on_aggregate(msg)) != srs_success) {return srs_error_wrap(err, "rtmp: consume aggregate");}return err;}// process onMetaDataif (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {SrsPacket* pkt = NULL;if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {return srs_error_wrap(err, "rtmp: decode message");}SrsAutoFree(SrsPacket, pkt);if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);if ((err = source->on_meta_data(msg, metadata)) != srs_success) {return srs_error_wrap(err, "rtmp: consume metadata");}return err;}return err;}return err;
}

2. 数据发送:

srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{srs_error_t err = srs_success;// Check page referer of player.SrsRequest* req = info->req;if (_srs_config->get_refer_enabled(req->vhost)) {if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {return srs_error_wrap(err, "rtmp: referer check");}}// When origin cluster enabled, try to redirect to the origin which is active.// A active origin is a server which is delivering stream.if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);for (int i = 0; i < (int)coworkers.size(); i++) {// TODO: FIXME: User may config the server itself as coworker, we must identify and ignore it.string host; int port = 0; string coworker = coworkers.at(i);string url = "http://" + coworker + "/api/v1/clusters?"+ "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream+ "&coworker=" + coworker;if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) {// If failed to discovery stream in this coworker, we should request the next one util the last.// @see https://github.com/ossrs/srs/issues/1223if (i < (int)coworkers.size() - 1) {continue;}return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());}string rurl = srs_generate_rtmp_url(host, port, req->host, req->vhost, req->app, req->stream, req->param);srs_trace("rtmp: redirect in cluster, from=%s:%d, target=%s:%d, url=%s, rurl=%s",req->host.c_str(), req->port, host.c_str(), port, url.c_str(), rurl.c_str());// Ignore if host or port is invalid.if (host.empty() || port == 0) {continue;}bool accepted = false;if ((err = rtmp->redirect(req, rurl, accepted)) != srs_success) {srs_error_reset(err);} else {return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");}}return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin");}// Set the socket options for transport.set_sock_options();// Create a consumer of source.SrsLiveConsumer* consumer = NULL;SrsAutoFree(SrsLiveConsumer, consumer);if ((err = source->create_consumer(consumer)) != srs_success) {return srs_error_wrap(err, "rtmp: create consumer");}if ((err = source->consumer_dumps(consumer)) != srs_success) {return srs_error_wrap(err, "rtmp: dumps consumer");}// Use receiving thread to receive packets from peer.SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());if ((err = trd.start()) != srs_success) {return srs_error_wrap(err, "rtmp: start receive thread");}// Deliver packets to peer.wakable = consumer;err = do_playing(source, consumer, &trd);wakable = NULL;trd.stop();// Drop all packets in receiving thread.if (!trd.empty()) {srs_warn("drop the received %d messages", trd.size());}return err;
}

do_playing  函数。

consumer->dump_packets 从缓存 取出数据,

rtmp->send_and_free_messages 发送数据。

srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
{srs_error_t err = srs_success;SrsRequest* req = info->req;srs_assert(req);srs_assert(consumer);// update the statistic when source disconveried.SrsStatistic* stat = SrsStatistic::instance();if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {return srs_error_wrap(err, "rtmp: stat client");}// initialize other componentsSrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();SrsAutoFree(SrsPithyPrint, pprint);SrsMessageArray msgs(SRS_PERF_MW_MSGS);bool user_specified_duration_to_stop = (req->duration > 0);int64_t starttime = -1;// setup the realtime.realtime = _srs_config->get_realtime_enabled(req->vhost);// setup the mw config.// when mw_sleep changed, resize the socket send buffer.mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);mw_sleep = _srs_config->get_mw_sleep(req->vhost);skt->set_socket_buffer(mw_sleep);// initialize the send_min_intervalsend_min_interval = _srs_config->get_send_min_interval(req->vhost);srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d",srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay);while (true) {// when source is set to expired, disconnect it.if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "rtmp: thread quit");}// collect elapse for pithy print.pprint->elapse();// to use isolate thread to recv, can improve about 33% performance.while (!rtrd->empty()) {SrsCommonMessage* msg = rtrd->pump();if ((err = process_play_control_msg(consumer, msg)) != srs_success) {return srs_error_wrap(err, "rtmp: play control message");}}// quit when recv thread error.if ((err = rtrd->error_code()) != srs_success) {return srs_error_wrap(err, "rtmp: recv thread");}#ifdef SRS_PERF_QUEUE_COND_WAIT// wait for message to incoming.// @see https://github.com/ossrs/srs/issues/257consumer->wait(mw_msgs, mw_sleep);
#endif// get messages from consumer.// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.// @remark when enable send_min_interval, only fetch one message a time.int count = (send_min_interval > 0)? 1 : 0;if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {return srs_error_wrap(err, "rtmp: consumer dump packets");}// reportableif (pprint->can_print()) {kbps->sample();srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d",(int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs);}if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAITsrs_usleep(mw_sleep);
#endif// ignore when nothing got.continue;}// only when user specifies the duration,// we start to collect the durations for each message.if (user_specified_duration_to_stop) {for (int i = 0; i < count; i++) {SrsSharedPtrMessage* msg = msgs.msgs[i];// foreach msg, collect the duration.// @remark: never use msg when sent it, for the protocol sdk will free it.if (starttime < 0 || starttime > msg->timestamp) {starttime = msg->timestamp;}duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;starttime = msg->timestamp;}}// sendout messages, all messages are freed by send_and_free_messages().// no need to assert msg, for the rtmp wilal assert it.printf("%s %d %d\n",__FUNCTION__,__LINE__,count);if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {return srs_error_wrap(err, "rtmp: send %d messages", count);}// if duration specified, and exceed it, stop play live.// @see: https://github.com/ossrs/srs/issues/45if (user_specified_duration_to_stop) {if (duration >= req->duration) {return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));}}// apply the minimal interval for delivery stream in srs_utime_t.if (send_min_interval > 0) {srs_usleep(send_min_interval);}// Yield to another coroutines.// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777437476srs_thread_yield();}return err;
}

流媒体分析之rtmp协议srs服务器数据收发相关推荐

  1. 流媒体协议分析之webrtc 协议 srs 服务器实现

    1.信令交互  ,sdp信令交互. listen_udp : 注册udp 监听: listen_api :注册信令交互接口: #ifdef SRS_RTC_srs_hybrid->registe ...

  2. RTMP协议之AMF数据

    一.RTMP组成 1.RTMP包头 RTMP协议封包 由一个包头和一个包体组成,包头可以是4种长度的任意一种:12, 8, 4,  1 byte(s).完整的RTMP包头应该是12bytes,包含了时 ...

  3. 流媒体分析之srt 协议libsrt 实现

    1. srt协议概述 SRT协议能够在不可预测的互联网环境下提供安全.可靠的数据传输,目前广泛应用在流媒体传输领域.理论上SRT可以传输任意类型的数据,但由于其特别针对实时音视频传输做了优化,目前的主 ...

  4. 流媒体分析之srt 协议mpegts 封装

    一.TS 格式标准介绍 TS是一种音视频封装格式,全称为MPEG2-TS.其中TS即"Transport Stream"的缩写. 先简要介绍一下什么是MPEG2-TS: DVD的音 ...

  5. 服务器数据收发测试软件,Packet Sender(UDP/TCP网络测试工具) v7.0.5

    Packet Sender(UDP/TCP网络测试工具)是一个开源实用程序,允许发送和接收TCP.UDP和SSL(加密的TCP)数据包,主线分支正式支持Windows.Mac和桌面Linux,其他地方 ...

  6. RTMP协议深度解析:从原理到实践,掌握实时流媒体传输技术

    目录标题 1. 引言 1.1 流媒体传输技术的重要性 1.2 为什么选择RTMP协议 1.3 RTMP协议的发展与应用 2. RTMP协议基础 2.1 RTMP协议简介 2.2 RTMP协议与其他流媒 ...

  7. RTMP协议中文翻译(首发)(转)

    Adobe公司的实时消息传输协议 摘要 此备忘录描述了 Adobe公司的实时消息传输协议(RTMP),此协议从属于应用层,被设计用来在适合的传输协议(如TCP)上复用和打包多媒体传输流(如音频.视频和 ...

  8. rtmplib rtmp协议过程分析

    转自:http://chenzhenianqing.cn/articles/1009.html 写的很好,收藏如下,向作者致敬! 没事碰到了librtmp库,这个库是ffmpeg的依赖库,用来接收,发 ...

  9. 流媒体传输 - RTMP 协议报文分析

    握手之后,连接开始对一个或多个 chunk stream 进行合并.创建的每个块都有一个唯一 id 对其进行关联,这个 id 叫做 chunk stream id.这些块通过网络进行传输.传递时,每个 ...

最新文章

  1. 关于WPF的ComboBox中Items太多而导致加载过慢的问题
  2. 知识图谱基础知识(一): 概念和构建
  3. import lombok 报错_Android上使用Lombok和set、get方法告别
  4. 在diy的文件系统上创建文件的流程
  5. python协程实现一万并发_python中的协程并发
  6. 切单个图标为背景透明的方法
  7. linux集群中mpi的并行计算环境简单配置,linux集群中MPI的并行计算环境简单配置(转)...
  8. Kotlin — 适用于服务器开发
  9. Redis源代码分析(三十)--- pubsub公布订阅模式
  10. 小程序中的flex_在Flex应用程序中启用辅助功能
  11. 如何利用视频做动图?视频转gif动图
  12. 渲染的本质: 纹理过滤(Texture filtering)技术
  13. php集成c sdk,GitHub - cuncle/spider-php-sdk
  14. php取雅加达时间,2018年雅加达亚运会电竞赛程表 8月电子竞技比赛时间一览
  15. 开放平台–扫描微信二维码登录
  16. 古诗词-飞火在线工具
  17. 文件名依照字符串和数字进行排序
  18. 【NOIP2010普及组】三国游戏题解
  19. 如何成为名副其实的测试架构师?
  20. 程序员噩梦typescript+vue3

热门文章

  1. Differential dataflow 微分数据流
  2. 智能网联汽车 自动驾驶地图数据质量规范
  3. matlab画平行板电场,MATLAB静电场电场电势
  4. 冲量在线当选中关村数字经济产业联盟理事单位
  5. 程序员之天梯排行榜,你在哪一级?
  6. 荣耀YOYO建议新增快递取件服务
  7. 用lua实现竖列转盘游戏
  8. 20180925_Python练习题-三:一个商场在降价促销。如果购买金额50~100元(包含50元和100元)之间,会给10%的折扣;如果购买金额大于100元,会給20%折扣。编写一程序,询问购买价
  9. 神秘国度的爱情故事 数据结构课设-广州大学
  10. 做模具设计需要知道的6大系统的设计原则,老板就再也不说我了