美文网首页
11-7 ThreadPoolExecutor线程池

11-7 ThreadPoolExecutor线程池

作者: 正在努力ing | 来源:发表于2018-08-26 15:41 被阅读0次
from concurrent.futures import ThreadPoolExecutor
import time

executor = ThreadPoolExecutor(3)

def get(url,ti):
    print("start is gone")
    time.sleep(ti)
    print(url)


if __name__ == '__main__':
    url = "www.zhouyoulie.com"
    starttime =  time.time()
    task = executor.submit(get, url,2)
    task2 = executor.submit(get, url,3)
    print(task.done())

    time.sleep(2)
    print(task.done())
    print(task2.done())
    print(time.time()-starttime)
>>> 
start is gone
start is gone
False
www.zhouyoulie.com
True
False
2.0011143684387207
www.zhouyoulie.com

接下来我们分析一下源码吧

class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()

     def submit(self, fn, *args, **kwargs):
            with self._shutdown_lock: # 上锁,保证下面这段代码的安全
                if self._shutdown:
                    raise RuntimeError('cannot schedule new futures after shutdown')
    
                f = _base.Future()  #生成了一个future实例
                w = _WorkItem(f, fn, args, kwargs)  #把future也放在_WorkItem
    
                self._work_queue.put(w)     # 又把w加到实例的_work_queue,下面的_adjust_thread_count()函数有用到这个
                self._adjust_thread_count() #重要的来了,调整线程数
                return f
        submit.__doc__ = _base.Executor.submit.__doc__
        
    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=_worker,                    这里的_work就是WorkItem中的_work方法,我也不知道是怎么引用到的,但是通过IDE就找到了那里
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))  #所以这里的work_queue就会传给到是WorkItem中的_work方法,作为参数
            t.daemon = True
            t.start()
            self._threads.add(t)  # 添加线程
            _threads_queues[t] = self._work_queue
            
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

是不是觉得少了啥,那就是WorkItem

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)

def _worker(executor_reference, work_queue):   #接收到上面传来的work_queue(这里装的 _WorkItem实例)
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)

相关文章

网友评论

      本文标题:11-7 ThreadPoolExecutor线程池

      本文链接:https://www.haomeiwen.com/subject/shyoiftx.html