您的位置:首页 > 运维架构 > Apache

Apache-Hama框架简介–BSP模型的实现

2014-07-08 17:37 609 查看

Hama概况

Hama是基于BSP(BulkSynchronousParallel)计算技术的并行计算框架,用于大量的科学计算(比如矩阵、图论、网络等)。BSP计算技术最大的优势是加快迭代,在解决最小路径等问题中可以快速得到可行解(http://wiki.apache.org/hama/Benchmarks)。同时,Hama提供简单的编程,比如flexible模型、传统的消息传递模型,而且兼容很多分布式文件系统,比如HDFS、Hbase等。用户可以使用现有的Hadoop集群进行HamaBSP.

现在Hama最新的版本为2012年6月31号发行的0.5.0.这是Hama做为Apache顶级项目后首次发布的版本,该版本包含两个显著的新特性,分别是消息压缩器和完整的GooglePregel克隆,另外在计算系统性能和可持续性上都得以提升。

Hama结构

Hama主要有三部分构成:BSPMaster、GroomServers和Zookeeper。与Hadoop结构很相似,但没有通信和同步机制的部分。

Hama的集群由一个BSPMaster和多个互不关联的GroomServer作计算结点组成,HDFS和Zookeeper都可以是独立的集群。启动从BSPMaster开始,如果是master会启动BSPMaster、GroomServer两个进程,如果只是计算结点则只会启动GroomServer,启动/关闭脚本都是Master机器远程在GroomServer机器上执行。



BSPMaster

BSPMaster即集群的主,负责了集群各GroomServer结点的管理与作业的调度,就我所知它还存在单点的问题。相当于Hadoop的JobTracker或HDFS的NameNode。其基本作用如下:

1.维持Groom服务器状态。

2.维护supersteps和集群中的计数器。

3.维护Job的进度信息。

4.调度作业和任务分配给Groom服务器

5.分配执行的类和配置,整个Groom服务器。

6.为用户提供集群控制接口(Web和基于控制台)。

GroomServer

GroomServer是一个process,通过BSPMaster启动BSP任务。每一个Groom都有BSPMaster通信,可以通过BSPMaster获取任务,报告状态。GroomServer在HDFS或者其他文件系统上运行,通常,GroomServer与与数据结点在一个物理结点上运行,以保证获得最佳性能。

Zookeeper

Zookeeper用来管理BSPPeer的同步,用于实现BarrierSynchronisation机制。在ZK上,进入BSPPeer主要有进入Barrier和离开Barrier操作,所有进入Barrier的Peer会在zk上创建一个EPHEMERAL的node(/bsp/JobID/SuperstepNO./TaskID),最后一个进入Barrier的Peer同时还会创建一个readynode(/bsp/JobID/SuperstepNO./ready),Peer进入阻塞状态等待zk上所有task的node都删除后退出Barrier

BSPProgrammingModel

BSP(BulkSynchronousParallel,整体同步并行计算模型)是英国计算机科学家Viliant在上世纪80年代提出的一种并行计算模型。Google发布的一往篇论文(《Pregel:ASystemforLarge-ScaleGraphProcessing》)使得这一概念被更多人所认识,据说在Google80%的程序运行在MapReduce上,20%的程序运行在Pregel上。和MapReduce一样,Google并没有开源Pregel,Apache按Pregel的思想提供了类似框架Hama。

HamaBSP是基于大容量同步并行模型,利用分布式节点计算大量步骤。通常,BSP程序包含一序列的superstep。每一个superstep包含三个步骤:

Localcomputation

Processcommunication

Barriersynchronization

BulkSynchronousParallelModel(http://en.wikipedia.org/wiki/Bulk_synchronous_parallel)

Hama提供用户自定义的函数bsf(),通过bsf函数,用户可以编写自己的BSP程序,并且BSP程序可以控制整个程序的并行部分,意味着bsf函数不仅仅是程序普通的一部分。在0.2版本中,完成BSF函数,仅仅需要达成通信接口协议,这样就可以获得更多的参数。

BSP是一种跟MapReduce平行的一种并行计算方法,如果说MapReduce是把底层的数据传输和分配完全对用户屏蔽了的话,那BSP就是一种要对底层的数据传输和分配进行手动编程规定的模式了.这点上跟MPI(一种古老的并行模式)很像.



每个计算节点进行并行计算,在communication的阶段进行收发,将运行结果记录在barrier上,等到所有计算节点运行到barrier,所有的计算节点在继续运行。通过这些原理可以理解为三个步骤:send,sync,receive.

Communication

在bsp函数中,用户可以使用communication函数通过使用BSPPeerProtocol完成多种操作,BSF通信标准库中会提供多种communication函数:

Function
Description
send(StringpeerName,BSPMessagemsg)
Sendsamessagetoanotherpeer
put(BSPMessagemsg)
Putsamessagetolocalqueue
getCurrentMessage()
Returnsareceivedmessage
getNumCurrentMessages()
Returnsthenumberofreceivedmessages
sync()
Barriersynchronization
getPeerName()
Returnsapeer’shostname
getAllPeerNames()
Returnsallpeer’shostname.
getSuperstepCount()
Returnsthecountofsupersteps
图计算涉及到大量消息传递,Hama不完全是实时传送,消息的传输发生在Peer进入同步阶段后,并且对同一个目标GroomServer的消息进行了合并,两个物理结点之间每一次超步其实只会发生一次传输。

这些函数非常灵活,比如send函数可以给所有的peer发送消息,其代码如下:

@Override

publicvoidbsp(
BSPPeer<NullWritable,NullWritable,Text,DoubleWritable,LongMessage>peer)
throwsIOException,SyncException,InterruptedException{

for(StringpeerName:peer.getAllPeerNames()){
peer.send(peerName,
newLongMessage("Hellofrom"+peer.getPeerName(),System.currentTimeMillis()));
}

peer.sync();
}

Synchronization

通过sync()函数可以将所有的进程进入barrier,Hama运行到下一个superstep,在上面send函数中,BSP发送消息给所有的peer,这一过程的结束是通过sync函数完成的。

@Override

publicvoidbsp(

BSPPeer<NullWritable,NullWritable,Text,DoubleWritable,Writable>peer)

throwsIOException,SyncException,InterruptedException{


for(inti=0;i<100;i++){

//sendsomemessages

peer.sync();

}

}


用户将所有的进程关闭时,BSP工作将会结束。

Counters

跟Hadoop的MapReduce类似,用户可以使用Counter。

Counter的基本原理是用户可以增加枚举数量。在用户的程序中跟踪这个有意义的指标,这样的话就像一个循环,一直在执行。

下面的这段代码是来展示在BSP中Counter是如何执行的

//enumdefinition

enumLoopCounter{

LOOPS

}


@Override

publicvoidbsp(

BSPPeer<NullWritable,NullWritable,Text,DoubleWritable,DoubleWritable>peer)

throwsIOException,SyncException,InterruptedException{

for(inti=0;i<iterations;i++){

//detailsommitted

peer.getCounter(LoopCounter.LOOPS).increment(1L);

}

//restommitted

}


SetupandCleanup

在0.4.0版本之后,用户可以在BSP代码中完成Setup和Cleanup方法,这些方法可以从BSP类中继承:

publicclassMyEstimatorextends

BSP<NullWritable,NullWritable,Text,DoubleWritable,DoubleWritable>{


@Override

publicvoidsetup(

BSPPeer<NullWritable,NullWritable,Text,DoubleWritable,DoubleWritable>peer)

throwsIOException{

//Setup:Chooseoneasamaster

this.masterTask=peer.getPeerName(peer.getNumPeers()/2);

}


@Override

publicvoidcleanup(

BSPPeer<NullWritable,NullWritable,Text,DoubleWritable,DoubleWritable>peer)

throwsIOException{

//yourcleanuphere

}


@Override

publicvoidbsp(

BSPPeer<NullWritable,NullWritable,Text,DoubleWritable,DoubleWritable>peer)

throwsIOException,SyncException,InterruptedException{

//yourcomputationhere

}

}


Hama的应用情况

有许多应用和组织使用Apache的Hama,例如:

KoreaTelecom

Hamaisusedfornetflowtrafficanalysis

NHN,corp.

Analysisofsocialnetworkdataasaevaluationtool.

Oraclecorporation

Socialnetworkanalysison100TBtweetsdataset

2Racks+InfiniBandnetwork

UniversityoftheStateofMatoGrossoandFacultyCampoLimpoPaulista

12microcomputerstotaling30cores

Graphswithupto65,000vertices

Studyofanexactsolutiontotheproblemofcentralityinlargegraphs.

同时,Google和Wikipedia也使用了HamaGraphFile作为数据存储和处理。

从2002年开始,GoogleWebGraph已经包含了875,713个node和5,105,039个edge。

具体的数据集如下表:(http://snap.stanford.edu/data/web-Google.html)

Datasetstatistics

Nodes

875713

Edges

5105039

NodesinlargestWCC

855802(0.977)

EdgesinlargestWCC

5066842(0.993)

NodesinlargestSCC

434818(0.497)

EdgesinlargestSCC

3419124(0.670)

Averageclusteringcoefficient

0.6047

Numberoftriangles

13391903

Fractionofclosedtriangles

0.05523

Diameter(longestshortestpath)

22

90-percentileeffectivediameter

8.1

利用HamaGraphFile作为数据存储和处理的不仅仅是Google,Wikipedia同样利用HamaGraphFile进行页面之间(page-to-pagelink)数据存储、分析和处理。这可能是最成功的中间数据集的计算。但是,在SQL文件中所提供的格式相当不方便,其中有很多没有意义的链接,Wikipedia对其中的SQL文件做了优化处理。

下图展示的是Wikipedia利用Hama的GraphFile储存的数据量:

Total#pages

5,716,808

Total#links

130,160,392

Max.#outlinksfromasinglepage

5,775

Max.#inlinkstoasinglepage

374,934

#pageswithnooutlinks

10,438

#pageswithnoinlinks

1,942,943

引用:

BulkSynchronousParallel,http://en.wikipedia.org/wiki/Bulk_synchronous_parallel
EdwardJ.YoonApacheHama(v0.2):UserGuideaBSP-baseddistributedcomputingframework

IEEE_CLOUDCOM2010_HAMA

BenHHJuurlink;HarryAGWijshoffCommunicationprimitivesforBSPcomputers1996doi:10.1016/0020-0190(96)00073-7
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: