您的位置:首页 > 编程语言 > Java开发

Java线程池及其底层源码实现分析

2017-09-05 19:53 986 查看
相关类:

Executors

ExecutorService

Callable

ThreadPool

Future

接口:

Executor

Executor接口的使用:

1
public class TestExecutor implements Executor{
2
@Override
3
public void execute(Runnable command){
4
//调用execute方法常常传入runnable接口对象,开启线程
5
}
6
7
}
ExecutorService接口的使用:(继承Executor接口)



1

/**


2

*submit方法(执行runnble、callable对象的线程)


3

*实现类:各种线程池


4

*/


Callable接口 && Runnable接口
callable调用call方法
runnable调用run方法
都可以被线程调用,但callable的call方法具有返回值(泛型)

Executors类(操作Executor的工具类)

1
ExecutorService service = Executors.newFixedThreadPool(5);//创建5个线程的线程池
ThreadPool线程池类(装着线程的容器)线程池创建的固定线程,线程任务执行完后线程不会消失,处于等待任务的状态(idel)。线程任务大于线程池容量时,多出来的任务放在等待队列中(内部使用BlockingQueue实现)



1

public class TestThreadPool{


2

public static void main(String[] args){


3

ExecutorService service = Executors.newFixedThreadPool(5);//1.创建5个线程的线程池容器


4



5

for(int i=0;i<6;i++){//2.放6个任务,线程池一次只能放5个,所以第6个任务需要重复使用旧的线程


6

service.execute(() -> {


7

System.out.println(Thread.getCurrentThread().getName());//3.打印出当前线程名


8

});


9

}


10

service.shutdown();//执行完当前任务则关闭线程


11

service.shutdownNow();//无论是否执行完都关闭线程


12

}


13

}


Future接口(线程未来产生的返回值)

1
public class TestFuture{
2
public static void main(String[] args){
3
//FutureTask实现类Runnable和Future接口
4
FutureTask<Integer> task = new FutureTask<>(
5
Thread.sleep(500);//阻塞等待500毫秒
6
return 1000;
7
);
8
9
//new的方式启动线程任务
10
new Thread(task).start();
11
System.out.println(task.get());//阻塞等待500毫秒后得到返回值
12
///////////////////////////////////////////////////////////////////
13
ExecutorService service = Executors.newFixedThreadPool(5);//创建5个线程的线程池
14
Future<Integer> future = service.submit(()->{//相当于运行类callable接口的call方法,返回1
15
Thread.sleep(500);
16
return 1;
17
});
18
System.out.println(future.get());//阻塞等待500毫秒后得到返回值
19
}
20
}
WorkStealingPool偷任务线程池底层采用ForkJoinPool实现(开启的是Deamon守护线程,主线程退出则线程退出)



1

public class WorkStealingPoolTest {


2

public static void main(String[] args) throws IOException {


3

//根据CPU核数启动相应个数的线程(4核cpu---4个线程)


4

ExecutorService service = Executors.newWorkStealingPool();


5

System.out.println(Runtime.getRuntime().availableProcessors());


6



7

service.execute(new R(1000));//线程1执行任务1----1秒


8

service.execute(new R(2000));//线程2执行任务2----2秒


9

service.execute(new R(2000));//线程3执行任务3----2秒


10

service.execute(new R(2000));//线程4执行任务4----2秒


11

service.execute(new R(2000));//任务5阻塞,当线程1执行完后把任务5偷过来执行


12



13

//由于产生的是守护线程,主线程不阻塞的话,看不到输出


14

System.in.read();//将主线程阻塞 


15

}


16



17

static class R implements Runnable {


18



19

int time;


20



21

R(int t) {


22

this.time = t;


23

}


24



25

@Override


26

public void run() {


27



28

try {


29

TimeUnit.MILLISECONDS.sleep(time);


30

} catch (InterruptedException e) {


31

e.printStackTrace();


32

}


33

//打印线程名---ForkJoinPool


34

System.out.println(time  + " " + Thread.currentThread().getName());


35



36

}


37



38

}


39

}


ForkJoinPool(分支合并线程池)
思想:分治,把大任务拆分成小任务并行计算,计算完成后将结果合并
守护线程

1
public class ForkJoinPoolTest{
2
3
public static void main(String[] args) throws Exception {
4
ForkJoinPool pool = new ForkJoinPool();
5
MyTask task = new MyTask(inits, 0, inits.;ength-1);
6
ForkJoinTask<int[]> taskResult = pool.submit(task);
7
try {
8
taskResult.get();//阻塞等待所有线程结果计算完成
9
} catch (InterruptedException | ExecutionException e) {
10
e.printStackTrace(System.out);
11
}
12
}
13
14
/**
15
 * 单个排序的子任务
16
 */
17
static class MyTask extends RecursiveTask<int[]> {
18
19
private int[] source;
20
private int start;
21
private int end;
22
23
public MyTask(int[] source,int start, int end ) {
24
this.source = source;
25
this.start = start;
26
this.end = end;
27
}
28
29
30
@Override
31
protected int[] compute() {
32
//长度小于50,进行计算
33
if(source.length <= 50) {
34
long sum = 0L;
35
for(int i=start; i<end; i++) sum += nums[i];
36
return sum;
37
} 
38
//长度大于50,继续划分子任务
39
int middle = start + (end-start)/2;
40
41
AddTask subTask1 = new MyTask(source,start,middle);
42
AddTask subTask2 = new MyTask(source,middle,end);
43
subTask1.fork();//递归创建子任务线程
44
subTask2.fork();
45
46
//计算完成后将两个子任务的结果合并
47
return subTask1.join() + subTask2.join();
48
}
49
}
50
}

各种线程池的底层实现:一、基本线程池:FixedThreadPoolCachedThreadPoolScheduledThreadPoolSingleThreadPool二、底层创建线程池都是使用ThreadPoolExecutor类实现的,而放置任务、执行任务使用了生产者消费者模型(阻塞队列的方式)
关于阻塞队列解决生产者消费者问题可以参考我的另一篇文章:http://blog.csdn.net/qq_17305249/article/details/77807296 三、源码分析ThreadPoolExecutor的API:



1

ThreadPoolExecutor(int corePoolSize,//核心线程数(最小)


2

   int maximumPoolSize,//最大线程数


3

   long keepAliveTime, //线程运行时间


4

   TimeUnit unit, //时间单位


5

   BlockingQueue<Runnable> workQueue)//底层采用哪种阻塞队列来放线程任务 


各种线程池的底层实现:



1

//FixedThreadPool


2

public static ExecutorService newFixedThreadPool(int nThreads){


3

return new ThreadPoolExecutor(nThreads,//初始线程数自定义


4

 nThreads,//最大线程数自定义


5

 0L, TimeUnit.SECONDS,//一旦启动线程池,线程永远不消失


6

 new LinkedBlockingQueue<Runnable>());//链表阻塞队列


7

}


8



9

//CachedThreadPool(采用同步阻塞队列装任务,队列中有任务则启动新线程执行,没任务就阻塞)


10

public static ExecutorService newCachedThreadPool(){


11

return new ThreadPoolExecutor(0,//初始为0个线程


12

 Integer.MAX_VALUE,//可以启动无限多线程


13

 60L, TimeUnit.SECONDS,//60秒空闲则结束


14

 new SynchronousQueue<Runnable>());//同步阻塞队列,有任务马上开新线程执行(容量用于为0)


15

}


16



17

//SingleThreadPool


18

public static ExecutorService newSingleThreadPool(){


19

return new ThreadPoolExecutor(1,//初始线程数为1


20

 1,//最大线程数为1


21

 0L, TimeUnit.SECONDS,//一旦启动线程池,线程永远不消失


22

 new LinkedBlockingQueue<Runnable>());//链表阻塞队列


23

}


24



25

//ScheduledThreadPool


26

public newScheduledThreadPool(int corePoolSize){


27

super(corePoolSize,//初始线程数自定义


28

  Integer.MAX_VALUE,//无限多线程数


29

  0, NANOSECONDS,//一旦启动线程池,线程永远不消失


30

  new DelayedWorkQueue<Runnable>());//延时阻塞队列,隔一段时间执行一次任务


31

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: