您的位置:首页 > 编程语言 > Java开发

Java ExecutorService 线程池

2012-05-04 10:59 501 查看
ExecutorService 建立多线程的步骤:

1。定义线程类class Handler implements Runnable{

}
2。建立ExecutorService线程池ExecutorService executorService = Executors.newCachedThreadPool();

或者

int cpuNums = Runtime.getRuntime().availableProcessors();

//获取当前系统的CPU 数目

ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE);

//ExecutorService通常根据系统资源情况灵活定义线程池大小
3。调用线程池操作循环操作,成为daemon,把新实例放入Executor池中

while(true){

executorService.execute(new Handler(socket));

// class Handler implements Runnable{

或者

executorService.execute(createTask(i));

//private static Runnable createTask(final int taskID)

}

execute(Runnable对象)方法

其实就是对Runnable对象调用start()方法

(当然还有一些其他后台动作,比如队列,优先级,IDLE timeout,active激活等)
几种不同的ExecutorService线程池对象

1.newCachedThreadPool() -缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中

-缓存型池子通常用于执行一些生存期很短的异步型任务

因此在一些面向连接的daemon型SERVER中用得不多。

-能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。

注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。
2. newFixedThreadPool-newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程

-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子

-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器

-从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:

fixed池线程数固定,并且是0秒IDLE(无IDLE)

cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE
3.ScheduledThreadPool-调度型线程池

-这个池子里的线程可以按schedule依次delay执行,或周期执行
4.SingleThreadExecutor-单例线程,任意时间池中只能有一个线程

-用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)
上面四种线程池,都使用Executor的缺省线程工厂建立线程,也可单独定义自己的线程工厂

下面是缺省线程工厂代码:

static class DefaultThreadFactory implements ThreadFactory {

static final AtomicInteger poolNumber = new AtomicInteger(1);

final ThreadGroup group;

final AtomicInteger threadNumber = new AtomicInteger(1);

final String namePrefix;

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null)? s.getThreadGroup() :Thread.currentThread().getThreadGroup();

namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";

}

public Thread newThread(Runnable r) {

Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);

if (t.isDaemon())

t.setDaemon(false);

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}
也可自己定义ThreadFactory,加入建立池的参数中

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
Executor的execute()方法

execute() 方法将Runnable实例加入pool中,并进行一些pool size计算和优先级处理

execute() 方法本身在Executor接口中定义,有多个实现类都定义了不同的execute()方法

如ThreadPoolExecutor类(cache,fiexed,single三种池子都是调用它)的execute方法如下:

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

if (runState == RUNNING && workQueue.offer(command)) {

if (runState != RUNNING || poolSize == 0)

ensureQueuedTaskHandled(command);

}

else if (!addIfUnderMaximumPoolSize(command))

reject(command); // is shutdown or saturated

}

}

二:

一、创建任务

任务就是一个实现了Runnable接口的类。
创建的时候实run方法即可。

二、执行任务

通过java.util.concurrent.ExecutorService接口对象来执行任务,该接口对象通过工具类java.util.concurrent.Executors的静态方法来创建。

Executors此包中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。

ExecutorService提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以关闭 ExecutorService,这将导致其停止接受新任务。关闭后,执行程序将最后终止,这时没有任务在执行,也没有任务在等待执行,并且无法提交新任务。
executorService.execute(new TestRunnable());

1、创建ExecutorService
通过工具类java.util.concurrent.Executors的静态方法来创建。
Executors此包中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。

比如,创建一个ExecutorService的实例,ExecutorService实际上是一个线程池的管理工具:
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService executorService = Executors.newFixedThreadPool(3);
ExecutorService executorService = Executors.newSingleThreadExecutor();

2、将任务添加到线程去执行
当将一个任务添加到线程池中的时候,线程池会为每个任务创建一个线程,该线程会在之后的某个时刻自动执行。

三、关闭执行服务对象
executorService.shutdown();

四、综合实例

package concurrent;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

/**

* Created by IntelliJ IDEA.

*

* @author leizhimin 2008-11-25 14:28:59

*/

public class TestCachedThreadPool {

public static
void main(String[] args) {

// ExecutorService executorService = Executors.newCachedThreadPool();

ExecutorService executorService = Executors.newFixedThreadPool(5);

// ExecutorService executorService = Executors.newSingleThreadExecutor();

for (int i = 0; i < 5; i++) {

executorService.execute(new TestRunnable());

System.out.println("************* a" + i +
" *************");

}

executorService.shutdown();

}

}

class TestRunnable
implements Runnable {

public void run() {

System.out.println(Thread.currentThread().getName() +
"线程被调用了。");

while (true) {

try {

Thread.sleep(5000);

System.out.println(Thread.currentThread().getName());

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

运行结果:

************* a0 *************

************* a1 *************

pool-1-thread-2线程被调用了。

************* a2 *************

pool-1-thread-3线程被调用了。

pool-1-thread-1线程被调用了。

************* a3 *************

************* a4 *************

pool-1-thread-4线程被调用了。

pool-1-thread-5线程被调用了。

pool-1-thread-2

pool-1-thread-1

pool-1-thread-3

pool-1-thread-5

pool-1-thread-4

pool-1-thread-2

pool-1-thread-1

pool-1-thread-3

pool-1-thread-5

pool-1-thread-4

......

五、获取任务的执行的返回值
在Java5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable<T> task)
方法来执行,并且返回一个 <T> Future<T>,是表示任务等待完成的 Future。

public interface Callable<V>


返回结果并且可能抛出异常的任务。实现者定义了一个不带任何参数的叫做
call 的方法。
Callable 接口类似于
Runnable
,两者都是为那些其实例可能被另一个线程执行的类设计的。但是
Runnable 不会返回结果,并且无法抛出经过检查的异常。
Executors
类包含一些从其他普通形式转换成
Callable 类的实用方法。

Callable中的call()方法类似Runnable的run()方法,就是前者有返回值,后者没有。

当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且会返回执行结果Future对象。

同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null。

遗憾的是,在Java API文档中,这块介绍的很糊涂,估计是翻译人员还没搞清楚的缘故吧。或者说是注释不到位。下面看个例子:

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.*;

/**

* Callable接口测试

*

* @author leizhimin 2008-11-26 9:20:13

*/

public class CallableDemo {

public static
void main(String[] args) {

ExecutorService executorService = Executors.newCachedThreadPool();

List<Future<String>> resultList = new ArrayList<Future<String>>();

//创建10个任务并执行

for (int i = 0; i < 10; i++) {

//使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中

Future<String> future = executorService.submit(new TaskWithResult(i));

//将任务执行结果存储到List中

resultList.add(future);

}

//遍历任务的结果

for (Future<String> fs : resultList) {

try {

System.out.println(fs.get());
//打印各个线程(任务)执行的结果

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

} finally {

//启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。

executorService.shutdown();

}

}

}

}

class TaskWithResult
implements Callable<String> {

private int id;

public TaskWithResult(int id) {

this.id = id;

}

/**

* 任务的具体过程,一旦任务传给ExecutorService的submit方法,则该方法自动在一个线程上执行。

*

* @return

* @throws Exception

*/

public String call()
throws Exception {

System.out.println("call()方法被自动调用,干活!!! " + Thread.currentThread().getName());

//一个模拟耗时的操作

for (int i = 999999; i > 0; i--) ;

return
"call()方法被自动调用,任务的结果是:" + id + " " + Thread.currentThread().getName();

}

}

运行结果:

call()方法被自动调用,干活!!! pool-1-thread-1

call()方法被自动调用,干活!!! pool-1-thread-3

call()方法被自动调用,干活!!! pool-1-thread-4

call()方法被自动调用,干活!!! pool-1-thread-6

call()方法被自动调用,干活!!! pool-1-thread-2

call()方法被自动调用,干活!!! pool-1-thread-5

call()方法被自动调用,任务的结果是:0 pool-1-thread-1

call()方法被自动调用,任务的结果是:1 pool-1-thread-2

call()方法被自动调用,干活!!! pool-1-thread-2

call()方法被自动调用,干活!!! pool-1-thread-6

call()方法被自动调用,干活!!! pool-1-thread-4

call()方法被自动调用,任务的结果是:2 pool-1-thread-3

call()方法被自动调用,干活!!! pool-1-thread-3

call()方法被自动调用,任务的结果是:3 pool-1-thread-4

call()方法被自动调用,任务的结果是:4 pool-1-thread-5

call()方法被自动调用,任务的结果是:5 pool-1-thread-6

call()方法被自动调用,任务的结果是:6 pool-1-thread-2

call()方法被自动调用,任务的结果是:7 pool-1-thread-6

call()方法被自动调用,任务的结果是:8 pool-1-thread-4

call()方法被自动调用,任务的结果是:9 pool-1-thread-3

Process finished with exit code 0

本文出自 “熔 岩” 博客,转载请与作者联系!

三:

采用Java 5的ExecutorService来进行线程池的方式实现多线程,模拟客户端多用户向同一服务器端发送请求。

服务器端:
import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.io.OutputStream;

import java.io.PrintWriter;

import java.net.*;

import java.util.concurrent.*;

public class MultiThreadServer {

private int port=8821;

private ServerSocket serverSocket;

private ExecutorService executorService;//线程池

private final int POOL_SIZE=10;//单个CPU线程池大小

public MultiThreadServer() throws IOException{

serverSocket=new ServerSocket(port);

//Runtime的availableProcessor()方法返回当前系统的CPU数目.

executorService=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*POOL_SIZE);

System.out.println("服务器启动");

}

public void service(){

while(true){

Socket socket=null;

try {

//接收客户连接,只要客户进行了连接,就会触发accept();从而建立连接

socket=serverSocket.accept();

executorService.execute(new Handler(socket));

} catch (Exception e) {

e.printStackTrace();

}

}

}

public static void main(String[] args) throws IOException {

new MultiThreadServer().service();

}

}

class Handler implements Runnable{

private Socket socket;

public Handler(Socket socket){

this.socket=socket;

}

private PrintWriter getWriter(Socket socket) throws IOException{

OutputStream socketOut=socket.getOutputStream();

return new PrintWriter(socketOut,true);

}

private BufferedReader getReader(Socket socket) throws IOException{

InputStream socketIn=socket.getInputStream();

return new BufferedReader(new InputStreamReader(socketIn));

}

public String echo(String msg){

return "echo:"+msg;

}

public void run(){

try {

System.out.println("New connection accepted "+socket.getInetAddress()+":"+socket.getPort());

BufferedReader br=getReader(socket);

PrintWriter pw=getWriter(socket);

String msg=null;

while((msg=br.readLine())!=null){

System.out.println(msg);

pw.println(echo(msg));

if(msg.equals("bye"))

break;

}

} catch (IOException e) {

e.printStackTrace();

}finally{

try {

if(socket!=null)

socket.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

客户端:

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.OutputStream;

import java.net.Socket;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class MultiThreadClient {

public static void main(String[] args) {

int numTasks = 10;

ExecutorService exec = Executors.newCachedThreadPool();

for (int i = 0; i < numTasks; i++) {

exec.execute(createTask(i));

}

}

// 定义一个简单的任务

private static Runnable createTask(final int taskID) {

return new Runnable() {

private Socket socket = null;

private int port=8821;

public void run() {

System.out.println("Task " + taskID + ":start");

try {

socket = new Socket("localhost", port);

// 发送关闭命令

OutputStream socketOut = socket.getOutputStream();

socketOut.write("shutdown/r/n".getBytes());

// 接收服务器的反馈

BufferedReader br = new BufferedReader( new InputStreamReader(socket.getInputStream()));

String msg = null;

while ((msg = br.readLine()) != null)

System.out.println(msg);

} catch (IOException e) {

e.printStackTrace();

}

}

};

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: