您的位置:首页 > 理论基础 > 计算机网络

c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据

2015-02-14 22:00 603 查看
本文基于networkcomms2.3.1开源版本 gplv3协议

在networkcomms通信系统中,服务器端收到某连接上的数据后,数据会暂时存放在"数据包创建器"(PacketBuilder)中,PacketBuilder类似一个流动的容器,收到的数据被服务器处理完成后,相应在二进制数据,会从存储他的PacketBuilder中删除。

我们知道在networkcomms的消息体系中,传送的数据的第一个字节用来存储数据包包头长度,解析出数据包包头后,包头中包含数据包长度。所以在读入进入PacketBuilder中的数据,会根据第一个字节中存储的数据,解析出包头的长度,进一步解析出数据包的长度。如果PacketBuilder中的存储的字节大小,大于数据包的长度,那么主程序将会对PacketBuilder中的数据,进行提取,并进行处理。

此节,我们暂时不讨论PacketBuilder的细节,先看一下服务器如何对PacketBuilder中的数据进行处理

protected void IncomingPacketHandleHandOff(PacketBuilder packetBuilder)
{
try
{
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... checking for completed packet with " + packetBuilder.TotalBytesCached.ToString() + " bytes read.");

if (packetBuilder.TotalPartialPacketCount == 0)
throw new Exception("Executing IncomingPacketHandleHandOff when no packets exist in packetbuilder.");

//循环,直到我们完成对packetBuilder的处理
//Loop until we are finished with this packetBuilder
int loopCounter = 0;
while (true)
{
//If we have ended up with a null packet at the front, probably due to some form of concatentation we can pull it off here
//It is possible we have concatenation of several null packets along with real data so we loop until the firstByte is greater than 0
//此处,可能是处理心跳检测时对方发来的0字节数据
if (packetBuilder.FirstByte() == 0)
{
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingPacketHandleHandOff() from " + ConnectionInfo + ", loop index - " + loopCounter.ToString());

packetBuilder.ClearNTopBytes(1);

//Reset the expected bytes to 0 so that the next check starts from scratch
packetBuilder.TotalBytesExpected = 0;

//If we have run out of data completely then we can return immediately
if (packetBuilder.TotalBytesCached == 0) return;
}
else
{
//First determine the expected size of a header packet
//获取数据包包头长度
int packetHeaderSize = packetBuilder.FirstByte() + 1;

//Do we have enough data to build a header?
//如果没有足够的二级制数据来还原出一个数据包包头
if (packetBuilder.TotalBytesCached < packetHeaderSize)
{
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... ... more data required for complete packet header.");

//Set the expected number of bytes and then return
packetBuilder.TotalBytesExpected = packetHeaderSize;
return;
}

//We have enough for a header
//有足够的数据来还原出数据包包头
PacketHeader topPacketHeader;
using(MemoryStream headerStream = packetBuilder.ReadDataSection(1, packetHeaderSize - 1))
//数据包包头
topPacketHeader = new PacketHeader(headerStream, NetworkComms.InternalFixedSendReceiveOptions);

//Idiot test
if (topPacketHeader.PacketType == null)
throw new SerialisationException("packetType value in packetHeader should never be null");

//We can now use the header to establish if we have enough payload data
//First case is when we have not yet received enough data
//如果没有足够的数据来还原出数据包
if (packetBuilder.TotalBytesCached < packetHeaderSize + topPacketHeader.PayloadPacketSize)
{
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... ... more data required for complete packet payload. Expecting " + (packetHeaderSize + topPacketHeader.PayloadPacketSize).ToString() + " total packet bytes.");

//Set the expected number of bytes and then return
packetBuilder.TotalBytesExpected = packetHeaderSize + topPacketHeader.PayloadPacketSize;
return;
}
//Second case is we have enough data
//有足够的数据还原出数据包
else if (packetBuilder.TotalBytesCached >= packetHeaderSize + topPacketHeader.PayloadPacketSize)
{
//We can either have exactly the right amount or even more than we were expecting
//We may have too much data if we are sending high quantities and the packets have been concatenated
//no problem!!
SendReceiveOptions incomingPacketSendReceiveOptions = IncomingPacketSendReceiveOptions(topPacketHeader);
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Received packet of type '" + topPacketHeader.PacketType + "' from " + ConnectionInfo + ", containing " + packetHeaderSize.ToString() + " header bytes and " + topPacketHeader.PayloadPacketSize.ToString() + " payload bytes.");

//If this is a reserved packetType we call the method inline so that it gets dealt with immediately
//判断是否为保留类型,保留类型会优先处理
//保留类型包括心跳检测、连接建立等
bool isReservedType = false;
foreach (var tName in NetworkComms.reservedPacketTypeNames)
{
//isReservedType |= topPacketHeader.PacketType == tName;
if (topPacketHeader.PacketType == tName)
{
isReservedType = true;
break;
}
}

//Only reserved packet types get completed inline
//如果数据包类型为保留类型  设定为高优先级
if (isReservedType)
{
#if WINDOWS_PHONE
var priority = QueueItemPriority.Normal;
#else
var priority = (QueueItemPriority)Thread.CurrentThread.Priority;
#endif
//创建优先级队列项目
PriorityQueueItem item = new PriorityQueueItem(priority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions);
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... handling packet type '" + topPacketHeader.PacketType + "' inline. Loop index - " + loopCounter.ToString());
NetworkComms.CompleteIncomingItemTask(item);
}
else
{
//创建优先级队列项目
QueueItemPriority itemPriority = (incomingPacketSendReceiveOptions.Options.ContainsKey("ReceiveHandlePriority") ? (QueueItemPriority)Enum.Parse(typeof(QueueItemPriority), incomingPacketSendReceiveOptions.Options["ReceiveHandlePriority"]) : QueueItemPriority.Normal);
//把数据包包头,和数据包数据 赋值给优先级队列项
PriorityQueueItem item = new PriorityQueueItem(itemPriority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions);

//QueueItemPriority.Highest is the only priority that is executed inline
#if !WINDOWS_PHONE
//如果是最高优先级,当前线程处理
if (itemPriority == QueueItemPriority.Highest)
{
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... handling packet type '" + topPacketHeader.PacketType + "' with priority HIGHEST inline. Loop index - " + loopCounter.ToString());
NetworkComms.CompleteIncomingItemTask(item);
}
else
{
//不是最高优先级 交给自定义线程池处理   此线程池支持优先级处理
int threadId = NetworkComms.CommsThreadPool.EnqueueItem(item.Priority, NetworkComms.CompleteIncomingItemTask, item);
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... added completed " + item.PacketHeader.PacketType + " packet to thread pool (Q:" + NetworkComms.CommsThreadPool.QueueCount.ToString() + ", T:" + NetworkComms.CommsThreadPool.CurrentNumTotalThreads.ToString() + ", I:" + NetworkComms.CommsThreadPool.CurrentNumIdleThreads.ToString() + ") with priority " + itemPriority.ToString() + (threadId > 0 ? ". Selected threadId=" + threadId.ToString() : "") + ". Loop index=" + loopCounter.ToString() + ".");
}
#else
int threadId = NetworkComms.CommsThreadPool.EnqueueItem(item.Priority, NetworkComms.CompleteIncomingItemTask, item);
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... added completed " + item.PacketHeader.PacketType + " packet to thread pool (Q:" + NetworkComms.CommsThreadPool.QueueCount.ToString() + ", T:" + NetworkComms.CommsThreadPool.CurrentNumTotalThreads.ToString() + ", I:" + NetworkComms.CommsThreadPool.CurrentNumIdleThreads.ToString() + ") with priority " + itemPriority.ToString() + (threadId > 0 ? ". Selected threadId=" + threadId.ToString() : "") + ". Loop index=" + loopCounter.ToString() + ".");
#endif
}

//从PacketBuilder中删除已经处理过的数据
//We clear the bytes we have just handed off
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Removing " + (packetHeaderSize + topPacketHeader.PayloadPacketSize).ToString() + " bytes from incoming packet buffer from connection with " + ConnectionInfo +".");
packetBuilder.ClearNTopBytes(packetHeaderSize + topPacketHeader.PayloadPacketSize);

//Reset the expected bytes to 0 so that the next check starts from scratch
packetBuilder.TotalBytesExpected = 0;

//If we have run out of data completely then we can return immediately
if (packetBuilder.TotalBytesCached == 0) return;
}
else
throw new CommunicationException("This should be impossible!");
}

loopCounter++;
}
}
catch (Exception ex)
{
//Any error, throw an exception.
if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("A fatal exception occured in IncomingPacketHandleHandOff(), connection with " + ConnectionInfo + " be closed. See log file for more information.");

NetworkComms.LogError(ex, "CommsError");
CloseConnection(true, 45);
}
}


在上面的数据处理过程中,不知道您是否注意到一个重要的概念,即“自定义线程池”,
CommsThreadPool

此线程池,可以说是Networkcomms通信框架中一颗璀璨的明珠,由他负责处理接收到的所有数据,此线程池支持优先级,即高优先级的数据会被优先处理

www.networkcomms.cn
www.cnblogs.com/networkcomms
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐