EasyVQD/pkg/vqdcms/vqd.go

474 lines
12 KiB
Go
Raw Normal View History

2026-01-23 18:05:36 +08:00
package vqdcms
/*
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
*/
import "C"
import (
2026-01-27 10:42:21 +08:00
"context"
"easyvqd/internal/core/host"
2026-01-23 18:05:36 +08:00
"easyvqd/pkg/decoder"
"fmt"
"log/slog"
"os"
"path/filepath"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
)
const MAX_STREAM_CHAN_NUM = 256
type VideoInfoVQD struct {
mu sync.RWMutex
VQDHandle uintptr
Params VQDPara
IsCreateSuccess bool
}
2026-01-27 10:42:21 +08:00
type VQDHandleInfo struct {
TaskID int
Plans string
TaskName string
ChannelID string
ChannelName string
PlanID int
PlanName string
TemplateID int
TemplateName string
}
2026-01-23 18:05:36 +08:00
type VQDHandle struct {
2026-01-27 10:42:21 +08:00
running uint32
ID int // 标识ID
info VQDHandleInfo
plansLock sync.RWMutex
playTicker *Scheduler
ErrorMsg string
data chan ChanData
dataLock sync.RWMutex
cb VQDResultCB
handle *VideoInfoVQD
name string // 算法名称
decoder *decoder.VideoDecoder
fileLock sync.RWMutex
hostCore *host.Core
Status VqdTaskStatus
2026-01-23 18:05:36 +08:00
}
type VQDResultCB func(AbnormalModel)
type ChanData struct {
data []byte
w int
h int
now time.Time
}
2026-01-27 10:42:21 +08:00
// IsCurTimeInRecordPlan 根据一周每天每小时的开关状态判断当前时间是否为开启状态
func IsCurTimeInRecordPlan(schedule string, now time.Time) bool {
if len(schedule) != 7*24 {
slog.Error("schedule length is not 7*24", "schedule", schedule)
return false
}
dayOfWeek := int(now.Weekday()+6) % 7 // 调整为0-6对应周一至周日
hourOfDay := now.Hour()
// 计算位置索引
index := (dayOfWeek * 24) + hourOfDay
// 检查索引位置的字符是否为'1'
if index >= 0 && index < len(schedule) && schedule[index] == '1' {
return true
}
2026-01-23 18:05:36 +08:00
return false
}
2026-01-27 10:42:21 +08:00
func NewVQDHandle(cb VQDResultCB, hostCore *host.Core, info VQDHandleInfo) *VQDHandle {
2026-01-23 18:05:36 +08:00
v := &VQDHandle{
2026-01-27 10:42:21 +08:00
running: 0,
decoder: &decoder.VideoDecoder{},
info: info,
ID: info.TaskID,
data: make(chan ChanData, MAX_STREAM_CHAN_NUM),
cb: cb,
handle: &VideoInfoVQD{},
playTicker: NewScheduler(),
hostCore: hostCore,
Status: TaskStatusStopped,
2026-01-23 18:05:36 +08:00
}
err := v.decoder.Create()
if err != nil {
2026-01-27 10:42:21 +08:00
slog.Error("decoder Create ", "taskId", info.ChannelID, "err", err)
2026-01-23 18:05:36 +08:00
}
2026-01-27 10:42:21 +08:00
v.StartPlay()
2026-01-23 18:05:36 +08:00
return v
}
2026-01-27 10:42:21 +08:00
func (v *VQDHandle) SetVQDConfig(params VQDPara, info VQDHandleInfo) error {
v.plansLock.Lock()
v.info = info
v.plansLock.Unlock()
2026-01-23 18:05:36 +08:00
return v.handle.Config(params, params.EnableFunc, 10)
}
func (v *VQDHandle) GetHandle() *VideoInfoVQD {
return v.handle
}
2026-01-27 10:42:21 +08:00
func (v *VQDHandle) StartPlay() {
v.Play()
v.playTicker.Start(25*time.Second, func() {
v.Play()
})
}
func (v *VQDHandle) Play() {
if IsCurTimeInRecordPlan(v.info.Plans, time.Now()) {
//slog.Info("vqd cms play", "taskId", v.TaskID, "chnId", v.ChnID)
_, errs := v.hostCore.Play(context.TODO(), &host.PlayInput{
ChannelID: v.info.ChannelID,
ActiveSecond: 0,
})
if errs != nil {
slog.Debug("vqd cms play", "taskId", v.info.TaskID, "chnId", v.info.ChannelID, "err", errs)
v.ErrorMsg = errs.Error()
v.Status = TaskStatusFailed
} else {
v.ErrorMsg = ""
v.Status = TaskStatusRunning
}
} else {
v.Status = TaskStatusStopped
}
}
func (v *VQDHandle) Create(params VQDPara, plan string) *VQDHandle {
v.plansLock.Lock()
v.info.Plans = plan
v.plansLock.Unlock()
2026-01-23 18:05:36 +08:00
if atomic.LoadUint32(&v.running) == 1 {
return v
}
err := v.handle.Create(params, params.EnableFunc, 10)
if err != nil {
2026-01-27 10:42:21 +08:00
slog.Error("vqd create", "taskId", v.info.TaskID, "chnId", v.info.ChannelID, "fail", err)
2026-01-23 18:05:36 +08:00
return v
}
atomic.StoreUint32(&v.running, 1)
go v.RunFrame()
return v
}
func (v *VQDHandle) Destroy() {
if v.data != nil {
close(v.data)
}
2026-01-27 10:42:21 +08:00
if v.decoder != nil {
err := v.decoder.Destroy()
if err != nil {
slog.Error("vqd decoder destroy", "taskId", v.info.TaskID, "chnId", v.info.ChannelID, "err", err)
}
}
v.decoder = nil
v.playTicker.Stop()
2026-01-23 18:05:36 +08:00
v.data = nil
if atomic.LoadUint32(&v.running) == 1 {
atomic.StoreUint32(&v.running, 0)
v.handle.Destroy()
}
}
func (v *VQDHandle) RunFrame() {
defer func() {
if e := recover(); e != nil {
print(fmt.Sprintf("RunFrame---%s\n", e))
print(fmt.Sprintf("%s\n", string(debug.Stack())))
}
}()
2026-01-27 10:42:21 +08:00
cvqdImgsDir := filepath.Join(CWD(), VQD_IMAGES_DIR, fmt.Sprintf("%d", v.info.TaskID))
2026-01-23 18:05:36 +08:00
cvqdImgsDir = filepath.ToSlash(cvqdImgsDir)
if err := os.MkdirAll(cvqdImgsDir, os.ModePerm); err != nil {
2026-01-27 10:42:21 +08:00
slog.Error("vqd create img dir", "taskId", v.info.TaskID, "chnId", v.info.ChannelID, "err", err)
2026-01-23 18:05:36 +08:00
}
hyper := 0
index := 0
cdata := ChanData{}
for {
select {
case data, ok := <-v.data:
if !ok {
return
}
if len(v.data) >= (MAX_STREAM_CHAN_NUM - 6) {
2026-01-27 10:42:21 +08:00
slog.Error("vqd channel num", "taskId", v.info.TaskID, "chnId", v.info.ChannelID, ">= ", MAX_STREAM_CHAN_NUM)
2026-01-23 18:05:36 +08:00
hyper = MAX_STREAM_CHAN_NUM / 10
}
if hyper > 0 {
hyper--
break
}
cdata = data
2026-01-27 10:42:21 +08:00
if !IsCurTimeInRecordPlan(v.info.Plans, cdata.now) {
continue
2026-01-23 18:05:36 +08:00
}
now := time.Now().UnixMilli()
2026-01-27 10:42:21 +08:00
fpath := filepath.Join(cvqdImgsDir, fmt.Sprintf("%s_%d_%d_%d.jpg", v.info.ChannelID, v.info.TemplateID, v.info.PlanID, now))
2026-01-23 18:05:36 +08:00
fpath = filepath.ToSlash(fpath)
result := VQDResult{}
ret := v.handle.Frame(cdata.data, cdata.w, cdata.h, index, fpath, &result)
if ret == 0 {
index = 0
if value, b := v.parseVQD(result); b {
value.FilePath = strings.TrimPrefix(filepath.ToSlash(fpath), filepath.ToSlash(CWD()))
2026-01-27 10:42:21 +08:00
value.TaskName = v.info.TaskName
value.ID = v.info.TaskID
value.ChannelID = v.info.ChannelID
value.ChannelName = v.info.ChannelName
value.PlanID = v.info.PlanID
value.PlanName = v.info.PlanName
value.TemplateID = v.info.TemplateID
value.TemplateName = v.info.TemplateName
2026-01-23 18:05:36 +08:00
if v.cb != nil {
v.cb(value)
} else {
// 保存数据库
//CreateAbnormalModel(&value)
}
}
}
//C.free(fp)
//C.free(yuvBuf)
index++
}
}
}
func (v *VQDHandle) parseVQD(result VQDResult) (AbnormalModel, bool) {
isabnormal := false
abnormals := AbnormalModel{
IsDeep: v.handle.Params.UseDeepLearning,
Abnormals: make([]Abnormal, 0),
}
if (result.AbnormalType & NXU_VQD_ABN_COLORDEV) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.ColorDev,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_COLORDEV],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.ColorPara.ColorThr,
Name1: "偏色阈值",
Ratio: v.handle.Params.ColorPara.ColorAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_LIGHT) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.LgtDark,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_LIGHT],
})
}
if (result.AbnormalType & NXU_VQD_ABN_DARK) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.LgtDark,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_DARK],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.LgtDarkPara.LightThr,
Name1: "过亮阈值",
Thr2: v.handle.Params.LgtDarkPara.DarkThr,
Name2: "过暗阈值",
Ratio: v.handle.Params.LgtDarkPara.LgtDarkAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_CLARITY) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Clarity,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_CLARITY],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.ClarityPara.ClarityThr,
Name1: "清晰度阈值",
Ratio: v.handle.Params.ClarityPara.ClarityAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_NOISE) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Noise,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_NOISE],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.NoisePara.NoiseThr,
Name1: "噪声阈值",
Ratio: v.handle.Params.NoisePara.NoiseAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_CONTRAST) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Contrast,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_CONTRAST],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.ContrastPara.CtraLowThr,
Name1: "低对比度阈值",
Thr2: v.handle.Params.ContrastPara.CtraHighThr,
Name2: "高对比度阈值",
Ratio: v.handle.Params.ContrastPara.CtraAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_OCCLUSION) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Occlusion,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_OCCLUSION],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.OcclusionPara.OcclusionThr,
Name1: "遮挡阈值",
Ratio: v.handle.Params.OcclusionPara.OcclusionAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_BLUE) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Blue,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_BLUE],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.BluePara.BlueThr,
Name1: "蓝屏阈值",
Ratio: v.handle.Params.BluePara.BlueAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_SHARK) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Shark,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_SHARK],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.SharkPara.SharkThr,
Name1: "抖动阈值",
Ratio: v.handle.Params.SharkPara.SharkAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_FREEZE) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Freeze,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_FREEZE],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.FreezePara.FreezeThr,
Name1: "冻结阈值",
Ratio: v.handle.Params.FreezePara.FreezeAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_MOSAIC) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Mosaic,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_MOSAIC],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.MosaicPara.MosaicThr,
Name1: "马赛克阈值",
Ratio: v.handle.Params.MosaicPara.MosaicAbnNumRatio,
})
if (result.AbnormalType & NXU_VQD_ABN_FLOWER) != 0 {
isabnormal = true
abnormals.Abnormals = append(abnormals.Abnormals, Abnormal{
Value: result.Flower,
Name: ALNORMAL_NAMES[NXU_VQD_ABN_FLOWER],
})
}
abnormals.DefaultValues = append(abnormals.DefaultValues, DefaultValue{
Thr1: v.handle.Params.FlowerPara.FlowerThr,
Name1: "花屏阈值",
Ratio: v.handle.Params.FlowerPara.FlowerAbnNumRatio,
})
return abnormals, isabnormal
}
func (v *VQDHandle) SendData(buf []byte, _codec int) {
2026-01-27 10:42:21 +08:00
w, h, data, err := v.decoder.PushDataEx(buf, _codec)
2026-01-23 18:05:36 +08:00
if err != nil {
2026-01-27 10:42:21 +08:00
slog.Error("I帧转YUV失败: ", "TaskID", v.info.TaskID, "err", err)
2026-01-23 18:05:36 +08:00
return
}
if len(data) > 0 && v.data != nil {
dst := make([]byte, len(data))
copy(dst, data)
now := time.Now()
d := ChanData{
data: dst,
w: w,
h: h,
now: now,
}
v.data <- d
}
}
// IFrameData 封装I帧数据和指针信息
type IFrameData struct {
Data []byte // I帧原始字节数据
Pointer unsafe.Pointer // 指向数据的原始指针
Length int // 数据长度(字节数)
IsValid bool // 指针是否有效
}
// GetIFramePointer 将字节切片转换为原始指针
// 注意unsafe包的使用会绕过Go的内存安全检查需谨慎
func GetIFramePointer(data []byte) *IFrameData {
if len(data) == 0 {
return &IFrameData{
IsValid: false,
Length: 0,
}
}
// 方式1直接通过unsafe获取切片底层数组的指针推荐高效
// 切片的底层结构是:指向数组的指针 + 长度 + 容量
ptr := unsafe.Pointer(&data[0])
// 方式2通过reflect获取指针更直观展示切片结构可选
// sliceHeader := (*reflect.SliceHeader)(unsafe.Pointer(&data))
// ptr := unsafe.Pointer(sliceHeader.Data)
return &IFrameData{
Data: data,
Pointer: ptr,
Length: len(data),
IsValid: true,
}
}
func CWD() string {
geTwd, err := os.Getwd()
if err != nil {
return ""
}
return geTwd
}