// EasyAudioEncode/internal/core/transcode/task.go
// (repository viewer banner: 681 lines, 16 KiB, Go; timestamp 2025-12-31 11:29:58 +08:00)

package transcode
import (
"context"
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"time"
)
// Task describes one unit of work managed by a Workflow.
//
// ID, Type, EncodeUrl and Duration are exported; the unexported bookkeeping
// fields (status, timestamps, context) are written by the Workflow.
type Task struct {
ID string // unique task ID; AddTask rejects an empty value
Type string // task type; the Workflow runs at most one task of a given type at a time
EncodeUrl string // audio URL of the task
execute func(ctx context.Context, task *Task) error // function that performs the work; AddTask rejects nil
ctx context.Context // task context, created by AddTask
cancelFunc context.CancelFunc // cancels ctx
status TaskStatus // current task status
startTime time.Time // start time, set when the task begins running
endTime time.Time // end time, set when the task completes, fails or is cancelled
createTime time.Time // creation time, set by AddTask
quit chan error // NOTE(review): not referenced by the Workflow code visible here — confirm usage
success chan struct{} // NOTE(review): not referenced by the Workflow code visible here — confirm usage
isEnd bool // NOTE(review): not referenced by the Workflow code visible here — confirm usage
Duration int64 // when > 0, executeTask uses a timeout of Duration+30 seconds instead of the 12-hour default
}
// TaskInfo is a copy of a task's status and timestamps, as returned by
// Workflow.GetTaskInfo.
type TaskInfo struct {
Status TaskStatus // task status
StartTime time.Time // start time
EndTime time.Time // end time
CreateTime time.Time // creation time
}
// TaskStatus is the lifecycle state of a Task.
type TaskStatus int
// Task states. TaskPending is the zero value and the state AddTask assigns.
const (
TaskPending TaskStatus = iota // waiting in the queue
TaskRunning // currently executing
TaskCompleted // finished without error
TaskCancelled // cancelled
TaskFailed // finished with an error (including panics and timeouts)
)
// Workflow schedules Tasks under a global concurrency limit, runs at most one
// task per task type at a time, and periodically prunes finished tasks.
type Workflow struct {
maxConcurrency int // maximum number of concurrently running tasks
semaphore chan struct{} // concurrency semaphore; its capacity is maxConcurrency
tasks map[string]*Task // all known tasks, keyed by task ID
taskQueue map[string][]string // pending task IDs grouped by task type, in insertion order
runningTasks map[string]string // task type -> ID of the running task of that type
mu sync.RWMutex // read/write lock guarding the maps, task state and callbacks
wg sync.WaitGroup // tracks the task goroutines started by executeTask
cleanupTicker *time.Ticker // ticker driving the periodic cleanup
cleanupStop chan struct{} // closed by Stop to end the cleanup goroutine
// Cleanup configuration.
cleanupInterval time.Duration // interval between cleanup passes
maxTaskHistory int // target cap on the number of stored tasks; only finished tasks are evicted
retentionTime time.Duration // how long a finished task is kept before retention cleanup removes it
// Callbacks. Each is optional (nil disables it) and is invoked while mu is held.
onComplete func(task *Task) // called when a task completes successfully
onCancel func(task *Task) // called when a task is cancelled
onError func(task *Task, err error) // called when a task fails
onCleanup func(task *Task, reason string) // called just before a task is removed by cleanup
}
// WorkflowConfig holds the tunables accepted by NewWorkflow. Any field that
// is zero or negative is replaced by its default.
type WorkflowConfig struct {
MaxConcurrency int // maximum concurrency; default 2
CleanupInterval time.Duration // cleanup interval; default 30 seconds
MaxTaskHistory int // maximum number of retained tasks; default 1000
RetentionTime time.Duration // task retention time; default 5 minutes
}
// NewWorkflow builds a Workflow from config and starts its background cleanup
// goroutine.
//
// Every non-positive field in config falls back to its default: 2 concurrent
// tasks, a 30-second cleanup interval, a history of 1000 tasks and a 5-minute
// retention time.
func NewWorkflow(config WorkflowConfig) *Workflow {
	concurrency := config.MaxConcurrency
	if concurrency <= 0 {
		concurrency = 2
	}
	interval := config.CleanupInterval
	if interval <= 0 {
		interval = 30 * time.Second
	}
	history := config.MaxTaskHistory
	if history <= 0 {
		history = 1000
	}
	retention := config.RetentionTime
	if retention <= 0 {
		retention = 5 * time.Minute
	}

	wf := new(Workflow)
	wf.maxConcurrency = concurrency
	wf.semaphore = make(chan struct{}, concurrency)
	wf.tasks = map[string]*Task{}
	wf.taskQueue = map[string][]string{}
	wf.runningTasks = map[string]string{}
	wf.cleanupInterval = interval
	wf.maxTaskHistory = history
	wf.retentionTime = retention
	wf.cleanupStop = make(chan struct{})

	// Start the cleanup goroutine last: it reads the fields assigned above.
	wf.startCleanupRoutine()
	return wf
}
// SetCallbacks installs the workflow's event hooks, replacing whatever was set
// before. Passing nil for a hook disables that notification. The hooks are
// swapped while holding the write lock.
func (wf *Workflow) SetCallbacks(
	onComplete func(task *Task),
	onCancel func(task *Task),
	onError func(task *Task, err error),
	onCleanup func(task *Task, reason string),
) {
	wf.mu.Lock()
	defer wf.mu.Unlock()

	wf.onComplete, wf.onCancel = onComplete, onCancel
	wf.onError, wf.onCleanup = onError, onCleanup
}
// AddTask registers task with the workflow and queues it behind the other
// pending tasks of the same type.
//
// It returns an error when task is nil, has an empty ID, has no execute
// function, or when a task with the same ID is already registered. On success
// the task is marked pending, receives a fresh cancellable context and a
// creation time, and an asynchronous attempt to start it is scheduled.
func (wf *Workflow) AddTask(task *Task) error {
	switch {
	case task == nil:
		return errors.New("task cannot be nil")
	case task.ID == "":
		return errors.New("task ID cannot be empty")
	case task.execute == nil:
		return errors.New("task execute function cannot be nil")
	}

	wf.mu.Lock()
	defer wf.mu.Unlock()

	if _, duplicate := wf.tasks[task.ID]; duplicate {
		return fmt.Errorf("task with ID %s already exists", task.ID)
	}

	// Initialise the workflow-owned bookkeeping.
	task.status = TaskPending
	task.ctx, task.cancelFunc = context.WithCancel(context.Background())
	task.createTime = time.Now()

	wf.tasks[task.ID] = task
	// append creates the per-type queue on first use.
	wf.taskQueue[task.Type] = append(wf.taskQueue[task.Type], task.ID)

	// tryExecuteTask takes wf.mu itself, so it must run on another goroutine
	// rather than inline while the lock is still held here.
	go wf.tryExecuteTask(task.Type)
	return nil
}
// tryExecuteTask starts the task at the head of taskType's queue if possible.
//
// Nothing happens when a task of that type is already running, when the queue
// is empty, when the head task is unknown or no longer pending, or when every
// concurrency slot is taken. In the last case the task stays queued until a
// later attempt.
func (wf *Workflow) tryExecuteTask(taskType string) {
	// Select the candidate under the lock; nil means there is nothing to start.
	candidate := func() *Task {
		wf.mu.Lock()
		defer wf.mu.Unlock()

		if _, busy := wf.runningTasks[taskType]; busy {
			return nil
		}
		pending := wf.taskQueue[taskType]
		if len(pending) == 0 {
			return nil
		}
		head, known := wf.tasks[pending[0]]
		if !known || head.status != TaskPending {
			return nil
		}
		return head
	}()
	if candidate == nil {
		return
	}

	// Non-blocking acquire of a concurrency slot.
	select {
	case wf.semaphore <- struct{}{}:
		wf.executeTask(candidate)
	default:
		// Concurrency limit reached; a later attempt will pick the task up.
	}
}
// executeTask 执行任务
func (wf *Workflow) executeTask(task *Task) {
wf.mu.Lock()
// 再次检查任务状态
if task.status != TaskPending {
wf.mu.Unlock()
<-wf.semaphore // 释放信号量
return
}
// 更新任务状态
task.status = TaskRunning
task.startTime = time.Now()
// 记录该类型任务正在运行
wf.runningTasks[task.Type] = task.ID
// 从队列中移除(第一个元素)
if queue, exists := wf.taskQueue[task.Type]; exists && len(queue) > 0 {
wf.taskQueue[task.Type] = queue[1:]
}
wf.mu.Unlock()
wf.wg.Add(1)
// 启动任务执行
go func() {
defer func() {
// 恢复panic
if r := recover(); r != nil {
err := fmt.Errorf("task panic: %v\n%s", r, debug.Stack())
wf.handleTaskError(task, err)
}
wf.taskCompleted(task.Type)
wf.wg.Done()
}()
// 设置超时
timeNum := 12 * 60 * time.Minute
if task.Duration > 0 {
2026-02-02 16:16:37 +08:00
timeNum = time.Duration(task.Duration+30) * time.Second
2025-12-31 11:29:58 +08:00
}
ctx, cancel := context.WithTimeout(task.ctx, timeNum)
defer cancel()
// 执行任务
err := task.execute(ctx, task)
// 处理任务结果
wf.mu.Lock()
defer wf.mu.Unlock()
if err != nil {
if errors.Is(err, context.Canceled) {
task.status = TaskCancelled
task.endTime = time.Now()
wf.triggerOnCancel(task)
} else {
task.status = TaskFailed
task.endTime = time.Now()
wf.triggerOnError(task, err)
}
} else {
task.status = TaskCompleted
task.endTime = time.Now()
wf.triggerOnComplete(task)
}
// 释放该类型的运行标记
delete(wf.runningTasks, task.Type)
}()
}
// taskCompleted is called once a task goroutine has finished. It releases the
// goroutine's concurrency slot and then schedules a start attempt for every
// task type that currently has a queue, beginning with taskType itself.
func (wf *Workflow) taskCompleted(taskType string) {
	// Free the concurrency slot held by the finished task.
	<-wf.semaphore

	// Give the next task of the same type a chance first.
	go wf.tryExecuteTask(taskType)

	// Snapshot the other queued types under the read lock, then kick them
	// outside of it.
	wf.mu.RLock()
	others := make([]string, 0, len(wf.taskQueue))
	for queuedType := range wf.taskQueue {
		if queuedType != taskType {
			others = append(others, queuedType)
		}
	}
	wf.mu.RUnlock()

	for _, queuedType := range others {
		go wf.tryExecuteTask(queuedType)
	}
}
// CancelTask cancels the task with the given ID and reports whether a
// cancellation was issued.
//
// A pending task is removed from its queue, marked TaskCancelled and reported
// through the onCancel callback (invoked while the lock is held). A running
// task only has its context cancelled; its final status is recorded when its
// execute function returns. Unknown IDs and tasks that already finished yield
// false.
func (wf *Workflow) CancelTask(taskID string) bool {
	wf.mu.Lock()
	defer wf.mu.Unlock()

	target, found := wf.tasks[taskID]
	if !found {
		return false
	}

	if target.status == TaskRunning {
		// Signal the running task; the execution goroutine settles the status.
		if target.cancelFunc != nil {
			target.cancelFunc()
		}
		return true
	}
	if target.status != TaskPending {
		// Already completed, cancelled or failed.
		return false
	}

	// Pending: drop the ID from its type queue.
	if waiting, ok := wf.taskQueue[target.Type]; ok {
		kept := make([]string, 0, len(waiting))
		for _, queuedID := range waiting {
			if queuedID == taskID {
				continue
			}
			kept = append(kept, queuedID)
		}
		wf.taskQueue[target.Type] = kept
	}
	target.status = TaskCancelled
	target.endTime = time.Now()
	wf.triggerOnCancel(target)
	return true
}
// CleanupCompletedTasks removes every task that has completed, been cancelled
// or failed, and returns how many tasks were removed. Pending and running
// tasks are left untouched.
func (wf *Workflow) CleanupCompletedTasks() int {
	const reason = "manual"

	wf.mu.Lock()
	defer wf.mu.Unlock()

	return wf.cleanupTasksInternal(reason, false)
}
// CleanupOldTasks removes finished tasks that have outlived the workflow's
// retention time and returns how many tasks were removed. Pending and running
// tasks are left untouched.
func (wf *Workflow) CleanupOldTasks() int {
	const reason = "retention"

	wf.mu.Lock()
	defer wf.mu.Unlock()

	return wf.cleanupTasksInternal(reason, false)
}
// CleanupAllTasks removes tasks regardless of age and returns how many were
// removed. Use with care.
//
// With force == false only finished tasks are removed. With force == true
// pending and running tasks are dropped from the workflow's bookkeeping as
// well; note that this does not cancel their contexts.
func (wf *Workflow) CleanupAllTasks(force bool) int {
	const reason = "all"

	wf.mu.Lock()
	defer wf.mu.Unlock()

	return wf.cleanupTasksInternal(reason, force)
}
// cleanupTasksInternal removes tasks selected by reason and returns the number
// removed. The caller must hold wf.mu for writing.
//
// Pending and running tasks are skipped unless force is true. For the
// remaining tasks, reason decides:
//   - "retention": remove when the end time (or, if unset, the creation time)
//     is older than wf.retentionTime;
//   - "all": remove unconditionally;
//   - "manual" or anything else: remove when the task has completed, been
//     cancelled or failed.
//
// onCleanup, if set, is invoked for each task right before it is removed.
// Afterwards the per-type queues and the running-task markers are purged of
// IDs that no longer exist, and empty queues are deleted.
func (wf *Workflow) cleanupTasksInternal(reason string, force bool) int {
	removed := 0
	now := time.Now()

	for id, task := range wf.tasks {
		active := task.status == TaskRunning || task.status == TaskPending
		if active && !force {
			continue
		}

		var discard bool
		switch reason {
		case "retention":
			// Age is measured from the end time, falling back to the creation
			// time for tasks that never finished.
			reference := task.endTime
			if reference.IsZero() {
				reference = task.createTime
			}
			discard = now.Sub(reference) > wf.retentionTime
		case "all":
			discard = true
		default:
			// "manual" and unrecognised reasons: finished tasks only.
			discard = task.status == TaskCompleted ||
				task.status == TaskCancelled ||
				task.status == TaskFailed
		}
		if !discard {
			continue
		}

		if wf.onCleanup != nil {
			wf.onCleanup(task, reason)
		}
		delete(wf.tasks, id)
		removed++
	}

	// Drop queue entries whose task is gone, and queues that end up empty.
	for taskType, queue := range wf.taskQueue {
		live := make([]string, 0, len(queue))
		for _, id := range queue {
			if _, ok := wf.tasks[id]; ok {
				live = append(live, id)
			}
		}
		if len(live) == 0 {
			delete(wf.taskQueue, taskType)
			continue
		}
		wf.taskQueue[taskType] = live
	}

	// Drop running markers that point at removed tasks.
	for taskType, id := range wf.runningTasks {
		if _, ok := wf.tasks[id]; !ok {
			delete(wf.runningTasks, taskType)
		}
	}
	return removed
}
// startCleanupRoutine creates the cleanup ticker and launches the goroutine
// that, on every tick, removes tasks past their retention time and then trims
// the task history if it exceeds wf.maxTaskHistory. The goroutine stops the
// ticker and exits once wf.cleanupStop is closed.
func (wf *Workflow) startCleanupRoutine() {
	wf.cleanupTicker = time.NewTicker(wf.cleanupInterval)

	go func() {
		for {
			select {
			case <-wf.cleanupStop:
				wf.cleanupTicker.Stop()
				return
			case <-wf.cleanupTicker.C:
			}

			// Pass 1: retention-based cleanup.
			wf.mu.Lock()
			expired := wf.cleanupTasksInternal("retention", false)
			wf.mu.Unlock()
			if expired > 0 {
				log.Printf("自动清理了 %d 个旧任务", expired)
			}

			// Pass 2: enforce the history cap by evicting the oldest tasks.
			wf.mu.Lock()
			if len(wf.tasks) > wf.maxTaskHistory {
				wf.cleanupExcessTasks()
			}
			wf.mu.Unlock()
		}
	}()
}
// cleanupExcessTasks evicts the oldest finished tasks until the number of
// stored tasks is back at wf.maxTaskHistory, or until no finished task is
// left to evict. Pending and running tasks are never evicted. The caller must
// hold wf.mu for writing.
//
// onCleanup, if set, is invoked with reason "max_history" for each evicted
// task.
func (wf *Workflow) cleanupExcessTasks() {
	overflow := len(wf.tasks) - wf.maxTaskHistory
	if overflow <= 0 {
		return
	}

	// Gather all tasks and order them oldest-first by creation time.
	byAge := make([]*Task, 0, len(wf.tasks))
	for _, t := range wf.tasks {
		byAge = append(byAge, t)
	}
	for lo := range byAge {
		for hi := lo + 1; hi < len(byAge); hi++ {
			if byAge[hi].createTime.Before(byAge[lo].createTime) {
				byAge[lo], byAge[hi] = byAge[hi], byAge[lo]
			}
		}
	}

	// Walk from the oldest task, evicting finished ones only.
	removed := 0
	for _, t := range byAge {
		if removed >= overflow {
			break
		}
		switch t.status {
		case TaskCompleted, TaskCancelled, TaskFailed:
			if wf.onCleanup != nil {
				wf.onCleanup(t, "max_history")
			}
			delete(wf.tasks, t.ID)
			removed++
		}
	}

	if removed > 0 {
		log.Printf("清理了 %d 个任务以保持历史记录不超过 %d", removed, wf.maxTaskHistory)
	}
}
// GetTaskStatus returns the status of the task with the given ID. The boolean
// is false, and the status is TaskPending, when no such task is registered.
func (wf *Workflow) GetTaskStatus(taskID string) (TaskStatus, bool) {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	if task, ok := wf.tasks[taskID]; ok {
		return task.status, true
	}
	return TaskPending, false
}
// GetTaskInfo returns a copy of the status and timestamps of the task with the
// given ID. When no such task is registered it returns a zero-valued TaskInfo
// and false; the returned pointer is never nil.
func (wf *Workflow) GetTaskInfo(taskID string) (*TaskInfo, bool) {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	info := new(TaskInfo)
	task, ok := wf.tasks[taskID]
	if ok {
		info.Status = task.status
		info.StartTime = task.startTime
		info.EndTime = task.endTime
		info.CreateTime = task.createTime
	}
	return info, ok
}
// GetRunningTasks returns the tasks whose status is TaskRunning, in no
// particular order. The result is an empty, non-nil slice when nothing is
// running. The returned pointers refer to the workflow's own Task values.
func (wf *Workflow) GetRunningTasks() []*Task {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	running := []*Task{}
	for _, t := range wf.tasks {
		if t.status != TaskRunning {
			continue
		}
		running = append(running, t)
	}
	return running
}
// GetPendingTasks returns the tasks whose status is TaskPending, in no
// particular order. The result is an empty, non-nil slice when nothing is
// waiting. The returned pointers refer to the workflow's own Task values.
func (wf *Workflow) GetPendingTasks() []*Task {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	waiting := []*Task{}
	for _, t := range wf.tasks {
		if t.status != TaskPending {
			continue
		}
		waiting = append(waiting, t)
	}
	return waiting
}
// GetIsTasks reports whether the workflow holds at least one task whose Type
// equals bid and which is no longer pending or running, i.e. it has completed,
// been cancelled or failed.
func (wf *Workflow) GetIsTasks(bid string) bool {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	for _, t := range wf.tasks {
		if t.Type != bid {
			continue
		}
		if t.status == TaskPending || t.status == TaskRunning {
			continue
		}
		return true
	}
	return false
}
// GetTaskCount returns the total number of stored tasks followed by the number
// of tasks in each state: running, pending, completed, cancelled and failed.
func (wf *Workflow) GetTaskCount() (total, running, pending, completed, cancelled, failed int) {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	// One tally slot per known status; statuses outside the known range are
	// counted in the total only.
	var tally [TaskFailed + 1]int
	for _, t := range wf.tasks {
		if t.status >= TaskPending && t.status <= TaskFailed {
			tally[t.status]++
		}
	}

	return len(wf.tasks),
		tally[TaskRunning],
		tally[TaskPending],
		tally[TaskCompleted],
		tally[TaskCancelled],
		tally[TaskFailed]
}
// Wait blocks until every task goroutine that has been started has finished.
// Tasks that are still queued and have not been started are not waited for.
func (wf *Workflow) Wait() {
wf.wg.Wait()
}
// Stop shuts the workflow down: it signals the cleanup goroutine to exit and
// cancels the context of every pending or running task.
//
// Stop is safe to call more than once; later calls only re-issue the (no-op)
// cancellations. It does not wait for running tasks to return; use Wait for
// that.
func (wf *Workflow) Stop() {
	wf.mu.Lock()
	defer wf.mu.Unlock()

	// Closing an already-closed channel panics. cleanupStop is only ever
	// closed, never sent on, so a successful receive means a previous Stop
	// already closed it. The check and the close are serialised by wf.mu.
	select {
	case <-wf.cleanupStop:
	default:
		close(wf.cleanupStop)
	}

	// Cancel everything that has not finished yet.
	for _, task := range wf.tasks {
		if task.cancelFunc == nil {
			continue
		}
		if task.status == TaskPending || task.status == TaskRunning {
			task.cancelFunc()
		}
	}
}
// triggerOnComplete invokes the onComplete callback for task, if one is set.
func (wf *Workflow) triggerOnComplete(task *Task) {
	if notify := wf.onComplete; notify != nil {
		notify(task)
	}
}
// triggerOnCancel invokes the onCancel callback for task, if one is set.
func (wf *Workflow) triggerOnCancel(task *Task) {
	if notify := wf.onCancel; notify != nil {
		notify(task)
	}
}
// triggerOnError invokes the onError callback with task and err, if one is
// set.
func (wf *Workflow) triggerOnError(task *Task, err error) {
	if notify := wf.onError; notify != nil {
		notify(task, err)
	}
}
// handleTaskError records task as failed with err: it takes the write lock,
// sets the status to TaskFailed and the end time to now, invokes the onError
// callback (while the lock is held) and releases the task type's running
// marker.
func (wf *Workflow) handleTaskError(task *Task, err error) {
	wf.mu.Lock()
	defer wf.mu.Unlock()

	task.status = TaskFailed
	task.endTime = time.Now()
	wf.triggerOnError(task, err)

	// Release the running marker only if it still belongs to this task; a
	// cleanup may have dropped it and another task of the same type may have
	// claimed it since.
	if wf.runningTasks[task.Type] == task.ID {
		delete(wf.runningTasks, task.Type)
	}
}