
Dapr .NetCore 订阅与发布
介绍
Pub/Sub 是分布式系统中的一种常见模式,具有许多想要利用解耦异步消息传递的服务。使用 Pub/Sub,您可以启用事件使用者与事件生产者分离的场景。
Dapr 提供了一个具有 At-Least-Once 保证的可扩展 Pub/Sub 系统,允许开发者发布和订阅主题。Dapr 为 pub/sub 提供组件,使运营商能够使用他们喜欢的基础设施,例如 Redis Streams、Kafka 等。
工作原理
Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。
服务将消息发布到指定主题, 业务服务订阅主题以使用消息。
服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。
任何编程平台都可以使用 Dapr 本机 API 通过 HTTP 或 gRPC 调用构建基块。 若要发布消息,请进行以下 API 调用:
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>
上述调用中有几个特定于 Dapr 的 URL 段:<dapr-port>
提供 Dapr sidecar 侦听的端口号。<pub-sub-name>
提供所选 Dapr pub/sub 组件的名称。<topic>
提供消息发布到的主题的名称。
设置 Pub/Sub 组件
Redis Streams 在运行时默认安装在本地机器上dapr init
。
通过%UserProfile%\.dapr\components\pubsub.yaml
在 Windows 或~/.dapr/components/pubsub.yaml
Linux/MacOS上打开您的组件文件进行验证:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
订阅主题
Dapr 支持两种订阅主题的方法:
——声明性地,订阅是在外部文件中定义的。
——以编程方式,其中订阅在用户代码中定义。
声明式订阅
在.dapr/components
目录下,创建subscription.yaml
来自定义资源定义 (CRD) 订阅主题。
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: mydepositevent-subscription
spec:
topic: deposit
route: /TopicSub
pubsubname: pubsub
scopes:
- topicserver
deposit
是pubsub
组件的事件订阅者。route
告诉 Dapr 将所有主题消息发送到应用程序中的 /TopicSub
端点。scopes
为 Id是 topicserver
启用订阅
代码定义
创建InvokeMethodServer API 项目,配置与依赖如下。
<ItemGroup>
<PackageReference Include="Dapr.AspNetCore" Version="1.4.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>
services.AddControllers().AddDapr();
在InvokeMethodServer
项目中创建TopicSubController
控制器和对应的订阅方法
[ApiController]
[Route("[controller]")]
public class TopicSubController: ControllerBase
{
private readonly ILogger<TopicSubController> _logger;
public TopicSubController(ILogger<TopicSubController> logger)
{
_logger = logger;
}
[Topic("pubsub", "deposit")]
[HttpPost("deposit")]
public ActionResult<Account> Deposit(Transaction transaction)
{
_logger.LogInformation($"Enter deposit {transaction.Id} {transaction.Amount}");
var result = new Account() { Id = transaction.Id, Balance = transaction.Amount };
return result;
}
[Topic("pubsub", "more")]
[HttpPost("more")]
public ActionResult<Account> More(Transaction transaction)
{
_logger.LogInformation($"Enter more {transaction.Id} {transaction.Amount}");
var result = new Account() { Id = transaction.Id, Balance = transaction.Amount };
return result;
}
}
/// <summary>
/// 接收的类型
/// </summary>
public class Transaction
{
public string Id { get; set; }
public decimal Amount { get; set; }
}
/// <summary>
/// 返回的类型
/// </summary>
public class Account
{
public string Id { get; set; }
public decimal Balance { get; set; }
}
需要在Startup的Configure中开启重复读取Body才能读取到数据
app.Use((context, next) =>
{
context.Request.EnableBuffering();
return next();
});
UseCloudEvents
在请求处理管道中注册云事件中间件。MapSubscribeHandler
注册一个端点,Dapr运行时将调用该端点来注册发布/订阅主题。除非使用pub/sub,否则不需要这样做。
运行
dapr run --app-id topicserver --app-port 5002 --dapr-http-port 3500 -- dotnet run
这里虽然报错单并不影响运行与工作
创建客户端控制器(PublishController)
添加PublishController发布代码
[ApiController]
[Route("[controller]")]
public class PublishController: ControllerBase
{
private readonly ILogger<PublishController> _logger;
private readonly DaprClient _client;
public PublishController(ILogger<PublishController> logger,DaprClient client )
{
_logger = logger;
_client = client;
}
private static readonly string pubsubName = "pubsub";
private static readonly string topicname1 = "deposit";
private static readonly string topicname2 = "more";
[HttpGet]
public async Task<string> Get()
{
var eventData = new { Id = "17", Amount = 10m };
var metadata = new Dictionary<string, string>()
{
{ "ttlInSeconds","10" }
};
await _client.PublishEventAsync(pubsubName, topicname1, eventData,metadata);
_logger.LogInformation("Published deposit event!");
return "ok";
}
[HttpGet("more")]
public async Task<string> GetMore()
{
var eventData = new { Id = "17", Amount = 10m };
var metadata = new Dictionary<string, string>()
{
{ "ttlInSeconds","10" }
};
await _client.PublishEventAsync(pubsubName, topicname2, eventData,metadata);
_logger.LogInformation("Published more event!");
return "ok";
}
}
写了两个发布事件,并将消息存在时间设置为10秒。
运行与测试
dapr run --app-id myclient --app-port 5001 --dapr-http-port 3501 -- dotnet run
可以使用Dapr的cli进行测试
dapr publish --publish-app-id topicserver --pubsub pubsub --topic deposit --data '{"Id":"17","Amount":10}'
dapr publish --publish-app-id topicserver --pubsub pubsub --topic more --data '{"Id":"17","Amount":10}'
也可以用http请求的方式
curl http://localhost:5001/Publish/
curl http://localhost:5001/Publish/more
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

