Python RQ: A Simple Message Queue Framework


Author: 霡霂976447044 | Published 2020-01-03 14:31

Python RQ is a simple task queue framework: lighter-weight and easier to extend than Celery.
The Queue class defines two class-level attributes by default:

redis_queue_namespace_prefix = 'rq:queue:'
redis_queues_keys = 'rq:queues'

A queue's Redis key is self._key, built from the prefix and the queue name:

# In Queue.__init__ (name defaults to 'default'):
prefix = self.redis_queue_namespace_prefix
self._key = '{0}{1}'.format(prefix, name)   # e.g. 'rq:queue:default'

@property
def key(self):
    """Returns the Redis key for this Queue."""
    return self._key
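
As a quick sanity check, the derived key can be inspected from user code (a minimal sketch, assuming a Redis server on localhost):

from redis import Redis
from rq import Queue

q = Queue('emails', connection=Redis())
print(q.key)                          # rq:queue:emails
print(Queue(connection=Redis()).key)  # rq:queue:default -- the default queue name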

Every enqueue operation eventually goes through the enqueue_job(self, job, pipeline=None, at_front=False) method:

    def enqueue_job(self, job, pipeline=None, at_front=False):
        """Enqueues a job for delayed execution.

        If Queue is instantiated with is_async=False, job is executed immediately.
        """
        pipe = pipeline if pipeline is not None else self.connection.pipeline()

        # Add Queue key set 
        pipe.sadd(self.redis_queues_keys, self.key)  # add 'rq:queue:default' to the set 'rq:queues'
        job.set_status(JobStatus.QUEUED, pipeline=pipe)

        job.origin = self.name
        job.enqueued_at = utcnow()

        if job.timeout is None:
            job.timeout = self._default_timeout
        job.save(pipeline=pipe)
        job.cleanup(ttl=job.ttl, pipeline=pipe)

        if self._is_async:
            self.push_job_id(job.id, pipeline=pipe, at_front=at_front)

        if pipeline is None:
            pipe.execute()

        if not self._is_async:
            job = self.run_job(job)

        return job
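
For context, here is a minimal sketch of how enqueue_job is reached from user code and what it leaves behind in Redis. The task function is illustrative and must live in an importable module, since RQ refuses to enqueue functions defined in __main__:

from redis import Redis
from rq import Queue

# myapp/tasks.py (illustrative)
def send_email(to):
    print('sending to', to)

redis_conn = Redis()
q = Queue('default', connection=redis_conn)

# Queue.enqueue wraps the callable in a Job and ends up in enqueue_job
job = q.enqueue(send_email, 'user@example.com')

print(redis_conn.smembers('rq:queues'))              # {b'rq:queue:default'}
print(redis_conn.lrange('rq:queue:default', 0, -1))  # [b'<job id>']
print(job.get_status())                              # 'queued'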

RQ stores pending jobs in a Redis List: lpush/rpush put job ids onto the list, while blpop/lpop read them back in blocking and non-blocking fashion respectively. To execute a job, the worker forks a child process (the "work horse") with os.fork.

    def fork_work_horse(self, job, queue):
        """Spawns a work horse to perform the actual work and passes it a job.
        """
        child_pid = os.fork()
        os.environ['RQ_WORKER_ID'] = self.name
        os.environ['RQ_JOB_ID'] = job.id
        if child_pid == 0:  # child process: run the job via main_work_horse
            self.main_work_horse(job, queue)
        else:  # parent process: remember the work horse's pid
            self._horse_pid = child_pid
            self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
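
The fork-and-wait pattern itself is plain os-module usage; below is a standalone sketch (not RQ's actual worker code, Unix only) of what the parent worker and the child "work horse" each do:

import os

def perform_job():
    # stands in for main_work_horse(job, queue)
    print('work horse pid:', os.getpid())

child_pid = os.fork()
if child_pid == 0:
    # child ("work horse"): run the job, then exit immediately
    perform_job()
    os._exit(0)
else:
    # parent (worker): remember the child pid and block until it finishes,
    # roughly what the worker's monitoring step does
    _, status = os.waitpid(child_pid, 0)
    print('work horse', child_pid, 'exited with status', status)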

If the default exception handler has not been disabled, the failing job is recorded in Redis through a FailedJobRegistry:

    if not self.disable_default_exception_handler:
        failed_job_registry = FailedJobRegistry(job.origin, job.connection,
                                                job_class=self.job_class)
        failed_job_registry.add(job, ttl=job.failure_ttl,
                                exc_string=exc_string, pipeline=pipeline)

After the default handling, the worker also walks any user-defined exception handlers:

    def handle_exception(self, job, *exc_info):
        """Walks the exception handler stack to delegate exception handling."""
        exc_string = Worker._get_safe_exception_string(
            traceback.format_exception_only(*exc_info[:2]) + traceback.format_exception(*exc_info)
        )
        self.log.error(exc_string, exc_info=True, extra={
            'func': job.func_name,
            'arguments': job.args,
            'kwargs': job.kwargs,
            'queue': job.origin,
        })

        for handler in self._exc_handlers:
            self.log.debug('Invoking exception handler %s', handler)
            fallthrough = handler(job, *exc_info)

            # Only handlers with explicit return values should disable further
            # exc handling, so interpret a None return value as True.
            if fallthrough is None:
                fallthrough = True

            if not fallthrough:
                break

Below is an example of user-defined exception handlers:

import traceback

from rq.job import Job
from rq.connections import get_current_connection
from rq.job import JobStatus
from rq.registry import FailedJobRegistry

from . import PointProcessExitType


def move_to_failed_queue(job, *exc_info):
    exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) +
                         traceback.format_exception(*exc_info))
    print('Process terminated unexpectedly, exception not handled by any earlier handler', "-" * 100)
    print(exc_string)
    print('-'*100)
    with get_current_connection().pipeline() as pipeline:
        job.set_status(JobStatus.FAILED, pipeline=pipeline)

        failed_job_registry = FailedJobRegistry(job.origin, job.connection,
                                                job_class=Job)
        failed_job_registry.add(job, ttl=job.failure_ttl,
                                exc_string=exc_string, pipeline=pipeline)
        try:
            pipeline.execute()
        except Exception as e:
            print('move_to_failed_queue error', e)
            traceback.print_exc()


def my_handler(job, exc_type, exc_value, traceback):
    """处理异常 判断到是人为的终止进程就不会将异常写入到数据库"""
    if str(exc_value) == PointProcessExitType.EXIT_BY_CALLING_PAUSE:
        print("task_exception.py:my_handler->是暂停任务的异常, 忽略")
        return False  # 返回false不会走move_to_failed_queue
    else:
        return True
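
These handlers still have to be registered on the worker. Here is a sketch of the wiring, assuming a recent RQ version where Worker accepts exception_handlers and disable_default_exception_handler (handler execution order can vary between versions, so check handle_exception in your installed copy):

from redis import Redis
from rq import Queue, Worker

redis_conn = Redis()
queue = Queue('default', connection=redis_conn)

# With disable_default_exception_handler=True, the built-in FailedJobRegistry
# step is skipped and error handling is left entirely to the handlers below.
worker = Worker([queue], connection=redis_conn,
                exception_handlers=[my_handler, move_to_failed_queue],
                disable_default_exception_handler=True)
worker.work()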

Inside the worker, Job.fetch is called to load the job back from Redis:


    @classmethod
    def fetch(cls, id, connection=None):
        """Fetches a persisted job from its corresponding Redis key and
        instantiates it.
        """
        job = cls(id, connection=connection)
        job.refresh()
        return job
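
Job.fetch is equally handy outside the worker, e.g. to poll a job's state from the id returned by enqueue (a minimal sketch; the job id is illustrative):

from redis import Redis
from rq.job import Job

job = Job.fetch('2e4b67f8-illustrative-id', connection=Redis())
print(job.get_status(), job.func_name, job.args)
print(job.result)  # None until the work horse has stored a return value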

Once a job is loaded, its callable is resolved through the func property:

    def _unpickle_data(self):
        self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)

    @property
    def func(self):
        func_name = self.func_name
        if func_name is None:
            return None

        if self.instance:
            return getattr(self.instance, func_name)

        return import_attribute(self.func_name)
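
import_attribute simply resolves a dotted-path string to a callable, which is why func_name can be stored as a plain string in Redis. A quick sketch (import_attribute lives in rq.utils):

from rq.utils import import_attribute

func = import_attribute('os.path.join')
print(func('a', 'b'))  # 'a/b' -- the same mechanism the worker uses to recover job.func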

At execution time, the job's data field is deserialized with pickle; unpickling automatically imports the modules the callable depends on.

When a job is saved to Redis, its payload is first serialized with pickle and then compressed with zlib before being written:

obj['data'] = zlib.compress(self.data)
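
The same pair of calls is reversed when the job is loaded back; here is a self-contained sketch of the round trip using plain pickle and zlib (not RQ's internal helpers):

import pickle
import zlib

# func_name, instance, args, kwargs -- the tuple _unpickle_data expects
payload = ('myapp.tasks.send_email', None, ('user@example.com',), {})

stored = zlib.compress(pickle.dumps(payload))     # what gets written to the 'data' field
restored = pickle.loads(zlib.decompress(stored))  # what refresh()/_unpickle_data recovers
assert restored == payload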
