原
.netcore3.1 RabbitMq Overflow,Dead Letter Exchange,Dead Letter Routing,Max priority

.netcore3.1 RabbitMq Overflow,Dead Letter Exchange,Dead Letter Routing,Max priority
Overflow
从字面意思来讲就是溢出,它将结合上一个章节的max-length
与max-length-bytes
配合使用。它的意思是当队列所发布的消息超出max-length
的数量时或消息的总大小超出max-length-bytes
的大小时,所溢出的消息应该做什么处理?这个参数有两个Value
如下图所示:
x-overflow | 描述 |
---|---|
drop-head (默认) |
表示多余的消息直接丢弃。在代码中不填写即可,否则报错! |
reject-publish |
表示将暂时不让它进入队列中来,当有新的消息被消费时,仍然会重新排列到队列中去 |
举个栗子
我这里声明队列中的最大数量为2
,将x-overflow
设置为reject-publish
,并发布8条消息。代码如下:
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 声明一个只能为2条消息的队列,并将
channel.QueueDeclare(
queue: "Max_Length_Overflow_Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string,object>{
{ "x-max-length",2 },
{ "x-overflow","reject-publish" }
});
var consumer = new EventingBasicConsumer(channel);
// 绑定处理事件
channel.BasicConsume(queue: "Max_Length_Overflow_Queue", autoAck: false, consumer: consumer);
// 处理事件
consumer.Received += (model, ea) =>
{
// 获取消息
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
try
{
// 人事做离职处理
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Tag:{0} Handle Message:{1}",ea.DeliveryTag, message);
}
catch (Exception e)
{
Console.WriteLine("Have Error:" + e.Message);
}
finally
{
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: true);
}
};
// 发布8条消息
for (int i = 0; i < 8; i++)
{
var msg = Encoding.UTF8.GetBytes("Good Morning!");
channel.BasicPublish(string.Empty, "Max_Length_Overflow_Queue", null, msg);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
我们发现消息,并没有被丢弃而是溢出的消息等待正准备被消费的消息被消费后,依次进入到消息队列中,然后再次被消费…
死信(Dead Letter Exchange|Dead Letter Routing)
死信的概念
队列中的消息可以按字母顺序排列
,当发生以下任何事件时,重新发布到交换机中:
消费者使用basic.reject 或basic.nack 将requeue 参数设置为false 来否定该消息。 |
该消息由于每个消息TTL 而到期 |
邮件被删除,因为其队列超过了长度限制 |
死信举例
我们以下面这张图做解释,当有11
个人发起要吃饭的请求,但是高级A餐厅只能容纳10
人吃饭,最后那一个人将会到B餐厅吃饭。我们可以将餐厅
比作队列
,两家餐厅不同的地点
表示交换机
。
相关参数
参数 | 参数描述 |
---|---|
x-dead-letter-exchange |
消息死了后会到哪个交换机上 |
x-dead-letter-routing-key |
会到哪个队列上 |
通过Demo实现举例
发布死信队列(A餐厅)。
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 声明一个A餐厅的对立(位置就默认的老地方就可以了,我嫌麻烦)
channel.QueueDeclare(
queue: "A_Centeen_Queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string,object>{
{ "x-max-length",10 },
{ "x-dead-letter-exchange","B_Exchange" },
{ "x-dead-letter-routing-key","B_Centeen_Queue" },
});
// 发布11个人去吃饭的消息
for (int i = 0; i < 12; i++)
{
var msg = Encoding.UTF8.GetBytes($"Person {i}");
channel.BasicPublish(string.Empty, "A_Centeen_Queue", null, msg);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
再发布B餐厅交换机与队列
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 声明一个B餐厅位置的交换机
channel.ExchangeDeclare("B_Exchange", ExchangeType.Direct, true, false, null);
// 声明一个B餐厅
channel.QueueDeclare(
queue: "B_Centeen_Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
// 绑定B餐厅
channel.QueueBind("B_Centeen_Queue", "B_Exchange", "B_Centeen_Queue", null);
// 我们这里只需要获取去B餐厅的那个人就可以了
var consumer = new EventingBasicConsumer(channel);
// 绑定处理事件
channel.BasicConsume(queue: "B_Centeen_Queue", autoAck: false, consumer: consumer);
// B餐厅消费
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
try
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("B Centeen Consumer Message: {0}", message);
}
catch (Exception e)
{
Console.WriteLine("Have Error:" + e.Message);
}
finally
{
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
运行测试一下。
Max Priority(优先级)
简单来说0-10
,那么10
是最大,我们就可以按照1-10
来设立优先级,当优先级为10
时,我就先让它处理该消息。
参数 | 描述 |
---|---|
x-max-priority (默认值为0 ) |
表示声明的优先级范围,一般在0-255 之间。但由于里面是树形接口希望越小越好。 |
Max Priority Demo
示例代码如下:
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 声明一个Priority_Queue队列
channel.QueueDeclare(
queue: "Priority_Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string,object>{
// 设置优先级 0-10
{ "x-max-priority",10 },
});
// 发布Priority_Queue队列中的消息
for (int i = 0; i < 10; i++)
{
var msg = Encoding.UTF8.GetBytes($"Message {i} ,Priority: 1");
var cbp = channel.CreateBasicProperties();
// 优先级都为1
cbp.Priority = 1;
channel.BasicPublish(string.Empty, "Priority_Queue", cbp, msg);
}
var cbphigh = channel.CreateBasicProperties();
cbphigh.Priority = 10;
// 发布一个高优先级消息
channel.BasicPublish(string.Empty, "Priority_Queue", cbphigh, Encoding.UTF8.GetBytes($"High Priority Message,Priority: 10"));
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739
评价
排名
2
文章
634
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 :
好是好,这个对效率影响大不大哇,效率高不高
一个bug让程序员走上法庭 索赔金额达400亿日元
剑轩 : 有点可怕
ASP.NET Core 服务注册生命周期
剑轩 :
http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:
50010702506256


欢迎加群交流技术