阿里云rocketmq费用 阿里云RocketMQ定时延迟消息队列实现

新的阅读体验:http://www.zhouhong.icu/archives/a-li-yun-rocketmq-ding-shi--yan-chi-xiao-xi-dui-lie-shi-xian
一、业务需求需要实现一个提前二十分钟通知用户去做某件事的一个业务,拿到这个业务首先想到的最简单得方法就是使用Redis监控Key值:在排计划时候计算当前时间与提前二十分钟这个时间差,然后使用一个唯一的业务Key压入Redis中并设定好过期时间,然后只需要让Redis监控这个Key值即可,当这个Key过期后就可以直接拿到这个Key的值然后实现发消息等业务 。
关于Redis实现该业务的具体实现在之前我已经记过一篇笔记,有兴趣的可以直接去瞅瞅,但是现在感觉有好多不足之处 。
       Redis实现定时: http://www.zhouhong.icu/post/144
二、Redis实现定时推送等功能的不足之处由于Redis不止你一个使用,其他业务也会使用Redis,那么最容易想到的一个缺点就是:1、如果在提醒的那一刻有大量的其他业务的Key也过期了,那么就会很长时间都轮不到你的这个Key,就会出现消息推送延迟等缺点;2、还有一个缺点就是像阿里云他们的Redis根本就不支持对 Redis 的 Key值得监控(我也是因为公司使用阿里云的Redis没法对Key监控才从之前使用Redis监控转移到使用RocketMQ的延时消息推送的 。。。)
三、阿里云RocketMQ定时/延迟消息队列实现其实在实现上非常简单
1、首先去阿里云控制台创建所需消息队列资源,包括消息队列 RocketMQ 的实例、Topic、Group ID (GID),以及鉴权需要的 AccessKey(AK),一般公司都有现成的可以直接使用 。2、在springboot项目pom.xml添加需要的依赖 。<!--阿里云MQ TCP--><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.7.1.Final</version></dependency>3、在对应环境的application.properties文件配置参数console:rocketmq:tcp:accessKey: XXXXXXXX使用自己的secretKey: XXXXXXXXXXXXX使用自己的nameSrvAddr: XXXXXXXXXXXXXXXX使用自己的topic: XXXXXXX使用自己的groupId: XXXXXXX使用自己的tag: XXXXXXXXX使用自己的4、封装MQ配置类import com.aliyun.openservices.ons.api.PropertyKeyConst;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import java.util.Properties;/*** @Description: MQ配置类* @Author: zhouhong* @Date: 2021/8/4*/@Configuration@EnableConfigurationProperties({PatrolMqConfig.class})@ConfigurationProperties(prefix = "console.rocketmq.tcp")@Primarypublic class PatrolMqConfig {private String accessKey;private String secretKey;private String nameSrvAddr;private String topic;private String groupId;private String tag;private String orderTopic;private String orderGroupId;private String orderTag;public Properties getMqPropertie() {Properties properties = new Properties();properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);return properties;}public String getAccessKey() {return accessKey;}public void setAccessKey(String accessKey) {this.accessKey = accessKey;}public String getSecretKey() {return secretKey;}public void setSecretKey(String secretKey) {this.secretKey = secretKey;}public String getNameSrvAddr() {return nameSrvAddr;}public void setNameSrvAddr(String nameSrvAddr) {this.nameSrvAddr = nameSrvAddr;}public String getTopic() {return topic;}public void setTopic(String topic) {this.topic = topic;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getTag() {return tag;}public void setTag(String tag) {this.tag = tag;}public String getOrderTopic() {return orderTopic;}public void setOrderTopic(String orderTopic) {this.orderTopic = orderTopic;}public String getOrderGroupId() {return orderGroupId;}public void setOrderGroupId(String orderGroupId) {this.orderGroupId = orderGroupId;}public String getOrderTag() {return orderTag;}public void setOrderTag(String orderTag) {this.orderTag = orderTag;}}5、配置生产者import com.aliyun.openservices.ons.api.bean.ProducerBean;import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class PatrolProducerClient {@Autowiredprivate PatrolMqConfig mqConfig;@Bean(name = "ConsoleProducer", initMethod = "start", destroyMethod = "shutdown")public ProducerBean buildProducer() {ProducerBean producer = new ProducerBean();producer.setProperties(mqConfig.getMqPropertie());return producer;}}6、消费者订阅import com.aliyun.openservices.ons.api.MessageListener;import com.aliyun.openservices.ons.api.PropertyKeyConst;import com.aliyun.openservices.ons.api.bean.ConsumerBean;import com.aliyun.openservices.ons.api.bean.Subscription;import com.honyar.iot.ibs.smartpatrol.modular.mq.tcp.config.PatrolMqConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;import java.util.Properties;//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了@Configuration@Slf4jpublic class PatrolConsumerClient {@Autowiredprivate PatrolMqConfig mqConfig;@Autowiredprivate MqTimeMessageListener messageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildConsumer() {ConsumerBean consumerBean = new ConsumerBean();//配置文件Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());//将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");consumerBean.setProperties(properties);//订阅关系Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getTopic());subscription.set);subscriptionTable.put(subscription, messageListener);//订阅多个topic如上面设置consumerBean.setSubscriptionTable(subscriptionTable);System.err.println("订阅成功!");return consumerBean;}}7、定时延时MQ消息监听消费/** * @Description: 定时/延时MQ消息监听消费 * @Author: zhouhong * @Create: 2021-08-03 09:16 **/@Componentpublic class MqTimeMessageListener implements MessageListener {private Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic Action consume(Message message, ConsumeContext context) {System.err.println("收到消息啦!!");logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody()));try {String msgTag = message.getTag(); // 消息类型String msgKey = message.getKey(); // 业务唯一idswitch (msgTag) {case "XXXX":// TODO 具体业务实现,比如发消息等操作System.err.println("推送成功!!!!");break;}return Action.CommitMessage;} catch (Exception e) {logger.error("消费MQ消息失败! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());//消费失败,告知服务器稍后再投递这条消息,继续消费其他消息return Action.ReconsumeLater;}}}8、封装一个发延时/定时消息的工具类/** * @Description: MQ发送消息助手 * @Author: zhouhong * @Create: 2021-08-03 09:06 **/@Componentpublic class ProducerUtil {private Logger logger = LoggerFactory.getLogger(ProducerUtil.class);@Autowiredprivate PatrolMqConfig config;@Resource(name = "ConsoleProducer")ProducerBean producerBean;public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);msg.setStartDeliverTime(delayTime);return this.send(msg,Boolean.FALSE);}/*** 普通消息发送发放* @param msg 消息* @param isOneWay 是否单向发送*/private SendResult send(Message msg,Boolean isOneWay) {try {if(isOneWay) {//由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失 。//若数据不可丢,建议选用同步或异步发送方式 。producerBean.sendOneway(msg);success(msg, "单向消息MsgId不返回");return null;}else {//可靠同步发送SendResult sendResult = producerBean.send(msg);//获取发送结果,不抛异常即发送成功if (sendResult != null) {success(msg, sendResult.getMessageId());return sendResult;}else {error(msg,null);return null;}}} catch (Exception e) {error(msg,e);return null;}}private ExecutorService threads = Executors.newFixedThreadPool(3);private void error(Message msg,Exception e) {logger.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}",msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));logger.error("errorMsg --- {}",e.getMessage());}private void success(Message msg,String messageId) {logger.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}",msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));}}9、接口测试(10000表示延迟10秒,可以根据自己的业务计算出)// 测试MQ延时@AutowiredProducerUtil producerUtil;@PostMapping("/patrolTaskTemp/mqtest")public void mqTime(){producerUtil.sendTimeMsg("SMARTPATROL","你好鸭!!!".getBytes(),"红红火火恍恍惚惚!!",System.currentTimeMillis() + 10000);}10、结果2021-08-04 22:07:12.677INFO 17548 --- [nio-8498-exec-2] c.h.i.i.s.m.common.util.ProducerUtil: 发送MQ消息成功 -- Topic:TID_COMMON ,msgId:C0A80168448C2F0E140B14322CB30000 , Key:红红火火恍恍惚惚!!, tag:SMARTPATROL, body:你好鸭!!!收到消息啦!!推送成功!!!!2021-08-04 22:07:22.179INFO 17548 --- [MessageThread_1] c.h.i.i.s.m.m.t.n.MqTimeMessageListener: 接收到MQ消息 -- Topic:TID_COMMON, tag:SMARTPATROL,msgId:0b17f2e71ebd1b054c2c156f6d1d1655 , Key:红红火火恍恍惚惚!!, body:你好鸭!!!【阿里云rocketmq费用 阿里云RocketMQ定时延迟消息队列实现】

阿里云rocketmq费用 阿里云RocketMQ定时延迟消息队列实现

文章插图
输了不可怕,大不了从头再来,我们还年轻---周红