1
0

4 Commits a478177e06 ... fe20659536

Autor SHA1 Nachricht Datum
  dujunchen fe20659536 fix PACU tag vor 1 Woche
  dujunchen b8c0c78e3e fix reconnect vor 1 Woche
  dujunchen 68c166e9ca fix vor 1 Woche
  dujunchen 27d3dbdd76 add api for alarm status ; vor 1 Woche

+ 1 - 1
cmd/main.go

@@ -21,7 +21,7 @@ func main() {
 	initVersion()
 
 	// 解析配置文件
-	configs.ConfigPath = "./configs/config.yaml"
+	configs.ConfigPath = "/data/test/configs/config.yaml"
 	configs.DecodeConfig()
 
 	configs.ConfigGlobal.LogLevel = logrus.InfoLevel

+ 2 - 1
internal/app/ami/action/call.go

@@ -50,7 +50,8 @@ func Dial(src, dst, dialrule, callerID, callerName string, callType string) {
 func ChanSpy(src, dst string, whisper, bargein bool) {
 	lfshook.NewLogger().Infof("chan spy src:%s dst:%s", src, dst)
 
-	channel := fmt.Sprintf("%s/%s", utils.DialPrefix, dst)
+	//channel := fmt.Sprintf("%s/%s", utils.DialPrefix, dst)
+	channel := fmt.Sprintf("Local/%s@aio-rule", dst)
 	data := fmt.Sprintf("%s/%s,qBE", utils.DialPrefix, src)
 	/*
 		if whisper {

+ 10 - 2
internal/app/ami/action/index.go

@@ -3,6 +3,7 @@ package action
 import (
 	"net"
 	alstatus "pbx-api-gin/internal/app/stc/sendstatus"
+	"pbx-api-gin/internal/app/stc/socket"
 	"pbx-api-gin/internal/pkg/configs"
 	"pbx-api-gin/pkg/lfshook"
 	"pbx-api-gin/pkg/utils"
@@ -27,6 +28,12 @@ const pacu8 = "2181"
 
 func HandleAMI(event map[string]string, conn net.Conn) {
 
+	conn = socket.Conn
+	if conn == nil {
+		lfshook.NewLogger().Infof("===HandleAMI===conn==nil=")
+		return
+	}
+
 	switch event["Event"] {
 	case "DialBegin":
 		lfshook.NewLogger().Infof("=========%s", event["Event"])
@@ -99,7 +106,7 @@ func HandleAMI(event map[string]string, conn net.Conn) {
 	}
 }
 
-func StartAMI(connectOKCallBack func(), handleEvents []func(event map[string]string), conn net.Conn) {
+func StartAMI(connectOKCallBack func(), handleEvents []func(event map[string]string)) {
 	lfshook.NewLogger().Info("Start AMI")
 	settings := &amigo.Settings{
 		Host:     configs.ConfigGlobal.AsteriskAMIHost,
@@ -112,7 +119,8 @@ func StartAMI(connectOKCallBack func(), handleEvents []func(event map[string]str
 	AminInstance.EventOn(func(payload ...interface{}) {
 		// lfshook.NewLogger().Infof("ami event on %+v", payload[0])
 		event := payload[0].(map[string]string)
-		HandleAMI(event, conn)
+
+		HandleAMI(event, socket.Conn)
 
 		for _, handle := range handleEvents {
 			go handle(event)

+ 14 - 1
internal/app/ami/action/playback.go

@@ -9,7 +9,20 @@ import (
 func PlaybackPacu(filename string, count int, delay int, PaType string) (err error) {
 
 	Para := fmt.Sprintf("count=%d,filename=%s,delay=%d,priority=%s", count, strings.Replace(filename, ".wav", "", -1), delay, PaType)
-	Chan := "Local/0500" //paging pacu
+	Chan := ""
+
+	switch PaType {
+	case "STN":
+		Chan = "Local/0503" //spa-rule
+	case "SPC":
+		Chan = "Local/0505" //spc-rule
+	case "DCS":
+		Chan = "Local/0504" //dc-rule
+	case "CHK":
+		Chan = "Local/0503" //paging pacu
+	case "EMG":
+		Chan = "Local/0502" //emg-rule
+	}
 
 	action := map[string]string{
 		"Action":   "Originate",

+ 10 - 4
internal/app/index.go

@@ -1,16 +1,22 @@
 package app
 
 import (
-	"net"
+	"pbx-api-gin/internal/app/ami/action"
 	"pbx-api-gin/internal/app/mysql"
 	"pbx-api-gin/internal/app/stc"
+	"pbx-api-gin/internal/app/stc/socket"
+	"pbx-api-gin/pkg/lfshook"
 )
 
-var conn net.Conn
-
 func StartApp() {
 	mysql.CreateDBInstance()
 	// 启动带有重连机制的连接管理协程
-	go stc.StartStcConnection(conn)
+	go stc.StartStcConnection(socket.Conn)
 	// 启动其他服务...
+	// 启动 AMI
+	go func() {
+		action.StartAMI(func() {
+			lfshook.NewLogger().Info("ami callback")
+		}, []func(event map[string]string){})
+	}()
 }

+ 80 - 81
internal/app/stc/broadcast/stc-broadcast.go

@@ -2,46 +2,57 @@ package broadcast
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"net"
+	"net/http"
 	"pbx-api-gin/internal/app/ami/action"
 	"pbx-api-gin/internal/app/ami/model"
 	"pbx-api-gin/internal/app/mysql"
 	msgdata "pbx-api-gin/internal/app/stc/data"
 	"pbx-api-gin/pkg/lfshook"
+	"strings"
 	"sync"
 	"time"
 )
 
 var Pacus = []string{"2111", "2121", "2131", "2141", "2151", "2161", "2171", "2181"}
 
-func HandleStcCmd(conn net.Conn) {
-	var buf bytes.Buffer // 用于累积未处理完的数据流
-	tmp := make([]byte, 1024)
+func HandleStcCmd(ctx context.Context, conn net.Conn) {
 
 	for {
-		if conn != nil {
-			n, err := conn.Read(tmp)
-			if err != nil {
-				if err != io.EOF {
-					fmt.Println("Error reading from server:", err)
-					conn.Close()
+		select {
+		case <-ctx.Done():
+			lfshook.NewLogger().Logger.Infof("HandleStcCmd===ctx==ret======")
+			return
+
+		default:
+			var buf bytes.Buffer // 用于累积未处理完的数据流
+			tmp := make([]byte, 1024)
+
+			if conn != nil {
+				n, err := conn.Read(tmp)
+				if err != nil {
+					if err != io.EOF {
+						fmt.Println("Error reading from server:", err)
+						conn.Close()
+					}
+					return
 				}
-				return
-			}
 
-			// 将新读取的数据追加到缓冲区
-			buf.Write(tmp[:n])
-		}
-		// 尝试从缓冲区中提取完整数据包
-		for {
-			packet, err := msgdata.ExtractPacket(&buf)
-			if err != nil {
-				break // 没有完整包或出错,等待更多数据
+				// 将新读取的数据追加到缓冲区
+				buf.Write(tmp[:n])
+			}
+			// 尝试从缓冲区中提取完整数据包
+			for {
+				packet, err := msgdata.ExtractPacket(&buf)
+				if err != nil {
+					break // 没有完整包或出错,等待更多数据
+				}
+				// 成功提取一个包,进行处理
+				go processPacket(packet) // 使用 goroutine 避免阻塞接收
 			}
-			// 成功提取一个包,进行处理
-			go processPacket(packet) // 使用 goroutine 避免阻塞接收
 		}
 	}
 }
@@ -83,73 +94,25 @@ func processPacket(packet []byte) {
 	}
 }
 
-/*
-func HandleStcCmd(conn net.Conn) {
-
-	for {
-		buffer := make([]byte, 1024)
-		n, err := conn.Read(buffer)
-		if err != nil {
-			fmt.Println("Error reading from server:", err)
-			return
-		}
-		if buffer[5] != 0x01 {
-			lfshook.NewLogger().Logger.Infof("Get data from STC ===============:%x", buffer[:n])
-		}
-
-		switch buffer[5] {
-		case 0x01: //heartbeat
-
-		case 0x02: //STN
-			StationAnn(buffer)
-
-		case 0x03: //ACTIVE
-			Active([1]byte{buffer[8]})
-
-		case 0x05: //SPC
-			SpecialAnn(buffer)
-
-		case 0x06: //EMG
-			EmgMsg(buffer)
-
-		case 0x07: //STOP
-			AnnStop([4]byte{buffer[8], buffer[9], buffer[10], buffer[11]})
-
-		case 0x08: //DCS
-			DcsAnn(buffer)
-
-		case 0x09: //SELF CHECK
-			SelfCheck(buffer)
-
-		case 0x0a: //
-			AlarmHandle(buffer)
-
-		case 0x0b: //
-			AlarmResetAll()
-
-		case 0x0c: //
-			RecordStorageConf(buffer[8:])
-		}
-	}
-}
-*/
 // STN , 自动报站广播
 func StationAnn(data []byte) (err error) {
 
-	specialVoice := int(data[8])
+	//specialVoice := int(data[8])
 	delay := data[9]
 	cycleCount := data[10]
 	datalen := int(data[11])
 
 	filename := msgdata.SubstrByRune(string(data[12:]), 0, datalen-4)
-
-	//update special voice
-	_, er := mysql.DBOrmInstance.In("exten", Pacus).Update(&model.Extension{Special: specialVoice, PaType: "STN"})
-	if er != nil {
-		lfshook.NewLogger().Logger.Infof("update special voice to exten err : %+v", er.Error())
-		return er
-	}
-
+	filename = strings.Split(filename, ".")[0]
+	lfshook.NewLogger().Logger.Infof("=============Get filename  : %v", filename)
+	/*
+		//update special voice
+		_, er := mysql.DBOrmInstance.In("exten", Pacus).Update(&model.Extension{Special: specialVoice, PaType: "STN"})
+		if er != nil {
+			lfshook.NewLogger().Logger.Infof("update special voice to exten err : %+v", er.Error())
+			return er
+		}
+	*/
 	action.PlaybackPacu(filename, int(cycleCount), int(delay), "STN")
 	return nil
 }
@@ -323,7 +286,7 @@ func AlarmHandle(data []byte) {
 
 	switch handler {
 	case 0x01: //answer(ICP+Alarm+PACU)
-
+		//NotifyPaiu(exten, "answer")
 		err := action.RedirectInQueue(exten, "0402", "ani-rule", "1") // 1车ICP接听
 		if err != nil {
 			lfshook.NewLogger().Logger.Infof("================ICP Answer PAD====ERR============ : %+v", err.Error())
@@ -344,16 +307,52 @@ func AlarmHandle(data []byte) {
 		//action.ChanSpy("PACU", exten, false, true)
 		lfshook.NewLogger().Logger.Infof("================ICP Answer PAD================:%s ", exten)
 	case 0x02: //hold  重新放回队列里面
+		NotifyPaiu(exten, "hold")
+
 		err := action.RedirectInQueue(exten, "0300", "default", "1")
 		if err != nil {
 			lfshook.NewLogger().Info(err)
 		}
 
 	case 0x03: //hangup
+		//NotifyPaiu(exten, "hangup")
 		action.Hangup(exten)
 	}
 }
 
+// 挂断所有报警器
+func NotifyPaiu(Exten, Action string) {
+	url := ""
+	switch Action {
+	case "answer":
+		url = fmt.Sprintf("http://10.0.24.%s/api/sipphone?action=answer", Exten[2:])
+	case "hold":
+		url = fmt.Sprintf("http://10.0.24.%s/api/sipphone?action=hold", Exten[2:])
+	case "hangup":
+		url = fmt.Sprintf("http://10.0.24.%s/api/sipphone?action=hangup", Exten[2:])
+	}
+
+	lfshook.NewLogger().Logger.Infof("======Notify PAIU Alarm====:%+v ", url)
+	resp, err := http.Get(url)
+	if err != nil {
+		lfshook.NewLogger().Logger.Infof("======Notify PAIU Alarm====:%+v ", err)
+		return
+	}
+	defer resp.Body.Close()
+	/*
+	   body, err := io.ReadAll(resp.Body)
+
+	   	if err != nil {
+	   		// 读取数据错误
+	   		lfshook.NewLogger().Warn("ioutil ReadAll failed :", err.Error())
+	   		return
+	   	}
+
+	   fmt.Printf("状态码: %d\n", resp.StatusCode)
+	   fmt.Printf("响应内容: %s\n", body)
+	*/
+}
+
 // 挂断所有报警器
 func AlarmResetAll() {
 

+ 47 - 21
internal/app/stc/index.go

@@ -1,11 +1,12 @@
 package stc
 
 import (
+	"context"
 	"fmt"
 	"net"
-	"pbx-api-gin/internal/app/ami/action"
 	"pbx-api-gin/internal/app/stc/broadcast"
 	msgdata "pbx-api-gin/internal/app/stc/data"
+	"pbx-api-gin/internal/app/stc/socket"
 	"pbx-api-gin/pkg/lfshook"
 	"sync"
 	"syscall"
@@ -15,42 +16,63 @@ import (
 
 const RemotePort = 10100
 const LocalPort = 10201
+
+//const RemoteAddr = "192.168.17.14"
+
 const RemoteAddr = "10.0.11.11"
 
 func StartStcConnection(conn net.Conn) {
 	//var conn net.Conn
-	var wg sync.WaitGroup
-
+	//var wg sync.WaitGroup
+	var connMux sync.Mutex // 保护 conn 的读写
+	//var err error
 	for {
 		// 尝试建立连接
-		conn, err := CreateConnection()
+		conn1, err := CreateConnection()
 		if err != nil {
 			time.Sleep(2 * time.Second)
 			continue
 		}
+		connMux.Lock()
+		oldConn := conn
+		socket.Conn = conn1
+		connMux.Unlock()
+
+		// 关闭旧连接(如果存在)
+		if oldConn != nil {
+			oldConn.Close()
+			lfshook.NewLogger().Logger.Infof("Closed previous connection")
+		}
+
+		// 使用 context 控制所有协程的生命周期
+		ctx, cancel := context.WithCancel(context.Background())
 		lfshook.NewLogger().Logger.Infof("Connect success :%s:%d", RemoteAddr, RemotePort)
 		// 启动消息处理和心跳协程
-		wg.Add(2)
+		// 启动消息处理
 		go func() {
-			defer wg.Done()
-			broadcast.HandleStcCmd(conn) // 处理消息
+			defer func() {
+				cancel() // 一旦任一协程退出,取消所有
+			}()
+			broadcast.HandleStcCmd(ctx, conn1) // 改造 HandleStcCmd 接收 ctx
 		}()
+
+		// 启动心跳
 		go func() {
-			defer wg.Done()
-			Sendheartbeat(conn) // 发送心跳
+			defer func() {
+				cancel()
+			}()
+			Sendheartbeat(ctx, conn1) // 改造 Sendheartbeat 接收 ctx
 		}()
 
-		//start AMI
-		go action.StartAMI(func() {
-			lfshook.NewLogger().Info("ami callback")
-			// 首次连接才进行初始化
-		}, []func(event map[string]string){}, conn)
-
-		// 等待连接断开
-		wg.Wait()
-		fmt.Println("Start Reconnect ...")
-		conn.Close()                // 显式关闭旧连接
-		time.Sleep(time.Second * 1) // 可选:断开后等待1秒再重试
+		// 等待连接断开(监听连接状态)
+		<-ctx.Done()
+
+		// 连接已断开,清理
+		cancel() // 确保所有 cancel 被调用
+		conn1.Close()
+
+		lfshook.NewLogger().Logger.Info("Reconnecting in 1 second...")
+		time.Sleep(time.Second) // 重连前等待
 	}
 }
 
@@ -80,7 +102,7 @@ func controlTCPConn(network, address string, c syscall.RawConn) error {
 	})
 }
 
-func Sendheartbeat(conn net.Conn) {
+func Sendheartbeat(ctx context.Context, conn net.Conn) {
 	var count uint8
 	protocol := msgdata.NewProtocol()
 	// 初始化协议...
@@ -89,6 +111,10 @@ func Sendheartbeat(conn net.Conn) {
 
 	for {
 		select {
+		case <-ctx.Done():
+			lfshook.NewLogger().Logger.Infof("Sendheartbeat===ctx==ret======")
+			return
+
 		case <-ticker.C:
 			count++
 			// 编码并发送数据...

+ 5 - 0
internal/app/stc/socket/index.go

@@ -0,0 +1,5 @@
+package socket
+
+import "net"
+
+var Conn net.Conn

+ 1 - 1
internal/pkg/configs/decode.go

@@ -53,7 +53,7 @@ type Config struct {
 }
 
 // ConfigPath 配置文件路径
-var ConfigPath = "./config.yaml"
+var ConfigPath = "/data/test/configs/config.yaml"
 
 // ConfigGlobal 全局配置变量
 var ConfigGlobal *Config