19. 消息队列
后端手册消息队列大约 9 分钟约 2708 字
提示
消息队列顾名思义就是一个存放消息的队列。最简单的消息队列包含 3 个角色:
- 生产者:将消息存入队列中
- 队列:存放和管理消息
- 消费者: 将消息从队列中取出来并做业务处理。
Admin.NET 已经集成事件总线和消息队列。事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。
事件总线
事件总线通常用于耗时、不用及时反馈等场景,比如登录的时候发短信、发邮件、通知等等,不需要立刻通知,后台处理即可。定义事件订阅者 ToDoEventSubscriber
public class xxxSubscriber : IEventSubscriber
{
private readonly ILogger<xxxSubscriber> _logger;
public xxxSubscriber(ILogger<xxxSubscriber> logger)
{
_logger = logger;
}
[EventSubscribe("ToDo:Add")]
public async Task AddToDo(EventHandlerExecutingContext context)
{
var todo = context.Source;
_logger.LogInformation("事件总线:{Name}", todo.Payload);
await Task.CompletedTask;
}
}
下面是框架事件订阅的实现示例 Admin.NET.Core/EventBus/AppEventSubscriber.cs
namespace Admin.NET.Core;
/// <summary>
/// 事件订阅
/// </summary>
public class AppEventSubscriber : IEventSubscriber, ISingleton, IDisposable
{
private readonly IServiceScope _serviceScope;
public AppEventSubscriber(IServiceScopeFactory scopeFactory)
{
_serviceScope = scopeFactory.CreateScope();
}
/// <summary>
/// 增加异常日志
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
[EventSubscribe(CommonConst.AddExLog)]
public async Task CreateExLog(EventHandlerExecutingContext context)
{
var rep = _serviceScope.ServiceProvider.GetRequiredService<SqlSugarRepository<SysLogEx>>();
await rep.InsertAsync((SysLogEx)context.Source.Payload);
}
/// <summary>
/// 发送异常邮件
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
[EventSubscribe(CommonConst.SendErrorMail)]
public async Task SendOrderErrorMail(EventHandlerExecutingContext context)
{
//var mailTempPath = Path.Combine(App.WebHostEnvironment.WebRootPath, "Temp\\ErrorMail.tp");
//var mailTemp = File.ReadAllText(mailTempPath);
//var mail = await _serviceScope.ServiceProvider.GetRequiredService<IViewEngine>().RunCompileFromCachedAsync(mailTemp, );
var title = "Admin.NET 系统异常";
await _serviceScope.ServiceProvider.GetRequiredService<SysEmailService>().SendEmail(JSON.Serialize(context.Source.Payload), title);
}
/// <summary>
/// 释放服务作用域
/// </summary>
public void Dispose()
{
_serviceScope.Dispose();
}
}
依赖注入 IEventPublisher 服务,指定事件订阅名称即可发送/消费数据
public class xxxController : ControllerBase
{
private readonly IEventPublisher _eventPublisher;
public xxxController(IEventPublisher eventPublisher)
{
_eventPublisher = eventPublisher;
}
// 发布 ToDo:Add 消息
public async Task AddDoTo(string name)
{
await _eventPublisher.PublishAsync(new ChannelEventSource("ToDo:Add", name));
// await _eventPublisher.PublishDelayAsync(new ChannelEventSource("ToDo:Add", name), 5000); // 延迟 5s
}
}
提示
所有自定义的事件订阅者只需要实现/继承 ISingleton
接口,无需手动注册服务,框架已通过 services.AddEventBus() 全局注册所有。
自定义事件源存储器
提示
事件总线默认采用 Channel 作为事件源 IEventSource 存储器,可使用任何消息队列组件进行替换,如 Kafka、RabbitMQ、Redis 等。Admin.NET 已经集成 RabbitMQ
和 Redis
自定义事件源存储器。
RabbitMQ 事件源存储器
具体 RabbitMQ
事件源实现如下, 配置文件 Configuration/EventBus.json
{
"$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json",
"EventBus": {
"RabbitMQ": {
"UserName": "adminnet",
"Password": "adminnet++123456",
"HostName": "127.0.0.1",
"Port": 5672
}
}
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Threading.Channels;
namespace Admin.NET.Core;
/// <summary>
/// RabbitMQ自定义事件源存储器
/// </summary>
public class RabbitMQEventSourceStore : IEventSourceStorer
{
/// <summary>
/// 内存通道事件源存储器
/// </summary>
private readonly Channel<IEventSource> _channel;
/// <summary>
/// 通道对象
/// </summary>
private readonly IModel _model;
/// <summary>
/// 连接对象
/// </summary>
private readonly IConnection _connection;
/// <summary>
/// 路由键
/// </summary>
private readonly string _routeKey;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="factory">连接工厂</param>
/// <param name="routeKey">路由键</param>
/// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
public RabbitMQEventSourceStore(ConnectionFactory factory, string routeKey, int capacity)
{
// 配置通道,设置超出默认容量后进入等待
var boundedChannelOptions = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
// 创建有限容量通道
_channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
// 创建连接
_connection = factory.CreateConnection();
_routeKey = routeKey;
// 创建通道
_model = _connection.CreateModel();
// 声明路由队列
_model.QueueDeclare(routeKey, false, false, false, null);
// 创建消息订阅者
var consumer = new EventingBasicConsumer(_model);
// 订阅消息并写入内存 Channel
consumer.Received += (ch, ea) =>
{
// 读取原始消息
var stringEventSource = Encoding.UTF8.GetString(ea.Body.ToArray());
// 转换为 IEventSource,如果自定义了 EventSource,注意属性是可读可写
var eventSource = JSON.Deserialize<ChannelEventSource>(stringEventSource);
// 写入内存管道存储器
_channel.Writer.WriteAsync(eventSource);
// 确认该消息已被消费
_model.BasicAck(ea.DeliveryTag, false);
};
// 启动消费者且设置为手动应答消息
_model.BasicConsume(routeKey, false, consumer);
}
/// <summary>
/// 将事件源写入存储器
/// </summary>
/// <param name="eventSource">事件源对象</param>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns><see cref="ValueTask"/></returns>
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
{
if (eventSource == default)
throw new ArgumentNullException(nameof(eventSource));
// 判断是否是 ChannelEventSource 或自定义的 EventSource
if (eventSource is ChannelEventSource source)
{
// 序列化及发布
var data = Encoding.UTF8.GetBytes(JSON.Serialize(source));
_model.BasicPublish("", _routeKey, null, data);
}
else
{
// 处理动态订阅
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
}
}
/// <summary>
/// 从存储器中读取一条事件源
/// </summary>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns>事件源对象</returns>
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
return eventSource;
}
/// <summary>
/// 释放非托管资源
/// </summary>
public void Dispose()
{
_model.Dispose();
_connection.Dispose();
}
}
Redis 事件源存储器
具体 Redis
事件源实现如下, 配置文件 Configuration/Cache.json
{
"$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json",
"Cache": {
"Prefix": "adminnet_", // 全局缓存前缀
"CacheType": "Memory", // Memory、Redis
"Redis": {
"Configuration": "server=127.0.0.1:6379;password=;db=5;", // Redis连接字符串
"Prefix": "adminnet_", // Redis前缀(目前没用)
"MaxMessageSize": "1048576" // 最大消息大小 默认1024 * 1024
}
},
"Cluster": { // 集群配置
"Enabled": false, // 启用集群:前提开启Redis缓存模式
"ServerId": "adminnet", // 服务器标识
"ServerIp": "", // 服务器IP
"SignalR": {
"RedisConfiguration": "127.0.0.1:6379,ssl=false,password=,defaultDatabase=5",
"ChannelPrefix": "signalrPrefix_"
},
"DataProtecteKey": "AdminNet:DataProtection-Keys",
"IsSentinel": false, // 是否哨兵模式
"SentinelConfig": {
"DefaultDb": "4",
"EndPoints": [ // 哨兵端口
// "10.10.0.124:26380"
],
"MainPrefix": "adminNet:",
"Password": "123456",
"SentinelPassword": "adminNet",
"ServiceName": "adminNet",
"SignalRChannelPrefix": "signalR:"
}
}
}
using System.Threading.Channels;
namespace Admin.NET.Core;
/// <summary>
/// Redis自定义事件源存储器
/// </summary>
public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
{
/// <summary>
/// 消费者
/// </summary>
private readonly EventConsumer<ChannelEventSource> _eventConsumer;
/// <summary>
/// 内存通道事件源存储器
/// </summary>
private readonly Channel<IEventSource> _channel;
/// <summary>
/// Redis 连接对象
/// </summary>
private readonly FullRedis _redis;
/// <summary>
/// 路由键
/// </summary>
private readonly string _routeKey;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="redis">Redis 连接对象</param>
/// <param name="routeKey">路由键</param>
/// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
public RedisEventSourceStorer(ICache redis, string routeKey, int capacity)
{
// 配置通道,设置超出默认容量后进入等待
var boundedChannelOptions = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
// 创建有限容量通道
_channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
_redis = redis as FullRedis;
_routeKey = routeKey;
// 创建消息订阅者
_eventConsumer = new EventConsumer<ChannelEventSource>(_redis, _routeKey);
// 订阅消息写入 Channel
_eventConsumer.Received += (send, cr) =>
{
// 反序列化消息
//var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr);
// 写入内存管道存储器
Task.Run(async () =>
{
await _channel.Writer.WriteAsync(cr);
});
};
// 启动消费者
_eventConsumer.Start();
}
/// <summary>
/// 将事件源写入存储器
/// </summary>
/// <param name="eventSource">事件源对象</param>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns><see cref="ValueTask"/></returns>
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
{
// 空检查
if (eventSource == default)
{
throw new ArgumentNullException(nameof(eventSource));
}
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
if (eventSource is ChannelEventSource source)
{
// 序列化消息
//var data = JsonSerializer.Serialize(source);
// 获取一个订阅对象
var queue = _redis.GetQueue<ChannelEventSource>(_routeKey);
// 异步发布
await Task.Factory.StartNew(() =>
{
queue.Add(source);
}, cancellationToken, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);
}
else
{
// 这里处理动态订阅问题
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
}
}
/// <summary>
/// 从存储器中读取一条事件源
/// </summary>
/// <param name="cancellationToken">取消任务 Token</param>
/// <returns>事件源对象</returns>
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
// 读取一条事件源
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
return eventSource;
}
/// <summary>
/// 释放非托管资源
/// </summary>
public async void Dispose()
{
await _eventConsumer.Stop();
GC.SuppressFinalize(this);
}
}
Redis 消息队列
Admin.NET 同时也集成了 Redis 消息队列静态工具类,直接使用即可。
using NewLife.Caching.Queues;
namespace Admin.NET.Core;
/// <summary>
/// Redis 消息队列
/// </summary>
public static class RedisQueue
{
private static readonly ICache _cache = App.GetRequiredService<ICache>();
/// <summary>
/// 获取普通队列
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topic"></param>
/// <returns></returns>
public static IProducerConsumer<T> GetQueue<T>(string topic)
{
var queue = (_cache as FullRedis).GetQueue<T>(topic);
return queue;
}
/// <summary>
/// 发送一个数据到队列
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <returns></returns>
public static int AddQueue<T>(string topic, T value)
{
var queue = GetQueue<T>(topic);
return queue.Add(value);
}
/// <summary>
/// 发送一个数据列表到队列
/// </summary>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static int AddQueueList<T>(string topic, List<T> value)
{
var queue = GetQueue<T>(topic);
var count = queue.Count;
var result = queue.Add(value.ToArray());
return result - count;
}
/// <summary>
/// 获取一批队列消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topic"></param>
/// <param name="count"></param>
/// <returns></returns>
public static List<T> Take<T>(string topic, int count = 1)
{
var queue = GetQueue<T>(topic);
var result = queue.Take(count).ToList();
return result;
}
/// <summary>
/// 获取一个队列消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topic"></param>
/// <returns></returns>
public static async Task<T> TakeOneAsync<T>(string topic)
{
var queue = GetQueue<T>(topic);
return await queue.TakeOneAsync(1);
}
/// <summary>
/// 获取可信队列,需要确认
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topic"></param>
/// <returns></returns>
public static RedisReliableQueue<T> GetRedisReliableQueue<T>(string topic)
{
var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
return queue;
}
/// <summary>
/// 可信队列回滚
/// </summary>
/// <param name="topic"></param>
/// <param name="retryInterval"></param>
/// <returns></returns>
public static int RollbackAllAck(string topic, int retryInterval = 60)
{
var queue = GetRedisReliableQueue<string>(topic);
queue.RetryInterval = retryInterval;
return queue.RollbackAllAck();
}
/// <summary>
/// 发送一个数据列表到可信队列
/// </summary>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static int AddReliableQueueList<T>(string topic, List<T> value)
{
var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
var count = queue.Count;
var result = queue.Add(value.ToArray());
return result - count;
}
/// <summary>
/// 发送一条数据到可信队列
/// </summary>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static int AddReliableQueue<T>(string topic, T value)
{
var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
var count = queue.Count;
var result = queue.Add(value);
return result - count;
}
/// <summary>
/// 在可信队列获取一条数据
/// </summary>
/// <param name="topic"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static T ReliableTakeOne<T>(string topic)
{
var queue = GetRedisReliableQueue<T>(topic);
return queue.TakeOne(1);
}
/// <summary>
/// 异步在可信队列获取一条数据
/// </summary>
/// <param name="topic"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async Task<T> ReliableTakeOneAsync<T>(string topic)
{
var queue = GetRedisReliableQueue<T>(topic);
return await queue.TakeOneAsync(1);
}
/// <summary>
/// 在可信队列获取多条数据
/// </summary>
/// <param name="topic"></param>
/// <param name="count"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static List<T> ReliableTake<T>(string topic, int count)
{
var queue = GetRedisReliableQueue<T>(topic);
return queue.Take(count).ToList();
}
/// <summary>
/// 获取延迟队列
/// </summary>
/// <param name="topic"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static RedisDelayQueue<T> GetDelayQueue<T>(string topic)
{
var queue = (_cache as FullRedis).GetDelayQueue<T>(topic);
return queue;
}
/// <summary>
/// 发送一条数据到延迟队列
/// </summary>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <param name="delay">延迟时间。单位秒</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static int AddDelayQueue<T>(string topic, T value, int delay)
{
var queue = GetDelayQueue<T>(topic);
return queue.Add(value, delay);
}
/// <summary>
/// 发送数据列表到延迟队列
/// </summary>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <param name="delay"></param>
/// <typeparam name="T">延迟时间。单位秒</typeparam>
/// <returns></returns>
public static int AddDelayQueue<T>(string topic, List<T> value, int delay)
{
var queue = GetDelayQueue<T>(topic);
queue.Delay = delay;
return queue.Add(value.ToArray());
}
/// <summary>
/// 异步在延迟队列获取一条数据
/// </summary>
/// <param name="topic"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async Task<T> DelayTakeOne<T>(string topic)
{
var queue = GetDelayQueue<T>(topic);
return await queue.TakeOneAsync(1);
}
/// <summary>
/// 在延迟队列获取多条数据
/// </summary>
/// <param name="topic"></param>
/// <param name="count"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static List<T> DelayTake<T>(string topic, int count = 1)
{
var queue = GetDelayQueue<T>(topic);
return queue.Take(count).ToList();
}
}