python-rq is a simple task queue framework. Compared with Celery it is more lightweight and easier to extend.
The Queue class carries two class-level defaults:

redis_queue_namespace_prefix = 'rq:queue:'
redis_queues_keys = 'rq:queues'

Each queue's Redis key is held in self._key. It is built in __init__ from the namespace prefix and the queue name (which defaults to "default"), and exposed through the key property:

prefix = self.redis_queue_namespace_prefix
self._key = '{0}{1}'.format(prefix, name)  # rq:queue:default

@property
def key(self):
    """Returns the Redis key for this Queue."""
    return self._key
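A quick check of this key scheme from the user side (a minimal sketch assuming a local Redis; it uses only rq's public Queue API):

from redis import Redis
from rq import Queue

conn = Redis()
q = Queue('default', connection=conn)
print(q.key)        # rq:queue:default

email_q = Queue('email', connection=conn)
print(email_q.key)  # rq:queue:email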
Every enqueue operation eventually goes through the enqueue_job(self, job, pipeline=None, at_front=False) method:
def enqueue_job(self, job, pipeline=None, at_front=False):
    """Enqueues a job for delayed execution.

    If Queue is instantiated with is_async=False, job is executed immediately.
    """
    pipe = pipeline if pipeline is not None else self.connection.pipeline()

    # Add Queue key set
    pipe.sadd(self.redis_queues_keys, self.key)  # adds rq:queue:default to the rq:queues set
    job.set_status(JobStatus.QUEUED, pipeline=pipe)

    job.origin = self.name
    job.enqueued_at = utcnow()

    if job.timeout is None:
        job.timeout = self._default_timeout
    job.save(pipeline=pipe)
    job.cleanup(ttl=job.ttl, pipeline=pipe)

    if self._is_async:
        self.push_job_id(job.id, pipeline=pipe, at_front=at_front)

    if pipeline is None:
        pipe.execute()

    if not self._is_async:
        job = self.run_job(job)

    return job
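From the caller's side this is what an enqueue looks like. A minimal sketch, assuming a module tasks.py that defines an add(x, y) function (rq also accepts a dotted-path string, which sidesteps the restriction on functions defined in __main__):

from redis import Redis
from rq import Queue

conn = Redis()
q = Queue('default', connection=conn)

# Internally this builds a Job and hands it to enqueue_job
job = q.enqueue('tasks.add', 2, 3)

print(job.get_status())                        # queued
print(conn.smembers('rq:queues'))              # {b'rq:queue:default'}
print(conn.lrange('rq:queue:default', 0, -1))  # [b'<job.id>']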
rq uses a Redis List as the underlying storage for each queue: lpush and rpush append job IDs to the list, while blpop and lpop read them back in blocking and non-blocking fashion respectively (see the push_job_id sketch below).
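push_job_id, called from enqueue_job above, is where these list commands appear. A minimal sketch of that logic, reconstructed from the description rather than quoted verbatim from the rq source:

def push_job_id(self, job_id, pipeline=None, at_front=False):
    """Pushes a job ID onto the queue's Redis list."""
    connection = pipeline if pipeline is not None else self.connection
    if at_front:
        connection.lpush(self.key, job_id)  # jump to the head of the queue
    else:
        connection.rpush(self.key, job_id)  # normal FIFO order

To actually execute a job, the Worker uses os.fork to spawn a child process (the "work horse"):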
def fork_work_horse(self, job, queue):
    """Spawns a work horse to perform the actual work and passes it a job."""
    child_pid = os.fork()
    os.environ['RQ_WORKER_ID'] = self.name
    os.environ['RQ_JOB_ID'] = job.id
    if child_pid == 0:  # child process: run the job
        self.main_work_horse(job, queue)
    else:  # parent process: remember the horse's pid and update the process title
        self._horse_pid = child_pid
        self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
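The same fork-and-monitor pattern in miniature (a standalone sketch, POSIX only since os.fork is unavailable on Windows; main_work_horse likewise ends with os._exit so the child never falls back into the parent's loop):

import os
import time

def fork_demo():
    child_pid = os.fork()
    if child_pid == 0:
        # child: do the work, then exit hard so no parent code runs here
        print('work horse {0} running the job'.format(os.getpid()))
        os._exit(0)
    else:
        # parent: remember the child and block until it finishes
        print('forked {0} at {1}'.format(child_pid, time.time()))
        os.waitpid(child_pid, 0)

fork_demo()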
If the default exception handler is enabled (it is unless disable_default_exception_handler is set), any exception raised by a job is persisted to Redis through the FailedJobRegistry:
if not self.disable_default_exception_handler:
    failed_job_registry = FailedJobRegistry(job.origin, job.connection,
                                            job_class=self.job_class)
    failed_job_registry.add(job, ttl=job.failure_ttl,
                            exc_string=exc_string, pipeline=pipeline)
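Failed jobs can then be inspected from that registry. A minimal sketch, assuming the same local connection (get_job_ids is part of rq's registry API; exc_info holds the stored traceback string):

from redis import Redis
from rq.job import Job
from rq.registry import FailedJobRegistry

conn = Redis()
registry = FailedJobRegistry('default', connection=conn)
for job_id in registry.get_job_ids():
    job = Job.fetch(job_id, connection=conn)
    print(job_id, job.exc_info)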
After the default handling, the worker also walks any user-defined exception handlers:
def handle_exception(self, job, *exc_info):
    """Walks the exception handler stack to delegate exception handling."""
    exc_string = Worker._get_safe_exception_string(
        traceback.format_exception_only(*exc_info[:2]) + traceback.format_exception(*exc_info)
    )
    self.log.error(exc_string, exc_info=True, extra={
        'func': job.func_name,
        'arguments': job.args,
        'kwargs': job.kwargs,
        'queue': job.origin,
    })

    for handler in self._exc_handlers:
        self.log.debug('Invoking exception handler %s', handler)
        fallthrough = handler(job, *exc_info)

        # Only handlers with explicit return values should disable further
        # exc handling, so interpret a None return value as True.
        if fallthrough is None:
            fallthrough = True

        if not fallthrough:
            break
Below is an example of user-defined exception handlers:
import traceback

from rq.connections import get_current_connection
from rq.job import Job, JobStatus
from rq.registry import FailedJobRegistry

from . import PointProcessExitType


def move_to_failed_queue(job, *exc_info):
    exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) +
                         traceback.format_exception(*exc_info))
    print('process exited unexpectedly, exception not covered by any handler', '-' * 100)
    print(exc_string)
    print('-' * 100)
    with get_current_connection().pipeline() as pipeline:
        job.set_status(JobStatus.FAILED, pipeline=pipeline)
        failed_job_registry = FailedJobRegistry(job.origin, job.connection,
                                                job_class=Job)
        failed_job_registry.add(job, ttl=job.failure_ttl,
                                exc_string=exc_string, pipeline=pipeline)
        try:
            pipeline.execute()
        except Exception as e:
            print('move_to_failed_queue error', e)
            traceback.print_exc()


def my_handler(job, exc_type, exc_value, tb):
    """Handle the exception: if the process was terminated deliberately, do not persist the failure."""
    if str(exc_value) == PointProcessExitType.EXIT_BY_CALLING_PAUSE:
        print('task_exception.py:my_handler -> pause-task exception, ignoring')
        return False  # returning False stops the chain, so move_to_failed_queue never runs
    else:
        return True
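These handlers are handed to the Worker at construction time. A minimal sketch, assuming rq 1.x where Worker accepts an exception_handlers list (given the iteration order shown in handle_exception above, my_handler runs first and can short-circuit move_to_failed_queue):

from redis import Redis
from rq import Queue, Worker

conn = Redis()
q = Queue('default', connection=conn)
worker = Worker([q], connection=conn,
                exception_handlers=[my_handler, move_to_failed_queue],
                disable_default_exception_handler=True)
worker.work()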
In the worker, a job is materialized by calling Job's fetch classmethod:
@classmethod
def fetch(cls, id, connection=None):
    """Fetches a persisted job from its corresponding Redis key and
    instantiates it.
    """
    job = cls(id, connection=connection)
    job.refresh()
    return job
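fetch is also handy for inspecting a job from any process that knows its ID. A minimal sketch (the job ID here is a hypothetical placeholder):

from redis import Redis
from rq.job import Job

conn = Redis()
job = Job.fetch('some-job-id', connection=conn)  # 'some-job-id' is hypothetical
print(job.func_name, job.args, job.kwargs, job.get_status())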
Once the job is loaded, the callable to execute is resolved through its func property:
def _unpickle_data(self):
    self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)

@property
def func(self):
    func_name = self.func_name
    if func_name is None:
        return None

    if self.instance:
        return getattr(self.instance, func_name)

    return import_attribute(self.func_name)
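import_attribute resolves a dotted path such as 'tasks.add' back to the function object. Its core is essentially the following (a sketch of the idea, not a verbatim copy of rq.utils):

from importlib import import_module

def import_attribute(name):
    """Return an attribute from a dotted path, e.g. 'tasks.add' -> the add function."""
    module_name, attribute = name.rsplit('.', 1)
    module = import_module(module_name)
    return getattr(module, attribute)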
At execution time the stored payload is restored with pickle.loads, and unpickling automatically imports the modules the callable depends on. When a job is saved to Redis, it is serialized with pickle first and then compressed with zlib before being written:
obj['data'] = zlib.compress(self.data)
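The round trip in isolation (a minimal sketch; the tuple layout mirrors the func_name/instance/args/kwargs fields unpickled above):

import pickle
import zlib

data = pickle.dumps(('tasks.add', None, (2, 3), {}), protocol=pickle.HIGHEST_PROTOCOL)
stored = zlib.compress(data)                      # what lands in the Redis hash
restored = pickle.loads(zlib.decompress(stored))  # what the worker reads back
print(restored)  # ('tasks.add', None, (2, 3), {})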