您的位置:首页 > 其它

线程同步工具类

2016-10-21 10:16 211 查看


阅读目录

写在前面
闭锁
信号量
栅栏
阻塞队列



写在前面

同步工具类主要包括闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)等;

使用同步工具类可以协调线程的控制流;

同步工具类封装了一些状态,这些状态决定线程是继续执行还是等待,此外同步工具类还提供了修改状态的方法;

下面将简单介绍以上同步工具类;



闭锁

可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行;

以CountDownLatch为例,内部包含一个计数器,一开始初始化为一个整数(事件个数),发生一个事件后,调用countDown方法,计数器减1,await用于等待计数器为0后继续执行当前线程;

举个例子如下,main线程等待其它子线程的事件发生后继续执行main线程:

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TaskTest implements Runnable {

private CountDownLatch latch;
private int sleepTime;

/**
*
*/
public TaskTest(int sleepTime, CountDownLatch latch) {
this.sleepTime = sleepTime;
this.latch = latch;
}

/**
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
CountDownLatchTest.print(" is running。");
TimeUnit.MILLISECONDS.sleep(sleepTime);
CountDownLatchTest.print(" finished。");
//计数器减减
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

public class CountDownLatchTest {
public static void main(String[] args) {
int count = 10;
final CountDownLatch latch = new CountDownLatch(count);
ExecutorService es = Executors.newFixedThreadPool(count);
for (int i = 0; i < count; i++) {
es.execute(new TaskTest((i + 1) * 1000, latch));
}

try {
CountDownLatchTest.print(" waiting...");
//主线程等待其它事件发生
latch.await();
//其它事件已发生,继续执行主线程
CountDownLatchTest.print(" continue。。。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
es.shutdown();
}
}

public static void print(String str){
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
}
}


结果打印如下:

 此外,FutureTask也可用作闭锁,其get方法会等待任务完成后返回结果,否则一直阻塞直到任务完成;



信号量

控制同时执行某个指定操作的数量,常用于实现资源池,如数据库连接池,线程池...

以Semaphore为例,其内部维护一组资源,可以通过构造函数指定数目,其它线程在执行的时候,可以通过acquire方法获取资源,有的话,继续执行(使用结束后释放资源),没有资源的话将阻塞直到有其它线程调用release方法释放资源;

举个例子,如下代码,十个线程竞争三个资源,一开始有三个线程可以直接运行,剩下的七个线程只能阻塞等到其它线程使用资源完毕才能执行;

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {

public static void print(String str){
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
}

public static void main(String[] args) {
// 线程数目
int threadCount = 10;
// 资源数目
Semaphore semaphore = new Semaphore(3);

ExecutorService es = Executors.newFixedThreadPool(threadCount);

// 启动若干线程
for (int i = 0; i < threadCount; i++)
es.execute(new ConsumeResourceTask((i + 1) * 1000, semaphore));
}
}

class ConsumeResourceTask implements Runnable {
private Semaphore semaphore;
private int sleepTime;

/**
*
*/
public ConsumeResourceTask(int sleepTime, Semaphore semaphore) {
this.sleepTime = sleepTime;
this.semaphore = semaphore;
}

public void run() {
try {
//获取资源
semaphore.acquire();
SemaphoreTest.print(" 占用一个资源...");
TimeUnit.MILLISECONDS.sleep(sleepTime);
SemaphoreTest.print(" 资源使用结束,释放资源");
//释放资源
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


[10:30:11]pool-1-thread-1 占用一个资源...
[10:30:11]pool-1-thread-2 占用一个资源...
[10:30:11]pool-1-thread-3 占用一个资源...
[10:30:12]pool-1-thread-1 资源使用结束,释放资源
[10:30:12]pool-1-thread-4 占用一个资源...
[10:30:13]pool-1-thread-2 资源使用结束,释放资源
[10:30:13]pool-1-thread-5 占用一个资源...
[10:30:14]pool-1-thread-3 资源使用结束,释放资源
[10:30:14]pool-1-thread-8 占用一个资源...
[10:30:16]pool-1-thread-4 资源使用结束,释放资源
[10:30:16]pool-1-thread-6 占用一个资源...
[10:30:18]pool-1-thread-5 资源使用结束,释放资源
[10:30:18]pool-1-thread-9 占用一个资源...
[10:30:22]pool-1-thread-8 资源使用结束,释放资源
[10:30:22]pool-1-thread-7 占用一个资源...
[10:30:22]pool-1-thread-6 资源使用结束,释放资源
[10:30:22]pool-1-thread-10 占用一个资源...
[10:30:27]pool-1-thread-9 资源使用结束,释放资源
[10:30:29]pool-1-thread-7 资源使用结束,释放资源
[10:30:32]pool-1-thread-10 资源使用结束,释放资源




栅栏

栅栏用于等待其它线程,且会阻塞自己当前线程;

所有线程必须同时到达栅栏位置后,才能继续执行;

举个例子如下:

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CyclicBarrierTaskTest implements Runnable {
private CyclicBarrier cyclicBarrier;

private int timeout;

public CyclicBarrierTaskTest(CyclicBarrier cyclicBarrier, int timeout) {
this.cyclicBarrier = cyclicBarrier;
this.timeout = timeout;
}

@Override
public void run() {
TestCyclicBarrier.print(" 正在running...");
try {
TimeUnit.MILLISECONDS.sleep(timeout);
TestCyclicBarrier.print(" 到达栅栏处,等待其它线程到达");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

TestCyclicBarrier.print(" 所有线程到达栅栏处,继续执行各自线程任务...");
}
}

public class TestCyclicBarrier {

public static void print(String str) {
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
System.out.println("[" + dfdate.format(new Date()) + "]"
+ Thread.currentThread().getName() + str);
}

public static void main(String[] args) {
int count = 5;

ExecutorService es = Executors.newFixedThreadPool(count);

CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {

@Override
public void run() {
TestCyclicBarrier.print(" 所有线程到达栅栏处,可以在此做一些处理...");
}
});
for (int i = 0; i < count; i++)
es.execute(new CyclicBarrierTaskTest(barrier, (i + 1) * 1000));
}

}


[11:07:00]pool-1-thread-2 正在running...
[11:07:00]pool-1-thread-1 正在running...
[11:07:00]pool-1-thread-5 正在running...
[11:07:00]pool-1-thread-3 正在running...
[11:07:00]pool-1-thread-4 正在running...
[11:07:01]pool-1-thread-1 到达栅栏处,等待其它线程到达
[11:07:02]pool-1-thread-2 到达栅栏处,等待其它线程到达
[11:07:03]pool-1-thread-3 到达栅栏处,等待其它线程到达
[11:07:04]pool-1-thread-4 到达栅栏处,等待其它线程到达
[11:07:05]pool-1-thread-5 到达栅栏处,等待其它线程到达
[11:07:05]pool-1-thread-5 所有线程到达栅栏处,可以在此做一些处理...
[11:07:05]pool-1-thread-1 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-2 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-5 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-3 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-4 所有线程到达栅栏处,继续执行各自线程任务...


public
class CompliteServiceMain {

public static void main(String[] args) {

int processors = Runtime.getRuntime().availableProcessors();

ExecutorService exec = Executors.newFixedThreadPool(processors);

ExecutorCompletionService<Integer> completionService =new ExecutorCompletionService<Integer>(exec);

final CyclicBarrier c=new CyclicBarrier(processors);

final long start = System.nanoTime();

for(int i=0;i<processors*2;i++)

{

completionService.submit(new Callable<Integer>() {

@Override

public Integer call() throws Exception {

Random random = new Random();

int ran = random.nextInt(1000);

Thread.sleep(ran);

System.out.println(Thread.currentThread().getName()+" sleep "+ran+"ms");

c.await();

System.out.println(System.nanoTime()-start);

return ran;

}

});

}

for(int i=0;i<processors*2;i++)

{

try {

Integer res = completionService.take().get();

System.out.println(res);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

exec.shutdown();

}

}public class CompliteServiceMain {

public static void main(String[] args) {

int processors = Runtime.getRuntime().availableProcessors();

ExecutorService exec = Executors.newFixedThreadPool(processors);

ExecutorCompletionService<Integer> completionService =new ExecutorCompletionService<Integer>(exec);

final CyclicBarrier c=new CyclicBarrier(processors);

final long start = System.nanoTime();

for(int i=0;i<processors*2;i++)

{

completionService.submit(new Callable<Integer>() {

@Override

public Integer call() throws Exception {

Random random = new Random();

int ran = random.nextInt(1000);

Thread.sleep(ran);

System.out.println(Thread.currentThread().getName()+" sleep "+ran+"ms");

c.await();

System.out.println(System.nanoTime()-start);

return ran;

}

});

}

for(int i=0;i<processors*2;i++)

{

try {

Integer res = completionService.take().get();

System.out.println(res);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

exec.shutdown();

}

}

pool-1-thread-2 sleep 365ms
pool-1-thread-1 sleep 777ms
pool-1-thread-3 sleep 870ms
pool-1-thread-4 sleep 859ms
988347215
988531958
988573323
988608157
859
365
777
870
pool-1-thread-3 sleep 847ms
pool-1-thread-1 sleep 880ms
pool-1-thread-4 sleep 897ms
pool-1-thread-2 sleep 959ms
1947452526
1947466210
1947526858
847
880
1947500422
897
959




阻塞队列

阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;

队列可以为有界和无界队列,无界队列不会满,因此入队操作将不会阻塞;

下面将使用阻塞队列LinkedBlockingQueue举个生产者-消费者例子,生产者每隔1秒生产1个产品,然后有6个消费者在消费产品,可以发现,每隔1秒,只有一个消费者能够获取到产品消费,其它线程只能等待...

如下代码:

package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

//生产者
public class Producer implements Runnable {
private final BlockingQueue<String> fileQueue;

public Producer(BlockingQueue<String> queue) {
this.fileQueue = queue;

}

public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(1000);
String produce = this.produce();
System.out.println(Thread.currentThread() + "生产:" + produce);
fileQueue.put(produce);
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public String produce() {
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
return dfdate.format(new Date());
}

public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

for (int i = 0; i < 1; i++) {
new Thread(new Producer(queue)).start();
}
for (int i = 0; i < 6; i++) {
new Thread(new Consumer(queue)).start();
}
}
}

// 消费者
class Consumer implements Runnable {
private final BlockingQueue<String> queue;

public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}

public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread() + "prepare 消费");
System.out.println(Thread.currentThread() + "starting:"
+ queue.take());
System.out.println(Thread.currentThread() + "end 消费");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}


Thread[Thread-1,5,main]prepare 消费
Thread[Thread-3,5,main]prepare 消费
Thread[Thread-4,5,main]prepare 消费
Thread[Thread-2,5,main]prepare 消费
Thread[Thread-6,5,main]prepare 消费
Thread[Thread-5,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:36
Thread[Thread-1,5,main]starting:11:36:36
Thread[Thread-1,5,main]end 消费
Thread[Thread-1,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:37
Thread[Thread-4,5,main]starting:11:36:37
Thread[Thread-4,5,main]end 消费
Thread[Thread-4,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:38
Thread[Thread-3,5,main]starting:11:36:38
Thread[Thread-3,5,main]end 消费
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: