您的位置:首页 > 其它

MapReduce Key Revert ——特定数据模式的负载均衡

2009-10-27 11:45 344 查看

符号、记法

其中{k,v}指一个Key,Value对,{..} 中第一个分量是Key,第二个是Value。

[e]指一个集合,其中的元素为e。

[{k,v}]就指一个{k,v}的集合。

问题

给定巨大的集合S=[{k1,k2}],对S中每个k1,计算k1相同,而k2不同的元素个数。生成[{k1,distinct_count([k2])}]。

其中S数据的特点是:

1. 不同k1的数目很少(少于1K,很可能小于机器数目);

2. 不同k2的数目很大(大于20M);

3. 总的记录数目很大(大于1G);

4. k1的分布很不均匀,最频繁的k1,其记录数占到总数的1/3,对应的独立k2几乎包括整个k2集合;

5. k2的分布可以认为是均匀分布。

数据集的这些特点,也正是这种方法的适用条件。如果k1,k2的分布都很均匀,并且不同k1的数目远大于机器数目,还是用传统方案比较简单。

MapReduce解决方案

传统方案

Map.Input={k1,k2}

Map.Output={k1,k2}

Partition=Hash(k1)

Reduce.Input={k1,k2}

Reduce.Output={k1,distinct_count(k2)}

Reduce中,对每个k1,需要做set_union,计算
distinct_count(k2)=count(set_union([[k2]]))

这种方法的问题是因为k1的分布不均匀,导致负载不平衡,最频繁的k1,成为计算的瓶颈,无法提高并行度。

倒转Key

Map1.Input={k1,k2}

Map1.Output={k2,k1}

Partition1=Hash(k2)

Reduce1.Input=Map1.Output={k2,k1}

Reduce1.Output={k1,cnt=distinct_count([{k1,k2}])}

Map2.Input=Reduce1.Output

Map2=Identity

Partition2=Hash(k1)

Reduce2.Input=Map2.Output={k1,cnt}

Reduce2.Output={k1,sum([cnt])}

l 在Map1和Reduce1之间,按Hash(k2)进行partition。

l 每个Reduce1处理的数据就只包含k2的一个子集;因为k2均匀分布,每个子集尺寸相当;最关键之处就在这里,这样一个转换,达到了负载均衡!

l Reduce1维护一个全局的hash_map<k1,hash_set<k2> >,对每条记录,分别进行一次k1和k2的查找或插入【m[k1].insert(k2)】。

l Reduce2每读一条输入,要进行一次k1的查找,并累加该k1->cnt【m[k1]+=cnt】。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: