tnblog
首页
视频
资源
登录

.NET6中使用RabbitMQ详尽指南

795人阅读 2025/1/2 11:52 总访问:3439 评论:0 收藏:0 手机
分类: .NET6
解释:
    RabbitMQ 是一个流行的开源消息队列系统,广泛用于实现异步通信、解耦组件、负载均衡等场景。在本篇博客中,我们将详细介绍如何在 .NET 6 中使用 RabbitMQ,包括生产者和消费者的实现,以及如何通过依赖注入来管理它们。
一、创建 .NET 6 应用

    首先,确保你已经安装了 .NET 6 SDK。可以使用命令行工具创建一个新的 .NET 控制台应用:

  1. dotnet new console -n RabbitMQDemo
  2. cd RabbitMQDemo
      接着,你需要安装 RabbitMQ 的客户端库,通过 NuGet 包管理器来安装 RabbitMQ.Client:
  1. dotnet add package RabbitMQ.Client;       //这里我们选择 6.4.0 版本
  2. dotnet add package Masuit.Tools.Core;     //这个为一个
二、配置 RabbitMQ
    接下来,我们需要定义连接 RabbitMQ 所需的配置选项。通常我们会将这些配置选项存储在一个类中。以下是配置类(RabbitMQServiceOptions.cs)的实现
  1. /// <summary>
  2. /// RabbitMQ服务配置
  3. /// </summary>
  4. public class RabbitMQServiceOptions
  5. {
  6.         /// <summary>
  7.         /// 服务地址
  8.         /// </summary>
  9.         public string Host { getset; }
  10.         /// <summary>
  11.         /// 端口
  12.         /// </summary>
  13.         public int Port { getset; }
  14.         /// <summary>
  15.         /// 用户名
  16.         /// </summary>
  17.         public string UserName { getset; }
  18.         /// <summary>
  19.         /// 密码
  20.         /// </summary>
  21.         public string Password { getset; }
  22. }
三、实现 RabbitMQ 连接工厂
    为确保我们能够高效且安全地建立 RabbitMQ 连接,我们将创建一个名为 RabbitMQContext 的连接工厂类
  1.     /// <summary>
  2.     /// RabbitMQ连接工厂
  3.     /// </summary>
  4.     public class RabbitMQContext
  5.     {
  6.         private static ConnectionFactory? factory;
  7.         private static readonly object lockObj = new();
  8.         /// <summary>
  9.         /// 获取单个RabbitMQ连接
  10.         /// </summary>
  11.         /// <returns></returns>
  12.         public static IConnection GetConnection(string hostName, int port, string userName, string password)
  13.         {
  14.             if (factory == null)
  15.             {
  16.                 lock (lockObj)
  17.                 {
  18.                     factory ??= new ConnectionFactory
  19.                     {
  20.                         HostName = hostName,
  21.                         Port = port,
  22.                         UserName = userName,
  23.                         Password = password
  24.                     };
  25.                 }
  26.             }
  27.             return factory.CreateConnection();
  28.         }
  29.     }
四、实现 RabbitMQ 生产者
    接下来我们来实现一个 RabbitMQ 生产者,用于发送消息到队列。创建一个名为 RabbitMQProducer 的类:
  1.  /// <summary>
  2.  /// RabbitMQ 客户端,用于发送消息到 RabbitMQ 队列或交换机。
  3.  /// </summary>
  4.  public class RabbitMQProducer
  5.  {
  6.      private readonly ILogger<RabbitMQProducer> _logger;
  7.      private readonly RabbitMQServiceOptions _options;
  8.      /// <summary>
  9.      /// 初始化 RabbitMQ 客户端。
  10.      /// </summary>
  11.      /// <param name="logger">日志记录器。</param>
  12.      /// <param name="options">RabbitMQ 连接配置。</param>
  13.      public RabbitMQProducer(ILogger<RabbitMQProducer> logger, RabbitMQServiceOptions options)
  14.      {
  15.          _logger = logger;
  16.          _options = options;
  17.      }
  18.      /****
  19.       * RabbitMQ 交换机类型说明:
  20.       * 1. Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
  21.       *    例如,如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发。
  22.       * 2. Fanout Exchange – 不处理路由键。只需将队列绑定到交换机上,发送到交换机的消息会被转发到所有绑定的队列。
  23.       *    类似于广播,所有绑定的队列都会收到消息。
  24.       * 3. Topic Exchange – 将路由键和某模式进行匹配。队列需要绑定到一个模式上。
  25.       *    符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
  26.       *    例如,“audit.#”能够匹配到“audit.irs.corporate”,但“audit.*”只会匹配到“audit.irs”。
  27.       ****/
  28.      /// <summary>
  29.      /// 发布消息(工作队列模式,适用于多消费者负载均衡)。
  30.      /// </summary>
  31.      /// <param name="queueName">队列名称。</param>
  32.      /// <param name="message">消息内容。</param>
  33.      public void WorkQueueSendMessage(string queueName, string message)
  34.      {
  35.          using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  36.          using var channel = connection.CreateModel();
  37.          // 声明队列(如果不存在则创建),并设置为持久化
  38.          channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  39.          // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
  40.          var properties = channel.CreateBasicProperties();
  41.          properties.Persistent = true;
  42.          // 发送消息到指定队列
  43.          var body = Encoding.UTF8.GetBytes(message);
  44.          channel.BasicPublish(exchange: string.Empty, routingKey: queueName, basicProperties: properties, body: body);
  45.      }
  46.      /// <summary>
  47.      /// 推送消息(简单模式,适用于单生产者和单消费者)。
  48.      /// </summary>
  49.      /// <param name="queueName">队列名称。</param>
  50.      /// <param name="message">消息内容。</param>
  51.      public void SimpleSendMessage(string queueName, string message)
  52.      {
  53.          using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  54.          using var channel = connection.CreateModel();
  55.          // 声明队列(如果不存在则创建),并设置为持久化
  56.          channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  57.          // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
  58.          var properties = channel.CreateBasicProperties();
  59.          properties.Persistent = true;
  60.          // 发送消息到指定队列
  61.          var body = Encoding.UTF8.GetBytes(message);
  62.          channel.BasicPublish(exchange: string.Empty, routingKey: queueName, mandatory: false, basicProperties: properties, body: body);
  63.      }
  64.      /// <summary>
  65.      /// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。
  66.      /// </summary>
  67.      /// <param name="queueName">队列名称。</param>
  68.      /// <param name="message">消息内容。</param>
  69.      public void FanoutSendMessage(string queueName, string message)
  70.      {
  71.          using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  72.          using var channel = connection.CreateModel();
  73.          // 创建 Fanout 类型的交换机
  74.          var exchangeName = $"{queueName}_fanout_exchange";
  75.          channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
  76.          // 声明队列(如果不存在则创建),并设置为持久化
  77.          channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
  78.          // 将队列绑定到交换机,routingKey 无需指定
  79.          channel.QueueBind(queueName, exchangeName, routingKey: string.Empty);
  80.          // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
  81.          var properties = channel.CreateBasicProperties();
  82.          properties.Persistent = true;
  83.          // 发送消息到交换机,所有绑定队列都会收到消息
  84.          var body = Encoding.UTF8.GetBytes(message);
  85.          channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body);
  86.      }
  87.      /// <summary>
  88.      /// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。
  89.      /// </summary>
  90.      /// <param name="exchangeName">交换机名称。</param>
  91.      /// <param name="queueName">队列名称。</param>
  92.      /// <param name="message">消息内容。</param>
  93.      public void FanoutSendMessage(string exchangeName, string queueName, string message)
  94.      {
  95.          using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  96.          using var channel = connection.CreateModel();
  97.          // 创建 Fanout 类型的交换机
  98.          channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
  99.          // 声明队列(如果不存在则创建),并设置为持久化
  100.          channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
  101.          // 将队列绑定到交换机,routingKey 无需指定
  102.          channel.QueueBind(queueName, exchangeName, routingKey: string.Empty);
  103.          // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
  104.          var properties = channel.CreateBasicProperties();
  105.          properties.Persistent = true;
  106.          // 发送消息到交换机,所有绑定队列都会收到消息
  107.          var body = Encoding.UTF8.GetBytes(message);
  108.          channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body);
  109.      }
  110.      /// <summary>
  111.      /// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。
  112.      /// </summary>
  113.      /// <param name="queueName">队列名称。</param>
  114.      /// <param name="message">消息内容。</param>
  115.      public void DirectSendMessage(string queueName, string message)
  116.      {
  117.          using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  118.          using var channel = connection.CreateModel();
  119.          // 创建 Direct 类型的交换机
  120.          var exchangeName = $"{queueName}_direct_exchange";
  121.          channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
  122.          // 声明队列(如果不存在则创建),并设置为持久化
  123.          channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
  124.          // 将队列绑定到交换机,并指定路由键
  125.          var routingKey = $"{queueName}";
  126.          channel.QueueBind(queueName, exchangeName, routingKey);
  127.          // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
  128.          var properties = channel.CreateBasicProperties();
  129.          properties.Persistent = true;
  130.          // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息
  131.          var body = Encoding.UTF8.GetBytes(message);
  132.          channel.BasicPublish(exchangeName, routingKey, properties, body: body);
  133.      }
  134.      /// <summary>
  135.      /// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。
  136.      /// </summary>
  137.      /// <param name="exchangeName">交换机名称。</param>
  138.      /// <param name="queueName">队列名称。</param>
  139.      /// <param name="routingKey">路由键。</param>
  140.      /// <param name="message">消息内容。</param>
  141.      public void DirectSendMessage(string exchangeName, string queueName, string routingKey, string message)
  142.      {
  143.          using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  144.          using var channel = connection.CreateModel();
  145.          // 创建 Direct 类型的交换机
  146.          channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
  147.          // 声明队列(如果不存在则创建),并设置为持久化
  148.          channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
  149.          // 将队列绑定到交换机,并指定路由键
  150.          channel.QueueBind(queueName, exchangeName, routingKey);
  151.          // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
  152.          var properties = channel.CreateBasicProperties();
  153.          properties.Persistent = true;
  154.          // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息
  155.          var body = Encoding.UTF8.GetBytes(message);
  156.          channel.BasicPublish(exchangeName, routingKey, properties, body: body);
  157.      }
  158.      /// <summary>
  159.      /// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。
  160.      /// </summary>
  161.      /// <param name="queueName">队列名称。</param>
  162.      /// <param name="routingKey">路由键。</param>
  163.      /// <param name="message">消息内容。</param>
  164.      /// <param name="bindingKeys">绑定规则(可选)。</param>
  165.      public void TopicSendMessage(string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null)
  166.      {
  167.          using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  168.          using var channel = connection.CreateModel();
  169.          // 创建 Topic 类型的交换机
  170.          var exchangeName = $"{queueName}_topic_exchange";
  171.          channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
  172.          // 声明队列(如果不存在则创建),并设置为持久化
  173.          channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
  174.          // 将队列绑定到交换机,并指定绑定规则
  175.          if (!bindingKeys.IsNullOrEmpty())
  176.          {
  177.              foreach (string bindingKey in bindingKeys)
  178.              {
  179.                  channel.QueueBind(queueName, exchangeName, bindingKey);
  180.              }
  181.          }
  182.          // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
  183.          var properties = channel.CreateBasicProperties();
  184.          properties.Persistent = true;
  185.          // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息
  186.          var body = Encoding.UTF8.GetBytes(message);
  187.          channel.BasicPublish(exchangeName, routingKey, properties, body: body);
  188.      }
  189.      /// <summary>
  190.      /// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。
  191.      /// </summary>
  192.      /// <param name="exchangeName">交换机名称。</param>
  193.      /// <param name="queueName">队列名称。</param>
  194.      /// <param name="routingKey">路由键。</param>
  195.      /// <param name="message">消息内容。</param>
  196.      /// <param name="bindingKeys">绑定规则(可选)。</param>
  197.      public void TopicSendMessage(string exchangeName, string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null)
  198.      {
  199.          using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  200.          using var channel = connection.CreateModel();
  201.          // 创建 Topic 类型的交换机
  202.          channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
  203.          // 声明队列(如果不存在则创建),并设置为持久化
  204.          channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
  205.          // 将队列绑定到交换机,并指定绑定规则
  206.          if (!bindingKeys.IsNullOrEmpty())
  207.          {
  208.              foreach (string bindingKey in bindingKeys)
  209.              {
  210.                  channel.QueueBind(queueName, exchangeName, bindingKey);
  211.              }
  212.          }
  213.          // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
  214.          var properties = channel.CreateBasicProperties();
  215.          properties.Persistent = true;
  216.          // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息
  217.          var body = Encoding.UTF8.GetBytes(message);
  218.          channel.BasicPublish(exchangeName, routingKey, properties, body: body);
  219.      }
  220.  }
五、实现 RabbitMQ 消费者
    接下来我们来实现一个 RabbitMQ 消费者,用于发送消息到队列。创建一个名为 RabbitMQConsumer 的类:
  1.     /// <summary>
  2.     /// RabbitMQ 消费者,用于从 RabbitMQ 队列或交换机中消费消息。
  3.     /// </summary>
  4.     public class RabbitMQConsumer
  5.     {
  6.         private readonly ILogger<RabbitMQConsumer> _logger;
  7.         private readonly RabbitMQServiceOptions _options;
  8.         /// <summary>
  9.         /// 初始化 RabbitMQ 消费者。
  10.         /// </summary>
  11.         /// <param name="logger">日志记录器。</param>
  12.         /// <param name="options">RabbitMQ 连接配置。</param>
  13.         public RabbitMQConsumer(ILogger<RabbitMQConsumer> logger, RabbitMQServiceOptions options)
  14.         {
  15.             _logger = logger;
  16.             _options = options;
  17.         }
  18.         /// <summary>
  19.         /// 简单消费者(简单模式,适用于单生产者和单消费者)。
  20.         /// </summary>
  21.         /// <param name="queueName">队列名称。</param>
  22.         /// <param name="handler">消息处理逻辑。</param>
  23.         public void SimpleConsumer(string queueName, Action<string> handler)
  24.         {
  25.             var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  26.             var channel = connection.CreateModel();
  27.             // 声明队列(如果不存在则创建),并设置为持久化
  28.             channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  29.             // 创建消费者
  30.             var consumer = new EventingBasicConsumer(channel);
  31.             consumer.Received += (model, ea) =>
  32.             {
  33.                 // 处理消息
  34.                 var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  35.                 handler(message);
  36.             };
  37.             // 开始消费,自动确认消息
  38.             channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
  39.         }
  40.         /// <summary>
  41.         /// 消费者(工作队列模式,适用于多消费者负载均衡)。
  42.         /// </summary>
  43.         /// <param name="queueName">队列名称。</param>
  44.         /// <param name="handler">消息处理逻辑。</param>
  45.         public void WorkConsumer(string queueName, Func<stringbool> handler)
  46.         {
  47.             var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  48.             var channel = connection.CreateModel();
  49.             // 声明队列(如果不存在则创建),并设置为持久化
  50.             channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
  51.             // 设置限流,避免消费者一次性接收过多消息
  52.             channel.BasicQos(prefetchSize: 0, prefetchCount: 1globalfalse);
  53.             // 创建消费者
  54.             var consumer = new EventingBasicConsumer(channel);
  55.             consumer.Received += (model, ea) =>
  56.             {
  57.                 // 处理消息
  58.                 var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  59.                 var result = handler(message);
  60.                 // 如果消息处理成功,手动确认消息
  61.                 if (result)
  62.                 {
  63.                     channel.BasicAck(ea.DeliveryTag, false);
  64.                 }
  65.             };
  66.             // 开始消费,手动确认消息
  67.             channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
  68.         }
  69.         /// <summary>
  70.         /// 消费者(发布/订阅模式,适用于广播消息到所有绑定队列)。
  71.         /// </summary>
  72.         /// <param name="exchangeName">交换机名称。</param>
  73.         /// <param name="queueName">队列名称。</param>
  74.         /// <param name="handler">消息处理逻辑。</param>
  75.         public void PubSubConsumer(string exchangeName, string queueName, Action<string> handler)
  76.         {
  77.             var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  78.             var channel = connection.CreateModel();
  79.             // 声明 Fanout 类型的交换机
  80.             channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
  81.             // 声明队列(如果不存在则创建),并设置为持久化
  82.             var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  83.             // 将队列绑定到交换机
  84.             channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");
  85.             // 创建消费者
  86.             var consumer = new EventingBasicConsumer(channel);
  87.             consumer.Received += (model, ea) =>
  88.             {
  89.                 // 处理消息
  90.                 var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  91.                 handler(message);
  92.                 // 手动确认消息
  93.                 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  94.             };
  95.             // 开始消费,自动确认消息
  96.             channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
  97.         }
  98.         /// <summary>
  99.         /// 消费者(路由模式,适用于路由键完全匹配的消息分发)。
  100.         /// </summary>
  101.         /// <param name="exchangeName">交换机名称。</param>
  102.         /// <param name="queueName">队列名称。</param>
  103.         /// <param name="routingKey">路由键。</param>
  104.         /// <param name="handler">消息处理逻辑。</param>
  105.         public void RoutingConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler)
  106.         {
  107.             var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  108.             var channel = connection.CreateModel();
  109.             // 声明 Direct 类型的交换机
  110.             channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
  111.             // 声明队列(如果不存在则创建),并设置为持久化
  112.             var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  113.             // 将队列绑定到交换机,并指定路由键
  114.             channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
  115.             // 创建消费者
  116.             var consumer = new EventingBasicConsumer(channel);
  117.             consumer.Received += (model, ea) =>
  118.             {
  119.                 // 处理消息
  120.                 var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  121.                 handler(message);
  122.                 // 手动确认消息
  123.                 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  124.             };
  125.             // 开始消费,自动确认消息
  126.             channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
  127.         }
  128.         /// <summary>
  129.         /// 消费者(主题模式,适用于路由键模式匹配的消息分发)。
  130.         /// </summary>
  131.         /// <param name="exchangeName">交换机名称。</param>
  132.         /// <param name="queueName">队列名称。</param>
  133.         /// <param name="routingKey">路由键。</param>
  134.         /// <param name="handler">消息处理逻辑。</param>
  135.         public void TopicConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler)
  136.         {
  137.             var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
  138.             var channel = connection.CreateModel();
  139.             // 声明 Topic 类型的交换机
  140.             channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic);
  141.             // 声明队列(如果不存在则创建),并设置为持久化
  142.             var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  143.             // 将队列绑定到交换机,并指定路由键
  144.             channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
  145.             // 创建消费者
  146.             var consumer = new EventingBasicConsumer(channel);
  147.             consumer.Received += (model, ea) =>
  148.             {
  149.                 // 处理消息
  150.                 var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  151.                 handler(message);
  152.                 // 手动确认消息
  153.                 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  154.             };
  155.             // 开始消费,手动确认消息
  156.             channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
  157.         }
  158.     }
六、整合到.Net6的注入依赖:
    在 .NET 应用中,通常我们会使用依赖注入来管理服务的生命周期。以下是一个扩展类,用于将 RabbitMQ 的生产者和消费者注册到服务集合中:
  1.     /// <summary>
  2.     /// RabbitMQ 服务集合扩展类,用于将 RabbitMQ 客户端和监听器添加到依赖注入容器。
  3.     /// </summary>
  4.     public static class RabbitMQServiceCollectionExtensions
  5.     {
  6.         /// <summary>
  7.         /// 添加 RabbitMQ 服务到服务集合。
  8.         /// </summary>
  9.         /// <param name="services">服务集合。</param>
  10.         /// <returns>服务集合。</returns>
  11.         /// <exception cref="ArgumentNullException">当配置选项无效时抛出。</exception>
  12.         public static IServiceCollection AddRabbmitMQ(this IServiceCollection services)
  13.         {
  14.             // 从容器中获取配置的 RabbitMQServiceOptions
  15.             var serviceProvider = services.BuildServiceProvider();
  16.             var serviceOptions = serviceProvider.GetRequiredService<IOptions<RabbitMQServiceOptions>>().Value;
  17.             // 验证服务选项是否有效
  18.             if (serviceOptions == null || serviceOptions.Host.IsNullOrEmpty() || serviceOptions.Port < 0)
  19.             {
  20.                 throw new ArgumentNullException(nameof(serviceOptions), "RabbitMQ service options must be provided with valid settings.");
  21.             }
  22.             // 注册 RabbitMQ 客户端作为单例服务
  23.             services.AddSingleton(sp => new RabbitMQProducer(
  24.                 sp.GetRequiredService<ILogger<RabbitMQProducer>>(),
  25.                 serviceOptions));
  26.             // 注册 RabbitMQ消费端作为单例服务
  27.             services.AddSingleton(sp => new RabbitMQConsumer(
  28.                 sp.GetRequiredService<ILogger<RabbitMQConsumer>>(),
  29.                 serviceOptions));
  30.             return services;
  31.         }
  32.     }
七、注入 RabbitMQ到程序(Program.cs):
  1. var builder = WebApplication.CreateBuilder(args);
  2. builder.Services.AddControllers();
  3. // 加载 RabbitMQ 配置
  4. builder.Services.Configure<RabbitMQServiceOptions>(builder.Configuration.GetSection("RabbitMQ"));
  5. // 添加 RabbitMQ 客户端和监听器
  6. builder.Services.AddRabbmitMQ();
  7. var app = builder.Build();
  8. // Configure the HTTP request pipeline.
  9. app.UseAuthorization();
  10. app.MapControllers();
  11. app.Run();

八、生产者的Demo:

  1. using Microsoft.Extensions.Logging;
  2. using RabbitMQ.Client;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Text;
  6. class Program
  7. {
  8.     static void Main(string[] args)
  9.     {
  10.         // 配置 RabbitMQ 连接选项
  11.         var options = new RabbitMQServiceOptions
  12.         {
  13.             Host = "localhost",
  14.             Port = 5672,
  15.             UserName = "guest",
  16.             Password = "guest"
  17.         };
  18.         // 创建日志记录器(这里使用控制台日志)
  19.         using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
  20.         var logger = loggerFactory.CreateLogger<RabbitMQProducer>();
  21.         // 创建 RabbitMQ 生产者
  22.         var producer = new RabbitMQProducer(logger, options);
  23.         // 发送工作队列消息
  24.         producer.WorkQueueSendMessage("work_queue""Hello Work Queue");
  25.         // 发送简单队列消息
  26.         producer.SimpleSendMessage("simple_queue""Hello Simple Queue");
  27.         // 发送 Fanout 交换机消息(自动生成交换机名称)
  28.         producer.FanoutSendMessage("fanout_queue""Hello Fanout Queue");
  29.         // 发送 Fanout 交换机消息(指定交换机名称)
  30.         producer.FanoutSendMessage("my_fanout_exchange""fanout_queue""Hello Fanout Queue");
  31.         // 发送 Direct 交换机消息(自动生成交换机名称)
  32.         producer.DirectSendMessage("direct_queue""Hello Direct Queue");
  33.         // 发送 Direct 交换机消息(指定交换机名称和路由键)
  34.         producer.DirectSendMessage("my_direct_exchange""direct_queue""direct_key""Hello Direct Queue");
  35.         // 发送 Topic 交换机消息(自动生成交换机名称)
  36.         producer.TopicSendMessage("topic_queue""topic.key""Hello Topic Queue"new List<string> { "topic.*" });
  37.         // 发送 Topic 交换机消息(指定交换机名称和路由键)
  38.         producer.TopicSendMessage("my_topic_exchange""topic_queue""topic.key""Hello Topic Queue"new List<string> { "topic.*" });
  39.         Console.WriteLine("消息发送完成!");
  40.     }
  41. }
  • 通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息发送模式。每种模式都有其特定的应用场景,例如:
    工作队列模式
    :适用于多消费者负载均衡。

  • 简单模式:适用于单生产者和单消费者。

  • Fanout 交换机模式:适用于广播消息到所有绑定队列。

  • Direct 交换机模式:适用于路由键完全匹配的消息分发。

  • Topic 交换机模式:适用于路由键模式匹配的消息分发。


九、消费者Demo:

  1. using Microsoft.Extensions.Logging;
  2. using RabbitMQ.Client;
  3. using System;
  4. using System.Text;
  5. class Program
  6. {
  7.     static void Main(string[] args)
  8.     {
  9.         // 配置 RabbitMQ 连接选项
  10.         var options = new RabbitMQServiceOptions
  11.         {
  12.             Host = "localhost",
  13.             Port = 5672,
  14.             UserName = "guest",
  15.             Password = "guest"
  16.         };
  17.         // 创建日志记录器(这里使用控制台日志)
  18.         using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
  19.         var logger = loggerFactory.CreateLogger<RabbitMQConsumer>();
  20.         // 创建 RabbitMQ 消费者
  21.         var consumer = new RabbitMQConsumer(logger, options);
  22.         // 简单消费者
  23.         consumer.SimpleConsumer("simple_queue", message =>
  24.         {
  25.             Console.WriteLine($"接收到简单队列消息: {message}");
  26.         });
  27.         // 工作队列消费者
  28.         consumer.WorkConsumer("work_queue", message =>
  29.         {
  30.             Console.WriteLine($"接收到工作队列消息: {message}");
  31.             return true// 处理成功,手动确认消息
  32.         });
  33.         // 发布/订阅消费者
  34.         consumer.PubSubConsumer("my_fanout_exchange""fanout_queue", message =>
  35.         {
  36.             Console.WriteLine($"接收到发布/订阅消息: {message}");
  37.         });
  38.         // 路由消费者
  39.         consumer.RoutingConsumer("my_direct_exchange""direct_queue""direct_key", message =>
  40.         {
  41.             Console.WriteLine($"接收到路由消息: {message}");
  42.         });
  43.         // 主题消费者
  44.         consumer.TopicConsumer("my_topic_exchange""topic_queue""topic.key", message =>
  45.         {
  46.             Console.WriteLine($"接收到主题消息: {message}");
  47.         });
  48.         Console.WriteLine("消费者已启动,等待接收消息...");
  49.         Console.ReadLine(); // 保持程序运行
  50.     }
  51. }
  • 通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息消费模式。每种模式都有其特定的应用场景,例如:
    简单消费者
    :适用于单生产者和单消费者。

  • 工作队列消费者:适用于多消费者负载均衡。

  • 发布/订阅消费者:适用于广播消息到所有绑定队列。

  • 路由消费者:适用于路由键完全匹配的消息分发。

  • 主题消费者:适用于路由键模式匹配的消息分发。

END



评价

.net6 Ocelot 与 Kubernetes

.Net6 Ocelot 与 Kubernetes[TOC] 前言这玩意太坑人了。浪费了我一天的时间。先看我们想实现的效果流程: 首先我们请求sv...

.net core发布出来swagger无法访问。docker 发布.net6 webapi swagger访问不到

因为代码里边设置swagger的代码是: if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); app.Use...

.net6 指定时区

.Net6 指定时区[TOC] 最近相当忙!忙着学这学那的,各种考试。以及项目上也有很多改动。还有这恶心的时间问题(特别注意当...

.net6 AsyncEx 异步锁

.Net6 AsyncEx[TOC] 简单来讲就是可以通过异步方式实现锁。安装&lt;PackageReference Include=&quot;Nito.AsyncEx&quot; V...

Kubernetes .net6 集群管理(一)

Kubernetes .Net6 集群管理(一)[TOC] 对于Kubernetes集群的管理,.net提供了KubernetesClient库,可以对k8s集群通过简单...

Kubernetes .net6 Webhook

Kubernetes .Net6 Webhook[TOC] 本文主要是学习陈计节大佬讲的 使用 .NET Core 开发 Kubernetes 基础组件记录的笔记。 Ad...

.net6 设置信任自签证书(浏览器可信任)

.Net6 设置信任自签证书(浏览器可信任)[TOC] 先决条件确保本地windows上拥有openssl,没有的自己去:http://slproweb.com...

Kubernetes .net6 CRD

Kubernetes .Net6 CRD[TOC] CRD介绍简单来说就是自定义资源,像Pod、Service、Deployment一样。创建自定义资源的资源类型...

linux批量执行命令脚本。linux脚本执行docker镜像打包运行.net6项目等

linux批量执行命令脚本1:创建一个.sh后缀的文件vi run.sh 2:在文件开头添加内容#!/bin/bash 3:在文件里边输入想要执行...

docker发布.net6项目。制作发布的批量脚本一键发布脚本

docker 发布.net core项目可以参考:https://www.tnblog.net/aojiancc2/article/details/5030 docker发布.net6项目简单的d...

.net6 连接mysql报错Unable to connect to any of the specified MySQL hosts.

.net5/6 连接mysql报错Unable to connect to any of the specified MySQL hosts. 不能使用点.连接 server=.;uid=root;pwd...

.net6使用nacos作为配置中心

consul+.net core实现配置中心:https://www.tnblog.net/aojiancc2/article/details/6815nacos的安装参考:https://www.tnbl...

.net6使用session

先在Program.cs中引入 使用存储 HttpContext.Session.SetString(&quot;nickname&quot;,&quot;test&quot;); 读取 string...

.net6使用nacos实现服务注册与服务发现

.net6使用nacos作为配置中心:https://www.tnblog.net/aojiancc2/article/details/7870docker安装nacos v2.1.2:https://www...

.net6.net core获取服务器上所有网卡的IP地址

代码如下: //获取服务器上所有网卡的IP地址 NetworkInterface[] networks = NetworkInterface.GetAllNetworkInterfaces(...

.net6使用nacos 集群部署,负载均衡调用 。docker swarm 集群部署.net6项目

我们这里的k8s测试环境暂时用不了了,这里先使用docker swarm来进行一下集群部署。.net6使用nacos实现服务注册与服务发现:h...
感谢挫折
排名
158
文章
3
粉丝
0
评论
1
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术