From fd0bd712c9ba4c018d20415cb4b69276a32ae0f7 Mon Sep 17 00:00:00 2001 From: flswld Date: Fri, 17 Mar 2023 16:39:42 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BD=91=E5=85=B3=E6=9C=8D=E5=8A=A1=E5=99=A8IO?= =?UTF-8?q?=E9=98=BB=E5=A1=9E=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- anticheat/handle/handle.go | 1 + common/mq/nats.go | 5 +++ gate/net/session.go | 65 ++++++++++++++++-------------------- gs/game/player_fight_sync.go | 4 +-- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/anticheat/handle/handle.go b/anticheat/handle/handle.go index ea65d6cb..49b43e44 100644 --- a/anticheat/handle/handle.go +++ b/anticheat/handle/handle.go @@ -17,6 +17,7 @@ type Handle struct { func NewHandle(messageQueue *mq.MessageQueue) (r *Handle) { r = new(Handle) r.messageQueue = messageQueue + r.run() return r } diff --git a/common/mq/nats.go b/common/mq/nats.go index 8aaa7882..c59e5561 100644 --- a/common/mq/nats.go +++ b/common/mq/nats.go @@ -203,6 +203,11 @@ func (m *MessageQueue) sendHandler() { netMsgDataTcp := make([]byte, 4+len(netMsgData)) binary.BigEndian.PutUint32(netMsgDataTcp, uint32(len(netMsgData))) copy(netMsgDataTcp[4:], netMsgData) + err = inst.conn.SetWriteDeadline(time.Now().Add(time.Second)) + if err != nil { + fallbackNatsMqSend() + continue + } _, err = inst.conn.Write(netMsgDataTcp) if err != nil { // 发送失败关闭连接fallback回nats diff --git a/gate/net/session.go b/gate/net/session.go index 468d1180..1bc92f70 100644 --- a/gate/net/session.go +++ b/gate/net/session.go @@ -156,7 +156,7 @@ func (k *KcpConnectManager) sendMsgHandle() { convSessionMap := make(map[uint64]*Session) userIdConvMap := make(map[uint32]uint64) // 分发到每个连接具体的发送协程 - sendToClientFn := func(protoMsg *ProtoMsg) { + sendToClient := func(protoMsg *ProtoMsg) { session := convSessionMap[protoMsg.ConvId] if session == nil { logger.Error("session is nil, convId: %v", protoMsg.ConvId) @@ -205,8 +205,9 @@ func (k *KcpConnectManager) sendMsgHandle() { close(session.kcpRawSendChan) case remoteKick := <-k.reLoginRemoteKickRegChan: reLoginRemoteKickRegMap[remoteKick.userId] = remoteKick.kickFinishNotifyChan + remoteKick.regFinishNotifyChan <- true case protoMsg := <-k.localMsgOutput: - sendToClientFn(protoMsg) + sendToClient(protoMsg) case netMsg := <-k.messageQueue.GetNetMsg(): switch netMsg.MsgType { case mq.MsgTypeGame: @@ -223,7 +224,7 @@ func (k *KcpConnectManager) sendMsgHandle() { protoMsg.CmdId = gameMsg.CmdId protoMsg.HeadMessage = k.getHeadMsg(gameMsg.ClientSeq) protoMsg.PayloadMessage = gameMsg.PayloadMessage - sendToClientFn(protoMsg) + sendToClient(protoMsg) } case mq.MsgTypeConnCtrl: connCtrlMsg := netMsg.ConnCtrlMsg @@ -306,6 +307,7 @@ func (k *KcpConnectManager) getHeadMsg(clientSeq uint32) (headMsg *proto.PacketH } type RemoteKick struct { + regFinishNotifyChan chan bool userId uint32 kickFinishNotifyChan chan bool } @@ -367,34 +369,30 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session logger.Error("gate conn num limit, uid: %v", uid) return loginFailRsp(int32(proto.Retcode_RET_MAX_PLAYER), false, 0) } - oldSession := k.GetSessionByUserId(uid) - if oldSession != nil { - // 本地顶号 - k.kcpEventInput <- &KcpEvent{ - ConvId: oldSession.conn.GetConv(), - EventId: KcpConnRelogin, - } - kickFinishNotifyChan := make(chan bool, 1) - k.reLoginRemoteKickRegChan <- &RemoteKick{ - userId: uid, - kickFinishNotifyChan: kickFinishNotifyChan, - } - logger.Info("run local interrupt login wait, uid: %v", uid) - timer := time.NewTimer(time.Second * 10) - select { - case <-timer.C: - logger.Error("local interrupt login wait timeout, uid: %v", uid) - timer.Stop() - loginFailClose() - return nil - case <-kickFinishNotifyChan: - } - } k.globalGsOnlineMapLock.RLock() _, exist := k.globalGsOnlineMap[uid] k.globalGsOnlineMapLock.RUnlock() if exist { - // 远程全局顶号 + // 注册回调通知 + regFinishNotifyChan := make(chan bool, 1) + kickFinishNotifyChan := make(chan bool, 1) + k.reLoginRemoteKickRegChan <- &RemoteKick{ + regFinishNotifyChan: regFinishNotifyChan, + userId: uid, + kickFinishNotifyChan: kickFinishNotifyChan, + } + // 注册等待 + logger.Info("run global interrupt login reg wait, uid: %v", uid) + timer := time.NewTimer(time.Second * 1) + select { + case <-timer.C: + logger.Error("global interrupt login reg wait timeout, uid: %v", uid) + timer.Stop() + loginFailClose() + return nil + case <-regFinishNotifyChan: + } + // 顶号 connCtrlMsg := new(mq.ConnCtrlMsg) connCtrlMsg.KickUserId = uid connCtrlMsg.KickReason = kcp.EnetServerRelogin @@ -403,17 +401,12 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session EventId: mq.KickPlayerNotify, ConnCtrlMsg: connCtrlMsg, }) - // 注册回调通知 - kickFinishNotifyChan := make(chan bool, 1) - k.reLoginRemoteKickRegChan <- &RemoteKick{ - userId: uid, - kickFinishNotifyChan: kickFinishNotifyChan, - } - logger.Info("run global interrupt login wait, uid: %v", uid) - timer := time.NewTimer(time.Second * 10) + // 顶号等待 + logger.Info("run global interrupt login kick wait, uid: %v", uid) + timer = time.NewTimer(time.Second * 10) select { case <-timer.C: - logger.Error("global interrupt login wait timeout, uid: %v", uid) + logger.Error("global interrupt login kick wait timeout, uid: %v", uid) timer.Stop() loginFailClose() return nil diff --git a/gs/game/player_fight_sync.go b/gs/game/player_fight_sync.go index a37e73d0..44ee67a0 100644 --- a/gs/game/player_fight_sync.go +++ b/gs/game/player_fight_sync.go @@ -33,9 +33,7 @@ func DoForward[IET model.InvokeEntryType](player *model.Player, invokeHandler *m } } if invokeHandler.AllLen() == 0 && invokeHandler.AllExceptCurLen() == 0 && invokeHandler.HostLen() == 0 { - for _, v := range world.GetAllPlayer() { - GAME_MANAGER.SendMsg(cmdId, v.PlayerID, player.ClientSeq, newNtf) - } + return } if invokeHandler.AllLen() > 0 { reflection.SetStructFieldValue(newNtf, forwardField, invokeHandler.EntryListForwardAll)