Python多进程示例

在我们之前的教程中,我们学习了Python CSV示例。在这个教程中,我们将学习Python多进程的示例。

Python多进程

并行处理如今受到越来越多的关注。如果你仍然不了解并行处理,请从wikipedia学习。随着CPU制造商开始向其处理器添加越来越多的核心,编写并行代码是提高性能的一个很好的方法。Python引入了multiprocessing模块,让我们编写并行代码。要理解此模块的主要动机,我们必须了解一些关于并行编程的基础知识。阅读本文后,我们希望您能对这个主题有所了解。

Python多进程进程、队列和锁

在Python的多进程模块中有很多类可用于构建并行程序。其中,三个基本类是ProcessQueueLock。这些类将帮助您构建并行程序。但在描述它们之前,让我们用简单的代码来开始这个话题。要使并行程序有用,您必须知道您的计算机有多少个核心。Python的多进程模块使您能够了解这一点。以下简单代码将打印出您计算机上的核心数量。

import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())

以下输出可能因您的计算机而异。对我来说,核心数是8。

Python多进程Process类

Python多进程Process类是一个抽象,它设置了另一个Python进程,提供了运行代码的方法以及父应用程序控制执行的方式。Process类有两个重要的函数 – start()join()函数。首先,我们需要编写一个将由进程运行的函数。然后,我们需要实例化一个进程对象。如果我们创建了一个进程对象,在我们告诉它通过start()函数开始处理之前,什么都不会发生。然后,进程将运行并返回其结果。之后,我们通过join()函数告诉进程完成。如果没有调用join()函数,进程将保持空闲状态并且不会终止。因此,如果您创建了许多进程而没有终止它们,您可能会面临资源稀缺的问题。然后,您可能需要手动终止它们。一个重要的事情是,如果您想通过进程传递任何参数,您需要使用args关键字参数。以下代码将有助于理解Process类的用法。

from multiprocessing import Process


def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # 使用参数实例化进程
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # 完成进程
    for proc in procs:
        proc.join()

以下代码的输出将是:

Python multiprocessing Queue class

Python的多进程模块提供了`Queue`类,它恰好是一个先进先出的数据结构。它们可以存储任何pickle Python对象(尽管简单的对象最好),非常适用于在进程之间共享数据。当作为参数传递给进程的目标函数时,队列特别有用,以便进程消耗数据。通过使用`put()`函数,我们可以将数据插入队列,使用`get()`我们可以从队列中获取项目。请看下面的代码作为一个快速示例。

from multiprocessing import Queue

colors = ['red', 'green', 'blue', 'black']
cnt = 1
# 实例化一个队列对象
queue = Queue()
print('pushing items to queue:')
for color in colors:
    print('item no: ', cnt, ' ', color)
    queue.put(color)
    cnt += 1

print('\npopping items from queue:')
cnt = 0
while not queue.empty():
    print('item no: ', cnt, ' ', queue.get())
    cnt += 1

Python多进程锁类

Lock类的任务非常简单。它允许代码声明锁定,以便没有其他进程可以执行相似的代码,直到锁定被释放为止。因此,Lock类的任务主要是两个。一个是声明锁,另一个是释放锁。要声明锁,使用`acquire()`函数,要释放锁,使用`release()`函数。

Python多进程示例

在这个Python多进程示例中,我们将汇总所有的知识。假设我们有一些任务要完成。为了完成这些任务,我们将使用多个进程。因此,我们将维护两个队列。一个队列将包含任务,另一个队列将包含已完成任务的日志。然后,我们实例化进程来完成任务。请注意,Python的Queue类已经是同步的。这意味着我们不需要使用Lock类来阻止多个进程访问同一个队列对象。因此,在这种情况下,我们不需要使用Lock类。下面是实现的代码,其中我们将任务添加到队列中,然后创建进程并启动它们,然后使用join()来完成进程。最后,我们从第二个队列中打印日志。

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            '''
                try to get task from the queue. get_nowait() function will 
                raise queue.Empty exception if the queue is empty. 
                queue(False) function would do the same task also.
            '''
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:

            break
        else:
            '''
                if no exception has been raised, add the task completion 
                message to task_that_are_done queue
            '''
            print(task)
            tasks_that_are_done.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True


def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no " + str(i))

    # 创建进程
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()

    # 完成进程
    for p in processes:
        p.join()

    # 打印输出
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True


if __name__ == '__main__':
    main()

根据任务的数量,该代码将花费一些时间来显示输出。以下代码的输出将会随着时间的变化而变化。

Python多进程池

Python多进程池可用于在多个输入值上并行执行函数,将输入数据分发到多个进程(数据并行)。以下是一个简单的Python多进程池示例。

from multiprocessing import Pool

import time

work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])


def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])


def pool_handler():
    p = Pool(2)
    p.map(work_log, work)


if __name__ == '__main__':
    pool_handler()

以下图片显示了上述程序的输出。请注意,池大小为2,因此 work_log 函数的两个执行在并行进行。当其中一个函数处理完成时,它会选择下一个参数,依此类推。 这就是关于 Python 多进程模块的全部内容。参考:官方文档

Source:
https://www.digitalocean.com/community/tutorials/python-multiprocessing-example