
Dapr .NetCore 状态管理
使用状态管理,您的应用程序可以将数据作为键/值对存储在支持的状态存储中。
使用状态管理时,您的应用程序可以利用原本复杂且容易出错的功能来构建自己,例如:
——分布式并发和数据一致性
——批量CRUD操作
您的应用程序可以使用 Dapr 的状态管理 API 使用状态存储组件来保存和读取键/值对,如下图所示。例如,通过使用 HTTP POST,您可以保存键/值对,通过使用 HTTP GET,您可以读取键并返回其值。
创建简单的示例
首先创建好webapi的项目,并添加上Dapr.AspNetCore
与Newtonsoft.Json
包,再添加好Dapr服务。
<ItemGroup>
<PackageReference Include="Dapr.AspNetCore" Version="1.4.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>
services.AddControllers().AddDapr();
创建一个StateManagerController
控制器,定两个方法分别是:Get与Post;分别对应获取状态与修改状态。MyData
是我们获取与修改的数据类型。storeName
是定义我们的使用的是哪个存储(默认都是这个名字,待会细说)stateKeyName
是我们数据的keyGetStateAsync
获取值SaveStateAsync
保存与修改值
[ApiController]
[Route("[controller]")]
public class StateManagerController: ControllerBase
{
private readonly ILogger<StateManagerController> _logger;
private readonly DaprClient _client;
public StateManagerController(ILogger<StateManagerController> logger,DaprClient client )
{
_logger = logger;
_client = client;
}
/// <summary>
/// 存储key
/// </summary>
private static readonly string stateKeyName = "mystate";
/// <summary>
/// 存储名称
/// </summary>
private static readonly string storeName = "statestore";
/// <summary>
/// 获取mystate存储
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<string> Get()
{
var data = await _client.GetStateAsync<MyData>(storeName,stateKeyName);
string result = data == null ? "" : JsonConvert.SerializeObject(data);
return result;
}
/// <summary>
/// 修改与添加
/// </summary>
/// <returns></returns>
[HttpPost]
public async Task<MyData> Post()
{
var random = new Random();
MyData myData = new MyData(){
Name = $"MyData_Name{random.Next(1,99)}",
Context = $"MyData_Context{random.Next(1,99)}",
Time = DateTime.Now
};
await _client.SaveStateAsync<MyData>(storeName,stateKeyName,myData);
return myData;
}
[HttpDelete]
public async Task<bool> Delete()
{
await _client.DeleteStateAsync(storeName,stateKeyName);
return true;
}
public class MyData
{
public string Name { get; set; }
public string Context { get; set; }
public DateTime Time { get; set; }
}
}
写好后运行项目
# app端口为5001 注意在launchSettings.json中设置运行端口5001
# app的dapr端口为3501
# 相关项目都运行在本地的进程中 ps aux可查看 dapr list也可以
dapr run --app-id myclient --app-port 5001 --dapr-http-port 3501 -- dotnet run
我们可以以dapr的相关命令或者通过http请求的方式进行请求
# 添加修改存储值
dapr invoke --app-id myclient --method StateManager --verb POST
# 获取存储值
dapr invoke --app-id myclient --method StateManager --verb Get
# 还可以做http请求
curl http://localhost:3501/v1.0/state/statestore/mystate
curl http://localhost:5001/StateManager
# Delete 未测试
curl -X DELETE 'http://localhost:3501/v1.0/state/statestore/mystate'
接下来我们来看看storeName
的存储定义,一般是在如下路径
Linux/MacOS:$HOME/.dapr/components/statestore.yaml
Windows:%USERPROFILE%\.dapr\components\statestore.yaml
通过上图的定义我们可以看到,它存储的应用是redis,它的存储名称是statestore
。
我们可以通过它定义的redis去找到我们相关的状态存储
docker exec -it dapr_redis redis-cli
keys *
hgetall myclient||mystate
检索数据,当我们有多个数据需要检索时,可以按照如下代码进行检索。
/// <summary>
/// 检索
/// </summary>
/// <returns></returns>
[HttpGet("bulk")]
public async Task<string> Bulk()
{
var random = new Random();
// 添加数据
for (var i = 0; i < 5; i++)
{
MyData myData = new MyData(){
Name = $"MyData_Name{random.Next(1,99)}",
Context = $"MyData_Context{random.Next(1,99)}",
Time = DateTime.Now
};
await _client.SaveStateAsync<MyData>(storeName,$"key{i}",myData);
}
// 只检索key1与key2
var list = await _client.GetBulkStateAsync(storeName,new string[]{ "key1","key2" },0);
return JsonConvert.SerializeObject(list);
}
# 进行调用
dapr invoke --app-id myclient --method StateManager/bulk --verb GET
# or
curl -X Get -H "Content-Type: application/json" -d '{"keys":["key1", "key2"]}' http://localhost:3501/v1.0/state/statestore/bulk
并发(ETag)
Dapr 支持使用 ETags 的乐观并发控制 (OCC)。当请求状态时,Dapr 总是将 ETag 属性附加到返回的状态。当用户代码尝试更新或删除状态时,应通过请求正文或If-Match删除标头附加 ETag 。只有当提供的 ETag 与状态存储中的 ETag 匹配时,写操作才能成功。
Dapr 选择 OCC 是因为在很多应用中,数据更新冲突很少见,因为客户端自然会被业务上下文分区以对不同的数据进行操作。但是,如果您的应用程序选择使用 ETag,则请求可能会因 ETag 不匹配而被拒绝。建议您在使用 ETag 时使用重试策略来补偿此类冲突。
如果您的应用程序在写入请求时省略 ETag,则 Dapr 在处理请求时会跳过 ETag 检查。与使用 ETag的先写赢模式相比,这实质上启用了最后写赢模式。
简单来说:每次写都会进行更新数据中ETag,所以在写时会先去判断ETag字段是否相等
命令示例
首先,在 statestore 中存储一个对象(此示例使用已定义为statestore
的 Redis):
curl -X POST http://localhost:3501/v1.0/state/statestore \
-H "Content-Type: application/json" \
-d '[
{
"key": "sampleData",
"value": "1"
}
]'
获取对象以查找 statestore 自动设置的 ETag:
curl http://localhost:3501/v1.0/state/statestore/sampleData -v
为了更新或删除对象,只需匹配请求正文(更新)或If-Match标头(删除)中的 ETag 。请注意,当状态更新时,它会收到一个新的 ETag,因此进一步的更新或删除将需要使用新的 ETag。
# Update
curl -X POST http://localhost:3501/v1.0/state/statestore \
-H "Content-Type: application/json" \
-d '[
{
"key": "sampleData",
"value": "2",
"etag": "1"
}
]'
# Delete
curl -X DELETE -H 'If-Match: 1' http://localhost:3501/v1.0/state/statestore/sampleData
代码示例
还是在我们原来的StateManagerController控制器中添加相关的代码。
/// <summary>
/// Etag
/// </summary>
/// <returns></returns>
[HttpPost("etag")]
public async Task<string> ETag()
{
var random = new Random();
// 添加thedtage0数据
MyData myData = new MyData(){
Name = $"MyData_Name{random.Next(1,99)}",
Context = $"MyData_Context{random.Next(1,99)}",
Time = DateTime.Now
};
await _client.SaveStateAsync<MyData>(storeName,"thedtage0",myData);
// 获取thedtage0的 value与etag
var (value,etag) = await _client.GetStateAndETagAsync<MyData>(storeName,"thedtage0");
var sourcedata = JsonConvert.SerializeObject(value);
_logger.LogInformation($"原来的值:{sourcedata}");
// 进行修改后使用该Etage进行保存
value.Context = "Context Change";
var isSaveStateSuccess = await _client.TrySaveStateAsync<MyData>(storeName,"thedtage0",value,etag);
_logger.LogInformation($"ETage:{etag} 修改状态:{isSaveStateSuccess}");
// 进行使用原来的Etage进行删除
var isDeleteStateSuccess = await _client.TryDeleteStateAsync(storeName,"thedtage0",etag);
_logger.LogInformation($"ETage:{etag} 删除状态:{isDeleteStateSuccess}");
return "ok";
}
启动客户端
dapr run --app-id myclient --app-port 5001 --dapr-http-port 3501 -- dotnet run
然后通过dapr cri进行调用
dapr invoke --app-id myclient --method StateManager/etag --verb POST
我们发现删除时失败了,因为此时Etag已经更新了,所以用旧的Etag是不起作用的。只需要重新获取数据即可。
/// <summary>
/// Etag
/// </summary>
/// <returns></returns>
[HttpPost("etag")]
public async Task<string> ETag()
{
var random = new Random();
// 添加thedtage0数据
MyData myData = new MyData(){
Name = $"MyData_Name{random.Next(1,99)}",
Context = $"MyData_Context{random.Next(1,99)}",
Time = DateTime.Now
};
await _client.SaveStateAsync<MyData>(storeName,"thedtage0",myData);
// 获取thedtage0的 value与etag
var (value,etag) = await _client.GetStateAndETagAsync<MyData>(storeName,"thedtage0");
var sourcedata = JsonConvert.SerializeObject(value);
_logger.LogInformation($"原来的值:{sourcedata}");
// 进行修改后使用该Etage进行保存
value.Context = "Context Change";
var isSaveStateSuccess = await _client.TrySaveStateAsync<MyData>(storeName,"thedtage0",value,etag);
_logger.LogInformation($"ETage:{etag} 修改状态:{isSaveStateSuccess}");
// 进行使用新的的Etage进行删除
var (value1,etag1) = await _client.GetStateAndETagAsync<MyData>(storeName,"thedtage0");
var isDeleteStateSuccess = await _client.TryDeleteStateAsync(storeName,"thedtage0",etag1);
_logger.LogInformation($"ETage:{etag1} 删除状态:{isDeleteStateSuccess}");
return "ok";
}
一致性
在确定数据获取的一致性上有两种一致性:strong
与eventual
。默认是eventual
最终一致性,分别定义如下:
最终一致性
——读取时,状态存储可以从任何副本返回数据
——写入时,状态存储确认要更新的内容后异步复制到配置存储
http示例
curl -X POST http://localhost:3501/v1.0/state/statestore \
-H "Content-Type: application/json" \
-d '[
{
"key": "weapon",
"value": "DeathStar",
"options": {
"consistency": "eventual"
}
}
]'
代码示例
[HttpPost("eventual")]
public async Task<string> eventual()
{
// 设置最终一致性默认也是它
var stateoption = new StateOptions(){ Consistency=ConsistencyMode.Eventual };
await _client.SaveStateAsync<string>(storeName,"weapon","DeathStar",stateoption);
var result = await _client.GetStateAsync<string>(storeName,"weapon");
return result;
}
强一致性
——读取时,状态存储应跨副本一致的返回最新数据
——写与删除入时,转台存储应该在完成写请求之前将更新数据同步复制到副本中
http示例
curl -X POST http://localhost:3501/v1.0/state/statestore \
-H "Content-Type: application/json" \
-d '[
{
"key": "weapon",
"value": "DeathStar",
"options": {
"consistency": "strong"
}
}
]'
代码示例
[HttpPost("strong")]
public async Task<string> strong()
{
// 设置最终一致性默认也是它
var stateoption = new StateOptions(){ Consistency=ConsistencyMode.Strong };
await _client.SaveStateAsync<string>(storeName,"weapon","DeathStar",stateoption);
var result = await _client.GetStateAsync<string>(storeName,"weapon");
return result;
}
先写赢还是后写赢
Dapr 允许开发人员在使用数据存储时选择两种常见的并发模式:首先写入获胜和最后写入获胜。First-Write-Wins 在您有多个应用程序实例的情况下很有用,所有实例都同时写入同一个键。
Dapr 的默认模式是后写赢(Last-write-wins
)。
值分别为:first-write
与last-write
先写赢
http示例
curl -X POST http://localhost:3501/v1.0/state/statestore \
-H "Content-Type: application/json" \
-d '[
{
"key": "weapon",
"value": "DeathStar",
"options": {
"concurrency": "first-write"
}
}
]'
代码示例
[HttpPost("FirstWrite")]
public async Task<string> FirstWrite()
{
// 设置最终一致性默认也是它
var stateoption = new StateOptions(){ Concurrency=ConcurrencyMode.FirstWrite };
await _client.SaveStateAsync<string>(storeName,"weapon","DeathStar",stateoption);
var result = await _client.GetStateAsync<string>(storeName,"weapon");
return result;
}
后写赢
http示例
curl -X POST http://localhost:3501/v1.0/state/statestore \
-H "Content-Type: application/json" \
-d '[
{
"key": "weapon",
"value": "DeathStar",
"options": {
"concurrency": "last-write"
}
}
]'
代码示例
[HttpPost("LastWrite")]
public async Task<string> LastWrite()
{
// 设置最终一致性默认也是它
var stateoption = new StateOptions(){ Concurrency=ConcurrencyMode.LastWrite };
await _client.SaveStateAsync<string>(storeName,"weapon","DeathStar",stateoption);
var result = await _client.GetStateAsync<string>(storeName,"weapon");
return result;
}
状态交易
将状态存储的更改保留为多项目事务。(我的理解是:可以同时修改和删除多个状态)
如下示例:
curl -X POST http://localhost:3500/v1.0/state/starwars/transaction \
-H "Content-Type: application/json" \
-d '{
"operations": [
{
"operation": "upsert",
"request": {
"key": "key1",
"value": "myData"
}
},
{
"operation": "delete",
"request": {
"key": "key2"
}
}
]
}'
[HttpGet("transaction")]
public async Task<string> Transaction()
{
var random = new Random();
List<StateTransactionRequest> lstr = new List<StateTransactionRequest>();
// 添加数据
for (var i = 0; i < 5; i++)
{
MyData myData = new MyData(){
Name = $"Trans_Name{random.Next(1,99)}",
Context = $"Trans_Context{random.Next(1,99)}",
Time = DateTime.Now
};
// 创建状态项目事务
// StateOperationType 有两个选项 Upsert 与 Delete
var oneTrans = new StateTransactionRequest(
$"key{i}",
System.Text.Encoding.Default.GetBytes(JsonConvert.SerializeObject(myData)),
StateOperationType.Upsert);
lstr.Add(oneTrans);
}
// 执行需要修改的项目事务
await _client.ExecuteStateTransactionAsync(storeName,lstr);
// 只检索key1与key2
var list = await _client.GetBulkStateAsync(storeName,new string[]{ "key1","key2" },0);
return JsonConvert.SerializeObject(list);
}
测试执行
dapr invoke --app-id myclient --method StateManager/transaction --verb GET
状态生存时间 (TTL)
Dapr 启用每个状态集请求生存时间 (TTL)。这意味着应用程序可以为每个存储的状态设置生存时间,并且这些状态在到期后无法检索。
http请求
# 创建key1 value1 ttl设置为120秒
curl -X POST -H "Content-Type: application/json" -d '[{ "key": "key1", "value": "value1", "metadata": { "ttlInSeconds": "120" } }]' http://localhost:3501/v1.0/state/statestore
代码示例
[HttpPost("ttl")]
public async Task<string> TTL()
{
await _client.SaveStateAsync<string>(
storeName,
"myttl",
"myttl_value",
metadata: new Dictionary<string,string>(){
{"ttlInSeconds","12"}
});
var result = await _client.GetStateAsync<string>(storeName,"myttl");
return result;
}
[HttpGet("ttl")]
public async Task<string> GetTTL()
{
var result = await _client.GetStateAsync<string>(storeName,"myttl");
return result;
}
12秒后,将访问不到数据。
dapr invoke --app-id myclient --method StateManager/ttl --verb POST
dapr invoke --app-id myclient --method StateManager/ttl --verb GET
应用之间共享状态
Dapr 为开发人员提供了在应用程序之间共享状态的不同方式。
在共享状态方面,不同的架构可能有不同的需求。例如,在一种情况下,您可能希望将所有状态封装在给定的应用程序中,并让 Dapr 为您管理访问权限。在不同的场景中,您可能需要让两个在同一状态下工作的应用程序能够获取和保存相同的密钥。
为了启用状态共享,Dapr 支持以下键前缀策略:appid
- 这是默认策略。该appid前缀允许状态,仅通过指定的应用程序进行管理appid。所有状态键都以 为前缀appid,并以应用程序为范围。
(key=<应用程序ID>||< key名称>)name
- 此设置使用状态存储组件的名称作为前缀。对于给定的状态存储,多个应用程序可以共享相同的状态。
(key=<存储名称>||< key名称>)none
- 此设置不使用前缀。多个应用程序在不同的状态存储之间共享状态。
(key=< key名称>)
这三种策略都可以通过修改$HOME/.dapr/components/statestore.yaml
的keyPrefix
键值的不同而设置不同的策略。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
- name: keyPrefix
value: appid
appid 策略
前面默认都这种存储就不说了,下面贴一张redis存储的图
name 策略
我们通过修改keyPrefix
键的值为name
后,在启动应用时需要指定目录进行启动达到更新的目的;随后通过配置的name策略访问添加的post接口,并进行查看key的结构。其结果为statestore||mystate
dapr run --app-id myclient --app-port 5001 --dapr-http-port 3501 --components-path=$HOME/.dapr/components -- dotnet run
dapr invoke --app-id myclient --method StateManager --verb POST
none 策略
步骤大致如上策略,只是修改keyPrefix
键的值为none
,在redis中其存储的键就是键的名称。
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

