tnblog
首页
视频
资源
登录

.netcore3.1 RabbitMq RPC

5947人阅读 2021/1/28 16:28 总访问:3467025 评论:0 收藏:0 手机
分类: RabbitMq

.netcore3.1 RabbitMq RPC

如果我们需要在远程计算机上运行功能并等待结果怎么办?这种模式通常称为远程过程调用RPC。我们举一个简单的例子,如下图所示:

这个就图片讲得就很清楚了,我也不用再讲了。

但这就引发了一个新问题,在获取结果队列中收到响应后,尚不清楚响应属于哪个请求。

那就是使用CorrelationId属性的时候 。我们将为每个请求将其设置为唯一值。稍后,当我们在获取结果队列中收到消息时,我们将查看该属性,并基于此属性将响应与请求进行匹配。如果看到未知的CorrelationId值,则可以安全地丢弃该消息-它不属于我们的请求。

RPC服务器端

  1. var factory = new ConnectionFactory()
  2. {
  3. HostName = "47.98.187.188",
  4. UserName = "bob",
  5. Password = "bob"
  6. };
  7. // 创建一个链接
  8. using (var connection = factory.CreateConnection())
  9. {
  10. // 创建一个通道
  11. using (var channel = connection.CreateModel())
  12. {
  13. // 声明离职队列
  14. channel.QueueDeclare(
  15. queue: "leave_rpc_queue",
  16. durable: true,
  17. exclusive: false,
  18. autoDelete: false,
  19. arguments: null);
  20. channel.BasicQos(0, 1, false);
  21. var consumer = new EventingBasicConsumer(channel);
  22. channel.BasicConsume(queue: "leave_rpc_queue", autoAck: false, consumer: consumer);
  23. // 处理事件
  24. consumer.Received += (model, ea) =>
  25. {
  26. string response = null;
  27. // 获取消息
  28. var body = ea.Body.ToArray();
  29. var props = ea.BasicProperties;
  30. var replyProps = channel.CreateBasicProperties();
  31. // 获取客户端的唯一ID
  32. replyProps.CorrelationId = props.CorrelationId;
  33. try
  34. {
  35. // 人事做离职处理
  36. var message = Encoding.UTF8.GetString(body);
  37. Console.WriteLine("HR Handle:{0}", message);
  38. response = "The exit formalities are complete";
  39. }
  40. catch (Exception e)
  41. {
  42. Console.WriteLine("Have Error:" + e.Message);
  43. response = "";
  44. }
  45. finally
  46. {
  47. var responseBytes = Encoding.UTF8.GetBytes(response);
  48. channel.BasicPublish(string.Empty, routingKey: props.ReplyTo,
  49. basicProperties: replyProps, body: responseBytes);
  50. channel.BasicAck(deliveryTag: ea.DeliveryTag,
  51. multiple: false);
  52. }
  53. };
  54. Console.WriteLine(" Press [enter] to exit.");
  55. Console.ReadLine();
  56. }
  57. }

客户端

创建Rpc帮助类

  1. public class RpcClient
  2. {
  3. private readonly IConnection connection;
  4. private readonly IModel channel;
  5. private readonly string replyQueueName;
  6. private readonly EventingBasicConsumer consumer;
  7. // 关于BlockingCollection可以参考这篇:https://www.cnblogs.com/imxiangzi/articles/2801106.html
  8. private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
  9. private readonly IBasicProperties props;
  10. public RpcClient()
  11. {
  12. var factory = new ConnectionFactory()
  13. {
  14. HostName = "47.98.187.188",
  15. UserName = "bob",
  16. Password = "bob"
  17. };
  18. var connection = factory.CreateConnection();
  19. channel = connection.CreateModel();
  20. // 创建临时队列
  21. replyQueueName = channel.QueueDeclare().QueueName;
  22. consumer = new EventingBasicConsumer(channel);
  23. props = channel.CreateBasicProperties();
  24. // 创建 ID
  25. var correlationId = Guid.NewGuid().ToString();
  26. props.CorrelationId = correlationId;
  27. // 回调的队列
  28. props.ReplyTo = replyQueueName;
  29. // 处理离职结果
  30. consumer.Received += Consumer_Received;
  31. }
  32. // 处理离职结果的方法
  33. private void Consumer_Received(object sender, BasicDeliverEventArgs ea)
  34. {
  35. var body = ea.Body.ToArray();
  36. var response = Encoding.UTF8.GetString(body);
  37. if (ea.BasicProperties.CorrelationId == props.CorrelationId)
  38. {
  39. // jiang
  40. respQueue.Add(response);
  41. }
  42. }
  43. /// <summary>
  44. /// 发送消息处理
  45. /// </summary>
  46. /// <param name="message"></param>
  47. /// <returns></returns>
  48. public string Call(string message)
  49. {
  50. // 创建离职消息
  51. var messageBytes = Encoding.UTF8.GetBytes(message);
  52. // 发送离职请求到人事
  53. channel.BasicPublish(
  54. exchange: string.Empty,
  55. routingKey: "leave_rpc_queue",
  56. basicProperties: props,
  57. body: messageBytes);
  58. // 绑定临时队列与处理
  59. channel.BasicConsume(
  60. consumer: consumer,
  61. queue: replyQueueName,
  62. autoAck: true
  63. );
  64. // 从BlockingCollection中移除一个项。
  65. return respQueue.Take();
  66. }
  67. public void Close()
  68. {
  69. connection.Close();
  70. }
  71. }

Main代码

  1. var rpcClient = new RpcClient();
  2. string result = rpcClient.Call("I want to leave");
  3. Console.WriteLine($"Result Handle: {result}");
  4. Console.ReadLine();

运行结果

有关RPC的声明

尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。
牢记这一点,请考虑以下建议:

  • 确保明显的是哪个函数调用是本地的,哪个是远程的。
  • 记录您的系统。明确组件之间的依赖关系。
  • 处理错误案例。RPC服务器长时间关闭后,客户端应如何反应?
    如有疑问,请避免使用RPC。如果可以的话,应该使用异步管道-代替类似RPC的阻塞,将结果异步推送到下一个计算阶段。

欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

评价

net core 使用 EF Code First

下面这些内容很老了看这篇:https://www.tnblog.net/aojiancc2/article/details/5365 项目使用多层,把数据库访问...

.net mvc分部页,.net core分部页

.net分部页的三种方式第一种:@Html.Partial(&quot;_分部页&quot;)第二种:@{ Html.RenderPartial(&quot;分部页&quot;);}...

StackExchange.Redis操作redis(net core支持)

官方git开源地址https://github.com/StackExchange/StackExchange.Redis官方文档在docs里边都是官方的文档通过nuget命令下...

.net core 使用session

tip:net core 2.2后可以直接启用session了,不用在自己添加一次session依赖,本身就添加了使用nuget添加引用Microsoft.AspN...

通俗易懂,什么是.net?什么是.net Framework?什么是.net core?

朋友圈@蓝羽 看到一篇文章写的太详细太通俗了,搬过来细细看完,保证你对.NET有个新的认识理解原文地址:https://www.cnblo...

asp.net core2.0 依赖注入 AddTransient与AddScoped的区别

asp.net core主要提供了三种依赖注入的方式其中AddTransient与AddSingleton比较好区别AddTransient瞬时模式:每次都获取一...

.net core 使用 Kestrel

Kestrel介绍 Kestrel是一个基于libuv的跨平台web服务器 在.net core项目中就可以不一定要发布在iis下面了Kestrel体验可以使...

net core中使用cookie

net core中可以使用传统的cookie也可以使用加密的cookieNET CORE中使用传统cookie设置:HttpContext.Response.Cookies.Appe...

net core项目结构简单分析

一:wwwrootwwwroot用于存放网站的静态资源,例如css,js,图片与相关的前端插件等lib主要是第三方的插件,例如微软默认引用...

net core使用EF之DB First

一.新建一个.net core的MVC项目新建好项目后,不能像以前一样直接在新建项中添加ef了,需要用命令在添加ef的依赖二.使用Nug...

.net core使用requestresponse下载文件下载excel等

使用request获取内容net core中request没有直接的索引方法,需要点里边的Query,或者formstringbase64=Request.Form[&quot;f...

iframe自适应高度与配合net core使用

去掉iframe边框frameborder=&quot;0&quot;去掉滚动条scrolling=&quot;no&quot;iframe 自适应高度如果内容是固定的,那么就...

net core启动报错Unable to configure HTTPS endpoint. No server certificate was specified

这是因为net core2.1默认使用的https,如果使用Kestrel web服务器的话没有安装证书就会报这个错其实仔细看他的错误提示,其...

net core中使用url编码与解码操作

net core中暂时还没有以前asp.net与mvc中的server对象。获取url的编码与解码操作不能使用以前的server对象来获取。使用的是...

下载net core

官方下载地址:https://dotnet.microsoft.com/download 进来之后就可以看到最新的下载版本可以直接点击下载,也可以下载其...

net core使用依赖注入来装载EF的上下文对象

妹子情人节快乐~.net core中用了不少的依赖注入,官方文档中也推荐使用。这样使用依赖注入来管理ef对象,还是比较科学,比如...
这一世以无限游戏为使命!
排名
2
文章
634
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 : 好是好,这个对效率影响大不大哇,效率高不高
ASP.NET Core 服务注册生命周期
剑轩 : http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术