您的位置:首页 > 编程语言 > Java开发

java中无锁并发数组队列及其应用

2013-11-02 17:31 513 查看
在java中,无锁并发数据结构已经有一大堆了,例如,ConcurrentLinkedQueue、ConcurrentSkipListSet、ConcurrentSkipListMap、ConcurrentHashMap(ConcurrentHashMap实际上是有锁的,只是锁的粒度更小)等。

但是在某个应用中,我需要一种无锁并发的数组容器,并且在容器生成时就指定容量,不需要动态扩展(以避免jvm恼人的频繁gc)。于是 ConcurrentArrayQueue 就有存在的必要了,而jdk并没有提供这种结构,我们自己动手写一个。

首先搜索了网上相关资料,找到一种c++的实现,看了一下,实现的还是非常巧妙的,这里是传送门

将其翻译成java代码如下

import java.util.concurrent.atomic.AtomicInteger;

/**
* 参考:
* http://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular */
public class ConcurrentArrayQueue <T> {

// 环状缓存
private final Object[] ring;
private final AtomicInteger maximumReadIndex = new AtomicInteger(0);
private final AtomicInteger readIndex = new AtomicInteger(0);
private final AtomicInteger writeIndex = new AtomicInteger(0);

public ConcurrentArrayQueue(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException("Illegal capacity " + capacity);
ring = new Object[capacity + 1];
}

public boolean push(T e) {
int currentReadIndex, currentWriteIndex;

do {
currentReadIndex = readIndex.get();
currentWriteIndex = writeIndex.get();

// check if queue is full
if (((currentWriteIndex + 1) % ring.length) ==
(currentReadIndex % ring.length))
return false;
} while (!writeIndex.compareAndSet(currentWriteIndex, currentWriteIndex + 1));

// We know now that this index is reserved for us. Use it to save the data
ring[currentWriteIndex % ring.length] = e;

// update the maximum read index after saving the data.
// It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while (!maximumReadIndex.compareAndSet(currentWriteIndex, currentWriteIndex + 1)) {
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield() (POSIX.1b)
Thread.yield();
}

return true;
}

public T pop() {
int currentMaximumReadIndex;
int currentReadIndex;

while (true) {
currentReadIndex = readIndex.get();
currentMaximumReadIndex = maximumReadIndex.get();

// The queue is empty or a producer thread has allocate space in the queue
// but is waiting to commit the data into it
if ((currentReadIndex % ring.length) ==
(currentMaximumReadIndex % ring.length))
return null;

// retrieve the data from the queue
@SuppressWarnings("unchecked")
T ret = (T) ring[currentReadIndex % ring.length];

if (readIndex.compareAndSet(currentReadIndex, currentReadIndex + 1))
return ret; // 这里没有办法清理残余的引用,可能导致内存泄露
}
}

public int size() {
int ret = writeIndex.get() - readIndex.get();
if (ret < 0)
return 0;
return ret;
}

public void clear() {
while (size() > 0)
pop();
}

public int capactity() {
return ring.length - 1;
}
}

其原理就不多说了,给出的参考文章说的比较清楚了(特别是其给出的图片:-)

然后说一下应用,这里是用上面的结构写了一个对象池(ObjectPool)。

首先是可缓存对象工厂接口:

/**
* 用于配置对象池
*/
public abstract class PoolableObjectFactory <T> {

/**
* 新建对象
*/
public abstract T newObject();

/**
* 还原对象状态,避免内存泄露和垃圾信息导致的不确定性问题
*/
public abstract void passivateObject(T obj);

/**
* 最多缓存的对象数
*/
public int maxPooled() {
return 10;
}
}

接着是对象池:

import util.ConcurrentArrayQueue;

/**
* 对象池
*/
public class ObjectPool <T> {

// XXX 使用数组而不是链表,是为了避免反复的 new 节点
private final ConcurrentArrayQueue<T> ring;
private final PoolableObjectFactory<T> factory;

public ObjectPool(PoolableObjectFactory<T> factory) {
this.factory = factory;
this.ring = new ConcurrentArrayQueue<T>(factory.maxPooled());
}

public T borrowObject() {
T ret = ring.pop();
if (ret == null)
ret = factory.newObject();
return ret;
}

public T returnObject(T obj) {
if (obj == null)
return null;

// 清理对象
factory.passivateObject(obj);

// 对象入池
ring.push(obj);

// 丢弃对象
return null;
}

public void clear() {
ring.clear();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐