您的位置:首页 > 编程语言 > Java开发

java实现的线程池

2016-07-08 15:57 387 查看
线程池(Thread Pool)对于限制应用程序中同一时刻运行的线程数很有用。因为每启动一个新线程都会有相应的性能开销,每个线程都需要给栈分配一些内存等等。

我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(Blocking Queue ),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。

每个任务是一个Runnable对象,阻塞队列控制任务的数量,调度线程通过execute()向阻塞队列中添加任务,当阻塞队列满时调度线程阻塞。线程池中的线程不断从阻塞队列中取出任务执行,直到被stop()为止。

public class ThreadPool {
private BlockingQueue<Runnable> taskQueue = null;
private List<PoolThread> threads = new ArrayList<>();
private boolean isStopped = false;

public ThreadPool(int noOfThreads, int maxNoOfTasks) {
taskQueue = new BlockingQueue<>(maxNoOfTasks);

for (int i = 0; i < noOfThreads; ++i) {
threads.add(new PoolThread(taskQueue));
}

for (PoolThread thread : threads) {
thread.start();
}
}

public synchronized void execute(Runnable task) {
if (this.isStopped) {
throw new IllegalStateException("ThreadPool is stopped");
}
try {
this.taskQueue.enqueue(task);//任务队列满时会阻塞
} catch (InterruptedException e) {
//...
}
}

public synchronized void stop() {
this.isStopped = true;
for (PoolThread thread : threads) {
thread.toStop();
}
}
}

class PoolThread extends Thread {

private BlockingQueue<Runnable> taskQueue = null;
private boolean isStopped = false;

//每个线程需要维护一个任务队列的引用,从任务队列中取任务对象运行
public PoolThread(BlockingQueue<Runnable> queue) {
this.taskQueue = queue;
}

public void run() {
while (!isStopped) {
try {
Runnable runnable = taskQueue.take();
runnable.run();
taskQueue.dequeue();
} catch (Exception e) {
//...
}
}
}

public synchronized void toStop() {
isStopped = true;
this.interrupt();//中断阻塞的线程
}

public synchronized boolean isStopped() {
return isStopped;
}
}


自己实现的阻塞队列的代码
public class BlockingQueue <T>{
private List<T> queue = new LinkedList<>();
private int limit = 10;

public BlockingQueue(int limit) {
//队列有一个上限值
this.limit = limit;
}

public synchronized void enqueue(T item) throws InterruptedException {
//如果队列已满,向队列中添加元素的线程会被挂起,直到队列中有元素被取走
while (this.queue.size() == this.limit) {
this.wait();
}

if (this.queue.isEmpty()) {
//队列马上就不空了,唤醒想要从队列中取元素线程
notifyAll();
}

this.queue.add(item);
}

public synchronized void dequeue() throws InterruptedException {
//如果队列为空,从队列中取元素的线程会被挂起
while (this.queue.isEmpty()) {
this.wait();
}

if (this.queue.size() == this.limit) {
//唤醒想要向队列中添加元素而阻塞的线程
notifyAll();
}

this.queue.remove(0);
}

public synchronized T take() throws InterruptedException {
while (this.queue.isEmpty()) {
this.wait();
}

return this.queue.get(0);
}
}


参考文献:http://tutorials.jenkov.com/java-concurrency/thread-pools.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 并发 线程池