data engineering (AWS DynamoDB 사용해서 오디오 feature 활용하기)

NoSQL

  • RDBMS가 제한되는 점들을 보완하기 위해 활용하기 시작했다. NoSQL 같은 경우는 RDB 처럼 특정 Structure가 정해져 있지 않기 때문에 데이터의 추가가 용이하다.

NoSQL

  • Scalability

    • SQL Databases are vertically scalable - CPU, RAM or SSD

      • 여러가지 사양들을 정해놓아서 정해진 resource를 초과할 경우 또다른 resource를 추가 시켜주어야 한다.
    • NoSQL Databases are horizontally scalable - Sharding / Partitioning

      • NoSQL은 Partitioning을 통해 다른 resource에서 남는 부분을 부족한 부분에 넘겨줄 수 있다.
  • 그렇다면 partition이 무엇이냐 조금 더 자세하게 알아보자면, 우선 해석 그대로 나눈다는 의미가 있다. partition을 해야하는 이유 중 가장 중요한 이유는 데이터가 늘어나면 늘어날수록 SQL을 통하여 query문의 performance가 느려질수 밖에 없는데, 이런 부분을 방지하고자 나누어 주는 것이다. 데이터의 양이 늘어나는데 매번 모든 데이터를 다 읽어와서 작업을 하는 것은 너무 비효율적이기 때문이다. 물론 무조건적으로 NoSQL이 좋다는 의미는 아니다. NoSQL의 경우에는 아래의 그림에서 오른쪽 그림처럼 애초에 우리가 lookup(참조)해야 하는 데이터의 양을 줄여서 나누어 놓는 것이다. 이는 Spark의 경우에도 동일하다.

파티션이란

  • Vertical partitioning을 하는 이유는 중복적인 데이터는 ERD를 Table을 분리해서 관리를 하게끔 해서 Normalization을 하였는데, Normalization을 진행하고도 column 수가 늘어나면, 해당 Table을 읽는데 속도가 느려질수가 있다. 또한, 어떤 column들은 지속적으로 업데이트가 될 수도 있지만, 어떤 column들은 지속적으로 업데이트가 되지 않을 수 있기 때문에 사용한다. RDBM에서는 나눌수는 있지만, 데이터가 엄청나게 많은 양이어서 데이터를 처리하는데 속도의 향상을 기대한다면 사용하지만, 그렇지 않은 경우는 많이 사용하지 않는다.

버티컬 파티션이란

  • Horizontal Partition은 NoSQL에서 무조건 사용된다. 어떤 데이터를 검색할 때 Sharded Key로 빠르게 Access하기 위해서 사용되는 것이다. 마치 RDB에서 primary key를 통한 빠른 search와 유사한 느낌이다. 동일한 컬럼이지만 데이터의 양을 나누어서 각각 저장하는 방식이다. Partition key를 통해 데이터를 나누어 주는데, AWS안의 DynamoDB를 만들면서 확인해 볼 것이다.

호리즌탈 파티션이란

  • spotify Developer artist’s top tracks를 들어가보면, 아래 그림과 같이 path patameter로 해당 artist id를 받아서 결과를 출력 해준다. NoSQL이므로 track 전체를 가져와도 되지만, 그 안에서 필요한 데이터를 또 작업을 해야하는데, artist 한명이 여러개의 track을 가지고 있는 경우도 있으므로 artist id만을 partition key로 사용할 수 없다. 왜냐하면 partition key로 사용되는 경우 해당 column에 동일한 값을 사용할 수 없기 때문이다. 그러므로 artist_id를 partition key로 해놓고, track id를 sort key로 사용할 것이다.

top tracks API 사용법

AWS DynamoDB 사용하기

  • 먼저 AWS에 로그인을 마친 후, 아래 그림과 같이 service 탭을 눌러 내려보면 이전의 RDS를 만들었을 때 보았던, Database란에서 DynamoDB를 볼 수 있다. 클릭하면 하게 되면 아래 그림과 같은 페이지가 나올 것이다.

AWS service에서 DynamoDB 찾기

DynamoDB 페이지

  • 그 다음은, 위의 페이지에서 보이는 것처럼 create table을 클릭하면 다음과 같은 페이지로 이동 될 것이다. 여기서 Table name은 말 그대로 Table의 이름을 지정해주는 부분이고, Partition key 부분은 RDS에서 Primary Key와 동일한 역할(빠르게 참조할 수 있게 해주는)을 하는 부분이라고 생각하면된다. 그러므로 해당 artist_id에 해당하는 row는 유일해야 한다. 하지만 sort key를 사용할 수 있는데, 이 부분은 해당 세션이 언제 생성되었는지를 알려주기 위한 역할이라고 할 수 있다. 위에서 말했던 것 처럼 partition key는 artist_id로 sort key는 track id를 사용할 것이다.
  • Provisioned는 server를 띄우기 때문에 그 상황 안에서 free tier를 사용가능한 사람만 사용할 수 있는데, Auto-Scale이 가능한 부분이 있다. 하지만 Auto-Scale이 실질적으로 적용이 되는 시간차가 있어 해당 Scaling이 진행되는 동안은 traffic이 많아진다면 속도가 느려질 수 있다는 단점이 있을 수 있다.

  • On-demand는 어느 정도 필요한지를 모를 경우에 aws에서 알아서 scaling을 해주고 쓰이는 만큼만 돈을 지불할 상황일 때 사용한다.

  • 필자는 아래와 같이 설정한 후에, DynamoDB를 생성하였다.

DynamoDB 생성 페이지

  • 생성한 후에는 생성된 DynamoDB의 페이지가 아래와 같이 나온다.

DynamoDB 페이지

  • Read capacity units(읽기 요청 단위 1)은 강력히 일관된 읽기 요청 1 또는 최종적 일관된 읽기 요청 2(최대 4 KB 크기 항목의 경우)를 나타냅니다. 트랜잭션 읽기 요청은 최대 4 KB 크기 항목의 1회 읽기를 수행하는 데 2개의 읽기 요청 단위가 필요합니다.

  • Write capacity units(쓰기 요청 단위 1)은 최대 1 KB 크기의 항목에 대해 1회 쓰기를 나타냅니다.

  • 참고

Capacity

  • DB를 사용하다가 primary key가 Table내에서 유일한 RDBMS와는 다르게 partition key를 추가할 수도 있다. 필자의 경우 추가를 할 필요가 없었기에 하진 않았지만, 추가하여 사용할 경우는 아래와 같이 생성해주면된다.

Partition Key와 Sort Key 추가

Audio Feature DynamoDB에 insert 하기

  • AWS service를 사용하기 위한 패키지인 boto3를 설치한다. boto3는 AWS가 제공하는 Python SDK의 이름이다. DynamoDB에 insert를 하거나 select해오거나 할 때 필요하다고 간단하게 이해해도 좋을 것 같다. console에서는 로그인을 통해 인증절차를 거치지만 boto3를 통한 인증방식은 이전의 우리가 설정해 놓았던 aws configure를 통해 설정되있는 상태를 통해 인증하므로 AWS를 console에 로그인하지 않고도 컨트롤할 수 있는 것이다.
1
2
#pip3 install boto3 --user
pip install boto3 --user

오디오 피처 살펴보기

  • 전체적인 flow는 이미 RDS받아놓은 artists Table에서 artist_id들을 받아온 후, Spotify API를 통해 해당 artist_id의 top track 정보를 받아서 AWS의 DynamoDB에 저장할 것이다.
  • Spotify API에서는 해당 artist_id 뿐만아니라 국가에대한 parameter도 받는데, 이는 국가마다 해당 artist의 top track이 다를 수도 있기 때문이다. 필자는 우선 US(미국)과 CA(캐나다)에대해서만 살펴 볼 것이다.

top tracks 페이지

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import sys
import os
import boto3
import requests
import base64
import json
import logging
import pymysql

def main(host, user, passwd, db, port, client_id, client_secret):


try:
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
logging.error('could not connect to dynamodb')
sys.exit(1)

try:
conn = pymysql.connect(host=host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
sys.exit(1)

headers = get_headers(client_id, client_secret)

# 사용할 DynamoDB로 만든 Table 이름
table = dynamodb.Table('top_tracks')

cursor.execute('SELECT id FROM artists')

# 국가는 미국과 캐나다만을 선택할 것이다.
countries = ['US', 'CA']
for country in countries:
for (artist_id, ) in cursor.fetchall():

URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id)
params = {
'country': country
}

r = requests.get(URL, params=params, headers=headers)

raw = json.loads(r.text)

for track in raw['tracks']:
# 위에서 가져온 track에서는 artist_id가 없기 때문에 아래와 같이 먼저 dictionary로 만들어줌.
data = {
'artist_id': artist_id,
'country': country
}

data.update(track)

table.put_item(
Item=data
)





def get_headers(client_id, client_secret):

endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')

headers = {
"Authorization": "Basic {}".format(encoded)
}

payload = {
"grant_type": "client_credentials"
}

r = requests.post(endpoint, data=payload, headers=headers)

access_token = json.loads(r.text)['access_token']

headers = {
"Authorization": "Bearer {}".format(access_token)
}

return headers


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--client_id', type=str, help='Spotify app client id')
parser.add_argument('--client_secret', type=str, help='Spotify client secret')
parser.add_argument('--host', type=str, help='end point host')
parser.add_argument('--username', type=str, help='AWS RDS id')
parser.add_argument('--database', type=str, help='DB name')
parser.add_argument('--password', type=str, help='AWS RDS password')
args = parser.parse_args()
port = 3306
main(host=args.host, user=args.username, passwd=args.password, db=args.database, port=port, client_id=args.client_id, client_secret=args.client_secret)
  • 아래와 같이 AWS console 창에서도 데이터가 insert된 것을 확인 할 수 있다.

Spotify top tracks DynamoDb에 insert

DynamoDB 저장된 데이터 불러오기

  • 아래 그림에서와 같이 boto3를 이용하여 DynamoDB에서 데이터를 select하여 가져오는 방법은 get_item 함수를 사용하는 방법이 있다. 단, 해당 Table의 Partition key 나 sort key로 지정한 column의 값을 전부 입력해 주어야 error 없이 작동된다.

boto3 get item 메서드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import sys
import os
import boto3

def main():

try:
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
logging.error('could not connect to dynamodb')
sys.exit(1)

table = dynamodb.Table('top_tracks')

response = table.get_item(
Key={
'artist_id' : '0L8ExT028jH3ddEcZwqJJ5',
'id' : '0uppYCG86ajpV2hSR3dJJ0'
}
)
print(response)

if __name__=='__main__':
main()

결과

1
{'Item': {'is_playable': True, 'duration_ms': Decimal('282906'), 'external_ids': {'isrc': 'USWB19901574'}, 'uri': 'spotify:track:0uppYCG86ajpV2hSR3dJJ0', 'country': 'US', 'name': 'Give It Away', 'album': {'total_tracks': Decimal('19'), 'images': [{'width': Decimal('640'), 'url': 'https://i.scdn.co/image/ab67616d0000b273153d79816d853f2694b2cc70', 'height': Decimal('640')}, {'width': Decimal('300'), 'url': 'https://i.scdn.co/image/ab67616d00001e02153d79816d853f2694b2cc70', 'height': Decimal('300')}, {'width': Decimal('64'), 'url': 'https://i.scdn.co/image/ab67616d00004851153d79816d853f2694b2cc70', 'height': Decimal('64')}], 'artists': [{'name': 'Red Hot Chili Peppers', 'href': 'https://api.spotify.com/v1/artists/0L8ExT028jH3ddEcZwqJJ5', 'id': '0L8ExT028jH3ddEcZwqJJ5', 'type': 'artist', 'external_urls': {'spotify': 'https://open.spotify.com/artist/0L8ExT028jH3ddEcZwqJJ5'}, 'uri': 'spotify:artist:0L8ExT028jH3ddEcZwqJJ5'}], 'release_date': '1991-09-24', 'name': 'Blood Sugar Sex Magik (Deluxe Edition)', 'album_type': 'album', 'release_date_precision': 'day', 'href': 'https://api.spotify.com/v1/albums/30Perjew8HyGkdSmqguYyg', 'id': '30Perjew8HyGkdSmqguYyg', 'type': 'album', 'external_urls': {'spotify': 'https://open.spotify.com/album/30Perjew8HyGkdSmqguYyg'}, 'uri': 'spotify:album:30Perjew8HyGkdSmqguYyg'}, 'popularity': Decimal('72'), 'artists': [{'name': 'Red Hot Chili Peppers', 'href': 'https://api.spotify.com/v1/artists/0L8ExT028jH3ddEcZwqJJ5', 'id': '0L8ExT028jH3ddEcZwqJJ5', 'type': 'artist', 'external_urls': {'spotify': 'https://open.spotify.com/artist/0L8ExT028jH3ddEcZwqJJ5'}, 'uri': 'spotify:artist:0L8ExT028jH3ddEcZwqJJ5'}], 'disc_number': Decimal('1'), 'href': 'https://api.spotify.com/v1/tracks/0uppYCG86ajpV2hSR3dJJ0', 'track_number': Decimal('9'), 'external_urls': {'spotify': 'https://open.spotify.com/track/0uppYCG86ajpV2hSR3dJJ0'}, 'artist_id': '0L8ExT028jH3ddEcZwqJJ5', 'preview_url': 'https://p.scdn.co/mp3-preview/fcdf3224d230b26b637418c2d1028bc482db7fce?cid=93b8cdc701294ab7992eaf370c7ba1cd', 'is_local': False, 'id': '0uppYCG86ajpV2hSR3dJJ0', 'explicit': False, 'type': 'track'}, 'ResponseMetadata': {'RequestId': 'KKGRB8AQD7G3H83UAA4J40TPE7VV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'KKGRB8AQD7G3H83UAA4J40TPE7VV4KQNSO5AEMVJF66Q9ASUAAJG', 'x-amz-crc32': '1465669044', 'content-type': 'application/x-amz-json-1.0', 'content-length': '2296', 'date': 'Fri, 21 Feb 2020 12:56:26 GMT'}, 'RetryAttempts': 0}}
  • 그런데, 여기서 어떠한 형식으로든 DB를 사용하려면 query도 사용할 필요가 있을 것이다. 이런 경우 사용하는 것이 Scanning과 Querying이다. Querying은 Partition key(Primary key)를 알고있을 경우에 사용하고, Scanning은 그 이외의 부수적인 다른 attribute(column)의 value를 알고 있을 때 사용한다.

querying, scanning

partition key와 다른 attribute를 사용한 query문

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import sys
import os
import boto3

from boto3.dynamodb.conditions import Key, Attr

def main():

try:
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
logging.error('could not connect to dynamodb')
sys.exit(1)

table = dynamodb.Table('top_tracks')

response = table.query(
KeyConditionExpression=Key('artist_id').eq('0L8ExT028jH3ddEcZwqJJ5')
FilterExpression=Attr('popularity').gt(80)
)
print(response['Items'])

if __name__=='__main__':
main()

Attribute만 사용하는 scanning

  • 허나, 되도록이면 querying을 사용하는 것을 추천한다. 왜냐하면, scan을 key값이 없기 때문에 전체 데이터를 한번 다 돌아서 조건에 맞는 데이터들을 가져오는 것이므로, 데이터의 양이 많아 row의 수가 많다면, 속도가 느려지기 때문이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import sys
import os
import boto3

from boto3.dynamodb.conditions import Key, Attr

def main():

try:
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2', endpoint_url='http://dynamodb.ap-northeast-2.amazonaws.com')
except:
logging.error('could not connect to dynamodb')
sys.exit(1)

table = dynamodb.Table('top_tracks')

response = table.scan(
FilterExpression=Attr('popularity').gt(80)
)
print(response['Items'])

if __name__=='__main__':
main()