C# Socket 多线程可断点传送大文件3
2013-05-23 17:49
435 查看
更新:使用 .NET 4.0 的 Parallel 代替 new Thread()。
另外推荐一个网络小工具:
内测安装地址:http://publish.xineworld.com/cloudagent/publish.htm
上篇文章:http://www.cnblogs.com/Googler/archive/2013/01/11/2856219.html
/*********************************************************************************
** File Name    : FileTransmitor.cs
** Copyright (C) 2010 Snda Network Corporation. All Rights Reserved.
** Creator      : RockyWong
** Create Date  : 2010-06-02 11:22:45
** Update Date  : 2013-01-11 11:35:26
** Description  : Multi-threaded, multi-channel transfer of large files with
**                breakpoint (resume) support
** Version No   :
*********************************************************************************/
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Rocky.Net
{
    /// <summary>
    /// Sends and receives a file over several sockets at once. The receiving side
    /// periodically persists per-chunk progress to a breakpoint file
    /// (<see cref="PointExtension"/>) next to a temporary data file
    /// (<see cref="TempExtension"/>) so that an interrupted transfer can be resumed.
    /// </summary>
    public sealed class FileTransfer : Disposable
    {
        #region Fields
        /// <summary>Size in bytes of one <see cref="long"/> stored in the breakpoint file.</summary>
        internal const int PerLongSize = sizeof(long);
        /// <summary>Extension of the breakpoint file.</summary>
        internal const string PointExtension = ".dat";
        /// <summary>Extension of the partially received data file.</summary>
        internal const string TempExtension = ".temp";

        private string _savePath;
        private Socket _listener;
        #endregion

        #region Properties
        /// <summary>Raised before a transfer starts; handlers may set <c>Cancel</c> on the arguments.</summary>
        public event EventHandler<TransferEventArgs> Prepare;
        /// <summary>Raised roughly once per second while a transfer is running, and once more at the end.</summary>
        public event EventHandler<TransferEventArgs> ProgressChanged;
        /// <summary>Raised after a transfer has finished.</summary>
        public event EventHandler<TransferEventArgs> Completed;

        /// <summary>
        /// Gets the directory received files are stored in. The setter appends a
        /// backslash, the current year-month ("yyyy-MM") and another backslash to the
        /// assigned value. Note that <see cref="Listen"/> overwrites this path with its
        /// own <c>savePath</c> argument.
        /// </summary>
        public string DirectoryPath
        {
            get { return _savePath; }
            set { _savePath = value + @"\" + DateTime.Now.ToString("yyyy-MM") + @"\"; }
        }
        #endregion

        #region Constructors
        public FileTransfer()
        {
        }

        /// <summary>
        /// Releases the listening socket (only when <paramref name="disposing"/> is true)
        /// and clears all event subscriptions.
        /// </summary>
        protected override void DisposeInternal(bool disposing)
        {
            if (disposing)
            {
                SocketHelper.DisposeListener(ref _listener);
            }
            _listener = null;
            Prepare = null;
            ProgressChanged = null;
            Completed = null;
        }
        #endregion

        #region Methods
        // Each raiser copies the delegate to a local first: DisposeInternal nulls the
        // event fields, possibly from another thread, so reading the field twice
        // (null check, then invoke) could throw a NullReferenceException.

        private void OnPrepare(TransferEventArgs e)
        {
            EventHandler<TransferEventArgs> handler = this.Prepare;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        private void OnProgressChanged(TransferEventArgs e)
        {
            EventHandler<TransferEventArgs> handler = this.ProgressChanged;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        private void OnCompleted(TransferEventArgs e)
        {
            EventHandler<TransferEventArgs> handler = this.Completed;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Returns the listener's local end point for logging, or null when the
        /// listener has already been released.
        /// </summary>
        private EndPoint GetListenerEndPoint()
        {
            Socket listener = _listener;
            if (listener == null)
            {
                return null;
            }
            try
            {
                return listener.LocalEndPoint;
            }
            catch (ObjectDisposedException)
            {
                return null;
            }
        }

        /// <summary>
        /// Accepts one connection. Returns false instead of throwing when the accept
        /// failed because the listener was closed or replaced.
        /// </summary>
        private bool TryAccept(Socket listener, out Socket client)
        {
            client = null;
            try
            {
                client = listener.Accept();
                return true;
            }
            catch (ObjectDisposedException)
            {
                return false;
            }
            catch (SocketException ex)
            {
                // Closing a socket cancels a blocking Accept; that is a normal shutdown.
                if (ex.SocketErrorCode == SocketError.Interrupted || !ReferenceEquals(_listener, listener))
                {
                    return false;
                }
                throw;
            }
        }

        /// <summary>
        /// Reads up to <paramref name="count"/> bytes into <paramref name="buffer"/>,
        /// looping because a single Stream.Read may return fewer bytes than requested.
        /// Stops early at end of stream, leaving the remaining bytes untouched.
        /// </summary>
        private static void ReadFully(Stream stream, byte[] buffer, int count)
        {
            int offset = 0;
            while (offset < count)
            {
                int read = stream.Read(buffer, offset, count - offset);
                if (read == 0)
                {
                    break;
                }
                offset += read;
            }
        }

        /// <summary>
        /// Disposes the timer and blocks until any callback that is still running has
        /// finished, so resources used by the callback can be released safely afterwards.
        /// </summary>
        private static void StopTimer(Timer timer)
        {
            using (var drained = new ManualResetEvent(false))
            {
                if (timer.Dispose(drained))
                {
                    drained.WaitOne();
                }
            }
        }
        #endregion

        #region Receive
        /// <summary>
        /// Listen and receive: starts listening on <paramref name="port"/> (all local
        /// addresses) and handles incoming transfers on background tasks.
        /// </summary>
        /// <param name="savePath">Directory received files are written to; created via <c>Runtime.CreateDirectory</c>.</param>
        /// <param name="port">Local TCP port to listen on.</param>
        /// <exception cref="ApplicationException">Listening has already been started.</exception>
        public void Listen(string savePath, ushort port)
        {
            Contract.Requires(!string.IsNullOrEmpty(savePath));

            if (_listener != null)
            {
                throw new ApplicationException("已启动监听");
            }
            Runtime.CreateDirectory(_savePath = savePath);
            var localIpe = new IPEndPoint(IPAddress.Any, port);
            // Supports at most 16 threads
            _listener = SocketHelper.CreateListener(localIpe, 16);

            // The loop works on this captured reference; the field is nulled on dispose.
            Socket listener = _listener;
            TaskHelper.Factory.StartNew(() => AcceptLoop(listener));
        }

        /// <summary>
        /// Accepts transfers until the listener is closed. For each transfer the first
        /// connection carries the configuration; <c>ThreadCount - 1</c> further
        /// connections are then accepted and the group is handed to <see cref="Receive"/>.
        /// </summary>
        private void AcceptLoop(Socket listener)
        {
            while (ReferenceEquals(_listener, listener))
            {
                Socket controlClient;
                if (!TryAccept(listener, out controlClient))
                {
                    break;
                }
                Runtime.LogInfo("TunnelTest 双工通讯: {0}.", controlClient.RemoteEndPoint);

                TransferConfig config;
                controlClient.Receive(out config);
                var e = new TransferEventArgs(config);
                this.OnPrepare(e);
                if (e.Cancel)
                {
                    controlClient.Close();
                    continue;
                }
                // The thread count comes from the remote side; a value below 1 would
                // fault on chunkGroup[0] and take the whole accept loop down.
                if (config.ThreadCount < 1)
                {
                    Runtime.LogInfo("Rejected transfer from {0}: invalid thread count.", controlClient.RemoteEndPoint);
                    controlClient.Close();
                    continue;
                }

                var chunkGroup = new ReceiveChunkModel[config.ThreadCount];
                chunkGroup[0] = new ReceiveChunkModel(controlClient);
                int acceptedCount = 1;
                for (; acceptedCount < chunkGroup.Length; acceptedCount++)
                {
                    Socket dataClient;
                    if (!TryAccept(listener, out dataClient))
                    {
                        break;
                    }
                    chunkGroup[acceptedCount] = new ReceiveChunkModel(dataClient);
                }
                if (acceptedCount < chunkGroup.Length)
                {
                    // Listener went away mid-handshake: release what was accepted so far.
                    for (int i = 0; i < acceptedCount; i++)
                    {
                        chunkGroup[i].Client.Close();
                    }
                    break;
                }

                TaskHelper.Factory.StartNew(Receive, new object[] { e, chunkGroup });
            }
        }

        /// <summary>
        /// Receives one file. <paramref name="state"/> is an <c>object[]</c> holding the
        /// <see cref="TransferEventArgs"/> and the <c>ReceiveChunkModel[]</c> of the transfer.
        /// </summary>
        /// <remarks>
        /// The breakpoint file stores, per chunk, two consecutive 64-bit values as
        /// reported by <c>ReportProgress</c>. When both the breakpoint file and the
        /// temporary file exist, the stored values are sent back to the sender;
        /// otherwise only 4 bytes are sent, which the sender treats as "start from scratch".
        /// On failure the breakpoint and temporary files are kept so the transfer can
        /// be resumed later.
        /// </remarks>
        private void Receive(object state)
        {
            var args = (object[])state;
            var e = (TransferEventArgs)args[0];
            var chunkGroup = (ReceiveChunkModel[])args[1];
            var controlClient = chunkGroup[0].Client;
            // Captured once: _listener may become null if this object is disposed
            // while the transfer (and its timer callback) is still running.
            EndPoint localEndPoint = GetListenerEndPoint();
            e.Progress = new TransferProgress();
            e.Progress.Start(e.Config.FileLength);

            #region Breakpoint
            int perPairCount = PerLongSize * 2, count = perPairCount * chunkGroup.Length;
            byte[] bufferInfo = new byte[count];
            string filePath = Path.Combine(_savePath, e.Config.Checksum + Path.GetExtension(e.Config.FileName)),
                pointFilePath = Path.ChangeExtension(filePath, PointExtension),
                tempFilePath = Path.ChangeExtension(filePath, TempExtension);
            // The file is split evenly; the last chunk also takes the remainder.
            long oddSize, avgSize = Math.DivRem(e.Config.FileLength, (long)chunkGroup.Length, out oddSize);
            int lastIndex = chunkGroup.Length - 1;

            FileStream pointStream = null;
            try
            {
                if (File.Exists(pointFilePath) && File.Exists(tempFilePath))
                {
                    pointStream = new FileStream(pointFilePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None);
                    ReadFully(pointStream, bufferInfo, count);
                    long fValue, tValue;
                    for (int i = 0; i < chunkGroup.Length; i++)
                    {
                        fValue = BitConverter.ToInt64(bufferInfo, i * perPairCount);
                        tValue = BitConverter.ToInt64(bufferInfo, i * perPairCount + PerLongSize);
                        chunkGroup[i].Initialize(tempFilePath, e.Config.ChunkLength, i * avgSize, i == lastIndex ? avgSize + oddSize : avgSize, fValue, tValue);
                        Runtime.LogDebug("[Multi]Local{0} breakpoint read{1}:{2}/{3}.", localEndPoint, i, fValue, tValue);
                    }
                    controlClient.Send(bufferInfo);
                }
                else
                {
                    // FileMode.Create (not CreateNew): a stale leftover of only one of the
                    // two files must not make every later attempt fail.
                    pointStream = new FileStream(pointFilePath, FileMode.Create, FileAccess.ReadWrite, FileShare.None);
                    using (var stream = new FileStream(tempFilePath, FileMode.Create, FileAccess.Write, FileShare.Write))
                    {
                        // Pre-size the temporary file to the full length.
                        stream.SetLength(e.Config.FileLength);
                        stream.Flush();
                    }
                    for (int i = 0; i < chunkGroup.Length; i++)
                    {
                        chunkGroup[i].Initialize(tempFilePath, e.Config.ChunkLength, i * avgSize, i == lastIndex ? avgSize + oddSize : avgSize);
                    }
                    // A 4-byte reply tells the sender there is no breakpoint to resume from.
                    controlClient.Send(bufferInfo, 0, 4, SocketFlags.None);
                }

                // Persist the progress of every chunk immediately and then every 4 seconds.
                var timer = new Timer(arg =>
                {
                    long fValue, tValue;
                    for (int i = 0; i < chunkGroup.Length; i++)
                    {
                        chunkGroup[i].ReportProgress(out fValue, out tValue);
                        Buffer.BlockCopy(BitConverter.GetBytes(fValue), 0, bufferInfo, i * perPairCount, PerLongSize);
                        Buffer.BlockCopy(BitConverter.GetBytes(tValue), 0, bufferInfo, i * perPairCount + PerLongSize, PerLongSize);
                        Runtime.LogDebug("[Multi]Local{0} breakpoint write{1}:{2}/{3}.", localEndPoint, i, fValue, tValue);
                    }
                    pointStream.Position = 0L;
                    pointStream.Write(bufferInfo, 0, count);
                    pointStream.Flush();
                }, null, TimeSpan.Zero, TimeSpan.FromSeconds(4));
                #endregion

                try
                {
                    Parallel.ForEach(chunkGroup, chunk => chunk.Run());
                    long bytesTransferred = 0L;
                    do
                    {
                        chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                        this.OnProgressChanged(e);
                        Thread.Sleep(1000);
                    }
                    while (!chunkGroup.IsAllCompleted());
                    chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                    this.OnProgressChanged(e);
                }
                finally
                {
                    // Wait for a running callback before the stream it writes to is disposed.
                    StopTimer(timer);
                }
            }
            finally
            {
                if (pointStream != null)
                {
                    pointStream.Dispose();
                }
            }

            // Reached only on success: drop the breakpoint and publish the file.
            File.Delete(pointFilePath);
            if (File.Exists(filePath))
            {
                // The name is derived from the checksum, so an existing file is an
                // earlier copy of the same content; File.Move would throw on it.
                File.Delete(filePath);
            }
            File.Move(tempFilePath, filePath);

            e.Progress.Stop();
            this.OnCompleted(e);
        }
        #endregion

        #region Send
        /// <summary>
        /// Sends the file described by <paramref name="config"/> to
        /// <paramref name="remoteIpe"/> using <c>config.ThreadCount</c> connections.
        /// Blocks until the transfer has completed, or returns early when a
        /// <see cref="Prepare"/> handler cancels or the control connection is not connected.
        /// </summary>
        /// <param name="config">Transfer description (file path, length, chunk length, thread count).</param>
        /// <param name="remoteIpe">End point of the listening receiver.</param>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><c>config.ThreadCount</c> is less than 1.</exception>
        public void Send(TransferConfig config, IPEndPoint remoteIpe)
        {
            Contract.Requires(config != null && remoteIpe != null);

            // Explicit guards: Contract.Requires is compiled away unless the contracts
            // rewriter is enabled, and a thread count below 1 would otherwise fail
            // later on chunkGroup[0], after a connection has already been opened.
            if (config == null)
            {
                throw new ArgumentNullException("config");
            }
            if (remoteIpe == null)
            {
                throw new ArgumentNullException("remoteIpe");
            }
            if (config.ThreadCount < 1)
            {
                throw new ArgumentOutOfRangeException("config", "ThreadCount must be at least 1.");
            }

            var controlChunk = new SendChunkModel(remoteIpe);
            controlChunk.Client.Send(config);
            var e = new TransferEventArgs(config);
            this.OnPrepare(e);
            if (e.Cancel || !controlChunk.Client.Connected)
            {
                controlChunk.Client.Close();
                return;
            }

            var chunkGroup = new SendChunkModel[config.ThreadCount];
            chunkGroup[0] = controlChunk;
            for (int i = 1; i < chunkGroup.Length; i++)
            {
                chunkGroup[i] = new SendChunkModel(remoteIpe);
            }
            e.Progress = new TransferProgress();
            e.Progress.Start(config.FileLength);

            #region Breakpoint
            int perPairCount = PerLongSize * 2, count = perPairCount * chunkGroup.Length;
            byte[] bufferInfo = new byte[count];
            // The file is split evenly; the last chunk also takes the remainder.
            long oddSize, avgSize = Math.DivRem(config.FileLength, (long)chunkGroup.Length, out oddSize);
            int lastIndex = chunkGroup.Length - 1;

            // NOTE(review): the reply is classified by the size of a single Receive
            // (exactly 4 bytes = no breakpoint). A partial read, or 0 bytes after the
            // receiver closed the connection, falls into the resume branch with a
            // partly zeroed buffer - confirm whether the protocol needs a stricter read.
            if (controlChunk.Client.Receive(bufferInfo) == 4)
            {
                for (int i = 0; i < chunkGroup.Length; i++)
                {
                    chunkGroup[i].Initialize(config.FilePath, config.ChunkLength, i * avgSize, i == lastIndex ? avgSize + oddSize : avgSize);
                }
            }
            else
            {
                long fValue, tValue;
                for (int i = 0; i < chunkGroup.Length; i++)
                {
                    fValue = BitConverter.ToInt64(bufferInfo, i * perPairCount);
                    tValue = BitConverter.ToInt64(bufferInfo, i * perPairCount + PerLongSize);
                    chunkGroup[i].Initialize(config.FilePath, config.ChunkLength, i * avgSize, i == lastIndex ? avgSize + oddSize : avgSize, fValue, tValue);
                    Runtime.LogDebug("[Multi]Remote{0} breakpoint{1}:{2}/{3}.", remoteIpe, i, fValue, tValue);
                }
            }
            Thread.Sleep(200);
            #endregion

            Parallel.ForEach(chunkGroup, chunk => chunk.Run());
            long bytesTransferred = 0L;
            do
            {
                chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                this.OnProgressChanged(e);
                Thread.Sleep(1000);
            }
            while (!chunkGroup.IsAllCompleted());
            chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
            this.OnProgressChanged(e);

            e.Progress.Stop();
            this.OnCompleted(e);
        }
        #endregion
    }
}
另外推荐一个网络小工具:
内测安装地址:http://publish.xineworld.com/cloudagent/publish.htm
上篇文章:http://www.cnblogs.com/Googler/archive/2013/01/11/2856219.html
相关文章推荐
- C# socket 多线程多管道可断点传送大文件(附单线程单管道传送)
- C# HttpWebRequest可断点上传,下载文件;SOCKET多线程多管道可断点传送大文件
- Socket 发送文件示例 C# 多线程[转]
- 多线程Socket传送文件的客户端和服务端源代码
- 多线程Socket传送文件的客户端和服务端源代码
- c# socket传送大文件分享代码
- C# Socket实现断点上传文件
- c# socket传送大文件
- C#基于socket的TCP的文件传送
- C# websocket与html js实现文件发送与接收处理
- Android中Socket大文件断点上传
- C#网络编程(二)------多线程socket实例
- C#网络编程(二)------多线程socket实例
- Android中Socket大文件断点上传
- C# 实现的多线程异步Socket数据包接收器框架
- C# 实现的多线程异步Socket数据包接收器框架(1)
- 用java编写多线程ftp断点下载文件程序
- Android多线程实现文件断点下载
- C# 实现的多线程异步Socket数据包接收器框架
- C#学习——简单socket、多线程