Orocos 无锁数据操作 Muliti Writer Single Reader Queue
2016-06-03 15:59
555 查看
无锁的数据结构应用越来越广泛,现在几乎所有的多核cpu都提供了 CAS 操作。
实现无锁的 Muliti Writer Single Reader Queue 的关键还是 实现指针的 CAS( gcc:
这里有一个 Single Writer Mulity Reader 的 无锁实现 see link
实现过程:
创建一个数组
reference link:
https://gcc.gnu.org/onlinedocs/gcc-4.4.1/gcc/Atomic-Builtins.html
http://www.ibm.com/developerworks/aix/library/au-multithreaded_structures2/
http://preshing.com/20120612/an-introduction-to-lock-free-programming/
实现无锁的 Muliti Writer Single Reader Queue 的关键还是 实现指针的 CAS( gcc:
__sync_val_compare_and_swap).
这里有一个 Single Writer Mulity Reader 的 无锁实现 see link
实现过程:
创建一个数组
_buf = new C[_size], 成员变量
_indxes保存这当前的索引值, 在 enqueue 和 dequeue 过程中用 局部变量 oldval 和 newval 用于保存当前索引和设置递增的索引,然后原子的更新当前索引 (
os::CAS(&_indxes._value, oldval._value, newval._value)), 源码如下:
template<class T> class AtomicMWSRQueue { //typedef _T* T; const int _size; typedef T C; typedef volatile C* CachePtrType; typedef C* volatile CacheObjType; typedef C ValueType; typedef C* PtrType; /** * Both read and write pointer are in a union. * This means a read's cas may fail because a * write happened (preemption). An implementation * with 2 distinct read/write pointers would not * suffer from this. */ // union 的使用使得可以通过一次读写 value 的值而更新 index[0] 和 index[1] 两个变量 union SIndexes { unsigned long _value; unsigned short _index[2]; }; /** * The pointer to the buffer can be cached, * the contents are volatile. */ CachePtrType _buf; /** * The indexes are packed into one double word. * Therefore the read and write index can be read and written atomically. */ volatile SIndexes _indxes; /** * Atomic advance and wrap of the Write pointer. * Return the old position or zero if queue is full. */ // 这个地方可重入,每次都会递增 newval._index[0] CachePtrType advance_w() { SIndexes oldval, newval; do { oldval._value = _indxes._value; /*Points to a free writable pointer.*/ newval._value = oldval._value; /*Points to the next writable pointer.*/ // check for full : if ((newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1)) { return 0; } newval._index[0]++; if (newval._index[0] >= _size) newval._index[0] = 0; // if ptr is unchanged, replace it with newval. } while (!os::CAS(&_indxes._value, oldval._value, newval._value)); // frome here on : // oldval is 'unique', other preempting threads // will have a different value for oldval, as // wptr advances. As long as oldval has not been written, // rptr will not advance and wptr will remain stuck behind it. // return the old position to write to : return &_buf[oldval._index[0]]; // oldval._index[0] 从 0 开始 到 最大值 _size } /** * Advance and wrap of the Read pointer. * Only one thread may call this. */ bool advance_r(T& result) { SIndexes oldval, newval; // read it: oldval._value = _indxes._value; result = _buf[oldval._index[1]]; // return it if not yet written: if ( !result ) return false; // got it, clear field. _buf[oldval._index[1]] = 0; // move pointer: do { // re-read indxes, since we are the only reader, // _index[1] will not have changed since entry of this function oldval._value = _indxes._value; // 如果 CAS 失败,则此处会恢复 index[0] 和 index[1] 使得之前 ++newval._index[1] 失效,需要再次递增 newval._value = oldval._value; ++newval._index[1]; // if (newval._index[1] >= _size) newval._index[1] = 0; // we need to CAS since the write pointer may have moved. // this moves read pointer only: } while (!os::CAS(&_indxes._value, oldval._value, newval._value)); // 如果在读的过程中,其他线程进行了写操作,则再次更新两个指针 index[0] 和 index[1] 的值 return true; } // non-copyable ! AtomicMWSRQueue(const AtomicMWSRQueue<T>&); public: typedef unsigned int size_type; /** * Create an AtomicMWSRQueue with queue size \a size. * @param size The size of the queue, should be 1 or greater. */ // 构造后必须初始化 AtomicMWSRQueue(unsigned int size) : _size(size + 1) { _buf = new C[_size]; this->clear(); } ~AtomicMWSRQueue() { delete[] _buf; } /** * Inspect if the Queue is full. * @return true if full, false otherwise. */ bool isFull() const { // two cases where the queue is full : // if wptr is one behind rptr or if wptr is at end // and rptr at beginning. SIndexes val; val._value = _indxes._value; return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1; } /** * Inspect if the Queue is empty. * @return true if empty, false otherwise. */ bool isEmpty() const { // empty if nothing to read. SIndexes val; val._value = _indxes._value; return val._index[0] == val._index[1]; } /** * Return the maximum number of items this queue can contain. */ size_type capacity() const { return _size - 1; } /** * Return the number of elements in the queue. */ size_type size() const { SIndexes val; val._value = _indxes._value; int c = (val._index[0] - val._index[1]); return c >= 0 ? c : c + _size; } /** * Enqueue an item. * @param value The value to enqueue. * @return false if queue is full, true if queued. */ bool enqueue(const T& value) { if (value == 0) return false; CachePtrType loc = advance_w(); if (loc == 0) return false; *loc = value; return true; } /** * Dequeue an item. * @param value Stores the dequeued value. It is unchanged when * dequeue returns false and contains the dequeued value * when it returns true. * @return false if queue is empty, true if result was written. */ bool dequeue(T& result) { T tmpresult; if (advance_r(tmpresult) ) { result = tmpresult; return true; } return false; } /** * Return the next to be read value. */ const T front() const { return _buf[_indxes._index[1]]; } /** * Clear all contents of the Queue and thus make it empty. */ void clear() { for (int i = 0; i != _size; ++i) { _buf[i] = 0; } _indxes._value = 0; } };
reference link:
https://gcc.gnu.org/onlinedocs/gcc-4.4.1/gcc/Atomic-Builtins.html
http://www.ibm.com/developerworks/aix/library/au-multithreaded_structures2/
http://preshing.com/20120612/an-introduction-to-lock-free-programming/
相关文章推荐
- 完全定制UITabBarViewController
- iOS开发之UITapGestureRecognizer单双击
- 使用orm(Sequelize) 操作 MySQL (2)
- Entity Framework Code First (五)Fluent API - 配置关系
- Entity Framework Code First (四)Fluent API - 配置属性/类型
- 单元格左侧附带选择按钮
- Java中 String、StringBuffer 、StringBuilder 总结
- requireGestureRecognizerToFail 手势识别
- break,continue,break的用法与区别
- 从零开始构建MSBuild C#项目文件
- execute、executeUpdate、executeQuery三者的区别(及返回值)
- iOS在UIView获取UIViewController
- ios barTintColor backgroundColor tintColor UITextAttributeTextColor的区别
- easyui主界面布局easyui-layout用法一例
- LCD1602
- 347TopKFrequentElements
- iOS--UIAlertView与UIAlertController和UIAlertAction之间的事儿
- Arduino可穿戴开发入门教程Arduino开发环境介绍
- Arduino可穿戴开发入门教程Arduino开发环境介绍
- SSH框架网上商城项目第14战之商城首页UI的设计