您的位置:首页 > 其它

多线程文件下载的服务器端及客户端

2017-06-24 00:33 357 查看
多线程文件下载的服务器端及客户端

本文需要引用httpClient包

httpClient下载地址

1 服务器端

简要说明:

1.1 根据接收到的文件id,读取服务器端的文件。

1.2 判断request中的head是否有“Range”,如果有,代表需要支持断点续传功能,否则不需要。

1.3 根据Range的值判断,如 (bytes=1000-)或(bytes=1000-2000)分别代表第1000字节以后的值,及1000到2000字节之间的值。

1.4 根据Range的值读取文件中相应的字节返回。

/**
* 下载文件(支持单点续传下载)
*
* @param request
* @param fileId
* @throws IOException
*/
@RequestMapping(value = "/file/downloadMultiThread/{fileId}", method = RequestMethod.GET)
public void download3(HttpServletRequest request, HttpServletResponse response, @PathVariable("fileId") int fileId)
throws IOException {

if (isTest) {
Enumeration<String> enums = request.getHeaderNames();
while (enums.hasMoreElements()) {
String names = enums.nextElement();
if (isTest) {
System.out.println(names + ":[" + request.getHeader(names) + "]");
}
}
}
CmnTmpFile cmnTmpFile = cmnTmpFileSrvc.getCmnTmpFileByKeyId(fileId);
if (cmnTmpFile == null) {
return;
}

try {
if (isTest) {
log.info("请求下载的连接地址为:[" + request.getRequestURL() + "]?[" + request.getQueryString() + "]");
}
} catch (IllegalArgumentException e) {
log.error("请求下载的文件名参数为空!");
return;
}

File downloadFile = new File(cmnTmpFile.getFilePath());
if (downloadFile.exists()) {
if (downloadFile.isFile()) {
if (downloadFile.length() > 0) {
} else {
log.info("请求下载的文件是一个空文件");
return;
}
if (!downloadFile.canRead()) {
log.info("请求下载的文件不是一个可读的文件");
return;
} else {
}
} else {
log.info("请求下载的文件是一个文件夹");
return;
}
} else {
log.info("请求下载的文件不存在!");
return;
}
// 记录文件大小
long fileLength = downloadFile.length();
// 记录已下载文件大小
long pastLength = 0;
// 0:从头开始的全文下载;
// 1:从某字节开始的下载(bytes=1000-);
// 2:从某字节开始到某字节结束的下载(bytes=1000-2000)
int rangeSwitch = 0;
// 记录客户端需要下载的字节段的最后一个字节偏移量(比如bytes=1000-2000,则这个值是为2000)
long toLength = 0;
// 客户端请求的字节总量
long contentLength = 0;
// 记录客户端传来的形如“bytes=1000-”或者“bytes=1000-2000”的内容
String rangeBytes = "";
// 负责读取数据
RandomAccessFile raf = null;
// 写出数据
OutputStream os = null;
// 缓冲
OutputStream out = null;
// 暂存容器
byte b[] = new byte[1024];

if (request.getHeader("Range") != null) {
// 客户端请求的下载的文件块的开始字节
response.setStatus(javax.servlet.http.HttpServletResponse.SC_PARTIAL_CONTENT);
if (isTest) {
log.info("request.getHeader(\"Range\")=" + request.getHeader("Range"));
}
rangeBytes = request.getHeader("Range").replaceAll("bytes=", "");
if (rangeBytes.indexOf('-') == rangeBytes.length() - 1) {
// 如:bytes=1000-
rangeSwitch = 1;
rangeBytes = rangeBytes.substring(0, rangeBytes.indexOf('-'));
pastLength = Long.parseLong(rangeBytes.trim());
// 客户端请求的是 1000之后的字节
contentLength = fileLength - pastLength;
} else {
// 如:bytes=1000-2000
rangeSwitch = 2;
String temp0 = rangeBytes.substring(0, rangeBytes.indexOf('-'));
String temp2 = rangeBytes.substring(rangeBytes.indexOf('-') + 1, rangeBytes.length());
// bytes=1000-2000,从第1000个字节开始下载
pastLength = Long.parseLong(temp0.trim());
// bytes=1000-2000,到第2000个字节结束
toLength = Long.parseLong(temp2);
// 客户端请求的是1000-2000之间的字节
contentLength = toLength - pastLength;
}
} else {
// 从开始进行下载,客户端要求全文下载
contentLength = fileLength;
}

/**
* 如果设设置了Content -Length,则客户端会自动进行多线程下载。如果不希望支持多线程,则不要设置这个参数。 响应的格式是:
* Content - Length: [文件的总大小] - [客户端请求的下载的文件块的开始字节]
* ServletActionContext.getResponse().setHeader("Content- Length", new
* Long(file.length() - p).toString());
*/
response.reset();
// 告诉客户端允许断点续传多线程连接下载,响应的格式是:Accept-Ranges: bytes
response.setHeader("Accept-Ranges", "bytes");
// 如果是第一次下,还没有断点续传,状态是默认的 200,无需显式设置;响应的格式是:HTTP/1.1 200 OK

if (pastLength != 0) {
// 不是从最开始下载,响应的格式是:
// Content-Range: bytes [文件块的开始字节]-[文件的总大小 - 1]/[文件的总大小]
if (isTest) {
log.info("---------不是从开始进行下载!服务器即将开始断点续传...");
}
String contentRange = "";
switch (rangeSwitch) {
case 1:
// 针对 bytes=1000- 的请求
contentRange = new StringBuffer("bytes ").append(new Long(pastLength).toString()).append("-")
.append(new Long(fileLength - 1).toString()).append("/").append(new Long(fileLength).toString())
.toString();
response.setHeader("Content-Range", contentRange);
break;
case 2:
// 针对 bytes=1000-2000 的请求
contentRange = rangeBytes + "/" + new Long(fileLength).toString();
response.setHeader("Content-Range", contentRange);
break;
default:
break;
}
} else {
// 是从开始下载
if (isTest) {
log.info("---------是从开始进行下载!");
}
}

try {
response.addHeader("Content-Disposition", "attachment; filename=\"" + downloadFile.getName() + "\"");
// 设置 MIME 类型.
response.setContentType(CommonUtil.setContentType(downloadFile.getName()));
response.addHeader("Content-Length", String.valueOf(contentLength));
os = response.getOutputStream();
out = new BufferedOutputStream(os);
raf = new RandomAccessFile(downloadFile, "r");

int readNum = 0;
long readLength = 0;
try {
switch (rangeSwitch) {
case 0:
// 普通下载,或者从头开始的下载,同1
case 1:
// 针对 bytes=1000- 的请求
// 形如 bytes=1000- 的客户端请求,跳过 1000 个字节
raf.seek(pastLength);
readNum = 0;
while ((readNum = raf.read(b, 0, 1024)) != -1) {
out.write(b, 0, readNum);
}
break;
case 2:
// 针对 bytes=2000-3000 的请求
// 形如 bytes=2000-3000 的客户端请求,找到第 2000 个字节
raf.seek(pastLength);
readNum = 0;
readLength = 0; // 记录已读字节数
while (readLength <= contentLength - 1024) {
// 大部分字节在这里读取
readNum = raf.read(b, 0, 1024);
readLength += 1024;
out.write(b, 0, readNum);
}
if (readLength <= contentLength) {
// 余下的不足 1024 个字节在这里读取
readNum = raf.read(b, 0, (int) (contentLength - readLength));
out.write(b, 0, readNum);
}
break;
default:
break;
}
out.flush();
if (isTest) {
log.info("-----------下载结束");
}
} catch (IOException ie) {
/**
* 在写数据的时候, 对于 ClientAbortException 之类的异常,
* 是因为客户端取消了下载,而服务器端继续向浏览器写入数据时,抛出这个异常,这个是正常的。
*/
if (isTest) {
log.info("向客户端传输时出现IO异常,但此异常是允许的,有可能是客户端取消了下载,导致此异常");
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
// 远程主机或者本机强制关闭
// log.error(e.getMessage(), e);
} finally {
out = null;
}
}
if (raf != null) {
try {
raf.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
} finally {
raf = null;
}
}
}
}
/**
* 打包压缩下载文件
*
* @param response
* @param fileId
* @throws IOException
*/
@RequestMapping(value = "/file/downLoadZipFile/{fileId}", method = RequestMethod.GET)
public void downLoadZipFile(HttpServletResponse response, @PathVariable("fileId") int fileId) throws IOException {
CmnTmpFile cmnTmpFile = cmnTmpFileSrvc.getCmnTmpFileByKeyId(fileId);
if (cmnTmpFile == null) {
return;
}
File file = new File(cmnTmpFile.getFilePath());
response.setContentType("APPLICATION/OCTET-STREAM");
response.setHeader("Content-Disposition", "attachment; filename=" + cmnTmpFile.getFileName() + ".zip");
ZipOutputStream out = new ZipOutputStream(response.getOutputStream());
try {
long start = System.currentTimeMillis();
if (isTest) {
log.info("----------开始下载文件,文件长度[" + file.length() + "]");
}
// 压缩输出文件
// ZipUtils.doCompress(cmnTmpFile.getFilePath(), out);
ZipUtils.doCompress(file, out);
response.flushBuffer();
if (isTest) {
System.out.println("耗时:[" + (System.currentTimeMillis() - start) + "]ms");
log.info("----------压缩下载文件完成");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
out.close();
}
}

/**
* 下载文件(常规单线程下载)
*
* @param request
* @param fileId
* @throws IOException
*/
@RequestMapping(value = "/file/download/{fileId}", method = RequestMethod.GET)
public void download(HttpServletRequest request, HttpServletResponse response, @PathVariable("fileId") int fileId)
throws IOException {
// 测试Header
if (isTest) {
Enumeration<String> enums = request.getHeaderNames();
while (enums.hasMoreElements()) {
String names = enums.nextElement();
if (isTest) {
System.out.println(names + ":[" + request.getHeader(names) + "]");
}
}
}
CmnTmpFile cmnTmpFile = cmnTmpFileSrvc.getCmnTmpFileByKeyId(fileId);
if (cmnTmpFile == null) {
return;
}
File file = new File(cmnTmpFile.getFilePath());
// HttpHeaders headers = new HttpHeaders();
// headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
// headers.setContentDispositionFormData("attachment",
// cmnTmpFile.getFileName());
try {
long start = System.currentTimeMillis();
if (isTest) {
log.info("----------开始下载文件,文件长度[" + file.length() + "]");
}
if (file.exists()) {
String filename = file.getName();
InputStream fis = new BufferedInputStream(new FileInputStream(file));
response.reset();
response.setContentType("application/x-download");
response.addHeader("Content-Disposition",
"attachment;filename=" + new String(filename.getBytes(), "iso-8859-1"));
response.addHeader("Content-Length", "" + file.length());
OutputStream toClient = new BufferedOutputStream(response.getOutputStream());
response.setContentType("application/octet-stream");
byte[] buffer = new byte[1024 * 1024 * 4];
int i = -1;
while ((i = fis.read(buffer)) != -1) {
toClient.write(buffer, 0, i);
}
fis.close();
toClient.flush();
toClient.close();
try {
response.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (isTest) {
System.out.println("耗时:[" + (System.currentTimeMillis() - start) + "]ms");
log.info("----------下载文件完成");
}
} catch (IOException ex) {
ex.printStackTrace();
}
}


压缩应用类代码如下:

package cn.xaele.utils.zip;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
* 文件下载工具类
*/
public class ZipUtils {

public static void doCompress(String srcFile, String zipFile) throws Exception {
doCompress(new File(srcFile), new File(zipFile));
}

/**
* 文件压缩
*
* @param srcFile 目录或者单个文件
* @param destFile 压缩后的ZIP文件
*/
public static void doCompress(File srcFile, File destFile) throws Exception {
ZipOutputStream out = new ZipOutputStream(new FileOutputStream(destFile));
if (srcFile.isDirectory()) {
File[] files = srcFile.listFiles();
for (File file : files) {
doCompress(file, out);
}
} else {
doCompress(srcFile, out);
}
}

public static void doCompress(String pathname, ZipOutputStream out) throws IOException {
doCompress(new File(pathname), out);
}

public static void doCompress(File file, ZipOutputStream out) throws IOException {
if (file.exists()) {
byte[] buffer = new byte[1024];
FileInputStream fis = new FileInputStream(file);
out.putNextEntry(new ZipEntry(file.getName()));
int len = 0;
// 读取文件的内容,打包到zip文件
while ((len = fis.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
out.flush();
out.closeEntry();
fis.close();
}
}
}


2 客户端

简要说明:

2.1 客户端要下载一个文件, 先请求服务器,然后服务器将文件传送给客户端,最后客户端保存到本地, 完成下载过程。

2.2 多线程下载是在客户端开启多个线程同时下载,每个线程只负责下载文件其中的一部分,在客户端进行文件组装,当所有线程下载完的时候,文件下载完毕。 (注意:并非线程越多下载越快, 与网络环境有很大的关系;在同等的网络环境下,多线程下载速度要高于单线程;多线程下载占用资源比单线程多,相当于用资源换速度)。

2.3 下载前,首先要获取要下载文件的大小,然后用RandomAccessFile 类在客户端磁盘创建一个同样大小的文件,下载后的数据写入这个文件。

2.4 在每个线程分配的下载任务中包含待下载文件取值的开始和结束位置,然后启动下载线程下载数据。

2.5 判断有没有保存上次下载的临时文件,如果有,判断是否需要删除或重新启动下载线程;

2.6 在启动线程下载的时候保存下载文件的位置信息,下载完毕后删除当前线程产生的临时文件。

package cn.xaele.utils.file;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* 多线程下载文件,经测试4G以上的文件也可正常下载
*/
public class DownLoadLargeFile {
private static final Logger log = LogManager.getLogger(DownLoadLargeFile.class);
// 测试标记
private static boolean isTest = true;

private CloseableHttpClient httpClient;

public static void main(String[] args) {
long starttimes = System.currentTimeMillis();
DownLoadLargeFile app = new DownLoadLargeFile();

PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
// 设置整个连接池最大连接数 20
cm.setMaxTotal(20);
app.httpClient = HttpClients.custom().setConnectionManager(cm).build();
try {
app.doDownload("http://mirrors.hust.edu.cn/apache//httpcomponents/httpclient/binary/httpcomponents-client-4.5.3-bin.zip", "d:/doctohtml/");
// app.doDownload("d:/doctohtml/httpcomponents-client-4.5.3-bin.zip.cfg");
app.httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println((System.currentTimeMillis() - starttimes) + "ms");
cm.shutdown();
cm.close();
cm = null;
}

/**
* 启动多个线程下载文件
*
* @param remoteUrl
* @param localPath
* @throws IOException
*/
public void doDownload(String remoteUrl, String localPath) throws IOException {
FileCfg fileCfg = new FileCfg(localPath, remoteUrl);
if (fileCfg.getDnldStatus() == 0) {
download(fileCfg);
} else {
System.out.println("解析错误,无法下载");
}
}

/**
* 读取配置文件并按照其内容启动多个线程下载文件未下载完毕的部分
*
* @param cfgFile
* @throws IOException
*/
public void doDownload(String cfgFile) throws IOException {
FileCfg fileCfg = new FileCfg(cfgFile);
if (fileCfg.getDnldStatus() == 0) {
download(fileCfg);
} else {
System.out.println("解析错误,无法下载");
}
}

/**
* 根据配置文件下载文件
*
* @param fileCfg
* @throws IOException
*/
public void download(FileCfg fileCfg) throws IOException {
if (fileCfg.getDnldStatus() == 0) {
if (fileCfg.getFilePartList() != null && fileCfg.getFilePartList().size() > 0) {
CountDownLatch end = new CountDownLatch(fileCfg.getFilePartList().size());
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
boolean hasDownloadPart = false;
for (FilePart filePart : fileCfg.getFilePartList()) {
if (!filePart.isFinish()) {
hasDownloadPart = true;
// 仅下载未完成的文件片段
DownloadThread downloadThread = new DownloadThread(filePart, end, httpClient);
cachedThreadPool.execute(downloadThread);
}
}
if (hasDownloadPart) {
try {
end.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 下载完成后清除临时文件信息
fileCfg.clearFile();
log.debug("下载完成!{} ", fileCfg.getDnldTmpFile());
} else {
System.out.println("没有需要下载的内容");
}
} else {
System.out.println("解析错误,无法下载");
}
}

public static void callback(FilePart filePart) {
if (isTest) {
System.out.println(">>>子线程执行之后的值是:" + filePart.toString());
}
// 保存线程执行结果
File newFile = new File(filePart.getPartCfgfileName());

try {
// byte,char 1字节
// short 2字节
// int 4字节
// long 8字节
// Boolean 1字节
RandomAccessFile raf = new RandomAccessFile(newFile, "rws");
raf.seek(filePart.getThreadId() * 21 + 20);
raf.writeBoolean(filePart.isFinish());
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
}

// 待下载文件任务信息
public class FileCfg {

// 待下载的文件链接
private String url = null;
// 待下载的文件链接
private String fileName = null;
// 待下载的文件长度
private long fileSize = 0l;
// 每个线程下载的字节数
private int unitSize = 1024000;
// 下载状态
private short dnldStatus = -1;
// 下载路径
private String dnldPath = "d:/download";

private List<FilePart> filePartList = null;

public FileCfg(String cfgFile) {
try {
// 读取配置文件
DataInputStream is = new DataInputStream(new BufferedInputStream(new FileInputStream(cfgFile)));
this.url = is.readUTF();
this.fileName = is.readUTF();
this.dnldPath = is.readUTF();
this.fileSize = is.readLong();
this.unitSize = is.readInt();
this.dnldStatus = is.readShort();

// 下载片段数
int partSize = is.readInt();
is.close();
if (isTest) {
System.out.println("FileCfg--->" + toString());
}

boolean reDownload = false;
File downloadFile = new File(getDnldTmpFile());
if (!downloadFile.exists() || !downloadFile.isFile()) {
// 重新下载文件
RandomAccessFile raf;
try {
raf = new RandomAccessFile(getDnldTmpFile(), "rw");
raf.setLength(fileSize);
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
reDownload = true;
}

// 读取文件下载片段信息
filePartList = new ArrayList<>(partSize);
is = new DataInputStream(new BufferedInputStream(new FileInputStream(cfgFile + ".part")));
for (int i = 0; i < partSize; i++) {
FilePart part = new FilePart(getUrl(), getDnldTmpFile(), getDnldPartFile(), is.readInt(),
is.readLong(), is.readLong());
boolean finish = is.readBoolean();
if (!reDownload) {
part.setFinish(finish);
}
if (isTest) {
System.out.println(i + "--->" + part.toString());
}
filePartList.add(part);
}
is.close();
} catch (IOException e) {

}
}

public FileCfg(String dnldPath, String url) throws IOException {
this.url = url;
this.dnldPath = dnldPath;

HttpURLConnection httpConnection = (HttpURLConnection) new URL(url).openConnection();
httpConnection.setRequestMethod("HEAD");
int responseCode = httpConnection.getResponseCode();
if (responseCode >= 400) {
System.out.println("Web服务器响应错误!");
return;
}

String lengthField = httpConnection.getHeaderField("Content-Length");
if (lengthField != null && isLong(lengthField)) {
System.out.println("文件大小:[" + lengthField + "]");
this.fileSize = Long.parseLong(lengthField);
}

String nameField = httpConnection.getHeaderField("Content-Disposition");
if (nameField != null) {
String mark = "filename=\"";
if (isTest) {
System.out.println("字符串:[" + nameField + "]");
}
int idx = nameField.indexOf(mark);
this.fileName = nameField.substring(idx + mark.length(), nameField.length() - 1);
// 如果没有解析到文件名称,则从url中获取
if (this.fileName == null || this.fileName.length() < 1) {
this.fileName = url.substring(url.lastIndexOf("/") + 1, url.length()).replace("%20", " ");
}
if (isTest) {
System.out.println("文件名称:[" + fileName + "]");
}
}

if (isTest) {
// 读取所有的Head信息
httpConnection.getContentLength();
httpConnection.getContentLengthLong();
httpConnection.getHeaderFields();
for (Iterator<String> iter = httpConnection.getHeaderFields().keySet().iterator(); iter.hasNext();) {
String key = iter.next();
System.out.println("[" + key + "][" + httpConnection.getHeaderField(key) + "]");
}
}

calFileInfo();
}

public FileCfg(String dnldPath, String url, String fileName, long fileSize) {
this.url = url;
this.fileName = fileName;
this.fileSize = fileSize;
this.dnldPath = dnldPath;
calFileInfo();
}

/**
* 判断指定的字符串是否可转为Long型数据
*
* @param str
* @return
*/
public boolean isLong(String str) {
if (str == null || str.trim().length() < 1) {
return false;
}
Pattern pattern = Pattern.compile("[0-9]*");
return pattern.matcher(str).matches();
}

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}

public long getFileSize() {
return fileSize;
}

public void setFileSize(long fileSize) {
this.fileSize = fileSize;
}

public int getUnitSize() {
return unitSize;
}

public void setUnitSize(int unitSize) {
this.unitSize = unitSize;
}

public short getDnldStatus() {
return dnldStatus;
}

public void setDnldStatus(short dnldStatus) {
this.dnldStatus = dnldStatus;
}

public String getDnldPath() {
return dnldPath;
}

public void setDnldPath(String dnldPath) {
this.dnldPath = dnldPath;
}

public List<FilePart> getFilePartList() {
return filePartList;
}

public void setFilePartList(List<FilePart> filePartList) {
this.filePartList = filePartList;
}

public String getDnldCfgFile() {
return dnldPath + "/" + fileName + ".cfg";
}

public String getDnldPartFile() {
return dnldPath + "/" + fileName + ".cfg.part";
}

public String getDnldTmpFile() {
return dnldPath + "/" + fileName + ".tmp";
}

public String getDnldFile() {
return dnldPath + "/" + fileName;
}

public void calFileInfo() {
// 计算文件片段数量
if (fileSize < 1) {
// 没有需要下载的文件
dnldStatus = -2;
return;
}
long filePartSize = (fileSize - 1) / unitSize + 1;
if (filePartSize > Integer.MAX_VALUE) {
// 文件过大,不能下载
dnldStatus = -10;
return;
}

// 构建文件片段列表
filePartList = new ArrayList<>((int) filePartSize);
for (int i = 0; i < filePartSize; i++) {
long offset = i * unitSize;
long length = unitSize;
// 读取数量超过文件大小
if ((offset + length) > this.fileSize) {
length = this.fileSize - offset;
}
FilePart part = new FilePart(getUrl(), getDnldTmpFile(), getDnldPartFile(), i, offset, length);
if (isTest) {
System.out.println(i + "--->" + part.toString());
}
filePartList.add(part);
}
dnldStatus = 0;

// 构建完成,保存信息到文档
writeFile();

// 检查下载文件是否存在
File newFile = new File(fileName);
if (!newFile.exists() || !newFile.isFile()) {
// 文件不存在,则重新创建,如存在,则保持原状,用于断点续传
RandomAccessFile raf;
try {
raf = new RandomAccessFile(newFile, "rw");
raf.setLength(fileSize);
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 将下载信息保存到文件中,以便断点续传
*/
public void writeFile() {
// 文件下载信息
String dnldFile = getDnldCfgFile();
// 文件片段信息
String dnldPartFile = getDnldPartFile();

try {
// 保存文件下载信息到文件
DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dnldFile)));
os.writeUTF(url);
os.writeUTF(fileName);
os.writeUTF(dnldPath);
os.writeLong(fileSize);
os.writeInt(unitSize);
os.writeShort(dnldStatus);
os.writeInt(this.filePartList.size());
os.flush();
os.close();

// 保存文件片段信息到文件
os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dnldPartFile)));
for (int i = 0, p = filePartList.size(); i < p; i++) {
FilePart part = filePartList.get(i);
os.writeInt(part.getThreadId());
os.writeLong(part.getOffset());
os.writeLong(part.getLength());
os.writeBoolean(part.isFinish());
os.flush();
}
os.close();

// 生成文件,并指定大小(与待下载的文件相同)
File saveFile = new File(getDnldTmpFile());
RandomAccessFile raf = new RandomAccessFile(saveFile, "rw");
raf.setLength(fileSize);
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
}

public void clearFile() {
// 文件下载信息
String dnldFile = getDnldCfgFile();
// 文件片段信息
String dnldPartFile = getDnldPartFile();

File file = new File(dnldFile);
// 如果文件路径所对应的文件存在,并且是一个文件,则直接删除
if (file.exists() && file.isFile()) {
if (file.delete()) {
System.out.println("删除文件" + dnldFile + "成功!");
} else {
System.out.println("删除文件" + dnldFile + "失败!");
}
} else {
System.out.println("删除文件失败:" + dnldFile + "不存在!");
}

file = new File(dnldPartFile);
// 如果文件路径所对应的文件存在,并且是一个文件,则直接删除
if (file.exists() && file.isFile()) {
if (file.delete()) {
System.out.println("删除文件" + dnldPartFile + "成功!");
} else {
System.out.println("删除文件" + dnldPartFile + "失败!");
}
} else {
System.out.println("删除文件失败:" + dnldPartFile + "不存在!");
}

// 下载完成后的临时文件名改为正式名称
File oldFile = new File(getDnldTmpFile());
File newFile = new File(getDnldFile());
if (oldFile.exists()) {
// 重命名文件存在
if (!newFile.exists()) {
oldFile.renameTo(newFile);
} else {
// 若在该目录下已经有一个文件和新文件名相同,则不允许重命名
System.out.println(newFile + "已经存在!");
}
}
}

public String toString() {
return "isTest[" + isTest + "]url[" + url + "]fileName[" + fileName + "]fileSize[" + fileSize + "]unitSize["
+ unitSize + "]dnldStatus[" + dnldStatus + "]dnldPath[" + dnldPath + "]filePartList["
+ ((filePartList != null) ? filePartList.size() : 0) + "]";
}
}

/**
* 文件片段信息
*/
public class FilePart {

// 待下载的文件
private String url = null;
// 本地文件名
private String fileName = null;
// 本地文件名
private String partCfgfileName = null;
// 当前第几个线程
private int threadId = 0;
// 偏移量
private long offset = 0;
// 分配给本线程的下载字节数
private long length = 0;
// 监听本线程下载是否完成
private boolean finish = false;

public FilePart(String url, String fileName, String partCfgfileName, int threadId, long offset, long length) {
this.url = url;
this.fileName = fileName;
this.partCfgfileName = partCfgfileName;
this.threadId = threadId;
this.offset = offset;
this.length = length;
}

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}

public String getPartCfgfileName() {
return partCfgfileName;
}

public void setPartCfgfileName(String partCfgfileName) {
this.partCfgfileName = partCfgfileName;
}

public int getThreadId() {
return threadId;
}

public void setThreadId(int threadId) {
this.threadId = threadId;
}

public long getOffset() {
return offset;
}

public void setOffset(long offset) {
this.offset = offset;
}

public long getLength() {
return length;
}

public void setLength(long length) {
this.length = length;
}

public boolean isFinish() {
return finish;
}

public void setFinish(boolean finish) {
this.finish = finish;
}

public String toString() {
return "threadId[" + threadId + "]offset[" + offset + "]length[" + length + "]finish[" + finish + "]";
}

}

/**
* 文件下载线程
*/
public class DownloadThread extends Thread {

// // 待下载的文件
// private String url = null;
// // 本地文件名
// private String fileName = null;
// 待下载文件片段
private FilePart filePart = null;
// 通知服务器文件的取值范围
private String rangeStr = "";
// 同步工具类,允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行
private CountDownLatch end;
// http客户端
private CloseableHttpClient httpClient;
// 上下文
private HttpContext context;

/**
* @param url
* @param file
* @param part
* @param end
* @param hc
*/
public DownloadThread(FilePart part, CountDownLatch end, CloseableHttpClient hc) {
this.filePart = part;
this.end = end;
this.httpClient = hc;
this.context = new BasicHttpContext();
this.rangeStr = "bytes=" + filePart.getOffset() + "-" + (filePart.getOffset() + filePart.getLength());
if (isTest) {
System.out.println("rangeStr[" + rangeStr + "]");
System.out.println("偏移量=" + filePart.getOffset() + ";字节数=" + filePart.getLength());
}
}

public void run() {
try {
HttpGet httpGet = new HttpGet(filePart.getUrl());
httpGet.addHeader("Range", rangeStr);
CloseableHttpResponse response = httpClient.execute(httpGet, context);
BufferedInputStream bis = new BufferedInputStream(response.getEntity().getContent());

byte[] buff = new byte[1024];
int bytesRead;
File newFile = new File(filePart.getFileName());
// Rws模式就是同步模式,每write修改一个byte,立马写到磁盘。当然性能差点儿,适合小的文件,debug模式,或者需要安全性高的时候。
// Rwd模式跟个rws基础的一样,不过只对“文件的内容”同步更新到磁盘,而不对metadata同步更新。
// 默认情形下(rw模式下),是使用buffer的,只有cache满的或者使用RandomAccessFile.close()关闭流的时候儿才真正的写到文件。
// 这个会有两个问题:
// 1.调试麻烦的--->使用write方法修改byte的时候儿,只修改到个内存内,还没到个文件,不能使用notepad++工具立即看见修改效果。
// 2.当系统halt的时候儿,不能写到文件,安全性稍微差点儿。
RandomAccessFile raf = new RandomAccessFile(newFile, "rws");

long offset = filePart.getOffset();
while ((bytesRead = bis.read(buff, 0, buff.length)) != -1) {
raf.seek(offset);
raf.write(buff, 0, bytesRead);
offset += bytesRead;
}
raf.close();
bis.close();
// 下载线程执行完毕
filePart.setFinish(true);
// 调用回调函数告诉主进程该线程已执行完毕
DownLoadLargeFile.callback(filePart);
} catch (ClientProtocolException e) {
log.error("文件下载异常信息:{}", ExceptionUtils.getStackTrace(e));
} catch (IOException e) {
log.error("文件下载异常信息:{}", ExceptionUtils.getStackTrace(e));
} finally {
end.countDown();
if (isTest) {
log.info("剩余线程[" + end.getCount() + "]继续执行!");
}
}
}
}

}


多线程下载客户端代码下载
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  多线程
相关文章推荐