Python 多进程示例

在我們之前的教程中,我們學習了有關 Python CSV 範例 的內容。在這個教程中,我們將學習使用 Python 進行多處理,並提供示例。

Python 多處理

並行處理現在越來越受到關注。如果你仍然不了解並行處理,可以從 wikipedia 學習。隨著 CPU 製造商不斷向處理器添加更多核心,創建並行代碼是提高性能的一種很好的方法。Python 引入了 multiprocessing 模塊,讓我們能夠編寫並行代碼。要理解該模塊的主要動機,我們必須了解一些有關並行編程的基礎知識。閱讀本文後,我們希望您能夠在這個主題上獲得一些知識。

Python 多處理的 Process、Queue 和 Locks

有很多Python多处理模块中的类可用于构建并行程序。其中,三个基本类是ProcessQueueLock。这些类将帮助您构建并行程序。但在描述它们之前,让我们通过简单的代码来介绍这个主题。要使并行程序有用,您必须知道您的计算机有多少个核心。Python多处理模块使您能够知道这一点。以下简单代码将打印您计算机中的核心数。

import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())

以下输出可能会因您的计算机而异。对我来说,核心数量是8。

Python多处理Process类

Python 多進程 Process 類是一個抽象,它設置另一個 Python 進程,並提供運行代碼的方式以及父應用程序控制執行的方法。 Process 類有兩個重要的函數 – start()join() 函數。 首先,我們需要編寫一個函數,該函數將由進程運行。 然後,我們需要實例化一個進程對象。 如果我們創建了一個進程對象,除非我們告訴它開始處理,否則不會發生任何事情,我們可以通過 start() 函數來實現這一點。 然後,該進程將運行並返回其結果。 然後我們通過 join() 函數告訴進程完成。 如果沒有 join() 函數調用,進程將保持空閒並且不會終止。 因此,如果創建了許多進程並且不終止它們,則可能會面臨資源稀缺。 然後您可能需要手動終止它們。 一個重要的事情是,如果您想要通過進程傳遞任何參數,則需要使用 args 關鍵字參數。 以下代碼將有助於理解 Process 類的使用。

from multiprocessing import Process


def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # 實例化具有參數的進程
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # 完成進程
    for proc in procs:
        proc.join()

以下代碼的輸出將為:

Python 多進程 Queue 類

你對計算機數據結構有基本的了解,可能知道佇列。Python多進程模塊提供了Queue類,它確實是一個先進先出的數據結構。它們可以存儲任何pickle Python對象(儘管簡單的對象最好),在進程之間共享數據時非常有用。當將佇列作為參數傳遞給進程的目標函數時,佇列尤其有用,以便進程可以消耗數據。通過使用put()函數,我們可以將數據插入到佇列中,並使用get()我們可以從佇列中獲取項目。請參閱以下代碼以獲取快速示例。

from multiprocessing import Queue

colors = ['red', 'green', 'blue', 'black']
cnt = 1
# 實例化一個佇列對象
queue = Queue()
print('pushing items to queue:')
for color in colors:
    print('item no: ', cnt, ' ', color)
    queue.put(color)
    cnt += 1

print('\npopping items from queue:')
cnt = 0
while not queue.empty():
    print('item no: ', cnt, ' ', queue.get())
    cnt += 1

Python多進程鎖定類

Lock類的任務非常簡單。它允許代碼聲明鎖定,以便在釋放鎖定之前,沒有其他進程可以執行相似的代碼。因此,Lock類的任務主要是兩個。一個是聲明鎖定,另一個是釋放鎖定。要聲明鎖定,使用acquire()函數,要釋放鎖定,使用release()函數。

Python多進程示例

在這個 Python 多處理示例中,我們將結合所有的知識。假設我們有一些任務需要完成。為了完成這些任務,我們將使用多個進程。因此,我們將維護兩個隊列。一個將包含任務,另一個將包含已完成任務的日誌。然後,我們實例化進程來完成任務。注意,Python 的 Queue 類已經同步。這意味著我們不需要使用 Lock 類來阻塞多個進程訪問同一個隊列對象。這就是為什麼在這種情況下,我們不需要使用 Lock 類的原因。下面是實現代碼,我們在其中將任務添加到隊列中,然後創建進程並啟動它們,然後使用 join() 完成進程。最後,我們從第二個隊列中打印日誌。

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            '''
                try to get task from the queue. get_nowait() function will 
                raise queue.Empty exception if the queue is empty. 
                queue(False) function would do the same task also.
            '''
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:

            break
        else:
            '''
                if no exception has been raised, add the task completion 
                message to task_that_are_done queue
            '''
            print(task)
            tasks_that_are_done.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True


def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no " + str(i))

    # 創建進程
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()

    # 完成進程
    for p in processes:
        p.join()

    # 打印輸出
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True


if __name__ == '__main__':
    main()

根據任務的數量不同,代碼將需要一些時間來顯示輸出。以下代碼的輸出將因時間而異。

Python 多處理池

Python 多處理池可用於跨多個輸入值並行執行函數,將輸入數據跨進程分配(數據並行)。下面是一個簡單的 Python 多處理池示例。

from multiprocessing import Pool

import time

work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])


def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])


def pool_handler():
    p = Pool(2)
    p.map(work_log, work)


if __name__ == '__main__':
    pool_handler()

以下圖像顯示了上述程序的輸出。請注意,池大小為2,因此 work_log 函數正在並行執行兩次。當其中一個函數處理完成時,它會選擇下一個參數,依此類推。因此,這就是關於 Python 多進程模塊的全部內容。參考:官方文檔

Source:
https://www.digitalocean.com/community/tutorials/python-multiprocessing-example