您的位置:首页 > 数据库

NSQ系列之nsqlookupd代码分析四(详解nsqlookupd中的RegitrationDB)

2015-09-02 03:03 561 查看

NSQ系列之nsqlookupd代码分析四(详解nsqlookupd中的RegitrationDB操作方法)

上一章我们大致了解了
nsqlookupd
tcpServer
中的
IOLoop
协议的处理逻辑,里面有提到一个存储
nsqd
PeerInfo
以及
topic
channel
数据信息的
RegitrationDB
的一些操作方法。今天我们就来讲解一下关于
RegitrationDB
的操作方法

废话不多说,直接上代码吧(代码位于nsq/nsqlookupd/regitration_db.go这个文件中)

type RegistrationDB struct {
sync.RWMutex                               //读写锁用于并发操作
registrationMap map[Registration]Producers //定义一个一Regitration为key producer指针的slice为value的map
}

type Registration struct {
Category string
Key      string
SubKey   string
}
type Registrations []Registration

//用于记录client相关信息
type PeerInfo struct {
lastUpdate       int64  //client 心跳包最后接收时间
id               string //client remote address
RemoteAddress    string `json:"remote_address"`
Hostname         string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TCPPort          int    `json:"tcp_port"`
HTTPPort         int    `json:"http_port"`
Version          string `json:"version"`
}

type Producer struct {
//PeerInfo指针
peerInfo     *PeerInfo
tombstoned   bool
tombstonedAt time.Time
}

type Producers []*Producer

//实现String接口,打印出producer信息
func (p *Producer) String() string {
return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
}

//producer标记为tombstoned 并记录当前时间
func (p *Producer) Tombstone() {
p.tombstoned = true
p.tombstonedAt = time.Now()
}

//判断producer是否是tombstoned
func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
}

//初始化一个RegistrationDB
func NewRegistrationDB() *RegistrationDB {
return &RegistrationDB{
registrationMap: make(map[Registration]Producers),
}
}

// add a registration key
//添加一个Registration key 如果不存在Map中则将其设置为你一个空的Producer
func (r *RegistrationDB) AddRegistration(k Registration) {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = Producers{}
}
}

// add a producer to a registration
//添加一个producer到registration中
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
r.Lock()
defer r.Unlock()
producers := r.registrationMap[k]
found := false
for _, producer := range producers {
if producer.peerInfo.id == p.peerInfo.id {
found = true
}
}
if found == false {
r.registrationMap[k] = append(producers, p)
}
return !found
}

// remove a producer from a registration
//移除registration中一个producer
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
r.Lock()
defer r.Unlock()
producers, ok := r.registrationMap[k]
if !ok {
return false, 0
}
removed := false
//这里用到里从一个slice中删除一个元素的方法
cleaned := Producers{}
for _, producer := range producers {
if producer.peerInfo.id != id {
cleaned = append(cleaned, producer)
} else {
removed = true
}
}
// Note: this leaves keys in the DB even if they have empty lists
r.registrationMap[k] = cleaned
//返货是否移除以及新的producers长度
return removed, len(cleaned)
}

// remove a Registration and all it's producers
//删除registration下所有的producers
func (r *RegistrationDB) RemoveRegistration(k Registration) {
r.Lock()
defer r.Unlock()
delete(r.registrationMap, k)
}

func (r *RegistrationDB) needFilter(key string, subkey string) bool {
return key == "*" || subkey == "*"
}

//根据category key subkey查找Registrations
//如果传入的key 或 subkey为*的话则获取所有的registrationMap中所有的registration
//如果key 或 subkey 不为* 的话则 获取具体的registration
//这里实现了类似 * 这个通配符的概念
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
if _, ok := r.registrationMap[k]; ok {
return Registrations{k}
}
return Registrations{}
}
results := Registrations{}
for k := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
results = append(results, k)
}
return results
}

//根据category key subkey查找所有的Producer
//同上面的FindRegistrations函数一样,实现了*通配符的概念
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
return r.registrationMap[k]
}

results := Producers{}
for k, producers := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
for _, producer := range producers {
found := false
for _, p := range results {
if producer.peerInfo.id == p.peerInfo.id {
found = true
}
}
if found == false {
results = append(results, producer)
}
}
}
return results
}

//根据producer.peerInfo.id查找所属的registration key
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
r.RLock()
defer r.RUnlock()
results := Registrations{}
for k, producers := range r.registrationMap {
for _, p := range producers {
if p.peerInfo.id == id {
results = append(results, k)
break
}
}
}
return results
}

//依据Registration中的category key subkey,判断是否与Registration匹配
func (k Registration) IsMatch(category string, key string, subkey string) bool {
if category != k.Category {
return false
}
if key != "*" && k.Key != key {
return false
}
if subkey != "*" && k.SubKey != subkey {
return false
}
return true
}

//根据category key subkey过滤Registrations
func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
output := Registrations{}
for _, k := range rr {
if k.IsMatch(category, key, subkey) {
output = append(output, k)
}
}
return output
}

//获取registrationMap中所有Registration的key
func (rr Registrations) Keys() []string {
keys := make([]string, len(rr))
for i, k := range rr {
keys[i] = k.Key
}
return keys
}

//获取registrationMap中所有Registration的subkey
func (rr Registrations) SubKeys() []string {
subkeys := make([]string, len(rr))
for i, k := range rr {
subkeys[i] = k.SubKey
}
return subkeys
}

//过滤出所有可用的Producer
func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
now := time.Now()
results := Producers{}
for _, p := range pp {
cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
continue
}
results = append(results, p)
}
return results
}

//获取Producers中所有的PeerInfo
func (pp Producers) PeerInfo() []*PeerInfo {
results := []*PeerInfo{}
for _, p := range pp {
results = append(results, p.peerInfo)
}
return results
}

通过上面代码的分析我们不难看出
registration_db.go
文件用
map
的形式保存
Producer
,并提供一系列增、删、改、查的操作。同时使用
RWMutex
做并发控制。

到这里我们讲解了
nsqlookupd
tcpServer
的全部代码了,我们了解了
nsqlookupd
是用来发现并记录
nsqd
服务相关的remot address
tcp
端口
http
端口等信息 以及 相应的
topic
channel
信息的功能,这样好方便消费查询相应的
topic
channel
nsqd
服务链接信息,已实现对
nsqd
进行拓扑管理的功能。

下一章我们开始分析
nsqlookupd
中的
httpServer
相关的代码

PS:顺便附送前面三章的传送门

NSQ系列之nsqlookupd代码分析一(初探nsqlookup)

NSQ系列之nsqlookupd代码分析二(初识nsqlookupd tcpServer)

NSQ系列之nsqlookupd代码分析三(详解tcpServer 中的IOLoop方法)

PS:如果文中有错误,欢迎大家指正哦。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  nsq nsqd nsqlookupd golang