data engineering (데이터 웨어하우스 vs 데이터 레이크)

데이터 웨어하우스 vs 데이터 레이크

  • 데이터 레이크라는 개념은 비교적 최신의 개념이다. 데이터 웨어하우스라고 하는 MySQL, PostgreSQL 같은 RDBMS 프로그램들을 넘어서 데이터들이 너무나 방대해졌기 때문에 나온 시스템이라고 할 수 있다.

  • 이전의 데이터 웨어하우스는 미리 짜여진 구조를 통해 가공해서 저장했기에 좀 더 접근하기 쉬었다. 반면에 데이터 레이크는 데이터가 너무 방대하기 때문에 어떤 데이터를 어떻게 사용할지 모르므로 Raw 데이터를 저장한다. 그렇기 때문에 데이터에 대한 접근성이 조금 떨어지는 면이 있었지만 Hadoop이나 Spark 같은 다양한 서비스들을 통해 그런 단점을 보완하여 처리할 수 있게 되었다.

  • 관계형 DB를 사용할 경우에는 데이터가 기하급수적으로 늘어날 수록 비용이나 관리비용도 많이 소요될 것이다.

시대의 변화에 따른 데이터 저장 방시의 진화

  • ETL, 즉 Extract하고, Transform 한 후에 Load하는 과정을 일컫는 용어이며, 이전의 데이터 웨어하우스에서는 이런 순서를 거쳐 작업했지만 최근에는 ELT, 우선 Extract하고, Load한뒤에 데이터 레이크에 넣은 다음에 Transformer하자라고 많이들 이야기 하고 있다.
  • 아래 그림은 하나의 예로서, 먼저 여러 곳에 산재해 있는 정형/비정형 데이터들을 데이터 레이크에 한곳으로 모아 그 다음 Spark가 됐던 다른 빅데이터 처리 시스템을 통해 재가공을 한 후 다른 애플리케이션이나 BI TooL들에 활용할 수 있다.

ETL

데이터 레이크 아키텍쳐

  • 여러 어플리케이션에서 나오는 데이터나 해당 API를 통해서 얻게되는 데이터들을 모아 어떻게 재가공해야할지를 고민하는 것이 가장 큰 문제일 것이다. 또한 이런 것들 통해서 redash같은 시각화를 통해 insight를 얻어야 할 것이다.

다양한 협업 툴 그리고 데이터 그리고 API

  • 또한, error가 나온다면, 그 날에 데이터만 어떻게 backfill을 통해서 확보를 할 것인지를 고민해야한다.

  • 챗봇을 통해서 구축을 할 때, 아티스트가 늘어났을때, 새로운 artist가 입력을 들어왔을 때, 저장되어있던 정보에는 없던 Unknown artist이면, Trigger base의 Lambda에 의해서 Unknown artist에 대한 데이터를 확보를 하고, 데이터 레이크는 Latency(데이터를 주고받는 속도의 개념)가 느리기 때문에 다양한 DB에 RDBMS를 구축하여 상황에 맞게 저장한다. 그러나 이 모든 데이터들이 존재하는 DB를 한곳으로 묶어야 되는 부분이 데이터 레이크이다. 그 모든걸 필자는 AWS S3에 옮길 것이다. 옮기는 과정에서 스케쥴링은 어떻게 할 것인지 그리고, 어떤 이벤트가 있을때 업데이트를 할 것인지를 정해주어야 할 것이다.

데이터 파이프라인

S3(Simple Storage System)

  • 데이터 레이크 역할을 할 S3 bucket을 만들 것이다. 일종의 폴더라고 생각하여도 될 것 같다. 아래와 같은 단계로 bucket을 만든다.

AWS S3

  • create bucket 버튼을 클릭한다.

S3 bucket 생성 과정 - 01

  • 생성할 bucket 이름을 설정한다.

S3 bucket 생성 과정 - 02

  • bucket에 관련된 설정 중 tag 부분을 설정할 수 있는 부분인데, 생성후에도 설정 가능하므로 다음단계로 넘어간다.

S3 bucket 생성 과정 - 03

  • configure option을 설정하는 단계이며, 아래와 같이 모두 public access 하게끔 default 설정값으로 선택하였다.

S3 bucket 생성 과정 - 04

  • 마지막으로 앞의 과정에서 설정한 부분들을 요약해서 보여준다. 맨 아래 Create bucket 버튼을 누르면 bucket 생성이 완료된다.

S3 bucket 생성 과정 - 05

  • 위의 단계를 다 거치면 처음과 다르게 하나의 bucket이 생성된 것을 확인할 수 있다. 물론 이전에 이미 bucket을 생성하신 분들은 여러개의 bucket 리스트가 보일 것이다.

bucket 생성 완료

  • AWS Glue는 어떨때는 데이터가 형식이 없으니까 원래는 키값이 3개였는데, 그 후 점점 키값이 늘어날 수 도 있다. 이런 경우 다양한 Table의 스키마를 관리 할 수 있다. 데이터 레이크 경우에는 지속적으로 변할수 있는 시스템이라는 것을 인지하고 있어야하기 때문에, 위와 같은 경우들을 자동화 할 수 있는 서비스이다. 가장 큰 부분이 Crwaler이다. 어떠한 Table에 변형이 일어났을 때, 이 Crwaler가 감지를 해서, 그것을 반영을 해준다.

AWS Glue

  • S3에 올릴 파일작업 Python Script 작성
    • bucket의 key값이라고 하는 dt(data type)을 정해야한다. top tracks와 같이 지속적으로 변화하는 데이터는 방대한양으로 늘어났을때는 결국엔 쪼개서 scan을 해야하므로 어떠한 형식을 통해서 Spark나 Hadoop이 readable한 형식으로 partition을 만들어놔야 Spark나 Haddep에서 최근의 데이터를 갖고있는 마지막 partition만 확인하면 되기 때문이다. 필자는 날짜를 통해 시점이 언제일지 알 수 있도록 partition을 구분지어 줄 것이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import sys
import os
import base64
import boto3
import requests
import logging
import json
import pymysql
import sys, os, argparse
from datetime import datetime
import pandas as pd

def main(host, user, passwd, db, port, client_id, client_secret):

try:
# use_unicode=True를 써야 한글같은 경우는 깨지지 않는다.
conn = pymysql.connect(host=host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
# 보통 문제가 없으면 0
# 문제가 있으면 1을 리턴하도록 안에 숫자를 넣어준다.
sys.exit(1)

headers = get_headers(client_id, client_secret)

# RDS - 아티스트 ID를 가져오고
cursor.execute("SELECT id FROM artists")

dt = datetime.utcnow().strftime("%Y-%m-%d")
print(dt)

for (id, ) in cursor.fetchall():


# Spotify API를 통해서 데이터를 불러오고

# .json형태로 저장한뒤에
with open('top_tracks.json', 'w') as f:
for top_track in top_tracks:
json.dump(top_track, f)
f.write(os.linesep)
# S3에 import를 시킨다.
s3 = boto3.resource('s3')
object = s3.Object('spotify-chatbot-project', 'dt={}/topt-racks.json'.format(dt))

def get_headers(client_id, client_secret):

endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')

headers = {
"Authorization": "Basic {}".format(encoded)
}

payload = {
"grant_type": "client_credentials"
}

r = requests.post(endpoint, data=payload, headers=headers)

access_token = json.loads(r.text)['access_token']

headers = {
"Authorization": "Bearer {}".format(access_token)
}

return headers

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--client_id', type=str, help='Spotify app client id')
parser.add_argument('--client_secret', type=str, help='Spotify client secret')
parser.add_argument('--host', type=str, help='end point host')
parser.add_argument('--username', type=str, help='AWS RDS id')
parser.add_argument('--database', type=str, help='DB name')
parser.add_argument('--password', type=str, help='AWS RDS password')
args = parser.parse_args()
port = 3306
main(host=args.host, user=args.username, passwd=args.password, db=args.database, port=port, client_id=args.client_id, client_secret=args.client_secret)
  • 필자가 사용할 Spark는 Parquet이라는 format을 더 선호하기에, Parquet으로 변형을 한후, compression(압축)을 통해서 데이터 Volume도 줄이면서 더 Performance도 좋게끔 할 것이다. 위에서 만든 top_tracks.json 로컬 파일을 S3에 저장을 할 것이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import sys
import os
import base64
import boto3
import requests
import logging
import json
import pymysql
import sys, os, argparse
from datetime import datetime
import pandas as pd

def main(host, user, passwd, db, port, client_id, client_secret):

try:
# use_unicode=True를 써야 한글같은 경우는 깨지지 않는다.
conn = pymysql.connect(host=host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
# 보통 문제가 없으면 0
# 문제가 있으면 1을 리턴하도록 안에 숫자를 넣어준다.
sys.exit(1)

headers = get_headers(client_id, client_secret)

# RDS - 아티스트 ID를 가져오고
cursor.execute("SELECT id FROM artists LIMIT 10")

# Top tracks를 Spotify에서 가져오고
top_tracks = []
for (id, ) in cursor.fetchall():

URL = 'https://api.spotify.com/v1/artists/{id}/top-tracks'.format(id)
params = {
'country' : 'US'
}
r = requests.get(URL, params=params, headers=headers)
raw = json.loads(r.text)
top_tracks.extend(raw['tracks'])

top_tracks = pd.DataFrame(top_tracks)
top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression='snappy')
sys.exit(0)

dt = datetime.utcnow().strftime("%Y-%m-%d")

# S3에 import를 시킨다.
s3 = boto3.resource('s3')

object = s3.Object('spotify-chatbot-project', 'dt={}/top-tracks.parquet'.format(dt))

data = open('top-tracks.parquet', 'rb')
object.put(Body=data)


def get_headers(client_id, client_secret):

endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')

headers = {
"Authorization": "Basic {}".format(encoded)
}

payload = {
"grant_type": "client_credentials"
}

r = requests.post(endpoint, data=payload, headers=headers)

access_token = json.loads(r.text)['access_token']

headers = {
"Authorization": "Bearer {}".format(access_token)
}

return headers

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--client_id', type=str, help='Spotify app client id')
parser.add_argument('--client_secret', type=str, help='Spotify client secret')
parser.add_argument('--host', type=str, help='end point host')
parser.add_argument('--username', type=str, help='AWS RDS id')
parser.add_argument('--database', type=str, help='DB name')
parser.add_argument('--password', type=str, help='AWS RDS password')
args = parser.parse_args()
port = 3306
main(host=args.host, user=args.username, passwd=args.password, db=args.database, port=port, client_id=args.client_id, client_secret=args.client_secret)
  • 위의 방법처럼 한다면 error가 발생되는데, 그 이유는 nested column이라 해서 아래 그림처럼 어떠한 key값의 안에 list형식으로 존재하는 struct형식이기 때문에 발생된다. parquet화 해서 사용하면 데이터를 통해서 performance가 빨라지지만, 그렇게 performance를 좋게하려면 정확하게 define을 해 주어야 한다. 보통 json 형식으로 가장 raw 형태로 저장한다음, processing job이 한 번 돌은후에, 새로운 data가 가장 raw data가 S3에 들어왔을때, trigger가 되어서 해당 parquet화를 하고 싶은 몇개의 데이터만 뽑은 후 다시 돌아서 다른 S3 bucket안에 저장하는 데이터 파이프라인을 거친다.

top tracks API response 구조

  • 앞으로의 작업은 jsonpath라는 package가 필요하므로 아래의 코드처럼 설치를 해주어야 한다.
1
pip install jsonpath --user
  • 이제 top_tracks 뿐만 아니라 Audio Feature들도 추가해서 parquet 형태로 S3에 저장해 줄 것이다. 필자의 경우 국가코드는 US에 대해서만 우선 실행했으며, artist_id들 중 Audio feature가 US에서는 존재하지 않는 artist들이 있어 이 부분은 나중에 다른 국가나 artist들에 대해 동일한 현상으로 error가 발생되는 경우를 방지하기 위해 if문으로 Null값이 포함되어있는지 아닌지를 check해본뒤 리스트에 추가해주는 방식으로 코드를 작성했다.
  • spotify_make_s3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import sys
import os
import base64
import boto3
import requests
import logging
import json
import pymysql
import sys, os, argparse
from datetime import datetime
import pandas as pd
import jsonpath
from pandas.io.json import json_normalize

def main(host, user, passwd, db, port, client_id, client_secret):

try:
# use_unicode=True를 써야 한글같은 경우는 깨지지 않는다.
conn = pymysql.connect(host=host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
# 보통 문제가 없으면 0
# 문제가 있으면 1을 리턴하도록 안에 숫자를 넣어준다.
sys.exit(1)

headers = get_headers(client_id, client_secret)

# RDS - 아티스트 ID를 가져오고
cursor.execute("SELECT id FROM artists")

# jsonpath라는 package를 통해서 해당 path안에 어떤 데이터를 insert했을때,
# key 값을 자동으로 찾아서 그에 해당하는 value값을 가져오기 때문이다.
top_track_keys = {
'id' : 'id',
'name' : 'name',
'popularity' : 'popularity',
'external_url' : 'external_urls.spotify'
}

# Top tracks를 Spotify에서 가져오고
top_tracks = []
for (id, ) in cursor.fetchall():

URL = 'https://api.spotify.com/v1/artists/{}/top-tracks'.format(id)
params = {
'country' : 'US'
}
r = requests.get(URL, params=params, headers=headers)
raw = json.loads(r.text)

for i in raw['tracks']:
top_track = {}
for k, v in top_track_keys.items():
top_track.update({k: jsonpath.jsonpath(i, v)})
# 데이터를 mapping하기 위해서 artist_id를 추가한다.
top_track.update({'artist_id': id})
top_tracks.append(top_track)

# track_id
track_ids = [i['id'][0] for i in top_tracks]

# parquet화 할 수 있는 방법은 여러가지 package가 있지만 pandas를 사용할 것 이다.
# 필자가 사용할 Spark는 Parquet이라는 format을 더 선호하기에, Parquet으로 변형을 한후,
# compression(압축)을 통해서 데이터 Volume도 줄이면서 더 Performance도 좋게끔 할 것이다.
# 위에서 만든 top_tracks.json local 파일을 S3에 저장을 할 것이다.
top_tracks = pd.DataFrame(top_tracks)
top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression='snappy')

dt = datetime.utcnow().strftime("%Y-%m-%d")

# S3에 import를 시킨다.
s3 = boto3.resource('s3')
# bucket의 key값이라고 하는 data type을 정해야한다.
# top tracks와 같이 지속적으로 변화하는 데이터는 방대한양으로 늘어났을때는
# 결국엔 쪼개서 scan을 해야하므로 어떠한 형식을 통해서 Spark나 Hadoop이 readable한 형식으로
# partition을 만들어놔야 Spark나 Haddep에서 최근의 데이터를 갖고있는 마지막 partition만 확인하면 되기 때문이다.
# 필자는 날짜를 통해 시점이 언제일지 알 수 있도록 partition을 구분지어 줄 것이다.
object = s3.Object('spotify-chatbot-project', 'top-tracks/dt={}/top-tracks.parquet'.format(dt))

data = open('top-tracks.parquet', 'rb')
object.put(Body=data)

tracks_batch = [track_ids[i: i+100] for i in range(0, len(track_ids), 100)]

audio_features = []
null_features = []
for batch in tracks_batch:
ids = ','.join(batch)
URL = 'https://api.spotify.com/v1/audio-features/?ids={}'.format(ids)

r = requests.get(URL, headers=headers)
if 'null' in r.text:
raw = json.loads(r.text)
for i in raw['audio_features']:
if pd.isnull(i) == False:
# audio_features는 dictionary key값 안에 또 다른 list형식으로 되어있지 않으므로
# 그냥 사용해도 된다.
null_features.append(i)

else:
raw = json.loads(r.text)
audio_features.extend(raw['audio_features'])

audio_features.extend(null_features)
audio_features = json_normalize(audio_features)
audio_features.to_parquet('audio_features.parquet', engine='pyarrow', compression='snappy')

s3 = boto3.resource('s3')
object = s3.Object('spotify-chatbot-project', 'audio_features/dt={}/top-tracks.parquet'.format(dt))
data = open('audio_features.parquet', 'rb')
object.put(Body=data)




def get_headers(client_id, client_secret):

endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')

headers = {
"Authorization": "Basic {}".format(encoded)
}

payload = {
"grant_type": "client_credentials"
}

r = requests.post(endpoint, data=payload, headers=headers)

access_token = json.loads(r.text)['access_token']

headers = {
"Authorization": "Bearer {}".format(access_token)
}

return headers


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--client_id', type=str, help='Spotify app client id')
parser.add_argument('--client_secret', type=str, help='Spotify client secret')
parser.add_argument('--host', type=str, help='end point host')
parser.add_argument('--username', type=str, help='AWS RDS id')
parser.add_argument('--database', type=str, help='DB name')
parser.add_argument('--password', type=str, help='AWS RDS password')
args = parser.parse_args()
port = 3306
main(host=args.host, user=args.username, passwd=args.password, db=args.database, port=port, client_id=args.client_id, client_secret=args.client_secret)
  • 위의 코드 실행 결과 아래 그림과 같이 S3에 각각의 파일 형식으로 저장되어 업데이트시에 해당 시간과 날짜에 의해 partition되어지는 데이터 저장 결과를 볼 수 있다.

S3 bucket에 저장 결과 - 01

S3 bucket에 저장 결과 - 01

  • 여기에 top-tracks 데이터와 audio feature 데이터 뿐만아니라 artist 데이터도 S3에 parquet형식으로 저장해 줄 것이다.
  • spotify_s3_artist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import sys
import os
import base64
import boto3
import requests
import logging
import json
import pymysql
import sys, os, argparse
from datetime import datetime
import pandas as pd
import jsonpath
from pandas.io.json import json_normalize


def main(host, user, passwd, db, port, client_id, client_secret):

try:
# use_unicode=True를 써야 한글같은 경우는 깨지지 않는다.
conn = pymysql.connect(host=host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
# 보통 문제가 없으면 0
# 문제가 있으면 1을 리턴하도록 안에 숫자를 넣어준다.
sys.exit(1)

# RDS - 아티스트 ID를 가져오고
cursor.execute("SELECT * FROM artists")
colnames = [d[0] for d in cursor.description]
artists = [dict(zip(colnames, row)) for row in cursor.fetchall()]
artists = pd.DataFrame(artists)

artists.to_parquet('artists.parquet', engine='pyarrow', compression='snappy')

dt = datetime.utcnow().strftime("%Y-%m-%d")

s3 = boto3.resource('s3')
object = s3.Object('spotify-chatbot-project', 'artists/dt={}/artists.parquet'.format(dt))
data = open('artists.parquet', 'rb')
object.put(Body=data)

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--client_id', type=str, help='Spotify app client id')
parser.add_argument('--client_secret', type=str, help='Spotify client secret')
parser.add_argument('--host', type=str, help='end point host')
parser.add_argument('--username', type=str, help='AWS RDS id')
parser.add_argument('--database', type=str, help='DB name')
parser.add_argument('--password', type=str, help='AWS RDS password')
args = parser.parse_args()
port = 3306
main(host=args.host, user=args.username, passwd=args.password, db=args.database, port=port, client_id=args.client_id, client_secret=args.client_secret)

artist 데이터 parquet 형식으로 s3에 저장