Jgroups
2016-05-11 00:00
351 查看
摘要: 分布式系统要求服务器间数据同步,因此采用了jgroups进行通信。
这是之前在网上看到的文章,跟着学习了一把。结合上一篇文章,再定义一个实体,cacheType,cacheKey,cacheValue,cacheSeconds就可以用jgroups+Guava cache实现分布式localCache的数据同步。
Jgroups maven依赖:
这是之前在网上看到的文章,跟着学习了一把。结合上一篇文章,再定义一个实体,cacheType,cacheKey,cacheValue,cacheSeconds就可以用jgroups+Guava cache实现分布式localCache的数据同步。
[code=plain]package jgroups; import org.jgroups.*; import org.jgroups.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.InputStream; import java.io.OutputStream; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; /** * Created by ZhaoYun on 2016/5/8. * 节点 */ public class Node extends ReceiverAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(Node.class); /** * 配置文件. */ private static final String CONFIG_XML = "network-tcp.xml"; /** * 集群名称. */ private static final String CLUSTER_NAME = "ZY"; /** * 节点通道. */ private JChannel channel = null; /** * 以此作为节点间初始化的同步数据. */ private Map<String, String> cacheData = new HashMap<String, String>(); private ReentrantLock lock = new ReentrantLock(); public Node(){ InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML); try { channel = new JChannel(is); channel.setReceiver(this); channel.connect(CLUSTER_NAME); channel.getState(null,50000); } catch (Exception e) { System.out.println("启动节点异常!" + e.getMessage()); // 最好是自定义RuntimeException! throw new RuntimeException("启动节点异常!", e); } } /** * 发送消息给目标地址. * @param address 为空表示发给所有节点. * @param object 消息 */ public void sendMsg(Address address,Object object){ Message message = new Message(address,null,object); try{ channel.send(message); }catch (Exception e){ System.out.println("send message error:"+e.getMessage()); throw new RuntimeException("send message error!", e); } } /** * * @param output * @throws Exception */ @Override public void getState(OutputStream output)throws Exception{ lock.lock(); try{ Util.objectToStream(cacheData,new DataOutputStream(output)); }catch (Exception e){ System.out.println("get state error:"+ e.getMessage()); throw new RuntimeException(); }finally { lock.unlock(); } } /** * * @param message */ @Override public void receive(Message message){ //当前节点不接收自己发送到通道当中的消息. if(message.getSrc().equals(channel.getAddress())){ System.out.println(" self "); return; } System.out.println(message.getObject()+ "ZY"+message.getDest()); } /** * * @param inputStream * @throws Exception */ @Override public void setState(InputStream inputStream) throws Exception{ lock.lock(); try{ Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(inputStream)); this.cacheData.putAll(cacheData); }catch (Exception e){ System.out.println("从主节点同步状态到当前节点发生异常!" + e.getMessage()); }finally { lock.unlock(); } } @Override public void viewAccepted(View view) { System.out.println("当前成员[" + this.channel.getAddressAsString() + "]"); System.out.println(view.getCreator()); System.out.println(view.getMembers()); System.out.println("当前节点数据:" + cacheData); } /** * 提供一个简单的初始化数据的方法. * @param key * @param val */ public void addData(String key,String val){ if(key!=null&&!key.isEmpty()){ cacheData.put(key, val); } }
Jgroups maven依赖:
[code=plain]<!-- jgroups start 用于多实例之间信息同步--> <dependency> <groupId>org.jgroups</groupId> <artifactId>jgroups</artifactId> <version>3.5.0.Final</version> </dependency> <!-- jgroups end -->
相关文章推荐
- Android 之MVC模式
- 观察者模式Observable与Observer的运用(单指拖放)
- android ble蓝牙开发略解
- android 蓝牙各种UUID
- 排序算法(三)快速排序
- java HashMap与Hashtable区别
- pycharm配置vagrant环境下调试开发
- 项目常用工具类整理(一)--时间工具类DateUtil.java
- 弹出窗口的两种实现方式 PopupWindow 和 Activity
- 设计模式(一) 单例模式
- 设计模式(二) 简单工厂模式
- 并发编程实战 1.6. 守护线程 - Daemon线程
- 并发编程实战 1.7. 处理运行时异常 - setUncaughtExceptionHand()
- Java方法中的参数是值传递
- 并发编程实战 1.8. 线程中变量的使用 - ThreadLocal
- 并发编程实战 1.9. 线程组 - ThreadGroup
- 并发编程实战 1.10. 线程组处理异常 - 重写uncaughtException()
- [一句秒懂]基本控件圆角问题
- [10秒学会] - iOS(OC) 函数式编程思想
- [10秒学会] - iOS9新特性之常见关键字