您的位置:首页 > 运维架构

OpenRTMFP/Cumulus Primer(8)CumulusServer主进程主循环分析

2012-04-15 22:26 831 查看

OpenRTMFP/Cumulus Primer(8)CumulusServer主进程主循环分析

作者:柳大·Poechant(钟超)
博客:Blog.CSDN.net/Poechant
邮箱:zhongchao.ustc#gmail.com (# -> @)
日期:April 14th, 2012
该主循环在
RTMFPServer::run(const volatile bool& terminate)
函数中。
RTMFPServer
覆盖
Startable
run(const volatile bool &terminate)
方法。
void RTMFPServer::run(const volatile bool& terminate) {

1 绑定地址

CumulusServer
的 IP 地址和端口:
SocketAddress address("0.0.0.0",_port);
    _socket.bind(address,true);
绑定
CumulusEdge
的 IP 地址和端口:
SocketAddress edgesAddress("0.0.0.0",_edgesPort);
    if (_edgesPort>0)
        _edgesSocket.bind(edgesAddress,true);
发送者(Client)的 IP 地址和端口:
SocketAddress sender;
    UInt8 buff[PACKETRECV_SIZE];
    int size = 0;

    while (!terminate) {

        bool stop=false;
        bool idle = realTime(stop);
        if(stop)
            break;

        _handshake.isEdges=false;

2
CumulusServer
接收数据:

CumulusServer
的 socket 有数据可读:
if (_socket.available() > 0) {
            try {
从 socket 读取:把数据存到 buff,
把发送者地址赋给 sender,
把所读长度返回给 size */
size = _socket.receiveFrom(buff,sizeof(buff),sender);

处理
CumulusServer
的 socket 产生的异常:
} catch(Exception& ex) {
                DEBUG("Main socket reception : %s",ex.displayText().c_str());
                _socket.close();
                _socket.bind(address,true);
                continue;
            }

2 如果
CumulusEdge
端口存在且 edge socket 可用。

CumulusEdge
的 socket 有数据可读:
} else if (_edgesPort > 0 && _edgesSocket.available() > 0) {
            try {
                size = _edgesSocket.receiveFrom(buff, sizeof(buff), sender);
                _handshake.isEdges = true;
            } catch(Exception& ex) {
                DEBUG("Main socket reception : %s", ex.displayText().c_str());
                _edgesSocket.close();
                _edgesSocket.bind(edgesAddress, true);
                continue;
            }
            Edge* pEdge = edges(sender);
            if (pEdge)
                pEdge->update();

3
CumulusServer
CumulusEdge
的 socket 都没有数据:

} else {
CumulusServer
空闲:
if (idle) {
主线程等待一秒。
Thread::sleep(1);
                if (!_timeLastManage.isElapsed(_freqManage)) {
Just middle session
                    if (_middle) {
Sessions::Iterator it;
for (it = _sessions.begin(); it != _sessions.end(); ++it) {
Middle* pMiddle = dynamic_cast<Middle*>(it->second);
if (pMiddle)
pMiddle->manage();
}
}
} else { _timeLastManage.update();
manage();
}
}
continue;
}

4 发送方的 ip 被禁:

if (isBanned(sender.host())) {
            INFO("Data rejected because client %s is banned",
                sender.host().toString().c_str());
            continue;
        }

5 数据包长度小于可能的最小值(12)

if (size < RTMFP_MIN_PACKET_SIZE) {
            ERROR("Invalid packet");
            continue;
        }

        PacketReader packet(buff,size);
        Session* pSession = findSession(RTMFP::Unpack(packet));

        if (!pSession)
            continue;

        if (!pSession->checked)
            _handshake.commitCookie(*pSession);
CumulusEdge
或者自己(
CumulusServer
)的 socket:
pSession->setEndPoint(_handshake.isEdges ? _edgesSocket : _socket,sender);
        pSession->receive(packet);
    }

    _handshake.clear();
    _sessions.clear();
    _socket.close();
    if (_edgesPort>0)
        _edgesSocket.close();

    if(_pCirrus) {
        delete _pCirrus;
        _pCirrus = NULL;
    }
}
-转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant-
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: