비동기 방식 (asynchrony)
Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events. These may be "outside" events such as the arrival of signals, or actions instigated by a program that take place concurrently with program execution, without the program blocking to wait for results.[1] Asynchronous input/output is an example of the latter case of asynchrony, and lets programs issue commands to storage or network devices that service these requests while the processor continues executing the program. Doing so provides a degree of parallelism.[1]
(https://en.wikipedia.org/wiki/Asynchrony_(computer_programming))
앞 작업이 끝나야 다음 작업이 시작되는 선형적인 방식과 달리 앞의 작업이 끝나지 않아도 다른 작업을 시작하여 동시에 여러가지 작업을 수행할 수 있는 방식이다. 아무 이유없이 앞의 작업 도중 다른 작업으로 넘어가는 것이 아니다. 네트워크에서의 I/O 작업과 같이 작업 요청 후 시간이 걸리는 경우, 끝날 때까지 유휴시간 동안 단순히 기다리는 것이 아니라, 상태값을 반환받아, 또다른 작업을 수행하거나 요청한다. 그리고 이전에 요청했던 작업이 끝나면 콜백 신호를 받게된다.
비동기 방식은 특정 작업 요청 이후 유휴시간동안 CPU 리소스가 낭비되는 것을 방지하여 작업의 효율을 증가시킬 수 있다.
GIL (Global Interpreter Lock)
GIL 의 목적은 object에 대한 reference count 보호를 통해, 정확한 memory 관리를 수행하기 위함이다 (memory leakage 방지)
futures module
( 공식문서 : https://docs.python.org/3/library/concurrent.futures.html )
map()
을 활용한 기본적인 동시작업 수행
그렇다면 멀티스레딩을 위한 아주 기본적인 예시를 보자. 중요부분은 아래와 같이 굉장히 간단하다
sum_gen
)with
문 안에서 ThreadPoolExecutor
객체 (excutor
) 에 map()
메소드를 통해 작업함수 (sum_gen
) 와 작업목록 (WORKS
) 을 전달하여 결과를 받는다from concurrent.futures import ThreadPoolExecutor
WORKS = [100000, 1000000, 10000000, 10000000]
# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
return sum(n for n in range(1, n+1))
# [worker 개수]
n_worker = min(10, len(WORK_LIST)) # worker 개수 : work 개수대로 사용, 최대 10개 사용
# [ThreadPoolExecutor]
with ThreadPoolExecutor(max_workers = n_worker) as excutor:
# map -> 작업 순서 유지, 즉시 실행
result = excutor.map(sum_gen, WORKS)
# 최종 결과 출력
print(f'\n Result -> {list(result)}')
wait()
을 활용한 동시작업 수행
만약, 특정 작업이 실패할 가능성이 있거나, 오래 걸릴 수 있어 timeout을 지정하여 pass 하고자 한다면, wait()
을 사용할 수 있다. wait()
에서는 완료한 작업과 실패한 작업을 따로 관리하며, 또한 timeout
지정이 가능하여, 특정 작업이 해당 시간 내에 완료되지 않을 경우 pass하는 방식으로 작업을 진행할 수 있다
사용코드 면에서는 map()과 약간의 차이만 존재한다. executor.submit()
에서 개별 작업을 함수에 맵핑시켜 스케쥴링 목록 (future_list
)을 생성하고 wait()
내에 넣어주면 결과들에 대한 future 객체
들이 반환된다.
이들은 done
, not_done
이라는 attribute를 가지며, 값으로는 각각 완료한 작업의 future 객체와 그렇지 않은 작업의 future 객체를 가진다. 완료한 작업의 결과값을 반환받고 싶다면 result.done
의 원소마다 result()
메소드를 사용하면 그 결과값을 모을 수 있다
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed
WORKS = [10000, 100000, 1000000, 10000000]
# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
return sum(n for n in range(1, n+1))
# [worker 개수]
n_worker = min(10, len(WORKS)) # worker 개수 : work 개수만큼 사용 (최대 10개)
futures_list = []
# [ThreadPoolExecutor] ; ProcessPoolExecutor 도 사용가능
with ThreadPoolExecutor() as excutor:
# [스케쥴링 목록 생성]
for work in WORKS:
# future 반환
future = excutor.submit(sum_generator, work)
# 스케쥴링
futures_list.append(future)
# 스케쥴링 확인
print('Scheduled for {} : {}'.format(work, future))
# [작업 수행]
result = wait(futures_list, timeout=7) # 7초 이내에 완료되지 않으면 실패
print('Completed Tasks : ' + str(result.done)) # 성공
print('Pending ones after waiting for 7seconds : ' + str(result.not_done)) # 실패
# [결과값 저장]
my_results = [future.result() for future in result.done]
wait()
를 사용한 코드를 필요한 기능만 pythonic하고 간결하게 정리해보자
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed
WORKS = [10000, 100000, 1000000, 10000000]
# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
return sum(n for n in range(1, n+1))
# [worker 개수]
n_worker = min(10, len(WORKS)) # worker 개수 : work 개수만큼 사용 (최대 10개)
# [ThreadPoolExecutor] ; ProcessPoolExecutor 도 사용가능
with ThreadPoolExecutor() as excutor:
# [스케쥴링 목록 생성]
futures_list = [excutor.submit(sum_gen, work) for work in WORKS]
# [작업 수행]
result = wait(futures_list, timeout=7) # 7초 이내에 완료되지 않으면 실패
# [결과값 저장]
my_results = [future.result() for future in result.done]
as_completed() 를 활용한 동시작업
wait()
은 timeout을 지정할 수는 있으나, 작업의 결과를 순차적으로 반환한다. 그러나 as_completed()
는 특정 작업이 끝나자마자 yield
로 반환하여 완료되는 작업에 대해 다음 작업 수행을 바로 가능하게 하여 작업의 효율성을 더 크게할 수 있다.
as_completed()
의 결과 원소는 특정 작업의 수행 결과에 해당한다. 그리고 개별 작업 수행결과는 done()
, result()
, cancelled
를 가진다. done()
는 작업 완료여부의 boolean 값을 반환하고, result()
는 완료한 작업일 경우 결과값을 반환한다. cancelled
는 완료 실패 여부를 나타내기 위한 attribute 인데 완료된 경우에도 state=finished 로 표시된다
코드는 wait()
과 모두 같고 “[as_completed 결과 출력]” 부분만 달라진다.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed
WORKS = [10000, 100000, 1000000, 10000000]
# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
return sum(n for n in range(1, n+1))
# [worker 개수]
n_worker = min(10, len(WORK_LIST)) # worker 개수 : work 개수대로 사용, 최대 10개 사용
# [ThreadPoolExecutor] ; ProcessPoolExecutor 도 사용가능
with ThreadPoolExecutor() as excutor:
# [스케쥴링 목록 생성]
futures_list = [excutor.submit(sum_gen, work) for work in WORKS]
# [as_completed 결과 출력]
for future in as_completed(futures_list):
result = future.result()
done = future.done()
cancelled = future.cancelled
print('Future Result : {}, Done : {}'.format(result, done))
print('Future Cancelled : {}'.format(cancelled))
as_completed()
를 사용한 코드를 필요한 기능만 pythonic하고 간결하게 정리해보자
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed
WORKS = [10000, 100000, 1000000, 10000000]
# [작업수행 함수] - 누적 합계 함수(Generator 사용)
def sum_gen(n):
return sum(n for n in range(1, n+1))
# [worker 개수]
n_worker = min(10, len(WORKS)) # worker 개수 : work 개수만큼 사용 (최대 10개)
# [ThreadPoolExecutor] ; ProcessPoolExecutor 도 사용가능
with ThreadPoolExecutor() as excutor:
# [스케쥴링 목록 생성]
futures_list = [excutor.submit(sum_gen, work) for work in WORKS]
# [작업 완료 결과값 저장]
result = [future.result() for future in as_complted(futures_list) if future.done()]
파이썬 비동기 처리 작업 구현 - asyncio 라이브러리 (0) | 2022.05.12 |
---|---|
코루틴 (Coroutine) (3) - 개념, 장점, 구현 (0) | 2022.05.11 |
코루틴 (Coroutine) (2) - 병행성과 Generator (0) | 2022.05.11 |
코루틴 (Coroutine) (1) - Iterator 와 Generator (0) | 2022.05.11 |
클로저 (Closure) (2) - 클로저 개념 및 구현 (0) | 2022.05.09 |
댓글 영역