您的位置:首页 > 移动开发 > Swift

Swift源码解析之分配Partition

2018-03-30 18:47 603 查看
执行swift-ring-build的rebalance命令,会首先调用下面函数   
def rebalance():
def get_seed(index):
try:
return argv[index]
except IndexError:
pass
devs_changed = builder.devs_changed
try:
last_balance = builder.get_balance()
#调用RingBuilder类的rebalance()函数
parts, balance = builder.rebalance(seed=get_seed(3))
except exceptions.RingBuilderError as e:
print '-' * 79
print("An error has occurred during ring validation. Common\n"
"causes of failure are rings that are empty or do not\n"
"have enough devices to accommodate the replica count.\n"
"Original exception message:\n %s" % e.message
)
print '-' * 79
exit(EXIT_ERROR)
if not parts:
#没有partiton需要移动的情况有两种,一种是经过rebalance的计算,的确没有partition需要移动。
#另外一种是由于min_part_hours参数的限制,在min_part_hours时间内只允许移动一个partition。
print 'No partitions could be reassigned.'
print 'Either none need to be or none can be due to ' \
'min_part_hours [%s].' % builder.min_part_hours
exit(EXIT_WARNING)
if not devs_changed and abs(last_balance - balance) < 1 and \
not (last_balance == MAX_BALANCE and balance == MAX_BALANCE):
print 'Cowardly refusing to save rebalance as it did not change ' \
'at least 1%.'
exit(EXIT_WARNING)
try:
#验证生成的Ring的一致性
builder.validate()
except exceptions.RingValidationError as e:
print '-' * 79
print("An error has occurred during ring validation. Common\n"
"causes of failure are rings that are empty or do not\n"
"have enough devices to accommodate the replica count.\n"
"Original exception message:\n %s" % e.message
)
print '-' * 79
exit(EXIT_ERROR)
print 'Reassigned %d (%.02f%%) partitions. Balance is now %.02f.' % \
(parts, 100.0 * parts / builder.parts, balance)
status = EXIT_SUCCESS
#如果balance的值大于5,这意味着此时的Ring处于一个非常不稳定的状态。
#需要在min_part_hours时间之后,再次rebalance
if balance > 5:
print '-' * 79
print 'NOTE: Balance of %.02f indicates you should push this ' % \
balance
print ' ring, wait at least %d hours, and rebalance/repush.' \
% builder.min_part_hours
print '-' * 79
status = EXIT_WARNING
ts = time()
#将builder文件和reblance之后生成的ring文件存放到backup目录下。
builder.get_ring().save(
pathjoin(backup_dir, '%d.' % ts + basename(ring_file)))
builder.save(pathjoin(backup_dir, '%d.' % ts + basename(argv[1])))
builder.get_ring().save(ring_file)
builder.save(argv[1])
exit(status)
这个函数首先调用swift.common.ring.builder.RingBuilder类的get_balance()函数来获取当前Ring的balance值,这个值标识了一个Ring的平衡程度,也就是健康状态,这个值越高表示这个Ring的balance越需要rebalance。一个健康的Ring的balance的值应该是0Ring的balance的值取决于所有device的balance值,一个device的balance的值指的是超过这个device所希望接纳的partition个数的partition的数量,除以该device所希望接纳的partition的个数,然后乘以100.比如一个device所希望接纳partition个数是123,结果现在它接纳了124个partition,那么这个device的balance的值为(124-123)/123*100=0.83。在这个Ring中,取所有device的balance的值的最大值作为该Ring的balance值。如果Ring没有device的变化(添加或删除),并且rebalance之前和之后的balance的值相差小于1,则认为该Ring不需要rebalance,不会生成新的ring文件。rebalance命令的实际工作仍是由swift.common.ring.builder.RingBuilder类的rebalance()来完成。
#swift/common/ring/builder.py
def rebalance(self, seed=None):
"""
这是Ringbuild的主要功能函数,它会根据设备权重、zone的信息(尽可能地将partition的副本分配到不在一个zone的设备上),
以及近期的分配情况等信息,重新对partition进行分配。
这个函数并不是partition分配的最佳算法(最佳算法会进行更多的分析从而占用更多的时间)
此函数会一直做rebalance操作直到这个Ring的balance值小于1%,或者balance的值变化小于1%
"""
old_replica2part2dev = copy.deepcopy(self._replica2part2dev)
if seed is not None:
random.seed(seed)
self._ring = None
if self._last_part_moves_epoch is None:
#对于新创建的Ring执行_initial_balance()
self._initial_balance()
self.devs_changed = False
return self.parts, self.get_balance()
changed_parts = 0
self._update_last_part_moves()
last_balance = 0
#函数_adjust_replica2part2dev_size()的主要功能是调整设备查询表
#也就是_replica2part2dev数组,使其大小和维度调整为和当前的replicas数量以及partition数量一致
#并且返回需要新添加的partition与replicas列表
new_parts, removed_part_count = self._adjust_replica2part2dev_size()
changed_parts += removed_part_count
if new_parts or removed_part_count:
self._set_parts_wanted()
self._reassign_parts(new_parts)
changed_parts += len(new_parts)
while True:
reassign_parts = self._gather_reassign_parts()
self._reassign_parts(reassign_parts)
changed_parts += len(reassign_parts)
while self._remove_devs:
self.devs[self._remove_devs.pop()['id']] = None
balance = self.get_balance()
if balance < 1 or abs(last_balance - balance) < 1 or \
changed_parts == self.parts:
break
last_balance = balance
self.devs_changed = False
self.version += 1
changed_parts = 0
for rep_id, _rep in enumerate(self._replica2part2dev):
for part_id, new_device in enumerate(_rep):
# IndexErrors will be raised if the replicas are increased or
# decreased, and that actually means the partition has changed
try:
old_device = old_replica2part2dev[rep_id][part_id]
except IndexError:
changed_parts += 1
continue
if old_device != new_device:
changed_parts += 1
return changed_parts, balance
如果是新创建的Ring,那么控制逻辑进入到_initial_balance()函数中。如果不是新创建的Ring,会直接调用_reassign_parts(),反复对partitions进行重新分配,直到balance值小于1为止(或者两次rebalance操作得到的balance值的变化小于1%,或者所有的partition都已经移动过一次)。新建Ring和非新建Ring的rebalance操作过程类似,区别只是在于新建Ring的情况在_initial_balance()函数里做了初始化工作。
def _initial_balance(self):
"""
Initial partition assignment is the same as rebalancing an
existing ring, but with some initial setup beforehand.
"""
self._last_part_moves = array('B', (0 for _junk in xrange(self.parts)))
self._last_part_moves_epoch = int(time())
self._set_parts_wanted()
self._reassign_parts(self._adjust_replica2part2dev_size()[0])
对于新建Ring的情况,_adjust_replica2part2dev_size()所返回的tuple的第一个值是一个这样的数组,该数组的每一个元素是一个(partition,replica)的tuple,表示该partition的replica需要被重新分配到device上去。_reassign_parts()函数以上面返回的数组为参数进行分配。_reassign_parts()首先将设备按照每个设备还想接收多少个partiton(根据weight)来排队,并且在rebalance的过程中一直按照这个标准队列,然后遍历整个partition的列表,将partition分配到想接收partition最多的设备上,并且同时保证这个partition的副本之间距离最远。所谓距离最远指的是副本之间尽可能地在不同的region里面,如果一定要在同一个region里面,尽可能地分配到不同zone里面,如果没有满足的情况,就尽可能地分配到具有不同的(IP地址、端口)的设备上。Rebalance()最后将新生成的Ring文件保存以及备份,整个Ring生成过程就完成了。Ring rebalance的过程仅仅是生成不同的Ring文件,也就是修改partition到device的映射,最终partition的移动是由Replicator进程来完成的。参考https://blog.csdn.net/wytdahu/article/details/45672195
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  swift
相关文章推荐