您的位置:首页 > 编程语言 > C#

C# Socket 多线程可断点传送大文件3

2013-05-23 17:49 435 查看
更新:使用 .NET 4.0 的 Parallel 来代替 new Thread()。

/*********************************************************************************
** File Name    :    FileTransmitor.cs
** Copyright (C) 2010 Snda Network Corporation. All Rights Reserved.
** Creator        :    RockyWong
** Create Date    :    2010-06-02 11:22:45
** Update Date    :    2013-01-11 11:35:26
** Description    :    多线程多管道可断点传输大文件
** Version No    :
*********************************************************************************/
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Rocky.Net
{
public sealed class FileTransfer : Disposable
{
#region Fields
// Size in bytes of one Int64; a breakpoint record is a pair of these per chunk.
internal const int PerLongSize = sizeof(long);
// Extension of the breakpoint (resume-point) file kept next to the temp file.
internal const string PointExtension = ".dat";
// Extension of the partially received file; it is renamed once the transfer completes.
internal const string TempExtension = ".temp";

// Directory in which received files are stored (assigned by Listen and by DirectoryPath).
private string _savePath;
// Listening socket created by Listen; null while not listening.
private Socket _listener;
#endregion

#region Properties
/// <summary>
/// Raised once the transfer configuration is known, before any data is moved.
/// Both Listen and Send check the event args' Cancel flag afterwards and abort when it is set.
/// </summary>
public event EventHandler<TransferEventArgs> Prepare;
/// <summary>
/// Raised repeatedly (with a one-second sleep between raises) while a transfer is running,
/// and one final time after all chunks have completed.
/// </summary>
public event EventHandler<TransferEventArgs> ProgressChanged;
/// <summary>
/// Raised after a transfer has finished and its progress tracker has been stopped.
/// </summary>
public event EventHandler<TransferEventArgs> Completed;

/// <summary>
/// Gets the directory used for received files.
/// Setting it stores the given root followed by a sub-folder named after the
/// current year and month ("yyyy-MM") and a trailing backslash.
/// </summary>
public string DirectoryPath
{
    get
    {
        return _savePath;
    }
    set
    {
        // One folder per calendar month, e.g. <root>\2013-05\
        string monthFolder = DateTime.Now.ToString("yyyy-MM");
        _savePath = string.Concat(value, @"\", monthFolder, @"\");
    }
}
#endregion

#region Constructors
/// <summary>
/// Creates an idle instance; nothing is opened until Listen or Send is called.
/// </summary>
public FileTransfer()
{
}

/// <summary>
/// Releases the listening socket (only on an explicit dispose) and drops every
/// event subscription so subscribers are no longer referenced.
/// </summary>
/// <param name="disposing">True when called from an explicit dispose rather than a finalizer.</param>
protected override void DisposeInternal(bool disposing)
{
    if (disposing)
    {
        // Managed resource: only touched on the explicit-dispose path.
        SocketHelper.DisposeListener(ref _listener);
    }

    // Always clear the references, whichever path got us here.
    _listener = null;
    this.Prepare = null;
    this.ProgressChanged = null;
    this.Completed = null;
}
#endregion

#region Methods
/// <summary>
/// Raises the Prepare event.
/// </summary>
/// <param name="e">Event data describing the transfer that is about to start.</param>
private void OnPrepare(TransferEventArgs e)
{
    // Copy the delegate first: the event field can be set to null by another
    // thread (DisposeInternal clears it) between the null test and the call.
    EventHandler<TransferEventArgs> handler = this.Prepare;
    if (handler != null)
    {
        handler(this, e);
    }
}

/// <summary>
/// Raises the ProgressChanged event.
/// </summary>
/// <param name="e">Event data carrying the current transfer progress.</param>
private void OnProgressChanged(TransferEventArgs e)
{
    // Copy the delegate first: the event field can be set to null by another
    // thread (DisposeInternal clears it) between the null test and the call.
    EventHandler<TransferEventArgs> handler = this.ProgressChanged;
    if (handler != null)
    {
        handler(this, e);
    }
}

/// <summary>
/// Raises the Completed event.
/// </summary>
/// <param name="e">Event data describing the finished transfer.</param>
private void OnCompleted(TransferEventArgs e)
{
    // Copy the delegate first: the event field can be set to null by another
    // thread (DisposeInternal clears it) between the null test and the call.
    EventHandler<TransferEventArgs> handler = this.Completed;
    if (handler != null)
    {
        handler(this, e);
    }
}
#endregion

#region Receive
/// <summary>
/// Starts listening on the given port and receives incoming files into
/// <paramref name="savePath"/>. The accept loop runs on a background task; each
/// accepted transfer is then handed to its own task.
/// </summary>
/// <param name="savePath">Directory that receives the files; it is created if needed.</param>
/// <param name="port">Local TCP port to listen on (all interfaces).</param>
/// <exception cref="ApplicationException">Listening has already been started.</exception>
public void Listen(string savePath, ushort port)
{
    Contract.Requires(!string.IsNullOrEmpty(savePath));
    if (_listener != null)
    {
        throw new ApplicationException("已启动监听");
    }

    Runtime.CreateDirectory(_savePath = savePath);
    var localIpe = new IPEndPoint(IPAddress.Any, port);
    // At most 16 threads (channels) are supported.
    _listener = SocketHelper.CreateListener(localIpe, 16);
    TaskHelper.Factory.StartNew(() =>
    {
        Socket listener;
        // Re-read the field once per iteration into a local: Dispose may set it to
        // null from another thread, and the local keeps the rest of the iteration safe.
        while ((listener = _listener) != null)
        {
            // Every socket accepted for the current handshake, so a failed
            // handshake can release all of them.
            var accepted = new List<Socket>();
            try
            {
                Socket controlClient = listener.Accept();
                accepted.Add(controlClient);
                Runtime.LogInfo("TunnelTest 双工通讯: {0}.", controlClient.RemoteEndPoint);

                TransferConfig config;
                controlClient.Receive(out config);
                var e = new TransferEventArgs(config);
                this.OnPrepare(e);
                if (e.Cancel)
                {
                    controlClient.Close();
                    continue;
                }
                // ThreadCount comes from the remote peer; chunk 0 is the control
                // connection, so anything below 1 cannot be served.
                if (config.ThreadCount < 1)
                {
                    Runtime.LogInfo("FileTransfer rejected {0}: invalid thread count.", controlClient.RemoteEndPoint);
                    controlClient.Close();
                    continue;
                }

                var chunkGroup = new ReceiveChunkModel[config.ThreadCount];
                chunkGroup[0] = new ReceiveChunkModel(controlClient);
                for (int i = 1; i < chunkGroup.Length; i++)
                {
                    Socket dataClient = listener.Accept();
                    accepted.Add(dataClient);
                    chunkGroup[i] = new ReceiveChunkModel(dataClient);
                }
                // From here on the sockets belong to the Receive task.
                TaskHelper.Factory.StartNew(Receive, new object[] { e, chunkGroup });
            }
            catch (ObjectDisposedException)
            {
                // Nothing accepted yet means the listener itself was closed: stop listening.
                if (accepted.Count == 0)
                {
                    break;
                }
                CloseAll(accepted);
            }
            catch (SocketException ex)
            {
                Runtime.LogInfo("FileTransfer accept/handshake failed: {0}.", ex.Message);
                // A failure before any client was accepted comes from the listener
                // (typically it was closed); otherwise only this client is dropped.
                if (accepted.Count == 0)
                {
                    break;
                }
                CloseAll(accepted);
            }
        }
    });
}

/// <summary>
/// Closes every socket in the list; used to release a half-finished handshake.
/// </summary>
private static void CloseAll(List<Socket> sockets)
{
    foreach (Socket socket in sockets)
    {
        socket.Close();
    }
}

/// <summary>
/// Receives one file over the given group of connections, persisting per-chunk
/// breakpoint data every few seconds so an interrupted transfer can be resumed.
/// </summary>
/// <param name="state">
/// An object[] of two items: the TransferEventArgs for this transfer and the
/// ReceiveChunkModel[] whose first element wraps the control connection.
/// </param>
private void Receive(object state)
{
    var args = (object[])state;
    var e = (TransferEventArgs)args[0];
    var chunkGroup = (ReceiveChunkModel[])args[1];
    var controlClient = chunkGroup[0].Client;

    // Resolve the endpoint used in log lines once, from a snapshot of the field:
    // _listener can become null (Dispose) while this transfer is still running.
    Socket listener = _listener;
    EndPoint localEndPoint = listener != null ? listener.LocalEndPoint : controlClient.LocalEndPoint;

    e.Progress = new TransferProgress();
    e.Progress.Start(e.Config.FileLength);

    // ---- Breakpoint bookkeeping ----
    // One pair of Int64 values per chunk is stored in the breakpoint file.
    int perPairCount = PerLongSize * 2, count = perPairCount * chunkGroup.Length;
    byte[] bufferInfo = new byte[count];
    // Checksum and FileName were supplied by the remote peer: keep only the
    // file-name part so the result cannot point outside the save directory.
    string fileName = Path.GetFileName(e.Config.Checksum + Path.GetExtension(e.Config.FileName));
    string filePath = Path.Combine(_savePath, fileName),
        pointFilePath = Path.ChangeExtension(filePath, PointExtension),
        tempFilePath = Path.ChangeExtension(filePath, TempExtension);
    // The file is split evenly; the last chunk also takes the remainder.
    long oddSize, avgSize = Math.DivRem(e.Config.FileLength, (long)chunkGroup.Length, out oddSize);
    FileStream pointStream = null;
    Timer timer = null;
    try
    {
        if (File.Exists(pointFilePath) && File.Exists(tempFilePath))
        {
            // Resume: load the saved pairs and forward them to the sender.
            pointStream = new FileStream(pointFilePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None);
            // Stream.Read may return fewer bytes than requested; read until full or EOF.
            int offset = 0, read;
            while (offset < count && (read = pointStream.Read(bufferInfo, offset, count - offset)) > 0)
            {
                offset += read;
            }
            long fValue, tValue;
            for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
            {
                fValue = BitConverter.ToInt64(bufferInfo, i * perPairCount);
                tValue = BitConverter.ToInt64(bufferInfo, i * perPairCount + PerLongSize);
                chunkGroup[i].Initialize(tempFilePath, e.Config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue);
                Runtime.LogDebug("[Multi]Local{0} breakpoint read{1}:{2}/{3}.", localEndPoint, i, fValue, tValue);
            }
            controlClient.Send(bufferInfo);
        }
        else
        {
            // Fresh transfer. FileMode.Create (not CreateNew) so that a leftover
            // .dat or .temp file from an aborted start does not fail every retry.
            pointStream = new FileStream(pointFilePath, FileMode.Create, FileAccess.ReadWrite, FileShare.None);
            using (var stream = new FileStream(tempFilePath, FileMode.Create, FileAccess.Write, FileShare.Write))
            {
                // Pre-size the temp file to the full length.
                stream.SetLength(e.Config.FileLength);
                stream.Flush();
            }
            for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
            {
                chunkGroup[i].Initialize(tempFilePath, e.Config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize);
            }
            // A 4-byte reply tells the sender there is nothing to resume.
            controlClient.Send(bufferInfo, 0, 4, SocketFlags.None);
        }

        // Persist the current pair of every chunk immediately and then every 4 seconds.
        timer = new Timer(arg =>
        {
            long fValue, tValue;
            for (int i = 0; i < chunkGroup.Length; i++)
            {
                chunkGroup[i].ReportProgress(out fValue, out tValue);
                Buffer.BlockCopy(BitConverter.GetBytes(fValue), 0, bufferInfo, i * perPairCount, PerLongSize);
                Buffer.BlockCopy(BitConverter.GetBytes(tValue), 0, bufferInfo, i * perPairCount + PerLongSize, PerLongSize);
                Runtime.LogDebug("[Multi]Local{0} breakpoint write{1}:{2}/{3}.", localEndPoint, i, fValue, tValue);
            }
            pointStream.Position = 0L;
            pointStream.Write(bufferInfo, 0, count);
            pointStream.Flush();
        }, null, TimeSpan.Zero, TimeSpan.FromSeconds(4));

        Parallel.ForEach(chunkGroup, chunk => chunk.Run());
        long bytesTransferred = 0L;
        do
        {
            chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
            this.OnProgressChanged(e);
            Thread.Sleep(1000);
        }
        while (!chunkGroup.IsAllCompleted());
        chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
        this.OnProgressChanged(e);
    }
    finally
    {
        if (timer != null)
        {
            // Timer.Dispose() alone does not wait for a callback that is already
            // running; wait for it so it can never write to a disposed stream.
            using (var callbacksDone = new ManualResetEvent(false))
            {
                if (timer.Dispose(callbacksDone))
                {
                    callbacksDone.WaitOne();
                }
            }
        }
        if (pointStream != null)
        {
            pointStream.Dispose();
        }
    }

    // Only reached on success: drop the breakpoint file and publish the result.
    File.Delete(pointFilePath);
    File.Move(tempFilePath, filePath);

    e.Progress.Stop();
    this.OnCompleted(e);
}
#endregion

#region Send
/// <summary>
/// Sends the file described by <paramref name="config"/> to a listening peer,
/// using config.ThreadCount connections and resuming from the breakpoints the
/// receiver reports, if any. Blocks until the transfer has finished.
/// </summary>
/// <param name="config">Transfer description (file path, length, chunk length, thread count).</param>
/// <param name="remoteIpe">Endpoint of the listening receiver.</param>
/// <exception cref="ArgumentOutOfRangeException">config.ThreadCount is less than 1.</exception>
/// <exception cref="SocketException">The receiver closed the control connection during the handshake.</exception>
public void Send(TransferConfig config, IPEndPoint remoteIpe)
{
    Contract.Requires(config != null && remoteIpe != null);
    // Chunk 0 is the control connection, so at least one thread is required;
    // fail before anything is connected or sent.
    if (config.ThreadCount < 1)
    {
        throw new ArgumentOutOfRangeException("config", "config.ThreadCount must be at least 1.");
    }

    // Every chunk whose connection has been opened, so a failure can close them all.
    var opened = new List<SendChunkModel>();
    try
    {
        var controlChunk = new SendChunkModel(remoteIpe);
        opened.Add(controlChunk);
        controlChunk.Client.Send(config);
        var e = new TransferEventArgs(config);
        this.OnPrepare(e);
        if (e.Cancel || !controlChunk.Client.Connected)
        {
            controlChunk.Client.Close();
            return;
        }

        var chunkGroup = new SendChunkModel[config.ThreadCount];
        chunkGroup[0] = controlChunk;
        for (int i = 1; i < chunkGroup.Length; i++)
        {
            chunkGroup[i] = new SendChunkModel(remoteIpe);
            opened.Add(chunkGroup[i]);
        }

        e.Progress = new TransferProgress();
        e.Progress.Start(config.FileLength);

        // ---- Breakpoint handshake ----
        // The receiver answers with either 4 bytes (fresh transfer) or one pair
        // of Int64 values per chunk (resume).
        int perPairCount = PerLongSize * 2, count = perPairCount * chunkGroup.Length;
        byte[] bufferInfo = new byte[count];
        // The file is split evenly; the last chunk also takes the remainder.
        long oddSize, avgSize = Math.DivRem(config.FileLength, (long)chunkGroup.Length, out oddSize);

        // A single Receive call may return only part of the reply, so keep reading
        // until either exactly 4 bytes or the full breakpoint table has arrived.
        // NOTE(review): a resume reply whose first fragments total exactly 4 bytes is
        // still indistinguishable from the fresh marker; that needs a protocol change.
        var control = controlChunk.Client;
        int received = 0;
        while (received < count)
        {
            int n = control.Receive(bufferInfo, received, count - received, SocketFlags.None);
            if (n == 0)
            {
                // 0 bytes means the receiver closed the connection (e.g. it declined).
                throw new SocketException((int)SocketError.ConnectionReset);
            }
            received += n;
            if (received == 4)
            {
                break;
            }
        }

        if (received == 4)
        {
            for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
            {
                chunkGroup[i].Initialize(config.FilePath, config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize);
            }
        }
        else
        {
            long fValue, tValue;
            for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
            {
                fValue = BitConverter.ToInt64(bufferInfo, i * perPairCount);
                tValue = BitConverter.ToInt64(bufferInfo, i * perPairCount + PerLongSize);
                chunkGroup[i].Initialize(config.FilePath, config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue);
                Runtime.LogDebug("[Multi]Remote{0} breakpoint{1}:{2}/{3}.", remoteIpe, i, fValue, tValue);
            }
        }
        // NOTE(review): presumably gives the receiver time to get ready — confirm.
        Thread.Sleep(200);

        Parallel.ForEach(chunkGroup, chunk => chunk.Run());
        long bytesTransferred = 0L;
        do
        {
            chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
            this.OnProgressChanged(e);
            Thread.Sleep(1000);
        }
        while (!chunkGroup.IsAllCompleted());
        chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
        this.OnProgressChanged(e);

        e.Progress.Stop();
        this.OnCompleted(e);
    }
    catch
    {
        // Do not leak connections when the transfer fails part-way.
        foreach (var chunk in opened)
        {
            chunk.Client.Close();
        }
        throw;
    }
}
#endregion
}
}


另外推荐一个网络小工具:

内测安装地址:http://publish.xineworld.com/cloudagent/publish.htm

上篇文章:http://www.cnblogs.com/Googler/archive/2013/01/11/2856219.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: