java中的多线程
2016-03-29 21:37
489 查看
简介
为了给并发程序开发提供更好的支持,java不仅提供了Thread类、Runnable接口等简单的多线程支持工具,还提供了用于多线程管理的线程池,用于提高并发程序的性能。
无限制线程的缺陷
多线程的软件设计方法确实可以提高多核处理器的计算能力,提高系统的性能和吞吐量,但是如果不加限制的使用多线程,对于系统性能不仅不能提升,反而会下降产生不利影响。
简单的线程创建方法new Thread().start(),通过thread来启动线程,并且由系统自动的回收线程。在简单的系统这样做并没有问题,但是在真实的生产系统中,在某一时刻会有大量的请求,这样就需要为每一个请求创建一个线程,而当线程数量过大时,会耗尽CPU和内存资源。
虽然线程是一种轻量级的工具,但是其创建和销毁也需要消耗一定的时间;其次,线程本身也是需要占用内存空间的,大量的线程会抢占内存只有,导致Out of memory异常,并且大量线程的回收会给GC带来很大压力,延长GC停顿的时间。
因此,对线程的使用必须控制一个度,在适当的范围内使用会提供系统性能,但是,一旦超过了这个范围,大量的线程就会拖垮整个系统。在生产环境下,必须要对其进行管理和控制。
简单线程池的实现
前面介绍在多线程中不断的创建和销毁线程会带来额外的开销,这样就需要引入一种线程复用机制,即线程池。线程池的基本功能就是进行线程复用,当系统接受一个提交的任务,需要一个线程时,并不急于去创建一个线程,而是去现场池中寻找,是否有闲置的线程,若有,直接使用线程池中的线程工作,如没有,再去创建新的线程。待任务完成后,也不是简单的销毁线程,而是将线程放回线程池中,以便下次复用。
上面已经把线程池实现的原理简单说明了一下,下面我们自己实现一个线程,来了解一下线程池实现的核心功能,有助于理解线程池的实现。
线程池实现代码:
从代码中可以看出,线程池中有一个闲置线程的队列,在执行任务时,如果有闲置线程,则从线程池中取线程执行任务,如果没有现在线程,则创建新的线程,并且在现场使用完毕后,会将线程重新放回到闲置线程队列中。
另外,线程池的使用还需要一个永不退出的线程的配合使用,该线程在手动关闭前永不结束,并且一直等待新任务的到来。代码如下:
执行线程:
测试代码:
线程池能减少线程频繁调度的开销,线程的复用,对系统性能的提升效果比较明显。
Executor框架
JDK提供了一整套的Executor框架,帮助开发人员有效的进行线程控制。ThreadPoolExecutor表示一个线程池,Executors类扮演着线程池工厂的角色,通过Executor可以取得一个特定功能的线程池。
newFixedThreadPool():该方法返回一个固定线程数量的线程池,该线程池中线程的数量始终保持不变。当一个任务提交后,线程池中若有空闲线程则立即执行,若没有,新任务会被保存在一个任务队列中,待有现车空闲时,便处理任务队列中的任务。
newSingleThreadExecutor():该方法返回只有一个线程的线程池。若多余的任务被提交到该线程池,任务会被保存到一个任务队列中,若线程空闲,按先进先出的顺序执行队列中的任务。
newCacheThreadPool():该方法返回一个根据实际情况调整线程数量的线程池。若有空闲线程可以复用,则优先选择使用可复用的线程,否则,创建新的线程处理新任务。
newSingleThreadScheduledExecutor()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1,在给定时间内执行某一任务。
自定义线程池
newFixedThreadPool、newSingleThreadExecutor、newCacheThreadPool的内部实现都实现了ThreadPoolExecutor。在ThreadPoolExecutor中有一个最主要的构造函数:
corePoolSize:线程池中核心线程的数量;
maximumPoolSize:线程池中最大线程的数量;
keepAliveTime:当线程池中的线程数量超过corePoolSize时,多余的空闲线程的存活时间;
unit:keepAliveTime的单位;
workQueue:任务队列,提交但是尚未执行的任务;
threadFactory:线程工厂,用于创建线程;
hander:拒绝策略,当任务太多时,怎么拒绝任务。
主要来介绍下workQueue和hander:
workQueue任务队列,提交但是尚未执行的任务,主要由以下几种实现方式:
直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每当插入操作都要等待响应的删除操作,反之,每当删除操作斗鱼等待线程插入操作。它不保证任务,总是将任务提交给线程来执行,如果没有空闲线程,则创建新的线程,当线程数达到最大数值时,则执行拒绝策略。因此,如果使用SynchronousQueue,通常会设置很大的maximumPoolSize,否则很容易执行拒绝策略。
有界任务队列:当使用有界任务队列,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会创建新的线程,若大于corePoolSize,则将新任务加入到等待队列中,若任务队列已满,无法加入,则在总数不大于maximumPoolSize的前提下,创建新的线程执行。若大于maximumPoolSize,则执行拒绝策略。如ArrayBlockingQueue队列。
无界任务队列:于有界任务队列相比,除非耗尽系统资源,否则不会出现任务放入任务队列失败的情况。当有新的任务到来时,系统的线程数小于corePoolSize,线程池就会产生新的线程执行任务。当系统的线程数达到corePoolSize,就不再增加。如后续仍有新的任务产生,但是没有空闲的线程资源,那么线程进入任务队列进行等待。如:LinkedBlockingQueue队列。
优先任务队列:优先任务队列是带有执行优先级的队列。可以控制任务的执行顺序,是一个特殊的无界队列。如PriorityBlockingQueue队列。PriorityBlockingQueue可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证。线程要实现comparable接口。
hander是线程池的拒绝策略:
JDK内置的拒绝策略如下:
AbortPolicy策略:该策略会直接抛出异常,阻止系统的正常工作。
CallerRunsPolicy策略:只要线程池未关系,该策略直接在调用者线程中,运行当前被丢弃的任务。
DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,尝试再次执行提交当前任务。
DiscardPolicy策略:该策略丢弃无法处理的任务,不予以任何处理。
在JDK所提供的线程池不能满足需求的时候,可以考虑实现自定义的线程池,自定义的线程池可以提供更为灵活的任务处理和调度方式。
扩展ThreadPoolExecutor
ThreadPoolExecutor是一个可扩展的线程池,它提供了beforeExecute()、afterExecute()和terminated()三个方法实现对线程池的控制。
在默认的ThreadPoolExecutor实现中,提供了空的beforeExecute()、afterExecute()的实现。在实际应用中,可以对其进行扩展,实现对线程池状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断。
下面就是一个带有日志输出功能的线程池,该线程池会在任务执行前输入任务执行的名称和id,同时,在任务执行完毕后,可以输出线程的id和当前线程池的线程数量。
为了给并发程序开发提供更好的支持,java不仅提供了Thread类、Runnable接口等简单的多线程支持工具,还提供了用于多线程管理的线程池,用于提高并发程序的性能。
无限制线程的缺陷
多线程的软件设计方法确实可以提高多核处理器的计算能力,提高系统的性能和吞吐量,但是如果不加限制的使用多线程,对于系统性能不仅不能提升,反而会下降产生不利影响。
简单的线程创建方法new Thread().start(),通过thread来启动线程,并且由系统自动的回收线程。在简单的系统这样做并没有问题,但是在真实的生产系统中,在某一时刻会有大量的请求,这样就需要为每一个请求创建一个线程,而当线程数量过大时,会耗尽CPU和内存资源。
虽然线程是一种轻量级的工具,但是其创建和销毁也需要消耗一定的时间;其次,线程本身也是需要占用内存空间的,大量的线程会抢占内存只有,导致Out of memory异常,并且大量线程的回收会给GC带来很大压力,延长GC停顿的时间。
因此,对线程的使用必须控制一个度,在适当的范围内使用会提供系统性能,但是,一旦超过了这个范围,大量的线程就会拖垮整个系统。在生产环境下,必须要对其进行管理和控制。
简单线程池的实现
前面介绍在多线程中不断的创建和销毁线程会带来额外的开销,这样就需要引入一种线程复用机制,即线程池。线程池的基本功能就是进行线程复用,当系统接受一个提交的任务,需要一个线程时,并不急于去创建一个线程,而是去现场池中寻找,是否有闲置的线程,若有,直接使用线程池中的线程工作,如没有,再去创建新的线程。待任务完成后,也不是简单的销毁线程,而是将线程放回线程池中,以便下次复用。
上面已经把线程池实现的原理简单说明了一下,下面我们自己实现一个线程,来了解一下线程池实现的核心功能,有助于理解线程池的实现。
线程池实现代码:
public class ThreadPool { private static ThreadPool instance = null; //空闲线程队列 private List<PThread> idelThreads; //已有的线程总数 private int threadCounter; private boolean isShutdown = false; public ThreadPool() { idelThreads = new Vector<PThread>(5); threadCounter=0; } public synchronized int getCreatedThreadsCount() { return threadCounter; } //取得线程池实例 public synchronized static ThreadPool getInstatce(){ if(instance==null){ instance = new ThreadPool(); } return instance; } //把线程重新放回到池中 public synchronized void repool(PThread repoolThread){ if(!isShutdown){ idelThreads.add(repoolThread); }else{ repoolThread.shutdown(); } } //停止池中所有线程 public synchronized void shutdown(){ isShutdown = true; for (int i = 0; i < idelThreads.size(); i++) { PThread pthread = idelThreads.get(i); pthread.shutdown(); } } //执行任务 public synchronized void start(Runnable target){ PThread pthread = null; //如果有闲置线程 if(idelThreads.size()>0){ int index = idelThreads.size()-1; pthread=idelThreads.get(index); idelThreads.remove(index); pthread.setTarget(target); }else{//如果没有闲置线程 threadCounter++; PThread p = new PThread(instance, target, "PThread#"+threadCounter); p.start(); } } }
从代码中可以看出,线程池中有一个闲置线程的队列,在执行任务时,如果有闲置线程,则从线程池中取线程执行任务,如果没有现在线程,则创建新的线程,并且在现场使用完毕后,会将线程重新放回到闲置线程队列中。
另外,线程池的使用还需要一个永不退出的线程的配合使用,该线程在手动关闭前永不结束,并且一直等待新任务的到来。代码如下:
public class PThread extends Thread{ //线程池 private ThreadPool pool; //任务 private Runnable target; private boolean isShutDown = false; private boolean isIdle = false; public PThread(ThreadPool pool, Runnable target,String name) { super(name); this.pool = pool; this.target = target; } public synchronized Runnable getTarget() { return target; } public synchronized boolean isIdle() { return isIdle; } @Override public void run() { while(!isShutDown){ isIdle = false; if(target!=null){ //运行任务 target.run(); } //任务结束,闲置任务 isIdle=true; try { pool.repool(this); synchronized (this) { //线程闲置,等待任务到来 wait(); } } catch (Exception e) { // TODO: handle exception } isIdle=false; } } public synchronized void setTarget(Runnable target){ this.target=target; //设置任务之后,通知run方法,开始执行 notifyAll(); } public synchronized void shutdown(){ isShutDown=true; notifyAll(); } }
执行线程:
public class MyThread implements Runnable{ private String name; public MyThread() { } public MyThread(String name) { this.name = name; } @Override public void run() { // TODO Auto-generated method stub try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
测试代码:
public class TestClient { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { //new Thread(new MyThread("testnopoolThread"+i)).start(); ThreadPool.getInstatce().start(new MyThread("testpoolThread"+i)); //executor.execute(new MyThread("executorpoolThread"+i)); } System.out.println(System.currentTimeMillis()-begin); } }
线程池能减少线程频繁调度的开销,线程的复用,对系统性能的提升效果比较明显。
Executor框架
JDK提供了一整套的Executor框架,帮助开发人员有效的进行线程控制。ThreadPoolExecutor表示一个线程池,Executors类扮演着线程池工厂的角色,通过Executor可以取得一个特定功能的线程池。
newFixedThreadPool():该方法返回一个固定线程数量的线程池,该线程池中线程的数量始终保持不变。当一个任务提交后,线程池中若有空闲线程则立即执行,若没有,新任务会被保存在一个任务队列中,待有现车空闲时,便处理任务队列中的任务。
newSingleThreadExecutor():该方法返回只有一个线程的线程池。若多余的任务被提交到该线程池,任务会被保存到一个任务队列中,若线程空闲,按先进先出的顺序执行队列中的任务。
newCacheThreadPool():该方法返回一个根据实际情况调整线程数量的线程池。若有空闲线程可以复用,则优先选择使用可复用的线程,否则,创建新的线程处理新任务。
newSingleThreadScheduledExecutor()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1,在给定时间内执行某一任务。
自定义线程池
newFixedThreadPool、newSingleThreadExecutor、newCacheThreadPool的内部实现都实现了ThreadPoolExecutor。在ThreadPoolExecutor中有一个最主要的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize:线程池中核心线程的数量;
maximumPoolSize:线程池中最大线程的数量;
keepAliveTime:当线程池中的线程数量超过corePoolSize时,多余的空闲线程的存活时间;
unit:keepAliveTime的单位;
workQueue:任务队列,提交但是尚未执行的任务;
threadFactory:线程工厂,用于创建线程;
hander:拒绝策略,当任务太多时,怎么拒绝任务。
主要来介绍下workQueue和hander:
workQueue任务队列,提交但是尚未执行的任务,主要由以下几种实现方式:
直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每当插入操作都要等待响应的删除操作,反之,每当删除操作斗鱼等待线程插入操作。它不保证任务,总是将任务提交给线程来执行,如果没有空闲线程,则创建新的线程,当线程数达到最大数值时,则执行拒绝策略。因此,如果使用SynchronousQueue,通常会设置很大的maximumPoolSize,否则很容易执行拒绝策略。
有界任务队列:当使用有界任务队列,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会创建新的线程,若大于corePoolSize,则将新任务加入到等待队列中,若任务队列已满,无法加入,则在总数不大于maximumPoolSize的前提下,创建新的线程执行。若大于maximumPoolSize,则执行拒绝策略。如ArrayBlockingQueue队列。
无界任务队列:于有界任务队列相比,除非耗尽系统资源,否则不会出现任务放入任务队列失败的情况。当有新的任务到来时,系统的线程数小于corePoolSize,线程池就会产生新的线程执行任务。当系统的线程数达到corePoolSize,就不再增加。如后续仍有新的任务产生,但是没有空闲的线程资源,那么线程进入任务队列进行等待。如:LinkedBlockingQueue队列。
优先任务队列:优先任务队列是带有执行优先级的队列。可以控制任务的执行顺序,是一个特殊的无界队列。如PriorityBlockingQueue队列。PriorityBlockingQueue可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证。线程要实现comparable接口。
hander是线程池的拒绝策略:
JDK内置的拒绝策略如下:
AbortPolicy策略:该策略会直接抛出异常,阻止系统的正常工作。
CallerRunsPolicy策略:只要线程池未关系,该策略直接在调用者线程中,运行当前被丢弃的任务。
DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,尝试再次执行提交当前任务。
DiscardPolicy策略:该策略丢弃无法处理的任务,不予以任何处理。
在JDK所提供的线程池不能满足需求的时候,可以考虑实现自定义的线程池,自定义的线程池可以提供更为灵活的任务处理和调度方式。
扩展ThreadPoolExecutor
ThreadPoolExecutor是一个可扩展的线程池,它提供了beforeExecute()、afterExecute()和terminated()三个方法实现对线程池的控制。
在默认的ThreadPoolExecutor实现中,提供了空的beforeExecute()、afterExecute()的实现。在实际应用中,可以对其进行扩展,实现对线程池状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断。
下面就是一个带有日志输出功能的线程池,该线程池会在任务执行前输入任务执行的名称和id,同时,在任务执行完毕后,可以输出线程的id和当前线程池的线程数量。
public class MyThreadPoolExecutor extends ThreadPoolExecutor{ public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("BeforeExecute MyThread name:"+((MyThread)r).getName()+"ID: "+t.getId() ); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("AfterExecute MyThread ID:"+Thread.currentThread().getId()); System.out.println("AfterExecute Poolsize:"+this.getPoolSize()); } }
相关文章推荐
- java、运算
- Java中的参数传递方式
- 深入理解 Java 虚拟机-Java 垃圾收集机制
- Java千百问_03基本语法(005)_二进制是怎样做位运算的
- Spring源码(二)---AOP
- java反射笔记
- 深入理解 Java 虚拟机-javac 编译与 JIT 编译
- eclipse 条件断点
- java守护线程
- java的IO流
- Java方法参数-值传递、引用传递
- 深入理解 Java 虚拟机-Java 语法糖
- 深入理解 Java 虚拟机-多态性实现机制——静态分派与动态分派
- JavaWeb 后端 <十三> 之 监听器 JSTL国际化
- 20160329javaweb之JSP -session入门
- 简单复利计算java板
- Spring 3.0 + Atomikos构建jta分布式事务
- JAVA 死锁实例
- 2016-03-29-Spring框架
- 我看过的最好最实用的String文章