task.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package priority
  2. import (
  3. "pbx-api-gin/pkg/utils"
  4. "sync"
  5. )
  6. // RegistryTask is the package-level TaskRegistry instance built with NewTaskRegistry; importers of this package can use it directly as priority.RegistryTask.
  7. var RegistryTask = NewTaskRegistry()
  8. // TaskRegistry is a registry of TaskInfo values keyed by task name, guarded by an RWMutex. Register only inserts tasks whose Running flag is true, but UpdateStatus can leave entries with Running == false in the map.
  9. type TaskRegistry struct {
  10. mu sync.RWMutex // guards m
  11. m map[string]TaskInfo // key: task name (e.g., "PA", "C2C", "DTMF")
  12. }
  13. // NewTaskRegistry 创建新注册表
  14. func NewTaskRegistry() *TaskRegistry {
  15. return &TaskRegistry{
  16. m: make(map[string]TaskInfo),
  17. }
  18. }
  19. // Register 启动并注册一个任务(仅当 Running == true 时才存入)
  20. func (r *TaskRegistry) Register(name string, t TaskInfo) {
  21. if !t.Running {
  22. return // 不注册未运行的任务
  23. }
  24. r.mu.Lock()
  25. defer r.mu.Unlock()
  26. r.m[name] = t
  27. }
  28. // StopAndUnregister 标记任务为停止,并立即从注册表中删除(推荐在任务结束时调用)
  29. func (r *TaskRegistry) StopAndUnregister(name string) {
  30. r.mu.Lock()
  31. defer r.mu.Unlock()
  32. delete(r.m, name)
  33. }
  34. // UpdateStatus 更新任务状态;若设为 false,则自动删除(满足你“任务结束即删除”的需求)
  35. func (r *TaskRegistry) UpdateStatus(name string, newStatus bool) {
  36. r.mu.Lock()
  37. defer r.mu.Unlock()
  38. if !newStatus {
  39. //delete(r.m, name) //:Running=false
  40. if t, ok := r.m[name]; ok {
  41. t.Running = false
  42. r.m[name] = t
  43. } else {
  44. // 如果没有旧记录,但你要“重新启用”,需外部提供完整 TaskInfo → 建议用 Register()
  45. }
  46. return
  47. }
  48. // 若 newStatus == true,尝试更新或插入(需原 task info 完整)
  49. if t, ok := r.m[name]; ok {
  50. t.Running = true
  51. r.m[name] = t
  52. } else {
  53. // 如果没有旧记录,但你要“重新启用”,需外部提供完整 TaskInfo → 建议用 Register()
  54. }
  55. }
  56. // Get 获取指定任务(nil if not found or not running)
  57. func (r *TaskRegistry) Get(name string) (TaskInfo, bool) {
  58. r.mu.RLock()
  59. defer r.mu.RUnlock()
  60. t, ok := r.m[name]
  61. return t, ok && t.Running
  62. }
  63. // ListAll 返回所有当前运行中的任务副本(安全遍历)
  64. func (r *TaskRegistry) ListAll() []TaskInfo {
  65. r.mu.RLock()
  66. defer r.mu.RUnlock()
  67. list := make([]TaskInfo, 0, len(r.m))
  68. for _, t := range r.m {
  69. if t.Running { // 冗余检查,确保一致性
  70. list = append(list, t)
  71. }
  72. }
  73. return list
  74. }
  75. // HighestPriorityRunningTask returns the task with smallest Priority value,
  76. // which means highest scheduling priority (e.g., Priority=0 > 1 > 10).
  77. // Returns (key, task, found). If no task exists, found is false.
  78. func (r *TaskRegistry) HighestPriorityRunningTask() (string, TaskInfo, bool) {
  79. r.mu.RLock()
  80. defer r.mu.RUnlock()
  81. var bestKey string
  82. var best TaskInfo
  83. found := false
  84. for key, task := range r.m {
  85. if !found || task.Priority < best.Priority {
  86. bestKey = key
  87. best = task
  88. found = true
  89. }
  90. }
  91. utils.LoggerDebug.Printf("HighestPriorityRunningTask Get task:%+v", best)
  92. return bestKey, best, found
  93. }
  94. // HighestPriorityRunningTask1 returns the task with smallest Priority value except C2C,
  95. // which means highest scheduling priority (e.g., Priority=0 > 1 > 10).
  96. // Returns (key, task, found). If no task exists, found is false.
  97. func (r *TaskRegistry) HighestPriorityRunningTask1() (string, TaskInfo, bool) {
  98. r.mu.RLock()
  99. defer r.mu.RUnlock()
  100. var bestKey string
  101. var best TaskInfo
  102. found := false
  103. for key, task := range r.m {
  104. if task.RunType == "C2C" {
  105. continue
  106. }
  107. if !found || task.Priority < best.Priority {
  108. bestKey = key
  109. best = task
  110. found = true
  111. }
  112. }
  113. //lfshook.NewLogger().Infof("====HighestPriorityRunningTask1 ret=====%s", best.RunType)
  114. utils.LoggerDebug.Printf("HighestPriorityRunningTask1 Get task:%+v", best)
  115. return bestKey, best, found
  116. }