Solr4.7 Source Code Analysis - Startup Series: Solr Cloud (Part 2) - The Solr Election Mechanism
2014-07-31 22:05
Continued from the previous article:
http://blog.csdn.net/wenchanter/article/details/37915127
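When SolrCloud starts up, it elects a leader for the Overseer. Before digging into Solr's election mechanism, let's first go through the hierarchy of clusterstate.json and the code structures used to build this JSON file.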
```json
{
  "autocpltCollection": {                       // DocCollection
    "shards": {
      "shard1": {                               // Slice
        "range": "80000000-ffffffff",
        "state": "active",
        "replicas": {
          "core_node1": {                       // Replica
            "state": "active",
            "base_url": "http://solr1.doc.test.163.com:9091/solr",
            "core": "autocplt",
            "node_name": "solr1.doc.test.163.com:9091_solr",
            "leader": "true"
          },
          "core_node3": {
            "state": "down",
            "base_url": "http://solr3.doc.test.163.com:8983/solr",
            "core": "autocplt",
            "node_name": "solr3.doc.test.163.com:8983_solr"
          }
        }
      },
      "shard2": {
        "range": "0-7fffffff",
        "state": "active",
        "replicas": {
          "core_node2": {
            "state": "active",
            "base_url": "http://solr2.doc.test.163.com:8080/solr",
            "core": "autocplt",
            "node_name": "solr2.doc.test.163.com:8080_solr",
            "leader": "true"
          }
        }
      }
    },
    "maxShardsPerNode": "1",
    "router": {"name": "compositeId"},
    "replicationFactor": "1",
    "autoCreated": "true"
  }
}
```
The top level is a ClusterState instance, which has the following fields:

![](http://img.blog.csdn.net/20140731222631826)
```java
private Integer zkClusterStateVersion;
private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
private Set<String> liveNodes;
private final ZkStateReader stateReader;
```
ClusterState holds collectionStates, a Map whose values are DocCollection objects; a DocCollection corresponds to the autocpltCollection level:

![](http://img.blog.csdn.net/20140731222639938)
```java
private final String name;
private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices;
private final DocRouter router;
protected final Map<String,Object> propMap; // declared in the parent class ZkNodeProps
```
A DocCollection holds slices; a Slice corresponds to the shard1/shard2 level:

![](http://img.blog.csdn.net/20140731222426375)
```java
private final String name;
private final DocRouter.Range range;
private final Integer replicationFactor; // FUTURE: optional per-slice override of the collection replicationFactor
private final Map<String,Replica> replicas;
private final Replica leader;
private final String state;
private final String parent;
private final Map<String, RoutingRule> routingRules;
```
A Slice holds replicas; a Replica corresponds to the core_node1 level:

![](http://img.blog.csdn.net/20140731222433234)
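The four levels above can be mirrored in a few lines of Java to make the nesting concrete. This is a minimal sketch, not Solr's code: ReplicaInfo, SliceInfo, CollectionInfo, ClusterInfo and ClusterStateDemo are hypothetical, simplified stand-ins for Replica, Slice, DocCollection and ClusterState (written as records, so Java 16+), filled with the autocpltCollection data from clusterstate.json above. shardLeaders walks ClusterState → DocCollection → Slice → Replica and picks the replica flagged "leader":"true".

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for Solr's ClusterState hierarchy
// (the real classes carry many more fields and behaviour).
record ReplicaInfo(String name, String state, String nodeName, boolean leader) {}

record SliceInfo(String name, String range, Map<String, ReplicaInfo> replicas) {
    // Returns the replica flagged "leader":"true", or null if none is flagged yet.
    ReplicaInfo leader() {
        for (ReplicaInfo r : replicas.values()) {
            if (r.leader()) {
                return r;
            }
        }
        return null;
    }
}

record CollectionInfo(String name, Map<String, SliceInfo> slices) {}

record ClusterInfo(Map<String, CollectionInfo> collectionStates) {
    // For one collection: shard name -> node_name of that shard's leader (null if none).
    Map<String, String> shardLeaders(String collection) {
        Map<String, String> out = new LinkedHashMap<>();
        for (SliceInfo s : collectionStates.get(collection).slices().values()) {
            ReplicaInfo leader = s.leader();
            out.put(s.name(), leader == null ? null : leader.nodeName());
        }
        return out;
    }
}

class ClusterStateDemo {
    // Rebuilds the autocpltCollection example from clusterstate.json.
    static ClusterInfo sample() {
        Map<String, ReplicaInfo> shard1 = new LinkedHashMap<>();
        shard1.put("core_node1", new ReplicaInfo("core_node1", "active",
                "solr1.doc.test.163.com:9091_solr", true));
        shard1.put("core_node3", new ReplicaInfo("core_node3", "down",
                "solr3.doc.test.163.com:8983_solr", false));
        Map<String, ReplicaInfo> shard2 = new LinkedHashMap<>();
        shard2.put("core_node2", new ReplicaInfo("core_node2", "active",
                "solr2.doc.test.163.com:8080_solr", true));

        Map<String, SliceInfo> slices = new LinkedHashMap<>();
        slices.put("shard1", new SliceInfo("shard1", "80000000-ffffffff", shard1));
        slices.put("shard2", new SliceInfo("shard2", "0-7fffffff", shard2));

        Map<String, CollectionInfo> collections = new LinkedHashMap<>();
        collections.put("autocpltCollection", new CollectionInfo("autocpltCollection", slices));
        return new ClusterInfo(collections);
    }
}
```

For the sample data, shardLeaders("autocpltCollection") maps shard1 to solr1.doc.test.163.com:9091_solr and shard2 to solr2.doc.test.163.com:8080_solr; the down replica core_node3 is skipped because it is not flagged as leader. The real Slice keeps its leader in the `leader` field listed above.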
Next comes Solr's election logic. When ZkController's init method is called, it runs the following leader-election code:
```java
overseerElector = new LeaderElector(zkClient);
this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
overseerElector.setup(context);
overseerElector.joinElection(context, false);
```

It starts by creating a leader elector, LeaderElector.
The class comment of LeaderElector already explains the process quite well:
```java
/**
 * Leader Election process. This class contains the logic by which a
 * leader is chosen. First call {@link #setup(ElectionContext)} to ensure
 * the election process is init'd. Next call
 * {@link #joinElection(ElectionContext, boolean)} to start the leader election.
 *
 * The implementation follows the classic ZooKeeper recipe of creating an
 * ephemeral, sequential node for each candidate and then looking at the set
 * of such nodes - if the created node is the lowest sequential node, the
 * candidate that created the node is the leader. If not, the candidate puts
 * a watch on the next lowest node it finds, and if that node goes down,
 * starts the whole process over by checking if it's the lowest sequential node, etc.
 */
```

Next, let's look at overseerElector.setup(context).
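Before that, the recipe this Javadoc describes can be watched in action with a small in-memory simulation. It is a sketch, not ZooKeeper or Solr code: ElectionSim is hypothetical, a TreeMap plays the children of the election node, a counter plays ZooKeeper's sequence suffix, and a watch is just a map entry that fires once. join corresponds loosely to joinElection, check to checkIfIamLeader, and leave to a candidate's ephemeral node disappearing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory simulation of the "lowest sequential node leads" recipe (hypothetical).
class ElectionSim {
    private int counter = 0;                                        // plays ZK's sequence counter
    private final TreeMap<Integer, String> nodes = new TreeMap<>(); // seq -> candidate
    private final Map<Integer, Integer> watches = new HashMap<>();  // watched seq -> watcher seq
    final List<String> log = new ArrayList<>();

    // Like joinElection: create an "ephemeral sequential" node, then check.
    int join(String candidate) {
        int seq = counter++;
        nodes.put(seq, candidate);
        check(seq);
        return seq;
    }

    // Like checkIfIamLeader: lead if lowest, otherwise watch the node just ahead.
    private void check(int seq) {
        if (!nodes.containsKey(seq)) {
            return; // this candidate already left
        }
        Integer ahead = nodes.lowerKey(seq);
        if (ahead == null) {
            log.add(nodes.get(seq) + " leads");
        } else {
            watches.put(ahead, seq);
            log.add(nodes.get(seq) + " watches " + nodes.get(ahead));
        }
    }

    // Like a node vanishing: only its single watcher is notified.
    void leave(int seq) {
        nodes.remove(seq);
        Integer watcher = watches.remove(seq);
        if (watcher != null) {
            check(watcher);
        }
    }

    String leader() {
        return nodes.isEmpty() ? null : nodes.firstEntry().getValue();
    }
}
```

With candidates A, B and C joining in that order, A leads, B watches A and C watches B. If B leaves, only C is notified and switches to watching A; if A then leaves, only C is notified and becomes leader. Watching just the predecessor, rather than having every candidate watch the leader, keeps a single failure from waking up all candidates at once.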
setup only creates the election path /overseer_elect/election:
```java
/**
 * Set up any ZooKeeper nodes needed for leader election.
 */
public void setup(final ElectionContext context) throws InterruptedException,
    KeeperException {
  this.context = context;
  String electZKPath = context.electionPath + LeaderElector.ELECTION_NODE;

  zkCmdExecutor.ensureExists(electZKPath, zkClient);
}
```

Next comes overseerElector.joinElection(context, false).
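Before moving on to joinElection, a note on the only ZooKeeper call in setup. The sketch below shows what ensureExists is assumed to do here: if the node is missing, create it together with any missing parents, and otherwise do nothing, so every candidate can call setup safely. FakeZkTree is a hypothetical in-memory stand-in that tracks paths only; it is not Solr's ZkCmdExecutor.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for a ZooKeeper tree (paths only, no data).
class FakeZkTree {
    final Set<String> paths = new TreeSet<>();

    FakeZkTree() {
        paths.add("/");
    }

    // Assumed ensureExists semantics: no-op if the node exists,
    // otherwise create it along with any missing parent nodes.
    void ensureExists(String path) {
        if (paths.contains(path)) {
            return;
        }
        StringBuilder current = new StringBuilder();
        for (String part : path.substring(1).split("/")) {
            current.append('/').append(part);
            paths.add(current.toString()); // re-adding an existing parent is harmless
        }
    }
}
```

On an empty tree, ensureExists("/overseer_elect/election") (electionPath plus ELECTION_NODE) creates /overseer_elect and /overseer_elect/election; calling it again changes nothing.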
```java
/**
 * Begin participating in the election process. Gets a new sequential number
 * and begins watching the node with the sequence number before it, unless it
 * is the lowest number, in which case, initiates the leader process. If the
 * node that is watched goes down, check if we are the new lowest node, else
 * watch the next lowest numbered node.
 *
 * @return sequential node number
 */
public int joinElection(ElectionContext context, boolean replacement)
    throws KeeperException, InterruptedException, IOException {
  // Calls OverseerElectionContext.joinedElectionFired, which just closes the
  // Overseer; not sure yet what it is for
  context.joinedElectionFired();

  // overseer_elect/election
  final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;

  long sessionId = zkClient.getSolrZooKeeper().getSessionId();
  String id = sessionId + "-" + context.id;
  String leaderSeqPath = null;
  boolean cont = true;
  int tries = 0;
  while (cont) {
    try {
      leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
          CreateMode.EPHEMERAL_SEQUENTIAL, false);
      context.leaderSeqPath = leaderSeqPath;
      cont = false;
    } catch (ConnectionLossException e) {
      // we don't know if we made our node or not...
      List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true);

      boolean foundId = false;
      for (String entry : entries) {
        String nodeId = getNodeId(entry);
        if (id.equals(nodeId)) {
          // we did create our node...
          foundId = true;
          break;
        }
      }
      if (!foundId) {
        cont = true;
        if (tries++ > 20) {
          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
        }
        try {
          Thread.sleep(50);
        } catch (InterruptedException e2) {
          Thread.currentThread().interrupt();
        }
      }
    } catch (KeeperException.NoNodeException e) {
      // we must have failed in creating the election node - someone else must
      // be working on it, lets try again
      if (tries++ > 20) {
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
      }
      cont = true;
      try {
        Thread.sleep(50);
      } catch (InterruptedException e2) {
        Thread.currentThread().interrupt();
      }
    }
  }
  // take the trailing sequence number
  int seq = getSeq(leaderSeqPath);
  checkIfIamLeader(seq, context, replacement);

  return seq;
}
```

getSeq extracts the number that follows `-n_` and converts it to an int.
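To see concretely what joinElection creates and what getSeq parses, here is a stand-alone sketch. ZooKeeper appends a 10-digit, zero-padded counter to the name of a sequential node, so each child of /overseer_elect/election looks like `<sessionId>-<context.id>-n_0000000003`. SeqNames, its regular expression and its method names are hypothetical (this is not LeaderElector's own LEADER_SEQ pattern) and are meant for child names rather than full paths. sortBySeq shows why sortSeqs compares parsed numbers: sorting the raw names would order candidates by their session-id prefix instead of by when they joined.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SeqNames {
    // Hypothetical pattern: "<nodeId>-n_<digits>", where nodeId = "<sessionId>-<context.id>".
    private static final Pattern NAME = Pattern.compile("^(.*)-n_(\\d+)$");

    // The child name ZooKeeper would produce for a given counter value.
    static String nodeName(long sessionId, String contextId, int counter) {
        return sessionId + "-" + contextId + "-n_" + String.format("%010d", counter);
    }

    static String nodeId(String name) {
        return match(name).group(1);
    }

    static int seq(String name) {
        return Integer.parseInt(match(name).group(2));
    }

    private static Matcher match(String name) {
        Matcher m = NAME.matcher(name);
        if (!m.matches()) {
            throw new IllegalStateException("Could not parse: " + name);
        }
        return m;
    }

    // Same idea as sortSeqs: order children by sequence number, not by raw name.
    static List<String> sortBySeq(List<String> names) {
        List<String> sorted = new ArrayList<>(names);
        sorted.sort(Comparator.comparingInt(SeqNames::seq));
        return sorted;
    }
}
```

For example, nodeName(92012345L, "solr1.doc.test.163.com:9091_solr", 3) gives 92012345-solr1.doc.test.163.com:9091_solr-n_0000000003. A plain string sort of 9-b-n_0000000001 and 1-a-n_0000000002 puts the later candidate first, while sortBySeq does not.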
```java
/**
 * Returns int given String of form n_0000000001 or n_0000000003, etc.
 *
 * @return sequence number
 */
public static int getSeq(String nStringSequence) {
  int seq = 0;
  Matcher m = LEADER_SEQ.matcher(nStringSequence);
  if (m.matches()) {
    seq = Integer.parseInt(m.group(1));
  } else {
    throw new IllegalStateException("Could not find regex match in:"
        + nStringSequence);
  }
  return seq;
}
```

Then checkIfIamLeader determines whether this node is the leader.
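The branching in checkIfIamLeader can be pulled out into a pure function and tried without ZooKeeper. The sketch below re-implements only the index arithmetic of the code that follows; Decision and LeaderCheck are hypothetical names. Because the sorted list also contains this node's own number, the loop stops at index i, the first number larger than ours; our own entry then sits at i - 1 and the node to watch at i - 2.

```java
import java.util.List;

// Hypothetical result type: LEAD, NOT_IN_LINE, or WATCH the node at watchIndex.
record Decision(String action, int watchIndex) {
    static final Decision LEAD = new Decision("LEAD", -1);
    static final Decision NOT_IN_LINE = new Decision("NOT_IN_LINE", -1);

    static Decision watch(int index) {
        return new Decision("WATCH", index);
    }
}

class LeaderCheck {
    // Same branching as checkIfIamLeader, minus the ZooKeeper calls.
    // sortedSeqs: every candidate's sequence number, ascending (ours included).
    static Decision decide(int seq, List<Integer> sortedSeqs) {
        if (sortedSeqs.isEmpty()) {
            return Decision.NOT_IN_LINE;
        }
        if (seq <= sortedSeqs.get(0)) {
            return Decision.LEAD;          // lowest number becomes leader
        }
        int i = 1;
        for (; i < sortedSeqs.size(); i++) {
            if (seq < sortedSeqs.get(i)) {
                break;                     // first number greater than ours
            }
        }
        int index = i - 2;                 // i - 1 is our own entry
        return index < 0 ? Decision.NOT_IN_LINE : Decision.watch(index);
    }
}
```

With sequence numbers [1, 3, 5]: 1 leads, 3 watches index 0 (number 1), and 5 watches index 1 (number 3). If our number is no longer in the list, for example 5 against [1], the index goes negative, which corresponds to the "no longer in line to be leader" warning.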
```java
/**
 * Check if the candidate with the given n_* sequence number is the leader.
 * If it is, set the leaderId on the leader zk node. If it is not, start
 * watching the candidate that is in line before this one - if it goes down, check
 * if this candidate is the leader again.
 *
 * @param replacement has someone else been the leader already?
 */
private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement)
    throws KeeperException, InterruptedException, IOException {
  // leader changed - close the overseer
  context.checkIfIamLeaderFired();
  // get all other numbers...
  // overseer_elect/election
  final String holdElectionPath = context.electionPath + ELECTION_NODE;
  List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);

  // sort by the trailing sequence number
  sortSeqs(seqs);
  // extract the trailing numbers into a List<Integer>
  List<Integer> intSeqs = getSeqs(seqs);
  if (intSeqs.size() == 0) {
    log.warn("Our node is no longer in line to be leader");
    return;
  }
  if (seq <= intSeqs.get(0)) {
    // first we delete the node advertising the old leader in case the ephem is still there
    try {
      zkClient.delete(context.leaderPath, -1, true);
    } catch (Exception e) {
      // fine
    }

    runIamLeaderProcess(context, replacement);
  } else {
    // I am not the leader - watch the node below me
    int i = 1;
    for (; i < intSeqs.size(); i++) {
      int s = intSeqs.get(i);
      if (seq < s) {
        // we found who we come before - watch the guy in front
        break;
      }
    }
    // intSeqs contains our own number at i - 1, so i - 2 is the node just ahead of us
    int index = i - 2;
    if (index < 0) {
      log.warn("Our node is no longer in line to be leader");
      return;
    }
    try {
      // Register a watcher (via getData) on the node just ahead of us; when that
      // node changes (e.g. is deleted), check again whether we are now the leader
      zkClient.getData(holdElectionPath + "/" + seqs.get(index), new Watcher() {

        @Override
        public void process(WatchedEvent event) {
          // session events are not change events,
          // and do not remove the watcher
          if (EventType.None.equals(event.getType())) {
            return;
          }
          // am I the next leader?
          try {
            checkIfIamLeader(seq, context, true);
          } catch (InterruptedException e) {
            // Restore the interrupted status
            Thread.currentThread().interrupt();
            log.warn("", e);
          } catch (IOException e) {
            log.warn("", e);
          } catch (Exception e) {
            log.warn("", e);
          }
        }

      }, null, true);
    } catch (KeeperException.SessionExpiredException e) {
      throw e;
    } catch (KeeperException e) {
      log.warn("Failed setting watch", e);
      // we couldn't set our watch - the node before us may already be down?
      // we need to check if we are the leader again
      checkIfIamLeader(seq, context, true);
    }
  }
}
```

Sorting (sortSeqs):
```java
/**
 * Sort n string sequence list.
 */
public static void sortSeqs(List<String> seqs) {
  Collections.sort(seqs, new Comparator<String>() {

    @Override
    public int compare(String o1, String o2) {
      return Integer.valueOf(getSeq(o1)).compareTo(
          Integer.valueOf(getSeq(o2)));
    }
  });
}
```

Getting the sequence numbers (getSeqs):
```java
/**
 * Returns int list given list of form n_0000000001, n_0000000003, etc.
 *
 * @return int seqs
 */
private List<Integer> getSeqs(List<String> seqs) {
  List<Integer> intSeqs = new ArrayList<Integer>(seqs.size());
  for (String seq : seqs) {
    intSeqs.add(getSeq(seq));
  }
  return intSeqs;
}
```

When a node determines that it is the leader, it runs the following method, which simply calls the context's runLeaderProcess method:
```java
// TODO: get this core param out of here
protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement)
    throws KeeperException, InterruptedException, IOException {
  context.runLeaderProcess(weAreReplacement, 0);
}
```

Here is runLeaderProcess, as implemented in OverseerElectionContext:
```java
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
    throws KeeperException, InterruptedException {
  log.info("I am going to be the leader {}", id);
  final String id = leaderSeqPath
      .substring(leaderSeqPath.lastIndexOf("/") + 1);
  ZkNodeProps myProps = new ZkNodeProps("id", id);

  // create the ephemeral node /overseer_elect/leader and write our id into it
  zkClient.makePath(leaderPath, ZkStateReader.toJSON(myProps),
      CreateMode.EPHEMERAL, true);
  if (pauseBeforeStartMs > 0) {
    try {
      Thread.sleep(pauseBeforeStartMs);
    } catch (InterruptedException e) {
      Thread.interrupted();
      log.warn("Wait interrupted ", e);
    }
  }

  overseer.start(id);
}
```
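runLeaderProcess completes the step started in checkIfIamLeader: the new leader first deletes any /overseer_elect/leader node that may still be there ("in case the ephem is still there", as the source comment puts it), then creates its own EPHEMERAL node holding its id, so the node disappears when the leader's ZooKeeper session ends. The sketch below models only that ordering, assuming the create would fail on an existing node as a plain ZooKeeper create does. FakeZk, OverseerLeaderSketch and the session numbers are hypothetical, and the JSON text is simplified.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for ZooKeeper ephemeral nodes.
class FakeZk {
    private final Map<String, Long> owners = new HashMap<>(); // path -> owning session
    private final Map<String, String> data = new HashMap<>(); // path -> node data

    // Like a plain ZooKeeper create: fails if the node already exists.
    void createEphemeral(String path, String value, long session) {
        if (owners.containsKey(path)) {
            throw new IllegalStateException("NodeExists: " + path);
        }
        owners.put(path, session);
        data.put(path, value);
    }

    // Deleting a missing node is silently ignored (Solr swallows that error too).
    void delete(String path) {
        owners.remove(path);
        data.remove(path);
    }

    // Session expiry removes every ephemeral node that session owned.
    void expire(long session) {
        List<String> owned = new ArrayList<>();
        for (Map.Entry<String, Long> e : owners.entrySet()) {
            if (e.getValue() == session) {
                owned.add(e.getKey());
            }
        }
        owned.forEach(this::delete);
    }

    String get(String path) {
        return data.get(path);
    }
}

class OverseerLeaderSketch {
    static final String LEADER_PATH = "/overseer_elect/leader";

    // Same order as in Solr: drop a possibly stale leader node, then create our own.
    static void becomeLeader(FakeZk zk, String id, long session) {
        zk.delete(LEADER_PATH);
        zk.createEphemeral(LEADER_PATH, "{\"id\":\"" + id + "\"}", session);
    }
}
```

If a leftover leader node from session 1 is still present, becomeLeader for session 2 replaces it instead of failing, and session 1 expiring afterwards leaves the new node untouched; without the delete, the create would throw.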
(To be continued)