完善登录流程的错误处理

This commit is contained in:
flswld
2023-02-11 21:08:44 +08:00
parent 4380aea9aa
commit b5b5cf59a5
17 changed files with 253 additions and 144 deletions

View File

@@ -6,6 +6,7 @@ import (
"net"
"strconv"
"strings"
"time"
"hk4e/common/config"
"hk4e/common/rpc"
@@ -25,16 +26,17 @@ import (
// 要用RPC有专门的NATSRPC
type MessageQueue struct {
natsConn *nats.Conn
natsMsgChan chan *nats.Msg
netMsgInput chan *NetMsg
netMsgOutput chan *NetMsg
cmdProtoMap *cmd.CmdProtoMap
serverType string
appId string
gateTcpMqChan chan []byte
gateTcpMqEventChan chan *GateTcpMqEvent
rpcClient *rpc.Client
natsConn *nats.Conn
natsMsgChan chan *nats.Msg
netMsgInput chan *NetMsg
netMsgOutput chan *NetMsg
cmdProtoMap *cmd.CmdProtoMap
serverType string
appId string
gateTcpMqChan chan []byte
gateTcpMqEventChan chan *GateTcpMqEvent
gateTcpMqDeadEventChan chan string
rpcClient *rpc.Client
}
func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r *MessageQueue) {
@@ -63,11 +65,12 @@ func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r
r.appId = appId
r.gateTcpMqChan = make(chan []byte, 1000)
r.gateTcpMqEventChan = make(chan *GateTcpMqEvent, 1000)
r.gateTcpMqDeadEventChan = make(chan string, 1000)
r.rpcClient = rpcClient
if serverType == api.GATE {
r.initGateTcpMqServer()
go r.runGateTcpMqServer()
} else {
r.initGateTcpMqClient()
go r.runGateTcpMqClient()
}
go r.recvHandler()
go r.sendHandler()
@@ -75,6 +78,13 @@ func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r
}
func (m *MessageQueue) Close() {
// 等待所有待发送的消息发送完毕
for {
if len(m.netMsgInput) == 0 {
break
}
time.Sleep(time.Millisecond * 100)
}
m.natsConn.Close()
}
@@ -98,6 +108,10 @@ func (m *MessageQueue) recvHandler() {
logger.Error("parse bin to net msg error: %v", err)
continue
}
// 忽略自己发出的广播消息
if netMsg.OriginServerType == m.serverType && netMsg.OriginServerAppId == m.appId {
continue
}
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
@@ -168,9 +182,15 @@ func (m *MessageQueue) sendHandler() {
return
}
}
// 广播消息只能走nats
if netMsg.ServerType == "ALL_SERVER_HK4E" {
fallbackNatsMqSend()
continue
}
// 有tcp快速通道就走快速通道
instMap, exist := gateTcpMqInstMap[netMsg.ServerType]
if !exist {
logger.Error("unknown server type: %v", netMsg.ServerType)
fallbackNatsMqSend()
continue
}
@@ -204,6 +224,7 @@ func (m *MessageQueue) sendHandler() {
case EventDisconnect:
logger.Warn("gate tcp mq disconnect, addr: %v, server type: %v, appid: %v", inst.conn.RemoteAddr().String(), inst.serverType, inst.appId)
delete(gateTcpMqInstMap[inst.serverType], inst.appId)
m.gateTcpMqDeadEventChan <- inst.conn.RemoteAddr().String()
}
}
}
@@ -225,7 +246,7 @@ type GateTcpMqEvent struct {
inst *GateTcpMqInst
}
func (m *MessageQueue) initGateTcpMqServer() {
func (m *MessageQueue) runGateTcpMqServer() {
addr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:"+strconv.Itoa(int(config.CONF.Hk4e.GateTcpMqPort)))
if err != nil {
logger.Error("gate tcp mq parse port error: %v", err)
@@ -236,17 +257,13 @@ func (m *MessageQueue) initGateTcpMqServer() {
logger.Error("gate tcp mq listen error: %v", err)
return
}
go m.gateTcpMqAcceptHandle(listener)
}
func (m *MessageQueue) gateTcpMqAcceptHandle(listener *net.TCPListener) {
for {
conn, err := listener.Accept()
if err != nil {
logger.Error("gate tcp mq accept error: %v", err)
return
}
logger.Info("server connect gate tcp mq, addr: %v", conn.RemoteAddr().String())
logger.Info("accept gate tcp mq, server addr: %v", conn.RemoteAddr().String())
go m.gateTcpMqHandshake(conn)
}
}
@@ -296,14 +313,37 @@ func (m *MessageQueue) gateTcpMqHandshake(conn net.Conn) {
}
}
func (m *MessageQueue) initGateTcpMqClient() {
func (m *MessageQueue) runGateTcpMqClient() {
// 已存在的GATE连接列表
gateServerConnAddrMap := make(map[string]bool)
m.gateTcpMqConn(gateServerConnAddrMap)
ticker := time.NewTicker(time.Minute)
for {
select {
case addr := <-m.gateTcpMqDeadEventChan:
// GATE连接断开
delete(gateServerConnAddrMap, addr)
case <-ticker.C:
// 定时获取全部GATE实例地址并建立连接
m.gateTcpMqConn(gateServerConnAddrMap)
}
}
}
func (m *MessageQueue) gateTcpMqConn(gateServerConnAddrMap map[string]bool) {
rsp, err := m.rpcClient.Discovery.GetAllGateServerInfoList(context.TODO(), new(api.NullMsg))
if err != nil {
logger.Error("gate tcp mq get gate list error: %v", err)
return
}
for _, gateServerInfo := range rsp.GateServerInfoList {
addr, err := net.ResolveTCPAddr("tcp4", gateServerInfo.MqAddr+":"+strconv.Itoa(int(gateServerInfo.MqPort)))
gateServerAddr := gateServerInfo.MqAddr + ":" + strconv.Itoa(int(gateServerInfo.MqPort))
_, exist := gateServerConnAddrMap[gateServerAddr]
// GATE连接已存在
if exist {
continue
}
addr, err := net.ResolveTCPAddr("tcp4", gateServerAddr)
if err != nil {
logger.Error("gate tcp mq parse addr error: %v", err)
return
@@ -323,11 +363,13 @@ func (m *MessageQueue) initGateTcpMqClient() {
serverType: api.GATE,
appId: gateServerInfo.AppId,
}
go m.gateTcpMqRecvHandle(inst)
m.gateTcpMqEventChan <- &GateTcpMqEvent{
event: EventConnect,
inst: inst,
}
gateServerConnAddrMap[gateServerAddr] = true
logger.Info("connect gate tcp mq, gate addr: %v", conn.RemoteAddr().String())
go m.gateTcpMqRecvHandle(inst)
}
}

View File

@@ -17,9 +17,9 @@ func (m *MessageQueue) getTopic(serverType string, appId string) string {
func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(api.GATE, appId)
originServerType, originServerAppId := m.getOriginServer()
netMsg.ServerType = api.GATE
netMsg.AppId = appId
originServerType, originServerAppId := m.getOriginServer()
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId
m.netMsgInput <- netMsg
@@ -27,9 +27,9 @@ func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) {
func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(api.GS, appId)
originServerType, originServerAppId := m.getOriginServer()
netMsg.ServerType = api.GS
netMsg.AppId = appId
originServerType, originServerAppId := m.getOriginServer()
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId
m.netMsgInput <- netMsg
@@ -37,9 +37,9 @@ func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) {
func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(api.FIGHT, appId)
originServerType, originServerAppId := m.getOriginServer()
netMsg.ServerType = api.FIGHT
netMsg.AppId = appId
originServerType, originServerAppId := m.getOriginServer()
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId
m.netMsgInput <- netMsg
@@ -47,9 +47,9 @@ func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) {
func (m *MessageQueue) SendToPathfinding(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(api.PATHFINDING, appId)
originServerType, originServerAppId := m.getOriginServer()
netMsg.ServerType = api.PATHFINDING
netMsg.AppId = appId
originServerType, originServerAppId := m.getOriginServer()
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId
m.netMsgInput <- netMsg
@@ -57,6 +57,7 @@ func (m *MessageQueue) SendToPathfinding(appId string, netMsg *NetMsg) {
func (m *MessageQueue) SendToAll(netMsg *NetMsg) {
netMsg.Topic = "ALL_SERVER_HK4E"
netMsg.ServerType = "ALL_SERVER_HK4E"
originServerType, originServerAppId := m.getOriginServer()
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId