您的位置:首页 > 其它

使用Zookeeper实现Leader(Master)选举

2017-11-13 15:39 447 查看
使用Zookeeper实现Leader(Master)选举

http://blog.csdn.net/MassiveStars/article/details/53894551

应用场景

分布式系统最典型的架构就是一主多从。在很多时候,虽然处理大规模的数据、图像和文件等,这种工作极其耗资源而且数据、文件等都是共享的,若全部机器都计算处理一次会浪费保贵的计算资源;我们可以把这些工作交给一台机器处理,其它机器则通过数据库、分布式文件系统等方式共享计算成果Leader(Master)。另外,对于数据库、缓存等组件读写分离是惯用的提高性能的方式;读写分离是把写全部给leader(master),查询则使用follower的机器。使用Zookeeper提供的API可轻松实现leader选举。

具体步骤

1、客户端连接时,在指定的目录(这里假定为"/leader")创建一个EPHEMERAL_SEQUENTIAL的节点,把内网的IP数据存入创建节点。

2、获取目录的子点节,并取得序列号最小的节点,我们把这个节点设置为leader。当此节点被删除时,证明leader断线。

3、其它机器监听leader节点,当leader节点的删除时,再取目录的最小子点节作为leader。

详细代码

zk客户端工具类

[java] view plain copy print?

package org.massive.common;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

/**

* Created by Massive on 2016/12/18.

*/

public class ZooKeeperClient {

private static String connectionString = "localhost:2181";

private static int sessionTimeout = 10000;

public static ZooKeeper getInstance() throws IOException, InterruptedException {

//--------------------------------------------------------------

// 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)

// 这里等Zookeeper的连接完成才返回实例

//--------------------------------------------------------------

final CountDownLatch connectedSignal = new CountDownLatch(1);

ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

if (event.getState() == Event.KeeperState.SyncConnected) {

connectedSignal.countDown();

}

}

});

connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);

return zk;

}

public static int getSessionTimeout() {

return sessionTimeout;

}

public static void setSessionTimeout(int sessionTimeout) {

ZooKeeperClient.sessionTimeout = sessionTimeout;

}

}

LeaderElection

[java] view plain copy print?

package org.massive.group;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import org.massive.common.ZooKeeperClient;

import java.io.IOException;

import java.io.UnsupportedEncodingException;

import java.net.*;

import java.util.*;

import java.util.concurrent.CountDownLatch;

/**

* Created by Massive on 2016/12/25.

*/

public class LeaderElection {

private ZooKeeper zk;

private int sessionTimeout;

private static byte[] DEFAULT_DATA = {0x12,0x34};

private static Object mutex = new Object();

private static String ROOT = "/leader";

private byte[] localhost = getLocalIpAdressBytes();

private String znode;

private static CountDownLatch firstElectionSignal = new CountDownLatch(1);

//----------------------------------------------------

// leader的IP地址

//----------------------------------------------------

private static String leader;

public LeaderElection() throws IOException, InterruptedException, KeeperException {

this.zk = ZooKeeperClient.getInstance();

this.sessionTimeout = zk.getSessionTimeout();

ensureExists(ROOT);

ensureLocalNodeExists();

System.out.println("-------------------------------------");

System.out.println("local IP: " + getLocalIpAddress());

System.out.println("local created node: " + znode);

System.out.println("-------------------------------------");

}

/**

* 检查本机是否已创建节点,不存在则创建

* @throws KeeperException

* @throws InterruptedException

*/

public void ensureLocalNodeExists() throws KeeperException, InterruptedException {

List<String> list = zk.getChildren(ROOT,new NodeDeleteWatcher());

for (String node : list) {

Stat stat = new Stat();

String path = ROOT + "/" + node;

byte[] data = zk.getData(path,false,stat);

if (Arrays.equals(data,localhost)) {

znode = path;

return;

}

}

znode = zk.create(ROOT + "/", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

}

public void ensureExists(String path) {

try {

Stat stat = zk.exists(path, false);

if (stat == null) {

zk.create(path, DEFAULT_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

public void start() throws KeeperException, InterruptedException, UnsupportedEncodingException {

do{

synchronized (mutex) {

System.out.println("begin leader election...");

List<String> nodes = zk.getChildren(ROOT, null);

SortedSet<String> sortedNode = new TreeSet<String>();

for (String node : nodes) {

sortedNode.add(ROOT + "/" + node);

}

//----------------------------------------------------

// 取出序列号最小的消息

//----------------------------------------------------

String first = sortedNode.first();

leader = first;

//----------------------------------------------------

// 监控序列最小节点(非本机创建节点)

//----------------------------------------------------

NodeDeleteWatcher watcher = znode.equals(first) ? null : new NodeDeleteWatcher();

byte[] data = zk.getData(first, watcher, null);

leader = new String(data, "UTF-8");

System.out.println("leader election end, the leader is : " + leader);

if (firstElectionSignal.getCount() != 0) {

firstElectionSignal.countDown();

}

if (znode.equals(first)) {

return;

}

mutex.wait();

}

} while (true);

}

class NodeDeleteWatcher implements Watcher {

@Override

public void process(WatchedEvent event) {

if (event.getType() == Event.EventType.NodeDeleted) {

synchronized (mutex) {

mutex.notify();

}

}

}

}

/**

* 获取本地内网IP地址

* @return

* @throws SocketException

*/

public static String getLocalIpAddress() throws SocketException {

// 获得本机的所有网络接口

Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();

while (nifs.hasMoreElements()) {

NetworkInterface nif = nifs.nextElement();

// 获得与该网络接口绑定的 IP 地址,一般只有一个

Enumeration<InetAddress> addresses = nif.getInetAddresses();

while (addresses.hasMoreElements()) {

InetAddress addr = addresses.nextElement();

String ip = addr.getHostAddress();

// 只关心 IPv4 地址

if (addr instanceof Inet4Address && !"127.0.0.1".equals(ip)) {

return ip;

}

}

}

return null;

}

public static byte[] getLocalIpAdressBytes() throws SocketException {

String ip = getLocalIpAddress();

return ip == null ? null : ip.getBytes();

}

public static String getLeader() throws InterruptedException {

//----------------------------------------------------

// 第一次leader选举完成后才释放阀门

//----------------------------------------------------

firstElectionSignal.await();

return leader;

}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

//-------------------------------------------------------

// 启动一条线程用处理leader选举

//-------------------------------------------------------

new Thread(new Runnable() {

@Override

public void run() {

try {

LeaderElection leaderElection = new LeaderElection();

leaderElection.start();

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

}).start();

String leader = LeaderElection.getLeader();

}

}

测试

运行main方法,本机器的某次输出结果

[plain] view plain copy print?

-------------------------------------

local IP: 192.168.1.103

local created node: /leader/0000000017

-------------------------------------

begin leader election...

leader election end, the leader is : 192.168.1.103

注:本文只进行了简单的测试,要在生产环境中使用请先同时多在个JVM上进行测试。

参考文章:

http://zookeeper.apache.org/doc/r3.4.9/recipes.html

本文为原创,转载请注明出处http://blog.csdn.net/massivestars/article/details/53894551

ZooKeeper实现分布式锁和队列可参考:

使用Zookeeper实现分布式锁

使用ZooKeeper实现队列
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: