使用java.util.concurrent包多线程处理数据
2014-06-26 19:52
155 查看
先说说需求:
从某一个网站抓取数据(数据量非常大,每一次请求得到的结果不同),然后把抓取到的放在多线程多列里面,然后在把队列里面的数据放在mysql数据库里面。
数据来源的入口:
package com.test.monitor;
import java.io.IOException;
import java.net.MalformedURLException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.jsoup.Jsoup;
import utils.PropertiesUtils;
import com.test.DBUtil.ThreadPoolManager;
import com.test.commons.util.HelperUtil;
/**
* <p>
* Company: test
* </p>
*
* @author tester
* @date 2013年6月13日
* @version 1.0
*/
public class GetLogFromURL {
ThreadPoolManager tpm = ThreadPoolManager.newInstance();
static String result = "";
/**
* 启动监听线程,获取url上的数据
*/
public void threadStart() {
Thread t = new Thread(new Runnable() {
public void run() {
while (true) {
try {
String url = setNgxURL();
String json = null;
try {
json = Jsoup.connect(url).timeout(500)
.ignoreContentType(true).execute().body();
if (json != null && "[]".equals(json)) {// 如果内容取完,暂停一会
return;
}
result = json;
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}// 休眠100毫秒
tpm.addLogMsg(json);//将数据放在队列里面,后面会有线程动态监听队列,如果有数据做处理,没有数据就暂停
// 处理异常
} catch (MalformedURLException e) {
System.err.println(e);
} catch (IOException e) {
System.err.println(e);
}
}
}
});
t.start();
}
public String setNgxURL(){
String url = "http://monitor.51zhangdan.com/monitor/manage/proxy/ngx.do";
return url;
}
public static void main(String[] args) {
new GetLogFromURL().threadStart();
}
}
package com.test.DBUtil;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.test.monitor.InsertDataIntoDB;
import com.test.monitor.ParserJSON;
/**
* <p>Company:***</p>
* @author ***
* @date 2013年6月13日
* @version 1.0
*/
public class ThreadPoolManager {
private static ThreadPoolManager tpm = new ThreadPoolManager();
// 线程池维护线程的最少数量
private final static int CORE_POOL_SIZE = 4;
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 50;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 0;
// 线程池所使用的缓冲队列大小
private final static int WORK_QUEUE_SIZE = 50;
// 原始日志消息缓冲队列
Queue<String> msgQueue = new LinkedList<String>();
// 访问原始日志消息缓存的调度线程
final Runnable dataBufferThread = new Runnable() {
public void run() {
// 查看是否有待定请求,如果有,则创建一个新的WriteDataThread,并添加到线程池中
if (hasMoreAcquire()) {
String msg = (String) msgQueue.poll();
Runnable task = new ParserJSON(msg); //此处有一个线程监听,
threadPool.execute(task);
}
}
};
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// System.out.println(((ParserJSON) r).getMsg() + "消息放入队列中重新等待执行");
msgQueue.offer(((ParserJSON) r).getMsg());
}
};
// 重要日志消息缓冲队列
Queue<List<Object>> importantMsgQueue = new LinkedList<List<Object>>();
// 访问原始日志消息缓存的调度线程
final Runnable importantDataBufferThread = new Runnable() {
public void run() {
// 查看是否有待定请求,如果有,则创建一个新的InsertDataIntoDBThread,并添加到线程池中
if (hasMoreImportantAcquire()) {
List<Object> msg = (List<Object>) importantMsgQueue.poll();
Runnable task = new InsertDataIntoDB(msg);
importantThreadPool.execute(task);
}
}
};
final RejectedExecutionHandler importantHandler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// System.out.println(((InsertDataIntoDBThread) r).getMsg() + "消息放入队列中重新等待执行");
importantMsgQueue.offer(((InsertDataIntoDB) r).getMsg());
}
};
// 管理原始日志数据访问的线程池
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.handler);
// 管理重要日志数据访问的线程池
final ThreadPoolExecutor importantThreadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.importantHandler);
// 原始日志信息调度线程池
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
final ScheduledFuture<?> taskHandler = scheduler.scheduleAtFixedRate(
dataBufferThread, 0, 1, TimeUnit.MILLISECONDS);
// 重要日志信息调度线程池
final ScheduledExecutorService importantScheduler = Executors.newScheduledThreadPool(10);
final ScheduledFuture<?> importantTaskHander = importantScheduler.scheduleAtFixedRate(
importantDataBufferThread, 0, 1, TimeUnit.MILLISECONDS);
public static ThreadPoolManager newInstance() {
return tpm;
}
private ThreadPoolManager() {
}
private boolean hasMoreAcquire() {
return !msgQueue.isEmpty();
}
private boolean hasMoreImportantAcquire() {
return !importantMsgQueue.isEmpty();
}
/**
* 处理原始日志信息
* @param msg
*/
public void addLogMsg(String msg) {
Runnable task = new ParserJSON(msg);
threadPool.execute(task);//只要有数据,线程池中的线程就会执行
}
/**
* 原始日志被解析后的部分重要日志信息
* @param importantLogMsg
*/
public void addPartLogMsg(List<Object> importantLogMsg){
Runnable task = new InsertDataIntoDB(importantLogMsg);
importantThreadPool.execute(task);
}
}
从某一个网站抓取数据(数据量非常大,每一次请求得到的结果不同),然后把抓取到的放在多线程多列里面,然后在把队列里面的数据放在mysql数据库里面。
数据来源的入口:
package com.test.monitor;
import java.io.IOException;
import java.net.MalformedURLException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.jsoup.Jsoup;
import utils.PropertiesUtils;
import com.test.DBUtil.ThreadPoolManager;
import com.test.commons.util.HelperUtil;
/**
* <p>
* Company: test
* </p>
*
* @author tester
* @date 2013年6月13日
* @version 1.0
*/
public class GetLogFromURL {
ThreadPoolManager tpm = ThreadPoolManager.newInstance();
static String result = "";
/**
* 启动监听线程,获取url上的数据
*/
public void threadStart() {
Thread t = new Thread(new Runnable() {
public void run() {
while (true) {
try {
String url = setNgxURL();
String json = null;
try {
json = Jsoup.connect(url).timeout(500)
.ignoreContentType(true).execute().body();
if (json != null && "[]".equals(json)) {// 如果内容取完,暂停一会
return;
}
result = json;
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}// 休眠100毫秒
tpm.addLogMsg(json);//将数据放在队列里面,后面会有线程动态监听队列,如果有数据做处理,没有数据就暂停
// 处理异常
} catch (MalformedURLException e) {
System.err.println(e);
} catch (IOException e) {
System.err.println(e);
}
}
}
});
t.start();
}
public String setNgxURL(){
String url = "http://monitor.51zhangdan.com/monitor/manage/proxy/ngx.do";
return url;
}
public static void main(String[] args) {
new GetLogFromURL().threadStart();
}
}
package com.test.DBUtil;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.test.monitor.InsertDataIntoDB;
import com.test.monitor.ParserJSON;
/**
* <p>Company:***</p>
* @author ***
* @date 2013年6月13日
* @version 1.0
*/
public class ThreadPoolManager {
private static ThreadPoolManager tpm = new ThreadPoolManager();
// 线程池维护线程的最少数量
private final static int CORE_POOL_SIZE = 4;
// 线程池维护线程的最大数量
private final static int MAX_POOL_SIZE = 50;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 0;
// 线程池所使用的缓冲队列大小
private final static int WORK_QUEUE_SIZE = 50;
// 原始日志消息缓冲队列
Queue<String> msgQueue = new LinkedList<String>();
// 访问原始日志消息缓存的调度线程
final Runnable dataBufferThread = new Runnable() {
public void run() {
// 查看是否有待定请求,如果有,则创建一个新的WriteDataThread,并添加到线程池中
if (hasMoreAcquire()) {
String msg = (String) msgQueue.poll();
Runnable task = new ParserJSON(msg); //此处有一个线程监听,
threadPool.execute(task);
}
}
};
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// System.out.println(((ParserJSON) r).getMsg() + "消息放入队列中重新等待执行");
msgQueue.offer(((ParserJSON) r).getMsg());
}
};
// 重要日志消息缓冲队列
Queue<List<Object>> importantMsgQueue = new LinkedList<List<Object>>();
// 访问原始日志消息缓存的调度线程
final Runnable importantDataBufferThread = new Runnable() {
public void run() {
// 查看是否有待定请求,如果有,则创建一个新的InsertDataIntoDBThread,并添加到线程池中
if (hasMoreImportantAcquire()) {
List<Object> msg = (List<Object>) importantMsgQueue.poll();
Runnable task = new InsertDataIntoDB(msg);
importantThreadPool.execute(task);
}
}
};
final RejectedExecutionHandler importantHandler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// System.out.println(((InsertDataIntoDBThread) r).getMsg() + "消息放入队列中重新等待执行");
importantMsgQueue.offer(((InsertDataIntoDB) r).getMsg());
}
};
// 管理原始日志数据访问的线程池
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.handler);
// 管理重要日志数据访问的线程池
final ThreadPoolExecutor importantThreadPool = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.importantHandler);
// 原始日志信息调度线程池
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
final ScheduledFuture<?> taskHandler = scheduler.scheduleAtFixedRate(
dataBufferThread, 0, 1, TimeUnit.MILLISECONDS);
// 重要日志信息调度线程池
final ScheduledExecutorService importantScheduler = Executors.newScheduledThreadPool(10);
final ScheduledFuture<?> importantTaskHander = importantScheduler.scheduleAtFixedRate(
importantDataBufferThread, 0, 1, TimeUnit.MILLISECONDS);
public static ThreadPoolManager newInstance() {
return tpm;
}
private ThreadPoolManager() {
}
private boolean hasMoreAcquire() {
return !msgQueue.isEmpty();
}
private boolean hasMoreImportantAcquire() {
return !importantMsgQueue.isEmpty();
}
/**
* 处理原始日志信息
* @param msg
*/
public void addLogMsg(String msg) {
Runnable task = new ParserJSON(msg);
threadPool.execute(task);//只要有数据,线程池中的线程就会执行
}
/**
* 原始日志被解析后的部分重要日志信息
* @param importantLogMsg
*/
public void addPartLogMsg(List<Object> importantLogMsg){
Runnable task = new InsertDataIntoDB(importantLogMsg);
importantThreadPool.execute(task);
}
}
相关文章推荐
- 使用OWC时处理有间断的数据的方法
- 使用Hibernate处理数据
- SQL SERVER2000教程-第五章 处理数据 第七节 使用COMPUTE和COMPUTE BY对数据进行汇总
- SQL SERVER2000教程-第五章 处理数据 第十九节 使用DECLARE 语句进行数据统计
- SQL SERVER2000教程-第五章 处理数据 第十七节 使用CASE函数格式进行条件查询
- 使用XMLBeans处理XML数据和文档入门 作者:肖菁 (dev2dev ID: powerise) 湖南省长沙铁道学院科创计算机系统集成有限公司软件中心软件工程师
- 使用Hibernate处理数据
- 跟我一起学Windows Workflow Foundation(3)-----使用If/Else活动,定制活动处理工作流,使用事件传递数据
- AE中使用Geoprocessor 的clip处理输出矢量数据
- SQL SERVER2000教程-第五章 处理数据 第二十一节 使用CASE语句、SUM函数、AVG函数进行综合数据统计
- 使用Hibernate处理数据
- 使用XMLBeans处理XML数据和文档入门
- 使用数据2分处理的通用分页存储过程 前半部分与后半部分数据访问时间相同
- 使用ADO.NET 和C# 处理BLOB 数据
- 使用Hibernate处理数据(Hibernate Your Data)(英语原文)
- 使用用VB处理MYSQL数据库中二进制数据问题
- SQL SERVER2000教程-第五章 处理数据 第十六节 使用CHARINDEX函数代替Like进行数据查询
- 跟我一起学Windows Workflow Foundation(3)-----使用If/Else活动,定制活动处理工作流,使用事件传递数据
- SQL SERVER2000教程-第五章 处理数据 第十五节 使用TRUNCATE TABLE快速删除表中的所有数据
- SQL SERVER2000教程-第五章 处理数据 第二十四节 使用table数据类型变量获得临时表