index.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package stc
  2. import (
  3. "fmt"
  4. "net"
  5. "pbx-api-gin/internal/app/ami/action"
  6. "pbx-api-gin/internal/app/stc/broadcast"
  7. msgdata "pbx-api-gin/internal/app/stc/data"
  8. "pbx-api-gin/pkg/lfshook"
  9. "sync"
  10. "syscall"
  11. "time"
  12. )
  13. func StartStcConnection(conn net.Conn) {
  14. //var conn net.Conn
  15. var wg sync.WaitGroup
  16. ServerAddr := "10.0.11.11"
  17. Port := 10100
  18. for {
  19. // 尝试建立连接
  20. conn, err := CreateConnection(ServerAddr, Port)
  21. if err != nil {
  22. fmt.Printf("连接失败 %s:%d,将在5秒后重试...\n", ServerAddr, Port)
  23. time.Sleep(5 * time.Second)
  24. lfshook.NewLogger().Logger.Infof("reconnect err %+v", err)
  25. continue
  26. }
  27. fmt.Printf("成功连接到 %s:%d\n", ServerAddr, Port)
  28. // 启动消息处理和心跳协程
  29. wg.Add(2)
  30. go func() {
  31. defer wg.Done()
  32. broadcast.HandleStcCmd(conn) // 处理消息
  33. }()
  34. go func() {
  35. defer wg.Done()
  36. Sendheartbeat(conn) // 发送心跳
  37. }()
  38. //start AMI
  39. go action.StartAMI(func() {
  40. lfshook.NewLogger().Info("ami callback")
  41. // 首次连接才进行初始化
  42. }, []func(event map[string]string){}, conn)
  43. // 等待连接断开
  44. wg.Wait()
  45. fmt.Println("检测到连接断开,准备重新连接...")
  46. conn.Close() // 显式关闭旧连接
  47. time.Sleep(time.Second * 1) // 可选:断开后等待1秒再重试
  48. }
  49. }
  50. // 返回错误而不是终止程序
  51. func CreateConnection(ServerAddr string, Port int) (net.Conn, error) {
  52. fmt.Println("========== conn server ==========")
  53. conn, err := net.DialTCP("tcp", &net.TCPAddr{
  54. IP: net.ParseIP("0.0.0.0"),
  55. Port: 10201,
  56. }, &net.TCPAddr{
  57. IP: net.ParseIP(ServerAddr),
  58. Port: Port,
  59. })
  60. if err != nil {
  61. return nil, err
  62. }
  63. fileDesc, _ := conn.File()
  64. fd := fileDesc.Fd()
  65. syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
  66. return conn, nil
  67. }
  68. func Sendheartbeat(conn net.Conn) {
  69. var count uint8
  70. protocol := msgdata.NewProtocol()
  71. // 初始化协议...
  72. ticker := time.NewTicker(2 * time.Second)
  73. defer ticker.Stop()
  74. for {
  75. select {
  76. case <-ticker.C:
  77. count++
  78. // 编码并发送数据...
  79. encoded, err := protocol.Encode()
  80. if err != nil {
  81. fmt.Printf("编码失败: %v\n", err)
  82. return
  83. }
  84. _, err = conn.Write(encoded)
  85. if err != nil {
  86. fmt.Printf("发送心跳失败: %v\n", err)
  87. return // 触发重连
  88. }
  89. }
  90. }
  91. }
  92. // 检查PA server是主状态还是从状态
  93. func CheckMaster(conn net.Conn) bool {
  94. // var count uint8
  95. //init heartbeat data
  96. protocol := msgdata.NewProtocol()
  97. protocol.SourceID = 0x02
  98. protocol.DestinationID = 0x01
  99. protocol.MessageID = 0x21
  100. protocol.DataLength = 0x04
  101. return false
  102. }