live555 supports both unicast and multicast. We first analyze the unicast streaming server, and afterwards the multicast streaming server.

I. The unicast streaming server:

// Create the RTSP server:
RTSPServer* rtspServer = NULL;
// Normal case: Streaming from a built-in RTSP server:
rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
if (rtspServer == NULL) {
  *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
  exit(1);
}

*env << "...done initializing \n";

if( streamingMode == STREAMING_UNICAST )
{
  ServerMediaSession* sms = ServerMediaSession::createNew(*env,
                                                          H264StreamName[video_type],
                                                          H264StreamName[video_type],
                                                          streamDescription,
                                                          streamingMode == STREAMING_MULTICAST_SSM);
  sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type], H264VideoBitrate));
  sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type]));
  rtspServer->addServerMediaSession(sms);

  char *url = rtspServer->rtspURL(sms);
  *env << "Play this stream using the URL:\t" << url << "\n";
  delete[] url;
}

// Begin the LIVE555 event loop:
env->taskScheduler().doEventLoop(&watchVariable); // does not return

Let's analyze it step by step:

1>  rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

RTSPServer*
RTSPServer::createNew(UsageEnvironment& env, Port ourPort,
                      UserAuthenticationDatabase* authDatabase,
                      unsigned reclamationTestSeconds)
{
  int ourSocket = -1;

  do {
    int ourSocket = setUpOurSocket(env, ourPort);
    if (ourSocket == -1) break;

    return new RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
  } while (0);

  if (ourSocket != -1) ::closeSocket(ourSocket);
  return NULL;
}

This function first creates a TCP socket for the RTSP protocol via setUpOurSocket(), sets it listening on port rtspServerPortNum, and then constructs an RTSPServer instance.
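For reference, here is a minimal sketch of setUpOurSocket(), abridged from live555 sources of roughly this vintage (the send-buffer size and listen backlog are assumptions; details vary across versions):

int RTSPServer::setUpOurSocket(UsageEnvironment& env, Port& ourPort) {
  int ourSocket = -1;
  do {
    ourSocket = setupStreamSocket(env, ourPort); // create a TCP socket and bind it to ourPort
    if (ourSocket < 0) break;

    // Make sure we have a big send buffer:
    if (!increaseSendBufferTo(env, ourSocket, 50*1024)) break;

    // Allow multiple simultaneous connections:
    if (listen(ourSocket, 20) < 0) { // backlog of 20 is an assumption
      env.setResultErrMsg("listen() failed: ");
      break;
    }

    return ourSocket;
  } while (0);

  if (ourSocket != -1) ::closeSocket(ourSocket);
  return -1;
}

With the listening socket in hand, let's look at the RTSPServer constructor: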

RTSPServer::RTSPServer(UsageEnvironment& env,
                       int ourSocket, Port ourPort,
                       UserAuthenticationDatabase* authDatabase,
                       unsigned reclamationTestSeconds)
  : Medium(env),
    fServerSocket(ourSocket), fServerPort(ourPort),
    fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fSessionIdCounter(0)
{
#ifdef USE_SIGNALS
  // Ignore the SIGPIPE signal, so that clients on the same host that are killed
  // don't also kill us:
  signal(SIGPIPE, SIG_IGN);
#endif

  // Arrange to handle connections from others:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, (TaskScheduler::BackgroundHandlerProc*)&incomingConnectionHandler, this);
}

The RTSPServer constructor initializes fServerMediaSessions to a newly created HashTable, fServerSocket to the TCP socket we created earlier, and fServerPort to the listening port rtspServerPortNum. It also registers the task function incomingConnectionHandler for fServerSocket with the taskScheduler. This handler watches for new client connections; when a client connects, it accept()s the connection and creates an RTSPClientSession instance.
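For reference, a minimal sketch of that accept path, abridged from live555 (incomingConnectionHandler() is a static trampoline into this member function; the 50 KB send-buffer figure is quoted from memory):

void RTSPServer::incomingConnectionHandler1() {
  struct sockaddr_in clientAddr;
  SOCKLEN_T clientAddrLen = sizeof clientAddr;

  // Accept the pending TCP connection on the listening server socket:
  int clientSocket = accept(fServerSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
  if (clientSocket < 0) {
    envir().setResultErrMsg("accept() failed: ");
    return;
  }
  makeSocketNonBlocking(clientSocket);
  increaseSendBufferTo(envir(), clientSocket, 50*1024);

  // Hand the connection to a new per-client session object:
  new RTSPClientSession(*this, ++fSessionIdCounter, clientSocket, clientAddr);
}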

What functionality must RTSPClientSession provide? As you might imagine: it needs to listen for the client's RTSP requests and respond to them, return the requested stream's information for a DESCRIBE request, set up the RTP session for a SETUP request, tear down the RTP session for a TEARDOWN request, and so on...

RTSPServer::RTSPClientSession::RTSPClientSession(RTSPServer& ourServer, unsigned sessionId, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fOurSessionId(sessionId),
    fOurServerMediaSession(NULL),
    fClientSocket(clientSocket), fClientAddr(clientAddr),
    fLivenessCheckTask(NULL),
    fIsMulticast(False), fSessionIsActive(True), fStreamAfterSETUP(False),
    fTCPStreamIdCount(0), fNumStreamStates(0), fStreamStates(NULL)
{
  // Arrange to handle incoming requests:
  resetRequestBuffer();
  envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket, (TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this);
  noteLiveness();
}

The function above is the RTSPClientSession constructor. It initializes sessionId to ++fSessionIdCounter, fClientSocket to the socket returned by accept() (clientSocket), and fClientAddr to the client address received from accept(). It likewise registers the task function incomingRequestHandler for fClientSocket with the taskScheduler.

incomingRequestHandler forwards to incomingRequestHandler1().
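The trampoline itself is trivial (a sketch; live555 registers the static function with the scheduler, and it simply forwards to the member function):

void RTSPServer::RTSPClientSession
::incomingRequestHandler(void* instance, int /*mask*/) {
  RTSPClientSession* session = (RTSPClientSession*)instance;
  session->incomingRequestHandler1();
}

And incomingRequestHandler1() itself is defined as follows: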

void RTSPServer::RTSPClientSession::incomingRequestHandler1()
{
  noteLiveness();

  struct sockaddr_in dummy; // 'from' address, meaningless in this case
  Boolean endOfMsg = False;
  unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen];

  int bytesRead = readSocket(envir(), fClientSocket, ptr, fRequestBufferBytesLeft, dummy);
  if (bytesRead <= 0 || (unsigned)bytesRead >= fRequestBufferBytesLeft) {
    // Either the client socket has died, or the request was too big for us.
    // Terminate this connection:
#ifdef DEBUG
    fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes (of %d); terminating connection!\n", this, bytesRead, fRequestBufferBytesLeft);
#endif
    delete this;
    return;
  }
#ifdef DEBUG
  ptr[bytesRead] = '\0';
  fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes:%s\n", this, bytesRead, ptr);
#endif

  // Look for the end of the message:
  unsigned char *tmpPtr = ptr;
  if (fRequestBytesAlreadySeen > 0) --tmpPtr;
  // in case the last read ended with a <CR>
  while (tmpPtr < &ptr[bytesRead-1]) {
    if (*tmpPtr == '\r' && *(tmpPtr+1) == '\n') {
      if (tmpPtr - fLastCRLF == 2) { // This is it:
        endOfMsg = True;
        break;
      }
      fLastCRLF = tmpPtr;
    }
    ++tmpPtr;
  }

  fRequestBufferBytesLeft -= bytesRead;
  fRequestBytesAlreadySeen += bytesRead;
  if (!endOfMsg) return; // subsequent reads will be needed to complete the request

  // Parse the request string into command name and 'CSeq',
  // then handle the command:
  fRequestBuffer[fRequestBytesAlreadySeen] = '\0';
  char cmdName[RTSP_PARAM_STRING_MAX];
  char urlPreSuffix[RTSP_PARAM_STRING_MAX];
  char urlSuffix[RTSP_PARAM_STRING_MAX];
  char cseq[RTSP_PARAM_STRING_MAX];
  if (!parseRTSPRequestString((char*)fRequestBuffer, fRequestBytesAlreadySeen,
                              cmdName, sizeof cmdName,
                              urlPreSuffix, sizeof urlPreSuffix,
                              urlSuffix, sizeof urlSuffix,
                              cseq, sizeof cseq))
  {
#ifdef DEBUG
    fprintf(stderr, "parseRTSPRequestString() failed!\n");
#endif
    handleCmd_bad(cseq);
  } else {
#ifdef DEBUG
    fprintf(stderr, "parseRTSPRequestString() returned cmdName \"%s\", urlPreSuffix \"%s\", urlSuffix \"%s\"\n", cmdName, urlPreSuffix, urlSuffix);
#endif
    if (strcmp(cmdName, "OPTIONS") == 0) {
      handleCmd_OPTIONS(cseq);
    } else if (strcmp(cmdName, "DESCRIBE") == 0) {
      printf("incomingRequestHandler1 ~~~~~~~~~~~~~~\n");
      handleCmd_DESCRIBE(cseq, urlSuffix, (char const*)fRequestBuffer);
    } else if (strcmp(cmdName, "SETUP") == 0) {
      handleCmd_SETUP(cseq, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
    } else if (strcmp(cmdName, "TEARDOWN") == 0
               || strcmp(cmdName, "PLAY") == 0
               || strcmp(cmdName, "PAUSE") == 0
               || strcmp(cmdName, "GET_PARAMETER") == 0) {
      handleCmd_withinSession(cmdName, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
    } else {
      handleCmd_notSupported(cseq);
    }
  }

#ifdef DEBUG
  fprintf(stderr, "sending response: %s", fResponseBuffer);
#endif
  send(fClientSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0);

  if (strcmp(cmdName, "SETUP") == 0 && fStreamAfterSETUP) {
    // The client has asked for streaming to commence now, rather than after a
    // subsequent "PLAY" command. So, simulate the effect of a "PLAY" command:
    handleCmd_withinSession("PLAY", urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
  }

  resetRequestBuffer(); // to prepare for any subsequent request
  if (!fSessionIsActive) delete this;
}

In this function we can see how each RTSP command is received, dispatched, and answered.
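The simplest handler shows the common response pattern: each handleCmd_*() formats an RTSP reply into fResponseBuffer, which the code above then send()s back on the client socket. A sketch of handleCmd_OPTIONS(), abridged from live555 (the exact list of advertised commands is an assumption for this version):

void RTSPServer::RTSPClientSession::handleCmd_OPTIONS(char const* cseq) {
  // Advertise the set of commands this server supports:
  snprintf((char*)fResponseBuffer, sizeof fResponseBuffer,
           "RTSP/1.0 200 OK\r\nCSeq: %s\r\n%s"
           "Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n\r\n",
           cseq, dateHeader());
}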

2> ServerMediaSession* sms = ServerMediaSession::createNew(... ...)

This creates a ServerMediaSession instance, initializing fStreamName to "h264_ch1", fInfoSDPString to "h264_ch1", fDescriptionSDPString to "RTSP/RTP stream from NETRA", fMiscSDPLines to NULL, fCreationTime to the current time, and fIsSSM to false.
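A sketch of the factory function, abridged from live555 (parameter order: stream name, SDP info string, SDP description, the SSM flag, plus optional extra SDP lines):

ServerMediaSession* ServerMediaSession
::createNew(UsageEnvironment& env,
            char const* streamName, char const* info,
            char const* description, Boolean isSSM,
            char const* miscSDPLines) {
  return new ServerMediaSession(env, streamName, info, description, isSSM, miscSDPLines);
}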

3> sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(... ...);

WISH264VideoServerMediaSubsession::createNew(): the main purpose of this function is to create an instance of (a subclass of) OnDemandServerMediaSubsession, the class analyzed earlier that must be created in the unicast case; fWISInput is initialized to *H264InputDevice[video_type].

sms->addSubsession() adds the WISH264VideoServerMediaSubsession instance to the subsession list as the head node pointed to by fSubsessionsTail.

4> sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(... ...);

WISPCMAudioServerMediaSubsession::createNew(): likewise, this creates an instance of (a subclass of) OnDemandServerMediaSubsession, again required for unicast, with fWISInput initialized to *H264InputDevice[video_type].

sms->addSubsession() links the WISPCMAudioServerMediaSubsession instance in as fSubsessionsTail->fNext; a sketch of addSubsession() follows.

5> rtspServer->addServerMediaSession(sms)

This adds the ServerMediaSession (sms) to the server's fServerMediaSessions hash table.
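A sketch of addServerMediaSession(), abridged from live555; the session is keyed by its stream name, so that later DESCRIBE/SETUP requests can look it up from the URL suffix:

void RTSPServer::addServerMediaSession(ServerMediaSession* serverMediaSession) {
  if (serverMediaSession == NULL) return;

  char const* sessionName = serverMediaSession->streamName();
  if (sessionName == NULL) sessionName = "";
  removeServerMediaSession(sessionName); // in case an existing session with this name already exists

  fServerMediaSessions->Add(sessionName, (void*)serverMediaSession);
}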

6> env->taskScheduler().doEventLoop(&watchVariable);

doEventLoop() was analyzed earlier; it mainly processes socket tasks and delayed tasks.
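As a reminder, the loop itself is tiny (a sketch of the BasicTaskScheduler version): each SingleStep() does one select() pass over the registered sockets (our two background handlers above) and runs at most one due delayed task, until watchVariable becomes non-zero.

void BasicTaskScheduler0::doEventLoop(char* watchVariable) {
  // Repeatedly loop, handling readable sockets and timed events:
  while (1) {
    if (watchVariable != NULL && *watchVariable != 0) break;
    SingleStep();
  }
}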

II. The multicast streaming server:

// Create the RTSP server:
RTSPServer* rtspServer = NULL;
// Normal case: Streaming from a built-in RTSP server:
rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
if (rtspServer == NULL) {
  *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
  exit(1);
}

*env << "...done initializing \n";

if( streamingMode == STREAMING_UNICAST )
{
  ... ...
}
else
{
  if (streamingMode == STREAMING_MULTICAST_SSM)
  {
    if (multicastAddress == 0)
      multicastAddress = chooseRandomIPv4SSMAddress(*env);
  } else if (multicastAddress != 0) {
    streamingMode = STREAMING_MULTICAST_ASM;
  }

  struct in_addr dest;
  dest.s_addr = multicastAddress;
  const unsigned char ttl = 255;

  // For RTCP:
  const unsigned maxCNAMElen = 100;
  unsigned char CNAME[maxCNAMElen + 1];
  gethostname((char *) CNAME, maxCNAMElen);
  CNAME[maxCNAMElen] = '\0'; // just in case

  ServerMediaSession* sms;
  sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription, streamingMode == STREAMING_MULTICAST_SSM);

  /* VIDEO Channel initial */
  if( ... ) // condition elided in the original
  {
    // Create 'groupsocks' for RTP and RTCP:
    const Port rtpPortVideo(videoRTPPortNum);
    const Port rtcpPortVideo(videoRTPPortNum+1);

    rtpGroupsockVideo = new Groupsock(*env, dest, rtpPortVideo, ttl);
    rtcpGroupsockVideo = new Groupsock(*env, dest, rtcpPortVideo, ttl);

    if (streamingMode == STREAMING_MULTICAST_SSM) {
      rtpGroupsockVideo->multicastSendOnly();
      rtcpGroupsockVideo->multicastSendOnly();
    }

    setVideoRTPSinkBufferSize();
    sinkVideo = H264VideoRTPSink::createNew(*env, rtpGroupsockVideo, 96, 0x42, "h264");

    // Create (and start) a 'RTCP instance' for this RTP sink:
    unsigned totalSessionBandwidthVideo = (Mpeg4VideoBitrate+500)/1000; // in kbps; for RTCP b/w share
    rtcpVideo = RTCPInstance::createNew(*env, rtcpGroupsockVideo,
                                        totalSessionBandwidthVideo, CNAME,
                                        sinkVideo, NULL /* we're a server */ ,
                                        streamingMode == STREAMING_MULTICAST_SSM);
    // Note: This starts RTCP running automatically

    sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkVideo, rtcpVideo));

    sourceVideo = H264VideoStreamFramer::createNew(*env, H264InputDevice[video_type]->videoSource());

    // Start streaming:
    sinkVideo->startPlaying(*sourceVideo, NULL, NULL);
  }

  /* AUDIO Channel initial */
  if( ... ) // condition elided in the original
  {
    // there's a separate RTP stream for audio
    // Create 'groupsocks' for RTP and RTCP:
    const Port rtpPortAudio(audioRTPPortNum);
    const Port rtcpPortAudio(audioRTPPortNum+1);

    rtpGroupsockAudio = new Groupsock(*env, dest, rtpPortAudio, ttl);
    rtcpGroupsockAudio = new Groupsock(*env, dest, rtcpPortAudio, ttl);

    if (streamingMode == STREAMING_MULTICAST_SSM)
    {
      rtpGroupsockAudio->multicastSendOnly();
      rtcpGroupsockAudio->multicastSendOnly();
    }

    if( audioSamplingFrequency == 16000 )
      sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 96, audioSamplingFrequency, "audio", "PCMU", 1);
    else
      sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 0, audioSamplingFrequency, "audio", "PCMU", 1);

    // Create (and start) a 'RTCP instance' for this RTP sink:
    unsigned totalSessionBandwidthAudio = (audioOutputBitrate+500)/1000; // in kbps; for RTCP b/w share
    rtcpAudio = RTCPInstance::createNew(*env, rtcpGroupsockAudio,
                                        totalSessionBandwidthAudio, CNAME,
                                        sinkAudio, NULL /* we're a server */,
                                        streamingMode == STREAMING_MULTICAST_SSM);
    // Note: This starts RTCP running automatically

    sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkAudio, rtcpAudio));

    sourceAudio = H264InputDevice[video_type]->audioSource();

    // Start streaming:
    sinkAudio->startPlaying(*sourceAudio, NULL, NULL);
  }

  rtspServer->addServerMediaSession(sms);
  {
    struct in_addr dest; dest.s_addr = multicastAddress;
    char *url = rtspServer->rtspURL(sms);
    //char *url2 = inet_ntoa(dest);
    *env << "Multicast Play this stream using the URL:\n\t" << url << "\n";
    //*env << "2 Multicast addr:\n\t" << url2 << "\n";
    delete[] url;
  }
}

// Begin the LIVE555 event loop:
env->taskScheduler().doEventLoop(&watchVariable); // does not return

1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

Same as in the unicast analysis above.

2> sms = ServerMediaSession::createNew(... ...)

Same as in the unicast analysis above.

3> Video

1. Create Groupsock instances for the video RTP and RTCP channels; these provide the UDP sockets for RTP/RTCP. It helps to understand ASM vs. SSM here (see the sketch after the audio list below).

2. Create the RTPSink instance, which packetizes video data into RTP and transmits it.

3. Create the RTCPInstance, which packetizes and transmits RTCP.

4. Create the PassiveServerMediaSubsession instance and add it to the fSubsessionsTail list as the head node.

5. Create the FramedSource instance, which obtains one frame of video data at a time.

6. Start sending RTP and RTCP data to the multicast address.

4> Audio

1. Create Groupsock instances for the audio RTP and RTCP channels; these provide the UDP sockets for RTP/RTCP. Again, ASM vs. SSM matters here (see the sketch below).

2. Create the RTPSink instance, which packetizes audio data into RTP and transmits it.

3. Create the RTCPInstance, which packetizes and transmits RTCP.

4. Create the PassiveServerMediaSubsession instance and link it in as the next node of the fSubsessionsTail list.

5. Create the FramedSource instance, which obtains one frame of audio data at a time.

6. Start sending RTP and RTCP data to the multicast address.
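Briefly, on ASM vs. SSM: with ASM (Any-Source Multicast) receivers join a group address and accept traffic from any sender, whereas with SSM (Source-Specific Multicast) they subscribe to a specific (source, group) pair, which is why the groupsocks above are put into multicastSendOnly() mode. IPv4 reserves 232/8 for SSM, and that is the range the setup code draws a random address from; a sketch of the helper, abridged from live555's GroupsockHelper (the exact bounds are quoted from memory):

netAddressBits chooseRandomIPv4SSMAddress(UsageEnvironment& env) {
  // Choose a random address in the SSM range [232.0.1.0, 232.255.255.255):
  netAddressBits const first = 0xE8000100, lastPlus1 = 0xE8FFFFFF;
  netAddressBits const range = lastPlus1 - first;
  return ntohl(first + ((netAddressBits)our_random()) % range);
}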

5> rtspServer->addServerMediaSession(sms)

Same as in the unicast analysis above.

6> env->taskScheduler().doEventLoop(&watchVariable)

Same as in the unicast analysis above.

III. Differences between unicast and multicast

1> Socket creation: multicast creates its sockets right at startup, whereas unicast creates the corresponding sockets only when a "SETUP" command is received.

2> startPlaying: multicast starts sending data to the multicast address immediately, whereas unicast calls startPlaying only upon receiving a "PLAY" command.

IV. Analysis of startPlaying

Multicast first:

sinkVideo->startPlaying() is implemented neither in the H264VideoRTPSink class nor in RTPSink, but in MediaSink:

Boolean MediaSink::startPlaying(MediaSource& source,
                                afterPlayingFunc* afterFunc,
                                void* afterClientData)
{
  // Make sure we're not already being played:
  if (fSource != NULL) {
    envir().setResultMsg("This sink is already being played");
    return False;
  }

  // Make sure our source is compatible:
  if (!sourceIsCompatibleWithUs(source)) {
    envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
    return False;
  }
  fSource = (FramedSource*)&source;

  fAfterFunc = afterFunc;
  fAfterClientData = afterClientData;
  return continuePlaying();
}

Notice the call to continuePlaying(). Where is that implemented? Since sinkVideo was created by H264VideoRTPSink::createNew() and is therefore an H264VideoRTPSink instance, we can conclude that this continuePlaying() is the one implemented in H264VideoRTPSink.

Boolean H264VideoRTPSink::continuePlaying()
{
  // First, check whether we have a 'fragmenter' class set up yet.
  // If not, create it now:
  if (fOurFragmenter == NULL) {
    fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize, ourMaxPacketSize() - 12/*RTP hdr size*/);
    fSource = fOurFragmenter;
  }
  //printf("function=%s line=%d\n",__func__,__LINE__);

  // Then call the parent class's implementation:
  return MultiFramedRTPSink::continuePlaying();
}

Here we see that it then calls MultiFramedRTPSink's member function continuePlaying(); let's look at its implementation:

Boolean MultiFramedRTPSink::continuePlaying()
{
  // Send the first packet.
  // (This will also schedule any future sends.)
  buildAndSendPacket(True);
  return True;
}

Here we find buildAndSendPacket(); its implementation:

void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket)
{
  // This function mainly prepares the RTP packet header, leaving holes for
  // the fields that must be filled in later from the actual data.
  fIsFirstPacket = isFirstPacket;

  // Set up the RTP header:
  unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
  rtpHdr |= (fRTPPayloadType << 16);
  rtpHdr |= fSeqNo; // sequence number
  fOutBuf->enqueueWord(rtpHdr); // append one word to the packet

  // Note where the RTP timestamp will go.
  // (We can't fill this in until we start packing payload frames.)
  fTimestampPosition = fOutBuf->curPacketSize();
  fOutBuf->skipBytes(4); // leave a hole for the timestamp in the buffer

  fOutBuf->enqueueWord(SSRC());

  // Allow for a special, payload-format-specific header following the
  // RTP header:
  fSpecialHeaderPosition = fOutBuf->curPacketSize();
  fSpecialHeaderSize = specialHeaderSize();
  fOutBuf->skipBytes(fSpecialHeaderSize);

  // Begin packing as many (complete) frames into the packet as we can:
  fTotalFrameSpecificHeaderSizes = 0;
  fNoFramesLeft = False;
  fNumFramesUsedSoFar = 0; // number of frames packed into this packet so far

  // The header is ready; now pack the frame data:
  packFrame();
}

On to packFrame():

void MultiFramedRTPSink::packFrame()
{
  // First, see if we have an overflow frame that was too big for the last pkt
  if (fOutBuf->haveOverflowData()) {
    // Use that frame data. Overflow data is what was left over from the last
    // packetization, since one packet may not hold a whole frame.
    // Use this frame before reading a new one from the source:
    unsigned frameSize = fOutBuf->overflowDataSize();
    struct timeval presentationTime = fOutBuf->overflowPresentationTime();
    unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();
    fOutBuf->useOverflowData();

    afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
  } else {
    // No frame data at all; ask the source for some.
    // Normal case: we need to read a new frame from the source
    if (fSource == NULL)
      return;

    // Update some positions in the buffer:
    fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
    fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
    fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
    fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

    // Get the next frame from the source:
    fSource->getNextFrame(fOutBuf->curPtr(), // where the new data will start
                          fOutBuf->totalBytesAvailable(), // free space left in the buffer
                          afterGettingFrame, // the source's read may be deferred to the task scheduler, so pass it the function to call once a frame is obtained
                          this,
                          ourHandleClosure, // called when the source ends (e.g. end of file)
                          this);
  }
}

fSource is defined in the MediaSink class; its startPlaying(), shown above, assigned it the sourceVideo argument. sourceVideo's getNextFrame() is implemented in FramedSource, and the doGetNextFrame() it ends by calling is a virtual function:

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
                                afterGettingFunc* afterGettingFunc,
                                void* afterGettingClientData,
                                onCloseFunc* onCloseFunc,
                                void* onCloseClientData)
{
  // Make sure we're not already being read:
  if (fIsCurrentlyAwaitingData) {
    envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
    exit(1);
  }

  fTo = to;
  fMaxSize = maxSize;
  fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
  fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
  fAfterGettingFunc = afterGettingFunc;
  fAfterGettingClientData = afterGettingClientData;
  fOnCloseFunc = onCloseFunc;
  fOnCloseClientData = onCloseClientData;
  fIsCurrentlyAwaitingData = True;

  doGetNextFrame();
}

sourceVideo was instantiated via H264VideoStreamFramer::createNew(), and we find doGetNextFrame() implemented in the H264VideoStreamFramer class:

void H264VideoStreamFramer::doGetNextFrame()
{
  //fParser->registerReadInterest(fTo, fMaxSize);
  //continueReadProcessing();
  fInputSource->getNextFrame(fTo, fMaxSize,
                             afterGettingFrame, this,
                             FramedSource::handleClosure, this);
}

Here fInputSource was initialized, in H264VideoStreamFramer's base class StreamParser, to the H264InputDevice[video_type]->videoSource() argument that was passed in. VideoOpenFileSource inherits from OpenFileSource, so this call goes once more through FramedSource::getNextFrame(), and the doGetNextFrame() it invokes this time is the one implemented in OpenFileSource.
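In this wis-streamer-style code, OpenFileSource::doGetNextFrame() appears to be just a kick to the read handler (a minimal sketch, an assumption based on the wis-streamer layout):

void OpenFileSource::doGetNextFrame() {
  // Try to read a frame right away; incomingDataHandler1() reschedules
  // itself through the task scheduler when no data is available yet:
  incomingDataHandler(this);
}

The read handler itself: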

void OpenFileSource::incomingDataHandler1() {
  int ret;

  if (!isCurrentlyAwaitingData()) return; // we're not ready for the data yet

  ret = readFromFile();
  if (ret < 0) {
    handleClosure(this);
    fprintf(stderr,"In Grab Image, the source stops being readable!!!!\n");
  }
  else if (ret == 0)
  {
    // No data was available yet: back off (up to a maximum), then poll again:
    if( uSecsToDelay >= uSecsToDelayMax )
    {
      uSecsToDelay = uSecsToDelayMax;
    }else{
      uSecsToDelay *= 2;
    }
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)incomingDataHandler, this);
  }
  else {
    // A frame was read: deliver it to the downstream reader:
    nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)afterGetting, this);
  }
}

After a frame has been obtained, the delayed-task queue runs afterGetting(), which is implemented in the parent class FramedSource:

void FramedSource::afterGetting(FramedSource* source)
{
  source->fIsCurrentlyAwaitingData = False;
  // indicates that we can be read again
  // Note that this needs to be done here, in case the "fAfterFunc"
  // called below tries to read another frame (which it usually will)

  if (source->fAfterGettingFunc != NULL) {
    (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                                   source->fFrameSize,
                                   source->fNumTruncatedBytes,
                                   source->fPresentationTime,
                                   source->fDurationInMicroseconds);
  }
}

The fAfterGettingFunc pointer was set in getNextFrame(); in MultiFramedRTPSink::packFrame() it was bound to MultiFramedRTPSink::afterGettingFrame():

void MultiFramedRTPSink::afterGettingFrame(void* clientData, unsigned numBytesRead,
                                           unsigned numTruncatedBytes,
                                           struct timeval presentationTime,
                                           unsigned durationInMicroseconds)
{
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData;
  sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
                           presentationTime, durationInMicroseconds);
}

Continuing into the afterGettingFrame1() implementation:

void MultiFramedRTPSink::afterGettingFrame1(
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds)
{
  if (fIsFirstPacket) {
    // Record the fact that we're starting to play now:
    gettimeofday(&fNextSendTime, NULL);
  }

  // If the buffer given for a frame is too small, the frame gets truncated;
  // all we can do is warn the user:
  if (numTruncatedBytes > 0) {
    unsigned const bufferSize = fOutBuf->totalBytesAvailable();
    envir()
        << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
        << bufferSize
        << "). "
        << numTruncatedBytes
        << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
        << OutPacketBuffer::maxSize + numTruncatedBytes
        << ", *before* creating this 'RTPSink'. (Current value is "
        << OutPacketBuffer::maxSize << ".)\n";
  }

  unsigned curFragmentationOffset = fCurFragmentationOffset;
  unsigned numFrameBytesToUse = frameSize;
  unsigned overflowBytes = 0;

  // If the packet already contains frame data and this new frame may not be
  // appended to it, save the new frame away for the next packet.
  // If we have already packed one or more frames into this packet,
  // check whether this new frame is eligible to be packed after them.
  // (This is independent of whether the packet has enough room for this
  // new frame; that check comes later.)
  if (fNumFramesUsedSoFar > 0) {
    if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize))
    {
      // Save away this frame for next time:
      numFrameBytesToUse = 0;
      fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
                               presentationTime, durationInMicroseconds);
    }
  }

  // Records whether what we just packed was the final fragment of the previous frame:
  fPreviousFrameEndedFragmentation = False;

  // Work out how much of the obtained frame fits into the current packet;
  // whatever is left over is saved as overflow data:
  if (numFrameBytesToUse > 0) {
    // Check whether this frame overflows the packet
    if (fOutBuf->wouldOverflow(frameSize)) {
      // Don't use this frame now; instead, save it as overflow data, and
      // send it in the next packet instead. However, if the frame is too
      // big to fit in a packet by itself, then we need to fragment it (and
      // use some of it in this packet, if the payload format permits this.)
      if (isTooBigForAPacket(frameSize)
          && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
        // We need to fragment this frame, and use some of it now:
        overflowBytes = computeOverflowForNewFrame(frameSize);
        numFrameBytesToUse -= overflowBytes;
        fCurFragmentationOffset += numFrameBytesToUse;
      } else {
        // We don't use any of this frame now:
        overflowBytes = frameSize;
        numFrameBytesToUse = 0;
      }
      fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
                               overflowBytes, presentationTime, durationInMicroseconds);
    } else if (fCurFragmentationOffset > 0) {
      // This is the last fragment of a frame that was fragmented over
      // more than one packet. Do any special handling for this case:
      fCurFragmentationOffset = 0;
      fPreviousFrameEndedFragmentation = True;
    }
  }

  if (numFrameBytesToUse == 0 && frameSize > 0) {
    // The packet has data and none of the new frame fits, so send it.
    // (This case seems hard to hit!)
    // Send our packet now, because we have filled it up:
    sendPacketIfNecessary();
  } else {
    // Pack (part of) this frame into the outgoing packet:
    // Use this frame in our outgoing packet:
    unsigned char* frameStart = fOutBuf->curPtr();
    fOutBuf->increment(numFrameBytesToUse);
    // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes

    // Here's where any payload format specific processing gets done:
    doSpecialFrameHandling(curFragmentationOffset, frameStart,
                           numFrameBytesToUse, presentationTime, overflowBytes);

    ++fNumFramesUsedSoFar;

    // Update the time at which the next packet should be sent, based
    // on the duration of the frame that we just packed into it.
    // However, if this frame has overflow data remaining, then don't
    // count its duration yet.
    if (overflowBytes == 0) {
      fNextSendTime.tv_usec += durationInMicroseconds;
      fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
      fNextSendTime.tv_usec %= 1000000;
    }

    // Send the packet if needed; otherwise keep packing:
    // Send our packet now if (i) it's already at our preferred size, or
    // (ii) (heuristic) another frame of the same size as the one we just
    // read would overflow the packet, or
    // (iii) it contains the last fragment of a fragmented frame, and we
    // don't allow anything else to follow this or
    // (iv) one frame per packet is allowed:
    if (fOutBuf->isPreferredSize()
        || fOutBuf->wouldOverflow(numFrameBytesToUse)
        || (fPreviousFrameEndedFragmentation
            && !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(
               fOutBuf->curPtr() - frameSize, frameSize)) {
      // The packet is ready to be sent now
      sendPacketIfNecessary();
    } else {
      // There's room for more frames; try getting another:
      packFrame();
    }
  }
}

Now look at the function that actually sends the data:

void MultiFramedRTPSink::sendPacketIfNecessary()
{
  // Send the packet:
  if (fNumFramesUsedSoFar > 0) {
#ifdef TEST_LOSS
    if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
      if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) {
        // if failure handler has been specified, call it
        if (fOnSendErrorFunc != NULL)
          (*fOnSendErrorFunc)(fOnSendErrorData);
      }

    ++fPacketCount;
    fTotalOctetCount += fOutBuf->curPacketSize();
    fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize
                   - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;
    ++fSeqNo; // for next time
  }

  // If overflow data remains, adjust the buffer:
  if (fOutBuf->haveOverflowData()
      && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) {
    // Efficiency hack: Reset the packet start pointer to just in front of
    // the overflow data (allowing for the RTP header and special headers),
    // so that we probably don't have to "memmove()" the overflow data
    // into place when building the next packet:
    unsigned newPacketStart = fOutBuf->curPacketSize() -
        (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
    fOutBuf->adjustPacketStart(newPacketStart);
  } else {
    // Normal case: Reset the packet start pointer back to the start:
    fOutBuf->resetPacketStart();
  }

  fOutBuf->resetOffset();
  fNumFramesUsedSoFar = 0;

  if (fNoFramesLeft) {
    // No data left at all; we're done:
    onSourceClosure(this);
  } else {
    // More data remains: pack and send again at the next due send time.
    // We have more frames left to send. Figure out when the next frame
    // is due to start playing, then make sure that we wait this long before
    // sending the next packet.
    struct timeval timeNow;
    gettimeofday(&timeNow, NULL);
    int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
    int64_t uSecondsToGo = secsDiff * 1000000
                           + (fNextSendTime.tv_usec - timeNow.tv_usec);
    if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
      uSecondsToGo = 0;
    }

    // Delay this amount of time:
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
                                                             (TaskFunc*)sendNext, this);
  }
}

When a packet has been sent, doEventLoop() later executes the task function sendNext(), which sends the next packet, and the cycle continues. Audio data is sent the same way.
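sendNext() is just the scheduled continuation (a sketch, abridged from live555): it re-enters buildAndSendPacket() with isFirstPacket = False, so packing and sending repeat until the source closes.

void MultiFramedRTPSink::sendNext(void* firstArg) {
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
  sink->buildAndSendPacket(False); // build and send the next packet
}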

To summarize the call flow (with reference to the blogger 牛搞):

Unicast data sending:

For unicast, data is sent only once the client's "PLAY" command arrives; handleCmd_PLAY() in the RTSPClientSession class makes this call:

void RTSPServer::RTSPClientSession
::handleCmd_PLAY(ServerMediaSubsession* subsession, char const* cseq,
                 char const* fullRequestStr)
{
  ... ...
  fStreamStates[i].subsession->startStream(fOurSessionId,
                                           fStreamStates[i].streamToken,
                                           (TaskFunc*)noteClientLiveness,
                                           this,
                                           rtpSeqNum,
                                           rtpTimestamp);
  ... ...
}

startStream() is defined in the OnDemandServerMediaSubsession class:

void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
                                                void* streamToken,
                                                TaskFunc* rtcpRRHandler,
                                                void* rtcpRRHandlerClientData,
                                                unsigned short& rtpSeqNum,
                                                unsigned& rtpTimestamp)
{
  StreamState* streamState = (StreamState*)streamToken;
  Destinations* destinations = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
  if (streamState != NULL) {
    streamState->startPlaying(destinations, rtcpRRHandler, rtcpRRHandlerClientData);
    if (streamState->rtpSink() != NULL) {
      rtpSeqNum = streamState->rtpSink()->currentSeqNo();
      rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
    }
  }
}

The startPlaying() function here is implemented in the StreamState class:

void StreamState::startPlaying(Destinations* dests,
                               TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData)
{
  if (dests == NULL) return;

  if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
    if (fRTPSink != NULL) {
      fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying = True;
    } else if (fUDPSink != NULL) {
      fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying = True;
    }
  }

  if (fRTCPInstance == NULL && fRTPSink != NULL) {
    // Create (and start) a 'RTCP instance' for this RTP sink:
    fRTCPInstance = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
                                            fTotalBW, (unsigned char*)fMaster.fCNAME,
                                            fRTPSink, NULL /* we're a server */);
    // Note: This starts RTCP running automatically
  }

  if (dests->isTCP) {
    // Change RTP and RTCP to use the TCP socket instead of UDP:
    if (fRTPSink != NULL) {
      fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
    }
    if (fRTCPInstance != NULL) {
      fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
      fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
                                          rtcpRRHandler, rtcpRRHandlerClientData);
    }
  } else {
    // Tell the RTP and RTCP 'groupsocks' about this destination
    // (in case they don't already have it):
    if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
    if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
    if (fRTCPInstance != NULL) {
      fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
                                          rtcpRRHandler, rtcpRRHandlerClientData);
    }
  }
}

This function then calls startPlaying() on the RTPSink. RTPSink does not override it, so the call goes straight to the parent MediaSink::startPlaying(); from there, capture, packetization, and sending proceed exactly as in the multicast case.
