美文网首页
python线程的实现,线程池

python线程的实现,线程池

作者: 昆仑草莽 | 来源:发表于2019-05-07 09:28 被阅读0次

python通过标准库threading实现多线程的运行。
程序的运行总要考虑并发,并行数。在多线程程序中为了确保程序在运行中出现争抢资源的现象,使用线程锁或者线程池来规避资源的争抢。

线程的实现

Python通过两个标准库_thread 和threading,提供对线程的支持 , threading对_thread进行了封装。threading模块中提供了Thread , Lock ,RLock , Condition等组件
因此在实际的使用中我们一般都是使用threading
Thread类的说明:
参数说明:

参数 描述
target 表示调用对象,即子线程要执行的任务
name 子线程的名称
args 传入target函数的位置参数,是一个元组,参数后面必须加 逗号

常用实例方法:

实例方法 描述
Thread.run(self) 线程启动时运行的方法,由该方法调用target参数所指定的函数
Thread.start(self) 启动线程,start就是帮你去调用run方法
Thread.terminate(self) 强制终止线程
Thread.join(self,timeout=None) 阻塞调用,主线程进行等待
Thread.setDeamon(self,deamonic) 将子线程设置为守护线程
Thread.getname(self,name) 获取线程名称
Thread.setname(self,name) 设置线程名称

创建线程:
在python中由两种方法创建线程,实例Thread类和重写Thread类
1、实例Thread类

import threading
import time
#定义线程要运行的函数
def thread_print(startname,endname):
    print('我是: {}'.format(startname))
    time.sleep(2) #为了便于观察,使程序睡2秒
    print('线程结束')

#建立线程实例,args是一个元组,必须加逗号
t1 = threading.Thread(target=thread_print,args=('一','二'))
t2 = threading.Thread(target=thread_print,args=('开始',0))
t1.setDaemon(True)
#t2.setDaemon(True)
t1.start()
t2.start()
输出:
我是: 一
我是: 开始 #睡眠2秒
线程结束
线程结束
Process finished with exit code 0

2、继承Thread类

import threading
import time

#继承threading类中的Thread类
class MyThread(threading.Thread):
   # 线程中需要的参数
   def __init__(self,name):
       super().__init__()
       self.name = name
   #重构run方法
   def run(self):
       print('I am {}'.format(self.name))
       time.sleep(3)

#创建实例化线程
t1 = MyThread('apple')
t2 = MyThread('banana')
#启动线程,调用类中的run方法
t1.start()
t2.start()
#获取线程名称
print(t1.getName())
print(t2.getName())
输出:
I am apple
I am banana
apple
banana
Process finished with exit code 0

在知道了这两种方法后,我们来看子线程和主线程以及一些线程定义
主线程:当一个程序启动时 , 就有一个线程开始运行 , 该线程通常叫做程序的主线程
子线程:因为程序是开始时就执行的 , 如果你需要再创建线程 , 那么创建的线程就是这个主线程的子线程
主线程的重要性体现在两方面 :
1、是产生其他子线程的线程
2、通常它必须最后完成执行比如执行各种关闭操作
join:阻塞调用程序 , 直到调用join () 方法的线程执行结束, 才会继续往下执行

import threading
import time

#继承threading类中的Thread类
class MyThread(threading.Thread):
    # 线程中需要的参数
    def __init__(self,name):
        super().__init__()
        self.name = name
    #重构run方法
    def run(self):
        print('I am {}'.format(self.name))
        time.sleep(3)
        print('子线程结束!!!')

#创建实例化线程
t1 = MyThread('apple')
#启动线程,调用类中的run方法
t1.start()
t1.join() #只有等待子线程结束,主线程才能结束
print('主线程结束!!!')
输出:
I am apple
子线程结束!!!
主线程结束!!!
Process finished with exit code 0

setDeamon():setDaemon() 与 join() 基本上是相对的 , join会等子线程执行完毕 ; 而setDaemon则不会等

import threading
import time

#继承threading类中的Thread类
class MyThread(threading.Thread):
    # 线程中需要的参数
    def __init__(self,name):
        super().__init__()
        self.name = name
    #重构run方法
    def run(self):
        print('I am {}'.format(self.name))
        time.sleep(3)
        print('子线程结束!!!')

#创建实例化线程
t1 = MyThread('apple')
#启动线程,调用类中的run方法
t1.setDaemon(True) #放在子线程启动之前,否则会报错
t1.start()
print('主线程结束!!!')
输出:
I am apple  #子线程没有结束,主线程就已经结束
主线程结束!!!
Process finished with exit code 0

线程间的通信

互斥锁:在多线程中 , 所有变量对于所有线程都是共享的 , 因此 , 线程之间共享数据最大的危险在于多个线程同时修改一个变量 , 那就乱套了 , 所以我们需要互斥锁 , 来锁住数据。
提示!
因为线程属于同一个进程,因此它们之间共享内存区域。因此全局变量是公共的。

from threading import Thread

a = 1
def func():
    global a
    a = 2

t = Thread(target=func())
t.start()
t.join()
print(a)
输出:
2

Process finished with exit code 0

上面是一个共享参数,也就是共享内存的问题,多线程下,共享内存会出现互相竞争的问题

from threading import Thread

a = 0
def add_func():
    global a
    for i in range(1000000):
        a += 1
def sub_func():
    global a
    for i in range(1000000):
        a -= 1

t_add = Thread(target=add_func)
t_sub = Thread(target=sub_func)
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()
print(a)
输出说明:
当取值小于10000时,结果为0 是正常的
当取值大于10000时,结果就会别的数,有时为正,有时为负数,这就说明这两个线程在互相抢占资源造成结果的不正确。

使用锁来控制共享资源的访问

from threading import Thread,Lock

a = 0
lock = Lock()
def add_func():
    global a
    for i in range(1000000):
        lock.acquire()
        a += 1
        lock.release()
def sub_func():
    global a
    for i in range(1000000):
        lock.acquire()
        a -= 1
        lock.release()

t_add = Thread(target=add_func)
t_sub = Thread(target=sub_func)
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()
print(a)
输出:
0

Process finished with exit code 0

或者

from threading import Thread,Lock

a = 0
lock = Lock()
def add_func():
    global a
    for i in range(1000000):
        with lock: #上下文管理器,会自动关闭锁
            a += 1
def sub_func():
    global a
    for i in range(1000000):
        with lock:
            a -= 1

t_add = Thread(target=add_func)
t_sub = Thread(target=sub_func)
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()
print(a)
输出:
0

Process finished with exit code 0

但是,加锁的缺点是程序会运行非常缓慢。

队列:队列就好像是排队,有一个入口,一个出口,先入先出。 线程队列操作命令:
操作命令 描述
put(item) 入队
get() 出队
empty() 测试空 #近似
full() 测试满 #近似
qsize() 队列长度
task_done() 任务完成
join() 等待完成 (此处的join和线程的阻塞不是一回事)
from threading import Thread
from queue import Queue
from random import randint

#创建队列,指定长度
my_q = Queue(10)

def func_put(q):
    '''生产数据'''
    for i in range(10):
        num = randint(0,100)
        q.put(num)

def func_get(qq):
    '''取出数据'''
    for j in range(5): #每次取出5个数值
        num = qq.get()
        print(num)


t1 = Thread(target=func_put, args=(my_q,))
t2 = Thread(target=func_get, args=(my_q,))
t1.start()
t2.start()
t1.join()
t2.join()
输出:
48
10
78
41
27
Process finished with exit code 0

队列保证了数据的安全性,没有争抢资源的现象。是按照顺序依次取出的。

from queue import Queue

my_q = Queue(4)
my_q.put(1,)
print(my_q.qsize())
my_q.get()
print(my_q.qsize())
print(my_q.empty())
my_q.put(1,)
my_q.put(1,)
my_q.put(1,)
my_q.put(1,)
print(my_q.full())
my_q.task_done()
my_q.task_done()
my_q.task_done()
my_q.task_done()
my_q.task_done()#每put一次,就需要任务完成一次
my_q.join() #检测put和task_dane()是否相同,不相同就会阻塞,相同就会完成
print('ok')
输出:
1
0
True
True
ok
Process finished with exit code 0

线程池

线程池:

主线程: 相当于生产者,只管向线程池提交任务。并不关心线程池是如何执行任务的。因此,并不关心是哪一个线程执行的这个任务
线程池: 相当于消费者,负责接收任务,并将任务分配到一个空闲的线程中去执行

线程的简单实现:
from threading import Thread
from queue import Queue
import time

class ThreadPool:
    def __init__(self,n):
        self.queue = Queue()
        for i in range(n):
            Thread(target=self.worker).start()

    def worker(self):
        while True:
            func,args,kwargs = self.queue.get()
            func(*args,**kwargs)
            self.queue.task_done()
    def apply_sy(self,func,args = (),kwargs = {}):
        self.queue.put((func,args,kwargs))
    def join(self):
        self.queue.join()

def t1():
    print('任务1')
    time.sleep(2)
    print('任务1完成')
def t2 (*args,**kwargs):
    print('任务2',args,kwargs)
    time.sleep(2)
    print('任务2完成',args,kwargs)

pool = ThreadPool(4)

pool.apply_sy(t1)
pool.apply_sy(t2,args = (1,2),kwargs = {'a':3,'b':4})
print('任务提交')
pool.join()
print('任务完成')
输出:
任务提交
任务1任务2 
(1, 2) {'a': 3, 'b': 4}
任务2完成 任务1完成
(1, 2) {'a': 3, 'b': 4}
任务完成
Process finished with exit code -1

python内置线程池:

from multiprocessing.pool import ThreadPool
import time

def t1():
    print('任务1')
    time.sleep(2)
    print('任务1完成')
def t2 (*args,**kwargs):
    print('任务2',args,kwargs)
    time.sleep(2)
    print('任务2完成',args,kwargs)

pool = ThreadPool(4)

pool.apply_async(t1)
pool.apply_async(t2,args = (1,2),kwds = {'a':3,'b':4})
print('任务提交')
pool.close() #在join前必须要有close,这样就不允许提交任务了
pool.join()
print('任务完成')
输出:
任务提交任务1
任务2 (1, 2) {'a': 3, 'b': 4}

任务2完成任务1完成
 (1, 2) {'a': 3, 'b': 4}
任务完成
Process finished with exit code 0

池的其他操作:
1、关闭操作:close -关闭提交通道,不允许在提交任务
2、终止操作:terminate -终止进程池,终止所有任务。

from multiprocessing.pool import ThreadPool
import time

def t1():
    print('任务1')
    time.sleep(2)
    print('任务1完成')
def t2 (*args,**kwargs):
    print('任务2',args,kwargs)
    time.sleep(2)
    print('任务2完成',args,kwargs)

pool = ThreadPool(4)

pool.apply_async(t1)
pool.apply_async(t2,args = (1,2),kwds = {'a':3,'b':4})
print('任务提交')
pool.terminate() #终止进程池,终止所有任务
pool.join()
print('任务完成')
输出:
任务提交
任务完成
Process finished with exit code 0

相关文章

网友评论

      本文标题:python线程的实现,线程池

      本文链接:https://www.haomeiwen.com/subject/lorhoqtx.html