RocketMQ延迟消息简明介绍

RocketMQ延迟消息简明介绍

目录

前言

核心属性

RMQ_SYS_SCHEDULE_TOPIC

FIRST_DELAY_TIME

DELAY_FOR_A_WHILE

DELAY_FOR_A_PERIOD

delayLevelTable

offsetTable

核心方法

queueId2DelayLevel

delayLevel2QueueId

updateOffset

computeDeliverTimestamp

start()

shutdown()

load()

parseDelayLevel

前言

场景可以是这样的,双11抢手机,一个新手机4000-5000,到0点的时候,冲着兴奋劲,抢到了。但是摸了摸钱包,又冷静下来了,好像不是很必要换手机。就放在那里没有支付,过了30分钟,自动取消了。这里就是使用延迟消息的场景,当下单之后,向消息队列发送一条延迟30分钟消费的消息。等到30分钟过了,然后消费消息,执行检查任务,要是对应的订单支付了,就什么都不做,要是没支付,就取消订单。

RocketMQ的延迟消息是org.apache.rocketmq.broker.schedule.ScheduleMessageService类实现的

核心属性 RMQ_SYS_SCHEDULE_TOPIC

在之前的版本中叫SCHEDULE_TOPIC,是系统内置的Topic,用来保存所有的定时消息。没有执行的定时消息都会被保存在这个topic中。

FIRST_DELAY_TIME

第一次执行定时任务的延迟时间,默认是1秒。

private static final long FIRST_DELAY_TIME = 1000L; DELAY_FOR_A_WHILE

第二次以及之后每次定时任务执行的间隔时间,默认100ms。

private static final long DELAY_FOR_A_WHILE = 100L; DELAY_FOR_A_PERIOD

若是延迟消息投递失败,则在这个时间过后继续投递,默认10秒。

private static final long DELAY_FOR_A_PERIOD = 10000L; delayLevelTable

这是保存延迟级别和延迟时间映射关系的地方

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32); offsetTable

保存延迟级别和对应的消费位点

private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32); 核心方法 queueId2DelayLevel

将queueId转换为延迟级别

public static int queueId2DelayLevel(final int queueId) { return queueId + 1; } delayLevel2QueueId

将延迟级别转换为queueId

public static int delayLevel2QueueId(final int delayLevel) { return delayLevel - 1; } updateOffset

更新延迟消息topic的消费位点

private void updateOffset(int delayLevel, long offset) { this.offsetTable.put(delayLevel, offset); if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getDelayOffsetUpdateVersionStep() == 0) { long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); } } computeDeliverTimestamp

根据延迟消息级别和消息的存储时间计算该延迟消息的投递时间

public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { Long time = this.delayLevelTable.get(delayLevel); if (time != null) { return time + storeTimestamp; } return storeTimestamp + 1000; } start()

启动延迟消息服务

shutdown()

关闭start方法中启动的额timer任务

load()

加载消息的消费位点信息和全部的延迟级别信息。延迟级别信息默认如下。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; parseDelayLevel

格式化所有的延迟级别信息,保存到内存中。

到此这篇关于RocketMQ延迟消息简明介绍的文章就介绍到这了,更多相关RocketMQ延迟消息内容请搜索易知道(ezd.cc)以前的文章或继续浏览下面的相关文章希望大家以后多多支持易知道(ezd.cc)!

推荐阅读

    RocketMQ broker 注册 IP 问题

    RocketMQ broker 注册 IP 问题,配置文件,容器, RocketMQ broker 注册 问题 RocketMQ 装入容器中时,Broker 注册地址将使用容器自身的 IP

    RocketMQ Push 消费模型示例详解

    RocketMQ Push 消费模型示例详解目录使用 DefaultMQPushConsumer 消费消息基于长轮询机制的伪 push 实现客户端侧发起的长轮询请求服

    golang操作rocketmq的示例代码

    golang操作rocketmq的示例代码RocketMQ 是什么
    Github 上关于 RocketMQ 的介绍:RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息

    SpringBoot整合RocketMQ的方法详解

    SpringBoot整合RocketMQ的方法详解目录一:Ubuntu安装RocketMQ二:添加RocketMQ依赖三:在application中添加RocketMQ配置四:编写消费者,消息生

    RocketMQ的push消费方式实现示例

    RocketMQ的push消费方式实现示例目录引言MQ消费方式1、push(推方式)2、pull(拉方式)RocketMQ对于消费方式的实现RocketMQ聪明地实现push的原

    RocketMQ普通消息实战演练详解

    RocketMQ普通消息实战演练详解目录引言普通消息同步发送普通消息异步发送普通消息单向发送集群消费模式广播消费模式引言
    之前研究了Roc

    浅析MMAP零拷贝在RocketMQ中的运用

    浅析MMAP零拷贝在RocketMQ中的运用什么是零拷贝?
    零拷贝(英语: Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另

    RocketMQ消息发送流程源码剖析

    RocketMQ消息发送流程源码剖析目录正文读源码1 调用defaultMQProducerImpl.send()2 设置过期时间3 执行defaultMQProducerImpl.sendDefa