
分类6.数据订阅 TMQ |篇章02 消费流程适用版本TDengine v3.xv3.3.x / v3.4.x | 最后更新2026-06-30理解 TMQ 消费的内部链路有助于诊断性能问题、设计可靠的消费应用。本文按 Subscribe → Assign → Poll → Process → Commit 五个阶段拆解。核心概念速查表概念说明Subscribe订阅 TopicAssign分区分配Fetch从服务端拉取消息Poll客户端循环调用拉取Process业务处理Commit提交位点Heartbeat心跳保活详细解析1. Subscribe 阶段Subscribe 流程 ① 客户端调用 subscribe([topic1, topic2]) ② 客户端向 MNode 注册 Consumer - 上报 group_id, consumer_id, 订阅的 Topic 列表 ③ MNode 加入 Consumer 到对应 Group ④ 触发 Rebalance - 重新计算分区分配 - 通知所有组员 ⑤ Consumer 收到分配方案 - 知道自己负责哪些 VGroup ⑥ 准备从各 VGroup 的对应 Offset 开始消费2. Assign 与 Rebalance分区分配算法Range / Round-Robin 等 示例 Topic 在 6 个 VGroup 有数据 Group 有 3 个 Consumer 分配 Consumer-1: VG1, VG2 Consumer-2: VG3, VG4 Consumer-3: VG5, VG6 当 Consumer 加入/离开 ① MNode 检测到变化 ② 发起 Rebalance - 收回所有分区 - 按新成员重新分配 - 通知所有 Consumer ③ Consumer 暂停 Poll ④ 提交未提交 Offset可配置 ⑤ 接收新分区 ⑥ 恢复 Poll Rebalance 期间 - 消费暂停短暂 - 可能重复消费未 Commit 部分 - 应用应设计幂等3. Fetch 阶段Fetch 从服务端拉取数据 Client VNode │ │ │── Fetch (offsetN) ───→│ │ │── 从 WAL 读取 N 之后的数据 ─┐ │ │← ────────── 返回 ─────────┘ │ │── 应用 Topic SQL 过滤 ────┐ │ │← ────── 过滤后数据 ─────────┘ │ │ │← ── Messages ──────────│ │ │ Fetch 特性 - 长轮询无数据时短暂等待 - 批量返回多条消息打包 - 每个 VGroup 独立 Fetch - 多 VGroup 并发拉取4. Poll 循环应用层 Poll 循环 while running: msgs consumer.poll(timeout1.0) if msgs is None: continue // 超时无消息 for msg in msgs: process(msg) if should_commit: consumer.commit() Poll 内部 ① 从各 VGroup 缓冲队列取消息 ② 缓冲不足 → 异步 Fetch 补充 ③ 返回当前可用的消息批 ④ 超时返回 None5. Topic SQL 过滤Topic 中的 SQL 在哪里执行 CREATE TOPIC topic_high_current AS SELECT * FROM meters WHERE current 100; 执行位置 ① 写入时数据写入 WAL未过滤 ② Fetch 时VNode 读取 WAL → 应用 SQL 过滤 → 返回过滤后数据 优势 - 网络传输已过滤数据节省带宽 - Consumer 端处理逻辑简化 - 多 Consumer 共享过滤计算6. Commit 与位点持久化Commit 流程 ① Consumer 调用 commit(offsets) ② 客户端组装 Commit 请求 ③ 发送到 MNode ④ MNode 持久化位点 - 写入元数据库 - 多副本同步 ⑤ 返回成功 Commit 类型 - Sync Commit等待服务端确认 - Async Commit发送后立即返回 - Auto Commit定期自动 Commit 位点存储 - 按 (group_id, topic, vgroup_id) 维度存储 - 仅存储最新位点不存历史 - Group 删除时位点清理7. 心跳保活Consumer 心跳 Consumer 定期向 MNode 发送 Heartbeat - 报告存活 - 上报消费进度可选 MNode 维护 Consumer 状态 - 收到 Heartbeat → 标记活跃 - 超过 session.timeout → 标记失联 - 失联触发 Rebalance 关键参数 heartbeat.interval.ms // 心跳间隔 session.timeout.ms // 失联阈值8. 异常处理常见异常及处理 ① 网络断开 - 客户端 SDK 自动重连 - 重连成功后继续 Poll ② Rebalance - Poll 短暂阻塞 - 注册 RebalanceCallback 处理状态 ③ 消息处理失败 - 不 Commit → 下次 Poll 重发 - 业务必须幂等 ④ Consumer 崩溃 - 心跳超时 → 触发 Rebalance - 其他 Consumer 接管分区 - 从最近 Commit 位置继续 ⑤ 服务端 WAL 滚动后丢失早期数据 - Consumer Offset 落后过多 - 错误offset out of range - 处理重置为 earliest 或调大 WAL 保留期代码示例完整消费骨架Pythonfromtaos.tmqimportConsumerdefprocess(msg):forblockinmsg:forrowinblock:handle_row(row)consumerConsumer({group.id:worker_group,auto.offset.reset:earliest,enable.auto.commit:false,session.timeout.ms:30000,})consumer.subscribe([topic_meters])try:whileTrue:msgconsumer.poll(timeout1.0)ifmsgisNone:continuetry:process(msg)consumer.commit()exceptExceptionase:log.error(fProcess failed:{e})# 不 commit → 下次重试finally:consumer.close()Rebalance 监听defon_assign(consumer,partitions):print(fAssigned:{partitions})defon_revoke(consumer,partitions):print(fRevoked:{partitions})consumer.commit()# 失去分区前提交consumer.subscribe([topic_meters],on_assignon_assign,on_revokeon_revoke)性能考量消费延迟分析阶段典型延迟写入到 WAL 可读 10msFetch 网络往返1~10ms过滤计算取决于 SQL 复杂度客户端处理业务决定Commit 持久化5~20ms高吞吐配置高吞吐场景 - 大批量 Fetchmax.poll.records 调大 - 异步 Commit - 增加 Consumer 数不超过 VGroup 数 - 处理逻辑异步化线程池 低延迟场景 - 短 timeout - 频繁 Poll - 同步 Commit 保证有序FAQQ1: 一次 Poll 返回多少消息由服务端 batch 决定通常几十~几千行。客户端可通过参数限制最大批量。Q2: Commit 失败怎么办捕获异常重试。重复 Commit 是幂等的多次 Commit 同一 Offset 无副作用。Q3: 同一消息能被多次 Commit 吗可以。Commit 只是更新位点最大值不影响消息本身。Q4: Consumer 长时间不 Commit 会怎样占用大量 WAL 空间无法清理Rebalance 后会重复消费很多数据建议至少定期 CommitQ5: 删除 Consumer Group 怎么做DROPCONSUMERGROUPgroup_idONtopic_name;参考系统构架篇01-《TDengine 整体架构全景》02-《集群拓扑深度解析》03-《MNode 内部机制深度解析》04-《RPC 通信层深度解析》05-《VNode 生命周期》06-《RAFT 共识协议》07-《端到端的消息流》数据模型01-《数据库创建与参数详解》02-《超级表/子表/普通表》03-《支持数据类型深度解析》04-《TDengine Tag 设计哲学与 Schema 变更机制》05-《TDengine 虚拟表实现原理》存储引擎01-《TDengine 存储引擎概览》02-《TDengine MemTable 深度解析》03-《TDengine WAL 预写日志机制》04-《TDengine 数据文件格式》05-《TDengine Commit 与 Flush 机制 》06-《TDengine Compaction 合并策略 》07-《TDengine 数据保留与 TTL》08-《TDengine 压缩编码机制》09-《TDengine Cache 与 Last 查询加速》10-《TDengine 逻辑计划生成》查询引擎01-《TDengine 查询引擎概览》02-《TDengine SQL 解析与词法分析》03-《TDengine 语义分析与 AST 重写》04-《TDengine 逻辑计划生成》05-《TDengine 物理计划生成》06-《TDengine 扫描算子》07-《TDengine 聚合算子》08-《TDengine 聚合算子》09-《TDengine 连接算子》10-《TDengine 排序、填充与投影》11-《TDengine 分布式查询执行》12-《TDengine EXPLAIN 与查询优化》数据写入01-《TDengine SQL INSERT》02-《TDengine 无模式写入》03-《TDengine STMT 写入》04-《TDengine 写入内部流程》05-《TDengine 数据更新删除》数据订阅01-《TDengine 数据订阅》02-《TDengine 订阅 vs Kafka》关于 TDengineTDengine 专为物联网IoT平台、工业大数据平台设计。其中TDengine TSDB 是一款高性能、分布式的时序数据库Time Series Database同时它还带有内建的缓存、流式计算、数据订阅等系统功能TDengine IDMP 是一款AI原生工业数据管理平台它通过树状层次结构建立数据目录对数据进行标准化、情景化并通过 AI 提供实时分析、可视化、事件管理与报警等功能。