您的位置:首页 > 编程语言 > Java开发

java封装的几个线程池介绍

2017-06-09 11:09 218 查看

java封装的几个线程池介绍

FixedThreadPool

FixedThreadPool并不是一个类,它是由
Executors
工具类创建出来的一个固定线程数的一个ThreadPoolEexcutor的对象,有2种实现方式。

Executors.newFixedThreadPool(3);//固定3个线程数

Executors.newFixedThreadPool(3, Executors.defaultThreadFactory());


我们来看下
newFixedThreadPool()
的内部实现吧

public static ExecutorService newFixedThreadPool(int nThreads) {
//这里可以看出,返回的就是一个TreadPoolExecutor对象,不过这个对象的核心线程数和最大线程数相等
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
//相比上面的方法,就是多了threadFactory参数
threadFactory);
}


newFixedThreadPool()
分析


这里就用一个参数的方法来分析。

corePoolSize
maxPoolSize
都是参数
nThreads
:线程池中都是核心线程且是
nThraeds
个时,没有非核心线程。

非工作线程的存活时间是0:当核心线程执行完队列中的任务时,如果队列中没有任务,这时核心线程死亡。极大程度的节约了系统资源。

使用的队列默认最大值是Integer.MAX_VALUE:该队列长度是int最大值,相当于无限大了,这样不管你有多少任务,都能够存储在队列里面,等待核心线程去执行。

线程池中执行任务大致分为4个阶段

1.任务数小于核心线程数,如果在执行新任务,则创建核心线程去执行任务。

2.当核心线程都在执行任务时,未执行的任务且小于队列长度,这是任务会存储于队列中。

3.当核心线程都在执行任务,且未执行任务数大于队列长度,这是非核心线程会执行新的任务。

4.当核心线程都在执行任务,且未执行任务数大于队列长度,且非核心线程也在执行任务,这是会采用饱和策略。

newFixedPoolExecutor()
创建出来的线程池,不会有第3阶段。

FixedPoolExecutor的使用场景

FixedThreadPool适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

SingleThreadPool

SingleThreadPool是只有一个线程的线程池,内部实现和FixedThreadPool一样,不过就是线程数有区别。

SingleThreadPool的调用方法

Executors.newSingleThreadExecutor();

Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());


SingleThreadPool的内部实现

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}


我们看到
new FinalizableDelegatedExecutorService()
对象包裹了一个
ThreadPoolExecutor
对象,这个
ThreadPoolExecutor
对象其实就是一个线程的
FixedThreadPool
,而
FinalizableDelegatedExecutorService
类就是对
ThreadPoolExecutor
在封装了一层。

所以我们也可以简单的理解为
SingleThreadPool
是只有一个线程的
FixedThreadPool


CachedThreadPool

CachedThreadPool的调用方法

Executors.newCachedThreadPool();

Executors.newCachedThreadPool(Executors.defaultThreadFactory());


CachedThreadPool的内部实现

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}


newCachedThreadPool()
中的ThreadPoolExecutor对象参数分析

corePoolSize
核心线程数为0

maxPoolSize
最大线程数为Integer.MAX_VALUE。

keepAliveTime
:非工作线程存活时间60秒

workQueue
:队列使用的
SynchronousQueue
(SynchronousQueue是一个没有容量的队列)。

通过参数我们可以知道,没有核心线程数,那么就不会走线程池的第一个阶段,SynchronousQueue队列是一个没有容量的阻塞队列。每个插入操作必须等待另一 个线程的对应移除操作。

下面看下
CachedThreadPool
execute()
方法的执行示意图:



1.
execute()
首先执行
SynchronousQueue.offer(Runnable r)
。如果的当前
maxPool
中有空闲线程,会执行
SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
,那么主线程(调用
execute()
的线程)执行offer操作与空闲线程执行的
poll
操作配对成功。主线程把任务交给空闲线程执行,
execute()
方法执行完成。

2.当
maxPool
为空,或者
maxPool
中当前没有空闲线程时,将没有线程执行
SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
。这种情况下,步骤1失败。此时
CachedThreadPool
会创建一个新线程执行任务,
execute()
执行完成。

3.在步骤2中,新创建的线程将任务执行完后,会执行
SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
。这个
poll
操作会让空闲线程最多在
SynchronousQueue
中等待60秒,如果60秒钟内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的
CachedThreadPool
不会使用任何资源。

CachedThreadPool
是大小无界的线程池,适用于执行很的短期异步任务的小程序,或者是负载较轻的服务器。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类。

ScheduledThreadPoolEexcutor的调用有4中形式

Executors.newScheduledThreadPool(3);

Executors.newScheduledThreadPool(3, Executors.defaultThreadFactory());

new ScheduledThreadPoolExecutor(3);

new ScheduledThreadPoolExecutor(3, Executors.defaultThreadFactory());


其实
Executors.newScheduledThreadPool(3)
方法的内部实现也是直接创建一个对象,如:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}


那我们接下来看下
ScheduledThreadPoolExecutor()
的构造方法实现

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}


从构造方法中可以知道,其实
ScheduledThreadPoolExecutor
调用的是父类中的构造方法,也就是
ThreadPoolExecutor
中的构造。那么我们来说说传入的参数:

corePoolSize:核心线程数,传入

默认最大线程数为Integer.MAX_VALUE。

保活时间默认10毫秒,
DEFAULT_KEEPALIVE_MILLIS
默认10L,单位毫秒。

使用的队列是
DelayedWorkQueue
DelayedWorkQueue
是一个无界限的队列,当添加的任务大于队列时,会从新增加队列的长度。同时
DelayedWorkQueue
还有着一套按超时时间升序排序的算法,遵循”左结点比右节点小(下次执行的时间更短)的原则。具体细节在下一章解释。

通过构造我们可以知道,
ScheduledThreadPoolExecutor
ThreadPoolExecutor
并没太多的区别,最大的区别就是队列的不同。因为
DelayedWorkQueue
的长度具有增长型和按超时时间升序排序算法,使得
ScheduledThreadPoolExecutor
具有延时执行任务的特性。

因为
DelayedWorkQueue
是属于无界队列,那么最大线程数设置Integer.MAX_VALUE就没有使用到。

ScheduledThreadPoolExecutor
的执行方法


因为
ScheduledThreadPoolExecutor
继承自
ThreadPoolExecutor
,所以会有
execute()
submit()
,这里不介绍这2个方法。

ScheduledThreadPoolExecutor
除了继承
ThreadPoolExecutor
,还实现了
ScheduledExecutorService
接口

ScheduledExecutorService
接口方法

//创建任务并在给定延迟时间后执行
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

//同上,只不过参数是Callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

/**
* 参数initialDelay:表示第一次延时的时间
* 参数period:表示任务执行开始后距下一个任务开始执行的时间间隔
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

/**
* 参数initialDelay:表示第一次延时的时间
* 参数period:表示任务执行结束后距下一个任务开始执行的时间间隔
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);


schedule()
方法实现代码:


public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();

//decorateTask()的第2个参数,就是返回值,也就是即将延迟执行的任务对象
RunnableScheduledFuture<Void> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit),sequencer.getAndIncrement()));

delayedExecute(t);//延迟执行这个任务
return t;
}

//decorateTask()实现代码
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}


ScheduledThreadPoolExecutor
的执行方法非常类似Timer定时器。

ScheduledThreadPoolExecutor
的简单使用


schedule()
的使用

public class Test {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(3);

System.out.println("任务执行前时间:" + System.currentTimeMillis());//任务执行前时间:1496847975977

scheduledThreadPoolExecutor.schedule(new Runnable() {
@Override
public void run() {
System.out.println("任务执行时间:" + System.currentTimeMillis());//任务执行时间:1496847976993
}
}, 1000, TimeUnit.MILLISECONDS);
}
}


scheduleAtFixedRate()
的使用

public class Test {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(3);

System.out.println("任务执行前时间:" + System.currentTimeMillis());

scheduledThreadPoolExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("任务执行时间:" + System.currentTimeMillis());
try {
Thread.sleep(1000);//这个是为了区别和下一个方法
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
}
}

控制台打印结果:

任务执行前时间:1496848166272
任务执行时间:1496848167285     //第一次执行,任务延迟1秒执行
任务执行时间:1496848172281     //每隔5秒后执行任务
任务执行时间:1496848177280
任务执行时间:1496848182282
任务执行时间:1496848187282


scheduleWithFixedDelay()
的使用

public class Test {
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(3);

System.out.println("任务执行前时间:" + System.currentTimeMillis());//任务执行前时间:1496847975977

scheduledThreadPoolExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("任务执行时间:" + System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
}
}

打印结果:

任务执行前时间:1496848595162
任务执行时间:1496848596181     //第一次执行延时任务,延迟1秒
任务执行时间:1496848602188     //第2次执行任务,延迟大约6秒(5秒是延迟时间,1秒是任务执行的消耗时间)
任务执行时间:1496848608204
任务执行时间:1496848614211
任务执行时间:1496848620231
任务执行时间:1496848626256


ScheduledThreadPoolExecutor
停止方法


shutdown()


shutdownNow()


这两个方法在将线程池
ThreadPoolExecutor
的时候介绍过。

SingleThreadScheduledExecutor

SingleThreadScheduledExecutor
ScheduledThreadPoolExecutor
一样具有延时执行任务的特点。
SingleThreadScheduledExecutor
的内部实现就是
ScheduledThreadPoolExecutor
,只不过只有一个核心线程(虽然有最大线程数,但是队列的无边界的,所以除核心线程外,其他线程不会执行任务)。

SingleThreadScheduledExecutor
的调用

Executors.newSingleThreadScheduledExecutor();

Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());


SingleThreadScheduledExecutor
的内部实现

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: