您的位置:首页 > 理论基础 > 计算机网络

P2P网络通讯程序(C#)

2011-07-24 00:42 393 查看
在网上看了很多程序(QQ、Azureus、Ants、PPStream)都实现了P2P,以前觉得技术很高深。通过这段时间的学习才发现,单纯实现局域网内的P2P通讯很容易,但是要实现外网穿透(NAT)感觉很困难。最近看了Azureus和eMule的源码,分别是Java和C++版,本人对这两门语言都不熟悉,看起来很吃力。最后只好根据VC++实现的P2PDemo程序进行了改版,根据设计思路用C#写了一个Demo出来。通过测试,多个客户端在局域网能脱离服务端实现端到端工作。外网的情况要通过路由器,看了Azureus,需要实现UPnP进行端口映射;在CodeProject上下载了一个UPnP源码来看,测试结果是没有发现启用UPnP的路由器。结果现在郁闷了,不知道下一步怎么测试,是不是用UPnP实现了端口自动映射成功,就能实现像QQ那样通讯。

下面是程序说明:

1、公共类

公共类主要定义一些包结构

a、Packet.cs[Serializable()]

public abstract class Packet
{
    /// <summary>
    /// Creates a packet without a user name.
    /// </summary>
    public Packet()
    {
    }

    /// <summary>
    /// Creates a packet that carries the given user name.
    /// </summary>
    /// <param name="username">Value stored in <see cref="UserName"/>.</param>
    public Packet(string username)
    {
        UserName = username;
    }

    /// <summary>
    /// User name carried by this packet.
    /// </summary>
    // Kept as an auto-property: the compiler-generated backing field is part of the
    // serialized shape of this type.
    public string UserName { get; set; }

    /// <summary>
    /// Command type of this packet.
    /// </summary>
    /// <returns>-1 in this base implementation; derived packets may override it.</returns>
    public virtual int GetCommandType()
    {
        const int noCommand = -1;
        return noCommand;
    }
}

复制代码b、MassTextPacket.cs --分片传输类[Serializable()]

public class MassTextPacket : TextPacket
{
    // Backing fields keep their original names and order: they are what a
    // [Serializable] type exposes to the formatter.
    private int seqID;
    private int seqCount;
    private int _CLSD;

    /// <summary>
    /// Sequence number of this fragment within its transfer.
    /// </summary>
    public int SeqID
    {
        get
        {
            return seqID;
        }
        set
        {
            seqID = value;
        }
    }

    /// <summary>
    /// Total number of fragments that make up the transfer.
    /// </summary>
    public int SeqCount
    {
        get
        {
            return seqCount;
        }
        set
        {
            seqCount = value;
        }
    }

    /// <summary>
    /// Identifier shared by all fragments of one transfer; the client code in this
    /// file assigns it from <c>GetHashCode()</c> and uses it as the key under which
    /// received fragments are grouped.
    /// </summary>
    public int CLSD
    {
        get
        {
            return _CLSD;
        }
        set
        {
            _CLSD = value;
        }
    }
}

复制代码2、客户端

a、消息传送时进行p2p通讯
/// <summary>
/// Sends <paramref name="packet"/> to the peer named <paramref name="toUserName"/> over its
/// direct P2P address, and asks the server to broker a hole-punch whenever no usable
/// address is known yet or the peer did not acknowledge.
/// </summary>
/// <param name="toUserName">User name of the destination peer, looked up in <c>userList</c>.</param>
/// <param name="packet">
/// Packet to send. Only packets whose exact runtime type is <c>TextPacket</c> or
/// <c>FileStreamPacket</c> are transmitted; for any other type nothing is sent and the
/// method just waits for an ACK.
/// </param>
/// <returns>
/// <c>true</c> once <c>ReceivedACK</c> has been observed set; <c>false</c> when the peer is
/// not in <c>userList</c> or no ACK arrived within <c>MAXRETRY</c> attempts.
/// </returns>
private bool SendMessageTo(string toUserName, Packet packet)
{
    // SingleOrDefault instead of Single: an unknown user must yield null (and the
    // "return false" below) rather than an InvalidOperationException.
    // NOTE(review): userList is also cleared and refilled by RecvThreadProc; if this method
    // runs on a different thread that enumeration is unsynchronized - confirm.
    PeerEntity toUser = userList.SingleOrDefault(c => c.UserName == toUserName);
    if (toUser == null)
    {
        return false;
    }

    ReceivedACK = false;
    for (int attempt = 0; attempt < MAXRETRY; attempt++)
    {
        // If a P2P address is known for the peer, try it as the destination first.
        // A missing ACK is treated as "this P2P address is not usable".
        if (toUser.P2PAddress != null && toUser.P2PAddress.Port != 0)
        {
            if (packet.GetType() == typeof(TextPacket))
            {
                SendTextFragments(toUserName, (TextPacket)packet, toUser);
            }
            else if (packet.GetType() == typeof(FileStreamPacket))
            {
                SendFileFragments((FileStreamPacket)packet, toUser);
            }

            if (WaitForP2PAck())
            {
                return true;
            }
        }

        // Build a P2P connection request and send it to the server, which forwards a
        // hole-punch request to the peer.
        P2PConnectionPacket transMsg = new P2PConnectionPacket(UserName, toUserName);
        byte[] msgBuffer = UtilityHelper.Serialize(transMsg);
        client.Send(msgBuffer, msgBuffer.Length, hostPoint);

        // Poll until the peer's P2P address has been filled in (done by the receive
        // thread when the hole-punch acknowledgement arrives).
        for (int j = 0; j < 10; ++j)
        {
            // The list may momentarily lack the user while it is being refreshed;
            // keep the previous entity in that case instead of throwing.
            PeerEntity refreshed = userList.SingleOrDefault(c => c.UserName == toUserName);
            if (refreshed != null)
            {
                toUser = refreshed;
            }

            if (toUser.P2PAddress != null && toUser.P2PAddress.Port != 0)
            {
                break;
            }

            Thread.Sleep(300);
        }
    }

    return false;
}

/// <summary>
/// Sends a text message to the peer's P2P address: as one datagram when the serialized
/// packet fits into <c>MAXBUFFERSIZE</c>, otherwise as a series of <c>MassTextPacket</c>
/// fragments that each carry a Base64 slice of the serialized <c>TextPacket</c>.
/// </summary>
private void SendTextFragments(string toUserName, TextPacket source, PeerEntity toUser)
{
    TextPacket msgPacket = new TextPacket(toUserName, source.Message);
    byte[] buffer = UtilityHelper.Serialize(msgPacket);
    if (buffer.Length <= MAXBUFFERSIZE)
    {
        client.Send(buffer, buffer.Length, toUser.P2PAddress);
        return;
    }

    MassTextPacket mtp = new MassTextPacket();
    mtp.SeqID = 0;
    mtp.SeqCount = (int)System.Math.Ceiling(buffer.Length / (decimal)MAXBUFFERSIZE);
    mtp.CLSD = mtp.GetHashCode();

    // The original loop condition required pos > 0 while pos started at 0, so no
    // fragment was ever sent, and pos was never advanced. Walk the buffer slice by slice.
    long pos = 0;
    while (pos < buffer.Length)
    {
        long remaining = buffer.Length - pos;
        long count = remaining < MAXBUFFERSIZE ? remaining : MAXBUFFERSIZE;
        byte[] bytes = new byte[count];
        Array.Copy(buffer, pos, bytes, 0, count);
        pos += count;

        // Assemble the fragment.
        mtp.SeqID = mtp.SeqID + 1;
        mtp.Message = Convert.ToBase64String(bytes);

        // Send it.
        byte[] buf = UtilityHelper.Serialize(mtp);
        client.Send(buf, buf.Length, toUser.P2PAddress);
        Thread.Sleep(100);
    }
}

/// <summary>
/// Streams the file named by <c>fsp.FileName</c> to the peer's P2P address in fragments of
/// at most <c>MAXBUFFERSIZE</c> bytes, reusing <paramref name="fsp"/> as the carrier packet.
/// <c>handle1</c> is reset for the duration of the transfer and set again afterwards.
/// </summary>
private void SendFileFragments(FileStreamPacket fsp, PeerEntity toUser)
{
    System.IO.FileStream fs = new System.IO.FileStream(fsp.FileName, System.IO.FileMode.Open, System.IO.FileAccess.Read, FileShare.Read);
    handle1.Reset();
    try
    {
        fsp.SeqID = 0;
        fsp.SeqCount = (int)System.Math.Ceiling(fs.Length / (decimal)MAXBUFFERSIZE);
        fsp.CLSD = fsp.GetHashCode();

        long pos = 0;
        long count = fs.Length < MAXBUFFERSIZE ? fs.Length : MAXBUFFERSIZE;
        while (pos < fs.Length && count > 0)
        {
            byte[] chunk = new byte[count];
            fs.Seek(pos, SeekOrigin.Begin);

            // Stream.Read may return fewer bytes than requested; keep reading until
            // the chunk is full or the stream ends.
            int filled = 0;
            while (filled < chunk.Length)
            {
                int read = fs.Read(chunk, filled, chunk.Length - filled);
                if (read <= 0)
                {
                    break;
                }
                filled += read;
            }

            if (filled == 0)
            {
                // Nothing left to read (file shrank underneath us); stop instead of looping.
                break;
            }

            pos += filled;
            count = pos + MAXBUFFERSIZE < fs.Length ? MAXBUFFERSIZE : fs.Length - pos;

            // Assemble the fragment.
            fsp.SeqID = fsp.SeqID + 1;
            fsp.Message = Convert.ToBase64String(chunk, 0, filled);

            // Send it.
            byte[] buf = UtilityHelper.Serialize(fsp);
            client.Send(buf, buf.Length, toUser.P2PAddress);
            Thread.Sleep(300);
        }
    }
    finally
    {
        // Always release the file and re-signal the handle, even if sending failed.
        fs.Close();
        handle1.Set();
    }
}

/// <summary>
/// Polls <c>ReceivedACK</c> up to ten times, 300 ms apart. When the flag is seen set it is
/// cleared again and <c>true</c> is returned; otherwise <c>false</c>.
/// </summary>
private bool WaitForP2PAck()
{
    for (int j = 0; j < 10; j++)
    {
        if (this.ReceivedACK)
        {
            this.ReceivedACK = false;
            return true;
        }

        Thread.Sleep(300);
    }

    return false;
}

复制代码b、消息接受线程
/// <summary>
/// Receive loop of the client: blocks on the UDP socket, deserializes every datagram into a
/// <c>Packet</c> and dispatches on the packet's exact runtime type. The loop never exits
/// on its own.
/// </summary>
/// <remarks>
/// NOTE(review): the bytes handed to <c>UtilityHelper.Deserialize</c> come straight from the
/// network. If that helper is backed by a general-purpose object formatter, deserializing
/// untrusted input is unsafe - confirm and restrict the accepted types.
/// </remarks>
private void RecvThreadProc()
{
    byte[] buffer;
    while (true)
    {
        buffer = client.Receive(ref remotePoint);
        Packet packet = UtilityHelper.Deserialize(buffer) as Packet;
        if (packet == null)
        {
            // Not one of our packets: ignore it instead of dereferencing null below.
            continue;
        }

        Type msgType = packet.GetType();
        if (msgType == typeof(UserListAckPacket))
        {
            // Convert the message.
            UserListAckPacket usersMsg = (UserListAckPacket)packet;

            // Replace the local user list with the one from the server.
            userList.Clear();
            foreach (PeerEntity user in usersMsg.Users)
            {
                userList.Add(user);
            }
            bUserListComplete = true;
        }
        else if (msgType == typeof(UserLoginAckPacket))
        {
            ProcUserLogAckMsg(packet);
        }
        else if (msgType == typeof(TextPacket))
        {
            // Convert the message.
            TextPacket txtPacket = (TextPacket)packet;
            printf("Receive a message: {0}", txtPacket.Message);

            // Send the acknowledgement.
            P2PAckPacket ackMsg = new P2PAckPacket();
            buffer = UtilityHelper.Serialize(ackMsg);
            client.Send(buffer, buffer.Length, remotePoint);
        }
        else if (msgType == typeof(MassTextPacket))
        {
            // NOTE(review): lock(this) is kept because other code may synchronize on the
            // same object; a private lock object would be preferable if nothing else does.
            lock (this)
            {
                MassTextPacket fPacket = (MassTextPacket)packet;

                // SeqCount comes from the peer; a non-positive value would make the
                // progress division below throw, so such fragments are dropped.
                if (fPacket.SeqCount > 0)
                {
                    List<MassTextPacket> fragments;
                    if (!packets.TryGetValue(fPacket.CLSD, out fragments))
                    {
                        fragments = new List<MassTextPacket>();
                        packets.Add(fPacket.CLSD, fragments);
                    }
                    fragments.Add(fPacket);

                    printf("PacketID:{0}--SeqNo:{1}--progress:{2}%", fPacket.CLSD, fPacket.SeqID, (int)(System.Math.Round(fragments.Count / (decimal)fPacket.SeqCount, 2) * 100));

                    // Reassemble once every fragment has arrived.
                    if (fPacket.SeqCount == fragments.Count)
                    {
                        List<byte> values = new List<byte>();
                        foreach (MassTextPacket mt in fragments.OrderBy(c => c.SeqID))
                        {
                            values.AddRange(Convert.FromBase64String(mt.Message));
                        }

                        // The group is complete; drop it so the dictionary does not grow
                        // without bound.
                        packets.Remove(fPacket.CLSD);

                        // The sender fragments a serialized TextPacket, so the payload must
                        // be read back as TextPacket (casting to MassTextPacket gave null).
                        TextPacket value = UtilityHelper.Deserialize(values.ToArray()) as TextPacket;
                        if (value != null)
                        {
                            printf("Receive a message: {0}", value.Message);

                            // Send the acknowledgement.
                            P2PAckPacket ackMsg = new P2PAckPacket();
                            buffer = UtilityHelper.Serialize(ackMsg);
                            client.Send(buffer, buffer.Length, remotePoint);
                        }
                    }
                }
            }
        }
        else if (msgType == typeof(FileStreamPacket))
        {
            lock (this)
            {
                FileStreamPacket fPacket = (FileStreamPacket)packet;

                // Same guard as for text fragments: SeqCount is peer-supplied.
                if (fPacket.SeqCount > 0)
                {
                    List<MassTextPacket> fragments;
                    if (!packets.TryGetValue(fPacket.CLSD, out fragments))
                    {
                        fragments = new List<MassTextPacket>();
                        packets.Add(fPacket.CLSD, fragments);
                    }
                    fragments.Add(fPacket);

                    printf("PacketID:{0}--SeqNo:{1}--progress:{2}%", fPacket.CLSD, fPacket.SeqID, (int)(System.Math.Round(fragments.Count / (decimal)fPacket.SeqCount, 2) * 100));

                    // Reassemble once every fragment has arrived.
                    if (fPacket.SeqCount == fragments.Count)
                    {
                        List<MassTextPacket> ordered = fragments.OrderBy(c => c.SeqID).ToList();

                        // NOTE(review): FileName is supplied by the remote peer and is used
                        // as a local path unchanged - consider restricting it to a download
                        // directory.
                        // using: the stream is closed even when a fragment fails to decode.
                        using (System.IO.FileStream fs = new System.IO.FileStream(fPacket.FileName + ".tmp", System.IO.FileMode.Create, System.IO.FileAccess.ReadWrite))
                        {
                            foreach (FileStreamPacket mt in ordered)
                            {
                                byte[] buf = Convert.FromBase64String(mt.Message);
                                fs.Write(buf, 0, buf.Length);
                            }
                            fs.Flush();
                        }
                        printf("Receive a file: {0}", fPacket.FileName);

                        // Discard the fragments of the finished transfer.
                        packets.Remove(fPacket.CLSD);

                        // Send the acknowledgement.
                        P2PAckPacket ackMsg = new P2PAckPacket();
                        buffer = UtilityHelper.Serialize(ackMsg);
                        client.Send(buffer, buffer.Length, remotePoint);
                    }
                }
            }
        }
        else if (msgType == typeof(P2PAckPacket))
        {
            this.ReceivedACK = true;
        }
        else if (msgType == typeof(P2PPurchHolePacket))
        {
            ProcP2PPurchHoleMsg(packet, remotePoint);
        }
        else if (msgType == typeof(P2PPurchHoleAckPacket))
        {
            P2PPurchHoleAckPacket holeAck = (P2PPurchHoleAckPacket)packet;
            PeerEntity touser = userList.SingleOrDefault(c => c.UserName == holeAck.ToUserName);

            // The user may be absent from the local list; only then skip the update.
            if (touser != null)
            {
                // Use the peer's remote endpoint as its P2P address from now on.
                touser.P2PAddress = touser.RemoteEndPoint;
            }
        }

        Thread.Sleep(100);
    }
}

复制代码c.建立p2p会话
/// <summary>
/// Handles a hole-punch request: records the P2P address of the user named in
/// <c>ToUserName</c>, attempts a UDP port mapping for that port through <c>NAT</c>, and sends
/// a <c>P2PPurchHoleAckPacket</c> to the requesting user's remote endpoint.
/// </summary>
/// <param name="packet">The received packet; must be a <c>P2PPurchHolePacket</c>.</param>
/// <param name="remoteEP">Endpoint the packet arrived from (not used by this method).</param>
private void ProcP2PPurchHoleMsg(Packet packet,IPEndPoint remoteEP)
{
    // The hole-punch request message.
    P2PPurchHolePacket purchReqMsg = (P2PPurchHolePacket)packet;

    // SingleOrDefault instead of Single: a user missing from the local list must not
    // throw out of the receive thread.
    PeerEntity toUser = userList.SingleOrDefault(c => c.UserName == purchReqMsg.ToUserName);
    PeerEntity user = userList.SingleOrDefault(c => c.UserName == purchReqMsg.UserName);
    if (toUser == null || user == null)
    {
        printf("Ignore P2P hole request {0}->{1}: user not found", purchReqMsg.UserName, purchReqMsg.ToUserName);
        return;
    }
    if (toUser.RemoteEndPoint == null || user.RemoteEndPoint == null)
    {
        printf("Ignore P2P hole request {0}->{1}: remote endpoint unknown", purchReqMsg.UserName, purchReqMsg.ToUserName);
        return;
    }

    toUser.P2PAddress = toUser.RemoteEndPoint;
    printf("Set P2P Address for {0}->[{1}]", user.UserName, toUser.P2PAddress.ToString());

    // Map the port through UPnP.
    if (NAT.AddPortMapping(toUser.P2PAddress.Port, ProtocolType.Udp, "AddPortMapping"))
    {
        printf("Port mapping successed!");
    }

    // Send the hole-punch message to the remote host.
    P2PPurchHoleAckPacket trashMsg = new P2PPurchHoleAckPacket(purchReqMsg.UserName, purchReqMsg.ToUserName);
    byte[] buffer = UtilityHelper.Serialize(trashMsg);
    client.Send(buffer, buffer.Length, user.RemoteEndPoint);
}

复制代码3、服务端

a、消息处理线程
/// <summary>
/// Receive loop of the server: blocks on the UDP socket, deserializes every datagram and
/// routes it to the matching handler according to <c>Packet.GetCommandType()</c>. A failure
/// while processing one message is logged and does not stop the loop.
/// </summary>
private void RecvThreadProc()
{
    IPEndPoint remotePoint = null;
    byte[] msgBuffer = null;
    while (true)
    {
        msgBuffer = server.Receive(ref remotePoint);
        try
        {
            Packet msgPacket = UtilityHelper.Deserialize(msgBuffer) as Packet;
            if (msgPacket == null)
            {
                // Previously this surfaced as a silently swallowed NullReferenceException.
                printf("Ignore a datagram from {0}: not a Packet", remotePoint.ToString());
                continue;
            }

            switch (msgPacket.GetCommandType())
            {
                case Command.MSG_USERLOGIN: // user login
                    ProcUserLoginMsg(msgPacket as UserLoginPacket, remotePoint);
                    break;
                case Command.MSG_USERLOGOUT: // user logout
                    ProcUserLogoutMsg(msgPacket as UserLogoutPacket, remotePoint);
                    break;
                case Command.MSG_GETUSERLIST: // list of all users
                    ProcGetUserListMsg(msgPacket as UserListPacket, remotePoint);
                    break;
                case Command.MSG_P2PCONNECT: // peer-to-peer connection request
                    ProcP2PConnectMsg(msgPacket as P2PConnectionPacket, remotePoint);
                    break;
                case Command.MSG_USERACTIVEQUERY: // client's reply to the server's keep-alive poll
                    ProcUserActiveQueryMsg(msgPacket as UserActiveQueryPacket, remotePoint);
                    break;
            }
            Thread.Sleep(100);
        }
        catch (Exception ex)
        {
            // Keep serving other clients, but make the failure visible instead of
            // discarding it.
            printf("Failed to process a message from {0}: {1}", remotePoint.ToString(), ex.Message);
        }
    }
}

复制代码b、服务端请求客户端建立p2p连接
/// <summary>
/// Handles a client's P2P connection request on the server by forwarding a
/// <c>P2PPurchHolePacket</c> to the target user's remote endpoint.
/// </summary>
/// <param name="packet">The received packet; must be a <c>P2PConnectionPacket</c>.</param>
/// <param name="remoteEP">Endpoint of the requesting client (used for logging).</param>
private void ProcP2PConnectMsg(Packet packet,IPEndPoint remoteEP)
{
    // Convert the received message.
    P2PConnectionPacket transMsg = (P2PConnectionPacket)packet;
    printf("{0}({1}) wants to p2p {2}", remoteEP.Address.ToString(), transMsg.UserName, transMsg.ToUserName);

    // Look up the target user; SingleOrDefault returns null when it is not in the list.
    PeerEntity toUser = userList.SingleOrDefault(c => c.UserName == transMsg.ToUserName);
    if (toUser == null || toUser.RemoteEndPoint == null)
    {
        // Nothing to forward to; report it instead of failing on a null reference.
        printf("P2P target {0} requested by {1} is not available", transMsg.ToUserName, transMsg.UserName);
        return;
    }

    // Build the hole-punch request to forward.
    P2PPurchHolePacket transMsg2 = new P2PPurchHolePacket(transMsg.UserName, toUser.UserName);

    // Forward it to the target user.
    byte[] buffer = UtilityHelper.Serialize(transMsg2);
    server.Send(buffer, buffer.Length, toUser.RemoteEndPoint);
}

复制代码4、测试

a、服务端

附件: p2p_srv.jpg

b、客户端

附件: p2p_b.jpg

附件: p2p_a.jpg

困惑:

1、能不能实现外网通讯,要实现像QQ那样通讯要做哪些改进。

2、文件续传如何实现。

3、c#封装的网络操作类(像QQ.NET源码的Net实现)

4、远程协助的实现。

DEMO下载:[ 评价与收藏 ]下载地址
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: