网关服务器IO阻塞优化

This commit is contained in:
flswld
2023-03-17 16:39:42 +08:00
parent 7de1d2e765
commit fd0bd712c9
4 changed files with 36 additions and 39 deletions

View File

@@ -17,6 +17,7 @@ type Handle struct {
func NewHandle(messageQueue *mq.MessageQueue) (r *Handle) {
r = new(Handle)
r.messageQueue = messageQueue
r.run()
return r
}

View File

@@ -203,6 +203,11 @@ func (m *MessageQueue) sendHandler() {
netMsgDataTcp := make([]byte, 4+len(netMsgData))
binary.BigEndian.PutUint32(netMsgDataTcp, uint32(len(netMsgData)))
copy(netMsgDataTcp[4:], netMsgData)
err = inst.conn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
fallbackNatsMqSend()
continue
}
_, err = inst.conn.Write(netMsgDataTcp)
if err != nil {
// 发送失败关闭连接fallback回nats

View File

@@ -156,7 +156,7 @@ func (k *KcpConnectManager) sendMsgHandle() {
convSessionMap := make(map[uint64]*Session)
userIdConvMap := make(map[uint32]uint64)
// 分发到每个连接具体的发送协程
sendToClientFn := func(protoMsg *ProtoMsg) {
sendToClient := func(protoMsg *ProtoMsg) {
session := convSessionMap[protoMsg.ConvId]
if session == nil {
logger.Error("session is nil, convId: %v", protoMsg.ConvId)
@@ -205,8 +205,9 @@ func (k *KcpConnectManager) sendMsgHandle() {
close(session.kcpRawSendChan)
case remoteKick := <-k.reLoginRemoteKickRegChan:
reLoginRemoteKickRegMap[remoteKick.userId] = remoteKick.kickFinishNotifyChan
remoteKick.regFinishNotifyChan <- true
case protoMsg := <-k.localMsgOutput:
sendToClientFn(protoMsg)
sendToClient(protoMsg)
case netMsg := <-k.messageQueue.GetNetMsg():
switch netMsg.MsgType {
case mq.MsgTypeGame:
@@ -223,7 +224,7 @@ func (k *KcpConnectManager) sendMsgHandle() {
protoMsg.CmdId = gameMsg.CmdId
protoMsg.HeadMessage = k.getHeadMsg(gameMsg.ClientSeq)
protoMsg.PayloadMessage = gameMsg.PayloadMessage
sendToClientFn(protoMsg)
sendToClient(protoMsg)
}
case mq.MsgTypeConnCtrl:
connCtrlMsg := netMsg.ConnCtrlMsg
@@ -306,6 +307,7 @@ func (k *KcpConnectManager) getHeadMsg(clientSeq uint32) (headMsg *proto.PacketH
}
type RemoteKick struct {
regFinishNotifyChan chan bool
userId uint32
kickFinishNotifyChan chan bool
}
@@ -367,34 +369,30 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session
logger.Error("gate conn num limit, uid: %v", uid)
return loginFailRsp(int32(proto.Retcode_RET_MAX_PLAYER), false, 0)
}
oldSession := k.GetSessionByUserId(uid)
if oldSession != nil {
// 本地顶号
k.kcpEventInput <- &KcpEvent{
ConvId: oldSession.conn.GetConv(),
EventId: KcpConnRelogin,
}
kickFinishNotifyChan := make(chan bool, 1)
k.reLoginRemoteKickRegChan <- &RemoteKick{
userId: uid,
kickFinishNotifyChan: kickFinishNotifyChan,
}
logger.Info("run local interrupt login wait, uid: %v", uid)
timer := time.NewTimer(time.Second * 10)
select {
case <-timer.C:
logger.Error("local interrupt login wait timeout, uid: %v", uid)
timer.Stop()
loginFailClose()
return nil
case <-kickFinishNotifyChan:
}
}
k.globalGsOnlineMapLock.RLock()
_, exist := k.globalGsOnlineMap[uid]
k.globalGsOnlineMapLock.RUnlock()
if exist {
// 远程全局顶号
// 注册回调通知
regFinishNotifyChan := make(chan bool, 1)
kickFinishNotifyChan := make(chan bool, 1)
k.reLoginRemoteKickRegChan <- &RemoteKick{
regFinishNotifyChan: regFinishNotifyChan,
userId: uid,
kickFinishNotifyChan: kickFinishNotifyChan,
}
// 注册等待
logger.Info("run global interrupt login reg wait, uid: %v", uid)
timer := time.NewTimer(time.Second * 1)
select {
case <-timer.C:
logger.Error("global interrupt login reg wait timeout, uid: %v", uid)
timer.Stop()
loginFailClose()
return nil
case <-regFinishNotifyChan:
}
// 顶号
connCtrlMsg := new(mq.ConnCtrlMsg)
connCtrlMsg.KickUserId = uid
connCtrlMsg.KickReason = kcp.EnetServerRelogin
@@ -403,17 +401,12 @@ func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session
EventId: mq.KickPlayerNotify,
ConnCtrlMsg: connCtrlMsg,
})
// 注册回调通知
kickFinishNotifyChan := make(chan bool, 1)
k.reLoginRemoteKickRegChan <- &RemoteKick{
userId: uid,
kickFinishNotifyChan: kickFinishNotifyChan,
}
logger.Info("run global interrupt login wait, uid: %v", uid)
timer := time.NewTimer(time.Second * 10)
// 顶号等待
logger.Info("run global interrupt login kick wait, uid: %v", uid)
timer = time.NewTimer(time.Second * 10)
select {
case <-timer.C:
logger.Error("global interrupt login wait timeout, uid: %v", uid)
logger.Error("global interrupt login kick wait timeout, uid: %v", uid)
timer.Stop()
loginFailClose()
return nil

View File

@@ -33,9 +33,7 @@ func DoForward[IET model.InvokeEntryType](player *model.Player, invokeHandler *m
}
}
if invokeHandler.AllLen() == 0 && invokeHandler.AllExceptCurLen() == 0 && invokeHandler.HostLen() == 0 {
for _, v := range world.GetAllPlayer() {
GAME_MANAGER.SendMsg(cmdId, v.PlayerID, player.ClientSeq, newNtf)
}
return
}
if invokeHandler.AllLen() > 0 {
reflection.SetStructFieldValue(newNtf, forwardField, invokeHandler.EntryListForwardAll)