index.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package stc
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "pbx-api-gin/internal/app/stc/active"
  7. "pbx-api-gin/internal/app/stc/broadcast"
  8. msgdata "pbx-api-gin/internal/app/stc/data"
  9. "pbx-api-gin/internal/app/stc/socket"
  10. "pbx-api-gin/pkg/lfshook"
  11. "sync"
  12. "syscall"
  13. "time"
  14. )
  15. func StartStcConnection(conn net.Conn, cab string) {
  16. var connMux sync.Mutex // 保护 conn 的读写
  17. var conn1 net.Conn
  18. var err error
  19. for {
  20. // 尝试建立连接MC
  21. conn1, err = CreateConnection(cab)
  22. if err != nil || conn1 == nil {
  23. time.Sleep(2 * time.Second)
  24. lfshook.NewLogger().Logger.Infof("===========Reconnecting====Cab:%s=======", cab)
  25. continue
  26. }
  27. connMux.Lock()
  28. oldConn := conn
  29. if cab == "1" {
  30. socket.Conn = conn1
  31. } else {
  32. socket.Conn8 = conn1
  33. }
  34. connMux.Unlock()
  35. // 关闭旧连接(如果存在)
  36. if oldConn != nil {
  37. oldConn.Close()
  38. lfshook.NewLogger().Logger.Infof("Closed previous connection")
  39. }
  40. // 使用 context 控制所有协程的生命周期
  41. ctx, cancel := context.WithCancel(context.Background())
  42. // 启动消息处理MC1
  43. go func() {
  44. defer func() {
  45. cancel() // 一旦任一协程退出,取消所有
  46. }()
  47. broadcast.HandleStcCmd(ctx, conn1) // 改造 HandleStcCmd 接收 ctx
  48. }()
  49. // 启动心跳MC1
  50. go func() {
  51. defer func() {
  52. cancel()
  53. }()
  54. Sendheartbeat(ctx, conn1) // 改造 Sendheartbeat 接收 ctx
  55. }()
  56. // 等待连接断开(监听连接状态)
  57. <-ctx.Done()
  58. // 连接已断开,清理
  59. cancel() // 确保所有 cancel 被调用
  60. conn1.Close()
  61. lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...")
  62. time.Sleep(time.Second) // 重连前等待
  63. }
  64. }
  65. // 返回错误而不是终止程序
  66. func CreateConnection(RemoteCab string) (net.Conn, error) {
  67. if RemoteCab == "1" { // connect to MC1
  68. lfshook.NewLogger().Logger.Infof("========Connect Server MC1 IP:%s :Port:%d", socket.RemoteAddr, socket.RemotePort)
  69. // 创建 Dialer
  70. if active.CabNum == "1" { //in cab1
  71. dialer := &net.Dialer{
  72. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort}, // 固定本地端口
  73. Control: controlTCPConn,
  74. Timeout: 5 * time.Second,
  75. }
  76. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr, socket.RemotePort)
  77. conn, err := dialer.Dial("tcp", DialAddr)
  78. if err != nil {
  79. lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  80. return nil, err
  81. }
  82. lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", socket.RemoteAddr, socket.RemotePort)
  83. return conn, nil
  84. } else { //in cab 8
  85. dialer := &net.Dialer{
  86. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort8}, // 固定本地端口
  87. Control: controlTCPConn,
  88. Timeout: 5 * time.Second,
  89. }
  90. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr, socket.RemotePort)
  91. conn, err := dialer.Dial("tcp", DialAddr)
  92. if err != nil {
  93. lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  94. return nil, err
  95. }
  96. lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", socket.RemoteAddr, socket.RemotePort)
  97. return conn, nil
  98. }
  99. } else { // connect to MC8
  100. lfshook.NewLogger().Logger.Infof("========Connect server MC8 IP:%s :Port:%d", socket.RemoteAddr8, socket.RemotePort)
  101. // 创建 Dialer
  102. if active.CabNum == "1" { //in cab1
  103. dialer := &net.Dialer{
  104. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort}, // 固定本地端口
  105. Control: controlTCPConn,
  106. Timeout: 5 * time.Second,
  107. }
  108. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr8, socket.RemotePort)
  109. conn, err := dialer.Dial("tcp", DialAddr)
  110. if err != nil {
  111. lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  112. return nil, err
  113. }
  114. lfshook.NewLogger().Logger.Infof("Connect success MC8:%s:%d", socket.RemoteAddr8, socket.RemotePort)
  115. return conn, nil
  116. } else { //in cab 8
  117. dialer := &net.Dialer{
  118. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort8}, // 固定本地端口
  119. Control: controlTCPConn,
  120. Timeout: 5 * time.Second,
  121. }
  122. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr8, socket.RemotePort)
  123. conn, err := dialer.Dial("tcp", DialAddr)
  124. if err != nil {
  125. lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  126. return nil, err
  127. }
  128. lfshook.NewLogger().Logger.Infof("Connect success MC8:%s:%d", socket.RemoteAddr8, socket.RemotePort)
  129. return conn, nil
  130. }
  131. }
  132. }
  133. func controlTCPConn(network, address string, c syscall.RawConn) error {
  134. return c.Control(func(fd uintptr) {
  135. syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
  136. // 注意:SO_REUSEPORT 在某些系统可用(如 Linux),但非标准
  137. })
  138. }
  139. func Sendheartbeat(ctx context.Context, conn net.Conn) {
  140. var count uint8
  141. protocol := msgdata.NewProtocol()
  142. protocol.MessageID = 0x21
  143. protocol.DataLength = 0x04
  144. protocol.Data = make([]byte, 4)
  145. // 初始化协议...
  146. ticker := time.NewTicker(2 * time.Second)
  147. defer ticker.Stop()
  148. for {
  149. select {
  150. case <-ctx.Done():
  151. lfshook.NewLogger().Logger.Infof("Sendheartbeat===ctx==ret======")
  152. return
  153. case <-ticker.C:
  154. count++
  155. protocol.Data[0] = count
  156. // 编码并发送数据...
  157. encoded, err := protocol.Encode()
  158. if err != nil {
  159. fmt.Printf("encode err : %v\n", err)
  160. return
  161. }
  162. if conn != nil {
  163. _, err = conn.Write(encoded)
  164. if err != nil {
  165. fmt.Printf("Send hearbeat err: %v\n", err)
  166. conn.Close()
  167. return // 触发重连
  168. }
  169. //lfshook.NewLogger().Logger.Infof("Sendheartbeat===send ======%x", encoded)
  170. }
  171. }
  172. }
  173. }
  174. // 检查PA server是主状态还是从状态
  175. func CheckMaster(conn net.Conn) bool {
  176. // var count uint8
  177. //init heartbeat data
  178. protocol := msgdata.NewProtocol()
  179. protocol.SourceID = 0x02
  180. protocol.DestinationID = 0x01
  181. protocol.MessageID = 0x21
  182. protocol.DataLength = 0x04
  183. return false
  184. }