C++实现的无锁队列
2013-12-06 17:42
316 查看
关于无锁队列的实现,网上有很多的文章,其实现原理都来自论文implementing_lock_free.pdf,国内关于无锁队列实现介绍的较好的博客:http://coolshell.cn/articles/8239.html
关于理论的介绍上述文章已经写的非常好了,因此本文不会复述无锁队列实现原理,只讲解下实现细节以及过程中遇到的问题。
本文实现的是一个多生产者多消费者无锁队列,相比于单生产者多消费者等要复杂很多。
实现代码:
代码主要由GenericNode,FreeList, Queue组成,下面分别对其进行说明。
GenericNode是一个带引用计数的结点类,用于Queue中实现队列链表。init将结点初始化。add_ref将引用加1,成功返回this指针,失败返回NULL. 返回失败的原因可能是引用计数为0了,这说明该结点的生命周期已经结束(被归还到内存池中或者delete了)。
release_ref释放一个引用,如果引用0,那么将该对象归还给内存池,或者删除。为何要使用引用计数见后面讲解Queue。
FreeList是GenericNode的内存池,也是一个无锁链表。allocate从链表中取出头结点,并将头结点指向下一个结点。如果只有一个头结点,说明没有空闲结点可用,就创建一个新的结点并返回。代码中取出头结点后还要判断ref_count是为了测试需要,理论上ref_count一定为0的,如果不为0说明多线之间的同步存在问题。release将一个结点添加到链表末尾,需要注意的是,末尾结点的next不是指向NULL,而是指向我们自定义的常量nil(-1)。这样做的原因是为了避免一个ABA问题,考虑如下一个场景:
有三个线程,一个调用allocate简称a1,两个调用release,简称r1和r2。链表为空,即head==tail。a1执行的line:77时,cpu被r1抢占,r1执行到line:108时,cpu被r2抢占,r2成功更新了tail指针,cpu被a1抢占继续执行到line:91,再被r1抢占继续执行,此时r1的old指针已经被a1初始化了,因此old->next为NULL,如果还是用next==NULL来判断尾指针时,那么r1就可以成功更新old,但实际上old已经不是tail。所以这里使用next==nil来判断尾指针。程序之前没有这样做,就总是会出这样的错,费了不少时间查出了这个问题。
Queue就是无锁队列的实现了,其实也就是个无锁链表,但是与FreeList不同的是,Queue中的结点都需要存储数据,而FreeList中的结点无需关心存储数据。Queue中head结点是一个冗余结点,真正的头结点是head->next。所以pop数据时,是返回head->next->value。然后删除head结点,head->next成了新的冗余头结点。这就带来了这样的问题:假设有2个消费者线程tr1,tr2。tr1线程所得到的head->next结点有可能是tr2线程的head结点,而tr2线程完全有可能先释放head结点,然后tr1再执行返回head->next->value时就发生错误了。因此引入了引用计数,当消费者线程引用head和head->next时都增加它们的引用,这样保证了它肯定不会被回收。
综上所述,我的C++无锁队列就这样实现了。从目前测试情况来看还没有发现问题,但对此,我现在依然保持很谨慎的态度,因为多线程的情况远比我们想象的复杂,我的大脑实在无法模拟cpu的各种调度场景。目前也没有测试其性能,后面打算测试一下它的性能,与boost无锁队列性能对比下。也欢迎各位小伙伴对其尽情的殴打(测试,挑刺),殴打完了请告知结果,欢迎探讨,探索无锁编程。
关于理论的介绍上述文章已经写的非常好了,因此本文不会复述无锁队列实现原理,只讲解下实现细节以及过程中遇到的问题。
本文实现的是一个多生产者多消费者无锁队列,相比于单生产者多消费者等要复杂很多。
实现代码:
#define dqix_atomic_cmp_set(lock, set, old) \ ( (LONG) InterlockedCompareExchange( (LONG *)lock, (LONG)set, (LONG)old ) \ == (LONG)old ) #define dqix_atomic_fetch_add( p, add ) InterlockedExchangeAdd( (LONG *)p, add ) #define dqix_atomic_increase( p ) InterlockedExchangeAdd( (LONG *)p, 1 ) #define dqix_atomic_decrease( p ) InterlockedExchangeAdd( (LONG *)p, -1 ) #pragma intrinsic(_ReadWriteBarrier) #define dqix_memory_barrier() _ReadWriteBarrier() template<typename T> class FreeList; template<typename T> struct GenericNode { GenericNode( FreeList<T> * free_ls = NULL ) :ref_cnt(0), next(NULL), _free_list(free_ls){} ~GenericNode(){} inline void init( ) volatile { ref_cnt = 1; next = NULL; } inline volatile GenericNode* add_ref( ) volatile { for( int cnt = ref_cnt; cnt > 0; cnt = ref_cnt ){ if ( dqix_atomic_cmp_set( &ref_cnt, cnt + 1, cnt ) ){ return this; } } return NULL; } inline void release_ref( )volatile { if( dqix_atomic_decrease( &ref_cnt ) == 1 ){ if ( _free_list ){ _free_list->release( this ); return; } delete this; } } inline int ref_count() volatile { return ref_cnt; } T value; volatile GenericNode* volatile next; private: volatile int ref_cnt; FreeList<T> *_free_list; }; template<typename T> class FreeList { public: typedef GenericNode<T> Node; typedef volatile Node* volatile AtomicNodePtr; FreeList( size_t capacity = 100 ): _head(0), _tail(0), _capacity(capacity),_size(0), nil((void*)-1) { _head = new(std::nothrow) Node(this); _head->next = (AtomicNodePtr)nil; _tail = _head; } ~FreeList(){} AtomicNodePtr allocate() volatile { if ( _head == NULL ){ set_dqix_error( EDQIX_MEMFAIL ); return NULL; } for( ;; ){ AtomicNodePtr p = _head; AtomicNodePtr pnext = _head->next; if ( _head == _tail ){ p = new(std::nothrow) Node((FreeList*)this); if ( !p ){ set_dqix_error( EDQIX_MEMFAIL ); return NULL; } p->init(); return p; } if ( dqix_atomic_cmp_set( &_head, p->next, p )){ if ( p->ref_count() == 0 ){ p->init(); dqix_atomic_decrease(&_size); return p; } _CrtDbgBreak(); } } } void release( volatile Node* volatile node ) volatile { if ( _size >= _capacity ){ delete node; return; } node->next = ( AtomicNodePtr )nil; memset( (void*)&(node->value), 0, sizeof(node->value) ); for (;;){ AtomicNodePtr old = _tail; if ( dqix_atomic_cmp_set( &(old->next), node, nil ) ){ if( !dqix_atomic_cmp_set( &_tail, node, old ) ){ _CrtDbgBreak(); } dqix_atomic_increase(&_size); return; } } } private: AtomicNodePtr _head; AtomicNodePtr _tail; volatile size_t _capacity; volatile int64_t _size; const void *nil; }; template<typename T> class Queue { public: Queue() :_head(NULL), _tail(NULL), _size(0){} ~Queue() { volatile Node *p = _head->next; for( ; p; p = _head->next ){ _head->next = p->next; delete p; } delete _head; } bool push( const T& value ) { if( _head == NULL ){ _head = _pool.allocate(); if( !_head ){ set_dqix_error( EDQIX_MEMFAIL ); return false; } _tail = _head; } AtomicNodePtr new_node = _pool.allocate(); if( !new_node ){ set_dqix_error( EDQIX_MEMFAIL ); return false; } new_node->value = value; dqix_memory_barrier(); for( ;; ){ AtomicNodePtr p = _tail; if( !p->next && dqix_atomic_cmp_set( &(p->next), new_node, NULL ) ){ if( !dqix_atomic_cmp_set( &_tail, new_node, p ) ){ _CrtDbgBreak(); } dqix_atomic_increase( &_size ); return true; } } } bool pop( T& value ) { if( !_head || !_head->next ){ set_dqix_error( EDQIX_QUEUE_EMPTY ); return false; } for( ;; ){ AtomicNodePtr p = _head->add_ref(); if( !p ){ continue; } if( p == _tail ){ set_dqix_error( EDQIX_QUEUE_EMPTY ); p->release_ref( ); return false; } AtomicNodePtr pnext = p->next; if ( p != _head || !pnext->add_ref() ){ p->release_ref( ); continue; } if( dqix_atomic_cmp_set( &_head, p->next, p ) ){ value = pnext->value; pnext->release_ref(); p->release_ref();//release _head->add_ref()增加的引用 p->release_ref();//release 自身的引用,释放指针p dqix_atomic_decrease( &_size ); return true; } p->release_ref(); pnext->release_ref(); } } bool empty() const{ return _size == 0; } size_t size() const { return _size; } private: typedef GenericNode<T> Node; typedef volatile Node* volatile AtomicNodePtr; AtomicNodePtr _head; AtomicNodePtr _tail; volatile int _size; volatile FreeList<T> _pool; };
代码主要由GenericNode,FreeList, Queue组成,下面分别对其进行说明。
GenericNode是一个带引用计数的结点类,用于Queue中实现队列链表。init将结点初始化。add_ref将引用加1,成功返回this指针,失败返回NULL. 返回失败的原因可能是引用计数为0了,这说明该结点的生命周期已经结束(被归还到内存池中或者delete了)。
release_ref释放一个引用,如果引用0,那么将该对象归还给内存池,或者删除。为何要使用引用计数见后面讲解Queue。
FreeList是GenericNode的内存池,也是一个无锁链表。allocate从链表中取出头结点,并将头结点指向下一个结点。如果只有一个头结点,说明没有空闲结点可用,就创建一个新的结点并返回。代码中取出头结点后还要判断ref_count是为了测试需要,理论上ref_count一定为0的,如果不为0说明多线之间的同步存在问题。release将一个结点添加到链表末尾,需要注意的是,末尾结点的next不是指向NULL,而是指向我们自定义的常量nil(-1)。这样做的原因是为了避免一个ABA问题,考虑如下一个场景:
有三个线程,一个调用allocate简称a1,两个调用release,简称r1和r2。链表为空,即head==tail。a1执行的line:77时,cpu被r1抢占,r1执行到line:108时,cpu被r2抢占,r2成功更新了tail指针,cpu被a1抢占继续执行到line:91,再被r1抢占继续执行,此时r1的old指针已经被a1初始化了,因此old->next为NULL,如果还是用next==NULL来判断尾指针时,那么r1就可以成功更新old,但实际上old已经不是tail。所以这里使用next==nil来判断尾指针。程序之前没有这样做,就总是会出这样的错,费了不少时间查出了这个问题。
Queue就是无锁队列的实现了,其实也就是个无锁链表,但是与FreeList不同的是,Queue中的结点都需要存储数据,而FreeList中的结点无需关心存储数据。Queue中head结点是一个冗余结点,真正的头结点是head->next。所以pop数据时,是返回head->next->value。然后删除head结点,head->next成了新的冗余头结点。这就带来了这样的问题:假设有2个消费者线程tr1,tr2。tr1线程所得到的head->next结点有可能是tr2线程的head结点,而tr2线程完全有可能先释放head结点,然后tr1再执行返回head->next->value时就发生错误了。因此引入了引用计数,当消费者线程引用head和head->next时都增加它们的引用,这样保证了它肯定不会被回收。
综上所述,我的C++无锁队列就这样实现了。从目前测试情况来看还没有发现问题,但对此,我现在依然保持很谨慎的态度,因为多线程的情况远比我们想象的复杂,我的大脑实在无法模拟cpu的各种调度场景。目前也没有测试其性能,后面打算测试一下它的性能,与boost无锁队列性能对比下。也欢迎各位小伙伴对其尽情的殴打(测试,挑刺),殴打完了请告知结果,欢迎探讨,探索无锁编程。
相关文章推荐