data engineering (데이터 모델링 및 챗봇 만들기)

Spotify 데이터 유사도 모델링

  • 모든 track을 다 유클리디안 거리를 계산해서 유사도를 측정하기에는 많은 양이기 때문에 해당 Artist의 track들의 audio feature 데이터에 대해 평균을 낸 값을 사용하여 Artist 끼리의 유사도를 계산할 것이다. 해당 유사도를 계산하기 위해 아래와 같이 먼저 RDS에 접속하여 table을 생성해 준다.
1
2
3
mysql -h spotify.cgaj5rvtgf25.ap-northeast-2.rds.amazonaws.com -P 3306 -u hb0619 -p

CREATE TABLE related_artists (artist_id VARCHAR(255), y_artist VARCHAR(255), distance FLOAT, PRIMARY KEY(artist_id, y_artist)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 그 다음은 우리가 미리 만들어놓았던 Athena의 DataBase는 무엇이었는지를 확인하자. 필자는 아래와 같이 따로 Database를 만들지 않고 default로 사용했다. 또한, 입력했었던 날짜를 확인해놓아야 추후에 코드 작성시 Athena로 접속하여 만들어진 테이블들을 참조할 수 있다.

Athena database

  • 필자는 Athena에 미리 만들어놓았던 두가지 top_tracks와 audio_features 테이블을 이용하여 유사도를 구하고 해당 유사도를 MySQL DB에 insert하는 방식으로 작업을 진행 할 것이다.

  • data_modeling.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import sys
import os
import logging
import pymysql
import boto3
import time
import math

host = "end-point url"
port = you port number
username = "your MYSQL DB ID"
database = "your MYSQL DB Name"
password = "your MYSQL DB Password"

def main():

try:
conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
sys.exit(1)

athena = boto3.client('athena')

query = """
SELECT
artist_id,
AVG(danceability) AS danceability,
AVG(energy) AS energy,
AVG(loudness) AS loudness,
AVG(speechiness) AS speechiness,
AVG(acousticness) AS acousticness,
AVG(instrumentalness) AS instrumentalness
FROM
top_tracks t1
JOIN
audio_features t2 ON t2.id = t1.id AND CAST(t1.dt AS DATE) = DATE('2020-02-24') AND CAST(t2.dt AS DATE) = DATE('2020-02-24')
GROUP BY t1.artist_id
"""
# 위에서 DATE를 정하는 부분에서 CAST(t1.dt AS DATE) = CURRENT_DATE - INTERVAL '1' DAY 이렇게 현재날짜를 기준으로 차이나는 기간을 통해 정해줄 수 있다.
# 필자는 여러번 Athena에 실행하지 않았기 때문에 최근에 Athena에 만들어 놓은 위의 두 테이블의 데이터를 직접 보고 날짜를 지정했다.

r = query_athena(query, athena)
results = get_query_result(r['QueryExecutionId'], athena)
artists = process_data(results)

query = """
SELECT
MIN(danceability) AS danceability_min,
MAX(danceability) AS danceability_max,
MIN(energy) AS energy_min,
MAX(energy) AS energy_max,
MIN(loudness) AS loudness_min,
MAX(loudness) AS loudness_max,
MIN(speechiness) AS speechiness_min,
MAX(speechiness) AS speechiness_max,
ROUND(MIN(acousticness),4) AS acousticness_min,
MAX(acousticness) AS acousticness_max,
MIN(instrumentalness) AS instrumentalness_min,
MAX(instrumentalness) AS instrumentalness_max
FROM
audio_features
"""
r = query_athena(query, athena)
results = get_query_result(r['QueryExecutionId'], athena)
avgs = process_data(results)[0]

metrics = ['danceability', 'energy', 'loudness', 'speechiness', 'acousticness', 'instrumentalness']

for i in artists:
for j in artists:
dist = 0
for k in metrics:
x = float(i[k])
x_norm = normalize(x, float(avgs[k+'_min']), float(avgs[k+'_max']))
y = float(j[k])
y_norm = normalize(y, float(avgs[k+'_min']), float(avgs[k+'_max']))
dist += (x_norm-y_norm)**2

dist = math.sqrt(dist) ## euclidean distance

data = {
'artist_id': i['artist_id'],
'y_artist': j['artist_id'],
'distance': dist
}

insert_row(cursor, data, 'related_artists')


conn.commit()
cursor.close()


def normalize(x, x_min, x_max):

normalized = (x-x_min) / (x_max-x_min)

return normalized


def query_athena(query, athena):
response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': 'default'
},
ResultConfiguration={
'OutputLocation': "s3://spotify-chatbot-project/athena-panomix-tables/",
'EncryptionConfiguration': {
'EncryptionOption': 'SSE_S3'
}
}
)

return response

# 아래와 같이 Athena API는 response를 받았다고 해서 결과를 보여주는 것이 아니라 실행을 시킨 후에
# 해당 query id를 통해 결과를 가져오는 형식으로 이루어져 있다.
def get_query_result(query_id, athena):

response = athena.get_query_execution(
QueryExecutionId=str(query_id)
)
while response['QueryExecution']['Status']['State'] != 'SUCCEEDED':
if response['QueryExecution']['Status']['State'] == 'FAILED':
logging.error('QUERY FAILED')
break
time.sleep(5)
response = athena.get_query_execution(
QueryExecutionId=str(query_id)
)
# 중요한 점은 MaxResults가 1000이 Max라는 점이다.
response = athena.get_query_results(
QueryExecutionId=str(query_id)
)

return response


def process_data(results):

columns = [col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]

listed_results = []
for res in results['ResultSet']['Rows'][1:]:
values = []
for field in res['Data']:
try:
values.append(list(field.values())[0])
except:
values.append(list(' '))
listed_results.append(dict(zip(columns, values)))

return listed_results


def insert_row(cursor, data, table):

placeholders = ', '.join(['%s'] * len(data))
columns = ', '.join(data.keys())
key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s" % (table, columns, placeholders, key_placeholders)
cursor.execute(sql, list(data.values())*2)


if __name__=='__main__':
main()
  • 위의 파일을 실행시켜보자. 위의 python script 파일이 존재하는 path로 이동하여 아래 명령문을 실행시키면 실행에 완료될때까지 걸린 시간 또한 알 수 있다.
1
2
3
4
5
time python3 data_modeling.py

real 28m11.013s
user 1m36.141s
sys 0m24.518s
  • 이제 MySQL에 접속해서 데이터가 제대로 insert 됬는지 확인해 보자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
SELECT * FROM related_artists LIMIT 20;

# 결과
+------------------------+------------------------+-----------+
| artist_id | y_artist | distance |
+------------------------+------------------------+-----------+
| 00FQb4jTyendYWaN8pK0wa | 00FQb4jTyendYWaN8pK0wa | 0 |
| 00FQb4jTyendYWaN8pK0wa | 01C9OoXDvCKkGcf735Tcfo | 0.366558 |
| 00FQb4jTyendYWaN8pK0wa | 02rd0anEWfMtF7iMku9uor | 0.327869 |
| 00FQb4jTyendYWaN8pK0wa | 02uYdhMhCgdB49hZlYRm9o | 0.595705 |
| 00FQb4jTyendYWaN8pK0wa | 03r4iKL2g2442PT9n2UKsx | 0.632109 |
| 00FQb4jTyendYWaN8pK0wa | 03YhcM6fxypfwckPCQV8pQ | 0.812604 |
| 00FQb4jTyendYWaN8pK0wa | 04gDigrS5kc9YWfZHwBETP | 0.498764 |
| 00FQb4jTyendYWaN8pK0wa | 04tBaW21jyUfeP5iqiKBVq | 0.322017 |
| 00FQb4jTyendYWaN8pK0wa | 0543y7yrvny4KymoaneT4W | 0.365608 |
| 00FQb4jTyendYWaN8pK0wa | 05E3NBxNMdnrPtxF9oraJm | 0.958604 |
| 00FQb4jTyendYWaN8pK0wa | 06HL4z0CvFAxyc27GXpf02 | 0.483454 |
| 00FQb4jTyendYWaN8pK0wa | 06nevPmNVfWUXyZkccahL8 | 0.0592581 |
| 00FQb4jTyendYWaN8pK0wa | 06nsZ3qSOYZ2hPVIMcr1IN | 0.39567 |
| 00FQb4jTyendYWaN8pK0wa | 085pc2PYOi8bGKj0PNjekA | 0.608243 |
| 00FQb4jTyendYWaN8pK0wa | 08avsqaGIlK2x3i2Cu7rKH | 0.328059 |
| 00FQb4jTyendYWaN8pK0wa | 09C0xjtosNAIXP36wTnWxd | 0.210568 |
| 00FQb4jTyendYWaN8pK0wa | 0BvkDsjIUla7X0k6CSWh1I | 0.606556 |
| 00FQb4jTyendYWaN8pK0wa | 0bvRYuXRvd14RYEE7c0PRW | 0.670187 |
| 00FQb4jTyendYWaN8pK0wa | 0C0XlULifJtAgn6ZNCW2eu | 0.70478 |
| 00FQb4jTyendYWaN8pK0wa | 0cc6vw3VN8YlIcvr1v7tBL | 0.716507 |
+------------------------+------------------------+-----------+
20 rows in set (0.01 sec)
  • JOIN을 통해 각각 이름을 볼수있게 해주면서 가장 distance가 작은 즉 유사성이 큰 데이터 순서로 보여주길 원해 아래와 같은 query를 작성하여 실행시켰다. 그 결과, audio_features로만 모델링을 했음에도 비슷한 장르의 아티스트가 묶여있음을 확인할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SELECT p1.name, p2.name, p1.url, p2.url, p2.distance FROM artists p1 JOIN (SELECT t1.name, t1.url, t2.y_artist, t2.distance FROM artists t1 JOIN related_artists t2 ON t2.artist_id = t1.id) p2 ON p2.y_artist=p1.id WHERE distance != 0 ORDER BY p2.distance ASC LIMIT 20;

+-------------------------------+-------------------------------+--------------------------------------------------------+--------------------------------------------------------+-----------+
| name | name | url | url | distance |
+-------------------------------+-------------------------------+--------------------------------------------------------+--------------------------------------------------------+-----------+
| Four Tops | Alan Jackson | https://open.spotify.com/artist/7fIvjotigTGWqjIz6EP1i4 | https://open.spotify.com/artist/4mxWe1mtYIYfP040G38yvS | 0.0241995 |
| Alan Jackson | Four Tops | https://open.spotify.com/artist/4mxWe1mtYIYfP040G38yvS | https://open.spotify.com/artist/7fIvjotigTGWqjIz6EP1i4 | 0.0241995 |
| Martha Reeves & The Vandellas | Jimmy Ruffin | https://open.spotify.com/artist/1Pe5hlKMCTULjosqZ6KanP | https://open.spotify.com/artist/0hF0PwB04hnXfYMiZWfJzy | 0.0258624 |
| Jimmy Ruffin | Martha Reeves & The Vandellas | https://open.spotify.com/artist/0hF0PwB04hnXfYMiZWfJzy | https://open.spotify.com/artist/1Pe5hlKMCTULjosqZ6KanP | 0.0258624 |
| George Harrison | Martha Reeves & The Vandellas | https://open.spotify.com/artist/7FIoB5PHdrMZVC3q2HE5MS | https://open.spotify.com/artist/1Pe5hlKMCTULjosqZ6KanP | 0.0272287 |
| Martha Reeves & The Vandellas | George Harrison | https://open.spotify.com/artist/1Pe5hlKMCTULjosqZ6KanP | https://open.spotify.com/artist/7FIoB5PHdrMZVC3q2HE5MS | 0.0272287 |
| Nik Kershaw | Elton John | https://open.spotify.com/artist/7kCL98rPFsNKjAHDmWrMac | https://open.spotify.com/artist/3PhoLpVuITZKcymswpck5b | 0.0272474 |
| Elton John | Nik Kershaw | https://open.spotify.com/artist/3PhoLpVuITZKcymswpck5b | https://open.spotify.com/artist/7kCL98rPFsNKjAHDmWrMac | 0.0272474 |
| Tammi Terrell | Kim Carnes | https://open.spotify.com/artist/75jNCko3SnEMI5gwGqrbb8 | https://open.spotify.com/artist/5PN2aHIvLEM98XIorsPMhE | 0.0279891 |
| Kim Carnes | Tammi Terrell | https://open.spotify.com/artist/5PN2aHIvLEM98XIorsPMhE | https://open.spotify.com/artist/75jNCko3SnEMI5gwGqrbb8 | 0.0279891 |
| Roger Daltrey | Arcade Fire | https://open.spotify.com/artist/5odf7hjI7hyvAw66tmxhGF | https://open.spotify.com/artist/3kjuyTCjPG1WMFCiyc5IuB | 0.0291541 |
| Arcade Fire | Roger Daltrey | https://open.spotify.com/artist/3kjuyTCjPG1WMFCiyc5IuB | https://open.spotify.com/artist/5odf7hjI7hyvAw66tmxhGF | 0.0291541 |
| Billy Fury | Otis Redding | https://open.spotify.com/artist/7rtLZcKWGV4eaZsBwSKimf | https://open.spotify.com/artist/60df5JBRRPcnSpsIMxxwQm | 0.0292248 |
| Otis Redding | Billy Fury | https://open.spotify.com/artist/60df5JBRRPcnSpsIMxxwQm | https://open.spotify.com/artist/7rtLZcKWGV4eaZsBwSKimf | 0.0292248 |
| Katy Perry | John Fogerty | https://open.spotify.com/artist/6jJ0s89eD6GaHleKKya26X | https://open.spotify.com/artist/5ujCegv1BRbEPTCwQqFk6t | 0.0302168 |
| John Fogerty | Katy Perry | https://open.spotify.com/artist/5ujCegv1BRbEPTCwQqFk6t | https://open.spotify.com/artist/6jJ0s89eD6GaHleKKya26X | 0.0302168 |
| Dierks Bentley | The Cadillac Three | https://open.spotify.com/artist/7x8nK0m0cP2ksQf0mjWdPS | https://open.spotify.com/artist/1nivFfWu6oXBFDNyVfFU5x | 0.0313435 |
| The Cadillac Three | Dierks Bentley | https://open.spotify.com/artist/1nivFfWu6oXBFDNyVfFU5x | https://open.spotify.com/artist/7x8nK0m0cP2ksQf0mjWdPS | 0.0313435 |
| Sheryl Crow | Phil Collins | https://open.spotify.com/artist/4TKTii6gnOnUXQHyuo9JaD | https://open.spotify.com/artist/4lxfqrEsLX6N1N4OCSkILp | 0.0317203 |
| Phil Collins | Sheryl Crow | https://open.spotify.com/artist/4lxfqrEsLX6N1N4OCSkILp | https://open.spotify.com/artist/4TKTii6gnOnUXQHyuo9JaD | 0.0317203 |
+-------------------------------+-------------------------------+--------------------------------------------------------+--------------------------------------------------------+-----------+
20 rows in set (1.18 sec)
  • 이제 Facebook messenger API를 통해 챗봇을 서비스를 만들어 볼 것이다. 아래 그림과 같이 구글에서 Facebook messenger API를 검색하여 페이지로 접속한다.

Facebook messenger API 검색

  • Facebook messenger API 웹페이지를 접속하면 아래 그림처럼 보이며, 그에 대한 작동원리에 대한 설명은 Introduction의 learn more를 클릭하면 두번째 그림과 같이 작동원리를 보여준다. 간단히 말하자면, User가 Facebook messenger API를 통해 질의하면 그 정보를 Business Server에 보내서 해당 질문에 따른 답변을 가져와 보여주는 형식이다.

Facebook messenger API 웹페이지

Facebook messenger API 작동방식

  • Facebook messenger API를 통해서는 다양한 방식의 답변을 제공할 수 있다. 이미 만들어져있는 UI/UX 템플릿들이 존재하기 때문에 원하는 형식에 맞춰 다양하게 서비스를 제공할 수 있다.

FaceBook messenger API의 Quick Answer

Facebook messenger의 message 템플릿

  • Facebook messenger API를 사용하기 위해서는 가장 먼저 페이지가 만들어져 있어야 한다. 아래와 같이 새로운 페이지를 만들거나 이미 만들어져 있는 자신의 페이지를 먼저 등록시킨다.

페이지 생성 - 01

페이지 생성 - 02

페이지 생성 - 03

페이지 생성 - 04

페이지 생성 - 05

  • Lambda를 통해서 AWS와 Facebook messenger API를 연결해 볼 것이다. Lambda를 사용하는 이유는 지난번에 언급했던 것과 같이 EC2와 같이 서버를 항상 띄어놓고 정해진 resource를 통해 서비스를 관리하면 늘어나거나 줄어드는 User에 대해서 유연하게 처리하기 힘들기 때문이다. Lambda는 예를 들어 기하급수적으로 User가 늘더라도 그에따라 병렬적으로 작업하기 때문에 Traffic의 크기에 크게 영향을 받지 않는다. 반대로 EC2의 경우에는 해당 Traffic이 증가함에 따라 여러가지 장치를 구현해 놓아야한다. AWS에 로그인한 후 Lambda를 들어가서, 아래 그림과 같이 새로운 Lambda Function을 생성해 준다.

Facebook messenger API를 사용하기 위한 Lambda Function 생성 - 01

Facebook messenger API를 사용하기 위한 Lambda Function 생성 - 02

Facebook messenger API를 사용하기 위한 Lambda Function 생성 - 03

Facebook messenger API를 사용하기 위한 Lambda Function 생성 - 04

  • 이제 위에서 만든 Lambda Function과 Facebook messenger API를 연결하기 위해서 AWS에서 API 관리 및 설정을 담당하는 API Gateway 페이지로 이동해서 새롭게 API Gate를 만들 것이다.

AWS API Gateway service

AWS API Gateway 생성 - 01

  • 필자는 REST API 방식으로 생성할 것이기 때문에 아래와 같이 설정하였으며, API이름도 설정해 주었다.

AWS API Gateway 생성 - 02

  • 그 다음은 Crawling할 때 한번씩 접해봤을 법한 GET, POST Method를 만들어 주는 과정을 거친다. 먼저 GET은 Integration type을 Lambda Function으로 설정해 주고, Lambda Function은 방금 만들어놓은 것을 사용할 것이다. 이렇게 설정한 뒤에 Integration Request 탭으로 이동하여 Mapping Templates의 Request body passthrough 아래와같이 설정하며, mapping Templates에 application/json을 추가해준다. Generate template을 Method Request passthrough로 설정한 후 최종적으로 save를 하여 GET method의 설정을 마친다. 필자가 사용할 서비스에서는 POST method는 Facebook API에 데이터를 주는 역할 밖에 없기 때문에 크게 설정할 것이 없다.

AWS API Gateway 생성 - 03

AWS API Gateway 생성 - 04

AWS API Gateway 생성 - 05

AWS API Gateway 생성 - 06

AWS API Gateway 생성 - 07

AWS API Gateway 생성 - 08

AWS API Gateway 생성 - 09

  • GET, POST method를 다 설정했다면, 사용하기 위해서는 배포를 해야 할 것이다. 아래 그림과 같이 action버튼을 눌러 deploy api를 선택하여 stage를 새롭게 만들어주며, 이름을 설정한다.

API Gateway 배포 - 01

API Gateway 배포 - 02

  • deploy를 다 완료하게 되면, 아래 그림과 같은 화면이 나타날 것이다. 그 중 아래 빨간색 상자 안에 있는 invoke URL은 우리가 Facebook에 연결해 줄 endpoint 역할을 한다. 추후에 invoke URL 주소를 복사한 후에 아래 그림에서와 같이 Facebook에서 만들어 놓은 app의 콜백 url 추가를 눌러 추가해 줄 것이다.

Facebook messenger API webhook url 추가

  • 위의 그림 처럼 webhook url을 추가 해주려면 Lambda Function을 만들어 주어야 하는데 먼저 아래 그림에서와 토큰을 생성해서 복사한 후 Lambda Function을 아래 그림과 같이 작성해 준뒤에 webhook url을 추가해 줄 수 있다. 참고로 페이지 토큰은 Facebook app에서 page token을 생성하여 해당 값을 적어주고, verify token은 임으로 지정해주면 된다.

페이지 토큰 생성 - 01

페이지 토큰 생성 - 02

webhook url 추가를 위한 Lambda Function 수정

  • 위에서 지정한 verify token과 아래 그림에서 처럼 API Gateway를 클릭하여 이전에 invoke URL의 주소를 복사하여 아래 그림과 같이 webhook url을 추가해준다. 이렇게 하면 connection은 완료한 상태이다.

invoke URL

callback URL 추가

  • 이제 본격적으로 챗봇을 구현하기 위해 Lambda Function의 else 밑의 부분을 수정해 볼 것이다. 먼저 이전과 마찬가지로 Lambda Function은 S3에 올려 그 파일을 사용하기 위해 requirements.txt와 shell script를 포함하는 하나의 파일로 만들어 준다. 또한, AWS에서 S3에 새로운 bucket을 생성해준다. 필자는 아래와 같이 spotify-chat-bot이라는 이름으로 새롭게 bucket을 만들어 주었다.

새로운 S3 bucket 생성

  • 전체적인 구조는 아래와 같다.

    1
    2
    3
    4
    chatbot
    ├── deploy.sh
    ├── lambda_handler.py
    └── requirements.txt
  • deploy.sh

1
2
3
4
5
6
7
8
9
10
#!/bin/bash
rm -rf ./libs
pip install -r requirements.txt -t ./libs

rm *.zip
zip spotify.zip -r *

aws s3 rm s3://spotify-chat-bot/spotify.zip
aws s3 cp ./spotify.zip s3://spotify-chat-bot/spotify.zip
aws lambda update-function-code --function-name spotify-lambda --s3-bucket spotify-chat-bot --s3-key spotify.zip
  • 위와 같이 작성했다면 먼저 deploy.sh의 파일 권한을 바꿔준다. 모든 사용자(a)의 실행(x) 권한 추가(+)하여 준다.

    1
    chmod +x deploy.sh
  • requirements.txt

1
2
requests
pymysql
  • lambda_hendler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# -*- coding: utf-8 -*-
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot
import json
import base64
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

PAGE_TOKEN = "your page token"
VERIFY_TOKEN = "your verify code"


def lambda_handler(event, context):

# event['params'] only exists for HTTPS GET

if 'params' in event.keys():

if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
return int(event['params']['querystring']['hub.challenge'])
else:
logging.error('wrong validation token')
raise SystemExit
else:
logger.info(event)
  • 위와 같이 작성한 상태한 후 해당 파일이 존재하는 path에서 아래 shell script를 작동시킨다.
1
./deploy.sh
  • AWS S3에서 해당 bucket을 확인해 보면 아래와 같이 spotify.zip 파일이 존재함을 확인할 수 있다.

bucket 업로드 확인

  • 다시 AWS Lambda Function으로 돌아가 해당 bucket과 연결을 시켜 볼 것이다.

Lambda Function에 bucket 파일 연결하기

  • Facebook App으로 돌아가서 아래 화면과 같이 필드를 추가해 주어야 한다.

Facebook messenger API 필드 추가 - 01

Facebook messenger API 필드 추가 - 02

  • 이제 webhook으로 연결해 놓은 페이지로 접속하여 아래와 같이 버튼을 만들어 놓는다. 그 이유는 Lambda Function의 나머지 부분을 작성하기 위해서는 어떻게 event 구조가 구성되어져 있는지 확인해야 하기 때문이다.

webhook 페이지 버튼 추가 - 01

webhook 페이지 버튼 추가 - 02

webhook 페이지 버튼 추가 - 03

  • 이제 AWS Lambda Function을 통해 어떻게 message가 들어오는지 확인하기 위해 아래 그림과 같이 page에서 버튼 테스트를 진행하고, 메세지는 간단하게 hello를 입력해보았다.

페이지 버튼 테스트 - 01

페이지 버튼 테스트 - 02

  • AWS CloudWatch에서 log를 살펴보면, 아래 그림과 같이 받아오는 것을 확인 할 수 있다. 아래 빨간색 상자안의 key 값 중 recipient는 해당 페이지의 id이며, sender의 id는 Facebook User의 id이다. 고유의 값은 아니고 각 페이지에 각 User에 대한 id이므로 동일한 User가 다른 페이지에서 요청을 했다면, 다른 id를 갖는다.

페이지 버튼 테스트 - 03

  • app을 관리할 수 있는 python script 파일을 fb_bot.py라는 이름으로 작성해 주었다. 이 파일 또한 위의 lambda function내에 존재할 수 있도록 path를 잡아주어야 한다. Facebook app은 graph Facebook API를 통해서 control할 수 있다. 아래 패키지 중 Enum은 고유한 이름 집합과 값을 정의하는 데 사용할 수 있는 네 가지 열거형 클래스를 정의하는데 사용되어 진다. 아래함수에서 for문을 통해 NotificationType을 작동시킨다면, NotificationType.REGULAR, NotificationType.SILENT_PUSH, NotificationType.no_push 식으로 값이 프린트 된다. 아래 탬플릿에 맞는 형식은 Facebook messenger API에서 확인할 수 있다.

fb_bot.py에 사용된 템플릿 - 01

  • fb_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python

import sys
sys.path.append("./libs")
import os
import requests
import base64
import json
import logging
from enum import Enum

DEFAULT_API_VERSION = 6.0

## messaging types: "RESPONSE", "UPDATE", "MESSAGE_TAG"

class NotificationType(Enum):
regular = "REGULAR"
silent_push = "SILENT_PUSH"
no_push = "no_push"

class Bot:

def __init__(self, access_token, **kwargs):

self.access_token = access_token
self.api_version = kwargs.get('api_version') or DEFAULT_API_VERSION
self.graph_url = 'https://graph.facebook.com/v{0}'.format(self.api_version)

@property
def auth_args(self):
if not hasattr(self, '_auth_args'):
auth = {
'access_token': self.access_token
}
self._auth_args = auth
return self._auth_args

def send_message(self, recipient_id, payload, notification_type, messaging_type, tag):

payload['recipient'] = {
'id': recipient_id
}

#payload['notification_type'] = notification_type
payload['messaging_type'] = messaging_type

if tag is not None:
payload['tag'] = tag

request_endpoint = '{0}/me/messages'.format(self.graph_url)

response = requests.post(
request_endpoint,
params = self.auth_args,
json = payload
)

logging.info(payload)
return response.json()

def send_text(self, recipient_id, text, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):

return self.send_message(
recipient_id,
{
"message": {
"text": text
}
},
notification_type,
messaging_type,
tag
)

def send_quick_replies(self, recipient_id, text, quick_replies, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):

return self.send_message(
recipient_id,
{
"message":{
"text": text,
"quick_replies": quick_replies
}
},
notification_type,
messaging_type,
tag
)

def send_attachment(self, recipient_id, attachment_type, payload, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):

return self.send_message(
recipient_id,
{
"message": {
"attachment":{
"type": attachment_type,
"payload": payload
}
}
},
notification_type,
messaging_type,
tag
)

def send_action(self, recipient_id, action, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):

return self.send_message(
recipient_id,
{
"sender_action": action
},
notification_type,
messaging_type,
tag
)

def whitelist_domain(self, domain_list, domain_action_type):

payload = {
"setting_type": "domain_whitelisting",
"whitelisted_domains": domain_list,
"domain_action_type": domain_action_type
}

request_endpoint = '{0}/me/thread_settings'.format(self.graph_url)

response = requests.post(
request_endpoint,
params = self.auth_args,
json = payload
)

return response.json()

def set_greeting(self, template):

request_endpoint = '{0}/me/thread_settings'.format(self.graph_url)

response = requests.post(
request_endpoint,
params = self.auth_args,
json = {
"setting_type": "greeting",
"greeting": {
"text": template
}
}
)

return response

def set_get_started(self, text):

request_endpoint = '{0}/me/messenger_profile'.format(self.graph_url)

response = requests.post(
request_endpoint,
params = self.auth_args,
json = {
"get_started":{
"payload": text
}
}
)

return response

def get_get_started(self):

request_endpoint = '{0}/me/messenger_profile?fields=get_started'.format(self.graph_url)

response = requests.get(
request_endpoint,
params = self.auth_args
)

return response

def get_messenger_profile(self, field):

request_endpoint = '{0}/me/messenger_profile?fields={1}'.format(self.graph_url, field)

response = requests.get(
request_endpoint,
params = self.auth_args
)

return response


def upload_attachment(self, url):

request_endpoint = '{0}/me/message_attachments'.format(self.graph_url)

response = requests.post(
request_endpoint,
params = self.auth_args,
json = {
"message":{
"attachment":{
"type": "image",
"payload": {
"is_reusable": True,
"url": url
}
}
}
}
)

return response
  • 이제 위의 fb_bot.py를 import하여 lambda_hendler.py 파일을 아래와 같이 수정해 주었다.

  • lambda_hendler.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# -*- coding: utf-8 -*-
import sys
sys.path.append('./libs')
import logging
import requests
import pymysql
import fb_bot
import json
import base64
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client_id = "Your Spotify ID"
client_secret = "Your Spotify PW"

PAGE_TOKEN = "Your Page Token"
VERIFY_TOKEN = "Your verify token"

host = "Your RDS End point"
port = 3306
username = "Your RDS ID"
database = "Using RDS table name"
password = "Your RDS PW"

try:
conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to rds")
sys.exit(1)

bot = fb_bot.Bot(PAGE_TOKEN)


def lambda_handler(event, context):

# event['params'] only exists for HTTPS GET

if 'params' in event.keys():

if event['params']['querystring']['hub.verify_token'] == VERIFY_TOKEN:
return int(event['params']['querystring']['hub.challenge'])
else:
logging.error('wrong validation token')
raise SystemExit
else:
messaging = event['entry'][0]['messaging'][0]
user_id = messaging['sender']['id']

logger.info(messaging)
artist_name = messaging['message']['text']

query = "SELECT image_url, url FROM artists WHERE name = '{}'".format(artist_name)
cursor.execute(query)
raw = cursor.fetchall()
# raw가 0인 경우는 DB안에는 존재하지 않으므로 Spotify API에서 직접 search한다.
if len(raw) == 0:
text = search_artist(cursor, artist_name)
bot.send_text(user_id, text)
sys.exit(0)

image_url, url = raw[0]

payload = {
'template_type': 'generic',
'elements': [
{
'title': "Artist Info: '{}'".format(artist_name),
'image_url': image_url,
'subtitle': 'information',
'default_action': {
'type': 'web_url',
'url': url,
'webview_height_ratio': 'full'
}
}
]
}

bot.send_attachment(user_id, "template", payload)

query = "SELECT t2.genre FROM artists t1 JOIN artist_genres t2 ON t2.artist_id = t1.id WHERE t1.name = '{}'".format(artist_name)

cursor.execute(query)
genres = []
for (genre, ) in cursor.fetchall():
genres.append(genre)

text = "Here are genres of {}".format(artist_name)
bot.send_text(user_id, text)
bot.send_text(user_id, ', '.join(genres))


## 만약에 아티스트가 없을시에는 아티스트 추가

## Spotify API hit --> Artist Search
## Database Upload
## One second
## 오타 및 아티스트가 아닐 경우


def get_headers(client_id, client_secret):

endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')

headers = {
"Authorization": "Basic {}".format(encoded)
}

payload = {
"grant_type": "client_credentials"
}

r = requests.post(endpoint, data=payload, headers=headers)

access_token = json.loads(r.text)['access_token']

headers = {
"Authorization": "Bearer {}".format(access_token)
}

return headers


def insert_row(cursor, data, table):

placeholders = ', '.join(['%s'] * len(data))
columns = ', '.join(data.keys())
key_placeholders = ', '.join(['{0}=%s'.format(k) for k in data.keys()])
sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s" % (table, columns, placeholders, key_placeholders)
cursor.execute(sql, list(data.values())*2)

# 추가적으로 S3의 top_tracks에도 업데이트해주기 위해서 top-tracks lambda function을 실행시켜주는 함수이다.
# payload 부분은 lambda_handler 함수 안에서 들어오는 event에 관한 부분이다.
def invoke_lambda(fxn_name, payload, invocation_type='Event'):

lambda_client = boto3.client('lambda')

invoke_response = lambda_client.invoke(
FunctionName = fxn_name,
InvocationType = invocation_type,
Payload = json.dumps(payload)
)

if invoke_response['StatusCode'] not in [200, 202, 204]:
logging.error("ERROR: Invoking lmabda function: '{0}' failed".format(fxn_name))


return invoke_response


def search_artist(cursor, artist_name):

headers = get_headers(client_id, client_secret)

## Spotify Search API
params = {
"q": artist_name,
"type": "artist",
"limit": "1"
}

r = requests.get("https://api.spotify.com/v1/search", params=params, headers=headers)

raw = json.loads(r.text)

if raw['artists']['items'] == []:
return "Could not find artist. Please Try Again!"

artist = {}
artist_raw = raw['artists']['items'][0]
if artist_raw['name'] == params['q']:

artist.update(
{
'id': artist_raw['id'],
'name': artist_raw['name'],
'followers': artist_raw['followers']['total'],
'popularity': artist_raw['popularity'],
'url': artist_raw['external_urls']['spotify'],
'image_url': artist_raw['images'][0]['url']
}
)

for i in artist_raw['genres']:
if len(artist_raw['genres']) != 0:
insert_row(cursor, {'artist_id': artist_raw['id'], 'genre': i}, 'artist_genres')

insert_row(cursor, artist, 'artists')
conn.commit()
r = invoke_lambda('top-tracks', payload={'artist_id': artist_raw['id']})
print(r)

return "We added artist. Please try again in a second!"

return "Could not find artist. Please Try Again!"
  • 위에서 다른 trigger lambda function을 참조할 수 있도록 설정해 주었기 때문에 아래와 같이 role을 다른 lambda function을 invoke 할 수 있도록 IAM 페이지에서 permission을 추가해 주어야 정상적으로 작동된다.

IAM 페이지로 이동

IAM 페이지에서 invoke를 위한 permission 추가

  • 이제 해당 페이지의 test를 진행해 보면 아래와 같이 위에서 설정한 것 처럼 해당아티스트의 url과 장르를 보내주는 것을 확인 할 수 있다. 그림은 없지만 필자는 2PM도 검색해보았는데, 잘 검색되어 나왔다.

BTS 검색 결과

  • 이제껏 data engineering에 관한 몇가지 기초적인 부분들을 실습해 보며, data engineer는 상황에 맞춰 resource를 사용할 수 있도록 선택과 집중을 해야 한다고 생각했다. 그러한, 상황에 맞는 선택과 집중을 위해 해당 비즈니스가 처해있는 상황과 단계를 잘 진단하고 깊게 알고있어야 할 것 같다는 생각을 하게 되는 프로젝트 였다.