您的位置:首页 > 其它

red5源码分析---2

2016-04-21 21:31 519 查看

red5源码分析—服务器连接

和red5客户端类似,red5服务器端也使用了mina框架进行底层Socket的封装。另外,red5服务器端使用Spring管理相关的bean,在分析red5服务器端如何处理客户端的连接之前,下面先简单看一下red5 standalone的启动流程。

一. red5服务器启动

red5服务器端代码的main函数在org.red5.server.Bootstrap中,main函数加载org.red5.server.Launcher,Launcher通过Spring解析red5.xml文件,red5.xml文件会继续根据red5-common.xml和red5-core.xml创建相应的bean,并根据red5.globals文件创建一个GlobalScope,后面会提到。

本文从red5-core.xml文件中构造的RTMPMinaTransport说起,该类对应的Spring配置文件如下,

<bean id="rtmpTransport" class="org.red5.server.net.rtmp.RTMPMinaTransport" init-method="start" destroy-method="stop">
<property name="ioHandler" ref="rtmpMinaIoHandler" />
<property name="addresses">
<list>
<value>${rtmp.host}:${rtmp.port}</value>
</list>
</property>
<property name="ioThreads" value="${rtmp.io_threads}" />
<property name="sendBufferSize" value="${rtmp.send_buffer_size}" />
<property name="receiveBufferSize" value="${rtmp.receive_buffer_size}" />
<property name="trafficClass" value="${rtmp.traffic_class}" />
<property name="backlog" value="${rtmp.backlog}" />
<property name="tcpNoDelay" value="${rtmp.tcp_nodelay}" />
<property name="keepAlive" value="${rtmp.tcp_keepalive}" />
<property name="thoughputCalcInterval" value="${rtmp.thoughput_calc_interval}" />
<property name="enableDefaultAcceptor" value="${rtmp.default_acceptor}" />
<property name="initialPoolSize" value="${rtmp.initial_pool_size}" />
<property name="maxPoolSize" value="${rtmp.max_pool_size}" />
<property name="maxProcessorPoolSize" value="${rtmp.max_processor_pool_size}" />
<property name="executorKeepAliveTime" value="${rtmp.executor_keepalive_time}" />
<property name="minaPollInterval" value="${jmx.mina.poll.interval}" />
<property name="enableMinaMonitor" value="${jmx.mina.monitor.enable}" />
<property name="enableMinaLogFilter" value="${mina.logfilter.enable}" />
</bean>


根据RTMPMinaTransport的配置,Spring框架会调用RTMPMinaTransport的start函数进行初始化,代码如下,

public void start() throws Exception {
initIOHandler();
IoBuffer.setUseDirectBuffer(!useHeapBuffers);
if (useHeapBuffers) {
IoBuffer.setAllocator(new SimpleBufferAllocator());
}
if (enableDefaultAcceptor) {
acceptor = new NioSocketAcceptor(ioThreads);
} else {
SimpleIoProcessorPool<NioSession> pool = new SimpleIoProcessorPool<NioSession>(NioProcessor.class, maxProcessorPoolSize);
executor = new ThreadPoolExecutor(initialPoolSize, maxPoolSize, executorKeepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(Short.MAX_VALUE));
acceptor = new NioSocketAcceptor(executor, pool);
}
if (enableMinaLogFilter) {
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
LoggingFilter logFilter = new LoggingFilter(RTMPMinaTransport.class);
chain.addLast("logger", logFilter);
}
acceptor.setCloseOnDeactivation(true);
acceptor.setHandler(ioHandler);
acceptor.setBacklog(backlog);
SocketSessionConfig sessionConf = acceptor.getSessionConfig();
sessionConf.setReuseAddress(true);
sessionConf.setTcpNoDelay(tcpNoDelay);
sessionConf.setSendBufferSize(sendBufferSize);
sessionConf.setReceiveBufferSize(receiveBufferSize);
sessionConf.setMaxReadBufferSize(receiveBufferSize);
sessionConf.setThroughputCalculationInterval(thoughputCalcInterval);
sessionConf.setReaderIdleTime(readerIdleTime);
sessionConf.setKeepAlive(keepAlive);
if (trafficClass == -1) {

} else {
sessionConf.setTrafficClass(trafficClass);
}
acceptor.setReuseAddress(true);
try {
Set<InetSocketAddress> socketAddresses = new HashSet<InetSocketAddress>();
for (String addr : addresses) {
if (addr.indexOf(':') != -1) {
String[] parts = addr.split(":");
socketAddresses.add(new InetSocketAddress(parts[0], Integer.valueOf(parts[1])));
} else {
socketAddresses.add(new InetSocketAddress(addr, 1935));
}
}
acceptor.bind(socketAddresses);
String cName = this.getClass().getName();
if (cName.indexOf('.') != -1) {
cName = cName.substring(cName.lastIndexOf('.')).replaceFirst("[\\.]", "");
}
if (enableMinaMonitor) {
stats = new IoServiceStatistics((AbstractIoService) acceptor);
stats.setThroughputCalculationInterval(minaPollInterval);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try {
serviceManagerObjectName = new ObjectName("org.red5.server:type=RTMPMinaTransport");
mbs.registerMBean(new StandardMBean(this, RTMPMinaTransportMXBean.class, true), serviceManagerObjectName);
} catch (Exception e) {

}
}
} catch (Exception e) {

}
}


start函数主要是mina框架的初始化,开头的initIOHandler函数用于初始化mina框架中的IoHandler,根据red5-core.xml,其已经被初始化为RTMPMinaIoHandler,并设置其成员变量handler为RTMPHandler,和上一章分析类似,RTMPMinaIoHandler属于mina框架调用,RTMPHandler属于RTMP业务调用,两个handler执行的逻辑不同。

<bean id="rtmpHandler" class="org.red5.server.net.rtmp.RTMPHandler">
<property name="server" ref="red5.server" />
<property name="statusObjectService" ref="statusObjectService" />
</bean>

<bean id="rtmpMinaIoHandler" class="org.red5.server.net.rtmp.RTMPMinaIoHandler">
<property name="handler" ref="rtmpHandler" />
</bean>


回到start函数中,再往下设置了IoBuffer,构造NioSocketAcceptor用于监听客户端连接,并对其进行相应的设置,最后调用NioSocketAcceptor的bind开始监听了。

二. sessionCreated和sessionOpened

当有客户端连接到来时,mina框架会回调RTMPMinaIoHandler的sessionCreated和sessionOpened函数,下面来看,

public void sessionCreated(IoSession session) throws Exception {
session.getFilterChain().addFirst("rtmpeFilter", new RTMPEIoFilter());
RTMPMinaConnection conn = createRTMPMinaConnection();
conn.setIoSession(session);
conn.setHandler(handler);
session.setAttribute(RTMPConnection.RTMP_SESSION_ID, conn.getSessionId());
session.setAttribute(RTMPConnection.RTMP_HANDSHAKE, new InboundHandshake());
}


sessionCreated函数首先添加了RTMPEIoFilter过滤器用于握手,接着通过createRTMPMinaConnection创建一个RTMPMinaConnection,然后对RTMPMinaConnection进行相应的设置,最后向session中添加InboundHandshake用于服务器端的握手。

下面来看sessionOpened函数,

public void sessionOpened(IoSession session) throws Exception {
String sessionId = (String) session.getAttribute(RTMPConnection.RTMP_SESSION_ID);
RTMPConnManager connManager = (RTMPConnManager) RTMPConnManager.getInstance();
session.setAttribute(RTMPConnection.RTMP_CONN_MANAGER, new WeakReference<IConnectionManager<RTMPConnection>>(connManager));
RTMPMinaConnection conn = (RTMPMinaConnection) connManager.getConnectionBySessionId(sessionId);
handler.connectionOpened(conn);
}


sessionOpened函数首先获取RTMPConnManager,然后通过RTMPConnManager获得前面在sessionCreated函数中创建的RTMPMinaConnection,并调用RTMPHandler的connectionOpened函数,

public void connectionOpened(RTMPConnection conn) {
conn.open();
conn.startWaitForHandshake();
}


connectionOpened函数依次调用了RTMPMinaConnection的open和startWaitForHandshake函数,open函数为空,startWaitForHandshake定义在RTMPMinaConnection的父类RTMPConnection中,代码如下,

public void startWaitForHandshake() {
try {
waitForHandshakeTask = scheduler.schedule(new WaitForHandshakeTask(), new Date(System.currentTimeMillis() + maxHandshakeTimeout));
} catch (TaskRejectedException e) {

}
}


startWaitForHandshake函数启动了一个WaitForHandshakeTask线程,并延迟maxHandshakeTimeout时间启动,maxHandshakeTimeout的默认值为10秒。从客户端与服务器建立TCP连接完成开始,如果10秒内没有完成RTMP的连接,red5服务器端就会关闭该连接。看一下WaitForHandshakeTask执行的任务就一目了然了,

public void run() {
if (state.getState() != RTMP.STATE_CONNECTED) {
onInactive();
}
}
protected void onInactive() {
close();
}


刚刚在sessionOpened中提到过,该函数会获取RTMPConnManager,RTMPConnManager是一个单例,用来管理服务器端的连接,其中有一段静态代码块值得一看,

{
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
int closedConnections = 0;
Collection<RTMPConnection> allConns = getAllConnections();
for (RTMPConnection conn : allConns) {
String sessionId = conn.getSessionId();
RTMP rtmp = conn.getState();
switch (rtmp.getState()) {
case RTMP.STATE_DISCONNECTED:
case RTMP.STATE_DISCONNECTING:
removeConnection(sessionId);
break;
default:
long ioTime = 0L;
IoSession session = conn.getIoSession();
if (conn instanceof RTMPMinaConnection) {
ioTime = System.currentTimeMillis() - session.getLastIoTime();
} else if (conn instanceof RTMPTConnection) {
ioTime = System.currentTimeMillis() - ((RTMPTConnection) conn).getLastDataReceived();
}
if (ioTime >= conn.maxInactivity) {
if (session != null) {
if (session.isConnected()) {
session.getWriteRequestQueue().clear(session);
}
}
conn.onInactive();
if (!conn.isClosed()) {

} else {
closedConnections++;
}
}
}
}
if (closedConnections > 0) {
System.gc();
}
}
}, 7000, 30000, TimeUnit.MILLISECONDS);
}


可以看出,RTMPConnManager在初始化时会创建一个线程,该线程每30秒运行一次,用来判断服务器端管理的连接是否过期。判断的方式是检测每个连接有多长时间没有被访问了,如果超过了系统设定的最大时间,就关闭该连接。

下一章开始分析red5客户端的握手过程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  源码 red5 服务器