data engineering (Presto란?)

Presto란?

  • Spark의 단점이라 하면, 물론 Spark SQL도 있지만, 어느 정도 Scripting이 필요한 부분이 있다. MySQL 같이 RDS로 데이터 구축을 했을때에는 SQL을 통해서 쉽게 가져올 수 있었지만, Big data로 넘어 오면서 이전 필자의 글을 보았을 때 S3에서 두 곳에 나누어 저장을 했는데, 이런 경우 그럼 RDS와 다르게 어떻게 합칠 수 있는지를 Presto를 통해 하나의 query로 해결할 수 있다. syntax는 SQL과 비슷하다. 다양한 multiple data source를 single query를 통해서 진행 할 수 있는 것이다. Hadoop의 경우는 performance나 여러가지 data analytics 할때 여러가지 issue들이있으며 이전 방식이기 때문에 최근에는 Spark와 Presto로 넘어오는 추세이다. AWS는 Presto기반인 Athena를 통해서 S3의 데이터를 작업할 수 있다.

Serverless란?

  • 말 그대로 server가 없다라고 할 수 있으며, Severless라고 하는 부분은 보통 어떠한 서비스를 만들 때, 우리의 Desktop PC를 계속해서 켜두는 경우가 아니므로 EC2라고 하는 계속 지속적으로 띄어져있는 가상의 서버를 만들게 된다. 이때 서버의 용량을 결정해야하는데, 예를들어 chat bot을 통해 User와 소통을할때 어떤 날은 User가 1명 다른날은 100명 어떤날은 10,000명으로 늘어날수있어서 무작정 큰 서버를 사용하게 되거나 순차적으로 Docker를 통해 병렬적으로 어떤 기준이상이 되면 용량을 늘리는 이러한 작업도 다 비용이 되므로 이러한 문제들을 보완하고자 Serverless라는 개념이 도입된다. 어떠한 요청이 들어올때 server를 띄우는데 지속적으로 요청이 들어온다면 계속적으로 병렬적인 server를 띄운다는 것이다. server안에서 용량을 정하는 것을 알아서 자동적으로 해결해 주므로 비용적인 문제를 보완해준다. AWS에서 EC2같은 경우는 server 하나를 띄우는 것이고, Lambda가 Serverless의 개념을 갖는 서비스이다. 또한 Athena도 Serverless의 개념을 갖는 서비스이다.

AWS Athena의 개요

  • AWS Athena에서도 data lake의 시스템 형태로 데이터를 작업하더라도 query를 통해 작업을 하려면 data warehouse 처럼 table의 형식을 안만들 수는 없다.

  • 먼저, AWS Athena 필자와 같이 처음 Athena를 사용하는 것이라면, create table 버튼을 클릭하면 아래와 같은 형태로 query문을 입력하는 페이지 보일 것이다.

table 생성 및 query문 결과 저장 setting

  • 여기서 query문의 결과를 저장하는 곳을 먼저 설정해 주어야 하는데, 이전의 S3의 spotify-chatbot-project bucket에 새로운 폴더를 추가해 주고 아래 그림과 같이 path를 설정해준다.

S3 query문 결과 저장 폴더 생성

query문 path 설정

  • 이제 아래 그림과 같이 이전에 만들어 놓은 parquet형태의 데이터가 존재하는 folder의 path로 query문을 날려준다. 이전에 존재하지 않았던 테이블이 왼쪽의 tab에 생성되며 아래 부분에는 결과를 알려주는 부분이 보여질 것이다. 그 부분에는 partition되어진 부분은 직접적으로 load를 해주어야 한다는 결과를 알려주고있다.

query문 결과

  • 위의 그림에서 맨 아래 부분의 모든 partition을 보려면, MSCK REPAIR TABLE) command 사용하라고한 것 처럼, query문을 추가해서 사용하였다.

MSCK REPAIR TABLE query문 추가

  • 일반적인 query문과 같이 작성해서 볼 수 있다. 먼저 top_tracks 테이블의 상위 10개만 불러와 볼 것이다. 그리도 partition을 dt로 했기 때문에 dt도 같이 불러와 지는 것을 확인 할 수 있다.

top_tracks의 상위 10개 데이터

dt도 같이 불러옴을 확인

  • audio_features도 동일한 방식으로 만들어 볼 것이다.

audio 테이블 생성

audio 테이블 파티션 로드

audio 테이블 상위 10개 records

  • 이밖으 사용법은 아래 문서를 통해 사용법을 확인할 수 있으며, 유의할점은 우리가 partition으로 나누어 놓은 것을 전체로 불러 왔기에 최근의 partition만을 살펴보기 위해 date를 어떤 값으로 설정했는지 확인해주어야 한다.
  • prestodb documents

presto 문법을 통한 통계치

Apache Spark

  • 데이터를 처리하는 하나의 시스템이다. 데이터는 항상 늘어나고 그리고 너무나 큰 방대한 양을 처리를 해야하기 데이터가 늘어나면 늘어날수록 속도,시간,비용 여러면에서 효율적으로 처리해야한다. 필자는 제플린을 사용할 것인데, 제플린은 Spark를 기반으로 한 Web UI이다. 그래서 Spark를 통해서 처리된 데이터를 시각화한다던지 어떤식으로 output이 나오는지를 볼 수 있다. Jupyter notebook과 비슷하다고 생각하면 된다.

Spark란?

Map Reduce

  • Map Reduce는 데이터가 방대한 양으로 늘어날때 처리하는 방식에 issue가 생길 수 있다. 이런 issue들을 보완하기 위해서 데이터가 여러군데 분산처리 되어있는 형태로 저장되어있는데, S3 bucket에 저장한 방식처럼 partition으로 구분된 데이터를 function이나 어떠한 방식에 의해서 mapping을 해서 필요한 부분만을 줄이는 Reduce 과정을 거치게 된다. 이 방식은 처음 Google File System으로 사용되어지다가 그 뒤에 Map-Reduce방식으로 Hadoop을 사용했으며, 속도에 특화된 Spark를 현재는 주로 사용하고 있다. 예를 들면,

Map Reduce

  • 예를들면, 구글같이 다양한 web page를 크롤링해서 각 페이지들의 노출 랭킹을 분석해야 하는 Page Rank라는 알고리즘을 사용할때 html안에 들어가는 tag라던지 이런 문법적인 요소들과 contents들을 한 곳에 몰아서 분석하기 보다는 아래 그림과 같이 Input을 병렬적으로 나누어 진행하고 그 다음 어떠한 Suffling process를 통해서 Reduce하여 결과를 낸다.

Map Reduce example

  • 필자는 AWS의 EMR(Elastic Map Reduce)서비스를 통해 Spark와 제플린을 설치 해 볼 것이다. EMR은 Spark나 Hadoop 같은 시스템에 cluster를 만드는 곳이라고 생각할 수 있다. cluster는 서버들이라고 말 할 수 있다. EC2를 base로한 EMR을 cluster화해서 하나의 instance ECS server가 아니라 master 아니면 다양한 core Node들이 여러개가 생성이 되어서 방대한 양의 데이터를 처리하기 위해서 필요한 setting을 구성할 수 있는 곳이라고 생각할 수 있다.

  • 서버를 사용하기 위해서는 다양한 권한이라던지 key file들이 필요하므로 security와 같은 부분을 다루어야 하는데 이런 부부은 AWS에서 IAM 서비스에서 작업할 수 있다.

  • 아래 그림과 같이 먼저 AWS에서 EMR 페이지로 이동한다.

EMR 서비스

EMR 서비스 cluster 생성

  • cluster 이름을 정하고, Application은 Spark를 사용할 것이므로 아래 그림과 같이 설정해 주었다. 또한, hardware 부분은 memory optimization등 여러가지 옵션이 존재하지만 모든 것은 다 비용이므로 우선 간단하게 c4 large를 사용해 보려고 한다.

EMR 서비스 quick 옵션들

  • EC2 서버 안에서 다양한 작업들을 하기 위해서 key pair 필요하다. 필자와 같이 한번도 key pair를 생성한 적이 없다면 아래 그림과 같이 Learn how to key pair 버튼을 클릭하여 EC2 페이지로 넘어가 key pair를 생성한다.

key pair

key pair 생성하는 방법

EC2 key pair 페이지

EC2 key pair 생성

EC2 key pair 생성 방법

key pair 생성 결과

  • key pair가 생성되면 동시에 pem 파일이 다운로드 되어질 것이다. 그 파일을 현재 project를 진행하는 폴더로 옮겨 놓는것을 추천한다. 옮겨 놓았다면, 아래 그림과 같이 파일의 모드를 변경해 주어야 한다.

key pair pem 파일 모드 변경

  • 해당 pem 파일이 존재하는 path에서 아래와 같이 변경한다.
1
chmod og-rwx spotift_chatbot.pem
  • key pair를 생성하였으므로 다시 cluster를 생성하는 페이지로 돌아가 key pair를 설정한 뒤에 아래 생성 버튼을 클릭한다.

cluster 생성

  • cluster가 생성되면 아래 그림과 같이 cluster가 속성들에 대한 페이지가 보일 것이다. 또한, Master 권한으로 접속을 하려면 SSH 방식으로 인증 후 접속해야하기 때문에 security group을 관리 해 주어야 한다. Security groups for Master의 값을 클릭하여 EC2의 security group 페이지로 이동하여 Inbound 규칙에 Master와 slave 둘다 아래 그림과 같이 ssh 규칙을 추가해 주어야 한다.

EC2 security group Inbound rule 추가

Inbound rule에 ssh 설정

  • ssh로 접속하는 방법은 아래그림과 같이 enable web connection을 클릭하면 확인 할 수 있다.

ssh로 접속하는 방식

  • pem 파일이 존재하는 path로 가서 아래 그림에서 빨간색 줄이 쳐져있는 command를 실행해 놓은 뒤, 동일한 파일 path에 아래 그림에서 처럼 생성하라는 foxyproxy-settings.xml를 회색 네모칸의 내용들을 복사하여 생성한다. 동일한 파일 path에서 하라고 추천하는 것은 이 path에서 계속 접속할 것이기 때문이다.

ssh로 접속하는 방식 - 01

  • 그 다음은 chrome web store에 가서 foxy proxy를 검색한 후 standard 버젼을 설치해준다. 그러므로 chrome 브라우저를 사용해야 할 것임은 당연히 알 것이라고 생각한다.

ssh로 접속하는 방식 - 02

  • foxy proxy를 chrome browser에 추가했다면, 오른쪽 상단에 여우모양의 아이콘이 생성되었을 것이다. 클릭한 후 option 버튼을 눌러준다.

ssh로 접속하는 방식 - 03

  • 그 다음은 왼쪽 tab에서 import/export를 눌러 준 후에 이전에 만들어 준 xml파일을 눌러 replace해 주면 된다. 그런 다음 public dns로 접속해 보면 맨 아래 그림과 같이 test page를 볼 수 있을 것이다. 또한 dns위에 connection들이 활성화 된 것을 볼 수 있다.

ssh로 접속하는 방식 - 03

ssh로 접속하는 방식 - 04

ssh로 접속하는 방식 - 05

ssh로 접속하는 방식 - 06

  • 아래 그림에서와 같이 connection들 중 제플린을 클릭하면 제플린 페이지로 이동한다.

제플린 접속

zeppelin page

notebook 생성

  • 모든 명령문이 %pyspark로 시작해야 한다. 필자는 기존의 c4.large로 진행시에는 connection error가 생겨 원인을 찾다가 해결치 못하고 우선 r3.xlarge를 선택하여 다시 cluster를 만든 결과 connection error가 발생하지 않았다.
  • 2번째 cell까지는 python에서 진행하던 방식이고 3번째 cell에서 구현하는 방식이 spark의 방식인데 spark는 rdd를 기반으로 방대한 데이터를 분산시켜서 mapping한 후 apply해서 얻은 값을 통합을 해서 받는 구조이다. sc(spark context)를 통해 parallelize하게 3개의 데이터를 쪼개서 rdd로 나누어 준 결과이다.

zeppelin python 방식과 spark방식의 차이

  • 아래와 같이 rdd로 쪼개서 map함수를 통해 쪼개놓은 데이터에 각각 mapping 시켜주고 선택한다. 또한, sqlContext를 통해 S3에 이전에 저장해 놓았던 데이터를 dataFrame형태로 불러와 작업할 수 있다. printSchema()함수를 통해 각각의 값들이 어떤 형태로 들어있는지에 주의를 갖고 살펴봐야 추후에 작업에 어려움이 없다.

zeppeplin spark 함수 및 rdd 개념

zeppelin spark 함수 및 dataFrame

zeppelin spark 함수 및 dataFrame - 01

  • 이렇게 불러온 데이터를 내장 함수를 활용해 기본적인 통계량값들을 계산할 수 있으며, 필요에 따라 사용자 정의 함수(UDF : User Definition Function)을 사용하여 전처리를 할 수 있다. UDF를 통해 정의한 함수를 Boolean값으로 처리하여 다음과 같이 filter함수에 적용시켜 condition을 줄 수도 있다.

sql function 및 udf

sql function 및 udf - 01

  • 본격적으로 S3에 저장해 놓은 모든 데이터들을 join한 master table을 만들기 위해 아래와 같은 작업을 한다. 가장 먼저, artists parquet 데이터를 불러와 아래와 같이 DataFrame으로 만들 수 있다.

artist table

  • 허나, artists 데이터는 잘 변하지 않기 때문에 zeppelin에서 python을 통해 RDS에서 바로 불러오는 것이 좋을 수도 있다. 아래에서 함수 안의 argument값들은 자신의 값에 맞는 것들로 먼저 정해놓고 실행해야 한다.

artist table을 불러오는 다른 방법

  • 최종적으로 artists, top-tracks, 그리고 audio_features 모두 join한 table을 만들 것 이다. 참고로 아래 cell을 실행하기 이전에 master node에 접속해서 먼저 sudo pip install pandas와 sudo pip install pyspark 명령어를 통해 설치해 주어야 한다. zeppelin의 장점 중 하나로 바로 sql table로 지정하여 sql query문으로 작업할 수 있다. 해당 값을 바로 시각화할 수 있는 점도 장점 중의 하나이다. 옵션에서 없는 그래프를 그리는 것은 python이나 다른 library를 활용하여 보완할 수 있다.

모든 데이터를 join한 master table 생성

query문 작성

query문을 통한 분포 시각화

  • 데이터의 audio feature의 분포를 통해 예를 들어 가수의 인기도와 트랙의 인기도의 차이가 거의 없는 해당 가수의 대표적인 트랙을 알고 싶다면 아래와 같은 EDA를 먼저 실행하여 audio feature들의 특징을 파악하는 것이 중요하다. 아래 그림에서 처럼 acousticness는 전체적으로 0쪽으로 치우쳐있어 중심을 대표하는 값으로는 median을 사용해야 될 것이라고 판단할 수 있으며, danceability는 정규분포 꼴을 띄고 있어 mean을 사용해도 무방할 것으로 판단 할 수 있다.

audio features의 평균값들

전체 acousticness의 분포

전체 danceability의 분포

참고하면 좋을 문서 - 01

참고하면 좋을 문서 - 02

참고하면 좋을 문서 - 03