菜的像徐坤
排名
7
文章
192
粉丝
15
评论
16
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术

net core 使用 rabbitmq

1775人阅读 2024/3/20 17:46 总访问:961077 评论:0 收藏:0 手机
分类: .net core

都说rabbitmq可以实现异步处理,流量削峰,所有的概念都了解了一遍,但是真正使用的时候难以下手,
那么这篇博客可能会对你有帮助


如何安装rabbitmq 这里就不介绍了。网上一大堆,或者看我之前博客也能够安装。


下边直接上干货


首先,先把基本环境搭建起来

  1. //这里是配置的MQ 的链接,将这里替换成自己的MQ
  2. var connectionFactory = new ConnectionFactory()
  3. {
  4.     HostName = GlobalContext.SystemConfig.RabbitMQConnection.HostName,
  5.     UserName = GlobalContext.SystemConfig.RabbitMQConnection.UserName,
  6.     Password = GlobalContext.SystemConfig.RabbitMQConnection.Password,
  7.     Port = GlobalContext.SystemConfig.RabbitMQConnection.Port
  8. };
  9.         
  10. services.AddSingleton<ConnectionFactory>(connectionFactory);
  11. //创建链接
  12. var connection = connectionFactory.CreateConnection();
  13. services.AddSingleton<IConnection>(connection);
  14. var channel = connection.CreateModel();
  15. //注册交换机
  16. channel.AddMQExchange();
  17. //注册队列
  18. channel.AddMQQueue();
  19. //队列绑定到交换机
  20. channel.BindToExchange();
  21. services.AddSingleton<IModel>(channel);
  22. //注册消费者服务
  23. //这个报错先注释调下边一行,等下边创建消费者之后再在这里注册消费者
  24. services.AddHostedService<OrderConsumerService>();
  25. Console.WriteLine($"RabbitMQ环境准备成功,当前RabbitMQ服务器为{GlobalContext.SystemConfig.RabbitMQConnection.HostName}");
  26.  public static class ExchangeService
  27.  {
  28.      /// <summary>
  29.      ///  声明交换机
  30.      /// </summary>
  31.      /// <param name="channel"></param>
  32.      public static void AddMQExchange(this IModel channel)
  33.      {
  34.          //添加订单交换机
  35.          channel.ExchangeDeclare("order_exchange""direct");
  36.      } 
  37.  }
  38.  public static class QueueService
  39.  {
  40.      /// <summary>
  41.      /// 声明队列
  42.      /// </summary>
  43.      /// <param name="channel"></param>
  44.      public static void AddMQQueue(this IModel channel)
  45.      {
  46.          // 声明订单队列
  47.          channel.QueueDeclare(queue: "order_queue", durable: true, exclusive: false, autoDelete: false,
  48.              arguments: null);
  49.      }
  50.      /// <summary>
  51.      /// 队列绑定到交换机
  52.      /// </summary>
  53.      /// <param name="channel"></param>
  54.      public static void BindToExchange(this IModel channel)
  55.      {
  56.          channel.QueueBind("order_queue""order_exchange""place_order");
  57.      }
  58.  }


首先MQ 最重要的两个角色,消息生产者,消息消费者。应该如何用代码实现呢?


首先先理解生产与消费,生产就是说请求进来了,带了些什么参数,要干什么事。比如我下单了一件商品。


我这里写一个订单服务,这一步就将消息添加进入队列了。现在队列中就有消息了。这一步一般都是通过接口进来的。
因为要异步处理,接收与处理就分开了,这里就只管接收消息加入进队列

  1. //这是将消息添加进交换机,交换机会将消息发送到队列  
  2. // _channel 通过依赖注入获取,就是上边注册那个IModel
  3. public async Task<boolOrderJoinMQAsync(CreateOrderModel arg)
  4.     {
  5.         var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(arg)); 
  6.         _channel.BasicPublish(exchange: "order_exchange", routingKey: "place_order", basicProperties: null, body: body);  
  7.         return true;
  8.     }


有消息之后如何消费呢,现在去创建一个消费者服务

  1. public class OrderConsumerService : IHostedServiceIDisposable
  2. {
  3.     private readonly ILogger<OrderConsumerService> _logger;
  4.     private readonly IModel _channel;
  5.     private readonly EventingBasicConsumer _consumer;
  6.     private readonly IOrdersService _orderService = GlobalContext.ServiceProvider.GetService<IOrdersService>();
  7.     public OrderConsumerService(ILogger<OrderConsumerService> logger,  IModel channel)
  8.     {
  9.         _logger = logger;
  10.         _channel = channel;
  11.         _consumer = new EventingBasicConsumer(_channel);
  12.     }
  13.  
  14.     public void Dispose()
  15.     {
  16.         StopAsync(CancellationToken.None).Wait();
  17.     }
  18.     //消费消息    public async Task StartAsync(CancellationToken cancellationToken)
  19.     {
  20.         _consumer.Received += async (model, ea) =>
  21.         {
  22.             var body = ea.Body.ToArray();
  23.             var message = Encoding.UTF8.GetString(body);
  24.             // 扣库存,操作数据库
  25.             var result = await _orderService.OrderSubmitAsync(message.ToObject<CreateOrderModel>()); 
  26.             // 确认消息已经被成功处理
  27.             _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  28.         };
  29.         // 开始接收消息,参数为队列名、是否自动应答、消费者实例
  30.         _channel.BasicConsume(queue: "order_queue", autoAck: false, consumer: _consumer); 
  31.     }
  32.  
  33.     public Task StopAsync(CancellationToken cancellationToken)
  34.     {
  35.         return Task.CompletedTask;
  36.     }
  37. }


现在再将消费者注册到程序启动时,如果上边第一步的时候注释了,现在就可以加上了


  1. services.AddHostedService<OrderConsumerService>();


这样当程序启动时,将会自动启动消费者,当队列中添加进去消息后,消费者就会自动消费。
当然你可以灵活的配置路由键或者队列来让合适的消费者消费合适的消息。


这只是一个简单的例子,正常工作中肯定逻辑会比这个复杂的多,如何贴合业务去使用这项技术,就需要自己继续探索了。


评价

net core 使用 EF Code First

下面这些内容很老了看这篇:https://www.tnblog.net/aojiancc2/article/details/5365 项目使用多层,把数据库访问...

.net mvc分部页,.net core分部页

.net分部页的三种方式第一种:@Html.Partial(&quot;_分部页&quot;)第二种:@{ Html.RenderPartial(&quot;分部页&quot;);}...

StackExchange.Redis操作redis(net core支持)

官方git开源地址https://github.com/StackExchange/StackExchange.Redis官方文档在docs里边都是官方的文档通过nuget命令下...

.net core 使用session

tip:net core 2.2后可以直接启用session了,不用在自己添加一次session依赖,本身就添加了使用nuget添加引用Microsoft.AspN...

通俗易懂,什么是.net?什么是.net Framework?什么是.net core?

朋友圈@蓝羽 看到一篇文章写的太详细太通俗了,搬过来细细看完,保证你对.NET有个新的认识理解原文地址:https://www.cnblo...

asp.net core2.0 依赖注入 AddTransient与AddScoped的区别

asp.net core主要提供了三种依赖注入的方式其中AddTransient与AddSingleton比较好区别AddTransient瞬时模式:每次都获取一...

.net core 使用 Kestrel

Kestrel介绍 Kestrel是一个基于libuv的跨平台web服务器 在.net core项目中就可以不一定要发布在iis下面了Kestrel体验可以使...

net core使用cookie

net core中可以使用传统的cookie也可以使用加密的cookieNET CORE中使用传统cookie设置:HttpContext.Response.Cookies.Appe...

net core项目结构简单分析

一:wwwrootwwwroot用于存放网站的静态资源,例如css,js,图片与相关的前端插件等lib主要是第三方的插件,例如微软默认引用...

net core使用EF之DB First

一.新建一个.net core的MVC项目新建好项目后,不能像以前一样直接在新建项中添加ef了,需要用命令在添加ef的依赖二.使用Nug...

.net core使用requestresponse下载文件下载excel等

使用request获取内容net core中request没有直接的索引方法,需要点里边的Query,或者formstringbase64=Request.Form[&quot;f...

iframe自适应高度与配合net core使用

去掉iframe边框frameborder=&quot;0&quot;去掉滚动条scrolling=&quot;no&quot;iframe 自适应高度如果内容是固定的,那么就...

net core启动报错Unable to configure HTTPS endpoint. No server certificate was specified

这是因为net core2.1默认使用的https,如果使用Kestrel web服务器的话没有安装证书就会报这个错其实仔细看他的错误提示,其...

net core使用url编码与解码操作

net core中暂时还没有以前asp.net与mvc中的server对象。获取url的编码与解码操作不能使用以前的server对象来获取。使用的是...

下载net core

官方下载地址:https://dotnet.microsoft.com/download 进来之后就可以看到最新的下载版本可以直接点击下载,也可以下载其...

net core使用依赖注入来装载EF的上下文对象

妹子情人节快乐~.net core中用了不少的依赖注入,官方文档中也推荐使用。这样使用依赖注入来管理ef对象,还是比较科学,比如...