您的位置:首页 > 编程语言 > C语言/C++

C++实现的无锁队列

2013-12-06 17:42 316 查看
关于无锁队列的实现,网上有很多的文章,其实现原理都来自论文implementing_lock_free.pdf,国内关于无锁队列实现介绍的较好的博客:http://coolshell.cn/articles/8239.html

关于理论的介绍上述文章已经写的非常好了,因此本文不会复述无锁队列实现原理,只讲解下实现细节以及过程中遇到的问题。

本文实现的是一个多生产者多消费者无锁队列,相比于单生产者多消费者等要复杂很多。

实现代码:

#define dqix_atomic_cmp_set(lock, set, old)					  \
( (LONG) InterlockedCompareExchange( (LONG *)lock, (LONG)set, (LONG)old ) \
== (LONG)old )
#define dqix_atomic_fetch_add( p, add ) InterlockedExchangeAdd( (LONG *)p, add )
#define dqix_atomic_increase( p ) InterlockedExchangeAdd( (LONG *)p, 1 )
#define dqix_atomic_decrease( p ) InterlockedExchangeAdd( (LONG *)p, -1 )
#pragma intrinsic(_ReadWriteBarrier)
#define dqix_memory_barrier()  _ReadWriteBarrier()
template<typename T> class FreeList;
template<typename T>
struct GenericNode
{
GenericNode( FreeList<T> * free_ls = NULL )
:ref_cnt(0), next(NULL), _free_list(free_ls){}
~GenericNode(){}
inline void init( ) volatile
{
ref_cnt = 1;
next = NULL;
}

inline volatile GenericNode* add_ref( ) volatile
{
for( int cnt = ref_cnt; cnt > 0; cnt = ref_cnt ){
if ( dqix_atomic_cmp_set( &ref_cnt, cnt + 1, cnt ) ){
return this;
}
}
return NULL;
}

inline void release_ref( )volatile
{
if( dqix_atomic_decrease( &ref_cnt ) == 1 ){
if ( _free_list ){
_free_list->release( this );
return;
}
delete this;
}
}

inline int ref_count() volatile
{
return ref_cnt;
}

T value;
volatile GenericNode* volatile next;
private:
volatile int ref_cnt;
FreeList<T> *_free_list;
};

template<typename T>
class FreeList
{
public:
typedef GenericNode<T> Node;
typedef volatile Node* volatile AtomicNodePtr;
FreeList( size_t capacity = 100 ): _head(0), _tail(0), _capacity(capacity),_size(0), nil((void*)-1)
{
_head = new(std::nothrow) Node(this);
_head->next = (AtomicNodePtr)nil;
_tail = _head;
}
~FreeList(){}

AtomicNodePtr allocate() volatile
{
if ( _head == NULL ){
set_dqix_error( EDQIX_MEMFAIL );
return NULL;
}

for( ;; ){
AtomicNodePtr p = _head;
AtomicNodePtr pnext = _head->next;
if ( _head == _tail ){
p = new(std::nothrow) Node((FreeList*)this);
if ( !p ){
set_dqix_error( EDQIX_MEMFAIL );
return NULL;
}
p->init();
return p;
}
if ( dqix_atomic_cmp_set( &_head, p->next, p )){
if ( p->ref_count() == 0 ){
p->init();
dqix_atomic_decrease(&_size);
return p;
}
_CrtDbgBreak();
}
}
}

void release( volatile Node* volatile node ) volatile
{
if ( _size >= _capacity ){
delete node;
return;
}
node->next = ( AtomicNodePtr )nil;
memset( (void*)&(node->value), 0, sizeof(node->value) );
for (;;){
AtomicNodePtr old = _tail;
if ( dqix_atomic_cmp_set( &(old->next), node, nil ) ){
if( !dqix_atomic_cmp_set( &_tail, node, old ) ){
_CrtDbgBreak();
}
dqix_atomic_increase(&_size);
return;
}
}
}
private:
AtomicNodePtr  _head;
AtomicNodePtr  _tail;
volatile size_t	_capacity;
volatile int64_t _size;
const void *nil;
};

template<typename T>
class Queue
{
public:
Queue()
:_head(NULL), _tail(NULL), _size(0){}
~Queue()
{
volatile Node *p = _head->next;
for( ; p; p = _head->next ){
_head->next = p->next;
delete p;
}
delete _head;
}

bool push( const T& value )
{
if( _head == NULL ){
_head = _pool.allocate();
if( !_head ){
set_dqix_error( EDQIX_MEMFAIL );
return false;
}
_tail = _head;
}

AtomicNodePtr new_node = _pool.allocate();
if( !new_node ){
set_dqix_error( EDQIX_MEMFAIL );
return false;
}
new_node->value = value;
dqix_memory_barrier();
for( ;; ){
AtomicNodePtr p = _tail;
if( !p->next && dqix_atomic_cmp_set( &(p->next), new_node, NULL ) ){
if( !dqix_atomic_cmp_set( &_tail, new_node, p ) ){
_CrtDbgBreak();
}
dqix_atomic_increase( &_size );
return true;
}
}
}
bool pop( T& value )
{
if( !_head || !_head->next ){
set_dqix_error( EDQIX_QUEUE_EMPTY );
return false;
}
for( ;; ){
AtomicNodePtr p = _head->add_ref();
if( !p ){
continue;
}
if( p == _tail ){
set_dqix_error( EDQIX_QUEUE_EMPTY );
p->release_ref( );
return false;
}
AtomicNodePtr pnext = p->next;
if ( p != _head || !pnext->add_ref() ){
p->release_ref( );
continue;
}

if( dqix_atomic_cmp_set( &_head, p->next, p ) ){
value = pnext->value;
pnext->release_ref();
p->release_ref();//release _head->add_ref()增加的引用
p->release_ref();//release 自身的引用,释放指针p
dqix_atomic_decrease( &_size );
return true;
}
p->release_ref();
pnext->release_ref();
}
}

bool empty() const{ return _size == 0; }
size_t size() const { return _size; }

private:
typedef GenericNode<T> Node;
typedef volatile Node* volatile AtomicNodePtr;

AtomicNodePtr _head;
AtomicNodePtr _tail;
volatile int _size;

volatile FreeList<T> _pool;
};


代码主要由GenericNode,FreeList, Queue组成,下面分别对其进行说明。

GenericNode是一个带引用计数的结点类,用于Queue中实现队列链表。init将结点初始化。add_ref将引用加1,成功返回this指针,失败返回NULL. 返回失败的原因可能是引用计数为0了,这说明该结点的生命周期已经结束(被归还到内存池中或者delete了)。

release_ref释放一个引用,如果引用0,那么将该对象归还给内存池,或者删除。为何要使用引用计数见后面讲解Queue。

FreeList是GenericNode的内存池,也是一个无锁链表。allocate从链表中取出头结点,并将头结点指向下一个结点。如果只有一个头结点,说明没有空闲结点可用,就创建一个新的结点并返回。代码中取出头结点后还要判断ref_count是为了测试需要,理论上ref_count一定为0的,如果不为0说明多线之间的同步存在问题。release将一个结点添加到链表末尾,需要注意的是,末尾结点的next不是指向NULL,而是指向我们自定义的常量nil(-1)。这样做的原因是为了避免一个ABA问题,考虑如下一个场景:

有三个线程,一个调用allocate简称a1,两个调用release,简称r1和r2。链表为空,即head==tail。a1执行的line:77时,cpu被r1抢占,r1执行到line:108时,cpu被r2抢占,r2成功更新了tail指针,cpu被a1抢占继续执行到line:91,再被r1抢占继续执行,此时r1的old指针已经被a1初始化了,因此old->next为NULL,如果还是用next==NULL来判断尾指针时,那么r1就可以成功更新old,但实际上old已经不是tail。所以这里使用next==nil来判断尾指针。程序之前没有这样做,就总是会出这样的错,费了不少时间查出了这个问题。

Queue就是无锁队列的实现了,其实也就是个无锁链表,但是与FreeList不同的是,Queue中的结点都需要存储数据,而FreeList中的结点无需关心存储数据。Queue中head结点是一个冗余结点,真正的头结点是head->next。所以pop数据时,是返回head->next->value。然后删除head结点,head->next成了新的冗余头结点。这就带来了这样的问题:假设有2个消费者线程tr1,tr2。tr1线程所得到的head->next结点有可能是tr2线程的head结点,而tr2线程完全有可能先释放head结点,然后tr1再执行返回head->next->value时就发生错误了。因此引入了引用计数,当消费者线程引用head和head->next时都增加它们的引用,这样保证了它肯定不会被回收。

综上所述,我的C++无锁队列就这样实现了。从目前测试情况来看还没有发现问题,但对此,我现在依然保持很谨慎的态度,因为多线程的情况远比我们想象的复杂,我的大脑实在无法模拟cpu的各种调度场景。目前也没有测试其性能,后面打算测试一下它的性能,与boost无锁队列性能对比下。也欢迎各位小伙伴对其尽情的殴打(测试,挑刺),殴打完了请告知结果,欢迎探讨,探索无锁编程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: