
.netcore3.1 RabbitMq Single Active Consumer
Single Active Consumer
单个活动的消费者每次只能从队列中消费一个消费者,并在活动的消费者被取消或死亡的情况下故障转移到另一个注册的消费者。当必须按照消息到达队列的顺序来使用和处理消息时,仅与一个使用者一起使用很有用。举个例子:
创建队列
我们使用x-single-active-consumer
参数来定义一个Single_Active_Consumer_Queue
的队列。
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 声明一个Single_Active_Consumer_Queue队列
channel.QueueDeclare(
queue: "Single_Active_Consumer_Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string,object>{
{ "x-single-active-consumer",true },
});
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
运行并创建对应的队列,在UI中查看。
创建消费端代码
我们这里通过从args
接收一个参数作为消费端的名称。
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
string Name = args[0];
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 我们这里只需要获取去B餐厅的那个人就可以了
var consumer = new EventingBasicConsumer(channel);
// 绑定处理事件
channel.BasicConsume(queue: "Single_Active_Consumer_Queue", autoAck: false, consumer: consumer);
// B餐厅消费
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
try
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("{0} Message: {1}", Name, message);
}
catch (Exception e)
{
Console.WriteLine("Have Error:" + e.Message);
}
finally
{
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
并在运行目录中打开两个客户端A_Client
与B_Client
,注意程序有启动先打开A_Client
再打开B_Client
.
添加消息
我们将10
条消息放入至Single_Active_Consumer_Queue
队列中
for (int i = 0; i < 10; i++)
{
var msg = Encoding.UTF8.GetBytes($"Message {i} ,Priority: 1");
channel.BasicPublish(string.Empty, "Single_Active_Consumer_Queue", null, msg);
}
我们发现这并不符合我们的程序设计,这样A
都把活路干完了,我们设置让A
只干5
条数据
调整消费端处理量
// 我们这里只需要获取去B餐厅的那个人就可以了
var consumer = new EventingBasicConsumer(channel);
// 绑定处理事件
channel.BasicConsume(queue: "Single_Active_Consumer_Queue", autoAck: false, consumer: consumer);
// B餐厅消费
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
try
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("{0} Message: {1}", Name, message);
// 处理4条就退出
if (Name.Contains("A") && ea.DeliveryTag == 4)
{
Console.WriteLine("A go home!!!!");
// 退出程序的代码
Environment.Exit(0);
}
}
catch (Exception e)
{
Console.WriteLine("Have Error:" + e.Message);
}
finally
{
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
由于在A_Client
中消息3
没有被交付后面又再一次处理了,但是我们做了更多的测试发现也会存在下面的结果。
这是由于事件异步,A_Client
关得太快而导致该问题的发生,如何解决呢?我们可以尝试一下添加BasicQos
,我在处理这条消息时,在没处理完之前不要给我推消息过来处理。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739
评价
排名
2
文章
634
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 :
好是好,这个对效率影响大不大哇,效率高不高
一个bug让程序员走上法庭 索赔金额达400亿日元
剑轩 : 有点可怕
ASP.NET Core 服务注册生命周期
剑轩 :
http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:
50010702506256


欢迎加群交流技术
bk