❤️ 关注 Furion 微信公众号有惊喜哦!
Skip to main content

26.3 任务队列

📝 模块更新日志
  • 新特性

    •   Enqueue/EnqueueAsync 支持 Cron 表达式 实例重载方法 4.8.4.10 ⏱️2023.01.09 #I69HM4
版本说明

以下内容仅限 Furion 4.8.3 + 版本使用。

小知识

任务队列可取代旧版本定时任务的 SpareTime.DoIt()SpareTime.DoOnce 功能。

26.3.1 关于任务队列

任务队列常用于管理后台工作,通常这些后台工作在主线程响应之外,不会对主线程或当前线程响应阻塞。任务队列的一个显著特定就是它是一个队列,入队的顺序决定它出队执行的先后。

任务队列使用 Channel + Task + ThreadPool(线程池) 实现,入队/出队速度非常快,吞吐量极高,内存和 CPU 占用几乎忽略不计。

任务队列应用场景:**对于可能需长时间运行的任务,或不是那么及时的需要立即反馈的任务。**比如发送邮件,发送短信等等。

26.3.2 与事件总线的区别

事件总线基于消息通讯,任务队列最显著的特点就是将操作依次加入队列,然后按照入队的顺序出队去执行。

前者(事件总线)是无序的,只有完全匹配的消息 Id 才会触发执行操作,否则处于 “静待” 状态。

而后者(任务队列)则是将可能耗时且一定会执行的操作放到队列中,之后依次出队执行

26.3.3 入门指南

任务队列使用非常简单,只需要注册 services.AddTaskQueue() 服务,之后通过依赖注入 ITaskQueue 服务或通过 TaskQueued 静态类使用即可,

1. 注册 TaskQueue 服务

services.AddTaskQueue();

2. 使用 ITaskQueue 服务

using Furion.TaskQueue;

namespace Your.Application;

public class YourService : IYourService
{
private readonly ITaskQueue _taskQueue;
public YourService(ITaskQueue taskQueue)
{
_taskQueue = taskQueue;
}

/// <summary>
/// 同步入队
/// </summary>
public void SyncTask()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的");
});

// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {});
}

/// <summary>
/// 同步入队,延迟 3 秒触发
/// </summary>
public void SyncTask2()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的,但我延迟了 3 秒");
}, 3000);

// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {}, 3000);
}

/// <summary>
/// 异步入队
/// </summary>
public async Task AsyncTask()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("我是异步的");
await ValueTask.CompletedTask;
});

// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {});
}

/// <summary>
/// 异步入队,延迟 3 秒触发
/// </summary>
public async Task AsyncTask2()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("我是异步的,但我延迟了 3 秒");
await ValueTask.CompletedTask;
}, 3000);

// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {}, 3000);
}

/// <summary>
/// 同步入队,支持 Cron 表达式延迟
/// </summary>
public void SyncTask3()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("Cron ...");
}, "* * * * *");

// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {}, "* * * * *", CronStringFormat.Default);
}

/// <summary>
/// 异步入队,支持 Cron 表达式延迟
/// </summary>
public async Task AsyncTask3()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("Cron ...");
await ValueTask.CompletedTask;
}, "* * * * *");

// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {}, "* * * * *", CronStringFormat.Default);
}
}
注意事项

框架内置了一套简单的错误策略机制,也就是如果任务执行失败会默认重试 3 次,每次间隔 1秒,该策略配置暂不对外公开。

26.3.4 TaskQueued 静态类

框架还提供了 TaskQueued 静态类可在任何线程中操作,如:

// 同步入队
TaskQueued.Enqueue((provider) => {}, [delay]);
TaskQueued.Enqueue((provider) => {}, cronExpression, [format]);

// 异步入队
await TaskQueued.EnqueueAsync(async (provider, token) => {}, [delay]);
await TaskQueued.EnqueueAsynce(async (provider, token) => {}, cronExpression, [format]);

26.3.5 在处理程序中使用服务

如果在任务队列处理程序中使用了外部的服务,如:

public class YourService : IYourService
{
private readonly ITaskQueue _taskQueue;
private readonly ILogger<YourService> _logger

public YourService(ITaskQueue taskQueue
, ILogger<YourService> logger)
{
_taskQueue = taskQueue;
_logger = logger;
}

public void SyncTask()
{
_taskQueue.Enqueue(provider =>
{
_logger.LogInformation("我使用了外部的 logger");
});
}
}

那么需要注意的是,**如果使用的外部服务是 **单例服务,那么无需任何处理,但如果使用的服务属于 瞬时范围 作用域,那么需要创建作用域,如:

_taskQueue.Enqueue(provider =>
{
// Repository 注册为范围,需创建作用域
using var scoped = provider.CreateScope();
var repository = scoped.ServiceProvider.GetService<IRepository<User>>();

// Logger 注册为单例,可以直接使用
_logger.LogInformation("我使用了外部的 logger");
});

26.3.6 订阅执行任务意外异常

任务处理程序使用的是 Task 对象进行创建并执行,但可能存在一些意外且难以捕获的异常,这时候可以通过以下方式订阅:

services.AddTaskQueue(builder =>
{
// 订阅 TaskQueue 意外未捕获异常
builder.UnobservedTaskExceptionHandler = (obj, args) =>
{
// ....
};
});

26.3.7 反馈与建议

与我们交流

给 Furion 提 Issue