
Solr4.7 Source Code Analysis - Startup: Solr Cloud (Part 2), the Solr Election Mechanism

2014-07-31 22:05
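Continued from the previous article:
http://blog.csdn.net/wenchanter/article/details/37915127
When SolrCloud starts, it elects a leader for the Overseer. Before digging into Solr's election mechanism, let's first sort out the hierarchy of clusterstate.json and the classes used to build it.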

{
  "autocpltCollection":{                      <-- DocCollection
    "shards":{
      "shard1":{                              <-- Slice
        "range":"80000000-ffffffff",
        "state":"active",
        "replicas":{
          "core_node1":{                      <-- Replica
            "state":"active",
            "base_url":"http://solr1.doc.test.163.com:9091/solr",
            "core":"autocplt",
            "node_name":"solr1.doc.test.163.com:9091_solr",
            "leader":"true"},
          "core_node3":{
            "state":"down",
            "base_url":"http://solr3.doc.test.163.com:8983/solr",
            "core":"autocplt",
            "node_name":"solr3.doc.test.163.com:8983_solr"}}},
      "shard2":{
        "range":"0-7fffffff",
        "state":"active",
        "replicas":{"core_node2":{
            "state":"active",
            "base_url":"http://solr2.doc.test.163.com:8080/solr",
            "core":"autocplt",
            "node_name":"solr2.doc.test.163.com:8080_solr",
            "leader":"true"}}}},
    "maxShardsPerNode":"1",
    "router":{"name":"compositeId"},
    "replicationFactor":"1",
    "autoCreated":"true"}
}


The top level is a ClusterState instance, with these fields:

private Integer zkClusterStateVersion;
private final Map<String, DocCollection> collectionStates;  // Map<collectionName, Map<sliceName,Slice>>
private Set<String> liveNodes;
private final ZkStateReader stateReader;




ClusterState holds collectionStates, a Map whose values are DocCollection objects. A DocCollection corresponds to the autocpltCollection level:
private final String name;
private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices;
private final DocRouter router;
protected final Map<String,Object> propMap; // declared in the parent class ZkNodeProps




DocCollection holds slices. A Slice corresponds to the shard1/shard2 level:
private final String name;
private final DocRouter.Range range;
private final Integer replicationFactor;      // FUTURE: optional per-slice override of the collection replicationFactor
private final Map<String,Replica> replicas;
private final Replica leader;
private final String state;
private final String parent;
private final Map<String, RoutingRule> routingRules;
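The range field holds a hash range written in hex, such as "80000000-ffffffff" for shard1 above. Below is a minimal sketch of turning that string into a signed 32-bit interval. It assumes both ends are 32-bit hash values written in hex. HashRange is a hypothetical class, not Solr's DocRouter.Range:

```java
import java.util.Locale;

/** Hypothetical stand-in for a shard hash range such as "80000000-ffffffff". */
final class HashRange {
    final int min;
    final int max;

    HashRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    /** Parses "min-max", where both ends are 32-bit values written in hex. */
    static HashRange parse(String s) {
        int dash = s.indexOf('-');
        if (dash < 0) {
            throw new IllegalArgumentException("expected min-max: " + s);
        }
        // Parse as long, then narrow to int, so "ffffffff" becomes -1 (two's complement).
        int min = (int) Long.parseLong(s.substring(0, dash), 16);
        int max = (int) Long.parseLong(s.substring(dash + 1), 16);
        return new HashRange(min, max);
    }

    /** True if the 32-bit hash falls inside [min, max]. */
    boolean includes(int hash) {
        return hash >= min && hash <= max;
    }

    @Override
    public String toString() {
        // %x prints an int as unsigned hex, so MIN_VALUE prints as 80000000.
        return String.format(Locale.ROOT, "%x-%x", min, max);
    }
}
```

With the two sample ranges, shard1 covers the negative half of the int hash space (Integer.MIN_VALUE to -1) and shard2 covers the non-negative half (0 to Integer.MAX_VALUE). Together they cover every possible 32-bit hash.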




Slice holds replicas. A Replica corresponds to the core_node1 level.
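To make the nesting concrete, here is a simplified, hypothetical model of the four levels (not Solr's actual classes), rebuilt from the sample clusterstate.json. It includes a lookup that walks collection, slice and replicas to find a shard's leader. The sketch scans for the replica flagged "leader":"true", whereas the real Slice keeps its leader in the leader field listed above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of the clusterstate.json hierarchy (not Solr's classes). */
final class ClusterModel {
    /** core_node1 level. */
    static final class Replica {
        final String name;
        final String nodeName;
        final String state;
        final boolean leader;

        Replica(String name, String nodeName, String state, boolean leader) {
            this.name = name;
            this.nodeName = nodeName;
            this.state = state;
            this.leader = leader;
        }
    }

    /** shard1 / shard2 level. */
    static final class Slice {
        final String name;
        final Map<String, Replica> replicas = new LinkedHashMap<>();

        Slice(String name) {
            this.name = name;
        }

        /** Returns the replica flagged as leader, or null if there is none. */
        Replica leader() {
            for (Replica r : replicas.values()) {
                if (r.leader) {
                    return r;
                }
            }
            return null;
        }
    }

    /** autocpltCollection level. */
    static final class DocCollection {
        final String name;
        final Map<String, Slice> slices = new LinkedHashMap<>();

        DocCollection(String name) {
            this.name = name;
        }
    }

    /** Top level: collection name -> DocCollection, like ClusterState.collectionStates. */
    final Map<String, DocCollection> collectionStates = new LinkedHashMap<>();

    /** Walks collection -> slice -> replicas and returns the leader's node_name, or null. */
    String leaderNode(String collection, String slice) {
        DocCollection c = collectionStates.get(collection);
        Slice s = (c == null) ? null : c.slices.get(slice);
        Replica r = (s == null) ? null : s.leader();
        return (r == null) ? null : r.nodeName;
    }

    /** Rebuilds the autocpltCollection sample shown earlier. */
    static ClusterModel example() {
        Slice shard1 = new Slice("shard1");
        shard1.replicas.put("core_node1",
                new Replica("core_node1", "solr1.doc.test.163.com:9091_solr", "active", true));
        shard1.replicas.put("core_node3",
                new Replica("core_node3", "solr3.doc.test.163.com:8983_solr", "down", false));
        Slice shard2 = new Slice("shard2");
        shard2.replicas.put("core_node2",
                new Replica("core_node2", "solr2.doc.test.163.com:8080_solr", "active", true));

        DocCollection coll = new DocCollection("autocpltCollection");
        coll.slices.put(shard1.name, shard1);
        coll.slices.put(shard2.name, shard2);

        ClusterModel model = new ClusterModel();
        model.collectionStates.put(coll.name, coll);
        return model;
    }
}
```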



Next comes Solr's election logic. ZkController's init method contains this leader-election code:

overseerElector = new LeaderElector(zkClient);
this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
overseerElector.setup(context);
overseerElector.joinElection(context, false);
It first creates a leader elector, LeaderElector.

The LeaderElector class Javadoc already explains the process in some detail:

/**
* Leader Election process. This class contains the logic by which a
* leader is chosen. First call {@link #setup(ElectionContext)} to ensure
* the election process is init'd. Next call
* {@link #joinElection(ElectionContext, boolean)} to start the leader election.
*
* The implementation follows the classic ZooKeeper recipe of creating an
* ephemeral, sequential node for each candidate and then looking at the set
* of such nodes - if the created node is the lowest sequential node, the
* candidate that created the node is the leader. If not, the candidate puts
* a watch on the next lowest node it finds, and if that node goes down,
* starts the whole process over by checking if it's the lowest sequential node, etc.
*
*/
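You can try out the recipe in this Javadoc without a ZooKeeper server. Below is an in-memory simulation under simplifying assumptions:

- sequential nodes are plain integers
- a session expiry is a method call
- a watch is a one-shot callback

ElectionSim and its methods are hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory simulation of the ZooKeeper election recipe; no real ZooKeeper involved. */
final class ElectionSim {
    private final TreeMap<Integer, String> nodes = new TreeMap<>();  // seq -> candidate
    private final Map<Integer, Integer> watches = new HashMap<>();  // watched seq -> watcher seq
    private int nextSeq = 0;
    /** Candidates in the order they became leader. */
    final List<String> leaderLog = new ArrayList<>();

    /** Stands in for creating an EPHEMERAL_SEQUENTIAL node, then checking leadership. */
    int join(String candidate) {
        int seq = nextSeq++;
        nodes.put(seq, candidate);
        check(seq);
        return seq;
    }

    /** The lowest node leads; every other node watches only the node just before it. */
    private void check(int seq) {
        if (!nodes.containsKey(seq)) {
            return; // our own node is gone, so we are no longer in line
        }
        Integer predecessor = nodes.lowerKey(seq);
        if (predecessor == null) {
            leaderLog.add(nodes.get(seq));
        } else {
            watches.put(predecessor, seq);
        }
    }

    /** Stands in for an ephemeral node vanishing when its session ends. */
    void expire(int seq) {
        nodes.remove(seq);
        Integer watcher = watches.remove(seq); // the one-shot watch fires once
        if (watcher != null) {
            check(watcher);
        }
    }
}
```

Each candidate watches only its immediate predecessor, not the current leader. As a result, a departing node wakes up exactly one candidate instead of all of them, which avoids a herd of simultaneous re-checks.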
Let's look at overseerElector.setup(context);

This setup method only creates the election path /overseer_elect/election:

/**
* Set up any ZooKeeper nodes needed for leader election.
*/
public void setup(final ElectionContext context) throws InterruptedException,
KeeperException {
this.context = context;
String electZKPath = context.electionPath + LeaderElector.ELECTION_NODE;

zkCmdExecutor.ensureExists(electZKPath, zkClient);
}
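The source of ensureExists is not shown here. Assuming, as its name suggests, that it creates the path together with any missing parent nodes and does nothing if they already exist, its effect can be sketched over a hypothetical in-memory namespace. FakeZkTree is not a Solr or ZooKeeper class:

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical in-memory namespace used to illustrate an idempotent "ensure exists". */
final class FakeZkTree {
    final Set<String> nodes = new LinkedHashSet<>();

    /** Creates path and any missing parents; nodes that already exist are left untouched. */
    void ensureExists(String path) {
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("absolute path expected: " + path);
        }
        StringBuilder current = new StringBuilder();
        for (String part : path.substring(1).split("/")) {
            if (part.isEmpty()) {
                continue; // tolerate doubled slashes
            }
            current.append('/').append(part);
            nodes.add(current.toString()); // a Set makes repeated calls harmless
        }
    }
}
```

Because the operation is idempotent, every node can run setup at startup without coordinating with the others.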
Next, look at overseerElector.joinElection(context, false);

/**
* Begin participating in the election process. Gets a new sequential number
* and begins watching the node with the sequence number before it, unless it
* is the lowest number, in which case, initiates the leader process. If the
* node that is watched goes down, check if we are the new lowest node, else
* watch the next lowest numbered node.
*
* @return sequential node number
*/
public int joinElection(ElectionContext context, boolean replacement) throws KeeperException, InterruptedException, IOException {
// calls OverseerElectionContext.joinedElectionFired, which just closes the Overseer; its purpose is not yet clear to me
context.joinedElectionFired();
// overseer_elect/election
final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;

long sessionId = zkClient.getSolrZooKeeper().getSessionId();
String id = sessionId + "-" + context.id;
String leaderSeqPath = null;
boolean cont = true;
int tries = 0;
while (cont) {
try {
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
CreateMode.EPHEMERAL_SEQUENTIAL, false);
context.leaderSeqPath = leaderSeqPath;
cont = false;
} catch (ConnectionLossException e) {
// we don't know if we made our node or not...
List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true);

boolean foundId = false;
for (String entry : entries) {
String nodeId = getNodeId(entry);
if (id.equals(nodeId)) {
// we did create our node...
foundId  = true;
break;
}
}
if (!foundId) {
cont = true;
if (tries++ > 20) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
try {
Thread.sleep(50);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
}
}

} catch (KeeperException.NoNodeException e) {
// we must have failed in creating the election node - someone else must
// be working on it, lets try again
if (tries++ > 20) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
cont = true;
try {
Thread.sleep(50);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
}
}
}
// extract the trailing sequence number
int seq = getSeq(leaderSeqPath);
checkIfIamLeader(seq, context, replacement);

return seq;
}
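Each election node is named id + "-n_" + a sequence suffix. Here id is the session id joined with context.id, which for the Overseer context is presumably the node name passed to OverseerElectionContext. ZooKeeper appends a 10-digit, zero-padded counter to sequential nodes. This naming is what lets the ConnectionLossException branch tell whether the create actually succeeded.

getNodeId is not shown in the article, so the helpers below are hypothetical. They simply strip everything from the last "-n_" onward:

```java
import java.util.List;
import java.util.Locale;

/** Hypothetical helpers mirroring the "did we already create our node?" check in joinElection. */
final class ElectionNodeNames {
    /** ZooKeeper appends a 10-digit, zero-padded counter to sequential node names. */
    static String sequentialName(String id, int counter) {
        return id + "-n_" + String.format(Locale.ROOT, "%010d", counter);
    }

    /** Strips the "-n_<digits>" suffix, leaving "sessionId-contextId". */
    static String nodeId(String entry) {
        int cut = entry.lastIndexOf("-n_");
        return cut < 0 ? entry : entry.substring(0, cut);
    }

    /** After a ConnectionLoss, scan the election children for an entry carrying our id. */
    static boolean alreadyCreated(List<String> children, String id) {
        for (String child : children) {
            if (id.equals(nodeId(child))) {
                return true;
            }
        }
        return false;
    }
}
```

Because the session id is part of the name, a retry after ConnectionLoss can recognize a node that this session created before the connection dropped. A node left over from an older session does not match.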
getSeq extracts the number after "-n_" and converts it to an int:

/**
* Returns int given String of form n_0000000001 or n_0000000003, etc.
*
* @return sequence number
*/
public static int getSeq(String nStringSequence) {
int seq = 0;
Matcher m = LEADER_SEQ.matcher(nStringSequence);
if (m.matches()) {
seq = Integer.parseInt(m.group(1));
} else {
throw new IllegalStateException("Could not find regex match in:"
+ nStringSequence);
}
return seq;
}
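The LEADER_SEQ pattern is not shown in the article. Below is a self-contained version of getSeq that assumes the pattern captures the digits after the final "n_" up to the end of the name. The regex is an assumption, not copied from Solr:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch of getSeq; the pattern below is an assumption, since LEADER_SEQ is not shown. */
final class SeqParser {
    // Assumed: digits after the final "n_", running to the end of the name.
    private static final Pattern LEADER_SEQ = Pattern.compile(".*n_(\\d+)");

    static int getSeq(String nStringSequence) {
        Matcher m = LEADER_SEQ.matcher(nStringSequence);
        if (!m.matches()) {
            throw new IllegalStateException("Could not find regex match in:" + nStringSequence);
        }
        // Leading zeros are fine: Integer.parseInt("0000000003") == 3.
        return Integer.parseInt(m.group(1));
    }
}
```

Because matches() must consume the whole string, only an "n_" followed by nothing but digits can match. An "n_" that appears earlier in a host name is therefore ignored.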
checkIfIamLeader then decides whether this candidate is the leader:

/**
* Check if the candidate with the given n_* sequence number is the leader.
* If it is, set the leaderId on the leader zk node. If it is not, start
* watching the candidate that is in line before this one - if it goes down, check
* if this candidate is the leader again.
*
* @param replacement has someone else been the leader already?
*/
private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
InterruptedException, IOException {
// leader changed - close the overseer
context.checkIfIamLeaderFired();
// get all other numbers…
// overseer_elect/election
final String holdElectionPath = context.electionPath + ELECTION_NODE;
List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);

// sort by the trailing sequence number
sortSeqs(seqs);
// extract the trailing numbers into a List<Integer>
List<Integer> intSeqs = getSeqs(seqs);
if (intSeqs.size() == 0) {
log.warn("Our node is no longer in line to be leader");
return;
}
if (seq <= intSeqs.get(0)) {
// first we delete the node advertising the old leader in case the ephem is still there
try {
zkClient.delete(context.leaderPath, -1, true);
} catch(Exception e) {
// fine
}

//
runIamLeaderProcess(context, replacement);
} else {
// I am not the leader - watch the node below me
int i = 1;
for (; i < intSeqs.size(); i++) {
int s = intSeqs.get(i);
if (seq < s) {
// we found who we come before - watch the guy in front
break;
}
}
// intSeqs.get(i - 1) is our own number, so index i - 2 is the node just before us
int index = i - 2;
if (index < 0) {
log.warn("Our node is no longer in line to be leader");
return;
}
try {
// register a getData watcher on the node just before ours; when that node changes, re-check whether we are now the leader
zkClient.getData(holdElectionPath + "/" + seqs.get(index),
new Watcher() {

@Override
public void process(WatchedEvent event) {
// session events are not change events,
// and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
}
// am I the next leader?
try {
checkIfIamLeader(seq, context, true);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
} catch (IOException e) {
log.warn("", e);
} catch (Exception e) {
log.warn("", e);
}
}

}, null, true);
} catch (KeeperException.SessionExpiredException e) {
throw e;
} catch (KeeperException e) {
log.warn("Failed setting watch", e);
// we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again
checkIfIamLeader(seq, context, true);
}
}
}
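The branching in checkIfIamLeader can be isolated into a pure function, which makes the i - 2 arithmetic easier to check. After the loop, index i - 1 should hold our own sequence number, so i - 2 is the candidate directly in front of us. The helper below is a hypothetical restatement of that logic, not Solr code. It returns one of three results: a marker for "I am the leader", a marker for "no longer in line", or the index of the node to watch:

```java
import java.util.List;

/** Hypothetical pure-function restatement of the decision made in checkIfIamLeader. */
final class LeaderCheck {
    static final int LEADER = -1;
    static final int OUT_OF_LINE = -2;

    /** sortedSeqs must be ascending; returns LEADER, OUT_OF_LINE, or the index to watch. */
    static int decide(List<Integer> sortedSeqs, int mySeq) {
        if (sortedSeqs.isEmpty()) {
            return OUT_OF_LINE;
        }
        if (mySeq <= sortedSeqs.get(0)) {
            return LEADER;
        }
        int i = 1;
        for (; i < sortedSeqs.size(); i++) {
            if (mySeq < sortedSeqs.get(i)) {
                break; // sortedSeqs.get(i - 1) should be our own node
            }
        }
        int index = i - 2; // one position before our own node
        return index < 0 ? OUT_OF_LINE : index;
    }
}
```

For example, with children 3, 5, 9:

- seq 5 watches index 0 (seq 3).
- seq 9 watches index 1 (seq 5).
- seq 4, which is not in the list, yields the "Our node is no longer in line to be leader" case.

In Solr itself the watch is set with getData. If the predecessor has already vanished, that call fails with a KeeperException, and the catch branch simply re-runs checkIfIamLeader.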
Sorting:

/**
* Sort n string sequence list.
*/
public static void sortSeqs(List<String> seqs) {
Collections.sort(seqs, new Comparator<String>() {

@Override
public int compare(String o1, String o2) {
return Integer.valueOf(getSeq(o1)).compareTo(
Integer.valueOf(getSeq(o2)));
}
});
}
Extracting the sequence numbers:

/**
* Returns int list given list of form n_0000000001, n_0000000003, etc.
*
* @return int seqs
*/
private List<Integer> getSeqs(List<String> seqs) {
List<Integer> intSeqs = new ArrayList<Integer>(seqs.size());
for (String seq : seqs) {
intSeqs.add(getSeq(seq));
}
return intSeqs;
}
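Sorting by the numeric suffix matters because each child name starts with the session id. Plain string order therefore reflects session ids rather than the order in which candidates joined. The sketch below uses a hypothetical stand-in for getSeq and contrasts the two orders:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Shows why election children are sorted by their numeric suffix, not as strings. */
final class SortDemo {
    // Hypothetical stand-in for getSeq: the digits after the final "n_".
    static int seq(String name) {
        return Integer.parseInt(name.substring(name.lastIndexOf("n_") + 2));
    }

    /** Returns a copy of names ordered by sequence number, like sortSeqs. */
    static List<String> sortedBySeq(List<String> names) {
        List<String> copy = new ArrayList<>(names);
        copy.sort(Comparator.comparingInt(SortDemo::seq));
        return copy;
    }

    /** Maps already-sorted names to their sequence numbers, like getSeqs. */
    static List<Integer> seqs(List<String> sortedNames) {
        return sortedNames.stream().map(SortDemo::seq).collect(Collectors.toList());
    }
}
```

For example, "72-hostA:8983_solr-n_0000000010" sorts before "95-hostB:8983_solr-n_0000000002" as a string. By sequence number it comes after, which is the order the election needs.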
Once a candidate determines that it is the leader, it runs this method, which delegates to the context's runLeaderProcess:

// TODO: get this core param out of here
protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
context.runLeaderProcess(weAreReplacement,0);
}
Here is runLeaderProcess, as implemented in OverseerElectionContext:

@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
InterruptedException {
log.info("I am going to be the leader {}", id);
final String id = leaderSeqPath
.substring(leaderSeqPath.lastIndexOf("/") + 1);
ZkNodeProps myProps = new ZkNodeProps("id", id);

// create the ephemeral node /overseer_elect/leader and write the id into it
zkClient.makePath(leaderPath, ZkStateReader.toJSON(myProps),
CreateMode.EPHEMERAL, true);
if(pauseBeforeStartMs >0){
try {
Thread.sleep(pauseBeforeStartMs);
} catch (InterruptedException e) {
Thread.interrupted();
log.warn("Wait interrupted ", e);
}
}

overseer.start(id);
}
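In short, the winner takes the last path segment of its own election node as its id. It publishes that id in the ephemeral node /overseer_elect/leader, which disappears automatically when the leader's ZooKeeper session ends. checkIfIamLeader also deletes a stale copy before taking over. Below is a sketch of the id extraction and the payload, using a hypothetical helper. The exact JSON formatting produced by ZkStateReader.toJSON may differ from this minimal version:

```java
/** Hypothetical helper showing what the new Overseer leader publishes. */
final class LeaderPayload {
    /** Same substring logic as runLeaderProcess: keep the last path segment. */
    static String leaderId(String leaderSeqPath) {
        return leaderSeqPath.substring(leaderSeqPath.lastIndexOf('/') + 1);
    }

    /** Minimal JSON for {"id": ...}; escapes only backslashes and double quotes. */
    static String toJson(String id) {
        String escaped = id.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"id\":\"" + escaped + "\"}";
    }
}
```

Note that the published id is the full election node name, sequence suffix included, and the same value is passed to overseer.start(id).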


(To be continued)