Dapr .NetCore Actor
Actor简介
简单来讲就是锁的作用,可以用作单线程调用实例,起到加锁的效果。解决了并发抢票的。
比如:抢车票的应用场景。
工作原理
Dapr启动app时,Sidecar调用Actors获取配置信息,之后Sidecar将Actors的信息发送到 安置服务(Placement Service),安置服务会将不同的Actor类型根据其Id和Actor类型分区,并将Actor信息广播到所有dapr实例。
在客户端调用某个Actor时,安置服务会根据其Id和Actor类型,找到其所在的dapr实例,并执行其方法。
调用Actors方法
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
参数名 | 描述 |
---|---|
actorType |
执行组件类型。 |
actorId |
要调用的特定参与者的。 |
method |
要调用的方法。 |
抢订单Demo
我们通过100个线程并发访问来进行抢10个商品多案例。
创建ActorDaprManager项目
首先安装相关依赖
<PackageReference Include="Dapr.Actors.AspNetCore" Version="1.4.0" />
<PackageReference Include="Dapr.AspNetCore" Version="1.4.0" />
安装其他依赖项
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="ServiceStack.Redis" Version="5.13.2" />
<PackageReference Include="StackExchange.Redis" Version="2.2.88" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />
创建 Actor
public interface IDemoActor : IActor
{
/// <summary>
/// 初始化10个库存
/// </summary>
/// <returns></returns>
public Task Init();
/// <summary>
/// 获取库存
/// </summary>
/// <returns></returns>
public Task<long> GetInventory();
/// <summary>
/// 下订单
/// </summary>
/// <returns></returns>
public Task<long> SetOrder();
}
public class DemoActor : Actor, IDemoActor
{
RedisHelper _RedisHelper;
public DemoActor(ActorHost host, RedisHelper redisHelper) : base(host)
{
_RedisHelper = redisHelper;
}
/// <summary>
/// 初始化库存
/// </summary>
/// <returns></returns>
public async Task Init()
{
await StateManager.SetStateAsync("init", "init");
await _RedisHelper.GetDatabase().StringSetAsync("InventoryNum", 10);
await _RedisHelper.GetDatabase().StringSetAsync("OrderNum", 0);
}
/// <summary>
/// 获取剩余库存
/// </summary>
/// <returns></returns>
public Task<long> GetInventory()
{
var result = _RedisHelper.GetDatabase().StringGet("InventoryNum").ConvertTo<long>();
return Task.FromResult(result);
}
/// <summary>
/// 下订单
/// </summary>
/// <returns></returns>
public Task<long> SetOrder()
{
var result = GetInventory().Result;
if (result > 0)
{
_RedisHelper.GetDatabase().StringIncrement("OrderNum");
return _RedisHelper.GetDatabase().StringDecrementAsync("InventoryNum");
}
return Task.FromResult(long.Parse("-1"));
}
}
关于StateManager存储默认是由~/.dapr/components/statestore.yaml
指定的,一般都是Redis做存储,要使用时还需启动statestore.yaml
中的actorStateStore
参数才行。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
除此之外,还需要创建RedisHelper,其实也可以换成其他的存储。
public class RedisHelper
{
/// <summary>
/// Redis操作对象
/// </summary>
protected readonly IDatabase redis = null;
/// <summary>
/// 初始化Redis操作方法基础类
/// </summary>
/// <param name="dbNum"></param>
public RedisHelper(string connstr = "127.0.0.1:6380", int dbNum = 0)
{
redis = ConnectionMultiplexer.Connect(connstr).GetDatabase(dbNum);
}
public IDatabase GetDatabase()
{
return redis;
}
}
public static class IntTool
{
/// <summary>
/// 将值反系列化成对象
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="value"></param>
/// <returns></returns>
public static T ConvertObj<T>(this RedisValue value)
{
return value.IsNullOrEmpty ? default(T) : JsonConvert.DeserializeObject<T>(value);
}
}
注意Redis的Ip与端口
定义了三个方法
方法名 | 描述 |
---|---|
Init |
初始化库存10个 |
GetInventory |
获取剩余库存 |
SetOrder |
下订单 |
注册相关服务
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<RedisHelper, RedisHelper>();
services.AddControllers().AddDapr();
services.AddActors(option => {
option.Actors.RegisterActor<DemoActor>();
});
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "HMY", Version = "v1" });
});
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseDeveloperExceptionPage();
app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "HMY v1"));
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapActorsHandlers();
endpoints.MapControllers();
});
}
添加DemoController
控制器
[ApiController]
public class DemoController:Controller
{
[HttpPost("Init")]
public void Init()
{
var actorinit = GetDemo_Actor(Guid.NewGuid().ToString());
actorinit.Init().Wait();
}
/// <summary>
/// 不同的
/// </summary>
/// <returns></returns>
[HttpPost("DemoDifferentActor")]
public string DemoDifferentActor()
{
var actor = GetDemo_Actor(Guid.NewGuid().ToString());
actor.SetOrder();
return "ok";
}
/// <summary>
/// 同一个
/// </summary>
/// <returns></returns>
[HttpPost("DemoSameActor")]
public string DemoSameActor()
{
var actor = actorid;
actor.SetOrder();
return "ok";
}
public static IDemoActor actorid = GetDemo_Actor("bcd");
public static IDemoActor GetDemo_Actor(string actorid)
{
var actorId = new ActorId(actorid.ToString());
return ActorProxy.Create<IDemoActor>(actorId, nameof(ActorDaprManager.Actors.DemoActor));
}
}
我们定义了三个方法,Init用作初始化,DemoDifferentActor是不同的actorId创建出的不同实例,DemoSameActor是相同的actorId创建的相同的实例。
运行并初始化数据
dapr run --app-id myapp --app-port 5000 --dapr-http-port 3500 -- dotnet run
在我们的Swagger中,调用Demo控制器中的Init方法。
Jmeter配置
并发调用100个线程同时请求10个商品的情况
调用不同Actor进行测试
首先我们看到每一秒处理4347个请求这个性能还是很高的。接下来我们来看看请求后的数据结果。
我们发现购买时已经超过了我们原有的库存量,发生并发问题。可以通过下图来进行理解。
调用相同Actor进行测试
首先需要初始化应用数据,自己调吧!就Init那方法。
然后进行调用相同的ActorId
我们发现它执行时加上锁了的。执行流程如下图所示。
Timer的使用
定时触发Actor中的某个方法。
结合上面案例添加相关接口与代码的实现。
public interface IDemoActor : IActor
{
...
/// <summary>
/// 注册Timer
/// </summary>
/// <returns></returns>
public Task RegisterTime();
/// <summary>
/// 注销Timer
/// </summary>
/// <returns></returns>
public Task UnRegisterTime();
/// <summary>
/// Timer回调
/// </summary>
/// <returns></returns>
public Task TimerCallBack();
}
RegisterTime
方法设置第一次间隔5秒执行一次TimerCallBack随后每5秒调用一次,并将timer命名为Test。UnregisterTimerAsync
方法注销Test Timer。TimerCallBack
仅做一个日志打印。
public class DemoActor : Actor, IDemoActor
{
RedisHelper _RedisHelper;
ILogger<DemoActor> _log;
public DemoActor(
ActorHost host,
RedisHelper redisHelper,
ILogger<DemoActor> log
) : base(host)
{
_log = log;
_RedisHelper = redisHelper;
}
/// <summary>
/// 初始化库存
/// </summary>
/// <returns></returns>
public async Task Init()
{
await StateManager.SetStateAsync("init", "init");
await _RedisHelper.GetDatabase().StringSetAsync("InventoryNum", 10);
await _RedisHelper.GetDatabase().StringSetAsync("OrderNum", 0);
}
/// <summary>
/// 获取剩余库存
/// </summary>
/// <returns></returns>
public Task<long> GetInventory()
{
var result = _RedisHelper.GetDatabase().StringGet("InventoryNum").ConvertTo<long>();
return Task.FromResult(result);
}
/// <summary>
/// 下订单
/// </summary>
/// <returns></returns>
public Task<long> SetOrder()
{
var result = GetInventory().Result;
if (result > 0)
{
_RedisHelper.GetDatabase().StringIncrement("OrderNum");
return _RedisHelper.GetDatabase().StringDecrementAsync("InventoryNum");
}
return Task.FromResult(long.Parse("-1"));
}
public Task RegisterTime()
{
return this.RegisterTimerAsync("Test",nameof(this.TimerCallBack),null,TimeSpan.FromSeconds(5),TimeSpan.FromSeconds(5));
}
public Task UnRegisterTime()
{
return this.UnregisterTimerAsync("Test");
}
public Task TimerCallBack()
{
_log.LogInformation($"[{DateTime.Now}]--Thread: {Thread.CurrentThread.ManagedThreadId} Log");
return Task.CompletedTask;
}
}
在控制器中添加对应的方法。
/// <summary>
/// 启动Timer
/// </summary>
/// <returns></returns>
[HttpPost("RegisterTime")]
public string RegisterTime()
{
actorid.RegisterTime();
return "ok";
}
/// <summary>
/// 注销Timer
/// </summary>
/// <returns></returns>
[HttpPost("UnRegisterTime")]
public string UnRegisterTime()
{
actorid.UnRegisterTime();
return "ok";
}
运行测试
先启动该Timer。
查看定时日志输出情况。
最后注销该Timer。
Reminder
与Timer计算器的功能是一样的,只是当Actor实例释放或者调度到其他主机上时,Reminder将会继续开启调度,并因此重新创建actor。
/// <summary>
/// 注册Reminder
/// </summary>
/// <returns></returns>
public Task RegisterReminder();
/// <summary>
/// 注销Reminder
/// </summary>
/// <returns></returns>
public Task UnRegisterReminder();
在DemoActor中实现IRemindable接口
public class DemoActor : Actor, IDemoActor,IRemindable
public Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
{
_log.LogInformation($"Reminder [{DateTime.Now}]--Thread: {Thread.CurrentThread.ManagedThreadId} Log");
return Task.CompletedTask;
}
public Task RegisterReminder()
{
_log.LogInformation("Reminder 已经注册");
return this.RegisterReminderAsync("TestReminder", null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(2));
}
public Task UnRegisterReminder()
{
_log.LogInformation("Reminder 已经注销");
return this.UnregisterReminderAsync("TestReminder");
}
并在控制器中定义相关接口
/// <summary>
/// 启动Reminder
/// </summary>
/// <returns></returns>
[HttpPost("RegisterReminder")]
public string RegisterReminder()
{
actorid.RegisterReminder();
return "ok";
}
/// <summary>
/// 注销Reminder
/// </summary>
/// <returns></returns>
[HttpPost("UnRegisterReminder")]
public string UnRegisterReminder()
{
actorid.UnRegisterReminder();
return "ok";
}
测试附上一张图
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739