nsqlookupd tcpServer 命令分析
2014-08-01 17:15
225 查看
上上篇分析了,nsqlookupd 命令行参数解析和启动流程
上篇分析了,nsqlookupd 监听协程,连接处理协程的流程
这篇文章分析 tcpServer 和 httpServer 每一个都处理什么命令,基本处理过程
一、tcpServer
1、case "PING”: 健康检查
更新 client.peerInfo.lastUpdate 为 time.Now().UnixNano()
回复 “OK”
2、case "IDENTIFY”: 身份认证,client 注册(ip,port等)
此命令,每个 client 只发送一次。
server 继续从 conn 读取1个 int32,这个数字表明 body 长度
err = binary.Read(reader, binary.BigEndian, &bodyLen)
继续从 conn 读取上面获取长度的字节流,原始报文是 json 格式
peerInfo := PeerInfo{id: client.RemoteAddr().String()}
err = json.Unmarshal(body, &peerInfo)
解码成 PeerInfo;同时 验证字段是否完整,设置 lateUpdate
下一步就是要把 producer 的信息保存起来,这块涉及一个非常重要的 struct,在 NSQLookupd 对其有一个指针。
type NSQLookupd struct {
// ............
DB *RegistrationDB
}
type RegistrationDB struct {
sync.RWMutex // 锁用来协调多个协程的访问
registrationMap map[Registration]Producers // producer slice
}
// 下面这句是非常重要的注册 添加生产者,注意到 Registration 只有第一个字段有值
// 添加时会做判断,根据 peerInfo.Id,也就是 client 的 ip 地址
p.context.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo})
注意其中锁的使用,sync.RWMutex 默认初始值就可以直接使用
r.Lock()
defer r.Unlock() // 方法结束时,确保释放锁
最后一步把 nsqlookupd 主机的信息编码,发送给 client
3、case "REGISTER”: topic,channel 注册
client 首先要 IDENTITY,即身份认证
// channel 注册
key := Registration{"channel", topic, channel}
p.context.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
// topic 注册
key := Registration{"topic", topic, ""}
p.context.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
注意 key 和 client 认证时候的不同
给 client 返回 “OK”
4、case "UNREGISTER”: 注销 topic, channel
这个地方有个小细节,不明白作者为什么要创建新的 Producer slice
cleaned := make(Producers, 0)
for _, producer := range producers {
if
producer.peerInfo.id
!= id {
cleaned = append(cleaned, producer)
} else {
removed = true
}
}
// Note: this leaves keys in the DB even if they have empty lists
r.registrationMap[k] = cleaned
上篇分析了,nsqlookupd 监听协程,连接处理协程的流程
这篇文章分析 tcpServer 和 httpServer 每一个都处理什么命令,基本处理过程
一、tcpServer
1、case "PING”: 健康检查
更新 client.peerInfo.lastUpdate 为 time.Now().UnixNano()
回复 “OK”
2、case "IDENTIFY”: 身份认证,client 注册(ip,port等)
此命令,每个 client 只发送一次。
server 继续从 conn 读取1个 int32,这个数字表明 body 长度
err = binary.Read(reader, binary.BigEndian, &bodyLen)
继续从 conn 读取上面获取长度的字节流,原始报文是 json 格式
peerInfo := PeerInfo{id: client.RemoteAddr().String()}
err = json.Unmarshal(body, &peerInfo)
解码成 PeerInfo;同时 验证字段是否完整,设置 lateUpdate
下一步就是要把 producer 的信息保存起来,这块涉及一个非常重要的 struct,在 NSQLookupd 对其有一个指针。
type NSQLookupd struct {
// ............
DB *RegistrationDB
}
type RegistrationDB struct {
sync.RWMutex // 锁用来协调多个协程的访问
registrationMap map[Registration]Producers // producer slice
}
// 下面这句是非常重要的注册 添加生产者,注意到 Registration 只有第一个字段有值
// 添加时会做判断,根据 peerInfo.Id,也就是 client 的 ip 地址
p.context.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo})
注意其中锁的使用,sync.RWMutex 默认初始值就可以直接使用
r.Lock()
defer r.Unlock() // 方法结束时,确保释放锁
最后一步把 nsqlookupd 主机的信息编码,发送给 client
3、case "REGISTER”: topic,channel 注册
client 首先要 IDENTITY,即身份认证
// channel 注册
key := Registration{"channel", topic, channel}
p.context.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
// topic 注册
key := Registration{"topic", topic, ""}
p.context.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo})
注意 key 和 client 认证时候的不同
给 client 返回 “OK”
4、case "UNREGISTER”: 注销 topic, channel
这个地方有个小细节,不明白作者为什么要创建新的 Producer slice
cleaned := make(Producers, 0)
for _, producer := range producers {
if
producer.peerInfo.id
!= id {
cleaned = append(cleaned, producer)
} else {
removed = true
}
}
// Note: this leaves keys in the DB even if they have empty lists
r.registrationMap[k] = cleaned
相关文章推荐
- NSQ系列之nsqlookupd代码分析三(详解tcpServer 中的IOLoop方法)
- NSQ系列之nsqlookupd代码分析二(初识nsqlookupd tcpServer)
- nsqlookupd httpServer 命令分析
- uIP TCP Server 运行机制分析
- uIP TCP Server 运行机制分析
- Delph7中TcpClient和TcpServer用法分析
- muduo源码分析之TcpServer
- muduo源码分析--事件回调层次是怎么传递的Tcpserver Channel TcpConnection
- curl 分析tcp与ssl握手时间命令
- muduo源码分析之多线程TcpServer
- Python 写的TCP Server端口转发,可用于协议分析
- 13.从代码分析TCPServer类的机制 (副标题:create-bind-listen)
- tomcat server组件监听shutdown命令关闭服务器之源码分析
- 【运维基本功】centos6.5下巧用netstat命令的参数分析TCP连接与关闭过程,图文详解
- 11-hbase-daemon.sh start regionserver启动命令分析
- POCO中的TCPServer分析
- muduo源码分析--TcpServer
- 分析openvswitch ovs-vsctl 命令与ovsdb-server的交互数据
- 用协议分析工具学习TCP/IP
- 关于TCP/IP数据包的截取和分析