BSP: Running Apache HAMA Graph (1)
2016-07-01 17:12
Apache HAMA framework diagram
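1. Job submission
Job submission is handled by BSPJobClient; the entry point is GraphJob.submit().
The submission mainly carries information such as VertexID, VertexValue, and EdgeValue.
2. Loading the data
As its Javadoc says ("Loads vertices into memory of each peer"), GraphJobRunner.loadVertices() loads the parsed vertices into the memory of each peer. The code of loadVertices():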
private void loadVertices(
    BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
    throws IOException, SyncException, InterruptedException {

  // One (initially empty) partition message per peer.
  for (int i = 0; i < peer.getNumPeers(); i++) {
    partitionMessages.put(i, new GraphJobMessage());
  }

  VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
      .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
          VertexInputReader.class));

  ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
      .newCachedThreadPool();
  executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
  executor.setRejectedExecutionHandler(retryHandler);

  // Phase 1: read the input records, parse them into vertices,
  // and hand every finished vertex to a Parser worker.
  KeyValuePair<Writable, Writable> next = null;
  while ((next = peer.readNext()) != null) {
    Vertex<V, E, M> vertex = GraphJobRunner
        .<V, E, M> newVertexInstance(VERTEX_CLASS);

    boolean vertexFinished = false;
    try {
      vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
          vertex);
    } catch (Exception e) {
      throw new IOException("Parse exception occured: " + e);
    }

    // The vertex is not complete yet; keep reading records.
    if (!vertexFinished) {
      continue;
    }

    Runnable worker = new Parser(vertex);
    executor.execute(worker);
  }

  executor.shutdown();
  executor.awaitTermination(60, TimeUnit.SECONDS);

  // Phase 2: send each partition message to the peer that owns it.
  Iterator<Entry<Integer, GraphJobMessage>> it;
  it = partitionMessages.entrySet().iterator();
  while (it.hasNext()) {
    Entry<Integer, GraphJobMessage> e = it.next();
    it.remove();
    GraphJobMessage msg = e.getValue();
    msg.setFlag(GraphJobMessage.PARTITION_FLAG);
    peer.send(getHostName(e.getKey()), msg);
  }

  // Barrier synchronization between the send phase and the receive phase.
  peer.sync();

  // Phase 3: receive the partition messages and add their vertices locally.
  executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
  executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
  executor.setRejectedExecutionHandler(retryHandler);

  GraphJobMessage msg;
  while ((msg = peer.getCurrentMessage()) != null) {
    executor.execute(new AddVertex(msg));
  }

  executor.shutdown();
  executor.awaitTermination(60, TimeUnit.SECONDS);

  LOG.info(vertices.size() + " vertices are loaded into "
      + peer.getPeerName());
}
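Once all vertices have been parsed (vertexFinished is true), GraphJobRunner.loadVertices() splits the vertex data into per-peer partitions held in a ConcurrentHashMap and sends each partition to the peer it belongs to.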
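The partitioning idea can be sketched in plain Java without any Hama dependency. This is only an illustration: the class PartitionSketch, the method names, and the hash-modulo rule are assumptions made for the example, not Hama's actual implementation. The sketch keeps one bucket per peer in a ConcurrentHashMap, in the same spirit as partitionMessages above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the partitioning step; not Hama code.
class PartitionSketch {

  // Maps a vertex ID to a peer index. Hash-modulo is an assumption
  // chosen for illustration only.
  static int partitionOf(String vertexId, int numPeers) {
    return Math.floorMod(vertexId.hashCode(), numPeers);
  }

  // Groups vertex IDs into one bucket per peer.
  static Map<Integer, List<String>> partition(List<String> vertexIds,
      int numPeers) {
    Map<Integer, List<String>> buckets = new ConcurrentHashMap<>();
    for (int i = 0; i < numPeers; i++) {
      buckets.put(i, new ArrayList<>());
    }
    for (String id : vertexIds) {
      buckets.get(partitionOf(id, numPeers)).add(id);
    }
    return buckets;
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> buckets =
        partition(Arrays.asList("a", "b", "c", "d", "e"), 3);
    // Each bucket would then be sent to its peer as a single message.
    buckets.forEach((peer, ids) ->
        System.out.println("peer " + peer + " <- " + ids));
  }
}
```

Every vertex ID lands in exactly one bucket, so each peer receives a single message containing all of its vertices.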
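3. Sending the data
The Javadoc of send() reads: "Send a data with a tag to another BSPSlave corresponding to hostname. Messages sent by this method are not guaranteed to be received in a sent order." Note: in Hama BSP, messages are therefore not guaranteed to be received in the order in which they were sent.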
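Because the arrival order is not guaranteed, the receiving side should not depend on it. A minimal sketch of one way to do that, assuming the messages are plain integers and the receiver only needs an aggregate (the class OrderInsensitiveReceive and its method are hypothetical names used for this example): combine the messages with a commutative operation such as addition, so that every arrival order yields the same result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical receiver that does not rely on message order; not Hama code.
class OrderInsensitiveReceive {

  // Sums all received values. Addition is commutative and associative,
  // so the arrival order does not change the result.
  static long sum(List<Integer> received) {
    long total = 0;
    for (int value : received) {
      total += value;
    }
    return total;
  }

  public static void main(String[] args) {
    List<Integer> sent = Arrays.asList(3, 1, 4, 1, 5);
    List<Integer> arrived = new ArrayList<>(sent);
    Collections.shuffle(arrived); // simulate an arbitrary arrival order
    System.out.println(sum(sent) == sum(arrived));
  }
}
```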
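The Javadoc of AbstractMessageManager reads: "Abstract baseclass that should contain all information and services needed for the concrete RPC subclasses. For example it manages how the queues are managed and it maintains a cache for socket addresses." (The abstract class AbstractMessageManager is the superclass of the concrete RPC message managers on each peer; its main job is to manage and maintain the queues that hold GraphJobMessage objects.)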
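Stepping into messenger.send() leads to the send() method of AbstractMessageManager, which adds the peerName and the value (a GraphJobMessage) to the outgoing queue via outgoingMessageManager.addMessage(peerName, msg); peerName is the host name of the target peer.
The calls outgoingBundles.put(targetPeerAddress,bundle) and outgoingBundles.get(targetPeerAddress).Add(msg) show that the message is only put into a HashMap at this point; it has not actually been sent yet. In other words, BSPPeer.send() just stores the target BSPPeer's name together with the GraphJobMessage in the HashMap, where the message waits to be delivered to the other BSPPeer.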
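The "buffer first, deliver later" behaviour can be sketched as follows. This is a simplified model, not Hama's implementation: the class BufferedMessenger, its methods, and the String message type are hypothetical, and flushing the buffer inside sync() is an assumption suggested by loadVertices(), where peer.sync() follows the send loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a buffering messenger; not Hama code.
class BufferedMessenger {

  // Target peer name -> messages waiting to be sent.
  private final Map<String, List<String>> outgoingBundles = new HashMap<>();

  // Target peer name -> messages already delivered (stands in for the network).
  private final Map<String, List<String>> delivered = new HashMap<>();

  // Only stores the message in the outgoing map; nothing is transferred here.
  void send(String peerName, String msg) {
    outgoingBundles.computeIfAbsent(peerName, k -> new ArrayList<>()).add(msg);
  }

  // Assumed flush point: hands every buffered bundle over, then clears the buffer.
  void sync() {
    for (Map.Entry<String, List<String>> e : outgoingBundles.entrySet()) {
      delivered.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
          .addAll(e.getValue());
    }
    outgoingBundles.clear();
  }

  int pending(String peerName) {
    return outgoingBundles.getOrDefault(peerName,
        Collections.<String>emptyList()).size();
  }

  List<String> deliveredTo(String peerName) {
    return delivered.getOrDefault(peerName, Collections.<String>emptyList());
  }

  public static void main(String[] args) {
    BufferedMessenger messenger = new BufferedMessenger();
    messenger.send("peer1", "hello");
    System.out.println("delivered before sync: " + messenger.deliveredTo("peer1"));
    messenger.sync();
    System.out.println("delivered after sync: " + messenger.deliveredTo("peer1"));
  }
}
```

In this model a message becomes visible to the receiver only after sync(), which matches the observation that send() by itself does not put anything on the wire.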