Adventure42

the answer to the ultimate question of life, the universe, and everything

Parallelism with Multiprocessing

09 Dec 2022 » PythonProgramming

Parallelism

Parallelism with Multiprocessing

Parallelism

  • 완전한 동일한 타이밍(시점)에 task 실행.
  • 다양한 파트로 나눠서 실행.(나눠서 구하고 취합)
  • 멀티프로세싱에서 CPU가 1 Core인 경우 만족하지 않음.
  • deep learning, bitcoin 채굴 등에 사용될 수 있음.


Process vs. Thread 차이 비교

  • 독립된 메모리(process), 공유된 메모리(thread)
  • 많은 메모리 필요(process), 적은 메모리(thread)
  • 좀비(dead) process는 생성 가능성이 높지만, 좀비(dead) thread 생성은 쉽지 않음.
  • 오버헤드 큼(process), 오버헤드 작음(thread)
  • 생성/ 소멸 다소 느림(process), 생성/소명이 상대적으로 빠름(thread)
  • 코드 작성이 쉬움/ 디버깅 어려움(process), 코드 작성이 어려움/디버깅 어려움(thread)
  • Process에서는 code, data, heap, stack 모두 개별적으로 만들어짐.
  • Thread의 경우에는 stack만 개별적으로 만들어지고 나머지(code, data, heap)은 process내 thread들이 share함. Local variables, function arguments, return values와 같은 stack의 요소들, copy of the register, programing counter, 그리고 그 외 thread-specific data를 thread에 개별적으로 생성하여 개별적인 scheduling에 따라 task를 수행할 수 있도록 한다.

gRPC


주로 사용되는 함수는 terminate(), join(), is_alive(), 등이 있다. 특히 join은 개별적으로 실행되는 sub-process들이 모두 같은 시점에 끝나도록 ‘join’해주는 역할을 수행한다.

from multiprocessing import Process
import time
import logging


def prof_func(name):
    print('Sub-Process {}: starting'.format(name))
    time.sleep(3)
    print('Sub-Process {}: finishing'.format(name))

def main():
    # Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    # 함수 인자 확인
    p = Process(target=prof_func, args=('First',))

    logging.info('Main-Process: before creating process')
    # 프로세스
    p.start()
    logging.info('Main-Process: during process')
    logging.info('Main-Process: joined process')
    p.join()
    # 프로세스 상태 확인
    print(f'process p is alive: {p.is_alive()}')


# main 시작
if __name__ == "__main__":
    main()


Process 수행 시, process id와 process name을 확인하기 위해 다음과 같이 os의 getpid() 함수와 multiprocessing의 current_process()를 사용할 수 있다.

from multiprocessing import Process, current_process
import os
import random
import time


# 실행
def square(n):
    time.sleep(random.randint(1,3))
    process_id = os.getpid()
    process_name = current_process().name
    result = n * n
    print(f'Process ID: {process_id}, Process Name: {process_name}')
    print(f'Result of {n} squared: {result}')


if __name__ == '__main__':
    # 부모 process 아이디
    parent_process_id = os.getpid()
    print(f'Parent process ID: {parent_process_id}')
    # process 리스트 선언
    processes = list()
    # process 생성 및 실행
    for i in range(1,50): # 10~100 실행
        # 생성
        t = Process(name=str(i), target=square, args=(i,))
        # 배열에 담기 (하나의 리스트로 모두 담아서 한번에 join해주려고)
        processes.append(t)
        # 시작
        t.start()
    for process in processes:
        process.join()
    # 종료
    print('Main-Process Done.')


공유 자원의 활용

프로세스 메모리 공유의 예시

from multiprocessing import Process, current_process, Value, Array
import os


# 실행함수
def generate_update_number(v: int):
    for _ in range(50):
        v.value += 1
    print(current_process().name, "data:", v.value)

def main():
    # parent process id 확인 (디버깅을 위해 꼭 필요)
    parent_process_id = os.getpid()
    # 출력
    print(f'Parent process ID: {parent_process_id}')
    # 프로세스 리스트 선언
    processes = list()
    # 프로세스 메모리 공유 변수
    # 공유 변수의 type(i for int, c for char, 등등), 값, 등등 엄격하게 선언되어야 함.
    share_value = Value('i',0)
    # 공유 변수가 리스트일때는 다음과 같이 Array를 활용한다.
    share_numbers = Array('i', range(50))
    # 위 Value, Array외에 아래 두 library들도 사용 가능 함.
    # from multiprocess import shared_memory 사용 가능 (python 3.8이상 부터)
    # from multiprocess import Manager 사용 가능
    for _ in range(1,10):
        # 생성
        p = Process(target=generate_update_number, args=(share_value,))
        # 배열에 담기
        processes.append(p)
        # 실행
        p.start()
    # join
    for p in processes:
        p.join()  
    # 최종 프로세스 부모 변수 확인
    print('Final Data in parent process', share_value)


if __name__ == "__main__":
    main()


프로세스 통신 구현

queue 또는 pipe를 통해서 worker(sub process)에서 실행한 값을 부모(main process)에게 전달

from multiprocessing import Process, Queue, current_process
import time
import os


# 실행함수
def worker(id, baseNum, q):
    process_id = os.getpid()
    process_name = current_process().name
    # 누적
    sub_total = 0
    # 계산
    for i in range(baseNum):
        sub_total += 1
    # Produce
    q.put(sub_total)
    # 정보출력
    print(f'Process ID: {process_id}, Process name: {process_name}, ID: {id}')
    print(f'Result: {sub_total}')

def main():
    # 부모 process id
    parent_process_id = os.getpid()
    # 출력
    print(f'Parent process id: {parent_process_id}')
    # 프로세스 리스트
    processes = list()
    # 시작시간
    start_time = time.time()
    # Queue 선언
    q = Queue()
    for i in range(5): # 1~100사이 값으로 설정
        # 생성
        t = Process(name=str(i), target=worker, args=(i, 100000000, q))
        # 배열에 담기
        processes.append(t)
        #시작
        t.start()
    # join
    for process in processes:
        process.join()
	# 순수계산 시간
    print("--- %s seconds ---" % (time.time() - start_time))
    # 종료 flag
    q.put('exit')
    total = 0
    # 대기
    while True:
        tmp = q.get()
        if tmp == 'exit':
            break
        else:
            total += tmp
    print('Main-Processing Total Count={}'.format(total))
    print('Main-Processing Done.')


if __name__ == "__main__":
    main()

Multiprocessing에서 shared memory는 Python multiprocessing documentation 참고



References

  1. 프로그래밍-파이썬-완성-인프런-오리지널