美文网首页
python celery 提供广播服务

python celery 提供广播服务

作者: 天空蓝雨 | 来源:发表于2021-05-13 15:39 被阅读0次

这个基于 场景: 集群每个机器内存实时生效( 针对一个加载文件到内存的问题,有一个通知操作,通知每台机器从新加载文件)
为什么使用 广播, 而不是每次都直接读最新的文件?
每次读文件:缺点
消耗 io, 大量请求每次打开文件并不合理。打开文件读取的内容比较耗时,所以只能加载一次
广播 优点
及时性,高效性,解耦性

广播通知就比较好实现了:
把任务直接写在 广播任务里, 但是这里我需要更细致的操作就是:我要修改服务的内存,一个 类的属性。因为这个属性是从文件加载的,因为服务 进程class内存是固定的地址( 单进程, 多进程还是需要做内存同步,共享内存等),所以我觉得直接通知修改内存是最直接的。

通知的 celery 和服务不在一个进程,没法直接修改服务的内存的
于是我想了一个方法,就是直接 requests 请求 本机 127.0.0.1 的一个更新内存的接口( 接口导入那个 class,修改类属性就可以影响这个服务进程的 内存了)。 因为我的 consumer 和 服务是在同一台机器的。这就好办了。所有的请求 url 都设为 127.0.0.1 即可。

其他方法: 一开始也想就是,一台机器收到更新内存操作,直接向其他机器发 请求,但是这个会增加 运维工作量, 每台部署机器都要 更新ip 名单,而且所有的通知都放在一台机器,势必增加失败的风险。

注意: 这里我使用 celery ,是一个分布式的,所以celery 运行的和服务根本不在 一个进程。
我傻逼了,第一次定义普通任务不生效是因为

遇到的问题:

  • req = requests.get("http://127.0.0.1/boardCast/change_calss_2", params=request_params)
    由于服务接口是需要参数校验的(一开始忘记中间件里有全局校验),所以需要传对应的参数哦,一开始忘记了,后面才发现 task 有报错

  • 一开始我用 普通任务尝试的,发现是可以的,但是后面切换到广播任务就不行了,服务一直收不到任务,后来搜索发现,可能是由于 redis 不支持 广播任务( 但是现在这个结论是错误的,redis 是支持广播的, 本文最后会讲),切换为 rabbitmq ( 具体看 celery 文档,和 redis 用法有点不一样,就是 rabbitmq 配置问题)还是不行啊,于是想不是这里的问题,后来发现 app.conf.task_routes 没有一直没改,而我的任务函数名字都变了。草,自己傻逼了。


    就是 任务函数路径,一直忘记改了

    ok ,其他问题暂时没有了

redis celery 广播:
正确的方式:

app = Celery('bbk_server', broker=f"redis://:{redis_pass}@{redis_server}:{redis_port}/{redis_database_name}")

q = Queue('bbk_broadcast', Exchange('bbk_broadcast', type='fanout'), )
app.conf.task_queues = (q, ) 
app.conf.task_routes = {
    'common.tasks.notice_0': {
        'queue': 'bbk_broadcast',
    }
}

上述, rabbitmq 和 redis 都可以实现。
下面的试过了,只能用 rabbitmq:

app.conf.task_queues = (Broadcast("bbk_broadcast"),)

目前不知道啥原因,有时间在定位,估计是个 bug, issue 有一个人说 redis 下面不起作用,上面的有效.

关于 celery 的任务路由配置,大体有三个
1 直接指定管道
2 指定 exchange 的值
3 指定 routing_key 的值( fanout 不需要指定 )

定义 队列
app.conf.task_queues = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)

定义任务路由到队列 (queue  和  exchange   和 routing_key  都要满足才转发   类似 py  all() 函数 )
task_routes = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}
将 'feeds.tasks.import_feed' 任务路由到   feed_tasks队列

使服务器z专门使用指定队列的任务( -Q 指定):
celery -A proj worker -Q feed_tasks

相关文章

网友评论

      本文标题:python celery 提供广播服务

      本文链接:https://www.haomeiwen.com/subject/fcjbvctx.html