
.netcore3.1 RabbitMq 工作队列轮询与确认消息
https://www.rabbitmq.com/confirms.html
https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
轮询消费
如上图所示,默认消息队列是通过轮询的方式将消息有序的分发到不同的消费端上去。
创建100条消息
代码示例如下:+
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 在这里还应该声明一个交换机
// 声明一个队列
channel.QueueDeclare(
queue: "mytestqueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
for (int i = 0; i < 100; i++)
{
// 创建一个消息
string message = $"({i}) Hello World";
// 编码一个消息
var body = Encoding.UTF8.GetBytes(message);
// 发布一个消息
channel.BasicPublish(
exchange: string.Empty,
routingKey: "mytestqueue",
basicProperties: null,
body: body
);
}
Console.ReadLine();
}
}
执行完毕后,UI上已经显示出队列的消息数量了。
我们也可以点击mytestqueue
队列里面去进行获取消息,操作如下:
开启两个队列去消费100条
在消费时我们可以开启一个事件处理实例,通过事件Received
去处理mytestqueue
队列中的消息,代码示例如下:
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 创建消费实例
var consumer = new EventingBasicConsumer(channel);
// 事件在交付到使用者时触发。(消费处理事件)
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" Processing message: {0}", message);
};
// 绑定到队列中去
channel.BasicConsume(queue: "mytestqueue", autoAck: true, consumer: consumer);
}
}
Console.ReadLine();
开启两个进行处理,我们从下面的图中得出它是通过轮询的方式进行消费的。(我这里是先开启的消费端,再添加生产者的方式)
事件处理是否阻塞
我们在事件处理时添加两行代码,测试一下
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" Processing message: {0}", message);
// 将它延迟3s
Thread.Sleep(3000);
Console.WriteLine(" [x] Done Message: {0}",message);
};
// 绑定到队列中去
channel.BasicConsume(queue: "mytestqueue", autoAck: true, consumer: consumer);
我们发现每一个消费端在消费的时候事件都是同步的,都是在等上一个消息被事件处理完成后再处理下一个消息的。但紧接着新的疑惑就有了,如果我们在两个消费端进行消费的时候,突然有一个消费端挂掉了会怎么样?
我们发现它第7
条Hello World
并没有被处理完成,然后也没有被重新消费的保险措施。这就产生了新的问题:消息丢失在了消费端。这个时候我们便需要通过消息确认的方式解决这个问题。
消息确认,让消息更加安全可靠
在代码中将处理实例绑定到队列时的BasicConsume
方法里有一个autoAck
的bool
参数,默认值为false
。当它为true
的时候,默认表示自动确认该消息被处理了,false
则表示手动处理的方法。RabbitMQ
为我们提供了三种手动交付的方法:
协议方法 | 描述 | 代码中的方法(.net core) |
---|---|---|
basic.ack | 肯定确认 | BasicAck |
basic.nack | 否定确认 | BasicNack |
basic.reject | 否定确认,通过参数可以丢弃消息以及重新排列到队列中 | BasicReject |
ACK确认交付
BasicAck手动确认
在代码中我们先把autoAck
设置为false
,将自动确认改为手动确认。并在处理消息事件的的末尾添加处理完成的手动确认代码。
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" Processing message: {0}", message);
Thread.Sleep(3000);
Console.WriteLine(" [x] Done Message: {0}",message);
// 手动确认
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// 绑定到队列中去
channel.BasicConsume(queue: "mytestqueue", autoAck: false, consumer: consumer);
在添加100条数据后,开启两个消费端,并在中途关闭掉一个客户端。
我们发现第一个消费端在处理完自己的消费任务后,开始处理断掉的消费端没有处理的消息了,最后解决消息在客户端丢失的问题。(从第13条开始)接下来我们来看看BasicAck
中长用的参数:
参数名 | 描述 |
---|---|
deliveryTag | 表示消息标签 |
multiple | 当值为true 时,表示一次确认就能代表多次确认。为false 一次确认只能代表一次消息的确认。 |
BasicAck中multiple为true时的应用
可以分批手动确认以减少网络流量。当它为true
的时候,表示假如:1,2,3,4,5 这五条消息正在被几个客户端处理的时候突然有一个客户端处理5
时调用了multiple
为true
这方法,此时(1,2,3,4,5)这五条消息表示都已经处理过了。如果为false
,我只知道我自己处理的那条有没有成功嘛。
例如我将Worker
的解决方案写为10
后处理完成。
Thread.Sleep(10000);
Console.WriteLine(" [x] Done Message: {0}",message);
// 手动确认
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
通过100条消息消费时统计Runtime Metrics (Advanced)
的平均指标。为true
时平均为402
,false
时平均为406
。
BasicReject否定确认与重回队列
有时,消费者无法立即处理交货,但其他情况下可能可以。在这种情况下,可能需要重新排队并让另一个消费者接收和处理它。
否定确认,并丢弃消息
channel.BasicReject(ea.DeliveryTag, false);
这里我们消费20
条数据进行测试一下,消费端代码如下
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("One Processing message: {0}", message);
Thread.Sleep(1000);
Console.WriteLine(" [x] Done Message: {0}", message);
// 否定确认,并丢弃消息
channel.BasicReject(ea.DeliveryTag, false);
}
catch (Exception ex)
{
Console.WriteLine("【Error】:", ex.Message);
}
};
否定确认,重新排列到队列
channel.BasicReject(ea.DeliveryTag, true);
这里我们用随机的方式进行处理,注意需要在外围添加Random
的实例
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.BackgroundColor = ConsoleColor.Black;
Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine("One Processing message: {0}", message);
Thread.Sleep(1000);
if (random.Next(0,2).Equals(0))
{
Console.BackgroundColor = ConsoleColor.Blue; //设置背景色
Console.ForegroundColor = ConsoleColor.White; //设置前景色,即字体颜色
Console.WriteLine(" Discard Message: {0}", message);
// 否定确认
channel.BasicReject(ea.DeliveryTag, false);
}
else
{
Console.BackgroundColor = ConsoleColor.Green;
Console.ForegroundColor = ConsoleColor.DarkGreen;
Console.WriteLine(" Return Message: {0}", message);
// 重新排队
channel.BasicReject(ea.DeliveryTag, true);
}
}
catch (Exception ex)
{
Console.WriteLine("【Error】:", ex.Message);
}
};
截图没截到,后面反正又重新排列到队列中去了。重新排队的消息可能立即准备好重新发送,具体取决于它们在队列中的位置以及活动的使用者使用的通道使用的预取值。这意味着,如果所有使用者由于瞬态而无法处理交货而重新排队,则他们将创建一个重新排队/重新交货循环。就网络带宽和CPU资源而言,这样的循环可能代价很高。
BasicReject与BasicNack区别
消费者实现可以跟踪重新交付的次数并永久拒绝消息(丢弃消息),或在延迟后安排重新排队。BasicNack方法可以一次拒绝或重新排队多个消息。这就是与BasicReject不同的地方。
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.BackgroundColor = ConsoleColor.Black;
Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine("One Processing message: {0}", message);
Thread.Sleep(1000);
if (random.Next(0,2).Equals(0))
{
Console.BackgroundColor = ConsoleColor.Blue; //设置背景色
Console.ForegroundColor = ConsoleColor.White; //设置前景色,即字体颜色
Console.WriteLine(" Discard Message: {0}", message);
// 否定确认,减少网络请求,提高性能
channel.BasicNack(ea.DeliveryTag,multiple: true,requeue:false);
}
else
{
Console.BackgroundColor = ConsoleColor.Green;
Console.ForegroundColor = ConsoleColor.DarkGreen;
Console.WriteLine(" Return Message: {0}", message);
// 重新排队,减少网络请求,提高性能
channel.BasicNack(ea.DeliveryTag,multiple: true,requeue:true);
}
}
catch (Exception ex)
{
Console.WriteLine("【Error】:", ex.Message);
}
};
当你忘记了BasicAck
时,这是一个简单的错误,但是后果很严重。当您的客户端退出时,消息将被重新发送(看起来像是随机重新发送),但是RabbitMQ将消耗越来越多的内存,因为它将无法释放任何未确认的消息。
为了调试这种错误,您可以使用 rabbitmqctl
打印 messages_unacknowledged
字段:
持久化
问题
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果RabbitMQ服务器停止
,我们的任务仍然会丢失。
RabbitMQ退出或崩溃时,除非您告诉它,否则它将忘记队列和消息。确保消息不会丢失需要做两件事:我们需要将Queue
和Message
都标记为持久。
Queue 持久化
我们在声明队列的时候,将durable
设置为true
。这样就可以将队列持久化,但注意:已经声明的队列这样做是无效的,我们可以声明一个名字不同新的队列。
channel.QueueDeclare(
queue: "mytestqueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
Message 持久化
在这一点上,我们确定即使RabbitMQ重新启动,task_queue
队列也不会丢失。现在我们需要将消息标记为持久性-通过将IBasicProperties.SetPersistent
设置为true
。
var body = Encoding.UTF8.GetBytes(message);
// 持久化操作,告诉Rabbitmq服务器将消息存储在磁盘上,但仍然有
// 少部分是存储在缓存中的,所以仍然不能绝对保证
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发布一个消息
channel.BasicPublish(
exchange: string.Empty,
routingKey: "mytestqueuetwo",
basicProperties: properties,
body: body
);
消息分配合理化
当每条消息在A消费端
都需要处理很长的时间,才能确认消息被处理的时候。而B消费端
却在玩的时候,仍然平均去分发消息,就会显得很不合理。
为了更改此行为,我们可以将BasicQos
方法与prefetchCount = 1
设置一起使用。这告诉RabbitMQ一次不要给工人一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给不忙的下一个工作程序。
channel.BasicQos(0, 1, false);
注意:如果所有工作人员都忙,则您的队列可以填满。您将需要关注这一点,并可能会增加更多的工作人员,或者有其他一些策略。
创建Queue添加消息的代码如下
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 在这里还应该声明一个交换机
// 声明一个队列
channel.QueueDeclare(
queue: "mytestqueuetwo",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
for (int i = 0; i < 30; i++)
{
// 创建一个消息
string message = $"({i}) Hello World";
// 编码一个消息
var body = Encoding.UTF8.GetBytes(message);
// 持久化操作,告诉Rabbitmq服务器将消息存储在磁盘上,但仍然有
// 少部分是存储在缓存中的,所以仍然不能绝对保证
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 发布一个消息
channel.BasicPublish(
exchange: string.Empty,
routingKey: "mytestqueuetwo",
basicProperties: properties,
body: body
);
}
Console.WriteLine("Finish");
Console.ReadLine();
}
}
消费端代码
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob",
VirtualHost = "/"
};
// 获取处理时常(第一个耗时为10s,第二个耗时为2s)
var sleeptime = int.Parse(args[0]);
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// one by one 处理
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// 创建消费实例
var consumer = new EventingBasicConsumer(channel);
// 事件在交付到使用者时触发。(消费处理事件)
consumer.Received += (model, ea) =>
{
try
{
int sj = sleeptime;
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.BackgroundColor = ConsoleColor.Black;
Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine("One Processing message: {0}", message);
Thread.Sleep(sj*1000);
if (random.Next(0,2).Equals(0))
{
Console.BackgroundColor = ConsoleColor.Blue; //设置背景色
Console.ForegroundColor = ConsoleColor.White; //设置前景色,即字体颜色
Console.WriteLine(" Discard Message: {0}", message);
// 否定确认,减少网络请求,提高性能
channel.BasicNack(ea.DeliveryTag,multiple: true,requeue:false);
}
else
{
Console.BackgroundColor = ConsoleColor.Green;
Console.ForegroundColor = ConsoleColor.DarkGreen;
Console.WriteLine(" Return Message: {0}", message);
// 重新排队,减少网络请求,提高性能
channel.BasicNack(ea.DeliveryTag,multiple: true,requeue:true);
}
}
catch (Exception ex)
{
Console.WriteLine("【Error】:", ex.Message);
}
};
// 绑定到队列中去
channel.BasicConsume(queue: "mytestqueuetwo", autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
绑定交换机
在Rabbitmq中,在一个交换机上可以绑定多个队列,如下图所示
在生产的时候我们就可以通过QueueBind
将队列绑定到交换机上。
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 创建交换机 普通类型
channel.ExchangeDeclare("MyExchangeName", ExchangeType.Direct);
// 声明一个队列
channel.QueueDeclare(
queue: "mytestqueuetwo",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
// 将队列与交换机绑定在一起
channel.QueueBind("mytestqueuetwo", "MyExchangeName", "mytestqueuetwo", null);
Console.WriteLine("Finish");
Console.ReadLine();
}
}
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

