A Case of Lost Airflow Tasks

Problem Description

Environment:

  • Airflow 1.10.4 (a fairly old release)
  • Airflow Scheduler and Airflow Workers communicate through Celery, with RabbitMQ as the broker
  • Deployed on EMR

Symptoms:

  • The Airflow WebServer UI shows 4 tasks stuck in the queued state, never entering running (see screenshot: task_in_queued)
  • The Airflow Scheduler log reads:
[2022-06-13 22:20:09,899] {scheduler_job.py:934} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 4 task instances ready to be queued
[2022-06-13 22:20:09,899] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 0/10 running and queued tasks
[2022-06-13 22:20:09,899] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 1/10 running and queued tasks
[2022-06-13 22:20:09,900] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 2/10 running and queued tasks
[2022-06-13 22:20:09,900] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 3/10 running and queued tasks
[2022-06-13 22:20:09,908] {scheduler_job.py:1005} INFO - Setting the follow tasks to queued state:
	<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_001 }} 2022-06-13 13:20:00+00:00 [scheduled]>
	<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_002 }} 2022-06-13 13:20:00+00:00 [scheduled]>
	<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_003 }} 2022-06-13 13:20:00+00:00 [scheduled]>
	<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_004 }} 2022-06-13 13:20:00+00:00 [scheduled]>
[2022-06-13 22:20:09,916] {scheduler_job.py:1080} INFO - Setting the following 4 tasks to queued state:
	<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_001 }} 2022-06-13 13:20:00+00:00 [queued]>
	<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_002 }} 2022-06-13 13:20:00+00:00 [queued]>
	<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_003 }} 2022-06-13 13:20:00+00:00 [queued]>
	<TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_004 }} 2022-06-13 13:20:00+00:00 [queued]>
// the log lines below are irrelevant
....

{{ DAG_ID }} and {{ TaskInstanceID_001 }} etc. are placeholders. The log shows the tasks entering the scheduled state and then flipping to queued, but they stubbornly never move on to running.
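
As a cross-check against the UI, the stuck instances can also be read straight from the metadata database. A minimal sketch, assuming the Airflow 1.10.x Python API on a machine where airflow is already configured:

    # Minimal sketch (Airflow 1.10.x): list every task instance
    # currently parked in the "queued" state.
    from airflow import settings
    from airflow.models import TaskInstance
    from airflow.utils.state import State

    session = settings.Session()
    for ti in session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED):
        print(ti.dag_id, ti.task_id, ti.execution_date, ti.state)
    session.close()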

Troubleshooting

  • Step 1: the natural suspect was that the airflow workers had died. Checking the processes showed every airflow worker node alive and running, and the worker logs showed nothing abnormal either, so the workers were ruled out.
    [XXXX@EMR-XXX]$ sudo systemctl list-units | grep airflow
    airflow-worker.service               loaded active running   Airflow celery worker daemon
    
  • Step 2: confirm the airflow scheduler was healthy. The scheduler logs pulled in step 1 contained no ERROR or WARN entries either, so the scheduler-side services were confirmed fine.
    [XXXX@EMR-XXX]$ sudo systemctl list-units | grep airflow
    airflow-flower.service              loaded active running   Airflow celery flower daemon
    airflow-scheduler.service           loaded active running   Airflow scheduler daemon
    airflow-webserver.service           loaded active running   Airflow webserver daemon
    
  • Step 3: check the pool's slots, used slots, and queued slots. All were normal and essentially free, and the whole cluster was idle (a scripted version of this check is sketched right after this list).
  • Step 4: with no obvious culprit found, the only option left was to restart the scheduler component so the tasks would be picked up for scheduling again, and then watch the logs:
    [2022-06-13 22:47:09,406] {scheduler_job.py:934} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 4 task instances ready to be queued
    [2022-06-13 22:47:09,406] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 0/10 running and queued tasks
    [2022-06-13 22:47:09,407] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 1/10 running and queued tasks
    [2022-06-13 22:47:09,407] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 2/10 running and queued tasks
    [2022-06-13 22:47:09,407] {scheduler_job.py:961} INFO - DAG {{ DAG_ID }} has 3/10 running and queued tasks
    [2022-06-13 22:47:09,424] {scheduler_job.py:1005} INFO - Setting the follow tasks to queued state:
        <TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_001 }} 2022-06-13 13:20:00+00:00 [scheduled]>
        <TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_002 }} 2022-06-13 13:20:00+00:00 [scheduled]>
        <TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_003 }} 2022-06-13 13:20:00+00:00 [scheduled]>
        <TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_004 }} 2022-06-13 13:20:00+00:00 [scheduled]>
    [2022-06-13 22:47:09,455] {scheduler_job.py:1080} INFO - Setting the following 4 tasks to queued state:
        <TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_001 }} 2022-06-13 13:20:00+00:00 [queued]>
        <TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_002 }} 2022-06-13 13:20:00+00:00 [queued]>
        <TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_003 }} 2022-06-13 13:20:00+00:00 [queued]>
        <TaskInstance: {{ DAG_ID }}.{{ TaskInstanceID_004 }} 2022-06-13 13:20:00+00:00 [queued]>
    [2022-06-13 22:47:09,455] {scheduler_job.py:1116} INFO - Sending ('{{ DAG_ID }}', '{{ TaskInstanceID_001 }}', datetime.datetime(2022, 6, 13, 13, 20, tzinfo=<Timezone [UTC]>), 2) to executor with priority 1 and queue default
    [2022-06-13 22:47:09,456] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', '....']
    [2022-06-13 22:47:09,456] {scheduler_job.py:1116} INFO - Sending ('{{ DAG_ID }}', '{{ TaskInstanceID_002 }}', datetime.datetime(2022, 6, 13, 13, 20, tzinfo=<Timezone [UTC]>), 2) to executor with priority 1 and queue default
    [2022-06-13 22:47:09,456] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', '....']
    [2022-06-13 22:47:09,456] {scheduler_job.py:1116} INFO - Sending ('{{ DAG_ID }}', '{{ TaskInstanceID_003 }}', datetime.datetime(2022, 6, 13, 13, 20, tzinfo=<Timezone [UTC]>), 2) to executor with priority 1 and queue default
    [2022-06-13 22:47:09,457] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', '....']
    [2022-06-13 22:47:09,457] {scheduler_job.py:1116} INFO - Sending ('{{ DAG_ID }}', '{{ TaskInstanceID_004 }}', datetime.datetime(2022, 6, 13, 13, 20, tzinfo=<Timezone [UTC]>), 2) to executor with priority 1 and queue default
    [2022-06-13 22:47:09,457] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', '....']
    ....
    

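The slot check from step 3 can be scripted instead of eyeballed in the UI. A minimal sketch, assuming the 1.10.x Pool model (the slot-accessor names may vary slightly across 1.10 minor versions):

    # Minimal sketch (Airflow 1.10.x): print slot usage for default_pool.
    from airflow import settings
    from airflow.models import Pool

    session = settings.Session()
    pool = session.query(Pool).filter(Pool.pool == "default_pool").one()
    print("open slots:", pool.open_slots(session=session))
    print("used slots:", pool.used_slots(session=session))
    print("queued slots:", pool.queued_slots(session=session))
    session.close()
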
Back to the restart logs: now we have a lead. This time there are log entries recording the message hand-off ( scheduler -> celery -> worker ).

  • Step 5: so suspicion moved one hop down, to Celery. We run it against RabbitMQ, so we checked the Rabbit nodes. The web management console showed one RabbitMQ node misbehaving: its memory usage kept spiking to the red line, then dropping back to green. The RabbitMQ node log contained this warning:
    ***********************************************************
    *** Publishers will be blocked until this alarm clears ***
    ***********************************************************
    2022-06-13 08:44:31.754 [warning] <0.3227.3> Resetting RabbitMQ management storage
    2022-06-13 08:44:40.205 [info] <0.346.0> vm_memory_high_watermark clear. Memory used:2487980032 allowed:3167651430
    2022-06-13 08:44:40.205 [warning] <0.344.0> memory resource limit alarm cleared on node rabbit@rabbitmq1
    2022-06-13 08:44:40.205 [warning] <0.344.0> memory resource limit alarm cleared across the cluster
    
    This confirmed that RabbitMQ node 1 was hitting the memory high watermark and the limit needed raising: vm_memory_high_watermark defaults to 0.4 (40% of system RAM), and we raised it to 0.6 (e.g. via rabbitmqctl set_vm_memory_high_watermark 0.6). The RabbitMQ node then returned to normal. (A sketch for watching this alarm programmatically follows this list.)
  • Step 6: watched the Flower UI again. The Celery worker nodes were now all online, no longer flapping between online and offline as before, so they were confirmed healthy.
  • Step 7: reset the tasks and ran them again; they executed normally. (We will keep observing.)
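
For steps 5 and 6, the flapping memory alarm can also be watched through the RabbitMQ management HTTP API rather than the web console. A minimal polling sketch; the host and credentials below are placeholders:

    # Minimal sketch: poll /api/nodes and print each node's memory alarm state.
    # "mem_alarm" flips to true whenever mem_used exceeds mem_limit
    # (total memory * vm_memory_high_watermark); while true, publishers are blocked.
    import time
    import requests

    API = "http://rabbitmq1:15672/api/nodes"  # placeholder host
    AUTH = ("guest", "guest")                 # placeholder credentials

    while True:
        for node in requests.get(API, auth=AUTH, timeout=10).json():
            print(node["name"],
                  "mem_used=%s" % node["mem_used"],
                  "mem_limit=%s" % node["mem_limit"],
                  "mem_alarm=%s" % node["mem_alarm"])
        time.sleep(30)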

Summary

In short, this incident came down to RabbitMQ's vm_memory_high_watermark memory protection: once the alarm fired, publishing clients were blocked, Celery messages could not go out, and the airflow worker nodes never received the tasks, which is how the tasks got "lost". The intermittent pattern (sends sometimes worked, sometimes not) matches the alarm flapping: whenever memory was freed or dropped back below the watermark, clients were unblocked for a while; as soon as the watermark was hit again, they were blocked again.
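
What "blocked" means here is worth making concrete. Celery talks to RabbitMQ through py-amqp rather than pika, but the broker-side behavior is the same for any AMQP client, so here is an illustrative pika sketch (the broker host is a placeholder):

    # Illustrative sketch of RabbitMQ publisher blocking. When the memory
    # alarm is set, the broker sends connection.blocked and stops reading
    # from the client socket: a publish neither completes nor raises, it
    # simply stalls until the alarm clears.
    import pika

    params = pika.ConnectionParameters(
        host="rabbitmq1",                # placeholder broker host
        blocked_connection_timeout=300,  # raise after 5 min instead of hanging forever
    )
    conn = pika.BlockingConnection(params)
    conn.add_on_connection_blocked_callback(
        lambda _conn, method: print("blocked by broker:", method.method.reason))
    conn.add_on_connection_unblocked_callback(
        lambda _conn, _method: print("unblocked by broker"))

    channel = conn.channel()
    channel.basic_publish(exchange="", routing_key="default", body=b"task payload")
    conn.close()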

Open Questions

  • Why doesn't the airflow scheduler (or Celery) raise an exception and log it when a message fails to go out or stalls? Presumably because the client is merely blocked rather than erroring, nothing is ever thrown; pinning down the root cause would require reading the celery and airflow source (see the sketch below).
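
From memory of the 1.10.x source (worth verifying against your exact version), the scheduler-side hand-off is fire-and-forget, which would explain the silence:

    # Rough paraphrase of what CeleryExecutor.execute_async does in Airflow
    # 1.10.x (not a verbatim copy): apply_async() returns as soon as the
    # message is handed to the AMQP client, so a broker that has stopped
    # reading the socket stalls the send silently instead of raising.
    from airflow.executors.celery_executor import execute_command

    result = execute_command.apply_async(
        args=[["airflow", "run", "some_dag", "some_task", "2022-06-13T13:20:00"]],
        queue="default",
    )
    print(result.state)  # stays PENDING; nothing here surfaces a blocked broker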