tnblog
首页
视频
资源
登录

.NET6中使用RabbitMQ详尽指南

69人阅读 2025/1/2 11:52 总访问:1543 评论:0 收藏:0 手机
分类: .NET6
解释:
    RabbitMQ 是一个流行的开源消息队列系统,广泛用于实现异步通信、解耦组件、负载均衡等场景。在本篇博客中,我们将详细介绍如何在 .NET 6 中使用 RabbitMQ,包括生产者和消费者的实现,以及如何通过依赖注入来管理它们。
一、创建 .NET 6 应用

    首先,确保你已经安装了 .NET 6 SDK。可以使用命令行工具创建一个新的 .NET 控制台应用:

dotnet new console -n RabbitMQDemo
cd RabbitMQDemo
      接着,你需要安装 RabbitMQ 的客户端库,通过 NuGet 包管理器来安装 RabbitMQ.Client:
dotnet add package RabbitMQ.Client;       //这里我们选择 6.4.0 版本
dotnet add package Masuit.Tools.Core;     //这个为一个
二、配置 RabbitMQ
    接下来,我们需要定义连接 RabbitMQ 所需的配置选项。通常我们会将这些配置选项存储在一个类中。以下是配置类(RabbitMQServiceOptions.cs)的实现
/// <summary>
/// RabbitMQ服务配置
/// </summary>
public class RabbitMQServiceOptions
{
        /// <summary>
        /// 服务地址
        /// </summary>
        public string Host { get; set; }

        /// <summary>
        /// 端口
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// 用户名
        /// </summary>
        public string UserName { get; set; }

        /// <summary>
        /// 密码
        /// </summary>
        public string Password { get; set; }
}
三、实现 RabbitMQ 连接工厂
    为确保我们能够高效且安全地建立 RabbitMQ 连接,我们将创建一个名为 RabbitMQContext 的连接工厂类
    /// <summary>
    /// RabbitMQ连接工厂
    /// </summary>
    public class RabbitMQContext
    {
        private static ConnectionFactory? factory;
        private static readonly object lockObj = new();

        /// <summary>
        /// 获取单个RabbitMQ连接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetConnection(string hostName, int port, string userName, string password)
        {
            if (factory == null)
            {
                lock (lockObj)
                {
                    factory ??= new ConnectionFactory
                    {
                        HostName = hostName,
                        Port = port,
                        UserName = userName,
                        Password = password
                    };
                }
            }

            return factory.CreateConnection();
        }
    }
四、实现 RabbitMQ 生产者
    接下来我们来实现一个 RabbitMQ 生产者,用于发送消息到队列。创建一个名为 RabbitMQProducer 的类:
 /// <summary>
 /// RabbitMQ 客户端,用于发送消息到 RabbitMQ 队列或交换机。
 /// </summary>
 public class RabbitMQProducer
 {
     private readonly ILogger<RabbitMQProducer> _logger;
     private readonly RabbitMQServiceOptions _options;

     /// <summary>
     /// 初始化 RabbitMQ 客户端。
     /// </summary>
     /// <param name="logger">日志记录器。</param>
     /// <param name="options">RabbitMQ 连接配置。</param>
     public RabbitMQProducer(ILogger<RabbitMQProducer> logger, RabbitMQServiceOptions options)
     {
         _logger = logger;
         _options = options;
     }

     /****
      * RabbitMQ 交换机类型说明:
      * 1. Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
      *    例如,如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发。
      * 2. Fanout Exchange – 不处理路由键。只需将队列绑定到交换机上,发送到交换机的消息会被转发到所有绑定的队列。
      *    类似于广播,所有绑定的队列都会收到消息。
      * 3. Topic Exchange – 将路由键和某模式进行匹配。队列需要绑定到一个模式上。
      *    符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
      *    例如,“audit.#”能够匹配到“audit.irs.corporate”,但“audit.*”只会匹配到“audit.irs”。
      ****/

     /// <summary>
     /// 发布消息(工作队列模式,适用于多消费者负载均衡)。
     /// </summary>
     /// <param name="queueName">队列名称。</param>
     /// <param name="message">消息内容。</param>
     public void WorkQueueSendMessage(string queueName, string message)
     {
         using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
         using var channel = connection.CreateModel();

         // 声明队列(如果不存在则创建),并设置为持久化
         channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

         // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;

         // 发送消息到指定队列
         var body = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchange: string.Empty, routingKey: queueName, basicProperties: properties, body: body);
     }

     /// <summary>
     /// 推送消息(简单模式,适用于单生产者和单消费者)。
     /// </summary>
     /// <param name="queueName">队列名称。</param>
     /// <param name="message">消息内容。</param>
     public void SimpleSendMessage(string queueName, string message)
     {
         using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
         using var channel = connection.CreateModel();

         // 声明队列(如果不存在则创建),并设置为持久化
         channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

         // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;

         // 发送消息到指定队列
         var body = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchange: string.Empty, routingKey: queueName, mandatory: false, basicProperties: properties, body: body);
     }

     /// <summary>
     /// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。
     /// </summary>
     /// <param name="queueName">队列名称。</param>
     /// <param name="message">消息内容。</param>
     public void FanoutSendMessage(string queueName, string message)
     {
         using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
         using var channel = connection.CreateModel();

         // 创建 Fanout 类型的交换机
         var exchangeName = $"{queueName}_fanout_exchange";
         channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);

         // 声明队列(如果不存在则创建),并设置为持久化
         channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);

         // 将队列绑定到交换机,routingKey 无需指定
         channel.QueueBind(queueName, exchangeName, routingKey: string.Empty);

         // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;

         // 发送消息到交换机,所有绑定队列都会收到消息
         var body = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body);
     }

     /// <summary>
     /// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。
     /// </summary>
     /// <param name="exchangeName">交换机名称。</param>
     /// <param name="queueName">队列名称。</param>
     /// <param name="message">消息内容。</param>
     public void FanoutSendMessage(string exchangeName, string queueName, string message)
     {
         using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
         using var channel = connection.CreateModel();

         // 创建 Fanout 类型的交换机
         channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);

         // 声明队列(如果不存在则创建),并设置为持久化
         channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);

         // 将队列绑定到交换机,routingKey 无需指定
         channel.QueueBind(queueName, exchangeName, routingKey: string.Empty);

         // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;

         // 发送消息到交换机,所有绑定队列都会收到消息
         var body = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body);
     }

     /// <summary>
     /// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。
     /// </summary>
     /// <param name="queueName">队列名称。</param>
     /// <param name="message">消息内容。</param>
     public void DirectSendMessage(string queueName, string message)
     {
         using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
         using var channel = connection.CreateModel();

         // 创建 Direct 类型的交换机
         var exchangeName = $"{queueName}_direct_exchange";
         channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);

         // 声明队列(如果不存在则创建),并设置为持久化
         channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);

         // 将队列绑定到交换机,并指定路由键
         var routingKey = $"{queueName}";
         channel.QueueBind(queueName, exchangeName, routingKey);

         // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;

         // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息
         var body = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchangeName, routingKey, properties, body: body);
     }

     /// <summary>
     /// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。
     /// </summary>
     /// <param name="exchangeName">交换机名称。</param>
     /// <param name="queueName">队列名称。</param>
     /// <param name="routingKey">路由键。</param>
     /// <param name="message">消息内容。</param>
     public void DirectSendMessage(string exchangeName, string queueName, string routingKey, string message)
     {
         using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
         using var channel = connection.CreateModel();

         // 创建 Direct 类型的交换机
         channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);

         // 声明队列(如果不存在则创建),并设置为持久化
         channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);

         // 将队列绑定到交换机,并指定路由键
         channel.QueueBind(queueName, exchangeName, routingKey);

         // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;

         // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息
         var body = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchangeName, routingKey, properties, body: body);
     }

     /// <summary>
     /// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。
     /// </summary>
     /// <param name="queueName">队列名称。</param>
     /// <param name="routingKey">路由键。</param>
     /// <param name="message">消息内容。</param>
     /// <param name="bindingKeys">绑定规则(可选)。</param>
     public void TopicSendMessage(string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null)
     {
         using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
         using var channel = connection.CreateModel();

         // 创建 Topic 类型的交换机
         var exchangeName = $"{queueName}_topic_exchange";
         channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);

         // 声明队列(如果不存在则创建),并设置为持久化
         channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);

         // 将队列绑定到交换机,并指定绑定规则
         if (!bindingKeys.IsNullOrEmpty())
         {
             foreach (string bindingKey in bindingKeys)
             {
                 channel.QueueBind(queueName, exchangeName, bindingKey);
             }
         }

         // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;

         // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息
         var body = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchangeName, routingKey, properties, body: body);
     }

     /// <summary>
     /// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。
     /// </summary>
     /// <param name="exchangeName">交换机名称。</param>
     /// <param name="queueName">队列名称。</param>
     /// <param name="routingKey">路由键。</param>
     /// <param name="message">消息内容。</param>
     /// <param name="bindingKeys">绑定规则(可选)。</param>
     public void TopicSendMessage(string exchangeName, string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null)
     {
         using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
         using var channel = connection.CreateModel();

         // 创建 Topic 类型的交换机
         channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);

         // 声明队列(如果不存在则创建),并设置为持久化
         channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);

         // 将队列绑定到交换机,并指定绑定规则
         if (!bindingKeys.IsNullOrEmpty())
         {
             foreach (string bindingKey in bindingKeys)
             {
                 channel.QueueBind(queueName, exchangeName, bindingKey);
             }
         }

         // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
         var properties = channel.CreateBasicProperties();
         properties.Persistent = true;

         // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息
         var body = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchangeName, routingKey, properties, body: body);
     }
 }
五、实现 RabbitMQ 消费者
    接下来我们来实现一个 RabbitMQ 消费者,用于发送消息到队列。创建一个名为 RabbitMQConsumer 的类:
    /// <summary>
    /// RabbitMQ 消费者,用于从 RabbitMQ 队列或交换机中消费消息。
    /// </summary>
    public class RabbitMQConsumer
    {
        private readonly ILogger<RabbitMQConsumer> _logger;
        private readonly RabbitMQServiceOptions _options;

        /// <summary>
        /// 初始化 RabbitMQ 消费者。
        /// </summary>
        /// <param name="logger">日志记录器。</param>
        /// <param name="options">RabbitMQ 连接配置。</param>
        public RabbitMQConsumer(ILogger<RabbitMQConsumer> logger, RabbitMQServiceOptions options)
        {
            _logger = logger;
            _options = options;
        }

        /// <summary>
        /// 简单消费者(简单模式,适用于单生产者和单消费者)。
        /// </summary>
        /// <param name="queueName">队列名称。</param>
        /// <param name="handler">消息处理逻辑。</param>
        public void SimpleConsumer(string queueName, Action<string> handler)
        {
            var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
            var channel = connection.CreateModel();

            // 声明队列(如果不存在则创建),并设置为持久化
            channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            // 创建消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // 处理消息
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                handler(message);
            };

            // 开始消费,自动确认消息
            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
        }

        /// <summary>
        /// 消费者(工作队列模式,适用于多消费者负载均衡)。
        /// </summary>
        /// <param name="queueName">队列名称。</param>
        /// <param name="handler">消息处理逻辑。</param>
        public void WorkConsumer(string queueName, Func<string, bool> handler)
        {
            var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
            var channel = connection.CreateModel();

            // 声明队列(如果不存在则创建),并设置为持久化
            channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);

            // 设置限流,避免消费者一次性接收过多消息
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            // 创建消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // 处理消息
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                var result = handler(message);

                // 如果消息处理成功,手动确认消息
                if (result)
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };

            // 开始消费,手动确认消息
            channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
        }

        /// <summary>
        /// 消费者(发布/订阅模式,适用于广播消息到所有绑定队列)。
        /// </summary>
        /// <param name="exchangeName">交换机名称。</param>
        /// <param name="queueName">队列名称。</param>
        /// <param name="handler">消息处理逻辑。</param>
        public void PubSubConsumer(string exchangeName, string queueName, Action<string> handler)
        {
            var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
            var channel = connection.CreateModel();

            // 声明 Fanout 类型的交换机
            channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");

            // 声明队列(如果不存在则创建),并设置为持久化
            var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            // 将队列绑定到交换机
            channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");

            // 创建消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // 处理消息
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                handler(message);

                // 手动确认消息
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            // 开始消费,自动确认消息
            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
        }

        /// <summary>
        /// 消费者(路由模式,适用于路由键完全匹配的消息分发)。
        /// </summary>
        /// <param name="exchangeName">交换机名称。</param>
        /// <param name="queueName">队列名称。</param>
        /// <param name="routingKey">路由键。</param>
        /// <param name="handler">消息处理逻辑。</param>
        public void RoutingConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler)
        {
            var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
            var channel = connection.CreateModel();

            // 声明 Direct 类型的交换机
            channel.ExchangeDeclare(exchange: exchangeName, type: "direct");

            // 声明队列(如果不存在则创建),并设置为持久化
            var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            // 将队列绑定到交换机,并指定路由键
            channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);

            // 创建消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // 处理消息
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                handler(message);

                // 手动确认消息
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            // 开始消费,自动确认消息
            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
        }

        /// <summary>
        /// 消费者(主题模式,适用于路由键模式匹配的消息分发)。
        /// </summary>
        /// <param name="exchangeName">交换机名称。</param>
        /// <param name="queueName">队列名称。</param>
        /// <param name="routingKey">路由键。</param>
        /// <param name="handler">消息处理逻辑。</param>
        public void TopicConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler)
        {
            var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
            var channel = connection.CreateModel();

            // 声明 Topic 类型的交换机
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic);

            // 声明队列(如果不存在则创建),并设置为持久化
            var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            // 将队列绑定到交换机,并指定路由键
            channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);

            // 创建消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // 处理消息
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                handler(message);

                // 手动确认消息
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            // 开始消费,手动确认消息
            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        }
    }
六、整合到.Net6的注入依赖:
    在 .NET 应用中,通常我们会使用依赖注入来管理服务的生命周期。以下是一个扩展类,用于将 RabbitMQ 的生产者和消费者注册到服务集合中:
    /// <summary>
    /// RabbitMQ 服务集合扩展类,用于将 RabbitMQ 客户端和监听器添加到依赖注入容器。
    /// </summary>
    public static class RabbitMQServiceCollectionExtensions
    {
        /// <summary>
        /// 添加 RabbitMQ 服务到服务集合。
        /// </summary>
        /// <param name="services">服务集合。</param>
        /// <returns>服务集合。</returns>
        /// <exception cref="ArgumentNullException">当配置选项无效时抛出。</exception>
        public static IServiceCollection AddRabbmitMQ(this IServiceCollection services)
        {
            // 从容器中获取配置的 RabbitMQServiceOptions
            var serviceProvider = services.BuildServiceProvider();
            var serviceOptions = serviceProvider.GetRequiredService<IOptions<RabbitMQServiceOptions>>().Value;

            // 验证服务选项是否有效
            if (serviceOptions == null || serviceOptions.Host.IsNullOrEmpty() || serviceOptions.Port < 0)
            {
                throw new ArgumentNullException(nameof(serviceOptions), "RabbitMQ service options must be provided with valid settings.");
            }

            // 注册 RabbitMQ 客户端作为单例服务
            services.AddSingleton(sp => new RabbitMQProducer(
                sp.GetRequiredService<ILogger<RabbitMQProducer>>(),
                serviceOptions));

            // 注册 RabbitMQ消费端作为单例服务
            services.AddSingleton(sp => new RabbitMQConsumer(
                sp.GetRequiredService<ILogger<RabbitMQConsumer>>(),
                serviceOptions));

            return services;
        }
    }
七、注入 RabbitMQ到程序(Program.cs):
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

// 加载 RabbitMQ 配置
builder.Services.Configure<RabbitMQServiceOptions>(builder.Configuration.GetSection("RabbitMQ"));

// 添加 RabbitMQ 客户端和监听器
builder.Services.AddRabbmitMQ();

var app = builder.Build();

// Configure the HTTP request pipeline.

app.UseAuthorization();

app.MapControllers();

app.Run();

八、生产者的Demo:

using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        // 配置 RabbitMQ 连接选项
        var options = new RabbitMQServiceOptions
        {
            Host = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };

        // 创建日志记录器(这里使用控制台日志)
        using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<RabbitMQProducer>();

        // 创建 RabbitMQ 生产者
        var producer = new RabbitMQProducer(logger, options);

        // 发送工作队列消息
        producer.WorkQueueSendMessage("work_queue", "Hello Work Queue");

        // 发送简单队列消息
        producer.SimpleSendMessage("simple_queue", "Hello Simple Queue");

        // 发送 Fanout 交换机消息(自动生成交换机名称)
        producer.FanoutSendMessage("fanout_queue", "Hello Fanout Queue");

        // 发送 Fanout 交换机消息(指定交换机名称)
        producer.FanoutSendMessage("my_fanout_exchange", "fanout_queue", "Hello Fanout Queue");

        // 发送 Direct 交换机消息(自动生成交换机名称)
        producer.DirectSendMessage("direct_queue", "Hello Direct Queue");

        // 发送 Direct 交换机消息(指定交换机名称和路由键)
        producer.DirectSendMessage("my_direct_exchange", "direct_queue", "direct_key", "Hello Direct Queue");

        // 发送 Topic 交换机消息(自动生成交换机名称)
        producer.TopicSendMessage("topic_queue", "topic.key", "Hello Topic Queue", new List<string> { "topic.*" });

        // 发送 Topic 交换机消息(指定交换机名称和路由键)
        producer.TopicSendMessage("my_topic_exchange", "topic_queue", "topic.key", "Hello Topic Queue", new List<string> { "topic.*" });

        Console.WriteLine("消息发送完成!");
    }
}
  • 通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息发送模式。每种模式都有其特定的应用场景,例如:
    工作队列模式
    :适用于多消费者负载均衡。

  • 简单模式:适用于单生产者和单消费者。

  • Fanout 交换机模式:适用于广播消息到所有绑定队列。

  • Direct 交换机模式:适用于路由键完全匹配的消息分发。

  • Topic 交换机模式:适用于路由键模式匹配的消息分发。


九、消费者Demo:

using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        // 配置 RabbitMQ 连接选项
        var options = new RabbitMQServiceOptions
        {
            Host = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };

        // 创建日志记录器(这里使用控制台日志)
        using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<RabbitMQConsumer>();

        // 创建 RabbitMQ 消费者
        var consumer = new RabbitMQConsumer(logger, options);

        // 简单消费者
        consumer.SimpleConsumer("simple_queue", message =>
        {
            Console.WriteLine($"接收到简单队列消息: {message}");
        });

        // 工作队列消费者
        consumer.WorkConsumer("work_queue", message =>
        {
            Console.WriteLine($"接收到工作队列消息: {message}");
            return true; // 处理成功,手动确认消息
        });

        // 发布/订阅消费者
        consumer.PubSubConsumer("my_fanout_exchange", "fanout_queue", message =>
        {
            Console.WriteLine($"接收到发布/订阅消息: {message}");
        });

        // 路由消费者
        consumer.RoutingConsumer("my_direct_exchange", "direct_queue", "direct_key", message =>
        {
            Console.WriteLine($"接收到路由消息: {message}");
        });

        // 主题消费者
        consumer.TopicConsumer("my_topic_exchange", "topic_queue", "topic.key", message =>
        {
            Console.WriteLine($"接收到主题消息: {message}");
        });

        Console.WriteLine("消费者已启动,等待接收消息...");
        Console.ReadLine(); // 保持程序运行
    }
}
  • 通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息消费模式。每种模式都有其特定的应用场景,例如:
    简单消费者
    :适用于单生产者和单消费者。

  • 工作队列消费者:适用于多消费者负载均衡。

  • 发布/订阅消费者:适用于广播消息到所有绑定队列。

  • 路由消费者:适用于路由键完全匹配的消息分发。

  • 主题消费者:适用于路由键模式匹配的消息分发。

END



评价
感谢挫折
排名
6
文章
6
粉丝
16
评论
8
{{item.articleTitle}}
{{item.blogName}} : {{item.content}}
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术