
nsq source code walkthrough (1.1): nsqd, the core of nsq

2015-07-05 17:13

A study of nsqd

Introduction

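nsqd is the core of nsq: it implements message subscription and distribution. I used to think of nsq as an outward-facing push system, but after working through the code I found that it is actually a distributed message queue. The topic->channel relationship provides broadcast, and the channel->consumer relationship distributes each channel's messages among its consumers.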

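For example, take a large push system with hundreds of millions of users. To push a message to a specific set of users, you have to look up all of those users and hand their information to the push servers. Because the user base is huge, the query may be slow, and the push servers may not be able to digest that much data handed over in one go, so the design described above has two performance bottlenecks. You could deal with them yourself using asynchronous or batched queries, but that is a lot more work.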

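There is a simpler approach: use a message queue. The database is queried in batches and each batch is published to the queue, while the push servers that subscribe to those messages take them off the queue and process them. Neither side is then constrained by the other's bottleneck: the push servers may finish processing messages before the user query has completed, or the query may have finished and the database moved on to other work while the push servers are still working through earlier messages.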

File-by-file walkthrough

apps/nsqd/nsqd.go

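This file is the entry point of nsqd. It first defines nsqd's configurable parameters so that users can configure nsqd for their own situation; the individual options are documented on the nsq website.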

func main() {
	flagSet := nsqFlagset()
	flagSet.Parse(os.Args[1:])

	rand.Seed(time.Now().UTC().UnixNano())

	// print the version and exit if -version was given
	if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
		fmt.Println(version.String("nsqd"))
		return
	}

	// register for SIGINT/SIGTERM so nsqd can shut down cleanly
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

	// load the optional TOML config file
	var cfg config
	configFile := flagSet.Lookup("config").Value.String()
	if configFile != "" {
		_, err := toml.DecodeFile(configFile, &cfg)
		if err != nil {
			log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
		}
	}
	cfg.Validate()

	// merge defaults, config file and command-line flags
	opts := nsqd.NewNSQDOptions()
	options.Resolve(opts, flagSet, cfg)
	nsqd := nsqd.NewNSQD(opts)

	nsqd.LoadMetadata()
	err := nsqd.PersistMetadata()
	if err != nil {
		log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
	}
	nsqd.Main()
	<-signalChan
	nsqd.Exit()
}


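The overall steps are the same as the startup of nsqlookupd, but before nsqd starts serving it first loads its local metadata (this may not be the first start), restoring the topics and channels that existed when nsqd was last shut down. Right after that it calls nsqd.PersistMetadata() to save the restored state, and then calls nsqd.Main() to begin processing.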

nsqd/nsqd.go

func (n *NSQD) Main() {
	var httpListener net.Listener
	var httpsListener net.Listener

	ctx := &context{n}
	// set up the TCP listener
	tcpListener, err := net.Listen("tcp", n.opts.TCPAddress)
	if err != nil {
		n.logf("FATAL: listen (%s) failed - %s", n.opts.TCPAddress, err)
		os.Exit(1)
	}
	n.Lock()
	n.tcpListener = tcpListener
	n.Unlock()
	tcpServer := &tcpServer{ctx: ctx}
	n.waitGroup.Wrap(func() {
		protocol.TCPServer(n.tcpListener, tcpServer, n.opts.Logger)
	})

	// start HTTPS if TLS is configured
	if n.tlsConfig != nil && n.opts.HTTPSAddress != "" {
		httpsListener, err = tls.Listen("tcp", n.opts.HTTPSAddress, n.tlsConfig)
		if err != nil {
			n.logf("FATAL: listen (%s) failed - %s", n.opts.HTTPSAddress, err)
			os.Exit(1)
		}
		n.Lock()
		n.httpsListener = httpsListener
		n.Unlock()
		httpsServer := &httpServer{
			ctx:         ctx,
			tlsEnabled:  true,
			tlsRequired: true,
		}
		n.waitGroup.Wrap(func() {
			http_api.Serve(n.httpsListener, httpsServer, n.opts.Logger, "HTTPS")
		})
	}
	// set up the HTTP listener
	httpListener, err = net.Listen("tcp", n.opts.HTTPAddress)
	if err != nil {
		n.logf("FATAL: listen (%s) failed - %s", n.opts.HTTPAddress, err)
		os.Exit(1)
	}
	n.Lock()
	n.httpListener = httpListener
	n.Unlock()
	httpServer := &httpServer{
		ctx:         ctx,
		tlsEnabled:  false,
		tlsRequired: n.opts.TLSRequired == TLSRequired,
	}
	n.waitGroup.Wrap(func() {
		http_api.Serve(n.httpListener, httpServer, n.opts.Logger, "HTTP")
	})

	// background loops
	n.waitGroup.Wrap(func() { n.queueScanLoop() })
	n.waitGroup.Wrap(func() { n.idPump() })
	n.waitGroup.Wrap(func() { n.lookupLoop() })
	if n.opts.StatsdAddress != "" {
		n.waitGroup.Wrap(func() { n.statsdLoop() })
	}
}


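From the Main method we can see that nsqd starts up to seven long-running loops at startup (if everything is enabled):

tcp: waits for clients connecting to nsqd

https/http: provide the HTTP API (two loops, one per listener)

n.queueScanLoop: periodically scans the channels and processes their queued messages

n.idPump: a queue that produces unique message ids

n.lookupLoop: pushes the latest state to nsqlookupd whenever nsqd's topics or channels change

n.statsdLoop: periodically reports nsqd's state to a stats daemon (only started when StatsdAddress is set)

The tcp and https/http servers are not implemented in this file, so they are left for later; the remaining loops are explained below.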

idPump

Let's start with the simplest one, n.idPump.


func (n *NSQD) idPump() {
	factory := &guidFactory{}
	lastError := time.Unix(0, 0)
	for {
		id, err := factory.NewGUID(n.opts.ID)
		if err != nil {
			now := time.Now()
			if now.Sub(lastError) > time.Second {
				// only print the error once/second
				n.logf("ERROR: %s", err)
				lastError = now
			}
			runtime.Gosched()
			continue
		}
		select {
		case n.idChan <- id.Hex():
		case <-n.exitChan:
			goto exit
		}
	}

exit:
	n.logf("ID: closing")
}


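This method is simple: it loops until the program exits, continuously producing unique ids. Because the ids are derived from the current time, generating them in one place and handing them out through a queue avoids the duplicate ids that could otherwise occur. idPump keeps writing ids to n.idChan; the send blocks while idChan is full and resumes as ids are consumed, until the program exits.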
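The pattern can be sketched in isolation as below. The id layout used here (a millisecond timestamp followed by a sequence number) is a simplification chosen for the example and is not nsqd's actual GUID format; the function and parameter names are likewise only illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// idPump produces ids on idChan until exitChan is closed.
// Ids are "<ms timestamp><sequence>" in hex, a made-up layout for this sketch.
func idPump(idChan chan<- string, exitChan <-chan struct{}) {
	var lastMs, seq int64
	for {
		now := time.Now().UnixNano() / int64(time.Millisecond)
		if now <= lastMs {
			seq++ // same (or earlier) millisecond: bump the sequence instead
		} else {
			lastMs, seq = now, 0
		}
		id := fmt.Sprintf("%012x%04x", lastMs, seq)
		select {
		case idChan <- id: // blocks while the buffer is full
		case <-exitChan:
			return
		}
	}
}

func main() {
	idChan := make(chan string, 4)
	exitChan := make(chan struct{})
	done := make(chan struct{})
	go func() {
		idPump(idChan, exitChan)
		close(done)
	}()
	for i := 0; i < 3; i++ {
		fmt.Println(<-idChan)
	}
	close(exitChan) // ask the pump to stop
	<-done          // and wait until it has
}
```

Since a single goroutine owns the timestamp and sequence state, callers never race on it; they simply receive from the channel.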

lookupLoop

for _, host := range n.opts.NSQLookupdTCPAddresses {
	n.logf("LOOKUP: adding peer %s", host)
	// create a lookupPeer, which holds the connection info for one nsqlookupd
	lookupPeer := newLookupPeer(host, n.opts.Logger, func(lp *lookupPeer) {
		ci := make(map[string]interface{})
		ci["version"] = version.Binary
		ci["tcp_port"] = n.RealTCPAddr().Port
		ci["http_port"] = n.RealHTTPAddr().Port
		ci["hostname"] = hostname
		ci["broadcast_address"] = n.opts.BroadcastAddress
		// wrap nsqd's basic info in an IDENTIFY command for nsqlookupd
		cmd, err := nsq.Identify(ci)
		if err != nil {
			lp.Close()
			return
		}
		// parse nsqlookupd's response and store it in the lookupPeer
		resp, err := lp.Command(cmd)
		if err != nil {
			n.logf("LOOKUPD(%s): ERROR %s - %s", lp, cmd, err)
		} else if bytes.Equal(resp, []byte("E_INVALID")) {
			n.logf("LOOKUPD(%s): lookupd returned %s", lp, resp)
		} else {
			err = json.Unmarshal(resp, &lp.Info)
			if err != nil {
				n.logf("LOOKUPD(%s): ERROR parsing response - %s", lp, resp)
			} else {
				n.logf("LOOKUPD(%s): peer info %+v", lp, lp.Info)
			}
		}

		// signal on syncTopicChan that this nsqlookupd is ready
		go func() {
			syncTopicChan <- lp
		}()
	})
	lookupPeer.Command(nil) // start the connection
	n.lookupPeers = append(n.lookupPeers, lookupPeer)
}


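Before the loop starts, nsqd uses the configured n.opts.NSQLookupdTCPAddresses to find and connect to every nsqlookupd, and the two sides exchange their basic information (nsqd sends the IDENTIFY payload built above and stores the peer info it gets back). All of this happens asynchronously. Once nsqd and a given nsqlookupd are able to talk to each other, syncTopicChan is used to signal that this nsqlookupd is ready.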

// for announcements, lookupd determines the host automatically
ticker := time.Tick(15 * time.Second)
for {
	select {
	case <-ticker:
		// send a heartbeat and read a response (read detects closed conns)
		for _, lookupPeer := range n.lookupPeers {
			n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer)
			cmd := nsq.Ping()
			_, err := lookupPeer.Command(cmd)
			if err != nil {
				n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
			}
		}
	case val := <-n.notifyChan:
		var cmd *nsq.Command
		var branch string

		switch val.(type) {
		case *Channel:
			// notify all nsqlookupds that a new channel exists, or that it's removed
			branch = "channel"
			channel := val.(*Channel)
			if channel.Exiting() == true {
				cmd = nsq.UnRegister(channel.topicName, channel.name)
			} else {
				cmd = nsq.Register(channel.topicName, channel.name)
			}
		case *Topic:
			// notify all nsqlookupds that a new topic exists, or that it's removed
			branch = "topic"
			topic := val.(*Topic)
			if topic.Exiting() == true {
				cmd = nsq.UnRegister(topic.name, "")
			} else {
				cmd = nsq.Register(topic.name, "")
			}
		}

		for _, lookupPeer := range n.lookupPeers {
			n.logf("LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
			_, err := lookupPeer.Command(cmd)
			if err != nil {
				n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
			}
		}
	case lookupPeer := <-syncTopicChan: // once an nsqlookupd is connected, sync nsqd's current state to it
		var commands []*nsq.Command
		// build all the commands first so we exit the lock(s) as fast as possible
		n.RLock()
		for _, topic := range n.topicMap {
			topic.RLock()
			if len(topic.channelMap) == 0 {
				commands = append(commands, nsq.Register(topic.name, ""))
			} else {
				for _, channel := range topic.channelMap {
					commands = append(commands, nsq.Register(channel.topicName, channel.name))
				}
			}
			topic.RUnlock()
		}
		n.RUnlock()

		for _, cmd := range commands {
			n.logf("LOOKUPD(%s): %s", lookupPeer, cmd)
			_, err := lookupPeer.Command(cmd)
			if err != nil {
				n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
				break
			}
		}
	case <-n.exitChan:
		goto exit
	}
}


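<-ticker: the heartbeat timer; every 15 seconds a heartbeat (PING) is sent to each nsqlookupd.

lookupPeer := <-syncTopicChan: as described above, an nsqlookupd arrives on this channel once its connection has been set up; nsqd then sends it every existing topic/channel relationship for registration.

val := <-n.notifyChan: whenever a local topic or channel is added or removed, a notification of the state change is sent to every nsqlookupd.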
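The command-building logic of the last two branches can be isolated into pure functions, which makes it easier to follow. The sketch below uses toy `Topic` and `Channel` structs and plain strings in place of `*nsq.Command`; the textual `REGISTER`/`UNREGISTER` form is only meant to show which command is chosen and is an assumption of this example, not a description of the wire encoding. A slice replaces the channel map so the output order is stable.

```go
package main

import "fmt"

// Toy stand-ins for nsqd's Channel and Topic types.
type Channel struct {
	topicName, name string
	exiting         bool
}

type Topic struct {
	name     string
	exiting  bool
	channels []string
}

// commandFor mirrors the notifyChan branch: choose REGISTER or UNREGISTER
// from the value's concrete type and its exiting state.
func commandFor(v interface{}) string {
	switch x := v.(type) {
	case *Channel:
		if x.exiting {
			return "UNREGISTER " + x.topicName + " " + x.name
		}
		return "REGISTER " + x.topicName + " " + x.name
	case *Topic:
		if x.exiting {
			return "UNREGISTER " + x.name
		}
		return "REGISTER " + x.name
	}
	return ""
}

// syncCommands mirrors the syncTopicChan branch: a topic without channels is
// registered on its own, otherwise one command is built per topic/channel pair.
func syncCommands(topics []*Topic) []string {
	var cmds []string
	for _, t := range topics {
		if len(t.channels) == 0 {
			cmds = append(cmds, "REGISTER "+t.name)
			continue
		}
		for _, c := range t.channels {
			cmds = append(cmds, "REGISTER "+t.name+" "+c)
		}
	}
	return cmds
}

func main() {
	fmt.Println(commandFor(&Channel{topicName: "events", name: "push"}))
	fmt.Println(commandFor(&Topic{name: "events", exiting: true}))
	fmt.Println(syncCommands([]*Topic{
		{name: "events", channels: []string{"push", "audit"}},
		{name: "empty"},
	}))
}
```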

n.statsdLoop

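Using the statsd settings n.opts.StatsdAddress and n.opts.StatsdPrefix, this loop periodically sends nsqd's state over a short-lived connection to a stats monitoring daemon. The data includes nsqd's process resource usage as well as information about the topics on this nsqd.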
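The article does not show the loop body, so the following is only a rough sketch of the kind of line such a loop could emit. It uses the plain-text statsd gauge format `<name>:<value>|g`; the prefix and metric name are invented for the example and are not taken from nsqd.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// gaugeLine formats one gauge in the statsd line format "<name>:<value>|g".
func gaugeLine(prefix, stat string, value int64) string {
	return fmt.Sprintf("%s%s:%d|g\n", prefix, stat, value)
}

// report writes one gauge to w; in a real loop w would be a connection
// opened to the statsd address and closed again after each round.
func report(w io.Writer, prefix, stat string, value int64) error {
	_, err := io.WriteString(w, gaugeLine(prefix, stat, value))
	return err
}

func main() {
	// os.Stdout stands in for the statsd connection.
	if err := report(os.Stdout, "nsq.host1.", "topic.events.depth", 42); err != nil {
		fmt.Println("report failed:", err)
	}
}
```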

n.queueScanLoop()

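This loop runs at a fixed interval and processes the messages queued on each channel (its in-flight and deferred queues, as queueScanWorker below shows). It is fairly complex, so I can only explain it as far as I understand the code.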

func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.opts.QueueScanSelectionCount)
	responseCh := make(chan bool, n.opts.QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.opts.QueueScanInterval)
	refreshTicker := time.NewTicker(n.opts.QueueScanRefreshInterval)

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.opts.QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
		// hand a random sample of channels to the workers
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}

		if float64(numDirty)/float64(num) > n.opts.QueueScanDirtyPercent {
			goto loop
		}
	}

exit:
	n.logf("QUEUESCAN: closing")
	close(closeCh)
	workTicker.Stop()
	refreshTicker.Stop()
}


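<-refreshTicker.C: the channel list is refreshed at a fixed interval. This means that after a channel is created on a topic, it may take a while before this loop starts scanning it. Messages published to the topic are still delivered to the channel in real time; it is only this loop's snapshot of the channel list that lags behind until the next refresh.

<-workTicker.C: if there is at least one channel, processing begins. A random sample of up to n.opts.QueueScanSelectionCount channels is put into workCh and handed to queueScanWorker. If the fraction of sampled channels that turned out to be dirty is greater than n.opts.QueueScanDirtyPercent, another sample is taken immediately; once the fraction drops to that threshold or below, the loop waits for the next tick.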
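The sample-and-repeat logic can be sketched without goroutines. In the sketch below, `uniqRands` is a stand-in written for this example (a partial Fisher-Yates shuffle) and is not claimed to match util.UniqRands; `scan` and its parameters are likewise hypothetical names. The `process` callback plays the role of a worker and reports whether the channel was dirty.

```go
package main

import (
	"fmt"
	"math/rand"
)

// uniqRands returns n distinct random indexes in [0, max) using a partial
// Fisher-Yates shuffle. It requires n <= max.
func uniqRands(n, max int) []int {
	idx := make([]int, max)
	for i := range idx {
		idx[i] = i
	}
	for i := 0; i < n; i++ {
		j := i + rand.Intn(max-i)
		idx[i], idx[j] = idx[j], idx[i]
	}
	return idx[:n]
}

// scan samples up to selectionCount channels per round and calls process on each.
// It resamples while the dirty fraction exceeds dirtyPercent and returns the
// number of rounds it ran.
func scan(numChannels, selectionCount int, dirtyPercent float64, process func(i int) bool) int {
	num := selectionCount
	if num > numChannels {
		num = numChannels
	}
	if num == 0 {
		return 0 // nothing to scan
	}
	rounds := 0
	for {
		rounds++
		numDirty := 0
		for _, i := range uniqRands(num, numChannels) {
			if process(i) {
				numDirty++
			}
		}
		if float64(numDirty)/float64(num) <= dirtyPercent {
			return rounds
		}
	}
}

func main() {
	// pending[i] is how many units of work channel i still has.
	pending := []int{3, 0, 0, 1, 0}
	rounds := scan(len(pending), 5, 0.25, func(i int) bool {
		if pending[i] == 0 {
			return false
		}
		pending[i]--
		return true
	})
	fmt.Println("rounds:", rounds, "pending:", pending)
}
```

The design trades completeness for bounded work per tick: busy periods trigger immediate rescans, while quiet periods cost only one small sample per interval.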

queueScanWorker

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
			if c.processInFlightQueue(now) {
				dirty = true
			}
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
			return
		}
	}
}


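queueScanWorker takes the channel object it receives and processes the messages that are due in its InFlight queue and its Deferred queue. dirty is true when messages in the channel were actually processed during this pass.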

resizePool

This method controls the number of queueScanWorker goroutines based on the current number of channels and n.opts.QueueScanSelectionCount.
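resizePool itself is not listed here, so the sketch below only illustrates one way to grow and shrink a worker pool with the channels queueScanWorker already uses: starting a goroutine adds a worker, and sending on the unbuffered closeCh removes exactly one. The `pool` type is hypothetical, and how the target size is derived from the channel count is deliberately left to the caller.

```go
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	size    int
	workCh  chan int
	closeCh chan int
	wg      sync.WaitGroup
}

func newPool() *pool {
	return &pool{workCh: make(chan int), closeCh: make(chan int)}
}

// worker handles work until it receives a stop signal on closeCh.
func (p *pool) worker() {
	defer p.wg.Done()
	for {
		select {
		case <-p.workCh: // real work would happen here
		case <-p.closeCh:
			return
		}
	}
}

// resize starts or stops workers one at a time until the pool has `ideal`
// of them. ideal must be >= 0.
func (p *pool) resize(ideal int) {
	for p.size != ideal {
		if p.size < ideal {
			p.wg.Add(1)
			go p.worker()
			p.size++
		} else {
			p.closeCh <- 1 // unbuffered: returns once one worker took the signal
			p.size--
		}
	}
}

func main() {
	p := newPool()
	p.resize(4)
	fmt.Println("workers:", p.size)
	p.resize(1)
	fmt.Println("workers:", p.size)
	p.resize(0)
	p.wg.Wait() // all workers have returned
}
```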

GetTopic

// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
	// return the topic directly if it already exists
	n.Lock()
	t, ok := n.topicMap[topicName]
	if ok {
		n.Unlock()
		return t
	}

	// otherwise create it
	deleteCallback := func(t *Topic) {
		n.DeleteExistingTopic(t.name)
	}
	t = NewTopic(topicName, &context{n}, deleteCallback)
	n.topicMap[topicName] = t

	n.logf("TOPIC(%s): created", t.name)

	// release our global nsqd lock, and switch to a more granular topic lock while we init our
	// channels from lookupd. This blocks concurrent PutMessages to this topic.
	t.Lock()
	n.Unlock()
	// if using lookupd, make a blocking call to get the topics, and immediately create them.
	// this makes sure that any message received is buffered to the right channels
	// (i.e. if other nsqd instances already have this topic, copy its channels locally)
	if len(n.lookupPeers) > 0 {
		channelNames, _ := lookupd.GetLookupdTopicChannels(t.name, n.lookupHTTPAddrs())
		for _, channelName := range channelNames {
			if strings.HasSuffix(channelName, "#ephemeral") {
				// we don't want to pre-create ephemeral channels
				// because there isn't a client connected
				continue
			}
			t.getOrCreateChannel(channelName)
		}
	}
	t.Unlock()

	// NOTE: I would prefer for this to only happen in topic.GetChannel() but we're special
	// casing the code above so that we can control the locks such that it is impossible
	// for a message to be written to a (new) topic while we're looking up channels
	// from lookupd...
	//
	// update messagePump state
	select {
	case t.channelUpdateChan <- 1:
	case <-t.exitChan:
	}
	return t
}


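This method gets a topic object, creating it if it does not exist. When the topic has to be created, nsqd queries nsqlookupd for existing registrations of that topic and, if any are found, copies the channels locally so that the subscription state is in sync.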
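A stripped-down sketch of the get-or-create pattern follows. It makes two simplifications that should be kept in mind: it holds a single lock for the whole call, whereas the code above switches from the nsqd lock to the topic lock before querying lookupd, and the `lookup` field is a hypothetical stand-in for the lookupd query. The `registry` and `topic` types are invented for the example.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type topic struct {
	name     string
	channels []string
}

type registry struct {
	sync.Mutex
	topics map[string]*topic
	// lookup is a hypothetical stand-in for asking nsqlookupd
	// which channels are registered for a topic.
	lookup func(topicName string) []string
}

// getTopic returns the existing topic or creates it, pre-creating every
// non-ephemeral channel reported by lookup.
func (r *registry) getTopic(name string) *topic {
	r.Lock()
	defer r.Unlock()
	if t, ok := r.topics[name]; ok {
		return t
	}
	t := &topic{name: name}
	for _, ch := range r.lookup(name) {
		if strings.HasSuffix(ch, "#ephemeral") {
			continue // no client is connected, so skip ephemeral channels
		}
		t.channels = append(t.channels, ch)
	}
	r.topics[name] = t
	return t
}

func main() {
	r := &registry{
		topics: map[string]*topic{},
		lookup: func(string) []string { return []string{"push", "debug#ephemeral"} },
	}
	first := r.getTopic("events")
	second := r.getTopic("events")
	fmt.Println(first.channels, first == second)
}
```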

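One thing I am unsure about here: suppose nsqdA registers topicA's subscription information with nsqlookupdA, and nsqdB, after syncing topicA's state, adds a topicA->channelB relationship on top of it. That state is registered with nsqlookupd, but nsqdA has no way to learn about it, because the sync only happens when a topic is first created; when nsqlookupd's state is updated later, it does not push the latest state back to nsqd.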

Notify

// called whenever a topic/channel is added or removed
func (n *NSQD) Notify(v interface{}) {
	// since the in-memory metadata is incomplete,
	// should not persist metadata while loading it.
	// nsqd will call `PersistMetadata` it after loading
	persist := !n.getFlag(flagLoading)
	n.waitGroup.Wrap(func() {
		// by selecting on exitChan we guarantee that
		// we do not block exit, see issue #123
		select {
		case <-n.exitChan:
		case n.notifyChan <- v:
			if !persist {
				return
			}
			n.Lock()
			err := n.PersistMetadata()
			if err != nil {
				n.logf("ERROR: failed to persist metadata - %s", err)
			}
			n.Unlock()
		}
	})
}


This method is called whenever a topic/channel is added or removed; its purpose is to notify nsqlookupd that the registration information has changed.
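The key trick in Notify is the select on exitChan, which keeps a pending notification from blocking shutdown. Here is a minimal sketch of just that part; the `notify` function and its boolean result are made up for this example.

```go
package main

import "fmt"

// notify tries to hand v to notifyChan but gives up as soon as exitChan is
// closed. It reports whether the value was delivered.
func notify(notifyChan chan<- interface{}, exitChan <-chan struct{}, v interface{}) bool {
	select {
	case <-exitChan:
		return false
	case notifyChan <- v:
		return true
	}
}

func main() {
	notifyChan := make(chan interface{}, 1)
	exitChan := make(chan struct{})

	// The buffer has room and exit has not been signalled, so this is delivered.
	fmt.Println(notify(notifyChan, exitChan, "topic created"))

	// Now the buffer is full and exit is signalled, so this returns instead of blocking.
	close(exitChan)
	fmt.Println(notify(notifyChan, exitChan, "dropped"))
}
```

Without the exitChan case, a send on a channel that nobody reads any more would leave the goroutine blocked forever and the wait group would never drain.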