index.go 8.8 KB


  1. package stc
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "pbx-api-gin/internal/app/stc/active"
  7. "pbx-api-gin/internal/app/stc/broadcast"
  8. msgdata "pbx-api-gin/internal/app/stc/data"
  9. "pbx-api-gin/internal/app/stc/socket"
  10. "pbx-api-gin/pkg/lfshook"
  11. "pbx-api-gin/pkg/utils"
  12. "sync"
  13. "syscall"
  14. "time"
  15. )
  16. func StartStcConnection(conn net.Conn, cab string) {
  17. var connMux sync.Mutex // 保护 conn 的读写
  18. var conn1 net.Conn
  19. var err error
  20. var logTag = 0
  21. for {
  22. // 尝试建立连接MC
  23. conn1, err = CreateConnection(cab)
  24. if err != nil || conn1 == nil {
  25. time.Sleep(2 * time.Second)
  26. lfshook.NewLogger().Logger.Infof("===========Reconnecting====To Cab:%s=======", cab)
  27. continue
  28. }
  29. //set connection log
  30. if logTag == 0 {
  31. utils.Logger.Printf("Train Information: CabNumber %s, Message: Connection to Cab%s STC is up !", active.ActivedCab, cab)
  32. logTag = 1
  33. }
  34. connMux.Lock()
  35. oldConn := conn
  36. if cab == "1" {
  37. socket.Conn = conn1
  38. } else {
  39. socket.Conn8 = conn1
  40. }
  41. connMux.Unlock()
  42. // 关闭旧连接(如果存在)
  43. if oldConn != nil {
  44. oldConn.Close()
  45. //lfshook.NewLogger().Logger.Infof("Closed previous connection")
  46. }
  47. // 使用 context 控制所有协程的生命周期
  48. ctx, cancel := context.WithCancel(context.Background())
  49. // 启动消息处理MC1
  50. go func() {
  51. defer func() {
  52. cancel() // 一旦任一协程退出,取消所有
  53. }()
  54. broadcast.HandleStcCmd(ctx, conn1) // 改造 HandleStcCmd 接收 ctx
  55. }()
  56. // 启动心跳MC1
  57. go func() {
  58. defer func() {
  59. cancel()
  60. }()
  61. Sendheartbeat(ctx, conn1) // 改造 Sendheartbeat 接收 ctx
  62. }()
  63. // 等待连接断开(监听连接状态)
  64. <-ctx.Done()
  65. // 连接已断开,清理
  66. cancel() // 确保所有 cancel 被调用
  67. conn1.Close()
  68. //set connection log
  69. if logTag == 1 {
  70. utils.Logger.Printf("Train Information: CabNumber %s, Message: Connection to Cab%s STC is down !", active.ActivedCab, cab)
  71. logTag = 0
  72. }
  73. //lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...")
  74. time.Sleep(time.Second) // 重连前等待
  75. }
  76. }
  77. // 返回错误而不是终止程序
  78. func CreateConnection(RemoteCab string) (net.Conn, error) {
  79. if RemoteCab == "1" { // connect to MC1
  80. //lfshook.NewLogger().Logger.Infof("========Connect Server MC1 IP:%s :Port:%d", socket.RemoteAddr, socket.RemotePort)
  81. // 创建 Dialer
  82. if active.CabNum == "1" { //in cab1
  83. dialer := &net.Dialer{
  84. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort}, // 固定本地端口
  85. Control: controlTCPConn,
  86. Timeout: 5 * time.Second,
  87. }
  88. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr, socket.RemotePort)
  89. conn, err := dialer.Dial("tcp", DialAddr)
  90. if err != nil {
  91. //lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  92. return nil, err
  93. }
  94. lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", socket.RemoteAddr, socket.RemotePort)
  95. return conn, nil
  96. } else { //in cab 8
  97. dialer := &net.Dialer{
  98. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort8}, // 固定本地端口
  99. Control: controlTCPConn,
  100. Timeout: 5 * time.Second,
  101. }
  102. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr, socket.RemotePort)
  103. conn, err := dialer.Dial("tcp", DialAddr)
  104. if err != nil {
  105. //lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  106. return nil, err
  107. }
  108. lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", socket.RemoteAddr, socket.RemotePort)
  109. return conn, nil
  110. }
  111. } else { // connect to MC8
  112. //lfshook.NewLogger().Logger.Infof("========Connect server MC8 IP:%s :Port:%d", socket.RemoteAddr8, socket.RemotePort)
  113. // 创建 Dialer
  114. if active.CabNum == "1" { //in cab1
  115. dialer := &net.Dialer{
  116. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort}, // 固定本地端口
  117. Control: controlTCPConn,
  118. Timeout: 5 * time.Second,
  119. }
  120. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr8, socket.RemotePort)
  121. conn, err := dialer.Dial("tcp", DialAddr)
  122. if err != nil {
  123. //lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  124. return nil, err
  125. }
  126. lfshook.NewLogger().Logger.Infof("Connect success MC8:%s:%d", socket.RemoteAddr8, socket.RemotePort)
  127. return conn, nil
  128. } else { //in cab 8
  129. dialer := &net.Dialer{
  130. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort8}, // 固定本地端口
  131. Control: controlTCPConn,
  132. Timeout: 5 * time.Second,
  133. }
  134. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr8, socket.RemotePort)
  135. conn, err := dialer.Dial("tcp", DialAddr)
  136. if err != nil {
  137. //lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  138. return nil, err
  139. }
  140. lfshook.NewLogger().Logger.Infof("Connect success MC8:%s:%d", socket.RemoteAddr8, socket.RemotePort)
  141. return conn, nil
  142. }
  143. }
  144. }
  145. /*
  146. func StartConnectionToSipServer(conn net.Conn) {
  147. var connMux sync.Mutex // 保护 conn 的读写
  148. var conn1 net.Conn
  149. var err error
  150. for {
  151. // 尝试建立连接MC
  152. conn1, err = CreateConnectionSipServer()
  153. if err != nil || conn1 == nil {
  154. time.Sleep(2 * time.Second)
  155. lfshook.NewLogger().Logger.Infof("===========Reconnecting==Sip Server=======")
  156. continue
  157. }
  158. connMux.Lock()
  159. oldConn := conn
  160. socket.ConnToMaster = conn1
  161. connMux.Unlock()
  162. // 关闭旧连接(如果存在)
  163. if oldConn != nil {
  164. oldConn.Close()
  165. lfshook.NewLogger().Logger.Infof("Closed previous connection")
  166. }
  167. // 使用 context 控制所有协程的生命周期
  168. ctx, cancel := context.WithCancel(context.Background())
  169. // 启动心跳MC1
  170. go func() {
  171. defer func() {
  172. cancel()
  173. }()
  174. SendheartbeatToSipServer(ctx, conn1) // 改造 Sendheartbeat 接收 ctx
  175. }()
  176. // 等待连接断开(监听连接状态)
  177. <-ctx.Done()
  178. // 连接已断开,清理
  179. cancel() // 确保所有 cancel 被调用
  180. conn1.Close()
  181. lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...")
  182. time.Sleep(time.Second) // 重连前等待
  183. //check connected Master tag; connection err change to Master role ,exit
  184. //if socket.ConnectedMaster {
  185. // active.Master = true
  186. // return
  187. //}
  188. }
  189. }
  190. */
  191. // 连接Master sipserver
  192. func CreateConnectionSipServer() (net.Conn, error) {
  193. dialer := &net.Dialer{
  194. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort8}, // 固定本地端口
  195. Control: controlTCPConn,
  196. Timeout: 5 * time.Second,
  197. }
  198. DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr, socket.LocalPort) // Connect to Cab1 Sip server
  199. conn, err := dialer.Dial("tcp", DialAddr)
  200. if err != nil {
  201. //lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  202. return nil, err
  203. }
  204. lfshook.NewLogger().Logger.Infof("Connect SIP Server success :%s:%d", socket.RemoteAddr, socket.LocalPort)
  205. return conn, nil
  206. }
  207. func controlTCPConn(network, address string, c syscall.RawConn) error {
  208. return c.Control(func(fd uintptr) {
  209. syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
  210. // 注意:SO_REUSEPORT 在某些系统可用(如 Linux),但非标准
  211. })
  212. }
  213. func Sendheartbeat(ctx context.Context, conn net.Conn) {
  214. var count uint8
  215. protocol := msgdata.NewProtocol()
  216. protocol.MessageID = 0x21
  217. protocol.DataLength = 0x04
  218. protocol.Data = make([]byte, 4)
  219. // 初始化协议...
  220. ticker := time.NewTicker(2 * time.Second)
  221. defer ticker.Stop()
  222. for {
  223. select {
  224. case <-ctx.Done():
  225. //lfshook.NewLogger().Logger.Infof("Sendheartbeat===ctx==ret======")
  226. return
  227. case <-ticker.C:
  228. count++
  229. protocol.Data[0] = count
  230. // 编码并发送数据...
  231. encoded, err := protocol.Encode()
  232. if err != nil {
  233. //fmt.Printf("encode err : %v\n", err)
  234. return
  235. }
  236. if conn != nil {
  237. _, err = conn.Write(encoded)
  238. if err != nil {
  239. //fmt.Printf("Send hearbeat err: %v\n", err)
  240. conn.Close()
  241. return // 触发重连
  242. }
  243. //lfshook.NewLogger().Logger.Infof("Sendheartbeat===send ======%x", encoded)
  244. }
  245. }
  246. }
  247. }
  248. func SendheartbeatToSipServer(ctx context.Context, conn net.Conn) {
  249. var count uint8
  250. protocol := msgdata.NewProtocol()
  251. protocol.MessageID = 0x21
  252. protocol.DataLength = 0x04
  253. protocol.Data = make([]byte, 4)
  254. // 初始化协议...
  255. ticker := time.NewTicker(2 * time.Second)
  256. defer ticker.Stop()
  257. for {
  258. select {
  259. case <-ctx.Done():
  260. //lfshook.NewLogger().Logger.Infof("Sendheartbeat===ctx==ret======")
  261. return
  262. case <-ticker.C:
  263. count++
  264. protocol.Data[0] = count
  265. // 编码并发送数据...
  266. encoded, err := protocol.Encode()
  267. if err != nil {
  268. //fmt.Printf("encode err : %v\n", err)
  269. return
  270. }
  271. if conn != nil {
  272. _, err = conn.Write(encoded)
  273. if err != nil {
  274. //fmt.Printf("Send hearbeat err: %v\n", err)
  275. conn.Close()
  276. return // 触发重连
  277. }
  278. //Set connected Master tag
  279. if !socket.ConnectedMaster {
  280. socket.ConnectedMaster = true
  281. }
  282. //lfshook.NewLogger().Logger.Infof("Sendheartbeat===send ======%x", encoded)
  283. }
  284. }
  285. }
  286. }