一、关于MQ的介绍 消息队列MQ是一种应用之间异步通信的方式 , 主要是由三个部分组成:第一个是生产者(producer) , 生产者也就是说生产消息的这一端 , 然后主要是负责消息所承载业务信息的一个实例化 , 是我们整个消息的发起方;第二个是Broker , 它是我们整个消息的服务端 , 主要是去处理我们这个消息单元 , 它负责消息的存储、投递以及队列的一些其他附加功能的实现 , 它是整个消息队列里面最核心的组成部分;第三个消费者(consumer) , 主要负责消息的消费 , 具体是根据消息所承载的一个信息去处理各种业务逻辑 。市面上几款比较火爆的MQ有:ActiveMQ , RocketMQ , Kafka , RabbitMQ 。
RabbitMQ的完整架构图:
二、确保消息发送可靠性 大家知道 , RabbitMQ中的消息发送引入了 Exchange(交换机)的概念 , 消息的发送首先到达交换机上 , 然后再根据既定的路由规则 , 由交换机将消息Roueting(路由)到不同的 Queue(队列)中 , 再由不同的消费者去消费 。
大致的流程就是这样 , 所以要确保消息发送的可靠性 , 主要从两方面去确认:
1. 消息成功到达 Exchange
2. 消息成功到达 Queue
如果能确认这两步 , 那么我们就可以认为消息发送成功了 。如果这两步中任一步骤出现问题 , 那么消息就没有成功送达 , 此时我们可能要通过重试等方式去重新发送消息 , 多次重试之后 , 如果消息还是不能到达 , 则可能就需要人工介入了 。
经过上面的分析 , 我们可以确认 , 要确保消息成功发送 , 我们只需要做好三件事就可以了:
1. 确认消息到达 Exchange
2. 确认消息到达 Queue
3. 开启定时任务 , 定时投递那些发送失败的消息
上面提出的三个步骤 , 第三步需要我们根据业务逻辑实现 , 前两步RabbitMQ则给出了现成的解决方案 。
那么 , 该如何确保消息成功到达RabbitMQ?RabbitMQ有两种解决方案:
1.开启事务机制
2.发送方确认机制
注意:这是两种不同的方案 , 不可以同时开启 , 只能选择其中之一 。
2.1开启事务模式 Spring Boot 中开启 RabbitMQ 事务机制的方式如下: 首先需要先提供一个事务管理器 , 如下:
@BeanRabbitTransactionManager transactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);} 接下来 , 在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:
@Servicepublic class MsgService {@AutowiredRabbitTemplate rabbitTemplate;@Transactionalpublic void send() {rabbitTemplate.setChannelTransacted(true);rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JUMPING_QUEUE_NAME,"hello rabbitmq!".getBytes());int i = 1 / 0;}} 这里注意两点:
1. 发送消息的方法上添加 @Transactional 注解标记事务
2. 调用 setChannelTransacted 方法设置为 true 开启事务模式
这就 OK 了 。在上面的案例中 , 我们在结尾来了个 1/0 , 这在运行时必然抛出异常 , 我们可以尝试运行该方法 , 发现消息并未发送成功 。
当我们开启事务模式之后 , RabbitMQ 生产者发送消息会多出四个步骤:
1. 客户端发出请求 , 将信道设置为事务模式
2. 服务端给出回复 , 同意将信道设置为事务模式
3. 客户端发送消息
4. 客户端提交事务
5. 服务端给出响应 , 确认事务提交
上面的步骤 , 除了第三步是本来就有的 , 其他几个步骤都是平白无故多出来的 。所以大家看到 , 事务模 式其实效率有点低 , 这并非一个最佳解决方案 。我们可以想想 , 什么项目会用到消息中间件?一般来说 都是一些高并发的项目 , 这个时候并发性能尤为重要 。所以 , RabbitMQ还提供了发送方确认机制(publisher confirm)来确保消息发送成功 , 这种方式 , 性能要远远高于事务模式 。
2.2发送方确认机制 发送方确认机制需要在application.properties中配置两行代码:
【RabbitMQ简单入门】spring.rabbitmq.publisher-confirm-type=correlated //配置消息到达交换器的确认回调
spring.rabbitmq.publisher-returns=true //配置消息到达队列的回调
接下来我们要开启监听:
@Configurationpublic class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);public final static String MY_EXCHANGE_NAME = "my_exchange_name";public final static String MY_QUEUE_NAME = "my_queue_name";@AutowiredRabbitTemplate rabbitTemplate;/*** 当前类的构造方法执行完成后 , 会自动触发该方法 , 可以在该方法中 , 完成一些对象的初始化操作*/@PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@BeanBinding binding() {return BindingBuilder.bind(myQueue()).to(directExchange()).with(MY_QUEUE_NAME);}@BeanDirectExchange directExchange() {return new DirectExchange(MY_EXCHANGE_NAME, true, false);}@BeanQueue myQueue() {return new Queue(MY_QUEUE_NAME, true, false, false);}/*** 如果不管有没有到达交换机 , 都会触发该方法** @param correlationData 这个是携带的 correlationId 在这里* @param ack:这个值如果为true , 表示交换机收到消息了 , false 表示消息未到达交换机* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {logger.info("消息到达交换机了 , 消息 id:{}", correlationData.getId());} else {logger.error("消息未到达交换机 , 消息 id:{}", correlationData.getId());}}/*** 如果消息未到达队列 , 会触发该方法** @param returned*/@Overridepublic void returnedMessage(ReturnedMessage returned) {logger.error("消息未到达队列:{}", returned.toString());} 关于这个配置类 , 我说如下几点:
1. 定义配置类 , 实现RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnsCallback两个接口 , 这两个接口 , 前者的回调用来确定消息到达交换器 , 后者则会在消息路由到队列失败时被调用
2. 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解 , 在该方法中为rabbitTemplate分别 配置这两个Callback 。
这就可以了 。
- 春季老年人吃什么养肝?土豆、米饭换着吃
- 三八妇女节节日祝福分享 三八妇女节节日语录
- 老人谨慎!选好你的“第三只脚”
- 校方进行了深刻的反思 青岛一大学生坠亡校方整改校规
- 脸皮厚的人长寿!有这特征的老人最长寿
- 长寿秘诀:记住这10大妙招 100%增寿
- 春季老年人心血管病高发 3条保命要诀
- 眼睛花不花要看四十八 老年人怎样延缓老花眼
- 香槟然能防治老年痴呆症? 一天三杯它人到90不痴呆
- 老人手抖的原因 为什么老人手会抖
