Problems with Hama Shared-Memory Communication
2016-09-12 23:13
1. Shared-memory communication in Java
Overall performance did not improve. Because the communication inside sync() was avoided, the data-transfer window became short, which led to the following problem:
16/09/12 18:16:27 INFO graph.GraphJobRunner: Start process msg: 1473675387360
16/09/12 18:16:27 INFO message.MesssagShareManager: messageManager memoryRead finish: 1473675387864
As the timestamps show, by the time the shared-memory read finished, the compute loop had already started reading and computing. The transfer therefore interferes with the actual computation: the sync() time of a superstep shrinks, but the loop-computation time grows by at least as much, so overall performance does not improve. A tragedy!
The fix code is attached below:
package org.apache.hama.bsp.message;

import io.netty.util.internal.ConcurrentSet;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.channels.FileLock;
import java.util.Iterator;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.WritableUtils;

public class MessageShareHandler<M extends Writable> implements Callable<Boolean> {

  protected static final Log LOG = LogFactory.getLog(MessageShareHandler.class);

  public static String MEMORY_SHARE_FILE = "/opt/hama-0.7.1/share/";
  public static int bufferSize = 30 * 1024 * 1024; // size of the mapped region

  public MappedByteBuffer mbb;
  RandomAccessFile raf; // file backing the shared memory
  // Message container; incoming messages have already been combined, so IDs do not repeat.
  ConcurrentSet<GraphJobMessage> localMessages = new ConcurrentSet<GraphJobMessage>();

  public MessageShareHandler(String shareFile, Configuration conf) {
    try {
      raf = new RandomAccessFile(shareFile, "rw");
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public void addLocalMessage(M msg) {
    localMessages.add((GraphJobMessage) msg);
  }

  @Override
  public Boolean call() throws Exception {
    try {
      FileChannel fc = raf.getChannel();
      FileLock flock = fc.tryLock();
      if (flock == null) {
        Thread.sleep(10);
        LOG.info("MessageShareHandler sleep 10 ms");
      } else {
        // The required size is unknown before writing, so tentatively map 30 MB.
        mbb = fc.map(MapMode.READ_WRITE, 0, bufferSize);
        mbb.position(4); // reserve one int (4 bytes) for the total length
        int totalLength = 4; // at most 2 GB
        // Simulated send. Ideally, as in v0.6.4, each task would get its own
        // directory, and all shared-memory data sent to that task would be mapped there.
        Iterator<GraphJobMessage> it = localMessages.iterator();
        while (it.hasNext()) {
          GraphJobMessage e = it.next();
          it.remove();
          LOG.info("Msg : " + e.toString() + " is shared!");
          byte[] message = WritableUtils.serialize(e);
          int msgLen = message.length;
          // Is this faster than writing through a stream, e.g. a DataOutput plus toByteArray()?
          mbb.putInt(msgLen);
          mbb.put(message);
          // System.out.println("Position : " + mbb.position());
          totalLength = totalLength + msgLen + 4;
        }
        mbb.putInt(0, mbb.position()); // back-fill the total length
        flock.release();
        LOG.info("IsLoaded: " + mbb.isLoaded() + " Length: " + totalLength
            + " Position: " + mbb.position());
      }
      close();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return true;
  }

  /**
   * Must be cleared at the end of every superstep's sync.
   */
  public void close() {
    if (mbb != null) { // mbb is null when the lock was never acquired
      mbb.clear();
    }
    // raf.close();
    localMessages.clear();
  }
}
package org.apache.hama.bsp.message;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.channels.FileLock;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.ReflectionUtils;
import org.apache.hama.util.WritableUtils;

public class MessageShareReader<M extends Writable> implements Callable<Boolean> {

  // was LogFactory.getLog(MessageShareHandler.class) -- copy-paste bug
  protected static final Log LOG = LogFactory.getLog(MessageShareReader.class);

  public MappedByteBuffer mbb;
  RandomAccessFile raf; // file backing the shared memory
  // ConcurrentSet<GraphJobMessage> localMessages = new ConcurrentSet<GraphJobMessage>(); // per-task messages under the shared directory
  private Combiner<Writable> combiner;
  MessageManager mManager; // receives the messages

  public MessageShareReader(String shareFile, MessageManager messageManager, Configuration conf) {
    try {
      // final String combinerName = "org.apache.hama.examples.PageRank$PagerankCombiner";
      final String combinerName = conf.get(Constants.COMBINER_CLASS);
      if (combinerName != null) {
        combiner = (Combiner<Writable>) ReflectionUtils.newInstance(combinerName);
      }
      mManager = messageManager;
      raf = new RandomAccessFile(shareFile, "rw");
      FileChannel fc = raf.getChannel();
      // Map READ_WRITE: the length field at offset 0 is reset below.
      // (A READ_ONLY mapping would make putInt() throw a ReadOnlyBufferException.)
      mbb = fc.map(MapMode.READ_WRITE, 0, fc.size());
      mbb.load(); // pre-load the mapped shared memory
    } catch (IOException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public void close() {
    try {
      mbb.clear(); // reset the buffer
      raf.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public Boolean call() throws Exception {
    try {
      FileChannel fc = raf.getChannel();
      FileLock flock = fc.tryLock();
      while (flock == null) { // poll until the writer releases the lock
        Thread.sleep(10);
        flock = fc.tryLock(); // retry; the original loop never re-acquired
      }
      LOG.info("IsLoaded: " + mbb.isLoaded() + " position:" + mbb.position());
      int fileLength = mbb.getInt();
      // buffer = new byte[fileLength];
      // mbb.get(buffer); // local message cache; read everything at once?
      BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>();
      while (mbb.position() < fileLength) {
        int msgLength = mbb.getInt();
        if (msgLength > 0) {
          byte[] message = new byte[msgLength];
          mbb.get(message);
          GraphJobMessage gjm = new GraphJobMessage();
          WritableUtils.deserialize(message, gjm);
          bundle.addMessage(gjm); // ignore merging messages from the same directory for now
        }
      }
      fileLength = 0;
      mbb.putInt(0, fileLength); // reset the length flag
      flock.release();
      mManager.loopBackBundle(bundle);
      close();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return true;
  }
}
package org.apache.hama.bsp.message;

public class MesssagShareManager implements Runnable {

  AbstractMessageManager messageManager;
  RpcSyncClient rpcc;
  public static boolean ifShare = false;

  public MesssagShareManager(AbstractMessageManager aMessageManager, RpcSyncClient rpcc) {
    messageManager = aMessageManager;
    this.rpcc = rpcc;
  }

  @Override
  public void run() {
    messageManager.transferLocalMessages();
    if (ifShare == false) {
      try {
        // The first superstep needs one barrier sync to avoid losing data.
        rpcc.enterBarrier();
        messageManager.startMemoryShare();
        rpcc.leaveBarrier();
        messageManager.startMemoryRead();
        ifShare = true;
      } catch (Exception e) {
        e.printStackTrace();
      }
    } else {
      messageManager.startMemoryShare();
      messageManager.startMemoryRead();
    }
  }
}
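The framing protocol used by the classes above (first int = total length, then each message as [int length][payload bytes]) can be exercised in isolation. The following is a minimal, self-contained sketch against a temp file; the class name `FramingDemo` and the `roundTrip` helper are hypothetical and not part of Hama.

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class FramingDemo {

  // Write messages with length-prefixed framing, then read them back.
  static List<String> roundTrip(String... messages) throws Exception {
    Path file = Files.createTempFile("share", ".bin");

    // Writer: reserve 4 bytes, append frames, then back-fill the total length.
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
         FileChannel fc = raf.getChannel()) {
      MappedByteBuffer mbb = fc.map(MapMode.READ_WRITE, 0, 1024);
      mbb.position(4); // reserve one int for the total length
      for (String m : messages) {
        byte[] bytes = m.getBytes("UTF-8");
        mbb.putInt(bytes.length);
        mbb.put(bytes);
      }
      mbb.putInt(0, mbb.position()); // back-fill total length at offset 0
    }

    // Reader: read the total length, then walk the frames.
    List<String> out = new ArrayList<>();
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r");
         FileChannel fc = raf.getChannel()) {
      MappedByteBuffer mbb = fc.map(MapMode.READ_ONLY, 0, fc.size());
      int total = mbb.getInt();
      while (mbb.position() < total) {
        byte[] bytes = new byte[mbb.getInt()];
        mbb.get(bytes);
        out.add(new String(bytes, "UTF-8"));
      }
    }
    Files.deleteIfExists(file);
    return out;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(roundTrip("msg-a", "msg-b")); // [msg-a, msg-b]
  }
}
```

Unlike the real handler, this sketch skips the FileLock coordination so the protocol itself can be tested in a single thread.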
2. Fixing a java.lang.NullPointerException in Hama
at org.apache.hama.util.UnsafeByteArrayInputStream.<init>(UnsafeByteArrayInputStream.java:63)
at org.apache.hama.util.WritableUtils.unsafeDeserialize(WritableUtils.java:63)
at org.apache.hama.graph.MapVerticesInfo.get(MapVerticesInfo.java:101)
at org.apache.hama.graph.GraphJobRunner$ComputeRunnable.<init>(GraphJobRunner.java:376)
After adding vertex or message print statements, the problem sometimes disappeared altogether, which suggests a concurrent-read issue in the underlying ConcurrentHashMap container.
So the approach was changed: catch the exception, and print the ID of the offending vertex or message only when it occurs.
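The catch-and-report approach can be sketched as follows. This is not Hama's actual API: the map, the `deserialize` stand-in, and the class name `CatchAndReport` are all illustrative placeholders for the real vertex store and WritableUtils call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CatchAndReport {

  // Stand-in for the real deserialization, which throws NPE on missing bytes.
  static String deserialize(byte[] raw) {
    if (raw == null) {
      throw new NullPointerException("raw bytes are null");
    }
    return new String(raw);
  }

  // Catch the failure and report which ID triggered it, instead of
  // sprinkling unconditional print statements through the hot path.
  static String readVertex(Map<String, byte[]> store, String vertexId) {
    try {
      return deserialize(store.get(vertexId));
    } catch (NullPointerException e) {
      System.err.println("deserialize failed for vertex: " + vertexId);
      return null;
    }
  }

  public static void main(String[] args) {
    Map<String, byte[]> store = new ConcurrentHashMap<String, byte[]>();
    store.put("v1", "value-1".getBytes());
    System.out.println(readVertex(store, "v1")); // value-1
    System.out.println(readVertex(store, "v2")); // null, and logs the ID
  }
}
```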
3. When the ERROR threshold is set extremely small (e.g. 1e-6), the computed values vary slightly from run to run (beyond the tenth decimal place); this does not affect the correctness of the result.
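This run-to-run wobble is expected: floating-point addition is not associative, so summing the same contributions in a different order shifts the low decimal places. A minimal sketch (the class and method names are hypothetical) showing why convergence should be checked against the tolerance rather than for exact equality:

```java
class ToleranceDemo {

  static final double ERROR = 1e-6;

  // Sum the harmonic series 1/1 + 1/2 + ... + 1/n in two orders and
  // return the absolute difference between the two results.
  static double orderDifference(int n) {
    double[] xs = new double[n];
    for (int i = 0; i < n; i++) {
      xs[i] = 1.0 / (i + 1);
    }
    double forward = 0;
    for (int i = 0; i < n; i++) forward += xs[i];
    double backward = 0;
    for (int i = n - 1; i >= 0; i--) backward += xs[i];
    return Math.abs(forward - backward);
  }

  public static void main(String[] args) {
    double diff = orderDifference(1000);
    // Typically a tiny nonzero value (around 1e-14): rounding depends on order.
    System.out.println("difference: " + diff);
    // Correct comparison: within the tolerance, not bitwise equal.
    System.out.println("within ERROR: " + (diff < ERROR));
  }
}
```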
4. The cost of starting a new thread is usually tiny, e.g.:
new Thread(new LocalMessageTransfer((AbstractMessageManager) messenger, getSuperstepCount())).start();
LOG.info("Start LocalMessageTransfer last: " + (System.currentTimeMillis() - start) + " ms");
16/09/12 22:05:27 INFO bsp.BSPPeerImpl: Start LocalMessageTransfer last: 0 ms
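The "0 ms" reading is easy to reproduce outside Hama. Note that what is measured is only how long Thread.start() takes to return, not how long the transfer itself runs; the work proceeds asynchronously. A standalone sketch (class and method names are made up):

```java
class ThreadStartCost {

  // Measure only the time for start() to return; the runnable's work
  // happens asynchronously on the new thread.
  static long startMillis() throws InterruptedException {
    Thread t = new Thread(new Runnable() {
      public void run() {
        // stand-in for LocalMessageTransfer's work
      }
    });
    long start = System.currentTimeMillis();
    t.start(); // returns almost immediately
    long elapsed = System.currentTimeMillis() - start;
    t.join(); // wait for the work, outside the measured window
    return elapsed;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Start LocalMessageTransfer last: " + startMillis() + " ms");
  }
}
```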
5. On the message container in IncomingVertexMessageManager
A ConcurrentHashMap is used as the message container without any extra locking. By design, ConcurrentHashMap only serializes concurrent operations on the same key (ID), and that is indeed enough to prevent conflicts here. Even if some external code performs unlocked, out-of-order operations, no harm is done: the combine operation is associative, so it makes no difference which add happens first, and reordering cannot produce a wrong result, unlike bank-account operations.
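The order-insensitivity argument can be demonstrated with ConcurrentHashMap.merge, which atomically combines a new value into the entry for a key. The key and values below are made up for illustration; the contributions are exact binary fractions so the sums are bit-identical regardless of order (with general doubles, the results would differ only in the low decimal places, per note 3).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OrderFreeCombine {

  // merge() serializes only updates to the same key, and summation does
  // not care about order, so unordered concurrent adds yield the same total.
  static double combine(double[] contributions) {
    ConcurrentMap<String, Double> inbox = new ConcurrentHashMap<>();
    for (double c : contributions) {
      inbox.merge("vertex-1", c, Double::sum);
    }
    return inbox.get("vertex-1");
  }

  public static void main(String[] args) {
    double[] a = { 0.5, 0.25, 0.125 };
    double[] b = { 0.125, 0.5, 0.25 }; // same values, different order
    System.out.println(combine(a) == combine(b)); // true
  }
}
```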