index.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package stc
  2. import (
  3. "fmt"
  4. "net"
  5. "pbx-api-gin/internal/app/ami/action"
  6. "pbx-api-gin/internal/app/stc/broadcast"
  7. msgdata "pbx-api-gin/internal/app/stc/data"
  8. "pbx-api-gin/pkg/lfshook"
  9. "sync"
  10. "syscall"
  11. "time"
  12. )
  // Endpoints of the TCP link opened by this package.
  const (
      // RemotePort is the TCP port dialed on the remote server.
      RemotePort = 10100

      // LocalPort is the fixed local source port the dialer binds to.
      LocalPort = 10201

      // RemoteAddr is the IP address of the remote server.
      // An earlier address, 192.168.17.14, was left commented out in the source.
      RemoteAddr = "10.0.11.11"
  )
  17. func StartStcConnection(conn net.Conn) {
  18. //var conn net.Conn
  19. var wg sync.WaitGroup
  20. var err error
  21. for {
  22. // 尝试建立连接
  23. conn, err = CreateConnection()
  24. if err != nil {
  25. time.Sleep(2 * time.Second)
  26. continue
  27. }
  28. lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", RemoteAddr, RemotePort)
  29. // 启动消息处理和心跳协程
  30. wg.Add(2)
  31. go func() {
  32. defer wg.Done()
  33. broadcast.HandleStcCmd(conn) // 处理消息
  34. }()
  35. go func() {
  36. defer wg.Done()
  37. Sendheartbeat(conn) // 发送心跳
  38. }()
  39. //start AMI
  40. go action.StartAMI(func() {
  41. lfshook.NewLogger().Info("ami callback")
  42. // 首次连接才进行初始化
  43. }, []func(event map[string]string){}, conn)
  44. // 等待连接断开
  45. wg.Wait()
  46. fmt.Println("Start Reconnect ...")
  47. conn.Close() // 显式关闭旧连接
  48. time.Sleep(time.Second * 1) // 可选:断开后等待1秒再重试
  49. }
  50. }
  51. // 返回错误而不是终止程序
  52. func CreateConnection() (net.Conn, error) {
  53. lfshook.NewLogger().Logger.Infof("========Connect server IP:%s :Port:%d", RemoteAddr, RemotePort)
  54. // 创建 Dialer
  55. dialer := &net.Dialer{
  56. LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: LocalPort}, // 固定本地端口
  57. Control: controlTCPConn,
  58. }
  59. DialAddr := fmt.Sprintf("%s:%d", RemoteAddr, RemotePort)
  60. conn, err := dialer.Dial("tcp", DialAddr)
  61. if err != nil {
  62. lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
  63. return nil, err
  64. }
  65. return conn, nil
  66. }
  // controlTCPConn is a net.Dialer Control hook that enables SO_REUSEADDR on
  // the socket before it is bound and connected.
  //
  // network and address are part of the hook signature and are not used.
  // It returns the error reported by c.Control, or the setsockopt error when
  // the option could not be applied, so a failure is no longer silently
  // dropped.
  // Note: SO_REUSEPORT exists on some systems (e.g. Linux) but is not standard.
  func controlTCPConn(network, address string, c syscall.RawConn) error {
      var sockErr error
      ctlErr := c.Control(func(fd uintptr) {
          // The callback cannot return a value, so the result is captured here.
          sockErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
      })
      if ctlErr != nil {
          return ctlErr
      }
      if sockErr != nil {
          return fmt.Errorf("set SO_REUSEADDR: %w", sockErr)
      }
      return nil
  }
  73. func Sendheartbeat(conn net.Conn) {
  74. var count uint8
  75. protocol := msgdata.NewProtocol()
  76. // 初始化协议...
  77. ticker := time.NewTicker(2 * time.Second)
  78. defer ticker.Stop()
  79. for {
  80. select {
  81. case <-ticker.C:
  82. count++
  83. // 编码并发送数据...
  84. encoded, err := protocol.Encode()
  85. if err != nil {
  86. fmt.Printf("encode err : %v\n", err)
  87. return
  88. }
  89. if conn != nil {
  90. _, err = conn.Write(encoded)
  91. if err != nil {
  92. fmt.Printf("Send hearbeat err: %v\n", err)
  93. conn.Close()
  94. return // 触发重连
  95. }
  96. }
  97. }
  98. }
  99. }
  100. // 检查PA server是主状态还是从状态
  101. func CheckMaster(conn net.Conn) bool {
  102. // var count uint8
  103. //init heartbeat data
  104. protocol := msgdata.NewProtocol()
  105. protocol.SourceID = 0x02
  106. protocol.DestinationID = 0x01
  107. protocol.MessageID = 0x21
  108. protocol.DataLength = 0x04
  109. return false
  110. }