Day 45 Airflow(4)

📋 What I Studied

Copying OLTP Tables

  • Production MySQL Tables (OLTP) -> AWS Redshift (OLAP)

  • MySQL Tables (Source) -> Airflow Server
    1. COPY command: -> S3 (cloud storage) -> Data Warehouse
    2. INSERT command: -> Data Warehouse

Setup for the Exercise

Permission Setup

Set up in the AWS account that owns the bucket and the DB server

1. S3 access from the Airflow DAG (write permission)

  • Create an IAM user
  • Grant the user Read/Write permission on the S3 bucket
  • Use the user's access key and secret key

  • Custom Policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetBucketLocation", "s3:ListAllMyBuckets"],
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::grepp-data-engineering",
                "arn:aws:s3:::grepp-data-engineering/*"
            ]
        }
    ]
}

2. S3 access from Redshift (read permission)

  • Create an IAM role that lets Redshift access S3, then attach it to the Redshift cluster

Airflow Connection Setup

  • S3: select the AWS connection type, then enter the access key ID and secret access key (enter the region in Extra)

  • MySQL: enter the host, schema, ID/password, and port
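
To double-check that these connections resolve before running the DAG, a minimal sketch (assuming Airflow 2.x; the connection IDs aws_conn_id, mysql_conn_id, and redshift_dev_db are the ones used in the task code below):

    from airflow.hooks.base import BaseHook

    # Look up each connection the DAG expects; an AirflowNotFoundException here
    # means the connection still needs to be created in the Airflow UI.
    for conn_id in ("aws_conn_id", "mysql_conn_id", "redshift_dev_db"):
        conn = BaseHook.get_connection(conn_id)
        print(conn_id, conn.conn_type, conn.host)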

DB Tables

MySQL(OLTP) Table

CREATE TABLE prod.nps (
    id INT NOT NULL AUTO_INCREMENT primary key,
    created_at timestamp,
    score smallint
);

Redshift(OLAP) Table

  • The table must be created in Redshift before the DAG runs
CREATE TABLE {schema}.nps (
    id INT NOT NULL primary key,
    created_at timestamp,
    score smallint
);

Full Refresh

tasks in DAG

Tasks are created by simply filling in parameter values on operators that already exist.
Incremental Update uses the same approach.

from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
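
For context, a minimal sketch of the DAG object and the variables the task snippets below refer to (the start date and schedule are placeholder assumptions; the bucket and key follow the {user_id}-nps pattern above):

    from datetime import datetime

    from airflow import DAG

    dag = DAG(
        dag_id="MySQL_to_Redshift",        # the DAG tested later with `airflow dags test`
        start_date=datetime(2023, 1, 1),   # placeholder start date
        schedule_interval="0 9 * * *",     # placeholder daily schedule
        catchup=False,                     # full refresh: no need to catch up on past runs
    )

    schema = "imsolem1226"                 # Redshift schema (the user's schema)
    table = "nps"
    s3_bucket = "grepp-data-engineering"
    s3_key = f"{schema}-{table}"           # e.g. imsolem1226-nps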

SqlToS3Operator

  • MySQL query result -> S3

  • Storage location: s3://s3_bucket/s3_key (s3://grepp-data-engineering/{user_id}-nps)

  • Task code

    mysql_to_s3_nps = SqlToS3Operator(
        task_id="mysql_to_s3_nps",
        query="SELECT * FROM prod.nps",
        s3_bucket=s3_bucket,
        s3_key=s3_key,       # {schema}-{table}
        sql_conn_id="mysql_conn_id",
        aws_conn_id="aws_conn_id",
        verify=False,        # skip SSL certificate verification
        replace=True,        # overwrite the file if it already exists
        pd_kwargs={"index": False, "header": False},  # write the CSV without index and header
        dag=dag,
    )
    

S3ToRedshiftOperator

  • S3 -> Redshift table (Redshift: {schema}.nps)

  • The COPY command is used

  • Task code

    s3_to_redshift_nps = S3ToRedshiftOperator(
        task_id="s3_to_redshift_nps",
        s3_bucket=s3_bucket,
        s3_key=s3_key,
        schema=schema,
        table=table,
        copy_options=["csv"],    # file format of the S3 object: csv
        method="REPLACE",        # REPLACE: overwrite the table if it already exists (full refresh)
        redshift_conn_id="redshift_dev_db",
        aws_conn_id="aws_conn_id",
        dag=dag,
    )
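
The two tasks still need to be chained; a one-line dependency (a sketch, assuming both tasks live in the same DAG file as above) does it:

    # run the MySQL -> S3 dump first, then the S3 -> Redshift COPY
    mysql_to_s3_nps >> s3_to_redshift_nps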
    

Incremental Update

MySQL/PostgreSQL Table Requirements

  • created (timestamp): optional
  • modified (timestamp)
  • deleted (boolean): instead of deleting the record, set deleted = True

Implementation Approaches

1. Implement directly with ROW_NUMBER

  • Copy Redshift table A into a temp_A table
  • Copy every record in MySQL table A whose modified date equals execution_date (the day being processed) into temp_A
    • Send the following query to MySQL, save the result to a file, upload it to S3, and run COPY
    SELECT *
        FROM A
        WHERE DATE(modified) = DATE(execution_date)
    
  • Partition the records in temp_A by primary key, then sort each partition by modified in descending order
  • Copy only the rows whose row number is 1 back into A (see the sketch after this list)
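
A rough sketch of the ROW_NUMBER de-duplication step as it could be run from a Python task (an illustration, not the course's exact query; A, temp_A, id, and modified are the generic names used above, and PostgresHook is assumed to work against the redshift_dev_db connection since Redshift speaks the Postgres protocol):

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    # Keep only the newest row per primary key: the row ranked 1 within each id
    # (ordered by modified DESC) replaces the full contents of A.
    DEDUP_SQL = """
    DELETE FROM A;
    INSERT INTO A
    SELECT id, created, modified, deleted
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY id ORDER BY modified DESC) AS seq
        FROM temp_A
    ) AS t
    WHERE seq = 1;
    """

    def dedup_table_a():
        hook = PostgresHook(postgres_conn_id="redshift_dev_db")
        hook.run(DEDUP_SQL, autocommit=True)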

2. Use UPSERT (used in the hands-on exercise)

Implemented with S3ToRedshiftOperator

  • query parameter: select only the rows changed on the target date (in the task code below, this filter is passed to SqlToS3Operator)
    SELECT *
        FROM A
        WHERE DATE(modified) = DATE(execution_date)
    
  • method parameter: "UPSERT"
  • upsert_keys parameter: the primary key
    • for the nps table, use the "id" field

tasks in DAG

SqlToS3Operator

  • Putting '{{ execution_date }}' into the SQL query lets the task use the execution date value that Airflow passes in
  • created_at is used here instead of modified because the table simply has no modified column
mysql_to_s3_nps = SqlToS3Operator(
    task_id="mysql_to_s3_nps",
    # the query fetches only the rows created on the execution date
    query="SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')",
    s3_bucket=s3_bucket,
    s3_key=s3_key,
    sql_conn_id="mysql_conn_id",
    aws_conn_id="aws_conn_id",
    verify=False,
    replace=True,
    pd_kwargs={"index": False, "header": False},
    dag=dag,
)

S3ToRedshiftOperator

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id="s3_to_redshift_nps",
    s3_bucket=s3_bucket,
    s3_key=s3_key,
    schema=schema,
    table=table,
    copy_options=["csv"],
    redshift_conn_id="redshift_dev_db",
    aws_conn_id="aws_conn_id",
    method="UPSERT",       # modify the table with UPSERT instead of replacing it
    upsert_keys=["id"],    # upsert based on this table's primary key
    dag=dag,
)

Running the Exercise

  • Test the Full Refresh DAG

    airflow dags test MySQL_to_Redshift
    
    • SQL query that gets executed
      COPY imsolem1226.nps
      FROM 's3://grepp-data-engineering/imsolem1226-nps'
      credentials
      'aws_access_key_id=xxx;aws_secret_access_key=xxx'
      csv;
      
    • After a successful run, check the data loaded into Redshift

  • Test the Incremental Update (Upsert) DAG

    airflow dags test MySQL_to_Redshift_v2
    
    • SQL queries that get executed
      COPY #nps
      FROM 's3://grepp-data-engineering/imsolem1226-nps'
      credentials
      'aws_access_key_id=xxx;aws_secret_access_key=xxx'
      csv;
      -- delete the rows in the target table whose id matches a row in the temp table (the duplicates),
      -- then insert the temp table's rows back in
      DELETE FROM imsolem1226.nps USING #nps WHERE nps.id = #nps.id;
      INSERT INTO imsolem1226.nps SELECT * FROM #nps;
      

Running a Backfill

  • When data has to be re-read many times: one run at a time vs. several runs at once
  1. If many requests hit the data source at the same time, it can cause problems on the source side
  • Distribute the load by setting up workers dedicated to data reads
  • Data writes go through the main (master) node
  2. If processes reading data for different dates run at the same time, problems such as overwriting each other's data can occur
  • Running one at a time is safer: set max_active_runs (a DAG parameter) to 1 (see the sketch below)

Command Line

airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
  • Set catchup to True
  • Incremental update is implemented using execution_date
  • The backfill runs from start_date, but end_date is not included
  • The run order is random (not in date/time order)
  • To run in date order, set depends_on_past to True in the DAG's default_args
    default_args = {
        'depends_on_past': True,
        ...
    }

How to Make Your DAG Backfill-ready

Not every DAG needs backfill

  • If the DAG does a Full Refresh, backfill is meaningless
  • Even with Incremental Update, if backfill is done based on the last-updated time recorded in the Data Warehouse table, an execution_date-based backfill is not needed

When backfill is needed and what it requires

  • Once the data gets very large, implementing backfill becomes essential
  • Using Airflow helps a lot here
  • Most important of all, the data source itself has to support backfill

How do you implement backfill?

  • Use execution_date to decide which data to update
  • Set the catchup field to True
  • Set start_date/end_date to the dates you want to backfill
  • The DAG must be written with execution_date in mind and must be idempotent (see the sketch below)
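
Putting the checklist together, a short sketch of how the nps pipeline above already satisfies it (the query string is the one from the incremental SqlToS3Operator task; the comments only restate what the earlier task code does):

    # 1. execution_date decides which rows get (re)loaded -- it is templated into the query.
    query = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"

    # 2. catchup=True on the DAG (or `airflow dags backfill -s ... -e ...`) creates one run
    #    per date in the chosen range.
    # 3. method="UPSERT" with upsert_keys=["id"] keeps each run idempotent: re-running the
    #    same date rewrites the same rows instead of duplicating them.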

👀 CHECK

(Things to review again: anything that was difficult or newly learned)

โ— ๋Š๋‚€ ์ 
