RabbitMQ-基本使用
作者:快盘下载 人气:40目录
⼀、消息队列介绍
1.1 同步调⽤与异步调⽤
1.2 消息队列概念
1.3 常⽤的消息队列产品
⼆、RabbitMQ
2.1 RabbitMQ介绍
2.2 RabbitMQ安装和配置
2.3 RabbitMQ逻辑结构
三、RabbitMQ⽤户管理
3.1 逻辑结构
3.2 ⽤户管理
3.2.1 命令⾏⽤户管理
3.2.2 管理系统进⾏⽤户管理
四、RabbitMQ⼯作⽅式
4.1 简单模式
4.2 ⼯作模式
4.3 订阅模式
4.4 路由模式
五、RabbitMQ交换机和队列管理
5.1 创建队列
5.2 创建交换机
5.3 交换机绑定队列
六、在普通的Maven应⽤中使⽤MQ
6.1简单模式
6.1.1 消息⽣产者
6.1.2 消息消费者
6.2 ⼯作模式
6.2.1 发送者
6.2.2 消费者1
6.2.3 消费者2
6.3 订阅模式
6.3.1 发送者 发送消息到交换机
6.3.2 消费者1
6.3.3 消费者2
6.4 路由模式
6.4.1 发送者 发送消息到交换机
6.4.2 消费者1
6.4.3 消费者2
七、在SpringBoot应⽤中使⽤MQ
7.1 消息⽣产者
7.2 消息消费者
⼋、使⽤RabbitMQ传递对象
8.1 使⽤序列化对象
8.2 使⽤序列化字节数组
8.3 使⽤JSON字符串传递
九、基于Java的交换机与队列创建
9.1 普通Maven项⽬交换机及队列创建
9.2 SpringBoot应⽤中通过配置完成队列的创建
⼗、消息的可靠性
10.1 RabbitMQ事务
10.2 RabbitMQ消息确认和return机制
10.2.1 普通Maven项⽬的消息确认
10.2.2 普通Maven项⽬的return机制
10.3 在SpringBoot应⽤实现消息确认与return监听
10.3.1 配置application.yml,开启消息确认和return监听
10.3.2 创建confirm和return监听
10.4 RabbitMQ消费者⼿动应答
10.5 消息消费的幂等性问题
⼗⼀、延迟机制
11.1 延迟队列
11.2 使⽤延迟队列实现订单⽀付监控
11.2.1 实现流程图
11.2.2 创建交换机和队列
⼗⼆、消息队列作⽤/使⽤场景总结
12.1 解耦
12.2 异步
12.3 消息通信
12.4 流量削峰
12.5⽇志处理
⼀、消息队列介绍
1.1 同步调⽤与异步调⽤
同步调⽤ Feign 客户端可以实现服务间的通信;但是 Feign 是同步调⽤;也就是说 A 服务调⽤ B 服务 之后;会进⼊阻塞 / 等待状态;直到 B 服务返回调⽤结果给 A 服务; A 服务才会继续往后执 ⾏ 在特定的业务场景中;⽤户注册成功之后;发送短息通知⽤户;A服务为⽤户注册; B 服 务发送短信; A 服务在完成⽤户注册之后;代码 1 ;;调⽤ B 服务发送短信; A 服务完成 B 服务调⽤之后⽆需等待 B 服务的执⾏接⼝;直接执⾏提示⽤户注册从公;代码 2 ;;在这 种需求下 A 服务调⽤ B 服务如果使⽤同步调⽤;必然降低 A 服务的执⾏效率;因此在这种 场景下 A 服务需要通过 异步调⽤ 调⽤ B 服务 异步调⽤; 当 A 服务调⽤ B 服务之后;⽆需等待 B 的调⽤结果;可以继续往下执⾏;那么服务间的异 步通信该如何实现呢? 服务之间可以通过消息队列实现异步调⽤- 同步调⽤
- A服务调⽤B服务;需要等待B服务执⾏完毕的返回值;A服务才可以继续往下执⾏
- 同步调⽤可以通过REST和RPC完成
- REST: ribbon、Feign
- RPC: Dubbo
- 异步调⽤
- A服务调⽤B服务;⽽⽆需等待B服务的执⾏结果;也就是说在B服务执⾏的同时A服 务可以继续往下执⾏
- 通过消息队列实现异步调⽤
1.2 消息队列概念
- MQ全称为Message Queue,消息队列;MQ;是⼀种应⽤程序对应⽤程序的通信⽅法。 应⽤程序通过读写出⼊队列的消息;针对应⽤程序的数据;来通信;⽽⽆需专⽤连接来 链接它们。
- 消息传递指的是程序之间通过在消息中发送数据进⾏通信;⽽不是通过直接调⽤彼此来 通信;直接调⽤通常是⽤于诸如远程过程调⽤的技术。
1.3 常⽤的消息队列产品
- RabbitMQ 稳定可靠,数据⼀致,⽀持多协议,有消息确认,基于erlang语⾔
- Kafka ⾼吞吐,⾼性能,快速持久化,⽆消息确认,⽆消息遗漏,可能会有有重复消息,依赖于 zookeeper,成本⾼.
- ActiveMQ 不够灵活轻巧,对队列较多情况⽀持不好.
- RocketMQ 性能好,⾼吞吐,⾼可⽤性,⽀持⼤规模分布式;协议⽀持单⼀
⼆、RabbitMQ
2.1 RabbitMQ介绍
- RabbitMQ是⼀个在AMQP基础上完成的;可复⽤的企业消息系统。他遵循Mozilla Public License开源协议。
- AMQP;即Advanced Message Queuing Protocol, ⼀个提供统⼀消息服务的应⽤层标准 ⾼级消息队列协议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计。基于此协议 的客户端与消息中间件可传递消息;并不受客户端/中间件不同产品;不同的开发语⾔等 条件的限制。Erlang中的实现有 RabbitMQ等。
- 主要特性;
- 保证可靠性 ;使⽤⼀些机制来保证可靠性;如持久化、传输确认、发布确认
- 灵活的路由功能
- 可伸缩性;⽀持消息集群;多台RabbitMQ服务器可以组成⼀个集群
- ⾼可⽤性 ;RabbitMQ集群中的某个节点出现问题时队列仍然可⽤
- ⽀持多种协议
- ⽀持多语⾔客户端
- 提供良好的管理界⾯
- 提供跟踪机制;如果消息出现异常;可以通过跟踪机制分析异常原因
- 提供插件机制;可通过插件进⾏多⽅⾯扩展
2.2 RabbitMQ安装和配置
- 参考安装⽂档;RabbitMQ安装及配置.pdf
2.3 RabbitMQ逻辑结构
RabbitMQ逻辑结构
三、RabbitMQ⽤户管理
RabbitMQ 默认提供了⼀个 guests 账号;但是此账号不能⽤作远程登录;也就是不能在 管理系统的登录;我们可以创建⼀个新的账号并授予响应的管理权限来实现远程登录3.1 逻辑结构
- ⽤户
- 虚拟主机
- 队列
3.2 ⽤户管理
3.2.1 命令⾏⽤户管理
- 在linux中使⽤命令⾏创建⽤户
## 进⼊到rabbit_mq的sbin⽬录 cd /usr/local/rabbitmq_server-3.7.0/sbin ## 新增⽤户 ./rabbitmqctl add_user ytao admin123
- 设置⽤户级别
## ⽤户级别;
## 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进⾏管理 ## 2.monitoring 监控者 登录控制台、查看所有信息 ## 3.policymaker 策略制定者 登录控制台、指定策略 ## 4.managment 普通管理员 登录控制台 ./rabbitmqctl set_user_tags ytao administrator
3.2.2 管理系统进⾏⽤户管理
- 管理系统登录;访问http://47.96.11.185:15672/
1.
新增⽤户
2.
创建虚拟主机
3.
删除⽤户
4.
⽤户绑定虚拟主机
More Actions3.
删除⽤户

四、RabbitMQ⼯作⽅式
RabbitMQ
提供了多种消息的通信⽅式
—
⼯作模式
https://www.rabbitmq.com/getstarted.html
消息通信是由两个⻆⾊完成;消息⽣产者;
producer
;和 消息消费者;
Consumer
;
4.1 简单模式
⼀个队列只有⼀个消费者
⽣产者将消息发送到队列;消费者从队列取出数据
4.2 ⼯作模式
多个消费者监听同⼀个队列
多个消费者监听同⼀个队列;但多个消费者中只有⼀个消费者会成功的消费消息
4.3 订阅模式
⼀个交换机绑定多个消息队列;每个消息队列有⼀个消费者监听
消息⽣产者发送的消息可以被每⼀个消费者接收
4.4 路由模式
⼀个交换机绑定多个消息队列;每个消息队列都由⾃⼰唯⼀的
key
;每个消息队列有⼀个
消费者监听
路由模式
五、RabbitMQ交换机和队列管理
5.1 创建队列

5.2 创建交换机

5.3 交换机绑定队列


六、在普通的Maven应⽤中使⽤MQ
RabbitMQ
队列结构
6.1简单模式
6.1.1 消息⽣产者
- 创建Maven项⽬
- 添加RabbitMQ连接所需要的依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --
><dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.10.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commonslang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency>
- 在resources⽬录下创建log4j.properties
log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG
log4j.logger.org.mybatis = DEBUG log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
- 创建MQ连接帮助类
package com.qfedu.mq.utils;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //1.创建连接⼯⼚ ConnectionFactory factory = new ConnectionFactory(); //2.在⼯⼚对象中设置MQ的连接信息 (ip,port,virtualhost,username,password) factory.setHost(;47.96.11.185;); factory.setPort(5672); factory.setVirtualHost(;host1;); factory.setUsername(;ytao;); factory.setPassword(;admin123;); //3.通过⼯⼚对象获取与MQ的链接 Connection connection = factory.newConnection(); return connection; } }
- 消息⽣产者发送消息
package com.qfedu.mq.service;
import com.qfedu.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class SendMsg { public static void main(String[] args) throws Exception{ String msg = ;Hello HuangDaoJun!;; Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //定义队列(使⽤Java代码在MQ中新建⼀个队列) //参数1;定义的队列名称 //参数2;队列中的数据是否持久化;如果选择了持久化; //参数3: 是否排外;当前队列是否为当前连接私有; //参数4;⾃动删除;当此队列的连接数为0时;此队列会销毁;⽆论队列中是否 还有数据;; //参数5;设置当前队列的参数 //channel.queueDeclare(;queue7;,false,false,false,null); //参数1;交换机名称;如果直接发送信息到队列;则交换机名称为;; //参数2;⽬标队列名称 //参数3;设置当前这条消息的属性;设置过期时间 10; //参数4;消息的内容 channel.basicPublish(;;,;queue1;,null,msg.getBytes()); System.out.println(;发送;; ; msg); channel.close(); connection.close(); } }
6.1.2 消息消费者
- 创建Maven项⽬
- 添加依赖
- log4j.properties
- ConnetionUtil.java
- 消费者消费消息
package com.qfedu.mq.service;
import com.qfedu.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ ;Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(;接收;;;msg); } }; channel.basicConsume(;queue1;,true,consumer); } }
6.2 ⼯作模式
⼀个发送者多个消费者
6.2.1 发送者
public class SendMsg {
public static void main(String[] args) throws Exception{ System.out.println(;请输⼊消息;;); Scanner scanner = new Scanner(System.in); String msg = null; while(!;quit;.equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicPublish(;;,;queue2;,null,msg.getBytes()); System.out.println(;发送;; ; msg); channel.close(); connection.close(); } } }
6.2.2 消费者1
public class ReceiveMsg {
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ ;Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(;Consumer1接收;;;msg); if(;wait;.equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume(;queue2;,true,consumer); } }
6.2.3 消费者2
public class ReceiveMsg {
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ ;Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(;Consumer2接收;;;msg); } }; channel.basicConsume(;queue2;,true,consumer); } }
6.3 订阅模式
6.3.1 发送者 发送消息到交换机
public class SendMsg {
public static void main(String[] args) throws Exception{ System.out.println(;请输⼊消息;;); Scanner scanner = new Scanner(System.in); String msg = null; while(!;quit;.equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicPublish(;ex1;,;;,null,msg.getBytes()); System.out.println(;发送;; ; msg); channel.close(); connection.close(); } } }
6.3.2 消费者1
public class ReceiveMsg1 {
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ ;Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(;Consumer1接收;;;msg); if(;wait;.equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume(;queue3;,true,consumer); } }
6.3.3 消费者2
public class ReceiveMsg2 {
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ ;Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(;Consumer2接收;;;msg); } }; channel.basicConsume(;queue4;,true,consumer); } }
6.4 路由模式
6.4.1 发送者 发送消息到交换机
public class SendMsg {
public static void main(String[] args) throws Exception{ System.out.println(;请输⼊消息;;); Scanner scanner = new Scanner(System.in); String msg = null; while(!;quit;.equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); if(msg.startsWith(;a;)){ channel.basicPublish(;ex2;,;a;,null,msg.getBytes()); }else if(msg.startsWith(;b;)){ channel.basicPublish(;ex2;,;b;,null,msg.getBytes()); } System.out.println(;发送;; ; msg); channel.close(); connection.close(); } } }
6.4.2 消费者1
public class ReceiveMsg1 {
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ ;Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(;Consumer1接收;;;msg); if(;wait;.equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume(;queue5;,true,consumer); } }
6.4.3 消费者2
public class ReceiveMsg2 {
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ ;Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println(;Consumer2接收;;;msg); } }; channel.basicConsume(;queue6;,true,consumer); } }
七、在SpringBoot应⽤中使⽤MQ
SpringBoot
应⽤可以完成⾃动配置及依赖注⼊
——
可以通过
Spring
直接提供与
MQ
的连
接对象
7.1 消息⽣产者
- 创建SpringBoot应⽤;添加依赖

- 配置application.yml
server:
port: 9001 spring: application: name: producer rabbitmq: host: 47.96.11.185 port: 5672 virtual-host: host1 username: ytao password: admin123
- 发送消息
;Service
public class TestService { ;Resource private AmqpTemplate amqpTemplate; 123456 public void sendMsg(String msg){ //1. 发送消息到队列 amqpTemplate.convertAndSend(;queue1;,msg); //2. 发送消息到交换机(订阅交换机) amqpTemplate.convertAndSend(;ex1;,;;,msg); //3. 发送消息到交换机(路由交换机) amqpTemplate.convertAndSend(;ex2;,;a;,msg); } }
7.2 消息消费者
- 创建项⽬添加依赖
- 配置yml
- 接收消息
-
;Service
//;RabbitListener(queues = {;queue1;,;queue2;}) ;RabbitListener(queues = ;queue1;) public class ReceiveMsgService { ;RabbitHandler public void receiveMsg(String msg){ System.out.println(;接收MSG;;;msg); } }
⼋、使⽤RabbitMQ传递对象
RabbitMQ
是消息队列;发送和接收的都是字符串
/
字节数组类型的消息
8.1 使⽤序列化对象
要求;
传递的对象实现序列化接⼝
传递的对象的包名、类名、属性名必须⼀致
- 消息提供者
;Service
public class MQService { ;Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods){ //消息队列可以发送 字符串、字节数组、序列化对象 amqpTemplate.convertAndSend(;;,;queue1;,goods); } }
- 消息消费者
;Component
;RabbitListener(queues = ;queue1;) public class ReceiveService { ;RabbitHandler public void receiveMsg(Goods goods){ System.out.println(;Goods---;;goods); } }
8.2 使⽤序列化字节数组
要求;
传递的对象实现序列化接⼝
传递的对象的包名、类名、属性名必须⼀致
- 消息提供者
;Service
public class MQService { ;Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods){ //消息队列可以发送 字符串、字节数组、序列化对象 byte[] bytes = SerializationUtils.serialize(goods); amqpTemplate.convertAndSend(;;,;queue1;,bytes); } }
- 消息消费者
;Component
;RabbitListener(queues = ;queue1;) public class ReceiveService { ;RabbitHandler public void receiveMsg(byte[] bs){ Goods goods = (Goods) SerializationUtils.deserialize(bs); System.out.println(;byte[]---;;goods); } }
8.3 使⽤JSON字符串传递
要求;对象的属性名⼀直
- 消息提供者
;Service
public class MQService { ;Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods) throws JsonProcessingException { //消息队列可以发送 字符串、字节数组、序列化对象 ObjectMapper objectMapper = new ObjectMapper(); String msg = objectMapper.writeValueAsString(goods); amqpTemplate.convertAndSend(;;,;queue1;,msg); } }
- 消息消费者
;Component
;RabbitListener(queues = ;queue1;) public class ReceiveService { ;RabbitHandler public void receiveMsg(String msg) throws JsonProcessingException { ObjectMapper objectMapper = new ObjectMapper(); Goods goods = objectMapper.readValue(msg,Goods.class); System.out.println(;String---;;msg); } }
九、基于Java的交换机与队列创建
我们使⽤消息队列;消息队列和交换机可以通过管理系统完成创建;也可以在应⽤程序
中通过
Java
代码来完成创建
9.1 普通Maven项⽬交换机及队列创建
- 使⽤Java代码新建队列
//1.定义队列 (使⽤Java代码在MQ中新建⼀个队列)
//参数1;定义的队列名称 //参数2;队列中的数据是否持久化;如果选择了持久化; //参数3: 是否排外;当前队列是否为当前连接私有; //参数4;⾃动删除;当此队列的连接数为0时;此队列会销毁;⽆论队列中是否还有数 据;; //参数5;设置当前队列的参数 channel.queueDeclare(;queue7;,false,false,false,null);
- 新建交换机
//定义⼀个“订阅交换机”
channel.exchangeDeclare(;ex3;, BuiltinExchangeType.FANOUT); //定义⼀个“路由交换机” channel.exchangeDeclare(;ex4;, BuiltinExchangeType.DIRECT);
- 绑定队列到交换机
//绑定队列
//参数1;队列名称 //参数2;⽬标交换机 //参数3;如果绑定订阅交换机参数为;;,如果绑定路由交换机则表示设置队列的key channel.queueBind(;queue7;,;ex4;,;k1;); channel.queueBind(;queue8;,;ex4;,;k2;);
9.2 SpringBoot应⽤中通过配置完成队列的创建
;Configuration
public class RabbitMQConfiguration { //声明队列 ;Bean public Queue queue9(){ Queue queue9 = new Queue(;queue9;); //设置队列属性 return queue9; } ;Bean public Queue queue10(){ Queue queue10 = new Queue(;queue10;); //设置队列属性 return queue10; } //声明订阅模式交换机 ;Bean public FanoutExchange ex5(){ return new FanoutExchange(;ex5;); } //声明路由模式交换机 ;Bean public DirectExchange ex6(){ return new DirectExchange(;ex6;); } //绑定队列 ;Bean public Binding bindingQueue9(Queue queue9, DirectExchange ex6){ return BindingBuilder.bind(queue9).to(ex6).with(;k1;); } ;Bean public Binding bindingQueue10(Queue queue10, DirectExchange ex6){ return BindingBuilder.bind(queue10).to(ex6).with(;k2;); } }
⼗、消息的可靠性
消息的可靠性;从
⽣产者发送消息
——
消息队列存储消息
——
消费者消费消息
的整个过程中
消息的安全性及可控性。
- ⽣产者
- 消息队列
- 消费者

10.1 RabbitMQ事务
RabbitMQ
事务指的是基于客户端实现的事务管理;当在消息发送过程中添加了事务;处
理效率降低⼏⼗倍甚⾄上百倍
Connection connection = RabbitMQUtil.getConnection(); //connection 表
示与 host1的连接 Channel channel = connection.createChannel(); channel.txSelect(); //开启事务 try{ channel.basicPublish(;ex4;, ;k1;, null, msg.getBytes()); channel.txCommit(); //提交事务 }catch (Exception e){ channel.txRollback(); //事务回滚 }finally{ channel.close(); connection.close(); }
10.2 RabbitMQ消息确认和return机制

消息确认机制;确认消息提供者是否成功发送消息到交换机
return
机制;确认消息是否成功的从交换机分发到队列
10.2.1 普通Maven项⽬的消息确认
- 普通confirm⽅式
//1.发送消息之前开启消息确认
channel.confirmSelect(); channel.basicPublish(;ex1;, ;a;, null, msg.getBytes()); //2.接收消息确认 boolean b = channel.waitForConfirms(); System.out.println(;发送;; ;(b?;成功;:;失败;));
- 批量confirm⽅式
//1.发送消息之前开启消息确认
channel.confirmSelect(); //2.批量发送消息 for (int i=0 ; i<10 ; i;;){ channel.basicPublish(;ex1;, ;a;, null, msg.getBytes()); } //3.接收批量消息确认;发送的所有消息中;如果有⼀条是失败的;则所有消息发送直接失败; 抛出IO异常 boolean b = channel.waitForConfirms();
- 异步confirm⽅式
//发送消息之前开启消息确认
channel.confirmSelect(); //批量发送消息 for (int i=0 ; i<10 ; i;;){ channel.basicPublish(;ex1;, ;a;, null, msg.getBytes()); } //假如发送消息需要10s;waitForConfirms会进⼊阻塞状态 //boolean b = channel.waitForConfirms(); //使⽤监听器异步confirm channel.addConfirmListener(new ConfirmListener() { //参数1; long l 返回消息的表示 //参数2; boolean b 是否为批量confirm public void handleAck(long l, boolean b) throws IOException { System.out.println(;~~~~~消息成功发送到交换机;); } public void handleNack(long l, boolean b) throws IOException { System.out.println(;~~~~~消息发送到交换机失败;); } });
10.2.2 普通Maven项⽬的return机制
- 添加return监听器
- 发送消息是指定第三个参数为true
- 由于监听器监听是异步处理;所以在消息发送之后不能关闭channel
String msg = ;Hello HuangDaoJun!;;
Connection connection = ConnectionUtil.getConnection(); //相当于JDBC 操作的数据库连接 Channel channel = connection.createChannel(); //相当于JDBC 操作的statement //return机制;监控交换机是否将消息分发到队列 channel.addReturnListener(new ReturnListener() { public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException { //如果交换机分发消息到队列失败;则会执⾏此⽅法;⽤来处理交换机分发消息到队 列失败的情况; System.out.println(;*****;;i); //标识 System.out.println(;*****;;s); // System.out.println(;*****;;s1); //交换机名 System.out.println(;*****;;s2); //交换机对应的队列的key System.out.println(;*****;;new String(bytes)); //发送的消息 } }); //发送消息 //channel.basicPublish(;ex2;, ;c;, null, msg.getBytes()); channel.basicPublish(;ex2;, ;c;, true, null, msg.getBytes());
10.3 在SpringBoot应⽤实现消息确认与return监听
10.3.1 配置application.yml,开启消息确认和return监听
spring:
rabbitmq: publisher-confirm-type: simple ## 开启消息确认模式 publisher-returns: true ##使⽤return监听机制
10.3.2 创建confirm和return监听
- 消息确认
;Component
public class MyConfirmListener implements RabbitTemplate.ConfirmCallback { ;Autowired private AmqpTemplate amqpTemplate; ;Autowired private RabbitTemplate rabbitTemplate; ;PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } ;Override public void confirm(CorrelationData correlationData, boolean b, String s) { //参数b 表示消息确认结果 //参数s 表示发送的消息 if(b){ System.out.println(;消息发送到交换机成功;;); }else{ System.out.println(;消息发送到交换机失败;;); amqpTemplate.convertAndSend(;ex4;,;;,s); } } }
- return机制
;Component
public class MyReturnListener implements RabbitTemplate.ReturnsCallback { ;Autowired private AmqpTemplate amqpTemplate; ;Autowired private RabbitTemplate rabbitTemplate; ;PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(this); } ;Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println(;消息从交换机分发到队列失败;); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); String msg = returnedMessage.getMessage().toString(); amqpTemplate.convertAndSend(exchange,routingKey,msg); } }
10.4 RabbitMQ消费者⼿动应答
;Component
;RabbitListener(queues=;queue01;) public class Consumer1 { ;RabbitHandler public void process(String msg,Channel channel, Message message) throws IOException { try { System.out.println(;get msg1 success msg = ;;msg); /** * 确认⼀条消息;<br> * channel.basicAck(deliveryTag, false); <br> * deliveryTag:该消息的index <br> * multiple;是否批量.true:将⼀次性ack所有⼩于deliveryTag的消息 <br> */ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { //消费者处理出了问题;需要告诉队列信息消费失败 /** * 拒绝确认消息:<br> * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br> * deliveryTag:该消息的index<br> * multiple;是否批量.true:将⼀次性拒绝所有⼩于deliveryTag的消息。<br> * requeue;被拒绝的是否重新⼊队列 <br> */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.err.println(;get msg1 failed msg = ;;msg); } } }
10.5 消息消费的幂等性问题
消息消费的幂等性
——
多次消费的执⾏结果时相同的 ;避免重复消费;
解决⽅案;处理成功的消息
setnx
到
redis
##
⼗⼀、延迟机制
11.1 延迟队列
- 延迟队列——消息进⼊到队列之后;延迟指定的时间才能被消费者消费
- AMQP协议和RabbitMQ队列本身是不⽀持延迟队列功能的;但是可以通过TTL;Time To Live;特性模拟延迟队列的功能
- TTL就是消息的存活时间。RabbitMQ可以分别对队列和消息设置存活时间
在创建队列的时候可以设置队列的存活时间;当消息进⼊到队列并且在存活时间内没 有消费者消费;则此消息就会从当前队列被移除;
创建消息队列没有设置
TTL
;但是消息设置了
TTL
;那么当消息的存活时间结束;也
会被移除;
当
TTL
结束之后;我们可以指定将当前队列的消息转存到其他指定的队列
11.2 使⽤延迟队列实现订单⽀付监控
11.2.1 实现流程图

11.2.2 创建交换机和队列
1.
创建路由交换机
2.
创建消息队列
3.
创建死信队列
4.
队列绑定
⼗⼆、消息队列作⽤/使⽤场景总结
12.1 解耦
场景说明;⽤户下单之后;订单系统要通知库存系统
传统⽅式;订单系统直接调⽤库存系统提供的接⼝;如果库存系统出现故障会导致订单系
统失败
使⽤消息队列;
12.2 异步
场景说明;⽤户注册成功之后;需要发送注册邮件及注册短信提醒
传统⽅式;
使⽤消息队列;

12.3 消息通信
场景说明;应⽤系统之间的通信;例如聊天室
聊天室
12.4 流量削峰
场景说明;秒杀业务
⼤量的请求不会主动请求秒杀业务;⽽是存放在消息队列
(
缓存
)
12.5⽇志处理
场景说明;系统中⼤量的⽇志处理
⽇志搜集处理
加载全部内容