SpringBoot中使用RabbtiMq 详解

SpringBoot中使用RabbtiMq 详解

目录

前言

pom.xml  

application.properties

MailConstants (常量)

RabbitConfig (rabbitMq的配置类)

MailSendTask(定时任务,发送)

MailReceiver(接收端)

使用总结

前言

如图使用redisTemplate 一样的简单方便

模拟发送邮件的情况

pom.xml   <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> application.properties spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.host=192.168.91.128 spring.rabbitmq.port=5672 ## 根据自己情况而定,可以不用 spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.prefetch=100

写在配置文件中,由 RabbitProperties 这个类进行读取,封装到ConnectionFactory 中。

MailConstants (常量) public class MailConstants { public static final Integer DELIVERING = 0;//消息投递中 public static final Integer SUCCESS = 1;//消息投递成功 public static final Integer FAILURE = 2;//消息投递失败 public static final Integer MAX_TRY_COUNT = 3;//最大重试次数 public static final Integer MSG_TIMEOUT = 1;//消息超时时间 public static final String MAIL_QUEUE_NAME = "javaboy.mail.queue"; public static final String MAIL_EXCHANGE_NAME = "javaboy.mail.exchange"; public static final String MAIL_ROUTING_KEY_NAME = "javaboy.mail.routing.key"; } RabbitConfig (rabbitMq的配置类) import org.javaboy.vhr.model.MailConstants; import org.javaboy.vhr.service.MailSendLogService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public final static Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @Autowired CachingConnectionFactory cachingConnectionFactory; //发送邮件的 @Autowired MailSendLogService mailSendLogService; @Bean RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); //手动应答返回的标志 rabbitTemplate.setConfirmCallback((data, ack, cause) -> { String msgId = data.getId(); if (ack) { logger.info(msgId + ":消息发送成功"); mailSendLogService.updateMailSendLogStatus(msgId, 1);//修改数据库中的记录,消息投递成功 } else { logger.info(msgId + ":消息发送失败"); } }); rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> { logger.info("消息发送失败"); }); return rabbitTemplate; } @Bean Queue mailQueue() { return new Queue(MailConstants.MAIL_QUEUE_NAME, true); } @Bean DirectExchange mailExchange() { return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME, true, false); } @Bean Binding mailBinding() { return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME); } } MailSendTask(定时任务,发送) @Component public class MailSendTask { @Autowired MailSendLogService mailSendLogService; @Autowired RabbitTemplate rabbitTemplate; @Autowired EmployeeService employeeService; @Scheduled(cron = "0/10 * * * * ?") public void mailResendTask() { List<MailSendLog> logs = mailSendLogService.getMailSendLogsByStatus(); if (logs == null || logs.size() == 0) { return; } logs.forEach(mailSendLog->{ if (mailSendLog.getCount() >= 3) { mailSendLogService.updateMailSendLogStatus(mailSendLog.getMsgId(), 2);//直接设置该条消息发送失败 }else{ mailSendLogService.updateCount(mailSendLog.getMsgId(), new Date()); Employee emp = employeeService.getEmployeeById(mailSendLog.getEmpId()); /** * 参数1:交换机名称 * 参数2 :路由key * 参数三:数据 * 参数4:作为唯一标识 * */ rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(mailSendLog.getMsgId())); } }); } } MailReceiver(接收端) @Component public class MailReceiver { public static final Logger logger = LoggerFactory.getLogger(MailReceiver.class); @Autowired JavaMailSender javaMailSender; @Autowired MailProperties mailProperties; @Autowired TemplateEngine templateEngine; @Autowired StringRedisTemplate redisTemplate; @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME) public void handler(Message message, Channel channel) throws IOException { Employee employee = (Employee) message.getPayload(); MessageHeaders headers = message.getHeaders(); Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); String msgId = (String) headers.get("spring_returned_message_correlation"); if (redisTemplate.opsForHash().entries("mail_log").containsKey(msgId)) { //redis 中包含该 key,说明该消息已经被消费过 logger.info(msgId + ":消息已经被消费"); channel.basicAck(tag, false);//确认消息已消费 return; } //收到消息,发送邮件 MimeMessage msg = javaMailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(msg); try { helper.setTo(employee.getEmail()); helper.setFrom(mailProperties.getUsername()); helper.setSubject("入职欢迎"); helper.setSentDate(new Date()); Context context = new Context(); context.setVariable("name", employee.getName()); context.setVariable("posName", employee.getPosition().getName()); context.setVariable("joblevelName", employee.getJobLevel().getName()); context.setVariable("departmentName", employee.getDepartment().getName()); //根据模板发送 String mail = templateEngine.process("mail", context); helper.setText(mail, true); javaMailSender.send(msg); redisTemplate.opsForHash().put("mail_log", msgId, "javaboy"); channel.basicAck(tag, false); logger.info(msgId + ":邮件发送成功"); } catch (MessagingException e) { //手动应答, tag 消息id ,、 channel.basicNack(tag, false, true); e.printStackTrace(); logger.error("邮件发送失败:" + e.getMessage()); } } } 使用总结

0. rabbtMq的本地服务,得开启。(跟redis差不多)

1. 写 application.properties中的rabbitMq的连接配置等

2. rabbitConfig配置文件。(包括:交换机选择与队列的配置,绑定),选择的模式在这里配置

3. 直接使用,导入rabbitTemplate类,使用rabbitTemplate.convertAndSend()方法

4. 接收类

@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)  public void handler(Message message, Channel channel) throws IOException {         业务逻辑了         手动接收等等 }

到此这篇关于SpringBoot 中使用RabbtiMq 详解的文章就介绍到这了,更多相关SpringBoot RabbtiMq 内容请搜索易知道(ezd.cc)以前的文章或继续浏览下面的相关文章希望大家以后多多支持易知道(ezd.cc)!

推荐阅读

    SpringBoot自动配置的实现原理是什么

    SpringBoot自动配置的实现原理是什么,配置,组件,文件,方法,注册,获取,一、什么是springboot自动配置SpringBoot通过@EnableAutoConfiguration注

    SpringBoot启动流程是什么

    SpringBoot启动流程是什么,应用程序,方法,组件,上下文,对象,配置,SpringBoot启动过程简介SpringBoot应用程序的启动过程可以分为以下几个步骤:加

    SpringBoot之整合Shiro

    SpringBoot之整合Shiro,SpringBoot,之,整合,Shiro,1.SpringBoot,整合,Shiro,,1.SpringBoot整合Shiro思路2. 环境搭建2.1 创建项目2.2 引入

    SpringBoot与SpringCache

    SpringBoot与SpringCache,SpringBoot,与,SpringCache,1.SpringCache,的,概念,首先,,1.SpringCache的概念首先我们知道jpa,jdbc这些东西都

    SpringBoot集成kafka全面实战

    SpringBoot集成kafka全面实战,SpringBoot,集成,kafka,全面,实战,一,、,前戏,在,,一、前戏1、在项目中连接kafka,因为是外网,首先要开放kafka

    SpringBoot集成Elasticseach

    SpringBoot集成Elasticseach,SpringBoot,集成,Elasticseach,一,、,Elasticseach,介绍,,  一、Elasticseach介绍  1.简单介绍  官网:

    SpringBoot2.x 集成 Dozer

    SpringBoot2.x 集成 Dozer,SpringBoot2.x,集成,Dozer,一,、,引入,依赖,dependency,,一、引入依赖<dependency> <groupId>com.github.doze