26.3 任务队列
📝 模块更新日志
-
新特性
-
Enqueue/EnqueueAsync
支持Cron
表达式 实例重载方法 4.8.4.10 ⏱️2023.01.09 #I69HM4
-
以下内容仅限 Furion 4.8.3 +
版本使用。
任务队列可取代旧版本定时任务的 SpareTime.DoIt()
和 SpareTime.DoOnce
功能。
26.3.1 关于任务队列
任务队列常用于管理后台工作,通常这些后台工作在主线程响应之外,不会对主线程或当前线程响应阻塞。任务队列的一个显著特定就是它是一个队列,入队的顺序决定它出队执行的先后。
任务队列使用 Channel
+ Task
+ ThreadPool(线程池)
实现,入队/出队速度非常快,吞吐量极高,内存和 CPU
占用几乎忽略不计。
任务队列应用场景:**对于可能需长时间运行的任务,或不是那么及时的需要立即反馈的任务。**比如发送邮件,发送短信等等。
26.3.2 与事件总线的区别
事件总线基于消息通讯,任务队列最显著的特点就是将操作依次加入队列,然后按照入队的顺序出队去执行。
前者(事件总线)是无序的,只有完全匹配的消息 Id
才会触发执行操作,否则处于 “静待” 状态。
而后者(任务队列)则是将可能耗时且一定会执行的操作放到队列中,之后依次出队执行。
26.3.3 入门指南
任务队列使用非常简单,只需要注册 services.AddTaskQueue()
服务,之后通过依赖注入 ITaskQueue
服务或通过 TaskQueued
静态类使用即可,
1. 注册 TaskQueue
服务
services.AddTaskQueue();
2. 使用 ITaskQueue
服务
using Furion.TaskQueue;
namespace Your.Application;
public class YourService : IYourService
{
private readonly ITaskQueue _taskQueue;
public YourService(ITaskQueue taskQueue)
{
_taskQueue = taskQueue;
}
/// <summary>
/// 同步入队
/// </summary>
public void SyncTask()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的");
});
// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {});
}
/// <summary>
/// 同步入队,延迟 3 秒触发
/// </summary>
public void SyncTask2()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的,但我延迟了 3 秒");
}, 3000);
// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {}, 3000);
}
/// <summary>
/// 异步入队
/// </summary>
public async Task AsyncTask()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("我是异步的");
await ValueTask.CompletedTask;
});
// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {});
}
/// <summary>
/// 异步入队,延迟 3 秒触发
/// </summary>
public async Task AsyncTask2()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("我是异步的,但我延迟了 3 秒");
await ValueTask.CompletedTask;
}, 3000);
// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {}, 3000);
}
/// <summary>
/// 同步入队,支持 Cron 表达式延迟
/// </summary>
public void SyncTask3()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("Cron ...");
}, "* * * * *");
// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {}, "* * * * *", CronStringFormat.Default);
}
/// <summary>
/// 异步入队,支持 Cron 表达式延迟
/// </summary>
public async Task AsyncTask3()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("Cron ...");
await ValueTask.CompletedTask;
}, "* * * * *");
// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {}, "* * * * *", CronStringFormat.Default);
}
}
框架内置了一套简单的错误策略机制,也就是如果任务执行失败会默认重试 3
次,每次间隔 1秒
,该策略配置暂不对外公开。
26.3.4 TaskQueued
静态类
框架还提供了 TaskQueued
静态类可在任何线程中操作,如:
// 同步入队
TaskQueued.Enqueue((provider) => {}, [delay]);
TaskQueued.Enqueue((provider) => {}, cronExpression, [format]);
// 异步入队
await TaskQueued.EnqueueAsync(async (provider, token) => {}, [delay]);
await TaskQueued.EnqueueAsynce(async (provider, token) => {}, cronExpression, [format]);
26.3.5 在处理程序中使用服务
如果在任务队列处理程序中使用了外部的服务,如:
public class YourService : IYourService
{
private readonly ITaskQueue _taskQueue;
private readonly ILogger<YourService> _logger
public YourService(ITaskQueue taskQueue
, ILogger<YourService> logger)
{
_taskQueue = taskQueue;
_logger = logger;
}
public void SyncTask()
{
_taskQueue.Enqueue(provider =>
{
_logger.LogInformation("我使用了外部的 logger");
});
}
}
那么需要注意的是,**如果使用的外部服务是 **单例
服务,那么无需任何处理,但如果使用的服务属于 瞬时
或 范围
作用域,那么需要创建作用域,如:
_taskQueue.Enqueue(provider =>
{
// Repository 注册为范围,需创建作用域
using var scoped = provider.CreateScope();
var repository = scoped.ServiceProvider.GetService<IRepository<User>>();
// Logger 注册为单例,可以直接使用
_logger.LogInformation("我使用了外部的 logger");
});
26.3.6 订阅执行任务意外异常
任务处理程序使用的是 Task
对象进行创建并执行,但可能存在一些意外且难以捕获的异常,这时候可以通过以下方式订阅:
services.AddTaskQueue(builder =>
{
// 订阅 TaskQueue 意外未捕获异常
builder.UnobservedTaskExceptionHandler = (obj, args) =>
{
// ....
};
});
26.3.7 反馈与建议
给 Furion 提 Issue。