重试
消息发送失败的处理
消费错误处理
自定义MessageHandler类型
Endpoint端点
Metrics指标
Serverless
Partition统一
Polling Consumer
支持多个Binder同时使用
建立事件机制
重试Consumer
端可以配置重试次数,当消息消费失败的时候会进行重试。
底层使用Spring Retry
去重试,重试次数可自定义配置。
# 默认重试次数为3,配置大于1时才会生效
spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3
消息发送失败的处理
Producer
发送消息出错的情况下,可以配置错误处理,将错误信息发送给对应ID的MessageChannel
消息发送失败的场景下,会将消息发送到一个MessageChannel
。这个MessageChannel
会取ApplicationContext
中name为topic.errors
(topic
就是配置的destination
)的Bean。
如果找不到就会自动构建一个PublishSubscribeChannel
。
然后使用BridgeHandler
订阅这个MessageChannel
,同时再设置ApplicationContext
中name为errorChannel
的PublishSubscribeChannel
消息通道为BridgeHandler
的outputChannel
。
public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"
private SubscribableChannel registerErrorInfrastructure(
ProducerDestination destination) {
// destination.getName() + ".errors"
String errorChannelName = errorsBaseName(destination);
SubscribableChannel errorChannel;
if (getApplicationContext().containsBean(errorChannelName)) {
Object errorChannelObject = getApplicationContext().getBean(errorChannelName);
if (!(errorChannelObject instanceof SubscribableChannel)) {
throw new IllegalStateException("Error channel '" + errorChannelName
+ "' must be a SubscribableChannel");
}
errorChannel = (SubscribableChannel) errorChannelObject;
}
else {
errorChannel = new PublishSubscribeChannel();
((GenericApplicationContext) getApplicationContext()).registerBean(
errorChannelName, SubscribableChannel.class, () -> errorChannel);
}
MessageChannel defaultErrorChannel = null;
if (getApplicationContext()
.containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
defaultErrorChannel = getApplicationContext().getBean(
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
MessageChannel.class);
}
if (defaultErrorChannel != null) {
BridgeHandler errorBridge = new BridgeHandler();
errorBridge.setOutputChannel(defaultErrorChannel);
errorChannel.subscribe(errorBridge);
String errorBridgeHandlerName = getErrorBridgeName(destination);
((GenericApplicationContext) getApplicationContext()).registerBean(
errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge);
}
return errorChannel;
}
示例代码
spring.cloud.stream.bindings.output.destination=test-output
# 消息发送失败的处理逻辑默认是关闭的
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
@Bean("test-output.errors")
MessageChannel testOutputErrorChannel() {
return new PublishSubscribeChannel();
}
@Service
class ErrorProduceService {
@ServiceActivator(inputChannel = "test-output.errors")
public void receiveProduceError(Message receiveMsg) {
System.out.println("receive error msg: " + receiveMsg);
}
}
消费错误处理
Consumer
消费消息出错的情况下,可以配置错误处理,将错误信息发给对应ID的MessageChannel
消息错误处理与生产错误处理大致相同。错误的MessageChannel
对应的name为topic.group.errors
,还会加上多个MessageHandler
订阅的一些判断,使用ErrorMessageStrategy
创建错误消息等内容。
示例代码
spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT)
public void receive(String receiveMsg) {
throw new RuntimeException("Oops");
}
@ServiceActivator(inputChannel = "test-input.test-input-group.errors")
public void receiveConsumeError(Message receiveMsg) {
System.out.println("receive error msg: " + receiveMsg);
}
建议直接使用topic.group.errors
这个消息通道,并设置发送到单播模式的DirectChannel
消息通道中(使用@ServiceActivator
注解接收会直接构成DirectChannel
),这样会确保只会被唯一的一个订阅了topic.group.errors
的MessageHandler
处理,否则可能会被多个MessageHandler
处理,导致出现一些意想不到的结果。
默认情况下,Output Binding
对应的MessageChannel
和Input Binding
对应的SubscribeChannel
会被构造成DirectChannel
。
SCS提供了BindingTargetFactory
接口进行扩展,比如可以扩展构造PublishSubscribeChannel
这种广播类型的MessageChannel
。
BindingTargetFactory
接口只有两个实现类
SubscribableChannelBindingTargetFactory
:针对Input Binding
和Output Binding
都会构造成DirectWithAttributesChannel
类型的MessageChannel
(一种带有HashMap
属性的DirectChannel
)。
MessageSourceBindingTargetFactory
:不支持Output Binding
,Input Binding
会构造成DefaultPollableMessageSource
。DefaultPollableMessageSource
内部维护着MessageSource
属性,该属性用于拉取消息。
SCS提供了BindingsEndpoint
,可以获取Binding
信息或对Binding
生命周期进行修改,比如start
、stop
、pause
或resume
。
BindingsEndpoint
的ID是bindings,对外暴露了一下3个操作:
修改Binding
状态,可以改成STARTED
、STOPPED
、PAUSED
和RESUMED
,对应Binding
接口的4个操作。
查询单个Binding
的状态信息。
查询所有Binding
的状态信息。
@Endpoint(id = "bindings")
public class BindingsEndpoint {
...
@WriteOperation
public void changeState(@Selector String name, State state) {
Binding<?> binding = BindingsEndpoint.this.locateBinding(name);
if (binding != null) {
switch (state) {
case STARTED:
binding.start();
break;
case STOPPED:
binding.stop();
break;
case PAUSED:
binding.pause();
break;
case RESUMED:
binding.resume();
break;
default:
break;
}
}
}
@ReadOperation
public List<?> queryStates() {
List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings());
bindings.addAll(gatherOutputBindings());
return this.objectMapper.convertValue(bindings, List.class);
}
@ReadOperation
public Binding<?> queryState(@Selector String name) {
Assert.notNull(name, "'name' must not be null");
return this.locateBinding(name);
}
...
}
Metrics指标
该功能自动与micrometer集成进行Metrics统计,可以通过前缀spring.cloud.stream.metrics
进行相关配置,配置项spring.cloud.stream.bindings.applicationMetrics.destination
会构造MetersPublisherBinding
,将相关的metrics发送到MQ中。
默认与Spring Cloud Function
集成。
可以使用Function处理消息。配置文件需要加上function配置。
spring.cloud.stream.function.definition=uppercase | addprefix
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
@Bean
public Function<String, String> addprefix() {
return x -> "prefix-" + x;
}
Partition统一
SCS统一Partition
相关的设置,可以屏蔽不同MQ Partition的设置。
Producer Binding提供的ProducerProperties提供了一些Partition相关的配置:
partitionKeyExpression
:partition key提取表达式。
partitionKeyExtractorName
:是一个实现PartitionKeyExtractorStrategy
接口的Bean name。PartitionKeyExtractorStrategy
是一个根据Message获取partition key的接口。如果两者都配置,优先级高于partitionKeyExtractorName
。
partitionSelectorName
:是一个实现PartitionSelectorStrategy
接口的Bean name。PartitionSelectorStrategy
是一个根据partition key决定选择哪个partition 的接口。
partitionSelectorExpression
:partition 选择表达式,会根据表达式和partition key得到最终的partition。如果两者都配置,优先partitionSelectorExpression
表达式解析partition。
partitionCount
:partition 个数。该属性不一定会生效,Kafka Binder 和RocketMQ Binder会使用topic上的partition 个数覆盖该属性。
public final class PartitioningInterceptor implements ChannelInterceptor {
...
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) {
int partition = this.partitionHandler.determinePartition(message);
return MessageConverterConfigurer.this.messageBuilderFactory
.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER, partition).build();
}
else {
return MessageConverterConfigurer.this.messageBuilderFactory
.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER,
message.getHeaders()
.get(BinderHeaders.PARTITION_OVERRIDE))
.removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();
}
}
}
public class PartitionHandler {
...
public int determinePartition(Message<?> message) {
Object key = extractKey(message);
int partition;
if (this.producerProperties.getPartitionSelectorExpression() != null) {
partition = this.producerProperties.getPartitionSelectorExpression()
.getValue(this.evaluationContext, key, Integer.class);
}
else {
partition = this.partitionSelectorStrategy.selectPartition(key,
this.partitionCount);
}
// protection in case a user selector returns a negative.
return Math.abs(partition % this.partitionCount);
}
private Object extractKey(Message<?> message) {
Object key = invokeKeyExtractor(message);
if (key == null && this.producerProperties.getPartitionKeyExpression() != null) {
key = this.producerProperties.getPartitionKeyExpression()
.getValue(this.evaluationContext, message);
}
Assert.notNull(key, "Partition key cannot be null");
return key;
}
...
}
Polling Consumer
实现MessageSource
进行polling
操作的Consumer
。
普通的Pub/Sub模式需要定义SubscribeableChannel
类型的返回值,Polling Consumer需要定义PollableMessageSource
类型的返回值。
public interface PollableSink {
/**
* Input channel name.
*/
String INPUT = "input";
/**
* @return input channel.
*/
@Input(Sink.INPUT)
PollableMessageSource input();
}
支持多个Binder同时使用
支持多个Binder
同时使用,在配置Binding
的时候需要指定对应的Binder
。
配置全局默认的Binder
:spring.cloud.stream.default-binder=rocketmq
。
配置各个Binder内部的配置信息:
spring.cloud.stream.binders.rocketmq.environment.<xx>=xx
spring.cloud.stream.binders.rocketmq.type=rocketmq
配置Binding
对应的Binder
:
建立事件机制spring.cloud.stream.bindings.<channelName>.binder=kafka
spring.cloud.stream.bindings.<channelName>.binder=rocketmq
spring.cloud.stream.bindings.<channelName>.binder=rabbit
比如,新建BindingCreateEvent
事件,用户的应用就可以监听该事件在创建Input Binding
或Output Binding
时做业务相关的处理。
以上就是Spring Cloud Stream 高级特性使用详解的详细内容,更多关于Spring Cloud Stream 高级特性的资料请关注易知道(ezd.cc)其它相关文章!