NSQ source code walkthrough (1.1): nsqd, the core of NSQ
2015-07-05 17:13
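A study of nsqd
Introduction
nsqd is the core of NSQ; it implements message subscription and distribution. I used to think of NSQ as an outward-facing system for handling push notifications, but after working through it I found that it is actually a distributed message queue. topic->channel implements broadcast, while channel->consumer implements distribution of messages. For example, take a large push system with hundreds of millions of users. To push a message to specific users, you have to find all of those users and hand their information to the push servers. Because the number of users is huge, the query may be slow, and the push servers may not be able to digest that much data handed over in one go, so this approach has two performance bottlenecks. You could of course handle this yourself with asynchronous or batched queries, but that is a lot more work.
There is a simpler solution, though: use a message queue. The database is queried in batches and the results are published to the message queue, and the push servers subscribed to those messages take them off the queue and process them. This way neither side is constrained by the other's bottleneck: the push servers may finish processing messages before the user query has completed, or the query may finish and the database move on to other work while the push servers are still processing earlier messages.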
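To make the topic->channel broadcast and the channel->consumer distribution concrete, here is a minimal in-memory sketch. The Topic and Channel types, the round-robin choice of consumer and the channel names are all assumptions made for illustration; this is not how nsqd itself is implemented.

```go
package main

import "fmt"

// Channel is a hypothetical stand-in for an nsqd channel: every message it
// receives is handed to exactly one of its consumers (round-robin here).
type Channel struct {
	consumers [][]string // messages received by each consumer
	next      int
}

// Deliver hands msg to a single consumer of this channel.
func (c *Channel) Deliver(msg string) {
	if len(c.consumers) == 0 {
		return
	}
	i := c.next % len(c.consumers)
	c.consumers[i] = append(c.consumers[i], msg)
	c.next++
}

// Topic copies every published message to all of its channels (broadcast).
type Topic struct {
	channels []*Channel
}

// Publish broadcasts msg to every channel of the topic.
func (t *Topic) Publish(msg string) {
	for _, ch := range t.channels {
		ch.Deliver(msg)
	}
}

// demo builds one topic with two channels: "push" has two consumers and
// "audit" has one, then publishes four messages.
func demo() (push, audit *Channel) {
	push = &Channel{consumers: make([][]string, 2)}
	audit = &Channel{consumers: make([][]string, 1)}
	t := &Topic{channels: []*Channel{push, audit}}
	for _, m := range []string{"m1", "m2", "m3", "m4"} {
		t.Publish(m)
	}
	return push, audit
}

func main() {
	push, audit := demo()
	fmt.Println(push.consumers)  // [[m1 m3] [m2 m4]]
	fmt.Println(audit.consumers) // [[m1 m2 m3 m4]]
}
```

Both channels see all four messages, but inside the "push" channel each message reaches only one of the two consumers.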
File-by-file walkthrough
apps/nsqd/nsqd.go
This file is the entry point of nsqd. It first defines nsqd's configurable parameters so that users can tune nsqd to their own situation; the individual options are described on the NSQ website.

    func main() {
        flagSet := nsqFlagset()
        flagSet.Parse(os.Args[1:])

        rand.Seed(time.Now().UTC().UnixNano())

        if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
            fmt.Println(version.String("nsqd"))
            return
        }

        signalChan := make(chan os.Signal, 1)
        signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

        var cfg config
        configFile := flagSet.Lookup("config").Value.String()
        if configFile != "" {
            _, err := toml.DecodeFile(configFile, &cfg)
            if err != nil {
                log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
            }
        }
        cfg.Validate()

        opts := nsqd.NewNSQDOptions()
        options.Resolve(opts, flagSet, cfg)
        nsqd := nsqd.NewNSQD(opts)

        nsqd.LoadMetadata()
        err := nsqd.PersistMetadata()
        if err != nil {
            log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
        }
        nsqd.Main()
        <-signalChan
        nsqd.Exit()
    }
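The overall steps are the same as when nsqlookupd starts, but before nsqd begins serving it first reads its local metadata (this may not be the first start) to restore the topics and channels left over from the last time nsqd was shut down. Right after that, nsqd.PersistMetadata() saves this pre-start state, and then nsqd.Main() is called to begin processing.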
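The load-then-persist step can be pictured as a JSON round trip of the topic and channel names. The sketch below is only an illustration: the metadata and topicMeta types and their field names are assumptions, not nsqd's actual on-disk format, and the file I/O is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// topicMeta and metadata are hypothetical shapes for the saved state.
type topicMeta struct {
	Name     string   `json:"name"`
	Channels []string `json:"channels"`
}

type metadata struct {
	Topics []topicMeta `json:"topics"`
}

// persist serializes the current topics and channels.
func persist(m metadata) ([]byte, error) {
	return json.Marshal(m)
}

// load restores topics and channels from previously persisted bytes.
func load(data []byte) (metadata, error) {
	var m metadata
	err := json.Unmarshal(data, &m)
	return m, err
}

// roundTrip persists a small state and loads it back, as a restart would.
func roundTrip() (metadata, error) {
	before := metadata{Topics: []topicMeta{
		{Name: "push", Channels: []string{"ios", "android"}},
	}}
	data, err := persist(before)
	if err != nil {
		return metadata{}, err
	}
	return load(data)
}

func main() {
	m, err := roundTrip()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(m.Topics[0].Name, m.Topics[0].Channels)
}
```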
nsq/nsqd.go
    func (n *NSQD) Main() {
        var httpListener net.Listener
        var httpsListener net.Listener

        ctx := &context{n}

        // set up the TCP listener
        tcpListener, err := net.Listen("tcp", n.opts.TCPAddress)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.opts.TCPAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.tcpListener = tcpListener
        n.Unlock()
        tcpServer := &tcpServer{ctx: ctx}
        n.waitGroup.Wrap(func() {
            protocol.TCPServer(n.tcpListener, tcpServer, n.opts.Logger)
        })

        // if TLS is configured, start HTTPS
        if n.tlsConfig != nil && n.opts.HTTPSAddress != "" {
            httpsListener, err = tls.Listen("tcp", n.opts.HTTPSAddress, n.tlsConfig)
            if err != nil {
                n.logf("FATAL: listen (%s) failed - %s", n.opts.HTTPSAddress, err)
                os.Exit(1)
            }
            n.Lock()
            n.httpsListener = httpsListener
            n.Unlock()
            httpsServer := &httpServer{
                ctx:         ctx,
                tlsEnabled:  true,
                tlsRequired: true,
            }
            n.waitGroup.Wrap(func() {
                http_api.Serve(n.httpsListener, httpsServer, n.opts.Logger, "HTTPS")
            })
        }

        // set up HTTP
        httpListener, err = net.Listen("tcp", n.opts.HTTPAddress)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.opts.HTTPAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.httpListener = httpListener
        n.Unlock()
        httpServer := &httpServer{
            ctx:         ctx,
            tlsEnabled:  false,
            tlsRequired: n.opts.TLSRequired == TLSRequired,
        }
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpListener, httpServer, n.opts.Logger, "HTTP")
        })

        n.waitGroup.Wrap(func() { n.queueScanLoop() })
        n.waitGroup.Wrap(func() { n.idPump() })
        n.waitGroup.Wrap(func() { n.lookupLoop() })
        if n.opts.StatsdAddress != "" {
            n.waitGroup.Wrap(func() { n.statsdLoop() })
        }
    }
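As Main shows, nsqd starts up to seven loops at startup (when every optional one is enabled):
tcp: waits for clients that connect to nsqd
https/http: serve the HTTP API
n.queueScanLoop: loops over the channels to process message delivery
n.idPump: a queue that produces unique message IDs
n.lookupLoop: pushes the latest state to nsqlookupd whenever nsqd changes
n.statsdLoop: periodically reports nsqd's status to a monitoring process (only started when n.opts.StatsdAddress is set)
The tcp and https/http servers live outside this file, so they are left for later; the remaining loops are explained below.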
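Every loop in Main is started through n.waitGroup.Wrap. A helper with that shape can be sketched as follows; the WaitGroupWrapper type and the startLoops function here are written for illustration and are assumptions about how such a helper might look, not a copy of the project's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// WaitGroupWrapper runs callbacks in goroutines and tracks them so the
// owner can wait for all of them to finish on shutdown.
type WaitGroupWrapper struct {
	sync.WaitGroup
}

// Wrap starts cb in a new goroutine and registers it with the WaitGroup.
func (w *WaitGroupWrapper) Wrap(cb func()) {
	w.Add(1)
	go func() {
		defer w.Done()
		cb()
	}()
}

// startLoops starts n hypothetical background loops, waits for all of them
// and reports how many ran to completion.
func startLoops(n int) int64 {
	var wg WaitGroupWrapper
	var finished int64
	for i := 0; i < n; i++ {
		wg.Wrap(func() { atomic.AddInt64(&finished, 1) })
	}
	wg.Wait()
	return finished
}

func main() {
	fmt.Println(startLoops(7)) // 7
}
```

The benefit of this pattern is that an Exit method only needs to signal the loops and then call Wait once.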
idPump
Let's start with the simplest one, n.idPump:
    func (n *NSQD) idPump() {
        factory := &guidFactory{}
        lastError := time.Unix(0, 0)
        for {
            id, err := factory.NewGUID(n.opts.ID)
            if err != nil {
                now := time.Now()
                if now.Sub(lastError) > time.Second {
                    // only print the error once/second
                    n.logf("ERROR: %s", err)
                    lastError = now
                }
                runtime.Gosched()
                continue
            }
            select {
            case n.idChan <- id.Hex():
            case <-n.exitChan:
                goto exit
            }
        }

    exit:
        n.logf("ID: closing")
    }
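This method is simple: it loops until the program exits, producing unique IDs the whole time. Because the IDs are derived from the current time, handing them out through a single queue avoids the duplicates that could otherwise occur. idPump keeps writing IDs into n.idChan, blocking whenever idChan is full, until the program exits.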
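The single-producer idea can be shown in isolation. In this simplified sketch a plain counter stands in for the time-based GUID factory, and the buffer size of 4 is an arbitrary assumption; only the select on the ID channel and the exit channel mirrors the method above.

```go
package main

import "fmt"

// idPump is the only producer of IDs, so two readers can never receive the
// same value. It blocks while idChan is full and stops when exitChan closes.
func idPump(idChan chan<- uint64, exitChan <-chan struct{}) {
	var next uint64
	for {
		next++
		select {
		case idChan <- next: // blocks while the buffer is full
		case <-exitChan:
			return
		}
	}
}

// takeIDs starts a pump, reads n IDs from it and then stops the pump.
func takeIDs(n int) []uint64 {
	idChan := make(chan uint64, 4) // buffer size is an assumption
	exitChan := make(chan struct{})
	go idPump(idChan, exitChan)

	ids := make([]uint64, 0, n)
	for i := 0; i < n; i++ {
		ids = append(ids, <-idChan)
	}
	close(exitChan)
	return ids
}

func main() {
	fmt.Println(takeIDs(5)) // [1 2 3 4 5]
}
```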
lookupLoop
    for _, host := range n.opts.NSQLookupdTCPAddresses {
        n.logf("LOOKUP: adding peer %s", host)
        // create a lookupPeer object, which holds the information about one nsqlookupd
        lookupPeer := newLookupPeer(host, n.opts.Logger, func(lp *lookupPeer) {
            ci := make(map[string]interface{})
            ci["version"] = version.Binary
            ci["tcp_port"] = n.RealTCPAddr().Port
            ci["http_port"] = n.RealHTTPAddr().Port
            ci["hostname"] = hostname
            ci["broadcast_address"] = n.opts.BroadcastAddress

            // wrap nsqd's basic information and send it to nsqlookupd
            cmd, err := nsq.Identify(ci)
            if err != nil {
                lp.Close()
                return
            }
            // parse the response from nsqlookupd and store it in the lookupPeer
            resp, err := lp.Command(cmd)
            if err != nil {
                n.logf("LOOKUPD(%s): ERROR %s - %s", lp, cmd, err)
            } else if bytes.Equal(resp, []byte("E_INVALID")) {
                n.logf("LOOKUPD(%s): lookupd returned %s", lp, resp)
            } else {
                err = json.Unmarshal(resp, &lp.Info)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR parsing response - %s", lp, resp)
                } else {
                    n.logf("LOOKUPD(%s): peer info %+v", lp, lp.Info)
                }
            }

            // tell syncTopicChan that this nsqlookupd is ready
            go func() {
                syncTopicChan <- lp
            }()
        })
        lookupPeer.Command(nil) // start the connection
        n.lookupPeers = append(n.lookupPeers, lookupPeer)
    }
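Before the loop starts, nsqd reads n.opts.NSQLookupdTCPAddresses to find every nsqlookupd and connect to it, and the two sides exchange their identifying information. All of this happens asynchronously; once nsqd and an nsqlookupd are able to talk to each other, syncTopicChan is used to announce that this nsqlookupd is ready.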
    // for announcements, lookupd determines the host automatically
    ticker := time.Tick(15 * time.Second)
    for {
        select {
        case <-ticker:
            // send a heartbeat and read a response (read detects closed conns)
            for _, lookupPeer := range n.lookupPeers {
                n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer)
                cmd := nsq.Ping()
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                }
            }
        case val := <-n.notifyChan:
            var cmd *nsq.Command
            var branch string

            switch val.(type) {
            case *Channel:
                // notify all nsqlookupds that a new channel exists, or that it's removed
                branch = "channel"
                channel := val.(*Channel)
                if channel.Exiting() == true {
                    cmd = nsq.UnRegister(channel.topicName, channel.name)
                } else {
                    cmd = nsq.Register(channel.topicName, channel.name)
                }
            case *Topic:
                // notify all nsqlookupds that a new topic exists, or that it's removed
                branch = "topic"
                topic := val.(*Topic)
                if topic.Exiting() == true {
                    cmd = nsq.UnRegister(topic.name, "")
                } else {
                    cmd = nsq.Register(topic.name, "")
                }
            }

            for _, lookupPeer := range n.lookupPeers {
                n.logf("LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                }
            }
        case lookupPeer := <-syncTopicChan:
            // after an nsqlookupd is registered, sync nsqd's current state to it
            var commands []*nsq.Command
            // build all the commands first so we exit the lock(s) as fast as possible
            n.RLock()
            for _, topic := range n.topicMap {
                topic.RLock()
                if len(topic.channelMap) == 0 {
                    commands = append(commands, nsq.Register(topic.name, ""))
                } else {
                    for _, channel := range topic.channelMap {
                        commands = append(commands, nsq.Register(channel.topicName, channel.name))
                    }
                }
                topic.RUnlock()
            }
            n.RUnlock()

            for _, cmd := range commands {
                n.logf("LOOKUPD(%s): %s", lookupPeer, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                    break
                }
            }
        case <-n.exitChan:
            goto exit
        }
    }
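<-ticker: the heartbeat timer; every 15 seconds a heartbeat is sent to each nsqlookupd.
lookupPeer := <-syncTopicChan: as described above, an nsqlookupd arrives on this channel once its connection has been set up; nsqd then sends all of its topic/channel registrations to that nsqlookupd.
val := <-n.notifyChan: whenever the local subscriptions change, that is, a topic or channel is added or removed, a notification of the change is sent to every nsqlookupd.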
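The notifyChan branch boils down to a type switch that turns a notification into a register or unregister command. The sketch below isolates that decision; the stripped-down Topic and Channel types and the plain-text command strings are assumptions for illustration, and no network I/O is performed.

```go
package main

import "fmt"

// Topic and Channel are hypothetical, stripped-down stand-ins that only
// carry what the decision needs.
type Topic struct {
	name    string
	exiting bool
}

type Channel struct {
	topicName string
	name      string
	exiting   bool
}

// commandFor maps a notification to the textual command that would be sent
// to every lookup peer. Unknown values produce an empty string.
func commandFor(v interface{}) string {
	switch x := v.(type) {
	case *Channel:
		if x.exiting {
			return fmt.Sprintf("UNREGISTER %s %s", x.topicName, x.name)
		}
		return fmt.Sprintf("REGISTER %s %s", x.topicName, x.name)
	case *Topic:
		if x.exiting {
			return "UNREGISTER " + x.name
		}
		return "REGISTER " + x.name
	}
	return ""
}

func main() {
	fmt.Println(commandFor(&Topic{name: "topicA"}))
	fmt.Println(commandFor(&Channel{topicName: "topicA", name: "chanB", exiting: true}))
}
```

Binding the switch variable with `x := v.(type)` avoids the second type assertion that the original loop performs inside each case.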
n.statsdLoop
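Using the endpoint given by n.opts.StatsdAddress and n.opts.StatsdPrefix, this loop periodically sends nsqd's status over short-lived connections to a monitoring daemon. The data covers nsqd's process-level resource usage as well as information about the topics on this nsqd.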
n.queueScanLoop()
This loop processes messages at a fixed interval and dispatches them to consumers. It is fairly complex, so I can only explain it as far as I understand the code.

    func (n *NSQD) queueScanLoop() {
        workCh := make(chan *Channel, n.opts.QueueScanSelectionCount)
        responseCh := make(chan bool, n.opts.QueueScanSelectionCount)
        closeCh := make(chan int)

        workTicker := time.NewTicker(n.opts.QueueScanInterval)
        refreshTicker := time.NewTicker(n.opts.QueueScanRefreshInterval)

        channels := n.channels()
        n.resizePool(len(channels), workCh, responseCh, closeCh)

        for {
            select {
            case <-workTicker.C:
                if len(channels) == 0 {
                    continue
                }
            case <-refreshTicker.C:
                channels = n.channels()
                n.resizePool(len(channels), workCh, responseCh, closeCh)
                continue
            case <-n.exitChan:
                goto exit
            }

            num := n.opts.QueueScanSelectionCount
            if num > len(channels) {
                num = len(channels)
            }

        loop:
            // pick channels at random and hand them to the workers
            for _, i := range util.UniqRands(num, len(channels)) {
                workCh <- channels[i]
            }

            numDirty := 0
            for i := 0; i < num; i++ {
                if <-responseCh {
                    numDirty++
                }
            }

            if float64(numDirty)/float64(num) > n.opts.QueueScanDirtyPercent {
                goto loop
            }
        }

    exit:
        n.logf("QUEUESCAN: closing")
        close(closeCh)
        workTicker.Stop()
        refreshTicker.Stop()
    }
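<-refreshTicker.C: the channel list is refreshed at a fixed interval. In other words, after a channel subscribes to a topic, it may take a while before this loop starts handling that channel (messages published to the topic are still forwarded to the channel in real time, but as I understand it, until the channel list has been refreshed the loop cannot send messages on to the clients under that channel).
<-workTicker.C: if the number of channels is not zero, processing starts. Channels are sampled at random, and the sampling repeats until the fraction of dirty channels in a sample is no greater than n.opts.QueueScanDirtyPercent, at which point the loop waits for the next tick. Each sampled channel is put into workCh and handed to queueScanWorker for processing.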
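The sample-and-repeat rule is easier to see without the tickers and worker pool. The sketch below runs everything in one goroutine; scanOnce, drain and the per-channel pending counters are assumptions made for illustration, and rand.Perm stands in for util.UniqRands.

```go
package main

import (
	"fmt"
	"math/rand"
)

// scanOnce samples up to selectionCount distinct channels at random, runs
// process on each, and repeats while the fraction of dirty channels in the
// sample exceeds dirtyPercent. It returns the number of sampling rounds.
func scanOnce(numChannels, selectionCount int, dirtyPercent float64, process func(ch int) bool) int {
	if numChannels == 0 {
		return 0
	}
	num := selectionCount
	if num > numChannels {
		num = numChannels
	}
	rounds := 0
	for {
		rounds++
		numDirty := 0
		// rand.Perm yields distinct indices in random order.
		for _, i := range rand.Perm(numChannels)[:num] {
			if process(i) {
				numDirty++
			}
		}
		if float64(numDirty)/float64(num) <= dirtyPercent {
			return rounds
		}
	}
}

// drain simulates channels that each hold some pending work; processing a
// channel removes one unit and reports dirty if there was anything to do.
func drain(pending []int) (rounds int, left []int) {
	rounds = scanOnce(len(pending), 2, 0.25, func(ch int) bool {
		if pending[ch] > 0 {
			pending[ch]--
			return true
		}
		return false
	})
	return rounds, pending
}

func main() {
	rounds, left := drain([]int{3, 0, 2, 1})
	fmt.Println("rounds:", rounds, "left:", left)
}
```

Because the sample is random, a round can come back clean while other channels still hold work; those channels are picked up on a later tick.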
queueScanWorker
    func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
        for {
            select {
            case c := <-workCh:
                now := time.Now().UnixNano()
                dirty := false
                if c.processInFlightQueue(now) {
                    dirty = true
                }
                if c.processDeferredQueue(now) {
                    dirty = true
                }
                responseCh <- dirty
            case <-closeCh:
                return
            }
        }
    }
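queueScanWorker takes the channel object it receives and processes the pending messages in that channel's InFlight queue and Deferred queue. dirty is true when messages in the channel were actually processed.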
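One way to picture "process the queue for time now" is a priority queue ordered by deadline, from which everything due is popped. The sketch below uses container/heap; the item and deadlineQueue types are assumptions for illustration and not the channel's real queue types.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical queued message with the time at which it is due.
type item struct {
	id       string
	deadline int64
}

// deadlineQueue is a min-heap ordered by deadline.
type deadlineQueue []item

func (q deadlineQueue) Len() int            { return len(q) }
func (q deadlineQueue) Less(i, j int) bool  { return q[i].deadline < q[j].deadline }
func (q deadlineQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *deadlineQueue) Push(x interface{}) { *q = append(*q, x.(item)) }
func (q *deadlineQueue) Pop() interface{} {
	old := *q
	n := len(old)
	it := old[n-1]
	*q = old[:n-1]
	return it
}

// processExpired pops every item whose deadline is at or before now.
// dirty is true when at least one item was handled.
func processExpired(q *deadlineQueue, now int64) (ids []string, dirty bool) {
	for q.Len() > 0 && (*q)[0].deadline <= now {
		it := heap.Pop(q).(item)
		ids = append(ids, it.id)
		dirty = true
	}
	return ids, dirty
}

// demo fills a queue out of order and processes it at the given time.
func demo(now int64) ([]string, bool) {
	q := &deadlineQueue{}
	heap.Push(q, item{id: "c", deadline: 30})
	heap.Push(q, item{id: "a", deadline: 10})
	heap.Push(q, item{id: "b", deadline: 20})
	return processExpired(q, now)
}

func main() {
	fmt.Println(demo(20)) // [a b] true
	fmt.Println(demo(5))  // [] false
}
```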
resizePool
This method controls the number of queueScanWorker goroutines based on the current number of channels and n.opts.QueueScanSelectionCount.
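Since the body of resizePool is not shown here, the following is only a sketch of how a pool could grow and shrink. The sizing rule (one worker per four channels, at least one, capped by a maxWorkers parameter) and all names are assumptions, not the actual implementation.

```go
package main

import "fmt"

// idealPoolSize is a hypothetical sizing rule: one worker per four
// channels, never fewer than one and never more than maxWorkers.
func idealPoolSize(numChannels, maxWorkers int) int {
	size := numChannels / 4
	if size < 1 {
		size = 1
	}
	if size > maxWorkers {
		size = maxWorkers
	}
	return size
}

// pool tracks running workers; a worker exits when it reads from closeCh.
type pool struct {
	size    int
	closeCh chan int
	done    chan struct{}
}

func newPool() *pool {
	return &pool{closeCh: make(chan int), done: make(chan struct{})}
}

// resize starts or stops workers until the pool matches the ideal size.
func (p *pool) resize(numChannels, maxWorkers int) {
	want := idealPoolSize(numChannels, maxWorkers)
	for p.size < want {
		go func() {
			<-p.closeCh // a real worker would also select on a work channel
			p.done <- struct{}{}
		}()
		p.size++
	}
	for p.size > want {
		p.closeCh <- 1 // ask exactly one worker to exit
		<-p.done       // wait until it has acknowledged
		p.size--
	}
}

func main() {
	p := newPool()
	p.resize(40, 4)
	fmt.Println(p.size) // 4
	p.resize(8, 4)
	fmt.Println(p.size) // 2
}
```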
GetTopic
    // GetTopic performs a thread safe operation
    // to return a pointer to a Topic object (potentially new)
    func (n *NSQD) GetTopic(topicName string) *Topic {
        // if the topic already exists, return it directly
        n.Lock()
        t, ok := n.topicMap[topicName]
        if ok {
            n.Unlock()
            return t
        }
        // otherwise create one
        deleteCallback := func(t *Topic) {
            n.DeleteExistingTopic(t.name)
        }
        t = NewTopic(topicName, &context{n}, deleteCallback)
        n.topicMap[topicName] = t

        n.logf("TOPIC(%s): created", t.name)

        // release our global nsqd lock, and switch to a more granular topic lock while we init our
        // channels from lookupd. This blocks concurrent PutMessages to this topic.
        t.Lock()
        n.Unlock()

        // if using lookupd, make a blocking call to get the topics, and immediately create them.
        // this makes sure that any message received is buffered to the right channels
        // (if other nsqd instances already have this topic, its channels are copied locally)
        if len(n.lookupPeers) > 0 {
            channelNames, _ := lookupd.GetLookupdTopicChannels(t.name, n.lookupHTTPAddrs())
            for _, channelName := range channelNames {
                if strings.HasSuffix(channelName, "#ephemeral") {
                    // we don't want to pre-create ephemeral channels
                    // because there isn't a client connected
                    continue
                }
                t.getOrCreateChannel(channelName)
            }
        }

        t.Unlock()

        // NOTE: I would prefer for this to only happen in topic.GetChannel() but we're special
        // casing the code above so that we can control the locks such that it is impossible
        // for a message to be written to a (new) topic while we're looking up channels
        // from lookupd...
        //
        // update messagePump state
        select {
        case t.channelUpdateChan <- 1:
        case <-t.exitChan:
        }
        return t
    }
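This method gets a topic object, creating it if it does not exist. When the topic has to be created, nsqd then queries nsqlookupd for existing registrations of this topic; if there are any, it copies them locally so that the subscription state is in sync.
I have a question here. Suppose nsqdA registers topicA's subscription information with nsqlookupdA, and nsqdB, after syncing topicA's state, adds a topicA->channelB relationship on top of it. That state is registered with nsqlookupd, but nsqdA has no way of learning about it, because the sync only happens when a topic is first created; when nsqlookupd's state is updated later, it does not feed the latest state back to nsqd.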
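The get-or-create part of GetTopic, without the lookupd sync, can be reduced to a map guarded by a mutex. In this sketch the registry type and the created counter are assumptions added so the behaviour can be observed; the lock hand-off to a per-topic lock is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// Topic is a hypothetical, minimal topic.
type Topic struct{ name string }

// registry holds topics by name; created counts how many were constructed.
type registry struct {
	sync.Mutex
	topics  map[string]*Topic
	created int
}

// GetTopic returns the existing topic or creates it, holding the lock for
// the whole check-then-insert so two callers cannot both create it.
func (r *registry) GetTopic(name string) *Topic {
	r.Lock()
	defer r.Unlock()
	if t, ok := r.topics[name]; ok {
		return t
	}
	t := &Topic{name: name}
	r.topics[name] = t
	r.created++
	return t
}

// concurrentGets asks for the same topic from n goroutines and reports how
// many topics were actually created.
func concurrentGets(n int) int {
	r := &registry{topics: make(map[string]*Topic)}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.GetTopic("topicA")
		}()
	}
	wg.Wait()
	return r.created
}

func main() {
	fmt.Println(concurrentGets(50)) // 1
}
```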
Notify
    // called when a topic/channel is added or removed
    func (n *NSQD) Notify(v interface{}) {
        // since the in-memory metadata is incomplete,
        // should not persist metadata while loading it.
        // nsqd will call `PersistMetadata` it after loading
        persist := !n.getFlag(flagLoading)
        n.waitGroup.Wrap(func() {
            // by selecting on exitChan we guarantee that
            // we do not block exit, see issue #123
            select {
            case <-n.exitChan:
            case n.notifyChan <- v:
                if !persist {
                    return
                }
                n.Lock()
                err := n.PersistMetadata()
                if err != nil {
                    n.logf("ERROR: failed to persist metadata - %s", err)
                }
                n.Unlock()
            }
        })
    }
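This method is called whenever a topic or channel is added or removed; its purpose is to notify nsqlookupd that the registration information has changed.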
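The select on exitChan is what keeps Notify from blocking shutdown when nobody is reading notifyChan any more. A minimal sketch of just that behaviour follows; the notify function and its boolean result are assumptions for illustration.

```go
package main

import "fmt"

// notify tries to hand v to notifyChan but gives up as soon as exitChan is
// closed. It reports whether the value was delivered.
func notify(notifyChan chan<- interface{}, exitChan <-chan struct{}, v interface{}) bool {
	select {
	case <-exitChan:
		return false
	case notifyChan <- v:
		return true
	}
}

func main() {
	// Nobody reads from notifyChan, but exitChan is closed, so notify
	// returns instead of blocking forever.
	exitChan := make(chan struct{})
	close(exitChan)
	fmt.Println(notify(make(chan interface{}), exitChan, "topicA")) // false

	// A buffered notifyChan with free space accepts the value at once.
	fmt.Println(notify(make(chan interface{}, 1), make(chan struct{}), "topicA")) // true
}
```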