您的位置:首页 > 编程语言 > Java开发

【Java多线程】写入同一文件,自定义线程池与线程回收利用2

2014-01-24 13:26 435 查看
原始版地址:http://my.oschina.net/u/1017195/blog/195376

起初为了方便快捷,只为实现功能,写了很多垃圾的代码. 造成性能不高,可读性,可维护性不强。

朋友们提了很多意见,我都吸取了经验,于是将代码又改动了一下。

经过测试,运行效率显著提升:

任务完成时间:30508 ms

任务完成时间:30735 ms

任务完成时间:31167 ms

package test.com.linapex.room;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import com.linapex.common.util.ZhengzeValidate;

public class TBuilderRoomSqlFileTool2
{
final static int BSIZE = 1024 * 1024;
final static int DATACACHENUM = 10000;

static int currThreadCount = 0;
static int maxThreadCount = 9;

static File roomFilterLogFile = new File("roomFilter.log");
static File sqlFile = new File("roomSql.sql");
static File csvFile = new File("D:\\baiduyundownload\\如家汉庭等酒店2000W开房数据\\2000W\\1-200W.csv");

final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id ,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";

public static BufferedWriter initSQLWrite(File sqlFile) throws Exception
{
if (!sqlFile.exists())
{
if (!sqlFile.createNewFile())
{
System.err.println("创建文件失败,已存在:" + sqlFile.getAbsolutePath());
}
}

return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8"));
}

public static void loadCSV(CallBack3<Void> callBack) throws Exception
{
//		BufferedReader reader = null;
//		try
//		{
//			reader = new BufferedReader(new FileReader(csvFile));
//			String str = null;
//
//			int num = 0;
//			while ((str = reader.readLine()) != null)
//			{
//				num++;
//				callBack.call(num, str);
//			}
//		} finally
//		{
//			reader.close();
//		}

FileChannel inChannel = null;
try
{
String enterStr = "\n";

inChannel = new FileInputStream(csvFile).getChannel();

ByteBuffer buffer = ByteBuffer.allocate(BSIZE);

StringBuilder newlinesBui = new StringBuilder();
int num = 0;
while (inChannel.read(buffer) != -1)
{
buffer.flip();

//数据组合.
String content = new String(buffer.array());
newlinesBui.append(content).toString();

int fromIndex = 0;
int endIndex = -1;
//循环找到 \n
while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1)
{
//得到一行
String line = newlinesBui.substring(fromIndex, endIndex);

num++;
callBack.call(num, line);

fromIndex = endIndex + 1;
}

newlinesBui.delete(0, fromIndex);
buffer.clear();
}

} finally
{
if (inChannel != null)
{
inChannel.close();
}
}

}

public static void main(String[] args) throws Exception
{
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount);

final List<Future<String>> threadResultList = new ArrayList<Future<String>>();

final BufferedWriter bw = initSQLWrite(sqlFile); //主要的buffer对象.

final WriteSqlHandle2 writeSqlFile = new WriteSqlHandle2(DATACACHENUM);

StopWatch2 stopWatch = new StopWatch2();
stopWatch.start();

loadCSV(new CallBack3<Void>()
{

@Override
public Void call(int num, String str)
{
String[] strs = str.split(",");

if (strs.length < 8)
{
writeLog("此条数据不录入::0", Arrays.toString(strs));
return null;
}

String name = strs[0].trim();
if (!ZhengzeValidate.isChina(name))
{
writeLog("此条数据不录入::0", Arrays.toString(strs));
return null;
}

try
{
String card = strs[4];
String gender = strs[5];
String birthday = strs[6];
String address = strs[7];
String zip = strs[8];
String mobile = strs[20];
String email = strs[22];
String version = strs[31];

//生成sql语句
final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version);

//添加数据,如果超出了缓存数据,则 开始写入文件系统
if (writeSqlFile.add(tempSql))
{
currThreadCount++;

//如果提交的线程过多,则取回之后再提交.
if (currThreadCount >= maxThreadCount)
{
//							System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
for (Future<String> fs : threadResultList)
{
String tempSqlName = fs.get();

currThreadCount--;
//								System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount) + "  线程返回的值:" + tempSqlName);
}

threadResultList.clear(); //清空
currThreadCount = threadResultList.size();
//							System.out.println(String.format("重新开始提交线程   当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
}

Future<String> future = threadPool.submit(new TaskWithResult(writeSqlFile, bw));

threadResultList.add(future);
//						System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num));
}

} catch (Exception e)
{
writeLog("录入错误的数据::0", Arrays.toString(strs));
writeLog("错误的原因::0", e.getMessage());
}
return null;
}
});

writeSqlFile.flush(bw);

threadPool.shutdown();

stopWatch.stop();

System.out.println(String.format("任务完成时间:%s ms", stopWatch.getTime()));

}

public static void writeLog(String str, Object... values)
{
//FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false);
}

public static String tm(String strSource, Object... values)
{
if (strSource == null)
{
return null;
}

StringBuilder builder = new StringBuilder(strSource);

final String prefix = ":";
for (int index = 0; index < values.length; index++)
{
String value = values[index].toString();
if (value == null)
{
continue;
}

String key = new StringBuilder(prefix).append(index).toString();

int i = -1;
if ((i = builder.indexOf(key, i)) > -1)
{
int len = key.length();
builder.replace(i, i + len, value);
}
}

return builder.toString();
}

}

class TaskWithResult implements Callable<String>
{
WriteSqlHandle2 handle2;

BufferedWriter bufferedWriter;

public TaskWithResult(WriteSqlHandle2 handle2, BufferedWriter bufferedWriter)
{
this.handle2 = handle2;
this.bufferedWriter = bufferedWriter;
}

@Override
public String call() throws Exception
{
String fileName = Thread.currentThread().getName();

handle2.save(bufferedWriter);

return fileName;
}

}

class WriteSqlHandle2
{
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

WriteLock writeLock = readWriteLock.writeLock();

List<String> cacheList;

int currItemCount = 0;

int dataCacheNum;

public WriteSqlHandle2()
{
cacheList = new ArrayList<String>();
}

public WriteSqlHandle2(int dataCacheNum)
{
this.dataCacheNum = dataCacheNum;
cacheList = new ArrayList<String>(dataCacheNum);
}

public boolean isCacheExpires()
{
return currItemCount >= dataCacheNum;
}

public boolean add(String sqlStr)
{
try
{
writeLock.lock();
cacheList.add(sqlStr);
currItemCount++;
return isCacheExpires();
} finally
{
writeLock.unlock();
}
}

public void save(BufferedWriter bw) throws Exception
{
try
{
writeLock.lock();

//如果数据没有超出缓存.则返回.
if (!isCacheExpires())
{
return;
}

StopWatch2 stopWatch = new StopWatch2();
stopWatch.start();

//			System.out.println(String.format("%s,准备消费   需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

for (String str : cacheList)
{
bw.write(str + "\r\n");
currItemCount--;
}

stopWatch.stop();

System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), stopWatch.getTime(), cacheList.size()));

cacheList.clear(); //清空数据.
} finally
{
writeLock.unlock();
}
}

public void flush(BufferedWriter bw) throws Exception
{
System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

for (String str : cacheList)
{
bw.write(str + "\r\n");
}

System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size()));

cacheList.clear(); //清空数据

closeWrite(bw);
}

private void closeWrite(BufferedWriter bw) throws Exception
{
bw.flush();
bw.close();
}

}

class StopWatch2
{
long begin;
long end;

public void start()
{
begin = System.currentTimeMillis();
}

public void stop()
{
end = System.currentTimeMillis();
}

public long getTime()
{
return end - begin;
}
}

interface CallBack3<T>
{
T call(int num, String str);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: