您的位置:首页 > 编程语言 > Java开发

六、JAVA多线程与常见的线程池

2018-03-30 00:25 393 查看
首先讲一下进程和线程的区别:

    进程:每个进程都有独立的代码和数据空间(进程上下文),进程间的切换会有较大的开销,一个进程包含1--n个线程。

    线程:同一类线程共享代码和数据空间,每个线程有独立的运行栈和程序计数器(PC),线程切换开销小。

    线程和进程一样分为五个阶段:创建、就绪、运行、阻塞、终止。

    多进程是指操作系统能同时运行多个任务(程序)。

    多线程是指在同一程序中有多个顺序流在执行。一.线程的生命周期及五种基本状态

关于Java中线程的生命周期,首先看一下下面这张较为经典的图:



上图中基本上囊括了Java中多线程各重要知识点。掌握了上图中的各知识点,Java中的多线程也就基本上掌握了。主要包括:

Java线程具有五中基本状态

新建状态(New):当线程对象对创建后,即进入了新建状态,如:Thread t = new MyThread();

就绪状态(Runnable):当调用线程对象的start()方法(t.start();),线程即进入就绪状态。处于就绪状态的线程,只是说明此线程已经做好了准备,随时等待CPU调度执行,并不是说执行了t.start()此线程立即就会执行;

运行状态(Running):当CPU开始调度处于就绪状态的线程时,此时线程才得以真正执行,即进入到运行状态。注:就 绪状态是进入到运行状态的唯一入口,也就是说,线程要想进入运行状态执行,首先必须处于就绪状态中;

阻塞状态(Blocked):处于运行状态中的线程由于某种原因,暂时放弃对CPU的使用权,停止执行,此时进入阻塞状态,直到其进入到就绪状态,才 有机会再次被CPU调用以进入到运行状态。根据阻塞产生的原因不同,阻塞状态又可以分为三种:

1.等待阻塞:运行状态中的线程执行wait()方法,使本线程进入到等待阻塞状态;

2.同步阻塞 -- 线程在获取synchronized同步锁失败(因为锁被其它线程所占用),它会进入同步阻塞状态;

3.其他阻塞 -- 通过调用线程的sleep()或join()或发出了I/O请求时,线程会进入到阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。

死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。

二. Java多线程的创建及启动

Java中线程的创建常见有如三种基本形式

1.继承Thread类,重写该类的run()方法。
2.实现Runnable接口,并重写该接口的run()方法,该run()方法同样是线程执行体,创建Runnable实现类的实例,并以此实例作为Thread类的target来创建Thread对象,该Thread对象才是真正的线程对象。
3.使用Callable和Future接口创建线程。具体是创建Callable接口的实现类,并实现clall()方法。并使用FutureTask类来包装Callable实现类的对象,且以此FutureTask对象作为Thread对象的target来创建线程。

1.继承Thread类,重写该类的run()方法。
[html] view plain copyclass MyThread extends Thread {  
      
    private int i = 0;  
  
    @Override  
    public void run() {  
        for (i = 0; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
        }  
    }  
}  
[html] view plain copypublic class ThreadTest {  
  
    public static void main(String[] args) {  
        for (int i = 0; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
            if (i == 30) {  
                Thread myThread1 = new MyThread();     // 创建一个新的线程  myThread1  此线程进入新建状态  
                Thread myThread2 = new MyThread();     // 创建一个新的线程 myThread2 此线程进入新建状态  
                myThread1.start();                     // 调用start()方法使得线程进入就绪状态  
                myThread2.start();                     // 调用start()方法使得线程进入就绪状态  
            }  
        }  
    }  
}  
如上所示,继承Thread类,通过重写run()方法定义了一个新的线程类MyThread,其中run()方法的方法体代表了线程需要完成的任务,称之为线程执行体。当创建此线程类对象时一个新的线程得以创建,并进入到线程新建状态。通过调用线程对象引用的start()方法,使得该线程进入到就绪状态,此时此线程并不一定会马上得以执行,这取决于CPU调度时机。注意:start()方法的调用后并不是立即执行多线程代码,而是使得该线程变为可运行态(Runnable),什么时候运行是由操作系统决定的。
从程序运行的结果可以发现,多线程程序是乱序执行。因此,只有乱序执行的代码才有必要设计为多线程。
Thread.sleep()方法调用目的是不让当前线程独自霸占该进程所获取的CPU资源,以留出一定时间给其他线程执行的机会。
实际上所有的多线程代码执行顺序都是不确定的,每次执行的结果都是随机的。

但是start方法重复调用的话,会出现java.lang.IllegalThreadStateException异常。
[html] view plain copyThread1 mTh1=new Thread1("A");  
Thread1 mTh2=mTh1;  
mTh1.start();  
mTh2.start();  

输出:
Exception in thread "main" java.lang.IllegalThreadStateException
    at java.lang.Thread.start(Unknown Source)
    at com.multithread.learning.Main.main(Main.java:31)

2.实现Runnable接口,并重写该接口的run()方法,该run()方法同样是线程执行体,创建Runnable实现类的实例,并以此实例作为Thread类的target来创建Thread对象,该Thread对象才是真正的线程对象。
[html] view plain copyclass MyRunnable implements Runnable {  
    private int i = 0;  
  
    @Override  
    public void run() {  
        for (i = 0; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
        }  
    }  
}  
[html] view plain copypublic class ThreadTest {  
  
    public static void main(String[] args) {  
        for (int i = 0; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
            if (i == 30) {  
                Runnable myRunnable = new MyRunnable(); // 创建一个Runnable实现类的对象  
             //myRunnable.run();并不是线程开启,而是简单的方法调用  
                Thread thread1 = new Thread(myRunnable,"A窗口(线程)"); // 将myRunnable作为Thread target创建新的线程  
                Thread thread2 = new Thread(myRunnable,"B窗口(线程)");  
  
//thread1.run(); //如果该线程是使用独立的 Runnable 运行对象构造的,则调用该 Runnable 对象的 run 方法;否则,该方法不执行任何操作并返回。  
  
                thread1.start(); // 调用start()方法使得线程进入就绪状态  
                thread2.start();  
            }  
        }  
    }  
}  

要注意的是:

    1.r.run()并不是启动线程,而是简单的方法调用。

    2.Thread也有run()方法,如果该线程是使用独立的 Runnable 运行对象构造的,则调用该 Runnable 对象的 run 方法;否则,该方法不执行任何操作并返回。

    3.并不是一启动线程(调用start()方法)就执行这个线程,而是进入就绪状态,什么时候运行要看CUP。

                这里要注意每个线程都是用同一个实例化对象
总结:

实现Runnable接口比继承Thread类所具有的优势:

1):适合多个相同的程序代码的线程去处理同一个资源

2):可以避免java中的单继承的限制

3):增加程序的健壮性,代码可以被多个线程共享,代码和数据独立

相信以上两种创建新线程的方式大家都很熟悉了,那么Thread和Runnable之间到底是什么关系呢?我们首先来看一下下面这个例子。
[html] view plain copypublic class ThreadTest {  
  
    public static void main(String[] args) {  
        for (int i = 0; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
            if (i == 30) {  
                Runnable myRunnable = new MyRunnable();  
                Thread thread = new MyThread(myRunnable);  
                thread.start();  
            }  
        }  
    }  
}  
  
class MyRunnable implements Runnable {  
    private int i = 0;  
  
    @Override  
    public void run() {  
        System.out.println("in MyRunnable run");  
        for (i = 0; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
        }  
    }  
}  
  
class MyThread extends Thread {  
  
    private int i = 0;  
      
    public MyThread(Runnable runnable){  
        super(runnable);  
    }  
  
    @Override  
    public void run() {  
        System.out.println("in MyThread run");  
        for (i = 0; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
        }  
    }  
}  

同样的,与实现Runnable接口创建线程方式相似,不同的地方在于
[html] view plain copyThread thread = new MyThread(myRunnable);  

那么这种方式可以顺利创建出一个新的线程么?答案是肯定的。至于此时的线程执行体到底是MyRunnable接口中的run()方法还是MyThread类中的run()方法呢?通过输出我们知道线程执行体是MyThread类中的run()方法。其实原因很简单,因为Thread类本身也是实现了Runnable接口,而run()方法最先是在Runnable接口中定义的方法。
[html] view plain copypublic interface Runnable {  
     
     public abstract void run();  
       
}  
我们看一下Thread类中对Runnable接口中run()方法的实现:[html] view plain copy@Override  
    public void run() {  
        if (target != null) {  
            target.run();  
        }  
    }  

也就是说,当执行到Thread类中的run()方法时,会首先判断target是否存在,存在则执行target中的run()方法,也就是实现了Runnable接口并重写了run()方法的类中的run()方法。但是上述给到的列子中,由于多态的存在,根本就没有执行到Thread类中的run()方法,而是直接先执行了运行时类型即MyThread类中的run()方法。

3.使用Callable和Future接口创建线程。具体是创建Callable接口的实现类,并实现call()方法。并使用FutureTask类来包装Callable实现类的对象,且以此FutureTask对象作为Thread对象的target来创建线程。
 看着好像有点复杂,直接来看一个例子就清晰了。[html] view plain copypublic class ThreadTest {  
  
    public static void main(String[] args) {  
  
        Callable<Integer> myCallable = new MyCallable();    // 创建MyCallable对象  
        FutureTask<Integer> ft = new FutureTask<Integer>(myCallable); //使用FutureTask来包装MyCallable对象  
  
        for (int i = 0; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
            if (i == 30) {  
                Thread thread = new Thread(ft);   //FutureTask对象作为Thread对象的target创建新的线程  
                thread.start();                      //线程进入到就绪状态  
            }  
        }  
  
        System.out.println("主线程for循环执行完毕..");  
          
        try {  
            int sum = ft.get();            //取得新创建的新线程中的call()方法返回的结果  
            System.out.println("sum = " + sum);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
  
    }  
}  
  
  
class MyCallable implements Callable<Integer> {  
    private int i = 0;  
  
    // 与run()方法不同的是,call()方法具有返回值  
    @Override  
    public Integer call() {  
        int sum = 0;  
        for (; i < 100; i++) {  
            System.out.println(Thread.currentThread().getName() + " " + i);  
            sum += i;  
        }  
        return sum;  
    }  
  
}  

首先,我们发现,在实现Callable接口中,此时不再是run()方法了,而是call()方法,此call()方法作为线程执行体,同时还具有返回值!在创建新的线程时,是通过FutureTask来包装MyCallable对象,同时作为了Thread对象的target。那么看下FutureTask类的定义:
[html] view plain copypublic class FutureTask<V> implements RunnableFuture<V> {  
      
    //....  
      
}  
[html] view plain copypublic interface RunnableFuture<V> extends Runnable, Future<V> {  
       
    void run();  
<span style="font-family:'Helvetica Neue', Helvetica, Verdana, Arial, sans-serif;line-height:26.4444446563721px;">}</span>  
于是,我们发现FutureTask类实际上是同时实现了Runnable和Future接口,由此才使得其具有Future和Runnable双重特性。通过Runnable特性,可以作为Thread对象的target,而Future特性,使得其可以取得新创建线程中的call()方法的返回值。
执行下此程序,我们发现sum = 4950永远都是最后输出的。而“主线程for循环执行完毕..”则很可能是在子线程循环中间输出。由CPU的线程调度机制,我们知道,“主线程for循环执行完毕..”的输出时机是没有任何问题的,那么为什么sum =4950会永远最后输出呢?
原因在于通过ft.get()方法获取子线程call()方法的返回值时,当子线程此方法还未执行完毕,ft.get()方法会一直阻塞,直到call()方法执行完毕才能取到返回值。
上述主要讲解了三种常见的线程创建方式,对于线程的启动而言,都是调用线程对象的start()方法,需要特别注意的是:不能对同一线程对象两次调用start()方法。

上下文切换

对于单核CPU来说(对于多核CPU,此处就理解为一个核),CPU在一个时刻只能运行一个线程,当在运行一个线程的过程中转去运行另外一个线程,这个叫做线程上下文切换(对于进程也是类似)。由于可能当前线程的任务并没有执行完毕,所以在切换时需要保存线程的运行状态,以便下次重新切换回来时能够继续切换之前的状态运行。举个简单的例子:比如一个线程A正在读取一个文件的内容,正读到文件的一半,此时需要暂停线程A,转去执行线程B,当再次切换回来执行线程A的时候,我们不希望线程A又从文件的开头来读取。因此需要记录线程A的运行状态,那么会记录哪些数据呢?因为下次恢复时需要知道在这之前当前线程已经执行到哪条指令了,所以需要记录程序计数器的值,另外比如说线程正在进行某个计算的时候被挂起了,那么下次继续执行的时候需要知道之前挂起时变量的值时多少,因此需要记录CPU寄存器的状态。所以一般来说,线程上下文切换过程中会记录程序计数器、CPU寄存器状态等数据。说简单点的:对于线程的上下文切换实际上就是 存储和恢复CPU状态的过程,它使得线程执行能够从中断点恢复执行。虽然多线程可以使得任务执行的效率得到提升,但是由于在线程切换时同样会带来一定的开销代价,并且多个线程会导致系统资源占用的增加,所以在进行多线程编程时要注意这些因素。

线程的常用方法

编号方法说明
1
public void start()
使该线程开始执行;Java 虚拟机调用该线程的 run 方法。
2
public void run()
如果该线程是使用独立的 Runnable 运行对象构造的,则调用该 Runnable 对象的 run 方法;否则,该方法不执行任何操作并返回。
3
public final void setName(String name)
改变线程名称,使之与参数 name 相同。
4
public final void setPriority(int priority)
更改线程的优先级。
5
public final void setDaemon(boolean on)
将该线程标记为守护线程或用户线程。
6
public final void join(long millisec)
等待该线程终止的时间最长为 millis 毫秒。
7
public void interrupt()
中断线程。
8
public final boolean isAlive()
测试线程是否处于活动状态。
9
public static void yield()
暂停当前正在执行的线程对象,并执行其他线程。
10
public static void sleep(long millisec)
在指定的毫秒数内让当前正在执行的线程休眠(暂停执行),此操作受到系统计时器和调度程序精度和准确性的影响。
11
public static Thread currentThread()
返回对当前正在执行的线程对象的引用。


静态方法

currentThread()方法

currentThread()方法可以返回代码段正在被哪个线程调用的信息。

sleep()方法

方法sleep()的作用是在指定的毫秒数内让当前“正在执行的线程”休眠(暂停执行)。这个“正在执行的线程”是指this.currentThread()返回的线程。sleep方法有两个重载版本:
sleep相当于让线程睡眠,交出CPU,让CPU去执行其他的任务。
但是有一点要非常注意,sleep方法不会释放锁,也就是说如果当前线程持有对某个对象的锁,则即使调用sleep方法,其他线程也无法访问这个对象。看下面这个例子就清楚了:
输出结果:

从上面输出结果可以看出,当Thread-0进入睡眠状态之后,Thread-1并没有去执行具体的任务。只有当Thread-0执行完之后,此时Thread-0释放了对象锁,Thread-1才开始执行。注意,如果调用了sleep方法,必须捕获InterruptedException异常或者将该异常向上层抛出。当线程睡眠时间满后,不一定会立即得到执行,因为此时可能CPU正在执行其他的任务。所以说调用sleep方法相当于让线程进入阻塞状态。

yield()方法

调用yield方法会让当前线程交出CPU权限,让CPU去执行其他的线程。它跟sleep方法类似,同样不会释放锁。但是yield不能控制具体的交出CPU的时间,另外,yield方法只能让拥有相同优先级的线程有获取CPU执行时间的机会。注意,调用yield方法并不会让线程进入阻塞状态,而是让线程重回就绪状态,它只需要等待重新获取CPU执行时间,这一点是和sleep方法不一样的。
代码:
执行结果:
如果将 
//Thread.yield();
的注释去掉,执行结果如下:

对象方法

start()方法

start()用来启动一个线程,当调用start方法后,系统才会开启一个新的线程来执行用户定义的子任务,在这个过程中,会为相应的线程分配需要的资源。

run()方法

run()方法是不需要用户来调用的,当通过start方法启动一个线程之后,当线程获得了CPU执行时间,便进入run方法体去执行具体的任务。注意,继承Thread类必须重写run方法,在run方法中定义具体要执行的任务。

getId()

getId()的作用是取得线程的唯一标识
代码:
输出:

isAlive()方法

方法isAlive()的功能是判断当前线程是否处于活动状态
代码:
程序运行结果:
方法isAlive()的作用是测试线程是否偶处于活动状态。什么是活动状态呢?活动状态就是线程已经启动且尚未终止。线程处于正在运行或准备开始运行的状态,就认为线程是“存活”的。
有个需要注意的地方
虽然上面的实例中打印的值是true,但此值是不确定的。打印true值是因为myThread线程还未执行完毕,所以输出true。如果代码改成下面这样,加了个sleep休眠:
则上述代码运行的结果输出为false,因为mythread对象已经在1秒之内执行完毕。

join()方法

在很多情况下,主线程创建并启动了线程,如果子线程中药进行大量耗时运算,主线程往往将早于子线程结束之前结束。这时,如果主线程想等待子线程执行完成之后再结束,比如子线程处理一个数据,主线程要取得这个数据中的值,就要用到join()方法了。方法join()的作用是等待线程对象销毁。
执行结果:
由上可以看出main主线程等待joined thread线程先执行完了才结束的。如果把th.join()这行注释掉,运行结果如下:

getName和setName

用来得到或者设置线程名称。

getPriority和setPriority

用来获取和设置线程优先级。

setDaemon和isDaemon

用来设置线程是否成为守护线程和判断线程是否是守护线程。守护线程和用户线程的区别在于:守护线程依赖于创建它的线程,而用户线程则不依赖。举个简单的例子:如果在main线程中创建了一个守护线程,当main方法运行完毕之后,守护线程也会随着消亡。而用户线程则不会,用户线程会一直运行直到其运行完毕。在JVM中,像垃圾收集器线程就是守护线程。在上面已经说到了Thread类中的大部分方法,那么Thread类中的方法调用到底会引起线程状态发生怎样的变化呢?下面一幅图就是在上面的图上进行改进而来的:

停止线程

停止线程是在多线程开发时很重要的技术点,掌握此技术可以对线程的停止进行有效的处理。
停止一个线程可以使用Thread.stop()方法,但最好不用它。该方法是不安全的,已被弃用。
在Java中有以下3种方法可以终止正在运行的线程:使用退出标志,使线程正常退出,也就是当run方法完成后线程终止
使用stop方法强行终止线程,但是不推荐使用这个方法,因为stop和suspend及resume一样,都是作废过期的方法,使用他们可能产生不可预料的结果。
使用interrupt方法中断线程,但这个不会终止一个正在运行的线程,还需要加入一个判断才可以完成线程的停止。

暂停线程

interrupt()方法

线程的优先级

在操作系统中,线程可以划分优先级,优先级较高的线程得到的CPU资源较多,也就是CPU优先执行优先级较高的线程对象中的任务。
设置线程优先级有助于帮“线程规划器”确定在下一次选择哪一个线程来优先执行。
设置线程的优先级使用setPriority()方法,此方法在JDK的源码如下:
在Java中,线程的优先级分为1~10这10个等级,如果小于1或大于10,则JDK抛出异常throw new IllegalArgumentException()。
JDK中使用3个常量来预置定义优先级的值,代码如下:
线程优先级特性:继承性
比如A线程启动B线程,则B线程的优先级与A是一样的。
规则性
高优先级的线程总是大部分先执行完,但不代表高优先级线程全部先执行完。
随机性
优先级较高的线程不一定每一次都先执行完。

守护线程

在Java线程中有两种线程,一种是User Thread(用户线程),另一种是Daemon Thread(守护线程)。
Daemon的作用是为其他线程的运行提供服务,比如说GC线程。其实User Thread线程和Daemon Thread守护线程本质上来说去没啥区别的,唯一的区别之处就在虚拟机的离开:如果User Thread全部撤离,那么Daemon Thread也就没啥线程好服务的了,所以虚拟机也就退出了。守护线程并非虚拟机内部可以提供,用户也可以自行的设定守护线程,方法:public final void setDaemon(boolean on) ;但是有几点需要注意:thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。你不能把正在运行的常规线程设置为守护线程。 (备注:这点与守护进程有着明显的区别,守护进程是创建后,让进程摆脱原会话的控制+让进程摆脱原进程组的控制+让进程摆脱原控制终端的控制;所以说寄托于虚拟机的语言机制跟系统级语言有着本质上面的区别)
在Daemon线程中产生的新线程也是Daemon的。 (这一点又是有着本质的区别了:守护进程fork()出来的子进程不再是守护进程,尽管它把父进程的进程相关信息复制过去了,但是子进程的进程的父进程不是init进程,所谓的守护进程本质上说就是“父进程挂掉,init收养,然后文件0,1,2都是/dev/null,当前目录到/”)
不是所有的应用都可以分配给Daemon线程来进行服务,比如读写操作或者计算逻辑。因为在Daemon Thread还没来的及进行操作时,虚拟机可能已经退出了。

同步与死锁

同步代码块
在代码块上加上”synchronized”关键字,则此代码块就称为同步代码块
同步代码块格式
同步方法
除了代码块可以同步,方法也是可以同步的
方法同步格式
synchronized后续会单独来学习。(●’◡’●)

面试题

线程和进程有什么区别?
答:一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用。而线程是在进程中执行的一个任务。线程是进程的子集,一个进程可以有很多线程,每条线程并行执行不同的任务。不同的进程使用不同的内存空间,而所有的线程共享一片相同的内存空间。别把它和栈内存搞混,每个线程都拥有单独的栈内存用来存储本地数据。如何在Java中实现线程?
答:
创建线程有两种方式:
一、继承 Thread 类,扩展线程。
二、实现 Runnable 接口。启动一个线程是调用run()还是start()方法?
答:启动一个线程是调用start()方法,使线程所代表的虚拟处理机处于可运行状态,这意味着它可以由JVM 调度并执行,这并不意味着线程就会立即运行。run()方法是线程启动后要进行回调(callback)的方法。Thread类的sleep()方法和对象的wait()方法都可以让线程暂停执行,它们有什么区别?
答:sleep()方法(休眠)是线程类(Thread)的静态方法,调用此方法会让当前线程暂停执行指定的时间,将执行机会(CPU)让给其他线程,但是对象的锁依然保持,因此休眠时间结束后会自动恢复(线程回到就绪状态,请参考第66题中的线程状态转换图)。wait()是Object类的方法,调用对象的wait()方法导致当前线程放弃对象的锁(线程暂停执行),进入对象的等待池(wait pool),只有调用对象的notify()方法(或notifyAll()方法)时才能唤醒等待池中的线程进入等锁池(lock pool),如果线程重新获得对象的锁就可以进入就绪状态。线程的sleep()方法和yield()方法有什么区别?
答:
① sleep()方法给其他线程运行机会时不考虑线程的优先级,因此会给低优先级的线程以运行的机会;yield()方法只会给相同优先级或更高优先级的线程以运行的机会;
② 线程执行sleep()方法后转入阻塞(blocked)状态,而执行yield()方法后转入就绪(ready)状态;
③ sleep()方法声明抛出InterruptedException,而yield()方法没有声明任何异常;
④ sleep()方法比yield()方法(跟操作系统CPU调度相关)具有更好的可移植性。请说出与线程同步以及线程调度相关的方法。
答:wait():使一个线程处于等待(阻塞)状态,并且释放所持有的对象的锁;
sleep():使一个正在运行的线程处于睡眠状态,是一个静态方法,调用此方法要处理InterruptedException异常;
notify():唤醒一个处于等待状态的线程,当然在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程,而是由JVM确定唤醒哪个线程,而且与优先级无关;
notityAll():唤醒所有处于等待状态的线程,该方法并不是将对象的锁给所有线程,而是让它们竞争,只有获得锁的线程才能进入就绪状态;

Executors提供四种线程池,分别为:newCachedThreadPool 
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。 
创建方式: Executors.newCachedThreadPool();
newFixedThreadPool 
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。定长线程池的大小最好根据系统资源进行设置,如Runtime.getRuntime().availableProcessors()。 
创建方式: Executors.newFixedThreadPool();
newScheduledThreadPool 
创建一个定长线程池,支持定时及周期性任务执行。 
创建方式: Executors.newScheduledThreadPool ();
newSingleThreadExecutor 
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 
创建方式: Executors.newSingleThreadExecutor ();
我们重点看一下newScheduledThreadPool示例, 其他的创建完线程池后,使用 threadPool.execute(new Runnable())方式执行任务。
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 表示延迟3秒执行
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
// 表示延迟1秒后每3秒执行一次
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

说在前面:

查看Executors源码我们知道,Executors 类提供了使用了 ThreadPoolExecutor 的简单的 ExecutorService 实现,也就是上面所说的四种Executors线程池,但是 ThreadPoolExecutor 提供的功能远不止于此。 
不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池 
我们可以在创建 ThreadPoolExecutor 实例时指定活动线程的数量,我们也可以限制线程池的大小并且创建我们自己的 RejectedExecutionHandler 实现来处理不能适应工作队列的工作。 
下面我们就先了解一下ThreadPoolExecutor,然后在看个示例代码。Executors 源码:
public class Executors {

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

一、ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。构造器中各个参数的含义:corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值。TimeUnit.DAYS、TimeUnit.HOURS、TimeUnit.MINUTES、TimeUnit.SECONDS、TimeUnit.MILLISECONDS、TimeUnit.MICROSECONDS、TimeUnit.NANOSECONDS
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。 
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值: 
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) 
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
查看源码我们知道: 
1、Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的; 
2、然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等; 
3、抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法; 
4、然后ThreadPoolExecutor继承了类AbstractExecutorService。ThreadPoolExecutor类中有几个非常重要的方法: 
1、execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。 
2、submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。 
3、shutdown()和shutdownNow()是用来关闭线程池的。 
4、还有一大波get的方法, 可以获取与线程池相关属性的方法。

二.剖析线程池实现原理

1、线程池状态
volatile int runState;  // 前线程池的状态,它是一个volatile变量用来保证线程之间的可见性
static final int RUNNING    = 0; //  当创建线程池后,初始时,线程池处于RUNNING状态
static final int SHUTDOWN   = 1; 如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕
static final int STOP       = 2; // 如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
static final int TERMINATED = 3; // 当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
1
2
3
4
5
2.任务的执行
private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
private volatile long  keepAliveTime;    //线程存货时间
private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数, 当线程数大于corePoolSize时,创建新的先线程,但是创建新的线程数 + corePoolSize不能大于maximumPoolSize
private volatile int   poolSize;       //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount;   //用来记录已经执行完毕的任务个数
1
2
3
4
5
6
7
8
9
10
11
12
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task.  The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread.  If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
从代码注释,我们知道:如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
addWorker() 添加任务, 创建Worker, Worker 继承 AbstractQueuedSynchronizer 实现 Runnable 
addWorker()几个关键步骤:
w = new Worker(firstTask);
final Thread t = w.thread; // 从worker取得线程
if (workerAdded) {
t.start(); // worker添加成功,执行任务
workerStarted = true;
}
1
2
3
4
5
6
3.线程池中的线程初始化 
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。 
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
// 初始化一个核心线程;
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}

// 初始化所有核心线程
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
4.任务缓存队列及排队策略 
workQueue的类型为BlockingQueue,通常可以取下面三种类型: 
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小; 
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE; 
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。5.任务拒绝策略 
前面已经讲过, 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略: 
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) 
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务6.线程池的关闭 
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务 
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务7.线程池容量的动态调整 
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(), 
setCorePoolSize:设置核心池大小 
setMaximumPoolSize:设置线程池最大能创建的线程数目大小 
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

三、使用示例

我们可以在创建 ThreadPoolExecutor 实例时指定活动线程的数量,我们也可以限制线程池的大小并且创建我们自己的 RejectedExecutionHandler 实现来处理不能适应工作队列的工作。
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is rejected");
}

}
1
2
3
4
5
6
7
8
ThreadPoolExecutor 提供了一些方法,我们可以使用这些方法来查询 executor 的当前状态,线程池大小,活动线程数量以及任务数量。因此我是用来一个监控线程在特定的时间间隔内打印 executor 信息。 
MyMonitorThread.java
public class MyMonitorThread implements Runnable
{
private ThreadPoolExecutor executor;

private int seconds;

private boolean run=true;

public MyMonitorThread(ThreadPoolExecutor executor, int delay)
{
this.executor = executor;
this.seconds=delay;
}

public void shutdown(){
this.run=false;
}

@Override
public void run()
{
while(run){
System.out.println(
String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.isShutdown(),
this.executor.isTerminated()));
try {
Thread.sleep(seconds*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
这里是使用 ThreadPoolExecutor 的线程池实现例子。 
WorkerPool.java
public class WorkerPool {

public static void main(String args[]) throws InterruptedException{
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//creating the ThreadPoolExecutor
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
//start the monitoring thread
MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
Thread monitorThread = new Thread(monitor);
monitorThread.start();
//submit work to the thread pool
for(int i=0; i<10; i++){
executorPool.execute(new WorkerThread("cmd"+i));
}

Thread.sleep(30000);
//shut down the pool
executorPool.shutdown();
//shut down the monitor thread
Thread.sleep(5000);
monitor.shutdown();

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
注意在初始化 ThreadPoolExecutor 时,我们保持初始池大小为 2,最大池大小为 4 而工作队列大小为 2。因此如果已经有四个正在执行的任务而此时分配来更多任务的话,工作队列将仅仅保留他们(新任务)中的两个,其他的将会被 RejectedExecutionHandlerImpl 处理。

四.合理配置线程池的大小

遵循两原则: 
1、如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1 
2、如果是IO密集型任务,参考值可以设置为2*NCPU 
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息