介绍
github地址
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。
新建项目
新建 .NET 7 Web 项目
安装依赖包
安装软件
安装 Redis 和 SQL Server
修改代码
新建RedisConfigModel
namespace CAPStu01.Models;

/// <summary>
/// Redis connection settings (host, port and password).
/// </summary>
public class RedisConfigModel
{
    /// <summary>
    /// Server address. Defaults to an empty string so the property is never null.
    /// </summary>
    public string Host { get; set; } = string.Empty;

    /// <summary>
    /// Port number.
    /// </summary>
    public int Port { get; set; }

    /// <summary>
    /// Password. Defaults to an empty string so the property is never null.
    /// </summary>
    public string Pwd { get; set; } = string.Empty;
}
修改appsettings.json
{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"ConnectionStrings": {"SQlServer": "server=127.0.0.1;User ID=sa;Password=xxxx;database=capstu;Encrypt=True;TrustServerCertificate=True;connection timeout=600;"},"RedisConfig": {"Host": "127.0.0.1","Port": 6379,"Pwd": ""}
}
修改Program.cs
using CAPStu01.Models;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();

// Read the "RedisConfig" section and the "SQlServer" connection string from configuration.
var redisConfig = builder.Configuration.GetSection("RedisConfig").Get<RedisConfigModel>();
var connectionStr = builder.Configuration.GetConnectionString("SQlServer") ?? "";

builder.Services.AddCap(cap =>
{
    cap.UseRedis(redis =>
    {
        // Leave the Redis options untouched when either the options object or our settings are missing.
        if (redis.Configuration == null || redisConfig == null)
        {
            return;
        }

        redis.Configuration.EndPoints.Add(redisConfig.Host, redisConfig.Port);
        redis.Configuration.Password = redisConfig.Pwd ?? "";
    });

    cap.UseSqlServer(sql =>
    {
        sql.Schema = "dbo";
        sql.ConnectionString = connectionStr;
    });

    // Enable the dashboard.
    cap.UseDashboard(dashboard =>
    {
        // Allow anonymous access.
        dashboard.AllowAnonymousExplicit = true;
    });
});

var app = builder.Build();
app.UseRouting();
app.MapControllers();
app.Run();
新建HomeController
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

namespace CAPStu01.Controllers;

/// <summary>
/// Demo controller: publishes a message on the "test.show.time" topic
/// and subscribes to that same topic.
/// </summary>
[ApiController]
public class HomeController : ControllerBase
{
    public HomeController()
    {
    }

    /// <summary>
    /// Sends a message.
    /// </summary>
    /// <param name="capBus">Publisher resolved from the service container.</param>
    /// <returns>A plain-text confirmation.</returns>
    [HttpGet("/")]
    public IActionResult Index([FromServices] ICapPublisher capBus)
    {
        const string topic = "test.show.time";
        const string payload = "你好,CAP";

        capBus.Publish(topic, payload);
        return Content("发送消息成功");
    }

    /// <summary>
    /// Receives a message and writes it to the console.
    /// </summary>
    /// <param name="data">The received message body.</param>
    [NonAction]
    [CapSubscribe("test.show.time")]
    public void ReceiveMessage(string data)
        => Console.WriteLine($"message data is:{data}");
}
结果
如果使用 Redis 作为消息传输,已处理的消息仍会保留在 Stream 中,需要定期清理
安装 FreeRedis,并在 Program.cs 中注册 IRedisClient
// redisConfig comes from Get<RedisConfigModel>() and is null when the "RedisConfig" section is absent.
// Fail fast with a descriptive error instead of a NullReferenceException.
if (redisConfig is null)
{
    throw new InvalidOperationException("Configuration section 'RedisConfig' is missing.");
}

// Register one shared FreeRedis client instance as a singleton.
builder.Services.AddSingleton<IRedisClient>(
    new RedisClient($"{redisConfig.Host}:{redisConfig.Port},password={redisConfig.Pwd},defaultDatabase=0"));
新增清除方法
/// <summary>Key of the Redis stream that <see cref="ClearAckStream"/> cleans up.</summary>
private const string StreamKey = "test.show.time";

private readonly IRedisClient _redisClient;

public HomeController(IRedisClient redisClient)
{
    _redisClient = redisClient;
}

/// <summary>
/// Deletes every entry of the stream that is not in any consumer group's pending list.
/// </summary>
/// <returns>
/// A plain-text summary: the number of distinct pending entries that were kept
/// and the number of entries that were deleted.
/// </returns>
[HttpGet("/clear")]
public IActionResult ClearAckStream()
{
    // A set gives O(1) lookups below and counts an id only once
    // even when it is pending in several groups.
    var pendingIds = new HashSet<string>();

    // Collect the ids of all pending entries across every consumer group.
    foreach (var group in _redisClient.XInfoGroups(StreamKey))
    {
        if (group.pending <= 0)
        {
            continue;
        }

        // Summary form of XPENDING: yields the id range and count used for the detailed query.
        var summary = _redisClient.XPending(StreamKey, group.name);
        if (summary.count <= 0)
        {
            continue;
        }

        var pendingEntries = _redisClient.XPending(
            StreamKey, group.name, summary.minId, summary.maxId, summary.count);
        foreach (var entry in pendingEntries)
        {
            pendingIds.Add(entry.id);
        }
    }

    // NOTE(review): entries that have not yet been delivered to any group are presumably
    // not in a pending list either, so they would be deleted here as well — confirm this is acceptable.
    var allMsgs = _redisClient.XRange(StreamKey, "-", "+");
    var deletedCount = 0;
    foreach (var msg in allMsgs)
    {
        if (pendingIds.Contains(msg.id))
        {
            // Still pending: keep it.
            continue;
        }

        _redisClient.XDel(StreamKey, msg.id);
        deletedCount++;
    }

    // Report the number actually deleted, not the total number of entries in the stream.
    return Content($"共处理未读消息:{pendingIds.Count}个,已读消息{deletedCount}个");
}