Python マルチプロセスの例

前回のチュートリアルでは、Python CSVの例について学びました。このチュートリアルでは、例を用いてPythonのマルチプロセッシングを学びます。

Pythonマルチプロセッシング

並列処理は最近注目されています。まだ並列処理について知らない場合は、wikipediaから学んでください。CPUメーカーがプロセッサにますます多くのコアを追加するにつれて、並列コードを作成することはパフォーマンスを向上させる素晴らしい方法です。Pythonはmultiprocessingモジュールを導入し、並列コードを書くことを可能にしました。このモジュールの主な動機を理解するには、並列プログラミングの基本を知る必要があります。この記事を読んだ後、このトピックについていくつかの知識を得ることができると思います。

Pythonマルチプロセッシングのプロセス、キュー、およびロック

Pythonのマルチプロセッシングモジュールには、並列プログラムを構築するための多数のクラスがあります。その中でも、3つの基本的なクラスはProcessQueue、およびLockです。これらのクラスは、並列プログラムの構築を支援します。しかし、それらについて説明する前に、簡単なコードでこのトピックを開始しましょう。並列プログラムを有用にするには、PCにいくつのコアがあるかを知っておく必要があります。Pythonのマルチプロセッシングモジュールを使用すると、それを知ることができます。次の簡単なコードは、PCのコア数を表示します。

import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())

次の出力は、あなたのPCによって異なる場合があります。私の場合、コア数は8です。

Pythonのマルチプロセッシングプロセスクラス

Pythonのマルチプロセス Process クラスは、別のPythonプロセスを設定し、それにコードを実行する方法と親アプリケーションが実行を制御する方法を提供する抽象化です。 Processクラスに属する2つの重要な関数があります – start() 関数と join() 関数。まず、プロセスで実行される関数を書く必要があります。次に、プロセスオブジェクトをインスタンス化する必要があります。プロセスオブジェクトを作成しても、start() 関数を使用して処理を開始するまで何も起こりません。その後、プロセスは実行され、その結果が返されます。その後、プロセスに対して join() 関数を呼び出して完了を通知します。 join() 関数を呼び出さない場合、プロセスはアイドル状態のままで終了しません。そのため、多くのプロセスを作成してそれらを終了しない場合、リソースが不足する可能性があります。その場合は、それらを手動で終了する必要があります。重要な点として、プロセスを介して引数を渡す場合は args キーワード引数を使用する必要があります。次のコードは、Processクラスの使用法を理解するのに役立ちます。

from multiprocessing import Process


def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # 引数を指定してプロセスをインスタンス化する
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # プロセスを完了させる
    for proc in procs:
        proc.join()

次のコードの出力は次のとおりです:

PythonのマルチプロセスQueueクラス

コンピュータのデータ構造について基本的な知識があります。おそらく、キューについて知っているでしょう。Pythonのマルチプロセッシングモジュールは、Queueクラスを提供しており、これはまさにFirst-In-First-Outデータ構造です。これらは任意のピクルPythonオブジェクトを保存できます(単純なものが最適です)し、プロセス間でデータを共有するために非常に便利です。キューは、プロセスのターゲット関数にパラメータとして渡されたときに特に有用で、プロセスがデータを消費することを可能にします。put()関数を使用してデータをキューに挿入し、get()を使用してキューからアイテムを取得できます。以下のコードを見て、簡単な例をご覧ください。

from multiprocessing import Queue

colors = ['red', 'green', 'blue', 'black']
cnt = 1
# キューオブジェクトのインスタンス化
queue = Queue()
print('pushing items to queue:')
for color in colors:
    print('item no: ', cnt, ' ', color)
    queue.put(color)
    cnt += 1

print('\npopping items from queue:')
cnt = 0
while not queue.empty():
    print('item no: ', cnt, ' ', queue.get())
    cnt += 1

Pythonマルチプロセッシングロッククラス

Lockクラスのタスクは非常にシンプルです。コードがロックを獲得し、他のプロセスが同様のコードを実行できないようにします。ロックが解除されるまで。したがって、Lockクラスのタスクは主に2つです。1つはロックを獲得し、もう1つはロックを解除することです。ロックを獲得するには、acquire()関数を使用し、ロックを解除するにはrelease()関数を使用します。

Pythonマルチプロセッシングの例

このPythonのマルチプロセスの例では、私たちの知識をすべて結集します。いくつかのタスクを実行する必要があるとします。そのタスクを完了するために、複数のプロセスを使用します。したがって、2つのキューを維持します。1つはタスクを含み、もう1つは完了したタスクのログを含みます。その後、タスクを完了するためのプロセスをインスタンス化します。PythonのQueueクラスはすでに同期されているため、Lockクラスを使用して複数のプロセスが同じキューオブジェクトにアクセスできないようにブロックする必要はありません。そのため、この場合、Lockクラスを使用する必要はありません。以下は、タスクをキューに追加し、次にプロセスを作成して開始し、join()を使用してプロセスを完了する実装です。最後に、2番目のキューからログを出力しています。

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            '''
                try to get task from the queue. get_nowait() function will 
                raise queue.Empty exception if the queue is empty. 
                queue(False) function would do the same task also.
            '''
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:

            break
        else:
            '''
                if no exception has been raised, add the task completion 
                message to task_that_are_done queue
            '''
            print(task)
            tasks_that_are_done.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True


def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no " + str(i))

    プロセスの作成
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()

    プロセスの完了
    for p in processes:
        p.join()

    出力の表示
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True


if __name__ == '__main__':
    main()

タスクの数に応じて、コードの実行には時間がかかります。以下のコードの出力は時間とともに異なります。

Pythonのマルチプロセスプール

Pythonのマルチプロセスプールは、複数の入力値にわたって関数を並行して実行し、入力データをプロセス間で分散する(データ並列処理)ために使用できます。以下は、簡単なPythonのマルチプロセスプールの例です。

from multiprocessing import Pool

import time

work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])


def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])


def pool_handler():
    p = Pool(2)
    p.map(work_log, work)


if __name__ == '__main__':
    pool_handler()

以下の画像は、上記のプログラムの出力を示しています。プールサイズが2であるため、work_log関数の実行が並行して行われています。1つの関数の処理が完了すると、次の引数が選択され、そのように続きます。 以上がPythonのmultiprocessingモジュールの説明です。参照:公式ドキュメント

Source:
https://www.digitalocean.com/community/tutorials/python-multiprocessing-example