一个.net客户端通讯框架的设计(三)---设计IO连接
2010-12-09 12:02
357 查看
IAsyncIOService(对应代码里的IAsyncConnector)
这是一个异步IO服务接口,含有一下几个方法。
AsyncConnectorpublic class AsyncConnector : IAsyncConnector
{
#region async sockets
private Socket _socket;
#endregion
#region reused socketasynceventargs
private GenericPool<SocketAsyncEventArgs> _asyncSendArgsPool = new GenericPool<SocketAsyncEventArgs>();
private GenericPool<SocketAsyncEventArgs> _asyncRecvArgsPool = new GenericPool<SocketAsyncEventArgs>();
#endregion
#region fields
//private IoSession session = new IoSession();
private ConnectionStatus _connectionStatus = ConnectionStatus.Unused;
#endregion
#region ctors
public AsyncConnector()
{
var sendAspectHandler = new SocketSendEventArgsAspectHandler();
sendAspectHandler.OnClearupEventReference = (p) =>
{
p.Completed -= _socketArgs_Completed;
};
_asyncSendArgsPool
= new GenericPool<SocketAsyncEventArgs>(GenericPool<SocketAsyncEventArgs>.DEFAULT_MAXSIZE, sendAspectHandler);
var recvAspectHandler = new SocketRecvEventArgsAspectHandler();
recvAspectHandler.OnInitializeEvent += (p) =>
p.Completed += _socketArgs_Completed;
recvAspectHandler.OnClearupEventReference = (p) =>
p.Completed -= _socketArgs_Completed;
_asyncRecvArgsPool
= new GenericPool<SocketAsyncEventArgs>(GenericPool <SocketAsyncEventArgs>.DEFAULT_MAXSIZE, recvAspectHandler);
}
#endregion
#region Property
/*public IoSession Session
{
get
{
return session;
}
}*/
public event ConnectionStatusChangedEventHandler OnConnectionStatusChanged;
public IoHandler Handler
{
get;
set;
}
public IProtocolFactory ProtocolFactory
{
get;
set;
}
#endregion
public void Connect(EndPoint endPoint)
{
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
SocketAsyncEventArgs _socketArgs = new SocketAsyncEventArgs();
_socketArgs.Completed += new EventHandler<SocketAsyncEventArgs>(_socketArgs_Connected);
_socketArgs.RemoteEndPoint = endPoint;
ConnectionStatus = IO.ConnectionStatus.Connecting;
if (!_socket.ConnectAsync(_socketArgs))
{
_socketArgs_Connected(_socket, _socketArgs);
}
}
public ConnectionStatus ConnectionStatus
{
get
{
return _connectionStatus;
}
set
{
if (value == _connectionStatus)
return;
if (OnConnectionStatusChanged != null)
OnConnectionStatusChanged(_connectionStatus, value);
_connectionStatus = value;
}
}
public void Close()
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}
public void Close(bool isImmediately)
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}
public void Abort()
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}
public void Dispose()
{
if (_socket != null)
{
if (_socket.Connected)
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
_socket.Dispose();
_socket = null;
ConnectionStatus = IO.ConnectionStatus.Closed;
}
_asyncRecvArgsPool.Empty();
_asyncSendArgsPool.Empty();
}
//session.Dispose();
}
public void Send<T>(T msg)
{
var args = _asyncSendArgsPool.Resolve();
byte[] ret = ProtocolFactory.GetEncoder<T>().Encode(msg);
args.SetBuffer(ret, 0, ret.Length);
_socket.SendAsync(args);
}
private void _socketArgs_Connected(object sender, SocketAsyncEventArgs args)
{
args.Completed -= _socketArgs_Connected;
if (args.SocketError == SocketError.Success)
{
ConnectionStatus = IO.ConnectionStatus.Connected;
var recvArgs = _asyncRecvArgsPool.Resolve();
if (!_socket.ReceiveAsync(recvArgs))
_socketArgs_Received(sender, recvArgs);
}
else
{
ConnectionStatus = IO.ConnectionStatus.Unavailable;
}
}
private void _socketArgs_Completed(object sender, SocketAsyncEventArgs args)
{
switch(args.LastOperation)
{
case SocketAsyncOperation.Send:
_socketArgs_Sended(sender, args);
break;
case SocketAsyncOperation.Receive:
_socketArgs_Received(sender, args);
break;
}
}
private void _socketArgs_Received(object sender, SocketAsyncEventArgs args)
{
var socket = sender as Socket;
if (args.SocketError == SocketError.Success)
{
//check if the buffer is full used(ByteTransfered, OffSet)
var bufferStatus = args.UserToken as BufferStatus;
bufferStatus.Buffer.Append(args.Buffer, args.Offset/*bufferStatus.ReadOffset*/, args.BytesTransferred);
bufferStatus.IncreaseTransfered(args.BytesTransferred);
var decoder = ProtocolFactory.GetDecoder();
var data = bufferStatus.Buffer.copyAvaliableBytes();
var result = decoder.Decodable(data);
switch (result)
{
case DecodeResult.NEED_DATA:
socket.ReceiveAsync(args);
break;
case DecodeResult.NOT_OK:
throw new BadImageFormatException();
case DecodeResult.OK:
int usedSize;
object netMsg = null;
decoder.Decode(data, out usedSize, out netMsg);
ThreadPool.QueueUserWorkItem(Handler.MessageReceived, netMsg);
bufferStatus.Buffer.Reset();
bufferStatus.Buffer.Append(data, usedSize, data.Length - usedSize);
socket.ReceiveAsync(args);
break;
}
}
}
private void _socketArgs_Sended(object sender, SocketAsyncEventArgs args)
{
if (args.SocketError == SocketError.Success)
{
}
else
{
}
_asyncSendArgsPool.Put(args);
}
}
代码下载 http://files.cnblogs.com/wJiang/ClientConnLib.rar
这是一个异步IO服务接口,含有一下几个方法。
AsyncConnectorpublic class AsyncConnector : IAsyncConnector
{
#region async sockets
private Socket _socket;
#endregion
#region reused socketasynceventargs
private GenericPool<SocketAsyncEventArgs> _asyncSendArgsPool = new GenericPool<SocketAsyncEventArgs>();
private GenericPool<SocketAsyncEventArgs> _asyncRecvArgsPool = new GenericPool<SocketAsyncEventArgs>();
#endregion
#region fields
//private IoSession session = new IoSession();
private ConnectionStatus _connectionStatus = ConnectionStatus.Unused;
#endregion
#region ctors
public AsyncConnector()
{
var sendAspectHandler = new SocketSendEventArgsAspectHandler();
sendAspectHandler.OnClearupEventReference = (p) =>
{
p.Completed -= _socketArgs_Completed;
};
_asyncSendArgsPool
= new GenericPool<SocketAsyncEventArgs>(GenericPool<SocketAsyncEventArgs>.DEFAULT_MAXSIZE, sendAspectHandler);
var recvAspectHandler = new SocketRecvEventArgsAspectHandler();
recvAspectHandler.OnInitializeEvent += (p) =>
p.Completed += _socketArgs_Completed;
recvAspectHandler.OnClearupEventReference = (p) =>
p.Completed -= _socketArgs_Completed;
_asyncRecvArgsPool
= new GenericPool<SocketAsyncEventArgs>(GenericPool <SocketAsyncEventArgs>.DEFAULT_MAXSIZE, recvAspectHandler);
}
#endregion
#region Property
/*public IoSession Session
{
get
{
return session;
}
}*/
public event ConnectionStatusChangedEventHandler OnConnectionStatusChanged;
public IoHandler Handler
{
get;
set;
}
public IProtocolFactory ProtocolFactory
{
get;
set;
}
#endregion
public void Connect(EndPoint endPoint)
{
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
SocketAsyncEventArgs _socketArgs = new SocketAsyncEventArgs();
_socketArgs.Completed += new EventHandler<SocketAsyncEventArgs>(_socketArgs_Connected);
_socketArgs.RemoteEndPoint = endPoint;
ConnectionStatus = IO.ConnectionStatus.Connecting;
if (!_socket.ConnectAsync(_socketArgs))
{
_socketArgs_Connected(_socket, _socketArgs);
}
}
public ConnectionStatus ConnectionStatus
{
get
{
return _connectionStatus;
}
set
{
if (value == _connectionStatus)
return;
if (OnConnectionStatusChanged != null)
OnConnectionStatusChanged(_connectionStatus, value);
_connectionStatus = value;
}
}
public void Close()
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}
public void Close(bool isImmediately)
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}
public void Abort()
{
_socket.Close();
ConnectionStatus = IO.ConnectionStatus.Closed;
}
public void Dispose()
{
if (_socket != null)
{
if (_socket.Connected)
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
_socket.Dispose();
_socket = null;
ConnectionStatus = IO.ConnectionStatus.Closed;
}
_asyncRecvArgsPool.Empty();
_asyncSendArgsPool.Empty();
}
//session.Dispose();
}
public void Send<T>(T msg)
{
var args = _asyncSendArgsPool.Resolve();
byte[] ret = ProtocolFactory.GetEncoder<T>().Encode(msg);
args.SetBuffer(ret, 0, ret.Length);
_socket.SendAsync(args);
}
private void _socketArgs_Connected(object sender, SocketAsyncEventArgs args)
{
args.Completed -= _socketArgs_Connected;
if (args.SocketError == SocketError.Success)
{
ConnectionStatus = IO.ConnectionStatus.Connected;
var recvArgs = _asyncRecvArgsPool.Resolve();
if (!_socket.ReceiveAsync(recvArgs))
_socketArgs_Received(sender, recvArgs);
}
else
{
ConnectionStatus = IO.ConnectionStatus.Unavailable;
}
}
private void _socketArgs_Completed(object sender, SocketAsyncEventArgs args)
{
switch(args.LastOperation)
{
case SocketAsyncOperation.Send:
_socketArgs_Sended(sender, args);
break;
case SocketAsyncOperation.Receive:
_socketArgs_Received(sender, args);
break;
}
}
private void _socketArgs_Received(object sender, SocketAsyncEventArgs args)
{
var socket = sender as Socket;
if (args.SocketError == SocketError.Success)
{
//check if the buffer is full used(ByteTransfered, OffSet)
var bufferStatus = args.UserToken as BufferStatus;
bufferStatus.Buffer.Append(args.Buffer, args.Offset/*bufferStatus.ReadOffset*/, args.BytesTransferred);
bufferStatus.IncreaseTransfered(args.BytesTransferred);
var decoder = ProtocolFactory.GetDecoder();
var data = bufferStatus.Buffer.copyAvaliableBytes();
var result = decoder.Decodable(data);
switch (result)
{
case DecodeResult.NEED_DATA:
socket.ReceiveAsync(args);
break;
case DecodeResult.NOT_OK:
throw new BadImageFormatException();
case DecodeResult.OK:
int usedSize;
object netMsg = null;
decoder.Decode(data, out usedSize, out netMsg);
ThreadPool.QueueUserWorkItem(Handler.MessageReceived, netMsg);
bufferStatus.Buffer.Reset();
bufferStatus.Buffer.Append(data, usedSize, data.Length - usedSize);
socket.ReceiveAsync(args);
break;
}
}
}
private void _socketArgs_Sended(object sender, SocketAsyncEventArgs args)
{
if (args.SocketError == SocketError.Success)
{
}
else
{
}
_asyncSendArgsPool.Put(args);
}
}
代码下载 http://files.cnblogs.com/wJiang/ClientConnLib.rar
相关文章推荐
- 一个.net客户端通讯框架的设计(一)---前言
- 一个.net客户端通讯框架的设计(二)---准备FastBuffer和BOConverter
- 一个.net客户端通讯框架的设计(四)---协议解码/编码器工厂以及为其他模块提供消息的dealers/trigers
- 通讯框架 T-io 学习——给初学者的Demo:ShowCase设计分析
- 客户端websocket(C#)长连接及简易RPC框架设计(一)
- 客户端单SOCKET连接框架设计
- 客户端websocket(c#)长连接及简易rpc框架设计(二)
- [连载]《C#通讯(串口和网络)框架的设计与实现》-1.通讯框架介绍
- Netty实现服务端客户端长连接通讯及心跳检测
- 无线客户端框架设计(5.1):将JSON映射为实体对象(iOS篇)
- [更新设计]跨平台物联网通讯框架ServerSuperIO 2.0 ,功能、BUG、细节说明,以及升级思考过程!
- 客户端与服务端长连接Mina框架讲解
- 详解JavaScript中的客户端消息框架设计原理
- 一个AJAX客户端连接框架(自创)
- 无线客户端框架设计(5):调用MobileAPI的设计(iOS篇)
- 无线客户端框架设计(2):项目结构的设计(iOS篇)
- 百万级 TCP 长连接即时通讯框架 t-io
- Mina长连接框架实现Android客户端与服务器端通信
- [连载]《C#通讯(串口和网络)框架的设计与实现》- 6.通讯控制器的设计
- 分享基于分布式Http长连接框架--设计模型