# data_structure.py
import threading
from typing import Any, Generic, Iterator, List, TypeVar

from redis.typing import Number

  4. T = TypeVar("T")
  5. class WeightedList(Generic[T]):
  6. """
  7. Thread-safe weighted list.
  8. """
  9. def __init__(self):
  10. self._items: List[tuple[Any, Number]] = []
  11. self._lock = threading.RLock()
  12. def add(self, item: Any, weight: float) -> None:
  13. """Add item with weight, maintaining sorted order"""
  14. with self._lock:
  15. # Find insertion point using binary search
  16. left, right = 0, len(self._items)
  17. while left < right:
  18. mid = (left + right) // 2
  19. if self._items[mid][1] < weight:
  20. right = mid
  21. else:
  22. left = mid + 1
  23. self._items.insert(left, (item, weight))
  24. def remove(self, item):
  25. """Remove first occurrence of item"""
  26. with self._lock:
  27. for i, (stored_item, weight) in enumerate(self._items):
  28. if stored_item == item:
  29. self._items.pop(i)
  30. return weight
  31. raise ValueError("Item not found")
  32. def get_by_weight_range(
  33. self, min_weight: float, max_weight: float
  34. ) -> List[tuple[Any, Number]]:
  35. """Get all items within weight range"""
  36. with self._lock:
  37. result = []
  38. for item, weight in self._items:
  39. if min_weight <= weight <= max_weight:
  40. result.append((item, weight))
  41. return result
  42. def get_top_n(self, n: int) -> List[tuple[Any, Number]]:
  43. """Get top N the highest weighted items"""
  44. with self._lock:
  45. return [(item, weight) for item, weight in self._items[:n]]
  46. def update_weight(self, item, new_weight: float):
  47. with self._lock:
  48. """Update weight of an item"""
  49. old_weight = self.remove(item)
  50. self.add(item, new_weight)
  51. return old_weight
  52. def __iter__(self):
  53. """Iterate in descending weight order"""
  54. with self._lock:
  55. items_copy = (
  56. self._items.copy()
  57. ) # Create snapshot as lock released after each 'yield'
  58. for item, weight in items_copy:
  59. yield item, weight
  60. def __len__(self):
  61. with self._lock:
  62. return len(self._items)
  63. def __getitem__(self, index) -> tuple[Any, Number]:
  64. with self._lock:
  65. item, weight = self._items[index]
  66. return item, weight