
BSP: Running a Graph Job on Apache HAMA (1)

2016-07-01 17:12
[Figure: Apache HAMA architecture diagram]



1. Job submission: BSPJobClient submits the job, and the entry point is GraphJob.submit().



The submission mainly carries information such as VertexID, VertexValue, and EdgeValue.

2. Loading the data

The step labeled "Loads vertices into memory of each peer, GraphJobRunner.loadVertices()" means that GraphJobRunner.loadVertices() loads the parsed vertices into the memory of each peer. The code of loadVertices() is:

private void loadVertices(
    BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
    throws IOException, SyncException, InterruptedException {
  // Prepare one outgoing partition message per peer.
  for (int i = 0; i < peer.getNumPeers(); i++) {
    partitionMessages.put(i, new GraphJobMessage());
  }

  // Instantiate the user-configured reader that turns input records into vertices.
  VertexInputReader<Writable, Writable, V, E, M> reader =
      (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
          .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
              VertexInputReader.class));

  // Bounded thread pool; rejected tasks are handed to retryHandler.
  ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
      .newCachedThreadPool();
  executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
  executor.setRejectedExecutionHandler(retryHandler);

  // Phase 1: read this peer's input split and parse each record into a vertex.
  KeyValuePair<Writable, Writable> next = null;
  while ((next = peer.readNext()) != null) {
    Vertex<V, E, M> vertex = GraphJobRunner
        .<V, E, M> newVertexInstance(VERTEX_CLASS);
    boolean vertexFinished = false;
    try {
      vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
          vertex);
    } catch (Exception e) {
      throw new IOException("Parse exception occured: " + e);
    }
    // The reader has not completed this vertex yet; keep reading records.
    if (!vertexFinished) {
      continue;
    }
    // Hand the finished vertex to a Parser worker (defined elsewhere in GraphJobRunner).
    Runnable worker = new Parser(vertex);
    executor.execute(worker);
  }
  executor.shutdown();
  executor.awaitTermination(60, TimeUnit.SECONDS);

  // Phase 2: send each partition message to the peer that owns it.
  Iterator<Entry<Integer, GraphJobMessage>> it;
  it = partitionMessages.entrySet().iterator();
  while (it.hasNext()) {
    Entry<Integer, GraphJobMessage> e = it.next();
    it.remove();
    GraphJobMessage msg = e.getValue();
    msg.setFlag(GraphJobMessage.PARTITION_FLAG);
    peer.send(getHostName(e.getKey()), msg);
  }
  // Barrier synchronization between peers.
  peer.sync();

  // Phase 3: process the partition messages this peer received.
  executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
  executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
  executor.setRejectedExecutionHandler(retryHandler);

  GraphJobMessage msg;
  while ((msg = peer.getCurrentMessage()) != null) {
    // AddVertex is defined elsewhere in GraphJobRunner.
    executor.execute(new AddVertex(msg));
  }
  executor.shutdown();
  executor.awaitTermination(60, TimeUnit.SECONDS);
  LOG.info(vertices.size() + " vertices are loaded into "
      + peer.getPeerName());
}
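Once every vertex has been parsed (parseVertex() has reported it as finished), GraphJobRunner.loadVertices() partitions the vertex information with a ConcurrentHashMap (partitionMessages in the code above) and then sends each partition to the peer it belongs to.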
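The partitioning step described above can be sketched without any HAMA dependency. This is a minimal illustration, not the GraphJobRunner implementation: PartitionSketch, partitionOf(), and the hash-modulo rule are hypothetical names and assumptions, vertices are reduced to plain string IDs, and a fixed-size pool replaces the cached pool with a retry handler used in the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PartitionSketch {
    // Hypothetical partitioner (an assumption): maps a vertex ID to a peer index in [0, numPeers).
    static int partitionOf(String vertexId, int numPeers) {
        return Math.floorMod(vertexId.hashCode(), numPeers);
    }

    // Buckets vertex IDs by target peer, with worker threads filling the buckets concurrently.
    static Map<Integer, Queue<String>> partition(List<String> vertexIds, int numPeers) {
        Map<Integer, Queue<String>> buckets = new ConcurrentHashMap<>();
        for (int i = 0; i < numPeers; i++) {
            buckets.put(i, new ConcurrentLinkedQueue<>());
        }
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (String id : vertexIds) {
            executor.execute(() -> buckets.get(partitionOf(id, numPeers)).add(id));
        }
        executor.shutdown();
        try {
            // awaitTermination returns false on timeout, so the result is checked here.
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("partitioning did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while partitioning", e);
        }
        return buckets;
    }

    public static void main(String[] args) {
        Map<Integer, Queue<String>> buckets =
            partition(Arrays.asList("v1", "v2", "v3", "v4", "v5"), 3);
        buckets.forEach((peer, ids) -> System.out.println("peer " + peer + " <- " + ids));
    }
}
```

One detail worth noting: loadVertices() ignores the boolean returned by awaitTermination(60, TimeUnit.SECONDS), so workers that need longer than 60 seconds would not be detected there. The sketch checks the return value instead.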



3. Sending the data

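"Send a data with a tag to another BSPSlave corresponding to hostname. Messages sent by this method are not guaranteed to be received in a sent order." Note: in BSP-HAMA, the sending and receiving of BSP messages do not correspond strictly one to one, so a receiver cannot assume that messages arrive in the order in which they were sent.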
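Since delivery order is not guaranteed, a receiver whose result depends on order has to restore it itself. One possible approach, sketched below under assumptions, is for the sender to attach a sequence number to each message and for the receiver to sort on it. SequencedMessage and ReorderSketch are hypothetical types for illustration; they are not part of HAMA.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical message type: the sender attaches its own sequence number.
class SequencedMessage {
    final int seq;
    final String payload;

    SequencedMessage(int seq, String payload) {
        this.seq = seq;
        this.payload = payload;
    }
}

class ReorderSketch {
    // Returns the payloads in sender order, whatever order they arrived in.
    static List<String> inSendOrder(List<SequencedMessage> received) {
        List<SequencedMessage> sorted = new ArrayList<>(received);
        sorted.sort(Comparator.comparingInt(m -> m.seq));
        List<String> payloads = new ArrayList<>();
        for (SequencedMessage m : sorted) {
            payloads.add(m.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Messages arrive out of order, which the send() contract allows.
        List<SequencedMessage> received = Arrays.asList(
            new SequencedMessage(2, "c"),
            new SequencedMessage(0, "a"),
            new SequencedMessage(1, "b"));
        System.out.println(inSendOrder(received)); // prints [a, b, c]
    }
}
```

When the computation is commutative (for example, summing incoming values), no reordering is needed at all.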



"Abstract baseclass that should contain all information and services needed for the concrete RPC subclasses. For example it manages how the queues are managed and it maintains a cache for socket addresses." (The abstract class AbstractMessageManager is the superclass of the concrete RPC message managers on each peer. It mainly manages and maintains the operations on GraphJobMessage objects inside the queues.)
Stepping into messenger.send() leads to the send() method of AbstractMessageManager. It adds peerName and the value (a GraphJobMessage) to the outgoing queue through outgoingMessageManager.addMessage(peerName, msg), where peerName is the host name of the target peer.
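The "cache for socket addresses" mentioned in the class comment can be illustrated with a small stand-alone sketch. It is an assumption-laden illustration, not the AbstractMessageManager code: PeerAddressCache and addressOf() are hypothetical names, and peer names are assumed to have the form "host:port".

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PeerAddressCache {
    private final Map<String, InetSocketAddress> cache = new ConcurrentHashMap<>();

    // Assumes peer names look like "host:port"; each name is parsed only once.
    InetSocketAddress addressOf(String peerName) {
        return cache.computeIfAbsent(peerName, name -> {
            int colon = name.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("expected host:port, got " + name);
            }
            String host = name.substring(0, colon);
            int port = Integer.parseInt(name.substring(colon + 1));
            // createUnresolved skips the DNS lookup, which keeps the sketch offline.
            return InetSocketAddress.createUnresolved(host, port);
        });
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        PeerAddressCache cache = new PeerAddressCache();
        InetSocketAddress first = cache.addressOf("node1:61000");
        InetSocketAddress second = cache.addressOf("node1:61000");
        System.out.println(first == second); // prints true: the second call hits the cache
    }
}
```

Caching matters here because send() is called once per message, and parsing or resolving the target address every time would be wasted work.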





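Therefore, the calls outgoingBundles.put(targetPeerAddress, bundle) and outgoingBundles.get(targetPeerAddress).addMessage(msg) show that the message is only put into a HashMap; it has not been sent out yet. In other words, "sending" a message with BSPPeer.send() just loads the target BSPPeer's name and the GraphJobMessage into that HashMap. The buffered messages reach the other BSPPeer later, which is why loadVertices() calls peer.sync() after its send loop.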
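The buffer-then-flush behavior can be sketched as follows. This is a simplified model under stated assumptions, not HAMA code: OutgoingBufferSketch is a hypothetical class, messages are plain strings, the delivered map stands in for the network, and the flush is assumed to happen at the synchronization barrier.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OutgoingBufferSketch {
    // Messages queued per target peer; stands in for outgoingBundles.
    private final Map<String, List<String>> outgoingBundles = new HashMap<>();
    // Stand-in for the network: what each peer has actually received.
    final Map<String, List<String>> delivered = new HashMap<>();

    // Only buffers the message under its target peer; nothing is transmitted here.
    void send(String peerName, String msg) {
        outgoingBundles.computeIfAbsent(peerName, k -> new ArrayList<>()).add(msg);
    }

    // Flushes every buffered bundle to its target, then clears the buffer.
    void sync() {
        for (Map.Entry<String, List<String>> e : outgoingBundles.entrySet()) {
            delivered.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
        }
        outgoingBundles.clear();
    }

    // Number of messages buffered but not yet delivered.
    int pending() {
        int n = 0;
        for (List<String> bundle : outgoingBundles.values()) {
            n += bundle.size();
        }
        return n;
    }

    public static void main(String[] args) {
        OutgoingBufferSketch peer = new OutgoingBufferSketch();
        peer.send("node1:61000", "m1");
        peer.send("node1:61000", "m2");
        System.out.println(peer.pending() + " pending, delivered " + peer.delivered);
        // prints: 2 pending, delivered {}
        peer.sync();
        System.out.println(peer.pending() + " pending, delivered " + peer.delivered);
        // prints: 0 pending, delivered {node1:61000=[m1, m2]}
    }
}
```

Grouping messages per target before the barrier lets each peer receive one bundle instead of many small transfers.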