상세 컨텐츠

본문 제목

[DynamoDB] 데이터 scan시 기억해야할 ResultIterator, segment, last_evaluated_key, limit 개념 및 활용법 : 네이버 블로그

카테고리 없음

by jisooo 2020. 1. 19. 17:44

본문

 

(예시 코드는 pynamoDB를 이용하였습니다.)

https://github.com/pynamodb/PynamoDB


pynamoDB에서는 scan의 검색결과로 위 코드에 나와있는 ResultIterator라는 결과집합을 반환한다.

(scan뿐만아니라 query의 검색결과도 ResultIterator를 반환한다.)

scan해서 나온 데이터의 갯수가 0개이더라도 위의 ResultIterator 타입의 결과집합이 나오므로,

꼭 null로 체크하면 안되고, 결과값으로 나온 iterator를 돌려서 사이즈로 항목을 체크해줘야 한다.

위의 ResultIterator 코드를 잠시 보면,

class ResultIterator(object): """ ResultIterator handles Query and Scan item pagination. http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Pagination http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination """ def __init__(self, operation, args, kwargs, map_fn=None, limit=None, rate_limit = None): self.page_iter = PageIterator(operation, args, kwargs, rate_limit) self._first_iteration = True self._map_fn = map_fn self._limit = limit self._total_count = 0 def _get_next_page(self): page = next(self.page_iter) self._count = page[CAMEL_COUNT] self._items = page.get(ITEMS) # not returned if 'Select' is set to 'COUNT' self._index = 0 if self._items else self._count self._total_count += self._count def __iter__(self): return self def __next__(self): if self._limit == 0: raise StopIteration if self._first_iteration: self._first_iteration = False self._get_next_page() while self._index == self._count: self._get_next_page() item = self._items[self._index] self._index += 1 if self._limit is not None: self._limit -= 1 if self._map_fn: item = self._map_fn(item) return item def next(self): return self.__next__() ... ... ...

__iter__, __next__ 메소드를 구현하고 있다. (Iterator)

따라서 yield 키워드를 사용하거나 반복문 내에서 item을 하나하나 꺼내와서 사용할 수 있다.

result_iter = User.scan(limit=100, filter_condition=User.phone_number.startswith("+82") data = [data async for data in result_iter] print(len(data)) print(result_iter.total_count) assert len(data) == result_iter.total_count

주의할점은, 결과집합의 total_count나 last_evaluated_key값을 제대로 받아오려면,

data = [data async for data in result_iter]

위 코드처럼 ResultIterator를 반복문으로 돌려주고 나서 total_count나 last_evaluated_key 연산을 해올 수 있다.

scan한 결과의 result_iter를 바로 받아서 total_count나 last_evaluated_key에 접근하려고 하면 각각 0, None을 리턴할 것이니

사용할 때 꼭 ResultIterator를 반복문으로 모두 돌려준뒤 사용하자.


2) filter_condition

테이블에서 데이터를 스캔해올 때, 특정 검색 조건을 걸어 스캔하는 옵션이다.

형식은 pynamoDB에서는 {ModelClass}.{attrubute_name}.{condition} 와 같이 사용한다.

아래는 예시 Model 클래스인 User 클래스이다.

(pynamoDB의 모델로써 클래스를 사용하려면 아래처럼 pynamodb.Model을 상속받아야 한다.)

class User(Model): class Meta: table_name = DYNAMODB_TABLE_NAME region = DYNAMODB_REGION billing_mode = 'PAY_PER_REQUEST' aws_access_key_id = AWS_ACCESS_KEY_ID aws_secret_access_key = AWS_SECRET_ACCESS_KEY class LogIdIndex(GlobalSecondaryIndex): class Meta: index_name = "phone_number_index" billing_mode = 'PAY_PER_REQUEST' projection = AllProjection() phone_number = UnicodeAttribute(hash_key=True) user_id = UnicodeAttribute(hash_key=True) created_at = UTCDateTimeAttribute(range_key=True) password = UnicodeAttribute(null=True) phone_number = UnicodeAttribute() phone_number_index = LogIdIndex()

위와 같이 User Model 클래스를 작성하였다.

filtering_start_time = () filtering_end_time = () scan_iterator_results = User.scan( filter_condition=User.created_at.between(filtering_start_time, filtering_end_time) )

User model의 created_at attrubute에 between 조건을 걸어,

created_at attrubute의 값이 filtering_start_time ~ filtering_end_time 날짜를 포함한 사이에 해당하는 행들을 조건으로 가져온다.

between 조건 이외에도 문자열의 경우 startswith(prefix), exist, does_not_exist, contains 등의 조건으로 필터를 적용할 수 있다.

PynamoDB에서 위와 같은 필터링을 적용하는 방법은 아래 문서에 잘 설명되어있다.

https://pynamodb.readthedocs.io/en/latest/conditional.html


3) segment / total_segment

DynamoDB에서 scan을 사용할 때, 병렬로 여러 개의 스캔단위를 돌릴 수 있도록 사용할 수 있는 개념이 segment이다.

DynamoDB에서 한번에 많은 양의 데이터를 스캔하려고 할 때 아래와 같이 몇가지 문제점이 발생할 수 있다.

스캔할 테이블 또는 인덱스가 클수록 Scan을 완료하는 데 걸리는 시간이 늘어납니다. 또한 순차적 Scan은 프로비저닝된 읽기 처리량 용량을 항상 최대한 사용할 수 있는 것은 아닙니다. DynamoDB가 여러 물리적 파티션 간에 라지 테이블 데이터를 분산해도 Scan 작업은 한 번에 한 파티션만 읽을 수 있습니다. 이러한 이유로 Scan의 처리량은 단일 파티션의 최대 처리량에 따라 제약을 받습니다.

https://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination

결론적으로 scan으로 한번의 많은 데이터를 처리하려고 하면,

시간도 비효율적이고, DynamoDB의 읽기 용량을 최대한 다 사용할수도 없고,

또한 scan의 결과 데이터가 1MB를 넘으면 last_evaluated_key를 이용한 pagination 작업을 설정해줘야 한다!

이러한 문제점 때문에 사용할 수 있는 개념이 segment이다.

위의 그림이 AWS 문서에 설명되어 있는데,

전체 Data items들을 사용자가 설정한 total_segment 갯수로 쪼개서, 각 segment를 병렬로 scan 작업을 돌릴 수 있다.

예를 들어 전체 1000개의 데이터를 scan 하려고 하는데,

한번에 모든 데이터를 scan하지 않고, 전체 segment를 10개로 scan 단위로 쪼갤 수 있다.

그럼 각 segment 작업단위당 100개의 데이터만을 스캔하여 전체 10개의 segment를 병렬로 작업할 수 있다.

TOTAL_SEGMENT = 10 for segment in range(TOTAL_SEGMENT): result_iterator = User.scan( filter_condition=User.phone_number.startswith("+82"), segment=segment, total_segment=TOTAL_SEGMENT) yield result_iterator )

위 코드는 총 10개의 segment로 스캔 단위로 쪼개서 각 세그먼트의 결과 데이터를 yield하는 코드이다.

위의 코드를, 각 세그먼트를 동시에 병렬로 돌릴 수 있는 코드로 작성해보자. (비동기 코드 작성을 위해 inpynamoDB, asyncio 사용)

https://github.com/MyMusicTaste/InPynamoDB

TOTAL_SEGMENT = 10 futures = [] for segment in range(TOTAL_SEGMENT): # future는 아직 코드(scan 작업)가 수행되지 않은 coroutine 객체를 담는다. future = User.scan( filter_condition=User.phone_number.startswith("+82"), segment=segment, total_segment=TOTAL_SEGMENT ) futures.append(future) ) # futures에 담긴 coroutine 리스트가 각각 동시에 수행되어 segment_result_list 변수에 저장된다. segment_result_list = await asyncio.gather(*futures)

4) last_evaluated_key

위의 segment에서는 많은 양의 데이터를 병렬로 scan 작업을 하기 위해 사용하기 위해 사용했다면,

last_evaluated_key는 DynamoDB에서 한번에 스캔할 수 있는 데이터의 크기가 1MB로 한정되어있기 때문에,

pagination을 설정하여 많은 양의 데이터를 순차적으로 가져올 수 있는 기능이다. pagination을 위해 보통 limit 옵션과 함께 쓰인다.

아래의 예시 코드를 보자.

last_evaluated_key = None while True: result_iter = await User.scan(last_evaluated_key=last_evaluated_key, limit=100) data = [data async for data in result_iter] last_evaluated_key = await result_iter.last_evaluated_key if last_evaluated_key is None: break

위 코드를 보면,

첫번째 반복문에서 result_iter에 담긴 item의 수는 limit을 100개로 지정했으므로 100개의 아이템만 저장이 되어있다.

만약 전체 아이템의 갯수가 1000개일 경우,

0~99번째 인덱스의 User Model 객체 데이터들이 data 변수에 담기게 된다.

data 변수에 각 Model을 할당한 후, last_evaluated_key를 읽어오면, 99번째 User Model 객체의

hash_key, range_key 정보가 last_evaluated_key 변수에 담긴다.

(last_evaluated_key는 아래와 같이 hash_key, range_key 형식이다.)

{'user_id': {'S': '83a92a00116e4450babf56714c4b2b54'}, 'created_at': {'S': '2020-01-06T11:17:58.313760+0000'}}

그렇게 반복문을 10번 돌면 1000개의 데이터를 모두 읽어오고,

11번째 반복문에서는 더이상 읽어올 데이터가 없으므로 빈 item list를 가지고 있는 ResultIterator가 스캔의 결과로 반환된다.

result_iter를 반복문으로 돌린뒤, last_evaluated_key를 받아오면 빈 item list를 갖고있으므로 None을 리턴한다.

더이상 pagination할 수 있는 last_evaluated_key가 없으므로 반복문을 종료하게 된다.

 

댓글 영역