Ver código fonte

add reconnect

dujunchen 1 semana atrás
pai
commit
e54a6575e2
2 arquivos alterados com 57 adições e 56 exclusões
  1. 7 16
      internal/app/index.go
  2. 50 40
      internal/app/stc/index.go

+ 7 - 16
internal/app/index.go

@@ -13,20 +13,11 @@ var conn net.Conn
 
 func StartApp() {
 	mysql.CreateDBInstance()
-
-	//conn = stc.CreateConnection("192.168.17.14", 6099)
-	conn = stc.CreateConnection("10.0.11.11", 10100) //172.16.0.11
-	if conn != nil {
-
-		go action.StartAMI(func() {
-			lfshook.NewLogger().Info("ami callback")
-			// 首次连接才进行初始化
-		}, []func(event map[string]string){}, conn)
-
-		go stc.StartStcConnection(conn) //connect
-	} else {
-		conn = stc.CreateConnection("10.0.11.11", 10100)
-		lfshook.NewLogger().Info("reconnect")
-	}
-
+	// 启动带有重连机制的连接管理协程
+	go stc.StartStcConnection("10.0.11.11", 10100)
+	// 启动其他服务...
+	go action.StartAMI(func() {
+		lfshook.NewLogger().Info("ami callback")
+		// 首次连接才进行初始化
+	}, []func(event map[string]string){}, conn)
 }

+ 50 - 40
internal/app/stc/index.go

@@ -2,33 +2,52 @@ package stc
 
 import (
 	"fmt"
-	"log"
 	"net"
 	"pbx-api-gin/internal/app/stc/broadcast"
 	msgdata "pbx-api-gin/internal/app/stc/data"
+	"sync"
 
 	"syscall"
 	"time"
 )
 
-func StartStcConnection(conn net.Conn) {
+func StartStcConnection(ServerAddr string, Port int) {
+	//var conn net.Conn
+	var wg sync.WaitGroup
 
-	//read msg
-	go func(conn net.Conn) {
-		broadcast.HandleStcCmd(conn)
+	for {
+		// 尝试建立连接
+		conn, err := CreateConnection(ServerAddr, Port)
+		if err != nil {
+			fmt.Printf("连接失败 %s:%d,将在5秒后重试...\n", ServerAddr, Port)
+			time.Sleep(5 * time.Second)
+			continue
+		}
 
-	}(conn)
+		fmt.Printf("成功连接到 %s:%d\n", ServerAddr, Port)
 
-	//heartbeat
-	go func(conn net.Conn) {
-		Sendheartbeat(conn)
-	}(conn)
+		// 启动消息处理和心跳协程
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			broadcast.HandleStcCmd(conn) // 处理消息
+		}()
+		go func() {
+			defer wg.Done()
+			Sendheartbeat(conn) // 发送心跳
+		}()
 
+		// 等待连接断开
+		wg.Wait()
+		fmt.Println("检测到连接断开,准备重新连接...")
+		conn.Close()                // 显式关闭旧连接
+		time.Sleep(time.Second * 1) // 可选:断开后等待1秒再重试
+	}
 }
 
-func CreateConnection(ServerAddr string, Port int) net.Conn {
-	// connect server
-	fmt.Println("==========conn server============")
+// 返回错误而不是终止程序
+func CreateConnection(ServerAddr string, Port int) (net.Conn, error) {
+	fmt.Println("========== conn server ==========")
 	conn, err := net.DialTCP("tcp", &net.TCPAddr{
 		IP:   net.ParseIP("0.0.0.0"),
 		Port: 10201,
@@ -37,44 +56,35 @@ func CreateConnection(ServerAddr string, Port int) net.Conn {
 		Port: Port,
 	})
 	if err != nil {
-		fmt.Println("Error conn server:", err)
-		log.Fatal(err)
-		return nil
+		return nil, err
 	}
-
 	fileDesc, _ := conn.File()
 	fd := fileDesc.Fd()
-
 	syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
-	return conn
+	return conn, nil
 }
-
 func Sendheartbeat(conn net.Conn) {
 	var count uint8
-	//init heartbeat data  ============================
-	//return
 	protocol := msgdata.NewProtocol()
-	protocol.SourceID = 0x02
-	protocol.DestinationID = 0x01
-	protocol.MessageID = 0x21
-	protocol.DataLength = 0x04
-
+	// 初始化协议...
 	ticker := time.NewTicker(2 * time.Second)
 	defer ticker.Stop()
-	for range ticker.C {
 
-		count = count + 1
-		protocol.Data = []byte{count, 0x00, 0x00, 0x00}
-		encoded, errEn := protocol.Encode()
-		if errEn != nil {
-			fmt.Println("Encode error:", errEn)
-			return
-		}
-
-		_, err := conn.Write(encoded)
-		if err != nil {
-			fmt.Println("send heartbeat err:", err)
-			return
+	for {
+		select {
+		case <-ticker.C:
+			count++
+			// 编码并发送数据...
+			encoded, err := protocol.Encode()
+			if err != nil {
+				fmt.Printf("编码失败: %v\n", err)
+				return
+			}
+			_, err = conn.Write(encoded)
+			if err != nil {
+				fmt.Printf("发送心跳失败: %v\n", err)
+				return // 触发重连
+			}
 		}
 	}
 }