20181014 Airflow tips

  1. DAG 中的参数 catchup,若设为 False,则启动后不会追溯之前的任务,如果 start time 在现在之前且符合至少一次任务,则会启动最近的一次任务;比如设定的时间为 "50 0 * * *",则是每天的 00:50 启动一次,
    • 假如现在是 2018-09-17 00:20:00,启动暂停的任务,会启动一个 Run id 为 scheduled__2018-09-15T00:50:00+00:00 的任务,因为在 2018-09-15 00:50:00 后过了一天,所以启动上一次的任务
    • 假如现在是 2018-09-17 00:51:00,则启动后会执行上一次,即 Run id 为 scheduled__2018-09-16T00:50:00+00:00 的任务
  2. 时间问题,可以设置默认时区,但是 templates 中返回的还是 UTC 时间,需要手动转换,所以还不如保持默认时区 UTC。当设置 start_time 的时区后,schedule_interval 的 cron 格式就是本地时区的时间了
  3. pyenv 与 Airflow 结合的问题。Airflow 中的 BashOperator 是通过 subprocess 实现的,其中传入的环境变量就是当前的运行时,所以无法和 pyenv 搭配使用,比如进到某个路径下自动用相应的 local env。如果调用的 Python 版本各不相同,暂时的解决方案是直接使用绝对路径的 Python
  4. if else 判断?BranchPythonOperator
  5. 如何传递 execution date?使用 templates,或 trigger 时设定 dag_run.conf
  6. 在 PythonOperator 中设 provide_context=True,则定义 callable 时加上 **kwargs,会自动填入 jinja templates 中的信息,如 ds 等,从而在代码中方便调用 kwargs['ds']
  7. 更新 DAG 后是否重新命名?建议 id 增加 v1,v2
  8. BashOperator 中如果直接指定的是可执行脚本文件,后面需要加一个空格, from Common Pitfalls
  9. 登录账号密码?Web Authentication,注意从一个数据库比如 MySQL 迁移到 PostgreSQL 而没有迁移数据时,需要重新设置账号密码
  10. 假如需要一个 dag 完成之后才进行下一时间的 dag,即同时只有一个 dag 在运行:
    • 可以试着看 wait_for_downstream 是否适用,
    • 或者用 ExternalTaskSensor. from this url;
    • 或者使用 xcom 保存 mutex 信息,和上一条原理相同,都是检测一个之前的 dag 正在运行的标志;from this url
    • 或者通过查询表 dag_run 中的相应条目
    • 或者更加直接地使用 max_active_run = 1
  11. how to kill task? 运行中任务点击 clear 是杀死任务,当前任务失败,并且整个 DAG 失败,后续 trigger rule 为 one_failed 的任务也不会触发;失败任务 clear 会导致重试
  12. 当需要一个 DAG 触发另一个 DAG,而不是将这两个连成一个大的 DAG 时,

    • 可以使用 TriggerDagRunOperator;即使被触发的 DAG 的时间小于设定的 start_date,也会执行;但如果被暂停了,就不会执行,会累计在队列中,此时开启暂停,会执行所有积累的任务
    • 传入 execution_date 时需要注意相同的 dag_id 的多个任务的 execution_date 不能相同,因为写入 table dag_run 时会报错,比如:sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, “Duplicate entry ‘d9_subs-2018-09-28 07:44:00.000000’ for key ‘dag_id’”);这是因为 table dag_run 设定了 UNIQUE KEY dag_id (dag_id,execution_date), 所以同一 dag_id 的 execution_date 不能相同
    • 但不传入 execution_date 时,会导致被触发的 DAG 的实际执行时间是 timezone.utcnow(),而获取 execution_date 又很麻烦,比如需要继承并修改 template_fields 等。所以如果需要两个 DAG execution_date 相同,可以使用 PythonOperator,利用 provide_context 获取时间,然后直接调用 TriggerDagRunOperator 中使用的方法,如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      def trigger_external(**kwargs):
      trigger_dag(dag_id='external1',
      run_id='trig__' + timezone.utcnow().isoformat(),
      conf='null',
      execution_date=kwargs['execution_date'],
      replace_microseconds=False)

      trigger = PythonOperator(task_id='trigger',
      python_callable=trigger_external,
      provide_context=True)
  13. 可以在执行的 task 的 Web UI 中的 Rendered Template 中看到实际经过渲染的命令,可用于任务失败的排查,当然日志中也会打印执行的实际命令;也可以通过 CLI:airflow render dag_id task_id date,可用于执行前的命令验证

  14. Operator 的返回值(如果不是 None)会放在 XCom 中,底层是在数据库的 xcom table 中,所以不同的 task 之间传递数据,即使下游任务失败,重启下游后还能获得相同的值,不必重新执行上游;下游获取时默认 dag_id = self.dag_id, key = ‘return_value’ 和 current execution_date

jinja2 使用的 context 由 TaskInstance.get_template_context() 获得
需要关注的一些 variables:

  • dag 对应 class DAG
  • dag_run 对应 class DagRun
  • conf 对应 class AirflowConfigParser,是解析后的 airflow.cfg,可用 as_dict() 查看全部内容;注意这个不是 DagRun 的 conf
  • params = task.dag.params + task.params;每个 task 的 params 都是定义的 DAG + Operator 的 params,不能由上一个 task 设定 params 然后下一个 task 读取,即不能用 params 在 task 间传递值
  • ti 对应 TaskInstance

给 BashOperator 中的 bash_command 传值:

  • 可以使用 env,注意复制原有环境,然后再使用比如 $FOO;如果是调用脚本文件,且文件没有使用 Airflow 的 marcos,则只能用这种方法传入环境变量
  • 可以用 DAG user_defined_macros,利用 jinja templates;比 env 的方法好在 render 之后可以看到实际执行代码

以前用 bash 执行任务,因为是定时检测上游数据,希望一个日期的任务不要被多次触发,使用的方法是创建一个 mutex 文件标志当前日期文件正在运行中;在 Airflow 中,加入检测和执行是两个 DAG,也可以通过数据库,比如使用 xcom,在任务执行前检测是否已有任务正在执行并插入,执行完成后删除,但是这样假如任务失败就会使得 mutex 文件保留在数据库中,下一次执行的时候需要手动删除,这和在 bash 脚本执行的时候遇到的情况相同。
另一种方法就是利用 Airflow 中 dag_run 这张表,里面记录了 DAG 每次运行情况及状态,在触发 DAG 的时候可以设定其 run_id,将任务日期加在后面,然后每次检测 running 状态的 run_id 是否日期相同即可。这样就不用手动删除 mutex 记录。

而要直接操作数据库,就涉及到 Hook 和 Connection 了。直接在 Airflow 的 Web UI Connection 中设定相应的账号密码,则在代码中可以直接使用对应的 Hook conn_id 获取相应的数据库数据。