בשיעור הקודם שלנו, למדנו על דוגמה ל-CSV בפייתון. בשיעור הזה אנחנו מתכוונים ללמוד על עיבוד מרובה תהליכים בפייתון עם דוגמאות.
עיבוד מרובה תהליכים בפייתון
עיבוד מקבילי מקבל תשומת לב רבה בימינו. אם עדיין אינך מכיר את עיבוד המקבילי, למד מ-ויקיפדיה. כאשר יצרני מעבדים מתחילים להוסיף יותר ויותר ליבות למעבדים שלהם, כתיבת קוד מקבילי הוא דרך נהדרת לשפר את הביצועים. פייתון הציגה את המודול multiprocessing כדי לאפשר לנו לכתוב קוד מקבילי. כדי להבין את המניע העיקרי של מודול זה, עלינו לדעת מספר יסודות על תכנות מקבילי. לאחר קריאת מאמר זה, אנו מקווים שתהיה מסוגל לרכוש מספר ידע בנושא זה.
תהליכי multiprocessing בפייתון, תור ונעילות
ישנם מספר רב של מחלקות במודול ההרצת מרובות תהליך בפייתון לבניית תוכנית מקבילית. ביניהן, שלוש מחלקות בסיסיות הן Process
, Queue
ו־Lock
. מחלקות אלו יעזרו לך לבנות תוכנית מקבילית. אך לפני שנתאר על אלו, בוא נתחיל עם קוד פשוט. כדי להפוך תוכנית מקבילית לשימושית, עליך לדעת כמה ליבות יש במחשב שלך. מודול ההרצת מרובות תהליך של פייתון מאפשר לך לדעת את זה. הקוד הפשוט הבא ידפיס את מספר הליבות במחשב שלך.
import multiprocessing
print("Number of cpu : ", multiprocessing.cpu_count())
הפלט הבא עשוי להשתנות עבור המחשב שלך. בשבילי, מספר הליבות הוא 8.
מחלקת ה־Process ב־multiprocessing של פייתון
המחלקה Process
של Python multiprocessing היא הסתברות שמכינה תהליך נוסף של Python, מספקת לו להריץ קוד ודרך לשלוט בביצוע. ישנם שני פונקציות חשובות ששייכות למחלקת Process – פונקציות start()
ו־join()
. בתחילה, עלינו לכתוב פונקציה שתופעל על ידי התהליך. לאחר מכן, עלינו ליצור אובייקט תהליך. אם ניצור אובייקט תהליך, לא יקרה כלום עד שנגיד לו להתחיל לעבוד באמצעות הפונקציה start()
. לאחר מכן, התהליך ירוץ ויחזיר את התוצאה שלו. לאחר מכן אנו אומרים לתהליך להשלים באמצעות הפונקציה join()
. ללא קריאה לפונקצית join()
, התהליך יישאר תקוע ולא יסיים. לכן, אם תיצור הרבה תהליכים ולא תסיים אותם, עשויים להתקיים חסרון במשאבים. לכן יכול להיות נדרש להפיל אותם ידנית. דבר חשוב אחד הוא, אם ברצונך להעביר ארגומנט כלשהו דרך התהליך, עליך להשתמש בפרמטר המקורי args
. הקוד הבא יהיה מועיל להבנת שימוש במחלקת Process.
from multiprocessing import Process
def print_func(continent='Asia'):
print('The name of continent is : ', continent)
if __name__ == "__main__": # confirms that the code is under main function
names = ['America', 'Europe', 'Africa']
procs = []
proc = Process(target=print_func) # instantiating without any argument
procs.append(proc)
proc.start()
# instantiating process with arguments
for name in names:
# print(name)
proc = Process(target=print_func, args=(name,))
procs.append(proc)
proc.start()
# complete the processes
for proc in procs:
proc.join()
פלט הקוד הבא יהיה:
מחלקת Queue של multiprocessing ב־Python
יש לך ידע בסיסי אודות מבני נתונים של מחשב, כנראה שאתה מכיר על תור. מודולי ה־Multiprocessing של פייתון מספקים קלאס Queue
שהוא בדיוק מבנה נתונים של First-In-First-Out. הם יכולים לאחסן כל אובייקט pickle של פייתון (אם כי אובייקטים פשוטים הם הטובים ביותר) והם נפלאים לשיתוף נתונים בין תהליכים. התורים מיועדים במיוחד כאשר מועברים כפרמטר לפונקציית היעד של תהליך כדי לאפשר לתהליך לצרוך נתונים. באמצעות הפונקציה put()
אנו יכולים להכניס נתונים לתוך התור ובאמצעות get()
אנו יכולים לקבל פריטים מהתורים. ראה את הקוד הבא לדוגמה מהירה.
from multiprocessing import Queue
colors = ['red', 'green', 'blue', 'black']
cnt = 1
# יצירת אובייקט תור
queue = Queue()
print('pushing items to queue:')
for color in colors:
print('item no: ', cnt, ' ', color)
queue.put(color)
cnt += 1
print('\npopping items from queue:')
cnt = 0
while not queue.empty():
print('item no: ', cnt, ' ', queue.get())
cnt += 1
קלאס Lock של Multiprocessing של פייתון
משימת קלאס ה־Lock די פשוטה. הוא מאפשר לקוד להטיל נעילה כך שאף תהליך אחר לא יוכל לבצע את אותו הקוד עד שהנעילה תוחרט. לכן, משימת קלאס ה־Lock בעיקר שתיים. אחת היא לקבוע נעילה והשנייה היא לשחרר את הנעילה. כדי לקבוע נעילה משמשת הפונקציה acquire()
וכדי לשחרר נעילה משמשת הפונקציה release()
.
דוגמה ל־Multiprocessing בפייתון
בדוגמת הרצה זו של Python multiprocessing, נמזג את כל הידע שלנו יחד. נניח שיש לנו מספר משימות לבצע. כדי לבצע את המשימה, נשתמש במספר תהליכים. לכן, נשמור על שתי תורות. אחת תכיל את המשימות והשנייה תכיל את הלוג של המשימות שהושלמו. אז ניצור את התהליכים כדי להשלים את המשימה. שים לב שמחלקת התורים של פייתון כבר מסונכרנת. זה אומר, שאנחנו לא צריכים להשתמש במחלקת Lock כדי לחסום מספר תהליכים לגשת לאותו אובייקט תור. לכן, אין צורך להשתמש במחלקת Lock במקרה זה. למטה מימוש כאשר אנחנו מוסיפים משימות לתור, לאחר מכן יוצרים תהליכים ומפעילים אותם, ואז משתמשים ב־join()
כדי להשלים את התהליכים. לבסוף אנו מדפיסים את הלוג מהתור השני.
from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception
def do_job(tasks_to_accomplish, tasks_that_are_done):
while True:
try:
'''
try to get task from the queue. get_nowait() function will
raise queue.Empty exception if the queue is empty.
queue(False) function would do the same task also.
'''
task = tasks_to_accomplish.get_nowait()
except queue.Empty:
break
else:
'''
if no exception has been raised, add the task completion
message to task_that_are_done queue
'''
print(task)
tasks_that_are_done.put(task + ' is done by ' + current_process().name)
time.sleep(.5)
return True
def main():
number_of_task = 10
number_of_processes = 4
tasks_to_accomplish = Queue()
tasks_that_are_done = Queue()
processes = []
for i in range(number_of_task):
tasks_to_accomplish.put("Task no " + str(i))
# יצירת התהליכים
for w in range(number_of_processes):
p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
processes.append(p)
p.start()
# השלמת התהליכים
for p in processes:
p.join()
# הדפסת הפלט
while not tasks_that_are_done.empty():
print(tasks_that_are_done.get())
return True
if __name__ == '__main__':
main()
בהתאם למספר המשימות, הקוד ייקח זמן מסוים כדי להציג לך את הפלט. פלט הקוד הבא ישתנה מפעם לפעם.
Pool של Python multiprocessing
Pool של Python multiprocessing ניתן לשימוש לביצוע פונקציה במקביל על ערכי קלט מרובים, ולהפצת הנתונים הנכנסים בין תהליכים (פרלליזם של נתונים). למטה דוגמה פשוטה ל־Pool של Python multiprocessing.
from multiprocessing import Pool
import time
work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])
def work_log(work_data):
print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
time.sleep(int(work_data[1]))
print(" Process %s Finished." % work_data[0])
def pool_handler():
p = Pool(2)
p.map(work_log, work)
if __name__ == '__main__':
pool_handler()
התמונה למטה מראה את פלט התוכנית שנמצא למעלה. שימו לב שגודל הבריכה הוא 2, כך ששתי ביצועי פונקציית work_log
מתרחשים במקביל. כאשר אחת הביצועים מסיימת את העיבוד שלה, היא מבצעת את הפעולות על הארגומנט הבא וכן הלאה. אז זהו הכל לגבי מודול הריבויות בפייתון. מקור: תיעוד רשמי
Source:
https://www.digitalocean.com/community/tutorials/python-multiprocessing-example