1. Threading model


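SRS uses the state-threads coroutine library and runs as a single thread with many coroutines. The concept is similar to Lua coroutines: several coroutines are created inside one thread. Go's goroutines, by contrast, are scheduled across multiple threads, so two goroutines may run on the same thread or on different ones. That raises thread-safety issues, which is why shared resources in Go need channel communication or mutex locking. Because SRS runs all of its coroutines on a single thread, and a coroutine is only switched out when it yields, SRS does not have to consider thread safety and its data needs no locking.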
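
The single-threaded model above can be illustrated with a minimal sketch. It does not use the state-threads API at all: `Task`, `run_all`, `make_incrementer` and `demo_shared_counter` are hypothetical names, and each "coroutine" is approximated by a function that runs one step per call. The point it tries to show is only that when steps never overlap in time, a shared counter can be updated without a mutex.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for a coroutine: each call runs one step and
// returns true while the task still has work left.
using Task = std::function<bool()>;

// Runs every task round-robin on the calling thread. A task gives up the
// CPU only when its step returns, so two steps never overlap in time.
inline void run_all(std::deque<Task> tasks) {
    while (!tasks.empty()) {
        Task task = std::move(tasks.front());
        tasks.pop_front();
        if (task()) {
            tasks.push_back(std::move(task));  // unfinished: queue another step
        }
    }
}

// Builds a task that adds 1 to `shared` once per step, `steps` times in total.
inline Task make_incrementer(int& shared, int steps) {
    assert(steps > 0);
    int left = steps;
    return [&shared, left]() mutable {
        ++shared;  // plain unlocked write; safe because steps never overlap
        return --left > 0;
    };
}

// Two tasks update the same counter without any mutex.
inline int demo_shared_counter() {
    int counter = 0;
    std::deque<Task> tasks;
    tasks.push_back(make_incrementer(counter, 1000));
    tasks.push_back(make_incrementer(counter, 1000));
    run_all(std::move(tasks));
    return counter;
}
```

Calling demo_shared_counter() interleaves the two tasks step by step, yet no increment is lost, because only one step runs at any moment.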

2. Main flow analysis


Skipping the initialization and setup done at program startup, go straight to:

int SrsServer::listen()
{
    int ret = ERROR_SUCCESS;

    if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = listen_http_api()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
        return ret;
    }

    return ret;
}

Start with listen_rtmp():

int SrsServer::listen_rtmp()
{
    int ret = ERROR_SUCCESS;

    // stream service port.
    std::vector<std::string> ip_ports = _srs_config->get_listens();
    srs_assert((int)ip_ports.size() > 0);

    close_listeners(SrsListenerRtmpStream);

    for (int i = 0; i < (int)ip_ports.size(); i++) {
        SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
        listeners.push_back(listener);

        std::string ip;
        int port;
        srs_parse_endpoint(ip_ports[i], ip, port);

        if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
            srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
            return ret;
        }
    }

    return ret;
}

This creates an SrsStreamListener, and SrsStreamListener::listen in turn creates an SrsTcpListener to do the actual listening:

SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
{
    handler = h;
    ip = i;
    port = p;

    _fd = -1;
    _stfd = NULL;

    pthread = new SrsReusableThread("tcp", this);
}

SrsTcpListener creates pthread, an SrsReusableThread (despite the name, it is a coroutine). int SrsTcpListener::listen() calls pthread->start(), and the coroutine then calls back into int SrsTcpListener::cycle():

int SrsTcpListener::cycle()
{
    int ret = ERROR_SUCCESS;

    st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);

    if(client_stfd == NULL){
        // ignore error.
        if (errno != EINTR) {
            srs_error("ignore accept thread stoppped for accept client error");
        }
        return ret;
    }
    srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));

    if ((ret = handler->on_tcp_client(client_stfd)) != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
        return ret;
    }

    return ret;
}
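
The listener pattern (a loop that repeatedly calls cycle(), which accepts one client and hands it to a handler) can be sketched without sockets. Everything here is hypothetical: `TcpHandler`, `Listener`, `run_cycles` and `RecordingHandler` are illustration names, file descriptors are plain integers fed in by hand, and the assumption that the loop keeps going after a failed round is inferred from the name "reusable" rather than shown in the quoted code.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical callback interface, playing the role of ISrsTcpHandler.
class TcpHandler {
public:
    virtual ~TcpHandler() {}
    virtual int on_tcp_client(int fd) = 0;
};

// Hypothetical listener: each cycle() handles at most one pending client.
class Listener {
public:
    explicit Listener(TcpHandler* h) : handler_(h) { assert(h != nullptr); }

    // Simulates a client arriving on the listening socket.
    void push_pending(int fd) { pending_.push_back(fd); }

    // One round: "accept" a client and hand it to the handler.
    // Returns 0 on success or when there is nothing to accept.
    int cycle() {
        if (pending_.empty()) {
            return 0;
        }
        int fd = pending_.front();
        pending_.pop_front();
        return handler_->on_tcp_client(fd);
    }

private:
    TcpHandler* handler_;
    std::deque<int> pending_;
};

// Assumed behaviour of a reusable loop: call cycle() again and again,
// carrying on even after a round that reports an error.
inline int run_cycles(Listener& listener, int rounds) {
    int failures = 0;
    for (int i = 0; i < rounds; ++i) {
        if (listener.cycle() != 0) {
            ++failures;
        }
    }
    return failures;
}

// Test double that records accepted descriptors and rejects negative ones.
class RecordingHandler : public TcpHandler {
public:
    int on_tcp_client(int fd) override {
        if (fd < 0) {
            return -1;
        }
        accepted.push_back(fd);
        return 0;
    }
    std::vector<int> accepted;
};
```

In this sketch one rejected client costs a single failed round; the clients queued behind it are still delivered to the handler.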

Once a connection is accepted, cycle() calls back into on_tcp_client, that is, SrsStreamListener::on_tcp_client:

int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
{
    int ret = ERROR_SUCCESS;

    if ((ret = server->accept_client(type, stfd)) != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
        return ret;
    }

    return ret;
}

int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
{
    ...
    SrsConnection* conn = NULL;
    if (type == SrsListenerRtmpStream) {
        conn = new SrsRtmpConn(this, client_stfd);
    } else if (type == SrsListenerHttpApi) {
#ifdef SRS_AUTO_HTTP_API
        conn = new SrsHttpApi(this, client_stfd, http_api_mux);
#else
        srs_warn("close http client for server not support http-api");
        srs_close_stfd(client_stfd);
        return ret;
#endif
    } else if (type == SrsListenerHttpStream) {
#ifdef SRS_AUTO_HTTP_SERVER
        conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server);
#else
        srs_warn("close http client for server not support http-server");
        srs_close_stfd(client_stfd);
        return ret;
#endif
    } else {
        // TODO: FIXME: handler others
    }
    srs_assert(conn);

    // directly enqueue, the cycle thread will remove the client.
    conns.push_back(conn);
    srs_verbose("add conn to vector.");

    // cycle will start process thread and when finished remove the client.
    // @remark never use the conn, for it maybe destroyed.
    if ((ret = conn->start()) != ERROR_SUCCESS) {
        return ret;
    }
    srs_verbose("conn started success.");

    srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);

    return ret;
}

accept_client creates a different SrsConnection subclass depending on type. For RTMP it creates an SrsRtmpConn, adds it to std::vector<SrsConnection*> conns;, and then calls conn->start().
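
The create-by-type, store, then start sequence can be sketched in isolation. The classes below are hypothetical and much simpler than the SRS ones: `Connection::start()` only sets a flag where the real code would launch a coroutine, and ownership is expressed with std::unique_ptr, which is a choice made for this sketch and not something the quoted source does.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical listener types, mirroring the three branches in the text.
enum class ListenerType { RtmpStream, HttpApi, HttpStream };

// Hypothetical base class: start() would launch the connection's coroutine.
class Connection {
public:
    virtual ~Connection() {}
    virtual std::string name() const = 0;
    int start() { started_ = true; return 0; }
    bool started() const { return started_; }
private:
    bool started_ = false;
};

class RtmpConn : public Connection {
public:
    std::string name() const override { return "rtmp"; }
};

class HttpApiConn : public Connection {
public:
    std::string name() const override { return "http-api"; }
};

class HttpStreamConn : public Connection {
public:
    std::string name() const override { return "http-stream"; }
};

class Server {
public:
    // Picks the subclass from the listener type, stores it, then starts it.
    int accept_client(ListenerType type) {
        std::unique_ptr<Connection> conn;
        switch (type) {
            case ListenerType::RtmpStream: conn.reset(new RtmpConn()); break;
            case ListenerType::HttpApi: conn.reset(new HttpApiConn()); break;
            case ListenerType::HttpStream: conn.reset(new HttpStreamConn()); break;
        }
        assert(conn != nullptr);
        conns_.push_back(std::move(conn));
        return conns_.back()->start();
    }

    std::size_t size() const { return conns_.size(); }
    const Connection& at(std::size_t i) const { return *conns_.at(i); }

private:
    std::vector<std::unique_ptr<Connection>> conns_;
};
```

The connection is stored before it is started, the same order as in accept_client, so the server already tracks it by the time its processing begins.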

The SrsConnection base class creates a coroutine, pthread, of type SrsOneCycleThread. The conn->start() call above is really pthread->start():

SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
{
    id = 0;
    manager = cm;
    stfd = c;
    disposed = false;
    expired = false;

    // the client thread should reap itself,
    // so we never use joinable.
    // TODO: FIXME: maybe other thread need to stop it.
    // @see: https://github.com/ossrs/srs/issues/78
    pthread = new SrsOneCycleThread("conn", this);
}

int SrsConnection::start()
{
    return pthread->start();
}

int SrsConnection::cycle() calls do_cycle(), which each derived class implements:

int SrsRtmpConn::do_cycle()
{
    int ret = ERROR_SUCCESS;

    srs_trace("RTMP client ip=%s", ip.c_str());

    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

    // Enter the RTMP handshake.
    if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
        srs_error("rtmp handshake failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp handshake success");

    if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
        srs_error("rtmp connect vhost/app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp connect app success");

    // set client ip to request.
    req->ip = ip;

    srs_trace("connect app, "
        "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
        req->app.c_str(), (req->args? "(obj)":"null"));

    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;

        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }

        srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d",
            srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }

    ret = service_cycle();

    http_hooks_on_close();

    return ret;
}

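This is where RTMP protocol processing formally begins. The connection first performs the handshake (rtmp->handshake()) and the connect step, then enters service_cycle():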

int SrsRtmpConn::service_cycle()
{
    ...
    while (!disposed) {
        ret = stream_service_cycle();

        // stream service must terminated with error, never success.
        // when terminated with success, it's user required to stop.
        if (ret == ERROR_SUCCESS) {
            continue;
        }

        // when not system control error, fatal error, return.
        if (!srs_is_system_control_error(ret)) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("stream service cycle failed. ret=%d", ret);
            }
            return ret;
        }

        // for republish, continue service
        if (ret == ERROR_CONTROL_REPUBLISH) {
            // set timeout to a larger value, wait for encoder to republish.
            rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);
            rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);

            srs_trace("control message(unpublish) accept, retry stream service.");
            continue;
        }

        // for "some" system control error,
        // logical accept and retry stream service.
        if (ret == ERROR_CONTROL_RTMP_CLOSE) {
            // TODO: FIXME: use ping message to anti-death of socket.
            // @see: https://github.com/ossrs/srs/issues/39
            // set timeout to a larger value, for user paused.
            rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);
            rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);

            srs_trace("control message(close) accept, retry stream service.");
            continue;
        }

        // for other system control message, fatal error.
        srs_error("control message(%d) reject as error. ret=%d", ret, ret);
        return ret;
    }

    return ret;
}
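
The retry logic of service_cycle() boils down to classifying the result of each round. The sketch below isolates that classification. The `Result` values, `is_system_control` and `service_loop` are hypothetical names with made-up numeric codes (the real ERROR_* values are not shown in the article), the timeout adjustments are left out, and the `disposed` flag is omitted so the loop ends only on a non-retryable result.

```cpp
#include <cassert>
#include <functional>

// Hypothetical result codes; the real ERROR_* values are not shown in the text.
enum Result {
    kSuccess = 0,
    kSocketTimeout = 1,
    kControlRepublish = 2,
    kControlClose = 3,
    kControlOther = 4
};

// Assumed classification: the three kControl* codes are "system control" errors.
inline bool is_system_control(int ret) {
    return ret == kControlRepublish || ret == kControlClose || ret == kControlOther;
}

// Calls one_round() until it reports something that is not retryable and
// returns that code. `rounds` counts how many times one_round() was called.
inline int service_loop(const std::function<int()>& one_round, int& rounds) {
    assert(static_cast<bool>(one_round));
    rounds = 0;
    for (;;) {
        int ret = one_round();
        ++rounds;
        if (ret == kSuccess) {
            continue;  // the user stopped the stream: serve the next one
        }
        if (!is_system_control(ret)) {
            return ret;  // ordinary error: fatal for this connection
        }
        if (ret == kControlRepublish || ret == kControlClose) {
            continue;  // accepted control message: retry the stream service
        }
        return ret;  // any other control message is rejected as an error
    }
}
```

A connection therefore survives unpublish/republish and pause/close control messages, and is torn down only by a real error such as a socket timeout or by an unrecognized control code.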

stream_service_cycle:

int SrsRtmpConn::stream_service_cycle()
{
    int ret = ERROR_SUCCESS;

    SrsRtmpConnType type;
    if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {
        if (!srs_is_client_gracefully_close(ret)) {
            srs_error("identify client failed. ret=%d", ret);
        }
        return ret;
    }

    srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
    req->strip();
    srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f, param=%s",
        srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration, req->param.c_str());

    // discovery vhost, resolve the vhost from config
    SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
    if (parsed_vhost) {
        req->vhost = parsed_vhost->arg0();
    }

    if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
        ret = ERROR_RTMP_REQ_TCURL;
        srs_error("discovery tcUrl failed. "
            "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
            req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
        return ret;
    }

    if ((ret = check_vhost()) != ERROR_SUCCESS) {
        srs_error("check vhost failed. ret=%d", ret);
        return ret;
    }

    srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, stream=%s, param=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
        req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null"));

    // do token traverse before serve it.
    // @see https://github.com/ossrs/srs/pull/239
    if (true) {
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
        if (vhost_is_edge && edge_traverse) {
            if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
                srs_warn("token auth failed, ret=%d", ret);
                return ret;
            }
        }
    }

    // security check
    if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
        srs_error("security check failed. ret=%d", ret);
        return ret;
    }
    srs_info("security check ok");

    // Never allow the empty stream name, for HLS may write to a file with empty name.
    // @see https://github.com/ossrs/srs/issues/834
    if (req->stream.empty()) {
        ret = ERROR_RTMP_STREAM_NAME_EMPTY;
        srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
        return ret;
    }

    // client is identified, set the timeout to service timeout.
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

    // find a source to serve.
    SrsSource* source = NULL;
    if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
        return ret;
    }
    srs_assert(source != NULL);

    // update the statistic when source disconveried.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) {
        srs_error("stat client failed. ret=%d", ret);
        return ret;
    }

    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
    bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
    srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
        req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
        source->source_id(), source->source_id());
    source->set_cache(enabled_cache);

    client_type = type;
    // Branch on the client type.
    switch (type) {
        case SrsRtmpConnPlay: {
            srs_verbose("start to play stream %s.", req->stream.c_str());

            // response connection start play
            if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to play stream failed. ret=%d", ret);
                return ret;
            }
            if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
                srs_error("http hook on_play failed. ret=%d", ret);
                return ret;
            }

            srs_info("start to play stream %s success", req->stream.c_str());
            ret = playing(source);
            http_hooks_on_stop();

            return ret;
        }
        case SrsRtmpConnFMLEPublish: {
            srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());

            if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to publish stream failed. ret=%d", ret);
                return ret;
            }

            return publishing(source);
        }
        case SrsRtmpConnHaivisionPublish: {
            srs_verbose("Haivision start to publish stream %s.", req->stream.c_str());

            if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to publish stream failed. ret=%d", ret);
                return ret;
            }

            return publishing(source);
        }
        case SrsRtmpConnFlashPublish: {
            srs_verbose("flash start to publish stream %s.", req->stream.c_str());

            if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("flash start to publish stream failed. ret=%d", ret);
                return ret;
            }

            return publishing(source);
        }
        default: {
            ret = ERROR_SYSTEM_CLIENT_INVALID;
            srs_info("invalid client type=%d. ret=%d", type, ret);
            return ret;
        }
    }

    return ret;
}

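First, rtmp->identify_client identifies the client, and the code then branches on the client type (type). SrsRtmpConnPlay is a client playing a stream. SrsRtmpConnFMLEPublish is an RTMP publish to the server. SrsRtmpConnHaivisionPublish is presumably a publish from a Haivision encoder (not Hikvision, which is a different company). SrsRtmpConnFlashPublish is a publish from Flash. Only SrsRtmpConnFMLEPublish is followed here: it enters int SrsRtmpConn::publishing(SrsSource* source) and then int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd):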

int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
{
    ...
    // start isolate recv thread.
    if ((ret = trd->start()) != ERROR_SUCCESS) {
        srs_error("start isolate recv thread failed. ret=%d", ret);
        return ret;
    }
    ...
}

The trd coroutine then runs. Its loop calls rtmp->recv_message(&msg) and passes the message to int SrsPublishRecvThread::handle(SrsCommonMessage* msg), which calls back into int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge). The received data is then processed in:

int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;

    // for edge, directly proxy message to origin.
    if (vhost_is_edge) {
        if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) {
            srs_error("edge publish proxy msg failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }

    // process audio packet
    if (msg->header.is_audio()) {
        if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }
    // process video packet
    if (msg->header.is_video()) {
        if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }

    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }

    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("decode onMetaData message failed. ret=%d", ret);
            return ret;
        }
        SrsAutoFree(SrsPacket, pkt);

        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
                srs_error("source process onMetaData message failed. ret=%d", ret);
                return ret;
            }
            srs_info("process onMetaData message success.");
            return ret;
        }

        srs_info("ignore AMF0/AMF3 data message.");
        return ret;
    }

    return ret;
}
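
The routing in process_publish_message can be reduced to a lookup on the RTMP message type id. The ids below (8 audio, 9 video, 15 AMF3 data, 18 AMF0 data, 22 aggregate) come from the RTMP specification; `Route`, `route_publish_message` and `demo_routes` are hypothetical names, and the sketch only says where a message would go, without decoding anything.

```cpp
#include <cassert>
#include <cstdint>

// RTMP message type ids, as defined by the RTMP specification.
const std::uint8_t kTypeAudio = 8;
const std::uint8_t kTypeVideo = 9;
const std::uint8_t kTypeAmf3Data = 15;
const std::uint8_t kTypeAmf0Data = 18;
const std::uint8_t kTypeAggregate = 22;

// Hypothetical destinations for a published message.
enum class Route { EdgeProxy, Audio, Video, Aggregate, Data, Ignored };

// Decides where a published message goes. On an edge vhost every message is
// proxied to the origin, whatever its type.
inline Route route_publish_message(std::uint8_t message_type, bool vhost_is_edge) {
    if (vhost_is_edge) {
        return Route::EdgeProxy;
    }
    switch (message_type) {
        case kTypeAudio: return Route::Audio;
        case kTypeVideo: return Route::Video;
        case kTypeAggregate: return Route::Aggregate;
        case kTypeAmf0Data:
        case kTypeAmf3Data:
            // Data messages are decoded later; only onMetaData is acted on.
            return Route::Data;
        default: return Route::Ignored;
    }
}

// Usage example: the edge check wins over the message type.
inline void demo_routes() {
    assert(route_publish_message(kTypeVideo, false) == Route::Video);
    assert(route_publish_message(kTypeVideo, true) == Route::EdgeProxy);
}
```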

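If this server is an edge server (vhost_is_edge), the message is proxied directly back to the origin server. Otherwise audio and video are handled separately. Only the video path is followed here: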

int SrsSource::on_video(SrsCommonMessage* shared_video)
{
    int ret = ERROR_SUCCESS;

    // monotically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
            is_monotonically_increase = false;
            srs_warn("VIDEO: stream not monotonically increase, please open mix_correct.");
        }
    }
    last_packet_time = shared_video->header.timestamp;

    // drop any unknown header video.
    // @see https://github.com/ossrs/srs/issues/421
    if (!SrsFlvCodec::video_is_acceptable(shared_video->payload, shared_video->size)) {
        char b0 = 0x00;
        if (shared_video->size > 0) {
            b0 = shared_video->payload[0];
        }

        srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
        return ret;
    }

    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;
    if ((ret = msg.create(shared_video)) != ERROR_SUCCESS) {
        srs_error("initialize the video failed. ret=%d", ret);
        return ret;
    }
    srs_info("Video dts=%"PRId64", size=%d", msg.timestamp, msg.size);

    // directly process the audio message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }

    // insert msg to the queue.
    mix_queue->push(msg.copy());

    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return ret;
    }

    // consume the monotonically increase message.
    if (m->is_audio()) {
        ret = on_audio_imp(m);
    } else {
        ret = on_video_imp(m);
    }
    srs_freep(m);

    return ret;
}

shared_video is converted into an SrsSharedPtrMessage, and on_video_imp is called:

int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
{
    int ret = ERROR_SUCCESS;

    srs_info("Video dts=%"PRId64", size=%d", msg->timestamp, msg->size);

    bool is_sequence_header = SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size);

    // whether consumer should drop for the duplicated sequence header.
    bool drop_for_reduce = false;
    if (is_sequence_header && cache_sh_video && _srs_config->get_reduce_sequence_header(_req->vhost)) {
        if (cache_sh_video->size == msg->size) {
            drop_for_reduce = srs_bytes_equals(cache_sh_video->payload, msg->payload, msg->size);
            srs_warn("drop for reduce sh video, size=%d", msg->size);
        }
    }

    // cache the sequence header if h264
    // donot cache the sequence header to gop_cache, return here.
    if (is_sequence_header) {
        srs_freep(cache_sh_video);
        cache_sh_video = msg->copy();

        // parse detail audio codec
        SrsAvcAacCodec codec;

        // user can disable the sps parse to workaround when parse sps failed.
        // @see https://github.com/ossrs/srs/issues/474
        codec.avc_parse_sps = _srs_config->get_parse_sps(_req->vhost);

        SrsCodecSample sample;
        if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
            srs_error("source codec demux video failed. ret=%d", ret);
            return ret;
        }

        // when got video stream info.
        SrsStatistic* stat = SrsStatistic::instance();
        if ((ret = stat->on_video_info(_req, SrsCodecVideoAVC, codec.avc_profile, codec.avc_level)) != ERROR_SUCCESS) {
            return ret;
        }

        srs_trace("%dB video sh,  codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)",
            msg->size, codec.video_codec_id,
            srs_codec_avc_profile2str(codec.avc_profile).c_str(),
            srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height,
            codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
    }

#ifdef SRS_AUTO_HLS
    if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
        // apply the error strategy for hls.
        // @see https://github.com/ossrs/srs/issues/264
        std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost);
        if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
            srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);

            // unpublish, ignore ret.
            hls->on_unpublish();

            // ignore.
            ret = ERROR_SUCCESS;
        } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
            if (srs_hls_can_continue(ret, cache_sh_video, msg)) {
                ret = ERROR_SUCCESS;
            } else {
                srs_warn("hls continue video failed. ret=%d", ret);
                return ret;
            }
        } else {
            srs_warn("hls disconnect publisher for video error. ret=%d", ret);
            return ret;
        }
    }
#endif

#ifdef SRS_AUTO_DVR
    if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) {
        srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);

        // unpublish, ignore ret.
        dvr->on_unpublish();

        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif

#ifdef SRS_AUTO_HDS
    if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) {
        srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret);

        // unpublish, ignore ret.
        hds->on_unpublish();
        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif

    // copy to all consumer
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {
                srs_error("dispatch the video failed. ret=%d", ret);
                return ret;
            }
        }
        srs_info("dispatch video success.");
    }

    // copy to all forwarders.
    if (!forwarders.empty()) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) {
                srs_error("forwarder process video message failed. ret=%d", ret);
                return ret;
            }
        }
    }

    // when sequence header, donot push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return ret;
    }

    // cache the last gop packets
    if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
        srs_error("gop cache msg failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("cache gop success.");

    // if atc, update the sequence header to abs time.
    if (atc) {
        if (cache_sh_video) {
            cache_sh_video->timestamp = msg->timestamp;
        }
        if (cache_metadata) {
            cache_metadata->timestamp = msg->timestamp;
        }
    }

    return ret;
}
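
on_video_imp relies on recognizing an H.264 sequence header and on detecting a repeated one. A minimal sketch of both checks follows, based on the FLV video tag layout (first byte: frame type in the high nibble and codec id in the low nibble; second byte: AVC packet type). The function names are hypothetical, and whether SrsFlvCodec::video_is_sequence_header performs exactly these checks is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// True when an FLV video payload is an AVC (H.264) sequence header:
// byte 0 = frame type (high nibble) | codec id (low nibble),
// byte 1 = AVC packet type, where 0 means sequence header.
inline bool is_avc_sequence_header(const std::uint8_t* payload, std::size_t size) {
    if (payload == nullptr || size < 2) {
        return false;
    }
    const int frame_type = (payload[0] >> 4) & 0x0f;
    const int codec_id = payload[0] & 0x0f;
    const int avc_packet_type = payload[1];
    return frame_type == 1        // key frame
        && codec_id == 7          // AVC
        && avc_packet_type == 0;  // sequence header
}

// True when a newly received header is byte-for-byte identical to the cached
// one, in which case sending it to consumers again would be redundant.
inline bool is_duplicate_header(const std::vector<std::uint8_t>& cached,
                                const std::vector<std::uint8_t>& incoming) {
    return cached == incoming;
}

// Usage example: 0x17 0x00 marks a key frame, AVC, sequence header.
inline void demo_sequence_header() {
    const std::uint8_t header[] = {0x17, 0x00};
    assert(is_avc_sequence_header(header, sizeof(header)));
}
```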

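on_video_imp caches the H.264 sequence header, dispatches to HLS, dispatches to the client consumers, pushes to the forwarders, and so on. The consumer dispatch is the part of interest here: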

    // copy to all consumer
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {
                srs_error("dispatch the video failed. ret=%d", ret);
                return ret;
            }
        }
        srs_info("dispatch video success.");
    }

int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
    int ret = ERROR_SUCCESS;

    // copy() here only increments the reference count; no memory is actually copied.
    SrsSharedPtrMessage* msg = shared_msg->copy();

    if (!atc) {
        if ((ret = jitter->correct(msg, ag)) != ERROR_SUCCESS) {
            srs_freep(msg);
            return ret;
        }
    }

    if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) {
        return ret;
    }

#ifdef SRS_PERF_QUEUE_COND_WAIT
    srs_verbose("enqueue msg, time=%"PRId64", size=%d, duration=%d, waiting=%d, min_msg=%d",
        msg->timestamp, msg->size, queue->duration(), mw_waiting, mw_min_msgs);

    // fire the mw when msgs is enough.
    if (mw_waiting) {
        int duration_ms = queue->duration();
        bool match_min_msgs = queue->size() > mw_min_msgs;

        // For ATC, maybe the SH timestamp bigger than A/V packet,
        // when encoder republish or overflow.
        // @see https://github.com/ossrs/srs/pull/749
        if (atc && duration_ms < 0) {
            st_cond_signal(mw_wait);
            mw_waiting = false;
            return ret;
        }

        // when duration ok, signal to flush.
        if (match_min_msgs && duration_ms > mw_duration) {
            st_cond_signal(mw_wait);
            mw_waiting = false;
            return ret;
        }
    }
#endif

    return ret;
}
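
The "copy only bumps a reference count" idea in enqueue() can be sketched with std::shared_ptr. `SharedMessage` and `demo_shared_copy` are hypothetical names, not the SrsSharedPtrMessage implementation; the sketch assumes the payload is shared between copies while each copy keeps its own timestamp, which would let a per-consumer step such as jitter correction adjust one copy without touching the others.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical message: the payload is shared, the timestamp is per copy.
class SharedMessage {
public:
    SharedMessage(std::int64_t ts, std::vector<char> bytes)
        : timestamp(ts),
          payload_(std::make_shared<const std::vector<char> >(std::move(bytes))) {}

    // Cheap copy: bumps the reference count, leaves the bytes where they are.
    SharedMessage copy() const { return *this; }

    const char* data() const { return payload_->data(); }
    long use_count() const { return payload_.use_count(); }

    std::int64_t timestamp;  // each copy can be re-timestamped independently

private:
    std::shared_ptr<const std::vector<char> > payload_;
};

// Usage example: the copy points at the very same bytes as the original.
inline void demo_shared_copy() {
    SharedMessage original(100, std::vector<char>(16, 'x'));
    SharedMessage for_consumer = original.copy();
    assert(for_consumer.data() == original.data());
}
```

With N consumers the payload therefore exists once in memory, and it is released when the last copy goes away.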

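Each SrsConsumer owns its own SrsMessageQueue* queue. The internal container is described as std::multimap<int64_t, SrsSharedPtrMessage*> msgs, although the enqueue() and shrink() code quoted below calls msgs.push_back() and msgs.at(i), which only a vector-like sequence container provides. SrsMessageQueue has a size limit; when the queue is full, old messages are dropped.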

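The limit queue_size is taken from "queue_length" in the configuration file. If it is not set, the default below is used; the value is in seconds and is converted to milliseconds:

#define SRS_PERF_PLAY_QUEUE 30
queue_size_ms = (int)(queue_size * 1000);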

int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
{
    int ret = ERROR_SUCCESS;

    if (msg->is_av()) {
        if (av_start_time == -1) {
            av_start_time = msg->timestamp;
        }

        av_end_time = msg->timestamp;
    }

    msgs.push_back(msg);

    while (av_end_time - av_start_time > queue_size_ms) {
        // notice the caller queue already overflow and shrinked.
        if (is_overflow) {
            *is_overflow = true;
        }

        shrink();
    }

    return ret;
}

void SrsMessageQueue::shrink()
{
    SrsSharedPtrMessage* video_sh = NULL;
    SrsSharedPtrMessage* audio_sh = NULL;
    int msgs_size = (int)msgs.size();

    // remove all msg
    // igone the sequence header
    for (int i = 0; i < (int)msgs.size(); i++) {
        SrsSharedPtrMessage* msg = msgs.at(i);

        if (msg->is_video() && SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size)) {
            srs_freep(video_sh);
            video_sh = msg;
            continue;
        }
        else if (msg->is_audio() && SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size)) {
            srs_freep(audio_sh);
            audio_sh = msg;
            continue;
        }

        srs_freep(msg);
    }
    msgs.clear();

    // update av_start_time
    av_start_time = av_end_time;
    //push_back secquence header and update timestamp
    if (video_sh) {
        video_sh->timestamp = av_end_time;
        msgs.push_back(video_sh);
    }
    if (audio_sh) {
        audio_sh->timestamp = av_end_time;
        msgs.push_back(audio_sh);
    }

    if (_ignore_shrink) {
        srs_info("shrink the cache queue, size=%d, removed=%d, max=%.2f",
            (int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0);
    } else {
        srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f",
            (int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0);
    }
}

shrink() keeps the most recent sequence headers and clears all other messages.

The AVC sequence header and the AAC sequence header are essential: the client cannot decode without them, so they must not be dropped.

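This drop strategy does not work on whole GOPs. It simply discards every packet except the sequence headers, so the next frames a client receives may depend on a keyframe that was thrown away, which can show up as a corrupted picture on the client.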
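
The overflow behaviour can be reproduced with a small stand-alone model. `Msg` and `BoundedQueue` are hypothetical and simplified: messages are plain values instead of reference-counted pointers, and every message counts as audio or video. The sketch follows the same steps as the quoted enqueue() and shrink(): push first, then, while the buffered duration exceeds the limit, keep only the newest video and audio sequence headers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified message: only the fields the policy looks at.
struct Msg {
    std::int64_t timestamp_ms;
    bool is_video;            // false means audio
    bool is_sequence_header;
};

class BoundedQueue {
public:
    // `queue_size_s` is the limit in seconds, converted to milliseconds.
    explicit BoundedQueue(double queue_size_s)
        : queue_size_ms_(static_cast<int>(queue_size_s * 1000)),
          start_ms_(-1), end_ms_(-1) {
        assert(queue_size_s > 0);
    }

    // Appends a message; returns true if the queue overflowed and was shrunk.
    bool enqueue(const Msg& msg) {
        if (start_ms_ == -1) {
            start_ms_ = msg.timestamp_ms;
        }
        end_ms_ = msg.timestamp_ms;
        msgs_.push_back(msg);

        bool overflow = false;
        while (end_ms_ - start_ms_ > queue_size_ms_) {
            overflow = true;
            shrink();
        }
        return overflow;
    }

    std::size_t size() const { return msgs_.size(); }
    const Msg& at(std::size_t i) const { return msgs_.at(i); }

private:
    // Keeps only the newest video and audio sequence headers, re-stamped to
    // the newest timestamp; every other message is discarded.
    void shrink() {
        const Msg* video_sh = nullptr;
        const Msg* audio_sh = nullptr;
        for (const Msg& m : msgs_) {
            if (!m.is_sequence_header) {
                continue;
            }
            if (m.is_video) {
                video_sh = &m;
            } else {
                audio_sh = &m;
            }
        }

        std::vector<Msg> kept;
        if (video_sh != nullptr) {
            Msg m = *video_sh;
            m.timestamp_ms = end_ms_;
            kept.push_back(m);
        }
        if (audio_sh != nullptr) {
            Msg m = *audio_sh;
            m.timestamp_ms = end_ms_;
            kept.push_back(m);
        }
        msgs_.swap(kept);
        start_ms_ = end_ms_;
    }

    int queue_size_ms_;
    std::int64_t start_ms_;
    std::int64_t end_ms_;
    std::vector<Msg> msgs_;
};
```

In this model the frame that triggers the overflow is discarded along with everything else, leaving only the two headers. A GOP-aware variant would instead drop up to the next keyframe; that is a possible alternative, not something the quoted code does.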

3. Summary


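The client publishes over RTMP to the server, and the server caches the messages in each client consumer's own queue. The data is reference counted, so no memory is copied, and expired data is cleared. Client consumers are connections of type SrsRtmpConnPlay; the flow by which a consumer plays a stream is covered in the next article.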

Original article: "SRS Streaming Media Server Source Code Analysis (1): RTMP Publish Flow", on Jianshu
