您的位置:首页 > 数据库

多线程实现数据库的并发操作

2015-08-04 13:36 375 查看
  在Java中,程序要操作数据库,首先要获得数据库的Connection对象。利用多线程把数据导入数据库可以加快处理速度,但是多个线程共享同一个Connection对象是不安全的,因此可以利用Java中的ThreadLocal为每个线程保存一个Connection对象,代码如下:

package com.quar.innovation.db;

import java.sql.Connection;
import java.sql.DriverManager;

/**
 * Hands out JDBC connections, caching one connection per calling thread.
 *
 * <p>The cache is a {@link ThreadLocal}: a thread that asks repeatedly receives the same
 * {@link Connection} until that connection is reported closed, at which point a new one is
 * opened and cached in its place.
 */
public class ConnnectionManager {

    /** JDBC URL (host, schema, encoding options and credentials) used for every new connection. */
    private static final String BETADBURL = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&user=root&password=root";

    /** Per-thread slot holding the connection most recently opened for that thread. */
    private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>();

    /**
     * Returns the calling thread's cached connection, opening and caching a new one when the
     * thread has none yet or the cached one is closed.
     *
     * @return the thread's connection; {@code null} when a new connection could not be opened
     *         or when checking the cached connection failed (the error is printed to stdout)
     */
    public static Connection getConnectionFromThreadLocal() {
        Connection cached = connectionHolder.get();
        try {
            // Fast path: the thread already owns a usable connection.
            if (cached != null && !cached.isClosed()) {
                return cached;
            }
            Connection fresh = ConnnectionManager.getConnection();
            connectionHolder.set(fresh);
            System.out.println("[Thread]" + Thread.currentThread().getName());
            return fresh;
        } catch (Exception e) {
            System.out.println("[ThreadLocal Get Connection Error]" + e.getMessage());
            return null;
        }
    }

    /**
     * Loads the MySQL driver class and opens a brand-new connection to {@code BETADBURL}.
     *
     * @return the new connection, or {@code null} when the driver cannot be loaded or the
     *         connection attempt fails (the error is printed to stdout)
     */
    public static Connection getConnection() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            return DriverManager.getConnection(BETADBURL);
        } catch (Exception e) {
            System.out.println("[Get Connection Error]" + e.getMessage());
            return null;
        }
    }
}


  通过ThreadLocal就可以为每个线程保留一份Connection对象,利用Java的ThreadPoolExecutor启动线程池,完成数据库操作,完整代码如下:

/**
 * A {@link ThreadPoolExecutor} that times every task it runs and, when the pool terminates,
 * prints how many tasks finished and their average duration in milliseconds.
 */
public class QunarThreadPoolExecutor extends ThreadPoolExecutor {

    // Start time (ms) of the task currently running on each worker thread.
    private final ThreadLocal<Long> start = new ThreadLocal<Long>();

    // Sum of the durations (ms) of all finished tasks.
    private final AtomicLong totals = new AtomicLong();

    // Number of tasks the pool has finished.
    private final AtomicInteger tasks = new AtomicInteger();

    public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Called on the worker thread just before a task's {@code run} method; records the
     * task's start time for that thread.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        start.set(System.currentTimeMillis());
    }

    /**
     * Called on the worker thread after a task's {@code run} method returns or throws;
     * counts the task and adds its elapsed time to the running total.
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        tasks.incrementAndGet();
        totals.addAndGet(System.currentTimeMillis() - start.get());
    }

    /**
     * Called once when the pool has fully terminated; prints the task count and the average
     * task duration.
     */
    @Override
    protected void terminated() {
        super.terminated();
        int finished = tasks.get();
        // A pool can terminate without ever running a task (e.g. shut down right after
        // creation); report an average of 0 instead of dividing by zero.
        long average = finished == 0 ? 0L : totals.get() / finished;
        System.out.println("完成"+ finished +"个任务,平均耗时: [" + average + "] ms");
    }
}

/**
 * Task that writes one batch of {@link UserProfileItem}s into the {@code userprofile} table
 * using the calling thread's connection from {@code ConnnectionManager}.
 *
 * <p>Each item becomes one "insert ... on duplicate key update" row in a single JDBC batch.
 * Items whose uid or profile is null or empty are skipped. Failures are reported on stderr
 * and never propagate out of {@link #run()}.
 */
public class DataUpdater implements Runnable {

    private static final String SQL = "insert into userprofile (`uid` ,`profile` , `logday`) VALUES (?, ? ,?) ON DUPLICATE KEY UPDATE `profile`= ? ";

    private final List<UserProfileItem> userProfiles;

    /**
     * @param userProfiles the items to write; a null or empty list makes {@link #run()} a no-op
     */
    public DataUpdater(List<UserProfileItem> userProfiles) {
        this.userProfiles = userProfiles;
    }

    /** Sends the whole batch to the database in one {@code executeBatch} call. */
    public void run() {
        // Nothing to write: do not touch the database at all.
        if (userProfiles == null || userProfiles.isEmpty()) {
            return;
        }
        // The statement is kept local (it used to be an instance field) so that running the
        // same DataUpdater on two threads cannot make them overwrite each other's statement.
        PreparedStatement pst = null;
        try {
            // Fully qualified so the block needs no import beyond the ones it already used.
            java.sql.Connection conn = ConnnectionManager.getConnectionFromThreadLocal();
            if (conn == null) {
                // The connection manager returns null on failure; say so instead of
                // surfacing a NullPointerException with a null message.
                System.err.println("[SQL ERROR MESSAGE]no database connection available");
                return;
            }
            pst = conn.prepareStatement(SQL);
            for (UserProfileItem userProfile : userProfiles) {
                String uid = userProfile.getUid();
                String profile = userProfile.getProfile();
                if (uid == null || uid.isEmpty() || profile == null || profile.isEmpty()) {
                    continue;
                }
                pst.setString(1, uid);
                pst.setString(2, profile);
                pst.setInt(3, userProfile.getLogday());
                // Fourth placeholder is the profile value used by the UPDATE branch.
                pst.setString(4, profile);
                pst.addBatch();
            }
            pst.executeBatch();
        } catch (Exception e) {
            System.err.println("[SQL ERROR MESSAGE]" + e.getMessage());
        } finally {
            close(pst);
        }
    }

    /**
     * Closes the given statement, reporting (not throwing) any close failure.
     *
     * @param pst the statement to close; may be null
     */
    public void close(PreparedStatement pst) {
        if (pst != null) {
            try {
                pst.close();
            } catch (SQLException e) {
                System.err.println("[Close Statement Error]" + e.getMessage());
            }
        }
    }
}

/**
 * Immutable value holder for one user-profile record: a user id, the profile text and the
 * log day it belongs to.
 */
public class UserProfileItem {

    private final String uid;
    private final String profile;
    private final int logday;

    /**
     * @param uid     the user id
     * @param profile the profile text
     * @param logday  the log day as an integer
     */
    public UserProfileItem(String uid, String profile , int logday) {
        this.uid = uid;
        this.profile = profile;
        this.logday = logday;
    }

    /** @return the user id passed to the constructor */
    public String getUid() {
        return this.uid;
    }

    /** @return the profile text passed to the constructor */
    public String getProfile() {
        return this.profile;
    }

    /** @return the log day passed to the constructor */
    public int getLogday() {
        return this.logday;
    }
}

/**
 * Command-line driver that reads a tab-separated file of {@code uid<TAB>profile} lines and
 * writes them to the database in batches, each batch running as a {@link DataUpdater} task
 * on a {@link QunarThreadPoolExecutor}.
 *
 * <p>An instance is single-use: {@link #update(String, int)} shuts the pool down when it
 * finishes, so a second call on the same instance cannot submit new work.
 */
public class DataUpdaterMain {

    /** Number of items handed to one {@link DataUpdater} task. */
    private static final int BATCH_SIZE = 100;

    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

    // NOTE(review): the queue is unbounded, and ThreadPoolExecutor only grows past the core
    // size when the queue rejects a task, so the maximum of 8 threads likely never applies.
    private final QunarThreadPoolExecutor qunarThreadPoolExecutor = new QunarThreadPoolExecutor(5, 8, 5, TimeUnit.MINUTES, queue);

    /**
     * Shuts the pool down and waits up to 20 minutes for queued tasks to finish, forcing a
     * stop if they do not.
     *
     * @param executor the pool to stop; may be null
     */
    public void shutThreadPool(ThreadPoolExecutor executor) {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(20 , TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // We were asked to stop waiting: cancel outstanding work and keep the
            // interrupt visible to our caller instead of silently swallowing it.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            System.err.println("[ThreadPool Close Error]" + e.getMessage());
        } catch (Exception e) {
            System.err.println("[ThreadPool Close Error]" + e.getMessage());
        }
    }

    /**
     * Closes the reader, reporting (not throwing) any I/O failure.
     *
     * @param reader the reader to close; may be null
     */
    public void close(Reader reader) {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                System.err.println("[Close Io Error]" + e.getMessage());
            }
        }
    }

    /**
     * Closes the statement and then the connection, reporting (not throwing) failures.
     * Each resource is closed independently so a failure on one does not leak the other.
     *
     * @param conn the connection to close; may be null
     * @param st   the statement to close; may be null
     */
    public void closeConnection(Connection conn , Statement st) {
        // Statement first: it belongs to the connection. (The original closed the
        // connection twice and never closed the statement.)
        try {
            if (st != null) {
                st.close();
            }
        } catch (Exception e) {
            System.err.println("[Close MySQL Error]" + e.getMessage());
        }
        try {
            if (conn != null) {
                conn.close();
            }
        } catch (Exception e) {
            System.err.println("[Close MySQL Error]" + e.getMessage());
        }
    }

    /**
     * Reads {@code file} line by line, turns every well-formed line (exactly two tab-separated
     * fields: uid and profile) into a {@link UserProfileItem}, and submits the items to the
     * pool in batches of {@code BATCH_SIZE}. Malformed lines are reported on stderr and skipped.
     * The pool is shut down and awaited before this method returns.
     *
     * @param file   path of the input file
     * @param logday log day stored with every item
     * @return {@code true} when the file was read and all batches were submitted;
     *         {@code false} when reading or submitting failed
     */
    public boolean update(String file ,int logday) {
        long start = System.currentTimeMillis();
        BufferedReader br = null;
        int num = 0;
        try {
            // NOTE(review): decoded with the platform default charset; confirm that matches
            // the input file's encoding.
            br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
            String line = null;
            List<UserProfileItem> userProfiles = new LinkedList<UserProfileItem>();
            while ((line = br.readLine()) != null) {
                ++num;
                String []items = line.split("\t");
                if (items.length != 2) {
                    System.err.println("[Data Error]" + line);
                    continue;
                }
                userProfiles.add(new UserProfileItem(items[0], items[1], logday));
                if (userProfiles.size() >= BATCH_SIZE) {
                    qunarThreadPoolExecutor.execute(new DataUpdater(userProfiles));
                    // Start a fresh list; the submitted one now belongs to the task.
                    userProfiles = new LinkedList<UserProfileItem>();
                }
            }
            // Flush whatever is left over (fewer than BATCH_SIZE items, possibly none).
            qunarThreadPoolExecutor.execute(new DataUpdater(userProfiles));
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("[Read File Error]" + e.getMessage());
            return false;
        } finally {
            close(br);
            // Wait for the workers before reporting, so the elapsed time covers the
            // database writes and not just reading the file and queueing the batches.
            shutThreadPool(qunarThreadPoolExecutor);
            System.err.println("[Update] take time " + (System.currentTimeMillis() - start) + ".ms");
            System.err.println("[Update] update item " + num);
        }
        return true;
    }

    /**
     * Entry point.
     *
     * @param args optional: {@code args[0]} is the input file path and {@code args[1]} the
     *             log day; when absent the original built-in defaults are used
     */
    public static void main(String []args) {
        String file = args.length > 0 ? args[0] : "D:\\workspaces\\promotionwordData.log";
        int logday = Integer.parseInt(args.length > 1 ? args[1] : "20150606");
        DataUpdaterMain dataUpdaterMain = new DataUpdaterMain();
        dataUpdaterMain.update(file, logday);
    }
}


  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: