您的位置:首页 > 其它

Hama共享内存通信问题

2016-09-12 23:13 218 查看
1. Java的共享内存通信

总体性能未见提高,且由于避免了sync()中的通信,使得数据传输时间较短,导致如下问题:

16/09/12 18:16:27 INFO graph.GraphJobRunner: Start process msg:                       1473675387360

16/09/12 18:16:27 INFO message.MesssagShareManager: messageManager memoryRead finish: 1473675387864

可见当读完数据时,循环已经开始读取并计算。因此反而会影响真正的计算过程,虽然一轮超步中的sync时间缩短了,但是looping计算的时间却增加了,所以总体性能未见提高。杯具!

 

 附上解决代码:

package org.apache.hama.bsp.message;

import io.netty.util.internal.ConcurrentSet;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.FileChannel.MapMode;
import java.util.Iterator;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.WritableUtils;

/**
 * Writes locally buffered {@link GraphJobMessage}s into a memory-mapped file
 * ("shared memory") so that co-located tasks can read them without going
 * through the regular network/sync path. One writer at a time is enforced
 * with an exclusive {@link FileLock} on the backing file.
 *
 * <p>File layout: a leading int holding the total written length (including
 * the 4-byte header itself), followed by repeated [int msgLen][msgLen bytes]
 * records, each record being a serialized {@link GraphJobMessage}.
 */
public class MessageShareHandler<M extends Writable> implements Callable<Boolean>{
public MappedByteBuffer mbb;
public static String  MEMORY_SHARE_FILE = "/opt/hama-0.7.1/share/" ;
RandomAccessFile raf; // file backing the shared-memory mapping
// Mapping size is fixed because the total payload is unknown before writing.
public static int bufferSize = 30 * 1024 * 1024;  // 30 MB
// Messages to share. They were combined upstream, so ids do not repeat.
ConcurrentSet<GraphJobMessage> localMessages = new ConcurrentSet<GraphJobMessage>() ;

protected static final Log LOG = LogFactory
.getLog(MessageShareHandler.class);

/**
 * @param shareFile path of the file to map as shared memory
 * @param conf      job configuration (currently unused, kept for symmetry
 *                  with {@code MessageShareReader})
 */
public MessageShareHandler(String shareFile, Configuration conf) {
try {
raf = new RandomAccessFile(shareFile , "rw");
} catch (IOException e) {
// Subsequent call() invocations will fail; surface the cause in the log.
LOG.error("Failed to open share file: " + shareFile, e);
}
}

/** Buffers one message for the next sharing round. */
public void addLocalMessage(M msg) {
localMessages.add((GraphJobMessage)msg) ;
}

/**
 * Tries once to acquire the file lock and, if successful, flushes all
 * buffered messages into the mapping.
 *
 * @return always {@code true} (callers treat the result as "attempt done")
 */
@Override
public Boolean call() throws Exception {
try {
FileChannel fc = raf.getChannel();
FileLock flock = fc.tryLock();

if (flock == null) {
// Another task holds the lock; back off without touching the buffer.
// NOTE: do NOT call close() here -- mbb may still be null.
Thread.sleep(10) ;
LOG.info("MessageShareHandler sleep 10 ms") ;
} else {
try {
mbb = fc.map(MapMode.READ_WRITE, 0, bufferSize);
mbb.position(4) ; // reserve one int (4 bytes) for the total length
int totalLength = 4 ; // running total, max 2 GB

Iterator<GraphJobMessage> it = localMessages.iterator();
while (it.hasNext()) {
GraphJobMessage e = it.next();
it.remove();
if (LOG.isDebugEnabled()) {
LOG.debug("Msg : " + e.toString() + " is shared! ");
}
byte[] message = WritableUtils.serialize(e) ;
int msgLen = message.length ;
mbb.putInt(msgLen) ;
mbb.put(message);
totalLength = totalLength + msgLen + 4 ;
}
mbb.putInt(0, mbb.position()) ; // back-fill the total length header
LOG.info("IsLoaded: " + mbb.isLoaded() + " Length: "+ totalLength
+ " Position: " + mbb.position());
} finally {
// Always release, even if serialization fails, so readers can proceed.
flock.release() ;
}
close() ;
}
} catch (Exception e) {
LOG.error("Sharing messages via mapped file failed", e);
}
return true;
}

/**
 * Resets state at the end of each superstep. Null-safe: call() may have
 * exited before mapping the buffer.
 * Note: {@code MappedByteBuffer.clear()} only resets position/limit; it
 * does not zero the mapped bytes -- readers rely on the length header.
 */
public void close() {
if (mbb != null) {
mbb.clear() ;
}
localMessages.clear() ;
}
}


 
package org.apache.hama.bsp.message;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.ReflectionUtils;
import org.apache.hama.util.WritableUtils;

/**
 * Reads {@link GraphJobMessage}s written by {@code MessageShareHandler} out
 * of a memory-mapped file and hands them to the local
 * {@link MessageManager} as a loop-back bundle.
 *
 * <p>Expected file layout: leading int = total length (header included),
 * then repeated [int msgLen][msgLen bytes] serialized messages.
 */
public class MessageShareReader<M extends Writable> implements Callable<Boolean> {
public MappedByteBuffer mbb;
RandomAccessFile raf; // file backing the shared-memory mapping
private Combiner<Writable> combiner ; // optional, from job configuration
MessageManager mManager ; // destination for the received messages

protected static final Log LOG = LogFactory
.getLog(MessageShareReader.class); // was MessageShareHandler.class (copy-paste)

/**
 * @param shareFile      path of the file written by the handler
 * @param messageManager manager receiving the decoded message bundle
 * @param conf           job configuration, consulted for the combiner class
 */
public MessageShareReader(String shareFile,  MessageManager messageManager ,Configuration conf) {
try {
final String combinerName = conf.get(Constants.COMBINER_CLASS);
if (combinerName != null) {
combiner = (Combiner<Writable>) ReflectionUtils
.newInstance(combinerName);
}

mManager = messageManager ;
raf = new RandomAccessFile(shareFile, "rw");
FileChannel fc = raf.getChannel();
// Must be READ_WRITE: call() clears the length header after reading.
// The previous READ_ONLY mapping made putInt(0, ...) throw
// ReadOnlyBufferException.
mbb = fc.map(MapMode.READ_WRITE, 0, fc.size());
mbb.load() ; // fault the pages in up front
} catch (IOException e) {
LOG.error("Failed to map share file: " + shareFile, e);
} catch (ClassNotFoundException e) {
LOG.error("Combiner class not found", e);
} catch (Exception e) {
LOG.error("Failed to initialize MessageShareReader", e);
}
}

/** Releases the mapping's view state and the underlying file handle. */
public void close() {
try {
mbb.clear() ; // resets position/limit only; does not zero the data
raf.close() ;
} catch (IOException e) {
LOG.error("Failed to close share file", e);
}
}

/**
 * Polls for the file lock, then decodes every message in the mapping,
 * clears the length header, and loops the bundle back to the manager.
 *
 * @return always {@code true}
 */
@Override
public Boolean call() throws Exception {
try {
FileChannel fc = raf.getChannel();
// Re-attempt the lock each iteration; the old code slept forever on
// the first null result because tryLock() was never called again.
FileLock flock;
while ((flock = fc.tryLock()) == null) {
Thread.sleep(10) ;
}
try {
LOG.info("IsLoaded: " + mbb.isLoaded() + " position:" + mbb.position());
int fileLength = mbb.getInt() ; // total bytes written, header included

BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>() ;
while (mbb.position() < fileLength) {
int msgLength = mbb.getInt() ;
if (msgLength > 0) {
byte[] message = new byte[msgLength] ;
mbb.get(message) ;
GraphJobMessage gjm = new GraphJobMessage() ;
WritableUtils.deserialize(message, gjm) ;
bundle.addMessage(gjm); // per-directory combining not applied yet
}
}
mbb.putInt(0, 0) ; // mark the file as consumed
mManager.loopBackBundle(bundle) ;
} finally {
flock.release() ; // never hold the lock past this round
}
close() ;
} catch (Exception e) {
LOG.error("Reading shared messages failed", e);
}
return true;
}
}

package org.apache.hama.bsp.message;

/**
 * Drives one round of shared-memory message exchange: transfers local
 * messages, then triggers the share/read phases on the message manager.
 *
 * <p>On the very first superstep an extra barrier (enter/leave) is taken
 * around the share phase so no task starts reading before every task has
 * written -- otherwise data could be lost. The static {@code ifShare} flag
 * records that this one-time synchronization has already happened.
 */
public class MesssagShareManager implements Runnable {
AbstractMessageManager messageManager ;
RpcSyncClient rpcc ;
public static boolean ifShare = false; // true once the first-round barrier is done

public MesssagShareManager(AbstractMessageManager aMessageManager, RpcSyncClient rpcc) {
this.messageManager = aMessageManager;
this.rpcc = rpcc;
}

@Override
public void run() {
messageManager.transferLocalMessages();

if (ifShare) {
// Steady state: no barrier needed, share then read directly.
messageManager.startMemoryShare();
messageManager.startMemoryRead();
return;
}

// First superstep: synchronize once around the share so readers cannot
// race ahead of writers and drop data.
try {
rpcc.enterBarrier();
messageManager.startMemoryShare();
rpcc.leaveBarrier();
messageManager.startMemoryRead();
ifShare = true;
} catch (Exception e) {
e.printStackTrace();
}
}
}


2. 解决Hama中出现的java.lang.NullPointerException
at org.apache.hama.util.UnsafeByteArrayInputStream.<init>(UnsafeByteArrayInputStream.java:63)
at org.apache.hama.util.WritableUtils.unsafeDeserialize(WritableUtils.java:63)
at org.apache.hama.graph.MapVerticesInfo.get(MapVerticesInfo.java:101)
at org.apache.hama.graph.GraphJobRunner$ComputeRunnable.<init>(GraphJobRunner.java:376)

在增加顶点打印或者消息打印后,有时候这个问题反而没有了,猜测可能是由于其容器ConcurrentHashMap的并发读取问题,

所以换成另外一种捕获异常的方法,在捕获异常时打印对应的顶点ID或者消息ID.

3.当ERROR精度设定极小时(如10^-6),值精度每次计算略有不同(小数点10位以后),不影响结果的正确性。

4. 启动新线程的代价往往是极小的,如:

new Thread(new LocalMessageTransfer(((AbstractMessageManager)messenger),getSuperstepCount())).start() ;
LOG.info("Start LocalMessageTransfer last: " + (System.currentTimeMillis() - start) + " ms") ;

16/09/12 22:05:27 INFO bsp.BSPPeerImpl: Start LocalMessageTransfer last: 0 ms

5. 关于IncomingVertexMessageManager中的消息容器

这里使用 ConcurrentHashMap 作为消息容器,没有加锁,但是由于 ConcurrentHashMap特点,只在同一个ID互斥操作时加锁,确实不会出现冲突。即使部分外部代码可能导致的没有加锁的乱序操作,也是无碍,是因为该操作满足结合律,谁先加后加,没有区别,不会出现顺序颠倒导致结果不对的情况,和银行账户操作不同。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: