tnblog
首页
视频
资源
登录

Dapr .NetCore 订阅与发布(上)

7838人阅读 2021/10/17 18:15 总访问:3467144 评论:0 收藏:1 手机
分类: .net后台框架

Dapr .NetCore 订阅与发布

介绍


Pub/Sub 是分布式系统中的一种常见模式,具有许多想要利用解耦异步消息传递的服务。使用 Pub/Sub,您可以启用事件使用者与事件生产者分离的场景。
Dapr 提供了一个具有 At-Least-Once 保证的可扩展 Pub/Sub 系统,允许开发者发布和订阅主题。Dapr 为 pub/sub 提供组件,使运营商能够使用他们喜欢的基础设施,例如 Redis Streams、Kafka 等。

工作原理


Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。
服务将消息发布到指定主题, 业务服务订阅主题以使用消息。
服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。
任何编程平台都可以使用 Dapr 本机 API 通过 HTTP 或 gRPC 调用构建基块。 若要发布消息,请进行以下 API 调用:

  1. http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>


上述调用中有几个特定于 Dapr 的 URL 段:
<dapr-port> 提供 Dapr sidecar 侦听的端口号。
<pub-sub-name> 提供所选 Dapr pub/sub 组件的名称。
<topic> 提供消息发布到的主题的名称。

设置 Pub/Sub 组件


Redis Streams 在运行时默认安装在本地机器上dapr init
通过%UserProfile%\.dapr\components\pubsub.yaml在 Windows 或~/.dapr/components/pubsub.yamlLinux/MacOS上打开您的组件文件进行验证:

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: pubsub
  5. spec:
  6. type: pubsub.redis
  7. version: v1
  8. metadata:
  9. - name: redisHost
  10. value: localhost:6379
  11. - name: redisPassword
  12. value: ""

订阅主题


Dapr 支持两种订阅主题的方法:
——声明性地,订阅是在外部文件中定义的。
——以编程方式,其中订阅在用户代码中定义。

声明式订阅


.dapr/components目录下,创建subscription.yaml来自定义资源定义 (CRD) 订阅主题。

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Subscription
  3. metadata:
  4. name: mydepositevent-subscription
  5. spec:
  6. topic: deposit
  7. route: /TopicSub
  8. pubsubname: pubsub
  9. scopes:
  10. - topicserver


depositpubsub组件的事件订阅者。
route 告诉 Dapr 将所有主题消息发送到应用程序中的 /TopicSub 端点。
scopes 为 Id是 topicserver启用订阅

代码定义


创建InvokeMethodServer API 项目,配置与依赖如下。

  1. <ItemGroup>
  2. <PackageReference Include="Dapr.AspNetCore" Version="1.4.0" />
  3. <PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
  4. </ItemGroup>
  1. services.AddControllers().AddDapr();


InvokeMethodServer项目中创建TopicSubController控制器和对应的订阅方法

  1. [ApiController]
  2. [Route("[controller]")]
  3. public class TopicSubController: ControllerBase
  4. {
  5. private readonly ILogger<TopicSubController> _logger;
  6. public TopicSubController(ILogger<TopicSubController> logger)
  7. {
  8. _logger = logger;
  9. }
  10. [Topic("pubsub", "deposit")]
  11. [HttpPost("deposit")]
  12. public ActionResult<Account> Deposit(Transaction transaction)
  13. {
  14. _logger.LogInformation($"Enter deposit {transaction.Id} {transaction.Amount}");
  15. var result = new Account() { Id = transaction.Id, Balance = transaction.Amount };
  16. return result;
  17. }
  18. [Topic("pubsub", "more")]
  19. [HttpPost("more")]
  20. public ActionResult<Account> More(Transaction transaction)
  21. {
  22. _logger.LogInformation($"Enter more {transaction.Id} {transaction.Amount}");
  23. var result = new Account() { Id = transaction.Id, Balance = transaction.Amount };
  24. return result;
  25. }
  26. }
  27. /// <summary>
  28. /// 接收的类型
  29. /// </summary>
  30. public class Transaction
  31. {
  32. public string Id { get; set; }
  33. public decimal Amount { get; set; }
  34. }
  35. /// <summary>
  36. /// 返回的类型
  37. /// </summary>
  38. public class Account
  39. {
  40. public string Id { get; set; }
  41. public decimal Balance { get; set; }
  42. }


需要在Startup的Configure中开启重复读取Body才能读取到数据

  1. app.Use((context, next) =>
  2. {
  3. context.Request.EnableBuffering();
  4. return next();
  5. });


UseCloudEvents在请求处理管道中注册云事件中间件。
MapSubscribeHandler注册一个端点,Dapr运行时将调用该端点来注册发布/订阅主题。除非使用pub/sub,否则不需要这样做。

运行

  1. dapr run --app-id topicserver --app-port 5002 --dapr-http-port 3500 -- dotnet run


这里虽然报错单并不影响运行与工作

创建客户端控制器(PublishController)


添加PublishController发布代码

  1. [ApiController]
  2. [Route("[controller]")]
  3. public class PublishController: ControllerBase
  4. {
  5. private readonly ILogger<PublishController> _logger;
  6. private readonly DaprClient _client;
  7. public PublishController(ILogger<PublishController> logger,DaprClient client )
  8. {
  9. _logger = logger;
  10. _client = client;
  11. }
  12. private static readonly string pubsubName = "pubsub";
  13. private static readonly string topicname1 = "deposit";
  14. private static readonly string topicname2 = "more";
  15. [HttpGet]
  16. public async Task<string> Get()
  17. {
  18. var eventData = new { Id = "17", Amount = 10m };
  19. var metadata = new Dictionary<string, string>()
  20. {
  21. { "ttlInSeconds","10" }
  22. };
  23. await _client.PublishEventAsync(pubsubName, topicname1, eventData,metadata);
  24. _logger.LogInformation("Published deposit event!");
  25. return "ok";
  26. }
  27. [HttpGet("more")]
  28. public async Task<string> GetMore()
  29. {
  30. var eventData = new { Id = "17", Amount = 10m };
  31. var metadata = new Dictionary<string, string>()
  32. {
  33. { "ttlInSeconds","10" }
  34. };
  35. await _client.PublishEventAsync(pubsubName, topicname2, eventData,metadata);
  36. _logger.LogInformation("Published more event!");
  37. return "ok";
  38. }
  39. }


写了两个发布事件,并将消息存在时间设置为10秒。

运行与测试

  1. dapr run --app-id myclient --app-port 5001 --dapr-http-port 3501 -- dotnet run


可以使用Dapr的cli进行测试

  1. dapr publish --publish-app-id topicserver --pubsub pubsub --topic deposit --data '{"Id":"17","Amount":10}'
  2. dapr publish --publish-app-id topicserver --pubsub pubsub --topic more --data '{"Id":"17","Amount":10}'


也可以用http请求的方式

  1. curl http://localhost:5001/Publish/
  2. curl http://localhost:5001/Publish/more


欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

评价

net core 使用 EF Code First

下面这些内容很老了看这篇:https://www.tnblog.net/aojiancc2/article/details/5365 项目使用多层,把数据库访问...

.net mvc分部页,.net core分部页

.net分部页的三种方式第一种:@Html.Partial(&quot;_分部页&quot;)第二种:@{ Html.RenderPartial(&quot;分部页&quot;);}...

StackExchange.Redis操作redis(net core支持)

官方git开源地址https://github.com/StackExchange/StackExchange.Redis官方文档在docs里边都是官方的文档通过nuget命令下...

.net core 使用session

tip:net core 2.2后可以直接启用session了,不用在自己添加一次session依赖,本身就添加了使用nuget添加引用Microsoft.AspN...

通俗易懂,什么是.net?什么是.net Framework?什么是.net core?

朋友圈@蓝羽 看到一篇文章写的太详细太通俗了,搬过来细细看完,保证你对.NET有个新的认识理解原文地址:https://www.cnblo...

asp.net core2.0 依赖注入 AddTransient与AddScoped的区别

asp.net core主要提供了三种依赖注入的方式其中AddTransient与AddSingleton比较好区别AddTransient瞬时模式:每次都获取一...

.net core 使用 Kestrel

Kestrel介绍 Kestrel是一个基于libuv的跨平台web服务器 在.net core项目中就可以不一定要发布在iis下面了Kestrel体验可以使...

net core中使用cookie

net core中可以使用传统的cookie也可以使用加密的cookieNET CORE中使用传统cookie设置:HttpContext.Response.Cookies.Appe...

net core项目结构简单分析

一:wwwrootwwwroot用于存放网站的静态资源,例如css,js,图片与相关的前端插件等lib主要是第三方的插件,例如微软默认引用...

net core使用EF之DB First

一.新建一个.net core的MVC项目新建好项目后,不能像以前一样直接在新建项中添加ef了,需要用命令在添加ef的依赖二.使用Nug...

.net core使用requestresponse下载文件下载excel等

使用request获取内容net core中request没有直接的索引方法,需要点里边的Query,或者formstringbase64=Request.Form[&quot;f...

iframe自适应高度与配合net core使用

去掉iframe边框frameborder=&quot;0&quot;去掉滚动条scrolling=&quot;no&quot;iframe 自适应高度如果内容是固定的,那么就...

net core启动报错Unable to configure HTTPS endpoint. No server certificate was specified

这是因为net core2.1默认使用的https,如果使用Kestrel web服务器的话没有安装证书就会报这个错其实仔细看他的错误提示,其...

net core中使用url编码与解码操作

net core中暂时还没有以前asp.net与mvc中的server对象。获取url的编码与解码操作不能使用以前的server对象来获取。使用的是...

下载net core

官方下载地址:https://dotnet.microsoft.com/download 进来之后就可以看到最新的下载版本可以直接点击下载,也可以下载其...

net core使用依赖注入来装载EF的上下文对象

妹子情人节快乐~.net core中用了不少的依赖注入,官方文档中也推荐使用。这样使用依赖注入来管理ef对象,还是比较科学,比如...
这一世以无限游戏为使命!
排名
2
文章
634
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 : 好是好,这个对效率影响大不大哇,效率高不高
ASP.NET Core 服务注册生命周期
剑轩 : http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术