您的位置:首页 > 编程语言 > Java开发

使用java.util.concurrent包多线程处理数据

2014-06-26 19:52 155 查看
先说说需求:

从某一个网站抓取数据(数据量非常大,每一次请求得到的结果不同),然后把抓取到的数据放在多线程队列里面,然后再把队列里面的数据存入MySQL数据库。

数据来源的入口:

package com.test.monitor;

import java.io.IOException;

import java.net.MalformedURLException;

import org.json.JSONArray;

import org.json.JSONObject;

import org.jsoup.Jsoup;

import utils.PropertiesUtils;

import com.test.DBUtil.ThreadPoolManager;

import com.test.commons.util.HelperUtil;

/**
 * Polls a monitoring URL for JSON log data on a background thread and hands
 * every non-empty payload to the shared {@link ThreadPoolManager}.
 *
 * <p>Company: test</p>
 *
 * @author tester
 * @date 2013-06-13
 * @version 1.0
 */
public class GetLogFromURL {

    /** Pause after a payload has been handed over, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /** Pause after an empty payload or a failed request, in milliseconds. */
    private static final long IDLE_INTERVAL_MILLIS = 1000L;

    /** Literal body the endpoint is compared against to detect "no data". */
    private static final String EMPTY_PAYLOAD = "[]";

    ThreadPoolManager tpm = ThreadPoolManager.newInstance();

    /** Last non-empty payload fetched; volatile because the poller thread writes it. */
    static volatile String result = "";

    /**
     * Starts the polling thread that fetches data from the URL.
     *
     * <p>The thread keeps running until it is interrupted.</p>
     */
    public void threadStart() {
        Thread poller = new Thread(new Runnable() {
            @Override
            public void run() {
                pollUntilInterrupted();
            }
        });
        poller.start();
    }

    /**
     * Fetches the URL in a loop, forwarding each non-empty payload to
     * {@link ThreadPoolManager#addLogMsg(String)} and sleeping between requests.
     */
    private void pollUntilInterrupted() {
        while (!Thread.currentThread().isInterrupted()) {
            long pauseMillis = POLL_INTERVAL_MILLIS;
            try {
                String json = Jsoup.connect(setNgxURL()).timeout(500)
                        .ignoreContentType(true).execute().body();
                if (EMPTY_PAYLOAD.equals(json)) {
                    // Nothing to process right now: wait longer instead of
                    // terminating the poller, so later data is still picked up.
                    pauseMillis = IDLE_INTERVAL_MILLIS;
                } else {
                    result = json;
                    tpm.addLogMsg(json);
                }
            } catch (IOException e) {
                // Also covers MalformedURLException (a subclass). Back off so a
                // failing endpoint does not turn this loop into a busy spin.
                System.err.println(e);
                pauseMillis = IDLE_INTERVAL_MILLIS;
            }

            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and stop polling.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Returns the URL that is polled for log data.
     *
     * @return the monitoring endpoint URL
     */
    public String setNgxURL() {
        return "http://monitor.51zhangdan.com/monitor/manage/proxy/ngx.do";
    }

    /**
     * Entry point: creates a poller and starts its background thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new GetLogFromURL().threadStart();
    }
}

package com.test.DBUtil;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.test.monitor.InsertDataIntoDB;
import com.test.monitor.ParserJSON;

/**
 * Singleton that runs two bounded thread pools: one executing
 * {@link ParserJSON} tasks for raw log messages and one executing
 * {@link InsertDataIntoDB} tasks for the extracted "important" messages.
 *
 * <p>When a pool rejects a task, the task's message is parked in an overflow
 * queue. A scheduled task per pool runs every millisecond, takes one parked
 * message and resubmits it.</p>
 *
 * <p>The overflow queues are touched by the threads calling
 * {@link #addLogMsg(String)} / {@link #addPartLogMsg(List)} (through the
 * rejection handlers) and by the scheduler threads, so they are
 * {@link java.util.concurrent.ConcurrentLinkedQueue}s.</p>
 *
 * <p>Company:***</p>
 *
 * @author ***
 * @date 2013-06-13
 * @version 1.0
 */
public class ThreadPoolManager {

    /** Minimum number of threads kept in each pool. */
    private static final int CORE_POOL_SIZE = 4;

    /** Maximum number of threads in each pool. */
    private static final int MAX_POOL_SIZE = 50;

    /** Idle time, in seconds, allowed for threads above the core size. */
    private static final int KEEP_ALIVE_TIME = 0;

    /** Capacity of each pool's bounded work queue. */
    private static final int WORK_QUEUE_SIZE = 50;

    private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();

    // NOTE: the instance fields below are initialized in declaration order.
    // Each handler must precede the pool that receives it, and each scheduler
    // must come after the queue and pool its task uses.

    /** Overflow queue of raw log messages whose task was rejected. */
    Queue<String> msgQueue = new ConcurrentLinkedQueue<String>();

    /** Scheduled task: resubmits one parked raw log message, if any. */
    final Runnable dataBufferThread = new Runnable() {
        @Override
        public void run() {
            // poll() is a single atomic check-and-remove; null means empty.
            String msg = msgQueue.poll();
            if (msg != null) {
                threadPool.execute(new ParserJSON(msg));
            }
        }
    };

    /** Parks the message of a rejected {@link ParserJSON} task for a later retry. */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            msgQueue.offer(((ParserJSON) r).getMsg());
        }
    };

    /** Overflow queue of important log messages whose task was rejected. */
    Queue<List<Object>> importantMsgQueue = new ConcurrentLinkedQueue<List<Object>>();

    /** Scheduled task: resubmits one parked important log message, if any. */
    final Runnable importantDataBufferThread = new Runnable() {
        @Override
        public void run() {
            List<Object> msg = importantMsgQueue.poll();
            if (msg != null) {
                importantThreadPool.execute(new InsertDataIntoDB(msg));
            }
        }
    };

    /** Parks the message of a rejected {@link InsertDataIntoDB} task for a later retry. */
    final RejectedExecutionHandler importantHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            importantMsgQueue.offer(((InsertDataIntoDB) r).getMsg());
        }
    };

    /** Pool executing the raw-log parsing tasks. */
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.handler);

    /** Pool executing the important-log insert tasks. */
    final ThreadPoolExecutor importantThreadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.importantHandler);

    /** Scheduler draining {@link #msgQueue} once per millisecond. */
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);

    final ScheduledFuture<?> taskHandler = scheduler.scheduleAtFixedRate(
            dataBufferThread, 0, 1, TimeUnit.MILLISECONDS);

    /** Scheduler draining {@link #importantMsgQueue} once per millisecond. */
    final ScheduledExecutorService importantScheduler = Executors.newScheduledThreadPool(10);

    final ScheduledFuture<?> importantTaskHander = importantScheduler.scheduleAtFixedRate(
            importantDataBufferThread, 0, 1, TimeUnit.MILLISECONDS);

    /**
     * Returns the shared manager. Despite the name, no new object is created.
     *
     * @return the singleton instance
     */
    public static ThreadPoolManager newInstance() {
        return INSTANCE;
    }

    private ThreadPoolManager() {
    }

    /**
     * Submits a raw log message for parsing.
     *
     * @param msg the raw log message passed to a new {@link ParserJSON} task
     */
    public void addLogMsg(String msg) {
        threadPool.execute(new ParserJSON(msg));
    }

    /**
     * Submits the important part of a parsed log message for insertion.
     *
     * @param importantLogMsg the data passed to a new {@link InsertDataIntoDB} task
     */
    public void addPartLogMsg(List<Object> importantLogMsg) {
        importantThreadPool.execute(new InsertDataIntoDB(importantLogMsg));
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐