springboot整合redis之消息队列

目录

一、项目准备

二、配置类

三、redis中list数据类型

定时器监听队列

运行即监控队列

四、发布/订阅模式

五、ZSet实现延迟队列

一、项目准备

依赖

<!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- Redis-Jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>

application.yaml配置文件

spring: redis: host: 127.0.0.1 port: 6379 database: 0 timeout: 4000 jedis: pool: max-wait: -1 max-active: -1 max-idle: 20 min-idle: 10 二、配置类 public class ObjectMapperConfig { public static final ObjectMapper objectMapper; private static final String PATTERN = "yyyy-MM-dd HH:mm:ss"; static { JavaTimeModule javaTimeModule = new JavaTimeModule(); javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer()); javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer()); objectMapper = new ObjectMapper() // 转换为格式化的json(控制台打印时,自动格式化规范) //.enable(SerializationFeature.INDENT_OUTPUT) // Include.ALWAYS 是序列化对像所有属性(默认) // Include.NON_NULL 只有不为null的字段才被序列化,属性为NULL 不序列化 // Include.NON_EMPTY 如果为null或者 空字符串和空集合都不会被序列化 // Include.NON_DEFAULT 属性为默认值不序列化 .setSerializationInclusion(JsonInclude.Include.NON_NULL) // 如果是空对象的时候,不抛异常 .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) // 反序列化的时候如果多了其他属性,不抛出异常 .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) // 取消时间的转化格式,默认是时间戳,可以取消,同时需要设置要表现的时间格式 .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) .setDateFormat(new SimpleDateFormat(PATTERN)) // 对LocalDateTime序列化跟反序列化 .registerModule(javaTimeModule) .setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY) // 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX .enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY) ; } static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> { @Override public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException { gen.writeString(value.format(DateTimeFormatter.ofPattern(PATTERN))); } } static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> { @Override public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext) throws IOException { return LocalDateTime.parse(p.getValueAsString(), DateTimeFormatter.ofPattern(PATTERN)); } } } @Configuration public class RedisConfig { /** * redisTemplate配置 */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 配置连接工厂 template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class); jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // 使用StringRedisSerializer来序列化和反序列化redis的key,value采用json序列化 template.setKeySerializer(stringRedisSerializer); template.setValueSerializer(jacksonSerializer); // 设置hash key 和value序列化模式 template.setHashKeySerializer(stringRedisSerializer); template.setHashValueSerializer(jacksonSerializer); template.afterPropertiesSet(); return template; } } 三、redis中list数据类型

在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部和尾部添加新的元素

优势:

顺序排序,保证先进先出

队列为空时,自动从Redis数据库删除

在队列的两头插入或删除元素,效率极高,即使队列中元素达到百万级

List中可以包含的最大元素数量是4294967295

定时器监听队列

生产者

@Slf4j @Component public class MessageProducer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; public void lPush() { for (int i = 0; i < 10; i++) { new Thread(() -> { Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world"); log.info(Thread.currentThread().getName() + ":put message size = " + size); }).start(); } } }

消费者:消费消息,定时器以达到监听队列功能

@Slf4j @Component @EnableScheduling public class MessageConsumer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; @Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000) public void rPop() { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY); log.info(message); } } @RestController public class RedisController { @Autowired private MessageProducer messageProducer; @GetMapping("/lPush") public void lPush() { messageProducer.lPush(); } }

测试

http://localhost:8080/lPush

可能出现的问题:

1.通过定时器监听List中是否有待处理消息,每执行一次都会发起一次连接,这会造成不必要的浪费。

2.生产速度大于消费速度,队列堆积,消息时效性差,占用内存。

运行即监控队列

修改消息消费者代码。

当队列没有元素时,会阻塞10秒,然后再次监听队列,
需要注意的是,阻塞时间必须小于连接超时时间

@Slf4j @Component @EnableScheduling public class MessageConsumer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; //@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000) public void rPop() { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY); log.info(message); } @PostConstruct public void brPop() { new Thread(() -> { while (true) { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS); log.info(message); } }).start(); } }

阻塞时间不能为负,直接报错超时为负
阻塞时间为零,此时阻塞时间等于超时时间,最后报错连接超时
阻塞时间大于超时时间,报错连接超时

测试:

消息不可重复消费,因为消息从队列POP之后就被移除了,即不支持多个消费者消费同一批数据

消息丢失,消费期间发生异常,消息未能正常消费

四、发布/订阅模式

消息可以重复消费,多个消费者订阅同一频道即可

一个消费者根据匹配规则订阅多个频道

消费者只能消费订阅之后发布的消息,这意味着,消费者下线再上线这期间发布的消息将会丢失

数据不具有持久化。同样Redis宕机也会数据丢失

消息发布后,是推送到一个缓冲区(内存),消费者从缓冲区拉取消息,当消息堆积,缓冲区溢出,消费者就会被迫下线,同时释放对应的缓冲区

RedisConfig中添加监听器

/** * redis消息监听器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅频道,通配符*表示任意多个占位符 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); return container; }

订阅者

package com.yzm.redis08.message; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; public class MySubscribe implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { System.out.println("订阅频道:" + new String(message.getChannel())); System.out.println("接收数据:" + new String(message.getBody())); } }

消息发布

@GetMapping("/publish") public void publish() { redisTemplate.convertAndSend("channel_first", "hello world"); }

另一种发布方式

/** * redis消息监听器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅频道,通配符*表示任意多个占位符 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); // 通配符?:表示一个占位符 MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage"); listenerAdapter.afterPropertiesSet(); container.addMessageListener(listenerAdapter, new PatternTopic("channel?")); return container; } public class MySubscribe2 { public void getMessage(Object message, String channel) { System.out.println("订阅频道2:" + channel); System.out.println("接收数据2:" + message); } } @GetMapping("/publish2") public void publish2() { redisTemplate.convertAndSend("channel2", "hello world"); }

消息是实体对象,进行转换

@Data @Builder @NoArgsConstructor @AllArgsConstructor public class User implements Serializable { private static final long serialVersionUID = 5250232737975907491L; private Integer id; private String username; } public class MySubscribe3 implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class); jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper); User user = jacksonSerializer.deserialize(message.getBody()); System.out.println("订阅频道3:" + new String(message.getChannel())); System.out.println("接收数据3:" + user); } } /** * redis消息监听器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅频道,通配符*:表示任意多个占位符 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); // 通配符?:表示一个占位符 MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage"); listenerAdapter.afterPropertiesSet(); container.addMessageListener(listenerAdapter, new PatternTopic("channel?")); container.addMessageListener(new MySubscribe3(), new PatternTopic("user")); return container; } @GetMapping("/publish3") public void publish3() { User user = User.builder().id(1).username("yzm").build(); redisTemplate.convertAndSend("user", user); }

五、ZSet实现延迟队列

生产消息,score = 时间搓+60s随机数

public static final String MESSAGE_ZKEY = "message:ZSetqueue"; public volatile AtomicInteger count = new AtomicInteger(); public void zAdd() { for (int i = 0; i < 10; i++) { new Thread(() -> { int increment = count.getAndIncrement(); log.info(Thread.currentThread().getName() + ":put message to zset = " + increment); double score = System.currentTimeMillis() + new Random().nextInt(60 * 1000); redisTemplate.opsForZSet().add(MESSAGE_ZKEY, Thread.currentThread().getName() + " hello zset:" + increment, score); }).start(); } }

消费者:定时任务,每秒执行一次

public static final String MESSAGE_ZKEY = "message:ZSetqueue"; public SimpleDateFormat simpleDateFormat = new SimpleDateFormat(); @Scheduled(initialDelay = 5 * 1000, fixedRate = 1000) public void zrangebysocre() { log.info("延时队列消费。。。"); // 拉取score小于当前时间戳的消息 Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis()); if (messages != null) { for (Object message : messages) { Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message); log.info("消费了:" + message + "消费时间为:" + simpleDateFormat.format(score)); redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message); } } } @GetMapping("/zadd") public void zadd() { messageProducer.zAdd(); }

 到此这篇关于springboot整合redis之消息队列的文章就介绍到这了,更多相关springboot redis消息队列内容请搜索易知道(ezd.cc)以前的文章或继续浏览下面的相关文章希望大家以后多多支持易知道(ezd.cc)!

推荐阅读

    学习写字楼新选择6000元主流配置

    学习写字楼新选择6000元主流配置,,这种配置需要考虑双核心的办公和娱乐平台,充分考虑办公室的办公需求和娱乐需求,以约6000元的预算和cost-e

    酷睿I7 配置

    酷睿I7 配置,配置,玩家国度啦华硕 Rampage II Extreme(3800元)如果米不够,也可以把Extreme改为Gene,不过是小板内存推荐金士顿6G DDR3 2000骇

    提高3A四核羿龙II游戏配置的性能

    提高3A四核羿龙II游戏配置的性能,,以节能环保为主题的IT产业,目前3A低端平台处理器、主板芯片组、独立开发卡性能突出,特别是在与AMD的处理

    opporeno8参数配置及价格

    opporeno8参数配置及价格,面部,亿元,Oppo的荣誉2020年1月4日,接近屏幕关闭传感器是否支持双卡:支持oppor11splus什么时候上市的Oppo R11S P

    查看配置:酷睿i3530集展示办公平台

    查看配置:酷睿i3530集展示办公平台,,由于时间和精力的关系,我们不可能对所有的配置进行评论,希望我们能理解,我希望我们的评论能在那些需要帮

    3500元超额值学生娱乐结构的优化配置

    3500元超额值学生娱乐结构的优化配置,,作为一个DIY的主流用户领域的学生,每个用户51学生攒机的高峰。因为学生用户没有稳定的收入来源,攒机