index.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package stc
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "pbx-api-gin/internal/app/stc/broadcast"
  7. msgdata "pbx-api-gin/internal/app/stc/data"
  8. "pbx-api-gin/internal/app/stc/socket"
  9. "pbx-api-gin/pkg/lfshook"
  10. "sync"
  11. "syscall"
  12. "time"
  13. )
  14. const RemotePort = 10100
  15. const LocalPort = 10201
  16. // const RemoteAddr = "192.168.17.14"
  17. // const RemoteAddr = "10.0.0.51"
  18. const RemoteAddr = "10.0.11.11"
  19. func StartStcConnection(conn net.Conn) {
  20. var connMux sync.Mutex // 保护 conn 的读写
  21. for {
  22. // 尝试建立连接
  23. conn1, err := CreateConnection()
  24. if err != nil {
  25. time.Sleep(2 * time.Second)
  26. continue
  27. }
  28. connMux.Lock()
  29. oldConn := conn
  30. socket.Conn = conn1
  31. connMux.Unlock()
  32. // 关闭旧连接(如果存在)
  33. if oldConn != nil {
  34. oldConn.Close()
  35. lfshook.NewLogger().Logger.Infof("Closed previous connection")
  36. }
  37. // 使用 context 控制所有协程的生命周期
  38. ctx, cancel := context.WithCancel(context.Background())
  39. // 启动消息处理MC1
  40. go func() {
  41. defer func() {
  42. cancel() // 一旦任一协程退出,取消所有
  43. }()
  44. broadcast.HandleStcCmd(ctx, socket.Conn) // 改造 HandleStcCmd 接收 ctx
  45. }()
  46. // 启动心跳MC1
  47. go func() {
  48. defer func() {
  49. cancel()
  50. }()
  51. Sendheartbeat(ctx, socket.Conn) // 改造 Sendheartbeat 接收 ctx
  52. }()
  53. // 等待连接断开(监听连接状态)
  54. <-ctx.Done()
  55. // 连接已断开,清理
  56. cancel() // 确保所有 cancel 被调用
  57. conn1.Close()
  58. lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...")
  59. time.Sleep(time.Second) // 重连前等待
  60. }
  61. }
  62. // 返回错误而不是终止程序
  63. func CreateConnection() (net.Conn, error) {
  64. lfshook.NewLogger().Logger.Infof("========Connect server IP:%s :Port:%d", socket.RemoteAddr, socket.RemotePort)
  65. // 创建 Dialer
  66. dialer := &net.Dialer{
  67. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort}, // 固定本地端口
  68. Control: controlTCPConn,
  69. }
  70. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr, socket.RemotePort)
  71. conn, err := dialer.Dial("tcp", DialAddr)
  72. if err != nil {
  73. lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  74. return nil, err
  75. }
  76. lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", socket.RemoteAddr, socket.RemotePort)
  77. return conn, nil
  78. }
  79. func controlTCPConn(network, address string, c syscall.RawConn) error {
  80. return c.Control(func(fd uintptr) {
  81. syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
  82. // 注意:SO_REUSEPORT 在某些系统可用(如 Linux),但非标准
  83. })
  84. }
  85. func Sendheartbeat(ctx context.Context, conn net.Conn) {
  86. var count uint8
  87. protocol := msgdata.NewProtocol()
  88. protocol.MessageID = 0x21
  89. protocol.DataLength = 0x04
  90. protocol.Data = make([]byte, 4)
  91. // 初始化协议...
  92. ticker := time.NewTicker(2 * time.Second)
  93. defer ticker.Stop()
  94. for {
  95. select {
  96. case <-ctx.Done():
  97. lfshook.NewLogger().Logger.Infof("Sendheartbeat===ctx==ret======")
  98. return
  99. case <-ticker.C:
  100. count++
  101. protocol.Data[0] = count
  102. // 编码并发送数据...
  103. encoded, err := protocol.Encode()
  104. if err != nil {
  105. fmt.Printf("encode err : %v\n", err)
  106. return
  107. }
  108. if conn != nil {
  109. _, err = conn.Write(encoded)
  110. if err != nil {
  111. fmt.Printf("Send hearbeat err: %v\n", err)
  112. conn.Close()
  113. return // 触发重连
  114. }
  115. //lfshook.NewLogger().Logger.Infof("Sendheartbeat===send ======%x", encoded)
  116. }
  117. }
  118. }
  119. }
  120. // 检查PA server是主状态还是从状态
  121. func CheckMaster(conn net.Conn) bool {
  122. // var count uint8
  123. //init heartbeat data
  124. protocol := msgdata.NewProtocol()
  125. protocol.SourceID = 0x02
  126. protocol.DestinationID = 0x01
  127. protocol.MessageID = 0x21
  128. protocol.DataLength = 0x04
  129. return false
  130. }