您的位置:首页 > 理论基础 > 计算机网络

akka.io的基本用法

2014-11-27 13:38 225 查看
akka.io的api已经非常非常简单了, 实在很难挑剔.  如果用它来做单进程的游戏服务器, 基本上分成三个步骤就可以完成了.

1. akka.io的环境初始化, 包括了tcp extension的初始化. 

2. 绑定一个端口, 并将这个端口上的事件交给某个actor处理, 如连接到来事件.

3. 有连接到来时将其指派给某个业务actor处理, 接下来这个业务actor就负责自己身上的所有事件了, 如消息到来事件.

-------------------------------------------------------

1.  环境初始化.

/** 服务器启动. */
public final static boolean init()
{
try
{
AkMgr.sys = ActorSystem.create();
TcpExt ext = Tcp.get(AkMgr.sys);
ActorRef ref = ext.manager();
//
ActorRef srv = AkMgr.sys.actorOf(Props.create(Srv.class), Srv.class.getName());
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 20001);
ref.tell(TcpMessage.bind(srv, addr, 0x10000, AkMgr.setOpt(), false), srv);
return true;
} catch (Exception e)
{
Log.error(Log.trace(e));
return false;
}
}


/** 服务器端口套接字选项. */
private static final List<Inet.SocketOption> setOpt()
{
List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
options.add(TcpSO.reuseAddress(true));
options.add(TcpSO.sendBufferSize(0x400 * 10));
options.add(TcpSO.receiveBufferSize(0x400 * 10));
return options;
}


tcp的选项似乎有点奇怪, 在bind的时候指定, 然后它们将被应用到所有的连接上去. 换句话说, 是统一指定的. 并且只提供了下面几个选项(版本是2.3.7)

没有看到linger. timeout什么的.  看来akka觉得我们不需要其它的. 事实上通过测试结果来看, 也确实如此. 连接所关联的actor在stop的时候, 

立即就被销毁了. 也没有看到tcp缓冲区残留等待, time_wait状态, 换句话说, 连接上的关闭是暴力的. 如果想延迟关闭, 你可能得单独处理.



2. 监听端口事件处理actor.

上面的Srv就是负责处理监听端口上的事件类了, onReceive函数中重要的事件就是Tcp.Connected了, 它表示了一个连接到来.

public void onReceive(Object msg) throws Exception
{
try
{
if (msg instanceof Tcp.Bound)
this.boundEvn((Bound) msg);
else if (msg instanceof Tcp.Connected)
this.connEvn((Tcp.Connected) msg, getSender());
else
{
Log.error("it`s an unexpected message: %s\n", msg);
this.unhandled(msg);
}
} catch (Exception e)
{
Log.error(Log.trace(e));
}
}


3. 当连接到来的时候, 将业务actor Peer注册到连接上就可以了.

/** 连接到来事件. */
private void connEvn(Tcp.Connected msg, ActorRef sender)
{
if (Log.isTrace())
Log.trace("got a connection from peer: %s\n", msg.remoteAddress());
ActorRef peer = AkMgr.actorOf(Props.create(Peer.class, sender, msg.remoteAddress())); /* 构造一个peer. */
this.getSender().tell(TcpMessage.register(peer), this.getSelf()); /* 连接上的事件交予peer处理. */
}


下面是Peer类的消息入口,  可能的事件在Tcp.*中都有定义.  包括了报文送达, 连接断开, 和这里没有处理的send过载等事件.

/** 消息入口. */
public void onReceive(Object msg) throws Exception
{
if (msg instanceof Tcp.Received)
this.datEvn((Tcp.Received) msg);
else if (msg instanceof Tcp.ErrorClosed)
this.disEvn((Tcp.ErrorClosed) msg);
else
Log.error("it`s an unexpected message: %s\n", msg.getClass().getName());
}


这里有一个值得怀疑的地方是, 每个Tcp.Received消息是无法控制的. 相当于只要tcp缓冲区中有数据, akka就会把它拿出来, 以Tcp.Received的形式扔到应用上

来.由应用自己去decode消息流.  因为消息流是无边界的, 那么应用自己需要额外开辟一段内存来去缓存不完整的消息.

因此在注册一个Peer到连接上的时候(TcpMessage.register(peer)), 如果能让应用指定一个缓冲区是不是更好呢?  这样应用层和akka可以共同操作这片区域, 

从而减少来回的copy呢?

4. 关于性能. 

akka.io的性能是非常好的.  在10000个连接, 15000条消息/s, 8Mbytes/s的压力下. 占用了一个i5 4核cpu的120%, 也就是一个cpu多一点点.  gc也比较少,  内存的使用也很稳定.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  akka 网络游戏 框架