在https://editor.csdn.net/md/?articleId=123689998中已经介绍了部分RabbitMQ的相关知识,但是都是基于Rabbit MQ正常工作,在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复,这种情况下会出现如下的报错信息 。如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?
所以在RabbitMQ中设置消息回退机制,可以解决消息丢失的问题 。
应用[xxx]在[08-1516:36:04]发生[错误日志异常],alertId=[xxx] 。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]触发 。应用xxx 可能原因如下 服务名为: 异常为:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620, 产生原因如下:1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.||Consumer received fatal=false exception on startup:
文章目录
- RabbitMQ发生异常的处理
- 交换机确认:
- 需要实现一个回调接口的函数:
- 配置类
- 消息的生产者
- 消费者
- 消息回退
- 生产者代码,加入消息回退的参数
- 回调接口
- 备份交换机
- 配置类的修改
- 报警消费者
- RabbitMQ其他
- 幂等性问题
- 幂等性概念
- 幂等性解决方法
- 消费端的幂等性保障
- 优先级队列
- 队列添加优先级的方式
- 在Web界面进行添加
- 在代码的队列中进行优先级添加
- 在发送消息的代码中进行添加
- 注意
- 惰性队列
- 两种模式
- RabbitMQ集群
- 搭建步骤
- 镜像队列
- 镜像队列的设置:
RabbitMQ发生异常的处理 首先,RabbitMQ发生异常可能有两种情况,可能是交换机异常或者是消息队列异常,所以当交换机收到生产者发送的消息时,需要让生产者知道,该消息交换机已经接收到了,或者没有接收到 。
所以就需要在Spring Boot中添加配置信息:发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated - NONE:禁用发布确认模式,是默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE:经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法 等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
@Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback { /** * 交换机不管是否收到消息的一个回调方法 * CorrelationData * * 消息相关数据 * * ack * * 交换机是否收到消息 * */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id=correlationData!=null?correlationData.getId():""; if(ack){log.info("交换机已经收到id为:{}的消息",id); }else{log.info("交换机还未收到id为:{}消息,由于原因:{}",id,cause);}} } 但是实现这个接口只能够让生产者知道该消息是否到达交换机,但是并不知道这条消息的具体内容:如下面的代码:首先按照该方式设计好交换机和队列
配置类
@Configuration public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//声明业务Exchange@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}} 消息的生产者 @RestController @RequestMapping("/confirm") @Slf4j public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyCallBack myCallBack;//依赖注入rabbitTemplate之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);}@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message){//指定消息id为随机数CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());String routingKey="key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);CorrelationData correlationData2=new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);log.info("发送消息内容:{}",message);}} 消费者 @Component @Slf4j public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列confirm.queue消息:{}",msg);}} 结果分析:可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了 。
消息回退 在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的 。那么如何让无法被路由的消息让生产者知道 。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者 。
生产者代码,加入消息回退的参数
@Slf4j @Component public class MessageProducer implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; //rabbitTemplate注入之后就设置该值 @PostConstruct private void init() {rabbitTemplate.setConfirmCallback(this);/** * true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* * false: * 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(this);} @GetMapping("sendMessage") public void sendMessage(String message){//让消息绑定一个id值CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("confirm.exchange","key1",message+"key1",correlationData1);log.info("发送消息id为:{}内容为{}",correlationData1.getId(),message+"key1"); CorrelationData correlationData2 = newCorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData2);log.info("发送消息id为:{}内容为{}",correlationData2.getId(),message+"key2"); } @Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机收到消息确认成功, id:{}", id);} else {log.error("消息id:{}未成功投递到交换机,原因是:{}", id, cause);}}@Overridepublic void returnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由key:{}",new String(message.getBody()),replyText,exchange,routingKey);}} 回调接口 设置当消息无法路由的时候的回调方法@Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/** * 交换机不管是否收到消息的一个回调方法* CorrelationData * 消息相关数据* * ack * 交换机是否收到消息 **/@Override public void confirm(CorrelationData correlationData,boolean ack,String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到id为:{}的消息",id);}else{log.info("交换机还未收到id为:{}消息,由于原因:{}",id,cause);}}//当消息无法路由的时候的回调方法@Overridepublic void returnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey) {log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}",new String(message.getBody()),exchange,replyText,routingKey);}} 结果分析:未被正常接收的消息被成功退回给生产者
因此生产者接收到退回的消息之后就可以将消息进行重新发送 。
备份交换机 【RabbitMQ-消息回退集群搭建】有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理 。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理 。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错 。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑 。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息 。在RabbitMQ中,有一种备份交换机的机制存在,可以很好的应对这个问题 。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了 。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警 。
配置类的修改 在上面的基础上添加了备份交换机,备份队列和报警队列,将这些交换机和队列按照图示的方式进行申明和绑定 。
@Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);} //声明确认Exchange交换机的备份交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);return (DirectExchange)exchangeBuilder.build();}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);} } 报警消费者 @Component @Slf4j public class WarningConsumer {public static final String WARNING_QUEUE_NAME = "warning.queue"; @RabbitListener(queues = WARNING_QUEUE_NAME) public void receiveWarningMsg(Message message) {String msg = new String(message.getBody());log.error("报警发现不可路由消息:{}", msg);} } 重新启动项目的时候需要把原来的confirm.exchange删除因为我们修改了其绑定属性,不然报错 。mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是:备份交换机优先级高 。
因为未被路由的消息是经过被封交换机的报警队列获得的,并没有通过Mandatory回退的形式告诉生产者 。
RabbitMQ其他 幂等性问题 幂等性概念 用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用 。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条 。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等 。
消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断,故MQ未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息 。
幂等性解决方法 MQ消费者的幂等性的解决一般使用全局ID 或者写个唯一标识比如时间戳 或者UUID 或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过 。
消费端的幂等性保障 在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息 。业界主流的幂等性有两种操作:
- a.唯一ID+指纹码机制,利用数据库主键去重 。
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式 。 - b.利用redis的原子性去实现
利用redis执行setnx命令,天然具有幂等性 。从而实现不重复消费
所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级 。
队列添加优先级的方式
在Web界面进行添加 点击Maximum priority属性,设置相应的优先级就行 。
在代码的队列中进行优先级添加
Map params=new HashMap();params.put("x-max-priotity",10);//表示可以设置的最大优先级channel.queueDeclare("hello",true,false,false,params); 在发送消息的代码中进行添加 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); 注意 消息如果为设置优先级,默认优先级最低 。要让队列实现优先级需要做的事情有如下事情:
- 队列需要设置为优先级队列,
- 消息需要设置消息的优先级,
- 消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者 。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份 。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息 。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候 。
两种模式 队列具备两种模式:default和lazy 。
- 默认的为default模式,在3.6.0之前的版本无需做任何变更 。
- lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级 。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的 。
- 在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy” 。
- 下面示例中演示了一个惰性队列的声明细节:
Map args = new HashMap();args.put("x-queue-mode", "lazy");channel.queueDeclare("myqueue", false, false, false, args); RabbitMQ集群 最开始我们介绍了如何安装及运行RabbitMQ服务,不过这些是单机版的,无法满足目前真实应用的要求 。如果RabbitMQ服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台RabbitMQ服务器可以满足每秒1000条消息的吞吐量,那么如果应用需要RabbitMQ服务满足每秒10万条消息的吞吐量呢?购买昂贵的服务器来增强单机RabbitMQ务的性能显得捉襟见肘,搭建一个RabbitMQ集群才是解决实际问题的关键.搭建步骤 将三个主机进行相应的配置之后,三个Rabbit MQ就相当于是相通的 。
- 1.首先需要三台服务器 。
- 2.修改3台机器的主机名称 ,在该文件中修改vim /etc/hostname
- 3.一台主机相当于一个node节点,在每个主机上都配置各个节点的 hosts 文件,让各个节点都能互相识别对方,三台主机都能相互识别对方 。
vim /etc/hosts
192.168.6.100 node1192.168.6.100 node2192.168.6.100 node3 - 4.确保各个节点的cookie文件使用的是同一个值
在node1上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie - 5.启动RabbitMQ服务,顺带启动Erlang虚拟机和RbbitMQ应用服务(在三台节点上分别执行以下命令)
rabbitmq-server -detached - 6.在节点2执行
rabbitmqctl stop_app
(rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqctl stop_app只关闭RabbitMQ服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app(只启动应用服务) - 7.在节点3执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app - 8.集群状态
rabbitmqctl cluster_status - 9.需要重新设置用户
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p “/” admin “.- 春季老年人吃什么养肝?土豆、米饭换着吃
- 三八妇女节节日祝福分享 三八妇女节节日语录
- 老人谨慎!选好你的“第三只脚”
- 校方进行了深刻的反思 青岛一大学生坠亡校方整改校规
- 脸皮厚的人长寿!有这特征的老人最长寿
- 长寿秘诀:记住这10大妙招 100%增寿
- 春季老年人心血管病高发 3条保命要诀
- 眼睛花不花要看四十八 老年人怎样延缓老花眼
- 香槟然能防治老年痴呆症? 一天三杯它人到90不痴呆
- 老人手抖的原因 为什么老人手会抖
