Day 44 Airflow(3)

📋 Study notes

Airflow practice: implementing a DAG

Guaranteeing primary key uniqueness

Quiz

# Weather_to_Redshift_v2.py

INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;

Which SQL statements here form the minimum set that has to be handled as a single transaction?
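As a rough illustration of what the ROW_NUMBER query above does, the sketch below applies the same rule in plain Python: for each date, keep only the row with the latest created_date. The function name `latest_per_date` and the sample rows are made up for this example; the real DAG does this inside Redshift, not in Python.

```python
from datetime import datetime


def latest_per_date(rows):
    """Keep, for each date, only the row with the most recent created_date.

    Mirrors ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC)
    followed by WHERE seq = 1.
    """
    latest = {}
    for row in rows:
        current = latest.get(row["date"])
        if current is None or row["created_date"] > current["created_date"]:
            latest[row["date"]] = row
    return sorted(latest.values(), key=lambda r: r["date"])


# Hypothetical sample data: 2023-12-01 was loaded twice.
rows = [
    {"date": "2023-12-01", "temp": 3.0, "created_date": datetime(2023, 12, 1, 9)},
    {"date": "2023-12-01", "temp": 4.5, "created_date": datetime(2023, 12, 2, 9)},
    {"date": "2023-12-02", "temp": 1.0, "created_date": datetime(2023, 12, 2, 9)},
]
deduped = latest_per_date(rows)
```

Here `deduped` holds one row per date, and the row kept for 2023-12-01 is the one that was loaded later.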

Upsert

Insert & Update

  • Based on the primary key:
    • if the record already exists, update it with the new values
    • if the record does not exist, load it as a new record
  • Each data warehouse (DW) provides its own syntax for running an UPSERT efficiently
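A minimal sketch of the upsert behaviour, using Python's built-in sqlite3 module as a stand-in for a data warehouse. `INSERT OR REPLACE` is SQLite syntax and is only an assumption for illustration; Redshift and other warehouses use their own statements. The `weather` table and the `upsert` helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weather (date TEXT PRIMARY KEY, temp REAL)")


def upsert(conn, date, temp):
    # SQLite-specific: a row with the same primary key is replaced,
    # otherwise a new row is inserted.
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO weather (date, temp) VALUES (?, ?)",
            (date, temp),
        )


upsert(conn, "2023-12-01", 3.0)   # new record
upsert(conn, "2023-12-01", 4.5)   # same primary key: value is updated
upsert(conn, "2023-12-02", 1.0)   # new record
result = conn.execute("SELECT date, temp FROM weather ORDER BY date").fetchall()
```

After the three calls the table contains two rows, and the row for 2023-12-01 carries the second value.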

Backfill

The process of re-running a data pipeline to read data again, either because the read failed or because the data that was read had problems.

Incremental Update failure

  • Consider an incremental update that runs once a day
  • If it fails for a few days in the middle, the runs after that are affected as well
  • The failed portion has to be re-run -> how important is this?

Ease of backfill

How easy is it to re-run a failed data pipeline with this structure?

  • Full refresh

    • If something goes wrong, just run it again
    • No backfill needed
  • Incremental update

    • If data has to be read again, everything has to be re-run from the beginning (it can be more efficient, but operation and maintenance get harder)
    • Backfill needed

Airflow: designed to make backfill easy

Backfill of Daily DAG

Daily DAG

  • Computes yesterday's date from the current time and reads yesterday's data

  • Fine as long as it runs every day without problems, BUT what if reading the data fails? -> the data for a specific date is missing -> new code has to be written that updates the data for the day before the failed run (by hardcoding the desired date)

    
    from datetime import datetime, timedelta
    # y = datetime.now() - timedelta(1)
    # yesterday = datetime.strftime(y, '%Y-%m-%d')
    yesterday = '2023-01-01'
    
  • Error-prone, and fixing it takes a lot of time

Backfill has to be made easy from the moment the DAG is created

A structure that makes backfill easy

  • Record the backfill result per date
  • The system provides the date as an ETL argument
  • Use the date given by the system instead of computing it separately

Airflow's structure

  • Records the execution date and result of each ETL in the metadata DB
  • Assigns an execution_date to every DAG run
  • Code should be written to update data based on execution_date
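The idea can be sketched without Airflow itself: the ETL function never calls `datetime.now()` and takes the date as an argument, so re-running a past day is just another call with a different date. `run_etl` and `backfill` are hypothetical names for this example; in a real DAG the date would come from Airflow's execution_date rather than from a loop like this.

```python
from datetime import date, timedelta


def run_etl(execution_date: date) -> str:
    """Hypothetical ETL step: returns the date string whose data it would load."""
    # The target date comes entirely from the argument, never from the clock.
    return execution_date.strftime("%Y-%m-%d")


def backfill(start: date, end: date) -> list:
    """Re-run the ETL for every date from start to end, inclusive."""
    loaded = []
    current = start
    while current <= end:
        loaded.append(run_etl(current))
        current += timedelta(days=1)
    return loaded


# Re-load three missing days without touching the ETL code.
loaded = backfill(date(2023, 1, 1), date(2023, 1, 3))
```

Compared with the hardcoded `yesterday = '2023-01-01'` approach, nothing in the ETL code has to be edited to recover a missed day.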

DAG Parameter

Summary of date-related parameters

Airflow scheduling

A run for a given execution_date starts only after one schedule_interval has passed, i.e. at execution_date + schedule_interval.

start_date : 2023-12-01 00:00:00

Interval      execution_date         Run date
1 day         2023-12-01             2023-12-02
1 day         2023-12-02             2023-12-03
1 hour        2023-12-01 01:00:00    2023-12-01 02:00:00
10 minutes    2023-12-01 00:20:00    2023-12-01 00:30:00

Reference: https://it-sunny-333.tistory.com/157
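The relationship between execution_date and the actual run time can be checked with a few lines of plain Python. This is only a model of the rule described in these notes (run time = execution_date + interval); `run_time` is a made-up helper, not an Airflow API, and newer Airflow versions describe the same idea with different parameter names.

```python
from datetime import datetime, timedelta


def run_time(execution_date: datetime, interval: timedelta) -> datetime:
    """When a run labelled with execution_date is actually triggered."""
    return execution_date + interval


start_date = datetime(2023, 12, 1)

# Daily schedule: the run labelled 2023-12-01 is triggered on 2023-12-02.
daily = run_time(start_date, timedelta(days=1))

# Hourly schedule: execution_date 01:00 is triggered at 02:00.
hourly = run_time(datetime(2023, 12, 1, 1), timedelta(hours=1))

# 10-minute schedule: execution_date 00:20 is triggered at 00:30.
ten_min = run_time(datetime(2023, 12, 1, 0, 20), timedelta(minutes=10))
```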

start_date

The date of the first data to be read

  • Data for 2020-11-07 is read

  • The ETL starts running on 2020-11-08

    -> start_date : 2020-11-07

execution_date

The date of the data to be read

  • ETL runs on 2020-11-08

    -> execution_date : 2020-11-07

  • ETL runs on 2023-12-14 -> execution_date : 2023-12-13 (start_date : 2020-11-07)

catchup

Applies when the DAG is activated later than start_date.
A parameter that decides what to do with the jobs that did not run in between

  • True: the default; tries to catch up by running every job that did not run
  • False: ignores the jobs that did not run
  • When in doubt, always set it to False!
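To get a feel for how many runs catchup=True can queue up, the sketch below lists the execution_dates whose interval has already fully passed by the time the DAG is activated. `missed_execution_dates` is a hypothetical helper that only models the rule from these notes; it is not how the Airflow scheduler is implemented.

```python
from datetime import datetime, timedelta


def missed_execution_dates(start_date, activated_at, interval):
    """execution_dates whose interval has fully elapsed before activation."""
    dates = []
    current = start_date
    while current + interval <= activated_at:
        dates.append(current)
        current += interval
    return dates


# DAG with start_date 2020-11-07, switched on at noon on 2020-11-10.
missed = missed_execution_dates(
    datetime(2020, 11, 7),
    datetime(2020, 11, 10, 12),
    timedelta(days=1),
)
```

With a daily schedule this already gives three pending runs; a start_date years in the past would give hundreds, which is why the notes recommend False unless the backlog is actually wanted.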

👀 CHECK

(Things to check again: anything difficult or newly learned)

openweathermap api

  • https://openweathermap.org/api/one-call-3
  • Access is not granted immediately after subscribing..
  • Existing code has to be changed from 2.5 to 3.0

Quiz


DAG writing assignment

Code : GitHub Link

Modifying the Incremental Update approach of UpdateSymbol_v2

  • Use the ROW_NUMBER approach learned earlier to handle records that share the same primary key

    
    alter_sql = f"""DELETE FROM {schema}.{table};
        INSERT INTO {schema}.{table}
        SELECT date, "open", high, low, close, volume FROM (
            SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
            FROM t
        )
        WHERE seq = 1;"""
    
  • To check that the incremental update actually ran, I copied the stock_info_v2 record holding yesterday's data and set its created_date to yesterday.

    [Screenshot]

  • Before the update

    [Screenshot]

  • After the update

    One new record has been added, and created_date has changed to the time the DAG ran.

    [Screenshot]

Transferring files between the server and the local machine with the GCP SDK

gcloud compute scp {option} {from_path} {to_path}

Download (server -> local)

  • Download every file in a folder on the server
gcloud compute scp --recurse "airflow-test":/var/lib/airflow/dags ~/github-repo/dags

Upload (local -> server)

  • Uploading requires permission -> root@
  • Upload a specific local file into a folder on the server
gcloud compute scp ~/github-repo/dags/{filename} root@"airflow-test":/var/lib/airflow/dags
  • The airflow user had no permission on the uploaded file, so write permission was granted with chmod 664 {filename}

Problem with VS Code Remote SSH

  • The GCP VM instance has 2GB of RAM, of which the airflow processes take about 1.3GB
  • The SSH server for VS Code also needs about 1GB
  • That is why, while connected through VS Code Remote SSH, things kept freezing and the airflow web server did not run properly
  • Decided to add RAM (it will not be used for long anyway, and it is on free credit, so that is fine)

❗ Thoughts

  • Build the habit of splitting code into functions and writing comments when solving algorithm problems
  • Think about / look up the following questions
    • Can the same function or task be called multiple times in a DAG?
    • Find out whether there are decorators other than @task
    • In the assignment, why is created_date generated when the new record is created instead of being carried over from the existing one?