
RabbitMq集群镜像同步与应用
Queue存放问题
队列在集群中存储有两种模式。第一种是default
模式,它就是消息队列存储再哪儿就在哪儿。第二种是mirror
模式,通过镜像的方式将master
的消息队列映射到各个slave
中去,消息在每个节点都有,实现高可用的方式。
启动镜像
队列已通过policy
启用了镜像。policy
可以随时更改;创建一个非镜像队列是有效的,然后在以后再进行镜像(反之亦然)。非镜像队列与不具有任何镜像的镜像队列之间是有区别的-前者缺乏额外的镜像基础结构,可能会提供更高的吞吐量。
设置为ha-all政策
我们做一个简单的测试简单的为test
开头的队列把参数ha-mode
设置为all
模式,该模式可以将队列镜像到所有的rabbitmq
集群中。针对不同的系统,多少会有一些差异,我会列一张表在下面。重点我们放在WebUI的操作上进行。
系统 | 操作 |
---|---|
rabbitmqctl | rabbitmqctl set_policy ha-all "^test" '{"ha-mode":"all"}' |
rabbitmqctl (Windows) | rabbitmqctl.bat set_policy ha-all "^test" "{""ha-mode"":""all""}" |
HTTP API | PUT /api/policies/%2f/ha-all {"pattern":"^test", "definition":{"ha-mode":"all"}} |
接着我就在这queue
页面上面创建一个test1
队列。
我们发现这里有个+2
表示已经有两台子机器已经开启了镜像。
问题1
但我们会产生一个问题,当一台机子发生了故障挂掉了,但又有新的消息发布到队列中来,然后那台机子又启动起来后是否会同步消息?我们来测试一下,先通过./rabbitmqctl stop_app
关闭一台机器。
然后我们可以看见这里只有一个+1
了。
接着我们在我们的队列中添加一条信息
然后将再次启动slave节点,我们就会发现它的数据并没有及时更新,是落后的。在图表中也出现了红色的+1
。
自动同步策略
我们可以通过参数ha-sync-mode: automatic
设置为自动同步。我们先删除原来的策略policy
再创建新的policy
并把这个参数带上,随后立即生效。
指定随机机器同步策略
在不同的系统上随机策略也有所不同,我也为大家列一张表。这里随机两台机器做自动同步队列与消息。
系统 | 操作 |
---|---|
rabbitmqctl | rabbitmqctl set_policy ha-two "^test" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' |
rabbitmqctl (Windows) | rabbitmqctl.bat set_policy ha-two "^test" ^"{""ha-mode"":""exactly"",""ha-params"":2,""ha-sync-mode"":""automatic""}" |
HTTP API
PUT /api/policies/%2f/ha-two
{
"pattern":"^test",
"definition": {
"ha-mode":"exactly",
"ha-params":2,
"ha-sync-mode":"automatic"
}
}
同样我们先删除policy
政策,在创建一个组的策略,并随机两台机器做自动同步队列,交换机与消息。(添加好后立即生效,我们在UI中也发现+2
变+1
了)
指定节点同步策略
我们通过node参数还可以指定集群中的任意节点,当我们指定rabbit@rabbitmqmaster
与rabbit@rabbitmqslave1
可以按照如下进行操作
系统 | 操作 |
---|---|
rabbitmqctl | rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmqmaster", "rabbit@rabbitmqslave1"]}' |
rabbitmqctl (Windows) | rabbitmqctl set_policy ha-nodes "^nodes\." "{""ha-mode"":""nodes"",""ha-params"":[""rabbit@rabbitmqmaster"", ""rabbit@rabbitmqslave1""]}" |
.netcore重试连接集群
我们通过发布消息的方式将发布到队列中去,但在这之前我们需要了解两个连接对象里面的参数。
参数 | 描述 |
---|---|
AutomaticRecoveryEnabled |
如果连接断开,底层会尝试去重连rabbitmq集群… |
TopologyRecoveryEnabled |
当connection 连接成功了,里面的exchange ,queue ,binding 都要恢复原样。。 |
创建代码,并连接到这些节点中去。
var factory = new ConnectionFactory()
{
UserName = "bob",
Password = "bob",
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true
};
// 创建一个链接
using (var connection = factory.CreateConnection(new string[] { "10.211.55.4", "10.211.55.5", "10.211.55.6" }))
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
var properties = channel.CreateBasicProperties();
// 我们创建15条消息
for (var i = 0; i < int.MaxValue; i++)
{
try
{
channel.BasicPublish(
exchange: string.Empty,
routingKey: "test1",
basicProperties: properties,
body: Encoding.UTF8.GetBytes($"新消息{i}")
);
System.Console.WriteLine($"Publish: 新消息{i}");
}
catch (Exception ex)
{
System.Console.WriteLine($"[ERROR]:{ex.Message}");
}
finally
{
Thread.Sleep(3000);
}
}
Console.WriteLine("Finish");
Console.ReadLine();
}
}
接着我们来集群中查看它连上哪台机子上了。我们发现它连上了rabbit@rabbitmqslave2
这台机子
当我们让rabbit@rabbitmqslave2
挂掉会发生什么事。
我们发现虽然它挂掉连但我们的客户端仍然好好的。今天就讲这么多,我们休息一下。
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

