diff --git a/README.md b/README.md index c7ed2151..28070def 100644 --- a/README.md +++ b/README.md @@ -19,13 +19,13 @@ hk4e game server #### 启动顺序 -> 1. 启动节点服务器(仅单节点 有状态) `cmd/node && go run .` -> 2. 启动http登录服务器(可多节点 无状态) `cmd/dispatch && go run .` +> 1. 启动节点服务器(仅单节点 有状态) `cd cmd/node && go run .` +> 2. 启动http登录服务器(可多节点 无状态) `cd cmd/dispatch && go run .` > 3. 启动网关服务器(可多节点 有状态) `cd cmd/gate && go run .` -> 4. 启动战斗服务器(可多节点 有状态) `cmd/fight && go run .` -> 5. 启动寻路服务器(可多节点 无状态) `cmd/pathfinding && go run .` +> 4. 启动战斗服务器(可多节点 有状态 非必要) `cd cmd/fight && go run .` +> 5. 启动寻路服务器(可多节点 无状态 非必要) `cd cmd/pathfinding && go run .` > 6. 启动游戏服务器(可多节点 有状态) `cd cmd/gs && go run .` -> 7. 启动游戏管理服务器(仅单节点 无状态) `cmd/gm && go run .` +> 7. 启动游戏管理服务器(仅单节点 无状态) `cd cmd/gm && go run .` #### 其它 diff --git a/cmd/gate/application.toml b/cmd/gate/application.toml index 84025f16..f1c72f6a 100644 --- a/cmd/gate/application.toml +++ b/cmd/gate/application.toml @@ -3,6 +3,8 @@ kcp_addr = "127.0.0.1" # 该地址只用来注册到节点服务器 并非网关 kcp_port = 22103 client_proto_proxy_enable = false version = "320" +gate_tcp_mq_addr = "127.0.0.1" +gate_tcp_mq_port = 9999 [logger] level = "DEBUG" diff --git a/common/config/config.go b/common/config/config.go index da44872f..414a16c0 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -45,6 +45,8 @@ type Hk4e struct { GachaHistoryServer string `toml:"gacha_history_server"` ClientProtoProxyEnable bool `toml:"client_proto_proxy_enable"` Version string `toml:"version"` + GateTcpMqAddr string `toml:"gate_tcp_mq_addr"` + GateTcpMqPort int32 `toml:"gate_tcp_mq_port"` } // MQ 消息队列 diff --git a/common/mq/nats.go b/common/mq/nats.go index f37c5abb..cc945b03 100644 --- a/common/mq/nats.go +++ b/common/mq/nats.go @@ -1,7 +1,15 @@ package mq import ( + "context" + "encoding/binary" + "net" + "strconv" + "strings" + "hk4e/common/config" + "hk4e/common/rpc" + "hk4e/node/api" "hk4e/pkg/logger" "hk4e/protocol/cmd" @@ -12,21 +20,24 @@ import ( // 用于服务器之间传输游戏协议 // 仅用于传递数据平面(client<--->server)和控制平面(server<--->server)的消息 -// 目前是全部消息都走NATS 之后可以做优化服务器之间socket直连 +// 服务器之间消息优先走tcp socket直连 tcp连接断开或不存在时降级回NATS // 请不要用这个来搞RPC写一大堆异步回调!!! // 要用RPC有专门的NATSRPC type MessageQueue struct { - natsConn *nats.Conn - natsMsgChan chan *nats.Msg - netMsgInput chan *NetMsg - netMsgOutput chan *NetMsg - cmdProtoMap *cmd.CmdProtoMap - serverType string - appId string + natsConn *nats.Conn + natsMsgChan chan *nats.Msg + netMsgInput chan *NetMsg + netMsgOutput chan *NetMsg + cmdProtoMap *cmd.CmdProtoMap + serverType string + appId string + gateTcpMqChan chan []byte + gateTcpMqEventChan chan *GateTcpMqEvent + rpcClient *rpc.Client } -func NewMessageQueue(serverType string, appId string) (r *MessageQueue) { +func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r *MessageQueue) { r = new(MessageQueue) conn, err := nats.Connect(config.CONF.MQ.NatsUrl) if err != nil { @@ -52,6 +63,14 @@ func NewMessageQueue(serverType string, appId string) (r *MessageQueue) { r.appId = appId go r.recvHandler() go r.sendHandler() + r.gateTcpMqChan = make(chan []byte, 1000) + r.gateTcpMqEventChan = make(chan *GateTcpMqEvent, 1000) + r.rpcClient = rpcClient + if serverType == api.GATE { + r.initGateTcpMqServer() + } else { + r.initGateTcpMqClient() + } return r } @@ -65,10 +84,16 @@ func (m *MessageQueue) GetNetMsg() chan *NetMsg { func (m *MessageQueue) recvHandler() { for { - natsMsg := <-m.natsMsgChan + var rawData []byte = nil + select { + case natsMsg := <-m.natsMsgChan: + rawData = natsMsg.Data + case gateTcpMqMsg := <-m.gateTcpMqChan: + rawData = gateTcpMqMsg + } // msgpack NetMsg netMsg := new(NetMsg) - err := msgpack.Unmarshal(natsMsg.Data, netMsg) + err := msgpack.Unmarshal(rawData, netMsg) if err != nil { logger.Error("parse bin to net msg error: %v", err) continue @@ -76,6 +101,10 @@ func (m *MessageQueue) recvHandler() { switch netMsg.MsgType { case MsgTypeGame: gameMsg := netMsg.GameMsg + if gameMsg == nil { + logger.Error("recv game msg is nil") + continue + } if netMsg.EventId == NormalMsg { // protobuf PayloadMessage payloadMessage := m.cmdProtoMap.GetProtoObjByCmdId(gameMsg.CmdId) @@ -96,33 +125,257 @@ func (m *MessageQueue) recvHandler() { } func (m *MessageQueue) sendHandler() { + // 网关tcp连接消息收发快速通道 key1:服务器类型 key2:服务器appid value:连接实例 + gateTcpMqInstMap := map[string]map[string]*GateTcpMqInst{ + api.GATE: make(map[string]*GateTcpMqInst), + api.GS: make(map[string]*GateTcpMqInst), + api.FIGHT: make(map[string]*GateTcpMqInst), + api.PATHFINDING: make(map[string]*GateTcpMqInst), + } for { - netMsg := <-m.netMsgInput - switch netMsg.MsgType { - case MsgTypeGame: - gameMsg := netMsg.GameMsg - if gameMsg.PayloadMessageData == nil { - // protobuf PayloadMessage - payloadMessageData, err := pb.Marshal(gameMsg.PayloadMessage) - if err != nil { - logger.Error("parse payload msg to bin error: %v", err) + select { + case netMsg := <-m.netMsgInput: + switch netMsg.MsgType { + case MsgTypeGame: + gameMsg := netMsg.GameMsg + if gameMsg == nil { + logger.Error("send game msg is nil") continue } - gameMsg.PayloadMessageData = payloadMessageData + if gameMsg.PayloadMessageData == nil { + // protobuf PayloadMessage + payloadMessageData, err := pb.Marshal(gameMsg.PayloadMessage) + if err != nil { + logger.Error("parse payload msg to bin error: %v", err) + continue + } + gameMsg.PayloadMessageData = payloadMessageData + } + } + // msgpack NetMsg + netMsgData, err := msgpack.Marshal(netMsg) + if err != nil { + logger.Error("parse net msg to bin error: %v", err) + continue + } + fallbackNatsMqSend := func() { + // 找不到tcp快速通道就fallback回nats + natsMsg := nats.NewMsg(netMsg.Topic) + natsMsg.Data = netMsgData + err = m.natsConn.PublishMsg(natsMsg) + if err != nil { + logger.Error("nats publish msg error: %v", err) + return + } + } + // 有tcp快速通道就走快速通道 + instMap, exist := gateTcpMqInstMap[netMsg.ServerType] + if !exist { + fallbackNatsMqSend() + continue + } + inst, exist := instMap[netMsg.AppId] + if !exist { + fallbackNatsMqSend() + continue + } + // 前4个字节为消息的载荷部分长度 + netMsgDataTcp := make([]byte, 4+len(netMsgData)) + binary.BigEndian.PutUint32(netMsgDataTcp, uint32(len(netMsgData))) + copy(netMsgDataTcp[4:], netMsgData) + _, err = inst.conn.Write(netMsgDataTcp) + if err != nil { + // 发送失败关闭连接fallback回nats + logger.Error("gate tcp mq send error: %v", err) + _ = inst.conn.Close() + m.gateTcpMqEventChan <- &GateTcpMqEvent{ + event: EventDisconnect, + inst: inst, + } + fallbackNatsMqSend() + continue + } + case gateTcpMqEvent := <-m.gateTcpMqEventChan: + inst := gateTcpMqEvent.inst + switch gateTcpMqEvent.event { + case EventConnect: + logger.Warn("gate tcp mq connect, addr: %v, server type: %v, appid: %v", inst.conn.RemoteAddr().String(), inst.serverType, inst.appId) + gateTcpMqInstMap[inst.serverType][inst.appId] = inst + case EventDisconnect: + logger.Warn("gate tcp mq disconnect, addr: %v, server type: %v, appid: %v", inst.conn.RemoteAddr().String(), inst.serverType, inst.appId) + delete(gateTcpMqInstMap[inst.serverType], inst.appId) } - } - // msgpack NetMsg - netMsgData, err := msgpack.Marshal(netMsg) - if err != nil { - logger.Error("parse net msg to bin error: %v", err) - continue - } - natsMsg := nats.NewMsg(netMsg.Topic) - natsMsg.Data = netMsgData - err = m.natsConn.PublishMsg(natsMsg) - if err != nil { - logger.Error("nats publish msg error: %v", err) - continue } } } + +type GateTcpMqInst struct { + conn net.Conn + serverType string + appId string +} + +const ( + EventConnect = iota + EventDisconnect +) + +type GateTcpMqEvent struct { + event int + inst *GateTcpMqInst +} + +func (m *MessageQueue) initGateTcpMqServer() { + addr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:"+strconv.Itoa(int(config.CONF.Hk4e.GateTcpMqPort))) + if err != nil { + logger.Error("gate tcp mq parse port error: %v", err) + return + } + listener, err := net.ListenTCP("tcp4", addr) + if err != nil { + logger.Error("gate tcp mq listen error: %v", err) + return + } + go m.gateTcpMqAcceptHandle(listener) +} + +func (m *MessageQueue) gateTcpMqAcceptHandle(listener *net.TCPListener) { + for { + conn, err := listener.Accept() + if err != nil { + logger.Error("gate tcp mq accept error: %v", err) + return + } + logger.Info("server connect gate tcp mq, addr: %v", conn.RemoteAddr().String()) + go m.gateTcpMqHandshake(conn) + } +} + +func (m *MessageQueue) gateTcpMqHandshake(conn net.Conn) { + recvBuf := make([]byte, 1500) + recvLen, err := conn.Read(recvBuf) + if err != nil { + logger.Error("handshake packet recv error: %v", err) + return + } + recvBuf = recvBuf[:recvLen] + serverMetaData := string(recvBuf) + // 握手包格式 服务器类型@appid + split := strings.Split(serverMetaData, "@") + if len(split) != 2 { + logger.Error("handshake packet format error") + return + } + inst := &GateTcpMqInst{ + conn: conn, + serverType: "", + appId: "", + } + switch split[0] { + case api.GATE: + inst.serverType = api.GATE + case api.GS: + inst.serverType = api.GS + case api.FIGHT: + inst.serverType = api.FIGHT + case api.PATHFINDING: + inst.serverType = api.PATHFINDING + default: + logger.Error("invalid server type") + return + } + if len(split[1]) != 8 { + logger.Error("invalid appid") + return + } + inst.appId = split[1] + go m.gateTcpMqRecvHandle(inst) + m.gateTcpMqEventChan <- &GateTcpMqEvent{ + event: EventConnect, + inst: inst, + } +} + +func (m *MessageQueue) initGateTcpMqClient() { + rsp, err := m.rpcClient.Discovery.GetAllGateServerInfoList(context.TODO(), new(api.NullMsg)) + if err != nil { + logger.Error("gate tcp mq get gate list error: %v", err) + return + } + for _, gateServerInfo := range rsp.GateServerInfoList { + addr, err := net.ResolveTCPAddr("tcp4", gateServerInfo.MqAddr+":"+strconv.Itoa(int(gateServerInfo.MqPort))) + if err != nil { + logger.Error("gate tcp mq parse addr error: %v", err) + return + } + conn, err := net.DialTCP("tcp4", nil, addr) + if err != nil { + logger.Error("gate tcp mq conn error: %v", err) + return + } + _, err = conn.Write([]byte(m.serverType + "@" + m.appId)) + if err != nil { + logger.Error("gate tcp mq handshake send error: %v", err) + return + } + inst := &GateTcpMqInst{ + conn: conn, + serverType: api.GATE, + appId: gateServerInfo.AppId, + } + go m.gateTcpMqRecvHandle(inst) + m.gateTcpMqEventChan <- &GateTcpMqEvent{ + event: EventConnect, + inst: inst, + } + } +} + +func (m *MessageQueue) gateTcpMqRecvHandle(inst *GateTcpMqInst) { + dataBuf := make([]byte, 0, 1500) + for { + recvBuf := make([]byte, 1500) + recvLen, err := inst.conn.Read(recvBuf) + if err != nil { + logger.Error("gate tcp mq recv error: %v", err) + m.gateTcpMqEventChan <- &GateTcpMqEvent{ + event: EventDisconnect, + inst: inst, + } + _ = inst.conn.Close() + return + } + recvBuf = recvBuf[:recvLen] + m.gateTcpMqRecvHandleLoop(recvBuf, &dataBuf) + } +} + +func (m *MessageQueue) gateTcpMqRecvHandleLoop(data []byte, dataBuf *[]byte) { + if len(*dataBuf) != 0 { + // 取出之前的缓冲区数据 + data = append(*dataBuf, data...) + *dataBuf = make([]byte, 0, 1500) + } + // 长度太短 + if len(data) < 4 { + logger.Debug("packet len less 4 byte") + return + } + // 消息的载荷部分长度 + msgPayloadLen := binary.BigEndian.Uint32(data[0:4]) + // 检查长度 + packetLen := int(msgPayloadLen) + 4 + haveMorePacket := false + if len(data) > packetLen { + // 有不止一个包 + haveMorePacket = true + } else if len(data) < packetLen { + // 这一次没收够 放入缓冲区 + *dataBuf = append(*dataBuf, data...) + return + } + m.gateTcpMqChan <- data[4 : 4+msgPayloadLen] + if haveMorePacket { + m.gateTcpMqRecvHandleLoop(data[packetLen:], dataBuf) + } +} diff --git a/common/mq/net_msg.go b/common/mq/net_msg.go index 07917332..d4a45ee9 100644 --- a/common/mq/net_msg.go +++ b/common/mq/net_msg.go @@ -12,6 +12,8 @@ const ( type NetMsg struct { MsgType uint8 `msgpack:"MsgType"` EventId uint16 `msgpack:"EventId"` + ServerType string `msgpack:"-"` + AppId string `msgpack:"-"` Topic string `msgpack:"-"` GameMsg *GameMsg `msgpack:"GameMsg"` FightMsg *FightMsg `msgpack:"FightMsg"` diff --git a/common/mq/topic.go b/common/mq/topic.go index 5699eebe..5ca0990c 100644 --- a/common/mq/topic.go +++ b/common/mq/topic.go @@ -18,6 +18,8 @@ func (m *MessageQueue) getTopic(serverType string, appId string) string { func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) { netMsg.Topic = m.getTopic(api.GATE, appId) originServerType, originServerAppId := m.getOriginServer() + netMsg.ServerType = api.GATE + netMsg.AppId = appId netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId m.netMsgInput <- netMsg @@ -26,6 +28,8 @@ func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) { func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) { netMsg.Topic = m.getTopic(api.GS, appId) originServerType, originServerAppId := m.getOriginServer() + netMsg.ServerType = api.GS + netMsg.AppId = appId netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId m.netMsgInput <- netMsg @@ -34,6 +38,8 @@ func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) { func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) { netMsg.Topic = m.getTopic(api.FIGHT, appId) originServerType, originServerAppId := m.getOriginServer() + netMsg.ServerType = api.FIGHT + netMsg.AppId = appId netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId m.netMsgInput <- netMsg @@ -42,6 +48,8 @@ func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) { func (m *MessageQueue) SendToPathfinding(appId string, netMsg *NetMsg) { netMsg.Topic = m.getTopic(api.PATHFINDING, appId) originServerType, originServerAppId := m.getOriginServer() + netMsg.ServerType = api.PATHFINDING + netMsg.AppId = appId netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId m.netMsgInput <- netMsg diff --git a/dispatch/controller/dispatch_controller.go b/dispatch/controller/dispatch_controller.go index 00232fd7..d906dcf3 100644 --- a/dispatch/controller/dispatch_controller.go +++ b/dispatch/controller/dispatch_controller.go @@ -94,7 +94,7 @@ func (c *Controller) query_cur_region(context *gin.Context) { rspError() return } - regionCurrBase64 := region.GetRegionCurrBase64(addr.IpAddr, int32(addr.Port), c.ec2b) + regionCurrBase64 := region.GetRegionCurrBase64(addr.KcpAddr, int32(addr.KcpPort), c.ec2b) if version < 275 { context.Header("Content-type", "text/html; charset=UTF-8") _, _ = context.Writer.WriteString(regionCurrBase64) diff --git a/fight/app/app.go b/fight/app/app.go index b978ef79..fbd0a9c8 100644 --- a/fight/app/app.go +++ b/fight/app/app.go @@ -40,10 +40,13 @@ func Run(ctx context.Context, configFile string) error { ticker := time.NewTicker(time.Second * 15) for { <-ticker.C - _, _ = client.Discovery.KeepaliveServer(context.TODO(), &api.KeepaliveServerReq{ + _, err := client.Discovery.KeepaliveServer(context.TODO(), &api.KeepaliveServerReq{ ServerType: api.FIGHT, AppId: APPID, }) + if err != nil { + logger.Error("keepalive error: %v", err) + } } }() defer func() { @@ -58,7 +61,7 @@ func Run(ctx context.Context, configFile string) error { constant.InitConstant() - messageQueue := mq.NewMessageQueue(api.FIGHT, APPID) + messageQueue := mq.NewMessageQueue(api.FIGHT, APPID, client) defer messageQueue.Close() _ = engine.NewFightEngine(messageQueue) diff --git a/gate/app/app.go b/gate/app/app.go index 3414624b..a9d856a2 100644 --- a/gate/app/app.go +++ b/gate/app/app.go @@ -31,8 +31,10 @@ func Run(ctx context.Context, configFile string) error { rsp, err := client.Discovery.RegisterServer(context.TODO(), &api.RegisterServerReq{ ServerType: api.GATE, GateServerAddr: &api.GateServerAddr{ - IpAddr: config.CONF.Hk4e.KcpAddr, - Port: uint32(config.CONF.Hk4e.KcpPort), + KcpAddr: config.CONF.Hk4e.KcpAddr, + KcpPort: uint32(config.CONF.Hk4e.KcpPort), + MqAddr: config.CONF.Hk4e.GateTcpMqAddr, + MqPort: uint32(config.CONF.Hk4e.GateTcpMqPort), }, Version: config.CONF.Hk4e.Version, }) @@ -44,10 +46,13 @@ func Run(ctx context.Context, configFile string) error { ticker := time.NewTicker(time.Second * 15) for { <-ticker.C - _, _ = client.Discovery.KeepaliveServer(context.TODO(), &api.KeepaliveServerReq{ + _, err := client.Discovery.KeepaliveServer(context.TODO(), &api.KeepaliveServerReq{ ServerType: api.GATE, AppId: APPID, }) + if err != nil { + logger.Error("keepalive error: %v", err) + } } }() defer func() { @@ -60,7 +65,7 @@ func Run(ctx context.Context, configFile string) error { logger.InitLogger("gate_" + APPID) logger.Warn("gate start, appid: %v", APPID) - messageQueue := mq.NewMessageQueue(api.GATE, APPID) + messageQueue := mq.NewMessageQueue(api.GATE, APPID, client) connectManager := net.NewKcpConnectManager(messageQueue, client.Discovery) connectManager.Start() diff --git a/gate/net/kcp_connect_manager.go b/gate/net/kcp_connect_manager.go index 1da3f3bd..db531453 100644 --- a/gate/net/kcp_connect_manager.go +++ b/gate/net/kcp_connect_manager.go @@ -21,7 +21,10 @@ import ( "hk4e/protocol/cmd" ) -const PacketFreqLimit = 1000 +const ( + PacketFreqLimit = 1000 + PacketMaxLen = 343 * 1024 +) type KcpConnectManager struct { discovery *rpc.DiscoveryClient @@ -239,7 +242,8 @@ func (k *KcpConnectManager) recvHandle(session *Session) { convId := conn.GetConv() pktFreqLimitCounter := 0 pktFreqLimitTimer := time.Now().UnixNano() - recvBuf := make([]byte, conn.GetMaxPayloadLen()) + recvBuf := make([]byte, PacketMaxLen) + dataBuf := make([]byte, 0, 1500) for { _ = conn.SetReadDeadline(time.Now().Add(time.Second * 15)) recvLen, err := conn.Read(recvBuf) @@ -263,7 +267,7 @@ func (k *KcpConnectManager) recvHandle(session *Session) { } recvData := recvBuf[:recvLen] kcpMsgList := make([]*KcpMsg, 0) - k.decodeBinToPayload(recvData, convId, &kcpMsgList, session.xorKey) + k.decodeBinToPayload(recvData, &dataBuf, convId, &kcpMsgList, session.xorKey) for _, v := range kcpMsgList { protoMsgList := k.protoDecode(v) for _, vv := range protoMsgList { diff --git a/gate/net/kcp_endecode.go b/gate/net/kcp_endecode.go index e4e02af0..4259cd6f 100644 --- a/gate/net/kcp_endecode.go +++ b/gate/net/kcp_endecode.go @@ -1,7 +1,6 @@ package net import ( - "bytes" "encoding/binary" "hk4e/pkg/endec" @@ -33,13 +32,18 @@ type KcpMsg struct { ProtoData []byte } -func (k *KcpConnectManager) decodeBinToPayload(data []byte, convId uint64, kcpMsgList *[]*KcpMsg, xorKey []byte) { +func (k *KcpConnectManager) decodeBinToPayload(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpMsg, xorKey []byte) { // xor解密 endec.Xor(data, xorKey) - k.decodeLoop(data, convId, kcpMsgList) + k.decodeLoop(data, dataBuf, convId, kcpMsgList) } -func (k *KcpConnectManager) decodeLoop(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) { +func (k *KcpConnectManager) decodeLoop(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpMsg) { + if len(*dataBuf) != 0 { + // 取出之前的缓冲区数据 + data = append(*dataBuf, data...) + *dataBuf = make([]byte, 0, 1500) + } // 长度太短 if len(data) < 12 { logger.Debug("packet len less 12 byte") @@ -51,73 +55,45 @@ func (k *KcpConnectManager) decodeLoop(data []byte, convId uint64, kcpMsgList *[ return } // 协议号 - cmdIdByteSlice := make([]byte, 8) - cmdIdByteSlice[6] = data[2] - cmdIdByteSlice[7] = data[3] - cmdIdBuffer := bytes.NewBuffer(cmdIdByteSlice) - var cmdId int64 - err := binary.Read(cmdIdBuffer, binary.BigEndian, &cmdId) - if err != nil { - logger.Error("packet cmd id parse fail: %v", err) - return - } + cmdId := binary.BigEndian.Uint16(data[2:4]) // 头部长度 - headLenByteSlice := make([]byte, 8) - headLenByteSlice[6] = data[4] - headLenByteSlice[7] = data[5] - headLenBuffer := bytes.NewBuffer(headLenByteSlice) - var headLen int64 - err = binary.Read(headLenBuffer, binary.BigEndian, &headLen) - if err != nil { - logger.Error("packet head len parse fail: %v", err) - return - } + headLen := binary.BigEndian.Uint16(data[4:6]) // proto长度 - protoLenByteSlice := make([]byte, 8) - protoLenByteSlice[4] = data[6] - protoLenByteSlice[5] = data[7] - protoLenByteSlice[6] = data[8] - protoLenByteSlice[7] = data[9] - protoLenBuffer := bytes.NewBuffer(protoLenByteSlice) - var protoLen int64 - err = binary.Read(protoLenBuffer, binary.BigEndian, &protoLen) - if err != nil { - logger.Error("packet proto len parse fail: %v", err) + protoLen := binary.BigEndian.Uint32(data[6:10]) + // 检查长度 + packetLen := int(headLen) + int(protoLen) + 12 + if packetLen > PacketMaxLen { + logger.Error("packet len too long") return } - // 检查最小长度 - if len(data) < int(headLen+protoLen)+12 { - logger.Error("packet len error") + haveMorePacket := false + if len(data) > packetLen { + // 有不止一个包 + haveMorePacket = true + } else if len(data) < packetLen { + // 这一次没收够 放入缓冲区 + *dataBuf = append(*dataBuf, data...) return } // 尾部幻数错误 - if data[headLen+protoLen+10] != 0x89 || data[headLen+protoLen+11] != 0xAB { + if data[int(headLen)+int(protoLen)+10] != 0x89 || data[int(headLen)+int(protoLen)+11] != 0xAB { logger.Error("packet tail magic 0x89AB error") return } - // 判断是否有不止一个包 - haveMoreData := false - if len(data) > int(headLen+protoLen)+12 { - haveMoreData = true - } // 头部数据 - headData := data[10 : 10+headLen] + headData := data[10 : 10+int(headLen)] // proto数据 - protoData := data[10+headLen : 10+headLen+protoLen] + protoData := data[10+int(headLen) : 10+int(headLen)+int(protoLen)] // 返回数据 kcpMsg := new(KcpMsg) kcpMsg.ConvId = convId - kcpMsg.CmdId = uint16(cmdId) - // kcpMsg.HeadData = make([]byte, len(headData)) - // copy(kcpMsg.HeadData, headData) - // kcpMsg.ProtoData = make([]byte, len(protoData)) - // copy(kcpMsg.ProtoData, protoData) + kcpMsg.CmdId = cmdId kcpMsg.HeadData = headData kcpMsg.ProtoData = protoData *kcpMsgList = append(*kcpMsgList, kcpMsg) // 递归解析 - if haveMoreData { - k.decodeLoop(data[int(headLen+protoLen)+12:], convId, kcpMsgList) + if haveMorePacket { + k.decodeLoop(data[packetLen:], dataBuf, convId, kcpMsgList) } } @@ -133,34 +109,11 @@ func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg, xorKey []byte) (b bin[0] = 0x45 bin[1] = 0x67 // 协议号 - cmdIdBuffer := bytes.NewBuffer([]byte{}) - err := binary.Write(cmdIdBuffer, binary.BigEndian, kcpMsg.CmdId) - if err != nil { - logger.Error("cmd id encode err: %v", err) - return nil - } - bin[2] = (cmdIdBuffer.Bytes())[0] - bin[3] = (cmdIdBuffer.Bytes())[1] + binary.BigEndian.PutUint16(bin[2:4], kcpMsg.CmdId) // 头部长度 - headLenBuffer := bytes.NewBuffer([]byte{}) - err = binary.Write(headLenBuffer, binary.BigEndian, uint16(len(kcpMsg.HeadData))) - if err != nil { - logger.Error("head len encode err: %v", err) - return nil - } - bin[4] = (headLenBuffer.Bytes())[0] - bin[5] = (headLenBuffer.Bytes())[1] + binary.BigEndian.PutUint16(bin[4:6], uint16(len(kcpMsg.HeadData))) // proto长度 - protoLenBuffer := bytes.NewBuffer([]byte{}) - err = binary.Write(protoLenBuffer, binary.BigEndian, uint32(len(kcpMsg.ProtoData))) - if err != nil { - logger.Error("proto len encode err: %v", err) - return nil - } - bin[6] = (protoLenBuffer.Bytes())[0] - bin[7] = (protoLenBuffer.Bytes())[1] - bin[8] = (protoLenBuffer.Bytes())[2] - bin[9] = (protoLenBuffer.Bytes())[3] + binary.BigEndian.PutUint32(bin[6:10], uint32(len(kcpMsg.ProtoData))) // 头部数据 copy(bin[10:], kcpMsg.HeadData) // proto数据 diff --git a/gate/net/session.go b/gate/net/session.go index bb6c6a86..fb7cc560 100644 --- a/gate/net/session.go +++ b/gate/net/session.go @@ -106,11 +106,8 @@ func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session) logger.Error("conn not active so drop packet, cmdId: %v, userId: %v, convId: %v", protoMsg.CmdId, userId, protoMsg.ConvId) return } - // 只转发到寻路服务器 - if protoMsg.CmdId == cmd.QueryPathReq || protoMsg.CmdId == cmd.ObstacleModifyNotify { - if session.pathfindingServerAppId == "" { - return - } + // 转发到寻路服务器 + if session.pathfindingServerAppId != "" && (protoMsg.CmdId == cmd.QueryPathReq || protoMsg.CmdId == cmd.ObstacleModifyNotify) { gameMsg := new(mq.GameMsg) gameMsg.UserId = userId gameMsg.CmdId = protoMsg.CmdId @@ -123,8 +120,8 @@ func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session) }) return } - // 同时转发到战斗服务器 - if protoMsg.CmdId == cmd.CombatInvocationsNotify && session.fightServerAppId != "" { + // 转发到战斗服务器 + if session.fightServerAppId != "" && protoMsg.CmdId == cmd.CombatInvocationsNotify { gameMsg := new(mq.GameMsg) gameMsg.UserId = userId gameMsg.CmdId = protoMsg.CmdId diff --git a/gs/app/app.go b/gs/app/app.go index 90a2b63f..b0e5e9c0 100644 --- a/gs/app/app.go +++ b/gs/app/app.go @@ -47,10 +47,13 @@ func Run(ctx context.Context, configFile string) error { ticker := time.NewTicker(time.Second * 15) for { <-ticker.C - _, _ = client.Discovery.KeepaliveServer(context.TODO(), &api.KeepaliveServerReq{ + _, err := client.Discovery.KeepaliveServer(context.TODO(), &api.KeepaliveServerReq{ ServerType: api.GS, AppId: APPID, }) + if err != nil { + logger.Error("keepalive error: %v", err) + } } }() GSID = rsp.GetGsId() @@ -75,7 +78,7 @@ func Run(ctx context.Context, configFile string) error { } defer db.CloseDao() - messageQueue := mq.NewMessageQueue(api.GS, APPID) + messageQueue := mq.NewMessageQueue(api.GS, APPID, client) defer messageQueue.Close() gameManager := game.NewGameManager(db, messageQueue, GSID) diff --git a/gs/game/route_manager.go b/gs/game/route_manager.go index 86681812..7a359d40 100644 --- a/gs/game/route_manager.go +++ b/gs/game/route_manager.go @@ -54,6 +54,7 @@ func (r *RouteManager) doRoute(cmdId uint16, userId uint32, clientSeq uint32, pa } func (r *RouteManager) initRoute() { + r.registerRouter(cmd.QueryPathReq, GAME_MANAGER.QueryPathReq) r.registerRouter(cmd.UnionCmdNotify, GAME_MANAGER.UnionCmdNotify) r.registerRouter(cmd.MassiveEntityElementOpBatchNotify, GAME_MANAGER.MassiveEntityElementOpBatchNotify) r.registerRouter(cmd.ToTheMoonEnterSceneReq, GAME_MANAGER.ToTheMoonEnterSceneReq) diff --git a/gs/game/user_common_handler.go b/gs/game/user_common_handler.go index caf9fc12..7994c188 100644 --- a/gs/game/user_common_handler.go +++ b/gs/game/user_common_handler.go @@ -40,6 +40,18 @@ func (g *GameManager) TowerAllDataReq(player *model.Player, payloadMsg pb.Messag g.SendMsg(cmd.TowerAllDataRsp, player.PlayerID, player.ClientSeq, towerAllDataRsp) } +func (g *GameManager) QueryPathReq(player *model.Player, payloadMsg pb.Message) { + // logger.Debug("user query path, uid: %v", player.PlayerID) + req := payloadMsg.(*proto.QueryPathReq) + + queryPathRsp := &proto.QueryPathRsp{ + QueryId: req.QueryId, + QueryStatus: proto.QueryPathRsp_PATH_STATUS_TYPE_SUCC, + Corners: []*proto.Vector{req.DestinationPos[0]}, + } + g.SendMsg(cmd.QueryPathRsp, player.PlayerID, player.ClientSeq, queryPathRsp) +} + func (g *GameManager) EntityAiSyncNotify(player *model.Player, payloadMsg pb.Message) { logger.Debug("user entity ai sync, uid: %v", player.PlayerID) req := payloadMsg.(*proto.EntityAiSyncNotify) diff --git a/gs/game/user_fight_sync.go b/gs/game/user_fight_sync.go index ab8dd277..b244edbe 100644 --- a/gs/game/user_fight_sync.go +++ b/gs/game/user_fight_sync.go @@ -1,6 +1,7 @@ package game import ( + "hk4e/common/constant" "hk4e/common/utils" "hk4e/gs/model" "hk4e/pkg/logger" @@ -101,7 +102,51 @@ func (g *GameManager) CombatInvocationsNotify(player *model.Player, payloadMsg p for _, entry := range req.InvokeList { switch entry.ArgumentType { case proto.CombatTypeArgument_COMBAT_TYPE_ARGUMENT_EVT_BEING_HIT: - continue + hitInfo := new(proto.EvtBeingHitInfo) + clientProtoObj := g.GetClientProtoObjByName("EvtBeingHitInfo") + if clientProtoObj == nil { + logger.Error("get client proto obj is nil") + return + } + ok := utils.UnmarshalProtoObj(hitInfo, clientProtoObj, entry.CombatData) + if !ok { + continue + } + attackResult := hitInfo.AttackResult + if attackResult == nil { + logger.Error("attackResult is nil") + continue + } + logger.Debug("run attack handler, attackResult: %v", attackResult) + target := scene.GetEntity(attackResult.DefenseId) + if target == nil { + logger.Error("could not found target, defense id: %v", attackResult.DefenseId) + continue + } + attackResult.Damage *= 100 + damage := attackResult.Damage + attackerId := attackResult.AttackerId + _ = attackerId + currHp := float32(0) + if target.fightProp != nil { + currHp = target.fightProp[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_HP)] + currHp -= damage + if currHp < 0 { + currHp = 0 + } + target.fightProp[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_HP)] = currHp + } + entityFightPropUpdateNotify := &proto.EntityFightPropUpdateNotify{ + FightPropMap: target.fightProp, + EntityId: target.id, + } + g.SendToWorldA(world, cmd.EntityFightPropUpdateNotify, player.ClientSeq, entityFightPropUpdateNotify) + combatData, err := pb.Marshal(hitInfo) + if err != nil { + logger.Error("create combat invocations entity hit info error: %v", err) + } + entry.CombatData = combatData + player.CombatInvokeHandler.AddEntry(entry.ForwardType, entry) case proto.CombatTypeArgument_COMBAT_TYPE_ARGUMENT_ENTITY_MOVE: entityMoveInfo := new(proto.EntityMoveInfo) clientProtoObj := g.GetClientProtoObjByName("EntityMoveInfo") diff --git a/node/api/api.natsrpc.pb.go b/node/api/api.natsrpc.pb.go index 42253945..2dd71ffe 100644 --- a/node/api/api.natsrpc.pb.go +++ b/node/api/api.natsrpc.pb.go @@ -33,6 +33,8 @@ type DiscoveryNATSRPCServer interface { GetRegionEc2B(ctx context.Context, req *NullMsg) (*RegionEc2B, error) // 获取负载最小的网关服务器的地址和端口 GetGateServerAddr(ctx context.Context, req *GetGateServerAddrReq) (*GateServerAddr, error) + // 获取全部网关服务器信息列表 + GetAllGateServerInfoList(ctx context.Context, req *NullMsg) (*GateServerInfoList, error) } // RegisterDiscoveryNATSRPCServer register Discovery service @@ -54,6 +56,8 @@ type DiscoveryNATSRPCClient interface { GetRegionEc2B(ctx context.Context, req *NullMsg, opt ...natsrpc.CallOption) (*RegionEc2B, error) // 获取负载最小的网关服务器的地址和端口 GetGateServerAddr(ctx context.Context, req *GetGateServerAddrReq, opt ...natsrpc.CallOption) (*GateServerAddr, error) + // 获取全部网关服务器信息列表 + GetAllGateServerInfoList(ctx context.Context, req *NullMsg, opt ...natsrpc.CallOption) (*GateServerInfoList, error) } type _DiscoveryNATSRPCClient struct { @@ -101,3 +105,8 @@ func (c *_DiscoveryNATSRPCClient) GetGateServerAddr(ctx context.Context, req *Ge err := c.c.Request(ctx, "GetGateServerAddr", req, rep, opt...) return rep, err } +func (c *_DiscoveryNATSRPCClient) GetAllGateServerInfoList(ctx context.Context, req *NullMsg, opt ...natsrpc.CallOption) (*GateServerInfoList, error) { + rep := &GateServerInfoList{} + err := c.c.Request(ctx, "GetAllGateServerInfoList", req, rep, opt...) + return rep, err +} diff --git a/node/api/api.pb.go b/node/api/api.pb.go index 9ae3025a..a10d06a1 100644 --- a/node/api/api.pb.go +++ b/node/api/api.pb.go @@ -479,8 +479,10 @@ type GateServerAddr struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - IpAddr string `protobuf:"bytes,1,opt,name=ip_addr,json=ipAddr,proto3" json:"ip_addr,omitempty"` - Port uint32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` + KcpAddr string `protobuf:"bytes,1,opt,name=kcp_addr,json=kcpAddr,proto3" json:"kcp_addr,omitempty"` + KcpPort uint32 `protobuf:"varint,2,opt,name=kcp_port,json=kcpPort,proto3" json:"kcp_port,omitempty"` + MqAddr string `protobuf:"bytes,3,opt,name=mq_addr,json=mqAddr,proto3" json:"mq_addr,omitempty"` + MqPort uint32 `protobuf:"varint,4,opt,name=mq_port,json=mqPort,proto3" json:"mq_port,omitempty"` } func (x *GateServerAddr) Reset() { @@ -515,20 +517,144 @@ func (*GateServerAddr) Descriptor() ([]byte, []int) { return file_api_proto_rawDescGZIP(), []int{9} } -func (x *GateServerAddr) GetIpAddr() string { +func (x *GateServerAddr) GetKcpAddr() string { if x != nil { - return x.IpAddr + return x.KcpAddr } return "" } -func (x *GateServerAddr) GetPort() uint32 { +func (x *GateServerAddr) GetKcpPort() uint32 { if x != nil { - return x.Port + return x.KcpPort } return 0 } +func (x *GateServerAddr) GetMqAddr() string { + if x != nil { + return x.MqAddr + } + return "" +} + +func (x *GateServerAddr) GetMqPort() uint32 { + if x != nil { + return x.MqPort + } + return 0 +} + +type GateServerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AppId string `protobuf:"bytes,1,opt,name=app_id,json=appId,proto3" json:"app_id,omitempty"` + MqAddr string `protobuf:"bytes,2,opt,name=mq_addr,json=mqAddr,proto3" json:"mq_addr,omitempty"` + MqPort uint32 `protobuf:"varint,3,opt,name=mq_port,json=mqPort,proto3" json:"mq_port,omitempty"` +} + +func (x *GateServerInfo) Reset() { + *x = GateServerInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GateServerInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GateServerInfo) ProtoMessage() {} + +func (x *GateServerInfo) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GateServerInfo.ProtoReflect.Descriptor instead. +func (*GateServerInfo) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{10} +} + +func (x *GateServerInfo) GetAppId() string { + if x != nil { + return x.AppId + } + return "" +} + +func (x *GateServerInfo) GetMqAddr() string { + if x != nil { + return x.MqAddr + } + return "" +} + +func (x *GateServerInfo) GetMqPort() uint32 { + if x != nil { + return x.MqPort + } + return 0 +} + +type GateServerInfoList struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + GateServerInfoList []*GateServerInfo `protobuf:"bytes,1,rep,name=gate_server_info_list,json=gateServerInfoList,proto3" json:"gate_server_info_list,omitempty"` +} + +func (x *GateServerInfoList) Reset() { + *x = GateServerInfoList{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GateServerInfoList) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GateServerInfoList) ProtoMessage() {} + +func (x *GateServerInfoList) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GateServerInfoList.ProtoReflect.Descriptor instead. +func (*GateServerInfoList) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{11} +} + +func (x *GateServerInfoList) GetGateServerInfoList() []*GateServerInfo { + if x != nil { + return x.GateServerInfoList + } + return nil +} + var File_api_proto protoreflect.FileDescriptor var file_api_proto_rawDesc = []byte{ @@ -568,40 +694,61 @@ var file_api_proto_rawDesc = []byte{ 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x20, 0x0a, 0x0a, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x45, 0x63, 0x32, 0x62, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x3d, 0x0a, 0x0e, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x12, 0x17, 0x0a, 0x07, 0x69, 0x70, 0x5f, 0x61, 0x64, - 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x69, 0x70, 0x41, 0x64, 0x64, 0x72, - 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, - 0x70, 0x6f, 0x72, 0x74, 0x32, 0xba, 0x03, 0x0a, 0x09, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, - 0x72, 0x79, 0x12, 0x4c, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x12, 0x1b, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, - 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, - 0x71, 0x1a, 0x1b, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x65, 0x67, - 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x73, 0x70, 0x22, 0x00, - 0x12, 0x3e, 0x0a, 0x0c, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x12, 0x19, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x43, 0x61, 0x6e, 0x63, - 0x65, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x6e, 0x6f, - 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x4d, 0x73, 0x67, 0x22, 0x00, - 0x12, 0x44, 0x0a, 0x0f, 0x4b, 0x65, 0x65, 0x70, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x53, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x12, 0x1c, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4b, - 0x65, 0x65, 0x70, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, - 0x71, 0x1a, 0x11, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4e, 0x75, 0x6c, - 0x6c, 0x4d, 0x73, 0x67, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x41, 0x70, 0x70, 0x49, 0x64, 0x12, 0x1b, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, - 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x70, 0x70, - 0x49, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x1b, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, - 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x70, 0x70, 0x49, 0x64, 0x52, - 0x73, 0x70, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x52, 0x65, 0x67, 0x69, 0x6f, - 0x6e, 0x45, 0x63, 0x32, 0x62, 0x12, 0x11, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, - 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x4d, 0x73, 0x67, 0x1a, 0x14, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, - 0x61, 0x70, 0x69, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x45, 0x63, 0x32, 0x62, 0x22, 0x00, - 0x12, 0x4f, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x41, 0x64, 0x64, 0x72, 0x12, 0x1e, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, - 0x2e, 0x47, 0x65, 0x74, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, - 0x64, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x18, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, - 0x2e, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x22, - 0x00, 0x42, 0x13, 0x5a, 0x11, 0x68, 0x6b, 0x34, 0x65, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, - 0x70, 0x69, 0x3b, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x78, 0x0a, 0x0e, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x12, 0x19, 0x0a, 0x08, 0x6b, 0x63, 0x70, 0x5f, 0x61, + 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6b, 0x63, 0x70, 0x41, 0x64, + 0x64, 0x72, 0x12, 0x19, 0x0a, 0x08, 0x6b, 0x63, 0x70, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x6b, 0x63, 0x70, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x17, 0x0a, + 0x07, 0x6d, 0x71, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x6d, 0x71, 0x41, 0x64, 0x64, 0x72, 0x12, 0x17, 0x0a, 0x07, 0x6d, 0x71, 0x5f, 0x70, 0x6f, 0x72, + 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x6d, 0x71, 0x50, 0x6f, 0x72, 0x74, 0x22, + 0x59, 0x0a, 0x0e, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, + 0x6f, 0x12, 0x15, 0x0a, 0x06, 0x61, 0x70, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x61, 0x70, 0x70, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x6d, 0x71, 0x5f, 0x61, + 0x64, 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x71, 0x41, 0x64, 0x64, + 0x72, 0x12, 0x17, 0x0a, 0x07, 0x6d, 0x71, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x06, 0x6d, 0x71, 0x50, 0x6f, 0x72, 0x74, 0x22, 0x61, 0x0a, 0x12, 0x47, 0x61, + 0x74, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, + 0x12, 0x4b, 0x0a, 0x15, 0x67, 0x61, 0x74, 0x65, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, + 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x6c, 0x69, 0x73, 0x74, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x18, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x61, 0x74, 0x65, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x12, 0x67, 0x61, 0x74, 0x65, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x32, 0x89, 0x04, + 0x0a, 0x09, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x12, 0x4c, 0x0a, 0x0e, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x1b, 0x2e, + 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x1b, 0x2e, 0x6e, 0x6f, 0x64, + 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x52, 0x73, 0x70, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x0c, 0x43, 0x61, 0x6e, + 0x63, 0x65, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x19, 0x2e, 0x6e, 0x6f, 0x64, 0x65, + 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, + 0x4e, 0x75, 0x6c, 0x6c, 0x4d, 0x73, 0x67, 0x22, 0x00, 0x12, 0x44, 0x0a, 0x0f, 0x4b, 0x65, 0x65, + 0x70, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x1c, 0x2e, 0x6e, + 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4b, 0x65, 0x65, 0x70, 0x61, 0x6c, 0x69, 0x76, + 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x6e, 0x6f, 0x64, + 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x4d, 0x73, 0x67, 0x22, 0x00, 0x12, + 0x4c, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x70, 0x70, 0x49, + 0x64, 0x12, 0x1b, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x70, 0x70, 0x49, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x1b, + 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x41, 0x70, 0x70, 0x49, 0x64, 0x52, 0x73, 0x70, 0x22, 0x00, 0x12, 0x3a, 0x0a, + 0x0d, 0x47, 0x65, 0x74, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x45, 0x63, 0x32, 0x62, 0x12, 0x11, + 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x4d, 0x73, + 0x67, 0x1a, 0x14, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x65, 0x67, + 0x69, 0x6f, 0x6e, 0x45, 0x63, 0x32, 0x62, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x11, 0x47, 0x65, 0x74, + 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x12, 0x1e, + 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, 0x47, 0x61, 0x74, + 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x18, + 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x22, 0x00, 0x12, 0x4d, 0x0a, 0x18, 0x47, 0x65, + 0x74, 0x41, 0x6c, 0x6c, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, + 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x11, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, + 0x69, 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x4d, 0x73, 0x67, 0x1a, 0x1c, 0x2e, 0x6e, 0x6f, 0x64, 0x65, + 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x61, 0x74, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, + 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x00, 0x42, 0x13, 0x5a, 0x11, 0x68, 0x6b, 0x34, + 0x65, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x3b, 0x61, 0x70, 0x69, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -616,7 +763,7 @@ func file_api_proto_rawDescGZIP() []byte { return file_api_proto_rawDescData } -var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_api_proto_goTypes = []interface{}{ (*NullMsg)(nil), // 0: node.api.NullMsg (*GetServerAppIdReq)(nil), // 1: node.api.GetServerAppIdReq @@ -628,26 +775,31 @@ var file_api_proto_goTypes = []interface{}{ (*GetGateServerAddrReq)(nil), // 7: node.api.GetGateServerAddrReq (*RegionEc2B)(nil), // 8: node.api.RegionEc2b (*GateServerAddr)(nil), // 9: node.api.GateServerAddr + (*GateServerInfo)(nil), // 10: node.api.GateServerInfo + (*GateServerInfoList)(nil), // 11: node.api.GateServerInfoList } var file_api_proto_depIdxs = []int32{ - 9, // 0: node.api.RegisterServerReq.gate_server_addr:type_name -> node.api.GateServerAddr - 3, // 1: node.api.Discovery.RegisterServer:input_type -> node.api.RegisterServerReq - 5, // 2: node.api.Discovery.CancelServer:input_type -> node.api.CancelServerReq - 6, // 3: node.api.Discovery.KeepaliveServer:input_type -> node.api.KeepaliveServerReq - 1, // 4: node.api.Discovery.GetServerAppId:input_type -> node.api.GetServerAppIdReq - 0, // 5: node.api.Discovery.GetRegionEc2b:input_type -> node.api.NullMsg - 7, // 6: node.api.Discovery.GetGateServerAddr:input_type -> node.api.GetGateServerAddrReq - 4, // 7: node.api.Discovery.RegisterServer:output_type -> node.api.RegisterServerRsp - 0, // 8: node.api.Discovery.CancelServer:output_type -> node.api.NullMsg - 0, // 9: node.api.Discovery.KeepaliveServer:output_type -> node.api.NullMsg - 2, // 10: node.api.Discovery.GetServerAppId:output_type -> node.api.GetServerAppIdRsp - 8, // 11: node.api.Discovery.GetRegionEc2b:output_type -> node.api.RegionEc2b - 9, // 12: node.api.Discovery.GetGateServerAddr:output_type -> node.api.GateServerAddr - 7, // [7:13] is the sub-list for method output_type - 1, // [1:7] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 9, // 0: node.api.RegisterServerReq.gate_server_addr:type_name -> node.api.GateServerAddr + 10, // 1: node.api.GateServerInfoList.gate_server_info_list:type_name -> node.api.GateServerInfo + 3, // 2: node.api.Discovery.RegisterServer:input_type -> node.api.RegisterServerReq + 5, // 3: node.api.Discovery.CancelServer:input_type -> node.api.CancelServerReq + 6, // 4: node.api.Discovery.KeepaliveServer:input_type -> node.api.KeepaliveServerReq + 1, // 5: node.api.Discovery.GetServerAppId:input_type -> node.api.GetServerAppIdReq + 0, // 6: node.api.Discovery.GetRegionEc2b:input_type -> node.api.NullMsg + 7, // 7: node.api.Discovery.GetGateServerAddr:input_type -> node.api.GetGateServerAddrReq + 0, // 8: node.api.Discovery.GetAllGateServerInfoList:input_type -> node.api.NullMsg + 4, // 9: node.api.Discovery.RegisterServer:output_type -> node.api.RegisterServerRsp + 0, // 10: node.api.Discovery.CancelServer:output_type -> node.api.NullMsg + 0, // 11: node.api.Discovery.KeepaliveServer:output_type -> node.api.NullMsg + 2, // 12: node.api.Discovery.GetServerAppId:output_type -> node.api.GetServerAppIdRsp + 8, // 13: node.api.Discovery.GetRegionEc2b:output_type -> node.api.RegionEc2b + 9, // 14: node.api.Discovery.GetGateServerAddr:output_type -> node.api.GateServerAddr + 11, // 15: node.api.Discovery.GetAllGateServerInfoList:output_type -> node.api.GateServerInfoList + 9, // [9:16] is the sub-list for method output_type + 2, // [2:9] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_api_proto_init() } @@ -776,6 +928,30 @@ func file_api_proto_init() { return nil } } + file_api_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GateServerInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GateServerInfoList); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -783,7 +959,7 @@ func file_api_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_api_proto_rawDesc, NumEnums: 0, - NumMessages: 10, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, diff --git a/node/api/api.proto b/node/api/api.proto index f84710d8..59881f02 100644 --- a/node/api/api.proto +++ b/node/api/api.proto @@ -17,6 +17,8 @@ service Discovery { rpc GetRegionEc2b (NullMsg) returns (RegionEc2b) {} // 获取负载最小的网关服务器的地址和端口 rpc GetGateServerAddr (GetGateServerAddrReq) returns (GateServerAddr) {} + // 获取全部网关服务器信息列表 + rpc GetAllGateServerInfoList (NullMsg) returns (GateServerInfoList) {} } message NullMsg { @@ -60,6 +62,18 @@ message RegionEc2b { } message GateServerAddr { - string ip_addr = 1; - uint32 port = 2; + string kcp_addr = 1; + uint32 kcp_port = 2; + string mq_addr = 3; + uint32 mq_port = 4; +} + +message GateServerInfo { + string app_id = 1; + string mq_addr = 2; + uint32 mq_port = 3; +} + +message GateServerInfoList { + repeated GateServerInfo gate_server_info_list = 1; } diff --git a/node/service/discovery.go b/node/service/discovery.go index 33f4ee9d..d11ac0b2 100644 --- a/node/service/discovery.go +++ b/node/service/discovery.go @@ -33,12 +33,14 @@ func (s ServerInstanceSortList) Swap(i, j int) { } type ServerInstance struct { - serverType string - appId string - gateServerIpAddr string - gateServerPort uint32 - version string - lastAliveTime int64 + serverType string + appId string + gateServerKcpAddr string + gateServerKcpPort uint32 + gateServerMqAddr string + gateServerMqPort uint32 + version string + lastAliveTime int64 } type DiscoveryService struct { @@ -80,13 +82,16 @@ func (s *DiscoveryService) RegisterServer(ctx context.Context, req *api.Register } } inst := &ServerInstance{ - serverType: req.ServerType, - appId: appId, + serverType: req.ServerType, + appId: appId, + lastAliveTime: time.Now().Unix(), } if req.ServerType == api.GATE { - logger.Info("register new gate server, ip: %v, port: %v", req.GateServerAddr.IpAddr, req.GateServerAddr.Port) - inst.gateServerIpAddr = req.GateServerAddr.IpAddr - inst.gateServerPort = req.GateServerAddr.Port + logger.Info("register new gate server, ip: %v, port: %v", req.GateServerAddr.KcpAddr, req.GateServerAddr.KcpPort) + inst.gateServerKcpAddr = req.GateServerAddr.KcpAddr + inst.gateServerKcpPort = req.GateServerAddr.KcpPort + inst.gateServerMqAddr = req.GateServerAddr.MqAddr + inst.gateServerMqPort = req.GateServerAddr.MqPort inst.version = req.Version } instMap.Store(appId, inst) @@ -109,6 +114,7 @@ func (s *DiscoveryService) CancelServer(ctx context.Context, req *api.CancelServ } _, exist = instMap.Load(req.AppId) if !exist { + logger.Error("recv not exist server cancel, server type: %v, appid: %v", req.ServerType, req.AppId) return nil, errors.New("server not exist") } instMap.Delete(req.AppId) @@ -117,12 +123,14 @@ func (s *DiscoveryService) CancelServer(ctx context.Context, req *api.CancelServ // KeepaliveServer 服务器在线心跳保持 func (s *DiscoveryService) KeepaliveServer(ctx context.Context, req *api.KeepaliveServerReq) (*api.NullMsg, error) { + logger.Debug("server keepalive, server type: %v, appid: %v", req.ServerType, req.AppId) instMap, exist := s.serverInstanceMap[req.ServerType] if !exist { return nil, errors.New("server type not exist") } inst, exist := instMap.Load(req.AppId) if !exist { + logger.Error("recv not exist server keepalive, server type: %v, appid: %v", req.ServerType, req.AppId) return nil, errors.New("server not exist") } serverInstance := inst.(*ServerInstance) @@ -178,10 +186,35 @@ func (s *DiscoveryService) GetGateServerAddr(ctx context.Context, req *api.GetGa return nil, errors.New("no gate server found") } inst := s.getRandomServerInstance(&versionInstMap) - logger.Debug("get gate server addr is, ip: %v, port: %v", inst.gateServerIpAddr, inst.gateServerPort) + logger.Debug("get gate server addr is, ip: %v, port: %v", inst.gateServerKcpAddr, inst.gateServerKcpPort) return &api.GateServerAddr{ - IpAddr: inst.gateServerIpAddr, - Port: inst.gateServerPort, + KcpAddr: inst.gateServerKcpAddr, + KcpPort: inst.gateServerKcpPort, + }, nil +} + +// GetAllGateServerInfoList 获取全部网关服务器信息列表 +func (s *DiscoveryService) GetAllGateServerInfoList(ctx context.Context, req *api.NullMsg) (*api.GateServerInfoList, error) { + logger.Debug("get all gate server info list") + instMap, exist := s.serverInstanceMap[api.GATE] + if !exist { + return nil, errors.New("gate server not exist") + } + if s.getServerInstanceMapLen(instMap) == 0 { + return nil, errors.New("no gate server found") + } + gateServerInfoList := make([]*api.GateServerInfo, 0) + instMap.Range(func(key, value any) bool { + serverInstance := value.(*ServerInstance) + gateServerInfoList = append(gateServerInfoList, &api.GateServerInfo{ + AppId: serverInstance.appId, + MqAddr: serverInstance.gateServerMqAddr, + MqPort: serverInstance.gateServerMqPort, + }) + return true + }) + return &api.GateServerInfoList{ + GateServerInfoList: gateServerInfoList, }, nil } @@ -216,7 +249,8 @@ func (s *DiscoveryService) removeDeadServer() { instMap.Range(func(key, value any) bool { serverInstance := value.(*ServerInstance) if nowTime-serverInstance.lastAliveTime > 60 { - logger.Warn("remove dead server, server type: %v, appid: %v", serverInstance.serverType, serverInstance.appId) + logger.Warn("remove dead server, server type: %v, appid: %v, last alive time: %v", + serverInstance.serverType, serverInstance.appId, serverInstance.lastAliveTime) instMap.Delete(key) } return true diff --git a/pathfinding/app/app.go b/pathfinding/app/app.go index 56307df1..a462c629 100644 --- a/pathfinding/app/app.go +++ b/pathfinding/app/app.go @@ -39,10 +39,13 @@ func Run(ctx context.Context, configFile string) error { ticker := time.NewTicker(time.Second * 15) for { <-ticker.C - _, _ = client.Discovery.KeepaliveServer(context.TODO(), &api.KeepaliveServerReq{ + _, err := client.Discovery.KeepaliveServer(context.TODO(), &api.KeepaliveServerReq{ ServerType: api.PATHFINDING, AppId: APPID, }) + if err != nil { + logger.Error("keepalive error: %v", err) + } } }() defer func() { @@ -55,7 +58,7 @@ func Run(ctx context.Context, configFile string) error { logger.InitLogger("pathfinding_" + APPID) logger.Warn("pathfinding start, appid: %v", APPID) - messageQueue := mq.NewMessageQueue(api.PATHFINDING, APPID) + messageQueue := mq.NewMessageQueue(api.PATHFINDING, APPID, client) defer messageQueue.Close() _ = handle.NewHandle(messageQueue)