java中无锁并发数组队列及其应用
2013-11-02 17:31
513 查看
在java中,无锁并发数据结构已经有一大堆了,例如,ConcurrentLinkedQueue、ConcurrentSkipListSet、ConcurrentSkipListMap、ConcurrentHashMap(ConcurrentHashMap实际上是有锁的,只是锁的粒度更小)等。
但是在某个应用中,我需要一种无锁并发的数组容器,并且在容器生成时就指定容量,不需要动态扩展(以避免jvm恼人的频繁gc)。于是 ConcurrentArrayQueue 就有存在的必要了,而jdk并没有提供这种结构,我们自己动手写一个。
首先搜索了网上相关资料,找到一种c++的实现,看了一下,实现的还是非常巧妙的,这里是传送门。
将其翻译成java代码如下
其原理就不多说了,给出的参考文章说的比较清楚了(特别是其给出的图片:-)
然后说一下应用,这里是用上面的结构写了一个对象池(ObjectPool)。
首先是可缓存对象工厂接口:
接着是对象池:
但是在某个应用中,我需要一种无锁并发的数组容器,并且在容器生成时就指定容量,不需要动态扩展(以避免jvm恼人的频繁gc)。于是 ConcurrentArrayQueue 就有存在的必要了,而jdk并没有提供这种结构,我们自己动手写一个。
首先搜索了网上相关资料,找到一种c++的实现,看了一下,实现的还是非常巧妙的,这里是传送门。
将其翻译成java代码如下
import java.util.concurrent.atomic.AtomicInteger; /** * 参考: * http://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular */ public class ConcurrentArrayQueue <T> { // 环状缓存 private final Object[] ring; private final AtomicInteger maximumReadIndex = new AtomicInteger(0); private final AtomicInteger readIndex = new AtomicInteger(0); private final AtomicInteger writeIndex = new AtomicInteger(0); public ConcurrentArrayQueue(int capacity) { if (capacity < 0) throw new IllegalArgumentException("Illegal capacity " + capacity); ring = new Object[capacity + 1]; } public boolean push(T e) { int currentReadIndex, currentWriteIndex; do { currentReadIndex = readIndex.get(); currentWriteIndex = writeIndex.get(); // check if queue is full if (((currentWriteIndex + 1) % ring.length) == (currentReadIndex % ring.length)) return false; } while (!writeIndex.compareAndSet(currentWriteIndex, currentWriteIndex + 1)); // We know now that this index is reserved for us. Use it to save the data ring[currentWriteIndex % ring.length] = e; // update the maximum read index after saving the data. // It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS while (!maximumReadIndex.compareAndSet(currentWriteIndex, currentWriteIndex + 1)) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more // than 1 producer thread // have a look at sched_yield() (POSIX.1b) Thread.yield(); } return true; } public T pop() { int currentMaximumReadIndex; int currentReadIndex; while (true) { currentReadIndex = readIndex.get(); currentMaximumReadIndex = maximumReadIndex.get(); // The queue is empty or a producer thread has allocate space in the queue // but is waiting to commit the data into it if ((currentReadIndex % ring.length) == (currentMaximumReadIndex % ring.length)) return null; // retrieve the data from the queue @SuppressWarnings("unchecked") T ret = (T) ring[currentReadIndex % ring.length]; if (readIndex.compareAndSet(currentReadIndex, currentReadIndex + 1)) return ret; // 这里没有办法清理残余的引用,可能导致内存泄露 } } public int size() { int ret = writeIndex.get() - readIndex.get(); if (ret < 0) return 0; return ret; } public void clear() { while (size() > 0) pop(); } public int capactity() { return ring.length - 1; } }
其原理就不多说了,给出的参考文章说的比较清楚了(特别是其给出的图片:-)
然后说一下应用,这里是用上面的结构写了一个对象池(ObjectPool)。
首先是可缓存对象工厂接口:
/** * 用于配置对象池 */ public abstract class PoolableObjectFactory <T> { /** * 新建对象 */ public abstract T newObject(); /** * 还原对象状态,避免内存泄露和垃圾信息导致的不确定性问题 */ public abstract void passivateObject(T obj); /** * 最多缓存的对象数 */ public int maxPooled() { return 10; } }
接着是对象池:
import util.ConcurrentArrayQueue; /** * 对象池 */ public class ObjectPool <T> { // XXX 使用数组而不是链表,是为了避免反复的 new 节点 private final ConcurrentArrayQueue<T> ring; private final PoolableObjectFactory<T> factory; public ObjectPool(PoolableObjectFactory<T> factory) { this.factory = factory; this.ring = new ConcurrentArrayQueue<T>(factory.maxPooled()); } public T borrowObject() { T ret = ring.pop(); if (ret == null) ret = factory.newObject(); return ret; } public T returnObject(T obj) { if (obj == null) return null; // 清理对象 factory.passivateObject(obj); // 对象入池 ring.push(obj); // 丢弃对象 return null; } public void clear() { ring.clear(); } }
相关文章推荐
- (13)多线程与并发库之java5阻塞队列(BlockingQueue)的应用----子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程循环100次,如此循环50次
- JAVA并发API源码解析:并发数据结构、线程、线程池及其应用
- java并发包消息队列及在开源软件中的应用-BlockingQueue
- 黑马程序员----------java基础知识(6)之数组查找和二维数组及其应用
- 线程高级应用-心得7-java5线程并发库中阻塞队列Condition的应用及案例分析
- 浅谈Java基础中的数组及其应用
- 线程高级应用-心得7-java5线程并发库中阻塞队列Condition的应用及案例分析
- java中自己用过的数据结构(队列Queue、优先级队列PriorityQueue和栈Stack),及其分别的应用场景
- 【Java学习笔记之九】java二维数组及其多维数组的内存应用拓展延伸
- 【张孝祥并发课程笔记】14:java5阻塞队列的应用
- Java多线程与并发库高级应用-可阻塞的队列
- java多线程之线程并发库阻塞队列的应用
- 线程高级应用-心得7-java5线程并发库中阻塞队列Condition的应用及案例分析
- 【Java多线程与并发库】16.java5阻塞队列的应用
- Java多线程与并发应用-(10)-java阻塞队列实现ArrayBlockingQueue
- java中数组的定义及其基础应用
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法 在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。
- (13)多线程与并发库之java5阻塞队列(BlockingQueue)的应用----子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程循环100次,如此循环50次
- 基于数组实现Java 自定义Queue队列及应用
- Java多线程与并发库高级应用之阻塞队列BlockingQueue