做图片素材的网站,wordpress入门视频教程,wordpress插件对话,网站运营一月多少钱Python 中考虑 concurrent.futures 实现真正的并行计算
思考#xff0c;如何将代码所要执行的计算任务划分成多个独立的部分并在各自的核心上面平行地运行。
Python 的全局解释器锁#xff08;global interpreter lock#xff0c;GIL#xff09;导致没办法用线程来实现真…Python 中考虑 concurrent.futures 实现真正的并行计算
思考如何将代码所要执行的计算任务划分成多个独立的部分并在各自的核心上面平行地运行。
Python 的全局解释器锁global interpreter lockGIL导致没办法用线程来实现真正的并行所以先把这种方案排除掉。另一种常见的方案是把那些对性能要求比较高的performance-critical代码用 C 语言重写成扩展模块。然而用 C 语言重写 Python 代码代价是比较高的。所以还是要能够在 Python 语言自身的范围内解决这种复杂的并行计算问题。
Python 内置的 multiprocessing 模块提供了多进程机制这种机制很容易通过内置的 concurrent.futures 模块来使用。这种方案可以启动许多条子进程child process这些进程是独立于主解释器的它们有各自的解释器与相应的全局解释器锁因此这些子进程可以平行地运行在 CPU 的各个核心上面。每条子进程都能够充分利用它所在的这个核心来执行运算。这些子进程都有指向主进程的链接用来接收所要执行的计算任务并返回结果。
例如现在要用 Python 来执行某种计算量很大的工作而且想把 CPU 里的各个核心充分利用起来。用下面这个计算最大公约数的函数来模拟刚才讲的那种工作。
# my_module.py
def gcd(pair):a, b pairlow min(a, b)for i in range(low, 0, -1):if a % i 0 and b % i 0:return iassert False, Not reachable如果把有待求解最大公约数的那些元组按照先后顺序交给这个函数去执行那么程序花费的总时间就会随着元组的数量呈正比例上升因为根本就没有做平行计算。
# run_serial.py
import timenumbers [(1963309, 2265973), (2030677, 3814172),(1551645, 2229620), (2039045, 2020802),(1823712, 1924928), (2293129, 1020491),(1281238, 2273782), (3823812, 4237281),(3812741, 4729139), (1292391, 2123811),
]def main():start time.time()results list(map(gcd, numbers))end time.time()delta end - startprint(fTook {delta:.3f} seconds)main()#
# Took 0.863 seconds直接把这种代码分给多条 Python 线程去执行是不会让程序提速的因为它们全都受制于同一个 Python 全局解释器锁GIL无法真正平行地运行在各自的 CPU 核心上面。现在就来演示这一点。使用 concurrent.futures 模块里面的 ThreadPoolExecutor 类并允许它最多可以启用四条工作线程根据机器核心数设置。
# run_threads.py
from concurrent.futures import ThreadPoolExecutor
import timenumbers [(1963309, 2265973), (2030677, 3814172),(1551645, 2229620), (2039045, 2020802),(1823712, 1924928), (2293129, 1020491),(1281238, 2273782), (3823812, 4237281),(3812741, 4729139), (1292391, 2123811),
]def main():start time.time()pool ThreadPoolExecutor(max_workers4)results list(pool.map(gcd, numbers))end time.time()delta end - startprint(fTook {delta:.3f} seconds)main()#
# Took 0.846 seconds由于要启动线程池并和它通信这种写法比单线程版本还慢。但是请注意只需要变动一行代码就能让程序出现奇效也就是把 ThreadPoolExecutor 改成 concurrent.futures 模块里的 ProcessPoolExecutor。这样一改程序立刻就快了起来。
# run_parallel.py
from concurrent.futures import ProcessPoolExecutor
import timenumbers [(1963309, 2265973), (2030677, 3814172),(1551645, 2229620), (2039045, 2020802),(1823712, 1924928), (2293129, 1020491),(1281238, 2273782), (3823812, 4237281),(3812741, 4729139), (1292391, 2123811),
]def main():start time.time()pool ProcessPoolExecutor(max_workers4) # The one changeresults list(pool.map(gcd, numbers))end time.time()delta end - startprint(fTook {delta:.3f} seconds)if __name__ __main__:main()#
# Took 0.464 seconds程序变得比原来快多了。这是为什么呢因为 ProcessPool-Executor 类会执行下面这一系列的步骤当然这实际上是由 multiprocessing 模块里的底层机制所推动的。
1从包含输入数据的NUMBERS列表里把每个元素取出来以便交给 map。2用 pickle 模块对每个元素做序列化处理把它转成二进制形式。3将序列化之后的数据从主解释器所在的进程经由本地 socket 复制到子解释器所在的进程。4在子进程里面用 pickle 模块对数据做反序列化处理把它还原成 Python 对象。5引入包含 gcd 函数的那个 Python 模块。6把刚才还原出来的那个对象交给 gcd 函数去处理此时其他子进程也可以把它们各自的那份数据交给它们各自的 gcd 函数执行。7对执行结果做序列化处理把它转化成二进制形式。8将二进制数据通过 socket 复制到上级进程。9在上级进程里面对二进制数据做反序列化处理把它还原成 Python 对象。10把每条子进程所给出的结果都还原好最后合并到一个 list 里面返回。
从开发者这边来看这个过程似乎很简单但实际上multiprocessing 模块与 Proce-ssPoolExecutor 类要做大量的工作才能实现出这样的并行效果。同样的效果假如改用其他语言来做那基本上只需要用一把锁或一项原子操作就能很好地协调多个线程从而实现并行。但这在 Python 里面不行所以才考虑通过 ProcessPoolExecutor 来实现。然而这样做的开销很大因为它必须在上级进程与子进程之间做全套的序列化与反序列化处理。
这个方案对那种孤立的而且数据利用度较高的任务来说比较合适。所谓孤立isolated这里指每一部分任务都不需要跟程序里的其他部分共用状态信息。所谓数据利用度较高high-leverage这里指任务所使用的原始材料以及最终所给出的结果数据量都很小因此上级进程与子进程之间只需要互传很少的信息就行然而在把原始材料加工成最终产品的过程中却需要做大量运算。
如果你面对的计算任务不具备刚才那两项特征那么使用 ProcessPoolExecutor 所引发的开销可能就会盖过因为并行而带来的好处。在这种情况下可以考虑直接使用 multiprocessing 所提供的一些其他高级功能例如共享内存shared memory、跨进程的锁cross-process lock、队列queue以及代理proxy等。但是这些功能都相当复杂即便两个 Python 线程之间所要共享的进程只有一条也是要花很大工夫才能在内存空间里面将这些工具安排到位。假如需要共享的进程有很多条而且还涉及 socket那么这种代码理解起来会更加困难。
总之不要刚一上来就立刻使用跟 multiprocessing 这个内置模块有关的机制而是可以先试着用 ThreadPoolExecutor 来运行这种孤立且数据利用度较高的任务。把这套方案实现出来之后再考虑向 ProcessPoolExecutor 方案迁移。如果 ProcessPoolExecutor 方案也无法满足要求而且其他办法也全都试遍了那么最后可以考虑直接使用 multiprocessing 模块里的高级功能来编写代码。