저번주 데브코스에서는 DW를 이용하여 대시보드를 구성하는 프로젝트를 진행했고,
이번주에는 데이터 파이프라인에 대해 알아보며 Airflow에 대해 학습을 진행 중이다.
오늘은 데이터 파이프라인이 무엇인지 알아보고, 용도에 따른 데이터 파이프라인 종류 그리고 데이터 파이프라인 설계 시 고려할 점을 살펴보며 마지막에는 ETL을 python으로 직접 구현해보는 실습으로 학습을 진행했다.
새롭게 알게된 것은 다음과 같다.
1. 데이터 파이프라인 종류를 명확히 나눌 수 있게 되었다.
기존에는 다양한 형태가 있다는 것만 알았지, 사용 용도에 따른 ETL 종류를 더 명확히 알게 되었다.
2. 데이터 파이프라인 만들 때, Full Refresh와 멱등성을 보장해야된다는 점 등 고려사항을 이번에 처음 듣고 알게 되었다.
이후에 ETL 만들 때 고려사항 꼭 확인해보며 작업해야겠다.
다음은 학습한 내용을 요약 정리한 것이다.
[1] 데이터 파이프라인(ETL)
ETL : Extract, Transform and Load
- Extract : 데이터 소스로부터 데이터를 가져와 dump하는 것
- Transform : 사용 목적에 맞게 데이터 포맷을 바꾸는 것
- Load : Data Warehouse에 테이블 형태로 적재하는 것
ETL vs ELT
- ETL : 데이터를 외부 → 데이터 웨어하우스 내부로 (데이터 엔지니어 파트)
- ELT : 데이터 웨어하우스 내부 데이터 조작하여 새로운 데이터 만드는 프로세스 (데이터 분석가 파트)
- 데이터 레이크에서 많이 발생하는 작업
- Data Lake에서 Spark와 Athena를 이용해 Data Transforms 과정으로 Data Warehouse 에 적재하는 것도 ELT라고 부름
- dbt(Data Build Tool)이 가장 유명함 (Analytics Engineering 이라고 부름)
Data Lake vs Data Warehouse
- Data Lake : 더 Scalable한 Strorage
- 구조화 + 비구조화 데이터 저장
- Data Warehouse : 보존 기한이 있는 구조화된 데이터 저장 및 sql로 처리하는 스토리지
- 보통 BI툴들과 연결되어 사용됨
Data Pipeline = ETL = Data workflow = DAG
Data Pipeline
데이터를 소스 → 목적지로 복사하는 작업
- 보통 코딩(python/scala) or SQL을 통해 이뤄짐
- 대부분 목적지는 DW임. 혹은 캐시 시스템, 프로덕션 데이터베이스, NoSQL 등
DAG
Airflow에서는 이를 DAG(Directed Acyclic Graph) 라고 부름
- Directed : 한 방향
- Acyclic : 루프 없음
- Graph : 노드가 작업
[2] 데이터 파이프라인 종류
- Raw Data ETL Jobs
- 외부와 내부 데이터 소스에서 데이터 읽고, 포맷 변환 후, DW에 로드
- 보통 데이터 엔지니어 업무
- Summary/Report Jobs
- DW나 DL에서 데이터 읽어 다시 DW에 쓰는 ETL
- raw data로 리포트나 요약테이블 만드는 용도 혹은 AB테스트 결과 분석하는 데이터 파이프라인
- 보통 DBT툴 쓰는 Analytics Engineer 업무
- Production Data Jobs
- DW를 읽어 다른 Storage(프로덕션 환경)으로 쓰는 것
- 용도 : 요약정보가 프로덕션 환경에서 성능 이유로 필요한 경우, ML모델에 필요한 피쳐들 미리 계산하는 경우
- 주로 타겟 스토리지는 MySQL과 같은 관계형 데이터베이스 (OLTP)
- DW를 읽어 다른 Storage(프로덕션 환경)으로 쓰는 것
[3] 데이터 파이프라인을 만들 때 고려할 점
데이터 파이프라인 현실/실상
실상은 데이터 파이프라인은 많은 이유로 실패함
- 원인은 버그도 있으며, 데이터 파이프라인 수가 늘어남에따라 서로에 대한 의존도가 증가하여 복잡해질 수 있으며, 유지보수 비용도 기하급수적으로 늘어난다.
데이터 파이프라인을 만들 때 고려할 점
- Full Refresh
- 데이터가 작으면 Full Refresh형태로 매번 통채로 복사해서 테이블 만든다.
- 새로 생성되거나 업데이트 된 레코드만 업데이트 하는 형태인 Incremental update만이 가능하다면 해당 형태로 진행
- 하지만 이러한 형태는 오류 발생시 복구 어렵다는 단점 존재함
- 멱등성(Idempotency) 보장
- 데이터 파이프라인을 다수 실행해도 최종 테이블 내용이 달라지지 않아야함 (중복 데이터 발생하면 안됨)
- SQL의 transaction이 꼭 필요한 기술
- 실패한 데이터 파이프라인을 재실행하는 방법과 Backfill이 쉬워야함
- Backfill : 과거 데이터를 다시 채우는 과정. Airflow가 해당 특징에 강점 지님
- 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화하기
- 누가 데이터를 요청했는지 기록하기
- 이게 이후 데이터 카탈로그로 들어가서 데이터 디스커버리에 사용가능함
- 주기적으로 불필요한 데이터 삭제하기
- 데이터 디스커버리 툴을 사용해서 불필요한 데이터 확인하고 삭제 가능함
- 데이터 파이프라인 사고시 마다 사고 리포트(post-mortem) 작성하기
- 동일 사고 재발생 방지 목적
- 중요한 데이터 파이프라인의 입력과 출력을 체크하기
- 입력 레코드와 출력 레코드 수 체크나 pk uniquness 보장되는지, 중복 레코드 체크 하는 과정 필요함
- ⇒ 데이터 대상 유닛 테스트
[4] python으로 redshift에 데이터 로드하는 ETL 코드 작성해보기
실습 구성: S3에 저장된 CSV 파일을을 ETL에 각각 해당하는 함수를 실행하여 불러오고, 이를 가공하여 Redshift의 테이블에 적재하는 실습이다.
이때, 잡이 full refresh하게 실행되도록 하는 것이 핵심이다.
Redshfit 연결 객체 생성함수
def get_Redshift_connection():
host = "url"
redshift_user = "id"
redshift_pass = "pw"
port = 5439
dbname = "dev"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
Extract 함수
주어진 url로부터 데이터를 추출하는 함수
import requests
def extract(url):
f = requests.get(url)
return (f.text)
Transform 함수
extract 함수로 가져온 raw 데이터를 줄 단위로 접근하여, 이름과 성별을 ',' 구분자로 하여 리스트에 저장하는 함수
def transform(text):
lines = text.strip().split("\\n")
records = []
for l in lines[1:]: # 헤더(첫 줄) 제외 하고 데이터 저장
(name, gender) = l.split(",")
records.append([name, gender])
return records
extract 함수로 가져온 raw 데이터를 줄 단위로 접근하여, 이름과 성별을 ',' 구분자로 하여 리스트에 저장하는 함수
Load 함수
주어진 records 리스트를 redshift의 테이블에 한 줄씩 데이터를 로드하는 함수
def load(records):
"""
[ 흐름 구성 ]
1. BEGIN
2. DELETE
3. INSERT
4. END
5. 실패시 ROLLBACK
6. 실패여부와 상관없이 완료시 connection close
"""
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute("DELETE FROM (table명)")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
insert_sql = "INSERT INTO (table명) VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(insert_sql)
cur.execute("END;") # cur.execute("COMMIT;")
except (Exception, psycopg2,DatabaseError) as e:
cur.execute("ROLLBACK")
print("Error occurred during transaction: ", str(e))
raise e
finally:
cur.close()
- BEGIN과 END로 트랜잭션를 시작하고 종료한다.
- 트랜잭션 안에서는 기존의 테이블의 값을 DELETE하고, 데이터를 INSERT한다. (Full Refresh 잡)
- try-except 문을 사용하여, 예외 발생 시 ROLLBACK으로 트랜잭션 롤백하고, 예외 발생 하지 않으면 END문으로 트랜잭션 종료한다.
- 이때 except에서 꼭 raise를 해주어 발생한 원래 exception이 위로 전파되어, 명확히 에러가 드러나도록 한다.
- 예외 발생 여부와 관계없이 cur.close()로 커서를 닫는다.
ETL 코드 실행
link = "(s3버킷에 있는 csv 파일)"
data = extract(link)
lines = transform(data)
load(lines)
결과 조회하기
%%sql
SELECT COUNT(1)
FROM (테이블명);
위 ETL 코드를 다시 실행해도 전체 레코드 수는 100으로, 여러번 실행해도 동일한 결과가 나오는 것을 확인할 수 있다.
이는 load함수에서 Redshift에 테이블에 데이터를 적재할때, delete from하고 insert해주는 작업을 하였기에 가능한 일이다. (Full Refresh 잡)
잠깐! 트랜잭션이란?
Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법이다.
- BEGIN과 END(=COMMIT) 사이에 SQL을 배치한다.
- python의 경우 try-except문으로 sql 을 묶고, 에러 발생시 rollback 안나면 commit을 실행하도록 한다
두 가지 종류의 트랜잭션
두 가지 중 어떤 것을 선택할지는 개인 혹은 팀에 따라 다름
autocommit = True
- 기본적으로 모든 SQL이 바로 물리 테이블에 커밋됨
- 바꾸고 싶다면 BEGIN; END; (Rollback) 사용함
autocommit = True
- 기본적으로 모든 SQL이 바로 물리 테이블에 커밋 안됨. 모두 스테이징 상태로 존재
- 커넥션 객체의 .commit()과 .rollback()함수로 커밋할지를 결정함
'#️⃣ Data Engineering > Airflow' 카테고리의 다른 글
Airflow 실습_OpenAPI와 Upsert를 이용한 DAG (0) | 2023.06.08 |
---|---|
Airflow 실습_기본코드 정리 및 ETL 코드 개선하기 (0) | 2023.06.07 |
Airflow 설치(Docker-compose)와 간단한 실습 (0) | 2023.06.07 |
Airflow란? 구성요소 알아보기 (0) | 2023.06.07 |
[Airflow] Airflow 설치 (Docker 이용) (0) | 2022.02.06 |