【Java多线程】写入同一文件,自定义线程池与线程回收利用2
2014-01-24 13:26
435 查看
原始版地址:http://my.oschina.net/u/1017195/blog/195376
起初为了方便快捷,只为实现功能,写了很多垃圾的代码. 造成性能不高,可读性,可维护性不强。
朋友们提了很多意见,我都吸取了经验,于是将代码又改动了一下。
经过测试,运行效率显著提升:
任务完成时间:30508 ms
任务完成时间:30735 ms
任务完成时间:31167 ms
起初为了方便快捷,只为实现功能,写了很多垃圾的代码. 造成性能不高,可读性,可维护性不强。
朋友们提了很多意见,我都吸取了经验,于是将代码又改动了一下。
经过测试,运行效率显著提升:
任务完成时间:30508 ms
任务完成时间:30735 ms
任务完成时间:31167 ms
package test.com.linapex.room; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileReader; import java.io.OutputStreamWriter; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import com.linapex.common.util.ZhengzeValidate; public class TBuilderRoomSqlFileTool2 { final static int BSIZE = 1024 * 1024; final static int DATACACHENUM = 10000; static int currThreadCount = 0; static int maxThreadCount = 9; static File roomFilterLogFile = new File("roomFilter.log"); static File sqlFile = new File("roomSql.sql"); static File csvFile = new File("D:\\baiduyundownload\\如家汉庭等酒店2000W开房数据\\2000W\\1-200W.csv"); final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id ,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');"; public static BufferedWriter initSQLWrite(File sqlFile) throws Exception { if (!sqlFile.exists()) { if (!sqlFile.createNewFile()) { System.err.println("创建文件失败,已存在:" + sqlFile.getAbsolutePath()); } } return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8")); } public static void loadCSV(CallBack3<Void> callBack) throws Exception { // BufferedReader reader = null; // try // { // reader = new BufferedReader(new FileReader(csvFile)); // String str = null; // // int num = 0; // while ((str = reader.readLine()) != null) // { // num++; // callBack.call(num, str); // } // } finally // { // reader.close(); // } FileChannel inChannel = null; try { String enterStr = "\n"; inChannel = new FileInputStream(csvFile).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(BSIZE); StringBuilder newlinesBui = new StringBuilder(); int num = 0; while (inChannel.read(buffer) != -1) { buffer.flip(); //数据组合. String content = new String(buffer.array()); newlinesBui.append(content).toString(); int fromIndex = 0; int endIndex = -1; //循环找到 \n while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1) { //得到一行 String line = newlinesBui.substring(fromIndex, endIndex); num++; callBack.call(num, line); fromIndex = endIndex + 1; } newlinesBui.delete(0, fromIndex); buffer.clear(); } } finally { if (inChannel != null) { inChannel.close(); } } } public static void main(String[] args) throws Exception { final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount); final List<Future<String>> threadResultList = new ArrayList<Future<String>>(); final BufferedWriter bw = initSQLWrite(sqlFile); //主要的buffer对象. final WriteSqlHandle2 writeSqlFile = new WriteSqlHandle2(DATACACHENUM); StopWatch2 stopWatch = new StopWatch2(); stopWatch.start(); loadCSV(new CallBack3<Void>() { @Override public Void call(int num, String str) { String[] strs = str.split(","); if (strs.length < 8) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return null; } String name = strs[0].trim(); if (!ZhengzeValidate.isChina(name)) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return null; } try { String card = strs[4]; String gender = strs[5]; String birthday = strs[6]; String address = strs[7]; String zip = strs[8]; String mobile = strs[20]; String email = strs[22]; String version = strs[31]; //生成sql语句 final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version); //添加数据,如果超出了缓存数据,则 开始写入文件系统 if (writeSqlFile.add(tempSql)) { currThreadCount++; //如果提交的线程过多,则取回之后再提交. if (currThreadCount >= maxThreadCount) { // System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); for (Future<String> fs : threadResultList) { String tempSqlName = fs.get(); currThreadCount--; // System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount) + " 线程返回的值:" + tempSqlName); } threadResultList.clear(); //清空 currThreadCount = threadResultList.size(); // System.out.println(String.format("重新开始提交线程 当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); } Future<String> future = threadPool.submit(new TaskWithResult(writeSqlFile, bw)); threadResultList.add(future); // System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num)); } } catch (Exception e) { writeLog("录入错误的数据::0", Arrays.toString(strs)); writeLog("错误的原因::0", e.getMessage()); } return null; } }); writeSqlFile.flush(bw); threadPool.shutdown(); stopWatch.stop(); System.out.println(String.format("任务完成时间:%s ms", stopWatch.getTime())); } public static void writeLog(String str, Object... values) { //FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false); } public static String tm(String strSource, Object... values) { if (strSource == null) { return null; } StringBuilder builder = new StringBuilder(strSource); final String prefix = ":"; for (int index = 0; index < values.length; index++) { String value = values[index].toString(); if (value == null) { continue; } String key = new StringBuilder(prefix).append(index).toString(); int i = -1; if ((i = builder.indexOf(key, i)) > -1) { int len = key.length(); builder.replace(i, i + len, value); } } return builder.toString(); } } class TaskWithResult implements Callable<String> { WriteSqlHandle2 handle2; BufferedWriter bufferedWriter; public TaskWithResult(WriteSqlHandle2 handle2, BufferedWriter bufferedWriter) { this.handle2 = handle2; this.bufferedWriter = bufferedWriter; } @Override public String call() throws Exception { String fileName = Thread.currentThread().getName(); handle2.save(bufferedWriter); return fileName; } } class WriteSqlHandle2 { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); WriteLock writeLock = readWriteLock.writeLock(); List<String> cacheList; int currItemCount = 0; int dataCacheNum; public WriteSqlHandle2() { cacheList = new ArrayList<String>(); } public WriteSqlHandle2(int dataCacheNum) { this.dataCacheNum = dataCacheNum; cacheList = new ArrayList<String>(dataCacheNum); } public boolean isCacheExpires() { return currItemCount >= dataCacheNum; } public boolean add(String sqlStr) { try { writeLock.lock(); cacheList.add(sqlStr); currItemCount++; return isCacheExpires(); } finally { writeLock.unlock(); } } public void save(BufferedWriter bw) throws Exception { try { writeLock.lock(); //如果数据没有超出缓存.则返回. if (!isCacheExpires()) { return; } StopWatch2 stopWatch = new StopWatch2(); stopWatch.start(); // System.out.println(String.format("%s,准备消费 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); currItemCount--; } stopWatch.stop(); System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), stopWatch.getTime(), cacheList.size())); cacheList.clear(); //清空数据. } finally { writeLock.unlock(); } } public void flush(BufferedWriter bw) throws Exception { System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); } System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size())); cacheList.clear(); //清空数据 closeWrite(bw); } private void closeWrite(BufferedWriter bw) throws Exception { bw.flush(); bw.close(); } } class StopWatch2 { long begin; long end; public void start() { begin = System.currentTimeMillis(); } public void stop() { end = System.currentTimeMillis(); } public long getTime() { return end - begin; } } interface CallBack3<T> { T call(int num, String str); }
相关文章推荐
- 【Java多线程】写入同一文件,自定义线程池与线程回收利用
- 【Java多线程】写入同一文件,自定义线程池与线程回收利用
- 用线程池实现多线程向同一个文件写入数据
- Java 多线程 自定义线程辅助
- java.util.concurrent解读,自定义线程工厂,线程池
- 黑马程序员-------------多线程中的(线程、线程组、线程池、以及Java的设计模式)概念及方法的总结
- JAVA笔记14__多线程共享数据(同步)/ 线程死锁 / 生产者与消费者应用案例 / 线程池
- java 多线程读取多个文件 和 不用线程读取多个文件
- JAVA多线程之两个线程同时写一个文件
- 【Java基础】Java多线程之线程组和线程池
- java多线程之线程组与线程池
- 多线程爬虫Java调用wget下载文件,独立线程读取输出缓冲区
- Java多线程查找指定文件夹下包含指定关键字的文件数量(未使用线程池版)
- java线程安全之Executor框架及自定义线程池(十五)
- java读取系统Properties配置文件利用线程实时监控配置文件变化
- Java 多线程 自定义线程辅助
- java多线程写入同一文件
- Java 多线程 Executor 线程池 从线程返回结果 Java编程思想读书笔记
- Java向自定义文件夹中写入文件
- Java:多线程,使用同步锁(Lock)时利用Condition类实现线程间通信