//Golang:通过Redis消息订阅实现key过期通知
func subs() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "123456", // 如果没有设置密码,则为空字符串
DB: 0, // 使用默认数据库
})
defer client.Close()
// 修改配置,开启事件监听 ps: 修改配置文件,效果等同
_, err := client.ConfigSet("notify-keyspace-events", "Ex").Result()
if err != nil {
panic(err)
}
for i := 0; i < 10000; i++ {
var s = `{ "act": "toUid","type": "chat","msg": {"id":"` + strconv.Itoa(i) + `","uid": "2","msg": "我很好","touid": "1"}}`
// 设置key,并设置过期时间为10秒
err = client.Set(s, "myvalue", 1*time.Second).Err()
if err != nil {
panic(err)
}
}
//订阅
pubsub := client.Subscribe("__keyevent@0__:expired")
defer pubsub.Close()
// 开启goroutine,接收过期事件
go func() {
for msg := range pubsub.Channel() {
// 处理过期事件
fmt.Println("Key expired:", msg.Payload)
}
}()
select {} //阻塞主进程
}
func main() {
subs()
}
Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。
发布订阅架构图:
发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
下面介绍golang如何使用redis的发布订阅功能。
go redis发布订阅常用函数:
Subscribe - 订阅channel
PSubscribe - 订阅channel支持通配符匹配
Publish - 将信息发送到指定的channel。
PubSubChannels - 查询活跃的channel
PubSubNumSub - 查询指定的channel有多少个订阅者
1.Subscribe
订阅channel
例子1:
// 订阅channel1这个channel
sub := client.Subscribe("channel1")
// 读取channel消息
iface, err := sub.Receive()
if err != nil {
// handle error
}
// 检测收到的消息类型
switch iface.(type) {
case *redis.Subscription:
// 订阅成功
case *redis.Message:
// 处理收到的消息
// 这里需要做一下类型转换
m := iface.(redis.Message)
// 打印收到的小
fmt.Println(m.Payload)
case *redis.Pong:
// 收到Pong消息
default:
// handle error
}
例子2:
使用golang channel的方式处理消息
// 订阅channel1这个channel
sub := client.Subscribe("channel1")
// sub.Channel() 返回go channel,可以循环读取redis服务器发过来的消息
for msg := range sub.Channel() {
// 打印收到的消息
fmt.Println(msg.Channel)
fmt.Println(msg.Payload)
}
例子3:
取消订阅
// 订阅channel1这个channel
sub := client.Subscribe("channel1")
// 忽略其他处理逻辑
// 取消订阅
sub.Unsubscribe("channel1")
2.PSubscribe
用法跟Subscribe一样,区别是PSubscribe订阅通道(channel)支持模式匹配。
例子:
// 订阅channel1这个channel
sub := client.PSubscribe("ch_user_*")
// 可以匹配ch_user_开头的任意channel
3.Publish
将消息发送到指定的channel
// 将"message"消息发送到channel1这个通道上
client.Publish("channel1","message")
4.PubSubChannels
查询活跃的channel
// 没有指定查询channel的匹配模式,则返回所有的channel
chs, _ := client.PubSubChannels("").Result()
for _, ch := range chs {
fmt.Println(ch)
}
// 匹配user_开头的channel
chs, _ := client.PubSubChannels("user_*").Result()
5.PubSubNumSub
查询指定的channel有多少个订阅者
// 查询channel1,channel2两个通道的订阅者数量
chs, _ := client.PubSubNumSub("channel1", "channel2").Result()
for ch, count := range chs {
fmt.Println(ch) // channel名字
fmt.Println(count) // channel的订阅者数量
}
发布消息示例1
func (s SerialReader) processSerialData(redisConnection *redis.Client) {
serialConnection := s.newSerialConnection()
reader := bufio.NewReader(serialConnection)
re := regexp.MustCompile("^([^:]+):(.+)$")
for {
reply, err := reader.ReadBytes('\n')
if err != nil {
log.Printf("processSerialData() #1: %s", err)
continue
}
data := strings.TrimSpace(string(reply))
if len(data) == 0 {
continue
}
log.Printf("%s", data)
raw_data := re.FindStringSubmatch(data)
if len(raw_data) == 0 {
continue
}
metrics := strings.Split(raw_data[2], ",")
channel := fmt.Sprintf("rpi-moteino-collector:%s", raw_data[1])
for i := 0; i < len(metrics); i++ {
metric_key_and_value := strings.Split(metrics[i], ":")
if len(metric_key_and_value) != 2 {
continue
}
value := fmt.Sprintf("%s,%d,%s", metric_key_and_value[0], uint64(time.Now().Unix()), metric_key_and_value[1])
_, err := redisConnection.LPush(channel, value)
if err != nil {
log.Printf("processSerialData() #2: %s", err)
}
_, err = redisConnection.Publish(channel, value)
if err != nil {
log.Printf("processSerialData() #3: %s", err)
}
}
_, err = redisConnection.LTrim(channel, 0, 10)
if err != nil {
log.Printf("processSerialData() #4: %s", err)
}
}
}
发布消息示例2: spawnPublisher
func spawnPublisher() error {
var err error
var publisher *redis.Client
publisher = redis.New()
err = publisher.Connect(host, port)
if err != nil {
log.Fatalf("Publisher failed to connect: %s\n", err.Error())
return err
}
log.Println("Publisher connected to redis-server.")
log.Println("Publishing some messages...")
publisher.Publish("channel", "Hello world!")
publisher.Publish("channel", "Do you know how to count?")
for i := 0; i < 3; i++ {
publisher.Publish("channel", i)
}
log.Printf("Closing publisher...\n")
publisher.Quit()
return nil
}










网友评论