tnblog
首页
视频
资源
登录

.netcore3.1 RabbitMq 工作队列轮询与确认消息

8027人阅读 2021/1/21 13:20 总访问:3470513 评论:1 收藏:1 手机
分类: RabbitMq

.netcore3.1 RabbitMq 工作队列轮询与确认消息

https://www.rabbitmq.com/confirms.html
https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

轮询消费

如上图所示,默认消息队列是通过轮询的方式将消息有序的分发到不同的消费端上去。

创建100条消息

代码示例如下:+

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 在这里还应该声明一个交换机
  14. // 声明一个队列
  15. channel.QueueDeclare(
  16. queue: "mytestqueue",
  17. durable: false,
  18. exclusive: false,
  19. autoDelete: false,
  20. arguments: null);
  21. for (int i = 0; i < 100; i++)
  22. {
  23. // 创建一个消息
  24. string message = $"({i}) Hello World";
  25. // 编码一个消息
  26. var body = Encoding.UTF8.GetBytes(message);
  27. // 发布一个消息
  28. channel.BasicPublish(
  29. exchange: string.Empty,
  30. routingKey: "mytestqueue",
  31. basicProperties: null,
  32. body: body
  33. );
  34. }
  35. Console.ReadLine();
  36. }
  37. }

执行完毕后,UI上已经显示出队列的消息数量了。

我们也可以点击mytestqueue队列里面去进行获取消息,操作如下:

开启两个队列去消费100条

在消费时我们可以开启一个事件处理实例,通过事件Received去处理mytestqueue队列中的消息,代码示例如下:

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 创建消费实例
  14. var consumer = new EventingBasicConsumer(channel);
  15. // 事件在交付到使用者时触发。(消费处理事件)
  16. consumer.Received += (model, ea) =>
  17. {
  18. var body = ea.Body.ToArray();
  19. var message = Encoding.UTF8.GetString(body);
  20. Console.WriteLine(" Processing message: {0}", message);
  21. };
  22. // 绑定到队列中去
  23. channel.BasicConsume(queue: "mytestqueue", autoAck: true, consumer: consumer);
  24. }
  25. }
  26. Console.ReadLine();

开启两个进行处理,我们从下面的图中得出它是通过轮询的方式进行消费的。(我这里是先开启的消费端,再添加生产者的方式)

事件处理是否阻塞

我们在事件处理时添加两行代码,测试一下

  1. consumer.Received += (model, ea) =>
  2. {
  3. var body = ea.Body.ToArray();
  4. var message = Encoding.UTF8.GetString(body);
  5. Console.WriteLine(" Processing message: {0}", message);
  6. // 将它延迟3s
  7. Thread.Sleep(3000);
  8. Console.WriteLine(" [x] Done Message: {0}",message);
  9. };
  10. // 绑定到队列中去
  11. channel.BasicConsume(queue: "mytestqueue", autoAck: true, consumer: consumer);

我们发现每一个消费端在消费的时候事件都是同步的,都是在等上一个消息被事件处理完成后再处理下一个消息的。但紧接着新的疑惑就有了,如果我们在两个消费端进行消费的时候,突然有一个消费端挂掉了会怎么样?

我们发现它第7Hello World并没有被处理完成,然后也没有被重新消费的保险措施。这就产生了新的问题:消息丢失在了消费端。这个时候我们便需要通过消息确认的方式解决这个问题。

消息确认,让消息更加安全可靠

在代码中将处理实例绑定到队列时的BasicConsume方法里有一个autoAckbool参数,默认值为false。当它为true的时候,默认表示自动确认该消息被处理了,false则表示手动处理的方法。RabbitMQ为我们提供了三种手动交付的方法:

协议方法 描述 代码中的方法(.net core)
basic.ack 肯定确认 BasicAck
basic.nack 否定确认 BasicNack
basic.reject 否定确认,通过参数可以丢弃消息以及重新排列到队列中 BasicReject

ACK确认交付

BasicAck手动确认

在代码中我们先把autoAck设置为false,将自动确认改为手动确认。并在处理消息事件的的末尾添加处理完成的手动确认代码。

  1. consumer.Received += (model, ea) =>
  2. {
  3. var body = ea.Body.ToArray();
  4. var message = Encoding.UTF8.GetString(body);
  5. Console.WriteLine(" Processing message: {0}", message);
  6. Thread.Sleep(3000);
  7. Console.WriteLine(" [x] Done Message: {0}",message);
  8. // 手动确认
  9. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  10. };
  11. // 绑定到队列中去
  12. channel.BasicConsume(queue: "mytestqueue", autoAck: false, consumer: consumer);

在添加100条数据后,开启两个消费端,并在中途关闭掉一个客户端。

我们发现第一个消费端在处理完自己的消费任务后,开始处理断掉的消费端没有处理的消息了,最后解决消息在客户端丢失的问题。(从第13条开始)接下来我们来看看BasicAck中长用的参数:

参数名 描述
deliveryTag 表示消息标签
multiple 当值为true时,表示一次确认就能代表多次确认。为false一次确认只能代表一次消息的确认。

BasicAck中multiple为true时的应用

可以分批手动确认以减少网络流量。当它为true的时候,表示假如:1,2,3,4,5 这五条消息正在被几个客户端处理的时候突然有一个客户端处理5时调用了multipletrue这方法,此时(1,2,3,4,5)这五条消息表示都已经处理过了。如果为false,我只知道我自己处理的那条有没有成功嘛。
例如我将Worker的解决方案写为10后处理完成。

  1. Thread.Sleep(10000);
  2. Console.WriteLine(" [x] Done Message: {0}",message);
  3. // 手动确认
  4. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);

通过100条消息消费时统计Runtime Metrics (Advanced)的平均指标。为true时平均为402,false时平均为406

BasicReject否定确认与重回队列

有时,消费者无法立即处理交货,但其他情况下可能可以。在这种情况下,可能需要重新排队并让另一个消费者接收和处理它。

否定确认,并丢弃消息

  1. channel.BasicReject(ea.DeliveryTag, false);

这里我们消费20条数据进行测试一下,消费端代码如下

  1. consumer.Received += (model, ea) =>
  2. {
  3. try
  4. {
  5. var body = ea.Body.ToArray();
  6. var message = Encoding.UTF8.GetString(body);
  7. Console.WriteLine("One Processing message: {0}", message);
  8. Thread.Sleep(1000);
  9. Console.WriteLine(" [x] Done Message: {0}", message);
  10. // 否定确认,并丢弃消息
  11. channel.BasicReject(ea.DeliveryTag, false);
  12. }
  13. catch (Exception ex)
  14. {
  15. Console.WriteLine("【Error】:", ex.Message);
  16. }
  17. };


否定确认,重新排列到队列

  1. channel.BasicReject(ea.DeliveryTag, true);

这里我们用随机的方式进行处理,注意需要在外围添加Random的实例

  1. consumer.Received += (model, ea) =>
  2. {
  3. try
  4. {
  5. var body = ea.Body.ToArray();
  6. var message = Encoding.UTF8.GetString(body);
  7. Console.BackgroundColor = ConsoleColor.Black;
  8. Console.ForegroundColor = ConsoleColor.White;
  9. Console.WriteLine("One Processing message: {0}", message);
  10. Thread.Sleep(1000);
  11. if (random.Next(0,2).Equals(0))
  12. {
  13. Console.BackgroundColor = ConsoleColor.Blue; //设置背景色
  14. Console.ForegroundColor = ConsoleColor.White; //设置前景色,即字体颜色
  15. Console.WriteLine(" Discard Message: {0}", message);
  16. // 否定确认
  17. channel.BasicReject(ea.DeliveryTag, false);
  18. }
  19. else
  20. {
  21. Console.BackgroundColor = ConsoleColor.Green;
  22. Console.ForegroundColor = ConsoleColor.DarkGreen;
  23. Console.WriteLine(" Return Message: {0}", message);
  24. // 重新排队
  25. channel.BasicReject(ea.DeliveryTag, true);
  26. }
  27. }
  28. catch (Exception ex)
  29. {
  30. Console.WriteLine("【Error】:", ex.Message);
  31. }
  32. };

截图没截到,后面反正又重新排列到队列中去了。重新排队的消息可能立即准备好重新发送,具体取决于它们在队列中的位置以及活动的使用者使用的通道使用的预取值。这意味着,如果所有使用者由于瞬态而无法处理交货而重新排队,则他们将创建一个重新排队/重新交货循环。就网络带宽和CPU资源而言,这样的循环可能代价很高。

BasicReject与BasicNack区别

消费者实现可以跟踪重新交付的次数并永久拒绝消息(丢弃消息),或在延迟后安排重新排队。BasicNack方法可以一次拒绝或重新排队多个消息。这就是与BasicReject不同的地方。

  1. consumer.Received += (model, ea) =>
  2. {
  3. try
  4. {
  5. var body = ea.Body.ToArray();
  6. var message = Encoding.UTF8.GetString(body);
  7. Console.BackgroundColor = ConsoleColor.Black;
  8. Console.ForegroundColor = ConsoleColor.White;
  9. Console.WriteLine("One Processing message: {0}", message);
  10. Thread.Sleep(1000);
  11. if (random.Next(0,2).Equals(0))
  12. {
  13. Console.BackgroundColor = ConsoleColor.Blue; //设置背景色
  14. Console.ForegroundColor = ConsoleColor.White; //设置前景色,即字体颜色
  15. Console.WriteLine(" Discard Message: {0}", message);
  16. // 否定确认,减少网络请求,提高性能
  17. channel.BasicNack(ea.DeliveryTag,multiple: true,requeue:false);
  18. }
  19. else
  20. {
  21. Console.BackgroundColor = ConsoleColor.Green;
  22. Console.ForegroundColor = ConsoleColor.DarkGreen;
  23. Console.WriteLine(" Return Message: {0}", message);
  24. // 重新排队,减少网络请求,提高性能
  25. channel.BasicNack(ea.DeliveryTag,multiple: true,requeue:true);
  26. }
  27. }
  28. catch (Exception ex)
  29. {
  30. Console.WriteLine("【Error】:", ex.Message);
  31. }
  32. };

当你忘记了BasicAck时,这是一个简单的错误,但是后果很严重。当您的客户端退出时,消息将被重新发送(看起来像是随机重新发送),但是RabbitMQ将消耗越来越多的内存,因为它将无法释放任何未确认的消息。
为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段:

持久化

问题

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失。

RabbitMQ退出或崩溃时,除非您告诉它,否则它将忘记队列和消息。确保消息不会丢失需要做两件事:我们需要将QueueMessage都标记为持久。

Queue 持久化

我们在声明队列的时候,将durable设置为true。这样就可以将队列持久化,但注意:已经声明的队列这样做是无效的,我们可以声明一个名字不同新的队列。

  1. channel.QueueDeclare(
  2. queue: "mytestqueue",
  3. durable: true,
  4. exclusive: false,
  5. autoDelete: false,
  6. arguments: null);

Message 持久化

在这一点上,我们确定即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性-通过将IBasicProperties.SetPersistent设置为true

  1. var body = Encoding.UTF8.GetBytes(message);
  2. // 持久化操作,告诉Rabbitmq服务器将消息存储在磁盘上,但仍然有
  3. // 少部分是存储在缓存中的,所以仍然不能绝对保证
  4. var properties = channel.CreateBasicProperties();
  5. properties.Persistent = true;
  6. // 发布一个消息
  7. channel.BasicPublish(
  8. exchange: string.Empty,
  9. routingKey: "mytestqueuetwo",
  10. basicProperties: properties,
  11. body: body
  12. );

消息分配合理化

当每条消息在A消费端都需要处理很长的时间,才能确认消息被处理的时候。而B消费端却在玩的时候,仍然平均去分发消息,就会显得很不合理。

为了更改此行为,我们可以将BasicQos方法与prefetchCount = 1设置一起使用。这告诉RabbitMQ一次不要给工人一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给不忙的下一个工作程序。

  1. channel.BasicQos(0, 1, false);

注意:如果所有工作人员都忙,则您的队列可以填满。您将需要关注这一点,并可能会增加更多的工作人员,或者有其他一些策略。

创建Queue添加消息的代码如下

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 在这里还应该声明一个交换机
  14. // 声明一个队列
  15. channel.QueueDeclare(
  16. queue: "mytestqueuetwo",
  17. durable: true,
  18. exclusive: false,
  19. autoDelete: false,
  20. arguments: null);
  21. for (int i = 0; i < 30; i++)
  22. {
  23. // 创建一个消息
  24. string message = $"({i}) Hello World";
  25. // 编码一个消息
  26. var body = Encoding.UTF8.GetBytes(message);
  27. // 持久化操作,告诉Rabbitmq服务器将消息存储在磁盘上,但仍然有
  28. // 少部分是存储在缓存中的,所以仍然不能绝对保证
  29. var properties = channel.CreateBasicProperties();
  30. properties.Persistent = true;
  31. // 发布一个消息
  32. channel.BasicPublish(
  33. exchange: string.Empty,
  34. routingKey: "mytestqueuetwo",
  35. basicProperties: properties,
  36. body: body
  37. );
  38. }
  39. Console.WriteLine("Finish");
  40. Console.ReadLine();
  41. }
  42. }

消费端代码

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob",
  6. VirtualHost = "/"
  7. };
  8. // 获取处理时常(第一个耗时为10s,第二个耗时为2s)
  9. var sleeptime = int.Parse(args[0]);
  10. // 创建一个链接
  11. using (var connection = factory.CreateConnection())
  12. {
  13. // 创建一个通道
  14. using (var channel = connection.CreateModel())
  15. {
  16. // one by one 处理
  17. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
  18. // 创建消费实例
  19. var consumer = new EventingBasicConsumer(channel);
  20. // 事件在交付到使用者时触发。(消费处理事件)
  21. consumer.Received += (model, ea) =>
  22. {
  23. try
  24. {
  25. int sj = sleeptime;
  26. var body = ea.Body.ToArray();
  27. var message = Encoding.UTF8.GetString(body);
  28. Console.BackgroundColor = ConsoleColor.Black;
  29. Console.ForegroundColor = ConsoleColor.White;
  30. Console.WriteLine("One Processing message: {0}", message);
  31. Thread.Sleep(sj*1000);
  32. if (random.Next(0,2).Equals(0))
  33. {
  34. Console.BackgroundColor = ConsoleColor.Blue; //设置背景色
  35. Console.ForegroundColor = ConsoleColor.White; //设置前景色,即字体颜色
  36. Console.WriteLine(" Discard Message: {0}", message);
  37. // 否定确认,减少网络请求,提高性能
  38. channel.BasicNack(ea.DeliveryTag,multiple: true,requeue:false);
  39. }
  40. else
  41. {
  42. Console.BackgroundColor = ConsoleColor.Green;
  43. Console.ForegroundColor = ConsoleColor.DarkGreen;
  44. Console.WriteLine(" Return Message: {0}", message);
  45. // 重新排队,减少网络请求,提高性能
  46. channel.BasicNack(ea.DeliveryTag,multiple: true,requeue:true);
  47. }
  48. }
  49. catch (Exception ex)
  50. {
  51. Console.WriteLine("【Error】:", ex.Message);
  52. }
  53. };
  54. // 绑定到队列中去
  55. channel.BasicConsume(queue: "mytestqueuetwo", autoAck: false, consumer: consumer);
  56. Console.ReadLine();
  57. }
  58. }

绑定交换机

在Rabbitmq中,在一个交换机上可以绑定多个队列,如下图所示

在生产的时候我们就可以通过QueueBind将队列绑定到交换机上。

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 创建交换机 普通类型
  14. channel.ExchangeDeclare("MyExchangeName", ExchangeType.Direct);
  15. // 声明一个队列
  16. channel.QueueDeclare(
  17. queue: "mytestqueuetwo",
  18. durable: true,
  19. exclusive: false,
  20. autoDelete: false,
  21. arguments: null);
  22. // 将队列与交换机绑定在一起
  23. channel.QueueBind("mytestqueuetwo", "MyExchangeName", "mytestqueuetwo", null);
  24. Console.WriteLine("Finish");
  25. Console.ReadLine();
  26. }
  27. }




欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

评价

net core 使用 EF Code First

下面这些内容很老了看这篇:https://www.tnblog.net/aojiancc2/article/details/5365 项目使用多层,把数据库访问...

.net mvc分部页,.net core分部页

.net分部页的三种方式第一种:@Html.Partial(&quot;_分部页&quot;)第二种:@{ Html.RenderPartial(&quot;分部页&quot;);}...

StackExchange.Redis操作redis(net core支持)

官方git开源地址https://github.com/StackExchange/StackExchange.Redis官方文档在docs里边都是官方的文档通过nuget命令下...

.net core 使用session

tip:net core 2.2后可以直接启用session了,不用在自己添加一次session依赖,本身就添加了使用nuget添加引用Microsoft.AspN...

通俗易懂,什么是.net?什么是.net Framework?什么是.net core?

朋友圈@蓝羽 看到一篇文章写的太详细太通俗了,搬过来细细看完,保证你对.NET有个新的认识理解原文地址:https://www.cnblo...

asp.net core2.0 依赖注入 AddTransient与AddScoped的区别

asp.net core主要提供了三种依赖注入的方式其中AddTransient与AddSingleton比较好区别AddTransient瞬时模式:每次都获取一...

.net core 使用 Kestrel

Kestrel介绍 Kestrel是一个基于libuv的跨平台web服务器 在.net core项目中就可以不一定要发布在iis下面了Kestrel体验可以使...

net core中使用cookie

net core中可以使用传统的cookie也可以使用加密的cookieNET CORE中使用传统cookie设置:HttpContext.Response.Cookies.Appe...

net core项目结构简单分析

一:wwwrootwwwroot用于存放网站的静态资源,例如css,js,图片与相关的前端插件等lib主要是第三方的插件,例如微软默认引用...

net core使用EF之DB First

一.新建一个.net core的MVC项目新建好项目后,不能像以前一样直接在新建项中添加ef了,需要用命令在添加ef的依赖二.使用Nug...

.net core使用requestresponse下载文件下载excel等

使用request获取内容net core中request没有直接的索引方法,需要点里边的Query,或者formstringbase64=Request.Form[&quot;f...

iframe自适应高度与配合net core使用

去掉iframe边框frameborder=&quot;0&quot;去掉滚动条scrolling=&quot;no&quot;iframe 自适应高度如果内容是固定的,那么就...

net core启动报错Unable to configure HTTPS endpoint. No server certificate was specified

这是因为net core2.1默认使用的https,如果使用Kestrel web服务器的话没有安装证书就会报这个错其实仔细看他的错误提示,其...

net core中使用url编码与解码操作

net core中暂时还没有以前asp.net与mvc中的server对象。获取url的编码与解码操作不能使用以前的server对象来获取。使用的是...

下载net core

官方下载地址:https://dotnet.microsoft.com/download 进来之后就可以看到最新的下载版本可以直接点击下载,也可以下载其...
这一世以无限游戏为使命!
排名
2
文章
635
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 : 好是好,这个对效率影响大不大哇,效率高不高
ASP.NET Core 服务注册生命周期
剑轩 : http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术