您的位置:首页 > 产品设计 > UI/UE

Java线程和多线程(十一)——BlockingQueue

2016-10-22 18:51 253 查看
这次讨论的是Java的
BlockingQueue
java.util.concurrent.BlockingQueue
是一个Java的队列接口,支持一系列操作,比如,在获取和移除对象的时候如果队列为空会来等待队列变成非空的,而当队列满了的时候,插入元素会等待队列中的元素被移除,保证队列有空余的空间。

Java的
BlockingQueue
是不能接受
null
的值,如果传入
null
将会跑出
NullPointerException
的。


如果看到JDK的源码的话,其中会包含一个非空检查的方法

/**
* Throws NullPointerException if argument is null.
*
* @param v the element
*/
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}


所以一旦传入
null
,就会抛出
NullPointerException




Java的
BlockingQueue
的实现类都是线程安全的。所有的方法都是原子性的,用得都是内部锁或者是其他的并发控制机制。

Java的
BlockingQueue
接口是Java集合框架的一部分,可以用来实现生产者消费者的问题。开发者不需要担心队列空间是否足够生产者或者是队列中的对象是否可以让消费者来进行操作,因为这些实现都由
BlockingQuene
的实现来完成的。

Java提供了几种不同的
BlockingQueue
的实现,比如
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
等等。

我们将通过
ArrayBlockingQueue
来实现生产者消费者的问题。下面就是
BlockingQueue
的一些开发者需要知道的方法。

put(E e)
:这个方法用来元素插入到队列之中。如果队列满了,那么该操作将会阻塞,等待队列重新有空间。

E take()
:这个方法将从一个队列中获取并从队列头部移除出去。如果队列是空的,它会等待队列中的元素重新可以获取为止。

下面我们就可以通过阻塞队列来实现生产者消费者问题。

Java阻塞队列举例——消息

一个普通的Java对象可以由生产者添加到队列中,开发者也可以称之为装在或者是队列消息。

package com.sapphire.concurrency;

public class Message {
private String msg;

public Message(String str){
this.msg=str;
}

public String getMsg() {
return msg;
}

}


Java阻塞队列举例——生产者

生产者的类将会创建消息,并且将之放到队列中,参考下面的代码:

package com.sapphire.concurrency;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

private BlockingQueue<Message> queue;

public Producer(BlockingQueue<Message> q){
this.queue=q;
}
@Override
public void run() {
//produce messages
for(int i=0; i<100; i++){
Message msg = new Message(""+i);
try {
Thread.sleep(i);
queue.put(msg);
System.out.println("Produced "+msg.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//adding exit message
Message msg = new Message("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


可以看得出来,
run
方法会将消息插入到阻塞队列之中。

Java阻塞队列举例——消费者

消费者类将会从队列中处理消息,并且当收到
exit
消息的时候结束,参考如下代码:

package com.sapphire.concurrency;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

private BlockingQueue<Message> queue;

public Consumer(BlockingQueue<Message> q){
this.queue=q;
}

@Override
public void run() {
try{
Message msg;
//consuming messages until exit message is received
while((msg = queue.take()).getMsg() !="exit"){
Thread.sleep(10);
System.out.println("Consumed "+msg.getMsg());
}
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}


而生产者的
run
方法会将阻塞队列中的元素拿出来。

Java阻塞队列举例——服务

最后,我们为生产者和消费者创建了阻塞队列服务。这个生产生消费者服务将会创建固定长度的阻塞队列,并由生产者和消费者所共用。这个服务将同时启动生产者和消费者线程,代码如下:

package com.sapphire.concurrency;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerService {

public static void main(String[] args) {
//Creating BlockingQueue of size 10
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}

}


输出如下:

roducer and Consumer has been started
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 0
Produced 5
Consumed 1
Produced 6
Produced 7
Consumed 2
Produced 8
...


其中的
Thread.sleep
方法用来让生产者和消费者能够进行延时性的生产和消费message对象。


例子中使用的方法为
put(E e)
方法,其实在
BlockingQueue
中针对将元素加入队列的方法是有三个的

1.
void put(E e)
:当阻塞队列是限制长度的阻塞队列时,如果队列已满,
put
方法是会持续等待,直到队列有空余的位置的。所以这个方法其实是一个阻塞的方法。如上面的代码,当队列的长度我们限制为10的时候,如果消费者没有及时消费掉message对象,那么生产者的线程会在执行put的时候持续阻塞掉。

2.
boolean add(E e)
:该方法和
put
方法的不同之处是该方法是非阻塞的,当无法将元素插入到队列中时,会抛出
IlleaglaStateException


3.
boolean offer(E e)
:该方法其实基本和
add
方法是一致的,但是当队列是满的时候,
offer
方法返回的是
false
,而
add
方法是抛出异常。

事实上,在
ArrayBlockingQueue
add
实现就是调用
offer
的,当
offer
返回
false
的时候直接抛出异常。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐