美文网首页技术文技术干货
NSQ学习:实现有序的消息队列

NSQ学习:实现有序的消息队列

作者: imxyb | 来源:发表于2019-06-11 22:51 被阅读0次

NSQ是一个内存+磁盘型的消息中间件,它使用push流的方式源源不断把消息推送给客户端,并且为了使服务端更加简单、高效,NSQ并不提供有序的消息队列。因此,如果对消息有顺序要求,只有两种解决办法:

  1. 改用类似kafka之类的有序消息队列;
  2. 生产者和消费者达成一个协议,比如增加一个序列号或者时间戳来表示顺序。

本文要介绍的是第二种方法,下面就来简单用golang实现一个有序的NSQ顺序消息队列。

首先,使用go-nsq客户端写一个main函数,并且连接上nsqlookupd:

func main() {
    cfg := nsq.NewConfig()
    cfg.LookupdPollInterval = time.Second
    customer, err := nsq.NewConsumer("test2", "t1", cfg)
    if err != nil {
        log.Panic(err)
    }
    customer.AddHandler(&Customter{})
    customer.SetLogger(nil, 0)
    if err := customer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Panic(err)
    }
    select {}
}

这样就表示这个消费者客户端订阅了test2这个topic的t1channel了。

接下来,我们与生产者端协商消息的格式是一个JSON文本字符串,并且这个JSON会传递一个时间戳用来表示消息的先后顺序,那么我们可以定义一个实现了sort.Interface的切片:

type MyMessage struct {
    Name     string `json:"name"`
    CreateAt int64  `json:"create_at"`
}

type MyMessageList []MyMessage

func (list MyMessageList) Len() int {
    return len(list)
}

func (list MyMessageList) Less(i, j int) bool {
    return list[i].CreateAt < list[j].CreateAt
}

func (list MyMessageList) Swap(i, j int) {
    list[i], list[j] = list[j], list[i]
}

sort.Interface有三个方法,分别是LenLessSwap。当实现了这三个方法,MyMessageList这个切片就能够把里面的MyMessage按照createAt字段的升序来进行排序。

最后,我们定义一个Customer结构体的HandleMessage方法来接收NSQ生产者的消息:

type Customter struct{}

// 存放消息结构体
var messageBuffer []MyMessage

func (c *Customter) HandleMessage(nsqMsg *nsq.Message) error {
    var msg MyMessage
    err := json.Unmarshal(nsqMsg.Body, &msg)
    if err != nil {
        log.Panicln(err)
    }
    messageBuffer = append(messageBuffer, msg)
    if len(messageBuffer) == 3 {
        sort.Sort(MyMessageList(messageBuffer))
        // do something for the ordered message buffer
        fmt.Println(messageBuffer)
        // reset buffer
        messageBuffer = messageBuffer[:0]
    }
    return nil
}

上面代码非常简单,每次收到一条消息,我们都会先把它放到一个buffer里面,然后当buffer的长度达到3的时候,就调用sort.Sort来进行一次排序,最后就能顺利拿到3个有序的消息,就可以做相应的操作了。

我们来简单实验一下,首先启动客户端(nsqd和nsqlookupd的启动参考官网文档,这里不介绍):

 $ ~/code/golang/src/nsq-test > go run main.go

然后在终端通过curl给nsqd的test2 topic生产3条数据:

curl -d '{"name":"aaa","create_at":1560262051}' http://127.0.0.1:4151/pub\?topic\=test2
curl -d '{"name":"bbb","create_at":1560262060}' http://127.0.0.1:4151/pub\?topic\=test2
curl -d '{"name":"ccc","create_at":1560262053}' http://127.0.0.1:4151/pub\?topic\=test2

注意,这里按照时间戳先后应该是aaacccbbb

最后,我们回到启动客户端的终端查看,会发现打印出正确的结果:

# aaa ccc bbb 顺序是正确的
[{aaa 1560262051} {ccc 1560262053} {bbb 1560262060}]

到此就大功告成了~

完整代码在此:

package main

import (
    "encoding/json"
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "sort"
    "time"
)

type MyMessage struct {
    Name     string `json:"name"`
    CreateAt int64  `json:"create_at"`
}

type MyMessageList []MyMessage

func (list MyMessageList) Len() int {
    return len(list)
}

func (list MyMessageList) Less(i, j int) bool {
    return list[i].CreateAt < list[j].CreateAt
}

func (list MyMessageList) Swap(i, j int) {
    list[i], list[j] = list[j], list[i]
}

type Customter struct{}

var messageBuffer []MyMessage

func (c *Customter) HandleMessage(nsqMsg *nsq.Message) error {
    var msg MyMessage
    err := json.Unmarshal(nsqMsg.Body, &msg)
    if err != nil {
        log.Panicln(err)
    }
    messageBuffer = append(messageBuffer, msg)
    if len(messageBuffer) == 3 {
        sort.Sort(MyMessageList(messageBuffer))
        // do something for the ordered message buffer
        fmt.Println(messageBuffer)
        // reset buffer
        messageBuffer = messageBuffer[:0]
    }
    return nil
}

func main() {
    cfg := nsq.NewConfig()
    cfg.LookupdPollInterval = time.Second
    customer, err := nsq.NewConsumer("test2", "t1", cfg)
    if err != nil {
        log.Panic(err)
    }
    customer.AddHandler(&Customter{})
    customer.SetLogger(nil, 0)
    if err := customer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Panic(err)
    }
    select {}
}

相关文章

  • NSQ学习:实现有序的消息队列

    NSQ是一个内存+磁盘型的消息中间件,它使用push流的方式源源不断把消息推送给客户端,并且为了使服务端更加简单、...

  • docker 搭建nsq集群

    nsq简介 nsq是go实现的高性能消息队列,部署相当简单。 一.搭建nsq集群 1.拉取docker镜像 2.启...

  • 实操笔记:为 NSQ 配置监控服务的心路历程

    在 Go 语言实现的实时消息队列中, NSQ 的热度可以排第一。 NSQ 这款消息中间件简单易用,其设计目标是为在...

  • NSQ消息队列

    1.安装 根据官方安装指引页面下载最新稳定版的二进制包https://nsq.io/deployment/inst...

  • nsq消息队列

    一、安装nsq: 安装godepgo get github.com/kr/godep 安装assertgo get...

  • NSQ 消息队列

    异步任务与消息队列 同步任务:一定要等任务执行完了,得到结果,才执行下一个任务。 异步任务:不用等任务执行完,直接...

  • Go消息中间件Nsq系列(八)------topic(主题发布)

    上一篇: Go消息中间件Nsq系列(七)------go-diskqueue 文件队列实现 1. Topic/Ch...

  • 23.NSQ

    NSQ是目前比较流行的一个分布式的消息队列,本文主要介绍了NSQ及Go语言如何操作NSQ。 组件: nsqdloo...

  • nsq 消息队列设计

    消息传递设计 nsqlookupd:作用类似nsqd的注册中心,也复制给消费者提供nsqd地址 nsqd:管理to...

  • 消息队列NSQ使用

    NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub。NSQ可用于大规模...

网友评论

    本文标题:NSQ学习:实现有序的消息队列

    本文链接:https://www.haomeiwen.com/subject/urfhfctx.html