From b6be98c58cc824f6c309deed8e5ab8095baaadab Mon Sep 17 00:00:00 2001 From: flswld Date: Fri, 14 Apr 2023 22:07:41 +0800 Subject: [PATCH] =?UTF-8?q?tcp=20socket=E5=8D=8F=E8=AE=AE=E5=88=86?= =?UTF-8?q?=E5=89=B2=E8=A7=A3=E6=9E=90=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/mq/nats.go | 84 +++++++++++++++------------------ gate/net/kcp_connect_manager.go | 3 +- gate/net/kcp_endecode.go | 28 ++++------- gs/game/game_world_scene.go | 3 ++ gs/game/player_fight_sync.go | 19 +++++++- gs/game/player_scene.go | 21 ++++++++- robot/net/session.go | 3 +- 7 files changed, 89 insertions(+), 72 deletions(-) diff --git a/common/mq/nats.go b/common/mq/nats.go index 1cd449f2..77da7c54 100644 --- a/common/mq/nats.go +++ b/common/mq/nats.go @@ -399,54 +399,46 @@ func (m *MessageQueue) gateTcpMqConn(gateServerConnAddrMap map[string]bool) { } func (m *MessageQueue) gateTcpMqRecvHandle(inst *GateTcpMqInst) { - dataBuf := make([]byte, 0, 1024) - recvBuf := make([]byte, 1024*1024) + header := make([]byte, 4) + payload := make([]byte, 1024) for { - recvLen, err := inst.conn.Read(recvBuf) - if err != nil { - logger.Error("gate tcp mq recv error: %v", err) - m.gateTcpMqEventChan <- &GateTcpMqEvent{ - event: EventDisconnect, - inst: inst, + // 读取头部的消息长度 + recvLen := 0 + for recvLen < 4 { + n, err := inst.conn.Read(header[recvLen:]) + if err != nil { + logger.Error("gate tcp mq recv error: %v", err) + m.gateTcpMqEventChan <- &GateTcpMqEvent{ + event: EventDisconnect, + inst: inst, + } + _ = inst.conn.Close() + return } - _ = inst.conn.Close() - return + recvLen += n + } + msgLen := binary.BigEndian.Uint32(header) + // 读取消息体 + if len(payload) < int(msgLen) { + payload = make([]byte, msgLen) + } + recvLen = 0 + for recvLen < int(msgLen) { + n, err := inst.conn.Read(payload[recvLen:msgLen]) + if err != nil { + logger.Error("gate tcp mq recv error: %v", err) + m.gateTcpMqEventChan <- &GateTcpMqEvent{ + event: EventDisconnect, + inst: inst, + } + _ = inst.conn.Close() + return + } + recvLen += n + } + netMsg := m.parseNetMsg(payload[:msgLen]) + if netMsg != nil { + m.netMsgOutput <- netMsg } - recvBuf = recvBuf[:recvLen] - m.gateTcpMqRecvHandleLoop(recvBuf, &dataBuf) - } -} - -func (m *MessageQueue) gateTcpMqRecvHandleLoop(data []byte, dataBuf *[]byte) { - if len(*dataBuf) != 0 { - // 取出之前的缓冲区数据 - data = append(*dataBuf, data...) - *dataBuf = make([]byte, 0, 1024) - } - // 长度太短 - if len(data) < 4 { - *dataBuf = append(*dataBuf, data...) - return - } - // 消息的载荷部分长度 - msgPayloadLen := binary.BigEndian.Uint32(data[0:4]) - // 检查长度 - packetLen := int(msgPayloadLen) + 4 - haveMorePacket := false - if len(data) > packetLen { - // 有不止一个包 - haveMorePacket = true - } else if len(data) < packetLen { - // 这一次没收够 放入缓冲区 - *dataBuf = append(*dataBuf, data...) - return - } - rawData := data[4 : 4+msgPayloadLen] - netMsg := m.parseNetMsg(rawData) - if netMsg != nil { - m.netMsgOutput <- netMsg - } - if haveMorePacket { - m.gateTcpMqRecvHandleLoop(data[packetLen:], dataBuf) } } diff --git a/gate/net/kcp_connect_manager.go b/gate/net/kcp_connect_manager.go index 36e695e3..9f4431a8 100644 --- a/gate/net/kcp_connect_manager.go +++ b/gate/net/kcp_connect_manager.go @@ -287,7 +287,6 @@ func (k *KcpConnectManager) recvHandle(session *Session) { conn := session.conn convId := conn.GetConv() recvBuf := make([]byte, PacketMaxLen) - dataBuf := make([]byte, 0, 1500) pktFreqLimitCounter := 0 pktFreqLimitTimer := time.Now().UnixNano() for { @@ -313,7 +312,7 @@ func (k *KcpConnectManager) recvHandle(session *Session) { } recvData := recvBuf[:recvLen] kcpMsgList := make([]*KcpMsg, 0) - DecodeBinToPayload(recvData, &dataBuf, convId, &kcpMsgList, session.xorKey) + DecodeBinToPayload(recvData, convId, &kcpMsgList, session.xorKey) for _, v := range kcpMsgList { protoMsgList := ProtoDecode(v, k.serverCmdProtoMap, k.clientCmdProtoMap) for _, vv := range protoMsgList { diff --git a/gate/net/kcp_endecode.go b/gate/net/kcp_endecode.go index 5c769c02..00a46ff7 100644 --- a/gate/net/kcp_endecode.go +++ b/gate/net/kcp_endecode.go @@ -32,21 +32,16 @@ type KcpMsg struct { ProtoData []byte } -func DecodeBinToPayload(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpMsg, xorKey []byte) { +func DecodeBinToPayload(data []byte, convId uint64, kcpMsgList *[]*KcpMsg, xorKey []byte) { // xor解密 endec.Xor(data, xorKey) - DecodeLoop(data, dataBuf, convId, kcpMsgList) + DecodeLoop(data, convId, kcpMsgList) } -func DecodeLoop(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpMsg) { - if len(*dataBuf) != 0 { - // 取出之前的缓冲区数据 - data = append(*dataBuf, data...) - *dataBuf = make([]byte, 0, 1500) - } +func DecodeLoop(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) { // 长度太短 if len(data) < 12 { - logger.Debug("packet len less 12 byte") + logger.Error("packet len less than 12 byte") return } // 头部幻数错误 @@ -66,13 +61,8 @@ func DecodeLoop(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpM logger.Error("packet len too long") return } - haveMorePacket := false - if len(data) > packetLen { - // 有不止一个包 - haveMorePacket = true - } else if len(data) < packetLen { - // 这一次没收够 放入缓冲区 - *dataBuf = append(*dataBuf, data...) + if len(data) < packetLen { + logger.Error("packet len not enough") return } // 尾部幻数错误 @@ -91,9 +81,9 @@ func DecodeLoop(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpM kcpMsg.HeadData = headData kcpMsg.ProtoData = protoData *kcpMsgList = append(*kcpMsgList, kcpMsg) - // 递归解析 - if haveMorePacket { - DecodeLoop(data[packetLen:], dataBuf, convId, kcpMsgList) + // 有不止一个包 递归解析 + if len(data) > packetLen { + DecodeLoop(data[packetLen:], convId, kcpMsgList) } } diff --git a/gs/game/game_world_scene.go b/gs/game/game_world_scene.go index f3e3a88b..20259b05 100644 --- a/gs/game/game_world_scene.go +++ b/gs/game/game_world_scene.go @@ -333,6 +333,9 @@ func (s *Scene) RemoveGroupSuite(groupId uint32, suiteId uint8) { s.DestroyEntity(entity.id) } delete(group.suiteMap, suiteId) + if len(group.suiteMap) == 0 { + delete(s.groupMap, groupId) + } } type Group struct { diff --git a/gs/game/player_fight_sync.go b/gs/game/player_fight_sync.go index c1a33ed3..6d6e36a3 100644 --- a/gs/game/player_fight_sync.go +++ b/gs/game/player_fight_sync.go @@ -258,8 +258,25 @@ func (g *Game) SceneBlockAoiPlayerMove(player *model.Player, world *World, scene } // 旧有新没有的group即为卸载的 if !world.GetMultiplayer() { - // 处理多人世界不同玩家不同位置的group卸载情况 + // 单人世界直接卸载group g.RemoveSceneGroup(player, scene, groupConfig) + } else { + // 多人世界group附近没有任何玩家则卸载 + remove := true + for _, otherPlayer := range scene.GetAllPlayer() { + for otherPlayerGroupId := range g.GetNeighborGroup(otherPlayer.SceneId, otherPlayer.Pos) { + if otherPlayerGroupId == groupId { + remove = false + break + } + } + if !remove { + break + } + } + if remove { + g.RemoveSceneGroup(player, scene, groupConfig) + } } } for groupId, groupConfig := range newNeighborGroupMap { diff --git a/gs/game/player_scene.go b/gs/game/player_scene.go index f48f0560..246d25ba 100644 --- a/gs/game/player_scene.go +++ b/gs/game/player_scene.go @@ -56,10 +56,27 @@ func (g *Game) EnterSceneReadyReq(player *model.Player, payloadMsg pb.Message) { } g.RemoveSceneEntityNotifyToPlayer(player, proto.VisionType_VISION_MISS, delEntityIdList) // 卸载旧位置附近的group - for _, groupConfig := range g.GetNeighborGroup(ctx.OldSceneId, ctx.OldPos) { + for groupId, groupConfig := range g.GetNeighborGroup(ctx.OldSceneId, ctx.OldPos) { if !world.GetMultiplayer() { - // 处理多人世界不同玩家不同位置的group卸载情况 + // 单人世界直接卸载group g.RemoveSceneGroup(player, oldScene, groupConfig) + } else { + // 多人世界group附近没有任何玩家则卸载 + remove := true + for _, otherPlayer := range oldScene.GetAllPlayer() { + for otherPlayerGroupId := range g.GetNeighborGroup(otherPlayer.SceneId, otherPlayer.Pos) { + if otherPlayerGroupId == groupId { + remove = false + break + } + } + if !remove { + break + } + } + if remove { + g.RemoveSceneGroup(player, oldScene, groupConfig) + } } } } diff --git a/robot/net/session.go b/robot/net/session.go index cef02e4a..4b8d4716 100644 --- a/robot/net/session.go +++ b/robot/net/session.go @@ -69,7 +69,6 @@ func (s *Session) recvHandle() { conn := s.Conn convId := conn.GetConv() recvBuf := make([]byte, hk4egatenet.PacketMaxLen) - dataBuf := make([]byte, 0, 1500) for { _ = conn.SetReadDeadline(time.Now().Add(time.Second * hk4egatenet.ConnRecvTimeout)) recvLen, err := conn.Read(recvBuf) @@ -80,7 +79,7 @@ func (s *Session) recvHandle() { } recvData := recvBuf[:recvLen] kcpMsgList := make([]*hk4egatenet.KcpMsg, 0) - hk4egatenet.DecodeBinToPayload(recvData, &dataBuf, convId, &kcpMsgList, s.XorKey) + hk4egatenet.DecodeBinToPayload(recvData, convId, &kcpMsgList, s.XorKey) for _, v := range kcpMsgList { protoMsgList := hk4egatenet.ProtoDecode(v, s.ServerCmdProtoMap, s.ClientCmdProtoMap) for _, vv := range protoMsgList {