...

from train_model.main import run

local_tz = pendulum.timezone("Asia/Seoul")

...

dag = DAG(
    dag_id='train_model_dag',
    default_args=default_args,
    schedule_interval='30 9 * * *',  # 매일 09:30 실행
    catchup=False
)

process_task = PythonOperator(
    task_id='train_model',
    python_callable=run,
    dag=dag
)Copy Icon

 

위와 같이 간단한 dag을 구성 후 train_model의 run 함수에서 airflow에서 작업을 수행한 시간인 execution_date를 가져오고 싶었다.

 

간단하게 run함수에서 **kwargs를 받으면 쉽게 얻을 수 있다.

# train_model.py
def run(**kwargs):
	execution_date = kwargs['execution_date']
    
    # 이외에도 다양한 정보를 kwargs에서 꺼낼 수 있음
    # run_id = kwargs['run_id']
    # dag_run = kwargs['dag_run']
    # etc...
    
    ...Copy Icon

 

주의할 점으로 execution_date는 datetime이며 UTC를 기준으로 하고, time zone 정보가 포함되어 있다.

따라서 KST를 위해 9시간을 더하고 time zone 정보를 제거하고 사용하는 것이 좋다.

import datetime

...

execution_date_utc = kwargs['execution_date']
execution_date_ktc = run_time_utc + datetime.timedelta(hours=9)
execution_date = execution_date_ktc.replace(tzinfo=None)

...Copy Icon
욱근욱