Codis源码解析——slot的分配
2017-08-09 20:30
471 查看
上一篇我们给codis集群中添加了codis-server,接下来就是把1024个slot分配给每个codis-server。Codis给我们提供了多种方式,可以将指定序号的slot移到某个group,也可以将一个group中的多少个slot移动到另一个group。不过最方便的方式就是通过自动rebalance。
首先看一下Slot的结构,可以看到,每个Slot都分配了其所属的BackendAddr。了解结构之后,我们就大概猜到Slot分配过程中需要做什么了
那么,这个自动rebalance的过程是怎么样的呢?我们本地有两个group,就以这两个group为例进行说明
最后一步zk更新路径之后,我们就可以在zk中看到slot被挂到了相应的group下
说明
如有转载,请注明出处
http://blog.csdn.net/antony9118/article/details/77016335
首先看一下Slot的结构,可以看到,每个Slot都分配了其所属的BackendAddr。了解结构之后,我们就大概猜到Slot分配过程中需要做什么了
type Slot struct { id int lock struct { hold bool sync.RWMutex } refs sync.WaitGroup switched bool //migrate表示从何处迁移 backend, migrate struct { id int bc *sharedBackendConn } replicaGroups [][]*sharedBackendConn method forwardMethod }
1 自动rebalance
那么,这个自动rebalance的过程是怎么样的呢?我们本地有两个group,就以这两个group为例进行说明
//第一次进入的时候传入的confirm是false,因为只是指定rebalance的plan。当页面上弹窗问是否迁移的时候,点了OK,又会进入这个方法,传入confirm为true func (s *Topom) SlotsRebalance(confirm bool) (map[int]int, error) { s.mu.Lock() defer s.mu.Unlock() //获取上下文,这个步骤前面几篇都有了,就不再赘述了 ctx, err := s.newContext() if err != nil { return nil, err } var groupIds []int for _, g := range ctx.group { if len(g.Servers) != 0 { groupIds = append(groupIds, g.Id) } } //升序排序,结果是[1,2] sort.Ints(groupIds) if len(groupIds) == 0 { return nil, errors.Errorf("no valid group could be found") } var ( //已分配好,不需要再迁移的,键是groupId,值是当前group不需迁移的slot的数量 assigned = make(map[int]int) //等待分配的,键是groupId,值是当前group等待分配的slot的id组成的切片 pendings = make(map[int][]int) //可以迁出的,键是groupId,值是当前group可以迁出的slot的数量。如果是负数,就表明当前group需要迁入多少个slot moveout = make(map[int]int) //确定要迁移的slot,int切片,每个元素就是确定要迁移的slot的id docking []int ) //计算某个group中槽的数量的方法,固定属于该组的,加上待分配的,再减去要迁移出去的 var groupSize = func(gid int) int { return assigned[gid] + len(pendings[gid]) - moveout[gid] } //如果槽的Action.State等于空字符串,才可以迁移。否则不能迁移 for _, m := range ctx.slots { if m.Action.State != models.ActionNothing { //如果这个槽处于迁移的过程中,就直接归属于targetId的组 assigned[m.Action.TargetId]++ } } //这里是1024/2=512 var lowerBound = MaxSlotNum / len(groupIds) //遍历槽,如果槽所属的group的size小于512,这个槽也不需要迁移 for _, m := range ctx.slots { if m.Action.State != models.ActionNothing { continue } if m.GroupId != 0 { if groupSize(m.GroupId) < lowerBound { assigned[m.GroupId]++ } else { //表示当前槽可以等待分配 pendings[m.GroupId] = append(pendings[m.GroupId], m.Id) } } } //传入一个自定义的比较器,新建红黑树。这个比较器的结果是,红黑树中左边的节点的group size小于右边的 var tree = rbtree.NewWith(func(x, y interface{}) int { var gid1 = x.(int) var gid2 = y.(int) if gid1 != gid2 { if d := groupSize(gid1) - groupSize(gid2); d != 0 { return d } return gid1 - gid2 } return 0 }) for _, gid := range groupIds { tree.Put(gid, nil) } //将不属于任何group和Action.State为""的slot(被称为offline的slot,初始阶段所有slot都是offline的),分配给目前size最小的group for _, m := range ctx.slots { if m.Action.State != models.ActionNothing { continue } if m.GroupId != 0 { continue } //得到整个树最左边节点的键,也就是size最小的group的id dest := tree.Left().Key.(int) tree.Remove(dest) //当前节点要进行迁移 docking = append(docking, m.Id) moveout[dest]-- tree.Put(dest, nil) } //在我们这个例子里面也是512。如果是上限,9999个group,这个值就是1 var upperBound = (MaxSlotNum + len(groupIds) - 1) / len(groupIds) // 当集群中group的数量大于2(上限是9999),红黑树的rebalance。在group size差距最大的两个组之间做迁移准备工作 //from和dest分别是红黑树最左和最右的两个group,换句话说,slot之间的补给传递,都是先比较当前groupsize最大的和最小的组 for tree.Size() >= 2 { //group size最大的groupId from := tree.Right().Key.(int) tree.Remove(from) if len(pendings[from]) == moveout[from] { continue } dest := tree.Left().Key.(int) tree.Remove(dest) var ( fromSize = groupSize(from) destSize = groupSize(dest) ) if fromSize <= lowerBound { break } if destSize >= upperBound { break } if d := fromSize - destSize; d <= 1 { break } moveout[from]++ moveout[dest]-- tree.Put(from, nil) tree.Put(dest, nil) } //moveout的键值对分别是1和2,值都是-512。表明两个group都需要迁入512个slot for gid, n := range moveout { if n < 0 { continue } if n > 0 { sids := pendings[gid] sort.Sort(sort.Reverse(sort.IntSlice(sids))) docking = append(docking, sids[0:n]...) pendings[gid] = sids[n:] } delete(moveout, gid) } //docking升序排列,结果是0到1023 sort.Ints(docking) //键是slot的id,值是这个slot要迁移到的group的id var plans = make(map[int]int) //找到需要迁入slot的group,也就是moveout[gid]为负数的group,从docking中的第一个元素开始迁移到这个group for _, gid := range groupIds { var in = -moveout[gid] for i := 0; i < in && len(docking) != 0; i++ { plans[docking[0]] = gid //docking去除刚刚分配了的首元素 docking = docking[1:] } } if !confirm { return plans, nil } //只有弹窗点击OK,方法才会走到这里。现在开始执行plan中的规划 var slotIds []int for sid, _ := range plans { slotIds = append(slotIds, sid) } sort.Ints(slotIds) for _, sid := range slotIds { m, err := ctx.getSlotMapping(sid) if err != nil { return nil, err } defer s.dirtySlotsCache(m.Id) m.Action.State = models.ActionPending //每一个Slot的Action.Index都是其slotId+1 m.Action.Index = ctx.maxSlotActionIndex() + 1 m.Action.TargetId = plans[sid] //这里就是在zk中更新路径 if err := s.storeUpdateSlotMapping(m); err != nil { return nil, err } } return plans, nil } func (ctx *context) getSlotMapping(sid int) (*models.SlotMapping, error) { if len(ctx.slots) != MaxSlotNum { return nil, errors.Errorf("invalid number of slots = %d/%d", len(ctx.slots), MaxSlotNum) } if sid >= 0 && sid < MaxSlotNum { return ctx.slots[sid], nil } return nil, errors.Errorf("slot-[%d] doesn't exist", sid) } type SlotMapping struct { Id int `json:"id"` GroupId int `json:"group_id"` Action struct { Index int `json:"index,omitempty"` State string `json:"state,omitempty"` TargetId int `json:"target_id,omitempty"` } `json:"action"` } const ( ActionNothing = "" ActionPending = "pending" ActionPreparing = "preparing" ActionPrepared = "prepared" ActionMigrating = "migrating" ActionFinished = "finished" ActionSyncing = "syncing" )
最后一步zk更新路径之后,我们就可以在zk中看到slot被挂到了相应的group下
2 手动迁移
另外两种迁移槽的方式,即我们在开头说过的,一是指定序号的slot移到某个group,二是将一个group中的多少个slot移动到另一个group,主要流程类似,先取出当前集群的上下文,然后根据请求参数做校验,将符合迁移条件的slot放到一个pending切片里面,接下去更新zk,就不专门做介绍了。说明
如有转载,请注明出处
http://blog.csdn.net/antony9118/article/details/77016335
相关文章推荐
- Codis源码解析——处理slot操作(1)
- Codis源码解析——处理slot操作(2)
- Codis源码解析——proxy的启动
- Swift源码解析之分配Partition
- Codis源码解析——proxy添加到集群
- codis-ha源码解析
- Codis源码解析——dashboard的启动(1)
- Codis源码解析——dashboard的启动(2)
- Redis源码解析:26集群(二)键的分配与迁移
- Codis源码解析——sentinel的重同步(1)
- Codis源码解析——codis-server添加到集群
- 分布式缓存技术redis学习系列(八)——JedisCluster源码解读:集群初始化、slot(槽)的分配、值的存取
- Hadoop源码解析之申请与分配Container
- [Spark内核] 第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- JedisCluster源码解读:集群初始化、slot(槽)的分配、值的存取
- Codis源码解析——fe的启动
- Codis源码解析——proxy监听redis请求
- [图解tensorflow源码] [转载] tensorflow设备内存分配算法解析 (BFC算法)
- Codis源码解析——sentinel的重同步(2)
- Spark资源调度分配内幕解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结