使用 DefaultMQPushConsumer 消费消息
基于长轮询机制的伪 push 实现
客户端侧发起的长轮询请求
服务端阻塞请求
客户端回调处理
客户端发起请求的底层逻辑
PullCallback 回调
总结
Push 模式是指由 Server 端来控制消息的推送,即当有消息到 Server 之后,会将消息主动投递给 client(Consumer 端)。
使用 DefaultMQPushConsumer 消费消息下面是使用 DefaultMQPushConsumer 消费消息的官方示例代码:
// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
这里看到主要是通过 consumer 注册回调接口来处理从 Broker 中收到的消息。这种监听回调的机制很容易想到是一种观察者模式或者事件机制;对于这种 C-S 模型的架构来说,如果要做到 Server 在有新消息时立即推送给 Client,那么 Client 和 Server 之间应该是有连接存在的,Client 端开放端口来 watch Server 的推送。这里好论证,即可以查看当前 Client 端所在进程开启了什么端口即可,通过如下指令查看:
1、先通过 jps 查看 Consumer Client 的进程号
➜ rocketmq-4.9.4 git:(06f2208a3) jps
10722 Jps
4676 rocketmq-dashboard-1.0.1-SNAPSHOT.jar
1766
4121 BrokerStartup
4009 NamesrvStartup
9419 PushConsumer
9692 RemoteMavenServer36
可以看到 PushConsumer 的进程号是 9419
2、通过 lsof 命令查看进程端口占用
➜ rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 9419| grep LISTEN
➜
这里没有看到 PushConsumer 有开启端口。同样,这里可以看看 Broker 的进程端口占用
➜ rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 4121| grep LISTEN
java 4121 glmapper 137u IPv6 0xca1142b0f200067d 0t0 TCP *:10912 (LISTEN)
java 4121 glmapper 141u IPv6 0xca1142b0f1fc8cfd 0t0 TCP *:10911 (LISTEN)
java 4121 glmapper 142u IPv6 0xca1142b0f1fc935d 0t0 TCP *:10909 (LISTEN)
所以得到一个初步的结论是,在 Push 模式下,Consumer Client 并没有启动端口来接收 Server 的消息推送。 那么 RocketMQ 是怎么实现的?
基于长轮询机制的伪 push 实现真正的 Push 方式,是 Server 端接收到消息后,主动把消息推送给 Client 端,这种情况一般需要 Client 和 Server 之间建立长连接。通过前面的分析,Client 既然没有开启端口用于接收 Server 的信息推送,那么只有一种可能就是 Client 自己去拉了消息,但是这种主动拉消息的方式是对于用户无感的,从使用上体验上来看,做到了和 push 一样的效果;这种机制就是“长轮询”。
为啥不用长连接方式,让 Server 主动 Push 呢?其实很好理解,对于一个提供队列服务的 Server 来说,用 Push方式主动推送有两个问题:
1、会增加 Server 端的工作量,进而影响 Server 的性能
2、Client 的处理能力存在差异,Client 的状态不受 Server 控制,如果 Client 不能及时处理 Server 推送过来的消息,会造成各种潜在问题
客户端侧发起的长轮询请求下图是初始化相关资源的过程,DefaultMQPushConsumer 是面向用户使用的 API client 类,内部处理实际上是委托给 DefaultMQPushConsumerImpl 来处理的。DefaultMQPushConsumerImpl#start 时,会初始化 MQClientInstance ,MQClientInstance 初始化过程中又会初始化一堆资源,比如请求-响应的通道,开启各种各样的调度任务(定期拉去 NameServerAddress、定期更新 Topic 路由信息、定期清理 Offline状态的 Broker、定期发送心跳给 Broker、定期持久化所有 Consumer Offset等等),开启 pullMessageService,开启 rebalance Service 等等。大致的调用链如下图
下面这个代码片段是 pullMessageService 的 run 方法(pullMessageService 是 Runnable 子类)
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 从 pullRequestQueue 中取 pullRequest
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
通过代码,可以直观的看起,pullMessageService 会一直从 pullRequestQueue 中取 pullRequest,然后执行 pullMessage 请求。实际上 MessageQueue 是和 pullRequest 一一对应的 ,pullRequest 全部存储到该 Consumer 的 pullRequestQueue 队列里面;消费者会不停的从 PullRequest 的队列里取 request 然后向broker 请求消息。
这里还有一个问题是队列取出之后什么时候放回去的?在 pullMessage 的回调方法中,如果正常得到了 broker 的响应,那么会把 PullRequest放回队列,相关代码可以从 org.apache.rocketmq.client.consumer.PullCallback
onSuccess 方法中得到答案。
服务端处理 pullRequest 请求的是 PullMessageProcessor,当没有消息时,则通过 PullRequestHoldService 将当前请求先 hold 住。
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
// 如果是 LongPolling,则 hold 住
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
PullRequestHoldService 中会将所有的 PullRequest 缓存到 pullRequestTable。PullRequestHoldService 也是一个 task,默认每次 hold 5s 然后再去检查是否有新的消息过来,如果有新的消息到来,则唤醒对应的线程来将消息返回给客户端。
// 已省略无关代码
public void run() {
// loop
while (!this.isStopped()) {
// default hold 5s
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 检查是否有新的消息到达
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
}
}
客户端回调处理
我们在编写 consumer 代码时,基于 push 模式是通过如下方式来监听消息的
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
通过前面的分析,对于如何通过“长轮询”实现伪“push” 有了大概得了解;客户端通过一个定时任务不断向 Broker 发请求,Broker 在没有消息时先 hold 住一小段时间,当有新的消息时会立即将消息返回给 consumer;本节就主要探讨 consumer 在收到消息之后的处理逻辑,以及是怎么触发 MessageListener 回调执行的。
客户端发起请求的底层逻辑以异步调用为例,代码在
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
中,截取部分代码如下:
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
// 成功回调
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
// 异常回调
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
// 异常回调
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
// 异常回调
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
// 异常回调
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
PullCallback 回调
PullCallback 回调逻辑在 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
方法中,以正常返回消息为例:
// 已省略无关代码
public void onSuccess(PullResult pullResult) {
// 将接收到的消息 交给 consumeMessageService 处理
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 将 pullRequest 放回 pullRequestQueue
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
ConsumeRequest 是一个 Runnable,submitConsumeRequest 就是将返回结果丢在一个单独的线程池中去处理返回结果的。ConsumeRequest 的 run 方法中,会拿到 messageListener,然后执行 consumeMessage 方法。
总结到此,关于 RocketMQ push 消费模型基本就探讨完了。从实现机制上来看,push 本质上并不是在建立双向通道的前提下,由 Server 主动推送给 Client 的,而是由 Client 端触发 pullRequest 请求,以长轮询的方式“伪装”的结果。从代码上来,RocketMQ 代码中使用了非常多的异步机制,如 pullRequestQueue 来解耦发送请求和等待结果,各种定时任务等等。
整体看,PushConsumer 采用了 长轮询+超时时间+Pull的模式, 这种方式带来的好处总结如下 :
1、减少 Broker 的压力,避免由于不同 Consumer 消费能力导致 Broker 出现问题
2、确保了 Consumer 不会负载过高,Consumer 在校验自己的缓存消息没有超过阈值才会去从 Broker 拉取消息,Broker 不会主动推过来
3、兼顾了消息的即时性,Broker 在没有消息的时候会先 hold 一小段时间,有消息会立即唤起线程将消息返回给 Consumer
4、Broker 端无效请求的次数大大降低,Broker 在没有消息时会挂起 PullRequest,而 Consumer 在未接收到Response 且未超时时,也不会重新发起 PullRequest
以上就是RocketMQ Push 消费模型示例详解的详细内容,更多关于RocketMQ Push 消费模型的资料请关注易知道(ezd.cc)其它相关文章!