您的位置:首页 > 编程语言 > Java开发

java lock await方法会释放掉当前锁 标准的生产者消费者问题

2017-11-29 16:17 429 查看
In all cases, before this method can return the current thread must

re-acquire the lock associated with this condition. When the

thread returns it is guaranteed to hold this lock.
会释放,其他线程执行Condition.signal(),之前的线程会重新获得锁,继续执行,
AbstractQueuedSynchronizer.java 第2040行,释放锁

  //往篮子里面放馒头  

     public void push(ManTou m){  

         lock.lock();  

         try {  

             while(max == manTous.size()){  

                 System.out.println("篮子是满的,待会儿再生产...");  

                 full.await();  

             }  

             manTous.add(m);  

             empty.signal();  

         } catch (InterruptedException e) {  

             e.printStackTrace();  

         }finally{  

             lock.unlock();  

         }  

     }  

//往篮子里面取馒头  

     public ManTou pop(){  

         ManTou m = null;  

         lock.lock();  

         try {  

             while(manTous.size() == 0){  

                 System.out.println("篮子是空的,待会儿再吃...");  

                 empty.await();  

             }  

             m = manTous.removeFirst();  

             full.signal();  

         } catch (InterruptedException e) {  

             e.printStackTrace();  

         }finally{  

             lock.unlock();  

             return m;  

         }  

     }  

对于上面例子push   full.await()执行后,

 manTous.add(m);  

             empty.signal(); 

不在执行,直接执行finally方法 释放掉锁,
不会影响其他线程执行remove之类的函数
对于其他线程增加push方法  也会进入await状态

当pop函数执行 后,集合可以再次push可以执行 ,full.signal(); 唤醒 await状态线程的push操作。



以下转载:

生产消费实例 参考

java 并发包下的提供Lock,Lock相对于Synchronized可以更好的解决线程同步问题,更加的灵活和高效,并且ReadWriteLock锁还能实现读、写的分离。但线程间仅仅互斥是不够的,还需要通信,本篇的内容是基于上篇之上,使用Lock如何处理线程通信。阻塞队列(BlockingQueue)就是使用condition的和lock实现的。可以查看:Java并发编程-阻塞队列(BlockingQueue)的实现原理

Condition 

那么引入本篇的主角,Condition,Condition 将 Object的通信方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 通信方法的使用。

在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。

Condition的强大之处在于它可以为多个线程间建立不同的Condition, 使用synchronized/wait()只有一个阻塞队列,notifyAll会唤起所有阻塞队列下的线程,而使用lock/condition,可以实现多个阻塞队列,signalAll只会唤起某个阻塞队列下的阻塞线程。

下面用两种方式编写生产者/消费者模式代码加以说明。

- 使用synchronized/wait()实现生产者消费者模式如下:
//模拟生产和消费的对象
class Buffer {
private int maxSize;
private List<Date> storage;
Buffer(int size){
maxSize=size;
storage=new LinkedList<>();
}
//生产方法
public synchronized void put()  {
try {
while (storage.size() ==maxSize ){//如果队列满了
System.out.print(Thread.currentThread().getName()+": wait \n");;
wait();//阻塞线程
}
storage.add(new Date());
System.out.print(Thread.currentThread().getName()+": put:"+storage.size()+ "\n");
Thread.sleep(1000);
notifyAll();//唤起线程
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//消费方法
public synchronized void take() {
try {
while (storage.size() ==0 ){//如果队列满了
System.out.print(Thread.currentThread().getName()+": wait \n");;
wait();//阻塞线程
}
Date d=((LinkedList<Date>)storage).poll();
System.out.print(Thread.currentThread().getName()+": take:"+storage.size()+ "\n");
Thread.sleep(1000);
notifyAll();//唤起线程
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//生产者
class Producer implements Runnable{
private Buffer buffer;
Producer(Buffer b){
buffer=b;
}
@Override
public void run() {
while(true){
buffer.put();
}
}
}
//消费者
class Consumer implements Runnable{
private Buffer buffer;
Consumer(Buffer b){
buffer=b;
}
@Override
public void run() {
while(true){
buffer.take();
}
}
}
//
public class Main{
public static void main(String[] arg){
Buffer buffer=new Buffer(10);
Producer producer=new Producer(buffer);
Consumer consumer=new Consumer(buffer);
//创建线程执行生产和消费
for(int i=0;i<3;i++){
new Thread(producer,"producer-"+i).start();
}
for(int i=0;i<3;i++){
new Thread(consumer,"consumer-"+i).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

- 使用lock/condition实现生产者消费者模式如下:
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Buffer {
private  final Lock lock;
private  final Condition notFull;
private  final Condition notEmpty;
private int maxSize;
private List<Date> storage;
Buffer(int size){
//使用锁lock,并且创建两个condition,相当于两个阻塞队列
lock=new ReentrantLock();
notFull=lock.newCondition();
notEmpty=lock.newCondition();
maxSize=size;
storage=new LinkedList<>();
}
public void put()  {
lock.lock();
try {
while (storage.size() ==maxSize ){//如果队列满了
System.out.print(Thread.currentThread().getName()+": wait \n");;
notFull.await();//阻塞生产线程
}
storage.add(new Date());
System.out.print(Thread.currentThread().getName()+": put:"+storage.size()+ "\n");
Thread.sleep(1000);
notEmpty.signalAll();//唤醒消费线程
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
lock.unlock();
}
}

public  void take() {
lock.lock();
try {
while (storage.size() ==0 ){//如果队列满了
System.out.print(Thread.currentThread().getName()+": wait \n");;
notEmpty.await();//阻塞消费线程
}
Date d=((LinkedList<Date>)storage).poll();
System.out.print(Thread.currentThread().getName()+": take:"+storage.size()+ "\n");
Thread.sleep(1000);
notFull.signalAll();//唤醒生产线程

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
lock.unlock();
}
}
}

class Producer implements Runnable{
private Buffer buffer;
Producer(Buffer b){
buffer=b;
}
@Override
public void run() {
while(true){
buffer.put();
}
}
}
class Consumer implements Runnable{
private Buffer buffer;
Consumer(Buffer b){
buffer=b;
}
@Override
public void run() {
while(true){
buffer.take();
}
}
}
public class Main{
public static void main(String[] arg){
Buffer buffer=new Buffer(10);
Producer producer=new Producer(buffer);
Consumer consumer=new Consumer(buffer);
for(int i=0;i<3;i++){
new Thread(producer,"producer-"+i).start();
}
for(int i=0;i<3;i++){
new Thread(consumer,"consumer-"+i).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

- 当生产者执行put方法时,调用notEmpty.signalAll()只会唤醒notEmpty.await()下的消费者线程。 
- 当消费者执行塔克方法时,调用notFull.signalAll()只会唤醒notFull.await()下的消费者线程。

另一篇:


Lock的await/singal 和 Object的wait/notify 的区别

在使用Lock之前,我们都使用Object 的wait和notify实现同步的。举例来说,一个producer和consumer,consumer发现没有东西了,等待,produer生成东西了,唤醒。
线程consumer线程producer
synchronize(obj){ 

    obj.wait();//没东西了,等待 

}
synchronize(obj){ 

    obj.notify();//有东西了,唤醒 

}
有了lock后,世道变了,现在是:
lock.lock(); 

condition.await(); 

lock.unlock();
lock.lock(); 

condition.signal(); 

lock.unlock();
为了突出区别,省略了若干细节。区别有三点:
1. lock不再用synchronize把同步代码包装起来;
2. 阻塞需要另外一个对象condition;
3. 同步和唤醒的对象是condition而不是lock,对应的方法是await和signal,而不是wait和notify。

为什么需要使用condition呢?简单一句话,lock更灵活。以前的方式只能有一个等待队列,在实际应用时可能需要多个,比如读和写。为了这个灵活性,lock将同步互斥控制和等待队列分离开来,互斥保证在某个时刻只有一个线程访问临界区(lock自己完成),等待队列负责保存被阻塞的线程(condition完成)。

通过查看ReentrantLock的源代码发现,condition其实是等待队列的一个管理者,condition确保阻塞的对象按顺序被唤醒。

在Lock的实现中,LockSupport被用来实现线程状态的改变,后续将更进一步研究LockSupport的实现机制。

[java] view
plain copy

 package com.thread;  

   

 import java.util.LinkedList;  

 import java.util.concurrent.locks.Condition;  

 import java.util.concurrent.locks.Lock;  

 import java.util.concurrent.locks.ReentrantLock;  

   

   

 /** 

  * 使用Lock来实现生产者和消费者问题 

  *  

  *  

  * 

  */  

 public class ProducerConsumer {  

     public static void main(String[] args) {  

         Basket b = new Basket();  

         Product p = new Product(b);  

         Consumer c = new Consumer(b);  

         Consumer c1 = new Consumer(b);  

         new Thread(p).start();  

         new Thread(c).start();  

         new Thread(c1).start();  

     }  

 }  

 //馒头  

 class ManTou{  

     int id;  

     public ManTou(int id) {  

         this.id = id;  

     }  

     @Override  

     public String toString() {  

         return "ManTou"+id;  

     }  

 }  

   

 //装馒头的篮子  

 class Basket{  

     int max = 6;  

     LinkedList<ManTou> manTous = new LinkedList<ManTou>();  

     Lock lock = new ReentrantLock(); //锁对象  

     Condition full = lock.newCondition();  //用来监控篮子是否满的Condition实例  

     Condition empty = lock.newCondition(); //用来监控篮子是否空的Condition实例  

     //往篮子里面放馒头  

     public void push(ManTou m){  

         lock.lock();  

         try {  

             while(max == manTous.size()){  

                 System.out.println("篮子是满的,待会儿再生产...");  

                 full.await();  

             }  

             manTous.add(m);  

             empty.signal();  

         } catch (InterruptedException e) {  

             e.printStackTrace();  

         }finally{  

             lock.unlock();  

         }  

     }  

     //往篮子里面取馒头  

     public ManTou pop(){  

         ManTou m = null;  

         lock.lock();  

         try {  

             while(manTous.size() == 0){  

                 System.out.println("篮子是空的,待会儿再吃...");  

                 empty.await();  

             }  

             m = manTous.removeFirst();  

             full.signal();  

         } catch (InterruptedException e) {  

             e.printStackTrace();  

         }finally{  

             lock.unlock();  

             return m;  

         }  

     }  

 }  

 //生产者  

 class Product implements Runnable{  

     Basket basket;  

     public Product(Basket basket) {  

         this.basket = basket;  

     }  

     public void run() {  

         for (int i = 0; i < 40; i++) {  

             ManTou m = new ManTou(i);  

             basket.push(m);  

             System.out.println("生产了"+m);  

             try {  

                 Thread.sleep((int)(Math.random()*2000));  

             } catch (InterruptedException e) {  

                 e.printStackTrace();  

             }  

               

         }  

     }  

 }  

  

//消费者  

class Consumer implements Runnable{  

    Basket basket;  

    public Consumer(Basket basket) {  

        this.basket = basket;  

    }  

    public void run() {  

        for (int i = 0; i < 20; i++) {  

            try {  

                Thread.sleep((int)(Math.random()*2000));  

            } catch (InterruptedException e) {  

                e.printStackTrace();  

            }  

            ManTou m = basket.pop();  

            System.out.println("消费了"+m);  

        }  

    }  

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐