您的位置:首页 > 编程语言 > C语言/C++

C++编程 –安全并发访问容器元素

2016-01-14 23:54 363 查看
http://blog.csdn.net/flyfish1986/article/details/39526251

C++ 安全并发访问容器元素

2014-9-24 flyfish


标准库STL的vector, deque, list等等不是线程安全的

例如 线程1正在使用迭代器(iterator)读vector

线程2正在对该vector进行插入操作,使vector重新分配内存,这样就造成线程1中的迭代器失效

STL的容器

多个线程读是安全的,在读的过程中,不能对容器有任何写入操作

多个线程可以同时对不同的容器做写入操作。

不能指望任何STL实现来解决线程难题,必须手动做同步控制.

方案1 对vector进行加锁处理

effective STL给出的Lock框架

[cpp] view
plaincopy





template<typename Container> //一个为容器获取和释放互斥体的模板

class Lock

{ //框架;其中的很多细节被省略了

public:

Lock(const Container& container) :c(container)

{

getMutexFor(c);

//在构造函数中获取互斥体

}

~Lock()

{

releaseMutexFor(c);

//在析构函数中释放它

}

private: const Container& c;

};

如果需要实现工业强度,需要做更多的工作。

方案2 微软的Parallel Patterns Library (PPL)

看MSDN

PPL 提供的功能

1 Task Parallelism: a mechanism to execute several work items (tasks) in parallel

任务并行:一种并行执行若干工作项(任务)的机制

2 Parallel algorithms: generic algorithms that act on collections of data in parallel

并行算法:并行作用于数据集合的泛型算法

3 Parallel containers and objects: generic container types that provide safe concurrent access to their elements

并行容器和对象:提供对其元素的安全并发访问的泛型容器类型

示例是对斐波那契数列(Fibonacci)的顺序计算和并行计算的比较

顺序计算是

使用 STL std::for_each 算法

结果存储在 std::vector 对象中。

并行计算是

使用 PPL Concurrency::parallel_for_each 算法

结果存储在 Concurrency::concurrent_vector 对象中。

[cpp] view
plaincopy





// parallel-fibonacci.cpp

// compile with: /EHsc

#include <windows.h>

#include <ppl.h>

#include <concurrent_vector.h>

#include <array>

#include <vector>

#include <tuple>

#include <algorithm>

#include <iostream>

using namespace Concurrency;

using namespace std;

// Calls the provided work function and returns the number of milliseconds

// that it takes to call that function.

template <class Function>

__int64 time_call(Function&& f)

{

__int64 begin = GetTickCount();

f();

return GetTickCount() - begin;

}

// Computes the nth Fibonacci number.

int fibonacci(int n)

{

if(n < 2)

return n;

return fibonacci(n-1) + fibonacci(n-2);

}

int wmain()

{

__int64 elapsed;

// An array of Fibonacci numbers to compute.

array<int, 4> a = { 24, 26, 41, 42 };

// The results of the serial computation.

vector<tuple<int,int>> results1;

// The results of the parallel computation.

concurrent_vector<tuple<int,int>> results2;

// Use the for_each algorithm to compute the results serially.

elapsed = time_call([&]

{

for_each (a.begin(), a.end(), [&](int n) {

results1.push_back(make_tuple(n, fibonacci(n)));

});

});

wcout << L"serial time: " << elapsed << L" ms" << endl;

// Use the parallel_for_each algorithm to perform the same task.

elapsed = time_call([&]

{

parallel_for_each (a.begin(), a.end(), [&](int n) {

results2.push_back(make_tuple(n, fibonacci(n)));

});

// Because parallel_for_each acts concurrently, the results do not

// have a pre-determined order. Sort the concurrent_vector object

// so that the results match the serial version.

sort(results2.begin(), results2.end());

});

wcout << L"parallel time: " << elapsed << L" ms" << endl << endl;

// Print the results.

for_each (results2.begin(), results2.end(), [](tuple<int,int>& pair) {

wcout << L"fib(" << get<0>(pair) << L"): " << get<1>(pair) << endl;

});

}

命名空间Concurrency首字母大写,一般命名空间全是小写。

贴一个简单的示例代码

使用parallel_for_each 算法计算std::array 对象中每个元素的平方

参数分别是lambda 函数、函数对象和函数指针。

[cpp] view
plaincopy





#include "stdafx.h"

#include <ppl.h>

#include <array>

#include <iostream>

using namespace Concurrency;

using namespace std;

using namespace std::tr1;

// Function object (functor) class that computes the square of its input.

template<class Ty>

class SquareFunctor

{

public:

void operator()(Ty& n) const

{

n *= n;

}

};

// Function that computes the square of its input.

template<class Ty>

void square_function(Ty& n)

{

n *= n;

}

int _tmain(int argc, _TCHAR* argv[])

{

// Create an array object that contains 5 values.

array<int, 5> values = { 1, 2, 3, 4, 5 };

// Use a lambda function, a function object, and a function pointer to

// compute the square of each element of the array in parallel.

// Use a lambda function to square each element.

parallel_for_each(values.begin(), values.end(), [](int& n){n *= n;});

// Use a function object (functor) to square each element.

parallel_for_each(values.begin(), values.end(), SquareFunctor<int>());

// Use a function pointer to square each element.

parallel_for_each(values.begin(), values.end(), &square_function<int>);

// Print each element of the array to the console.

for_each(values.begin(), values.end(), [](int& n) {

wcout << n << endl;

});

return 0;

}

在微软的concurrent_vector.h文件中有这样一句

Microsoft would like to acknowledge that this concurrency data structure implementation

is based on Intel implementation in its Threading Building Blocks ("Intel Material").

也就是微软的concurrent_vector是在Intel 的Threading Building Blocks基础上实现的。

方案3 Intel TBB(Threading Building Blocks)

Intel TBB 提供的功能

1 直接使用的线程安全容器,比如 concurrent_vector 和 concurrent_queue。

2 通用的并行算法,如 parallel_for 和 parallel_reduce。

3 模板类 atomic 中提供了无锁(Lock-free或者mutex-free)并发编程支持。

方案4 无锁数据结构支持库Concurrent Data Structures (libcds).

地址 http://sourceforge.net/projects/libcds/
下载以后里面直接有从VC2008到VC2013的编译环境,依赖于boost库

方案5 Boost 使用boost.lockfree

boost.lockfree实现了三种无锁数据结构:

1 boost::lockfree::queue

2 boost::lockfree::stack

3 boost::lockfree::spsc_queue

生产者-消费者

下面的代码实现的是

实现了一个多写生成,多消费 队列。

产生整数,并被4个线程消费

[cpp] view
plaincopy





#include <boost/thread/thread.hpp>

#include <boost/lockfree/queue.hpp>

#include <iostream>

#include <boost/atomic.hpp>

boost::atomic_int producer_count(0);

boost::atomic_int consumer_count(0);

boost::lockfree::queue<int> queue(128);

const int iterations = 10000000;

const int producer_thread_count = 4;

const int consumer_thread_count = 4;

void producer(void)

{

for (int i = 0; i != iterations; ++i) {

int value = ++producer_count;

while (!queue.push(value))

;

}

}

boost::atomic<bool> done (false);

void consumer(void)

{

int value;

while (!done) {

while (queue.pop(value))

++consumer_count;

}

while (queue.pop(value))

++consumer_count;

}

int main(int argc, char* argv[])

{

using namespace std;

cout << "boost::lockfree::queue is ";

if (!queue.is_lock_free())

cout << "not ";

cout << "lockfree" << endl;

boost::thread_group producer_threads, consumer_threads;

for (int i = 0; i != producer_thread_count; ++i)

producer_threads.create_thread(producer);

for (int i = 0; i != consumer_thread_count; ++i)

consumer_threads.create_thread(consumer);

producer_threads.join_all();

done = true;

consumer_threads.join_all();

cout << "produced " << producer_count << " objects." << endl;

cout << "consumed " << consumer_count << " objects." << endl;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: