dujunchen 1 týždeň pred
rodič
commit
b8c0c78e3e

+ 10 - 2
internal/app/ami/action/index.go

@@ -3,6 +3,7 @@ package action
 import (
 	"net"
 	alstatus "pbx-api-gin/internal/app/stc/sendstatus"
+	"pbx-api-gin/internal/app/stc/socket"
 	"pbx-api-gin/internal/pkg/configs"
 	"pbx-api-gin/pkg/lfshook"
 	"pbx-api-gin/pkg/utils"
@@ -27,6 +28,12 @@ const pacu8 = "2181"
 
 func HandleAMI(event map[string]string, conn net.Conn) {
 
+	conn = socket.Conn
+	if conn == nil {
+		lfshook.NewLogger().Infof("===HandleAMI===conn==nil=")
+		return
+	}
+
 	switch event["Event"] {
 	case "DialBegin":
 		lfshook.NewLogger().Infof("=========%s", event["Event"])
@@ -99,7 +106,7 @@ func HandleAMI(event map[string]string, conn net.Conn) {
 	}
 }
 
-func StartAMI(connectOKCallBack func(), handleEvents []func(event map[string]string), conn net.Conn) {
+func StartAMI(connectOKCallBack func(), handleEvents []func(event map[string]string)) {
 	lfshook.NewLogger().Info("Start AMI")
 	settings := &amigo.Settings{
 		Host:     configs.ConfigGlobal.AsteriskAMIHost,
@@ -112,7 +119,8 @@ func StartAMI(connectOKCallBack func(), handleEvents []func(event map[string]str
 	AminInstance.EventOn(func(payload ...interface{}) {
 		// lfshook.NewLogger().Infof("ami event on %+v", payload[0])
 		event := payload[0].(map[string]string)
-		HandleAMI(event, conn)
+
+		HandleAMI(event, socket.Conn)
 
 		for _, handle := range handleEvents {
 			go handle(event)

+ 10 - 4
internal/app/index.go

@@ -1,16 +1,22 @@
 package app
 
 import (
-	"net"
+	"pbx-api-gin/internal/app/ami/action"
 	"pbx-api-gin/internal/app/mysql"
 	"pbx-api-gin/internal/app/stc"
+	"pbx-api-gin/internal/app/stc/socket"
+	"pbx-api-gin/pkg/lfshook"
 )
 
-var conn net.Conn
-
 func StartApp() {
 	mysql.CreateDBInstance()
 	// 启动带有重连机制的连接管理协程
-	go stc.StartStcConnection(conn)
+	go stc.StartStcConnection(socket.Conn)
 	// 启动其他服务...
+	// 启动 AMI
+	go func() {
+		action.StartAMI(func() {
+			lfshook.NewLogger().Info("ami callback")
+		}, []func(event map[string]string){})
+	}()
 }

+ 29 - 21
internal/app/stc/broadcast/stc-broadcast.go

@@ -2,6 +2,7 @@ package broadcast
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"net"
@@ -18,32 +19,39 @@ import (
 
 var Pacus = []string{"2111", "2121", "2131", "2141", "2151", "2161", "2171", "2181"}
 
-func HandleStcCmd(conn net.Conn) {
-	var buf bytes.Buffer // 用于累积未处理完的数据流
-	tmp := make([]byte, 1024)
+func HandleStcCmd(ctx context.Context, conn net.Conn) {
 
 	for {
-		if conn != nil {
-			n, err := conn.Read(tmp)
-			if err != nil {
-				if err != io.EOF {
-					fmt.Println("Error reading from server:", err)
-					conn.Close()
+		select {
+		case <-ctx.Done():
+			return
+
+		default:
+			var buf bytes.Buffer // 用于累积未处理完的数据流
+			tmp := make([]byte, 1024)
+
+			if conn != nil {
+				n, err := conn.Read(tmp)
+				if err != nil {
+					if err != io.EOF {
+						fmt.Println("Error reading from server:", err)
+						conn.Close()
+					}
+					return
 				}
-				return
-			}
 
-			// 将新读取的数据追加到缓冲区
-			buf.Write(tmp[:n])
-		}
-		// 尝试从缓冲区中提取完整数据包
-		for {
-			packet, err := msgdata.ExtractPacket(&buf)
-			if err != nil {
-				break // 没有完整包或出错,等待更多数据
+				// 将新读取的数据追加到缓冲区
+				buf.Write(tmp[:n])
+			}
+			// 尝试从缓冲区中提取完整数据包
+			for {
+				packet, err := msgdata.ExtractPacket(&buf)
+				if err != nil {
+					break // 没有完整包或出错,等待更多数据
+				}
+				// 成功提取一个包,进行处理
+				go processPacket(packet) // 使用 goroutine 避免阻塞接收
 			}
-			// 成功提取一个包,进行处理
-			go processPacket(packet) // 使用 goroutine 避免阻塞接收
 		}
 	}
 }

+ 43 - 21
internal/app/stc/index.go

@@ -1,11 +1,12 @@
 package stc
 
 import (
+	"context"
 	"fmt"
 	"net"
-	"pbx-api-gin/internal/app/ami/action"
 	"pbx-api-gin/internal/app/stc/broadcast"
 	msgdata "pbx-api-gin/internal/app/stc/data"
+	"pbx-api-gin/internal/app/stc/socket"
 	"pbx-api-gin/pkg/lfshook"
 	"sync"
 	"syscall"
@@ -22,38 +23,56 @@ const RemoteAddr = "10.0.11.11"
 
 func StartStcConnection(conn net.Conn) {
 	//var conn net.Conn
-	var wg sync.WaitGroup
-	var err error
+	//var wg sync.WaitGroup
+	var connMux sync.Mutex // 保护 conn 的读写
+	//var err error
 	for {
 		// 尝试建立连接
-		conn, err = CreateConnection()
+		conn1, err := CreateConnection()
 		if err != nil {
 			time.Sleep(2 * time.Second)
 			continue
 		}
+		connMux.Lock()
+		oldConn := conn
+		socket.Conn = conn1
+		connMux.Unlock()
+
+		// 关闭旧连接(如果存在)
+		if oldConn != nil {
+			oldConn.Close()
+			lfshook.NewLogger().Logger.Infof("Closed previous connection")
+		}
+
+		// 使用 context 控制所有协程的生命周期
+		ctx, cancel := context.WithCancel(context.Background())
 		lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", RemoteAddr, RemotePort)
 		// 启动消息处理和心跳协程
-		wg.Add(2)
+		// 启动消息处理
 		go func() {
-			defer wg.Done()
-			broadcast.HandleStcCmd(conn) // 处理消息
+			defer func() {
+				cancel() // 一旦任一协程退出,取消所有
+			}()
+			broadcast.HandleStcCmd(ctx, conn1) // 改造 HandleStcCmd 接收 ctx
 		}()
+
+		// 启动心跳
 		go func() {
-			defer wg.Done()
-			Sendheartbeat(conn) // 发送心跳
+			defer func() {
+				cancel()
+			}()
+			Sendheartbeat(ctx, conn1) // 改造 Sendheartbeat 接收 ctx
 		}()
 
-		//start AMI
-		go action.StartAMI(func() {
-			lfshook.NewLogger().Info("ami callback")
-			// 首次连接才进行初始化
-		}, []func(event map[string]string){}, conn)
-
-		// 等待连接断开
-		wg.Wait()
-		fmt.Println("Start Reconnect ...")
-		conn.Close()                // 显式关闭旧连接
-		time.Sleep(time.Second * 1) // 可选:断开后等待1秒再重试
+		// 等待连接断开(监听连接状态)
+		<-ctx.Done()
+
+		// 连接已断开,清理
+		cancel() // 确保所有 cancel 被调用
+		conn1.Close()
+
+		lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...")
+		time.Sleep(time.Second) // 重连前等待
 	}
 }
 
@@ -83,7 +102,7 @@ func controlTCPConn(network, address string, c syscall.RawConn) error {
 	})
 }
 
-func Sendheartbeat(conn net.Conn) {
+func Sendheartbeat(ctx context.Context, conn net.Conn) {
 	var count uint8
 	protocol := msgdata.NewProtocol()
 	// 初始化协议...
@@ -92,6 +111,9 @@ func Sendheartbeat(conn net.Conn) {
 
 	for {
 		select {
+		case <-ctx.Done():
+			return
+
 		case <-ticker.C:
 			count++
 			// 编码并发送数据...

+ 5 - 0
internal/app/stc/socket/index.go

@@ -0,0 +1,5 @@
+package socket
+
+import "net"
+
+var Conn net.Conn