您的位置:首页 > 编程语言 > Java开发

java threadPool 线程池简单分析

2014-12-27 23:47 411 查看
java 1.5 concurrent 工具包中提供了五类线程池的创建:

ExecutorService executor=Executors.newCachedThreadPool();
		ExecutorService cacheExecutor=Executors.newCachedThreadPool(new TestThreadFactory());
		
		ExecutorService fixExecutor=Executors.newFixedThreadPool(10);
		ExecutorService fixedExecutor=Executors.newFixedThreadPool(10, new TestThreadFactory());
		
		ExecutorService sigExecutor=Executors.newSingleThreadExecutor();
		ExecutorService singleExecutor=Executors.newSingleThreadExecutor(new TestThreadFactory());
		
		ScheduledExecutorService schExecutor=Executors.newScheduledThreadPool(10);
		ScheduledExecutorService scheduledExecutor=Executors.newScheduledThreadPool(10,new TestThreadFactory());
		
		ScheduledExecutorService ssExecutor=Executors.newSingleThreadScheduledExecutor();
		ScheduledExecutorService sigSchExcutor=Executors.newSingleThreadScheduledExecutor(new TestThreadFactory());


底层的实现原理基本一样: new线程池的时候生成一个任务队列(blockQueue<Runnable>),第一次执行execute()或者submit()方法时会创建一个循环的线程,用于反复读取队列中的任务并执行之(ps:第一次提交的任务是不用进入任务队列,由刚创建的线程直接执行 ),后续的 execute()或者submit()操作则直接提交Runnable任务到队列里.队列为空时,循环线程就会被blockQueue的take()方法阻塞住.

SingleThreadExecutor其实是FixedThreadPool的一个特例,SingleThreadExecutor指定对于同一个队列只有一个线程去循环读取队列任务并执行, FiexedThreadPool则可以为同一队列指定多个线程去循环读取队列任务并执行.

newFixedThreadPool(10)会产生10个线程去读取同一个任务队列,但这10个线程不是同时产生,而是提交一个任务(即执行一次execute()或者submit()方法)产生一个,当提交的任务数量超过10个,第11个任务直接提交到blockQueue<Runnable>队列里,然后由这10个线程中的某个线程去获取并执行该任务.FixedThreadPool产生的10个线程以后也不会被回收成9个,更不可能增加到11个.

CacheThreadPool不指定具体数量的线程去读取并只执行任务队列中的任务,但是它有个最大线程数(Integer.MAX_VALUE=2的32次-1), 当 任务队列饱和无法插入新任务时,会自动生成一个新的线程去执行新插入的任务,并参与读取饱和的任务队列并执行.如果高峰期生成了10个线程,低谷期只需要一个线程来执行,其余的9个线程在存活一段时间后就会被终止.存活时间默认是一分钟.这一点要和FixedThreadPool区分.

ScheduledThreadPool线程池线程数量也需要预先指定,它的主要特点是按计划延时读取并执行队列任务

无论何种线程,当任务队列增加任务的速度大于队列读取执行的速度时,就可能产生任务丢失的情况,丢失的概率由低到高依次是

CacheThreadPool > newFixedThreadPool > SingleThreadExecutor,这个很好理解.这种情况下,程序默认都会向外抛出RejectedExecutionException异常

new 线程池的时候另一个构造参数 ThreadFactory,主要用途就是对提交的任务做个简单的封装.

附上几个核心的代码片段

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }


private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }


private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }


/**
         * Runs a single task between before/after methods.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                /*
                 * Ensure that unless pool is stopping, this thread
                 * does not have its interrupt set. This requires a
                 * double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

        /**
         * Main run loop
         */
        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: