diff --git a/configs/config.toml b/configs/config.toml index 284387c..1efee15 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -2,7 +2,7 @@ # 对外提供的服务,建议由 nginx 代理 [Server.HTTP] # http 端口 - Port = 8089 + Port = 8989 # 请求超时时间 Timeout = '1m0s' # jwt 秘钥,空串时,每次启动程序将随机赋值 @@ -37,7 +37,15 @@ # 连续分析帧数(2-64), 默认为10, 最大为 64 FrmNum = 10 # 是否使用深度学习版本, 默认使用深度学习版本 - IsDeepLearn = true + IsDeepLearn = false + # 是否开启轮询 + PollingEnable = false + # 轮询并发数 默认10 最小1 + PollingNum = 2 + # 轮询任务的执行时长 默认60s 最小30s + PollingTime = 120 + # 轮询任务参数模板 + PollingTemplate = 1 [VqdLgtDark] # 默认 0.4, 取值范围: 0~1, 建议范围: 0.2~0.6 diff --git a/domain/version/versionapi/api.go b/domain/version/versionapi/api.go index 0d559e6..86da93a 100644 --- a/domain/version/versionapi/api.go +++ b/domain/version/versionapi/api.go @@ -12,8 +12,8 @@ import ( // 通过修改版本号,来控制是否执行表迁移 var ( - DBVersion = "0.0.21" - DBRemark = "添加告警查询字段" + DBVersion = "0.0.22" + DBRemark = "添加轮询表" ) // NewVersionCore ... diff --git a/internal/conf/config.go b/internal/conf/config.go index b631202..b72c91e 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -104,9 +104,13 @@ func (d *Duration) Duration() time.Duration { // 基础配置 type VqdConfig struct { - SaveDay int32 `json:"save_day" comment:"数据保存天数"` - FrmNum int32 `json:"frm_num" comment:"连续分析帧数(2-64), 默认为10, 最大为 64"` - IsDeepLearn bool `json:"is_deep_learn" comment:"是否使用深度学习版本, 默认使用深度学习版本"` + SaveDay int32 `json:"save_day" comment:"数据保存天数"` + FrmNum int32 `json:"frm_num" comment:"连续分析帧数(2-64), 默认为10, 最大为 64"` + IsDeepLearn bool `json:"is_deep_learn" comment:"是否使用深度学习版本, 默认使用深度学习版本"` + PollingEnable bool `json:"polling_enable" comment:"是否开启轮询"` + PollingNum int32 `json:"polling_num" comment:"轮询并发数 默认10 最小1"` + PollingTime int32 `json:"polling_time" comment:"轮询任务的执行时长 默认60s 最小30s"` + PollingTemplate int32 `json:"polling_template" comment:"轮询任务参数模板"` } // 亮度检测 diff --git a/internal/core/vqd/core.go b/internal/core/vqd/core.go index a9e00d5..085a644 100644 --- a/internal/core/vqd/core.go +++ b/internal/core/vqd/core.go @@ -4,6 +4,7 @@ package vqd // Storer data type Storer interface { VqdTask() VqdTaskStorer + VqdPolling() VqdPollingStorer VqdAlarm() VqdAlarmStorer VqdTaskTemplate() VqdTaskTemplateStorer VqdTimeTemplate() VqdTimeTemplateStorer diff --git a/internal/core/vqd/model.go b/internal/core/vqd/model.go index 120e080..94d49d6 100644 --- a/internal/core/vqd/model.go +++ b/internal/core/vqd/model.go @@ -304,3 +304,15 @@ type VqdAlarm struct { func (*VqdAlarm) TableName() string { return "vqd_alarm" } + +type VqdPolling struct { + orm.Model + Name string `gorm:"column:name;notNull;default:'';comment:名称" json:"name"` // 名称 + ChannelID string `gorm:"column:channel_id;notNull;default:'';comment:关联通道" json:"channel_id"` // 关联通道 + ChannelName string `gorm:"column:channel_name;notNull;default:'';comment:关联通道名称" json:"channel_name"` // 关联通道名称 + TaskTemplateID int64 `gorm:"column:task_template_id;notNull;default:0;comment:关联模板" json:"task_template_id"` // 关联模板 +} + +func (*VqdPolling) TableName() string { + return "vqd_polling" +} diff --git a/internal/core/vqd/store/audioencodedb/db.go b/internal/core/vqd/store/audioencodedb/db.go index 03a32cd..9197048 100644 --- a/internal/core/vqd/store/audioencodedb/db.go +++ b/internal/core/vqd/store/audioencodedb/db.go @@ -21,6 +21,9 @@ func NewDB(db *gorm.DB) DB { func (d DB) VqdTask() vqd.VqdTaskStorer { return VqdTask(d) } +func (d DB) VqdPolling() vqd.VqdPollingStorer { + return VqdPolling(d) +} func (d DB) VqdTaskTemplate() vqd.VqdTaskTemplateStorer { return VqdTaskTemplate(d) } @@ -38,6 +41,7 @@ func (d DB) AutoMigrate(ok bool) DB { } if err := d.db.AutoMigrate( new(vqd.VqdTask), + new(vqd.VqdPolling), new(vqd.VqdTaskTemplate), new(vqd.VqdTimeTemplate), new(vqd.VqdAlarm), diff --git a/internal/core/vqd/store/audioencodedb/vqdpolling.go b/internal/core/vqd/store/audioencodedb/vqdpolling.go new file mode 100644 index 0000000..d949ad5 --- /dev/null +++ b/internal/core/vqd/store/audioencodedb/vqdpolling.go @@ -0,0 +1,49 @@ +// Code generated by gowebx, DO AVOID EDIT. +package audioencodedb + +import ( + "context" + "easyvqd/internal/core/vqd" + "git.lnton.com/lnton/pkg/orm" +) + +var _ vqd.VqdPollingStorer = VqdPolling{} + +// VqdPolling Related business namespaces +type VqdPolling DB + +// FindAll implements vqd.VqdPollingStorer. +func (d VqdPolling) FindAll(bs *[]*vqd.VqdPolling) (int64, error) { + db := d.db.Model(&vqd.VqdPolling{}) + var total int64 + if err := db.Count(&total).Error; err != nil || total <= 0 { + // 如果统计失败或者数量为0,则返回错误 + return 0, err + } + return total, db.Find(bs).Error +} + +// Find implements vqd.VqdPollingStorer. +func (d VqdPolling) Find(ctx context.Context, bs *[]*vqd.VqdPolling, page orm.Pager, opts ...orm.QueryOption) (int64, error) { + return orm.FindWithContext(ctx, d.db, bs, page, opts...) +} + +// Get implements vqd.VqdPollingStorer. +func (d VqdPolling) Get(ctx context.Context, model *vqd.VqdPolling, opts ...orm.QueryOption) error { + return orm.FirstWithContext(ctx, d.db, model, opts...) +} + +// Add implements vqd.VqdPollingStorer. +func (d VqdPolling) Add(ctx context.Context, model *vqd.VqdPolling) error { + return d.db.WithContext(ctx).Create(model).Error +} + +// Edit implements vqd.VqdPollingStorer. +func (d VqdPolling) Edit(ctx context.Context, model *vqd.VqdPolling, changeFn func(*vqd.VqdPolling), opts ...orm.QueryOption) error { + return orm.UpdateWithContext(ctx, d.db, model, changeFn, opts...) +} + +// Del implements vqd.VqdPollingStorer. +func (d VqdPolling) Del(ctx context.Context, model *vqd.VqdPolling, opts ...orm.QueryOption) error { + return orm.DeleteWithContext(ctx, d.db, model, opts...) +} diff --git a/internal/core/vqd/vqdpolling.go b/internal/core/vqd/vqdpolling.go new file mode 100644 index 0000000..e9904eb --- /dev/null +++ b/internal/core/vqd/vqdpolling.go @@ -0,0 +1,118 @@ +// Code generated by gowebx, DO AVOID EDIT. +package vqd + +import ( + "context" + "git.lnton.com/lnton/pkg/orm" + "git.lnton.com/lnton/pkg/reason" + "github.com/jinzhu/copier" + "log/slog" +) + +// VqdPollingStorer Instantiation interface +type VqdPollingStorer interface { + Find(context.Context, *[]*VqdPolling, orm.Pager, ...orm.QueryOption) (int64, error) + FindAll(dp *[]*VqdPolling) (int64, error) + Get(context.Context, *VqdPolling, ...orm.QueryOption) error + Add(context.Context, *VqdPolling) error + Edit(context.Context, *VqdPolling, func(*VqdPolling), ...orm.QueryOption) error + Del(context.Context, *VqdPolling, ...orm.QueryOption) error +} + +// FindVqdPollingAll Paginated search +func (c Core) FindVqdPollingAll() ([]*VqdPolling, int64, error) { + items := make([]*VqdPolling, 0) + total, err := c.store.VqdPolling().FindAll(&items) + if err != nil { + return nil, 0, reason.ErrDB.Withf(`Find err[%s]`, err.Error()) + } + return items, total, nil +} + +// FindVqdPolling Paginated search +func (c Core) FindVqdPolling(ctx context.Context, in *FindVqdPollingInput) ([]*VqdPolling, int64, error) { + items := make([]*VqdPolling, 0) + if in.Name != "" { + query := orm.NewQuery(8). + Where("name like ?", "%"+in.Name+"%").OrderBy("created_at DESC") + total, err := c.store.VqdPolling().Find(ctx, &items, in, query.Encode()...) + if err != nil { + return nil, 0, reason.ErrDB.Withf(`Find err[%s]`, err.Error()) + } + return items, total, nil + } else { + query := orm.NewQuery(2).OrderBy("created_at DESC") + total, err := c.store.VqdPolling().Find(ctx, &items, in, query.Encode()...) + if err != nil { + return nil, 0, reason.ErrDB.Withf(`Find err[%s]`, err.Error()) + } + return items, total, nil + } +} + +// GetVqdPolling Query a single object +func (c Core) GetVqdPolling(ctx context.Context, id int) (*VqdPolling, error) { + var out VqdPolling + if err := c.store.VqdPolling().Get(ctx, &out, orm.Where("id=?", id)); err != nil { + if orm.IsErrRecordNotFound(err) { + return nil, reason.ErrNotFound.Withf(`Get err[%s]`, err.Error()) + } + return nil, reason.ErrDB.Withf(`Get err[%s]`, err.Error()) + } + return &out, nil +} + +// GetPollingChannelID Query a single object +func (c Core) GetPollingChannelID(ctx context.Context, chnId string) (*VqdPolling, error) { + var out VqdPolling + if err := c.store.VqdPolling().Get(ctx, &out, orm.Where("channel_id=?", chnId)); err != nil { + if orm.IsErrRecordNotFound(err) { + return nil, reason.ErrNotFound.Withf(`Get err[%s]`, err.Error()) + } + return nil, reason.ErrDB.Withf(`Get err[%s]`, err.Error()) + } + return &out, nil +} + +// AddVqdPolling Insert into database +func (c Core) AddVqdPolling(ctx context.Context, in *AddVqdPollingInput) (*VqdPolling, error) { + var out VqdPolling + if err := copier.Copy(&out, in); err != nil { + slog.Error("Copy", "err", err) + } + if err := c.store.VqdPolling().Add(ctx, &out); err != nil { + return nil, reason.ErrDB.Withf(`Add err[%s]`, err.Error()) + } + return &out, nil +} + +// EditVqdPolling Update object information +func (c Core) EditVqdPolling(ctx context.Context, in *EditVqdPollingInput, id int) (*VqdPolling, error) { + var out VqdPolling + if err := c.store.VqdPolling().Edit(ctx, &out, func(b *VqdPolling) { + if err := copier.Copy(b, in); err != nil { + slog.Error("Copy", "err", err) + } + }, orm.Where("id=?", id)); err != nil { + return nil, reason.ErrDB.Withf(`Edit err[%s]`, err.Error()) + } + return &out, nil +} + +// DelVqdPolling Delete object +func (c Core) DelVqdPolling(ctx context.Context, id int) (*VqdPolling, error) { + var out VqdPolling + if err := c.store.VqdPolling().Del(ctx, &out, orm.Where("id = ?", id)); err != nil { + return nil, reason.ErrDB.Withf(`Del err[%s]`, err.Error()) + } + return &out, nil +} + +// DelVqdPollingAll Delete object +func (c Core) DelVqdPollingAll(ctx context.Context, ids []string) (*VqdPolling, error) { + var out VqdPolling + if err := c.store.VqdPolling().Del(ctx, &out, orm.Where("channel_id in (?)", ids)); err != nil { + return nil, reason.ErrDB.Withf(`Del ids err[%s]`, err.Error()) + } + return &out, nil +} diff --git a/internal/core/vqd/vqdpolling.param.go b/internal/core/vqd/vqdpolling.param.go new file mode 100644 index 0000000..518b42c --- /dev/null +++ b/internal/core/vqd/vqdpolling.param.go @@ -0,0 +1,43 @@ +// Code generated by gowebx, DO AVOID EDIT. +package vqd + +import ( + "git.lnton.com/lnton/pkg/web" +) + +type FindVqdPollingInput struct { + web.PagerFilter + Name string `form:"name"` // 名称 +} + +type EditVqdPollingInput struct { + Name string `json:"name"` // 名称 + ChannelID string `json:"channel_id"` // 关联通道 + ChannelName string `json:"channel_name"` // 通道名称 + TaskTemplateID int `json:"task_template_id"` // 关联模板 + +} +type AddVqdPollingInput struct { + Name string `json:"name"` // 名称 + ChannelID string `json:"channel_id"` // 关联通道 + ChannelName string `json:"channel_name"` // 通道名称 + TaskTemplateID int `json:"task_template_id"` // 关联模板 +} + +type AddVqdPollingAllInput struct { + Items []AddVqdPollingInput `json:"items"` +} +type DelVqdPollingInput struct { + IDs []string `json:"ids"` +} +type GetPollingConfOutput struct { + PollingNum int32 `json:"polling_num"` + PollingTime int32 `json:"polling_time"` + PollingTemplate int32 `json:"polling_template"` +} + +type EditPollingConfInput struct { + PollingNum int32 `json:"polling_num"` + PollingTime int32 `json:"polling_time"` + PollingTemplate int32 `json:"polling_template"` +} diff --git a/internal/core/vqdtask/core.go b/internal/core/vqdtask/core.go index 94b09de..6e7fc9f 100644 --- a/internal/core/vqdtask/core.go +++ b/internal/core/vqdtask/core.go @@ -84,14 +84,30 @@ func NewCore(HostCore *host.Core, VqdTaskCore *vqd.Core, Cfg *conf.Bootstrap) *C } core.HostCore.CbIFrame = func(s string, data []byte, codes int) { if codes == VIDEO_CODEC_H264 { - v, ok := VqdTaskMap.LoadTaskMap(s) - if ok { - v.SendData(data, VIDEO_CODEC_H264) + { + v, ok := VqdTaskMap.LoadTaskMap(s) + if ok { + v.SendData(data, VIDEO_CODEC_H264) + } + } + { + v, ok := VqdPollingTaskMap.LoadTaskMap(fmt.Sprintf("%s_polling", s)) + if ok { + v.SendData(data, VIDEO_CODEC_H264) + } } } else { - v, ok := VqdTaskMap.LoadTaskMap(s) - if ok { - v.SendData(data, VIDEO_CODEC_H265) + { + v, ok := VqdTaskMap.LoadTaskMap(s) + if ok { + v.SendData(data, VIDEO_CODEC_H265) + } + } + { + v, ok := VqdPollingTaskMap.LoadTaskMap(fmt.Sprintf("%s_polling", s)) + if ok { + v.SendData(data, VIDEO_CODEC_H265) + } } } @@ -101,6 +117,8 @@ func NewCore(HostCore *host.Core, VqdTaskCore *vqd.Core, Cfg *conf.Bootstrap) *C time.AfterFunc(time.Duration(5)*time.Second, func() { // 启用诊断分析 core.InitVqdTask() + // 启用轮询诊断任务 + core.StartPolling() }) // 启用定时清理任务 go core.scheduleCleanTask() @@ -119,11 +137,13 @@ func (c *Core) InitVqdTask() { all, _, err := c.VqdTaskCore.FindVqdTaskAll() if err == nil { for _, vqdTask := range all { - errs := c.AddTaskVqd(vqdTask.ID) - if errs != nil { - slog.Error("vqd init add task", "err", errs.Error()) + if vqdTask.Enable { + errs := c.AddTaskVqd(vqdTask.ID) + if errs != nil { + slog.Error("vqd init add task", "err", errs.Error()) + } + time.Sleep(200 * time.Millisecond) } - time.Sleep(200 * time.Millisecond) } } @@ -132,6 +152,8 @@ func (c *Core) InitVqdTask() { func (c *Core) UnVqdTask() { VqdTaskMap.DeleteTaskMapAll() vqdcms.VQDUnInit() + // 停止轮询诊断任务 + c.StopPolling() return } func (c *Core) AddTaskVqd(taskId int) error { @@ -141,6 +163,7 @@ func (c *Core) AddTaskVqd(taskId int) error { slog.Error("vqd add task find", "err", err.Error()) return err } + taskTemplate, err := c.VqdTaskCore.GetIDVqdTaskTemplate(context.TODO(), task.TaskTemplateID) if err != nil { slog.Error("vqd add task find template", "err", err.Error()) diff --git a/internal/core/vqdtask/polling.go b/internal/core/vqdtask/polling.go new file mode 100644 index 0000000..08c1292 --- /dev/null +++ b/internal/core/vqdtask/polling.go @@ -0,0 +1,213 @@ +package vqdtask + +import ( + "context" + "easyvqd/internal/core/vqd" + "easyvqd/pkg/vqdcms" + "fmt" + "log/slog" + "sync" + "time" +) + +var ( + VqdPollingTaskMap = vqdcms.VqdTaskMap{M: make(map[string]*vqdcms.VQDHandle)} + TaskFlowMap *TaskFlow +) + +func (c *Core) StartPolling() { + TaskFlowMap = NewTaskFlow(int(c.Cfg.VqdConfig.PollingNum)) + c.UpdatePollingTask() + // 注册开始执行回调 + TaskFlowMap.RegisterStartCallback(func(task Task, executeDuration time.Duration, timeout time.Duration) { + slog.Debug("[全局回调-开始]", "任务", task.ID(), "预期耗时", executeDuration, "超时阈值", timeout) + // 可扩展:记录任务启动时间、初始化监控指标、打印执行前日志等 + c.AddPollingTaskVqd(task.ID(), task.GetPara(), task.GetHandleInfo()) + }) + TaskFlowMap.RegisterSuccessCallback(func(taskID string, executeDuration time.Duration, elapsed time.Duration) { + slog.Debug("[全局回调-成功]", "任务", taskID, "预期耗时", executeDuration, "实际耗时", elapsed) + // 可扩展:记录日志、更新监控指标、发送通知等 + c.DelPollingTaskVqd(taskID) + }) + + // 注册失败回调 + TaskFlowMap.RegisterFailCallback(func(taskID string, err error, elapsed time.Duration) { + slog.Debug("[全局回调-失败]", "任务", taskID, "原因", err, "实际耗时", elapsed) + // 可扩展:记录错误日志、触发告警、重试策略等 + c.DelPollingTaskVqd(taskID) + }) + + // 注册超时回调 + TaskFlowMap.RegisterTimeoutCallback(func(taskID string, timeout time.Duration, elapsed time.Duration) { + slog.Debug("[全局回调-超时]", "任务", taskID, "超时阈值", timeout, "实际耗时", elapsed) + // 可扩展:发送超时告警、调整任务超时配置等 + c.DelPollingTaskVqd(taskID) + }) + items, _, err := c.VqdTaskCore.FindVqdPollingAll() + if err == nil && len(items) > 0 { + for _, v := range items { + errs := c.AddPollingTask(v) + if errs != nil { + slog.Error("vqd polling add template", "err", errs.Error()) + continue + } + time.Sleep(100 * time.Millisecond) + } + } + // 启动任务流 + TaskFlowMap.Start() + +} +func (c *Core) StopPolling() { + if TaskFlowMap != nil { + // 停止任务流 + TaskFlowMap.Close() + } +} +func (c *Core) HasRunningTask(chnId string) int { + pollingId := fmt.Sprintf("%s_polling", chnId) + if TaskFlowMap.HasRunning(pollingId) { + return 1 + } + return 0 +} +func (c *Core) AddPollingTask(task *vqd.VqdPolling) error { + + chnId := task.ChannelID + pollingId := fmt.Sprintf("%s_polling", chnId) + para := vqdcms.NewVQDPara(TaskFlowMap.TaskTemplate) + info := vqdcms.VQDHandleInfo{ + ChannelID: chnId, + ChannelName: task.ChannelName, + TaskID: task.ID, + TaskName: "轮询", + TemplateID: TaskFlowMap.TaskTemplate.ID, + TemplateName: TaskFlowMap.TaskTemplate.Name, + PlanID: 0, + PlanName: "全量", + Plans: "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111", + } + newTask := NewPollingTask(pollingId, time.Duration(c.Cfg.VqdConfig.PollingTime)*time.Second, time.Duration(c.Cfg.VqdConfig.PollingTime+1)*time.Second, para, info) + TaskFlowMap.AddTask(newTask) + return nil +} + +func (c *Core) UpdatePollingTask() { + cof := c.Cfg.VqdConfig + TaskFlowMap.BatchSetExecuteDuration(time.Duration(cof.PollingTime) * time.Second) + TaskFlowMap.BatchSetTimeout(time.Duration(cof.PollingTime+1) * time.Second) + TaskFlowMap.SetMaxConcurrent(int(cof.PollingNum)) + + taskTemplate, err := c.VqdTaskCore.GetIDVqdTaskTemplate(context.TODO(), int64(cof.PollingTemplate)) + if err != nil { + slog.Error("vqd polling find template", "err", err.Error()) + } + if taskTemplate != nil { + TaskFlowMap.SetTemplate(taskTemplate) + } +} + +func (c *Core) DelPollingTask(chnId string) error { + pollingId := fmt.Sprintf("%s_polling", chnId) + TaskFlowMap.RemoveTask(pollingId, true) + c.DelPollingTaskVqd(pollingId) + return nil +} + +func (c *Core) AddPollingTaskVqd(pollingId string, para vqdcms.VQDPara, info vqdcms.VQDHandleInfo) { + v := vqdcms.NewVQDHandle(c.ResultCb, c.HostCore, info).Create(para, info.Plans) + VqdPollingTaskMap.StoreChildMap(pollingId, v) +} +func (c *Core) DelPollingTaskVqd(pollingId string) { + v, ok := VqdPollingTaskMap.LoadTaskMap(pollingId) + if ok { + v.Destroy() + } + VqdPollingTaskMap.DeleteTaskMap(pollingId) +} + +type PollingTask struct { + id string + timeout time.Duration + executeDuration time.Duration + info vqdcms.VQDHandleInfo + para vqdcms.VQDPara + mu sync.RWMutex // 任务配置修改锁 + +} + +func NewPollingTask(id string, executeDuration, timeout time.Duration, para vqdcms.VQDPara, info vqdcms.VQDHandleInfo) *PollingTask { + return &PollingTask{ + id: id, + timeout: timeout, + executeDuration: executeDuration, + para: para, + info: info, + } +} + +func (d *PollingTask) ID() string { return d.id } + +// Timeout 读取超时时间(读锁保护) +func (d *PollingTask) Timeout() time.Duration { + d.mu.RLock() + defer d.mu.RUnlock() + return d.timeout +} +func (d *PollingTask) GetPara() vqdcms.VQDPara { + d.mu.RLock() + defer d.mu.RUnlock() + return d.para +} +func (d *PollingTask) GetHandleInfo() vqdcms.VQDHandleInfo { + d.mu.RLock() + defer d.mu.RUnlock() + return d.info +} + +// ExecuteDuration 读取执行时长(读锁保护) +func (d *PollingTask) ExecuteDuration() time.Duration { + d.mu.RLock() + defer d.mu.RUnlock() + return d.executeDuration +} + +// SetTimeout 动态修改超时时间(写锁保护) +func (d *PollingTask) SetTimeout(newTimeout time.Duration) { + d.mu.Lock() + oldTimeout := d.timeout + d.timeout = newTimeout + d.mu.Unlock() + slog.Debug("任务超时时间修改", "id", d.id, "old", oldTimeout, "news", newTimeout) +} + +// SetExecuteDuration 动态修改执行时长(写锁保护) +func (d *PollingTask) SetExecuteDuration(newDuration time.Duration) { + d.mu.Lock() + oldDuration := d.executeDuration + d.executeDuration = newDuration + d.mu.Unlock() + slog.Debug("任务执行时长修改", "id", d.id, "old", oldDuration, "news", newDuration) +} + +func (d *PollingTask) Execute(ctx context.Context) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + // 执行时读取最新的执行时长配置 + targetDuration := d.ExecuteDuration() + elapsed := 0 * time.Millisecond + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("任务被取消: %v (预期执行: %v, 已执行: %v)", ctx.Err(), targetDuration, elapsed) + case <-ticker.C: + elapsed += 100 * time.Millisecond + if elapsed >= targetDuration { + fmt.Println("执行成功:", d.ID()) + return nil + } + } + } +} diff --git a/internal/core/vqdtask/task.go b/internal/core/vqdtask/task.go new file mode 100644 index 0000000..26cb83b --- /dev/null +++ b/internal/core/vqdtask/task.go @@ -0,0 +1,640 @@ +package vqdtask + +import ( + "context" + "easyvqd/internal/core/vqd" + "easyvqd/pkg/vqdcms" + "fmt" + "log/slog" + "sync" + "time" +) + +// TaskSuccessCallback 任务执行成功的全局回调 +// 参数:taskID=任务ID, executeDuration=预期执行时长, elapsed=实际耗时 +type TaskSuccessCallback func(taskID string, executeDuration time.Duration, elapsed time.Duration) + +// TaskFailCallback 任务执行失败的全局回调 +// 参数:taskID=任务ID, err=失败原因, elapsed=实际耗时 +type TaskFailCallback func(taskID string, err error, elapsed time.Duration) + +// TaskTimeoutCallback 任务执行超时的全局回调 +// 参数:taskID=任务ID, timeout=超时时间, elapsed=实际耗时 +type TaskTimeoutCallback func(taskID string, timeout time.Duration, elapsed time.Duration) + +// TaskStartCallback 任务开始执行的全局回调 +// 参数:taskID=任务ID, executeDuration=预期执行时长, timeout=超时时间 +type TaskStartCallback func(task Task, executeDuration time.Duration, timeout time.Duration) + +// Task 任务接口(新增动态配置方法) +type Task interface { + ID() string + Execute(ctx context.Context) error + // 读取配置(带锁) + Timeout() time.Duration + GetHandleInfo() vqdcms.VQDHandleInfo + GetPara() vqdcms.VQDPara + ExecuteDuration() time.Duration + // 动态修改配置 + SetTimeout(d time.Duration) + SetExecuteDuration(d time.Duration) +} + +// TaskFlow 任务流管理器(支持动态修改并发数) +type TaskFlow struct { + runningTasks map[string]Task // 正在运行的任务 + waitingTasks []Task // 等待执行的任务 + maxConcurrent int // 最大并发数(可动态修改) + TaskTemplate *vqd.VqdTaskTemplate // 关联模板 + mu sync.RWMutex // 并发安全锁 + wg sync.WaitGroup // 等待所有goroutine结束 + ctx context.Context // 全局上下文 + cancel context.CancelFunc // 全局取消函数 + removedTasks map[string]bool // 已标记删除的任务ID + rmMutex sync.RWMutex // 删除标记的锁 + closed bool // 标记是否已完全关闭 + concurrentMu sync.RWMutex // 并发数修改专用锁 + // ------------------- 回调相关字段 ------------------- + callbackMu sync.RWMutex // 回调操作专用锁 + startCallback TaskStartCallback // 开始执行回调 + successCallback TaskSuccessCallback // 成功回调 + failCallback TaskFailCallback // 失败回调 + timeoutCallback TaskTimeoutCallback // 超时回调 +} + +// NewTaskFlow 创建任务流实例 +func NewTaskFlow(maxConcurrent int) *TaskFlow { + ctx, cancel := context.WithCancel(context.Background()) + return &TaskFlow{ + runningTasks: make(map[string]Task), + waitingTasks: make([]Task, 0), + maxConcurrent: maxConcurrent, + ctx: ctx, + cancel: cancel, + removedTasks: make(map[string]bool), + closed: false, + } +} +func (tf *TaskFlow) SetTemplate(Temp *vqd.VqdTaskTemplate) { + // 加锁修改 + tf.concurrentMu.Lock() + tf.TaskTemplate = Temp + tf.concurrentMu.Unlock() +} + +// SetMaxConcurrent 动态修改最大并发数 +// 返回值:true=修改成功,false=任务流已关闭 +func (tf *TaskFlow) SetMaxConcurrent(newConcurrent int) bool { + // 检查是否已关闭 + tf.mu.RLock() + closed := tf.closed + tf.mu.RUnlock() + if closed { + slog.Error("任务流已关闭,拒绝修改并发数") + return false + } + + if newConcurrent <= 0 { + slog.Error("并发数必须大于0,修改失败") + return false + } + + // 加锁修改 + tf.concurrentMu.Lock() + oldConcurrent := tf.maxConcurrent + tf.maxConcurrent = newConcurrent + tf.concurrentMu.Unlock() + + slog.Debug("最大并发数已修改为", "old", oldConcurrent, "news", newConcurrent) + return true +} + +// AddTask 添加任务到等待队列(锁安全) +func (tf *TaskFlow) AddTask(task Task) { + // 检查是否已关闭 + tf.mu.RLock() + closed := tf.closed + tf.mu.RUnlock() + if closed { + slog.Error("任务流已关闭,拒绝添加任务", "id", task.ID()) + return + } + + // 检查是否被标记删除 + tf.rmMutex.RLock() + isRemoved := tf.removedTasks[task.ID()] + tf.rmMutex.RUnlock() + if isRemoved { + slog.Error("任务已被标记删除,跳过添加", "id", task.ID()) + return + } + + // 加写锁,确保并发安全 + tf.mu.Lock() + defer tf.mu.Unlock() + + // 二次检查关闭状态 + if tf.closed { + slog.Error("任务流已关闭,拒绝添加任务", "id", task.ID()) + return + } + + // 避免重复添加 + for _, t := range tf.waitingTasks { + if t.ID() == task.ID() { + return + } + } + if _, exists := tf.runningTasks[task.ID()]; exists { + return + } + + tf.waitingTasks = append(tf.waitingTasks, task) + slog.Debug(fmt.Sprintf("任务 %s 已加入等待队列(执行时长: %v, 超时时间: %v)\n", task.ID(), task.ExecuteDuration(), task.Timeout())) +} + +// RemoveTask 删除指定ID的任务(锁安全) +func (tf *TaskFlow) RemoveTask(taskID string, immediate bool) bool { + // 检查是否已关闭 + tf.mu.RLock() + closed := tf.closed + tf.mu.RUnlock() + if closed { + slog.Error("任务流已关闭,拒绝删除任务", "id", taskID) + return false + } + + // 标记任务为删除 + tf.rmMutex.Lock() + tf.removedTasks[taskID] = true + tf.rmMutex.Unlock() + + // 加写锁处理任务队列 + tf.mu.Lock() + defer tf.mu.Unlock() + + removed := false + + // 1. 移除等待队列中的任务 + newWaiting := make([]Task, 0, len(tf.waitingTasks)) + for _, task := range tf.waitingTasks { + if task.ID() == taskID { + removed = true + slog.Debug("从等待队列中删除任务", "id", task.ID()) + continue + } + newWaiting = append(newWaiting, task) + } + tf.waitingTasks = newWaiting + + // 2. 处理运行中的任务 + if _, exists := tf.runningTasks[taskID]; exists { + removed = true + if immediate { + delete(tf.runningTasks, taskID) + slog.Debug("标记任务为删除,立即终止当前执行", "id", taskID) + } else { + slog.Debug("标记任务为删除,执行完当前周期后移除", "id", taskID) + } + } + + // 清理无效标记 + if !removed { + slog.Debug("任务不存在,删除失败", "id", taskID) + tf.rmMutex.Lock() + delete(tf.removedTasks, taskID) + tf.rmMutex.Unlock() + } + + return removed +} + +// CleanupRemovedTasks 清理已删除任务的残留标记和资源 +func (tf *TaskFlow) CleanupRemovedTasks() int { + // 检查是否已关闭 + tf.mu.RLock() + closed := tf.closed + tf.mu.RUnlock() + if closed { + slog.Error("任务流已关闭,拒绝执行清理操作") + return 0 + } + + // 步骤1:收集有效任务ID(读锁) + tf.mu.RLock() + validTaskIDs := make(map[string]bool) + for taskID := range tf.runningTasks { + validTaskIDs[taskID] = true + } + for _, task := range tf.waitingTasks { + validTaskIDs[task.ID()] = true + } + tf.mu.RUnlock() + + // 步骤2:清理无效删除标记 + tf.rmMutex.Lock() + cleanedCount := 0 + for taskID := range tf.removedTasks { + if !validTaskIDs[taskID] { + delete(tf.removedTasks, taskID) + cleanedCount++ + slog.Debug("清理无效删除标记", "id", taskID) + } + } + tf.rmMutex.Unlock() + + // 步骤3:清理等待队列残留 + tf.mu.Lock() + newWaiting := make([]Task, 0, len(tf.waitingTasks)) + for _, task := range tf.waitingTasks { + tf.rmMutex.RLock() + isRemoved := tf.removedTasks[task.ID()] + tf.rmMutex.RUnlock() + if !isRemoved { + newWaiting = append(newWaiting, task) + } else { + slog.Debug("清理等待队列中已删除的任务", "id", task.ID()) + cleanedCount++ + } + } + tf.waitingTasks = newWaiting + tf.mu.Unlock() + + slog.Debug("清理完成,共清理无效标记/残留任务", "count", cleanedCount) + return cleanedCount +} + +// Start 启动任务流 +func (tf *TaskFlow) Start() { + tf.mu.RLock() + closed := tf.closed + tf.mu.RUnlock() + if closed { + slog.Error("任务流已关闭,无法启动") + return + } + + go tf.scheduleLoop() + slog.Info("任务流已启动") +} + +// Close 完全释放整个任务流 +func (tf *TaskFlow) Close() { + tf.mu.Lock() + defer tf.mu.Unlock() + + if tf.closed { + slog.Error("任务流已处于关闭状态,无需重复关闭") + return + } + + // 标记为已关闭 + tf.closed = true + slog.Info("开始完全释放任务流资源...") + + // 取消全局上下文 + tf.cancel() + + // 等待所有goroutine退出(临时解锁避免死锁) + tf.mu.Unlock() + tf.wg.Wait() + tf.mu.Lock() + + // 清空所有队列 + tf.waitingTasks = nil + tf.runningTasks = nil + + // 清空删除标记 + tf.rmMutex.Lock() + tf.removedTasks = nil + tf.rmMutex.Unlock() + + slog.Info("任务流已完全关闭,所有资源已释放") +} + +// Reset 重置任务流 +func (tf *TaskFlow) Reset(maxConcurrent int) { + tf.Close() + + tf.mu.Lock() + defer tf.mu.Unlock() + + // 重建上下文 + ctx, cancel := context.WithCancel(context.Background()) + tf.ctx = ctx + tf.cancel = cancel + + // 重建队列 + tf.waitingTasks = make([]Task, 0) + tf.runningTasks = make(map[string]Task) + tf.removedTasks = make(map[string]bool) + + // 重置状态 + tf.closed = false + tf.maxConcurrent = maxConcurrent + + slog.Debug("任务流已重置,最大并发数", "count", maxConcurrent) +} + +// scheduleLoop 任务调度主循环 +func (tf *TaskFlow) scheduleLoop() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-tf.ctx.Done(): + slog.Info("任务调度循环已停止") + return + case <-ticker.C: + // 检查是否已关闭 + tf.mu.RLock() + closed := tf.closed + tf.mu.RUnlock() + if closed { + return + } + tf.scheduleTasks() + } + } +} + +// scheduleTasks 调度任务执行(动态读取并发数) +func (tf *TaskFlow) scheduleTasks() { + // 读取当前运行数和最新并发数 + tf.mu.RLock() + currentRunning := len(tf.runningTasks) + tf.mu.RUnlock() + + tf.concurrentMu.RLock() + maxConcurrent := tf.maxConcurrent + tf.concurrentMu.RUnlock() + + availableSlots := maxConcurrent - currentRunning + + // 检查是否已关闭 + tf.mu.RLock() + closed := tf.closed + tf.mu.RUnlock() + + if closed || availableSlots <= 0 { + return + } + + // 取出待执行任务 + tf.mu.Lock() + tasksToRun := make([]Task, 0) + if len(tf.waitingTasks) > 0 { + take := availableSlots + if take > len(tf.waitingTasks) { + take = len(tf.waitingTasks) + } + tasksToRun = tf.waitingTasks[:take] + tf.waitingTasks = tf.waitingTasks[take:] + } + tf.mu.Unlock() + + // 执行任务 + for _, task := range tasksToRun { + tf.runTask(task) + } +} + +// runTask 执行单个任务(使用最新的任务配置) +func (tf *TaskFlow) runTask(task Task) { + taskID := task.ID() + + // 加写锁标记为运行中 + tf.mu.Lock() + // 检查状态 + if tf.closed { + tf.mu.Unlock() + slog.Error("任务流已关闭,取消执行任务", "id", taskID) + return + } + tf.rmMutex.RLock() + isRemoved := tf.removedTasks[taskID] + tf.rmMutex.RUnlock() + if isRemoved { + tf.mu.Unlock() + slog.Debug("任务已被删除,取消执行", "id", taskID) + return + } + // 标记为运行中 + tf.runningTasks[taskID] = task + tf.mu.Unlock() + + // 执行任务协程 + tf.wg.Add(1) + go func(t Task) { + defer tf.wg.Done() + + // 获取任务最新的超时配置(执行时读取,保证使用最新值) + timeout := t.Timeout() + executeDuration := t.ExecuteDuration() + + // 创建超时上下文(使用最新超时时间) + taskCtx, taskCancel := context.WithTimeout(tf.ctx, timeout) + defer taskCancel() + tf.callbackMu.RLock() + defer tf.callbackMu.RUnlock() + if tf.startCallback != nil { + tf.startCallback(t, executeDuration, timeout) + } + slog.Debug(fmt.Sprintf("开始执行任务: %s (执行时长: %v, 超时时间: %v)", t.ID(), executeDuration, timeout)) + startTime := time.Now() + + // 执行任务 + err := t.Execute(taskCtx) + elapsed := time.Since(startTime) + + // 清理运行标记 + tf.mu.Lock() + delete(tf.runningTasks, t.ID()) + tf.mu.Unlock() + + // 检查状态 + tf.rmMutex.RLock() + isRemoved = tf.removedTasks[t.ID()] + tf.rmMutex.RUnlock() + tf.mu.RLock() + closed := tf.closed + tf.mu.RUnlock() + + // 处理执行结果 + if err != nil { + tf.callbackMu.RLock() + defer tf.callbackMu.RUnlock() + // 区分超时和普通失败 + if ctxErr := taskCtx.Err(); ctxErr == context.DeadlineExceeded { + // 超时触发超时回调 + if tf.timeoutCallback != nil { + tf.timeoutCallback(t.ID(), timeout, elapsed) + } + } else { + // 普通失败触发失败回调 + if tf.failCallback != nil { + tf.failCallback(t.ID(), err, elapsed) + } + } + slog.Debug(fmt.Sprintf("任务 %s 执行失败: %v (耗时: %v)", t.ID(), err, elapsed)) + if !closed && !isRemoved { + tf.AddTask(t) + } else { + slog.Debug("任务已被删除/任务流关闭,失败后不重试", "id", t.ID()) + } + } else { + tf.callbackMu.RLock() + defer tf.callbackMu.RUnlock() + if tf.successCallback != nil { + tf.successCallback(t.ID(), executeDuration, elapsed) + } + slog.Debug(fmt.Sprintf("任务 %s 执行成功 (预期执行时长: %v, 实际耗时: %v)", t.ID(), executeDuration, elapsed)) + if !closed && !isRemoved { + tf.AddTask(t) + } else { + slog.Debug("任务已被删除/任务流关闭,成功后不再循环", "id", t.ID()) + tf.rmMutex.Lock() + delete(tf.removedTasks, t.ID()) + tf.rmMutex.Unlock() + } + } + }(task) +} + +// BatchSetTimeout 批量给【所有任务】设置 超时时间 +func (tf *TaskFlow) BatchSetTimeout(timeout time.Duration) { + tf.mu.RLock() + defer tf.mu.RUnlock() + + if tf.closed { + slog.Error("任务流已关闭,批量设置超时失败") + return + } + + // 设置等待队列所有任务 + for _, task := range tf.waitingTasks { + task.SetTimeout(timeout) + } + // 设置运行中任务 + for _, task := range tf.runningTasks { + task.SetTimeout(timeout) + } + slog.Debug("所有任务超时时间已批量设置为", "time", timeout) +} + +// BatchSetExecuteDuration 批量给【所有任务】设置 执行时长 +func (tf *TaskFlow) BatchSetExecuteDuration(duration time.Duration) { + tf.mu.RLock() + defer tf.mu.RUnlock() + + if tf.closed { + slog.Error("任务流已关闭,批量设置执行时长失败") + return + } + + // 设置等待队列 + for _, task := range tf.waitingTasks { + task.SetExecuteDuration(duration) + } + // 设置运行中任务 + for _, task := range tf.runningTasks { + task.SetExecuteDuration(duration) + } + slog.Debug("所有任务执行时长已批量设置为", "duration", duration) +} + +// BatchSetByIDs 按任务ID列表批量设置 超时+执行时长 +func (tf *TaskFlow) BatchSetByIDs(ids []string, timeout, duration time.Duration) { + tf.mu.RLock() + defer tf.mu.RUnlock() + + if tf.closed { + slog.Error("任务流已关闭,按ID批量设置失败") + return + } + + idSet := make(map[string]bool) + for _, id := range ids { + idSet[id] = true + } + + // 处理等待队列 + for _, task := range tf.waitingTasks { + if idSet[task.ID()] { + task.SetTimeout(timeout) + task.SetExecuteDuration(duration) + } + } + // 处理运行中队列 + for _, task := range tf.runningTasks { + if idSet[task.ID()] { + task.SetTimeout(timeout) + task.SetExecuteDuration(duration) + } + } + tf.UnregisterAllCallbacks() + slog.Debug("所有任务执行按ID批量设置完成", "超时", timeout, "执行时长", duration) +} + +// 判断是否在执行 +func (tf *TaskFlow) HasRunning(taskId string) bool { + tf.mu.RLock() + defer tf.mu.RUnlock() + if tf.closed { + slog.Error("任务流已关闭") + return false + } + flag := false + // 处理运行中队列 + for _, task := range tf.runningTasks { + if task.ID() == taskId { + flag = true + break + } + } + return flag +} + +// RegisterSuccessCallback 注册任务成功的全局回调 +func (tf *TaskFlow) RegisterSuccessCallback(callback TaskSuccessCallback) { + tf.callbackMu.Lock() + defer tf.callbackMu.Unlock() + tf.successCallback = callback + slog.Debug("任务成功全局回调已注册") +} + +// RegisterFailCallback 注册任务失败的全局回调 +func (tf *TaskFlow) RegisterFailCallback(callback TaskFailCallback) { + tf.callbackMu.Lock() + defer tf.callbackMu.Unlock() + tf.failCallback = callback + slog.Debug("任务失败全局回调已注册") +} + +// RegisterTimeoutCallback 注册任务超时的全局回调 +func (tf *TaskFlow) RegisterTimeoutCallback(callback TaskTimeoutCallback) { + tf.callbackMu.Lock() + defer tf.callbackMu.Unlock() + tf.timeoutCallback = callback + slog.Debug("任务超时全局回调已注册") +} + +// RegisterStartCallback 注册任务开始执行的全局回调 +func (tf *TaskFlow) RegisterStartCallback(callback TaskStartCallback) { + tf.callbackMu.Lock() + defer tf.callbackMu.Unlock() + tf.startCallback = callback + slog.Debug("任务开始执行全局回调已注册") +} + +// UnregisterAllCallbacks 注销所有全局回调 +func (tf *TaskFlow) UnregisterAllCallbacks() { + tf.callbackMu.Lock() + defer tf.callbackMu.Unlock() + tf.successCallback = nil + tf.failCallback = nil + tf.timeoutCallback = nil + tf.startCallback = nil + slog.Info("所有全局回调已注销") +} diff --git a/internal/web/api/vqdpolling.go b/internal/web/api/vqdpolling.go new file mode 100644 index 0000000..b1f7520 --- /dev/null +++ b/internal/web/api/vqdpolling.go @@ -0,0 +1,137 @@ +package api + +import ( + "easyvqd/internal/conf" + "easyvqd/internal/core/host" + "easyvqd/internal/core/vqd" + "fmt" + "git.lnton.com/lnton/pkg/reason" + "github.com/gin-gonic/gin" + "log/slog" + "strconv" +) + +func (a VqdTaskAPI) findVqdPollingChannel(c *gin.Context, in *host.FindChannelsInput) (any, error) { + out, err := a.HostCore.FindChannels(c, in) + if err != nil { + return nil, err + } + rows := make([]map[string]interface{}, 0) + for _, item := range out.Items { + row := make(map[string]interface{}) + row["name"] = item.Name + row["id"] = item.ID + row["status"] = item.Status + row["protocol"] = item.Protocol + row["device_name"] = item.DeviceName + row["is_polling"] = false + chnInfo, errs := a.core.GetPollingChannelID(c.Request.Context(), item.ID) + if errs == nil && chnInfo != nil && chnInfo.ChannelID != "" { + row["is_polling"] = true + } + rows = append(rows, row) + } + + return gin.H{"items": rows, "total": out.Total}, nil +} +func (a VqdTaskAPI) findVqdPolling(c *gin.Context, in *vqd.FindVqdPollingInput) (any, error) { + items, total, err := a.core.FindVqdPolling(c.Request.Context(), in) + rows := make([]map[string]interface{}, 0) + + for _, item := range items { + row := make(map[string]interface{}) + row["name"] = item.Name + row["channel_name"] = item.ChannelName + row["channel_id"] = item.ChannelID + row["task_template_id"] = item.TaskTemplateID + row["id"] = item.ID + row["status"] = a.vqdSdkCore.HasRunningTask(item.ChannelID) + rows = append(rows, row) + } + + return gin.H{"items": rows, "total": total}, err +} +func (a VqdTaskAPI) getVqdPolling(c *gin.Context, _ *struct{}) (any, error) { + ID, _ := strconv.Atoi(c.Param("id")) + item, err := a.core.GetVqdPolling(c.Request.Context(), ID) + if err != nil { + return nil, reason.ErrServer.SetMsg(fmt.Sprintf(`find vqd polling [%d] err [%s]`, ID, err.Error())) + } + row := make(map[string]interface{}) + row["name"] = item.Name + row["channel_name"] = item.ChannelName + row["channel_id"] = item.ChannelID + row["task_template_id"] = item.TaskTemplateID + row["id"] = item.ID + row["status"] = a.vqdSdkCore.HasRunningTask(item.ChannelID) + return gin.H{"data": row}, nil +} +func (a VqdTaskAPI) addVqdPolling(c *gin.Context, data *vqd.AddVqdPollingAllInput) (any, error) { + + for _, item := range data.Items { + task, err := a.core.AddVqdPolling(c.Request.Context(), &item) + if err != nil { + slog.Error(fmt.Sprintf(`add vqd polling err [%s]`, err.Error())) + } else { + errs := a.vqdSdkCore.AddPollingTask(task) + if errs != nil { + slog.Error(fmt.Sprintf(`add vqd polling task err [%s]`, err.Error())) + } + } + + } + + return gin.H{"data": "OK!"}, nil +} + +func (a VqdTaskAPI) editPollingConf(c *gin.Context, in *vqd.EditPollingConfInput) (any, error) { + a.cfg.VqdConfig.PollingTemplate = in.PollingTemplate + a.cfg.VqdConfig.PollingNum = in.PollingNum + a.cfg.VqdConfig.PollingTime = in.PollingTime + err := conf.WriteConfig(a.cfg, a.cfg.ConfigDirPath()) + if err != nil { + return nil, err + } + a.vqdSdkCore.UpdatePollingTask() + return in, nil +} +func (a VqdTaskAPI) getPollingConf(_ *gin.Context, _ *struct{}) (vqd.GetPollingConfOutput, error) { + confMutex.Lock() + defer confMutex.Unlock() + + return vqd.GetPollingConfOutput{ + PollingNum: a.cfg.VqdConfig.PollingNum, + PollingTime: a.cfg.VqdConfig.PollingTime, + PollingTemplate: a.cfg.VqdConfig.PollingTemplate, + }, nil +} + +func (a VqdTaskAPI) delVqdPolling(c *gin.Context, _ *struct{}) (any, error) { + ID, _ := strconv.Atoi(c.Param("id")) + task, err := a.core.DelVqdPolling(c.Request.Context(), ID) + if err != nil { + return nil, reason.ErrServer.SetMsg(fmt.Sprintf(`del vqd polling [%d] err [%s]`, ID, err.Error())) + } + errs := a.vqdSdkCore.DelPollingTask(task.ChannelID) + if errs != nil { + slog.Error(fmt.Sprintf(`del vqd polling task err [%s]`, err.Error())) + } + return gin.H{"data": "OK!"}, err +} + +func (a VqdTaskAPI) delVqdPollingAll(c *gin.Context, in *vqd.DelVqdPollingInput) (any, error) { + if len(in.IDs) == 0 { + return nil, reason.ErrServer.SetMsg(fmt.Sprintf(`del vqdcms ids is empty`)) + } + _, err := a.core.DelVqdPollingAll(c.Request.Context(), in.IDs) + if err != nil { + return nil, reason.ErrServer.SetMsg(fmt.Sprintf(`del vqd polling [%v] err [%s]`, in.IDs, err.Error())) + } + for _, v := range in.IDs { + errs := a.vqdSdkCore.DelPollingTask(v) + if errs != nil { + slog.Error(fmt.Sprintf(`del all vqd polling task err [%s]`, errs.Error())) + } + } + return gin.H{"data": "OK!"}, err +} diff --git a/internal/web/api/vqdtask.go b/internal/web/api/vqdtask.go index c6236c7..213af7e 100644 --- a/internal/web/api/vqdtask.go +++ b/internal/web/api/vqdtask.go @@ -35,6 +35,16 @@ func RegisterVqdTask(g gin.IRouter, api VqdTaskAPI, handler ...gin.HandlerFunc) groupTask.POST("", web.WarpH(api.addVqdTask)) groupTask.DELETE("/:id", web.WarpH(api.delVqdTask)) + groupPolling := g.Group("/api/polling", handler...) + groupPolling.GET("/channels", web.WarpH(api.findVqdPollingChannel)) + groupPolling.GET("", web.WarpH(api.findVqdPolling)) + groupPolling.GET("/:id", web.WarpH(api.getVqdPolling)) + groupPolling.POST("", web.WarpH(api.addVqdPolling)) + groupPolling.DELETE("/:id", web.WarpH(api.delVqdPolling)) + groupPolling.DELETE("", web.WarpH(api.delVqdPollingAll)) + groupPolling.GET("/conf", web.WarpH(api.getPollingConf)) + groupPolling.PUT("/conf", web.WarpH(api.editPollingConf)) + groupTemplate := g.Group("/api/template", handler...) groupTemplate.GET("", web.WarpH(api.findVqdTaskTemplate)) groupTemplate.GET("/:id", web.WarpH(api.getVqdTaskTemplate)) diff --git a/main.go b/main.go index b868ee7..984b4e0 100644 --- a/main.go +++ b/main.go @@ -55,6 +55,20 @@ func main() { if *httpPort >= 0 { bc.Server.HTTP.Port = *httpPort } + + if bc.VqdConfig.PollingNum == 0 && bc.VqdConfig.PollingTime == 0 { + bc.VqdConfig.PollingNum = 10 + bc.VqdConfig.PollingTime = 60 + } + if bc.VqdConfig.PollingNum < 1 { + bc.VqdConfig.PollingNum = 1 + } + if bc.VqdConfig.PollingTime < 30 { + bc.VqdConfig.PollingTime = 30 + } + if bc.VqdConfig.PollingTemplate < 1 { + bc.VqdConfig.PollingTemplate = 1 + } bc.Debug = !getBuildRelease() bc.BuildVersion = buildVersion bc.ConfigDir = fileDir diff --git a/web/src/api/vqdpolling.ts b/web/src/api/vqdpolling.ts new file mode 100644 index 0000000..f5a98b6 --- /dev/null +++ b/web/src/api/vqdpolling.ts @@ -0,0 +1,74 @@ +import { GET, POST, PUT, DELETE } from "./http"; +import type { VqdPollingBaseRes, VqdPollingRes, VqdPollingConf, CreateVqdPollingReq, UpdateVqdPollingReq, VqdPollingDetailRes, VqdPollingReq, ChannelPollingRes, ChannelPollingReq } from "../types/vqdpolling"; + +/** + * 获取列表 + * @returns 列表 + */ +export async function GetVqdPolling(data: VqdPollingReq) { + return await GET(`/polling`, data); +} + +// 获取通道列表 +export const getChannels = "GetChannels"; +export async function GetPollingChannels(data: ChannelPollingReq) { + return await GET(`/polling/channels`, data); +} + + +/** + * 创建 + * @param data 创建参数 + */ +export async function CreateVqdPolling(data: CreateVqdPollingReq) { + return await POST(`/polling`, data); +} + +/** + * 获取详情 + * @param id ID + */ +export async function GetVqdPollingById(id: string) { + return await GET(`/polling/${id}`); +} + +/** + * 更新 + * @param data 更新参数(需包含 id) + */ +export async function UpdateVqdPolling(data: UpdateVqdPollingReq) { + const { id, ...payload } = data; + return await PUT(`/polling/${id}`, payload); +} + +/** + * 删除 + * @param id ID + */ +export async function DeleteVqdPolling(id: number) { + return await DELETE(`/polling/${id}`); +} +/** + * 批量删除 + * @param ids ID + */ +export async function DeleteVqdPollingAll(data: string[]) { + return await DELETE(`/polling`, {ids: data}); +} + + +/** + * 获取配置 + * @param data + */ +export async function GetVqdPollingConf() { + return await GET(`/polling/conf`); +} + +/** + * 修改配置 + * @param data + */ +export async function UpdateVqdPollingConf(data: VqdPollingConf) { + return await PUT(`/polling/conf`, data); +} diff --git a/web/src/components/VqdPolling.tsx b/web/src/components/VqdPolling.tsx new file mode 100644 index 0000000..c1657f5 --- /dev/null +++ b/web/src/components/VqdPolling.tsx @@ -0,0 +1,220 @@ +import { useRef, useState, useMemo } from "react"; +import { Table, Button, Space, Popconfirm, Flex, message, Tooltip, Switch, Popover, Tag } from "antd"; +import { EditOutlined, DeleteOutlined, PlusOutlined } from "@ant-design/icons"; +import { useQuery, useMutation } from "@tanstack/react-query"; +import { GetVqdPolling, DeleteVqdPolling, UpdateVqdPolling } from "../api/vqdpolling"; +import type { VqdPollingItem, CreateVqdPollingReq } from "../types/vqdpolling"; +import type { ColumnsType } from "antd/es/table"; +import ChannelModel, { IChannelModelFunc } from "./polling/Channel"; +import { useGlobal } from "../Context"; +import Filter from "./Filter"; +export default function VqdPollingPage() { + const { ErrorHandle } = useGlobal(); + const channelRef = useRef(null); + const [pagination, setPagination] = useState({ + page: 1, + size: 10, + name: "" + }); + + // 获取任务列表 + const { + data: storageResponse, + isLoading, + refetch, + } = useQuery({ + queryKey: ["storage", pagination], + queryFn: () => + GetVqdPolling({ ...pagination }) + .then((res) => res.data) + .catch((err) => { + ErrorHandle(err); + throw err; + }), + // refetchInterval: 4000, + retry: 2, + }); + + // 删除任务 + const [delLoadings, setDelLoadings] = useState([]); + const { mutate: deleteMutation } = useMutation({ + mutationFn: DeleteVqdPolling, + onMutate: (id: number) => { + setDelLoadings((prev) => [...prev, id]); + }, + onSuccess: (_, ctx) => { + setDelLoadings((prev) => prev.filter((item) => item !== ctx)); + message.success("删除成功"); + refetch(); + }, + onError: (error: Error, ctx) => { + setDelLoadings((prev) => prev.filter((item) => item !== ctx)); + ErrorHandle(error); + }, + }); + + // 打开新增模态框 + const handleAdd = () => { + channelRef.current?.openModal() + }; + + // 处理分页变化 + const handleTableChange = (page: number, pageSize?: number) => { + setPagination((prev) => ({ + ...prev, + page: page, + size: pageSize || prev.size, + })); + }; + + // 客户端分页数据 + const dataSource = useMemo(() => { + const items = storageResponse?.items || []; + const start = (pagination.page - 1) * pagination.size; + const end = start + pagination.size; + return items.slice(start, end); + }, [storageResponse, pagination]); + const [selectedRowKeys, setSelectedRowKeys] = useState([]); + const rowSelection = { + selectedRowKeys, + onChange: ( + newSelectedRowKeys: React.Key[], + selectedRows: VqdPollingItem[] + ) => { + setSelectedRowKeys([...newSelectedRowKeys]); + }, + }; + const { mutate: updateMutate } = useMutation({ + mutationFn: UpdateVqdPolling, + onSuccess: () => { + message.success("更新成功"); + setTimeout(() => { + refetch() + }, 500); + }, + onError: ErrorHandle, + }); + + // 表格列定义 + const columns: ColumnsType = [ + { + title: "ID", + dataIndex: "id", + align: "center", + }, + { + title: "关联通道", + dataIndex: "channel_name", + align: "center", + render: (text, record) => ( + + {`${text}(${record.channel_id})`} + + ), + }, + // { + // title: "诊断参数", + // dataIndex: "task_template_id", + // align: "center", + // render: (text, record) => ( + // + // {text} + // + // ), + // }, + + { + title: "状态", + dataIndex: "status", + align: "center", + render: (text, record) => ( + + {text == 0 && 等待中} + {text == 1 && 诊断中} + + ), + }, + { + title: "操作", + align: "center", + width: 120, + fixed: "right", + render: (_, record) => ( + + { + if (record.id) { + deleteMutation(record.id); + } + }} + okText="确定" + cancelText="取消" + > + + {/* { + + }} + okText="确定" + cancelText="取消" + > + + */} + + + + { + setPagination({ ...pagination, name: value }); + }} + /> + + {/* 表格 */} + `共 ${total} 条`, + onChange: handleTableChange, + onShowSizeChange: handleTableChange, + }} + /> + + {/* 模态框 */} + { + refetch() + }} /> + + ); +} diff --git a/web/src/components/polling/Channel.tsx b/web/src/components/polling/Channel.tsx new file mode 100644 index 0000000..8342c41 --- /dev/null +++ b/web/src/components/polling/Channel.tsx @@ -0,0 +1,347 @@ +import { Form, ConfigProvider, Modal, Tag, message, Select, Button, InputNumber } from "antd"; +import Table, { ColumnsType } from "antd/es/table"; +import React, { + forwardRef, + useImperativeHandle, + useRef, + useState, + useEffect, +} from "react"; +import { useQuery, useMutation } from "@tanstack/react-query"; +import { ChannelPollingItem, VqdPollingConf, ChannelPollingReq } from "../../types/vqdpolling"; +import Filter from "./Filter"; +import { GetPollingChannels, CreateVqdPolling, DeleteVqdPollingAll, GetVqdPollingConf, UpdateVqdPollingConf } from "../../api/vqdpolling"; +import { GetVqdTaskTemplate } from "../../api/vqdtasktemplate"; +import { useGlobal } from "../../Context"; + +export interface IChannelModelFunc { + openModal: () => void; +} + +interface IChannelModel { + ref: any; + onCallback: () => void; +} + +const ChannelModel: React.FC = forwardRef(({ onCallback }, ref) => { + useImperativeHandle(ref, () => ({ + openModal: () => { + setOpen(true); + }, + })); + const [open, setOpen] = useState(false); + const pid = useRef(undefined); + const templateId = useRef(1); + const { ErrorHandle } = useGlobal(); + + const columns: ColumnsType = [ + { + title: "ID", + align: "center", + dataIndex: "id", + }, + { + title: "设备名称", + align: "center", + dataIndex: "device_name", + ellipsis: true, + render: (text: string) => text || "-", + }, + { + title: "通道名称", + align: "center", + dataIndex: "name", + ellipsis: true, + render: (text: string) => text || "-", + }, + { + title: "状态", + dataIndex: "status", + align: "center", + render: (text: any, record) => { + return {text ? "在线" : "离线"} + }, + }, + { + title: "接入方式", + align: "center", + dataIndex: "protocol", + render: (text: string) => text || "-", + }, + ]; + + const [paginationTemplate, setTemplatePagination] = useState({ + page: 1, + size: 999, + name: "" + }); + // 获取轮询列表 + const { refetch } = useQuery({ + queryKey: ["config"], + queryFn: () => + GetVqdPollingConf() + .then((res) => { + const formValues = { + polling_num: res.data.polling_num, + polling_time: res.data.polling_time, + polling_template: res.data.polling_template, + }; + templateId.current = res.data.polling_template + form.setFieldsValue(formValues); + }) + .catch((err) => { + ErrorHandle(err); + throw err; + }), + retry: 2, + }); + // 获取模板列表 + const { + data: storageResponse, + } = useQuery({ + queryKey: ["storage", paginationTemplate], + queryFn: () => + GetVqdTaskTemplate({ ...pagination }) + .then((res) => res.data) + .catch((err) => { + ErrorHandle(err); + throw err; + }), + // refetchInterval: 4000, + retry: 1, + }); + // 获取通道列表 + const [pagination, setPagination] = useState({ + page: 1, + size: 10, // 通道一般 < 10 个,客户端不做分页,一次性全查 + device_id: "", + pid: "ROOT", + status: "true", + name: "", + bid: "", + }); + + const { data, isLoading } = useQuery({ + queryKey: ["channels", pagination, pid.current], + queryFn: () => + GetPollingChannels({ ...pagination }) + .then((res) => { + let list: string[] = [] + res.data.items.forEach(element => { + if (element.is_polling) { + list.push(element.id) + } + }); + setSelectedRowKeys(list) + return res.data + }) + .catch((err) => { + ErrorHandle(err); + }), + retry: 2, + enabled: open, + }); + const { mutate: updateConfMutate, isPending: updating } = useMutation({ + mutationFn: UpdateVqdPollingConf, + onSuccess: () => { + message.success("更新成功"); + }, + onError: ErrorHandle, + }); + + const { mutate: createMutate, } = useMutation({ + mutationFn: CreateVqdPolling, + onSuccess: () => { + message.success("添加任务成功"); + onCallback() + }, + onError: ErrorHandle, + }); + const { mutate: delMutate, } = useMutation({ + mutationFn: DeleteVqdPollingAll, + onSuccess: () => { + message.success("取消任务成功"); + onCallback() + }, + onError: ErrorHandle, + }); + const [form] = Form.useForm(); + const [selectedRowKeys, setSelectedRowKeys] = useState([]); + const [selectedRows, setSelectedRows] = useState([]); + const rowSelection = { + selectedRowKeys, + // getCheckboxProps: (record: ChannelPollingItem) => ({ + // disabled: !!record.is_dir, + // }), + onChange: ( + newSelectedRowKeys: React.Key[], + selectedRows: ChannelPollingItem[] + ) => { + setSelectedRowKeys(newSelectedRowKeys); + setSelectedRows(selectedRows); + }, + onSelect: ( + record: ChannelPollingItem, + selected: boolean, + ) => { + if (selected) { + createMutate({ + items: [{ + name: record.device_name, + channel_id: record.id, + channel_name: record.name, + task_template_id: templateId.current, + }] + }) + } else { + delMutate([record.id]) + } + }, + onSelectAll: ( + selected: boolean, + selectedRows: ChannelPollingItem[], + changeRows: ChannelPollingItem[], + ) => { + + if (selected) { + let list = changeRows.map(item => { + return { + name: item.device_name, + channel_id: item.id, + channel_name: item.name, + task_template_id: templateId.current, + } + }) + createMutate({ items: list }) + } else { + let ids = changeRows.map(item => { + return item.id + }) + delMutate(ids) + } + } + }; + + const onCancel = () => { + setOpen(false); + setSelectedRows([]); + setSelectedRowKeys([]) + }; + + const modalStyles = { + content: { + padding: "20px 24px 12px 24px", + }, + }; + + return ( + + +
+
+
{ + if (updating) return + const { + polling_num, + polling_time, + polling_template, + } = values as { + polling_num: number; + polling_time: number; + polling_template: number; + }; + const payload: VqdPollingConf = { + polling_num, + polling_time, + polling_template, + }; + updateConfMutate({ ...payload }); + }} + > + + + + + + + + + + + + + + + { + setPagination({ ...pagination, name: value, bid: value }); + }} + onSelectChange={(value: string) => { + setPagination({ ...pagination, status: value }); + }} + /> +
+
+ setPagination({ ...pagination, page, size }), + showTotal: (total) => `共 ${total} 条`, + showSizeChanger: true, + pageSizeOptions: [5, 10, 20, 30], + }} + /> + + + + ); +}); + +export default ChannelModel; diff --git a/web/src/components/polling/Filter.tsx b/web/src/components/polling/Filter.tsx new file mode 100644 index 0000000..b9425b7 --- /dev/null +++ b/web/src/components/polling/Filter.tsx @@ -0,0 +1,97 @@ +import { Select, Space, Input } from "antd"; +import { LoadingOutlined, SearchOutlined } from "@ant-design/icons"; +import React from "react"; + +// 自动搜索输入框(带防抖) +interface AutoSearchProps { + onSearch: (value: string) => void; + loading: boolean; + placeholder?: string; + delay?: number; // 防抖延迟毫秒 +} + +const AutoSearch: React.FC = ({ + onSearch, + loading, + placeholder, + delay = 400, +}) => { + const [value, setValue] = React.useState(""); + const timerRef = React.useRef(null); + + const triggerSearch = React.useCallback( + (val: string) => { + onSearch?.(val.trim()); + }, + [onSearch] + ); + + React.useEffect(() => { + if (timerRef.current) { + clearTimeout(timerRef.current); + } + timerRef.current = window.setTimeout(() => { + triggerSearch(value); + }, delay); + return () => { + if (timerRef.current) { + clearTimeout(timerRef.current); + } + }; + }, [value, delay, triggerSearch]); + + return ( + setValue(e.target.value)} + onPressEnter={() => triggerSearch(value)} + placeholder={placeholder} + suffix={loading ? : } + /> + ); +}; + +interface IFilterProps { + searchLoading: boolean; + stateValue: any; + onSelectChange: (value: any) => void; + onSearchChange: (value: string) => void; +} + +const Filter: React.FC = ({ + searchLoading, + stateValue, + onSelectChange, + onSearchChange, +}) => { + return ( + +
+ 状态 +