您的位置:首页 > 编程语言

分布式缓存系统TAIR代码分析二

2017-08-10 21:19 246 查看
在 分布式缓存系统TAIR代码分析一中,介绍了configserver初始化后的工作状况。本文将详细介绍configserver如何创建对照表。

 

2.5 创建对照表策略:

2.5.1 概述

在Tair系统中,对照表是一个非常重要的概念。Tair使用改进后的一致性hash算法来确保数据在数据服务器上趋于均匀分布,同时获取良好的可扩展性。Tair将数据按bucket为单位,存放在存储节点上,每一个节点可以存储多个bucket.对于每一个bucket而言,可能会存在多个备份。若在多备份的情况下,其中一个bucket称为master
bucket, 其他的称为slave bucket. 客户端会拿到一张表,该表记录了每一个bucket存储的节点的地址。那么,对于一个<key, value>而言,通过对hash(key)%count_of_bucket运算,获取具体的bucket, 通过表,查找backet所在的地址。接下来的事情,就是客户端将这个KV存储到这个地址对应的节点中对应的bucket中。而这个表,就称为对照表。

实际上,对照表建立了Key与其存储位置之间的映射。当作为存储数据的节点发生变化的时候,就必须重建对照表。

在tair的配置文件中,有对照表创建策略的配置项,即_build_strategy 。configserver根据_build_strategy的取值不同,而执行不同的创建对照表策略。目前提供两种策略. 配置为1 则是负载均衡优先, 分配的时候尽量让各个 data server 的负载均衡. 配置为 2 的时候, 是位置安全优先, 会尽量将一份数据的不同备份分配到不同机架的机器上. 配置为3的时候,如果服务器分布在多个机器上,那么会优先使用位置安全优先,即策略2.
如果服务器只在一个机架上,那么退化成策略1,只按负载分布[1].

实际上,Tair提供了2中构建对照表的方法。对于每一种方法,构建对照表的流程是相同的。对于不同构建策略,每一个节点上分配的bucket数量的分配方式是不同的。这一点很容易理解。另外,对于一个给定的buket, 为其选择一个合适的存储节点的策略也是不同的。例如,考虑位置安全优先的对照表构建策略中,buket和其备份bucket应该尽可能分布在不同机房。

本文先详细叙述在负载均衡优先的构建对照表策略下,configserver如何构建对照表。然后,在叙述位置安全优先构建策略的不同之处。

 

2.5.2 负载均衡优先策略

在该策略下,构建出的对照表期望达到的效果——使得每一个节点分配的bucket数量尽可能的均衡。假设共有B个bucket, 有N个节点, 那么在负载均衡优先的策略下,每个节点最少负责的bucket的个数为:B/N;若B%N 不为0,那么将有B%N个节点将负载B/N+1个bucket。若B%N为0,那就再好不过了,每个节点分配了相同数量的bucket。由此可见,在这种模式下,任意两个节点负责的bucket数量至多相差1。

configserver通过以下步骤来构建对照表:

1)根据当前对照表,统计出每一个alive节点上存储的bucket数量。假设有节点A,B,C和D, 则形成数据如下的map:

| A , Xa |

| B,  Xb |

| C,  Xc |

| D,  Xd |

其中Xa表示节点A上存储的bucket数量,Xb,Xc,Xd亦然。显然,在启动时 Xa = Xb = Xc = Xd = 0;

2) 根据 1)中生成的map 构建一个 index map (实在不知道如何称呼了),即对于hold相同数量bucket的节点存放在同一列表里,举例如下:

| n1, [A, B] |

| n2, [C   ] |

| n3, [D   ] |

若在1)中Xa == Xb == n1, 那么形成一个列表 [A, B];

3) 给每一个节点分配存储bucket的数量。这一步只是确定每一个节点上可以储存多少个bucket, 而不确定到底存储哪一些bucket。 对于不同的构建对照表策略,这一步骤是不同的。

在具体给每个节点分配bucket的过程中,假设某个节点上已经分配了b个bucket, 现在N个节点已经负责了X个bucket, 若b < X/N, 则让该节点负责B/N个bucket, 否则该节点负责B/N+1个节点。这样做的目的是,在负载均衡的前提下,减少数据迁移。由于允许数据存放多备份,对于某个bucket而言,可能会存储copyCount次,也就是说存在copyCount个bucket的内容是完全一样的,这些一样的backet,我们将其中的一个称作为master
bucket, 其余的称为slave bucket。那对于master bucket显然不能存放在过于集中,也需平均分配在每一个节点上。

也类似地构造一个map, 如下:

| A, Ca |

| B, Cb |

| C, Cc |

| D, Cd |

其中,Ca表示节点A最终要存储的bucket总数;

这个步骤代码实现如下:

[cpp] view
plain copy

void table_builder1::caculate_capable()  

{  

  int S = available_server.size();  

  

  //每个节点上平均存放的bucket的数量  

  tokens_per_node_min = bucket_count * copy_count / S;  

  

  //存放B/N+1个bucket的节点数量  

  tokens_per_node_max_count =  

    bucket_count * copy_count - tokens_per_node_min * S;  

  

  //存放B/N个bucket的节点数量  

  tokens_per_node_min_count = S - tokens_per_node_max_count;  

  

  master_tokens_per_node_min = bucket_count / S;  

  master_tokens_per_node_max_count =  

    bucket_count - master_tokens_per_node_min * S;  

  master_tokens_per_node_min_count = S - master_tokens_per_node_max_count;  

  

  log_debug("bucket_count: %u\n copy_count: %u\n tokens_per_node_min: %d\n tokens_per_node_max_count: %d\n "  

      "tokens_per_node_min_count: %d\n master_tokens_per_node_min: %d\n master_tokens_per_node_max_count: %d\n "  

      "master_tokens_per_node_min_count: %d\n",  

      bucket_count, copy_count, tokens_per_node_min, tokens_per_node_max_count, tokens_per_node_min_count,  

      master_tokens_per_node_min, master_tokens_per_node_max_count, master_tokens_per_node_min_count);  

  

  server_capable.clear();  

  master_server_capable.clear();  

  int max_s = 0;  

  int mmax_s = 0;  

  int i = 0;  

  int pre_node_count_min = 0;  

  int pre_mnode_count_min = 0;  

  int size = 0;  

  int sum = 0;  

  

  //计算节点上现有bucket分配情况  

  for(map<server_id_type, int >::iterator it =  

      tokens_count_in_node.begin(); it != tokens_count_in_node.end();  

      it++) {  

    log_debug("tokens_count_in_node, first: %s, second: %d", tbsys::CNetUtil::addrToString(it->first.first).c_str(), it->second);  

    if(it->second != 0) {  

      size++;  

      sum += it->second;  

    }  

  }  

  if(size > 0) {  

    pre_node_count_min = sum / size;  

  }  

  size = sum = 0;  

  for(map<server_id_type, int>::iterator it =  

      mtokens_count_in_node.begin(); it != mtokens_count_in_node.end();  

      it++) {  

    if(it->second != 0) {  

      size++;  

      sum += it->second;  

    }  

  }  

  if(size > 0) {  

    pre_mnode_count_min = sum / size;  

  }  

  

  log_debug("pre_node_count_min: %d, pre_mnode_count_min: %d",  

      pre_node_count_min, pre_mnode_count_min);  

  

  for(server_list_type::const_iterator it = available_server.begin();  

      it != available_server.end(); it++) {  

    int last_node_count = tokens_count_in_node[*it];  

    log_debug("will caculate server %s:%d last token is %d ",  

        tbsys::CNetUtil::addrToString(it->first).c_str(), it->first, last_node_count);  

  

    log_debug("pre_node_count_min = %d pre_mnode_count_min=%d",pre_node_count_min, pre_mnode_count_min);  

    if(last_node_count <= pre_node_count_min) {        //try my best to make every data sever handle tokenPerNode_min buckets.  

  

   

   

      //pre_node_count_min表示现有节点负责的bucket数量的平均值,last_node_count表示该node现在分配的bucket数量  

      log_debug("try to make ist min");  

      int min_s = i - max_s;  

      if(min_s < tokens_per_node_min_count) {  

        server_capable[*it] = tokens_per_node_min;  

      }  

      else {  

        server_capable[*it] = tokens_per_node_min + 1;  

        max_s++;  

      }  

    }  

    else {  

      log_debug("try to make ist max");  

      if(max_s < tokens_per_node_max_count) {  

        server_capable[*it] = tokens_per_node_min + 1;  

        max_s++;  

      }  

      else {  

        server_capable[*it] = tokens_per_node_min;  

      }  

    }  

  

    int last_mnode_count = mtokens_count_in_node[*it];  

    if(last_mnode_count <= pre_mnode_count_min) {        //try my best to make every data sever handle tokenPerNode_min buckets.  

      int mmin_s = i - mmax_s;  

      if(mmin_s < master_tokens_per_node_min_count) {  

        master_server_capable[*it] = master_tokens_per_node_min;  

      }  

      else {  

        master_server_capable[*it] = master_tokens_per_node_min + 1;  

        mmax_s++;  

      }  

    }  

    else {  

      if(mmax_s < master_tokens_per_node_max_count) {  

        master_server_capable[*it] = master_tokens_per_node_min + 1;  

        mmax_s++;  

      }  

      else {  

        master_server_capable[*it] = master_tokens_per_node_min;  

      }  

    }  

    i++;  

  }  

}  

 

4) 计算每一个节点预期可存储bucket的数量;

有了1) ~ 3)步的数据,现在可以计算每一个节点还可以存储的bucket的数量(实现中用负数的绝对值表示)。

5) 确定每一个bucket的存储位置

首先,为master bucket安排存储节点。对于每一个master bucket当前所在的节点X,检查X节点是否满足要求(每一种策略,需要满足的要求也 不相同, 后文叙述)。若不满足要求,则在该bucket的copyCount -1 个slave bucket中选择一个bucket, 将该bucket作为master bucket(这里有可能选择不到,即时存在slave bucket, 也无法选择。因为,要保证节点上的总master
bucket的数量不要超过一个值); 若没有找到,则需要在alive的节点列表中,选择一个合适的节点S。将S节点作为这个master bucket的存储节点。更新统计数据。

其次,为slave bucket 安排存储节点。具体步骤为,首先检查bucket当前的存储节点是否满足要求(若该节点down掉了,显然不满足要求了)。若不满足要求,为其选择一个合适的节点,将该节点作为bucket的存储节点。更新统计数据。

经过上述5)个步骤,已经构造出了完整的对照表了。具体代码如下:

[cpp] view
plain copy

  

//return value 0 build error  1 ok 2 quick build ok  

int table_builder::rebuild_table(const hash_table_type & hash_table_source,  

    hash_table_type & hash_table_result, bool no_quick_table)  

{  

  

   //步骤1)   

  init_token_count(tokens_count_in_node);  

  init_token_count(tokens_count_in_node_now);  

  init_token_count(mtokens_count_in_node);  

  max_count_now = 0;  

  bool need_build_quick_table = false;  

  

  // compute node count: tokens_count_in_node, mtokens_count_in_node  

  for(hash_table_type::const_iterator it = hash_table_source.begin();  

      it != hash_table_source.end(); it++) {  

    const hash_table_line_type & line = it->second;  

    for(uint32_t i = 0; i < bucket_count; i++) {  

      update_node_count(line[i], tokens_count_in_node);  

      log_debug("line[%d]: %d, server: %s, count: %d",  

          i, it->first, tbsys::CNetUtil::addrToString(line[i].first).c_str(), line[i].second);  

      if(it->first == 0) {  

        if(update_node_count(line[i], mtokens_count_in_node) == false) {  

          // almost every time this will happen  

          need_build_quick_table = true;  

        }     

      }     

    }     

  }  

  

  //步骤2)  

  //init count_server, mcount_server  

  build_index(tokens_count_in_node, count_server);  

  build_index(mtokens_count_in_node, mcount_server);  

  

  //步骤3)  

  caculate_capable();  

  

  //步骤4)  

  //init mcandidate_node, scandidate_node  

  init_candidate(mcandidate_node, &master_server_capable, &mcount_server);  

  init_candidate(scandidate_node, &server_capable, &count_server);  

  hash_table_result = hash_table_source;  

  // no_quick_table is always true, will not reach this branch  

  if(need_build_quick_table && !no_quick_table) {  

    if(build_quick_table(hash_table_result)) {  

      return BUILD_QUICK;  

    }     

    log_error("build quick table fail");  

    return BUILD_ERROR;  

  }  

  if(available_server.size() < copy_count) {  

    log_error("rebuild table fail, available size: %u, copy count: %u", available_server.size(), copy_count);  

    return BUILD_ERROR;  

  }  

  //  

  //we will check master first, then other slaves  

  /////////////////////////////////////////////////////////////////////////////////////  

  //checke every node and find out the bad one  

  //a good one must  

  //    1 the buckets this one charge of must not large than tokenPerNode ;  

  //    2 the master buckets this one charge of must not large than masterTokenPerNode;  

  //    3 copys of a same bucket must seperated store in different data server  

  /////////////////////////////////////////////////////////////////////////////////////  

  int i = 0;  

  

  //以下是步骤5)  

  for(hash_table_type::iterator it = hash_table_result.begin();  

      it != hash_table_result.end() && i < 2; it++, i++) {  

    int line_num_out = it->first;  

    for(uint32_t node_idx = 0; node_idx != bucket_count; node_idx++) {  

      for(uint32_t line_num = line_num_out; line_num < copy_count;  

          line_num++) {  

        if(line_num_out == 0 && line_num != 0)  

          continue;  

        int change_type = 0;        //not need migrate  

        server_id_type node_id = hash_table_result[line_num][node_idx];  

        int consider = CONSIDER_ALL;  

        if(line_num_out == 0)  

          consider = CONSIDER_BASE;  

        change_type =  

          is_this_node_OK(node_id, line_num, node_idx, hash_table_result,  

              consider, true);  

        log_debug("change type: %d", change_type);  

        if(change_type == INVALID_NODE) {  

          node_id.first = INVALID_FLAG;  

        }  

  

        if(change_type != 0) {  

          if(line_num == 0) {  

            if(change_master_node(node_idx, hash_table_result, false)) {  

              continue;  

            }  

          }  

          server_id_type old_node = hash_table_result[line_num][node_idx];  

          invaliad_node(line_num, node_idx, hash_table_result);  

          server_id_type suitable_node =  

            get_suitable_node(line_num, node_idx, hash_table_result,  

                old_node);  

          if(suitable_node.first == INVALID_FLAG) {  

            log_error("I am give up, why this happend?\n");  

            return BUILD_ERROR;  

          }  

          update_node(line_num, node_idx, suitable_node,  

              hash_table_result);  

          node_id = suitable_node;  

        }  

        int token_per_node_min = get_tokens_per_node(node_id);  

        if(++tokens_count_in_node_now[node_id] == token_per_node_min + 1) {  

          max_count_now++;  

        }  

        log_debug("token_per_node_min: %d, max_count_now: %d, tokens_count_in_node_now[%s]: %d\n",  

            token_per_node_min, max_count_now,  

            tbsys::CNetUtil::addrToString(node_id.first).c_str(), tokens_count_in_node_now[node_id]);  

      }  

    }  

    if(it == hash_table_result.begin()) {  

      log_debug("first line ok");  

    }  

  }  

  return BUILD_OK;  

}  

2.5.3 位置优先策略构建对照表

后续明天补上吧,今天实在不想写了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: