...
from train_model.main import run
local_tz = pendulum.timezone("Asia/Seoul")
...
dag = DAG(
dag_id='train_model_dag',
default_args=default_args,
schedule_interval='30 9 * * *', # 매일 09:30 실행
catchup=False
)
process_task = PythonOperator(
task_id='train_model',
python_callable=run,
dag=dag
)
위와 같이 간단한 dag을 구성 후 train_model의 run 함수에서 airflow에서 작업을 수행한 시간인 execution_date를 가져오고 싶었다.
간단하게 run함수에서 **kwargs를 받으면 쉽게 얻을 수 있다.
# train_model.py
def run(**kwargs):
execution_date = kwargs['execution_date']
# 이외에도 다양한 정보를 kwargs에서 꺼낼 수 있음
# run_id = kwargs['run_id']
# dag_run = kwargs['dag_run']
# etc...
...
주의할 점으로 execution_date는 datetime이며 UTC를 기준으로 하고, time zone 정보가 포함되어 있다.
따라서 KST를 위해 9시간을 더하고 time zone 정보를 제거하고 사용하는 것이 좋다.
import datetime
...
execution_date_utc = kwargs['execution_date']
execution_date_ktc = run_time_utc + datetime.timedelta(hours=9)
execution_date = execution_date_ktc.replace(tzinfo=None)
...
'Develop > Python' 카테고리의 다른 글
[Python] 기상청 API Hub의 날씨 데이터 수집 (0) | 2025.02.21 |
---|---|
[Python] Window에 pyenv 설치 (0) | 2025.01.02 |
[Error] / [Pyinstaller] xgboost XGBoostLibraryNotFound error (1) | 2024.11.29 |
[Python] matplotlib 한글 폰트 전역 설정 (0) | 2024.08.20 |
[Python] 리스트의 원소 곱 - reduce (0) | 2024.01.11 |