오늘은 10주차 Airflow 기본의 마지막 학습 날이었다.
수업에서는 Mysql의 nps 테이블의 내용을 Airflow를 이용하여 S3에 적재 후, Redshift로 최종 데이터를 적재하는 실습을 진행하였다.
이번주에 배운 Airflow와 지난 주에 배운 Redshfit와 S3를 총 연결하는 실습이라 즐기며 실습을 진행했고,
실제 실무에서 할 만한 작업을 데모로 진행해본 것 같아 의미가 컸다. 👍
목차는 다음과 같다.
실습에 앞서, Mysql에서 Redshfit로 데이터를 적재할 수 있는 방법을 정리한 후,
이를 Full Refresh와 Incremental Update 형식 두 가지로 작성한 Dag를 직접 실행한 내용을 정리해보겠다.
실습 개요
OLTP(MySQL) → Airflow → S3 → OLAP(Redshfit)
실습 과정을 그림으로 그려 정리해보았다.
* 위 실습은 프로그래머스 데이터 엔지니어링 데브코스에서 진행한 실습입니다.
* 실습에 사용된 Redshift, MySQL, S3는 실습환경이 구축된 계정 정보를 받아 진행했습니다.
01. MySQL 테이블의 record를 Redshift로 옮기는 법
방법 1 (데이터가 적을 때)
- INSERT INTO로 Mysql에서 Redshfit바로 레코드 옮김
- 이는 데이터 많으면 좋지 않음. 레코드가 적을 때 선호되는 방식
방법 2 (데이터가 많을 때)
- 소스에서 데이터를 COPY 해서 파일로 저장
- 해당 파일을 S3 버킷에 저장
- s3 버킷의 파일을 Redshfit로 Bulk Update함
위 사이클을 관리해주는 것이 Airflow
- 필요한 연결: 이 작동을 위해서는 Mysql과 S3에 대한 Connection
- 필요한 권한 :
- Airflow가 S3 Write하는 권한 (IAM User)
- Redshfit가 S3 Read하는 권한 (IAM Role)
02. 실습 환경 셋팅
1) Airflow 스케줄러에 Mysql 모듈 설치
실습을 하려면 Airflow에서 Connection으로 Mysql 을 등록해야된다.
이를 위해서는 다음과 같이 모듈 설치가 필요하다.
docker exec --user root -it (스케줄러 컨테이너id) sh
sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
2) Redshift에 nps 테이블 생성
Google Colab에서 Redshift 연결해서 nps 테이블을 생성한다.
이유는 뒤에 실습 코드는 새 데이터를 적재 시, 기존의 테이블의 레코드 삭제 후 테이블 생성하여 코드를 적재하는 식이라
기존에 만들어진 테이블이 필요하다.
%%sql
CREATE TABLE hmk9667.nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
)
3) Airflow Connection 등록
등록1. Mysql 정보
- Connection Id: mysql_conn_id
- Connection Type: MySQL
- Host : (호스트명)
- Schema: prod
- Login: (id)
- Password: (pw)
- Port: 3306
등록2. AWS S3 연결을 위해 생성한 IAM Access Key
AWS IAM 으로 사용자를 만들어, S3 Bucket에 대한 Write, Read 권한을 주고,
해당 IAM 유저의 Access Key ID와 Secret Access Key를 사용하여 Airflow에서 접근가능하도록 한다.
IAM 유저를 만들고 Policy를 직접 추가할 때,
1) 대상이 되는 S3 bucket에 대한 권한만 지정
2) S3 모든 권한 지정 (AmazonS3FullAccess)
2가지 방법이 있다.
최소한의 권한만 주는게 좋기에 1번 방법을 권고한다.
아래는 IAM 유저의 Custom Policy 지정하는 JSON 이다.
- IAM 유저의 Custom Policy
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:ListAllMyBuckets" ], "Resource": "arn:aws:s3:::*" }, { "Effect": "Allow", "Action": "s3:*", "Resource": [ "arn:aws:s3:::(버킷명)", "arn:aws:s3:::(버킷명)/*" ] } ] }
Airflow Connection 등록
- Connection id : aws_conn_id
- Connection Type: Amazon Web Service
- AWS Access Key ID: (키정보)
- AWS Secret Access Key: (키정보)
- Extra : { "region_name": "ap-northeast-2" }
- Extra 부분에 aws 리전 정보를 등록해줘야 한다.
03. DAG 작성 및 실행 결과
소스코드 설명은 주요부분을 중점적으로 정리해보았다.
v1_Full Refresh
코드 작성
소스코드(Github)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE',
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
SqlToS3Operator, S3ToRedshiftOperator 의 2가지 오퍼레이터를 이용함
[ SqlToS3Operator ]
- task_id : task id
- query : mysql에서 실행할 쿼리문
- s3_bucket : 적재할 s3 버킷 이름
- s3_key : 적재할 폴더와 파일명 (즉 path정보)
- sql_conn_id : MySQL연결 키 (connection에 등록한 키 이름)
- aws_conn_id : aws 연결 키 (connection에 등록한 키 이름)
- verify : 파일에 대한 쓰기 작업이 완료된 후 파일의 일관성을 검사할지 여부 (디폴트 False)
- replace : s3의 데이터 적재 경로에 이미 데이터가 있을 때 삭제하고 생성할 지(=overwrite) 여부 (True면 삭제함)
- pd_kwargs : 내부 로직 상 데이터를 읽어올 때 pandas를 통해 읽어오는데, 이때 읽어오는 옵션을 설정함
- index : False면 일련번호 붙이지 않음
- header : 헤더 없이 가져옴
- dag : dag
공식 문서 : https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/transfer/sql_to_s3.html
[ S3ToRedshiftOperator ]
- task_id : 태스크 id
- s3_bucket : 소스가 되는 s3 버킷명
- s3_key : 소스가 되는 s3 경로
- schema : Redshift에적재할 스키마
- table : Redshift에 적재할 테이블
- copy_options : 소스 데이터 형식 지정
- ['csv'] : csv 파일임을 지정함
- method : Redshift 저장 방식
- 종류 : REPLACE(full Refresh), UPSERT(pk기준으로 중복제거 후 insert), APPEND
- redshift_conn_id : redshift 연결 키 (connection에 연결된 키 이름)
- aws_conn_id : aws 연결 키 (connection에 등록한 키 이름)
- dag :dag
실행 결과
처음엔 환경설정 잘못 해서 오류가 났지만, 설정 후에는 잘 실행된 것을 확인하였다.
v2_Incremental Update
필요 조건
MySQL이나 PostgreSQL 테이블을 Incremental Update 방식으로 적재하려면
테이블은 다음 필드들을 가지고 있어야 한다.
- created (timestamp) : Optional; 생성시간
- modified (timestamp) : 마지막 수정시간
- deleted (boolean) : 레코드 삭제 시, 삭제 안하고 True로 설정함
구현 방식 종류
Incremental Update를 구현하는 방식은 2가지 이다.
1. 직접 ROW_NUMBER로 구현
- SELECT * FROM A WHERE DATE(modified) = DATE(execution_date) 쿼리를 MySQL에 보냄
- 쿼리 결과를 파일로 저장 후 S3 업로드 후 Redshift로 Bulk Update (COPY) 수행함
2. S3ToRedshiftOperator로 구현하여 method를 UPSERT로 지정
- SELECT * FROM A WHERE DATE(modified) = DATE(execution_date) 로 query 파라미터 지정
- method 를 "UPSERT"로 지정
- upsert_keys를 PK로 지정
코드 작성
소스코드(Github)
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
- SqlToS3Operator의 query 변경함
- 위 코드의 sql 의미 : 레코드 생성된 날짜와 Airflow의 execution_date가 같은 레코드만 읽어옴
- {{ execution_date }} 처럼, {{+공백 으로 작성하면 airflow 시스템 변수로 바꿔치기됨
- S3ToRedshiftOperator의 method는 UPSERT로 하고, upsert_keys에 중복 제거 시 기준 PK값을 등록한다.
- upsert_keys는 리스트라 여러개 작성 가능하다.
실행 결과
터미널로 실행하였다.
이때 execution_date를 꼭 뒤에 써줘야된다.
이 의미는 해당 날짜에 누락된 데이터를 읽어서 중복 데이터 없애고 적재한다는 의미이다. (backfill)
(실행 결과 사진 첨부)
03. Backfill 실행하기
Backfill이 필요한 경우
- Full Refresh가 아닌 Incremental Update인 경우 필요
- 데이터 소스가 backfill 방식을 지원해야함
- “catchup” 필드를 True로 설정
Backfill 명령어
airflow dags backfill dag_id -s (start_date) -e (execution_date)
- start_date부터 시작하지만 end_date은 포함하지 않음
- “execution_date”을 사용해서 업데이트할 데이터 결정
- 다음으로 중요한 것은 DAG 구현이 execution_date을 고려해야 하는 것이고 idempotent 해야함
- 실행순서는 날짜/시간순은 아니고 랜덤. 만일 날짜순으로 하고 싶다면
- DAG default_args의 depends_on_past를 True로 설정
default_args = { 'depends_on_past': True, ,,)
- DAG default_args의 depends_on_past를 True로 설정
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Ch1. Airflow 살펴보기 (0) | 2023.11.14 |
---|---|
Airflow 운영과 대안 (0) | 2023.06.22 |
Airflow Backfill_Execution Date과 StartDate (0) | 2023.06.08 |
Airflow 실습_OpenAPI와 Upsert를 이용한 DAG (0) | 2023.06.08 |
Airflow 실습_기본코드 정리 및 ETL 코드 개선하기 (0) | 2023.06.07 |