Celery 异步任务全生命周期调用链

最后更新:2026-03-26

生产环境下一个异步任务从“出生”到“消亡”的完整旅程:由客户端触发,进入 Broker 队列, 被 Worker 执行并写入 ResultBackend,最终由客户端查询结果。

Client Broker Worker ResultBackend

Client / Producer

任务定义 @app.task 装饰普通函数

将业务函数注册为 Celery Task,生成唯一任务名。

任务调用 task.delay() / apply_async()

序列化任务名称、参数、ETA、重试策略等元数据。

立即返回 异步解耦

客户端立刻返回 AsyncResult,不阻塞主业务。

Broker / Queue

消息发送 Client -> Broker

消息进入 RabbitMQ / Redis,路由到指定队列。

消息排队 Queue 缓冲

Broker 维护队列顺序,等待 Worker 领取。

Worker / Consumer

消息获取 prefetch 拉取任务

Worker 常驻 TCP 连接,轮询获取可执行消息。

任务执行 反序列化 + 运行函数

执行任务逻辑,处理重试、超时、异常策略。

状态落盘 SUCCESS / FAILURE

执行完成后写入结果后端,或更新重试状态。

ResultBackend

结果存储 Redis / DB

保存状态、返回值、异常堆栈等。

结果查询 AsyncResult.get()

客户端使用任务 ID 查询状态与结果。

生命周期结束 过期清理

结果过期或被清理,任务生命周期结束。

1
任务定义

把一个普通函数用 @app.task 标记后,Celery 会把它“登记成任务”:给它分配一个唯一的任务名(后续靠任务名找到并执行它)。同时也会记录“发消息的默认规则”:当你调用 delay/apply_async 时,要把任务名、参数、task_id、倒计时/ETA、以及需要的元信息打包成一条可传输的字符串(常见是 JSON)。另外还会记录“发到哪里”:默认走哪个 queue、使用什么 routing_key(或在代码里显式指定),这样 Broker 才知道把消息放进哪个队列,Worker 才会从对应队列里取到并执行。最后再记录任务在执行生命周期里没有成功完成即任务失败时怎么处理(重试次数/间隔、超时等)。

2
任务调用

调用 delay/apply_async:把“执行意图”封装成异步指令——任务名、Args/Kwargs、task_id、routing_key、ETA/countdown 等元数据;再通过 Kombu 将消息非阻塞投递到 Broker。调用会立刻返回 AsyncResult(追踪句柄,持有 task_id),之后可用它查询状态(PENDING/STARTED/SUCCESS/FAILURE)或取结果;同时 apply_async 还能在调用时覆盖默认路由(指定 queue/priority 等),把同一任务按场景分发到不同 Worker 集群。

3
投递到 Broker:RabbitMQ / Redis

生产者把消息推给 Broker:RabbitMQ 遵循 AMQP,把消息先投递到 Exchange(默认 celery),再由 Exchange 按 routing_key + binding 规则决定进入哪个 Queue;Redis 没有 Exchange 层,通常把“队列名”直接映射为一个 List Key,消息会直接写入该 Key 对应的列表。

4
RabbitMQ:交换机路由 (Exchange → Queue)

核心动作:队列(Queue)主动绑定(Binding)到交换机(Exchange),并在绑定时写下自己的过滤规则(Routing Key)。

  • 工作流:
    • Producer 发消息到 Exchange,并给消息贴一个 routing_key
    • Exchange 遍历自己所有 Binding(哪些 Queue 绑定了我、各自写了什么规则)
    • 对比“消息的 key”和“绑定的 key”,匹配成功就把消息投递到对应 Queue
  • 不同 Exchange 的绑定匹配方式
    • Direct:严格匹配。例:Queue A 绑定 key=error,只有消息 key=error 才进入
    • Topic:通配匹配。例:绑定 stock.#,可匹配 stock.usd/stock.cn
    • Fanout:忽略 key。只要绑定了,就给所有绑定队列各发一份拷贝(广播)
5
Redis:入队与阻塞消费

Redis 用 List 模拟队列:Producer 执行 LPUSH 把消息塞到列表左端;Worker 执行 BRPOP 阻塞式从右端弹出消息(队列为空时连接挂起,不消耗 CPU;有新消息时立刻被“弹醒”)。因为没有 Exchange,路由通常在客户端侧就决定好写入哪个 List Key。

6
Broker 存储
  • 存储架构(内存 vs 分层)
    • Redis:消息就是 List 里的元素,主要驻留在 RAM,读写极快,但堆积大了会吃满内存,极端会 内存溢出OOM。
    • RabbitMQ:同时管理 内存 + 磁盘。消息可写入磁盘 segment 文件并维护索引;堆积很大时会触发 paging,把部分消息从内存换到磁盘,用“变慢”换“服务不挂”。
  • 持久化策略
    • Redis
      • RDB:定时快照(像“拍照”)。间隔内断电可能丢最近一段时间的新任务。
      • AOF:追加写命令(像“记账”)。高频 LPUSH 会让日志迅速膨胀,磁盘 IO 压力变大,极端会拖慢 Redis。
    • RabbitMQ
      • delivery_mode=2(persistent):消息到达 Exchange/Queue 后,RabbitMQ 不只放内存(默认(非持久化)),而是会把该消息写入磁盘(segment 文件);必要时调用 fsync 强制把 OS 缓存刷到物理磁盘。只有在“已经安全落盘”之后,RabbitMQ 才会向生产者确认收到——因此断电重启后还能从磁盘把未完成消息恢复回来。代价是吞吐下降(磁盘 IOPS 限制)。在 Celery 中通常可通过 app.conf.task_default_delivery_mode = 'persistent' 开启。
      • watermark + paging:内存逼近水位线时强制刷盘,只在内存保留必要索引,避免内存中积压的消息太多导致内存溢出 OOM。
  • 总结建议
    • Redis:像高速缓存区——适合高频轻量、偶发丢几条可接受的任务(验证码/通知等)。
    • RabbitMQ:像专业仓库——适合波动大、关键任务“绝对不能丢”的场景(订单/计费等,建议开启持久化)。
7
分发与背压

分发:Broker 把队列里的消息“交给”哪些 Worker、以什么节奏交付。背压:当 Worker 忙/慢时,限制消息从 Broker(中间件)进入 Worker(执行进程)内存的速度,避免内存被撑爆或延迟失控。

  • RabbitMQ:Push(主动推)+ 配额控制(QoS/Prefetch)
    • 机制:Worker 建连后声明 prefetch_count(预取数量),RabbitMQ 会按配额把消息推过去。
    • 在途消息:推送给 Worker 但尚未 ACK 的消息属于 Unacked(在途)。
    • 例子prefetch=4 时,RabbitMQ 最多同时让该 Worker 持有 4 条未确认消息;只要 Worker ACK 了一条,才再补推下一条。
    • 背压效果:受控涌入。重任务(AI 推理/生图)常把 prefetch 设小(甚至 1);轻任务(发邮件)可设大提高吞吐。
  • Redis:Pull(按需拉)+ 自适应(BRPOP)
    • 机制:Worker 用 BRPOP 阻塞式拉取:有任务立即弹出;没任务就挂起等待,不忙轮询。
    • 背压效果:天然“按需取货”。Worker 处理慢 → 发起下一次拉取更晚;Worker 挂了 → 不再拉取,Broker 侧无需做复杂配额控制。
8
可靠性:ACK / 备份队列

目标:防止 Worker 中途挂掉导致“任务丢失”。两种 Broker 的实现思路不同:

  • RabbitMQ:内置 ACK
    • Unacked:消息投递给 Worker 后不会立刻删除,而是标记为 Unacked(待确认),对其他 Worker 不可见。
    • ACK 才删除:Worker 完成任务后发送 ACK,RabbitMQ 才真正从队列/磁盘中移除该消息。
    • 断连即重投:若 Worker 崩溃导致 TCP 连接断开且未 ACK,RabbitMQ 会把消息状态从 Unacked 立刻改回 Ready,重新排队并投递给其他健康 Worker(Redelivery)。
  • Redis:备份队列 + 超时捞回
    • 原子位移:因为直接弹出会“消失”,Celery 往往用 RPOPLPUSH(或新版 BLMOVE)把消息从主队列原子移动到“处理中/备份队列”(常见 key 形如 unacked:*)。
    • 完成后删除备份:Worker 真正执行完成后,再把备份队列里的那条记录删掉,相当于“确认”。
    • visibility_timeout:Redis 不能像 RabbitMQ 一样实时感知断连,Celery 依赖超时(例如 1h)判断“处理太久=可能挂了”,再把备份队列里的消息捞回主队列实现重投(恢复速度取决于超时配置)。
  • 核心差异:RabbitMQ 是“实时感应(断连触发)+ 立即重投”;Redis 是“延迟感应(超时触发)+ 备份队列捞回”。
9
Worker 执行任务

这是任务的“高光时刻”:消息进入 Worker 后,会被还原为可执行的调用,并按以下阶段推进:

  • 1) 反序列化
    • 校验 content-type(如 application/json),把消息体解包为 args/kwargs
    • 根据任务名/任务ID定位本地函数引用;若代码版本不一致/任务未注册,可能报 KeyError
  • 2) 执行业务逻辑
    • 按并发模型运行:子进程 / 线程 / 协程
    • 超时控制:soft time limit 先抛 SoftTimeLimitExceeded 给清理机会;hard time limit 可能强制终止
    • 异常与重试:捕获异常准备 traceback;代码里调用 self.retry() 会按策略(如指数退避)重新投递带 ETA(预计到达时间) 的任务
  • 3) 结果落盘
    • 可选:如果你需要查询结果,Worker 会把返回值/异常信息写到 Result Backend,并更新状态为 SUCCESS/FAILURE
    • 如果业务不需要返回值,建议开启 task_ignore_result=True(或任务级 ignore_result=True),避免结果在 Backend 大量堆积
  • 4) ACK
    • Worker 向 Broker 确认“这条消息我已处理完”,Broker 才会彻底删除/释放这条消息
    • acks_late=True:执行完成后再 ACK(更可靠,Worker 中途挂了可触发重投);相对地“收到即 ACK”吞吐更高但更易丢
10
查询与清理

当你调用 AsyncResult.get() 时,本质是在“查任务结果 + 等它就绪”,以及在结果写入后“按策略回收”。

  • 客户端查询:get() 的两种模式
    • 轮询:按 interval(默认约 0.5s)循环向 Backend 发起读取(例如 Redis 的 GET celery-task-meta-<task_id> 或查询 DB 记录)。
    • 控制参数timeout 防止无限等待;propagate 决定任务失败时是否把异常在客户端重新抛出。
    • 事件唤醒:以 Redis 为 Backend 时,可通过发布/订阅减少“频繁敲门”。客户端订阅与 task_id 相关的通知通道;Worker 写完结果后发布消息,客户端被瞬间唤醒,显著降低无效请求与 CPU 压力。
  • 结果清理:TTL 与自动回收
    • Redis:写结果时顺带设置 EXPIRE,由 result_expires 控制保留时间(默认通常是 1 天)。时间到后 Key 自动物理删除。
    • 数据库:没有行级 TTL。结果会一直存在,需要周期性清理:常见由 celery.backend_cleanup(通常配合 celery beat)定时执行删除过期行。
  • 核心配置与避坑
    • task_ignore_result:最关键优化。默认建议全局开启;只在确实需要返回值的任务上用 @app.task(ignore_result=False) 显式打开。
    • result_expires:结果保留时长(秒)。并发高时建议设短一些(例如 3600=1h)以降低堆积风险。
    • 经典坑:不启用 task_ignore_result=True,又从不调用 .get()/.forget(),结果会在 Backend 持续堆积直到过期,最终导致缓存/数据库被撑爆。
1
Q1:至少一次如何落地?请分别给出 RabbitMQ 与 Redis 的详细技术方案

A:目标是“消息至少成功处理一次”,允许重复,不允许静默丢失。关键是把 发送确认 → 存储持久化 → 消费确认 → 崩溃恢复 → 幂等去重 串成闭环。

  • RabbitMQ 方案
    • 生产端:开启 Publisher Confirm,发送成功以 broker confirm 为准,不以内存写入为准。
    • 队列端:队列建议设为 durable=true(RabbitMQ 重启后队列定义仍保留);同时消息设为 delivery_mode=2(消息持久化),两者都开启才具备“重启后可恢复”的基础。另一个关键点是队列类型:classic queue 更轻更快但高可用能力相对弱,quorum queue 会在多节点间复制队列日志(类似 Raft 的主从一致性思路),单点故障下更抗风险,适合订单/支付/计费等关键链路。
    • 消费端:关闭 auto-ack,使用手动 ACK;业务成功后 ACK,失败时 NACK(按策略 requeue 或送死信)。
    • 崩溃恢复:Worker 挂掉导致连接断开时,Unacked 消息自动回到 Ready 并重投给其他消费者。
    • 流控参数:设置合理 prefetch(重任务建议 1~4),避免单 Worker 堆积过多 Unacked。
  • Redis 方案
    • 生产端:写入主队列(List 或 Stream),生产成功以 Redis 写成功为准;持久化建议优先 AOF,并结合 fsync 策略做权衡:
      • appendfsync always:每次写命令都执行一次 fsync,数据安全性最高(几乎不丢),但磁盘 I/O 开销极大,吞吐容易受限。
      • appendfsync everysec:每秒执行一次 fsync,通常最多丢 1~2 秒数据,安全与性能折中,生产最常用。
      • appendfsync no:交由操作系统决定刷盘时机,性能最好,但可能丢几十秒数据,不建议关键链路使用。
    • 消费端(List 模式):用 BRPOPLPUSH/BLMOVE 将消息从 main 原子移动到 processing(处理中队列),避免“弹出即丢”。
    • 消费端(Stream 模式):使用 Consumer Group 的 XREADGROUP + XACK,未确认消息进入 PEL(pending entries list)。
    • 超时回收:后台巡检 processing/PEL,超过 visibility_timeout 的消息回捞到主队列(或转移给其他消费者)。
    • 持久化权衡:RDB 可能丢快照窗口数据;AOF 更可靠但写放大更高,需压测磁盘与恢复时间。
  • 重复消费与幂等(两者通用)
    • 每条消息携带 Business_ID(业务主键),消费前先落“去重记录”(唯一索引冲突即判重)。
    • 采用状态机门禁:仅允许状态合法迁移(如“待支付→已支付”),非法迁移直接幂等返回。
    • 外部副作用(扣款、发券、回调)必须附带幂等 token,避免下游重复执行。
2
Q2:死信队列如何设计与处理?请分别给出 RabbitMQ 与 Redis 的详细方案

A:死信队列不是“失败终点”,而是“故障隔离 + 诊断补偿入口”。目标是:主链路持续可用,异常消息可追踪、可回放、可审计。

  • RabbitMQ 方案
    • 触发条件
      • 1. 当消费者拿到消息发现消息逻辑错误,消费者发送一个 Basic.Reject 或 Basic.Nack 指令给 RabbitMQ,如果设置 requeue=true,消息会重新回到原队列排队(可能导致死循环);如果设置 requeue=false,RabbitMQ 就会把这条消息丢进死信交换机。
      • 2. 给消息或者队列设置一个 TTL消息,如果一条消息在队列里躺了太久还没有消费,一旦到期,它不会被物理删除,而是被投递到死信队列。
      • 3. 队列可以设置 x-max-length,当队列已经满了,又有新消息进来时,RabbitMQ 会根据策略把最前面(最老)的消息踢出去,被踢出的消息如果配置了死信交换机,就会进入死信队列,而不是直接消失。
    • 拓扑
      • 第一层:业务层
        • 准备业务 Exchange + 业务 Queue(消费者平时从这里消费)。
        • 在业务 Queue 的参数里声明 x-dead-letter-exchange(必填,指定“消息出事后要转投到哪个交换机”)。
        • 可选声明 x-dead-letter-routing-key(选填,指定“转投时改用哪个路由键”;不填则沿用原路由键)。
      • 第二层:死信层
        • 准备独立的死信 Exchange(DLX)和死信 Queue(DLQ)。
        • 把 DLQ 用指定 routing key 绑定到 DLX(确保“死信交换机后面真的有桶接住消息”)。
    • 运维流程
      • DLQ 仅存放失败消息,不直接业务消费;
      • 运维/平台任务读取 DLQ,按错误类型分流:数据修复、代码修复、人工审批后回放;
      • 回放时必须限流并记录 replay_id,避免“故障放大 + 重复污染”。
  • Redis 方案
    • 落死信方式:消费异常达到阈值后,将消息写入 dead_letter_key(List/Stream/ZSet 均可)。
    • 死信元数据:建议包含 business_idretry_countfirst_fail_atlast_errororigin_queuetrace_id
    • 处理流
      • 补偿服务订阅 dead_letter_key;
      • 按错误类别执行修复/重试/人工介入;
      • 成功后写审计日志并从死信结构移除。