airflow-分布式调度

我的问题:

公司数据部门频繁清洗数据中

  1. 脚本繁多, 前后依赖
  2. 监控缺失,脚本执行时间管理几乎为0,脚本优化无针对性目标
  3. 重试机制缺失,任务超时而下个周期又被触发

什么是airflow

​ 基于python, 联合其他工具实现的的分布式任务调度解决方案。主要涉及工具:airflow, celery, db, mq

优点:

  • DAG(有向无环图)任务依赖清晰,任务统计信息明确,UI展示出众🌟
  • 配合celery实现分布式调度,并发控制、队列隔离task。。。且无需关注celery实现
  • 丰富的任务定义组件SSHOperator,PostgresOperator 等等 简单易用
  • 重试、超时机制
  • 连接池

缺点:

  • 官方UI,只支持UTC时间。展示不直观,非常难看

有经验的人也许会奇怪,celery本身就是个分布式任务队列了,那么airflow又干了啥。

举个例子吧,就像springboot与spring,发动机与汽车。你干的不错,但是我可以做到更丰富易用。

快捷搭建

DAG

任务依赖 一览无余

执行时间 监控

并发控制(concurrency)

retry

web security

官方自带方案,不过暂时没管。超级用户一时爽,一直超级一直爽!!!

⚠️注意

任务实际开始时间 = start_date + sheduler_interval

最优方案

结合celery特性,worker模式

在实际执行命令的机器,部署worker。消费执行队列的消息。

优势:

  • BashOperator 直接调用(小声逼逼:尽量使用bash直接操作,避免bash调用http等外部服务
  • 限制worker并发数
  • 限制任务内存占用

QA

  1. UI时间不能换时区问题

    A:修改源码首页,登录后台页

  2. log过大,导致磁盘空间不足

    A:写个job定时清理log目录,log目录在airflow.cfg中有定义

  3. timeout作用

    A:超时kill掉任务,在operator的on_kill方法实现。目前只支持bashOperator(其他task始终保持running)

    A: 超时结束工作流,这个始终有效。但是会出现一种情况,工作流结束,子任务是中running

  4. BashOperator 自动结束逻辑

    A:开启子进程执行脚本,保存子进程组id,根据id kill

  5. sql timeout实现

    A: 修改conn hook源码

  6. alter告警发送到钉钉

    A:默认发送邮件,自定义operator发送到钉钉