tnblog
首页
视频
资源
登录

Dapr .NetCore Actor

4186人阅读 2021/12/20 15:28 总访问:3252775 评论:0 收藏:0 手机
分类: .net后台框架

Dapr .NetCore Actor

Actor简介


简单来讲就是锁的作用,可以用作单线程调用实例,起到加锁的效果。解决了并发抢票的。
比如:抢车票的应用场景。

工作原理


Dapr启动app时,Sidecar调用Actors获取配置信息,之后Sidecar将Actors的信息发送到 安置服务(Placement Service),安置服务会将不同的Actor类型根据其Id和Actor类型分区,并将Actor信息广播到所有dapr实例。


在客户端调用某个Actor时,安置服务会根据其Id和Actor类型,找到其所在的dapr实例,并执行其方法。

调用Actors方法

  1. POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
参数名 描述
actorType 执行组件类型。
actorId 要调用的特定参与者的。
method 要调用的方法。

抢订单Demo


我们通过100个线程并发访问来进行抢10个商品多案例。

创建ActorDaprManager项目


首先安装相关依赖

  1. <PackageReference Include="Dapr.Actors.AspNetCore" Version="1.4.0" />
  2. <PackageReference Include="Dapr.AspNetCore" Version="1.4.0" />


安装其他依赖项

  1. <PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
  2. <PackageReference Include="ServiceStack.Redis" Version="5.13.2" />
  3. <PackageReference Include="StackExchange.Redis" Version="2.2.88" />
  4. <PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />

创建 Actor

  1. public interface IDemoActor : IActor
  2. {
  3. /// <summary>
  4. /// 初始化10个库存
  5. /// </summary>
  6. /// <returns></returns>
  7. public Task Init();
  8. /// <summary>
  9. /// 获取库存
  10. /// </summary>
  11. /// <returns></returns>
  12. public Task<long> GetInventory();
  13. /// <summary>
  14. /// 下订单
  15. /// </summary>
  16. /// <returns></returns>
  17. public Task<long> SetOrder();
  18. }
  1. public class DemoActor : Actor, IDemoActor
  2. {
  3. RedisHelper _RedisHelper;
  4. public DemoActor(ActorHost host, RedisHelper redisHelper) : base(host)
  5. {
  6. _RedisHelper = redisHelper;
  7. }
  8. /// <summary>
  9. /// 初始化库存
  10. /// </summary>
  11. /// <returns></returns>
  12. public async Task Init()
  13. {
  14. await StateManager.SetStateAsync("init", "init");
  15. await _RedisHelper.GetDatabase().StringSetAsync("InventoryNum", 10);
  16. await _RedisHelper.GetDatabase().StringSetAsync("OrderNum", 0);
  17. }
  18. /// <summary>
  19. /// 获取剩余库存
  20. /// </summary>
  21. /// <returns></returns>
  22. public Task<long> GetInventory()
  23. {
  24. var result = _RedisHelper.GetDatabase().StringGet("InventoryNum").ConvertTo<long>();
  25. return Task.FromResult(result);
  26. }
  27. /// <summary>
  28. /// 下订单
  29. /// </summary>
  30. /// <returns></returns>
  31. public Task<long> SetOrder()
  32. {
  33. var result = GetInventory().Result;
  34. if (result > 0)
  35. {
  36. _RedisHelper.GetDatabase().StringIncrement("OrderNum");
  37. return _RedisHelper.GetDatabase().StringDecrementAsync("InventoryNum");
  38. }
  39. return Task.FromResult(long.Parse("-1"));
  40. }
  41. }


关于StateManager存储默认是由~/.dapr/components/statestore.yaml指定的,一般都是Redis做存储,要使用时还需启动statestore.yaml中的actorStateStore参数才行。

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: statestore
  5. spec:
  6. type: state.redis
  7. version: v1
  8. metadata:
  9. - name: redisHost
  10. value: localhost:6379
  11. - name: redisPassword
  12. value: ""
  13. - name: actorStateStore
  14. value: "true"


除此之外,还需要创建RedisHelper,其实也可以换成其他的存储。

  1. public class RedisHelper
  2. {
  3. /// <summary>
  4. /// Redis操作对象
  5. /// </summary>
  6. protected readonly IDatabase redis = null;
  7. /// <summary>
  8. /// 初始化Redis操作方法基础类
  9. /// </summary>
  10. /// <param name="dbNum"></param>
  11. public RedisHelper(string connstr = "127.0.0.1:6380", int dbNum = 0)
  12. {
  13. redis = ConnectionMultiplexer.Connect(connstr).GetDatabase(dbNum);
  14. }
  15. public IDatabase GetDatabase()
  16. {
  17. return redis;
  18. }
  19. }
  20. public static class IntTool
  21. {
  22. /// <summary>
  23. /// 将值反系列化成对象
  24. /// </summary>
  25. /// <typeparam name="T"></typeparam>
  26. /// <param name="value"></param>
  27. /// <returns></returns>
  28. public static T ConvertObj<T>(this RedisValue value)
  29. {
  30. return value.IsNullOrEmpty ? default(T) : JsonConvert.DeserializeObject<T>(value);
  31. }
  32. }

注意Redis的Ip与端口


定义了三个方法

方法名 描述
Init 初始化库存10个
GetInventory 获取剩余库存
SetOrder 下订单


注册相关服务

  1. public IConfiguration Configuration { get; }
  2. public void ConfigureServices(IServiceCollection services)
  3. {
  4. services.AddSingleton<RedisHelper, RedisHelper>();
  5. services.AddControllers().AddDapr();
  6. services.AddActors(option => {
  7. option.Actors.RegisterActor<DemoActor>();
  8. });
  9. services.AddSwaggerGen(c =>
  10. {
  11. c.SwaggerDoc("v1", new OpenApiInfo { Title = "HMY", Version = "v1" });
  12. });
  13. }
  14. public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
  15. {
  16. app.UseDeveloperExceptionPage();
  17. app.UseSwagger();
  18. app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "HMY v1"));
  19. app.UseRouting();
  20. app.UseAuthorization();
  21. app.UseEndpoints(endpoints =>
  22. {
  23. endpoints.MapActorsHandlers();
  24. endpoints.MapControllers();
  25. });
  26. }


添加DemoController控制器

  1. [ApiController]
  2. public class DemoController:Controller
  3. {
  4. [HttpPost("Init")]
  5. public void Init()
  6. {
  7. var actorinit = GetDemo_Actor(Guid.NewGuid().ToString());
  8. actorinit.Init().Wait();
  9. }
  10. /// <summary>
  11. /// 不同的
  12. /// </summary>
  13. /// <returns></returns>
  14. [HttpPost("DemoDifferentActor")]
  15. public string DemoDifferentActor()
  16. {
  17. var actor = GetDemo_Actor(Guid.NewGuid().ToString());
  18. actor.SetOrder();
  19. return "ok";
  20. }
  21. /// <summary>
  22. /// 同一个
  23. /// </summary>
  24. /// <returns></returns>
  25. [HttpPost("DemoSameActor")]
  26. public string DemoSameActor()
  27. {
  28. var actor = actorid;
  29. actor.SetOrder();
  30. return "ok";
  31. }
  32. public static IDemoActor actorid = GetDemo_Actor("bcd");
  33. public static IDemoActor GetDemo_Actor(string actorid)
  34. {
  35. var actorId = new ActorId(actorid.ToString());
  36. return ActorProxy.Create<IDemoActor>(actorId, nameof(ActorDaprManager.Actors.DemoActor));
  37. }
  38. }


我们定义了三个方法,Init用作初始化,DemoDifferentActor是不同的actorId创建出的不同实例,DemoSameActor是相同的actorId创建的相同的实例。

运行并初始化数据

  1. dapr run --app-id myapp --app-port 5000 --dapr-http-port 3500 -- dotnet run


在我们的Swagger中,调用Demo控制器中的Init方法。

Jmeter配置


并发调用100个线程同时请求10个商品的情况

调用不同Actor进行测试


首先我们看到每一秒处理4347个请求这个性能还是很高的。接下来我们来看看请求后的数据结果。


我们发现购买时已经超过了我们原有的库存量,发生并发问题。可以通过下图来进行理解。

调用相同Actor进行测试


首先需要初始化应用数据,自己调吧!就Init那方法。
然后进行调用相同的ActorId


我们发现它执行时加上锁了的。执行流程如下图所示。

Timer的使用


定时触发Actor中的某个方法。
结合上面案例添加相关接口与代码的实现。

  1. public interface IDemoActor : IActor
  2. {
  3. ...
  4. /// <summary>
  5. /// 注册Timer
  6. /// </summary>
  7. /// <returns></returns>
  8. public Task RegisterTime();
  9. /// <summary>
  10. /// 注销Timer
  11. /// </summary>
  12. /// <returns></returns>
  13. public Task UnRegisterTime();
  14. /// <summary>
  15. /// Timer回调
  16. /// </summary>
  17. /// <returns></returns>
  18. public Task TimerCallBack();
  19. }


RegisterTime方法设置第一次间隔5秒执行一次TimerCallBack随后每5秒调用一次,并将timer命名为Test。
UnregisterTimerAsync方法注销Test Timer。
TimerCallBack仅做一个日志打印。

  1. public class DemoActor : Actor, IDemoActor
  2. {
  3. RedisHelper _RedisHelper;
  4. ILogger<DemoActor> _log;
  5. public DemoActor(
  6. ActorHost host,
  7. RedisHelper redisHelper,
  8. ILogger<DemoActor> log
  9. ) : base(host)
  10. {
  11. _log = log;
  12. _RedisHelper = redisHelper;
  13. }
  14. /// <summary>
  15. /// 初始化库存
  16. /// </summary>
  17. /// <returns></returns>
  18. public async Task Init()
  19. {
  20. await StateManager.SetStateAsync("init", "init");
  21. await _RedisHelper.GetDatabase().StringSetAsync("InventoryNum", 10);
  22. await _RedisHelper.GetDatabase().StringSetAsync("OrderNum", 0);
  23. }
  24. /// <summary>
  25. /// 获取剩余库存
  26. /// </summary>
  27. /// <returns></returns>
  28. public Task<long> GetInventory()
  29. {
  30. var result = _RedisHelper.GetDatabase().StringGet("InventoryNum").ConvertTo<long>();
  31. return Task.FromResult(result);
  32. }
  33. /// <summary>
  34. /// 下订单
  35. /// </summary>
  36. /// <returns></returns>
  37. public Task<long> SetOrder()
  38. {
  39. var result = GetInventory().Result;
  40. if (result > 0)
  41. {
  42. _RedisHelper.GetDatabase().StringIncrement("OrderNum");
  43. return _RedisHelper.GetDatabase().StringDecrementAsync("InventoryNum");
  44. }
  45. return Task.FromResult(long.Parse("-1"));
  46. }
  47. public Task RegisterTime()
  48. {
  49. return this.RegisterTimerAsync("Test",nameof(this.TimerCallBack),null,TimeSpan.FromSeconds(5),TimeSpan.FromSeconds(5));
  50. }
  51. public Task UnRegisterTime()
  52. {
  53. return this.UnregisterTimerAsync("Test");
  54. }
  55. public Task TimerCallBack()
  56. {
  57. _log.LogInformation($"[{DateTime.Now}]--Thread: {Thread.CurrentThread.ManagedThreadId} Log");
  58. return Task.CompletedTask;
  59. }
  60. }


在控制器中添加对应的方法。

  1. /// <summary>
  2. /// 启动Timer
  3. /// </summary>
  4. /// <returns></returns>
  5. [HttpPost("RegisterTime")]
  6. public string RegisterTime()
  7. {
  8. actorid.RegisterTime();
  9. return "ok";
  10. }
  11. /// <summary>
  12. /// 注销Timer
  13. /// </summary>
  14. /// <returns></returns>
  15. [HttpPost("UnRegisterTime")]
  16. public string UnRegisterTime()
  17. {
  18. actorid.UnRegisterTime();
  19. return "ok";
  20. }

运行测试


先启动该Timer。


查看定时日志输出情况。


最后注销该Timer。

Reminder


与Timer计算器的功能是一样的,只是当Actor实例释放或者调度到其他主机上时,Reminder将会继续开启调度,并因此重新创建actor。

  1. /// <summary>
  2. /// 注册Reminder
  3. /// </summary>
  4. /// <returns></returns>
  5. public Task RegisterReminder();
  6. /// <summary>
  7. /// 注销Reminder
  8. /// </summary>
  9. /// <returns></returns>
  10. public Task UnRegisterReminder();


在DemoActor中实现IRemindable接口

  1. public class DemoActor : Actor, IDemoActor,IRemindable
  1. public Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
  2. {
  3. _log.LogInformation($"Reminder [{DateTime.Now}]--Thread: {Thread.CurrentThread.ManagedThreadId} Log");
  4. return Task.CompletedTask;
  5. }
  6. public Task RegisterReminder()
  7. {
  8. _log.LogInformation("Reminder 已经注册");
  9. return this.RegisterReminderAsync("TestReminder", null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(2));
  10. }
  11. public Task UnRegisterReminder()
  12. {
  13. _log.LogInformation("Reminder 已经注销");
  14. return this.UnregisterReminderAsync("TestReminder");
  15. }


并在控制器中定义相关接口

  1. /// <summary>
  2. /// 启动Reminder
  3. /// </summary>
  4. /// <returns></returns>
  5. [HttpPost("RegisterReminder")]
  6. public string RegisterReminder()
  7. {
  8. actorid.RegisterReminder();
  9. return "ok";
  10. }
  11. /// <summary>
  12. /// 注销Reminder
  13. /// </summary>
  14. /// <returns></returns>
  15. [HttpPost("UnRegisterReminder")]
  16. public string UnRegisterReminder()
  17. {
  18. actorid.UnRegisterReminder();
  19. return "ok";
  20. }


测试附上一张图


欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

评价
这一世以无限游戏为使命!
排名
8
文章
191
粉丝
7
评论
7
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 : 好是好,这个对效率影响大不大哇,效率高不高
ASP.NET Core 服务注册生命周期
剑轩 : http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术