美文网首页
golang 基于channel的生产者消费者实现

golang 基于channel的生产者消费者实现

作者: 樹上一隻貓 | 来源:发表于2019-11-30 15:47 被阅读0次

背景


公司开发新系统,使用go语言。

模式为来一个客户给部署一套,用户在万个以下。

为方便部署,要求单可执行文件。

分模块之后耦合严重,单独部署消息队列服务没有必要,且会增加部署成本。

所以自己手写了一个生产者消费者的库,整合到程序中。

核心代码


核心结构体的定义
// 结构定义
type Broker struct {
    event      chan interface{} // 接收事件的管道
    handlers   []func(interface{})  // 处理事件的方法
    handlersMu sync.RWMutex // 添加方法时的锁
    Name       string // 名称
    wait       *sync.WaitGroup // wait group 用于停止时等待所有事件处理完成
    onceStart  sync.Once // 确保只启动一次
    onceStop   sync.Once // 确保只关闭一次
}
启动代码 事件处理循环
func (b *Broker) Start() {
    b.onceStart.Do(func() {
        b.wait.Add(1)
        go func() {
            for {
                event, ok := <-b.event
                if ok {
                    // 事件分发
                    b.handlersMu.RLock()
                    for _, v := range b.handlers {
                        v(event) // 有recover
                    }
                    b.handlersMu.RUnlock()
                } else {
                    // 通道已经关闭
                    b.wait.Done()
                    return
                }
            }
        }()
    })
}
创建并启动
// NewStartedBroker 创建broker,并开始
func NewStartedBroker(name string, chanBuf int) *Broker {
    b := &Broker{
        event:    make(chan interface{}, chanBuf),
        handlers: make([]func(interface{}), 0),
        Name:     name,
        wait:     &sync.WaitGroup{},
    }
    b.Start()
    return b
}
注册处理方法
// Register 注册事件
func (b *Broker) Register(ctx context.Context, f func(interface{})) (err error) {
    b.handlersMu.Lock()
    defer b.handlersMu.Unlock()
    b.handlers = append(b.handlers, func(o interface{}) {
        defer func() {
            if err := recover(); err != nil {
                err = fmt.Errorf("panic on broker handler msg name:%v err:%v msg:%v", b.Name, err, o)
                log.Println(err)
            }
        }()
        f(o)
    })
    return nil
}
事件发送
// Send 注册事件
func (b *Broker) Send(ctx context.Context, o interface{}) (err error) {
    defer func() {
        if errs := recover(); errs != nil {
            err = fmt.Errorf("消息处理异常 name:%v msg:%v err:%v", b.Name, o, errs)
            log.Println(err)
        }
    }()
    b.event <- o
    return
}
关闭方法
// Stop 调用stop之前确保写入方都已经退出了,不然要panic
func (b *Broker) Stop() {
    b.onceStop.Do(func() {
        close(b.event)
        b.wait.Wait()
    })
}
删除所有处理方法
func (b *Broker) Clear() {
    b.handlersMu.Lock()
    defer b.handlersMu.Unlock()
    b.handlers = b.handlers[0:0]
}

链接

项目代码 https://github.com/krilie/go-smq

相关文章

  • golang 基于channel的生产者消费者实现

    背景 公司开发新系统,使用go语言。 模式为来一个客户给部署一套,用户在万个以下。 为方便部署,要求单可执行文件。...

  • Golang Channel实现

    Channel是Golang实现CSP的核心。 基于channel通信主要涉及buf(数据)和sendq、recv...

  • golang通用连接池的实现

    golang的channel除了goroutine通信之外还有很多其他的功能,本文将实现一种基于channel的通...

  • Golang源码之Channel

    引用 图解Golang的channel底层原理 深入理解Golang Channel Go语言设计与实现-Chan...

  • channel的关闭

    1.生产者和消费者 2.关闭channel

  • 2019-04-24

    基于生产者与消费者的多线程Python实现 生产者生产,消费者消费,都是针对二者共同的钱包,生产者生产的钱放入钱包...

  • Golang channel 的实现原理

    Golang channel 的实现原理 Channel 是golang语言自身提供的一种非常重要的语言特性, 它...

  • Golang RabbitMQ work模式(并发消费) ---

    Golang RabbitMQ work模式, 实现多个消费者并发消费。 说明:P 代表生产者 , C1、C2 代...

  • Dubbo起步

    目录 定义接口 生产者生产者依赖生产者实现 消费者消费者依赖消费者实现 初步印象 定义接口 生产者 生产者依赖 生...

  • SynchronousQueue

    这个队列没有容量,一条也没有。基于生产者-消费者模式,可实现同步阻塞的功能。生产者生产数据后,如果没有消费者进行消...

网友评论

      本文标题:golang 基于channel的生产者消费者实现

      本文链接:https://www.haomeiwen.com/subject/dssowctx.html