.NetCore3.1及以上Server-Sent Events(SSE)轻量级主动推送和Redis发布订阅
1.前言
服务端推送,也称为消息推送或通知推送,是一种允许应用服务器主动将信息发送到客户端的能力,为客户端提供了实时的信息更新和通知,增强了用户体验。
服务端推送的背景与需求主要基于以下几个诉求:
实时通知:在很多情况下,用户期望实时接收到应用的通知,如记录、新订单、新支付
节省资源:如果没有服务端推送,客户端需要通过轮询的方式来获取新信息,会造成客户端、服务端的资源损耗。通过服务端推送,客户端只需要在收到通知时做出响应,大大减少了资源的消耗。
常见推送场景有:一些实时数据,订单状态、称重数据等
一、解决方案:
1、传统实时处理方案:
轮询:这是一种较为传统的方式,客户端会定时地向服务端发送请求,询问是否有新数据。服务端只需要检查数据状态,然后将结果返回给客户端。轮询的优点是实现简单,兼容性好;缺点是可能产生较大的延迟,且对服务端资源消耗较高。
长轮询(Long Polling):轮询的改进版。客户端向服务器发送请求,服务器收到请求后,如果有新的数据,立即返回给客户端;如果没有新数据,服务器会等待一定时间(比如30秒超时时间),在这段时间内,如果有新数据,就返回给客户端,否则返回空数据。客户端处理完服务器返回的响应后,再次发起新的请求,如此反复。长轮询相较于传统的轮询方式减少了请求次数,但仍然存在一定的延迟。
2、HTML5 标准引入的实时处理方案:
WebSocket:一种双向通信协议,同时支持服务端和客户端之间的实时交互。WebSocket 是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。
SSE:SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术。服务端可以使用 SSE 来向客户端推送数据,但客户端不能通过SSE向服务端发送数据。相较于 WebSocket,SSE 更简单、更轻量级,但只能实现单向通信。
两者的主要区别:
1.通信区别
Server-Sent Events: 单向通信由服务端指向客户端
WebSocket: 双工通信,可以交换数据
2.协议区别
Server-Sent Events: HTTP
WebSocket: WebSocket
3.自动重连
Server-Sent Events: 支持
WebSocket: 不支持需要轮询实现
4.响应文本信息:
Server-Sent Events: 文本格式
WebSocket: 文本格式需要二进制编译或者自定义json文本也是可以
5.浏览器支持:
Server-Sent Events:大部分支持,但在Internet Explorer及早期的Edge浏览器中并不被支持
WebSocket: 主流浏览器(包括移动端)的支持较好
二、前期准备
由于我们是需要SSE和Redis发布与订阅实现数据的主动推送和接受前端入参实现定向订阅操作。
我们在编写程序的时候需要使用到Redis组件:我这里推荐FreeRedis
三、代码编写
高质量的代码往往写的很朴素且很沉默(我们前端就用HTML5,后端就使用控制台和Api了)
下面进入示例了:
1.前端代码
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>SSE Client</title> </head> <body> <h1>Receive: <span id="sse"></span></h1> <script> const numberElement = document.getElementById("sse"); //在这里Code是可以去自定义获取链接的和普通的Query一样,是为了定向订阅Redis的发布数据 const source = new EventSource('http://localhost:8080/GetSSEMessage?code=12345'); source.onmessage = (event) => { numberElement.innerText = event.data; }; source.onerror = (error) => { console.error("SSE error:", error); }; </script> </body> </html>
2.后端Redis发布消息(记得在Nuget上拉取FreeSql)
class Program { static void Main() { for (int i = 0; i < 10000; i++) { RedisClient cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=2"); //发布消息 cli.Publish("12345", "ok"); Console.WriteLine($"发送消息成功,等待接收{DateTime.Now:yyyy-MM-dd HH:mm:ss}"); Thread.Sleep(2000); //10s一次 } } }
上面就是Redis发布的代码,都说了朴实无华
3.后端服务代码:
1.WeatherForecastController.cs
using FreeRedis; using Masuit.Tools; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using SSEDemo.Dto; using System.Runtime.CompilerServices; namespace SSEDemo.Controllers { [ApiController] [Route("[controller]")] public class WeatherForecastController : ControllerBase { private static readonly string[] Summaries = new[] { "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching" }; private readonly ILogger<WeatherForecastController> _logger; private readonly IRedisClient _freeRedis; public WeatherForecastController(ILogger<WeatherForecastController> logger, IRedisClient freeRedis) { _logger = logger; _freeRedis = freeRedis; } /// <summary> /// 获取天气预报 /// </summary> /// <returns></returns> [HttpGet] public IEnumerable<WeatherForecast> Get() { return Enumerable.Range(1, 5).Select(index => new WeatherForecast { Date = DateTime.Now.AddDays(index), TemperatureC = Random.Shared.Next(-20, 55), Summary = Summaries[Random.Shared.Next(Summaries.Length)] }) .ToArray(); } /// <summary> /// Server-Sent Event Message /// </summary> /// <param name="code">消息标识</param> /// <returns></returns> [HttpGet("GetSSEMessage")] [AllowAnonymous] public async IAsyncEnumerable<string> GetSSEMessage([FromQuery] string code, [EnumeratorCancellation] CancellationToken cancellationToken) { // 设置响应头 Response.Headers["Content-Type"] = "text/event-stream"; Response.Headers["Cache-Control"] = "no-cache"; Response.Headers["Connection"] = "keep-alive"; if (string.IsNullOrEmpty(code)) { var data = CreateMessage(string.Empty, "请传入有效标识!", false); yield return $"data: {data.ToJsonString()}\n\n"; // 返回错误信息 yield break; // 结束方法 } IDisposable subscription = null; try { // 创建异步流用于心跳 var heartbeatTask = SendHeartbeatAsync(code, cancellationToken); // 创建频道订阅 subscription = _freeRedis.Subscribe(code, async (channel, message) => { var model = CreateMessage(code, string.Empty, true); await WriteResponse(model.ToJsonString(), cancellationToken); }); // 等待直到取消请求 await heartbeatTask; // 等待心跳任务完成 } finally { subscription?.Dispose(); } } /// <summary> /// 发送心跳包 /// </summary> private async Task SendHeartbeatAsync(string code, CancellationToken ct) { while (!ct.IsCancellationRequested) { await Task.Delay(5000, ct); // 每5秒发送一次心跳 var model = CreateMessage(code, string.Empty, true); model.IsHeartbeat = true; await WriteResponse(model.ToJsonString(), ct); // 心跳消息 } } /// <summary> /// 输出数据 /// </summary> /// <param name="jsonData"></param> /// <param name="ct"></param> /// <returns></returns> private async Task WriteResponse(string jsonData, CancellationToken ct) { if (ct.IsCancellationRequested) return; await Response.WriteAsync($"data: {jsonData}\n\n", ct); await Response.Body.FlushAsync(ct); } /// <summary> /// 消息输出 /// </summary> /// <param name="vehicleCode"></param> /// <param name="message"></param> /// <param name="isSuccess"></param> /// <returns></returns> private MessageOutDto CreateMessage(string code, string message, bool isSuccess) { return new MessageOutDto { Code = code, IsSuccess = isSuccess, Message = message, }; } } }
2.MessageOutDto.cs
namespace SSEDemo.Dto { public class MessageOutDto { /// <summary> /// 是否成功 /// </summary> public bool IsSuccess { get; set; } /// <summary> /// 消息 /// </summary> public string Message { get; set; } /// <summary> /// 标记 /// </summary> public string Code { get; set; } /// <summary> /// 是否心跳包 /// </summary> public bool IsHeartbeat { get; set; } = false; } }
3.Program.cs
using FreeRedis; var builder = WebApplication.CreateBuilder(args); builder.Services.AddControllers(); // 注册 FreeRedis 客户端 builder.Services.AddSingleton<IRedisClient>(sp => { var redisConnectionString = "127.0.0.1:6379,password=,defaultDatabase=2"; return new RedisClient(redisConnectionString); }); var app = builder.Build(); // Configure the HTTP request pipeline. app.UseAuthorization(); app.MapControllers(); app.Run();
轻量级SSE配合Redis的订阅发布,实现后端主动推送到前端的代码就完成了