
.netcore3.1 RabbitMq RPC
如果我们需要在远程计算机上运行功能并等待结果怎么办?这种模式通常称为远程过程调用
或RPC
。我们举一个简单的例子,如下图所示:
这个就图片讲得就很清楚了,我也不用再讲了。
但这就引发了一个新问题,在获取结果队列中收到响应后,尚不清楚响应属于哪个请求。
那就是使用CorrelationId
属性的时候 。我们将为每个请求将其设置为唯一值。稍后,当我们在获取结果队列中收到消息时,我们将查看该属性,并基于此属性将响应与请求进行匹配。如果看到未知的CorrelationId
值,则可以安全地丢弃该消息-它不属于我们的请求。
RPC服务器端
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 声明离职队列
channel.QueueDeclare(
queue: "leave_rpc_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "leave_rpc_queue", autoAck: false, consumer: consumer);
// 处理事件
consumer.Received += (model, ea) =>
{
string response = null;
// 获取消息
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
// 获取客户端的唯一ID
replyProps.CorrelationId = props.CorrelationId;
try
{
// 人事做离职处理
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("HR Handle:{0}", message);
response = "The exit formalities are complete";
}
catch (Exception e)
{
Console.WriteLine("Have Error:" + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(string.Empty, routingKey: props.ReplyTo,
basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
客户端
创建Rpc帮助类
public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
// 关于BlockingCollection可以参考这篇:https://www.cnblogs.com/imxiangzi/articles/2801106.html
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
public RpcClient()
{
var factory = new ConnectionFactory()
{
HostName = "47.98.187.188",
UserName = "bob",
Password = "bob"
};
var connection = factory.CreateConnection();
channel = connection.CreateModel();
// 创建临时队列
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
// 创建 ID
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
// 回调的队列
props.ReplyTo = replyQueueName;
// 处理离职结果
consumer.Received += Consumer_Received;
}
// 处理离职结果的方法
private void Consumer_Received(object sender, BasicDeliverEventArgs ea)
{
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == props.CorrelationId)
{
// jiang
respQueue.Add(response);
}
}
/// <summary>
/// 发送消息处理
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public string Call(string message)
{
// 创建离职消息
var messageBytes = Encoding.UTF8.GetBytes(message);
// 发送离职请求到人事
channel.BasicPublish(
exchange: string.Empty,
routingKey: "leave_rpc_queue",
basicProperties: props,
body: messageBytes);
// 绑定临时队列与处理
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true
);
// 从BlockingCollection中移除一个项。
return respQueue.Take();
}
public void Close()
{
connection.Close();
}
}
Main代码
var rpcClient = new RpcClient();
string result = rpcClient.Call("I want to leave");
Console.WriteLine($"Result Handle: {result}");
Console.ReadLine();
运行结果
有关RPC的声明
尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。
牢记这一点,请考虑以下建议:
- 确保明显的是哪个函数调用是本地的,哪个是远程的。
- 记录您的系统。明确组件之间的依赖关系。
- 处理错误案例。RPC服务器长时间关闭后,客户端应如何反应?
如有疑问,请避免使用RPC。如果可以的话,应该使用异步管道-代替类似RPC的阻塞,将结果异步推送到下一个计算阶段。
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739
评价
排名
2
文章
634
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 :
好是好,这个对效率影响大不大哇,效率高不高
一个bug让程序员走上法庭 索赔金额达400亿日元
剑轩 : 有点可怕
ASP.NET Core 服务注册生命周期
剑轩 :
http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:
50010702506256


欢迎加群交流技术