Swift源码解析之分配Partition
2018-03-30 18:47
603 查看
执行swift-ring-build的rebalance命令,会首先调用下面函数
def rebalance(): def get_seed(index): try: return argv[index] except IndexError: pass devs_changed = builder.devs_changed try: last_balance = builder.get_balance() #调用RingBuilder类的rebalance()函数 parts, balance = builder.rebalance(seed=get_seed(3)) except exceptions.RingBuilderError as e: print '-' * 79 print("An error has occurred during ring validation. Common\n" "causes of failure are rings that are empty or do not\n" "have enough devices to accommodate the replica count.\n" "Original exception message:\n %s" % e.message ) print '-' * 79 exit(EXIT_ERROR) if not parts: #没有partiton需要移动的情况有两种,一种是经过rebalance的计算,的确没有partition需要移动。 #另外一种是由于min_part_hours参数的限制,在min_part_hours时间内只允许移动一个partition。 print 'No partitions could be reassigned.' print 'Either none need to be or none can be due to ' \ 'min_part_hours [%s].' % builder.min_part_hours exit(EXIT_WARNING) if not devs_changed and abs(last_balance - balance) < 1 and \ not (last_balance == MAX_BALANCE and balance == MAX_BALANCE): print 'Cowardly refusing to save rebalance as it did not change ' \ 'at least 1%.' exit(EXIT_WARNING) try: #验证生成的Ring的一致性 builder.validate() except exceptions.RingValidationError as e: print '-' * 79 print("An error has occurred during ring validation. Common\n" "causes of failure are rings that are empty or do not\n" "have enough devices to accommodate the replica count.\n" "Original exception message:\n %s" % e.message ) print '-' * 79 exit(EXIT_ERROR) print 'Reassigned %d (%.02f%%) partitions. Balance is now %.02f.' % \ (parts, 100.0 * parts / builder.parts, balance) status = EXIT_SUCCESS #如果balance的值大于5,这意味着此时的Ring处于一个非常不稳定的状态。 #需要在min_part_hours时间之后,再次rebalance if balance > 5: print '-' * 79 print 'NOTE: Balance of %.02f indicates you should push this ' % \ balance print ' ring, wait at least %d hours, and rebalance/repush.' \ % builder.min_part_hours print '-' * 79 status = EXIT_WARNING ts = time() #将builder文件和reblance之后生成的ring文件存放到backup目录下。 builder.get_ring().save( pathjoin(backup_dir, '%d.' % ts + basename(ring_file))) builder.save(pathjoin(backup_dir, '%d.' % ts + basename(argv[1]))) builder.get_ring().save(ring_file) builder.save(argv[1]) exit(status)这个函数首先调用swift.common.ring.builder.RingBuilder类的get_balance()函数来获取当前Ring的balance值,这个值标识了一个Ring的平衡程度,也就是健康状态,这个值越高表示这个Ring的balance越需要rebalance。一个健康的Ring的balance的值应该是0Ring的balance的值取决于所有device的balance值,一个device的balance的值指的是超过这个device所希望接纳的partition个数的partition的数量,除以该device所希望接纳的partition的个数,然后乘以100.比如一个device所希望接纳partition个数是123,结果现在它接纳了124个partition,那么这个device的balance的值为(124-123)/123*100=0.83。在这个Ring中,取所有device的balance的值的最大值作为该Ring的balance值。如果Ring没有device的变化(添加或删除),并且rebalance之前和之后的balance的值相差小于1,则认为该Ring不需要rebalance,不会生成新的ring文件。rebalance命令的实际工作仍是由swift.common.ring.builder.RingBuilder类的rebalance()来完成。
#swift/common/ring/builder.py def rebalance(self, seed=None): """ 这是Ringbuild的主要功能函数,它会根据设备权重、zone的信息(尽可能地将partition的副本分配到不在一个zone的设备上), 以及近期的分配情况等信息,重新对partition进行分配。 这个函数并不是partition分配的最佳算法(最佳算法会进行更多的分析从而占用更多的时间) 此函数会一直做rebalance操作直到这个Ring的balance值小于1%,或者balance的值变化小于1% """ old_replica2part2dev = copy.deepcopy(self._replica2part2dev) if seed is not None: random.seed(seed) self._ring = None if self._last_part_moves_epoch is None: #对于新创建的Ring执行_initial_balance() self._initial_balance() self.devs_changed = False return self.parts, self.get_balance() changed_parts = 0 self._update_last_part_moves() last_balance = 0 #函数_adjust_replica2part2dev_size()的主要功能是调整设备查询表 #也就是_replica2part2dev数组,使其大小和维度调整为和当前的replicas数量以及partition数量一致 #并且返回需要新添加的partition与replicas列表 new_parts, removed_part_count = self._adjust_replica2part2dev_size() changed_parts += removed_part_count if new_parts or removed_part_count: self._set_parts_wanted() self._reassign_parts(new_parts) changed_parts += len(new_parts) while True: reassign_parts = self._gather_reassign_parts() self._reassign_parts(reassign_parts) changed_parts += len(reassign_parts) while self._remove_devs: self.devs[self._remove_devs.pop()['id']] = None balance = self.get_balance() if balance < 1 or abs(last_balance - balance) < 1 or \ changed_parts == self.parts: break last_balance = balance self.devs_changed = False self.version += 1 changed_parts = 0 for rep_id, _rep in enumerate(self._replica2part2dev): for part_id, new_device in enumerate(_rep): # IndexErrors will be raised if the replicas are increased or # decreased, and that actually means the partition has changed try: old_device = old_replica2part2dev[rep_id][part_id] except IndexError: changed_parts += 1 continue if old_device != new_device: changed_parts += 1 return changed_parts, balance如果是新创建的Ring,那么控制逻辑进入到_initial_balance()函数中。如果不是新创建的Ring,会直接调用_reassign_parts(),反复对partitions进行重新分配,直到balance值小于1为止(或者两次rebalance操作得到的balance值的变化小于1%,或者所有的partition都已经移动过一次)。新建Ring和非新建Ring的rebalance操作过程类似,区别只是在于新建Ring的情况在_initial_balance()函数里做了初始化工作。
def _initial_balance(self): """ Initial partition assignment is the same as rebalancing an existing ring, but with some initial setup beforehand. """ self._last_part_moves = array('B', (0 for _junk in xrange(self.parts))) self._last_part_moves_epoch = int(time()) self._set_parts_wanted() self._reassign_parts(self._adjust_replica2part2dev_size()[0])对于新建Ring的情况,_adjust_replica2part2dev_size()所返回的tuple的第一个值是一个这样的数组,该数组的每一个元素是一个(partition,replica)的tuple,表示该partition的replica需要被重新分配到device上去。_reassign_parts()函数以上面返回的数组为参数进行分配。_reassign_parts()首先将设备按照每个设备还想接收多少个partiton(根据weight)来排队,并且在rebalance的过程中一直按照这个标准队列,然后遍历整个partition的列表,将partition分配到想接收partition最多的设备上,并且同时保证这个partition的副本之间距离最远。所谓距离最远指的是副本之间尽可能地在不同的region里面,如果一定要在同一个region里面,尽可能地分配到不同zone里面,如果没有满足的情况,就尽可能地分配到具有不同的(IP地址、端口)的设备上。Rebalance()最后将新生成的Ring文件保存以及备份,整个Ring生成过程就完成了。Ring rebalance的过程仅仅是生成不同的Ring文件,也就是修改partition到device的映射,最终partition的移动是由Replicator进程来完成的。参考https://blog.csdn.net/wytdahu/article/details/45672195
相关文章推荐
- ReactiveSwift源码解析(十) Lifetime代码实现
- 第31课: Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- ReactiveSwift源码解析(六) SignalProtocol的take(first)与collect()延展实现
- ReactiveSwift源码解析(十一) Atomic的代码实现以及其中的Defer延迟、Posix互斥锁、递归锁
- ReactiveSwift源码解析(一) Event与Observer代码实现
- Redis源码解析:26集群(二)键的分配与迁移
- ReactiveSwift源码解析(三) Signal代码的基本实现
- [APR源码解析]内存分配current_free_index与max_free_index的作用
- Codis源码解析——slot的分配
- GlusterFS源码解析 —— GlusterFS 内存分配方式
- OpenStack Swift源码分析(4)----swift-ring-builder源代码解析之一
- Spark资源调度分配内幕解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- [图解tensorflow源码] [转载] tensorflow设备内存分配算法解析 (BFC算法)
- Swift源码解析之创建Ring文件
- Hadoop源码解析之申请与分配Container
- Swift源码解析之添加设备到Ring
- ReactiveSwift源码解析(九) SignalProducerProtocol延展中的Start、Lift系列方法的代码实现
- [Spark内核] 第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- ReactiveSwift源码解析(五) SignalProtocol的observe()、Map、Filter延展实现
- ReactiveSwift源码解析(十二) MutableProperty基本代码实现