Day 51 Airflow(6)

📋 What I Studied

Airflow Docker Environment Setup

docker-compose.yml

  • `:-` : acts like a conditional, similar to an `if`

```yaml
environment:
  AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
  # If the environment variable does not exist (has not been set),
  # the names after :- are used as its value
  _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- yfinance pandas numpy oauth2client gspread}
```
  • How to check the DATA_DIR variable

    • The value is stored inside the Postgres named volume
    • It cannot be seen in the Web UI, but it can be checked with a command

```shell
docker exec -it {airflow-scheduler-container-name} airflow variables get DATA_DIR
```
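Airflow resolves the `AIRFLOW_VAR_<NAME>` naming convention by checking the environment before falling back to the metadata DB, which is why these values never show up in the Web UI. A minimal sketch of that lookup order in plain Python (the helper name is mine, not Airflow's; the fallback mirrors docker-compose's `${VAR:-default}` syntax):

```python
import os

def get_airflow_variable(key, default=None):
    # Airflow maps the variable "data_dir" to the env var
    # AIRFLOW_VAR_DATA_DIR (upper-cased, prefixed) and checks it first
    env_key = f"AIRFLOW_VAR_{key.upper()}"
    if env_key in os.environ:
        return os.environ[env_key]
    # ...real Airflow would now fall back to the metadata DB;
    # here we just return a default, like ${VAR:-default} in docker-compose
    return default

os.environ["AIRFLOW_VAR_DATA_DIR"] = "/opt/airflow/data"
print(get_airflow_variable("data_dir"))  # /opt/airflow/data
```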

  • How to manage and deploy configuration values (Variables, Connections)

```yaml
x-airflow-common:
  &airflow-common
  ...
  environment:
    &airflow-common-env
    AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
    AIRFLOW_CONN_TEST_ID: test_connection
```
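Connections defined through `AIRFLOW_CONN_<CONN_ID>` env vars are normally given as a URI such as `postgres://user:pass@host:5432/dbname`. A rough illustration of how such a URI decomposes into connection fields, using only the standard library (the field names follow Airflow's `Connection` attributes, but the parsing below is my own sketch, and the URI is a made-up example):

```python
from urllib.parse import urlparse

# A connection that could be set as an env var, e.g.:
# AIRFLOW_CONN_REDSHIFT_DEV_DB: postgres://myuser:mypass@host.example.com:5439/dev
uri = "postgres://myuser:mypass@host.example.com:5439/dev"

parsed = urlparse(uri)
conn = {
    "conn_type": parsed.scheme,          # postgres
    "login": parsed.username,            # myuser
    "password": parsed.password,         # mypass
    "host": parsed.hostname,             # host.example.com
    "port": parsed.port,                 # 5439
    "schema": parsed.path.lstrip("/"),   # dev (the database name)
}
print(conn["conn_type"], conn["host"], conn["port"])
```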

+) Instead of environment variables, a Secrets backend is sometimes used just for credentials - link

ELT Implementation

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_Redshift_connection():
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

def execSQL(**context):
    schema = context['params']['schema']
    table = context['params']['table']
    select_sql = context['params']['sql']

    ...

    cur = get_Redshift_connection()

    # drop the table if it exists, before running CTAS
    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """
    sql += select_sql
    cur.execute(sql)

    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")

    # Various checks (duplicate records, primary key uniqueness, etc.) can be run here
    # With DBT, you no longer have to write these tests by hand

    ...

# @once : run only once, not on a recurring schedule
dag = DAG(
    dag_id="Build_Summary",
    start_date=datetime(2021, 12, 10),
    schedule='@once',
    catchup=False
)

execsql = PythonOperator(
    task_id='mau_summary',
    python_callable=execSQL,
    params={
        'schema': 'imsolem1226',
        'table': 'mau_summary',
        'sql': """SELECT
  TO_CHAR(A.ts, 'YYYY-MM') AS month,
  COUNT(DISTINCT B.userid) AS mau
FROM raw_data.session_timestamp A
JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1
;"""
    },
    dag=dag
)
```
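The checks mentioned in the code comments (duplicate records, primary key uniqueness) usually boil down to simple COUNT queries against the temp table before it replaces the real one. A hedged sketch of what such a primary-key check could look like (the helper names are mine, not from the lecture code):

```python
def build_pk_uniqueness_sql(schema, table, pk_column):
    # If any pk value appears more than once, this query returns at least one row
    return (
        f"SELECT {pk_column}, COUNT(1) AS cnt "
        f"FROM {schema}.temp_{table} "
        f"GROUP BY 1 HAVING COUNT(1) > 1 LIMIT 1"
    )

def assert_pk_unique(cur, schema, table, pk_column):
    # cur is a DB-API cursor like the one returned by get_Redshift_connection()
    cur.execute(build_pk_uniqueness_sql(schema, table, pk_column))
    if cur.fetchone() is not None:
        raise ValueError(f"{schema}.{table}: duplicate values in {pk_column}")

print(build_pk_uniqueness_sql("imsolem1226", "mau_summary", "month"))
```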

Moving the CTAS SQL query into a config file

  • config/mau_summary.py
```python
# Given a .py extension so it can be used as a Python dictionary
{
    'table': 'mau_summary',
    'schema': 'imsolem1226',
    'main_sql': """SELECT ...;""",
    'input_check': [],
    'output_check': [],
}
```
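One common way to consume such config files (this is my sketch of the pattern, not the exact course code) is to evaluate every `.py` file in the config directory and generate one summary task per table:

```python
import glob
import os

def load_table_config(path):
    # Each config file holds a single Python dict literal,
    # so evaluating the file body yields the config dict
    with open(path) as f:
        return eval(f.read())

def load_all_configs(config_dir):
    # Collect one config dict per table, in a stable order
    configs = []
    for path in sorted(glob.glob(os.path.join(config_dir, "*.py"))):
        configs.append(load_table_config(path))
    return configs

# One task could then be created per config, e.g.:
# for cfg in load_all_configs("config"):
#     PythonOperator(task_id=f"{cfg['table']}_summary", ..., params=cfg, dag=dag)
```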

์˜คํผ๋ ˆ์ดํ„ฐ ๊ตฌํ˜„

Writing DAGs that integrate with external APIs

Google Sheets

Slack

  • Create an app and send messages via a Webhook
```shell
curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, Slack!"}' https://hooks.slack.com/services/{slack_webhook_url}
```
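The same webhook call can be made from Python, e.g. inside an `on_failure_callback`. A sketch using only the standard library (the helper name is mine; the URL placeholder is the same as in the curl example):

```python
import json
from urllib import request

def build_slack_request(webhook_url, text):
    # Same payload as the curl example: a JSON body with a "text" field
    payload = json.dumps({"text": text}).encode("utf-8")
    return request.Request(
        webhook_url,
        data=payload,
        headers={"Content-type": "application/json"},
    )

# req = build_slack_request("https://hooks.slack.com/services/{slack_webhook_url}", "Hello, Slack!")
# request.urlopen(req)  # actually sends the message
```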

Airflow API & Monitoring

👀 CHECK

(Things to review again: anything difficult, newly learned, etc.)

โ— ๋Š๋‚€ ์ 
