您的位置:首页 > 其它

一个.net客户端通讯框架的设计(三)---设计IO连接

2010-12-09 12:02 357 查看
IAsyncIOService(对应代码里的IAsyncConnector)

这是一个异步IO服务接口,含有一下几个方法。

AsyncConnectorpublic class AsyncConnector : IAsyncConnector
{
#region async sockets
private Socket _socket;
#endregion
#region reused socketasynceventargs
private GenericPool<SocketAsyncEventArgs> _asyncSendArgsPool = new GenericPool<SocketAsyncEventArgs>();
private GenericPool<SocketAsyncEventArgs> _asyncRecvArgsPool = new GenericPool<SocketAsyncEventArgs>();
#endregion
#region fields
//private IoSession session = new IoSession();
private ConnectionStatus _connectionStatus = ConnectionStatus.Unused;
#endregion
#region ctors
public AsyncConnector()
{
var sendAspectHandler = new SocketSendEventArgsAspectHandler();
sendAspectHandler.OnClearupEventReference = (p) =>
{
p.Completed -= _socketArgs_Completed;
};
_asyncSendArgsPool
= new GenericPool<SocketAsyncEventArgs>(GenericPool<SocketAsyncEventArgs>.DEFAULT_MAXSIZE, sendAspectHandler);

var recvAspectHandler = new SocketRecvEventArgsAspectHandler();
recvAspectHandler.OnInitializeEvent += (p) =>
p.Completed += _socketArgs_Completed;
recvAspectHandler.OnClearupEventReference = (p) =>
p.Completed -= _socketArgs_Completed;
_asyncRecvArgsPool
= new GenericPool<SocketAsyncEventArgs>(GenericPool <SocketAsyncEventArgs>.DEFAULT_MAXSIZE, recvAspectHandler);
}
#endregion
#region Property
/*public IoSession Session
{
get
{
return session;
}
}*/
public event ConnectionStatusChangedEventHandler OnConnectionStatusChanged;
public IoHandler Handler
{
get;
set;
}
public IProtocolFactory ProtocolFactory
{
get;
set;
}
#endregion
public void Connect(EndPoint endPoint)
{
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
SocketAsyncEventArgs _socketArgs = new SocketAsyncEventArgs();
_socketArgs.Completed += new EventHandler<SocketAsyncEventArgs>(_socketArgs_Connected);
_socketArgs.RemoteEndPoint = endPoint;
ConnectionStatus = IO.ConnectionStatus.Connecting;
if (!_socket.ConnectAsync(_socketArgs))
{
_socketArgs_Connected(_socket, _socketArgs);
}
}

public ConnectionStatus ConnectionStatus
{
get
{
return _connectionStatus;
}
set
{
if (value == _connectionStatus)
return;
if (OnConnectionStatusChanged != null)
OnConnectionStatusChanged(_connectionStatus, value);
_connectionStatus = value;
}
}

public void Close()
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}

public void Close(bool isImmediately)
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}

public void Abort()
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}
public void Dispose()
{
if (_socket != null)
{
if (_socket.Connected)
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
_socket.Dispose();
_socket = null;
ConnectionStatus = IO.ConnectionStatus.Closed;
}
_asyncRecvArgsPool.Empty();
_asyncSendArgsPool.Empty();
}
//session.Dispose();
}
public void Send<T>(T msg)
{
var args = _asyncSendArgsPool.Resolve();
byte[] ret = ProtocolFactory.GetEncoder<T>().Encode(msg);
args.SetBuffer(ret, 0, ret.Length);
_socket.SendAsync(args);
}
private void _socketArgs_Connected(object sender, SocketAsyncEventArgs args)
{
args.Completed -= _socketArgs_Connected;
if (args.SocketError == SocketError.Success)
{
ConnectionStatus = IO.ConnectionStatus.Connected;
var recvArgs = _asyncRecvArgsPool.Resolve();
if (!_socket.ReceiveAsync(recvArgs))
_socketArgs_Received(sender, recvArgs);
}
else
{
ConnectionStatus = IO.ConnectionStatus.Unavailable;
}
}
private void _socketArgs_Completed(object sender, SocketAsyncEventArgs args)
{
switch(args.LastOperation)
{
case SocketAsyncOperation.Send:
_socketArgs_Sended(sender, args);
break;
case SocketAsyncOperation.Receive:
_socketArgs_Received(sender, args);
break;
}
}
private void _socketArgs_Received(object sender, SocketAsyncEventArgs args)
{
var socket = sender as Socket;
if (args.SocketError == SocketError.Success)
{
//check if the buffer is full used(ByteTransfered, OffSet)
var bufferStatus = args.UserToken as BufferStatus;
bufferStatus.Buffer.Append(args.Buffer, args.Offset/*bufferStatus.ReadOffset*/, args.BytesTransferred);
bufferStatus.IncreaseTransfered(args.BytesTransferred);
var decoder = ProtocolFactory.GetDecoder();
var data = bufferStatus.Buffer.copyAvaliableBytes();
var result = decoder.Decodable(data);
switch (result)
{
case DecodeResult.NEED_DATA:
socket.ReceiveAsync(args);
break;
case DecodeResult.NOT_OK:
throw new BadImageFormatException();
case DecodeResult.OK:
int usedSize;
object netMsg = null;
decoder.Decode(data, out usedSize, out netMsg);
ThreadPool.QueueUserWorkItem(Handler.MessageReceived, netMsg);
bufferStatus.Buffer.Reset();
bufferStatus.Buffer.Append(data, usedSize, data.Length - usedSize);
socket.ReceiveAsync(args);
break;
}
}
}
private void _socketArgs_Sended(object sender, SocketAsyncEventArgs args)
{
if (args.SocketError == SocketError.Success)
{
}
else
{
}
_asyncSendArgsPool.Put(args);
}
}

代码下载 http://files.cnblogs.com/wJiang/ClientConnLib.rar
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: