package vqdtask import ( "context" "easyvqd/internal/core/vqd" "easyvqd/pkg/vqdcms" "fmt" "log/slog" "sync" "time" ) // TaskSuccessCallback 任务执行成功的全局回调 // 参数:taskID=任务ID, executeDuration=预期执行时长, elapsed=实际耗时 type TaskSuccessCallback func(taskID string, executeDuration time.Duration, elapsed time.Duration) // TaskFailCallback 任务执行失败的全局回调 // 参数:taskID=任务ID, err=失败原因, elapsed=实际耗时 type TaskFailCallback func(taskID string, err error, elapsed time.Duration) // TaskTimeoutCallback 任务执行超时的全局回调 // 参数:taskID=任务ID, timeout=超时时间, elapsed=实际耗时 type TaskTimeoutCallback func(taskID string, timeout time.Duration, elapsed time.Duration) // TaskStartCallback 任务开始执行的全局回调 // 参数:taskID=任务ID, executeDuration=预期执行时长, timeout=超时时间 type TaskStartCallback func(task Task, executeDuration time.Duration, timeout time.Duration) // Task 任务接口(新增动态配置方法) type Task interface { ID() string Execute(ctx context.Context) error // 读取配置(带锁) Timeout() time.Duration GetHandleInfo() vqdcms.VQDHandleInfo GetPara() vqdcms.VQDPara ExecuteDuration() time.Duration // 动态修改配置 SetTimeout(d time.Duration) SetExecuteDuration(d time.Duration) } // TaskFlow 任务流管理器(支持动态修改并发数) type TaskFlow struct { runningTasks map[string]Task // 正在运行的任务 waitingTasks []Task // 等待执行的任务 maxConcurrent int // 最大并发数(可动态修改) TaskTemplate *vqd.VqdTaskTemplate // 关联模板 mu sync.RWMutex // 并发安全锁 wg sync.WaitGroup // 等待所有goroutine结束 ctx context.Context // 全局上下文 cancel context.CancelFunc // 全局取消函数 removedTasks map[string]bool // 已标记删除的任务ID rmMutex sync.RWMutex // 删除标记的锁 closed bool // 标记是否已完全关闭 concurrentMu sync.RWMutex // 并发数修改专用锁 // ------------------- 回调相关字段 ------------------- callbackMu sync.RWMutex // 回调操作专用锁 startCallback TaskStartCallback // 开始执行回调 successCallback TaskSuccessCallback // 成功回调 failCallback TaskFailCallback // 失败回调 timeoutCallback TaskTimeoutCallback // 超时回调 } // NewTaskFlow 创建任务流实例 func NewTaskFlow(maxConcurrent int) *TaskFlow { ctx, cancel := context.WithCancel(context.Background()) return &TaskFlow{ runningTasks: make(map[string]Task), waitingTasks: make([]Task, 0), maxConcurrent: maxConcurrent, ctx: ctx, cancel: cancel, removedTasks: make(map[string]bool), closed: false, } } func (tf *TaskFlow) SetTemplate(Temp *vqd.VqdTaskTemplate) { // 加锁修改 tf.concurrentMu.Lock() tf.TaskTemplate = Temp tf.concurrentMu.Unlock() } // SetMaxConcurrent 动态修改最大并发数 // 返回值:true=修改成功,false=任务流已关闭 func (tf *TaskFlow) SetMaxConcurrent(newConcurrent int) bool { // 检查是否已关闭 tf.mu.RLock() closed := tf.closed tf.mu.RUnlock() if closed { slog.Error("任务流已关闭,拒绝修改并发数") return false } if newConcurrent <= 0 { slog.Error("并发数必须大于0,修改失败") return false } // 加锁修改 tf.concurrentMu.Lock() oldConcurrent := tf.maxConcurrent tf.maxConcurrent = newConcurrent tf.concurrentMu.Unlock() slog.Debug("最大并发数已修改为", "old", oldConcurrent, "news", newConcurrent) return true } // AddTask 添加任务到等待队列(锁安全) func (tf *TaskFlow) AddTask(task Task) { // 检查是否已关闭 tf.mu.RLock() closed := tf.closed tf.mu.RUnlock() if closed { slog.Error("任务流已关闭,拒绝添加任务", "id", task.ID()) return } // 检查是否被标记删除 tf.rmMutex.RLock() isRemoved := tf.removedTasks[task.ID()] tf.rmMutex.RUnlock() if isRemoved { slog.Error("任务已被标记删除,跳过添加", "id", task.ID()) return } // 加写锁,确保并发安全 tf.mu.Lock() defer tf.mu.Unlock() // 二次检查关闭状态 if tf.closed { slog.Error("任务流已关闭,拒绝添加任务", "id", task.ID()) return } // 避免重复添加 for _, t := range tf.waitingTasks { if t.ID() == task.ID() { return } } if _, exists := tf.runningTasks[task.ID()]; exists { return } tf.waitingTasks = append(tf.waitingTasks, task) slog.Debug(fmt.Sprintf("任务 %s 已加入等待队列(执行时长: %v, 超时时间: %v)\n", task.ID(), task.ExecuteDuration(), task.Timeout())) } // RemoveTask 删除指定ID的任务(锁安全) func (tf *TaskFlow) RemoveTask(taskID string, immediate bool) bool { // 检查是否已关闭 tf.mu.RLock() closed := tf.closed tf.mu.RUnlock() if closed { slog.Error("任务流已关闭,拒绝删除任务", "id", taskID) return false } // 标记任务为删除 tf.rmMutex.Lock() tf.removedTasks[taskID] = true tf.rmMutex.Unlock() // 加写锁处理任务队列 tf.mu.Lock() defer tf.mu.Unlock() removed := false // 1. 移除等待队列中的任务 newWaiting := make([]Task, 0, len(tf.waitingTasks)) for _, task := range tf.waitingTasks { if task.ID() == taskID { removed = true slog.Debug("从等待队列中删除任务", "id", task.ID()) continue } newWaiting = append(newWaiting, task) } tf.waitingTasks = newWaiting // 2. 处理运行中的任务 if _, exists := tf.runningTasks[taskID]; exists { removed = true if immediate { delete(tf.runningTasks, taskID) slog.Debug("标记任务为删除,立即终止当前执行", "id", taskID) } else { slog.Debug("标记任务为删除,执行完当前周期后移除", "id", taskID) } } // 清理无效标记 if !removed { slog.Debug("任务不存在,删除失败", "id", taskID) tf.rmMutex.Lock() delete(tf.removedTasks, taskID) tf.rmMutex.Unlock() } return removed } // CleanupRemovedTasks 清理已删除任务的残留标记和资源 func (tf *TaskFlow) CleanupRemovedTasks() int { // 检查是否已关闭 tf.mu.RLock() closed := tf.closed tf.mu.RUnlock() if closed { slog.Error("任务流已关闭,拒绝执行清理操作") return 0 } // 步骤1:收集有效任务ID(读锁) tf.mu.RLock() validTaskIDs := make(map[string]bool) for taskID := range tf.runningTasks { validTaskIDs[taskID] = true } for _, task := range tf.waitingTasks { validTaskIDs[task.ID()] = true } tf.mu.RUnlock() // 步骤2:清理无效删除标记 tf.rmMutex.Lock() cleanedCount := 0 for taskID := range tf.removedTasks { if !validTaskIDs[taskID] { delete(tf.removedTasks, taskID) cleanedCount++ slog.Debug("清理无效删除标记", "id", taskID) } } tf.rmMutex.Unlock() // 步骤3:清理等待队列残留 tf.mu.Lock() newWaiting := make([]Task, 0, len(tf.waitingTasks)) for _, task := range tf.waitingTasks { tf.rmMutex.RLock() isRemoved := tf.removedTasks[task.ID()] tf.rmMutex.RUnlock() if !isRemoved { newWaiting = append(newWaiting, task) } else { slog.Debug("清理等待队列中已删除的任务", "id", task.ID()) cleanedCount++ } } tf.waitingTasks = newWaiting tf.mu.Unlock() slog.Debug("清理完成,共清理无效标记/残留任务", "count", cleanedCount) return cleanedCount } // Start 启动任务流 func (tf *TaskFlow) Start() { tf.mu.RLock() closed := tf.closed tf.mu.RUnlock() if closed { slog.Error("任务流已关闭,无法启动") return } go tf.scheduleLoop() slog.Info("任务流已启动") } // Close 完全释放整个任务流 func (tf *TaskFlow) Close() { tf.mu.Lock() defer tf.mu.Unlock() if tf.closed { slog.Error("任务流已处于关闭状态,无需重复关闭") return } // 标记为已关闭 tf.closed = true slog.Info("开始完全释放任务流资源...") // 取消全局上下文 tf.cancel() // 等待所有goroutine退出(临时解锁避免死锁) tf.mu.Unlock() tf.wg.Wait() tf.mu.Lock() // 清空所有队列 tf.waitingTasks = nil tf.runningTasks = nil // 清空删除标记 tf.rmMutex.Lock() tf.removedTasks = nil tf.rmMutex.Unlock() slog.Info("任务流已完全关闭,所有资源已释放") } // Reset 重置任务流 func (tf *TaskFlow) Reset(maxConcurrent int) { tf.Close() tf.mu.Lock() defer tf.mu.Unlock() // 重建上下文 ctx, cancel := context.WithCancel(context.Background()) tf.ctx = ctx tf.cancel = cancel // 重建队列 tf.waitingTasks = make([]Task, 0) tf.runningTasks = make(map[string]Task) tf.removedTasks = make(map[string]bool) // 重置状态 tf.closed = false tf.maxConcurrent = maxConcurrent slog.Debug("任务流已重置,最大并发数", "count", maxConcurrent) } // scheduleLoop 任务调度主循环 func (tf *TaskFlow) scheduleLoop() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-tf.ctx.Done(): slog.Info("任务调度循环已停止") return case <-ticker.C: // 检查是否已关闭 tf.mu.RLock() closed := tf.closed tf.mu.RUnlock() if closed { return } tf.scheduleTasks() } } } // scheduleTasks 调度任务执行(动态读取并发数) func (tf *TaskFlow) scheduleTasks() { // 读取当前运行数和最新并发数 tf.mu.RLock() currentRunning := len(tf.runningTasks) tf.mu.RUnlock() tf.concurrentMu.RLock() maxConcurrent := tf.maxConcurrent tf.concurrentMu.RUnlock() availableSlots := maxConcurrent - currentRunning // 检查是否已关闭 tf.mu.RLock() closed := tf.closed tf.mu.RUnlock() if closed || availableSlots <= 0 { return } // 取出待执行任务 tf.mu.Lock() tasksToRun := make([]Task, 0) if len(tf.waitingTasks) > 0 { take := availableSlots if take > len(tf.waitingTasks) { take = len(tf.waitingTasks) } tasksToRun = tf.waitingTasks[:take] tf.waitingTasks = tf.waitingTasks[take:] } tf.mu.Unlock() // 执行任务 for _, task := range tasksToRun { tf.runTask(task) } } // runTask 执行单个任务(使用最新的任务配置) func (tf *TaskFlow) runTask(task Task) { taskID := task.ID() // 加写锁标记为运行中 tf.mu.Lock() // 检查状态 if tf.closed { tf.mu.Unlock() slog.Error("任务流已关闭,取消执行任务", "id", taskID) return } tf.rmMutex.RLock() isRemoved := tf.removedTasks[taskID] tf.rmMutex.RUnlock() if isRemoved { tf.mu.Unlock() slog.Debug("任务已被删除,取消执行", "id", taskID) return } // 标记为运行中 tf.runningTasks[taskID] = task tf.mu.Unlock() // 执行任务协程 tf.wg.Add(1) go func(t Task) { defer tf.wg.Done() // 获取任务最新的超时配置(执行时读取,保证使用最新值) timeout := t.Timeout() executeDuration := t.ExecuteDuration() // 创建超时上下文(使用最新超时时间) taskCtx, taskCancel := context.WithTimeout(tf.ctx, timeout) defer taskCancel() tf.callbackMu.RLock() defer tf.callbackMu.RUnlock() if tf.startCallback != nil { tf.startCallback(t, executeDuration, timeout) } slog.Debug(fmt.Sprintf("开始执行任务: %s (执行时长: %v, 超时时间: %v)", t.ID(), executeDuration, timeout)) startTime := time.Now() // 执行任务 err := t.Execute(taskCtx) elapsed := time.Since(startTime) // 清理运行标记 tf.mu.Lock() delete(tf.runningTasks, t.ID()) tf.mu.Unlock() // 检查状态 tf.rmMutex.RLock() isRemoved = tf.removedTasks[t.ID()] tf.rmMutex.RUnlock() tf.mu.RLock() closed := tf.closed tf.mu.RUnlock() // 处理执行结果 if err != nil { tf.callbackMu.RLock() defer tf.callbackMu.RUnlock() // 区分超时和普通失败 if ctxErr := taskCtx.Err(); ctxErr == context.DeadlineExceeded { // 超时触发超时回调 if tf.timeoutCallback != nil { tf.timeoutCallback(t.ID(), timeout, elapsed) } } else { // 普通失败触发失败回调 if tf.failCallback != nil { tf.failCallback(t.ID(), err, elapsed) } } slog.Debug(fmt.Sprintf("任务 %s 执行失败: %v (耗时: %v)", t.ID(), err, elapsed)) if !closed && !isRemoved { tf.AddTask(t) } else { slog.Debug("任务已被删除/任务流关闭,失败后不重试", "id", t.ID()) } } else { tf.callbackMu.RLock() defer tf.callbackMu.RUnlock() if tf.successCallback != nil { tf.successCallback(t.ID(), executeDuration, elapsed) } slog.Debug(fmt.Sprintf("任务 %s 执行成功 (预期执行时长: %v, 实际耗时: %v)", t.ID(), executeDuration, elapsed)) if !closed && !isRemoved { tf.AddTask(t) } else { slog.Debug("任务已被删除/任务流关闭,成功后不再循环", "id", t.ID()) tf.rmMutex.Lock() delete(tf.removedTasks, t.ID()) tf.rmMutex.Unlock() } } }(task) } // BatchSetTimeout 批量给【所有任务】设置 超时时间 func (tf *TaskFlow) BatchSetTimeout(timeout time.Duration) { tf.mu.RLock() defer tf.mu.RUnlock() if tf.closed { slog.Error("任务流已关闭,批量设置超时失败") return } // 设置等待队列所有任务 for _, task := range tf.waitingTasks { task.SetTimeout(timeout) } // 设置运行中任务 for _, task := range tf.runningTasks { task.SetTimeout(timeout) } slog.Debug("所有任务超时时间已批量设置为", "time", timeout) } // BatchSetExecuteDuration 批量给【所有任务】设置 执行时长 func (tf *TaskFlow) BatchSetExecuteDuration(duration time.Duration) { tf.mu.RLock() defer tf.mu.RUnlock() if tf.closed { slog.Error("任务流已关闭,批量设置执行时长失败") return } // 设置等待队列 for _, task := range tf.waitingTasks { task.SetExecuteDuration(duration) } // 设置运行中任务 for _, task := range tf.runningTasks { task.SetExecuteDuration(duration) } slog.Debug("所有任务执行时长已批量设置为", "duration", duration) } // BatchSetByIDs 按任务ID列表批量设置 超时+执行时长 func (tf *TaskFlow) BatchSetByIDs(ids []string, timeout, duration time.Duration) { tf.mu.RLock() defer tf.mu.RUnlock() if tf.closed { slog.Error("任务流已关闭,按ID批量设置失败") return } idSet := make(map[string]bool) for _, id := range ids { idSet[id] = true } // 处理等待队列 for _, task := range tf.waitingTasks { if idSet[task.ID()] { task.SetTimeout(timeout) task.SetExecuteDuration(duration) } } // 处理运行中队列 for _, task := range tf.runningTasks { if idSet[task.ID()] { task.SetTimeout(timeout) task.SetExecuteDuration(duration) } } tf.UnregisterAllCallbacks() slog.Debug("所有任务执行按ID批量设置完成", "超时", timeout, "执行时长", duration) } // 判断是否在执行 func (tf *TaskFlow) HasRunning(taskId string) bool { tf.mu.RLock() defer tf.mu.RUnlock() if tf.closed { slog.Error("任务流已关闭") return false } flag := false // 处理运行中队列 for _, task := range tf.runningTasks { if task.ID() == taskId { flag = true break } } return flag } // RegisterSuccessCallback 注册任务成功的全局回调 func (tf *TaskFlow) RegisterSuccessCallback(callback TaskSuccessCallback) { tf.callbackMu.Lock() defer tf.callbackMu.Unlock() tf.successCallback = callback slog.Debug("任务成功全局回调已注册") } // RegisterFailCallback 注册任务失败的全局回调 func (tf *TaskFlow) RegisterFailCallback(callback TaskFailCallback) { tf.callbackMu.Lock() defer tf.callbackMu.Unlock() tf.failCallback = callback slog.Debug("任务失败全局回调已注册") } // RegisterTimeoutCallback 注册任务超时的全局回调 func (tf *TaskFlow) RegisterTimeoutCallback(callback TaskTimeoutCallback) { tf.callbackMu.Lock() defer tf.callbackMu.Unlock() tf.timeoutCallback = callback slog.Debug("任务超时全局回调已注册") } // RegisterStartCallback 注册任务开始执行的全局回调 func (tf *TaskFlow) RegisterStartCallback(callback TaskStartCallback) { tf.callbackMu.Lock() defer tf.callbackMu.Unlock() tf.startCallback = callback slog.Debug("任务开始执行全局回调已注册") } // UnregisterAllCallbacks 注销所有全局回调 func (tf *TaskFlow) UnregisterAllCallbacks() { tf.callbackMu.Lock() defer tf.callbackMu.Unlock() tf.successCallback = nil tf.failCallback = nil tf.timeoutCallback = nil tf.startCallback = nil slog.Info("所有全局回调已注销") }