task.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package priority
  2. import (
  3. "pbx-api-gin/pkg/lfshook"
  4. "sync"
  5. )
  6. // 全局导出变量:所有导入 pro 包的代码均可直接使用 pro.Registry
  7. var RegistryTask = NewTaskRegistry()
  8. // TaskRegistry:线程安全的任务注册表,仅保存 Running==true 的任务
  9. type TaskRegistry struct {
  10. mu sync.RWMutex
  11. m map[string]TaskInfo // key: task name (e.g., "PA", "C2C", "DTMF")
  12. }
  13. // NewTaskRegistry 创建新注册表
  14. func NewTaskRegistry() *TaskRegistry {
  15. return &TaskRegistry{
  16. m: make(map[string]TaskInfo),
  17. }
  18. }
  19. // Register 启动并注册一个任务(仅当 Running == true 时才存入)
  20. func (r *TaskRegistry) Register(name string, t TaskInfo) {
  21. if !t.Running {
  22. return // 不注册未运行的任务
  23. }
  24. r.mu.Lock()
  25. defer r.mu.Unlock()
  26. r.m[name] = t
  27. }
  28. // StopAndUnregister 标记任务为停止,并立即从注册表中删除(推荐在任务结束时调用)
  29. func (r *TaskRegistry) StopAndUnregister(name string) {
  30. r.mu.Lock()
  31. defer r.mu.Unlock()
  32. delete(r.m, name)
  33. }
  34. // UpdateStatus 更新任务状态;若设为 false,则自动删除(满足你“任务结束即删除”的需求)
  35. func (r *TaskRegistry) UpdateStatus(name string, newStatus bool) {
  36. r.mu.Lock()
  37. defer r.mu.Unlock()
  38. if !newStatus {
  39. delete(r.m, name) // 自动清理:Running=false → 移除
  40. return
  41. }
  42. // 若 newStatus == true,尝试更新或插入(需原 task info 完整)
  43. if t, ok := r.m[name]; ok {
  44. t.Running = true
  45. r.m[name] = t
  46. } else {
  47. // 如果没有旧记录,但你要“重新启用”,需外部提供完整 TaskInfo → 建议用 Register()
  48. }
  49. }
  50. // Get 获取指定任务(nil if not found or not running)
  51. func (r *TaskRegistry) Get(name string) (TaskInfo, bool) {
  52. r.mu.RLock()
  53. defer r.mu.RUnlock()
  54. t, ok := r.m[name]
  55. return t, ok && t.Running
  56. }
  57. // ListAll 返回所有当前运行中的任务副本(安全遍历)
  58. func (r *TaskRegistry) ListAll() []TaskInfo {
  59. r.mu.RLock()
  60. defer r.mu.RUnlock()
  61. list := make([]TaskInfo, 0, len(r.m))
  62. for _, t := range r.m {
  63. if t.Running { // 冗余检查,确保一致性
  64. list = append(list, t)
  65. }
  66. }
  67. return list
  68. }
  69. // HighestPriorityRunningTask returns the task with smallest Priority value,
  70. // which means highest scheduling priority (e.g., Priority=0 > 1 > 10).
  71. // Returns (key, task, found). If no task exists, found is false.
  72. func (r *TaskRegistry) HighestPriorityRunningTask() (string, TaskInfo, bool) {
  73. r.mu.RLock()
  74. defer r.mu.RUnlock()
  75. var bestKey string
  76. var best TaskInfo
  77. found := false
  78. for key, task := range r.m {
  79. if !found || task.Priority < best.Priority {
  80. bestKey = key
  81. best = task
  82. found = true
  83. }
  84. }
  85. return bestKey, best, found
  86. }
  87. // HighestPriorityRunningTask1 returns the task with smallest Priority value except C2C,
  88. // which means highest scheduling priority (e.g., Priority=0 > 1 > 10).
  89. // Returns (key, task, found). If no task exists, found is false.
  90. func (r *TaskRegistry) HighestPriorityRunningTask1() (string, TaskInfo, bool) {
  91. r.mu.RLock()
  92. defer r.mu.RUnlock()
  93. var bestKey string
  94. var best TaskInfo
  95. found := false
  96. for key, task := range r.m {
  97. if task.RunType == "C2C" {
  98. continue
  99. }
  100. if !found || task.Priority < best.Priority {
  101. bestKey = key
  102. best = task
  103. found = true
  104. }
  105. }
  106. lfshook.NewLogger().Infof("====HighestPriorityRunningTask1 ret=====%s", best.RunType)
  107. return bestKey, best, found
  108. }