
解释:
RabbitMQ 是一个流行的开源消息队列系统,广泛用于实现异步通信、解耦组件、负载均衡等场景。在本篇博客中,我们将详细介绍如何在 .NET 6 中使用 RabbitMQ,包括生产者和消费者的实现,以及如何通过依赖注入来管理它们。
一、创建 .NET 6 应用
接下来,我们需要定义连接 RabbitMQ 所需的配置选项。通常我们会将这些配置选项存储在一个类中。以下是配置类(RabbitMQServiceOptions.cs)的实现
为确保我们能够高效且安全地建立 RabbitMQ 连接,我们将创建一个名为 RabbitMQContext 的连接工厂类
接下来我们来实现一个 RabbitMQ 生产者,用于发送消息到队列。创建一个名为 RabbitMQProducer 的类:
接下来我们来实现一个 RabbitMQ 消费者,用于发送消息到队列。创建一个名为 RabbitMQConsumer 的类:
在 .NET 应用中,通常我们会使用依赖注入来管理服务的生命周期。以下是一个扩展类,用于将 RabbitMQ 的生产者和消费者注册到服务集合中:
RabbitMQ 是一个流行的开源消息队列系统,广泛用于实现异步通信、解耦组件、负载均衡等场景。在本篇博客中,我们将详细介绍如何在 .NET 6 中使用 RabbitMQ,包括生产者和消费者的实现,以及如何通过依赖注入来管理它们。
一、创建 .NET 6 应用
首先,确保你已经安装了 .NET 6 SDK。可以使用命令行工具创建一个新的 .NET 控制台应用:
- dotnet new console -n RabbitMQDemo
- cd RabbitMQDemo
接着,你需要安装 RabbitMQ 的客户端库,通过 NuGet 包管理器来安装 RabbitMQ.Client:- dotnet add package RabbitMQ.Client; //这里我们选择 6.4.0 版本
- dotnet add package Masuit.Tools.Core; //这个为一个
二、配置 RabbitMQ接下来,我们需要定义连接 RabbitMQ 所需的配置选项。通常我们会将这些配置选项存储在一个类中。以下是配置类(RabbitMQServiceOptions.cs)的实现
- /// <summary>
- /// RabbitMQ服务配置
- /// </summary>
- public class RabbitMQServiceOptions
- {
- /// <summary>
- /// 服务地址
- /// </summary>
- public string Host { get; set; }
-
- /// <summary>
- /// 端口
- /// </summary>
- public int Port { get; set; }
-
- /// <summary>
- /// 用户名
- /// </summary>
- public string UserName { get; set; }
-
- /// <summary>
- /// 密码
- /// </summary>
- public string Password { get; set; }
- }
三、实现 RabbitMQ 连接工厂为确保我们能够高效且安全地建立 RabbitMQ 连接,我们将创建一个名为 RabbitMQContext 的连接工厂类
- /// <summary>
- /// RabbitMQ连接工厂
- /// </summary>
- public class RabbitMQContext
- {
- private static ConnectionFactory? factory;
- private static readonly object lockObj = new();
-
- /// <summary>
- /// 获取单个RabbitMQ连接
- /// </summary>
- /// <returns></returns>
- public static IConnection GetConnection(string hostName, int port, string userName, string password)
- {
- if (factory == null)
- {
- lock (lockObj)
- {
- factory ??= new ConnectionFactory
- {
- HostName = hostName,
- Port = port,
- UserName = userName,
- Password = password
- };
- }
- }
-
- return factory.CreateConnection();
- }
- }
四、实现 RabbitMQ 生产者接下来我们来实现一个 RabbitMQ 生产者,用于发送消息到队列。创建一个名为 RabbitMQProducer 的类:
- /// <summary>
- /// RabbitMQ 客户端,用于发送消息到 RabbitMQ 队列或交换机。
- /// </summary>
- public class RabbitMQProducer
- {
- private readonly ILogger<RabbitMQProducer> _logger;
- private readonly RabbitMQServiceOptions _options;
-
- /// <summary>
- /// 初始化 RabbitMQ 客户端。
- /// </summary>
- /// <param name="logger">日志记录器。</param>
- /// <param name="options">RabbitMQ 连接配置。</param>
- public RabbitMQProducer(ILogger<RabbitMQProducer> logger, RabbitMQServiceOptions options)
- {
- _logger = logger;
- _options = options;
- }
-
- /****
- * RabbitMQ 交换机类型说明:
- * 1. Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
- * 例如,如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发。
- * 2. Fanout Exchange – 不处理路由键。只需将队列绑定到交换机上,发送到交换机的消息会被转发到所有绑定的队列。
- * 类似于广播,所有绑定的队列都会收到消息。
- * 3. Topic Exchange – 将路由键和某模式进行匹配。队列需要绑定到一个模式上。
- * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
- * 例如,“audit.#”能够匹配到“audit.irs.corporate”,但“audit.*”只会匹配到“audit.irs”。
- ****/
-
- /// <summary>
- /// 发布消息(工作队列模式,适用于多消费者负载均衡)。
- /// </summary>
- /// <param name="queueName">队列名称。</param>
- /// <param name="message">消息内容。</param>
- public void WorkQueueSendMessage(string queueName, string message)
- {
- using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- using var channel = connection.CreateModel();
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
-
- // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
-
- // 发送消息到指定队列
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchange: string.Empty, routingKey: queueName, basicProperties: properties, body: body);
- }
-
- /// <summary>
- /// 推送消息(简单模式,适用于单生产者和单消费者)。
- /// </summary>
- /// <param name="queueName">队列名称。</param>
- /// <param name="message">消息内容。</param>
- public void SimpleSendMessage(string queueName, string message)
- {
- using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- using var channel = connection.CreateModel();
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
-
- // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
-
- // 发送消息到指定队列
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchange: string.Empty, routingKey: queueName, mandatory: false, basicProperties: properties, body: body);
- }
-
- /// <summary>
- /// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。
- /// </summary>
- /// <param name="queueName">队列名称。</param>
- /// <param name="message">消息内容。</param>
- public void FanoutSendMessage(string queueName, string message)
- {
- using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- using var channel = connection.CreateModel();
-
- // 创建 Fanout 类型的交换机
- var exchangeName = $"{queueName}_fanout_exchange";
- channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
-
- // 将队列绑定到交换机,routingKey 无需指定
- channel.QueueBind(queueName, exchangeName, routingKey: string.Empty);
-
- // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
-
- // 发送消息到交换机,所有绑定队列都会收到消息
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body);
- }
-
- /// <summary>
- /// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。
- /// </summary>
- /// <param name="exchangeName">交换机名称。</param>
- /// <param name="queueName">队列名称。</param>
- /// <param name="message">消息内容。</param>
- public void FanoutSendMessage(string exchangeName, string queueName, string message)
- {
- using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- using var channel = connection.CreateModel();
-
- // 创建 Fanout 类型的交换机
- channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
-
- // 将队列绑定到交换机,routingKey 无需指定
- channel.QueueBind(queueName, exchangeName, routingKey: string.Empty);
-
- // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
-
- // 发送消息到交换机,所有绑定队列都会收到消息
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body);
- }
-
- /// <summary>
- /// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。
- /// </summary>
- /// <param name="queueName">队列名称。</param>
- /// <param name="message">消息内容。</param>
- public void DirectSendMessage(string queueName, string message)
- {
- using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- using var channel = connection.CreateModel();
-
- // 创建 Direct 类型的交换机
- var exchangeName = $"{queueName}_direct_exchange";
- channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
-
- // 将队列绑定到交换机,并指定路由键
- var routingKey = $"{queueName}";
- channel.QueueBind(queueName, exchangeName, routingKey);
-
- // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
-
- // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchangeName, routingKey, properties, body: body);
- }
-
- /// <summary>
- /// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。
- /// </summary>
- /// <param name="exchangeName">交换机名称。</param>
- /// <param name="queueName">队列名称。</param>
- /// <param name="routingKey">路由键。</param>
- /// <param name="message">消息内容。</param>
- public void DirectSendMessage(string exchangeName, string queueName, string routingKey, string message)
- {
- using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- using var channel = connection.CreateModel();
-
- // 创建 Direct 类型的交换机
- channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
-
- // 将队列绑定到交换机,并指定路由键
- channel.QueueBind(queueName, exchangeName, routingKey);
-
- // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
-
- // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchangeName, routingKey, properties, body: body);
- }
-
- /// <summary>
- /// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。
- /// </summary>
- /// <param name="queueName">队列名称。</param>
- /// <param name="routingKey">路由键。</param>
- /// <param name="message">消息内容。</param>
- /// <param name="bindingKeys">绑定规则(可选)。</param>
- public void TopicSendMessage(string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null)
- {
- using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- using var channel = connection.CreateModel();
-
- // 创建 Topic 类型的交换机
- var exchangeName = $"{queueName}_topic_exchange";
- channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
-
- // 将队列绑定到交换机,并指定绑定规则
- if (!bindingKeys.IsNullOrEmpty())
- {
- foreach (string bindingKey in bindingKeys)
- {
- channel.QueueBind(queueName, exchangeName, bindingKey);
- }
- }
-
- // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
-
- // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchangeName, routingKey, properties, body: body);
- }
-
- /// <summary>
- /// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。
- /// </summary>
- /// <param name="exchangeName">交换机名称。</param>
- /// <param name="queueName">队列名称。</param>
- /// <param name="routingKey">路由键。</param>
- /// <param name="message">消息内容。</param>
- /// <param name="bindingKeys">绑定规则(可选)。</param>
- public void TopicSendMessage(string exchangeName, string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null)
- {
- using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- using var channel = connection.CreateModel();
-
- // 创建 Topic 类型的交换机
- channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
-
- // 将队列绑定到交换机,并指定绑定规则
- if (!bindingKeys.IsNullOrEmpty())
- {
- foreach (string bindingKey in bindingKeys)
- {
- channel.QueueBind(queueName, exchangeName, bindingKey);
- }
- }
-
- // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失
- var properties = channel.CreateBasicProperties();
- properties.Persistent = true;
-
- // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchangeName, routingKey, properties, body: body);
- }
- }
五、实现 RabbitMQ 消费者接下来我们来实现一个 RabbitMQ 消费者,用于发送消息到队列。创建一个名为 RabbitMQConsumer 的类:
- /// <summary>
- /// RabbitMQ 消费者,用于从 RabbitMQ 队列或交换机中消费消息。
- /// </summary>
- public class RabbitMQConsumer
- {
- private readonly ILogger<RabbitMQConsumer> _logger;
- private readonly RabbitMQServiceOptions _options;
-
- /// <summary>
- /// 初始化 RabbitMQ 消费者。
- /// </summary>
- /// <param name="logger">日志记录器。</param>
- /// <param name="options">RabbitMQ 连接配置。</param>
- public RabbitMQConsumer(ILogger<RabbitMQConsumer> logger, RabbitMQServiceOptions options)
- {
- _logger = logger;
- _options = options;
- }
-
- /// <summary>
- /// 简单消费者(简单模式,适用于单生产者和单消费者)。
- /// </summary>
- /// <param name="queueName">队列名称。</param>
- /// <param name="handler">消息处理逻辑。</param>
- public void SimpleConsumer(string queueName, Action<string> handler)
- {
- var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- var channel = connection.CreateModel();
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
-
- // 创建消费者
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- // 处理消息
- var message = Encoding.UTF8.GetString(ea.Body.ToArray());
- handler(message);
- };
-
- // 开始消费,自动确认消息
- channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
- }
-
- /// <summary>
- /// 消费者(工作队列模式,适用于多消费者负载均衡)。
- /// </summary>
- /// <param name="queueName">队列名称。</param>
- /// <param name="handler">消息处理逻辑。</param>
- public void WorkConsumer(string queueName, Func<string, bool> handler)
- {
- var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- var channel = connection.CreateModel();
-
- // 声明队列(如果不存在则创建),并设置为持久化
- channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
-
- // 设置限流,避免消费者一次性接收过多消息
- channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
-
- // 创建消费者
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- // 处理消息
- var message = Encoding.UTF8.GetString(ea.Body.ToArray());
- var result = handler(message);
-
- // 如果消息处理成功,手动确认消息
- if (result)
- {
- channel.BasicAck(ea.DeliveryTag, false);
- }
- };
-
- // 开始消费,手动确认消息
- channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
- }
-
- /// <summary>
- /// 消费者(发布/订阅模式,适用于广播消息到所有绑定队列)。
- /// </summary>
- /// <param name="exchangeName">交换机名称。</param>
- /// <param name="queueName">队列名称。</param>
- /// <param name="handler">消息处理逻辑。</param>
- public void PubSubConsumer(string exchangeName, string queueName, Action<string> handler)
- {
- var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- var channel = connection.CreateModel();
-
- // 声明 Fanout 类型的交换机
- channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
-
- // 声明队列(如果不存在则创建),并设置为持久化
- var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
-
- // 将队列绑定到交换机
- channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");
-
- // 创建消费者
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- // 处理消息
- var message = Encoding.UTF8.GetString(ea.Body.ToArray());
- handler(message);
-
- // 手动确认消息
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- };
-
- // 开始消费,自动确认消息
- channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
- }
-
- /// <summary>
- /// 消费者(路由模式,适用于路由键完全匹配的消息分发)。
- /// </summary>
- /// <param name="exchangeName">交换机名称。</param>
- /// <param name="queueName">队列名称。</param>
- /// <param name="routingKey">路由键。</param>
- /// <param name="handler">消息处理逻辑。</param>
- public void RoutingConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler)
- {
- var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- var channel = connection.CreateModel();
-
- // 声明 Direct 类型的交换机
- channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
-
- // 声明队列(如果不存在则创建),并设置为持久化
- var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
-
- // 将队列绑定到交换机,并指定路由键
- channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
-
- // 创建消费者
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- // 处理消息
- var message = Encoding.UTF8.GetString(ea.Body.ToArray());
- handler(message);
-
- // 手动确认消息
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- };
-
- // 开始消费,自动确认消息
- channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
- }
-
- /// <summary>
- /// 消费者(主题模式,适用于路由键模式匹配的消息分发)。
- /// </summary>
- /// <param name="exchangeName">交换机名称。</param>
- /// <param name="queueName">队列名称。</param>
- /// <param name="routingKey">路由键。</param>
- /// <param name="handler">消息处理逻辑。</param>
- public void TopicConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler)
- {
- var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password);
- var channel = connection.CreateModel();
-
- // 声明 Topic 类型的交换机
- channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic);
-
- // 声明队列(如果不存在则创建),并设置为持久化
- var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
-
- // 将队列绑定到交换机,并指定路由键
- channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
-
- // 创建消费者
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- // 处理消息
- var message = Encoding.UTF8.GetString(ea.Body.ToArray());
- handler(message);
-
- // 手动确认消息
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- };
-
- // 开始消费,手动确认消息
- channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
- }
- }
六、整合到.Net6的注入依赖:在 .NET 应用中,通常我们会使用依赖注入来管理服务的生命周期。以下是一个扩展类,用于将 RabbitMQ 的生产者和消费者注册到服务集合中:
- /// <summary>
- /// RabbitMQ 服务集合扩展类,用于将 RabbitMQ 客户端和监听器添加到依赖注入容器。
- /// </summary>
- public static class RabbitMQServiceCollectionExtensions
- {
- /// <summary>
- /// 添加 RabbitMQ 服务到服务集合。
- /// </summary>
- /// <param name="services">服务集合。</param>
- /// <returns>服务集合。</returns>
- /// <exception cref="ArgumentNullException">当配置选项无效时抛出。</exception>
- public static IServiceCollection AddRabbmitMQ(this IServiceCollection services)
- {
- // 从容器中获取配置的 RabbitMQServiceOptions
- var serviceProvider = services.BuildServiceProvider();
- var serviceOptions = serviceProvider.GetRequiredService<IOptions<RabbitMQServiceOptions>>().Value;
-
- // 验证服务选项是否有效
- if (serviceOptions == null || serviceOptions.Host.IsNullOrEmpty() || serviceOptions.Port < 0)
- {
- throw new ArgumentNullException(nameof(serviceOptions), "RabbitMQ service options must be provided with valid settings.");
- }
-
- // 注册 RabbitMQ 客户端作为单例服务
- services.AddSingleton(sp => new RabbitMQProducer(
- sp.GetRequiredService<ILogger<RabbitMQProducer>>(),
- serviceOptions));
-
- // 注册 RabbitMQ消费端作为单例服务
- services.AddSingleton(sp => new RabbitMQConsumer(
- sp.GetRequiredService<ILogger<RabbitMQConsumer>>(),
- serviceOptions));
-
- return services;
- }
- }
七、注入 RabbitMQ到程序(Program.cs):- var builder = WebApplication.CreateBuilder(args);
-
- builder.Services.AddControllers();
-
- // 加载 RabbitMQ 配置
- builder.Services.Configure<RabbitMQServiceOptions>(builder.Configuration.GetSection("RabbitMQ"));
-
- // 添加 RabbitMQ 客户端和监听器
- builder.Services.AddRabbmitMQ();
-
- var app = builder.Build();
-
- // Configure the HTTP request pipeline.
-
- app.UseAuthorization();
-
- app.MapControllers();
-
- app.Run();
八、生产者的Demo:
- using Microsoft.Extensions.Logging;
- using RabbitMQ.Client;
- using System;
- using System.Collections.Generic;
- using System.Text;
-
- class Program
- {
- static void Main(string[] args)
- {
- // 配置 RabbitMQ 连接选项
- var options = new RabbitMQServiceOptions
- {
- Host = "localhost",
- Port = 5672,
- UserName = "guest",
- Password = "guest"
- };
-
- // 创建日志记录器(这里使用控制台日志)
- using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
- var logger = loggerFactory.CreateLogger<RabbitMQProducer>();
-
- // 创建 RabbitMQ 生产者
- var producer = new RabbitMQProducer(logger, options);
-
- // 发送工作队列消息
- producer.WorkQueueSendMessage("work_queue", "Hello Work Queue");
-
- // 发送简单队列消息
- producer.SimpleSendMessage("simple_queue", "Hello Simple Queue");
-
- // 发送 Fanout 交换机消息(自动生成交换机名称)
- producer.FanoutSendMessage("fanout_queue", "Hello Fanout Queue");
-
- // 发送 Fanout 交换机消息(指定交换机名称)
- producer.FanoutSendMessage("my_fanout_exchange", "fanout_queue", "Hello Fanout Queue");
-
- // 发送 Direct 交换机消息(自动生成交换机名称)
- producer.DirectSendMessage("direct_queue", "Hello Direct Queue");
-
- // 发送 Direct 交换机消息(指定交换机名称和路由键)
- producer.DirectSendMessage("my_direct_exchange", "direct_queue", "direct_key", "Hello Direct Queue");
-
- // 发送 Topic 交换机消息(自动生成交换机名称)
- producer.TopicSendMessage("topic_queue", "topic.key", "Hello Topic Queue", new List<string> { "topic.*" });
-
- // 发送 Topic 交换机消息(指定交换机名称和路由键)
- producer.TopicSendMessage("my_topic_exchange", "topic_queue", "topic.key", "Hello Topic Queue", new List<string> { "topic.*" });
-
- Console.WriteLine("消息发送完成!");
- }
- }
通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息发送模式。每种模式都有其特定的应用场景,例如:
工作队列模式:适用于多消费者负载均衡。简单模式:适用于单生产者和单消费者。
Fanout 交换机模式:适用于广播消息到所有绑定队列。
Direct 交换机模式:适用于路由键完全匹配的消息分发。
Topic 交换机模式:适用于路由键模式匹配的消息分发。
九、消费者Demo:
- using Microsoft.Extensions.Logging;
- using RabbitMQ.Client;
- using System;
- using System.Text;
-
- class Program
- {
- static void Main(string[] args)
- {
- // 配置 RabbitMQ 连接选项
- var options = new RabbitMQServiceOptions
- {
- Host = "localhost",
- Port = 5672,
- UserName = "guest",
- Password = "guest"
- };
-
- // 创建日志记录器(这里使用控制台日志)
- using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
- var logger = loggerFactory.CreateLogger<RabbitMQConsumer>();
-
- // 创建 RabbitMQ 消费者
- var consumer = new RabbitMQConsumer(logger, options);
-
- // 简单消费者
- consumer.SimpleConsumer("simple_queue", message =>
- {
- Console.WriteLine($"接收到简单队列消息: {message}");
- });
-
- // 工作队列消费者
- consumer.WorkConsumer("work_queue", message =>
- {
- Console.WriteLine($"接收到工作队列消息: {message}");
- return true; // 处理成功,手动确认消息
- });
-
- // 发布/订阅消费者
- consumer.PubSubConsumer("my_fanout_exchange", "fanout_queue", message =>
- {
- Console.WriteLine($"接收到发布/订阅消息: {message}");
- });
-
- // 路由消费者
- consumer.RoutingConsumer("my_direct_exchange", "direct_queue", "direct_key", message =>
- {
- Console.WriteLine($"接收到路由消息: {message}");
- });
-
- // 主题消费者
- consumer.TopicConsumer("my_topic_exchange", "topic_queue", "topic.key", message =>
- {
- Console.WriteLine($"接收到主题消息: {message}");
- });
-
- Console.WriteLine("消费者已启动,等待接收消息...");
- Console.ReadLine(); // 保持程序运行
- }
- }
通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息消费模式。每种模式都有其特定的应用场景,例如:
简单消费者:适用于单生产者和单消费者。工作队列消费者:适用于多消费者负载均衡。
发布/订阅消费者:适用于广播消息到所有绑定队列。
路由消费者:适用于路由键完全匹配的消息分发。
主题消费者:适用于路由键模式匹配的消息分发。
END
评价
排名
158
文章
3
粉丝
0
评论
1
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:
50010702506256


欢迎加群交流技术