排名
7
文章
192
粉丝
15
评论
16
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:
50010702506256


欢迎加群交流技术

都说rabbitmq可以实现异步处理,流量削峰,所有的概念都了解了一遍,但是真正使用的时候难以下手,
那么这篇博客可能会对你有帮助
如何安装rabbitmq 这里就不介绍了。网上一大堆,或者看我之前博客也能够安装。
下边直接上干货
首先,先把基本环境搭建起来
- //这里是配置的MQ 的链接,将这里替换成自己的MQ
- var connectionFactory = new ConnectionFactory()
- {
- HostName = GlobalContext.SystemConfig.RabbitMQConnection.HostName,
- UserName = GlobalContext.SystemConfig.RabbitMQConnection.UserName,
- Password = GlobalContext.SystemConfig.RabbitMQConnection.Password,
- Port = GlobalContext.SystemConfig.RabbitMQConnection.Port
- };
-
- services.AddSingleton<ConnectionFactory>(connectionFactory);
- //创建链接
- var connection = connectionFactory.CreateConnection();
- services.AddSingleton<IConnection>(connection);
- var channel = connection.CreateModel();
- //注册交换机
- channel.AddMQExchange();
- //注册队列
- channel.AddMQQueue();
- //队列绑定到交换机
- channel.BindToExchange();
- services.AddSingleton<IModel>(channel);
- //注册消费者服务
- //这个报错先注释调下边一行,等下边创建消费者之后再在这里注册消费者
- services.AddHostedService<OrderConsumerService>();
- Console.WriteLine($"RabbitMQ环境准备成功,当前RabbitMQ服务器为{GlobalContext.SystemConfig.RabbitMQConnection.HostName}");
- public static class ExchangeService
- {
- /// <summary>
- /// 声明交换机
- /// </summary>
- /// <param name="channel"></param>
- public static void AddMQExchange(this IModel channel)
- {
- //添加订单交换机
- channel.ExchangeDeclare("order_exchange", "direct");
- }
- }
- public static class QueueService
- {
- /// <summary>
- /// 声明队列
- /// </summary>
- /// <param name="channel"></param>
- public static void AddMQQueue(this IModel channel)
- {
- // 声明订单队列
- channel.QueueDeclare(queue: "order_queue", durable: true, exclusive: false, autoDelete: false,
- arguments: null);
- }
- /// <summary>
- /// 队列绑定到交换机
- /// </summary>
- /// <param name="channel"></param>
- public static void BindToExchange(this IModel channel)
- {
- channel.QueueBind("order_queue", "order_exchange", "place_order");
- }
- }
首先MQ 最重要的两个角色,消息生产者,消息消费者。应该如何用代码实现呢?
首先先理解生产与消费,生产就是说请求进来了,带了些什么参数,要干什么事。比如我下单了一件商品。
我这里写一个订单服务,这一步就将消息添加进入队列了。现在队列中就有消息了。这一步一般都是通过接口进来的。
因为要异步处理,接收与处理就分开了,这里就只管接收消息加入进队列
- //这是将消息添加进交换机,交换机会将消息发送到队列
- // _channel 通过依赖注入获取,就是上边注册那个IModel
- public async Task<bool> OrderJoinMQAsync(CreateOrderModel arg)
- {
- var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(arg));
- _channel.BasicPublish(exchange: "order_exchange", routingKey: "place_order", basicProperties: null, body: body);
- return true;
- }
有消息之后如何消费呢,现在去创建一个消费者服务
- public class OrderConsumerService : IHostedService, IDisposable
- {
- private readonly ILogger<OrderConsumerService> _logger;
- private readonly IModel _channel;
- private readonly EventingBasicConsumer _consumer;
- private readonly IOrdersService _orderService = GlobalContext.ServiceProvider.GetService<IOrdersService>();
- public OrderConsumerService(ILogger<OrderConsumerService> logger, IModel channel)
- {
- _logger = logger;
- _channel = channel;
- _consumer = new EventingBasicConsumer(_channel);
- }
-
- public void Dispose()
- {
- StopAsync(CancellationToken.None).Wait();
- }
- //消费消息 public async Task StartAsync(CancellationToken cancellationToken)
- {
- _consumer.Received += async (model, ea) =>
- {
- var body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- // 扣库存,操作数据库
- var result = await _orderService.OrderSubmitAsync(message.ToObject<CreateOrderModel>());
- // 确认消息已经被成功处理
- _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- };
- // 开始接收消息,参数为队列名、是否自动应答、消费者实例
- _channel.BasicConsume(queue: "order_queue", autoAck: false, consumer: _consumer);
- }
-
- public Task StopAsync(CancellationToken cancellationToken)
- {
- return Task.CompletedTask;
- }
- }
现在再将消费者注册到程序启动时,如果上边第一步的时候注释了,现在就可以加上了
- services.AddHostedService<OrderConsumerService>();
这样当程序启动时,将会自动启动消费者,当队列中添加进去消息后,消费者就会自动消费。
当然你可以灵活的配置路由键或者队列来让合适的消费者消费合适的消息。
这只是一个简单的例子,正常工作中肯定逻辑会比这个复杂的多,如何贴合业务去使用这项技术,就需要自己继续探索了。
评价