상세 컨텐츠

본문 제목

[Python, asyncio] Coroutine과 task, event_loop 개념과 사용법 정리.

프로그래밍/Python

by jisooo 2019. 9. 28. 00:46

본문

 

asyncio는 async/await 구문을 사용하여 동시성 코드를 작성하는 라이브러리입니다.

asyncio는 고성능 네트워크 및 웹 서버, 데이터베이스 연결 라이브러리, 분산 작업 큐 등을 제공하는

여러 파이썬 비동기 프레임워크의 기반으로 사용됩니다.

asyncio는 다음과 같은 작업을 위한 고수준 API 집합을 제공합니다:

- 파이썬 코루틴들을 동시에 실행하고 실행을 완전히 제어할 수 있습니다.

- 네트워크 IO와 IPC를 수행합니다;

- 자식 프로세스를 제어합니다;

- 를 통해 작업을 분산합니다;

- 동시성 코드를 동기화합니다;

또한 라이브러리와 프레임워크 개발자가 다음과 같은 작업을 할 수 있도록 하는 저수준 API가 있습니다:

- 네트워킹, 자식 프로세스 실행, OS 시그널 처리 등의 비동기 API를 제공하는 이벤트 루프를 만들고 관리합니다.

- 트랜스포트를 사용하여 효율적인 프로토콜을 구현합니다.

- 콜백 기반 라이브러리와 async/await 구문을 사용한 코드 간에 다리를 놓습니다.

import asyncio
 
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
 
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

파이썬 3.6 공식문서에 작성된 예시 코드이다.

우선 print_sum()이라는 coroutine이 있고, 그 안에서 compute()의 수행 완료를 기다린다.

첫번째 그림에서 위 코드의 동작 절차를 잘 보여주고 있는데,

print_sum() 코루틴을 이벤트 루프 내에서 실행시킨다.

print_sum()에서 compute() 코루틴이 실행완료될때까지 await 키워드를 붙여줬으므로,

compute() 코루틴의 내용들이 모두 수행완료되고 리턴값을 받을 때까지 print_sum()의 아래 코드는 수행되지 않는다.

 

print("Compute %s + %s ..." % (x, y))

문장이 수행되고, 1초동안 작업을 중단한 뒤, x + y값을 print_sum()의 results에 할당한뒤 이후의 코드들을 수행한다.

<실행 결과>

Compute 1 + 2 ...
# 1초동안 수행을 중단하고 아래의 결과가 나타난다.
1 + 2 = 3

2. coroutine

https://en.wikipedia.org/wiki/Coroutine

코루틴이란? (= 동시 실행 루틴이라고도 한다.)

프로그램에서 순서는 일반적으로 불려지는 쪽이 부르는 쪽에 속하고 있는 것이 대부분이지만

어느 쪽도 종속 관계가 아니라 대등한 관계로 서로 호출하는 것이다.

예를 들면, 게임 프로그램에서 각 플레이어 루틴은 서로 코루틴된다.

복수 프로세스 간에서 한정된 형태의 통신을 행하는 프로그램을 순차 제어로 실현한 것으로 볼 수도 있다.

①호출 루틴과 피호출 루틴이 대등 관계를 유지하는 처리 절차.

부차적 프로그램의 수행이 완전히 끝나기 전에 제어가 호출 루틴으로 돌아가는 것이 동시 실행 과정이다.

그리고 제어가 부차적 프로그램으로 돌아왔을 때는 중단된 부분부터 다시 수행이 계속된다.

②주종 관계를 갖지 않고 서로 호출하는 둘 이상의 모듈들.

서브루틴의 제어 전달 개념과 유사한 것. 각 호출에서 초기화되는 서브루틴과는 달리,

호출 시 관련된 모든 정보를 보존하는 능력을 갖는다.

그리고 다음에 다시 시작할 때에는 이전에 실행했던 다음부터 실행할 수 있는 논리를 갖는다.


3. task

우리는 객체가 await 표현식에서 사용될 수 있을 때 어웨이터블 객체라고 말합니다.

많은 asyncio API는 어웨이터블을 받아들이도록 설계되었습니다.

어웨이터블 객체에는 세 가지 주요 유형이 있습니다: 코루틴, 태스크퓨처.

태스크는 코루틴을 동시에 예약하는 데 사용됩니다.

코루틴이 asyncio.create_task()와 같은 함수를 사용하여 태스크로 싸일 때 코루틴은 곧 실행되도록 자동으로 예약됩니다:


* task 만들기

asyncio.create_task(coro)

coro 코루틴Task로 감싸서 실행을 예약하고 Task 객체를 반환한다.

이 함수는 파이썬 3.7에서 추가되었습니다.

파이썬 3.7 이전 버전에서는, 대신 저수준 asyncio.ensure_future() 함수를 사용할 수 있다.

먼저 create_task로 코루틴의 실행을 예약한뒤, 이벤트 루프를 통해 예약한 태스크를 실행시킬 수 있다.

get_running_loop()에 의해 반환된 루프에서 태스크가 실행되고,

현재 스레드에 실행 중인 루프가 없으면 RuntimeError가 발생한다.

* get_event_loop()로 이벤트 루프를 얻어오면,

만약 현재 스레드에 실행 중인 루프가 없다해도, 새로 이벤트 루프를 생성하여 반환하므로 에러가 발생하지 않는다!

async def coroutine():
print("coroutine executed.")
# In Python 3.7+ # coroutine을 task로 만들지만 아직 실행시키지 않고 실행 예약을 함.
task = asyncio.create_task(coroutine())
 # This works in all Python versions but is less readable
# create_task와 같은 기능. 코루틴을 실행 예약 함.
task = asyncio.ensure_future(coroutine()) ...

아래의 테스트 코드를 통해 실행해보았다.

task에 coroutine 실행을 예약하고, asyncio의 이벤트 루프를 받아와, 예약된 task들을 실행시킬 수 있다.

import asyncio
async def coroutine1():
print("Hello coroutine1!")
 
async def coroutine2():
print("Hello coroutine2!")
 
async def coroutine3():
print("Hello coroutine3!")
 
async def main():
#python 3.7
#task = await asyncio.create_task(coroutine1())
task = await asyncio.ensure_future(coroutine1())
 
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

<실행결과>

Hello coroutine1!


* 동시에 여러 태스크(코루틴) 실행하기.

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

gather function은 인자의 aws 시퀀스에 있는 어웨이터블 객체를 동시에 실행한다.

인자로 넘어온 모든 어웨이터블이 성공적으로 완료되면,

결과는 반환된 값들이 합쳐진 리스트를 반환한다.

(결괏값의 순서는 aws에 있는 어웨이터블의 순서와 일치)

아래의 테스트 코드를 보자.

import asyncio
# 위 테스트 코드와 차이점은 각 코루틴 함수에서 리턴값이 추가되었다.
 
async def coroutine1():
print("Hello coroutine1!")
return "coroutine1"
 
async def coroutine2():
print("Hello coroutine2!")
return "coroutine2"
 
async def coroutine3():
print("Hello coroutine3!")
return "coroutine3"
 
async def main():
# 코루틴 실행 결과(리턴값)가 coroutine_list 변수에 list로 담긴다.
coroutine_list = await asyncio.gather(coroutine1(), coroutine2(), coroutine3())
print(coroutine_list)
 
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

<실행결과>

Hello coroutine2!
Hello coroutine1!
Hello coroutine3!
['coroutine1', 'coroutine2', 'coroutine3']

* asyncio의 event_loop 실행하기.

loop.run_until_complete(future)

future(Future의 인스턴스)가 완료할 때까지 실행시키는 함수이다.

인자가 코루틴 객체 면, asyncio.Task로 실행되도록 묵시적으로 예약된다.

퓨처의 결과를 반환하거나 퓨처의 예외를 발생시킨다.

* python 3.7버전부터는 event_loop를 직접 얻어와 future를 실행하는 저수준 API를 사용하는 것보다,

아래와 같이 run()함수를 통해 코루틴을 실행시키는 것을 권장한다.

asyncio.run(coro, *, debug=False)

이 함수는 전달된 코루틴을 실행하고, asyncio 이벤트 루프와 비동기 제너레이터의 파이널리제이션을 관리합니다.

다른 asyncio 이벤트 루프가 같은 스레드에서 실행 중일 때, 이 함수를 호출할 수 없습니다.

debugTrue면, 이벤트 루프가 디버그 모드로 실행됩니다.

이 함수는 항상 새 이벤트 루프를 만들고 끝에 이벤트 루프를 닫습니다.

asyncio 프로그램의 메인 진입 지점으로 사용해야 하고, 이상적으로는 한 번만 호출해야 합니다.

* 이 함수는 파이썬 3.7에서 잠정적으로 asyncio에 추가됨.

필자는 test 코드를 작성할 때,

임의의 여러 데이터를 PynamoDB의 모델 객체로 만들고,

데이터베이스에 저장하는 작업을 위의 futures 객체 리스트로 만들어 gather() function을 이용해 동시에 수행하는 코드로 활용한 적이 있었다.

# 테스트용 데이터로 기본 샘플 데이터 추가 futures = [User.initialize( hash_key=uuid.uuid4().hex, phone=data['phone'], join_date=data['join_date'], user_type=data['user_type'] ) for data in range(SAMPLE_DATA)]

위와 같이 futures 변수에 SAMPLE_DATA갯수만큼의 PynamoDB의 모델 객체를 초기화하는 코드(User.initialize())를

아직 수행하지 않은! 코루틴 객체가 리스트로 담기게 된다.

디버깅을 하면 위와 같이 coroutine 객체가 futures에 담기게 된다.

만약에 이미 초기화된 모델 객체를 리스트에 담으려면

futures = [await User.initialize( hash_key=uuid.uuid4().hex, phone=data['phone'], address=data['address'],
user_type=data['user_type'] ) for data in range(SAMPLE_DATA)]

위와 같이 'await' 키워드를 붙여주면 된다. 이는 await 뒤의 awaitable 객체를 모두 수행할 때 까지 기다린다.

dummy_users = await asyncio.gather(*futures)

아직 수행되지 않은 User model 코루틴 객체들이 담긴 futures 리스트를 인자로 해서

asyncio.gather() function을 호출한다. (await를 앞에 붙여서!)

그러면 gather function을 통해 각 User.initialize(..) 가 담긴 코루틴 리스트 요소 하나하나를 실행시킨다.

그리고 dummy_users에는 각 futures 리스트 요소 하나하나가 실행된 순서대로 결과값이 리스트로 전달된다.

여기선 결과값이 초기화된 pynamodb의 Model을 상속받은 User 객체가 된다.

futures = [user.save() for user in dummy_user] await asyncio.gather(*futures)

dummy_user에 담긴 Model 리스트들을 DB에 저장하는 save() coroutine 리스트를 futures에 다시 저장한다.

futures에 저장된 user.save() coroutine 리스트는 이 시점에서 역시 아직 모델이 저장되지 않은 실행되지 않은 상태의 코루틴 리스트이다.

둘째줄에서 asyncio.gather() function을 통해 각 model을 저장하는 코루틴들이 수행되어 동시에 model 객체들이 저장된다.

 

* 위 코드는 비동기를 지원하는 inpynamoDB 라이브러리를 사용하였다.

https://github.com/MyMusicTaste/InPynamoDB

관련글 더보기

댓글 영역