Troubleshooting an Airflow Task-Loss Incident
Problem Description
Preconditions:
- Airflow version 1.10.4 (a fairly old release)
- Airflow Scheduler and Airflow Worker communicate through Celery, with RabbitMQ as the broker
- Deployed on an EMR environment
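For context, the pieces involved can be confirmed from the Airflow configuration. A hedged sketch (the config path and the values shown are assumptions; the keys are the usual [core]/[celery] settings in 1.10):

[XXXX@EMR-XXX]$ grep -E '^(executor|broker_url|result_backend)' "$AIRFLOW_HOME/airflow.cfg"
# expect something along the lines of:
#   executor = CeleryExecutor
#   broker_url = amqp://<user>:<pass>@<rabbitmq-host>:5672//
#   result_backend = db+mysql://<user>:<pass>@<db-host>/airflow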
Symptoms:
In the Airflow WebServer UI, 4 tasks sit in the queued state and never move on to running. The Airflow Scheduler logs read:
[2022-06-13 22:20:09,899] {scheduler_job.py:934} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 4 task instances ready to be queued
[2022-06-13 22:20:09,899] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 0/10 running and queued tasks
[2022-06-13 22:20:09,899] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 1/10 running and queued tasks
[2022-06-13 22:20:09,900] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 2/10 running and queued tasks
[2022-06-13 22:20:09,900] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 3/10 running and queued tasks
[2022-06-13 22:20:09,908] {scheduler_job.py:1005} INFO - Setting the follow tasks to queued state:
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_001 }} 2022-06-13 13:20:00+00:00 [scheduled]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_002 }} 2022-06-13 13:20:00+00:00 [scheduled]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_003 }} 2022-06-13 13:20:00+00:00 [scheduled]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_004 }} 2022-06-13 13:20:00+00:00 [scheduled]>
[2022-06-13 22:20:09,916] {scheduler_job.py:1080} INFO - Setting the following 4 tasks to queued state:
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_001 }} 2022-06-13 13:20:00+00:00 [queued]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_002 }} 2022-06-13 13:20:00+00:00 [queued]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_003 }} 2022-06-13 13:20:00+00:00 [queued]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_004 }} 2022-06-13 13:20:00+00:00 [queued]>
// the logs below are unrelated
....
{{ DAG_ID }} and {{ TaskInstanceID_00x }} are placeholders. The logs show that the tasks reached the scheduled state and were then flipped to queued, but they stubbornly refuse to enter the running state.
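Before digging further, it is worth confirming from the metadata database that the task instances really are stuck. A minimal sketch, assuming a MySQL metadata database named airflow (credentials omitted) and the placeholder ids from the logs:

[XXXX@EMR-XXX]$ mysql airflow -e "SELECT task_id, execution_date, state FROM task_instance WHERE dag_id = '{{ DAG_ID }}' AND state IN ('scheduled', 'queued');"
# the 1.10 CLI can also report a single task instance's state
[XXXX@EMR-XXX]$ airflow task_state {{ DAG_ID }} {{ TaskInstanceID_001 }} 2022-06-13T13:20:00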
Troubleshooting
- Step 1: the obvious suspect was that the airflow worker had died. A check of the processes showed that every airflow worker node was up, and the airflow worker logs showed nothing wrong either, so the worker nodes were ruled out. (The workers can also be pinged at the Celery level; see the sketch below.)

[XXXX@EMR-XXX]$ sudo systemctl list-units | grep airflow
airflow-worker.service      loaded active running   Airflow celery worker daemon
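systemd only proves that the worker daemons are running; to double-check at the Celery layer, the workers can be pinged through the Celery app that Airflow 1.10 defines in airflow.executors.celery_executor. A hedged sketch (worker host names will differ):

[XXXX@EMR-XXX]$ celery -A airflow.executors.celery_executor inspect ping
# healthy workers reply with something like:
#   -> celery@<worker-host>: OK
#           pong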
- Step 2: confirm that the airflow scheduler is healthy. While pulling the scheduler logs in step 1 there were no error or warn entries either, so the scheduler-side services were confirmed to be fine.

[XXXX@EMR-XXX]$ sudo systemctl list-units | grep airflow
airflow-flower.service      loaded active running   Airflow celery flower daemon
airflow-scheduler.service   loaded active running   Airflow scheduler daemon
airflow-webserver.service   loaded active running   Airflow webserver daemon
- Step 3: check the size of slots, used slots and queued slots. They were also normal, all free, and the whole cluster was sitting idle. (A way to cross-check the slot numbers against the metadata database is sketched after the restart log below.)
- Step 4: in short, no obvious problem had been found, so the only option left was to restart the scheduler component, let the tasks be scheduled again, and watch the logs:

[2022-06-13 22:47:09,406] {scheduler_job.py:934} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 4 task instances ready to be queued
[2022-06-13 22:47:09,406] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 0/10 running and queued tasks
[2022-06-13 22:47:09,407] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 1/10 running and queued tasks
[2022-06-13 22:47:09,407] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 2/10 running and queued tasks
[2022-06-13 22:47:09,407] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 3/10 running and queued tasks
[2022-06-13 22:47:09,424] {scheduler_job.py:1005} INFO - Setting the follow tasks to queued state:
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_001 }} 2022-06-13 13:20:00+00:00 [scheduled]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_002 }} 2022-06-13 13:20:00+00:00 [scheduled]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_003 }} 2022-06-13 13:20:00+00:00 [scheduled]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_004 }} 2022-06-13 13:20:00+00:00 [scheduled]>
[2022-06-13 22:47:09,455] {scheduler_job.py:1080} INFO - Setting the following 4 tasks to queued state:
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_001 }} 2022-06-13 13:20:00+00:00 [queued]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_002 }} 2022-06-13 13:20:00+00:00 [queued]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_003 }} 2022-06-13 13:20:00+00:00 [queued]>
<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_004 }} 2022-06-13 13:20:00+00:00 [queued]>
[2022-06-13 22:47:09,455] {scheduler_job.py:1116} INFO - Sending ('{{ DAG_ID }}', '{{ TaskInstanceID_001 }}', datetime.datetime(2022, 6, 13, 13, 20, tzinfo=<Timezone [UTC]>), 2) to executor with priority 1 and queue default
[2022-06-13 22:47:09,456] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', '....']
[2022-06-13 22:47:09,456] {scheduler_job.py:1116} INFO - Sending ('{{ DAG_ID }}', '{{ TaskInstanceID_002 }}', datetime.datetime(2022, 6, 13, 13, 20, tzinfo=<Timezone [UTC]>), 2) to executor with priority 1 and queue default
[2022-06-13 22:47:09,456] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', '....']
[2022-06-13 22:47:09,456] {scheduler_job.py:1116} INFO - Sending ('{{ DAG_ID }}', '{{ TaskInstanceID_003 }}', datetime.datetime(2022, 6, 13, 13, 20, tzinfo=<Timezone [UTC]>), 2) to executor with priority 1 and queue default
[2022-06-13 22:47:09,457] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', '....']
[2022-06-13 22:47:09,457] {scheduler_job.py:1116} INFO - Sending ('{{ DAG_ID }}', '{{ TaskInstanceID_004 }}', datetime.datetime(2022, 6, 13, 13, 20, tzinfo=<Timezone [UTC]>), 2) to executor with priority 1 and queue default
[2022-06-13 22:47:09,457] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', '....']
....
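For step 3, the slot numbers shown in the UI can be cross-checked directly against the metadata database. A rough sketch, again assuming a MySQL metadata database named airflow:

[XXXX@EMR-XXX]$ mysql airflow -e "SELECT pool, slots FROM slot_pool;"
[XXXX@EMR-XXX]$ mysql airflow -e "SELECT state, COUNT(*) FROM task_instance WHERE pool = 'default_pool' AND state IN ('running', 'queued') GROUP BY state;"

Comparing the two shows how many of the pool's slots are actually occupied by running or queued task instances.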
OK, now something stands out: this time the logs do contain records of the messages being sent [scheduler -> celery -> worker].
- Step 5: so the next suspicion was that celery itself had a problem. We use the RabbitMQ broker, and checking the RabbitMQ node status in the web management console showed an unhealthy node: its memory usage would hit the red line one moment and drop back to green the next. The RabbitMQ node logs confirmed it with warnings:

**********************************************************
*** Publishers will be blocked until this alarm clears ***
**********************************************************
2022-06-13 08:44:31.754 [warning] <0.3227.3> Resetting RabbitMQ management storage
2022-06-13 08:44:40.205 [info] <0.346.0> vm_memory_high_watermark clear. Memory used:2487980032 allowed:3167651430
2022-06-13 08:44:40.205 [warning] <0.344.0> memory resource limit alarm cleared on node rabbit@rabbitmq1
2022-06-13 08:44:40.205 [warning] <0.344.0> memory resource limit alarm cleared across the cluster
RabbitMQ node 1 (rabbit@rabbitmq1) had hit the memory high-watermark, so the pressure had to be relieved: the watermark was raised from the default 0.4 to 0.6, after which the RabbitMQ node was back to normal. (A sketch of the relevant commands follows step 6.)
- Step 6: watching the Flower UI again, the Celery worker nodes now all stayed online instead of flipping between online and offline as before, so they were confirmed healthy.
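For reference, a hedged sketch of how the memory alarm can be inspected and the watermark raised on the RabbitMQ node (the 0.6 value mirrors the change described above; a runtime change does not survive a broker restart):

# memory usage and any active alarms show up in the node status
[XXXX@EMR-XXX]$ sudo rabbitmqctl status
# raise the high watermark at runtime
[XXXX@EMR-XXX]$ sudo rabbitmqctl set_vm_memory_high_watermark 0.6
# to persist it, set vm_memory_high_watermark.relative = 0.6 in rabbitmq.conf (newer config format)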
- Step 7: reset the tasks and run them again; this time they executed normally. [Will keep observing going forward... A sketch of the reset command follows this list.]
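A hedged sketch of the reset, using the Airflow 1.10 CLI with the placeholder ids from the logs above (clearing the task instances hands them back to the scheduler):

[XXXX@EMR-XXX]$ airflow clear {{ DAG_ID }} \
    -t '{{ TaskInstanceID_001 }}|{{ TaskInstanceID_002 }}|{{ TaskInstanceID_003 }}|{{ TaskInstanceID_004 }}' \
    -s 2022-06-13T13:20:00 -e 2022-06-13T13:20:00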
Summary
Overall, this was a task-loss problem caused by RabbitMQ's vm_memory_high_watermark memory mechanism: the alarm blocked publishing clients, the Celery task messages could not be sent out, and the airflow worker nodes never received them. The intermittent behaviour (sometimes messages went out, sometimes they did not) matches the alarm flapping: whenever memory dropped back below the watermark or was freed, clients were unblocked for a while, and as soon as the watermark was hit again they were blocked again.
Open Questions
- Why doesn't the airflow scheduler / Celery raise an exception and log it when a message is not sent or the send goes wrong? Or is it precisely because the connection is only blocked that there is nothing to throw? Pinning down the root cause will require reading the Celery and Airflow source code.
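One hedged note from the RabbitMQ side that may partly answer this: the memory alarm does not reject publishes or return an AMQP error; the broker simply stops reading from publishing connections (and notifies clients that support the connection.blocked extension), so from the client's point of view basic_publish either stalls or appears to have succeeded. That blocking is visible on the broker, for example:

# connections throttled by the alarm report a blocked / blocking state
[XXXX@EMR-XXX]$ sudo rabbitmqctl list_connections name state | grep -iE 'blocked|blocking'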