red5源码分析---2
2016-04-21 21:31
519 查看
red5源码分析—服务器连接
和red5客户端类似,red5服务器端也使用了mina框架进行底层Socket的封装。另外,red5服务器端使用Spring管理相关的bean,在分析red5服务器端如何处理客户端的连接之前,下面先简单看一下red5 standalone的启动流程。一. red5服务器启动
red5服务器端代码的main函数在org.red5.server.Bootstrap中,main函数加载org.red5.server.Launcher,Launcher通过Spring解析red5.xml文件,red5.xml文件会继续根据red5-common.xml和red5-core.xml创建相应的bean,并根据red5.globals文件创建一个GlobalScope,后面会提到。本文从red5-core.xml文件中构造的RTMPMinaTransport说起,该类对应的Spring配置文件如下,
<bean id="rtmpTransport" class="org.red5.server.net.rtmp.RTMPMinaTransport" init-method="start" destroy-method="stop"> <property name="ioHandler" ref="rtmpMinaIoHandler" /> <property name="addresses"> <list> <value>${rtmp.host}:${rtmp.port}</value> </list> </property> <property name="ioThreads" value="${rtmp.io_threads}" /> <property name="sendBufferSize" value="${rtmp.send_buffer_size}" /> <property name="receiveBufferSize" value="${rtmp.receive_buffer_size}" /> <property name="trafficClass" value="${rtmp.traffic_class}" /> <property name="backlog" value="${rtmp.backlog}" /> <property name="tcpNoDelay" value="${rtmp.tcp_nodelay}" /> <property name="keepAlive" value="${rtmp.tcp_keepalive}" /> <property name="thoughputCalcInterval" value="${rtmp.thoughput_calc_interval}" /> <property name="enableDefaultAcceptor" value="${rtmp.default_acceptor}" /> <property name="initialPoolSize" value="${rtmp.initial_pool_size}" /> <property name="maxPoolSize" value="${rtmp.max_pool_size}" /> <property name="maxProcessorPoolSize" value="${rtmp.max_processor_pool_size}" /> <property name="executorKeepAliveTime" value="${rtmp.executor_keepalive_time}" /> <property name="minaPollInterval" value="${jmx.mina.poll.interval}" /> <property name="enableMinaMonitor" value="${jmx.mina.monitor.enable}" /> <property name="enableMinaLogFilter" value="${mina.logfilter.enable}" /> </bean>
根据RTMPMinaTransport的配置,Spring框架会调用RTMPMinaTransport的start函数进行初始化,代码如下,
public void start() throws Exception { initIOHandler(); IoBuffer.setUseDirectBuffer(!useHeapBuffers); if (useHeapBuffers) { IoBuffer.setAllocator(new SimpleBufferAllocator()); } if (enableDefaultAcceptor) { acceptor = new NioSocketAcceptor(ioThreads); } else { SimpleIoProcessorPool<NioSession> pool = new SimpleIoProcessorPool<NioSession>(NioProcessor.class, maxProcessorPoolSize); executor = new ThreadPoolExecutor(initialPoolSize, maxPoolSize, executorKeepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(Short.MAX_VALUE)); acceptor = new NioSocketAcceptor(executor, pool); } if (enableMinaLogFilter) { DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); LoggingFilter logFilter = new LoggingFilter(RTMPMinaTransport.class); chain.addLast("logger", logFilter); } acceptor.setCloseOnDeactivation(true); acceptor.setHandler(ioHandler); acceptor.setBacklog(backlog); SocketSessionConfig sessionConf = acceptor.getSessionConfig(); sessionConf.setReuseAddress(true); sessionConf.setTcpNoDelay(tcpNoDelay); sessionConf.setSendBufferSize(sendBufferSize); sessionConf.setReceiveBufferSize(receiveBufferSize); sessionConf.setMaxReadBufferSize(receiveBufferSize); sessionConf.setThroughputCalculationInterval(thoughputCalcInterval); sessionConf.setReaderIdleTime(readerIdleTime); sessionConf.setKeepAlive(keepAlive); if (trafficClass == -1) { } else { sessionConf.setTrafficClass(trafficClass); } acceptor.setReuseAddress(true); try { Set<InetSocketAddress> socketAddresses = new HashSet<InetSocketAddress>(); for (String addr : addresses) { if (addr.indexOf(':') != -1) { String[] parts = addr.split(":"); socketAddresses.add(new InetSocketAddress(parts[0], Integer.valueOf(parts[1]))); } else { socketAddresses.add(new InetSocketAddress(addr, 1935)); } } acceptor.bind(socketAddresses); String cName = this.getClass().getName(); if (cName.indexOf('.') != -1) { cName = cName.substring(cName.lastIndexOf('.')).replaceFirst("[\\.]", ""); } if (enableMinaMonitor) { stats = new IoServiceStatistics((AbstractIoService) acceptor); stats.setThroughputCalculationInterval(minaPollInterval); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { serviceManagerObjectName = new ObjectName("org.red5.server:type=RTMPMinaTransport"); mbs.registerMBean(new StandardMBean(this, RTMPMinaTransportMXBean.class, true), serviceManagerObjectName); } catch (Exception e) { } } } catch (Exception e) { } }
start函数主要是mina框架的初始化,开头的initIOHandler函数用于初始化mina框架中的IoHandler,根据red5-core.xml,其已经被初始化为RTMPMinaIoHandler,并设置其成员变量handler为RTMPHandler,和上一章分析类似,RTMPMinaIoHandler属于mina框架调用,RTMPHandler属于RTMP业务调用,两个handler执行的逻辑不同。
<bean id="rtmpHandler" class="org.red5.server.net.rtmp.RTMPHandler"> <property name="server" ref="red5.server" /> <property name="statusObjectService" ref="statusObjectService" /> </bean> <bean id="rtmpMinaIoHandler" class="org.red5.server.net.rtmp.RTMPMinaIoHandler"> <property name="handler" ref="rtmpHandler" /> </bean>
回到start函数中,再往下设置了IoBuffer,构造NioSocketAcceptor用于监听客户端连接,并对其进行相应的设置,最后调用NioSocketAcceptor的bind开始监听了。
二. sessionCreated和sessionOpened
当有客户端连接到来时,mina框架会回调RTMPMinaIoHandler的sessionCreated和sessionOpened函数,下面来看,public void sessionCreated(IoSession session) throws Exception { session.getFilterChain().addFirst("rtmpeFilter", new RTMPEIoFilter()); RTMPMinaConnection conn = createRTMPMinaConnection(); conn.setIoSession(session); conn.setHandler(handler); session.setAttribute(RTMPConnection.RTMP_SESSION_ID, conn.getSessionId()); session.setAttribute(RTMPConnection.RTMP_HANDSHAKE, new InboundHandshake()); }
sessionCreated函数首先添加了RTMPEIoFilter过滤器用于握手,接着通过createRTMPMinaConnection创建一个RTMPMinaConnection,然后对RTMPMinaConnection进行相应的设置,最后向session中添加InboundHandshake用于服务器端的握手。
下面来看sessionOpened函数,
public void sessionOpened(IoSession session) throws Exception { String sessionId = (String) session.getAttribute(RTMPConnection.RTMP_SESSION_ID); RTMPConnManager connManager = (RTMPConnManager) RTMPConnManager.getInstance(); session.setAttribute(RTMPConnection.RTMP_CONN_MANAGER, new WeakReference<IConnectionManager<RTMPConnection>>(connManager)); RTMPMinaConnection conn = (RTMPMinaConnection) connManager.getConnectionBySessionId(sessionId); handler.connectionOpened(conn); }
sessionOpened函数首先获取RTMPConnManager,然后通过RTMPConnManager获得前面在sessionCreated函数中创建的RTMPMinaConnection,并调用RTMPHandler的connectionOpened函数,
public void connectionOpened(RTMPConnection conn) { conn.open(); conn.startWaitForHandshake(); }
connectionOpened函数依次调用了RTMPMinaConnection的open和startWaitForHandshake函数,open函数为空,startWaitForHandshake定义在RTMPMinaConnection的父类RTMPConnection中,代码如下,
public void startWaitForHandshake() { try { waitForHandshakeTask = scheduler.schedule(new WaitForHandshakeTask(), new Date(System.currentTimeMillis() + maxHandshakeTimeout)); } catch (TaskRejectedException e) { } }
startWaitForHandshake函数启动了一个WaitForHandshakeTask线程,并延迟maxHandshakeTimeout时间启动,maxHandshakeTimeout的默认值为10秒。从客户端与服务器建立TCP连接完成开始,如果10秒内没有完成RTMP的连接,red5服务器端就会关闭该连接。看一下WaitForHandshakeTask执行的任务就一目了然了,
public void run() { if (state.getState() != RTMP.STATE_CONNECTED) { onInactive(); } } protected void onInactive() { close(); }
刚刚在sessionOpened中提到过,该函数会获取RTMPConnManager,RTMPConnManager是一个单例,用来管理服务器端的连接,其中有一段静态代码块值得一看,
{ executor.scheduleAtFixedRate(new Runnable() { public void run() { int closedConnections = 0; Collection<RTMPConnection> allConns = getAllConnections(); for (RTMPConnection conn : allConns) { String sessionId = conn.getSessionId(); RTMP rtmp = conn.getState(); switch (rtmp.getState()) { case RTMP.STATE_DISCONNECTED: case RTMP.STATE_DISCONNECTING: removeConnection(sessionId); break; default: long ioTime = 0L; IoSession session = conn.getIoSession(); if (conn instanceof RTMPMinaConnection) { ioTime = System.currentTimeMillis() - session.getLastIoTime(); } else if (conn instanceof RTMPTConnection) { ioTime = System.currentTimeMillis() - ((RTMPTConnection) conn).getLastDataReceived(); } if (ioTime >= conn.maxInactivity) { if (session != null) { if (session.isConnected()) { session.getWriteRequestQueue().clear(session); } } conn.onInactive(); if (!conn.isClosed()) { } else { closedConnections++; } } } } if (closedConnections > 0) { System.gc(); } } }, 7000, 30000, TimeUnit.MILLISECONDS); }
可以看出,RTMPConnManager在初始化时会创建一个线程,该线程每30秒运行一次,用来判断服务器端管理的连接是否过期。判断的方式是检测每个连接有多长时间没有被访问了,如果超过了系统设定的最大时间,就关闭该连接。
下一章开始分析red5客户端的握手过程。
相关文章推荐
- 小心服务器内存居高不下的元凶--WebAPI服务
- 从源码安装Mysql/Percona 5.5
- 运维入门
- 利用开源软件打造自己的全功能远程工具
- Linux5.9无人值守安装
- 数据中心和云未来的十二大趋势
- 用vsftp快速搭建ftp服务器
- Linux快速构建apache web服务器
- 服务器监控策略浅谈
- 如何降低服务器采购成本 原理分析
- 建议的服务器分区办法
- 服务器托管六大优势分析
- Erlang实现的一个Web服务器代码实例
- 服务器技术全面解析
- 保护DNS服务器的几点方法小结
- 我国成为全球第二大服务器消费国
- 服务器 安全检查要点[星外提供]
- 服务器应用自动重新启动IIS批处理[原创]_DOS/BAT_脚本之家
- FTP 服务器关于权限的问题