1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def get_Redshift_connection():
    """Open the ``redshift_dev_db`` Airflow connection and return a cursor on it.

    Despite the name, the return value is a cursor, not the connection object.
    The connection is obtained through ``PostgresHook.get_conn()`` and is not
    closed or committed here; that is left to the caller.
    """
    redshift_hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    connection = redshift_hook.get_conn()
    return connection.cursor()
def execSQL(**context):
    """Rebuild ``<schema>.temp_<table>`` from a SELECT and check it is non-empty.

    Reads ``schema``, ``table`` and ``sql`` from ``context['params']``. It drops
    any existing ``<schema>.temp_<table>`` and recreates it with
    ``CREATE TABLE ... AS <sql>`` (CTAS) on a Redshift cursor. It then counts
    the rows of the new table.

    Raises:
        ValueError: if the freshly built temp table contains no rows.
    """
    params = context['params']
    schema = params['schema']
    table = params['table']
    select_sql = params['sql']
    ...  # NOTE(review): placeholder in this copy -- presumably elided code
    cur = get_Redshift_connection()
    staging = f"{schema}.temp_{table}"
    # Drop the temp table if it already exists, then rebuild it via CTAS.
    cur.execute(f"DROP TABLE IF EXISTS {staging};CREATE TABLE {staging} AS " + select_sql)
    # Fail the task when the CTAS produced no rows.
    cur.execute(f"SELECT COUNT(1) FROM {staging}")
    row_count = cur.fetchone()[0]
    if row_count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")
    # Other checks (duplicate records, primary-key uniqueness, etc.) could be
    # run here. Using DBT removes the need to hand-write such tests.
    ...  # NOTE(review): placeholder in this copy -- presumably elided code
# Schedule '@once': the DAG is not run periodically; it runs a single time.
with DAG(
    dag_id="Build_Summary",
    start_date=datetime(2021, 12, 10),
    schedule='@once',
    catchup=False,
) as dag:
    # Rebuild imsolem1226.mau_summary via execSQL: the number of distinct
    # users per calendar month (YYYY-MM) across the raw_data session tables.
    execsql = PythonOperator(
        task_id='mau_summary',
        python_callable=execSQL,
        params={
            'schema': 'imsolem1226',
            'table': 'mau_summary',
            'sql': (
                "SELECT\n"
                "TO_CHAR(A.ts, 'YYYY-MM') AS month,\n"
                "COUNT(DISTINCT B.userid) AS mau\n"
                "FROM raw_data.session_timestamp A\n"
                "JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid\n"
                "GROUP BY 1\n"
                ";"
            ),
        },
    )