您的位置:首页 > 产品设计 > UI/UE

Intel Threading Building Blocks :基本算法参考及使用

2012-06-09 18:12 387 查看

基本算法(algorithms)

Intel TBB提供的大多数并行算法支持泛型。但是这些受支持的类型必须实现必要的概念方法。并行算法可以嵌套,
例如,一个parallel_for的内部可以调用另一个parallel_for。目前版本的TBB(4.0)提供的基本算法如下所示:

parallel_for

parallel_reduce

parallel_scan

parallel_do

管道(pipeline、parallel_pipeline)

parallel_sort

parallel_invoke

parallel_for

l 摘要

parallel_for是在一个值域执行并行迭代操作的模板函数。

l 语法

template<typenameIndex, typename Func>

Funcparallel_for( Index first, Index_type last, const Func& f

[, task_group_context&group] );

template<typenameIndex, typename Func>

Funcparallel_for( Index first, Index_type last,

Index step, const Func&f

[, task_group_context&group] );

template<typenameRange, typename Body>

voidparallel_for( const Range& range, const Body& body,

[, partitioner[,task_group_context& group]] );

l 头文件

#include “tbb/parallel_for.h”

l 描述

parallel_for(first, last,step, f)表示一个循环的并行执行:

for(auto i= first; i<last; i+=step) f(i);

注意以下几点:

1、索引类型必须是整形

2、循环不能回环

3、步长(step)必须为正,如果省略了,隐指为1

4、并没有保证迭代操作以并行方式进行

5、较小的迭代等待更大的迭代可能会发生死锁

6、分割策略总是auto_partitioner

parallel_for(range, body, partitioner)提供了并行迭代的泛型形式。它表示在区域的每个值,并行执行

body。partitioner选项指定了分割策略。Range类型必须符合Range概念模型。body必须符合下表的要求:

原型
语义
Body::Body(const Body&)
拷贝构造
Body::~Body()
析构
void Body::operator()(Range& range) const
对range对象应用body对象
采用最后一个模板以及stl中的vector容器改写1-1

例:1-2
#include <iostream>
#include <vector>
#include <tbb/tbb.h>
#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>

using namespace std;
using namespace tbb;

typedef vector<int>::iterator IntVecIt;

struct body
{
void operator()(const blocked_range<IntVecIt>&r)const
{
for(auto i = r.begin(); i!=r.end(); i++)

cout<<*i<<' ';
}
};

int main()
{
vector<int> vec;
for(int i=0; i<10; i++)
vec.push_back(i);

parallel_for(blocked_range< IntVecIt>(vec.begin(), vec.end())
, body());
return 0;
}

parallel_reduce

l 摘要

parallel_reduce模板在一个区域迭代,将由各个任务计算得到的部分结果合并,得到最终结果。

parallel_reduce对区域(range)类型的要求与parallel_for一样。body类型需要分割构造函数以及一个

join方法。body的分割构造函数拷贝运行循环体需要的只读数据,并分配并归操作中初始化并归变量

的标志元素。join方法会组合并归操作中各任务的结果。

l 语法

template<typenameRange, typename Value,

typename Func, typename Reduction>

Value parallel_reduce(const Range& range, const Value& identity,

const Func& func,const Reduction& reduction,

[, partitioner[,task_group_context& group]] );

template<typenameRange, typename Body>

void parallel_reduce(const Range& range, const Body& body

[, partitioner[,task_group_context& group]] );

l 头文件

#include “tbb/parallel_reduce.h”

l 描述

parallel_reduce模板有两种形式。函数形式是为方便与lambda表达式一起使用而设计。

第二种形式是为了最小化数据拷贝。

下面的表格总结了第一种形式中的identity,func,reduction的类型要求要求:

原型
摘要
Value Identity
Func::operator()的左标识元素
Value Func::operator()(const Range& range, const Value& x)
累计从初始值x开始的子区域的结果
Value Reduction::operator()(const Value& x, const Value& y);
合并x跟y的结果
第二种形式parallel_reduce(range,body)对range中的每个值执行body的并行并归。

Range类型必须符合Range类型要求。body必须符合下表的要求:

原型
摘要
Body::Body(Body&, split)
分割构造函数。必须能跟operator()、join()并发运行。
Body::~Body()
析构函数
void Body::operator()(const Range& )
累计子区域的结果
void Body::join(Body& rhs)
将结果结合。rhs中的结果将合并到this中。
parallel_reduce使用分割构造函数来为每个线程生成一个或多个body的拷贝。当它拷贝

body的时候,也许body的operator()或者join()正在并发运行。要确保这种并发运行

下的安全。典型应用中,这种安全要求不会消耗你太多的精力。

l example

下面的例子将一个容器内的数值累加。

例1-3:
#include <iostream>
#include <tbb/parallel_reduce.h>
#include <tbb/blocked_range.h>
#include <vector>

using namespace std;
using namespace tbb;

int main()
{
vector<int> vec;
for(int i=0; i<100; i++)
vec.push_back(i);

int result = parallel_reduce(blocked_range<vector<int>::iterator>(vec.begin(), vec.end()),
0,[](const blocked_range<vector<int>::iterator>& r, int init)->int{

for(auto a = r.begin(); a!=r.end(); a++)
init+=*a;
return init;
},

[](int x, int y)->int{
return x+y;
}
);
cout<<"result:"<<result<<endl;
return 0;

}

parallel_scan

l 摘要

并行计算前束(prefix)的函数模板。即输入一个数组,生成一个数组,其中每个元素的值

都是原数组中在此元素之前的元素的某个运算符的结果的累积。比如求和:

输入:[2, 8, 9, -4, 1, 3, -2, 7]

生成:[0, 2, 10, 19, 15, 16,
19, 17]

l 语法

template<typename Range, typename Body>

void parallel_scan( const Range& range, Body& body );

template<typename Range, typename Body>

void parallel_scan( const Range& range, Body& body, const

auto_partitioner& );

template<typename Range, typename Body>

void parallel_scan( const Range& range, Body& body, const

simple_partitioner& );

l 头文件

#include “tbb/parallel_scan.h”

l 描述

数学里对于并行前束的定义如下:

设⊕为左标识元素id⊕的关联运算符。在队列X0,X1,…Xn-1执行⊕并行前束得到队列Y0,Y1,Y2,…Yn-1:

y0= id⊕
⊕x0

yi=yi-1⊕
xi

parallel_scan<Range,Body>以泛型形式实现并行前束。它的要求如下:

伪签名
语义
void Body::operator()(const Range& r, pre_scan tag)
累积归纳区域r
void Body::operator()(const Range& r, final_scan tag)
归纳区域r以及计算扫描结果
Body::Body(Body& b, split)
分割b以便this和b能被单独累积归纳。*this对象即本表下行的对象a
void Body::reverse_join(Body& a)
将a的归纳结果合并到this,this是先前从a的分割构造函数中创建的。*this对象即本表上一行中的对象b
void Body::assign(Body& b)
将b的归纳结果赋给this
l example
#include <tbb/parallel_scan.h>
#include <tbb/blocked_range.h>
#include <iostream>
using namespace tbb;
using namespace std;

template<typename T>
class Body
{
T _sum;
T* const _y;
const T* const _x;
public:
Body(T y[], const T x[]):_sum(0), _x(x), _y(y){}
T get_sum() const
{
return _sum;
}

template<typename Tag>
void operator()(const blocked_range<int>& r, Tag)
{
T temp = _sum;
for(int i = r.begin(); i< r.end(); i++)
{
temp+=_x[i];
if(Tag::is_final_scan())
_y[i] = temp;
}

_sum = temp;
}

Body(Body&b, split):_x(b._x), _y(b._y), _sum(0){}
void reverse_join(Body& a)
{
_sum+=a._sum;
}
void assign(Body& b)
{
_sum = b._sum;
}

};

int main()
{
int x[10] = {0,1,2,3,4,5,6,7,8,9};
int y[10];
Body<int> body(y,x);
parallel_scan(blocked_range<int>(0, 10), body);
cout<<"sum:"<<body.get_sum()<<endl;
return 0;
}

parallel_do

l 摘要

并行处理工作项的模板函数

l 语法

template<typename InputIterator, typename Body>

void parallel_do( InputIterator first, InputIteratorlast,

Body body[,task_group_context& group] );

l 头文件

#include "tbb/parallel_do.h"

l 描述

parallel_do(first, last,body)在对处于半开放区间[first,
last)的元素应用函数对象body(不见得并行运行)。如果body重载的()函数的第二个参数(类型为parallel_do_feeder)不为空,那么可以增加另外的工作项。当对输入队列或者通过parallel_do_feeder::add方法添加的所有项x执行的body(x)都返回后,函数结束。其中的parallel_do_feeder允许parallel_do的body添加额外的工作项,只有parallel_do才能创建或者销毁parallel_do_feeder对象。其他的代码对parallel_do_feeder唯一能做的事就是调用它的add方法。

对于Body类型的需求如下:

伪签名
语义
Body::operator()( cv-qualifiers T& item,

Parallel_do_feeder<T>& feeder) const

或者

Body::operator()(cv-qualifiers T& item) const
处理item。模板parallel_do也许会为同一个this(不能是同一个item)并行调用operator()。如果有第二个参数,允许在函数中另行添加工作项。
T(const T&)
拷贝工作项
~T::T()
销毁工作项
如果所有来自输入流的元素不能随机访问,那么parallel_do中的并行就不具备可扩展性。为达到可扩展性,可按

如下方式之一处理:

ü 使用随机迭代器指定输入流

ü 对诸如body经常增加不止一项任务的行为设计自己的算法

ü 用parallel_for来替换

为了提高速度,B::operator()的粒度至少要约10万个时钟周期。否则,parallel_do的内在开销就会影响有效工作。算法可以传递一个task_group_context对象,这样它的任务可以在此组内执行。默认情况下,算法在它自己的有界组中执行。

l example
#include <tbb/parallel_do.h>
#include <iostream>
#include <vector>
using namespace std;
using namespace tbb;

struct t_test
{
string msg;
int ref;
void operator()()const
{
cout<<msg<<endl;
}
};

template <typename T>
struct body_test
{
void operator()(T* t, parallel_do_feeder<T*>& feeder) const
{
(*t)();
if(t->ref == 0)
{
t->msg = "added msg";
feeder.add(t);
t->ref++;
}
}
};

int main()
{
t_test *pt = new t_test;
pt->ref = 0;
pt->msg = "original msg";

vector<t_test*> vec;
vec.push_back(pt);
parallel_do(vec.begin(), vec.end(), body_test<t_test>());
delete pt;
return 0;
}

pipleline

class pipeline

{

public:

pipeline();

~pipeline();

void add_filter( filter& f );

void run( size_t max_number_of_live_tokens

[,task_group_context& group] );

void clear();

};

可按以下步骤使用pipeline类:

1、从filter继承类f,f的构造函数传递给基类filter的构造函数一个参数,来指定它的模式

2、重载虚方法filter::operator()来实现过滤器对元素处理,并返回一个将被下一个过滤器处理的元素指针。如果流里没有其他的要处理的元素,返回空值。最后一个过滤器的返回值将被忽略。

3、生成pipeline类的实例

4、生成过滤器f的实例,并将它们按先后顺序加给pipeline。一个过滤器的实例一次只能加给一个pipeline。同一时间,一个过滤器禁止成为多个pipeline的成员。

5、调用pipeline::run方法。参数max_number_of_live_tokens指定了能并发运行的阶段数量上限。较高的值会以更多的内存消耗为代价来增加并发性。
函数parallel_pipeline提供了一种强类型的面向lambda的方式来建立并运行管道。

过滤器基类

filter

class filter

{

public:

enum mode

{

parallel = implementation-defined,

serial_in_order = implementation-defined,

serial_out_of_order =implementation-defined

};

bool is_serial() const;

bool is_ordered() const;

virtual void* operator()( void* item ) = 0;

virtual void finalize( void* item ) {}

virtual ~filter();

protected:

filter( mode );

};

过滤器模式有三种模式:parallel,serial_in_order,serial_out_of_order

Ø parallel过滤器能不按特定的顺序并行处理多个工作项

Ø serial_out_of_order过滤器不按特定的顺序每次处理一个工作项

Ø serial_in_order过滤器每次处理一个工作项。管道中的所有serial_in_order过滤器都按同样的顺序处理工作项。

由于parallel过滤器支持并行加速,所以推荐使用。如果必须使用serial过滤器,那么serial_out_of_order类型的过滤器是优先考虑的,因为他在处理顺序上的约束较少。

线程绑定过滤器

thread_bound_filter

classthread_bound_filter: public filter

{

protected:

thread_bound_filter(mode filter_mode);

public:

enum result_type

{

success,

item_not_available,

end_of_stream

};

result_type try_process_item();

result_type process_item();

};
管道中过滤器的抽象基类,线程必须显式为其提供服务。当一个过滤器必须由某个指定线程执行的时候会派上用场。服务于thread_bound_filter的线程不能是调用pipeline::run()的线程。
example:
#include<iostream>

#include <tbb/pipeline.h>

#include<tbb/compat/thread>

#include<tbb/task_scheduler_init.h>

using namespacestd;
using namespacetbb;
char input[] ="abcdefg\n";

classinputfilter:public filter
{
char *_ptr;
public:
void *operator()(void *)
{
if(*_ptr)
{
cout<<"input:"<<*_ptr<<endl;
return _ptr++;
}
else   return 0;

}
inputfilter():filter(serial_in_order),_ptr(input){}
};

classoutputfilter: public thread_bound_filter
{
public:
void *operator()(void *item)
{
cout<<*(char*)item;
return 0;
}
outputfilter():thread_bound_filter(serial_in_order){}
};

voidrun_pipeline(pipeline *p)
{
p->run(8);
}

int main()
{
inputfilter inf;
outputfilter ouf;
pipeline p;
p.add_filter(inf);
p.add_filter(ouf);
//由于主线程服务于继承自thread_bound_filter的outputfilter,所以pipeline要运行在另一个单独的线程
thread t(run_pipeline, &p);
while(ouf.process_item()!=thread_bound_filter::end_of_stream)
continue;
t.join();
return 0;
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: