您的位置:首页 > 编程语言 > PHP开发

Live555学习笔记(四)—— RTP数据流向分析

2016-07-01 13:35 1361 查看
    本文先分析流文件数据的打包和发送,然后再分析live555 的数据采集,其中大部分是Live555的源码跟踪分析,涉及到JPEG的则是自己在live555库上添加的代码。


RTSP Server 接收到客户端的 PLAY 命令后调用的函数是 handleCmd_PLAY。在该函数中会调用 void StreamState::startPlaying 函数:

// Starts streaming to the given destination(s).
// As visible in this excerpt, it: returns immediately when "dests" is NULL;
// creates an RTCP instance for the RTP sink if none exists yet; registers the
// destination either on the TCP socket (RTP/RTCP-over-TCP) or on the RTP/RTCP
// groupsocks (UDP); and finally, if not already playing, calls startPlaying()
// on the RTP sink (or the UDP sink) with "fMediaSource" as the source.
// NOTE(review): this excerpt is abridged - closing braces and some statements
// are missing, so it does not compile as shown.
void StreamState
::startPlaying(Destinations* dests, unsigned clientSessionId,
TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
void* serverRequestAlternativeByteHandlerClientData) {
if (dests == NULL) return;

if (fRTCPInstance == NULL && fRTPSink != NULL) {
// Create (and start) a 'RTCP instance' for this RTP sink:
fRTCPInstance = fMaster.createRTCP(fRTCPgs, fTotalBW, (unsigned char*)fMaster.fCNAME, fRTPSink);
// Note: This starts RTCP running automatically
fRTCPInstance->setAppHandler(fMaster.fAppHandlerTask, fMaster.fAppHandlerClientData);

if (dests->isTCP) {
// Change RTP and RTCP to use the TCP socket instead of UDP:
if (fRTPSink != NULL) {
fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
::setServerRequestAlternativeByteHandler(fRTPSink->envir(), dests->tcpSocketNum,
serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
// So that we continue to handle RTSP commands from the client
if (fRTCPInstance != NULL) {
fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
rtcpRRHandler, rtcpRRHandlerClientData);
} else {
// Tell the RTP and RTCP 'groupsocks' about this destination
// (in case they don't already have it):
if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort, clientSessionId);
if (fRTCPgs != NULL && !(fRTCPgs == fRTPgs && dests->rtcpPort.num() == dests->rtpPort.num())) {
fRTCPgs->addDestination(dests->addr, dests->rtcpPort, clientSessionId);
if (fRTCPInstance != NULL) {
fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
rtcpRRHandler, rtcpRRHandlerClientData);

if (fRTCPInstance != NULL) {
// Hack: Send an initial RTCP "SR" packet, before the initial RTP packet, so that receivers will (likely) be able to
// get RTCP-synchronized presentation times immediately:

if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
if (fRTPSink != NULL) {
fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);     /* this is where the sink's startPlaying() gets called */
fAreCurrentlyPlaying = True;
} else if (fUDPSink != NULL) {
fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
fAreCurrentlyPlaying = True;
进入Boolean MediaSink::startPlaying()函数
// Attaches "source" to this sink and begins playing it.
// Returns False (after setting a result message on the environment) when the
// sink already has a source or when the source is not compatible; otherwise
// stores the source and the after-playing callback/data and returns whatever
// continuePlaying() returns.
// NOTE(review): this excerpt is abridged - the closing braces of the two "if"
// blocks and of the function are missing.
Boolean MediaSink::startPlaying(MediaSource& source,
afterPlayingFunc* afterFunc,
void* afterClientData) {
// Make sure we're not already being played:
if (fSource != NULL) {
envir().setResultMsg("This sink is already being played");
return False;

// Make sure our source is compatible:
if (!sourceIsCompatibleWithUs(source)) {
envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
return False;
fSource = (FramedSource*)&source;

fAfterFunc = afterFunc;
fAfterClientData = afterClientData;
return continuePlaying();       /* hand off to the next processing step */
下面进入Boolean MultiFramedRTPSink::continuePlaying() 函数,该函数里面只调用了buildAndSendPacket 一个函数。
// Kicks off transmission for this sink by building and sending the first
// packet; buildAndSendPacket() is called with True to mark it as the first
// packet. Always returns True.
Boolean MultiFramedRTPSink::continuePlaying() {
// Send the first packet.
// (This will also schedule any future sends.)
buildAndSendPacket(True);
return True;
}
// Begins a new outgoing RTP packet.
// Records whether this is the first packet, composes the first RTP header
// word (version 2, payload type, sequence number), reserves 4 bytes for the
// timestamp, notes the position and size of the payload-format-specific
// header, resets the per-packet frame bookkeeping, and then calls packFrame()
// to start filling the packet with frame data.
// NOTE(review): this excerpt is abridged - several statements of the original
// function and its closing brace are not shown here.
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) {        (0.0)
fIsFirstPacket = isFirstPacket;
/* Prepare the RTP packet header */
// Set up the RTP header:
unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
rtpHdr |= (fRTPPayloadType<<16);
rtpHdr |= fSeqNo; // sequence number

// Note where the RTP timestamp will go.
// (We can't fill this in until we start packing payload frames.)
fTimestampPosition = fOutBuf->curPacketSize();
fOutBuf->skipBytes(4); // leave a hole for the timestamp /* reserve four bytes to hold the timestamp */


// Allow for a special, payload-format-specific header following the
// RTP header:
fSpecialHeaderPosition = fOutBuf->curPacketSize();
fSpecialHeaderSize = specialHeaderSize();

// Begin packing as many (complete) frames into the packet as we can:
fTotalFrameSpecificHeaderSizes = 0;
fNoFramesLeft = False;
fNumFramesUsedSoFar = 0;
packFrame();/* the header is in place; now start packing the payload data */
void MultiFramedRTPSink::packFrame() {
// Get the next frame.

// First, skip over the space we'll use for any frame-specific header:
fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

// See if we have an overflow frame that was too big for the last pkt
if (fOutBuf->haveOverflowData()) {                                                                (1.0)
// Use this frame before reading a new one from the source
unsigned frameSize = fOutBuf->overflowDataSize();
struct timeval presentationTime = fOutBuf->overflowPresentationTime();
unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();

afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
} else {                                                                                          (2.0)
// Normal case: we need to read a new frame from the source
if (fSource == NULL) return;
fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(),
afterGettingFrame, this, ourHandleClosure, this);
因此这里就出现了文章开头的函数调用关系中的两个分支。先分析分支一进入afterGettingFrame1 函数,再分析fSource->getNextFrame 的调用。

分支一 :
// Processes one frame that is now available in the output buffer (either a
// newly read frame or saved overflow data) and decides what to do with it.
//   frameSize              - size of the frame in bytes
//   numTruncatedBytes      - bytes dropped because the frame did not fit the buffer
//   presentationTime       - presentation time of the frame
//   durationInMicroseconds - frame duration, used to advance fNextSendTime
// As visible in this excerpt, the frame is either used in the current packet,
// fragmented (part used now, remainder saved as overflow data), or saved
// entirely as overflow data for the next packet. Afterwards the packet is
// either sent via sendPacketIfNecessary() or filled further via packFrame().
// NOTE(review): this excerpt is abridged - closing braces and several
// statements are missing, so the nesting shown here is not the real nesting.
void MultiFramedRTPSink
::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
struct timeval presentationTime,
unsigned durationInMicroseconds) {
if (fIsFirstPacket) {
// Record the fact that we're starting to play now:
gettimeofday(&fNextSendTime, NULL);

fMostRecentPresentationTime = presentationTime;
if (fInitialPresentationTime.tv_sec == 0 && fInitialPresentationTime.tv_usec == 0) {
fInitialPresentationTime = presentationTime;

if (numTruncatedBytes > 0) {/* if the buffer is smaller than one frame, the frame gets truncated, so print a message pointing out the problem */
unsigned const bufferSize = fOutBuf->totalBytesAvailable();
envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
<< bufferSize << ").  "
<< numTruncatedBytes << " bytes of trailing data was dropped!  Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
<< OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'.  (Current value is "
<< OutPacketBuffer::maxSize << ".)\n";
unsigned curFragmentationOffset = fCurFragmentationOffset;
unsigned numFrameBytesToUse = frameSize;
unsigned overflowBytes = 0;

// If we have already packed one or more frames into this packet,
// check whether this new frame is eligible to be packed after them.
// (This is independent of whether the packet has enough room for this
// new frame; that check comes later.)
if (fNumFramesUsedSoFar > 0) {
if ((fPreviousFrameEndedFragmentation
&& !allowOtherFramesAfterLastFragment())
|| !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) {
// Save away this frame for next time:
numFrameBytesToUse = 0;
fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
presentationTime, durationInMicroseconds);
fPreviousFrameEndedFragmentation = False;

if (numFrameBytesToUse > 0) {
// Check whether this frame overflows the packet
if (fOutBuf->wouldOverflow(frameSize)) {
// Don't use this frame now; instead, save it as overflow data, and
// send it in the next packet instead.  However, if the frame is too
// big to fit in a packet by itself, then we need to fragment it (and
// use some of it in this packet, if the payload format permits this.)
if (isTooBigForAPacket(frameSize)
&& (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
// We need to fragment this frame, and use some of it now:
overflowBytes = computeOverflowForNewFrame(frameSize);
numFrameBytesToUse -= overflowBytes;
fCurFragmentationOffset += numFrameBytesToUse;
} else {
// We don't use any of this frame now:
overflowBytes = frameSize;
numFrameBytesToUse = 0;
fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
overflowBytes, presentationTime, durationInMicroseconds);
} else if (fCurFragmentationOffset > 0) {
// This is the last fragment of a frame that was fragmented over
// more than one packet.  Do any special handling for this case:
fCurFragmentationOffset = 0;
fPreviousFrameEndedFragmentation = True;

if (numFrameBytesToUse == 0 && frameSize > 0) {
// Send our packet now, because we have filled it up:
} else {
// Use this frame in our outgoing packet:
unsigned char* frameStart = fOutBuf->curPtr();
// do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes

// Here's where any payload format specific processing gets done:
doSpecialFrameHandling(curFragmentationOffset, frameStart,
numFrameBytesToUse, presentationTime,


// Update the time at which the next packet should be sent, based
// on the duration of the frame that we just packed into it.
// However, if this frame has overflow data remaining, then don't
// count its duration yet.
if (overflowBytes == 0) {
fNextSendTime.tv_usec += durationInMicroseconds;
fNextSendTime.tv_sec += fNextSendTime.tv_usec/1000000;
fNextSendTime.tv_usec %= 1000000;

// Send our packet now if (i) it's already at our preferred size, or
// (ii) (heuristic) another frame of the same size as the one we just
//      read would overflow the packet, or
// (iii) it contains the last fragment of a fragmented frame, and we
//      don't allow anything else to follow this or
// (iv) one frame per packet is allowed:
if (fOutBuf->isPreferredSize()
|| fOutBuf->wouldOverflow(numFrameBytesToUse)
|| (fPreviousFrameEndedFragmentation &&
|| !frameCanAppearAfterPacketStart(fOutBuf->curPtr() - frameSize,
frameSize) ) {
// The packet is ready to be sent now
sendPacketIfNecessary();                                                                    (3.0)
} else {
// There's room for more frames; try getting another:
packFrame();                                                                                (4.0)
// Sends the packet that has been built (if it contains at least one frame),
// updates the octet counters and the sequence number, prepares the output
// buffer for the next packet, and - unless no frames are left - schedules
// sendNext() to run after the delay computed from fNextSendTime.
// NOTE(review): this excerpt is abridged - closing braces, the "#endif" that
// matches "#ifdef TEST_LOSS", and several statements are missing.
void MultiFramedRTPSink::sendPacketIfNecessary() {
if (fNumFramesUsedSoFar > 0) {
// Send the packet:
#ifdef TEST_LOSS
if ((our_random()%10) != 0) // simulate 10% packet loss #####
if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) {                    (5.0)
// if failure handler has been specified, call it
if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData);
fTotalOctetCount += fOutBuf->curPacketSize();
fOctetCount += fOutBuf->curPacketSize()
- rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;

++fSeqNo; // for next time

if (fOutBuf->haveOverflowData() /* if the current frame still has data left over, adjust the buffer */
&& fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize()/2) {
// Efficiency hack: Reset the packet start pointer to just in front of
// the overflow data (allowing for the RTP header and special headers),
// so that we probably don't have to "memmove()" the overflow data
// into place when building the next packet:
unsigned newPacketStart = fOutBuf->curPacketSize()
- (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
} else {
// Normal case: Reset the packet start pointer back to the start:
fNumFramesUsedSoFar = 0;

if (fNoFramesLeft) {
// We're done:
} else {
// We have more frames left to send.  Figure out when the next frame
// is due to start playing, then make sure that we wait this long before
// sending the next packet.
struct timeval timeNow;
gettimeofday(&timeNow, NULL);
int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
int64_t uSecondsToGo = secsDiff*1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec);
if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
uSecondsToGo = 0;

// Delay this amount of time:
nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*)sendNext, this);  (6.0)
(5.0)这里通过Boolean RTPInterface::sendPacket --> Boolean Groupsock::output 把数据直接发送到已连接的网络上。
(6.0)这里需要注意,这是一个延时调用:在uSecondsToGo 时间之后调用函数void MultiFramedRTPSink::sendNext()。如果当前帧已经发送完毕,那么就去获取下一帧数据继续发送。
void MultiFramedRTPSink::sendNext(void* firstArg) {
MultiFramedRTPSink* sink = (MultiFramedRTPSink*)firstArg;
sink->buildAndSendPacket(False);                                                                      (7.0)

// See if we have an overflow frame that was too big for the last pkt
if (fOutBuf->haveOverflowData()) {
// Use this frame before reading a new one from the source
unsigned frameSize = fOutBuf->overflowDataSize();
struct timeval presentationTime = fOutBuf->overflowPresentationTime();
unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();

afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);                       (1.0)
} else {
// Normal case: we need to read a new frame from the source
if (fSource == NULL) return;
fSource->getNextFrame(fOutBuf->curPtr(), fOutBuf->totalBytesAvailable(),                          (2.0)
afterGettingFrame, this, ourHandleClosure, this);
注意在(2.0)中的fSource->getNextFrame 函数中,它保存了两个回调函数的地址:
void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
afterGettingFunc* afterGettingFunc,
void* afterGettingClientData,
onCloseFunc* onCloseFunc,
void* onCloseClientData) {
// Make sure we're not already being read:
if (fIsCurrentlyAwaitingData) {
envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";

fTo = to;
fMaxSize = maxSize;
fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
fAfterGettingFunc = afterGettingFunc;                                                               (8.0)
fAfterGettingClientData = afterGettingClientData;
fOnCloseFunc = onCloseFunc;                                                                         (9.0)
fOnCloseClientData = onCloseClientData;
fIsCurrentlyAwaitingData = True;

doGetNextFrame();                                                                                   (10.0)
(8.0)因为可能source中的读数据函数会被放在任务调度中,所以把获取帧后应调用的函数传给source,这里的函数指针指向:void MultiFramedRTPSink::afterGettingFrame

(10.0)中的doGetNextFrame 这里不同的编码格式有不同的实现方式。一般会在doGetNextFrame  函数中调用void MultiFramedRTPSink::afterGettingFrame 函数。这样就把数据传递给了前面提到的分支一,然后再通过分支一把数据发送出去。
void JPEGDeviceSource::deliverFrameToClient() {
unsigned newFrameSize = fsendpack->output_buffer.nSize0 + fsendpack->output_buffer.nSize1;

fNeedAFrame = False;

// Set the 'presentation time': the time that this frame was captured
fPresentationTime = fLastCaptureTime;
if (newFrameSize > fMaxSize) {
fFrameSize = fMaxSize;
fNumTruncatedBytes = newFrameSize - fMaxSize;
printf("fNumTruncatedBytes is 0x%x\n",fNumTruncatedBytes);
} else {
fFrameSize = newFrameSize;
memcpy(fTo, fsendpack->output_buffer.pData0, fsendpack->output_buffer.nSize0);
if(fsendpack->output_buffer.nSize1 > 0)
memcpy(fTo + fsendpack->output_buffer.nSize0, fsendpack->output_buffer.pData1, fsendpack->output_buffer.nSize1);
FreeOneBitStreamFrame(pVideoEnc, &fsendpack->output_buffer);
fsendpack->dec = 0;

// envir()<<"test send a frame date to client\n";

// Switch to another task, and inform the reader that he has data:
nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
(TaskFunc*)FramedSource::afterGetting, this);                     (11.0)

在(11.0)中会调用void FramedSource::afterGetting 函数,里面实际调用的是
void FramedSource::afterGetting(FramedSource* source) {
source->fIsCurrentlyAwaitingData = False;
// indicates that we can be read again
// Note that this needs to be done here, in case the "fAfterFunc"
// called below tries to read another frame (which it usually will)
if (source->fAfterGettingFunc != NULL) {
(*(source->fAfterGettingFunc))(source->fAfterGettingClientData,                                 (12.0)
source->fFrameSize, source->fNumTruncatedBytes,
(12.0)回调函数的调用,实际上就是(8.0)中保存的函数void MultiFramedRTPSink::afterGettingFrame,这样与分支一也形成了一个闭环。


    服务端在收到客户端发送的setup命令后,创建RTP连接,启动设备采集数据,最后循环更新设备数据。查看代码,函数void RTSPServer::RTSPClientSession::handleCmd_SETUP 和函数void OnDemandServerMediaSubsession::getStreamParameters 的源码太多,不再单独列出。这里主要分析JPEG相关的代码,该部分代码为后面自己添加的代码,是为了能够从摄像头设备中读取到流文件数据。

FramedSource* JPEGVideoLiverServerMediaSubsession::createNewStreamSource(unsigned /*clientSessionId*/, unsigned& estBitrate) {
estBitrate = 35000; // kbps, estimate

unsigned timePerFrame = 1000000/15;
// Create a framer for the Video Elementary Stream:
return JPEGDeviceSource::createNew(envir(), timePerFrame);                                       (20)
JPEGDeviceSource::createNew(UsageEnvironment& env,
unsigned timePerFrame) {

return new JPEGDeviceSource(env, NULL, timePerFrame);
::JPEGDeviceSource(UsageEnvironment& env, FILE* fid,
unsigned timePerFrame)
: JPEGVideoSource(env),
fTimePerFrame(timePerFrame) {

AWCameraContext *Context;
V4L2_CONTEXT *fv4l2Context;

// Ask to be notified when data becomes available on the camera's socket:
1,init codec
2,init camera


getpack = get_head_list();
fsendpack= get_head_list();
initCodecParam(WIDTH,HEIGHT,JPGQ);  /*初始化JPEG编码*/
initCamera(WIDTH,HEIGHT);           /* 初始化摄像头 */

1,open cameradev
fNeedAFrame = False;
fv4l2Context=(V4L2_CONTEXT*)Context->v4l2ctx ;

envir()<<"test into function JPEGDeviceSource\n";
envir().taskScheduler().turnOnBackgroundReadHandling(fv4l2Context->mCamFd,      (21)
(TaskScheduler::BackgroundHandlerProc*)&newFrameHandler, this);

StartCamera(Context->v4l2ctx, &Context->width, &Context->height);  /*打开camera,camera开始采集数据 */

注意这里的(21),这里建立的是socket handler,在Live555 的任务调度中会每次执行这里的函数:newFrameHandler。它的实现如下:
void JPEGDeviceSource::newFrameHandler1() {

struct v4l2_buffer p_buf;
AWCameraContext * Context;
V4L2_CONTEXT *fv4l2Context;

int result=0,res;

VencInputBuffer input_buffer;

//envir()<<"test into function newFrameHandler1 \n";
if (getpack->dec==1)
envir()<<"getpack list full\n";
if (fNeedAFrame)
//	  		envir()<<"********fNeedAFrame********\n";
envir()<<"test getpack->dec==1 and  fNeedAFrame true \n";

return ;

fv4l2Context=(V4L2_CONTEXT*)Context->v4l2ctx ;

result = CameraGetOneframe(fv4l2Context, &p_buf);
envir()<<"CameraGetOneframe fail\n";

input_buffer.nID = p_buf.index;
input_buffer.pAddrPhyY =(unsigned char*)p_buf.m.offset;
input_buffer.pAddrPhyC = (unsigned char*)(p_buf.m.offset + 640*480);
input_buffer.bEnableCorp = 0;
input_buffer.sCropInfo.nLeft =  240;
input_buffer.sCropInfo.nTop  =  240;
input_buffer.sCropInfo.nWidth  =  240;
input_buffer.sCropInfo.nHeight =  240;

res = AddOneInputBuffer(pVideoEnc, &input_buffer);
if (res<0){
printf("AddOneInputBuffer Error!!!\n");
cameraDev->returnFrame(cameraDev, input_buffer.nID);

res= AlreadyUsedInputBuffer(pVideoEnc,&input_buffer);
cameraDev->returnFrame(cameraDev, input_buffer.nID);
GetOneBitstreamFrame(pVideoEnc, &getpack->output_buffer);
getpack->dec = 1;

if (fNeedAFrame)                                                                       (22)
envir()<<"test fNeedAFrame is true \n";
这里有两个互斥标志变量fNeedAFrame 和 getpack->dec:dec为解码标志变量,fNeedAFrame 为数据获取标志位。
从上面的分析可以看出,摄像头设备一直在采集数据,然后RTP在收到play命令后开始不断地打包并发送数据,这样从设备就可以接收到连续的流文件数据。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息