美文网首页Golang
nsq源码(11) nsqlookupd与消费者交互

nsq源码(11) nsqlookupd与消费者交互

作者: Linrundong | 来源:发表于2019-02-13 09:39 被阅读9次

nsq一共提供了几种消费者客户端工具:nsq_to_file、nsq_to_http、nsq_to_nsq

nsq_to_file 消息写入文件

执行命令:nsq_to_file --topic=test --output-dir=/tmp --channel=chan --lookupd-http-address=127.0.0.1:4161

  • nsq_to_file提供连接tcp或http两种参数
    • -lookupd-http-address value
      lookupd HTTP address (may be given multiple times)
    • -nsqd-tcp-address value
      nsqd TCP address (may be given multiple times)
  • 总览nsq_to_file这个消费者的运行流程:
func main() {
    ...
    discoverer := newTopicDiscoverer(cfg, hupChan, termChan, *httpConnectTimeout, *httpRequestTimeout)
    
    // 请求 nsqlookupd,获取生产者信息并连接
    discoverer.updateTopics(topics, *topicPattern)
    // 开启一个poll线程
    discoverer.poller(lookupdHTTPAddrs, len(topics) == 0, *topicPattern)
}


func (t *TopicDiscoverer) updateTopics(topics []string, pattern string) {
    // 遍历处理topics
    for _, topic := range topics {
        cfl, err := newConsumerFileLogger(topic, t.cfg)
        if err != nil {
            log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err)
            continue
        }
    }
}

func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
    c, err := nsq.NewConsumer(topic, *channel, cfg)
    if err != nil {
        return nil, err
    }

    c.AddHandler(f)

    err = c.ConnectToNSQDs(nsqdTCPAddrs)
    if err != nil {
        return nil, err
    }

    err = c.ConnectToNSQLookupds(lookupdHTTPAddrs)
    if err != nil {
        return nil, err
    }
}

ConnectToNSQLookupd与nsqlookupd交互

  • 主线程执行ConnectToNSQLookupd
  • ConnectToNSQLookupd将一个nsqlookupd地址添加到此使用者实例的列表中。
    如果它是第一个被添加的,它将启动一次Consumer.lookupdLoop()协程进行搜索生产者
  • 开启queryLookupd()线程来定时开启Consumer.lookupdLoop()协程poll搜索生产者
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
    if atomic.LoadInt32(&r.stopFlag) == 1 {
        return errors.New("consumer stopped")
    }
    if atomic.LoadInt32(&r.runningHandlers) == 0 {
        return errors.New("no handlers")
    }

    if err := validatedLookupAddr(addr); err != nil {
        return err
    }

    atomic.StoreInt32(&r.connectedFlag, 1)

    r.mtx.Lock()
    for _, x := range r.lookupdHTTPAddrs {
        if x == addr {
            r.mtx.Unlock()
            return nil
        }
    }
    r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
    numLookupd := len(r.lookupdHTTPAddrs)
    r.mtx.Unlock()

    // 第一次处理才开启lookupdLoop线程
    if numLookupd == 1 {
        r.queryLookupd()
        r.wg.Add(1)
        go r.lookupdLoop()
    }

    return nil
}

获取生产者信息

  • nsq_to_file 客户端会请求 nsqlookupd的http接口,获取nsqlookupd分配给消费者的nsqd节点生产者信息
  • 再去连接生产者,并发订阅消息指令
func (r *Consumer) queryLookupd() {
    retries := 0

retry:
    endpoint := r.nextLookupdEndpoint()

    r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

    var data lookupResp
    // 请求nsqlookupd 获取分配给此消费者的nsqd节点信息
    err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
    if err != nil {
        r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
        retries++
        if retries < 3 {
            r.log(LogLevelInfo, "retrying with next nsqlookupd")
            goto retry
        }
        return
    }

    var nsqdAddrs []string
    // 获取生产者地址
    for _, producer := range data.Producers {
        broadcastAddress := producer.BroadcastAddress
        port := producer.TCPPort
        joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
        nsqdAddrs = append(nsqdAddrs, joined)
    }
    // apply filter
    if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
        nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
    }
    for _, addr := range nsqdAddrs {
        // 连接生产者,并发订阅消息指令
        err = r.ConnectToNSQD(addr)
        if err != nil && err != ErrAlreadyConnected {
            r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
            continue
        }
    }
}
  • 获取的nsqd,以及topic,chanel信息

    日志1.png
  • qureying完成后进行连接

    日志2.png

Consumer.lookupdLoop()协程poll

  • lookupdLoop()协程会定时去执行queryLookupd()以获取nsqlookupd分配的nsqd生产者信息
// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
    for {
        select {
        case <-ticker.C:
            r.queryLookupd()
        case <-r.lookupdRecheckChan:
            r.queryLookupd()
        case <-r.exitChan:
            goto exit
        }
    }
}

相关文章

网友评论

    本文标题:nsq源码(11) nsqlookupd与消费者交互

    本文链接:https://www.haomeiwen.com/subject/rwrreqtx.html