在我們之前的教程中,我們學習了有關 Python CSV 範例 的內容。在這個教程中,我們將學習使用 Python 進行多處理,並提供示例。
Python 多處理
並行處理現在越來越受到關注。如果你仍然不了解並行處理,可以從 wikipedia 學習。隨著 CPU 製造商不斷向處理器添加更多核心,創建並行代碼是提高性能的一種很好的方法。Python 引入了 multiprocessing 模塊,讓我們能夠編寫並行代碼。要理解該模塊的主要動機,我們必須了解一些有關並行編程的基礎知識。閱讀本文後,我們希望您能夠在這個主題上獲得一些知識。
Python 多處理的 Process、Queue 和 Locks
有很多Python多处理模块中的类可用于构建并行程序。其中,三个基本类是Process
、Queue
和Lock
。这些类将帮助您构建并行程序。但在描述它们之前,让我们通过简单的代码来介绍这个主题。要使并行程序有用,您必须知道您的计算机有多少个核心。Python多处理模块使您能够知道这一点。以下简单代码将打印您计算机中的核心数。
import multiprocessing
print("Number of cpu : ", multiprocessing.cpu_count())
以下输出可能会因您的计算机而异。对我来说,核心数量是8。
Python多处理Process类
Python 多進程 Process
類是一個抽象,它設置另一個 Python 進程,並提供運行代碼的方式以及父應用程序控制執行的方法。 Process 類有兩個重要的函數 – start()
和 join()
函數。 首先,我們需要編寫一個函數,該函數將由進程運行。 然後,我們需要實例化一個進程對象。 如果我們創建了一個進程對象,除非我們告訴它開始處理,否則不會發生任何事情,我們可以通過 start()
函數來實現這一點。 然後,該進程將運行並返回其結果。 然後我們通過 join()
函數告訴進程完成。 如果沒有 join()
函數調用,進程將保持空閒並且不會終止。 因此,如果創建了許多進程並且不終止它們,則可能會面臨資源稀缺。 然後您可能需要手動終止它們。 一個重要的事情是,如果您想要通過進程傳遞任何參數,則需要使用 args
關鍵字參數。 以下代碼將有助於理解 Process 類的使用。
from multiprocessing import Process
def print_func(continent='Asia'):
print('The name of continent is : ', continent)
if __name__ == "__main__": # confirms that the code is under main function
names = ['America', 'Europe', 'Africa']
procs = []
proc = Process(target=print_func) # instantiating without any argument
procs.append(proc)
proc.start()
# 實例化具有參數的進程
for name in names:
# print(name)
proc = Process(target=print_func, args=(name,))
procs.append(proc)
proc.start()
# 完成進程
for proc in procs:
proc.join()
以下代碼的輸出將為:
Python 多進程 Queue 類
你對計算機數據結構有基本的了解,可能知道佇列。Python多進程模塊提供了Queue
類,它確實是一個先進先出的數據結構。它們可以存儲任何pickle Python對象(儘管簡單的對象最好),在進程之間共享數據時非常有用。當將佇列作為參數傳遞給進程的目標函數時,佇列尤其有用,以便進程可以消耗數據。通過使用put()
函數,我們可以將數據插入到佇列中,並使用get()
我們可以從佇列中獲取項目。請參閱以下代碼以獲取快速示例。
from multiprocessing import Queue
colors = ['red', 'green', 'blue', 'black']
cnt = 1
# 實例化一個佇列對象
queue = Queue()
print('pushing items to queue:')
for color in colors:
print('item no: ', cnt, ' ', color)
queue.put(color)
cnt += 1
print('\npopping items from queue:')
cnt = 0
while not queue.empty():
print('item no: ', cnt, ' ', queue.get())
cnt += 1
Python多進程鎖定類
Lock類的任務非常簡單。它允許代碼聲明鎖定,以便在釋放鎖定之前,沒有其他進程可以執行相似的代碼。因此,Lock類的任務主要是兩個。一個是聲明鎖定,另一個是釋放鎖定。要聲明鎖定,使用acquire()
函數,要釋放鎖定,使用release()
函數。
Python多進程示例
在這個 Python 多處理示例中,我們將結合所有的知識。假設我們有一些任務需要完成。為了完成這些任務,我們將使用多個進程。因此,我們將維護兩個隊列。一個將包含任務,另一個將包含已完成任務的日誌。然後,我們實例化進程來完成任務。注意,Python 的 Queue 類已經同步。這意味著我們不需要使用 Lock 類來阻塞多個進程訪問同一個隊列對象。這就是為什麼在這種情況下,我們不需要使用 Lock 類的原因。下面是實現代碼,我們在其中將任務添加到隊列中,然後創建進程並啟動它們,然後使用 join()
完成進程。最後,我們從第二個隊列中打印日誌。
from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception
def do_job(tasks_to_accomplish, tasks_that_are_done):
while True:
try:
'''
try to get task from the queue. get_nowait() function will
raise queue.Empty exception if the queue is empty.
queue(False) function would do the same task also.
'''
task = tasks_to_accomplish.get_nowait()
except queue.Empty:
break
else:
'''
if no exception has been raised, add the task completion
message to task_that_are_done queue
'''
print(task)
tasks_that_are_done.put(task + ' is done by ' + current_process().name)
time.sleep(.5)
return True
def main():
number_of_task = 10
number_of_processes = 4
tasks_to_accomplish = Queue()
tasks_that_are_done = Queue()
processes = []
for i in range(number_of_task):
tasks_to_accomplish.put("Task no " + str(i))
# 創建進程
for w in range(number_of_processes):
p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
processes.append(p)
p.start()
# 完成進程
for p in processes:
p.join()
# 打印輸出
while not tasks_that_are_done.empty():
print(tasks_that_are_done.get())
return True
if __name__ == '__main__':
main()
根據任務的數量不同,代碼將需要一些時間來顯示輸出。以下代碼的輸出將因時間而異。
Python 多處理池
Python 多處理池可用於跨多個輸入值並行執行函數,將輸入數據跨進程分配(數據並行)。下面是一個簡單的 Python 多處理池示例。
from multiprocessing import Pool
import time
work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])
def work_log(work_data):
print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
time.sleep(int(work_data[1]))
print(" Process %s Finished." % work_data[0])
def pool_handler():
p = Pool(2)
p.map(work_log, work)
if __name__ == '__main__':
pool_handler()
以下圖像顯示了上述程序的輸出。請注意,池大小為2,因此 work_log
函數正在並行執行兩次。當其中一個函數處理完成時,它會選擇下一個參數,依此類推。因此,這就是關於 Python 多進程模塊的全部內容。參考:官方文檔
Source:
https://www.digitalocean.com/community/tutorials/python-multiprocessing-example