Python 병렬 처리 예제

이전 튜토리얼에서는 Python CSV 예제에 대해 배웠습니다. 이번 튜토리얼에서는 예제와 함께 Python Multiprocessing을 배우게 됩니다.

Python Multiprocessing

병렬 처리는 요즘 더 많은 관심을 받고 있습니다. 아직 병렬 처리에 대해 알지 못하는 경우, 위키피디아에서 배우세요. CPU 제조업체들이 프로세서에 점점 더 많은 코어를 추가함에 따라 병렬 코드를 작성하여 성능을 향상시키는 것이 좋은 방법입니다. Python은 multiprocessing 모듈을 도입하여 병렬 코드를 작성할 수 있게 했습니다. 이 모듈의 주요 동기를 이해하기 위해서는 병렬 프로그래밍에 대한 몇 가지 기본 지식이 필요합니다. 이 글을 읽은 후에는 이 주제에 대한 몇 가지 지식을 얻을 수 있기를 희망합니다.

Python Multiprocessing 프로세스, 큐 및 락

파이썬 multiprocessing 모듈에는 병렬 프로그램을 구축하는 데 사용할 수 있는 다양한 클래스가 있습니다. 그 중에서도 세 가지 기본 클래스는 Process, Queue, Lock입니다. 이러한 클래스들을 사용하면 병렬 프로그램을 만드는 데 도움이 될 것입니다. 하지만 이에 대해 설명하기 전에 간단한 코드로 이 주제를 시작해 보겠습니다. 병렬 프로그램을 유용하게 만들기 위해서는 컴퓨터에 몇 개의 코어가 있는지 알아야 합니다. 파이썬 Multiprocessing 모듈을 사용하면 이를 알 수 있습니다. 다음의 간단한 코드는 컴퓨터의 코어 수를 출력합니다.

import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())

다음 출력은 사용하는 컴퓨터에 따라 다를 수 있습니다. 제 경우에는 코어 수가 8개입니다.

파이썬 multiprocessing Process 클래스

Python multiprocessing Process 클래스는 다른 Python 프로세스를 설정하고 실행 코드 및 부모 애플리케이션이 실행을 제어할 수 있는 추상화입니다. Process 클래스에 속하는 두 가지 중요한 함수가 있습니다 – start()join() 함수입니다. 먼저 프로세스가 실행할 함수를 작성해야 합니다. 그런 다음 프로세스 객체를 인스턴스화해야 합니다. 프로세스 객체를 만들면 start() 함수를 통해 처리를 시작할 때까지 아무 일도 일어나지 않습니다. 그런 다음 프로세스가 실행되고 결과를 반환합니다. 그 후에는 join() 함수를 통해 프로세스를 완료하라고 알려야 합니다. join() 함수 호출 없이는 프로세스가 대기 상태로 유지되고 종료되지 않습니다. 따라서 많은 프로세스를 생성하고 종료하지 않으면 리소스 부족 문제가 발생할 수 있습니다. 그런 다음 수동으로 그들을 종료해야 할 수도 있습니다. 중요한 점은 프로세스를 통해 인수를 전달하려면 args 키워드 인수를 사용해야 한다는 것입니다. 다음 코드는 Process 클래스의 사용법을 이해하는 데 도움이 될 것입니다.

from multiprocessing import Process


def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # 인수를 사용하여 프로세스 인스턴스화
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # 프로세스 완료
    for proc in procs:
        proc.join()

다음 코드의 출력은 다음과 같습니다:

Python multiprocessing Queue 클래스

파이썬 Multiprocessing 모듈은 정확히 먼저 들어온 것이 먼저 나가는 데이터 구조인 Queue 클래스를 제공합니다. 이들은 어떤 pickle Python 객체라도 저장할 수 있으며(단순한 것이 가장 좋음) 프로세스 간 데이터 공유에 극도로 유용합니다. 큐는 특히 프로세스의 대상 함수에 매개변수로 전달되어 프로세스가 데이터를 소비할 수 있도록 할 때 유용합니다. put() 함수를 사용하여 데이터를 큐에 삽입하고 get()을 사용하여 큐에서 항목을 가져올 수 있습니다. 다음 코드를 참조하세요.

from multiprocessing import Queue

colors = ['red', 'green', 'blue', 'black']
cnt = 1
# 큐 객체를 인스턴스화
queue = Queue()
print('pushing items to queue:')
for color in colors:
    print('item no: ', cnt, ' ', color)
    queue.put(color)
    cnt += 1

print('\npopping items from queue:')
cnt = 0
while not queue.empty():
    print('item no: ', cnt, ' ', queue.get())
    cnt += 1

Python multiprocessing Lock 클래스

Lock 클래스의 작업은 매우 간단합니다. 코드가 잠금을 요청하여 잠금이 해제될 때까지 다른 프로세스가 유사한 코드를 실행하지 못하도록 합니다. Lock 클래스의 작업은 주로 두 가지입니다. 하나는 잠금을 요청하는 것이고, 다른 하나는 잠금을 해제하는 것입니다. 잠금을 요청하려면 acquire() 함수를 사용하고 잠금을 해제하려면 release() 함수를 사용합니다.

파이썬 multiprocessing 예제

이 Python multiprocessing 예제에서는 우리의 모든 지식을 합칠 것입니다. 몇 가지 작업을 수행해야 한다고 가정해 보겠습니다. 해당 작업을 완료하기 위해 여러 프로세스를 사용할 것입니다. 그래서 우리는 두 개의 큐를 유지할 것입니다. 하나는 작업을 포함하고 다른 하나는 완료된 작업의 로그를 포함할 것입니다. 그런 다음 프로세스를 완료하도록 인스턴스화합니다. Python Queue 클래스는 이미 동기화되어 있습니다. 즉, 동일한 큐 객체에 대한 여러 프로세스의 액세스를 차단하기 위해 Lock 클래스를 사용할 필요가 없습니다. 따라서 이 경우 Lock 클래스를 사용할 필요가 없습니다. 아래는 큐에 작업을 추가하고, 프로세스를 생성하고 시작한 다음, join()을 사용하여 프로세스를 완료하는 구현입니다. 마지막으로 두 번째 큐에서 로그를 인쇄합니다.

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            '''
                try to get task from the queue. get_nowait() function will 
                raise queue.Empty exception if the queue is empty. 
                queue(False) function would do the same task also.
            '''
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:

            break
        else:
            '''
                if no exception has been raised, add the task completion 
                message to task_that_are_done queue
            '''
            print(task)
            tasks_that_are_done.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True


def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no " + str(i))

    # 프로세스 생성
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()

    # 프로세스 완료
    for p in processes:
        p.join()

    # 출력 출력
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True


if __name__ == '__main__':
    main()

작업 수에 따라 코드가 출력을 보여주기까지 시간이 걸릴 수 있습니다. 다음 코드의 출력은 시간에 따라 다를 것입니다.

Python multiprocessing Pool

Python multiprocessing Pool은 여러 입력 값에 대한 함수의 병렬 실행에 사용할 수 있으며, 입력 데이터를 프로세스간에 분산시켜 처리합니다(데이터 병렬 처리). 아래는 간단한 Python multiprocessing Pool 예제입니다.

from multiprocessing import Pool

import time

work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])


def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])


def pool_handler():
    p = Pool(2)
    p.map(work_log, work)


if __name__ == '__main__':
    pool_handler()

위 이미지는 위 프로그램의 출력을 보여줍니다. 풀 크기가 2이므로 work_log 함수가 병렬로 두 번 실행됩니다. 함수 처리 중 하나가 완료되면 다음 인수를 선택하고 이어서 진행합니다. 파이썬 multiprocessing 모듈에 대한 설명은 여기까지입니다. 참조: 공식 문서

Source:
https://www.digitalocean.com/community/tutorials/python-multiprocessing-example