package stc import ( "context" "fmt" "net" "pbx-api-gin/internal/app/stc/broadcast" msgdata "pbx-api-gin/internal/app/stc/data" "pbx-api-gin/internal/app/stc/socket" "pbx-api-gin/pkg/lfshook" "sync" "syscall" "time" ) const RemotePort = 10100 const LocalPort = 10201 //const RemoteAddr = "192.168.17.14" const RemoteAddr = "10.0.11.11" func StartStcConnection(conn net.Conn) { //var conn net.Conn //var wg sync.WaitGroup var connMux sync.Mutex // 保护 conn 的读写 //var err error for { // 尝试建立连接 conn1, err := CreateConnection() if err != nil { time.Sleep(2 * time.Second) continue } connMux.Lock() oldConn := conn socket.Conn = conn1 connMux.Unlock() // 关闭旧连接(如果存在) if oldConn != nil { oldConn.Close() lfshook.NewLogger().Logger.Infof("Closed previous connection") } // 使用 context 控制所有协程的生命周期 ctx, cancel := context.WithCancel(context.Background()) lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", RemoteAddr, RemotePort) // 启动消息处理和心跳协程 // 启动消息处理 go func() { defer func() { cancel() // 一旦任一协程退出,取消所有 }() broadcast.HandleStcCmd(ctx, conn1) // 改造 HandleStcCmd 接收 ctx }() // 启动心跳 go func() { defer func() { cancel() }() Sendheartbeat(ctx, conn1) // 改造 Sendheartbeat 接收 ctx }() // 等待连接断开(监听连接状态) <-ctx.Done() // 连接已断开,清理 cancel() // 确保所有 cancel 被调用 conn1.Close() lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...") time.Sleep(time.Second) // 重连前等待 } } // 返回错误而不是终止程序 func CreateConnection() (net.Conn, error) { lfshook.NewLogger().Logger.Infof("========Connect server IP:%s :Port:%d", RemoteAddr, RemotePort) // 创建 Dialer dialer := &net.Dialer{ LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: LocalPort}, // 固定本地端口 Control: controlTCPConn, } DialAddr := fmt.Sprintf("%s:%d", RemoteAddr, RemotePort) conn, err := dialer.Dial("tcp", DialAddr) if err != nil { lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err) return nil, err } return conn, nil } func controlTCPConn(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) // 注意:SO_REUSEPORT 在某些系统可用(如 Linux),但非标准 }) } func Sendheartbeat(ctx context.Context, conn net.Conn) { var count uint8 protocol := msgdata.NewProtocol() protocol.MessageID = 0x21 protocol.DataLength = 0x04 protocol.Data = make([]byte, 4) // 初始化协议... ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): lfshook.NewLogger().Logger.Infof("Sendheartbeat===ctx==ret======") return case <-ticker.C: count++ protocol.Data[0] = count // 编码并发送数据... encoded, err := protocol.Encode() if err != nil { fmt.Printf("encode err : %v\n", err) return } if conn != nil { _, err = conn.Write(encoded) if err != nil { fmt.Printf("Send hearbeat err: %v\n", err) conn.Close() return // 触发重连 } //lfshook.NewLogger().Logger.Infof("Sendheartbeat===send ======%x", encoded) } } } } // 检查PA server是主状态还是从状态 func CheckMaster(conn net.Conn) bool { // var count uint8 //init heartbeat data protocol := msgdata.NewProtocol() protocol.SourceID = 0x02 protocol.DestinationID = 0x01 protocol.MessageID = 0x21 protocol.DataLength = 0x04 return false }