资讯

展开

NetCore RabbitMQ 的消息确认机制

作者:快盘下载 人气:

十年河东,十年河西,莫欺少年穷

学无止境,精益求精

上一节介绍了RabbitMQ定向模式,本篇介绍Rabbitmq 的消息确认机制

我的系列博客:

NetCore RabbitMQ Topics 通配符模式

NetCore RabbitMQ ,Routing定向模式

NetCore RabbitMQ 发布订阅模式,消息广播

RabbitMQ的六种工作模式

NetCore RabbitMQ 简介及兔子生产者、消费者 【简单模式,work工作模式,竞争消费】

windows环境下,RabbitMQ 安装教程

kafka、Rabbitmq、EasyNetQ NetCore 源码下载

在一些场合,如支付时每一条消息都必须保证成功的被处理。

AMQP是金融级的消息队列协议,有很高的可靠性,这里介绍在使用RabbitMQ时怎么保证消息被成功处理的。

消息确认可以分为两种:

一种是生产者发送消息到Broke时,Broker给生产者发送确认回执,用于告诉生产者消息已被成功发送到Broker;

一种是消费者接收到Broker发送的消息时,消费者给Broker发送确认回执,用于通知消息已成功被消费者接收。

生产者端消息确认机制

生产者端消息确认机制分为两种,一种是基于事务机制,另一种是基于Confrim确认机制。事务机制占用资源较多,会拉低生产效率,因此,事务模式在市场上用的比较少。

事务机制【资源占用高,效率低】

事务机制类似于数据库事务,要先开启事务,发完消息后,提交事务,发生异常时回滚事务。具体展现在C#代码如下:

channel.TxSelect(); //开启事务模式

channel.TxCommit();//提交事务

channel.TxRollback();//异常时,回滚事务

使用事务机制,我们首先要通过txSelect方法开启事务,然后发布消息给broker服务器了,如果txCommit提交成功了,则说明消息成功被broker接收了;

如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们可以捕获异常,通过txRollback回滚事务。看一个事务机制的简单实现:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMqProducer
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
using (var connection = factory.CreateConnection())
{
//rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel
using (var channel = connection.CreateModel())
{
string Ename = "MyExChange";
channel.ExchangeDeclare(Ename, ExchangeType.Direct, false, false, null);
//声明广播的队列
string QnameName = "MyQueue";
channel.QueueDeclare(QnameName, false, false, false, null);

string routingKey = "MyroutingKey"; //
//
channel.QueueBind(QnameName, Ename, routingKey);

var messages = "Hello,RabbitMQ的事务方式"; //
try
{
channel.TxSelect(); //开启事务模式
//发送消息
for (int i = 0; i < 10; i++)
{
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); //
}
channel.TxCommit();//提交事务
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
channel.TxRollback();//异常时,回滚事务
}

}
}
Console.Read();
}
}
}

上述代码执行后

NetCore  RabbitMQ 的消息确认机制

如果我们将上述代码:channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); // 中的routingKey修改为空字符串,如下:

channel.BasicPublish(Ename, "", null, Encoding.UTF8.GetBytes(messages + "_" + i)); //

再次运行发现,消息并不能发送到队列中,程序也不会报异常。也就是说,事务提交了,但消息并没发出去。

因此:虽说执行了事务提交,程序也没报异常,但消息不一定会发出去。

Confirm确认机制【推荐模式】

C#的RabbitMQ API中,有三个与Confirm相关的方法:ConfirmSelect(),WaitForConfirms(),WaitForConfirmOrDie()

channel.ConfirmSelect() 表示开启Confirm模式;

channel.WaitForConfirms() 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false。

channel.WaitForConfirmsOrDie() 和WaitForConfirms作用类似,也是等待所有消息确认,区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出一个OperationInterrupedException类型异常。

看一个Confirm模式的简单实现:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMqProducer
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
using (var connection = factory.CreateConnection())
{
//rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel
using (var channel = connection.CreateModel())
{
string Ename = "MyExChange";
channel.ExchangeDeclare(Ename, ExchangeType.Direct, false, false, null);
//声明广播的队列
string QnameName = "MyQueue";
channel.QueueDeclare(QnameName, false, false, false, null);

string routingKey = "MyroutingKey"; //
//
channel.QueueBind(QnameName, Ename, routingKey);

var messages = "Hello,RabbitMQ的事务方式"; //

channel.ConfirmSelect(); // 启用服务器确认机制方式
//
for (int i = 0; i < 10; i++)
{
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); //发送消息
}
if (channel.WaitForConfirms())
{
Console.WriteLine("消息发送成功");
}
else
{
//重发 或者 写具体的处理逻辑
Console.WriteLine("消息发送失败");
}
}
}
Console.Read();
}
}
}

这里需要说明的是,WaitForConfirms是指等待所有消息确认,如果你在调试过程中,将发送消息刻意循环三次,在执行WaitForConfirms时,返回值依旧是True,因此三次发送均成功了。

我在网上看到还有一种写法是这样的,如下:

for (int i = 0; i < 10; i++)
{
channel.ConfirmSelect(); // 启用服务器确认机制方式
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); //发送消息
if (channel.WaitForConfirms())
{
Console.WriteLine("消息发送成功");
}
else
{
//重发 或者 写具体的处理逻辑
Console.WriteLine("消息发送失败");
}
}

每发一次消息,确认一次,这种写法无疑会浪费资源。大家有何看法,欢迎评论。~_~

同理

如果我们将上述代码:channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); // 中的routingKey修改为空字符串,如下:

channel.BasicPublish(Ename, "", null, Encoding.UTF8.GetBytes(messages + "_" + i)); //

再次运行发现,消息并不能发送到队列中,程序也不会报异常。但WaitForConfirms依旧返回True

因此,这种机制还事务模式一样,都不能完全保证消息发送到 队列。

消费者端消息确认机制(自动确认和显示确认)

 从Broke发送到消费者时,RabbitMQ提供了两种消息确认的方式:自动确认和显示确认。

1 自动确认

  自动确认:当RabbbitMQ将消息发送给消费者后,消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执。自动确认的用法十分简单,设置消费方法的参数autoAck为true即可,如下:

channel.BasicConsume(Qname, true, consumer);//开启自动确认

注意:Broker会在接收到确认回执时删除消息,如果消费者接收到消息并返回了确认回执,然后这个消费者在处理消息时挂了,那么这条消息就再也找不回来了。

消费者代码如下

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQConsumer_2
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"消费者收到消息: {message}");
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100);
};
//启动消费者
string Qname = "MyQueue";
channel.BasicConsume(Qname, true, consumer);//开启自动确认
Console.WriteLine("消费者已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
}
}

View Code

2 显示确认

自动确认可能会出现消息丢失的问题,我们不免会想到:Broker收到回执后才删除消息,如果可以让消费者在接收消息时不立即返回确认回执,等到消息处理完成后(或者完成一部分的逻辑)再返回确认回执,这样就保证消费端不会丢失消息了!这正是显式确认的思路。使用显示确认也比较简单,首先将Resume方法的参数autoAck设置为false,然后在消费端使用代码channel.BasicAck()/BasicReject()等方法来确认和拒绝消息。看一个栗子:

生产者代码:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMqProducer
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
using (var connection = factory.CreateConnection())
{
//rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel
using (var channel = connection.CreateModel())
{
string Ename = "MyExChange";
channel.ExchangeDeclare(Ename, ExchangeType.Direct, false, false, null);
//声明广播的队列
string QnameName = "MyQueue";
channel.QueueDeclare(QnameName, false, false, false, null);

string routingKey = "MyroutingKey"; //
//
channel.QueueBind(QnameName, Ename, routingKey);

var messages = "MyHello,RabbitMQ"; //
//
for (int i = 0; i < 10; i++)
{
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages + "_" + i)); //发送消息
}

}
}
Console.Read();
}
}
}

View Code

消费者代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQConsumer_2
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主机名
factory.UserName = "guest";//使用的用户
factory.Password = "guest";//用户密码
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虚拟主机
factory.MaxMessageSize = 1024; //消息最大字节数
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());

//确认该消息已被消费
if (message.StartsWith("Hello"))
{
Console.WriteLine($"消费者收到消息: {message}");
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
Console.WriteLine($"消费者拒绝接收的消息: {message}");
//拒绝接收
channel.BasicReject(ea.DeliveryTag, false);
}
};
//启动消费者
string Qname = "MyQueue";
channel.BasicConsume(Qname, false, consumer);//开启自动确认
Console.WriteLine("消费者已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
}
}

View Code

注意:显示确认时,自动确认要关闭。

//确认该消息已被消费
if (message.StartsWith("Hello"))
{
Console.WriteLine($"消费者收到消息: {message}");
//deliveryTag 参数分发的标记
//multiple 是否确认多条
//void BasicAck(ulong deliveryTag, bool multiple);
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
Console.WriteLine($"消费者拒绝接收的消息: {message}");
//deliveryTag 参数分发的标记
// requeue false 时,拒绝的消息会被直接删除 true 拒绝的消息会被重新放入队列中
//void BasicReject(ulong deliveryTag, bool requeue);
//拒绝接收
channel.BasicReject(ea.DeliveryTag, false);
}

介绍一下代码中的两个方法:channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple:false);方法用于确认消息,deliveryTag参数是分发的标记,multiple表示是否确认多条。channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue:false);方法用于拒绝消息,deliveryTag也是指分发的标记,requeue表示消息被拒绝后是否重新放回queue中,true表示放回queue中,false表示直接丢弃。

  运行这两个应用程序,通过生产者发送两条消息,效果如下:

一些意外的情况:使用显式确认时,如果消费者处理完消息不发送确认回执,那么消息不会被删除,消息的状态一直是Unacked,这条消息也不会再发送给其他消费者。如果一个消费者在处理消息时尚未发送确认回执的情况下挂掉了,那么消息会被重新放入队列(状态从Unacked变成Ready),有其他消费者存时,消息会发送给其他消费者。

例如,我们将上述消费者代码中的回执部分注释掉,如下

消息发送

再次生产消息后,运行消费者端代码,此时,消息的状态为:Unacked

主机名,NetCore  RabbitMQ 的消息确认机制

关闭消费者端调试后,消息状态又变成了 Ready

发送消息

@天才卧龙的波尔卡


加载全部内容

相关教程
猜你喜欢
用户评论
快盘暂不提供评论功能!