您的位置:首页 > 编程语言 > Java开发

经典Java线程池的代码及各部分功能简介

2012-08-09 20:40 423 查看
(1)根据xml文件来管理线程池的最大最小线程数

  (2)对线程池通过Timer定期扫描以防止线程未激活;

  (3)通过某一个变量(本程序中是freeThreadCount)来得到空闲线程的数目;

  

  一、配置xml(listen.xml)是:

  <?xml version="1.0" encoding="UTF-8"?>

  <config>

  <ConsumeThreadPool>

  <minPools>10</minPools>   <!--线程池最小线程-->

  <maxPools>100</maxPools>    <!--线程池最大线程-->

  <checkThreadPeriod>5</checkThreadPeriod> <!--检查线程池中线程的周期5分钟-->

  </ConsumeThreadPool>

  </config>

  

  二、对于ConsumeThreadPoolPara的javabean:

  import java.io.*;

  public class ConsumeThreadPoolPara implements Serializable{

  private int minPools;

  private int maxPools;

  private int checkThreadPeriod;

  

  public int getMinPools(){

  return minPools;

  }

  public int getMaxPools(){

  return maxPools;

  }

  public int getCheckThreadPeriod(){

  return checkThreadPeriod;

  }

  public void setMinPools(int minPools){

  this.minPools = minPools;

  }

  public void setMaxPools(int maxPools){

  this.maxPools = maxPools;

  }

  public void setCheckThreadPeriod(int checkThreadPeriod){

  this.checkThreadPeriod = checkThreadPeriod;

  }

  public String toString(){

  return minPools+" " + maxPools+" "+checkThreadPeriod;

  }

  public ConsumeThreadPoolPara() {

  }

  public static void main(String[] args) {

  ConsumeThreadPoolPara consumeThreadPool1 = new ConsumeThreadPoolPara();

  }

  

  }

  

  三、解析xml程序代码(生成ConsumeThreadPoolPara):

  使用jdom解析:

  import org.jdom.*;

  import org.jdom.input.SAXBuilder;

  import java.io.*;

  import java.util.*;

  

  public class ParseConfig {

  static Hashtable Listens = null;

  static ConnPara connpara = null;

  static ConsumeThreadPoolPara consumeThreadPoolPara = null;

  private static String configxml = "listen.xml";

  

  static{

  getConsumeThreadPoolPara(); //得到消费的线程池的参数

  }

  

  /**

  * 装载文档

  * @return 返回根结点

  * @throws JDOMException

  */

  public static Element loadDocument() throws JDOMException{

  SAXBuilder parser = new SAXBuilder(); // 新建立构造器

  try {

  Document document = parser.build(configxml);

  Element root = document.getRootElement();

  return root;

  }catch(JDOMException e){

  logger.error("listen.xml文件格式非法!");

  throw new JDOMException();

  }

  }

  

  public static ConsumeThreadPoolPara getConsumeThreadPoolPara(){

  if(consumeThreadPoolPara ==null){

  try {

  Element root = loadDocument();

  Element consumeThreadPool = root.getChild("ConsumeThreadPool");

  if (consumeThreadPool != null) { //代表有数据库配置

  consumeThreadPoolPara = new ConsumeThreadPoolPara();

  Element minPools = consumeThreadPool.getChild("minPools");

  consumeThreadPoolPara.setMinPools(Integer.parseInt(minPools.getTextTrim()));

  Element maxPools = consumeThreadPool.getChild("maxPools");

  consumeThreadPoolPara.setMaxPools(Integer.parseInt(maxPools.getTextTrim()));

  Element checkThreadPeriod = consumeThreadPool.getChild("checkThreadPeriod");

  consumeThreadPoolPara.setCheckThreadPeriod(Integer.parseInt(checkThreadPeriod.getTextTrim()));

  }

  }

  catch (JDOMException e) {

  }

  }

  return consumeThreadPoolPara;

  }

  }

  

  四、线程池源代码:

  import java.util.*;

  

  /**

  * <p>Title: 线程池</p>

  * <p>Description: 采集消费模块</p>

  * <p>Copyright: Copyright (c) 2004</p>

  * <p>Company: </p>

  * @author 张荣斌

  * @version 1.0

  */

  

  public class ThreadPool {

  private static int minPools = 10; //最小连接池数目

  private static int maxPools = 100; //最大连接池数目

  private static int checkThreadPeriod = 5; //检查连接池的周期

  ArrayList m_ThreadList; //工作线程列表

  LinkedList m_RunList = null; //工作任务列表

  int totalThread = 0; //总线程数

  static int freeThreadCount = 0; //未被使用的线程数目

  private java.util.Timer timer = null; //定时器

  static Object o = new Object();

  

  static{ //先初始化线程池的参数

  ConsumeThreadPoolPara consumeThreadPoolPara = ParseConfig.getConsumeThreadPoolPara();

  if(consumeThreadPoolPara!=null){

  minPools = consumeThreadPoolPara.getMinPools();

  maxPools = consumeThreadPoolPara.getMaxPools();

  checkThreadPeriod = consumeThreadPoolPara.getCheckThreadPeriod()*60*1000;

  }

  }

  public void setMinPools(int minPools){

  this.minPools = minPools;

  }

  public void setMaxPools(int maxPools){

  this.maxPools = maxPools;

  }

  public void setCheckThreadPeriod(int checkThreadPeriod){

  this.checkThreadPeriod = checkThreadPeriod;

  }

  public ThreadPool() {

  

  m_ThreadList=new ArrayList();

  m_RunList=new LinkedList();

  for(int i=0;i<minPools;i++){

  WorkerThread temp=new WorkerThread();

  totalThread = totalThread + 1;

  m_ThreadList.add(temp);

  temp.start();

  try{

  Thread.sleep(100);

  }catch(Exception e){

  }

  }

  timer = new Timer(true); //启动定时器

  timer.schedule(new CheckThreadTask(this),0,checkThreadPeriod);

  }

  

  /**

  * 当有一个工作来的时候启动线程池的线程

  * 1.当空闲线程数为0的时候,看总线程是否小于最大线程池的数目,就new一个新的线程,否则sleep,直到有空闲线程为止;

  * 2.当空闲线程不为0,则将任务丢给空闲线程去完成

  * @param work

  */

  public synchronized void run(String work)

  {

  if (freeThreadCount == 0) {

  if(totalThread<maxPools){

  WorkerThread temp = new WorkerThread();

  totalThread = totalThread + 1;

  m_ThreadList.add(temp);

  temp.start();

  synchronized(m_RunList){

  m_RunList.add(work);

  m_RunList.notify();

  }

  }else{

  while (freeThreadCount == 0) {

  try {

  Thread.sleep(200);

  }

  catch (InterruptedException e) {

  }

  }

  synchronized(m_RunList){

  m_RunList.add(work);

  m_RunList.notify();

  }

  }

  } else {

  synchronized(m_RunList){

  m_RunList.add(work);

  m_RunList.notify();

  }

  }

  }

  

  /**

  * 检查所有的线程的有效性

  */

  public synchronized void checkAllThreads() {

  

  Iterator lThreadIterator = m_ThreadList.iterator();

  

  while (lThreadIterator.hasNext()) { //逐个遍厉

  WorkerThread lTestThread = (WorkerThread) lThreadIterator.next();

  

  if (! (lTestThread.isAlive())) { //如果处在非活动状态时

  lTestThread = new WorkerThread(); //重新生成个线程

  lTestThread.start(); //启动

  }

  }

  }

  

  /**

  * 打印调试信息

  */

  public void printDebugInfo(){

  System.out.println("totalThread="+totalThread);

  System.out.println("m_ThreadList.size()="+m_ThreadList.size());

  }

  

  /**

  *

  * <p>Title: 工作线程类</p>

  * @author 张荣斌

  * @version 1.0

  */

  class WorkerThread extends Thread{

  boolean running = true;

  String work;

  

  public void run(){

  while(running){

  synchronized(o){

  freeThreadCount++;

  }

  synchronized(m_RunList){

  while(m_RunList.size() == 0){

  try{

  m_RunList.wait();

  if(!running) return;

  }catch(InterruptedException e){

  }

  }

  synchronized(o){

  freeThreadCount--;

  }

  work = (String)m_RunList.removeLast();

  if(work==null) return;

  }

  

  // 得到了work 进行工作,这里work可以换成自己的工作类

  }

  }

  }

  

  }

  /**

  *

  * <p>Title: 定时器调动的任务</p>

  * @author 张荣斌

  * @version 1.0

  */

  class CheckThreadTask extends TimerTask{

  private static boolean isRunning = false;

  private ThreadPool pool;

  

  public CheckThreadTask(ThreadPool pool){

  this.pool = pool;

  }

  public void run() {

  if (!isRunning) {

  isRunning = true;

  pool.checkAllThreads();

  isRunning = false;

  }

  }

  }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: