您的位置:首页 > 其它

分析一个通用的rtsp server实现过程基础模块

2017-08-05 16:10 741 查看
本文分析先分析common模块:
common 模块有tcp,udp,socket,mutex类以及cyclebuffer类,StdAfx类

1. mutux类实现,这里的类实现的方法有点冗余,源码如下,

class TLock
{
public:
TLock( void )
{
pthread_mutexattr_t mattr;
pthread_mutexattr_init( &mattr );
pthread_mutex_init( &m_mutex, &mattr );
};

virtual ~TLock( void )
{
pthread_mutex_lock( &m_mutex );
pthread_mutex_unlock( &m_mutex );
pthread_mutex_destroy( &m_mutex );
};

virtual void Lock( void )
{
pthread_mutex_lock( &m_mutex );
};

virtual void UnLock( void )
{
pthread_mutex_unlock( &m_mutex );
};

protected:
private:
pthread_mutex_t m_mutex;
};

class ATLock
{
public:
ATLock( TLock* pTLock )
{
m_pTLock = pTLock;
m_pTLock->Lock();
};

ATLock( TLock& lock )
{
m_pTLock = &lock;
m_pTLock->Lock();
};

virtual ~ATLock()
{
m_pTLock->UnLock();
};

protected:

TLock* m_pTLock;
};
这里的代码需要建议修改,建议可以参考比较规范的定义类的方式,可以复用和单独提供处理作为库,给其他模块调用,建议用下面的方式,具体的实现,需要读者来参考上面的补上相应的函数就ok了,就不贴出了!

class thread_mutex
{
public:
thread_mutex();
~thread_mutex();
public:
void lock();
void unlock();
private:

pthread_mutex_t m_mutex;
};


2. Cyclebuffer类,这个作为一个环形缓冲,可以参考前面我写的的java的双缓冲文章,基本的思路都是有的,这里直接给出另外的write和read的C++实现方式,这里有个技巧需要重新计算get_free_space的空间,然后重新给读和写的位置赋值。

write函数如下:

int cycle_buffer::write( const unsigned char* buffer, int len )
{
unsigned int free_space_size = 0;
unsigned int writed_size = len;

/* 修改获取相应的space 的信息*/
unsigned char* pRPos = NULL;
unsigned char* pWPos = NULL;
free_space_size = get_free_space( &pRPos, &pWPos );

if ( free_space_size < len )
{
return 0;
}

if ( m_flag_buffering_data_ok != 1 )
{
//没有缓冲满,需要进行计数
m_buffering_write_count += len;

if ( m_buffering_write_count > m_bufferingsize )
{
//已经缓冲满
m_flag_buffering_data_ok = 1;
}
}

if ( ( m_cb.end - pWPos + 1 ) < len )
{
/*出现翻转的情况*/
writed_size = m_cb.end - pWPos + 1;
memcpy( pWPos, buffer, writed_size );
memcpy( m_cb.begin, buffer + writed_size, len - writed_size );
}
else
{
memcpy( pWPos, buffer, writed_size );
}

int offset = ( ( pWPos - m_cb.begin ) + len ) % m_cb.size;
m_cb.wpos = m_cb.begin + offset;

return len;
}

read函数也是一样的,和上面一样的架构!
int cycle_buffer::read( unsigned char* buffer, int len )
{
if ( m_flag_buffering_data_ok != 1 )
{
//没有缓冲满,无法进行读
return 0;
}

unsigned int valid_data_size = 0;
unsigned int read_size = len;

/* 修改获取相应的space 的信息,主要的原因就是有一个是需要空出来的*/
unsigned char* pRPos = NULL;
unsigned char* pWPos = NULL;
valid_data_size = m_cb.size - get_free_space( &pRPos, &pWPos ) - 1;

if ( valid_data_size < len )
{
return 0;
}
if ( ( m_cb.end - pRPos + 1 ) < len )
{
/*出现翻转的情况*/
read_size = m_cb.end - pRPos + 1;
memcpy( buffer, pRPos, read_size );
memcpy( buffer + read_size, m_cb.begin, len - read_size );
}
else
{
memcpy( buffer, pRPos, read_size );
}
int offset = ( ( pRPos - m_cb.begin ) + len ) % m_cb.size;
m_cb.rpos = m_cb.begin + offset;
return len;
}


从这两段的代码逻辑可以看出,简单的问题复杂化了,其实只要根据write,read和start,end的位置就可以写出具体的读写buffer了,参考前面java ringbuffer修改,相信也不是太难。当然这里可以看出,有一个地方需要锁的地方,就是:

get_free_space


这个函数,read和write不同线程,同时调用的话,会打乱write和read的位置,里面最好加一把锁!

3. tcp/ip 相关base类:

直接看bass socket类,这里才是核心的架构,

class Socket
{
public:
virtual int Write( PBYTE pBuffer, int writeSize, UINT nTimeOut = 2000000 );  // 2 sec
virtual int Read( BYTE* pBuffer, int readSize, UINT nTimeOut = 2000000 ); // 2 sec
protected:
int Select( int mode, int timeoutUsec );
SOCKET m_Socket;
SOCKADDR_IN m_BindAddr;
SOCKADDR_IN m_ConnectAddr;
};
这里可以看出,所有的网络通信是继承这个基础类,不过从整体架构感觉这里的socket类有点不规范,

如果需要重构的话,最好这里全部是虚函数,然后重新写一个类来实现,如select函数的东西,将socket里面的基本通信方式全部补上,这里比较合理。

这里m_socket就是上层打开socekt传过来的,因为所有后面的读写全部调用这里的方式实现,统一节省资源,

同时不需要锁,因为m_socket唯一性决定!

然后看tcp类的实现方式:

class Tcp : public Socket
{
public:

virtual BOOL Open( PCSTR bindIp = "", INT bindPort = 0 );

virtual BOOL Connect( PCSTR connectIp, INT connectPort );

protected:
BOOL m_isConnect;
};


这里只有open和connect函数,明显定义的不够好,并且从tcp.cpp可以看出:

open函数里面包含了bind的函数,耦合性过强,建议分开单独成函数,使架构更加清晰!

接下来看下UDP类的实现,因为数据都是rtp包发送,而rtp是通过udp来实现,因此需要实现下面的方式,来交互数据

class Udp : public Socket
{
public:
virtual BOOL Open( string bindIp = "", int bindPort = 0 );
virtual BOOL Connect( string connectIp, int connectPort );
virtual int Read( BYTE* pBuffer, UINT16 bufferSize, UINT nTimeOut = 500000 );
virtual int Write( PBYTE pBuffer, UINT16 bufferSize, UINT nTimeOut = 500000 );
protected:
BOOL m_isConnect;
};


这里就read和write函数会call子类的读写函数,当然open和上面的一样的模式,建议后期修改!

当然tcp和udp的在open的都是,可以设置一系列等参数,

用setsockopt来设置,这里就不详细的介绍了!

4. 当然为了保存兼容想,需要单独定义个类,StdAfx类

用来兼容命名的统一性,如下:

typedef unsigned char BYTE;
typedef unsigned short WORD;
typedef BYTE* PBYTE;
typedef const char* PCSTR;
typedef unsigned long ULONG;
typedef void* PVOID;


因此,这个架构就将上面的基础组件搭建好,后面就需要根据这个做具体的逻辑处理任务!

接下来,进一步介绍rtsp的相应整体架构流程,对于太细的细节不做过多的详述!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐