官方默认设置了 18 哥延迟等级
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
发送延迟消息:按照默认顺序 1-18 数字就对应上面的延迟时间
Message msg = new Message (TOPIC, TAG, "OrderID199", "ok", getBytes(StandardCharsets.UTF_8));
//设置延迟等级
msg.setDelayTimeLevel(3);
producer.send(msg);
延迟消息都会被存储到 RocketMQ 的一个内部 Topic:SCHEDULE_TOPIC_XXXX 中
SCHEDULE_TOPIC_XXXX 共有 18 个 MessageQueue:
- 对应延迟消息的 18 个等级,根据指定的 DelayTimeLevel 来决定选择哪个 MessageQueue
- 有一个定时任务,每 100 ms 执行一次判断 SCHEDULE_TOPIC_XXXX Topic 中的 MessageQueue 的消息是否到达延迟时间
- 若到达延迟时间,将 SCHEDULE_TOPIC_XXXX 中的消息投递到消息最初需要投递的 Topic 之中
为什么不支持任意时间?
RocketMQ 并不支持任意时间的延迟,可能的主要原因就是如果提供任意时间,就会涉及到消息的排序,会有一定的性能损耗
RocketMQ 采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或失败的消息
第一阶段:
第二阶段:如果半事务消息投递成功,则会开始执行本地事务
分为如下三种 Case:
采用 2PC 两阶段设计:
将 Message 原本真实的 Topic 和 MessageQueue 进行备份
- 存入到 PROPERTY_REALTOPC、PROPERTY_REAL_QUEUE_ID 中
将消息投递到一个内部 Topic 中 RMQ_SYS_TRANS_HALF_TOPIC,该队列专门存储事务消息
所有的 Half Message 全部都写入到 queueId 为 0 的 MessageQueue,因为一个 Topic 下只有 1 个 MessageQueue:
- 这个 Topic 下的所有 Message 就是全局有序的,ta 们会按照先来后到的顺序被消费
如果本地事务执行成功进行 Commit,则将 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息投递到真实的 Topic 中,供后续流程执行
- 并删除这条 Half Message,但删除也是假删除,只是给 Message 打上一个删除的 tag
如果本地事务执行失败进行 rollback,则直接删除这条 Half Message,但删除也是假删除
如果本地事务吃吃没有返回结果(默认时间 6s),则会触发事务回查机制
- 执行回查之前需要校验检查次数是否达到了最大值(需要手动设置,没有默认值)
- 或者是当前 Half Message 存在是否超过了 Message 保存的上限,即 3 天
- 如果满足上面条件中的一种 Half Message 会被放进 TRANS_CHECK_MAX_TOPIC Topic 中
- 一旦判定为需要执行事务回查逻辑,那么当前这条 Half Message 就算已经被消费了
- 在没有达到最大的校验次数之前,都还需要将其投递到事务队列当中,以便下次重试时再次执行 Check 逻辑
- 如果回查成功,则删除投递的 Half Message
消息消费失败后,并不会立即重试,而是一个递增的时间间隔来进行重试的,重试次数默认为 16 次
只比延迟消息的时间间隔等级少了前两个,延迟消息总共有 18 个等级,而消息重试使用了延迟消息的第 3-18 等级
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
重试的 Message,RocketMQ 的做法并不是将其投递回原来的 Topic,而是重试队列
每个 ConsumerGroup 都有自己的重试队列:
消费失败的 Message,Consumer 会将其投回 Broker:
RocketMQ 的混合性存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog中)
针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构
Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中
核心步骤:
commitLog 消息日志:
- 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容
consumequeue 逻辑消费队列:
- 存储了 commitLog 的起始物理 offset,目的是提高消费的性能
indexFile 索引文件:
- 提供了一种可以通过 key 或者时间区间来查询消息的方法
consumequeue 文件:
consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为:
- 8 字节的 commmitLog 物理偏移量
- 4 字节 的消息长度
- 8 字节 tag hashcode
单个文件由 30w 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M
- 默认一个 topic 对应 4 个 queueId,即 4 个 messageQueue
每个 messageQueue 文件夹下有多个 consumeQueue,所以:messageQueue 1 :N consumeQueue
基本通讯流程如下:
为了实现客户端与服务器之间高效的数据请求与接收:
参与评论
手机查看
返回顶部