tnblog
首页
视频
资源
登录

.netcore3.1 RabbitMq Routing的应用

4292人阅读 2021/1/24 17:04 总访问:3470514 评论:0 收藏:0 手机
分类: RabbitMq

.netcore3.1 RabbitMq Routing的应用

在前面很多文章中都用到了routingKey参数但并没有细说它的用途。在这之前我可以为大家举个栗子。
假如你开了一家水果商店,现在到了过节了为了加大买卖量需要对部分水果做打折处理,比如你要把苹果香蕉橘子都打七折,而蓝莓还是原来都价格。大致如下图所示:

上图的大致对应的如下表格所示,如果还看不懂看下图。

名称 对应的
水果 routingKey
队列 很明显了
篮子 消费端

我相信大家都懂起了。下面开始写代码。

创建水果交换机,打折队列

安装依赖包

  1. dotnet add package RabbitMQ.Client --version 6.2.1

代码

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "127.0.0.1",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 创建水果交换机
  14. channel.ExchangeDeclare("FruitExchange", ExchangeType.Direct,true,false);
  15. // 声明一个七折队列
  16. channel.QueueDeclare(
  17. queue: "SevenQueue",
  18. durable: true,
  19. exclusive: false,
  20. autoDelete: false,
  21. arguments: null);
  22. // 将七折队列与水果交换机绑定在一起
  23. // 添加 苹果,香蕉,橘子
  24. string[] fruits = new string[] { "apple", "banana", "orange" };
  25. for (var i = 0; i < fruits.Length; i++)
  26. {
  27. channel.QueueBind("SevenQueue", "FruitExchange", fruits[i], null);
  28. }
  29. // 声明一个原价队列
  30. channel.QueueDeclare(
  31. queue: "OriginalQueue",
  32. durable: true,
  33. exclusive: false,
  34. autoDelete: false,
  35. arguments: null);
  36. // 将原价队列与水果交换机绑定在一起
  37. // 添加 蓝莓
  38. string[] originalfruits = new string[] { "Blueberry" };
  39. for (var i = 0; i < originalfruits.Length; i++)
  40. {
  41. channel.QueueBind("OriginalQueue", "FruitExchange", originalfruits[i], null);
  42. }
  43. Console.WriteLine("Finish");
  44. Console.ReadLine();
  45. }
  46. }

我们在UI的交换机中查看routingKey绑定到队列的情况。

创建篮子(消费端)并进行消费

命令创建消费篮子

  1. dotnet new console -n BasketConsumer
  2. cd BasketConsumer
  3. dotnet add package RabbitMQ.Client --version 6.2.1
  4. dotnet restore

消费代码

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "127.0.0.1",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 获取篮子名称
  8. var _BasketName = args[0];
  9. // 创建一个链接
  10. using (var connection = factory.CreateConnection())
  11. {
  12. // 创建一个通道
  13. using (var channel = connection.CreateModel())
  14. {
  15. // one by one 处理
  16. channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
  17. // 创建消费实例
  18. var consumer = new EventingBasicConsumer(channel);
  19. // 事件在交付到使用者时触发。(消费处理事件)
  20. consumer.Received += (model, ea) =>
  21. {
  22. try
  23. {
  24. string BasketName = _BasketName;
  25. var body = ea.Body.ToArray();
  26. var message = Encoding.UTF8.GetString(body);
  27. Thread.Sleep(1000);
  28. if (BasketName.ToLower() == "original")
  29. {
  30. Console.BackgroundColor = ConsoleColor.Blue; //设置背景色
  31. Console.ForegroundColor = ConsoleColor.White; //设置前景色,即字体颜色
  32. Console.WriteLine(" Original : {0}", message);
  33. // 手动确认
  34. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  35. }
  36. else
  37. {
  38. Console.BackgroundColor = ConsoleColor.Green;
  39. Console.ForegroundColor = ConsoleColor.DarkGreen;
  40. Console.WriteLine(" Seven : {0}", message);
  41. // 手动确认
  42. channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  43. }
  44. }
  45. catch (Exception ex)
  46. {
  47. Console.WriteLine("【Error】:", ex.Message);
  48. }
  49. };
  50. // 通过判断绑定到不同队列中去
  51. if (_BasketName.ToLower() == "original")
  52. {
  53. channel.BasicConsume(queue: "OriginalQueue", autoAck: false, consumer: consumer);
  54. }
  55. else
  56. {
  57. channel.BasicConsume(queue: "SevenQueue", autoAck: false, consumer: consumer);
  58. }
  59. Console.ReadLine();
  60. }
  61. }

运行消费端

在不同端窗口中运行该命令

  1. dotnet run seven
  2. dotnet run original

添加消息

修改部分创建交换机,队列端代码进行添加消息

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "127.0.0.1",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 创建水果交换机
  14. channel.ExchangeDeclare("FruitExchange", ExchangeType.Direct,true,false);
  15. // 声明一个七折队列
  16. channel.QueueDeclare(
  17. queue: "SevenQueue",
  18. durable: true,
  19. exclusive: false,
  20. autoDelete: false,
  21. arguments: null);
  22. // 将七折队列与水果交换机绑定在一起
  23. // 添加 苹果,香蕉,橘子
  24. string[] fruits = new string[] { "apple", "banana", "orange" };
  25. for (var i = 0; i < fruits.Length; i++)
  26. {
  27. channel.QueueBind("SevenQueue", "FruitExchange", fruits[i], null);
  28. }
  29. // 持久化操作,告诉Rabbitmq服务器将消息存储在磁盘上,但仍然有
  30. // 少部分是存储在缓存中的,所以仍然不能绝对保证
  31. var properties = channel.CreateBasicProperties();
  32. properties.Persistent = true;
  33. // 发布一个苹果,香蕉,与橘子
  34. channel.BasicPublish(
  35. exchange: "FruitExchange",
  36. routingKey: "apple",
  37. basicProperties: properties,
  38. body: Encoding.UTF8.GetBytes("Apple")
  39. );
  40. channel.BasicPublish(
  41. exchange: "FruitExchange",
  42. routingKey: "banana",
  43. basicProperties: properties,
  44. body: Encoding.UTF8.GetBytes("Banana")
  45. );
  46. channel.BasicPublish(
  47. exchange: "FruitExchange",
  48. routingKey: "orange",
  49. basicProperties: properties,
  50. body: Encoding.UTF8.GetBytes("Orange")
  51. );
  52. // 声明一个原价队列
  53. channel.QueueDeclare(
  54. queue: "OriginalQueue",
  55. durable: true,
  56. exclusive: false,
  57. autoDelete: false,
  58. arguments: null);
  59. // 将原价队列与水果交换机绑定在一起
  60. // 添加 蓝莓
  61. string[] originalfruits = new string[] { "Blueberry" };
  62. for (var i = 0; i < originalfruits.Length; i++)
  63. {
  64. channel.QueueBind("OriginalQueue", "FruitExchange", originalfruits[i], null)
  65. }
  66. // 创建一个蓝莓水果到原价队列进行消费
  67. channel.BasicPublish(
  68. exchange: "FruitExchange",
  69. routingKey: "Blueberry",
  70. basicProperties: properties,
  71. body: Encoding.UTF8.GetBytes("Blueberry")
  72. );
  73. Console.WriteLine("Finish");
  74. Console.ReadLine();
  75. }
  76. }

测试运行


欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

评价

序列化和反序列化的应用

序列化:var jsonstr= JsonConvert.SerializeObject(result);//将字符串序列化为JSON对象 反序列化: API api = new API();...

.net core 为选项数据添加验证:避免错误配置的应用接收用户流量

.net core 为选项数据添加验证:避免错误配置的应用接收用户流量[TOC] 三种验证方法 直接注册验证函数实现 IValidate...

spring boot配置文件的应用

springboot实例注意参数引用是用$

net core 使用 EF Code First

下面这些内容很老了看这篇:https://www.tnblog.net/aojiancc2/article/details/5365 项目使用多层,把数据库访问...

.net mvc分部页,.net core分部页

.net分部页的三种方式第一种:@Html.Partial(&quot;_分部页&quot;)第二种:@{ Html.RenderPartial(&quot;分部页&quot;);}...

StackExchange.Redis操作redis(net core支持)

官方git开源地址https://github.com/StackExchange/StackExchange.Redis官方文档在docs里边都是官方的文档通过nuget命令下...

.net core 使用session

tip:net core 2.2后可以直接启用session了,不用在自己添加一次session依赖,本身就添加了使用nuget添加引用Microsoft.AspN...

通俗易懂,什么是.net?什么是.net Framework?什么是.net core?

朋友圈@蓝羽 看到一篇文章写的太详细太通俗了,搬过来细细看完,保证你对.NET有个新的认识理解原文地址:https://www.cnblo...

asp.net core2.0 依赖注入 AddTransient与AddScoped的区别

asp.net core主要提供了三种依赖注入的方式其中AddTransient与AddSingleton比较好区别AddTransient瞬时模式:每次都获取一...

.net core 使用 Kestrel

Kestrel介绍 Kestrel是一个基于libuv的跨平台web服务器 在.net core项目中就可以不一定要发布在iis下面了Kestrel体验可以使...

net core中使用cookie

net core中可以使用传统的cookie也可以使用加密的cookieNET CORE中使用传统cookie设置:HttpContext.Response.Cookies.Appe...

net core项目结构简单分析

一:wwwrootwwwroot用于存放网站的静态资源,例如css,js,图片与相关的前端插件等lib主要是第三方的插件,例如微软默认引用...

net core使用EF之DB First

一.新建一个.net core的MVC项目新建好项目后,不能像以前一样直接在新建项中添加ef了,需要用命令在添加ef的依赖二.使用Nug...

.net core使用requestresponse下载文件下载excel等

使用request获取内容net core中request没有直接的索引方法,需要点里边的Query,或者formstringbase64=Request.Form[&quot;f...

iframe自适应高度与配合net core使用

去掉iframe边框frameborder=&quot;0&quot;去掉滚动条scrolling=&quot;no&quot;iframe 自适应高度如果内容是固定的,那么就...
这一世以无限游戏为使命!
排名
2
文章
635
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 : 好是好,这个对效率影响大不大哇,效率高不高
ASP.NET Core 服务注册生命周期
剑轩 : http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术