您的位置:首页 > 编程语言 > Java开发

多线程 + java nio 实现按行读取并处理超大文件

2018-03-18 19:09 429 查看
之前通过 java nio 实现了按行读写文件的方法,但由于是单线程读写,所以当读取数据量比较大的文件时性能肯定会是个大问题,所以为了解决读取处理大文件时性能上的问题,就在原来按行读取文件的基础上增加了多线程的实现,并使用线程来进行线程调度,思路和其它相关博客都一样,只是在代码实现的方式上有所区别,整体思路就是根据初始化线程数量来将文件进行分段读取,一个线程读取一个片段的内容,然后将读取到的行数据交由事先注册的处理接口来进行处理。具体代码如下:

1.文件读取工具类:文件读取的具体实现类,提供读取文件的入口

/**

* 多线程按行读取文件工具类

* @author zyh

*

*/

public class FileReader {

private int threadNum = 3;//线程数,默认为3

private String filePath;//文件路径

private int bufSize = 1024;//缓冲区大小,默认为1024

private DataProcessHandler dataProcessHandler;//数据处理接口

private ExecutorService threadPool;

public FileReader(String filePath,int bufSize,int threadNum){
this.threadNum = threadNum;
this.bufSize = bufSize;
this.filePath = filePath;
this.threadPool = Executors.newFixedThreadPool(threadNum);
}

/**
* 启动多线程读取文件
*/
public void startRead(){
FileChannel infile = null;
try {
RandomAccessFile raf = new RandomAccessFile(filePath,"r");
infile = raf.getChannel();
long size = infile.size();
long subSize = size/threadNum;
for(int i = 0; i < threadNum; i++){
long startIndex = i*subSize;
if(size%threadNum > 0 && i == threadNum - 1){
subSize += size%threadNum;
}
RandomAccessFile accessFile = new RandomAccessFile(filePath,"r");
FileChannel inch = accessFile.getChannel();
threadPool.execute(new MultiThreadReader(inch,startIndex,subSize));
}
threadPool.shutdown();
} catch (FileNotFoundException e1) {
e1.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally{
try {
if(infile != null){
infile.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 注册数据处理接口
* @param dataHandler
*/
public void registerHanlder(DataProcessHandler dataHandler){
this.dataProcessHandler = dataHandler;
}

/**
* 多线程按行读取文件具体实现类
* @author zyh
*
*/
public class MultiThreadReader implements Runnable{
private FileChannel channel;
private long startIndex;
private long rSize;

public MultiThreadReader(FileChannel channel,long startIndex,long rSize){
this.channel = channel;
this.startIndex = startIndex > 0?startIndex - 1:startIndex;
this.rSize = rSize;
}

public void run(){
readByLine();
}

/**
* 按行读取文件实现逻辑
* @return
*/
public void readByLine(){
try {
ByteBuffer rbuf = ByteBuffer.allocate(bufSize);
channel.position(startIndex);//设置读取文件的起始位置
long endIndex = startIndex + rSize;//读取文件数据的结束位置
byte[] temp = new byte[0];//用来缓存上次读取剩下的部分
int LF = "\n".getBytes()[0];//换行符
boolean isEnd = false;//用于判断数据是否读取完
boolean isWholeLine = false;//用于判断第一行读取到的是否是完整的一行
long lineCount = 0;//行数统计
long endLineIndex = startIndex;//当前处理字节所在位置
while(channel.read(rbuf) != -1 && !isEnd){
int position = rbuf.position();
byte[] rbyte = new byte[position];
rbuf.flip();
rbuf.get(rbyte);
int startnum = 0;//每行的起始位置下标,相对于当前所读取到的byte数组
//判断是否有换行符
//如果读取到最后一行不是完整的一行时,则继续往后读取直至读取到完整的一行才结束
for(int i = 0; i < rbyte.length; i++){
endLineIndex++;
if(rbyte[i] == LF){//若存在换行符
if(channel.position() == startIndex){//若改数据片段第一个字节为换行符,说明第一行读取到的是完整的一行
isWholeLine = true;
startnum = i + 1;
}else{
byte[] line = new byte[temp.length + i - startnum + 1];
System.arraycopy(temp, 0, line, 0, temp.length);
System.arraycopy(rbyte, startnum, line, temp.length, i - startnum + 1);
startnum = i + 1;
lineCount++;
temp = new byte[0];
//处理数据
if(startIndex != 0){//如果不是第一个数据段
if(lineCount == 1){
if(isWholeLine){//当且仅当第一行为完整行时才处理
dataProcessHandler.process(line);
}
}else{
dataProcessHandler.process(line);
}
}else{
dataProcessHandler.process(line);
}
//结束读取的判断
if(endLineIndex >= endIndex){
isEnd = true;
break;
}
}
}
}
if(!isEnd && startnum < rbyte.length){//说明rbyte最后还剩不完整的一行
byte[] temp2 = new byte[temp.length + rbyte.length - startnum];
System.arraycopy(temp, 0, temp2, 0, temp.length);
System.arraycopy(rbyte, startnum, temp2, temp.length, rbyte.length - startnum);
temp = temp2;
}
rbuf.clear();
}
//兼容最后一行没有换行的情况
if(temp.length > 0){
if(dataProcessHandler != null){
dataProcessHandler.process(temp);
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

public int getThreadNum() {
return threadNum;
}

public String getFilePath() {
return filePath;
}

public ExecutorService getThreadPool() {
return threadPool;
}
public int getBufSize() {
return bufSize;
}


}

2.数据处理接口:定义处理行数据的接口

/**

* 数据处理接口

* @author zyh

*

*/

public interface DataProcessHandler {

void process(byte[] data);

}

3.数据处理的具体实现类:

public class FileLineDataHandler implements DataProcessHandler {

private String encode = “GBK”;

@Override

public void process(byte[] data) {

try {

System.out.println(new String(data,encode).toString());

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

}

public class MultiThreadReadByLine {

public static void main(String[] args){

FileReader fileReader = new FileReader(“D:\javaniotemp\test1.txt”,100,3);

4000
fileReader.registerHanlder(new FileLineDataHandler());

fileReader.startRead();

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: