美文网首页
Python之多线程、多进程

Python之多线程、多进程

作者: 天命_风流 | 来源:发表于2020-03-02 00:18 被阅读0次

多线程

多线程的含义

一个进程可以创建多个线程进行并发计算,从而让进程同时处理多个计算逻辑,或者将一个大的计算任务拆分成多个小的计算任务,然后同时计算这些子任务。
通过查阅相关资料,其它某些语言的多线程可以调用多个CPU同时进行运算(并行),但是对于Python来说,它的C解释器设置了一个GIL(全局解释器锁:Global Interpreter Lock),使得同一时刻只有一个线程执行,也就是说,无论你怎么设置你的多线程,CPython的都只能利用一个CPU,这也就无法发挥计算机多核心的优势了。
由于我使用的是CPython,所以下面的Python都是指在C解释器下实现的python。

Python下多线程的作用

由于GIL带来的影响,计算密集型的程序中使用多线程不会带来性能提升,甚至由于线程切换需要耗费资源,它的性能还会下降。
但是Python的多线程依然有它的用武之地,在处理需要等待、需要大量使用慢速设备(内存和硬盘的性能相对于CPU来说非常慢,因此我将这种设备定义为慢速设备)等情况下,多线程仍然有不俗的表现。

如何规避GIL带来的影响呢?有以下几点建议:使用非C的解释器、使用C扩展进行数据运算、使用多进程 ...

编写多线程需要注意的问题

  • 注意各个线程之间的通信问题,这包括是否共享对数据的修改,如何向其它线程发送信息,如何设置信号。
  • 注意线程内部的资源使用,这包括了对临界资源的争抢,如何保持现有数据不被其它线程意外修改,如何对线程进行状态保存。
  • 注意各个线程的调度,这包括了线程启动顺序,如何设置线程之间的同步操作,如何防止程序死锁的问题。

Python的多线程

线程的建立和启动

我们有两种方式建立多线程:直接构建线程,使用类继承构件线程
直接构建线程:

import time
from threading import Thread    #  多线程最常使用的库


def t(n):
    while n > 0:
        print(n)
        n -= 1
        time.sleep(0.5)


if __name__ == '__main__':
    a = Thread(target=t, args=(3,))  # 实例化一个线程
    b = Thread(target=t, args=(3,))
    c = Thread(target=t, args=(3,), daemon=True)  # 实例一个守护线程,守护线程如果不自动退出,只会在程序结束时结束,因此它不可以使用jion
    a.start()  # 开启线程
    b.start()
    c.start()
    a.join()  # 等待线程执行完成,然后才能执行下面的代码
    if a.is_alive():  # 判断线程是否在执行
        print('a仍然执行')
    if b.is_alive():
        print('b仍然执行')
  • join方法:等待一个线程执行完毕,若线程未执行,则一直阻塞等待
  • 守护线程:不可以被join等待的线程,在主程序执行完毕之后会被立即关闭

用类继承:

from threading import Thread
import time


class CountdownThread(Thread):  # 继承
    def __init__(self, n):
        super().__init__()
        self.n = n

    def run(self):
        while self.n > 0:
            print(self.n)
            self.n -= 1
            time.sleep(0.3)


c = CountdownThread(3)
c.start()  # 会执行实例的run方法
d = CountdownThread(3)
d.start()
  • 这种方法对类构建了一层继承关系,这可能会影响代码的灵活性,因此不提倡使用
进程间的信号传递

Event:全局信号

from threading import Thread, Event
import time


def t(x, e):
    e.wait()  # 等待信号
    print(x)


if __name__ == '__main__':
    event = Event()  # 设置一个信号
    t1 = Thread(target=t, args=('函数运行', event))
    t1.start()
    time.sleep(2)
    event.set()  # 传递出信号,所有event.wait()后面的代码开始执行

  • 你可以使用 event.clear()移除信号,这样所有 event.wait()会回复阻塞

Condition:条件变量
https://www.jianshu.com/p/5d2579938517

进程间的数据共享

使用队列:

import threading
import time
import queue


def product():
    while True:
        if que.qsize() == 10:
            print('现在有10个包子,我决定休息一会')
            time.sleep(6)  # 够10个包子就休息6秒
        else:
            que.put(1)
            print('造包子,现在有{}个'.format(que.qsize()))
            time.sleep(0.2)  # 每0.2秒造一个包子


def consumer(name):
    while True:
        que.get()
        print('{}吃了一个包子,还剩下{}个'.format(name, que.qsize()))
        time.sleep(1.2)  # 每1.2秒吃一个包子


que = queue.Queue()  # 实例化一个可以在进程间共享的队列
p = threading.Thread(target=product)
c1 = threading.Thread(target=consumer, args=('选手1',))
c2 = threading.Thread(target=consumer, args=('选手2',))

p.start()
c1.start()
c2.start()
  • .qsize() 获取队列元素的个数
  • .get()从队列中获取数据
  • .put()向队列中放入数据
  • 在实例化一个队列的时候,可以使用maxsize设定队列的最大容量
  • 队列空时使用get 和 队列满时使用put 都会引起队列的阻塞,知道其它地方放入/取走了数据
  • 可以使用 timeout设置阻塞时间,阻塞超时会引发异常,并丢弃当请求(get 或 put)
  • 使用block = False设置为非阻塞状态,遇到队列 空/满 的时候会直接抛出异常并丢弃请求

下面用一个实例解释一下队列中的 task_done 和 join:

import threading
import time
import queue


def product():
    while True:
        if que.qsize() == 10:  # 队列中有10个包子的时候生产者退出生产
            print('现在有10个包子,我回家了')
            break
        else:
            que.put(1)
            print('造包子,现在有{}个'.format(que.qsize()))
            time.sleep(0.1)  # 每0.2秒造一个包子


def consumer(name):
    while True:
        que.get()
        print('{}吃了一个包子,还剩下{}个'.format(name, que.qsize()))
        time.sleep(0.5)  # 每1.2秒吃一个包子
        que.task_done()  # task_done 要和get配合使用,每进行一次get,就要用一次task_done
        if que.qsize() == 0:  # 队列为空时消费者推出消费
            break


que = queue.Queue()  # 实例化一个可以在进程间共享的队列
p = threading.Thread(target=product)
c1 = threading.Thread(target=consumer, args=('选手1',))
c2 = threading.Thread(target=consumer, args=('选手2',))

p.start()
c1.start()
c2.start()
que.join()  # 保持阻塞,直到队列内部的计数器认为队列为空为止,注意队列计数器的规则:遇到put就+1,遇到task_done就-1
print('over')
  • .join会阻塞程序,直到队列计数判断队列为空为止
  • 计数加减的条件:遇到put 就 +1 ,遇到task_done 就 -1
  • join不会判断队列中数据元素的个数,只会根据上面提到的计数条件判断。即使队列仍然有数据,但是join计数器为0时,就会放弃阻塞。
  • 我们可以根据这个特性定制队列的阻塞条件。

多线程的一大问题是多个线程争抢同一个资源,这样争抢的结果可能会引发错误:A线程正在使用资源S,S状态为S1,这时B线程修改了S,状态改为S2,如此一来,A线程的计算结果和S(修改后的状态为S2)不符。
为了解决这种问题,我们需要对临界资源上锁:保证A线程使用完S之后再允许其它线程修改S。

import threading
import time

n = 0#设置n为一个临界资源
n_lock = threading.Lock()#为n设置一把锁

def a(x):
    n_lock.acquire()#上锁,上锁后,所有n_lock.acquire()都会被阻塞
    time.sleep(3)#假设一个程序需要花3秒进行运算
    global n
    n += x
    print(n)
    n_lock.release()#开锁,释放资源

t1 = threading.Thread(target=a,args=(3,))
t2 = threading.Thread(target=a,args=(5,))
t1.start()
t2.start()

当然,伟大的同行已经为我们写好了Lock的上下文环境,所以你可以使用with达到与上面相同的结果:

import threading
import time

n = 0  # 设置n为一个临界资源
n_lock = threading.Lock()  # 为n设置一把锁


def a(x):
    with n_lock:  # 自动用n_lock锁住下面的代码,当代码执行完毕后自动释放
        time.sleep(3)  # 假设一个程序需要花3秒进行运算
        global n
        n += x
        print(n)


t1 = threading.Thread(target=a, args=(3,))
t2 = threading.Thread(target=a, args=(5,))
t1.start()
t2.start()

Lock是一把只能一个请求者使用的锁,我们可以使用Semaphore(信号量)设置使用数量:

from threading import Semaphore
import threading
import time

n_lock = Semaphore(2)  # 允许两个人同时请求n_lock.acquire()
n = 0


def a(x):
    with n_lock:  # 自动用n_lock锁住下面的代码,当代码执行完毕后自动释放
        time.sleep(3)  # 假设一个程序需要花3秒进行运算
        global n
        n += x
        print(n)


t1 = threading.Thread(target=a, args=(3,))
t2 = threading.Thread(target=a, args=(5,))
t3 = threading.Thread(target=a, args=(10,))
t1.start()
t2.start()
t3.start()
  • threading库中还为我们提供了RLock( )和BoundedSemaphore( ),他们的功能和上面提到的Lock和Semaphore类似,但是有一些不同。
  • Condition条件变量内部包含了资源锁和信号传递两个功能。

其实锁和信号量的作用远远不止于对临界资源的保护,他们还可以同步各个进程(线程)之间的进度。具体可以看一看操作系统的相关内容。

保存线程专有状态

如果你需要保存当前运行线程的专有状态,且这个状态对其它线程是不可见的,那就需要使用这个技巧。

import threading

t = threading.local()

def prin():
    t.inner_id = threading.get_ident()
    print(t.inner_id)


t1=threading.Thread(target=prin)
t2=threading.Thread(target=prin)
t3=threading.Thread(target=prin)
prin()
t1.start()
t2.start()
t3.start()
  • 在底层,threading.local()实例为每个线程维护这一个单独的实例字典。所有对实例的常见操作,比如获取、设定以及删除,都是作用于每个线程专有的字典上。这使得不同线程的数据得到隔离。
创建线程池

可以使用threadin库提供的线程池,但是这里我选了另一个库的线程池,下面的代码是自动调用线程池,你可以使用pool.submit()手动为线程池调用代码。

from concurrent.futures import ThreadPoolExecutor  # 引入线程池
import time


def t(n):
    return n ** n ** n


if __name__ == '__main__':
    start = time.time()
    pool = ThreadPoolExecutor(max_workers=4)  # 实例化一个线程池,最多使用4个线程
    # with ThreadPoolExecutor(max_workers=4) as pool:#这行代码等价于上一行代码
    r = pool.map(t, [6 for i in range(20000)])  # 使用这个线程池调用 t(6) 函数 20000次
    pool.shutdown()  # 等待所有线程执行完毕
    print(time.time() - start)  # 一个简单的计算

    start = time.time()
    for i in range(20000):  # 这是直接使用单线程进行20000次 t(6)的计算
        t(6)
    print(time.time() - start)
###############  执行结果 ################
19.726083517074585
17.407222509384155

WTF???为什么线程并发耗费的时间反而比单线程多?
这就是最开始所讲到的,由于GIL的限制,Python只能发挥一个核心的性能,多线程只能发挥一个核心的性能,对于计算密集的程序,反而会因为线程切换增加计算成本。
那么如何破除这种困境呢?解决方案之一就是使用多进程

多进程

多进程的原理和多线程类似,python通过创建多个进程进行计算,由于一个python进程有一个GIL锁,多进程就可以解除这样的限制。这使得Python可以利用到CPU的多个核心。
依然以上一个例子作为对比:

from concurrent.futures import ProcessPoolExecutor  # 引入进程池
import time


def t(n):
    return n ** n ** n


if __name__ == '__main__':
    start = time.time()
    pool = ProcessPoolExecutor(max_workers=8)  # 实例化一个进程池
    # with ProcessPoolExecutor(4) as pool:#这行代码等价于上一行代码
    r = pool.map(t, [6 for i in range(20000)])  # 使用这个进程池调用 t(6) 函数 20000次
    pool.shutdown()  # 等待所有进程执行完毕
    print(time.time() - start)  # 一个简单的计算

    start = time.time()
    for i in range(20000):  # 这是直接使用单线程进行20000次 t(6)的计算
        t(6)
    print(time.time() - start)
###############  执行结果 ################
10.398910760879517
17.622284173965454

我的计算机是8核心,所以我将进程池上限设置为8,你会发现,使用进程池的运行速度比线程池快了将近一倍,这比你想象中的要慢?有以下几点原因:

  • 创建进程是非常耗费资源的,上面代码在后台加载了共8个python程序,这需要花费大量时间。
  • 进程间的通信非常困难,尽管伟大的同行们已经构建了进程通信工具,但是其内部都是使用pickle编码传递信息,相比进程通信,这很低效。
    除此之外,多进程编码还有一些你需要注意的地方:
  • 这种技术只适用于可以将问题直接分解为多个独立子问题的情况。
  • 多进程的任务只能使用函数提交,其它类型的可调用对象(实例的方法,类的方法,lambda等)都不支持并行处理。
  • 并行的函数的参数和返回值必须兼容pickle编码,否则无法进行进程间传递。
  • 进程建立后就很难控制其行为,因此“大量计算,行为简单”的任务是多进程并行的首选。
  • 多进程和多线程结合需要格外小心,一般我们先加载多进程,然后使用多线程。

多进程的编码工具和多线程的工具十分相似,我看到有个小伙伴写的还不错,点这里即可查看。(实际上是我懒)

最后附一份思维导图

并发

相关文章

  • python之多线程与多进程入门

    python之多线程与多进程 关键词: GIL锁,IO繁忙,线程安全,线程同步,进程池,进程通信,队列 GIL锁;...

  • python多进程

    python之多进程 写在前面 前面学习了多线程今天来学习一下多进程,相对于多线程,其实多进程的使用在日常生活中会...

  • Python之多线程、多进程

    多线程 多线程的含义 一个进程可以创建多个线程进行并发计算,从而让进程同时处理多个计算逻辑,或者将一个大的计算任务...

  • python 进程和线程之多进程

    python学习笔记,特做记录,分享给大家,希望对大家有所帮助。 多进程 要让Python程序实现多进程(mult...

  • python多进程

    参考: python并发编程之多进程(实践篇)python中的多线程无法利用多核优势,如果想要充分地使用多核CPU...

  • python 进程和线程之多线程

    python学习笔记,特做记录,分享给大家,希望对大家有所帮助。 多线程 多任务可以由多进程完成,也可以由一个进程...

  • Python札记52_进程和线程2

    在之前的札记Python札记50_进程和线程1中介绍了进程、线程和子进程以及多进程的相关知识,本札记中重点介绍多线...

  • 11-9 多线程和多进程

    Python的GIL是针对进程还是线程?  是线程 Python多核cpu可以运行多线程吗? Python线程执行...

  • Python中的线程与进程

    进程会启动一个解释器进程,线程共享一个解释器进程 Python的线程开发 python的线程开发使用标准库thre...

  • 阿里大数据工程师整理内部Python协程笔记,赶紧保存!

    python线程多并发,是指在一个进程中开启n个线程,以此来达到并发执行任务。但是python中的线程有GIL解释...

网友评论

      本文标题:Python之多线程、多进程

      本文链接:https://www.haomeiwen.com/subject/loofkhtx.html