您的位置:首页 > 其它

Jgroups

2016-05-11 00:00 351 查看
摘要: 分布式系统要求服务器间数据同步,因此采用了jgroups进行通信。

这是之前在网上看到的文章,跟着学习了一把。结合上一篇文章,再定义一个实体,cacheType,cacheKey,cacheValue,cacheSeconds就可以用jgroups+Guava cache实现分布式localCache的数据同步。

[code=plain]package jgroups;

import org.jgroups.*;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by ZhaoYun on 2016/5/8.
* 节点
*/
public class Node extends ReceiverAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);

/**
* 配置文件.
*/
private static final String CONFIG_XML = "network-tcp.xml";

/**
* 集群名称.
*/
private static final String CLUSTER_NAME = "ZY";

/**
* 节点通道.
*/
private JChannel channel = null;

/**
* 以此作为节点间初始化的同步数据.
*/
private Map<String, String> cacheData = new HashMap<String, String>();

private ReentrantLock lock = new ReentrantLock();

public Node(){
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
try {
channel = new JChannel(is);
channel.setReceiver(this);
channel.connect(CLUSTER_NAME);
channel.getState(null,50000);
} catch (Exception e) {
System.out.println("启动节点异常!" + e.getMessage());
// 最好是自定义RuntimeException!
throw new RuntimeException("启动节点异常!", e);
}
}

/**
*  发送消息给目标地址.
* @param address 为空表示发给所有节点.
* @param object 消息
*/
public void sendMsg(Address address,Object object){
Message message = new Message(address,null,object);
try{
channel.send(message);
}catch (Exception e){
System.out.println("send message error:"+e.getMessage());
throw new RuntimeException("send message error!", e);
}
}

/**
*
* @param output
* @throws Exception
*/
@Override
public void getState(OutputStream output)throws Exception{
lock.lock();
try{
Util.objectToStream(cacheData,new DataOutputStream(output));
}catch (Exception e){
System.out.println("get state error:"+ e.getMessage());
throw new RuntimeException();
}finally {
lock.unlock();
}
}

/**
*
* @param message
*/
@Override
public void receive(Message message){
//当前节点不接收自己发送到通道当中的消息.
if(message.getSrc().equals(channel.getAddress())){
System.out.println(" self ");
return;
}
System.out.println(message.getObject()+ "ZY"+message.getDest());
}

/**
*
* @param inputStream
* @throws Exception
*/
@Override
public void setState(InputStream inputStream) throws Exception{
lock.lock();
try{
Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(inputStream));
this.cacheData.putAll(cacheData);
}catch (Exception e){
System.out.println("从主节点同步状态到当前节点发生异常!" + e.getMessage());
}finally {
lock.unlock();
}
}

@Override
public void viewAccepted(View view) {
System.out.println("当前成员[" + this.channel.getAddressAsString() + "]");
System.out.println(view.getCreator());
System.out.println(view.getMembers());
System.out.println("当前节点数据:" + cacheData);
}

/**
* 提供一个简单的初始化数据的方法.
* @param key
* @param val
*/
public void addData(String key,String val){
if(key!=null&&!key.isEmpty()){
cacheData.put(key, val);
}
}

Jgroups maven依赖:

[code=plain]<!-- jgroups start  用于多实例之间信息同步-->
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>3.5.0.Final</version>
</dependency>
<!-- jgroups end -->
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: