美文网首页
Golang:Redis消息订阅

Golang:Redis消息订阅

作者: yichen_china | 来源:发表于2023-08-22 21:13 被阅读0次
//Golang:通过Redis消息订阅实现key过期通知

func subs() {

    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "123456", // 如果没有设置密码,则为空字符串
        DB:       0,        // 使用默认数据库
    })

    defer client.Close()

    // 修改配置,开启事件监听 ps: 修改配置文件,效果等同
    _, err := client.ConfigSet("notify-keyspace-events", "Ex").Result()
    if err != nil {
        panic(err)
    }
    for i := 0; i < 10000; i++ {
        var s = `{ "act": "toUid","type": "chat","msg": {"id":"` + strconv.Itoa(i) + `","uid": "2","msg": "我很好","touid": "1"}}`
        // 设置key,并设置过期时间为10秒
        err = client.Set(s, "myvalue", 1*time.Second).Err()
        if err != nil {
            panic(err)
        }
    }

    //订阅
    pubsub := client.Subscribe("__keyevent@0__:expired")
    defer pubsub.Close()

    // 开启goroutine,接收过期事件
    go func() {
        for msg := range pubsub.Channel() {
            // 处理过期事件
            fmt.Println("Key expired:", msg.Payload)
        }
    }()
    select {} //阻塞主进程

}
func main() {
    subs()
}

Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。

发布订阅架构图:

发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。

下面介绍golang如何使用redis的发布订阅功能。

go redis发布订阅常用函数:

Subscribe - 订阅channel
PSubscribe - 订阅channel支持通配符匹配
Publish - 将信息发送到指定的channel。
PubSubChannels - 查询活跃的channel
PubSubNumSub - 查询指定的channel有多少个订阅者
1.Subscribe
订阅channel

例子1:

// 订阅channel1这个channel
sub := client.Subscribe("channel1")

// 读取channel消息
iface, err := sub.Receive()
if err != nil {
    // handle error
}

// 检测收到的消息类型
switch iface.(type) {
case *redis.Subscription:
    // 订阅成功
case *redis.Message:
    // 处理收到的消息
    // 这里需要做一下类型转换
    m := iface.(redis.Message)
    // 打印收到的小
    fmt.Println(m.Payload)
case *redis.Pong:
    // 收到Pong消息
default:
    // handle error
}

例子2:
使用golang channel的方式处理消息

// 订阅channel1这个channel
sub := client.Subscribe("channel1")

// sub.Channel() 返回go channel,可以循环读取redis服务器发过来的消息
for msg := range sub.Channel() {
    // 打印收到的消息
    fmt.Println(msg.Channel)
    fmt.Println(msg.Payload)
}

例子3:
取消订阅

// 订阅channel1这个channel
sub := client.Subscribe("channel1")

// 忽略其他处理逻辑

// 取消订阅
sub.Unsubscribe("channel1")

2.PSubscribe
用法跟Subscribe一样,区别是PSubscribe订阅通道(channel)支持模式匹配。

例子:

// 订阅channel1这个channel
sub := client.PSubscribe("ch_user_*")
// 可以匹配ch_user_开头的任意channel
3.Publish
将消息发送到指定的channel

// 将"message"消息发送到channel1这个通道上
client.Publish("channel1","message")

4.PubSubChannels
查询活跃的channel

// 没有指定查询channel的匹配模式,则返回所有的channel
chs, _ := client.PubSubChannels("").Result()
for _, ch := range chs {
    fmt.Println(ch)
}

// 匹配user_开头的channel
chs, _ := client.PubSubChannels("user_*").Result()

5.PubSubNumSub
查询指定的channel有多少个订阅者

// 查询channel1,channel2两个通道的订阅者数量
chs, _ := client.PubSubNumSub("channel1", "channel2").Result()
for ch, count := range chs {
    fmt.Println(ch) // channel名字
    fmt.Println(count) // channel的订阅者数量
}

发布消息示例1

func (s SerialReader) processSerialData(redisConnection *redis.Client) {
    serialConnection := s.newSerialConnection()
    reader := bufio.NewReader(serialConnection)
    re := regexp.MustCompile("^([^:]+):(.+)$")

    for {
        reply, err := reader.ReadBytes('\n')

        if err != nil {
            log.Printf("processSerialData() #1: %s", err)
            continue
        }

        data := strings.TrimSpace(string(reply))

        if len(data) == 0 {
            continue
        }

        log.Printf("%s", data)
        raw_data := re.FindStringSubmatch(data)

        if len(raw_data) == 0 {
            continue
        }

        metrics := strings.Split(raw_data[2], ",")
        channel := fmt.Sprintf("rpi-moteino-collector:%s", raw_data[1])

        for i := 0; i < len(metrics); i++ {
            metric_key_and_value := strings.Split(metrics[i], ":")

            if len(metric_key_and_value) != 2 {
                continue
            }

            value := fmt.Sprintf("%s,%d,%s", metric_key_and_value[0], uint64(time.Now().Unix()), metric_key_and_value[1])

            _, err := redisConnection.LPush(channel, value)
            if err != nil {
                log.Printf("processSerialData() #2: %s", err)
            }

            _, err = redisConnection.Publish(channel, value)
            if err != nil {
                log.Printf("processSerialData() #3: %s", err)
            }
        }

        _, err = redisConnection.LTrim(channel, 0, 10)
        if err != nil {
            log.Printf("processSerialData() #4: %s", err)
        }
    }
}

发布消息示例2: spawnPublisher

func spawnPublisher() error {
    var err error
    var publisher *redis.Client

    publisher = redis.New()

    err = publisher.Connect(host, port)

    if err != nil {
        log.Fatalf("Publisher failed to connect: %s\n", err.Error())
        return err
    }

    log.Println("Publisher connected to redis-server.")

    log.Println("Publishing some messages...")

    publisher.Publish("channel", "Hello world!")
    publisher.Publish("channel", "Do you know how to count?")

    for i := 0; i < 3; i++ {
        publisher.Publish("channel", i)
    }

    log.Printf("Closing publisher...\n")

    publisher.Quit()

    return nil
}

相关文章

网友评论

      本文标题:Golang:Redis消息订阅

      本文链接:https://www.haomeiwen.com/subject/pdlymdtx.html