RabbitMq-笔记

RabbitMq-笔记 1. hello word
封装公共方法
public class RabbitMqCommons {private staticConnectionFactory connectionFactory;static{//创建连接工厂对象connectionFactory = new ConnectionFactory();//设置连接主机connectionFactory.setHost("127.0.0.1");//设置端口connectionFactory.setPort(5672);//设置连接虚拟机connectionFactory.setVirtualHost("/ems");//设置访问虚拟主机用户名和密码connectionFactory.setUsername("ems");connectionFactory.setPassword("123");}public Connection getConnectInfo() throws IOException, TimeoutException {//设置连接对象Connection connection = connectionFactory.newConnection();returnconnection;}} 1.1 消息提供者 /*测试直连 -消息提供者*/@Testvoid privated() throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//通道绑定对应消息队列/*参数1:队列名称 无则自动创建参数2:定义队列是否持久化(退出是否还在)参数3: 是否独占队列(只有当前对象可以访问)参数4: 是否消费完后自动删除队列参数5: 额外附加参数* */channel.queueDeclare("hello",true,false,false,null);//发布消息/** 参数1:交换机名称* 参数2:队列名称* 参数3: 传递雄消息额外设置 .消息持久化* 参数4: 消息的具体内容* *//*第一个参数没有交换机不要空格*/channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());channel.close();connection.close();} 1.2 消费者 /*消费者*/@Testvoid consumer() throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//通道绑定对应消息队列/*参数1:队列名称 无则自动创建参数2:定义队列是否持久化(退出是否还在)参数3: 是否独占队列(只有当前对象可以访问)参数4: 是否消费完后自动删除队列参数5: 额外附加参数* */channel.queueDeclare("hello",true,false,false,null);//消费消息第二个参数:自动确认收到消息channel.basicConsume("hello",true,new DefaultConsumer(channel){@Override // body:xiaoxipublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("执行顺1");System.out.println("String.valueOf(body) = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});//一直打开 一直监听/*channel.close();connection.close();*/System.out.println("2");//执行结果2 执行顺1先执行主线程,后执行回调函数} 2.work
消费者交替消费消息 ->轮询
2.1 消息提供者 /*测试workqueue -消息提供者默认轮询算法*/@Testvoid privated() throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//通道绑定对应消息队列/*参数1:队列名称 无则自动创建参数2:定义队列是否持久化(退出是否还在)参数3: 是否独占队列(只有当前对象可以访问)参数4: 是否消费完后自动删除队列参数5: 额外附加参数* */channel.queueDeclare("hello-work",true,false,false,null);//发布消息/** 参数1:交换机名称* 参数2:队列名称* 参数3: 传递雄消息额外设置 .消息持久化* 参数4: 消息的具体内容* *//*第一个参数没有交换机不要空格*/for (int i = 0; i <20 ; i++) {channel.basicPublish("","hello-work", MessageProperties.PERSISTENT_TEXT_PLAIN,("hello rabbitmq"+i).getBytes());}channel.close();connection.close();} 2.2 消费者 2.2.1 消费者1 /*消费者1*/@Testvoid consumer1() throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//通道绑定对应消息队列/*参数1:队列名称 无则自动创建参数2:定义队列是否持久化(退出是否还在)参数3: 是否独占队列(只有当前对象可以访问)参数4: 是否消费完后自动删除队列参数5: 额外附加参数* */channel.queueDeclare("hello-work",true,false,false,null);//消费消息channel.basicConsume("hello-work",true,new DefaultConsumer(channel){@Override // body:xiaoxipublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("String.valueOf(body) = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});//一直打开 一直监听/*channel.close();connection.close();*/} 2.2.2 消费者2 /*消费者2*/@Testvoid consumer2() throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//通道绑定对应消息队列/*参数1:队列名称 无则自动创建参数2:定义队列是否持久化(退出是否还在)参数3: 是否独占队列(只有当前对象可以访问)参数4: 是否消费完后自动删除队列参数5: 额外附加参数* */channel.queueDeclare("hello-work",true,false,false,null);//消费消息channel.basicConsume("hello-work",true,new DefaultConsumer(channel){@Override // body:xiaoxipublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("String.valueOf(body) = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});//一直打开 一直监听/*channel.close();connection.close();*/} 2.3修改轮询算法 *修改轮询算法 basicConsume第二个参数设为false 通道一次只能消费一个消息 。轮询是把消息全部放入通道慢慢消费
2.3.1 消费者1 public class Consumer1 {//默认轮询public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//通道绑定对应消息队列/*参数1:队列名称 无则自动创建参数2:定义队列是否持久化(退出是否还在)参数3: 是否独占队列(只有当前对象可以访问)参数4: 是否消费完后自动删除队列参数5: 额外附加参数* *//*修改轮询算法basicConsume第二个参数设为false通道一次只能消费一个消息 。轮询是把消息全部* 放入通道慢慢消费*/channel.basicQos(1);//每次消费一个channel.queueDeclare("hello-work",true,false,false,null);//消费消息channel.basicConsume("hello-work",false,new DefaultConsumer(channel){@Override // body:消息 第二个参数:自动确认收到消息public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("String.valueOf(body) = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);//手动确认 参数1 手动确认消息 确认当前 envelope消息 参数2false 每次确认一个 true 多个channel.basicAck(envelope.getDeliveryTag(),false);}});//一直打开 一直监听/*channel.close();connection.close();*/}} 2.3.2 消费者2 public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//通道绑定对应消息队列/*参数1:队列名称 无则自动创建参数2:定义队列是否持久化(退出是否还在)参数3: 是否独占队列(只有当前对象可以访问)参数4: 是否消费完后自动删除队列参数5: 额外附加参数* *//*修改轮询算法basicConsume第二个参数设为false通道一次只能消费一个消息 。轮询是把消息全部* 放入通道慢慢消费*/channel.basicQos(1);//每次消费一个channel.queueDeclare("hello-work",true,false,false,null);//消费消息 第二个参数:自动确认收到消息channel.basicConsume("hello-work",false,new DefaultConsumer(channel){@Override // body:xiaoxipublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("String.valueOf(body) = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);//手动确认 参数1 手动确认消息 参数2 false 每次确认一个channel.basicAck(envelope.getDeliveryTag(),false);}});//一直打开 一直监听/*channel.close();connection.close();*/}} 3.fanout
扇出
3.1 消息提供者 public class Privider {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//将通道声明指定交换机//参数1 交换机名称 参数2 交换机类型 fanout 广播类型channel.exchangeDeclare("logs","fanout");//发送消息channel.basicPublish("logs","",null,"fanout type message".getBytes());channel.close();connection.close();}} 3.2 消费者 3.2.1 消费者1 public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connectInfo = rabbitMqCommons.getConnectInfo();Channel channel = connectInfo.createChannel();//将通道声明指定交换机channel.exchangeDeclare("logs","fanout");//临时队列String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,"logs","");//消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1 = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});}} 3.2.2 消费者2 public class Comsumer2 {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connectInfo = rabbitMqCommons.getConnectInfo();Channel channel = connectInfo.createChannel();//将通道声明指定交换机channel.exchangeDeclare("logs","fanout");//临时队列String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,"logs","");//消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2 = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});}} 4.routing 可以指定哪个消费者消费

4.1 消息提供者 public class Privider {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//将通道声明指定交换机//参数1 交换机名称 参数2 交换机类型 fanout 广播类型channel.exchangeDeclare("logs-direct","direct");//发送消息 参数2 routeKeychannel.basicPublish("logs-direct","info",null,"fanout type message".getBytes());channel.close();connection.close();}} 4.2 消费者 4.2.1 消费者1 public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connectInfo = rabbitMqCommons.getConnectInfo();Channel channel = connectInfo.createChannel();//将通道声明指定交换机channel.exchangeDeclare("logs-direct","direct");//临时队列String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,"logs-direct","error");channel.queueBind(queue,"logs-direct","dev");//消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1 = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});}} 4.2.2 消费者2 public class Comsumer2 {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connectInfo = rabbitMqCommons.getConnectInfo();Channel channel = connectInfo.createChannel();//将通道声明指定交换机channel.exchangeDeclare("logs-direct","direct");//临时队列String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,"logs-direct","info");//消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2 = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});}} 5.topic 通过 * # 动态指定消费者消费

5.1 消息提供者 public class Privider {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connection = rabbitMqCommons.getConnectInfo();//获取连接中通道Channel channel = connection.createChannel();//将通道声明指定交换机//参数1 交换机名称 参数2 交换机类型 fanout 广播类型channel.exchangeDeclare("logs-topic","topic");//发送消息 参数2 routeKeychannel.basicPublish("logs-topic","user.add",null,"fanout type message".getBytes());channel.basicPublish("logs-topic","user.add.info",null,"fanout type message".getBytes());channel.close();connection.close();}} 5.2 消费者 5.2.1 消费者1 public class Consumer1 {/***# 匹配多个 * 匹配单个* */public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connectInfo = rabbitMqCommons.getConnectInfo();Channel channel = connectInfo.createChannel();//将通道声明指定交换机channel.exchangeDeclare("logs-topic","topic");//临时队列String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,"logs-topic","user.*");/*channel.queueBind(queue,"logs-topic","user.#");*///消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1 = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});}} 5.2.2消费者2 public class Comsumer2 {public static void main(String[] args) throws IOException, TimeoutException {RabbitMqCommons rabbitMqCommons = new RabbitMqCommons();Connection connectInfo = rabbitMqCommons.getConnectInfo();Channel channel = connectInfo.createChannel();//将通道声明指定交换机channel.exchangeDeclare("logs-topic","topic");//临时队列String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,"logs-topic","user.*");channel.queueBind(queue,"logs-topic","user.#");//消费消息channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2 = " + new String(body));super.handleDelivery(consumerTag, envelope, properties, body);}});}} 6.整合springboot 6.1 消息提供者 @SpringBootTestclass RabbitmqSpringbootApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;/*hello*/@Testvoid test1() {rabbitTemplate.convertAndSend("spring-hello","hello");}/*work*/@Testvoid work() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("spring-work","hello-work"+i);}}/*fanout*/@Testvoid fanout() {rabbitTemplate.convertAndSend("spring-fanout","","hello-fanout");}/*routekey*/@Testvoid routekey() {rabbitTemplate.convertAndSend("spring-route","info","hello-route");}/*topic*/@Testvoid topic() {rabbitTemplate.convertAndSend("spring-topic","info.add","hello-route");}} 6.2 消费者 6.2.1 helloword @Component@RabbitListener(queuesToDeclare = @Queue(value = "https://tazarkount.com/read/spring-hello",declare = "false"))public class Consumer {@RabbitHandlerpublicvoid receivel(String message){System.out.println("message = " + message);}} 6.2.2 work @Componentpublic class ConsumerWork {@RabbitListener(queuesToDeclare = @Queue(value = "https://tazarkount.com/read/spring-work"))public void receivel(String message) {System.out.println("message1 = " + message);}@RabbitListener(queuesToDeclare = @Queue(value = "https://tazarkount.com/read/spring-work"))public void receivel1(String message) {System.out.println("message2 = " + message);}} 6.2.3 fanout @Componentpublic class ConsumerFanout {@RabbitListener(bindings = {@QueueBinding(value = https://tazarkount.com/read/@Queue, //创建临时对列exchange = @Exchange(value ="spring-fanout",type = ExchangeTypes.FANOUT))})publicvoid receivel(String message){System.out.println("message1 = " + message);}@RabbitListener(bindings = {@QueueBinding(value = https://tazarkount.com/read/@Queue, //创建临时对列exchange = @Exchange(value ="spring-fanout",type = ExchangeTypes.FANOUT))})publicvoid receivel1(String message){System.out.println("message2 = " + message);}} 6.2.4 routing @Componentpublic class ConsumerRoute {@RabbitListener(bindings = {@QueueBinding(value = https://tazarkount.com/read/@Queue,//创建临时队列exchange = @Exchange(value ="spring-route",type = "direct"),//定义交换机名称和类型key={"info","error","warn"})})public void recivel(String message){System.out.println("message1 = " + message);}@RabbitListener(bindings = {@QueueBinding(value = https://tazarkount.com/read/@Queue,//创建临时队列exchange = @Exchange(value ="spring-route",type = "direct"),//定义交换机名称和类型key={"error","warn"})})public void recivel1(String message){System.out.println("message2 = " + message);}} 6.2.5 topic 【RabbitMq-笔记】@Componentpublic class ConsumerTopic {@RabbitListener(bindings = {@QueueBinding(value = https://tazarkount.com/read/@Queue,//创建临时队列exchange = @Exchange(value ="spring-topic",type = "topic"),//定义交换机名称和类型key={"info.*"})})public void recivel(String message){System.out.println("message1 = " + message);}@RabbitListener(bindings = {@QueueBinding(value = https://tazarkount.com/read/@Queue,//创建临时队列exchange = @Exchange(value ="spring-topic",type = "topic"),//定义交换机名称和类型key={"error.*"})})public void recivel1(String message){System.out.println("message2 = " + message);}} 7.rabbitmq集群