airflow调度和触发是通知DAG周期性运行的机制。
schedule_interval参数是调度和触发的参数,有cron表达式和 datetime.timedelta
对象,另外也可以用preset。这几种的对照表如下:
preset | 含义 | cron字符串 |
---|---|---|
None | 没有调度器 | |
@once | 只调度一次 | |
@hourly | 一个小时调度一次,而且是整小时 | 0 * * * * |
@daily | 每天一次,而且是凌晨0点 | 0 0 * * * |
@weekly | 一周一次,星期天晚上0点 | 0 0 * * 0 |
@monthly | 一个月一次,每个月第一天的0点 | 0 0 1 * * |
@yearly | 每年一次,1月1日0点 | 0 0 1 1 * |
注意:如果不想调度,用schedule_interval=None
不是schedule_interval='None'
。
Airflow DAG的start_date(开始日期),end_date(结束日期),schedule_interval(调度间隔)定义了一系列时间间隔,调度器把DAG按这些间隔执行。调度器会检查DAG过去哪些间隔没有跑,这个机制是Catchup。
可以使用dag.catchup = False
或者配置文件中设置catchup_by_default = False
关掉Catchup。调度器只会触发当前之间之后的DAG任务。
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime(2015, 12, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@daily',
}
dag = DAG('tutorial', catchup=False, default_args=default_args)
注意
- DAG Run是基于最小start_date来创建DAG的任务。
- 调度器基于DAG的schedule_interval依次创建后面的DAG Runs。
- 当清除一些任务状态让任务重跑时,记住任务状态非常重要,因为调度器会检查触发任务后才会触发任务。
网友评论