您的位置:首页 > 编程语言 > C#

C# 异步Socket

2013-11-08 11:59 232 查看
还是那个BUSLED项目;

因为客户端会不定期的发送握手消息包过来,服务端将回发握手消息包;

同时,服务端也可能会主动下发消息包到客户端;阻塞的话,没有办法做到 同时接收和发送消息包,所以就只好做异步了;

看了看,c# MSDN的文档,发现其实挺容易;

/// <summary>

    /// 异步的 TCPClient 发送和接受消息类;

    /// </summary>

    public class AsyncTCPCLient

    {

        private TcpClient  tcpConn;

        private Stream socketStream = null;

        private Socket socket;

        private BinaryReader br = null;

        private BinaryWriter bw = null;

        private Action<Exception> expNotify;

        

        /// <summary>

        /// 构造函数

        /// </summary>

        /// <param name="tcpConn">已经建立好连接的TCPClient对象</param>

        /// <param name="expNotify">异常时的处理函数</param>

        public AsyncTCPCLient(TcpClient tcpConn, Action<Exception> expNotify)

        {

            this.tcpConn = tcpConn;

            this.socket = tcpConn.Client;

            this.expNotify = expNotify;

        }

        /// <summary>

        /// 启动异步消息接收

        /// </summary>

        /// <param name="receiveObject"></param>

        public void BeginAsyncReceive(ReceiveStateObject receiveObject)

        {

            try

            {

                var startOffset = receiveObject.StartOffset;

                var leftBytes = receiveObject.Content.Length - startOffset;

                socket.BeginReceive(receiveObject.Content, receiveObject.StartOffset, leftBytes, 0, new AsyncCallback(BeginAsyncReceiveCallback), receiveObject);

            }

            catch (Exception e)

            {

                Console.WriteLine(e.ToString());

                if (expNotify != null)

                {

                    expNotify(e);

                }                

            }

        }

        /// <summary>

        /// 异步消息接收的回调函数

        /// </summary>

        /// <param name="ar"></param>

        private void BeginAsyncReceiveCallback(IAsyncResult ar)

        {

            ReceiveStateObject nowStateObj = (ReceiveStateObject)ar.AsyncState;

            try

            {                

                int readBytes = socket.EndReceive(ar);

                if (readBytes > 0)

                {

                    nowStateObj.JudgeContent(readBytes);

                    if (nowStateObj.State == ReceiveStateObject.READ_COMPLETE_MESSAGE) //请注意,不要合并起来,

                    {

                       //nowStateObj = new ReceiveStateObject();

                       // BeginAsyncReceive(nowStateObj);

                       nowStateObj.handleMessage();  //否则,必须等到 handleMessage 调用完才可以再去接收消息;

                       // return;

                    }

                }

                BeginAsyncReceive(nowStateObj);                

            }

            catch (Exception e)

            {

                Console.WriteLine(e.ToString());

                if (expNotify != null)

                {

                    expNotify(e);

                } 

            }

        }

        /// <summary>

        /// 异步写

        /// </summary>

        /// <param name="stateObject"></param>

        public void SyncWriter(SendStateObject stateObject)

        {

            try {

                var startOffset = stateObject.StartOffset;

                var leftBytes = stateObject.Content.Length - startOffset;

                socket.BeginSend(stateObject.Content, startOffset, leftBytes, 0, new AsyncCallback(SyncWriterCallBack), stateObject);

            }

            catch (Exception e)

            {

                Console.WriteLine(e.ToString());

                if (expNotify != null)

                {

                    expNotify(e);

                }

            }

         }

        /// <summary>

        /// 异步写的回调

        /// </summary>

        /// <param name="ar"></param>

        public void SyncWriterCallBack(IAsyncResult ar)

        {

            try

            {

                SendStateObject stateObject = (SendStateObject)(ar.AsyncState);

                var sendBytes = socket.EndSend(ar);

                stateObject.StartOffset += sendBytes;

                if (stateObject.StartOffset < stateObject.Content.Length)

                {

                    SyncWriter(stateObject);

                }

            }

            catch (Exception e)

            {

                Console.WriteLine(e.ToString());

                if (expNotify != null)

                {

                    expNotify(e);

                } 

            }

            

        }

    }

    /// <summary>

    /// 发送的状态对象;

    /// </summary>

    public class SendStateObject

    {

        public int StartOffset = 0;

        public byte[] Content = null;

        

        /// <summary>

        /// 发送给定内容;

        /// </summary>

        /// <param name="content"></param>

        public SendStateObject(byte[] content)

        {

            this.Content = content;

            StartOffset = 0;

        }

        /// <summary>

        /// 

        /// </summary>

        /// <param name="content"></param>

        public SendStateObject(BusMessage msg) 

        {

            byte[] msgContent = msg.Serialize();

            this.Content = msgContent;

            

            if (System.Configuration.ConfigurationSettings.AppSettings["logmessage"] == "1")

            {

                string filepath = "c:\\temp\\发送_" + msg.MsgType + "_" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ".bin";

                FileUtil.writeFileBinaryContent(filepath, Content);

            }

            StartOffset = 0;

        } 

    }

    /// <summary>

    /// 接收的状态对象;

    /// 接收时,注意根据接收的内容改变 缓冲区大小和位置;

    /// </summary>

    public class ReceiveStateObject    

    {

        public const int WAIT_HEAD = 0;

        const int WAIT_LEN = 1;

        const int WAIT_CONTNET = 2;

        public const int READ_COMPLETE_MESSAGE = 3;

        

        public int State = -1;

        public int StartOffset = 0;

        Action<BusMessage> MessageHandle;

        public ReceiveStateObject(Action<BusMessage> messageHandle)

        {

            ChangeState(WAIT_HEAD);

            MessageHandle = messageHandle;

        }

        MemoryStream ms = new MemoryStream(1050);

        private byte[] _content;

        public byte[] Content

        {

            get { return _content; }

        }

        /// <summary>

        /// 判断读到的内容,并根据读到的内容改变状态;

        /// </summary>

        /// <param name="readBytes">读到的字节数目</param>

        public void JudgeContent(int readBytes)

        {

            switch (State)

            {

                case WAIT_HEAD: 

                    if (_content[0]==0x7e) 

                        ChangeState(WAIT_LEN);

                    break;

                case WAIT_LEN:

                    StartOffset += readBytes;

                    if (StartOffset >= _content.Length)

                    {

                        ChangeState(WAIT_CONTNET);

                    }                                                          

                    break;

                case WAIT_CONTNET:

                     StartOffset += readBytes;

                     if (readBytes >= _content.Length)

                     {

                         ChangeState(READ_COMPLETE_MESSAGE);

                     }

                     break;                   

            }

        }

        public void ChangeState(int newState)

        {

            if (State == newState) return;

            State = newState;

            switch (newState)

            {

                case WAIT_HEAD: 

                    _content = new byte[1]; 

                    ms.Position = 0;

                    StartOffset = 0;

                    break;

                case WAIT_LEN:                      

                    ms.Write(_content, 0, _content.Length);

                    _content = new byte[2];

                    StartOffset = 0;

                    break;

                case WAIT_CONTNET: 

                     ms.Write(_content, 0, _content.Length);                     

                    _content = new byte[NetworkBitConverter.ToUInt16(_content,0) - 3];

                    StartOffset = 0;

                    break;

                case READ_COMPLETE_MESSAGE:

                    ms.Write(_content, 0, _content.Length);

                    break;                    

            }

        }

        /// <summary>

        /// 对当前收到消息的处理函数;

        /// </summary>

        public virtual void handleMessage()

        {            

            if (MessageHandle != null)

            {

                ms.Position = 0;

                BusMessage msg = BusMessageListener.StaticDeserialize(new NetworkBinaryReader(ms), null);

                if (msg != null)

                {

                    if (System.Configuration.ConfigurationSettings.AppSettings["logmessage"] == "1")

                    {

                        string filepath = "c:\\temp\\接收_" + msg.MsgType + "_" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ".bin";

                        FileUtil.writeFileBinaryContent(filepath, ms.ToArray());

                    }

                    MessageHandle(msg);

                }

            }

            ChangeState(WAIT_HEAD);

        }        

    }  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: