airflow-分布式调度
我的问题:
公司数据部门频繁清洗数据中
- 脚本繁多, 前后依赖
- 监控缺失,脚本执行时间管理几乎为0,脚本优化无针对性目标
- 重试机制缺失,任务超时而下个周期又被触发
什么是airflow
基于python, 联合其他工具实现的的分布式任务调度解决方案。主要涉及工具:airflow, celery, db, mq
优点:
- DAG(有向无环图)任务依赖清晰,任务统计信息明确,UI展示出众🌟
- 配合celery实现分布式调度,并发控制、队列隔离task。。。且无需关注celery实现
- 丰富的任务定义组件
SSHOperator
,PostgresOperator
等等 简单易用 - 重试、超时机制
- 连接池
缺点:
- 官方UI,只支持UTC时间。展示不直观,非常难看
有经验的人也许会奇怪,celery本身就是个分布式任务队列了,那么airflow又干了啥。
举个例子吧,就像springboot与spring,发动机与汽车。你干的不错,但是我可以做到更丰富易用。
DAG
任务依赖 一览无余
执行时间 监控
并发控制(concurrency)
retry
web security
官方自带方案,不过暂时没管。超级用户一时爽,一直超级一直爽!!!
⚠️注意
任务实际开始时间 = start_date + sheduler_interval
最优方案
结合celery特性,worker模式
在实际执行命令的机器,部署worker。消费执行队列的消息。
优势:
BashOperator
直接调用(小声逼逼:尽量使用bash直接操作,避免bash调用http等外部服务)- 限制worker并发数
- 限制任务内存占用
QA
UI时间不能换时区问题
A:修改源码首页,登录后台页
log过大,导致磁盘空间不足
A:写个job定时清理log目录,log目录在
airflow.cfg
中有定义timeout作用
A:超时kill掉任务,在operator的
on_kill
方法实现。目前只支持bashOperator
(其他task始终保持running
)A: 超时结束工作流,这个始终有效。但是会出现一种情况,工作流结束,子任务是中
running
BashOperator
自动结束逻辑A:开启子进程执行脚本,保存子进程组id,根据id kill
sql timeout实现
alter告警发送到钉钉
A:默认发送邮件,自定义operator发送到钉钉