Parallel
[TOC]
并行化程序和多线程。一般用thread或者threading,multiprocessing,进程通信使用Pipe或者Queue(更安全),MPI 也可以。
Python多线程支持用两种方式来创建线程:一种是通过继承Thread类,重写它的run()方法(注意,不是start()方法);另一种是创建一个threading.Thread对象,在它的初始化函数(init())中将可调用对象作为参数传入。实际应用中,推荐优先使用threading模块而不是thread模块,(除非有特殊需要)。
【备注:线程的问题再看看。改善程序的91个建议一书+操作系统。】
GIL
Global Interpreter Lock 确保虚拟机中仅有一个线程运行。Python中的一种机制,导致多线程编程的不理想。
Multiprocessing
multiprocessing 是一个用与 threading 模块相似API的支持产生进程的包。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。
Pool: thread没有进程池,,Process类似thread.Thread
from multiprocessing import Pool,Process
def function(x):
pass
def main():
with Pool(5) as p:
p.map(function,X)
#function要加入进程任务的任务
# x 参数序列
p=Process(target=function,args=(x1,x2))
p.satrt()
p.join()
Queue
Pipe()
q=Queue()
parents_conn,child_conn=Pipe()# 代表管道两端
p=Process(target=function.args=(q,))
p=Process(target=function,args=(child_conn,))
p.start()
p.join()
Lock():保证进程同步,仅有一个进程打印状态
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Thread Pool
线程生命周期:create,initial,run,block,terminate
启动时间:Ts 运行时间Tr 销毁时间Td
构建一个POOL(队列)放入任务,多线程池依次从队列中取任务。避免线程不断创建销毁浪费时间,等全部任务完成后再销毁。
from threading import Thread
import Queue
class worker(Thread):
def _init_(self,workQueue,resultQueue):
def run(self):
while True:
try:
callable,args,kwds = self.workQueue.get(False)#fetch a task
self.resultQueue.put(callble(*args,**kwds))# return result
except Queue.Empty:
break
class WorkerManager():
# 管理线程
def _init_(self,numbers_of_worker):
self.workQueue=Queue.Queue()#创建任务队列
self.resultQueue=Queue.Queue()#创建结果队列
self.workers=[]
self._recruitThreads(self, numbers_of_worker):
for i inrange(numbers_of_worker):
worker=Worker(self.workQueue,resultQueue)
self.workers.append(worker)
def start(self):#启动线程
for w in self.workers:
w.start()
def wait_for_complete( self):
while len(self.workers):
worker = self.workers.pop(#从池中取出一个线程处理请求
worker.join( )
if worker.isAlive() and not self.workQueue.empty():
self.workers.append( worker ) # 重新加入线程池中
print "All jobs were completed."
def add_job( self, callable, *args, **kwds ):
self.workQueue.put( (callable, args, kwds) ) # 往工作队列中加入请求
def get_result( self, *args, **kwds ):# 获取处理结果
return self.resultQueue.get( *args, **kwds )
def funtion():
pass
wm = WorkerManager(2) #
创建线程池
for i in urls:
wm.add_job( download_file, i ) #
将所有请求加入队列中
wm.start()
wm.wait_for_complete()
或者threadpool模块
import threadpool as tp
def function(args):
pass
pool_size=2
pool=tp.ThreadPool(pool_size)# 创建pool
request=tp.makeRequests(function,args)# 创建request
pool.putRequest(threadpool.WorkRequest(download_file,args=))#将具体的请求放入线程池
pool.poll()#处理任务队列中的新的请求
pool.wait()
print "destory all threads before exist"
pool.dismissWorkers(pool_size, do_join=True)#完成后退出
MPI
服务器上一般采用多核CPU运行程序,不同CPU之间如何协同合作、消息传递?MPI是一个通用的多线程工具,本文主要介绍MPI的python包用法。运行程序时,最常用的用法是直接多次运行mpirun -np 10 main.py [argments],
运行openAIES复现时程序
mpirun -bind-to core -map-by node -report-bindings python main.py -e 20 -g Alien -c ./configurations/sample_configurations.json -r Test










网友评论