您的位置:首页 > 编程语言

ortp 源代码 阅读注释

2011-01-21 15:09 381 查看
ortp

文件结构

port.c 都是跨平台的东西,定义了互斥器 内存分配,多线程 网络socket

rtp.c 定义了 数据结构 rtp_header_t rtp_stats_t 以及相关的一些宏

#define RTP_TIMESTAMP_IS_NEWER_THAN(ts1,ts2) \

((uint32_t)((uint32_t)(ts1) - (uint32_t)(ts2))< (uint32_t)(1<<31))

#define RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN(ts1,ts2) \

( ((uint32_t)((uint32_t)(ts1) - (uint32_t)(ts2))< (uint32_t)(1<<31)) && (ts1)!=(ts2) )

#define TIME_IS_NEWER_THAN(t1,t2) RTP_TIMESTAMP_IS_NEWER_THAN(t1,t2)

#define TIME_IS_STRICTLY_NEWER_THAN(t1,t2) RTP_TIMESTAMP_IS_STRICTLY_NEWER_THAN(t1,t2)

就是对两个unsigned in 的比较,第一个宏 是 ts1 <= ts2 第二个是 ts1 < ts2 这个还要好好想想

utils.h 定义了一个链表 几个简单的宏

payloadtype.h 定义了数据结构 PayloadType 音频数据的 都是payload的东西 和函数

一个OList 的双向链表

struct _OList {

struct _OList *next;

struct _OList *prev;

void *data;

};

后面要用到

sessionset.c 很牛叉的东西 是非阻塞会话的调度的东西

session->mask_pos 标志 自己的掩码在scheduler中掩码的位置,scheduler有一个掩码,scheduler管理所有注册的会话,如果是非阻塞模式 每一次需求都会置 会话的掩码,当需求满足了会将scheduler掩码对应位置置位,sessionset_set_select函数会将所有注册的会话掩码和 scheduler的掩码比对发现某个会话的需求满足了 就返回,他把scheduler与会话中掩码相同的部分放在会话中,从scheduler中删除这些掩码,session_set_is_set就是判断会话的掩码与scheduler的掩码是否相同,相同返回0,不同返回1

//将set(s)的第d位置为1

#define ORTP_FD_SET(d, s) (ORTP__FDS_BITS (s)[ORTP__FDELT(d)] |= ORTP__FDMASK(d))

//将s的第d为置为0

#define ORTP_FD_CLR(d, s) (ORTP__FDS_BITS (s)[ORTP__FDELT(d)] &= ~ORTP__FDMASK(d))

//判断s 的第d位是否为1

#define ORTP_FD_ISSET(d, s) ((ORTP__FDS_BITS (s)[ORTP__FDELT(d)] & ORTP__FDMASK(d)) != 0)

/* The fd_set member is required to be an array of longs. */

typedef long int ortp__fd_mask;

/* Number of bits per word of `fd_set' (some code assumes this is 32). */

#define ORTP__FD_SETSIZE 1024

/* It's easier to assume 8-bit bytes than to get CHAR_BIT. */

#define ORTP__NFDBITS (8 * sizeof (ortp__fd_mask))

#define ORTP__FDELT(d) ((d) / ORTP__NFDBITS)

#define ORTP__FDMASK(d) ((ortp__fd_mask) 1 << ((d) % ORTP__NFDBITS))

//set一共有1024位 因为使用ortp__fd_mask表示 所以计算一下大小

/* fd_set for select and pselect. */

typedef struct

{

ortp__fd_mask fds_bits[ORTP__FD_SETSIZE / ORTP__NFDBITS];

# define ORTP__FDS_BITS(set) ((set)->fds_bits)

} ortp_fd_set;

str_util.s str_util.c 里面定义了一些数据结构 一个先进先出的queue,使用双向链表实现

rtpsignaltable.h .c

struct _RtpSignalTable

{

RtpCallback callback[RTP_CALLBACK_TABLE_MAX_ENTRIES];

unsigned long user_data[RTP_CALLBACK_TABLE_MAX_ENTRIES];

struct _RtpSession *session; //指向rtpSession

const char *signal_name;

int count;

};

信号量表 这里知道有这个表就行

里面有一个这个函数 应该是信号量激活的时候执行回调函数

里面的c++开始还没明白什么意思。看了一下才懂,因为callback是一个数组,每一次删除的时候把指定的callback删除,在中间空下了就空下来了,添加的是后只要找距离开始最近的空位置添加进来就行

void rtp_signal_table_emit(RtpSignalTable *table)

{

int i,c;

for (i=0,c=0;c<table->count;i++){

if (table->callback[i]!=NULL){

c++; /*I like it*/

table->callback[i](table->session,table->user_data[i]);

}

}

}

event.c .h

typedef mblk_t OrtpEvent;

typedef struct OrtpEvQueue{

queue_t q;

ortp_mutex_t mutex;

} OrtpEvQueue;

使用 queue 然后加上一个互斥器 实现多线程的安全操作

typedef struct RtpEndpoint{

#ifdef ORTP_INET6

struct sockaddr_storage addr;

#else

struct sockaddr addr;

#endif

socklen_t addrlen;

}RtpEndpoint;

不过这个东东 我不知道什么东西

rtpsession.c

添加数据包操作

/* put an rtp packet in queue. It is called by rtp_parse()*/

void rtp_putq(queue_t *q, mblk_t *mp)

{

mblk_t *tmp;

rtp_header_t *rtp=(rtp_header_t*)mp->b_rptr,*tmprtp;

/* insert message block by increasing time stamp order : the last (at the bottom)

message of the queue is the newest*/

ortp_debug("rtp_putq(): Enqueuing packet with ts=%i and seq=%i",rtp->timestamp,rtp->seq_number);

if (qempty(q)) //如果队列是空的 直接压进来就行了

{

putq(q,mp);

return;

}

tmp=qlast(q); //指向最新的

/* we look at the queue from bottom to top, because enqueued packets have a better chance

to be enqueued at the bottom, since there are surely newer */

while (!qend(q,tmp)) //遍历队列

{

tmprtp=(rtp_header_t*)tmp->b_rptr;

ortp_debug("rtp_putq(): Seeing packet with seq=%i",tmprtp->seq_number);

if (rtp->seq_number == tmprtp->seq_number) //判断是否是相同的

{

/* this is a duplicated packet. Don't queue it */

ortp_debug("rtp_putq: duplicated message.");

freemsg(mp); //相同的就释放掉

return;

}

else if (RTP_SEQ_IS_GREATER(rtp->seq_number,tmprtp->seq_number)) //找到一个比他小的位置 插进去

{

insq(q,tmp->b_next,mp);

return;

}

tmp=tmp->b_prev;

}

/* this packet is the oldest, it has to be

placed on top of the queue */

insq(q,qfirst(q),mp); //没有找到合适的位置 就插在最前面了

}

取数据操作

mblk_t *rtp_getq(queue_t *q,uint32_t timestamp, int *rejected)

{

mblk_t *tmp,*ret=NULL,*old=NULL;

rtp_header_t *tmprtp;

uint32_t ts_found=0;

*rejected=0;

ortp_debug("rtp_getq(): Timestamp %i wanted.",timestamp);

if (qempty(q))

{

/*ortp_debug("rtp_getq: q is empty.");*/

return NULL;

}

/* return the packet with ts just equal or older than the asked timestamp */

/* packets with older timestamps are discarded */

while ((tmp=qfirst(q))!=NULL)

{

tmprtp=(rtp_header_t*)tmp->b_rptr;

ortp_debug("rtp_getq: Seeing packet with ts=%i",tmprtp->timestamp);

if ( RTP_TIMESTAMP_IS_NEWER_THAN(timestamp,tmprtp->timestamp) )

{

if (ret!=NULL && tmprtp->timestamp==ts_found)

{

/* we've found two packets with same timestamp. return the first one */

break;

}

if (old!=NULL)

{

ortp_debug("rtp_getq: discarding too old packet with ts=%i",ts_found);

(*rejected)++;

freemsg(old);

}

ret=getq(q); /* dequeue the packet, since it has an interesting timestamp*/

ts_found=tmprtp->timestamp;

ortp_debug("rtp_getq: Found packet with ts=%i",tmprtp->timestamp);

old=ret;

}

else

{

break;

}

}

return ret;

}

这个在队列中找时间到了的包,找到一个存在old里面,如果又找到了一个,发现前面old存在,则说明old是一个过期的,于是old就被抛弃了

//这是处理函数 在RtpScheduler线程中循环调用

/* time is the number of miliseconds elapsed since the start of the scheduler */

void rtp_session_process (RtpSession * session, uint32_t time, RtpScheduler *sched)

{

wait_point_lock(&session->snd.wp);

if (wait_point_check(&session->snd.wp,time)) //检查当前时间 发送的

{

session_set_set(&sched->w_sessions,session); //置位 如果是非阻塞的就可以通过标志位找到是哪个会话可用了

wait_point_wakeup(&session->snd.wp); //把session从中唤醒 如果是阻塞的就完成了

}

wait_point_unlock(&session->snd.wp);

wait_point_lock(&session->rcv.wp);

if (wait_point_check(&session->rcv.wp,time)) //检查接收

{

session_set_set(&sched->r_sessions,session);

wait_point_wakeup(&session->rcv.wp);

}

wait_point_unlock(&session->rcv.wp);

}

rtpsession_inet.c

里面定义了一些会话的网络部分

比如rtp_session_set_local_addr

rtp_session_set_remote_addr

void rtp_scheduler_start(RtpScheduler *sched)

{

if (sched->thread_running==0){

sched->thread_running=1;

ortp_mutex_lock(&sched->lock);

ortp_thread_create(&sched->thread, NULL, rtp_scheduler_schedule,(void*)sched);

ortp_cond_wait(&sched->unblock_select_cond,&sched->lock);

ortp_mutex_unlock(&sched->lock);

}

else ortp_warning("Scheduler thread already running.");

}

函数为scheduler创建了一个线程 到底在哪里开启的呢?看ortp.c 中的这个函数

void ortp_scheduler_init()

{

static bool_t initialized=FALSE;

if (initialized) return;

initialized=TRUE;

#ifdef __hpux

/* on hpux, we must block sigalrm on the main process, because signal delivery

is ?random?, well, sometimes the SIGALRM goes to both the main thread and the

scheduler thread */

sigset_t set;

sigemptyset(&set);

sigaddset(&set,SIGALRM);

sigprocmask(SIG_BLOCK,&set,NULL);

#endif /* __hpux */

__ortp_scheduler=rtp_scheduler_new();

rtp_scheduler_start(__ortp_scheduler);

//sleep(1);

}

这是scheduler 的线程函数

void * rtp_scheduler_schedule(void * psched)

{

RtpScheduler *sched=(RtpScheduler*) psched;

RtpTimer *timer=sched->timer;

RtpSession *current;

/* take this lock to prevent the thread to start until g_thread_create() returns

because we need sched->thread to be initialized */

ortp_mutex_lock(&sched->lock);

ortp_cond_signal(&sched->unblock_select_cond); /* unblock the starting thread */

ortp_mutex_unlock(&sched->lock);

timer->timer_init();

while(sched->thread_running)

{

/* do the processing here: */

ortp_mutex_lock(&sched->lock);

current=sched->list;

/* processing all scheduled rtp sessions */

while (current!=NULL) //遍历

{

ortp_debug("scheduler: processing session=0x%x.\n",current);

rtp_session_process(current,sched->time_,sched); //处理每一个注册的session

current=current->next;

}

/* wake up all the threads that are sleeping in _select() */

ortp_cond_broadcast(&sched->unblock_select_cond);

ortp_mutex_unlock(&sched->lock);

/* now while the scheduler is going to sleep, the other threads can compute their

result mask and see if they have to leave, or to wait for next tick*/

//ortp_message("scheduler: sleeping.");

timer->timer_do(); //等待一段时间 具体函数在下面

sched->time_+=sched->timer_inc;

}

/* when leaving the thread, stop the timer */

timer->timer_uninit();

return NULL;

}

void win_timer_do(void)

{

DWORD diff;

// If timer have expired while we where out of this method

// Try to run after lost time.

if (late_ticks > 0)

{

late_ticks--;

posix_timer_time+=TIME_INTERVAL;

return;

}

diff = GetTickCount() - posix_timer_time - offset_time;

if( diff>TIME_INTERVAL && (diff<(1<<31)))

{

late_ticks = diff/TIME_INTERVAL;

ortp_warning("we must catchup %i ticks.",late_ticks);

return;

}

WaitForSingleObject(TimeEvent,TIME_TIMEOUT);

return;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: