浏览代码

1.修正master角色切换机制

dujunchen 2 周之前
父节点
当前提交
8eba509b42

+ 11 - 0
internal/app/index.go

@@ -23,15 +23,20 @@ func StartApp() {
 	if err != nil {
 		lfshook.NewLogger().Infof("Get IP err :%+v", err)
 	}
+
+	// Init cab number and master role
 	if IP[len(IP)-2:] == "81" {
+		active.Master = false
 		active.CabNum = "8"
 	} else {
+		active.Master = true
 		active.CabNum = "1"
 	}
 	lfshook.NewLogger().Infof("=================cab number:%s===========", active.CabNum)
 
 	//init the active status
 	active.Actived = true
+
 	//get priority
 	priority.GetPriority()
 
@@ -40,6 +45,12 @@ func StartApp() {
 
 	// 启动带有重连机制的连接管理协程MC8
 	go stc.StartStcConnection(socket.Conn8, "8")
+
+	//启动连接到Master服务器,检查Master是否在线
+	if active.CabNum == "8" {
+		socket.ConnectedMaster = false
+		go stc.StartConnectionToSipServer(socket.ConnToMaster)
+	}
 	// 启动其他服务...
 	// 启动 AMI
 	go func() {

+ 2 - 0
internal/app/stc/active/index.go

@@ -4,4 +4,6 @@ package active
 var CabNum string
 var Actived bool
 
+var Master bool
+
 //var ActivedMC int

+ 14 - 17
internal/app/stc/broadcast/stc-broadcast.go

@@ -58,6 +58,13 @@ func HandleStcCmd(ctx context.Context, conn net.Conn) {
 
 // 处理单个数据包(原 switch 逻辑迁移过来)
 func processPacket(packet []byte) {
+
+	//check if Master role
+	if !active.Master {
+		lfshook.NewLogger().Logger.Infof("=========Not Master Role Ignore data=============")
+		return
+	}
+
 	if len(packet) < 6 {
 		fmt.Println("Invalid packet length")
 		return
@@ -74,18 +81,13 @@ func processPacket(packet []byte) {
 		return
 	}
 
-	//check if actived
-	if !active.Actived {
-		lfshook.NewLogger().Logger.Infof("===========Inactived  retrun==============")
-		return
-	}
-
 	switch packet[5] {
 	case 0x01: //heartbeat
+
 		break
 
 	case 0x02: // STN
-		if priority.CheckPriority("STN") && active.Actived {
+		if priority.CheckPriority("STN") {
 			action.HangupRunningTask("STN") //STN interrupt other
 			StationAnn(packet)
 		} else {
@@ -93,7 +95,7 @@ func processPacket(packet []byte) {
 		}
 
 	case 0x05: // SPC
-		if priority.CheckPriority("SPC") && active.Actived {
+		if priority.CheckPriority("SPC") {
 			action.HangupRunningTask("SPC") //SPC interrupt other
 			SpecialAnn(packet)
 		} else {
@@ -101,7 +103,7 @@ func processPacket(packet []byte) {
 		}
 
 	case 0x06: // EMG
-		if priority.CheckPriority("EMG") && active.Actived {
+		if priority.CheckPriority("EMG") {
 			action.HangupRunningTask("EMG") //EMG interrupt other
 			EmgMsg(packet)
 		} else {
@@ -111,7 +113,7 @@ func processPacket(packet []byte) {
 		AnnStop([4]byte{packet[8], packet[9], packet[10], packet[11]})
 
 	case 0x08: // DCS
-		if priority.CheckPriority("DCS") && active.Actived {
+		if priority.CheckPriority("DCS") {
 			action.HangupRunningTask("DCS") //DCS interrupt other
 			DcsAnn(packet)
 		} else {
@@ -119,7 +121,7 @@ func processPacket(packet []byte) {
 		}
 
 	case 0x09: // SELF CHECK
-		if priority.CheckPriority("CHK") && active.Actived {
+		if priority.CheckPriority("CHK") {
 			action.HangupRunningTask("CHK") //CHK interrupt other
 			SelfCheck(packet)
 		} else {
@@ -127,12 +129,7 @@ func processPacket(packet []byte) {
 		}
 
 	case 0x0a: // Tone-test
-		if priority.CheckPriority("VOL") && active.Actived {
-			//check active signal  before VOL
-			if !active.Actived {
-				break
-			}
-
+		if priority.CheckPriority("VOL") {
 			action.HangupRunningTask("VOL") //VOL interrupt other
 			ToneTest(packet)
 		} else {

+ 120 - 0
internal/app/stc/index.go

@@ -150,6 +150,81 @@ func CreateConnection(RemoteCab string) (net.Conn, error) {
 	}
 }
 
+func StartConnectionToSipServer(conn net.Conn) {
+
+	var connMux sync.Mutex // 保护 conn 的读写
+	var conn1 net.Conn
+	var err error
+
+	for {
+		// 尝试建立连接MC
+		conn1, err = CreateConnectionSipServer()
+		if err != nil || conn1 == nil {
+			time.Sleep(2 * time.Second)
+			lfshook.NewLogger().Logger.Infof("===========Reconnecting==Sip Server=======")
+			continue
+		}
+
+		connMux.Lock()
+		oldConn := conn
+
+		socket.ConnToMaster = conn1
+
+		connMux.Unlock()
+
+		// 关闭旧连接(如果存在)
+		if oldConn != nil {
+			oldConn.Close()
+			lfshook.NewLogger().Logger.Infof("Closed previous connection")
+		}
+
+		// 使用 context 控制所有协程的生命周期
+		ctx, cancel := context.WithCancel(context.Background())
+
+		// 启动心跳MC1
+		go func() {
+			defer func() {
+				cancel()
+			}()
+			SendheartbeatToSipServer(ctx, conn1) // 改造 Sendheartbeat 接收 ctx
+		}()
+
+		// 等待连接断开(监听连接状态)
+		<-ctx.Done()
+
+		// 连接已断开,清理
+		cancel() // 确保所有 cancel 被调用
+		conn1.Close()
+
+		lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...")
+		time.Sleep(time.Second) // 重连前等待
+
+		//check connected Master tag; connection err change to Master role ,exit
+		if socket.ConnectedMaster {
+			active.Master = true
+			return
+		}
+	}
+}
+
+// 连接Master sipserver
+func CreateConnectionSipServer() (net.Conn, error) {
+	dialer := &net.Dialer{
+		LocalAddr: &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: socket.LocalPort8}, // 固定本地端口
+		Control:   controlTCPConn,
+		Timeout:   5 * time.Second,
+	}
+
+	DialAddr := fmt.Sprintf("%s:%d", socket.RemoteAddr, socket.LocalPort) // Connect to Cab1 Sip server
+	conn, err := dialer.Dial("tcp", DialAddr)
+	if err != nil {
+		lfshook.NewLogger().Logger.Infof("========Connect server err :%+v", err)
+		return nil, err
+	}
+	lfshook.NewLogger().Logger.Infof("Connect SIP Server success :%s:%d", socket.RemoteAddr, socket.LocalPort)
+	return conn, nil
+}
+
 func controlTCPConn(network, address string, c syscall.RawConn) error {
 	return c.Control(func(fd uintptr) {
 		syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
@@ -197,6 +272,51 @@ func Sendheartbeat(ctx context.Context, conn net.Conn) {
 	}
 }
 
+func SendheartbeatToSipServer(ctx context.Context, conn net.Conn) {
+	var count uint8
+
+	protocol := msgdata.NewProtocol()
+	protocol.MessageID = 0x21
+	protocol.DataLength = 0x04
+	protocol.Data = make([]byte, 4)
+
+	// 初始化协议...
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			lfshook.NewLogger().Logger.Infof("Sendheartbeat===ctx==ret======")
+			return
+
+		case <-ticker.C:
+			count++
+			protocol.Data[0] = count
+			// 编码并发送数据...
+			encoded, err := protocol.Encode()
+			if err != nil {
+				fmt.Printf("encode err : %v\n", err)
+				return
+			}
+			if conn != nil {
+				_, err = conn.Write(encoded)
+				if err != nil {
+					fmt.Printf("Send hearbeat err: %v\n", err)
+					conn.Close()
+					return // 触发重连
+				}
+
+				//Set connected Master tag
+				if !socket.ConnectedMaster {
+					socket.ConnectedMaster = true
+				}
+				//lfshook.NewLogger().Logger.Infof("Sendheartbeat===send ======%x", encoded)
+			}
+		}
+	}
+}
+
 // 检查PA server是主状态还是从状态
 func CheckMaster(conn net.Conn) bool {
 	//	var count uint8

+ 43 - 10
internal/app/stc/sendstatus/status.go

@@ -23,6 +23,11 @@ func SendToStc(conn net.Conn, data []byte) {
 // report alarm status to STC
 func AlarmStatus(exten string, status string) {
 
+	//Not Master role , ignore
+	if !active.Master {
+		return
+	}
+
 	//check exten if it is a alarm exten
 	if !utils.IsPAIU(exten) { // if not alarm device , return
 		return
@@ -74,19 +79,24 @@ func AlarmStatus(exten string, status string) {
 	}
 	//check if actived
 	lfshook.NewLogger().Logger.Infof("===AlarmStatus=ext:%s===carr:%x==========pos:%x=========status:%x", exten, protocol.Data[0], protocol.Data[1], protocol.Data[2])
-	if active.Actived {
-		if socket.Conn != nil {
-			SendToStc(socket.Conn, encoded)
-		}
 
-		if socket.Conn8 != nil {
-			SendToStc(socket.Conn8, encoded)
-		}
+	if socket.Conn != nil {
+		SendToStc(socket.Conn, encoded)
 	}
+
+	if socket.Conn8 != nil {
+		SendToStc(socket.Conn8, encoded)
+	}
+
 }
 
 // report broadcast status to STC
 func PaStatus(src string, patype string, operation string) {
+
+	//Not Master role , ignore
+	if !active.Master {
+		return
+	}
 	lfshook.NewLogger().Logger.Infof("===PAStatus=Startext:%s=== type:%s=========action:%s", src, patype, operation)
 	protocol := msgdata.NewProtocol()
 	protocol.MessageID = 0x22
@@ -151,14 +161,24 @@ func PaStatus(src string, patype string, operation string) {
 		return
 	}
 
-	if active.Actived {
+	if socket.Conn != nil {
 		SendToStc(socket.Conn, encoded)
+	}
+
+	if socket.Conn8 != nil {
 		SendToStc(socket.Conn8, encoded)
 	}
+
 }
 
 // report broadcast status to STC
 func OccPad(operation string) {
+
+	//Not Master role , ignore
+	if !active.Master {
+		return
+	}
+
 	lfshook.NewLogger().Logger.Infof("===OCC-PAD========action:%s", operation)
 	protocol := msgdata.NewProtocol()
 	protocol.MessageID = 0x2A
@@ -181,10 +201,14 @@ func OccPad(operation string) {
 		return
 	}
 
-	if active.Actived {
+	if socket.Conn != nil {
 		SendToStc(socket.Conn, encoded)
+	}
+
+	if socket.Conn8 != nil {
 		SendToStc(socket.Conn8, encoded)
 	}
+
 }
 
 // report broadcast status to STC
@@ -197,6 +221,11 @@ func SendRecordFile(filename, rcdtype string) {
 			return
 		}
 	*/
+	//Not Master role , ignore
+	if !active.Master {
+		return
+	}
+
 	protocol := msgdata.NewProtocol()
 	protocol.MessageID = 0x31
 
@@ -226,8 +255,12 @@ func SendRecordFile(filename, rcdtype string) {
 		return
 	}
 
-	if active.Actived {
+	if socket.Conn != nil {
 		SendToStc(socket.Conn, encoded)
+	}
+
+	if socket.Conn8 != nil {
 		SendToStc(socket.Conn8, encoded)
 	}
+
 }

+ 4 - 0
internal/app/stc/socket/index.go

@@ -5,6 +5,8 @@ import "net"
 var Conn net.Conn
 var Conn8 net.Conn
 
+var ConnToMaster net.Conn
+
 const RemotePort = 10100
 const LocalPort = 10201
 const LocalPort8 = 10202
@@ -14,3 +16,5 @@ const LocalPort8 = 10202
 const RemoteAddr = "10.0.11.11"
 
 const RemoteAddr8 = "10.0.11.81"
+
+var ConnectedMaster bool