您的位置:首页 > 其它

多线程基本概念

2014-12-24 09:51 148 查看
Executor

接口,用于对线程任务执行,调度等过程的解耦。提交Runnable任务。

它仅仅是一个接口

public interface Executor {
void execute(Runnable command);
}


所以,可以

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());


或者干脆:

class ThreadPerTaskExecutor implements Executor {      
public void execute(Runnable r) {
new Thread(r).start();
}
}


或者用策略模式:

class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;

SerialExecutor(Executor executor) {
this.executor = executor;
}

public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}

protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}}


ExecutorService

Executor有一个扩展的接口,是ExecutorService; Executors类提供工厂方法,创建各种ExecutorService。Executors还可以创建callable,Callable相比Runnable,包含返回类型和异常。

ExecutorService是对Executor的增强,加入了管理终止termination的方法和提供用于异步任务Future的处理。

关闭ExecutorService,将拒绝新任务。关闭方法有两种,shutdown和shutdownnow;前者方法不关闭当前任务,

后者会将正在执行的任务关闭。其submit方法用于提交任务(Callable或者Runnable),这里执行execute()

,并返回Future。(Future是一个接口,表示异步计算的结果,get方法可以获取结果,cancel可以取消没完成的任务;FutureTask是Future的一个实现)。

ExecutorService有两个扩展,一个是AbstractExecutorService,另一个是ScheduledExecutorService。前者实现了executorService的submit,invoke方法,并默认返回FutureTask,一个实现是ThreadPoolExecutor。后者提供了定时

或者按一定周期执行的方法。

ThreadPoolExecutor

1.简单的场景,可以用Executors 工厂方法:

Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)

Executors.newFixedThreadPool(int)(固定大小线程池)

Executors.newSingleThreadExecutor()(单个后台线程)

它们均为大多数使用场景预定义了设置。

2.复杂一些的场景,或者需要自定义时候,用ThreadPoolExecutor:

构造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)


将根据corePoolSize(参见 getCorePoolSize())和 maximumPoolSize(参见 getMaximumPoolSize())

设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使有线程是空闲的。

如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。

如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。

如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务

keepAliveTime表示多出corePoolSize的线程存活的时间。

workQueue任务执行之前,保持任务;对于execute方法提交的任务,将由这个队列保存。这个队列有三种形式:

SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue。SynchronousQueue是默认的,它将任务直接提交给线程而不保持它们。如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界aximumPoolSizes以避免拒绝新提交的任务。但是当命令以超过队列所能处理的平均数连续到达时,此策略允许程无界的增长。LinkedBlockingQueue将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 值也就无效了)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许队列无限的增长。ArrayBlockingQueue有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

被拒绝的任务

当 Executor 已经关闭,或Executor将有限边界用于最大线程和工作队列容量,且已经饱和时, 在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。

在以上两种情况下,execute 方法都将调用其RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。

下面提供了四种预定义的处理程序策略:

1. 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时RejectedExecutionException。

2. 在 ThreadPoolExecutor.CallerRunsPolicy中,线程调用运行该任务的 execute 本身。

此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

3. 在ThreadPoolExecutor.DiscardPolicy中,不能执行的任务将被删除。

4. 在ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

定义和使用其他种类的RejectedExecutionHandler类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时

Example of ThreadPoolExecutor

GenerakTask implements callable .....


List<Future<V>> futures = new ArrayList<Future<V>>();
for(GenerakTask<V> task : tasks) {
<span style="white-space:pre">	</span>futures.add(executor.submit(task));
}
List<V> result = new ArrayList<V>();
for(Future<V> future : futures) {
try {
result.add(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: