상세 컨텐츠

본문 제목

Futures 라이브러리와 파이썬 병렬성 구현 - GIL, concurrent.futures

개발/python-병렬처리

by Matthew0633 2022. 5. 12. 19:08

본문

비동기 방식 (asynchrony)

Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events. These may be "outside" events such as the arrival of signals, or actions instigated by a program that take place concurrently with program execution, without the program blocking to wait for results.[1] Asynchronous input/output is an example of the latter case of asynchrony, and lets programs issue commands to storage or network devices that service these requests while the processor continues executing the program. Doing so provides a degree of parallelism.[1]
(https://en.wikipedia.org/wiki/Asynchrony_(computer_programming))

앞 작업이 끝나야 다음 작업이 시작되는 선형적인 방식과 달리 앞의 작업이 끝나지 않아도 다른 작업을 시작하여 동시에 여러가지 작업을 수행할 수 있는 방식이다. 아무 이유없이 앞의 작업 도중 다른 작업으로 넘어가는 것이 아니다. 네트워크에서의 I/O 작업과 같이 작업 요청 후 시간이 걸리는 경우, 끝날 때까지 유휴시간 동안 단순히 기다리는 것이 아니라, 상태값을 반환받아, 또다른 작업을 수행하거나 요청한다. 그리고 이전에 요청했던 작업이 끝나면 콜백 신호를 받게된다.

비동기 방식은 특정 작업 요청 이후 유휴시간동안 CPU 리소스가 낭비되는 것을 방지하여 작업의 효율을 증가시킬 수 있다.

GIL (Global Interpreter Lock)

GIL 의 목적은 object에 대한 reference count 보호를 통해, 정확한 memory 관리를 수행하기 위함이다 (memory leakage 방지)

  • 다른 언어와 달리 python에서는 왜 하필 memory management 에 대한 솔루션이 GIL일까?
    • 쓰레드 기반의 개념이 없던 초기에 python 라이브러리들이 c로 구현되기 시작했고, c extension에서 주로 사용되던 것이 GIL 이었다.
  • 그렇다면 왜 아직도 python에서 GIL이 없어지지 않았나?
    • GIL 을 제거했을 때 오히려 python 속도가 감소했기 때문이다 (feat. 귀도반로섬)
  • GIL을 우회하는 방법
    • 멀티프로세싱 (스레드가 아닌 프로세스 단위로 병렬처리 구현)
    • CPython (추후에 공부해볼 계획이다)

futures module

( 공식문서 : https://docs.python.org/3/library/concurrent.futures.html )

  • 파이썬 3.2 version 이후에 등장한 futures 모듈은 이러한 비동기방식의 작업을 high level로 사용할 수 있도록 지원한다.
  • 멀티스레딩과 멀티프로세싱 사용을 위한 API를 통일하여 둘다 사용하기 쉽고 서로간에 수정도 쉽다.
  • 실행중인 작업 취소, 완료여부체크, 타임아웃 지정, 콜백추가 등의 코드를 간편하게 작성할 수 있다

map() 을 활용한 기본적인 동시작업 수행

그렇다면 멀티스레딩을 위한 아주 기본적인 예시를 보자. 중요부분은 아래와 같이 굉장히 간단하다

  1. 개별 작업을 수행할 함수를 정의한다 (sum_gen)
  2. with문 안에서 ThreadPoolExecutor객체 (excutor) 에 map() 메소드를 통해 작업함수 (sum_gen) 와 작업목록 (WORKS) 을 전달하여 결과를 받는다
from concurrent.futures import ThreadPoolExecutor

WORKS = [100000, 1000000, 10000000, 10000000]

# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
    return sum(n for n in range(1, n+1))

# [worker 개수]
n_worker = min(10, len(WORK_LIST)) # worker 개수 : work 개수대로 사용, 최대 10개 사용

# [ThreadPoolExecutor]
with ThreadPoolExecutor(max_workers = n_worker) as excutor:
    # map -> 작업 순서 유지, 즉시 실행
    result = excutor.map(sum_gen, WORKS)

# 최종 결과 출력
print(f'\n Result -> {list(result)}')

wait() 을 활용한 동시작업 수행

만약, 특정 작업이 실패할 가능성이 있거나, 오래 걸릴 수 있어 timeout을 지정하여 pass 하고자 한다면, wait()을 사용할 수 있다. wait() 에서는 완료한 작업과 실패한 작업을 따로 관리하며, 또한 timeout 지정이 가능하여, 특정 작업이 해당 시간 내에 완료되지 않을 경우 pass하는 방식으로 작업을 진행할 수 있다

사용코드 면에서는 map()과 약간의 차이만 존재한다. executor.submit() 에서 개별 작업을 함수에 맵핑시켜 스케쥴링 목록 (future_list)을 생성하고 wait()내에 넣어주면 결과들에 대한 future 객체들이 반환된다.

이들은 done, not_done 이라는 attribute를 가지며, 값으로는 각각 완료한 작업의 future 객체와 그렇지 않은 작업의 future 객체를 가진다. 완료한 작업의 결과값을 반환받고 싶다면 result.done 의 원소마다 result() 메소드를 사용하면 그 결과값을 모을 수 있다

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORKS = [10000, 100000, 1000000, 10000000]

# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
  return sum(n for n in range(1, n+1))

# [worker 개수]
n_worker = min(10, len(WORKS)) # worker 개수 : work 개수만큼 사용 (최대 10개)

futures_list = []

# [ThreadPoolExecutor] ; ProcessPoolExecutor 도 사용가능
with ThreadPoolExecutor() as excutor:
  # [스케쥴링 목록 생성]
  for work in WORKS:
    # future 반환
    future = excutor.submit(sum_generator, work)
    # 스케쥴링
    futures_list.append(future)
    # 스케쥴링 확인
    print('Scheduled for {} : {}'.format(work, future))
    
  # [작업 수행]
  result = wait(futures_list, timeout=7) # 7초 이내에 완료되지 않으면 실패
  print('Completed Tasks : ' + str(result.done)) # 성공
  print('Pending ones after waiting for 7seconds : ' + str(result.not_done))  # 실패

  # [결과값 저장]
  my_results = [future.result() for future in result.done]

wait() 를 사용한 코드를 필요한 기능만 pythonic하고 간결하게 정리해보자

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORKS = [10000, 100000, 1000000, 10000000]

# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
    return sum(n for n in range(1, n+1))

# [worker 개수]
n_worker = min(10, len(WORKS)) # worker 개수 : work 개수만큼 사용 (최대 10개)

# [ThreadPoolExecutor] ; ProcessPoolExecutor 도 사용가능
with ThreadPoolExecutor() as excutor:

  # [스케쥴링 목록 생성]
  futures_list = [excutor.submit(sum_gen, work) for work in WORKS]

  # [작업 수행]
  result = wait(futures_list, timeout=7) # 7초 이내에 완료되지 않으면 실패

  # [결과값 저장]
  my_results = [future.result() for future in result.done]

as_completed() 를 활용한 동시작업

wait()은 timeout을 지정할 수는 있으나, 작업의 결과를 순차적으로 반환한다. 그러나 as_completed()는 특정 작업이 끝나자마자 yield로 반환하여 완료되는 작업에 대해 다음 작업 수행을 바로 가능하게 하여 작업의 효율성을 더 크게할 수 있다.

as_completed() 의 결과 원소는 특정 작업의 수행 결과에 해당한다. 그리고 개별 작업 수행결과는 done(), result(), cancelled를 가진다. done()는 작업 완료여부의 boolean 값을 반환하고, result() 는 완료한 작업일 경우 결과값을 반환한다. cancelled 는 완료 실패 여부를 나타내기 위한 attribute 인데 완료된 경우에도 state=finished 로 표시된다

코드는 wait()과 모두 같고 “[as_completed 결과 출력]” 부분만 달라진다.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORKS = [10000, 100000, 1000000, 10000000]

# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
    return sum(n for n in range(1, n+1))

# [worker 개수]
n_worker = min(10, len(WORK_LIST)) # worker 개수 : work 개수대로 사용, 최대 10개 사용

# [ThreadPoolExecutor] ; ProcessPoolExecutor 도 사용가능
with ThreadPoolExecutor() as excutor:

  # [스케쥴링 목록 생성]
  futures_list = [excutor.submit(sum_gen, work) for work in WORKS]

  # [as_completed 결과 출력]
  for future in as_completed(futures_list):
    result = future.result()
    done = future.done()
    cancelled = future.cancelled
    print('Future Result : {}, Done : {}'.format(result, done))
    print('Future Cancelled : {}'.format(cancelled))

as_completed() 를 사용한 코드를 필요한 기능만 pythonic하고 간결하게 정리해보자

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORKS = [10000, 100000, 1000000, 10000000]

# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
    return sum(n for n in range(1, n+1))

# [worker 개수]
n_worker = min(10, len(WORKS)) # worker 개수 : work 개수만큼 사용 (최대 10개)

# [ThreadPoolExecutor] ; ProcessPoolExecutor 도 사용가능
with ThreadPoolExecutor() as excutor:

    # [스케쥴링 목록 생성]
    futures_list = [excutor.submit(sum_gen, work) for work in WORKS]

    # [작업 완료 결과값 저장]
    result = [future.result() for future in as_complted(futures_list) if future.done()]

관련글 더보기

댓글 영역