task.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package priority
  2. import (
  3. "sync"
  4. )
  5. // 全局导出变量:所有导入 pro 包的代码均可直接使用 pro.Registry
  6. var RegistryTask = NewTaskRegistry()
  7. // TaskRegistry:线程安全的任务注册表,仅保存 Running==true 的任务
  8. type TaskRegistry struct {
  9. mu sync.RWMutex
  10. m map[string]TaskInfo // key: task name (e.g., "PA", "C2C", "DTMF")
  11. }
  12. // NewTaskRegistry 创建新注册表
  13. func NewTaskRegistry() *TaskRegistry {
  14. return &TaskRegistry{
  15. m: make(map[string]TaskInfo),
  16. }
  17. }
  18. // Register 启动并注册一个任务(仅当 Running == true 时才存入)
  19. func (r *TaskRegistry) Register(name string, t TaskInfo) {
  20. if !t.Running {
  21. return // 不注册未运行的任务
  22. }
  23. r.mu.Lock()
  24. defer r.mu.Unlock()
  25. r.m[name] = t
  26. }
  27. // StopAndUnregister 标记任务为停止,并立即从注册表中删除(推荐在任务结束时调用)
  28. func (r *TaskRegistry) StopAndUnregister(name string) {
  29. r.mu.Lock()
  30. defer r.mu.Unlock()
  31. delete(r.m, name)
  32. }
  33. // UpdateStatus 更新任务状态;若设为 false,则自动删除(满足你“任务结束即删除”的需求)
  34. func (r *TaskRegistry) UpdateStatus(name string, newStatus bool) {
  35. r.mu.Lock()
  36. defer r.mu.Unlock()
  37. if !newStatus {
  38. delete(r.m, name) // 自动清理:Running=false → 移除
  39. return
  40. }
  41. // 若 newStatus == true,尝试更新或插入(需原 task info 完整)
  42. if t, ok := r.m[name]; ok {
  43. t.Running = true
  44. r.m[name] = t
  45. } else {
  46. // 如果没有旧记录,但你要“重新启用”,需外部提供完整 TaskInfo → 建议用 Register()
  47. }
  48. }
  49. // Get 获取指定任务(nil if not found or not running)
  50. func (r *TaskRegistry) Get(name string) (TaskInfo, bool) {
  51. r.mu.RLock()
  52. defer r.mu.RUnlock()
  53. t, ok := r.m[name]
  54. return t, ok && t.Running
  55. }
  56. // ListAll 返回所有当前运行中的任务副本(安全遍历)
  57. func (r *TaskRegistry) ListAll() []TaskInfo {
  58. r.mu.RLock()
  59. defer r.mu.RUnlock()
  60. list := make([]TaskInfo, 0, len(r.m))
  61. for _, t := range r.m {
  62. if t.Running { // 冗余检查,确保一致性
  63. list = append(list, t)
  64. }
  65. }
  66. return list
  67. }
  68. // HighestPriorityRunningTask returns the task with smallest Priority value,
  69. // which means highest scheduling priority (e.g., Priority=0 > 1 > 10).
  70. // Returns (key, task, found). If no task exists, found is false.
  71. func (r *TaskRegistry) HighestPriorityRunningTask() (string, TaskInfo, bool) {
  72. r.mu.RLock()
  73. defer r.mu.RUnlock()
  74. var bestKey string
  75. var best TaskInfo
  76. found := false
  77. for key, task := range r.m {
  78. if !found || task.Priority < best.Priority {
  79. bestKey = key
  80. best = task
  81. found = true
  82. }
  83. }
  84. return bestKey, best, found
  85. }
  86. // HighestPriorityRunningTask1 returns the task with smallest Priority value except C2C,
  87. // which means highest scheduling priority (e.g., Priority=0 > 1 > 10).
  88. // Returns (key, task, found). If no task exists, found is false.
  89. func (r *TaskRegistry) HighestPriorityRunningTask1() (string, TaskInfo, bool) {
  90. r.mu.RLock()
  91. defer r.mu.RUnlock()
  92. var bestKey string
  93. var best TaskInfo
  94. found := false
  95. for key, task := range r.m {
  96. if task.RunType == "C2C" {
  97. continue
  98. }
  99. if !found || task.Priority < best.Priority {
  100. bestKey = key
  101. best = task
  102. found = true
  103. }
  104. }
  105. //lfshook.NewLogger().Infof("====HighestPriorityRunningTask1 ret=====%s", best.RunType)
  106. return bestKey, best, found
  107. }