๐ย ๊ณต๋ถ ๋ด์ฉ
OLTP ํ ์ด๋ธ ๋ณต์ฌํ๊ธฐ
- Production MySQL Tables (OLTP) -> AWS Redshift (OLAP)
- MySQL Tables(Source) -> Airflow Server
COPY Command
: -> S3(Cloud Storage) -> Data WarehouseINSERT Command
: -> Data Warehouse
์ค์ต ๊ด๋ จ ์ค์
๊ถํ ์ค์
๋ฒํท๊ณผ DB ์๋ฒ๋ฅผ ๊ฐ์ง ๊ณ์ ์์ ์ค์
1. Airflow DAG์์ S3 ์ ๊ทผ (Write ๊ถํ)
- IAM User๋ฅผ ์์ฑ
- User์๊ฒ S3๋ฒํท์ ๋ํ Read/Write ๊ถํ์ ์ค์
- User์ access key, secret key ์ฌ์ฉ
Custom Policy
|
|
2. Redshift์์ S3 ์ ๊ทผ (Read ๊ถํ)
- Redshift์์ S3 ์ ๊ทผํ ์ ์๋ Role ์์ฑ ํ Redshift์ ์ง์
Airflow Connection ์ค์
S3 : AWS type ์ ํ ํ access key id, secret access key ์ ๋ ฅ (Extra์ ์ง์ญ ์ ๋ ฅ)
MySQL : ํธ์คํธ, schema, id/pw, port ์ ๋ ฅ
DB Tables
MySQL(OLTP) Table
|
|
Redshift(OLAP) Table
- DAG ์คํ ์ด์ ์ ํด๋น ํ ์ด๋ธ์ ๋ฏธ๋ฆฌ ์์ฑํด์ผ ํจ
|
|
Full Refresh
tasks in DAG
์ด๋ฏธ ์์ฑ๋์ด์๋ operator์ ํ๋ผ๋ฏธํฐ๊ฐ๋ง ์์ฑํ์ฌ task๋ฅผ ์์ฑ
Incremental Update๋ ๊ฐ์ ๋ฐฉ์ ์ฌ์ฉ
|
|
SqlToS3Operator
MySQL SQL ๊ฒฐ๊ณผ -> S3
์ ์ฅ ์์น : s3://s3_bucket/s3_key (s3://grepp-data-engineering/{user_id}-nps)
Task code
1 2 3 4 5 6 7 8 9 10 11 12
mysql_to_s3_nps = SqlToS3Operator( task_id = 'mysql_to_s3_nps', query = "SELECT * FROM prod.nps", s3_bucket = s3_bucket, s3_key = s3_key, #{schema} - {table} sql_conn_id = "mysql_conn_id", aws_conn_id = "aws_conn_id", verify = False, replace = True, #ํ์ผ์ด ์ด๋ฏธ ์กด์ฌํ๋ฉด ๋ฎ์ด์ pd_kwargs={"index": False, "header": False}, #index, header๋ฅผ ์ ์ธํ๊ณ copy dag = dag )
S3ToRedshiftOperator
S3 -> Redshift ํ ์ด๋ธ ( Redshift : {schema}.nps )
COPY command is used
Task code
1 2 3 4 5 6 7 8 9 10 11 12
s3_to_redshift_nps = S3ToRedshiftOperator( task_id = 's3_to_redshift_nps', s3_bucket = s3_bucket, s3_key = s3_key, schema = schema, table = table, copy_options=['csv'], # file type : csv method = 'REPLACE', # REPLACE : ํ ์ด๋ธ์ด ์ด๋ฏธ ์กด์ฌํ๋ฉด ๋ฎ์ด์ (full refresh) redshift_conn_id = "redshift_dev_db", aws_conn_id = "aws_conn_id", dag = dag )
Incremental Update
MySQL/PostgreSQL Table ์กฐ๊ฑด
- created(timestamp) : Optional
modified(timestamp)
- deleted(boolean) : ๋ ์ฝ๋๋ฅผ ์ญ์ ํ์ง ์๊ณ
deleted = True
๊ตฌํ ๋ฐฉ์
1. ROW_NUMBER๋ก ์ง์ ๊ตฌํ
- Redshift์ A ํ ์ด๋ธ์ temp_A ํ ์ด๋ธ๋ก ๋ณต์ฌ
- MySQL์ A ํ
์ด๋ธ ๋ ์ฝ๋ ์ค
modified == execution_date(์ง๋ ์ผ)
์ธ ๋ชจ๋ ๋ ์ฝ๋๋ฅผ temp_A๋ก ๋ณต์ฌ- MySQL์ ๋ค์ ์ฟผ๋ฆฌ๋ฅผ ๋ณด๋ด๊ณ ๊ฒฐ๊ณผ๋ฅผ ํ์ผ๋ก ์ ์ฅ. S3๋ก ์ ๋ก๋ํ๊ณ COPY ์ํ
1 2 3
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
- temp_A์ ๋ ์ฝ๋๋ค์ primary key๋ฅผ ๊ธฐ์ค์ผ๋ก ํํฐ์ ํ ๋ค์, modified ๊ฐ์ ๊ธฐ์ค์ผ๋ก DESC ์ ๋ ฌ
- ์ผ๋ จ๋ฒํธ๊ฐ 1์ธ ๊ฒ๋ค๋ง ๋ค์ A๋ก ๋ณต์ฌ
2. UPSERT ์ฌ์ฉ (์ค์ต์์ ์ฌ์ฉ)
S3ToRedshiftOperator
๋ก ๊ตฌํ
- query ํ๋ผ๋ฏธํฐ
1 2 3
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
- method ํ๋ผ๋ฏธํฐ :
โUPSERTโ
- upsert_keys ํ๋ผ๋ฏธํฐ : Primary key
- nps ํ ์ด๋ธ์ด๋ผ๋ฉด โidโ ํ๋๋ฅผ ์ฌ์ฉ
tasks in DAG
SqlToS3Operator
'{{ execution_date }}'
๋ฅผ ์ฟผ๋ฆฌ sql์ ๋ฃ์์ผ๋ก์ airflow๊ฐ ๋๊ฒจ์ฃผ๋ execution date ๊ฐ์ ํ์ฉํ ์ ์์- ์ฌ๊ธฐ์์
modified
๊ฐ ์๋๋ผcreated_at
์ธ ์ด์ ๋ ์ ์ด์ ํ ์ด๋ธ์modified
์ปฌ๋ผ์ด ์กด์ฌํ์ง ์๊ธฐ ๋๋ฌธ
|
|
S3ToRedshiftOperator
|
|
์ค์ต ์งํ
Full Refresh ํ ์คํธํด๋ณด๊ธฐ
1
airflow dags test MySQL_to_Redshift
- ์คํ๋๋ sql query
1 2 3 4 5
COPY imsolem1226.nps FROM 's3://grepp-data-engineering/imsolem1226-nps' credentials 'aws_access_key_id=xxx;aws_secret_access_key=xxx' csv;
- ์คํ ์ฑ๊ณต ํ Redshift์ ์ ์ฅ๋ ๋ฐ์ดํฐ ํ์ธ
- ์คํ๋๋ sql query
Incremental Update (Upsert) ํ ์คํธํด๋ณด๊ธฐ
1
airflow dags test MySQL_to_Redshift_v2
- ์คํ๋๋ sql query
1 2 3 4 5 6 7 8
COPY # FROM 's3://grepp-data-engineering/imsolem1226-nps' credentials 'aws_access_key_id=xxx;aws_secret_access_key=xxx' csv; -- ์์ ํ ์ด๋ธ๊ณผ id๊ฐ ๊ฐ์ ๋ฐ์ดํฐ๋ค(์ค๋ณต๋ ๋ฐ์ดํฐ๋ค)์ ๋ชจ๋ ์ง์ฐ๊ณ , ์์ ํ ์ด๋ธ์ ๋ฐ์ดํฐ๋ง ๋ค์ ์ถ๊ฐํ๋ค DELETE FROM imsolem1226.nps USING #nps WHERE nps.id = #nps.id; INSERT INTO imsolem1226.nps SELECT * FROM #nps;
- ์คํ๋๋ sql query
Backfill ์คํ
- ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ๋ฒ ๋ค์ ์ฝ์ด์์ผ ํ๋ ๊ฒฝ์ฐ ํ๋ฒ์ ํ๋์ฉ vs. ํ๋ฒ์ ์ฌ๋ฌ๊ฐ์ฉ
- ๋์์ ์ฌ๋ฌ ์์ฒญ์ด ๋ค์ด๊ฐ๊ฒ ๋๋ฉด ๋ฐ์ดํฐ ์์ค ์ชฝ์ ๋ฌธ์ ๊ฐ ๋ฐ์
- ๋ฐ์ดํฐ ์ฝ๊ธฐ๋ฅผ ์ ๋ด์ผ๋ก ํ๋ worker๋ฅผ ๋ง๋ค์ด ๋๋ ๋ฐฉ์์ผ๋ก ๋ถ์ฐํ์ฌ ํด๊ฒฐ
- ๋ฐ์ดํฐ ์ฐ๊ธฐ๋ main(master?)์์ ์งํ
- ์ฌ๋ฌ ๋ ์ง์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ค๋ ํ๋ก์ธ์ค๊ฐ ๋์์ ์คํ๋๋ ๊ฒฝ์ฐ, ๋ฐ์ดํฐ ๋ฎ์ด์ฐ๊ธฐ ๋ฑ ๋ฌธ์ ๋ฐ์
- ํ๋์ฉ ์คํํ๋๊ฒ์ด ์์ :
max_active_runs
(DAG parameter)๋ฅผ 1๋ก ์ธํ (?)
Command Line
|
|
- catchUp : True๋ก
- execution_date์ ์ฌ์ฉํด์ Incremental update๊ฐ ๊ตฌํ๋์ด ์์
- start_date๋ถํฐ ์์ํ์ง๋ง end_date์ ํฌํจํ์ง ์์
- ์คํ์์๋ ๋๋ค (๋ ์ง, ์๊ฐ ์ X)
- ๋ ์ง์์ผ๋ก ํ๊ณ ์ถ๋ค๋ฉด DAG default_args์ depends_on_past๋ฅผ True๋ก ์ค์
1 2 3
default_args = { 'depends_on_past': True, ...
How to Make Your DAG Backfill ready
๋ชจ๋ DAG๊ฐ backfill์ ํ์๋ก ํ์ง๋ ์์
- Full Refresh๋ฅผ ํ๋ค๋ฉด backfill์ ์๋ฏธ๊ฐ ์์
- Incremental Update๋ฅผ ํด๋,
๋ง์ง๋ง ์ ๋ฐ์ดํธ ์๊ฐ ๊ธฐ์ค backfill ์ ํ๋ ๊ฒฝ์ฐ์๋ execution_date์ ์ด์ฉํ backfill์ ํ์ํ์ง ์์ (Data Warehouse ํ ์ด๋ธ์ ๊ธฐ๋ก๋ ์๊ฐ ๊ธฐ์ค)
backfill ๊ตฌํ ์ํฉ ๋ฐ ์๊ฑด
- ๋ฐ์ดํฐ๊ฐ ๊ต์ฅํ ์ปค์ง๋ฉด backfill ๊ธฐ๋ฅ ๊ตฌํ์ ํ์
- airflow๋ฅผ ํ์ฉํ๋๊ฒ์ด ๋ง์ด ๋์๋จ
๋ฐ์ดํฐ ์์ค๊ฐ backfill ๋ฐฉ์์ ์ง์
ํ๋๊ฒ์ด ์ ์ผ ์ค์
backfill์ ์ด๋ป๊ฒ ๊ตฌํํ ๊ฒ์ธ๊ฐ?
execution_date
์ ์ฌ์ฉํด์ ์ ๋ฐ์ดํธํ ๋ฐ์ดํฐ ๊ฒฐ์ catchup
ํ๋๋ฅผTrue
๋ก ์ค์ - start_date/end_date์ backfillํ๋ ค๋ ๋ ์ง๋ก ์ค์
- DAG๋ฅผ ๊ตฌํํ ๋ execution_date์ ๊ณ ๋ คํด์ผ ํ๋ฉฐ,
idempotent
ํด์ผํจ
๐ย CHECK
(์ด๋ ต๊ฑฐ๋ ์๋กญ๊ฒ ์๊ฒ ๋ ๊ฒ ๋ฑ ๋ค์ ํ์ธํ ๊ฒ๋ค)