๐ย ๊ณต๋ถ ๋ด์ฉ
Airflow ์ค์ต : DAG ๊ตฌํํ๊ธฐ
Primary Key Uniqueness ๋ณด์ฅํ๊ธฐ
ํด์ฆ
|
|
์ฌ๊ธฐ์ transaction์ผ๋ก ์ฒ๋ฆฌ๋์ด์ผ ํ๋ ์ต์ ๋ฒ์์ SQL๋ค์?
Upsert
Insert & Update
- primary key๋ฅผ ๊ธฐ์ค์ผ๋ก
- ์กด์ฌํ๋ ๋ ์ฝ๋๋ผ๋ฉด, ์ ์ ๋ณด๋ก ์์
- ์กด์ฌํ์ง ์๋ ๋ ์ฝ๋๋ผ๋ฉด ์ ๋ ์ฝ๋ ์ ์ฌ
- DW๋ง๋ค UPSERT๋ฅผ ํจ์จ์ ์ผ๋ก ์คํํด์ฃผ๋ ๋ฌธ๋ฒ์ ์ง์ํด์ค
Backfill
๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ค๋ ๋ฐ ์คํจํ๊ฑฐ๋, ์ฝ์ด์จ ๋ฐ์ดํฐ์ ๋ฌธ์ ๋๋ฌธ์ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์ฌ์คํํ์ฌ ๋ค์ ์ฝ์ด์์ผ ํ๋ ๊ณผ์
Incremental Update ์คํจ
- ํ๋ฃจ์ ํ ๋ฒ ๋์ํ๋ incremental update
- ์ค๊ฐ์ ๋ฉฐ์น ๋์ ์ด ๊ณผ์ ์ด ์คํจํ ๊ฒฝ์ฐ, ๊ทธ ์ดํ์ ์คํ์๋ ์ํฅ์ ์ฃผ๊ฒ ๋์ด์์
- ์คํจํ ๋ถ๋ถ์ ์ฌ์คํ -> ์ผ๋ง๋ ์ค์ํ๊ฐ?
Backfill์ ์ฉ์ด์ฑ
์คํจํ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ์ฌ์คํ ์ด ์ผ๋ง๋ ์ฉ์ดํ ๊ตฌ์กฐ์ธ๊ฐ?
full refresh
- ๋ฌธ์ ๊ฐ ์๊ธฐ๋ฉด ๋ค์ ์คํํ๋ฉด ๋จ
- backfill ๋ถํ์
Incremental Update
- ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์ฝ์ด์์ผ ํ๋ฉด ์ฒ์๋ถํฐ ๋ชจ๋ ๋ค ์ฌ์คํํด์ผ ํจ ( ํจ์จ์ฑ์ ๋ ์ข์ ์ ์์ง๋ง, ์ด์&์ ์ง๋ณด์๊ฐ ์ด๋ ค์์ง)
- backfill ํ์
Airflow : backfill์ ์ฝ๊ฒ ํ ์ ์๋๋ก ๋์์ธ๋จ
Backfill of Daily DAG
Daily DAG
์ง๊ธ ์๊ฐ์ ๊ธฐ์ค์ผ๋ก ์ด์ ๋ ์ง๋ฅผ ๊ณ์ฐ, ์ด์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ด
๋งค์ผ ๋ฌธ์ ์์ด ๋์ํ๋ฉด OK, BUT ๋ฐ์ดํฐ ์ฝ์ด์ค๊ธฐ์ ์คํจํ๋ ๊ฒฝ์ฐ ? -> ํน์ ๋ ์ง์ ๋ฐ์ดํฐ๊ฐ ๋น ์ ธ์์ -> ์คํจํ ๋ ๊ธฐ์ค์ผ๋ก ์ ๋ ์ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฐ์ดํธ ํ๋ ์ฝ๋๋ฅผ ์๋ก ์์ฑํด์ผ ํจ (์ํ๋ ๋ ์ง๋ฅผ ํ๋์ฝ๋ฉํ๋ ๋ฐฉ์)
1 2 3 4
from datetime import datetime, timedelta # y = datetime.now() - timedelta(1) # yesterday = datetime.strftime(y, '%Y-%m-%d') yesterday = '2023-01-01'
์ค์ํ๊ธฐ ์ฝ๊ณ ์์ ํ๋ ๋ฐ ์๊ฐ์ด ๋ง์ด ๊ฑธ๋ฆผ
DAG๋ฅผ ์์ฑํ ๋ ๋ถํฐ backfill์ ์ฝ๊ฒ ๋ง๋ค์ด์ผ ํจ
Backfill์ ์ฉ์ดํ๊ฒ ํ๋ ๊ตฌ์กฐ
- ๋ ์ง๋ณ๋ก backfill ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ก
- ๋ ์ง๋ ์์คํ ์์ ETL ์ธ์๋ก ์ ๊ณต
- ๋ ์ง๋ฅผ ๋ฐ๋ก ๊ณ์ฐํ์ง ์๊ณ , ์์คํ ์ด ์ ํด์ค ๋ ์ง๋ฅผ ์ฌ์ฉ
Airflow์ ๊ตฌ์กฐ
- ETL๋ณ๋ก ์คํ๋ ์ง, ๊ฒฐ๊ณผ๋ฅผ ๋ฉํ๋ฐ์ดํฐ DB์ ๊ธฐ๋ก
- ๋ชจ๋ DAG ์คํ์
execution_date
์ง์ execution_date
๋ฅผ ๋ฐํ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๊ฐฑ์ ํ๋๋ก ์ฝ๋ ์์ฑ
DAG Parameter
date ๊ด๋ จ parameter ์ ๋ฆฌ
Airflow ์ค์ผ์ฅด๋ง
schedule_interval์ด ์ง๋ ์ดํ์, execution_date ๊ธฐ์ค์ผ๋ก ์คํ์ด ๋๋ค.
start_date : 2023-12-01 00:00:00
Interval | execution_date | ์คํ ๋ ์ง |
---|---|---|
1 day | 2023-12-01 | 2023-12-02 |
2023-12-02 | 2023-12-03 | |
1 hour | 2023-12-01 01:00:00 | 2023-12-01 02:00:00 |
10 minutes | 2023-12-01 00:20:00 | 2023-12-01 00:30:00 |
์ฐธ๊ณ : https://it-sunny-333.tistory.com/157
start_date
์ฒ์
์ฝ์ด์์ผ ํ๋ ๋ฐ์ดํฐ์ ๋ ์ง
2020-11-07์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ด
2020-11-08 ๋ถํฐ ETL ๋์
-> start_date : 2020-11-07
execution_date
์ฝ์ด์์ผ ํ๋ ๋ฐ์ดํฐ์ ๋ ์ง
2020-11-08 ETL ๋์
-> execution_date : 2020-11-07
2023-12-14 ETL ๋์ -> execution_date : 2023-12-13 (start_date : 2020-11-07)
catchup
DAG ํ์ฑํ ์์ > start_date
๊ทธ ์ฌ์ด ๊ธฐ๊ฐ๋์ ์คํ๋์ง ์์ job์ ์ด๋ป๊ฒ ํ ์ง ์ ํ๋ ํ๋ผ๋ฏธํฐ
True
: ๋ํดํธ๊ฐ, ์คํ๋์ง ์์ job์ ๋ชจ๋ ์คํํ์ฌ ๋ฐ๋ผ์ก์ผ๋ ค๊ณ ํจFalse
: ์คํ๋์ง ์์ job์ ๋ฌด์ํจ- ์ ๋ชจ๋ฅด๋ฉด ํญ์ False๋ก ์ธํ !
๐ย CHECK
(์ด๋ ต๊ฑฐ๋ ์๋กญ๊ฒ ์๊ฒ ๋ ๊ฒ ๋ฑ ๋ค์ ํ์ธํ ๊ฒ๋ค)
openweathermap api
- https://openweathermap.org/api/one-call-3
- ๊ตฌ๋ ํ ์ดํ์ ๋ฐ๋ก ํ๊ฐ๊ฐ ์๋๋ ๋ฌธ์ ๊ฐ ์์..
- ๊ธฐ์กด ์ฝ๋๋ฅผ 2.5 -> 3.0 ์ผ๋ก ๋ฐ๊ฟ์ผ ํจ
ํด์ฆ
DAG ์์ฑ ๊ณผ์
UpdateSymbol_v2์ Incremental Update ๋ฐฉ์ ์์ ํด๋ณด๊ธฐ
์์ ๋ฐฐ์ด ROW_NUMBER ๋ฐฉ์์ ์ฌ์ฉํด์ Primary key๊ฐ ๋์ผํ ๋ ์ฝ๋๋ค์ ์ฒ๋ฆฌํ๊ธฐ
1 2 3 4 5 6 7
alter_sql = f"""DELETE FROM {schema}.{table}; INSERT INTO {schema}.{table} SELECT date, "open", high, low, close, volume FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq FROM t ) WHERE seq = 1;"""
Incremental Update๊ฐ ์งํ๋๋์ง ํ์ธํ๊ธฐ ์ํด, ์ด์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ
stock_info_v2
์ ๋ ์ฝ๋๋ฅผ ๋ณต์ฌํด์๊ณ , created_date๋ฅผ ์ด์ ๋ก ์ง์ ํ๋ค.Update ์ด์
Update ์ดํ
์๋ก ๋ ์ฝ๋๊ฐ ํ๋ ์ถ๊ฐ๋์ผ๋ฉฐ, created_date๊ฐ DAG ์คํ ์๊ฐ์ผ๋ก ๋ณ๊ฒฝ๋๊ฒ์ ๋ณผ ์ ์๋ค.
gcp sdk ํ์ฉํด์ ์๋ฒ<->๋ก์ปฌ ํ์ผ ํต์
|
|
๋ค์ด๋ก๋ (์๋ฒ -> ๋ก์ปฌ)
- ์๋ฒ์ชฝ ํด๋ ์์ ๋ชจ๋ ํ์ผ์ ์ ๋ถ ๋ค์ด๋ก๋
|
|
์ ๋ก๋ (๋ก์ปฌ -> ์๋ฒ)
- ๊ถํ์ด ์์ด์ผ ์
๋ก๋ ๊ฐ๋ฅ ->
root@
- ๋ก์ปฌ ํน์ ํ์ผ์ ์๋ฒ์ชฝ ํด๋ ์์ผ๋ก ์ ๋ก๋
|
|
- airflow ์ ์ ์ ์
๋ก๋ํ ํ์ผ์ ๋ํ ๊ถํ์ด ์์ด์,
chmor 664 {filename}
์ผ๋ก ์์ ๊ถํ ๋ถ์ฌํจ
vscode remote ssh ๊ด๋ จ ๋ฌธ์
- GCP Vm instance์ RAM์ 2GB์ธ๋ฐ, ์ด ์ค airflow ํ๋ก์ธ์ค๊ฐ 1.3GB์ ๋๋ฅผ ์ฐจ์งํจ
- vscode๋ฅผ ์ํ ssh server๋ 1GB์ ๋ ํ์ํจ
- ๊ทธ๋์ vscode remote ssh๋ก ์ฐ๊ฒฐํ๊ณ ์์ผ๋ฉด ์๊พธ ๋ฉ์ถ๊ณ airflow ์น ์๋ฒ๊ฐ ์ ์๋์๊ฐ๋ ๋ฑ ๋ฌธ์ ๊ฐ ์์๋ ๊ฒ
- RAM ์ฆ์คํ๊ธฐ๋ก ๊ฒฐ์ (์ด์ฐจํผ ์ค๋ ์ฌ์ฉํ ๊ฒ ์๋๊ณ ๋ฌด๋ฃ ํฌ๋ ๋ง์ด๋ผ ๊ด์ฐฎ)
โ ๋๋ ์
- ์๊ณ ๋ฆฌ์ฆ ๋ฌธ์ ํ ๋ ํจ์๋ณ๋ก ๋๋๋ ์ต๊ด, ์ฃผ์ ์ ๋ ์ต๊ด ๋ง๋ค์ด๋ณด๊ธฐ
- ๋ค์ ์ง๋ฌธ์ ๋ํด ์๊ฐ/์ฐพ์๋ณด๊ธฐ
- DAG์์ ๊ฐ์ ํจ์, task๋ฅผ ์ฌ๋ฌ๋ฒ ํธ์ถ ํ ์ ์๋๊ฐ
- @task ์ธ์ ๋ค๋ฅธ decorator ์๋์ง ์ฐพ์๋ณด๊ธฐ
- ๊ณผ์ ์์ ๊ธฐ์กด created_date๋ฅผ ๊ฐ์ ธ์ค์ง ์๊ณ ์๋ก ๋ ์ฝ๋๋ฅผ ๋ง๋ค ๋ ์์ฑํ๋ ์ด์ ๋ ๋ญ๊น?