diff --git a/common/mq/nats.go b/common/mq/nats.go index bdea4f29..95728fd6 100644 --- a/common/mq/nats.go +++ b/common/mq/nats.go @@ -6,6 +6,7 @@ import ( "net" "strconv" "strings" + "time" "hk4e/common/config" "hk4e/common/rpc" @@ -25,16 +26,17 @@ import ( // 要用RPC有专门的NATSRPC type MessageQueue struct { - natsConn *nats.Conn - natsMsgChan chan *nats.Msg - netMsgInput chan *NetMsg - netMsgOutput chan *NetMsg - cmdProtoMap *cmd.CmdProtoMap - serverType string - appId string - gateTcpMqChan chan []byte - gateTcpMqEventChan chan *GateTcpMqEvent - rpcClient *rpc.Client + natsConn *nats.Conn + natsMsgChan chan *nats.Msg + netMsgInput chan *NetMsg + netMsgOutput chan *NetMsg + cmdProtoMap *cmd.CmdProtoMap + serverType string + appId string + gateTcpMqChan chan []byte + gateTcpMqEventChan chan *GateTcpMqEvent + gateTcpMqDeadEventChan chan string + rpcClient *rpc.Client } func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r *MessageQueue) { @@ -63,11 +65,12 @@ func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r r.appId = appId r.gateTcpMqChan = make(chan []byte, 1000) r.gateTcpMqEventChan = make(chan *GateTcpMqEvent, 1000) + r.gateTcpMqDeadEventChan = make(chan string, 1000) r.rpcClient = rpcClient if serverType == api.GATE { - r.initGateTcpMqServer() + go r.runGateTcpMqServer() } else { - r.initGateTcpMqClient() + go r.runGateTcpMqClient() } go r.recvHandler() go r.sendHandler() @@ -75,6 +78,13 @@ func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r } func (m *MessageQueue) Close() { + // 等待所有待发送的消息发送完毕 + for { + if len(m.netMsgInput) == 0 { + break + } + time.Sleep(time.Millisecond * 100) + } m.natsConn.Close() } @@ -98,6 +108,10 @@ func (m *MessageQueue) recvHandler() { logger.Error("parse bin to net msg error: %v", err) continue } + // 忽略自己发出的广播消息 + if netMsg.OriginServerType == m.serverType && netMsg.OriginServerAppId == m.appId { + continue + } switch netMsg.MsgType { case MsgTypeGame: gameMsg := netMsg.GameMsg @@ -168,9 +182,15 @@ func (m *MessageQueue) sendHandler() { return } } + // 广播消息只能走nats + if netMsg.ServerType == "ALL_SERVER_HK4E" { + fallbackNatsMqSend() + continue + } // 有tcp快速通道就走快速通道 instMap, exist := gateTcpMqInstMap[netMsg.ServerType] if !exist { + logger.Error("unknown server type: %v", netMsg.ServerType) fallbackNatsMqSend() continue } @@ -204,6 +224,7 @@ func (m *MessageQueue) sendHandler() { case EventDisconnect: logger.Warn("gate tcp mq disconnect, addr: %v, server type: %v, appid: %v", inst.conn.RemoteAddr().String(), inst.serverType, inst.appId) delete(gateTcpMqInstMap[inst.serverType], inst.appId) + m.gateTcpMqDeadEventChan <- inst.conn.RemoteAddr().String() } } } @@ -225,7 +246,7 @@ type GateTcpMqEvent struct { inst *GateTcpMqInst } -func (m *MessageQueue) initGateTcpMqServer() { +func (m *MessageQueue) runGateTcpMqServer() { addr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:"+strconv.Itoa(int(config.CONF.Hk4e.GateTcpMqPort))) if err != nil { logger.Error("gate tcp mq parse port error: %v", err) @@ -236,17 +257,13 @@ func (m *MessageQueue) initGateTcpMqServer() { logger.Error("gate tcp mq listen error: %v", err) return } - go m.gateTcpMqAcceptHandle(listener) -} - -func (m *MessageQueue) gateTcpMqAcceptHandle(listener *net.TCPListener) { for { conn, err := listener.Accept() if err != nil { logger.Error("gate tcp mq accept error: %v", err) return } - logger.Info("server connect gate tcp mq, addr: %v", conn.RemoteAddr().String()) + logger.Info("accept gate tcp mq, server addr: %v", conn.RemoteAddr().String()) go m.gateTcpMqHandshake(conn) } } @@ -296,14 +313,37 @@ func (m *MessageQueue) gateTcpMqHandshake(conn net.Conn) { } } -func (m *MessageQueue) initGateTcpMqClient() { +func (m *MessageQueue) runGateTcpMqClient() { + // 已存在的GATE连接列表 + gateServerConnAddrMap := make(map[string]bool) + m.gateTcpMqConn(gateServerConnAddrMap) + ticker := time.NewTicker(time.Minute) + for { + select { + case addr := <-m.gateTcpMqDeadEventChan: + // GATE连接断开 + delete(gateServerConnAddrMap, addr) + case <-ticker.C: + // 定时获取全部GATE实例地址并建立连接 + m.gateTcpMqConn(gateServerConnAddrMap) + } + } +} + +func (m *MessageQueue) gateTcpMqConn(gateServerConnAddrMap map[string]bool) { rsp, err := m.rpcClient.Discovery.GetAllGateServerInfoList(context.TODO(), new(api.NullMsg)) if err != nil { logger.Error("gate tcp mq get gate list error: %v", err) return } for _, gateServerInfo := range rsp.GateServerInfoList { - addr, err := net.ResolveTCPAddr("tcp4", gateServerInfo.MqAddr+":"+strconv.Itoa(int(gateServerInfo.MqPort))) + gateServerAddr := gateServerInfo.MqAddr + ":" + strconv.Itoa(int(gateServerInfo.MqPort)) + _, exist := gateServerConnAddrMap[gateServerAddr] + // GATE连接已存在 + if exist { + continue + } + addr, err := net.ResolveTCPAddr("tcp4", gateServerAddr) if err != nil { logger.Error("gate tcp mq parse addr error: %v", err) return @@ -323,11 +363,13 @@ func (m *MessageQueue) initGateTcpMqClient() { serverType: api.GATE, appId: gateServerInfo.AppId, } - go m.gateTcpMqRecvHandle(inst) m.gateTcpMqEventChan <- &GateTcpMqEvent{ event: EventConnect, inst: inst, } + gateServerConnAddrMap[gateServerAddr] = true + logger.Info("connect gate tcp mq, gate addr: %v", conn.RemoteAddr().String()) + go m.gateTcpMqRecvHandle(inst) } } diff --git a/common/mq/topic.go b/common/mq/topic.go index 5ca0990c..f5209f2e 100644 --- a/common/mq/topic.go +++ b/common/mq/topic.go @@ -17,9 +17,9 @@ func (m *MessageQueue) getTopic(serverType string, appId string) string { func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) { netMsg.Topic = m.getTopic(api.GATE, appId) - originServerType, originServerAppId := m.getOriginServer() netMsg.ServerType = api.GATE netMsg.AppId = appId + originServerType, originServerAppId := m.getOriginServer() netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId m.netMsgInput <- netMsg @@ -27,9 +27,9 @@ func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) { func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) { netMsg.Topic = m.getTopic(api.GS, appId) - originServerType, originServerAppId := m.getOriginServer() netMsg.ServerType = api.GS netMsg.AppId = appId + originServerType, originServerAppId := m.getOriginServer() netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId m.netMsgInput <- netMsg @@ -37,9 +37,9 @@ func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) { func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) { netMsg.Topic = m.getTopic(api.FIGHT, appId) - originServerType, originServerAppId := m.getOriginServer() netMsg.ServerType = api.FIGHT netMsg.AppId = appId + originServerType, originServerAppId := m.getOriginServer() netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId m.netMsgInput <- netMsg @@ -47,9 +47,9 @@ func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) { func (m *MessageQueue) SendToPathfinding(appId string, netMsg *NetMsg) { netMsg.Topic = m.getTopic(api.PATHFINDING, appId) - originServerType, originServerAppId := m.getOriginServer() netMsg.ServerType = api.PATHFINDING netMsg.AppId = appId + originServerType, originServerAppId := m.getOriginServer() netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId m.netMsgInput <- netMsg @@ -57,6 +57,7 @@ func (m *MessageQueue) SendToPathfinding(appId string, netMsg *NetMsg) { func (m *MessageQueue) SendToAll(netMsg *NetMsg) { netMsg.Topic = "ALL_SERVER_HK4E" + netMsg.ServerType = "ALL_SERVER_HK4E" originServerType, originServerAppId := m.getOriginServer() netMsg.OriginServerType = originServerType netMsg.OriginServerAppId = originServerAppId diff --git a/dispatch/controller/gate_controller.go b/dispatch/controller/gate_controller.go index 8adb04f1..e246632d 100644 --- a/dispatch/controller/gate_controller.go +++ b/dispatch/controller/gate_controller.go @@ -22,42 +22,42 @@ type TokenVerifyRsp struct { } func (c *Controller) gateTokenVerify(context *gin.Context) { - VerifyFail := func() { + verifyFail := func(playerID uint32) { context.JSON(http.StatusOK, &TokenVerifyRsp{ Valid: false, Forbid: false, ForbidEndTime: 0, - PlayerID: 0, + PlayerID: playerID, }) } tokenVerifyReq := new(TokenVerifyReq) err := context.ShouldBindJSON(tokenVerifyReq) if err != nil { - VerifyFail() + verifyFail(0) return } logger.Info("gate token verify, req: %v", tokenVerifyReq) accountId, err := strconv.ParseUint(tokenVerifyReq.AccountId, 10, 64) if err != nil { - VerifyFail() + verifyFail(0) return } account, err := c.dao.QueryAccountByField("accountID", accountId) if err != nil || account == nil { - VerifyFail() + verifyFail(0) return } if tokenVerifyReq.AccountToken != account.ComboToken { - VerifyFail() + verifyFail(uint32(account.PlayerID)) return } if account.ComboTokenUsed { - VerifyFail() + verifyFail(uint32(account.PlayerID)) return } _, err = c.dao.UpdateAccountFieldByFieldName("accountID", account.AccountID, "comboTokenUsed", true) if err != nil { - VerifyFail() + verifyFail(uint32(account.PlayerID)) return } context.JSON(http.StatusOK, &TokenVerifyRsp{ diff --git a/gate/net/kcp_connect_manager.go b/gate/net/kcp_connect_manager.go index e4c612fb..36e3e376 100644 --- a/gate/net/kcp_connect_manager.go +++ b/gate/net/kcp_connect_manager.go @@ -28,7 +28,7 @@ const ( PacketMaxLen = 343 * 1024 // 最大应用层包长度 ConnRecvTimeout = 30 // 收包超时时间 秒 ConnSendTimeout = 10 // 发包超时时间 秒 - MaxClientConnNumLimit = 100 // 最大客户端连接数限制 + MaxClientConnNumLimit = 2 // 最大客户端连接数限制 ) var CLIENT_CONN_NUM int32 = 0 // 当前客户端连接数 @@ -37,14 +37,17 @@ type KcpConnectManager struct { discovery *rpc.DiscoveryClient // node服务器客户端 openState bool // 网关开放状态 // 会话 - sessionConvIdMap map[uint64]*Session - sessionUserIdMap map[uint32]*Session - sessionMapLock sync.RWMutex - createSessionChan chan *Session - destroySessionChan chan *Session + sessionConvIdMap map[uint64]*Session + sessionUserIdMap map[uint32]*Session + sessionMapLock sync.RWMutex + createSessionChan chan *Session + destroySessionChan chan *Session + globalGsOnlineMap map[uint32]string + globalGsOnlineMapLock sync.RWMutex // 连接事件 - kcpEventInput chan *KcpEvent - kcpEventOutput chan *KcpEvent + kcpEventInput chan *KcpEvent + kcpEventOutput chan *KcpEvent + reLoginRemoteKickRegChan chan *RemoteKick // 协议 serverCmdProtoMap *cmd.CmdProtoMap clientCmdProtoMap *client_proto.ClientCmdProtoMap @@ -65,8 +68,10 @@ func NewKcpConnectManager(messageQueue *mq.MessageQueue, discovery *rpc.Discover r.sessionUserIdMap = make(map[uint32]*Session) r.createSessionChan = make(chan *Session, 1000) r.destroySessionChan = make(chan *Session, 1000) + r.globalGsOnlineMap = make(map[uint32]string) r.kcpEventInput = make(chan *KcpEvent, 1000) r.kcpEventOutput = make(chan *KcpEvent, 1000) + r.reLoginRemoteKickRegChan = make(chan *RemoteKick, 1000) r.serverCmdProtoMap = cmd.NewCmdProtoMap() if config.CONF.Hk4e.ClientProtoProxyEnable { r.clientCmdProtoMap = client_proto.NewClientCmdProtoMap() @@ -110,8 +115,6 @@ func (k *KcpConnectManager) run() { func (k *KcpConnectManager) Close() { k.closeAllKcpConn() - // 等待所有连接关闭时需要发送的消息发送完毕 - time.Sleep(time.Second * 3) } func (k *KcpConnectManager) gateNetInfo() { @@ -145,13 +148,6 @@ func (k *KcpConnectManager) acceptHandle(listener *kcp.Listener) { _ = conn.Close() continue } - clientConnNum := atomic.AddInt32(&CLIENT_CONN_NUM, 1) - if clientConnNum > MaxClientConnNumLimit { - logger.Error("gate conn num limit, convId: %v", convId) - _ = conn.Close() - atomic.AddInt32(&CLIENT_CONN_NUM, -1) - continue - } conn.SetACKNoDelay(true) conn.SetWriteDelay(false) logger.Info("client connect, convId: %v", convId) diff --git a/gate/net/kcp_event.go b/gate/net/kcp_event.go index a89cbadc..9576bef1 100644 --- a/gate/net/kcp_event.go +++ b/gate/net/kcp_event.go @@ -61,13 +61,7 @@ func (k *KcpConnectManager) eventHandle() { k.closeAllKcpConn() } case KcpConnRelogin: - kickFinishNotifyChan, ok := event.EventMessage.(chan bool) - if !ok { - logger.Error("event KcpConnRelogin msg type error") - continue - } k.forceCloseKcpConn(event.ConvId, kcp.EnetServerRelogin) - kickFinishNotifyChan <- true } } } diff --git a/gate/net/session.go b/gate/net/session.go index 62554ddd..6e1f3c61 100644 --- a/gate/net/session.go +++ b/gate/net/session.go @@ -9,6 +9,7 @@ import ( "math/rand" "strconv" "strings" + "sync/atomic" "time" "hk4e/common/config" @@ -148,13 +149,14 @@ func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session) } } -// 从GS接收消息 +// 接收来自其他服务器的消息 func (k *KcpConnectManager) sendMsgHandle() { logger.Debug("send msg handle start") + // 函数栈内缓存 添加删除事件走chan 避免频繁加锁 convSessionMap := make(map[uint64]*Session) userIdConvMap := make(map[uint32]uint64) + // 分发到每个连接具体的发送协程 sendToClientFn := func(protoMsg *ProtoMsg) { - // 分发到每个连接具体的发送协程 session := convSessionMap[protoMsg.ConvId] if session == nil { logger.Error("session is nil, convId: %v", protoMsg.ConvId) @@ -212,6 +214,8 @@ func (k *KcpConnectManager) sendMsgHandle() { } kcpRawSendChan <- protoMsg } + // 远程全局顶号注册列表 + reLoginRemoteKickRegMap := make(map[uint32]chan bool) for { select { case session := <-k.createSessionChan: @@ -221,6 +225,8 @@ func (k *KcpConnectManager) sendMsgHandle() { delete(convSessionMap, session.conn.GetConv()) delete(userIdConvMap, session.userId) close(session.kcpRawSendChan) + case remoteKick := <-k.reLoginRemoteKickRegChan: + reLoginRemoteKickRegMap[remoteKick.userId] = remoteKick.kickFinishNotifyChan case protoMsg := <-k.localMsgOutput: sendToClientFn(protoMsg) case netMsg := <-k.messageQueue.GetNetMsg(): @@ -268,7 +274,7 @@ func (k *KcpConnectManager) sendMsgHandle() { session := convSessionMap[convId] if session == nil { logger.Error("session is nil, convId: %v", convId) - return + continue } session.gsServerAppId = serverMsg.GameServerAppId session.fightServerAppId = "" @@ -285,6 +291,22 @@ func (k *KcpConnectManager) sendMsgHandle() { EventId: mq.NormalMsg, GameMsg: gameMsg, }) + case mq.ServerUserOnlineStateChangeNotify: + // 收到GS玩家离线完成通知 唤醒存在的顶号登录流程 + if serverMsg.IsOnline { + k.globalGsOnlineMapLock.Lock() + k.globalGsOnlineMap[serverMsg.UserId] = netMsg.OriginServerAppId + k.globalGsOnlineMapLock.Unlock() + } else { + k.globalGsOnlineMapLock.Lock() + delete(k.globalGsOnlineMap, serverMsg.UserId) + k.globalGsOnlineMapLock.Unlock() + kickFinishNotifyChan, exist := reLoginRemoteKickRegMap[serverMsg.UserId] + if !exist { + continue + } + kickFinishNotifyChan <- true + } } } } @@ -300,8 +322,13 @@ func (k *KcpConnectManager) getHeadMsg(clientSeq uint32) (headMsg *proto.PacketH return headMsg } -func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session *Session) (rsp *proto.GetPlayerTokenRsp) { - loginFail := func() { +type RemoteKick struct { + userId uint32 + kickFinishNotifyChan chan bool +} + +func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session *Session) *proto.GetPlayerTokenRsp { + loginFailClose := func() { k.kcpEventInput <- &KcpEvent{ ConvId: session.conn.GetConv(), EventId: KcpConnForceClose, @@ -313,26 +340,26 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session AccountToken: req.AccountToken, }, "") if err != nil { - logger.Error("verify token error: %v", err) - loginFail() + logger.Error("verify token error: %v, account uid: %v", err, req.AccountUid) + loginFailClose() return nil } - if !tokenVerifyRsp.Valid { - logger.Error("token error") - loginFail() - return nil - } - // comboToken验证成功 - if tokenVerifyRsp.Forbid { - // 封号通知 - rsp = new(proto.GetPlayerTokenRsp) - rsp.Uid = tokenVerifyRsp.PlayerID + uid := tokenVerifyRsp.PlayerID + loginFailRsp := func(retCode int32, isForbid bool, forbidEndTime uint32) *proto.GetPlayerTokenRsp { + // 关联session信息 不然包发不出去 + session.userId = uid + k.SetSession(session, session.conn.GetConv(), session.userId) + k.createSessionChan <- session + rsp := new(proto.GetPlayerTokenRsp) + rsp.Uid = uid rsp.IsProficientPlayer = true - rsp.Retcode = int32(proto.Retcode_RET_BLACK_UID) - rsp.Msg = "FORBID_CHEATING_PLUGINS" - rsp.BlackUidEndTime = tokenVerifyRsp.ForbidEndTime - if rsp.BlackUidEndTime == 0 { - rsp.BlackUidEndTime = 2051193600 // 2035-01-01 00:00:00 + rsp.Retcode = retCode + if isForbid { + rsp.Msg = "FORBID_CHEATING_PLUGINS" + rsp.BlackUidEndTime = forbidEndTime + if rsp.BlackUidEndTime == 0 { + rsp.BlackUidEndTime = 2051193600 // 2035-01-01 00:00:00 + } } rsp.RegPlatform = 3 rsp.CountryCode = "US" @@ -341,31 +368,57 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session rsp.ClientIpStr = split[0] return rsp } - oldSession := k.GetSessionByUserId(tokenVerifyRsp.PlayerID) + if !tokenVerifyRsp.Valid { + logger.Error("token error, uid: %v", uid) + return loginFailRsp(int32(proto.Retcode_RET_TOKEN_ERROR), false, 0) + } + // comboToken验证成功 + if tokenVerifyRsp.Forbid { + // 封号通知 + return loginFailRsp(int32(proto.Retcode_RET_BLACK_UID), true, tokenVerifyRsp.ForbidEndTime) + } + clientConnNum := atomic.AddInt32(&CLIENT_CONN_NUM, 1) + if clientConnNum > MaxClientConnNumLimit { + logger.Error("gate conn num limit, uid: %v", uid) + return loginFailRsp(int32(proto.Retcode_RET_MAX_PLAYER), false, 0) + } + oldSession := k.GetSessionByUserId(uid) if oldSession != nil { // 本地顶号 - kickFinishNotifyChan := make(chan bool) k.kcpEventInput <- &KcpEvent{ - ConvId: oldSession.conn.GetConv(), - EventId: KcpConnRelogin, - EventMessage: kickFinishNotifyChan, + ConvId: oldSession.conn.GetConv(), + EventId: KcpConnRelogin, + } + kickFinishNotifyChan := make(chan bool) + k.reLoginRemoteKickRegChan <- &RemoteKick{ + userId: uid, + kickFinishNotifyChan: kickFinishNotifyChan, } <-kickFinishNotifyChan - } else { + } + k.globalGsOnlineMapLock.RLock() + _, exist := k.globalGsOnlineMap[uid] + k.globalGsOnlineMapLock.RUnlock() + if exist { // 远程全局顶号 connCtrlMsg := new(mq.ConnCtrlMsg) - connCtrlMsg.KickUserId = tokenVerifyRsp.PlayerID + connCtrlMsg.KickUserId = uid connCtrlMsg.KickReason = kcp.EnetServerRelogin k.messageQueue.SendToAll(&mq.NetMsg{ MsgType: mq.MsgTypeConnCtrl, EventId: mq.KickPlayerNotify, ConnCtrlMsg: connCtrlMsg, }) - // TODO 确保旧连接已下线 已通知GS已保存好数据 - time.Sleep(time.Second) + // 注册回调通知 + kickFinishNotifyChan := make(chan bool) + k.reLoginRemoteKickRegChan <- &RemoteKick{ + userId: uid, + kickFinishNotifyChan: kickFinishNotifyChan, + } + <-kickFinishNotifyChan } // 关联玩家uid和连接信息 - session.userId = tokenVerifyRsp.PlayerID + session.userId = uid k.SetSession(session, session.conn.GetConv(), session.userId) k.createSessionChan <- session // 绑定各个服务器appid @@ -373,8 +426,8 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session ServerType: api.GS, }) if err != nil { - logger.Error("get gs server appid error: %v", err) - loginFail() + logger.Error("get gs server appid error: %v, uid: %v", err, uid) + loginFailClose() return nil } session.gsServerAppId = gsServerAppId.AppId @@ -382,22 +435,22 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session ServerType: api.FIGHT, }) if err != nil { - logger.Error("get fight server appid error: %v", err) + logger.Error("get fight server appid error: %v, uid: %v", err, uid) } session.fightServerAppId = fightServerAppId.AppId pathfindingServerAppId, err := k.discovery.GetServerAppId(context.TODO(), &api.GetServerAppIdReq{ ServerType: api.PATHFINDING, }) if err != nil { - logger.Error("get pathfinding server appid error: %v", err) + logger.Error("get pathfinding server appid error: %v, uid: %v", err, uid) } session.pathfindingServerAppId = pathfindingServerAppId.AppId - logger.Debug("session gs appid: %v", session.gsServerAppId) - logger.Debug("session fight appid: %v", session.fightServerAppId) - logger.Debug("session pathfinding appid: %v", session.pathfindingServerAppId) + logger.Debug("session gs appid: %v, uid: %v", session.gsServerAppId, uid) + logger.Debug("session fight appid: %v, uid: %v", session.fightServerAppId, uid) + logger.Debug("session pathfinding appid: %v, uid: %v", session.pathfindingServerAppId, uid) // 返回响应 - rsp = new(proto.GetPlayerTokenRsp) - rsp.Uid = tokenVerifyRsp.PlayerID + rsp := new(proto.GetPlayerTokenRsp) + rsp.Uid = uid rsp.AccountUid = req.AccountUid rsp.Token = req.AccountToken data := make([]byte, 16+32) @@ -418,66 +471,66 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session serverSeedUint64 := timeRand.Uint64() session.seed = serverSeedUint64 if req.GetKeyId() != 0 { - logger.Debug("do hk4e 2.8 rsa logic") + logger.Debug("do hk4e 2.8 rsa logic, uid: %v", uid) session.useMagicSeed = true keyId := strconv.Itoa(int(req.GetKeyId())) encPubPrivKey, exist := k.encRsaKeyMap[keyId] if !exist { - logger.Error("can not found key id: %v", keyId) - loginFail() + logger.Error("can not found key id: %v, uid: %v", keyId, uid) + loginFailClose() return nil } pubKey, err := endec.RsaParsePubKeyByPrivKey(encPubPrivKey) if err != nil { - logger.Error("parse rsa pub key error: %v", err) - loginFail() + logger.Error("parse rsa pub key error: %v, uid: %v", err, uid) + loginFailClose() return nil } signPrivkey, err := endec.RsaParsePrivKey(k.signRsaKey) if err != nil { - logger.Error("parse rsa priv key error: %v", err) - loginFail() + logger.Error("parse rsa priv key error: %v, uid: %v", err, uid) + loginFailClose() return nil } clientSeedBase64 := req.GetClientRandKey() clientSeedEnc, err := base64.StdEncoding.DecodeString(clientSeedBase64) if err != nil { - logger.Error("parse client seed base64 error: %v", err) - loginFail() + logger.Error("parse client seed base64 error: %v, uid: %v", err, uid) + loginFailClose() return nil } clientSeed, err := endec.RsaDecrypt(clientSeedEnc, signPrivkey) if err != nil { - logger.Error("rsa dec error: %v", err) - loginFail() + logger.Error("rsa dec error: %v, uid: %v", err, uid) + loginFailClose() return nil } clientSeedUint64 := uint64(0) err = binary.Read(bytes.NewReader(clientSeed), binary.BigEndian, &clientSeedUint64) if err != nil { - logger.Error("parse client seed to uint64 error: %v", err) - loginFail() + logger.Error("parse client seed to uint64 error: %v, uid: %v", err, uid) + loginFailClose() return nil } seedUint64 := serverSeedUint64 ^ clientSeedUint64 seedBuf := new(bytes.Buffer) err = binary.Write(seedBuf, binary.BigEndian, seedUint64) if err != nil { - logger.Error("conv seed uint64 to bytes error: %v", err) - loginFail() + logger.Error("conv seed uint64 to bytes error: %v, uid: %v", err, uid) + loginFailClose() return nil } seed := seedBuf.Bytes() seedEnc, err := endec.RsaEncrypt(seed, pubKey) if err != nil { - logger.Error("rsa enc error: %v", err) - loginFail() + logger.Error("rsa enc error: %v, uid: %v", err, uid) + loginFailClose() return nil } seedSign, err := endec.RsaSign(seed, signPrivkey) if err != nil { - logger.Error("rsa sign error: %v", err) - loginFail() + logger.Error("rsa sign error: %v, uid: %v", err, uid) + loginFailClose() return nil } rsp.KeyId = req.KeyId diff --git a/gs/dao/player_redis.go b/gs/dao/player_redis.go index 4660a6e9..194c8fef 100644 --- a/gs/dao/player_redis.go +++ b/gs/dao/player_redis.go @@ -21,13 +21,17 @@ func (d *Dao) GetRedisPlayerKey(userId uint32) string { } func (d *Dao) GetRedisPlayer(userId uint32) *model.Player { + startTime := time.Now().UnixNano() playerDataLz4, err := d.redis.Get(context.TODO(), d.GetRedisPlayerKey(userId)).Result() if err != nil { logger.Error("get player from redis error: %v", err) return nil } + endTime := time.Now().UnixNano() + costTime := endTime - startTime + logger.Debug("get player from redis cost time: %v ns", costTime) // 解压 - startTime := time.Now().UnixNano() + startTime = time.Now().UnixNano() in := bytes.NewReader([]byte(playerDataLz4)) out := new(bytes.Buffer) lz4Reader := lz4.NewReader(in) @@ -37,8 +41,8 @@ func (d *Dao) GetRedisPlayer(userId uint32) *model.Player { return nil } playerData := out.Bytes() - endTime := time.Now().UnixNano() - costTime := endTime - startTime + endTime = time.Now().UnixNano() + costTime = endTime - startTime logger.Debug("lz4 decode cost time: %v ns, before len: %v, after len: %v, ratio lz4/raw: %v", costTime, len(playerDataLz4), len(playerData), float64(len(playerDataLz4))/float64(len(playerData))) player := new(model.Player) @@ -76,11 +80,15 @@ func (d *Dao) SetRedisPlayer(player *model.Player) { costTime := endTime - startTime logger.Debug("lz4 encode cost time: %v ns, before len: %v, after len: %v, ratio lz4/raw: %v", costTime, len(playerData), len(playerDataLz4), float64(len(playerDataLz4))/float64(len(playerData))) + startTime = time.Now().UnixNano() err = d.redis.Set(context.TODO(), d.GetRedisPlayerKey(player.PlayerID), playerDataLz4, time.Hour*24*30).Err() if err != nil { logger.Error("set player to redis error: %v", err) return } + endTime = time.Now().UnixNano() + costTime = endTime - startTime + logger.Debug("set player to redis cost time: %v ns", costTime) } func (d *Dao) SetRedisPlayerList(playerList []*model.Player) { diff --git a/gs/game/game_manager.go b/gs/game/game_manager.go index 16cc4aa6..7a9f6870 100644 --- a/gs/game/game_manager.go +++ b/gs/game/game_manager.go @@ -185,6 +185,7 @@ func (g *GameManager) gameMainLoop() { tickCost := int64(0) localEventCost := int64(0) commandCost := int64(0) + routeCount := int64(0) runtime.LockOSThread() for { // 消耗CPU时间性能统计 @@ -194,7 +195,7 @@ func (g *GameManager) gameMainLoop() { tickCost /= 1e6 localEventCost /= 1e6 commandCost /= 1e6 - logger.Info("[GAME MAIN LOOP] cpu time cost detail, routeCost: %vms, tickCost: %vms, localEventCost: %vms, commandCost: %vms", + logger.Info("[GAME MAIN LOOP] cpu time cost detail, routeCost: %v ms, tickCost: %v ms, localEventCost: %v ms, commandCost: %v ms", routeCost, tickCost, localEventCost, commandCost) totalCost := routeCost + tickCost + localEventCost + commandCost logger.Info("[GAME MAIN LOOP] cpu time cost percent, routeCost: %v%%, tickCost: %v%%, localEventCost: %v%%, commandCost: %v%%", @@ -202,15 +203,17 @@ func (g *GameManager) gameMainLoop() { float32(tickCost)/float32(totalCost)*100.0, float32(localEventCost)/float32(totalCost)*100.0, float32(commandCost)/float32(totalCost)*100.0) - logger.Info("[GAME MAIN LOOP] total cpu time cost detail, totalCost: %vms", + logger.Info("[GAME MAIN LOOP] total cpu time cost detail, totalCost: %v ms", totalCost) logger.Info("[GAME MAIN LOOP] total cpu time cost percent, totalCost: %v%%", float32(totalCost)/float32(intervalTime/1e6)*100.0) + logger.Info("[GAME MAIN LOOP] avg route cost: %v ms", float32(routeCost)/float32(routeCount)) lastTime = now routeCost = 0 tickCost = 0 localEventCost = 0 commandCost = 0 + routeCount = 0 } select { case netMsg := <-MESSAGE_QUEUE.GetNetMsg(): @@ -219,6 +222,7 @@ func (g *GameManager) gameMainLoop() { ROUTE_MANAGER.RouteHandle(netMsg) end := time.Now().UnixNano() routeCost += end - start + routeCount++ case <-TICK_MANAGER.GetGlobalTick().C: // 游戏服务器定时帧 start := time.Now().UnixNano() @@ -232,12 +236,12 @@ func (g *GameManager) gameMainLoop() { end := time.Now().UnixNano() localEventCost += end - start case command := <-COMMAND_MANAGER.GetCommandTextInput(): - // 处理传入的命令(普通玩家 GM命令) + // 处理GM命令 start := time.Now().UnixNano() COMMAND_MANAGER.HandleCommand(command) end := time.Now().UnixNano() commandCost += end - start - logger.Info("run gm cmd cost: %v ns", commandCost) + logger.Info("run gm cmd cost: %v ns", end-start) } } } diff --git a/gs/game/player_common.go b/gs/game/player_common.go index e625d9ec..c5164add 100644 --- a/gs/game/player_common.go +++ b/gs/game/player_common.go @@ -4,6 +4,7 @@ import ( "time" "hk4e/common/mq" + "hk4e/gate/kcp" "hk4e/gs/model" "hk4e/pkg/logger" "hk4e/protocol/cmd" @@ -87,6 +88,13 @@ func (g *GameManager) ClientTimeNotify(userId uint32, clientTime uint32) { } logger.Debug("client time notify, uid: %v, time: %v", userId, clientTime) player.ClientTime = clientTime + now := time.Now().Unix() + // 客户端与服务器时间相差太过严重 + if now-int64(player.ClientTime) > 60 || int64(player.ClientTime)-now > 60 { + g.KickPlayer(player.PlayerID, kcp.EnetServerKick) + logger.Error("abs of client time and server time above 60, uid: %v", userId) + } + player.LastKeepaliveTime = uint32(now) } func (g *GameManager) ServerAnnounceNotify(announceId uint32, announceMsg string) { diff --git a/gs/game/tick_manager.go b/gs/game/tick_manager.go index acc38b3f..7140fd8d 100644 --- a/gs/game/tick_manager.go +++ b/gs/game/tick_manager.go @@ -85,6 +85,17 @@ func (t *TickManager) onUserTickSecond(userId uint32, now int64) { } func (t *TickManager) onUserTickMinute(userId uint32, now int64) { + player := USER_MANAGER.GetOnlineUser(userId) + if player == nil { + logger.Error("player is nil, uid: %v", userId) + return + } + if uint32(now/1000)-player.LastKeepaliveTime > 60 { + logger.Error("remove keepalive timeout user, uid: %v", userId) + GAME_MANAGER.OnUserOffline(userId, &ChangeGsInfo{ + IsChangeGs: false, + }) + } } // 玩家定时任务常量 @@ -182,15 +193,6 @@ func (t *TickManager) onTickMinute(now int64) { count := random.GetRandomInt32(0, 4) i := int32(0) for itemId := range allItemDataConfig { - itemDataConfig, ok := allItemDataConfig[itemId] - if !ok { - logger.Error("config is nil, itemId: %v", itemId) - return - } - // TODO 3.0.0REL版本中 发送某些无效家具 可能会导致客户端背包家具界面卡死 - if uint16(itemDataConfig.Type) == constant.ITEM_TYPE_FURNITURE { - continue - } num := random.GetRandomInt32(1, 9) GAME_MANAGER.AddUserItem(player.PlayerID, []*UserItem{{ItemId: uint32(itemId), ChangeCount: uint32(num)}}, true, 0) i++ diff --git a/gs/game/world_manager.go b/gs/game/world_manager.go index 695a336a..8d238692 100644 --- a/gs/game/world_manager.go +++ b/gs/game/world_manager.go @@ -16,7 +16,7 @@ import ( const ( ENTITY_NUM_UNLIMIT = false // 是否不限制场景内实体数量 ENTITY_MAX_SEND_NUM = 200 // 场景内最大实体数量 - MAX_MULTIPLAYER_WORLD_NUM = 10 // 本服务器最大多人世界数量 + MAX_MULTIPLAYER_WORLD_NUM = 2 // 本服务器最大多人世界数量 ) type WorldManager struct { diff --git a/gs/model/avatar.go b/gs/model/avatar.go index f28946fb..ad579f71 100644 --- a/gs/model/avatar.go +++ b/gs/model/avatar.go @@ -27,12 +27,12 @@ type Avatar struct { FetterLevel uint8 `bson:"fetterLevel"` // 好感度等级 FetterExp uint32 `bson:"fetterExp"` // 好感度经验 PromoteRewardMap map[uint32]bool `bson:"promoteRewardMap"` // 突破奖励 map[突破等级]是否已被领取 - Guid uint64 `bson:"-"` - EquipGuidMap map[uint64]uint64 `bson:"-"` - EquipWeapon *Weapon `bson:"-"` - EquipReliquaryList []*Reliquary `bson:"-"` - FightPropMap map[uint32]float32 `bson:"-"` - ExtraAbilityEmbryos map[string]bool `bson:"-"` + Guid uint64 `bson:"-" msgpack:"-"` + EquipGuidMap map[uint64]uint64 `bson:"-" msgpack:"-"` + EquipWeapon *Weapon `bson:"-" msgpack:"-"` + EquipReliquaryList []*Reliquary `bson:"-" msgpack:"-"` + FightPropMap map[uint32]float32 `bson:"-" msgpack:"-"` + ExtraAbilityEmbryos map[string]bool `bson:"-" msgpack:"-"` } func (p *Player) InitAllAvatar() { diff --git a/gs/model/item.go b/gs/model/item.go index f73c6467..1f4ef2a4 100644 --- a/gs/model/item.go +++ b/gs/model/item.go @@ -5,7 +5,7 @@ import "hk4e/common/constant" type Item struct { ItemId uint32 `bson:"itemId"` // 道具id Count uint32 `bson:"count"` // 道具数量 - Guid uint64 `bson:"-"` + Guid uint64 `bson:"-" msgpack:"-"` } func (p *Player) InitAllItem() { diff --git a/gs/model/player.go b/gs/model/player.go index 1caac8ec..7e30b99d 100644 --- a/gs/model/player.go +++ b/gs/model/player.go @@ -62,6 +62,7 @@ type Player struct { DbState int `bson:"-" msgpack:"-"` // 数据库存档状态 WorldId uint32 `bson:"-" msgpack:"-"` // 所在的世界id GameObjectGuidCounter uint64 `bson:"-" msgpack:"-"` // 游戏对象guid计数器 + LastKeepaliveTime uint32 `bson:"-" msgpack:"-"` // 上一次保持活跃时间 ClientTime uint32 `bson:"-" msgpack:"-"` // 玩家客户端的本地时钟 ClientRTT uint32 `bson:"-" msgpack:"-"` // 玩家客户端往返时延 GameObjectGuidMap map[uint64]GameObject `bson:"-" msgpack:"-"` // 游戏对象guid映射表 diff --git a/gs/model/reliquary.go b/gs/model/reliquary.go index 49ec1efe..1a5aba7a 100644 --- a/gs/model/reliquary.go +++ b/gs/model/reliquary.go @@ -15,7 +15,7 @@ type Reliquary struct { AffixIdList []uint32 `bson:"affixIdList"` // 词缀 MainPropId uint32 `bson:"mainPropId"` // 主词条id AvatarId uint32 `bson:"avatarId"` // 装备角色id - Guid uint64 `bson:"-"` + Guid uint64 `bson:"-" msgpack:"-"` } func (p *Player) InitReliquary(reliquary *Reliquary) { diff --git a/gs/model/team.go b/gs/model/team.go index 6d70dcb7..9736f557 100644 --- a/gs/model/team.go +++ b/gs/model/team.go @@ -36,8 +36,8 @@ type TeamInfo struct { TeamList []*Team `bson:"teamList"` CurrTeamIndex uint8 `bson:"currTeamIndex"` CurrAvatarIndex uint8 `bson:"currAvatarIndex"` - TeamResonances map[uint16]bool `bson:"-"` - TeamResonancesConfig map[int32]bool `bson:"-"` + TeamResonances map[uint16]bool `bson:"-" msgpack:"-"` + TeamResonancesConfig map[int32]bool `bson:"-" msgpack:"-"` } func NewTeamInfo() (r *TeamInfo) { diff --git a/gs/model/weapon.go b/gs/model/weapon.go index ac51a5c3..9c02df0b 100644 --- a/gs/model/weapon.go +++ b/gs/model/weapon.go @@ -15,7 +15,7 @@ type Weapon struct { AffixIdList []uint32 `bson:"affixIdList"` // 词缀 Refinement uint8 `bson:"refinement"` // 精炼等阶 AvatarId uint32 `bson:"avatarId"` // 装备角色id - Guid uint64 `bson:"-"` + Guid uint64 `bson:"-" msgpack:"-"` } func (p *Player) InitWeapon(weapon *Weapon) {