10주차 3일차와 4일차에는 OpenAPI로 데이터를 가져와(Extract) → 변환하고(Tranform) → 이를 Redshift의 table로 저장(Load)하는 실습들을 진행하였다.
실습 정리에 앞서, Primary Key Uniqueness와 관련된 이론과 Upsert 기법을 짧게 정리후,
주제별로 총 3개의 실습을 순차적으로 정리하겠다.
[ 구성 ]
1. Primary Key Uniqueness와 Upsert 기법
2. 실습1. Yahoo Finance API 로 애플 주식 읽어오기 (버전 3가지)
3. 실습2. RestCountries의 세계 나라 정보 API로 국가 정보 읽어오기
4. 실습3. Open Weathermap API로 서울 8일 낮/최소/최대 온도 읽어오기 (버전 3가지)
* 위 실습은 프로그래머스 데이터 엔지니어링 데브코스에서 진행한 실습입니다.
이론
Primary Key Uniqueness와 Upsert 기법
Primary Key Uniqueness란
Primary Key란
- Primary Key : 테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
- 하나 필드가 아닌 다수의 필드도 사용할 수 있음 ⇒ composite key라고 함
- ex. PRIMARY KEY (order_id, product_id)
- DW는 PK를 보장해주지 않지만, 스키마에 지정을 해두면, 이후 RDB에서 PK가 중복되는 것을 막아주거나 쿼리를 더 효율적으로 짜줌
DW는 PK 유일성 보장 안 된다
빅데이터 기반의 데이터 웨어하우스는 PK를 지켜주지 않는다.
- 이유 : 데이터가 커서 PK를 보장하는데 메모리와 시간이 크기에, 대용량 데이터 적재에 어려움이 있음
- 일반적으로 데이터웨어하우스는 PK를 기준으로 유일성을 보장해주지 않는다.
→ 이는 데이터 인력(데이터엔지니어, 데이터분석가)의 책임
Primary Key 유지 방법
- 임시 테이블에 현재 모든 레코드 복사
CREATE TEMP TABLE t AS SELECT * FROM - 임시 테이블에 새 데이터 저장
(레코드 중복 존재 가능) - 원본 테이블의 레코드 삭제 후(DELETE FROM), 임시 테이블의 내용을 원본 테이블로 복사
- 이때 중복을 걸러주는 SQL 작성
- ROW_NUMBER이용
ex. ROW_NUMBER() OVER (partition by date order by created_date DESC) seq
Upsert (Insert + Update)
Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정, 그렇지 않으면 새 레코드 적재
- 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌
앞에서 작성한 코드가 Upsert 코드임
실습 1.
Yahoo Finance API 로 애플 주식 읽어오기 (버전 3가지)
실습 내용 : Yahoo Finance API 호출하여 지난 30일에 대한 애플 주식 정보를 수집한다.
개요 : ETL을 통해 Redshift의 stock_info테이블에 저장한다.
- Incremental Update로 구현하면 이때 매일 DAG를 실행하여 하루치 데이터가 늘어난다.
- Redshift연결은 PostgresHook 이용함. autocommit은 True
- 버전 3으로 나누어 진행되며 버전 별로 실습 고도화 됨
- v1(Github) : Full Refresh 로 구현
- v2(Github) : Incremental Update 로 구현 (DISTINCT방식으로 중복 처리)
- v3(Github) : Incremental Update 로 구현 (ROW_NUMBER방식 이용하여 중복 처리)
Load 함수를 중점적으로 v1~v3의 구현을 비교해본다.
V1_Full Refresh 구현
코드
try:
# 트랜잭션 (Full Refresh 형태)
cur.execute("BEGIN;")
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint
);""")
for r in records:
sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
cur.execute("COMMIT;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
트랜잭션 (BEGIN, COMMIT/ROLLBACK)
- 테이블이 있다면 삭제 (DROP TABLE)
- 새 테이블 생성 (CREATE TABLE)
- 한 줄씩 데이터 삽입 (INSERT INTO)
실행 결과
터미널로 실행하였다.
airflow dags test UpdateSymbol 2023-05-30
만들어진 테이블에 데이터가 잘 적재된 것을 확인하였다.
v2_Incremental Update 구현 (DISTINCT 중복제거)
코드
def _create_table(cur, schema, table, drop_first):
"""
테이블 생성
"""
if drop_first:
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint
);""")
@task
def load(schema, table, records):
"""
흐름 : 원본테이블 없으면 생성 -> 임시테이블에 기존 테이블의 데이터 복사
-> 데이터 추가 -> 원본데이터 반영
"""
logging.info("load started")
cur = get_Redshift_connection()
try:
# 트랜잭션 (Incremental Update 형태)
cur.execute("BEGIN;")
# 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
_create_table(cur, schema, table, False)
# 임시 테이블로 원본 테이블을 복사
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
# 원본 테이블 생성
_create_table(cur, schema, table, True)
# 임시 테이블 내용을 원본 테이블로 복사
cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
cur.execute("COMMIT;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
트랜잭션 (BEGIN, COMMIT/ROLLBACK)
테이블 생성하는 구문을 함수로 뺌
- 첫 실행이라면 새 원본 테이블 생성 (CREATE TABLE IF NOT EXISTS)
- 새 임시 테이블 생성 후 기존 원본 테이블의 내용 가져옴 (CREATE TEMP TABLE)
- 한 줄씩 데이터 삽입 (INSERT INTO)
- 임시테이블에 기존데이터와 새 데이터 모두 적재됨
- 기존의 원본 테이블을 삭제하며 원본 테이블 생성 (DROP TABLE, CREATE TABLE)
- 임시 테이블 내용을 원본 테이블로 복사 (INSERT INTO .. SELECT …)
- 이때 DISTINCT로 중복되는 레코드 제거함
실행 결과
터미널로 실행하였다.
airflow dags test UpdateSymbol_v2 2023-05-30
v3_Incremental Update 구현 (ROW_NUMBER 중복제거)
기존의 v2같은 경우 중복을 DISTINCT로 제거하는데, 이는 모든 필드 단위로 구분하여 중복을 없애기에,
같은 날짜에 다른 값이 들어갈 수 있다. (ex. 하루에 2번 호출되는 경우 등등..)
그렇기에 Date을 PK로 잡고 이에 대한 중복을 없애는 것이 필요하다.
v3에서는 중복을 Row_number를 이용하여 없앤다.
코드
V2와 전반적으로 동일함. 달라진 점만 정리하면 다음과 같다.
- 원본 테이블 필드에 created_date 추가
- ROW_NUMBER() 추가하여 중복제거(alter_sql)
def _create_table(cur, schema, table, drop_first):
"""
테이블 생성
"""
if drop_first:
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint,
created_date timestamp default GETDATE()
);""")
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
# 트랜잭션 (Incremental Update 형태)
cur.execute("BEGIN;")
# 원본 테이블이 없으면 생성
_create_table(cur, schema, table, False)
# 임시 테이블에 원본 테이블의 레코드 복사
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
# 원본 테이블에 임시 테이블 내용 복사 (+중복제거)
alter_sql = f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, "open", high, low, close, volume FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;"""
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
트랜잭션 (BEGIN, COMMIT/ROLLBACK)
테이블 생성하는 구문을 함수로 뺌
- 첫 실행이라면 새 원본 테이블 생성 (CREATE TABLE IF NOT EXISTS)
- 새 임시 테이블 생성 후 기존 원본 테이블의 내용 가져옴 (CREATE TEMP TABLE)
- 한 줄씩 데이터 삽입 (INSERT INTO)
- 임시테이블에 기존데이터와 새 데이터 모두 적재됨
- 기존의 원본 테이블을 삭제하며 원본 테이블 생성과 동시에 ROW_NUMBER로 중복 제거함 (alter_sql)
실행 결과
터미널로 실행하였다.
airflow dags test UpdateSymbol_v3 2023-05-30
실습 2.
RestCountries의 세계 나라 정보 API로 국가 정보 읽어오기
세계 나라 정보 API로 국가 정보 읽어오기
실습 내용 : 세계 나라 정보 API 호출하여 나라별로 국가명, 인구수, 지역 정보를 불러와 저장한다.
개요 : ETL을 통해 Redshift의 stock_info테이블에 저장한다.
- Full Refresh로 구현한다.
- 코드 전문 : (Github)
- ETL 각 과정을 함수로 만들어, Redshift의 country_info 테이블에 적재함
주요 코드
for record in records:
# 따옴표 처리를 위해 파라미터 바인딩 사용
sql = f"INSERT INTO {schema}.{table} VALUES (%s, %s, %s);"
cur.execute(sql, record)
- country에는 작은 따옴표가 들어가는 경우가 있다.
- 따옴표를 유지한채 저장하기 위해서 파라미터 바인딩을 이용하여, execute 실행시 sql문과 함께 두번째 인자로 파라미터 배열을 같이 넣는다.
- 추가로 json 읽는 코드는 다음과 같다.
- response.json() 혹은 json.loads(response)
실행 결과
실습 3.
Open Weathermap API로 서울 8일 낮/최소/최대 온도 읽어오기 (버전 2가지)
실습 내용 : OpenWeather API를 이용해 오늘로부터 향후 8일의 시간별 날씨 정보 가져옴
실습을 진행하려면 OpenWeather 사이트 방문해서 회원가입 후 카드 등록까지 해서 OneCAll3.0을 구독해야된다.
(100번 호출까지는 무료)
(회원가입만 해도 key를 발급할 수 있는데, 카드를 등록하지 않으면 날씨 정보를 못 불러왔었다.)
개요 : ETL을 통해 Redshift의 stock_info테이블에 저장한다.
- Incremental Update로 구현하면 이때 매일 DAG를 실행하여 하루치 데이터가 늘어난다.
- Redshift연결은 PostgresHook 이용함. autocommit은 False
- 버전 2으로 나누어 진행되며 버전 별로 실습 고도화 됨
- v1(Github) : Full Refresh 로 구현
- v2(Github) : Incremental Update 로 구현 (ROW_NUMBER방식 이용하여 중복 처리)
ETL 과정이 구현된 etl()함수를 중점적으로 정리하였다.
V1_Full Refresh 구현
코드
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text) # response.json()
ret = []
print("check data : ", data)
# daily에는 앞으로 8일간 날씨 정보 들어감
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # epoch의 형식 변환
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret) # 한 번에 insert함
logging.info(drop_recreate_sql)
logging.info(insert_sql)
# 예외처리 (raise 잊지말기)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
트랜잭션 (autocommit False로 설정됨)
1. json으로 데이터 가져와 파싱함
2. data["daily"] 에 저장된 앞으로 8일간 날씨 정보를 일마다 빼와서, 날짜 타입 변경후 저장
- 이때 ret.append("('{}',{},{},{})".format() 으로 저장하여 INSERT시 join으로 한 번에 묶어줌
3. DROP TABLE IF EXISTS 후 CREATE TABLE 한다.
4. INSERT INTO ... 와 ",".join(ret) 으로 모든 레코드를 삽입한다.
실행 결과
터미널로 실행해 주었다.
잘 저장된 것을 확인할 수 있다.
V2_Incremental Update 구현
코드
@task
def etl(schema, table, lat, lon, api_key):
# Extract
# ...
# Transform
# ...
# Load
cur = get_Redshift_connection()
# 1) 원본 테이블이 없다면 생성
create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
logging.info(create_table_sql)
# 2) 임시 테이블 생성
create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
logging.info(create_t_sql)
try:
cur.execute(create_table_sql)
cur.execute(create_t_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 3) 임시 테이블 데이터 입력
# ret를 저장할 때 '('로 묶어주어 한 번에 join 가능함
insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 4) 기존 테이블 대체
alter_sql = f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;"""
logging.info(alter_sql)
try:
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
트랜잭션 (autocommit False로 설정됨)
load 구현 부분을 중심적으로 정리함
- 첫 실행이라면 새 원본 테이블 생성 (CREATE TABLE IF NOT EXISTS)
- 새 임시 테이블 생성 후 기존 원본 테이블의 내용 가져옴 (CREATE TEMP TABLE)
- 한 번에 데이터 삽입 (INSERT INTO)
- 임시테이블에 기존데이터와 새 데이터 모두 적재됨
- 기존의 원본 테이블을 삭제하며 원본 테이블 생성과 동시에 ROW_NUMBER로 중복 제거함 (alter_sql)
실행 결과
터미널로 실행해 주었다.
잘 적재된 것을 확인할 수 있다.
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Airflow 실습_OLTP에서 OLAP으로 데이터 적재하기 (0) | 2023.06.09 |
---|---|
Airflow Backfill_Execution Date과 StartDate (0) | 2023.06.08 |
Airflow 실습_기본코드 정리 및 ETL 코드 개선하기 (0) | 2023.06.07 |
Airflow 설치(Docker-compose)와 간단한 실습 (0) | 2023.06.07 |
Airflow란? 구성요소 알아보기 (0) | 2023.06.07 |