// Package stc keeps the TCP links to the STC endpoints of cab 1 and cab 8
// alive and runs the master/slave handshake between the two SIP servers.
package stc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"strings"
	"sync"
	"syscall"
	"time"

	"pbx-api-gin/internal/app/stc/active"
	"pbx-api-gin/internal/app/stc/broadcast"
	msgdata "pbx-api-gin/internal/app/stc/data"
	"pbx-api-gin/internal/app/stc/socket"
	"pbx-api-gin/pkg/lfshook"
	"pbx-api-gin/pkg/utils"
)

// StartStcConnection keeps a connection to the STC of the given cab alive
// forever. It never returns.
//
// conn is a previously established connection (may be nil); it is closed once
// the first replacement connection is up. cab selects the target: "1" stores
// the new connection in socket.Conn, anything else in socket.Conn8.
//
// For every connection a command handler and a heartbeat sender are started;
// as soon as either of them exits the connection is torn down and a new one is
// dialed after a short pause.
func StartStcConnection(conn net.Conn, cab string) {
	lfshook.NewLogger().Infof("Connect to STC%s ", cab)

	// NOTE(review): this mutex is local to the call, so it does not serialize
	// against other goroutines reading socket.Conn / socket.Conn8. Real
	// protection would need a lock owned by the socket package.
	var connMux sync.Mutex
	connUp := false // true while the "is up" message has been logged

	for {
		// Dial until it succeeds; retry every two seconds.
		conn1, err := CreateConnection(cab)
		if err != nil || conn1 == nil {
			time.Sleep(2 * time.Second)
			continue
		}

		trainInfo := fmt.Sprintf("CabNumber %s", active.ActivedCab)
		if !connUp {
			utils.Logger.Printf("Train Information: %s, Message: Connection to Cab%s STC is up !", trainInfo, cab)
			connUp = true
		}

		// Publish the fresh connection.
		connMux.Lock()
		if cab == "1" {
			socket.Conn = conn1
		} else {
			socket.Conn8 = conn1
		}
		connMux.Unlock()

		// Close the connection handed in by the caller exactly once. Later
		// connections are closed at the end of their own iteration.
		if conn != nil {
			conn.Close()
			conn = nil
		}

		// One context per connection: whichever worker exits first cancels it.
		ctx, cancel := context.WithCancel(context.Background())

		// Command handler.
		go func() {
			defer cancel()
			broadcast.HandleStcCmd(ctx, conn1)
		}()

		// Heartbeat sender.
		go func() {
			defer cancel()
			Sendheartbeat(ctx, conn1)
		}()

		// Block until one of the workers reports that the connection is gone.
		<-ctx.Done()

		cancel() // idempotent; releases the context in every path
		conn1.Close()

		if connUp {
			utils.Logger.Printf("Train Information: %s, Message: Connection to Cab%s STC is down !", trainInfo, cab)
			connUp = false
		}

		time.Sleep(time.Second) // pause before reconnecting
	}
}

// dialWithLocalPort dials dialAddr over TCP from the fixed local port
// localPort (bound on 0.0.0.0 with SO_REUSEADDR) using a 5 second timeout.
func dialWithLocalPort(localPort int, dialAddr string) (net.Conn, error) {
	dialer := &net.Dialer{
		LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: localPort}, // fixed local port
		Control:   controlTCPConn,
		Timeout:   5 * time.Second,
	}
	return dialer.Dial("tcp", dialAddr)
}

// CreateConnection dials the STC of RemoteCab and returns the connection, or
// the dial error instead of terminating the program.
//
// RemoteCab "1" targets socket.RemoteAddr (MC1); any other value targets
// socket.RemoteAddr8 (MC8). Both use socket.RemotePort. The local port depends
// on the cab this process runs in: socket.LocalPort when active.CabNum is "1",
// socket.LocalPort8 otherwise.
func CreateConnection(RemoteCab string) (net.Conn, error) {
	var label, dialAddr string
	if RemoteCab == "1" {
		label = "MC1"
		dialAddr = fmt.Sprintf("%s:%d", socket.RemoteAddr, socket.RemotePort)
	} else {
		label = "MC8"
		dialAddr = fmt.Sprintf("%s:%d", socket.RemoteAddr8, socket.RemotePort)
	}

	localPort := socket.LocalPort
	if active.CabNum != "1" {
		localPort = socket.LocalPort8
	}

	conn, err := dialWithLocalPort(localPort, dialAddr)
	if err != nil {
		return nil, err
	}
	lfshook.NewLogger().Logger.Infof("Connect success %s:%s", label, dialAddr)
	return conn, nil
}

// StartConnectionToSipServer keeps a connection to the peer SIP server alive
// forever and runs SendToRemoteMaster on it. It never returns.
//
// conn is a previously established connection (may be nil); it is closed once
// the first replacement connection is up. The live connection is stored in
// socket.ConnToSlave.
func StartConnectionToSipServer(conn net.Conn) {
	lfshook.NewLogger().Infof("Connect to Slave Sip Server ... ")

	// NOTE(review): function-local mutex; it does not synchronize with other
	// goroutines that read socket.ConnToSlave.
	var connMux sync.Mutex

	for {
		// Dial until it succeeds; retry every two seconds.
		conn1, err := CreateConnectionSipServer()
		if err != nil || conn1 == nil {
			time.Sleep(2 * time.Second)
			continue
		}

		connMux.Lock()
		socket.ConnToSlave = conn1
		connMux.Unlock()

		// Close the connection handed in by the caller exactly once.
		if conn != nil {
			conn.Close()
			conn = nil
			lfshook.NewLogger().Logger.Infof("Closed previous connection")
		}

		ctx, cancel := context.WithCancel(context.Background())

		go func() {
			defer cancel()
			SendToRemoteMaster(ctx, conn1)
		}()

		// Block until the sender gives up on this connection.
		<-ctx.Done()

		cancel() // idempotent; releases the context in every path
		conn1.Close()
		lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...")
		time.Sleep(time.Second) // pause before reconnecting
	}
}

// CreateConnectionSipServer dials the SIP server at
// socket.RemoteAddr8:socket.LocalServerPort from the fixed local port
// socket.LocalServerPort and returns the connection or the dial error.
func CreateConnectionSipServer() (net.Conn, error) {
	dialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr8, socket.LocalServerPort) // cab 8 SIP server

	conn, err := dialWithLocalPort(socket.LocalServerPort, dialAddr)
	if err != nil {
		return nil, err
	}
	// Log the address that was actually dialed.
	lfshook.NewLogger().Logger.Infof("Connect SIP Server success :%s", dialAddr)
	return conn, nil
}

// controlTCPConn is a net.Dialer Control hook that enables SO_REUSEADDR on the
// socket before it is bound, so the fixed local port can be reused right after
// a reconnect. It returns the error of the Control call or of setsockopt.
func controlTCPConn(network, address string, c syscall.RawConn) error {
	var sockErr error
	err := c.Control(func(fd uintptr) {
		// SO_REUSEPORT exists on some systems (e.g. Linux) but is not
		// standard, so only SO_REUSEADDR is set.
		sockErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
	})
	if err != nil {
		return err
	}
	return sockErr
}

// Sendheartbeat writes a heartbeat frame (message ID 0x21, four data bytes,
// the first one a wrapping uint8 counter) to conn every two seconds.
//
// It returns when ctx is cancelled, when encoding fails, or when a write
// fails; in the last case conn is closed first so the owner reconnects. A nil
// conn is tolerated: nothing is sent until ctx is cancelled.
func Sendheartbeat(ctx context.Context, conn net.Conn) {
	var count uint8

	protocol := msgdata.NewProtocol()
	protocol.MessageID = 0x21
	protocol.DataLength = 0x04
	protocol.Data = make([]byte, 4)

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		count++
		protocol.Data[0] = count

		encoded, err := protocol.Encode()
		if err != nil {
			return
		}
		if conn == nil {
			continue
		}
		if _, err = conn.Write(encoded); err != nil {
			conn.Close()
			return // triggers a reconnect in the caller
		}
	}
}

// RecvFromSipServer listens on 0.0.0.0:10000 and answers master-check frames
// from the peer SIP server (used when this node is cab 8). Connections are
// served one at a time. Whenever a peer connection ends, this node promotes
// itself by setting active.Master to true.
//
// It returns only if the listener cannot be created.
//
// NOTE(review): active.Master is read and written here and in
// SendToRemoteMaster without synchronization.
func RecvFromSipServer() {
	lfshook.NewLogger().Infof("Connect to Master Sip Server .... ")

	listener, err := net.Listen("tcp", "0.0.0.0:10000")
	if err != nil {
		lfshook.NewLogger().Logger.Infof("Sever Listen cab1 err:%+v", err)
		return // without a listener there is nothing to accept on
	}
	defer listener.Close()

	for {
		conn, err := listener.Accept() // blocks until a peer connects
		if err != nil {
			lfshook.NewLogger().Logger.Infof("Sever accept cab1 err:%+v", err)
			time.Sleep(time.Second) // avoid spinning on a persistent accept error
			continue
		}

		serveMasterCheck(conn)

		lfshook.NewLogger().Logger.Infof("Connection closed from cab1 set Master = true !")
		active.Master = true
	}
}

// serveMasterCheck reads frames from conn until it is closed or fails, replying
// "1" when this node is master and "0" otherwise. conn is closed on return.
func serveMasterCheck(conn net.Conn) {
	defer conn.Close()

	buf := make([]byte, 1024)
	for {
		n, err := conn.Read(buf)
		if n > 0 {
			data := buf[:n]

			// Byte 5 is compared with 0xf1 (the message ID SendToRemoteMaster
			// sends) and byte 8 with 0x01; presumably byte 8 is the first data
			// byte — confirm against the msgdata encoding. Frames shorter than
			// nine bytes are answered but never change the role.
			if len(data) > 8 && data[8] == 0x01 && data[5] == 0xf1 {
				active.Master = true
			}

			reply := []byte("0")
			if active.Master {
				reply = []byte("1")
			}
			if _, werr := conn.Write(reply); werr != nil {
				lfshook.NewLogger().Logger.Infof("Failed to write 'ok' to client: %+v", werr)
			}
		}

		if err == nil {
			continue
		}
		if err == io.EOF {
			return // peer closed the connection normally
		}
		// Timeouts are transient (they only occur if a read deadline is set);
		// any other error ends the connection.
		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
			continue
		}
		return
	}
}

// SendToRemoteMaster sends a master-check frame (message ID 0xf1, two data
// bytes) to conn every two seconds and evaluates the peer's reply.
//
// Data[0] is 0 when this node is master and 1 otherwise. A reply of "1" sets
// active.Master to false and stops the local asterisk service if it is
// running. Write and read each use a one second deadline.
//
// It returns when ctx is cancelled, when conn is nil, when encoding fails
// (after setting active.Master to true), when a write fails, or when the peer
// closes the connection; on the I/O failures conn is closed first so the owner
// reconnects.
func SendToRemoteMaster(ctx context.Context, conn net.Conn) {
	if conn == nil {
		return // nothing to talk to; let the caller reconnect
	}

	protocol := msgdata.NewProtocol()
	protocol.MessageID = 0xf1 // master-check message
	protocol.DataLength = 0x02
	protocol.Data = make([]byte, 2)

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	buf := make([]byte, 64) // large enough for the short reply

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		if active.Master {
			protocol.Data[0] = 0 // ask the remote side not to be master
		} else {
			protocol.Data[0] = 1 // ask the remote side to be master
		}

		encoded, err := protocol.Encode()
		if err != nil {
			active.Master = true
			return
		}

		// A failing deadline call means the connection is unusable; the write
		// or read below reports that.
		_ = conn.SetWriteDeadline(time.Now().Add(1000 * time.Millisecond))
		if _, err = conn.Write(encoded); err != nil {
			conn.Close()
			return // triggers a reconnect in the caller
		}

		_ = conn.SetReadDeadline(time.Now().Add(1000 * time.Millisecond))
		n, err := conn.Read(buf)
		if err != nil {
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				lfshook.NewLogger().Logger.Infof("Read timeout waiting for remote response — may be OK (no response expected?)")
			} else if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "broken pipe") {
				lfshook.NewLogger().Logger.Infof("Remote closed connection during read: %+v", err)
				conn.Close()
				return
			} else {
				lfshook.NewLogger().Logger.Infof("Read from remote failed unexpectedly: %+v", err)
			}
			continue // keep the loop alive and try again on the next tick
		}

		if n > 0 {
			response := strings.TrimSpace(string(buf[:n]))
			if response == "1" {
				// The remote side is master: step down and stop asterisk if
				// it is currently available.
				active.Master = false
				if utils.CheckAsterisk() {
					utils.ExecCmd("/etc/init.d/asterisk", "stop", "PBX")
				}
			}
		}
	}
}