tnblog
首页
视频
资源
登录

.netcore3.1 RabbitMq Overflow,Dead Letter Exchange,Dead Letter Routing,Max priority

6491人阅读 2021/2/1 13:41 总访问:3467029 评论:1 收藏:0 手机
分类: RabbitMq

.netcore3.1 RabbitMq Overflow,Dead Letter Exchange,Dead Letter Routing,Max priority

Overflow

从字面意思来讲就是溢出,它将结合上一个章节的max-lengthmax-length-bytes配合使用。它的意思是当队列所发布的消息超出max-length的数量时或消息的总大小超出max-length-bytes的大小时,所溢出的消息应该做什么处理?这个参数有两个Value如下图所示:

x-overflow 描述
drop-head(默认) 表示多余的消息直接丢弃。在代码中不填写即可,否则报错!
reject-publish 表示将暂时不让它进入队列中来,当有新的消息被消费时,仍然会重新排列到队列中去

举个栗子

我这里声明队列中的最大数量为2,将x-overflow设置为reject-publish,并发布8条消息。代码如下:

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 声明一个只能为2条消息的队列,并将
  14. channel.QueueDeclare(
  15. queue: "Max_Length_Overflow_Queue",
  16. durable: true,
  17. exclusive: false,
  18. autoDelete: false,
  19. arguments: new Dictionary<string,object>{
  20. { "x-max-length",2 },
  21. { "x-overflow","reject-publish" }
  22. });
  23. var consumer = new EventingBasicConsumer(channel);
  24. // 绑定处理事件
  25. channel.BasicConsume(queue: "Max_Length_Overflow_Queue", autoAck: false, consumer: consumer);
  26. // 处理事件
  27. consumer.Received += (model, ea) =>
  28. {
  29. // 获取消息
  30. var body = ea.Body.ToArray();
  31. var props = ea.BasicProperties;
  32. try
  33. {
  34. // 人事做离职处理
  35. var message = Encoding.UTF8.GetString(body);
  36. Console.WriteLine("Tag:{0} Handle Message:{1}",ea.DeliveryTag, message);
  37. }
  38. catch (Exception e)
  39. {
  40. Console.WriteLine("Have Error:" + e.Message);
  41. }
  42. finally
  43. {
  44. channel.BasicAck(deliveryTag: ea.DeliveryTag,
  45. multiple: true);
  46. }
  47. };
  48. // 发布8条消息
  49. for (int i = 0; i < 8; i++)
  50. {
  51. var msg = Encoding.UTF8.GetBytes("Good Morning!");
  52. channel.BasicPublish(string.Empty, "Max_Length_Overflow_Queue", null, msg);
  53. }
  54. Console.WriteLine(" Press [enter] to exit.");
  55. Console.ReadLine();
  56. }
  57. }

我们发现消息,并没有被丢弃而是溢出的消息等待正准备被消费的消息被消费后,依次进入到消息队列中,然后再次被消费…

死信(Dead Letter Exchange|Dead Letter Routing)

死信的概念

队列中的消息可以按字母顺序排列,当发生以下任何事件时,重新发布到交换机中:

消费者使用basic.rejectbasic.nackrequeue参数设置为false来否定该消息。
该消息由于每个消息TTL而到期
邮件被删除,因为其队列超过了长度限制

死信举例

我们以下面这张图做解释,当有11个人发起要吃饭的请求,但是高级A餐厅只能容纳10人吃饭,最后那一个人将会到B餐厅吃饭。我们可以将餐厅比作队列两家餐厅不同的地点表示交换机

相关参数

参数 参数描述
x-dead-letter-exchange 消息死了后会到哪个交换机上
x-dead-letter-routing-key 会到哪个队列上

通过Demo实现举例

发布死信队列(A餐厅)。

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 声明一个A餐厅的对立(位置就默认的老地方就可以了,我嫌麻烦)
  14. channel.QueueDeclare(
  15. queue: "A_Centeen_Queue",
  16. durable: false,
  17. exclusive: false,
  18. autoDelete: false,
  19. arguments: new Dictionary<string,object>{
  20. { "x-max-length",10 },
  21. { "x-dead-letter-exchange","B_Exchange" },
  22. { "x-dead-letter-routing-key","B_Centeen_Queue" },
  23. });
  24. // 发布11个人去吃饭的消息
  25. for (int i = 0; i < 12; i++)
  26. {
  27. var msg = Encoding.UTF8.GetBytes($"Person {i}");
  28. channel.BasicPublish(string.Empty, "A_Centeen_Queue", null, msg);
  29. }
  30. Console.WriteLine(" Press [enter] to exit.");
  31. Console.ReadLine();
  32. }
  33. }

再发布B餐厅交换机与队列

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 声明一个B餐厅位置的交换机
  14. channel.ExchangeDeclare("B_Exchange", ExchangeType.Direct, true, false, null);
  15. // 声明一个B餐厅
  16. channel.QueueDeclare(
  17. queue: "B_Centeen_Queue",
  18. durable: true,
  19. exclusive: false,
  20. autoDelete: false,
  21. arguments: null);
  22. // 绑定B餐厅
  23. channel.QueueBind("B_Centeen_Queue", "B_Exchange", "B_Centeen_Queue", null);
  24. // 我们这里只需要获取去B餐厅的那个人就可以了
  25. var consumer = new EventingBasicConsumer(channel);
  26. // 绑定处理事件
  27. channel.BasicConsume(queue: "B_Centeen_Queue", autoAck: false, consumer: consumer);
  28. // B餐厅消费
  29. consumer.Received += (model, ea) =>
  30. {
  31. var body = ea.Body.ToArray();
  32. var props = ea.BasicProperties;
  33. try
  34. {
  35. var message = Encoding.UTF8.GetString(body);
  36. Console.WriteLine("B Centeen Consumer Message: {0}", message);
  37. }
  38. catch (Exception e)
  39. {
  40. Console.WriteLine("Have Error:" + e.Message);
  41. }
  42. finally
  43. {
  44. channel.BasicAck(deliveryTag: ea.DeliveryTag,
  45. multiple: false);
  46. }
  47. };
  48. Console.WriteLine(" Press [enter] to exit.");
  49. Console.ReadLine();
  50. }
  51. }

运行测试一下。

Max Priority(优先级)

简单来说0-10,那么10是最大,我们就可以按照1-10来设立优先级,当优先级为10时,我就先让它处理该消息。

参数 描述
x-max-priority(默认值为0) 表示声明的优先级范围,一般在0-255之间。但由于里面是树形接口希望越小越好。

Max Priority Demo

示例代码如下:

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 声明一个Priority_Queue队列
  14. channel.QueueDeclare(
  15. queue: "Priority_Queue",
  16. durable: true,
  17. exclusive: false,
  18. autoDelete: false,
  19. arguments: new Dictionary<string,object>{
  20. // 设置优先级 0-10
  21. { "x-max-priority",10 },
  22. });
  23. // 发布Priority_Queue队列中的消息
  24. for (int i = 0; i < 10; i++)
  25. {
  26. var msg = Encoding.UTF8.GetBytes($"Message {i} ,Priority: 1");
  27. var cbp = channel.CreateBasicProperties();
  28. // 优先级都为1
  29. cbp.Priority = 1;
  30. channel.BasicPublish(string.Empty, "Priority_Queue", cbp, msg);
  31. }
  32. var cbphigh = channel.CreateBasicProperties();
  33. cbphigh.Priority = 10;
  34. // 发布一个高优先级消息
  35. channel.BasicPublish(string.Empty, "Priority_Queue", cbphigh, Encoding.UTF8.GetBytes($"High Priority Message,Priority: 10"));
  36. Console.WriteLine(" Press [enter] to exit.");
  37. Console.ReadLine();
  38. }
  39. }


欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

评价

net core 使用 EF Code First

下面这些内容很老了看这篇:https://www.tnblog.net/aojiancc2/article/details/5365 项目使用多层,把数据库访问...

.net mvc分部页,.net core分部页

.net分部页的三种方式第一种:@Html.Partial(&quot;_分部页&quot;)第二种:@{ Html.RenderPartial(&quot;分部页&quot;);}...

StackExchange.Redis操作redis(net core支持)

官方git开源地址https://github.com/StackExchange/StackExchange.Redis官方文档在docs里边都是官方的文档通过nuget命令下...

.net core 使用session

tip:net core 2.2后可以直接启用session了,不用在自己添加一次session依赖,本身就添加了使用nuget添加引用Microsoft.AspN...

通俗易懂,什么是.net?什么是.net Framework?什么是.net core?

朋友圈@蓝羽 看到一篇文章写的太详细太通俗了,搬过来细细看完,保证你对.NET有个新的认识理解原文地址:https://www.cnblo...

asp.net core2.0 依赖注入 AddTransient与AddScoped的区别

asp.net core主要提供了三种依赖注入的方式其中AddTransient与AddSingleton比较好区别AddTransient瞬时模式:每次都获取一...

.net core 使用 Kestrel

Kestrel介绍 Kestrel是一个基于libuv的跨平台web服务器 在.net core项目中就可以不一定要发布在iis下面了Kestrel体验可以使...

net core中使用cookie

net core中可以使用传统的cookie也可以使用加密的cookieNET CORE中使用传统cookie设置:HttpContext.Response.Cookies.Appe...

net core项目结构简单分析

一:wwwrootwwwroot用于存放网站的静态资源,例如css,js,图片与相关的前端插件等lib主要是第三方的插件,例如微软默认引用...

net core使用EF之DB First

一.新建一个.net core的MVC项目新建好项目后,不能像以前一样直接在新建项中添加ef了,需要用命令在添加ef的依赖二.使用Nug...

.net core使用requestresponse下载文件下载excel等

使用request获取内容net core中request没有直接的索引方法,需要点里边的Query,或者formstringbase64=Request.Form[&quot;f...

iframe自适应高度与配合net core使用

去掉iframe边框frameborder=&quot;0&quot;去掉滚动条scrolling=&quot;no&quot;iframe 自适应高度如果内容是固定的,那么就...

net core启动报错Unable to configure HTTPS endpoint. No server certificate was specified

这是因为net core2.1默认使用的https,如果使用Kestrel web服务器的话没有安装证书就会报这个错其实仔细看他的错误提示,其...

net core中使用url编码与解码操作

net core中暂时还没有以前asp.net与mvc中的server对象。获取url的编码与解码操作不能使用以前的server对象来获取。使用的是...

下载net core

官方下载地址:https://dotnet.microsoft.com/download 进来之后就可以看到最新的下载版本可以直接点击下载,也可以下载其...

net core使用依赖注入来装载EF的上下文对象

妹子情人节快乐~.net core中用了不少的依赖注入,官方文档中也推荐使用。这样使用依赖注入来管理ef对象,还是比较科学,比如...
这一世以无限游戏为使命!
排名
2
文章
634
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 : 好是好,这个对效率影响大不大哇,效率高不高
ASP.NET Core 服务注册生命周期
剑轩 : http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术