分类:
.NET6
解释:
RabbitMQ 是一个流行的开源消息队列系统,广泛用于实现异步通信、解耦组件、负载均衡等场景。在本篇博客中,我们将详细介绍如何在 .NET 6 中使用 RabbitMQ,包括生产者和消费者的实现,以及如何通过依赖注入来管理它们。
一、创建 .NET 6 应用
接下来,我们需要定义连接 RabbitMQ 所需的配置选项。通常我们会将这些配置选项存储在一个类中。以下是配置类(RabbitMQServiceOptions.cs)的实现
为确保我们能够高效且安全地建立 RabbitMQ 连接,我们将创建一个名为 RabbitMQContext 的连接工厂类
接下来我们来实现一个 RabbitMQ 生产者,用于发送消息到队列。创建一个名为 RabbitMQProducer 的类:
接下来我们来实现一个 RabbitMQ 消费者,用于发送消息到队列。创建一个名为 RabbitMQConsumer 的类:
在 .NET 应用中,通常我们会使用依赖注入来管理服务的生命周期。以下是一个扩展类,用于将 RabbitMQ 的生产者和消费者注册到服务集合中:
RabbitMQ 是一个流行的开源消息队列系统,广泛用于实现异步通信、解耦组件、负载均衡等场景。在本篇博客中,我们将详细介绍如何在 .NET 6 中使用 RabbitMQ,包括生产者和消费者的实现,以及如何通过依赖注入来管理它们。
一、创建 .NET 6 应用
首先,确保你已经安装了 .NET 6 SDK。可以使用命令行工具创建一个新的 .NET 控制台应用:
dotnet new console -n RabbitMQDemo cd RabbitMQDemo接着,你需要安装 RabbitMQ 的客户端库,通过 NuGet 包管理器来安装 RabbitMQ.Client:
dotnet add package RabbitMQ.Client; //这里我们选择 6.4.0 版本 dotnet add package Masuit.Tools.Core; //这个为一个二、配置 RabbitMQ
接下来,我们需要定义连接 RabbitMQ 所需的配置选项。通常我们会将这些配置选项存储在一个类中。以下是配置类(RabbitMQServiceOptions.cs)的实现
/// <summary> /// RabbitMQ服务配置 /// </summary> public class RabbitMQServiceOptions { /// <summary> /// 服务地址 /// </summary> public string Host { get; set; } /// <summary> /// 端口 /// </summary> public int Port { get; set; } /// <summary> /// 用户名 /// </summary> public string UserName { get; set; } /// <summary> /// 密码 /// </summary> public string Password { get; set; } }三、实现 RabbitMQ 连接工厂
为确保我们能够高效且安全地建立 RabbitMQ 连接,我们将创建一个名为 RabbitMQContext 的连接工厂类
/// <summary> /// RabbitMQ连接工厂 /// </summary> public class RabbitMQContext { private static ConnectionFactory? factory; private static readonly object lockObj = new(); /// <summary> /// 获取单个RabbitMQ连接 /// </summary> /// <returns></returns> public static IConnection GetConnection(string hostName, int port, string userName, string password) { if (factory == null) { lock (lockObj) { factory ??= new ConnectionFactory { HostName = hostName, Port = port, UserName = userName, Password = password }; } } return factory.CreateConnection(); } }四、实现 RabbitMQ 生产者
接下来我们来实现一个 RabbitMQ 生产者,用于发送消息到队列。创建一个名为 RabbitMQProducer 的类:
/// <summary> /// RabbitMQ 客户端,用于发送消息到 RabbitMQ 队列或交换机。 /// </summary> public class RabbitMQProducer { private readonly ILogger<RabbitMQProducer> _logger; private readonly RabbitMQServiceOptions _options; /// <summary> /// 初始化 RabbitMQ 客户端。 /// </summary> /// <param name="logger">日志记录器。</param> /// <param name="options">RabbitMQ 连接配置。</param> public RabbitMQProducer(ILogger<RabbitMQProducer> logger, RabbitMQServiceOptions options) { _logger = logger; _options = options; } /**** * RabbitMQ 交换机类型说明: * 1. Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 * 例如,如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发。 * 2. Fanout Exchange – 不处理路由键。只需将队列绑定到交换机上,发送到交换机的消息会被转发到所有绑定的队列。 * 类似于广播,所有绑定的队列都会收到消息。 * 3. Topic Exchange – 将路由键和某模式进行匹配。队列需要绑定到一个模式上。 * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。 * 例如,“audit.#”能够匹配到“audit.irs.corporate”,但“audit.*”只会匹配到“audit.irs”。 ****/ /// <summary> /// 发布消息(工作队列模式,适用于多消费者负载均衡)。 /// </summary> /// <param name="queueName">队列名称。</param> /// <param name="message">消息内容。</param> public void WorkQueueSendMessage(string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到指定队列 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: string.Empty, routingKey: queueName, basicProperties: properties, body: body); } /// <summary> /// 推送消息(简单模式,适用于单生产者和单消费者)。 /// </summary> /// <param name="queueName">队列名称。</param> /// <param name="message">消息内容。</param> public void SimpleSendMessage(string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到指定队列 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: string.Empty, routingKey: queueName, mandatory: false, basicProperties: properties, body: body); } /// <summary> /// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。 /// </summary> /// <param name="queueName">队列名称。</param> /// <param name="message">消息内容。</param> public void FanoutSendMessage(string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Fanout 类型的交换机 var exchangeName = $"{queueName}_fanout_exchange"; channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,routingKey 无需指定 channel.QueueBind(queueName, exchangeName, routingKey: string.Empty); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,所有绑定队列都会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body); } /// <summary> /// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。 /// </summary> /// <param name="exchangeName">交换机名称。</param> /// <param name="queueName">队列名称。</param> /// <param name="message">消息内容。</param> public void FanoutSendMessage(string exchangeName, string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Fanout 类型的交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,routingKey 无需指定 channel.QueueBind(queueName, exchangeName, routingKey: string.Empty); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,所有绑定队列都会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body); } /// <summary> /// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。 /// </summary> /// <param name="queueName">队列名称。</param> /// <param name="message">消息内容。</param> public void DirectSendMessage(string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Direct 类型的交换机 var exchangeName = $"{queueName}_direct_exchange"; channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,并指定路由键 var routingKey = $"{queueName}"; channel.QueueBind(queueName, exchangeName, routingKey); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey, properties, body: body); } /// <summary> /// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。 /// </summary> /// <param name="exchangeName">交换机名称。</param> /// <param name="queueName">队列名称。</param> /// <param name="routingKey">路由键。</param> /// <param name="message">消息内容。</param> public void DirectSendMessage(string exchangeName, string queueName, string routingKey, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Direct 类型的交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,并指定路由键 channel.QueueBind(queueName, exchangeName, routingKey); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey, properties, body: body); } /// <summary> /// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。 /// </summary> /// <param name="queueName">队列名称。</param> /// <param name="routingKey">路由键。</param> /// <param name="message">消息内容。</param> /// <param name="bindingKeys">绑定规则(可选)。</param> public void TopicSendMessage(string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Topic 类型的交换机 var exchangeName = $"{queueName}_topic_exchange"; channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,并指定绑定规则 if (!bindingKeys.IsNullOrEmpty()) { foreach (string bindingKey in bindingKeys) { channel.QueueBind(queueName, exchangeName, bindingKey); } } // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey, properties, body: body); } /// <summary> /// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。 /// </summary> /// <param name="exchangeName">交换机名称。</param> /// <param name="queueName">队列名称。</param> /// <param name="routingKey">路由键。</param> /// <param name="message">消息内容。</param> /// <param name="bindingKeys">绑定规则(可选)。</param> public void TopicSendMessage(string exchangeName, string queueName, string routingKey, string message, IEnumerable<string>? bindingKeys = null) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Topic 类型的交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,并指定绑定规则 if (!bindingKeys.IsNullOrEmpty()) { foreach (string bindingKey in bindingKeys) { channel.QueueBind(queueName, exchangeName, bindingKey); } } // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey, properties, body: body); } }五、实现 RabbitMQ 消费者
接下来我们来实现一个 RabbitMQ 消费者,用于发送消息到队列。创建一个名为 RabbitMQConsumer 的类:
/// <summary> /// RabbitMQ 消费者,用于从 RabbitMQ 队列或交换机中消费消息。 /// </summary> public class RabbitMQConsumer { private readonly ILogger<RabbitMQConsumer> _logger; private readonly RabbitMQServiceOptions _options; /// <summary> /// 初始化 RabbitMQ 消费者。 /// </summary> /// <param name="logger">日志记录器。</param> /// <param name="options">RabbitMQ 连接配置。</param> public RabbitMQConsumer(ILogger<RabbitMQConsumer> logger, RabbitMQServiceOptions options) { _logger = logger; _options = options; } /// <summary> /// 简单消费者(简单模式,适用于单生产者和单消费者)。 /// </summary> /// <param name="queueName">队列名称。</param> /// <param name="handler">消息处理逻辑。</param> public void SimpleConsumer(string queueName, Action<string> handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); }; // 开始消费,自动确认消息 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); } /// <summary> /// 消费者(工作队列模式,适用于多消费者负载均衡)。 /// </summary> /// <param name="queueName">队列名称。</param> /// <param name="handler">消息处理逻辑。</param> public void WorkConsumer(string queueName, Func<string, bool> handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 设置限流,避免消费者一次性接收过多消息 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var result = handler(message); // 如果消息处理成功,手动确认消息 if (result) { channel.BasicAck(ea.DeliveryTag, false); } }; // 开始消费,手动确认消息 channel.BasicConsume(queueName, autoAck: false, consumer: consumer); } /// <summary> /// 消费者(发布/订阅模式,适用于广播消息到所有绑定队列)。 /// </summary> /// <param name="exchangeName">交换机名称。</param> /// <param name="queueName">队列名称。</param> /// <param name="handler">消息处理逻辑。</param> public void PubSubConsumer(string exchangeName, string queueName, Action<string> handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明 Fanout 类型的交换机 channel.ExchangeDeclare(exchange: exchangeName, type: "fanout"); // 声明队列(如果不存在则创建),并设置为持久化 var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 将队列绑定到交换机 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: ""); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); // 手动确认消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // 开始消费,自动确认消息 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); } /// <summary> /// 消费者(路由模式,适用于路由键完全匹配的消息分发)。 /// </summary> /// <param name="exchangeName">交换机名称。</param> /// <param name="queueName">队列名称。</param> /// <param name="routingKey">路由键。</param> /// <param name="handler">消息处理逻辑。</param> public void RoutingConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明 Direct 类型的交换机 channel.ExchangeDeclare(exchange: exchangeName, type: "direct"); // 声明队列(如果不存在则创建),并设置为持久化 var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 将队列绑定到交换机,并指定路由键 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); // 手动确认消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // 开始消费,自动确认消息 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); } /// <summary> /// 消费者(主题模式,适用于路由键模式匹配的消息分发)。 /// </summary> /// <param name="exchangeName">交换机名称。</param> /// <param name="queueName">队列名称。</param> /// <param name="routingKey">路由键。</param> /// <param name="handler">消息处理逻辑。</param> public void TopicConsumer(string exchangeName, string queueName, string routingKey, Action<string> handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明 Topic 类型的交换机 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic); // 声明队列(如果不存在则创建),并设置为持久化 var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 将队列绑定到交换机,并指定路由键 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); // 手动确认消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // 开始消费,手动确认消息 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); } }六、整合到.Net6的注入依赖:
在 .NET 应用中,通常我们会使用依赖注入来管理服务的生命周期。以下是一个扩展类,用于将 RabbitMQ 的生产者和消费者注册到服务集合中:
/// <summary> /// RabbitMQ 服务集合扩展类,用于将 RabbitMQ 客户端和监听器添加到依赖注入容器。 /// </summary> public static class RabbitMQServiceCollectionExtensions { /// <summary> /// 添加 RabbitMQ 服务到服务集合。 /// </summary> /// <param name="services">服务集合。</param> /// <returns>服务集合。</returns> /// <exception cref="ArgumentNullException">当配置选项无效时抛出。</exception> public static IServiceCollection AddRabbmitMQ(this IServiceCollection services) { // 从容器中获取配置的 RabbitMQServiceOptions var serviceProvider = services.BuildServiceProvider(); var serviceOptions = serviceProvider.GetRequiredService<IOptions<RabbitMQServiceOptions>>().Value; // 验证服务选项是否有效 if (serviceOptions == null || serviceOptions.Host.IsNullOrEmpty() || serviceOptions.Port < 0) { throw new ArgumentNullException(nameof(serviceOptions), "RabbitMQ service options must be provided with valid settings."); } // 注册 RabbitMQ 客户端作为单例服务 services.AddSingleton(sp => new RabbitMQProducer( sp.GetRequiredService<ILogger<RabbitMQProducer>>(), serviceOptions)); // 注册 RabbitMQ消费端作为单例服务 services.AddSingleton(sp => new RabbitMQConsumer( sp.GetRequiredService<ILogger<RabbitMQConsumer>>(), serviceOptions)); return services; } }七、注入 RabbitMQ到程序(Program.cs):
var builder = WebApplication.CreateBuilder(args); builder.Services.AddControllers(); // 加载 RabbitMQ 配置 builder.Services.Configure<RabbitMQServiceOptions>(builder.Configuration.GetSection("RabbitMQ")); // 添加 RabbitMQ 客户端和监听器 builder.Services.AddRabbmitMQ(); var app = builder.Build(); // Configure the HTTP request pipeline. app.UseAuthorization(); app.MapControllers(); app.Run();
八、生产者的Demo:
using Microsoft.Extensions.Logging; using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; class Program { static void Main(string[] args) { // 配置 RabbitMQ 连接选项 var options = new RabbitMQServiceOptions { Host = "localhost", Port = 5672, UserName = "guest", Password = "guest" }; // 创建日志记录器(这里使用控制台日志) using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole()); var logger = loggerFactory.CreateLogger<RabbitMQProducer>(); // 创建 RabbitMQ 生产者 var producer = new RabbitMQProducer(logger, options); // 发送工作队列消息 producer.WorkQueueSendMessage("work_queue", "Hello Work Queue"); // 发送简单队列消息 producer.SimpleSendMessage("simple_queue", "Hello Simple Queue"); // 发送 Fanout 交换机消息(自动生成交换机名称) producer.FanoutSendMessage("fanout_queue", "Hello Fanout Queue"); // 发送 Fanout 交换机消息(指定交换机名称) producer.FanoutSendMessage("my_fanout_exchange", "fanout_queue", "Hello Fanout Queue"); // 发送 Direct 交换机消息(自动生成交换机名称) producer.DirectSendMessage("direct_queue", "Hello Direct Queue"); // 发送 Direct 交换机消息(指定交换机名称和路由键) producer.DirectSendMessage("my_direct_exchange", "direct_queue", "direct_key", "Hello Direct Queue"); // 发送 Topic 交换机消息(自动生成交换机名称) producer.TopicSendMessage("topic_queue", "topic.key", "Hello Topic Queue", new List<string> { "topic.*" }); // 发送 Topic 交换机消息(指定交换机名称和路由键) producer.TopicSendMessage("my_topic_exchange", "topic_queue", "topic.key", "Hello Topic Queue", new List<string> { "topic.*" }); Console.WriteLine("消息发送完成!"); } }
通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息发送模式。每种模式都有其特定的应用场景,例如:
工作队列模式:适用于多消费者负载均衡。简单模式:适用于单生产者和单消费者。
Fanout 交换机模式:适用于广播消息到所有绑定队列。
Direct 交换机模式:适用于路由键完全匹配的消息分发。
Topic 交换机模式:适用于路由键模式匹配的消息分发。
九、消费者Demo:
using Microsoft.Extensions.Logging; using RabbitMQ.Client; using System; using System.Text; class Program { static void Main(string[] args) { // 配置 RabbitMQ 连接选项 var options = new RabbitMQServiceOptions { Host = "localhost", Port = 5672, UserName = "guest", Password = "guest" }; // 创建日志记录器(这里使用控制台日志) using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole()); var logger = loggerFactory.CreateLogger<RabbitMQConsumer>(); // 创建 RabbitMQ 消费者 var consumer = new RabbitMQConsumer(logger, options); // 简单消费者 consumer.SimpleConsumer("simple_queue", message => { Console.WriteLine($"接收到简单队列消息: {message}"); }); // 工作队列消费者 consumer.WorkConsumer("work_queue", message => { Console.WriteLine($"接收到工作队列消息: {message}"); return true; // 处理成功,手动确认消息 }); // 发布/订阅消费者 consumer.PubSubConsumer("my_fanout_exchange", "fanout_queue", message => { Console.WriteLine($"接收到发布/订阅消息: {message}"); }); // 路由消费者 consumer.RoutingConsumer("my_direct_exchange", "direct_queue", "direct_key", message => { Console.WriteLine($"接收到路由消息: {message}"); }); // 主题消费者 consumer.TopicConsumer("my_topic_exchange", "topic_queue", "topic.key", message => { Console.WriteLine($"接收到主题消息: {message}"); }); Console.WriteLine("消费者已启动,等待接收消息..."); Console.ReadLine(); // 保持程序运行 } }
通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息消费模式。每种模式都有其特定的应用场景,例如:
简单消费者:适用于单生产者和单消费者。工作队列消费者:适用于多消费者负载均衡。
发布/订阅消费者:适用于广播消息到所有绑定队列。
路由消费者:适用于路由键完全匹配的消息分发。
主题消费者:适用于路由键模式匹配的消息分发。
END
评价
排名
6
文章
6
粉丝
16
评论
8
{{item.articleTitle}}
{{item.blogName}} : {{item.content}}
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术