您的位置:首页 > 编程语言 > C语言/C++

C++并发实战17:线程安全的stack和queue

2013-12-24 16:59 555 查看
1 线程安全的数据结构有几个可以注意的地方:当一个线程看见invariants时其他线程不会破坏该invariants,比如一个线程在遍历访问vector另一个线程却在修改vector这就破坏了variants;注意数据结构接口引起的竞态,必要的时候将多个操作合并;注意异常的处理;避免局部操作的锁超出其作用范围,否则可能引起死锁;尽可能的缩小临界区。

      线程安全的栈关键代码:

#include <exception>
struct empty_stack: std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack(){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data=other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()//top和pop合并,采用shared_ptr返回栈顶元素防止元素构造时发生异常
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top())));//make_shared比shared_ptr直接构造效率高
data.pop();
return res;
}
void pop(T& value)//采用参数引用返回栈顶元素
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value=std::move(data.top());
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};//这里还是有死锁的可能:栈内元素是用户代码,若该元素在构造或析构时修改栈则可能发生死锁,当然这种设计本身就有一定问题,应该从设计本身下手


               一个线程安全的queue关键代码:

template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T> > data_queue;//队里存储的是shared_ptr这样可以保证push和pop操作时不会引起构造或析构异常,队列更加高效
std::condition_variable data_cond;//采用条件变量同步入队和出队操作
public:
threadsafe_queue(){}
void wait_and_pop(T& value)//直至容器中有元素可以删除
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=std::move(*data_queue.front());
data_queue.pop();
}
bool try_pop(T& value)//若队中无元素可以删除则直接返回false
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=std::move(*data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res=data_queue.front();
data_queue.pop();
return res;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res=data_queue.front();
data_queue.pop();
return res;
}
void push(T new_value)
{
std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));//数据的构造在临界区外从而缩小临界区,并且不会在临界区抛出异常
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

             上面的采用一个mutex保护整个queue使得线程迫使线程顺序化,为了减小锁的粒度,使用链表作为queue的底层数据结构,并采用一个虚拟节点使head和tail分开:

template<typename T>
class threadsafe_queue
{
private:
struct node
{
std::shared_ptr<T> data;//通过shared_ptr管理资源T,那么资源的初始化和析构都在临界区外进行
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
node* get_tail(//返回tail用于判断head==tail
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head()/删除队首元素并返回该元素
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())//判断队是否为空,get_tail()必选在head_mutex保护下,试想多个线程都在pop那么会出现什么情形?
{
return nullptr;
}
std::unique_ptr<node> old_head=std::move(head);
head=std::move(old_head->next);
return old_head;
}
public:
threadsafe_queue():
head(new node),tail(head.get())//构造函数创建一个虚拟节点,初始时head和tail都指向这个虚拟节点,这也是判断queue为空的条件head==tail都指向同一个虚拟节点
{}
threadsafe_queue(const threadsafe_queue& other)=delete;
threadsafe_queue& operator=(const threadsafe_queue& other)=delete;//为了简化代码禁止拷贝和赋值
std::shared_ptr<T> try_pop()//
{
std::unique_ptr<node> old_head=pop_head();
return old_head?old_head->data:std::shared_ptr<T>();
}
void push(T new_value)//向队列添加一个元素,T的实例在临界区外创建即使抛出异常queue也没有被修改,而且加速多个线程的添加操作
{
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));//注意make_shared可以提高效率,make_shared()函数要比直接创建shared_ptr对象的方式快且高效,因为它内部仅分配一次内存,消除了shared_ptr 构造时的开销
std::unique_ptr<node> p(new node);//创建一个虚拟节点,tail始终指向一个虚拟节点从而和head分开(队列中有元素时),防止队列中只有元素时pop和top都操作的tail和head(若没有虚拟节点此时tail和head都是同一个节点)
node* const new_tail=p.get();
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data=new_data;
tail->next=std::move(p);
tail=new_tail;
}
};
     关于上述队列的几点说明:

1)  tail->next==nullptr;tail->data==nullptr//tail始终指向一个虚拟节点

2)  head==tail表示队列为空

3)  head->next==tail表示队中只有一个元素

4)   队列中真实的元素x满足x!=tail, x->data指向T的实例,x->next指向下一个节点, x->next==tail表示x的队列中最后一个真实的节点

5)  采用两个mutex使得锁的粒度减小提高了并发性能

      一个相对完整的线程安全queue版本:

template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue():head(new node),tail(head.get()){}
threadsafe_queue(const threadsafe_queue& other)=delete;
threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> const old_head=try_pop_head();
return old_head?old_head->data:std::shared_ptr<T>();
}
bool try_pop(T& value)
{
std::unique_ptr<node> const old_head=try_pop_head(value);
return old_head;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_ptr<node> const old_head=wait_pop_head();
return old_head->data;
}
void wait_and_pop(T& value)
{
std::unique_ptr<node> const old_head=wait_pop_head(value);
}
void empty()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
return (head==get_tail());
}
void push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data=new_data;
node* const new_tail=p.get();
tail->next=std::move(p);
tail=new_tail;
}
data_cond.notify_one();
}

private:
node* get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head()
{
std::unique_ptr<node> const old_head=std::move(head);
head=std::move(old_head->next);
return old_head;
}
std::unique_lock<std::mutex> wait_for_data()
{
std::unique_lock<std::mutex> head_lock(head_mutex);
data_cond.wait(head_lock,[&]{return head!=get_tail();});
return std::move(head_lock);
}
std::unique_ptr<node> wait_pop_head()
{
std::unique_lock<std::mutex> head_lock(wait_for_data());
return pop_head();
}
std::unique_ptr<node> wait_pop_head(T& value)
{
std::unique_lock<std::mutex> head_lock(wait_for_data());
value=std::move(*head->data);
return pop_head();
}
std::unique_ptr<node> try_pop_head()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())
{
return std::unique_ptr<node>();
}
return pop_head();
}
std::unique_ptr<node> try_pop_head(T& value)
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())
{
return std::unique_ptr<node>();
}
value=std::move(*head->data);
return pop_head();
}

private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};

std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
std::condition_variable data_cond;
};
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息