多线程 + java nio 实现按行读取并处理超大文件
2018-03-18 19:09
429 查看
之前通过 java nio 实现了按行读写文件的方法,但由于是单线程读写,所以当读取数据量比较大的文件时性能肯定会是个大问题,所以为了解决读取处理大文件时性能上的问题,就在原来按行读取文件的基础上增加了多线程的实现,并使用线程来进行线程调度,思路和其它相关博客都一样,只是在代码实现的方式上有所区别,整体思路就是根据初始化线程数量来将文件进行分段读取,一个线程读取一个片段的内容,然后将读取到的行数据交由事先注册的处理接口来进行处理。具体代码如下:
1.文件读取工具类:文件读取的具体实现类,提供读取文件的入口
/**
* 多线程按行读取文件工具类
* @author zyh
*
*/
public class FileReader {
private int threadNum = 3;//线程数,默认为3
private String filePath;//文件路径
private int bufSize = 1024;//缓冲区大小,默认为1024
private DataProcessHandler dataProcessHandler;//数据处理接口
private ExecutorService threadPool;
}
2.数据处理接口:定义处理行数据的接口
/**
* 数据处理接口
* @author zyh
*
*/
public interface DataProcessHandler {
void process(byte[] data);
}
3.数据处理的具体实现类:
public class FileLineDataHandler implements DataProcessHandler {
private String encode = “GBK”;
@Override
public void process(byte[] data) {
try {
System.out.println(new String(data,encode).toString());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
public class MultiThreadReadByLine {
public static void main(String[] args){
FileReader fileReader = new FileReader(“D:\javaniotemp\test1.txt”,100,3);
4000
fileReader.registerHanlder(new FileLineDataHandler());
fileReader.startRead();
}
}
1.文件读取工具类:文件读取的具体实现类,提供读取文件的入口
/**
* 多线程按行读取文件工具类
* @author zyh
*
*/
public class FileReader {
private int threadNum = 3;//线程数,默认为3
private String filePath;//文件路径
private int bufSize = 1024;//缓冲区大小,默认为1024
private DataProcessHandler dataProcessHandler;//数据处理接口
private ExecutorService threadPool;
public FileReader(String filePath,int bufSize,int threadNum){ this.threadNum = threadNum; this.bufSize = bufSize; this.filePath = filePath; this.threadPool = Executors.newFixedThreadPool(threadNum); } /** * 启动多线程读取文件 */ public void startRead(){ FileChannel infile = null; try { RandomAccessFile raf = new RandomAccessFile(filePath,"r"); infile = raf.getChannel(); long size = infile.size(); long subSize = size/threadNum; for(int i = 0; i < threadNum; i++){ long startIndex = i*subSize; if(size%threadNum > 0 && i == threadNum - 1){ subSize += size%threadNum; } RandomAccessFile accessFile = new RandomAccessFile(filePath,"r"); FileChannel inch = accessFile.getChannel(); threadPool.execute(new MultiThreadReader(inch,startIndex,subSize)); } threadPool.shutdown(); } catch (FileNotFoundException e1) { e1.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally{ try { if(infile != null){ infile.close(); } } catch (IOException e) { e.printStackTrace(); } } } /** * 注册数据处理接口 * @param dataHandler */ public void registerHanlder(DataProcessHandler dataHandler){ this.dataProcessHandler = dataHandler; } /** * 多线程按行读取文件具体实现类 * @author zyh * */ public class MultiThreadReader implements Runnable{ private FileChannel channel; private long startIndex; private long rSize; public MultiThreadReader(FileChannel channel,long startIndex,long rSize){ this.channel = channel; this.startIndex = startIndex > 0?startIndex - 1:startIndex; this.rSize = rSize; } public void run(){ readByLine(); } /** * 按行读取文件实现逻辑 * @return */ public void readByLine(){ try { ByteBuffer rbuf = ByteBuffer.allocate(bufSize); channel.position(startIndex);//设置读取文件的起始位置 long endIndex = startIndex + rSize;//读取文件数据的结束位置 byte[] temp = new byte[0];//用来缓存上次读取剩下的部分 int LF = "\n".getBytes()[0];//换行符 boolean isEnd = false;//用于判断数据是否读取完 boolean isWholeLine = false;//用于判断第一行读取到的是否是完整的一行 long lineCount = 0;//行数统计 long endLineIndex = startIndex;//当前处理字节所在位置 while(channel.read(rbuf) != -1 && !isEnd){ int position = rbuf.position(); byte[] rbyte = new byte[position]; rbuf.flip(); rbuf.get(rbyte); int startnum = 0;//每行的起始位置下标,相对于当前所读取到的byte数组 //判断是否有换行符 //如果读取到最后一行不是完整的一行时,则继续往后读取直至读取到完整的一行才结束 for(int i = 0; i < rbyte.length; i++){ endLineIndex++; if(rbyte[i] == LF){//若存在换行符 if(channel.position() == startIndex){//若改数据片段第一个字节为换行符,说明第一行读取到的是完整的一行 isWholeLine = true; startnum = i + 1; }else{ byte[] line = new byte[temp.length + i - startnum + 1]; System.arraycopy(temp, 0, line, 0, temp.length); System.arraycopy(rbyte, startnum, line, temp.length, i - startnum + 1); startnum = i + 1; lineCount++; temp = new byte[0]; //处理数据 if(startIndex != 0){//如果不是第一个数据段 if(lineCount == 1){ if(isWholeLine){//当且仅当第一行为完整行时才处理 dataProcessHandler.process(line); } }else{ dataProcessHandler.process(line); } }else{ dataProcessHandler.process(line); } //结束读取的判断 if(endLineIndex >= endIndex){ isEnd = true; break; } } } } if(!isEnd && startnum < rbyte.length){//说明rbyte最后还剩不完整的一行 byte[] temp2 = new byte[temp.length + rbyte.length - startnum]; System.arraycopy(temp, 0, temp2, 0, temp.length); System.arraycopy(rbyte, startnum, temp2, temp.length, rbyte.length - startnum); temp = temp2; } rbuf.clear(); } //兼容最后一行没有换行的情况 if(temp.length > 0){ if(dataProcessHandler != null){ dataProcessHandler.process(temp); } } } catch (IOException e) { e.printStackTrace(); }finally{ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } } public int getThreadNum() { return threadNum; } public String getFilePath() { return filePath; } public ExecutorService getThreadPool() { return threadPool; } public int getBufSize() { return bufSize; }
}
2.数据处理接口:定义处理行数据的接口
/**
* 数据处理接口
* @author zyh
*
*/
public interface DataProcessHandler {
void process(byte[] data);
}
3.数据处理的具体实现类:
public class FileLineDataHandler implements DataProcessHandler {
private String encode = “GBK”;
@Override
public void process(byte[] data) {
try {
System.out.println(new String(data,encode).toString());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
public class MultiThreadReadByLine {
public static void main(String[] args){
FileReader fileReader = new FileReader(“D:\javaniotemp\test1.txt”,100,3);
4000
fileReader.registerHanlder(new FileLineDataHandler());
fileReader.startRead();
}
}
相关文章推荐
- c++输入文件处理按行读取并且将实现对一行数据分别处理
- 【数字图像处理】C++读取、旋转和保存bmp图像文件编程实现
- Python 读取键盘输入字符,多线程操作,文件操作随机处理 开发范例
- java实现以某种格式按行读取写入文件
- php实现读取超大文件的方法
- php实现读取超大文件的方法
- PHP按行读取、处理较大CSV文件的代码实例
- 【数字图像处理】C++读取、旋转和保存bmp图像文件编程实现
- Java实现按行读取大文件
- Java 实现按行读取文件并且将行中的重复数据删除
- python实现带错误处理功能的远程文件读取方法
- 怎么用c语言实现读取一个txt文件里的数据 要按行读出来
- 本文给出了一种方便实用的解决大文件的读取、存储等处理的方法,并结合相关程序代码对具体的实现过程进行了介绍
- HTML5的FileAPI实现文件的读取及超大文件的上传
- PHP快速按行读取CSV大文件的封装类分享(也适用于其它超大文本文件)
- 【Python】按行读取文件、IOError: [Errno 22] invalid mode ('a+') or filename,处理文件的换行符
- 【数字图像处理】C++读取、旋转和保存bmp图像文件编程实现(转载)
- python实现带错误处理功能的远程文件读取方法
- 按行读取文件(php、c实现)
- 实现按行读取文件,把内容按照第三种内存模型打包数据传出,把行数通过函数参数传出。