tnblog
首页
视频
资源
登录

.netcore3.1 RabbitMq Single Active Consumer

8013人阅读 2021/2/2 15:38 总访问:3467006 评论:1 收藏:0 手机
分类: RabbitMq

.netcore3.1 RabbitMq Single Active Consumer

Single Active Consumer

单个活动的消费者每次只能从队列中消费一个消费者,并在活动的消费者被取消或死亡的情况下故障转移到另一个注册的消费者。当必须按照消息到达队列的顺序来使用和处理消息时,仅与一个使用者一起使用很有用。举个例子:

创建队列

我们使用x-single-active-consumer参数来定义一个Single_Active_Consumer_Queue的队列。

  1. static void Main(string[] args)
  2. {
  3. var factory = new ConnectionFactory()
  4. {
  5. HostName = "47.98.187.188",
  6. UserName = "bob",
  7. Password = "bob"
  8. };
  9. // 创建一个链接
  10. using (var connection = factory.CreateConnection())
  11. {
  12. // 创建一个通道
  13. using (var channel = connection.CreateModel())
  14. {
  15. // 声明一个Single_Active_Consumer_Queue队列
  16. channel.QueueDeclare(
  17. queue: "Single_Active_Consumer_Queue",
  18. durable: true,
  19. exclusive: false,
  20. autoDelete: false,
  21. arguments: new Dictionary<string,object>{
  22. { "x-single-active-consumer",true },
  23. });
  24. Console.WriteLine(" Press [enter] to exit.");
  25. Console.ReadLine();
  26. }
  27. }
  28. }

运行并创建对应的队列,在UI中查看。

创建消费端代码

我们这里通过从args接收一个参数作为消费端的名称。

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. string Name = args[0];
  8. // 创建一个链接
  9. using (var connection = factory.CreateConnection())
  10. {
  11. // 创建一个通道
  12. using (var channel = connection.CreateModel())
  13. {
  14. // 我们这里只需要获取去B餐厅的那个人就可以了
  15. var consumer = new EventingBasicConsumer(channel);
  16. // 绑定处理事件
  17. channel.BasicConsume(queue: "Single_Active_Consumer_Queue", autoAck: false, consumer: consumer);
  18. // B餐厅消费
  19. consumer.Received += (model, ea) =>
  20. {
  21. var body = ea.Body.ToArray();
  22. var props = ea.BasicProperties;
  23. try
  24. {
  25. var message = Encoding.UTF8.GetString(body);
  26. Console.WriteLine("{0} Message: {1}", Name, message);
  27. }
  28. catch (Exception e)
  29. {
  30. Console.WriteLine("Have Error:" + e.Message);
  31. }
  32. finally
  33. {
  34. channel.BasicAck(deliveryTag: ea.DeliveryTag,
  35. multiple: false);
  36. }
  37. };
  38. Console.WriteLine(" Press [enter] to exit.");
  39. Console.ReadLine();
  40. }
  41. }

并在运行目录中打开两个客户端A_ClientB_Client,注意程序有启动先打开A_Client再打开B_Client.

添加消息

我们将10条消息放入至Single_Active_Consumer_Queue队列中

  1. for (int i = 0; i < 10; i++)
  2. {
  3. var msg = Encoding.UTF8.GetBytes($"Message {i} ,Priority: 1");
  4. channel.BasicPublish(string.Empty, "Single_Active_Consumer_Queue", null, msg);
  5. }

我们发现这并不符合我们的程序设计,这样A都把活路干完了,我们设置让A只干5条数据

调整消费端处理量

  1. // 我们这里只需要获取去B餐厅的那个人就可以了
  2. var consumer = new EventingBasicConsumer(channel);
  3. // 绑定处理事件
  4. channel.BasicConsume(queue: "Single_Active_Consumer_Queue", autoAck: false, consumer: consumer);
  5. // B餐厅消费
  6. consumer.Received += (model, ea) =>
  7. {
  8. var body = ea.Body.ToArray();
  9. var props = ea.BasicProperties;
  10. try
  11. {
  12. var message = Encoding.UTF8.GetString(body);
  13. Console.WriteLine("{0} Message: {1}", Name, message);
  14. // 处理4条就退出
  15. if (Name.Contains("A") && ea.DeliveryTag == 4)
  16. {
  17. Console.WriteLine("A go home!!!!");
  18. // 退出程序的代码
  19. Environment.Exit(0);
  20. }
  21. }
  22. catch (Exception e)
  23. {
  24. Console.WriteLine("Have Error:" + e.Message);
  25. }
  26. finally
  27. {
  28. channel.BasicAck(deliveryTag: ea.DeliveryTag,
  29. multiple: false);
  30. }
  31. };
  32. Console.WriteLine(" Press [enter] to exit.");

由于在A_Client中消息3没有被交付后面又再一次处理了,但是我们做了更多的测试发现也会存在下面的结果。

这是由于事件异步,A_Client关得太快而导致该问题的发生,如何解决呢?我们可以尝试一下添加BasicQos,我在处理这条消息时,在没处理完之前不要给我推消息过来处理。

  1. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);


欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

评价

bk

2021/2/2 18:26:42

[good][good]

net core 使用 EF Code First

下面这些内容很老了看这篇:https://www.tnblog.net/aojiancc2/article/details/5365 项目使用多层,把数据库访问...

.net mvc分部页,.net core分部页

.net分部页的三种方式第一种:@Html.Partial(&quot;_分部页&quot;)第二种:@{ Html.RenderPartial(&quot;分部页&quot;);}...

StackExchange.Redis操作redis(net core支持)

官方git开源地址https://github.com/StackExchange/StackExchange.Redis官方文档在docs里边都是官方的文档通过nuget命令下...

.net core 使用session

tip:net core 2.2后可以直接启用session了,不用在自己添加一次session依赖,本身就添加了使用nuget添加引用Microsoft.AspN...

通俗易懂,什么是.net?什么是.net Framework?什么是.net core?

朋友圈@蓝羽 看到一篇文章写的太详细太通俗了,搬过来细细看完,保证你对.NET有个新的认识理解原文地址:https://www.cnblo...

asp.net core2.0 依赖注入 AddTransient与AddScoped的区别

asp.net core主要提供了三种依赖注入的方式其中AddTransient与AddSingleton比较好区别AddTransient瞬时模式:每次都获取一...

.net core 使用 Kestrel

Kestrel介绍 Kestrel是一个基于libuv的跨平台web服务器 在.net core项目中就可以不一定要发布在iis下面了Kestrel体验可以使...

net core中使用cookie

net core中可以使用传统的cookie也可以使用加密的cookieNET CORE中使用传统cookie设置:HttpContext.Response.Cookies.Appe...

net core项目结构简单分析

一:wwwrootwwwroot用于存放网站的静态资源,例如css,js,图片与相关的前端插件等lib主要是第三方的插件,例如微软默认引用...

net core使用EF之DB First

一.新建一个.net core的MVC项目新建好项目后,不能像以前一样直接在新建项中添加ef了,需要用命令在添加ef的依赖二.使用Nug...

.net core使用requestresponse下载文件下载excel等

使用request获取内容net core中request没有直接的索引方法,需要点里边的Query,或者formstringbase64=Request.Form[&quot;f...

iframe自适应高度与配合net core使用

去掉iframe边框frameborder=&quot;0&quot;去掉滚动条scrolling=&quot;no&quot;iframe 自适应高度如果内容是固定的,那么就...

net core启动报错Unable to configure HTTPS endpoint. No server certificate was specified

这是因为net core2.1默认使用的https,如果使用Kestrel web服务器的话没有安装证书就会报这个错其实仔细看他的错误提示,其...

net core中使用url编码与解码操作

net core中暂时还没有以前asp.net与mvc中的server对象。获取url的编码与解码操作不能使用以前的server对象来获取。使用的是...

下载net core

官方下载地址:https://dotnet.microsoft.com/download 进来之后就可以看到最新的下载版本可以直接点击下载,也可以下载其...

net core使用依赖注入来装载EF的上下文对象

妹子情人节快乐~.net core中用了不少的依赖注入,官方文档中也推荐使用。这样使用依赖注入来管理ef对象,还是比较科学,比如...
这一世以无限游戏为使命!
排名
2
文章
634
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 : 好是好,这个对效率影响大不大哇,效率高不高
ASP.NET Core 服务注册生命周期
剑轩 : http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术