tcp socket协议分割解析优化

This commit is contained in:
flswld
2023-04-14 22:07:41 +08:00
parent ae5f2809bf
commit b6be98c58c
7 changed files with 89 additions and 72 deletions

View File

@@ -399,10 +399,13 @@ func (m *MessageQueue) gateTcpMqConn(gateServerConnAddrMap map[string]bool) {
} }
func (m *MessageQueue) gateTcpMqRecvHandle(inst *GateTcpMqInst) { func (m *MessageQueue) gateTcpMqRecvHandle(inst *GateTcpMqInst) {
dataBuf := make([]byte, 0, 1024) header := make([]byte, 4)
recvBuf := make([]byte, 1024*1024) payload := make([]byte, 1024)
for { for {
recvLen, err := inst.conn.Read(recvBuf) // 读取头部的消息长度
recvLen := 0
for recvLen < 4 {
n, err := inst.conn.Read(header[recvLen:])
if err != nil { if err != nil {
logger.Error("gate tcp mq recv error: %v", err) logger.Error("gate tcp mq recv error: %v", err)
m.gateTcpMqEventChan <- &GateTcpMqEvent{ m.gateTcpMqEventChan <- &GateTcpMqEvent{
@@ -412,41 +415,30 @@ func (m *MessageQueue) gateTcpMqRecvHandle(inst *GateTcpMqInst) {
_ = inst.conn.Close() _ = inst.conn.Close()
return return
} }
recvBuf = recvBuf[:recvLen] recvLen += n
m.gateTcpMqRecvHandleLoop(recvBuf, &dataBuf)
} }
msgLen := binary.BigEndian.Uint32(header)
// 读取消息体
if len(payload) < int(msgLen) {
payload = make([]byte, msgLen)
} }
recvLen = 0
func (m *MessageQueue) gateTcpMqRecvHandleLoop(data []byte, dataBuf *[]byte) { for recvLen < int(msgLen) {
if len(*dataBuf) != 0 { n, err := inst.conn.Read(payload[recvLen:msgLen])
// 取出之前的缓冲区数据 if err != nil {
data = append(*dataBuf, data...) logger.Error("gate tcp mq recv error: %v", err)
*dataBuf = make([]byte, 0, 1024) m.gateTcpMqEventChan <- &GateTcpMqEvent{
event: EventDisconnect,
inst: inst,
} }
// 长度太短 _ = inst.conn.Close()
if len(data) < 4 {
*dataBuf = append(*dataBuf, data...)
return return
} }
// 消息的载荷部分长度 recvLen += n
msgPayloadLen := binary.BigEndian.Uint32(data[0:4])
// 检查长度
packetLen := int(msgPayloadLen) + 4
haveMorePacket := false
if len(data) > packetLen {
// 有不止一个包
haveMorePacket = true
} else if len(data) < packetLen {
// 这一次没收够 放入缓冲区
*dataBuf = append(*dataBuf, data...)
return
} }
rawData := data[4 : 4+msgPayloadLen] netMsg := m.parseNetMsg(payload[:msgLen])
netMsg := m.parseNetMsg(rawData)
if netMsg != nil { if netMsg != nil {
m.netMsgOutput <- netMsg m.netMsgOutput <- netMsg
} }
if haveMorePacket {
m.gateTcpMqRecvHandleLoop(data[packetLen:], dataBuf)
} }
} }

View File

@@ -287,7 +287,6 @@ func (k *KcpConnectManager) recvHandle(session *Session) {
conn := session.conn conn := session.conn
convId := conn.GetConv() convId := conn.GetConv()
recvBuf := make([]byte, PacketMaxLen) recvBuf := make([]byte, PacketMaxLen)
dataBuf := make([]byte, 0, 1500)
pktFreqLimitCounter := 0 pktFreqLimitCounter := 0
pktFreqLimitTimer := time.Now().UnixNano() pktFreqLimitTimer := time.Now().UnixNano()
for { for {
@@ -313,7 +312,7 @@ func (k *KcpConnectManager) recvHandle(session *Session) {
} }
recvData := recvBuf[:recvLen] recvData := recvBuf[:recvLen]
kcpMsgList := make([]*KcpMsg, 0) kcpMsgList := make([]*KcpMsg, 0)
DecodeBinToPayload(recvData, &dataBuf, convId, &kcpMsgList, session.xorKey) DecodeBinToPayload(recvData, convId, &kcpMsgList, session.xorKey)
for _, v := range kcpMsgList { for _, v := range kcpMsgList {
protoMsgList := ProtoDecode(v, k.serverCmdProtoMap, k.clientCmdProtoMap) protoMsgList := ProtoDecode(v, k.serverCmdProtoMap, k.clientCmdProtoMap)
for _, vv := range protoMsgList { for _, vv := range protoMsgList {

View File

@@ -32,21 +32,16 @@ type KcpMsg struct {
ProtoData []byte ProtoData []byte
} }
func DecodeBinToPayload(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpMsg, xorKey []byte) { func DecodeBinToPayload(data []byte, convId uint64, kcpMsgList *[]*KcpMsg, xorKey []byte) {
// xor解密 // xor解密
endec.Xor(data, xorKey) endec.Xor(data, xorKey)
DecodeLoop(data, dataBuf, convId, kcpMsgList) DecodeLoop(data, convId, kcpMsgList)
} }
func DecodeLoop(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpMsg) { func DecodeLoop(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) {
if len(*dataBuf) != 0 {
// 取出之前的缓冲区数据
data = append(*dataBuf, data...)
*dataBuf = make([]byte, 0, 1500)
}
// 长度太短 // 长度太短
if len(data) < 12 { if len(data) < 12 {
logger.Debug("packet len less 12 byte") logger.Error("packet len less than 12 byte")
return return
} }
// 头部幻数错误 // 头部幻数错误
@@ -66,13 +61,8 @@ func DecodeLoop(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpM
logger.Error("packet len too long") logger.Error("packet len too long")
return return
} }
haveMorePacket := false if len(data) < packetLen {
if len(data) > packetLen { logger.Error("packet len not enough")
// 有不止一个包
haveMorePacket = true
} else if len(data) < packetLen {
// 这一次没收够 放入缓冲区
*dataBuf = append(*dataBuf, data...)
return return
} }
// 尾部幻数错误 // 尾部幻数错误
@@ -91,9 +81,9 @@ func DecodeLoop(data []byte, dataBuf *[]byte, convId uint64, kcpMsgList *[]*KcpM
kcpMsg.HeadData = headData kcpMsg.HeadData = headData
kcpMsg.ProtoData = protoData kcpMsg.ProtoData = protoData
*kcpMsgList = append(*kcpMsgList, kcpMsg) *kcpMsgList = append(*kcpMsgList, kcpMsg)
// 递归解析 // 有不止一个包 递归解析
if haveMorePacket { if len(data) > packetLen {
DecodeLoop(data[packetLen:], dataBuf, convId, kcpMsgList) DecodeLoop(data[packetLen:], convId, kcpMsgList)
} }
} }

View File

@@ -333,6 +333,9 @@ func (s *Scene) RemoveGroupSuite(groupId uint32, suiteId uint8) {
s.DestroyEntity(entity.id) s.DestroyEntity(entity.id)
} }
delete(group.suiteMap, suiteId) delete(group.suiteMap, suiteId)
if len(group.suiteMap) == 0 {
delete(s.groupMap, groupId)
}
} }
type Group struct { type Group struct {

View File

@@ -258,8 +258,25 @@ func (g *Game) SceneBlockAoiPlayerMove(player *model.Player, world *World, scene
} }
// 旧有新没有的group即为卸载的 // 旧有新没有的group即为卸载的
if !world.GetMultiplayer() { if !world.GetMultiplayer() {
// 处理多人世界不同玩家不同位置的group卸载情况 // 单人世界直接卸载group
g.RemoveSceneGroup(player, scene, groupConfig) g.RemoveSceneGroup(player, scene, groupConfig)
} else {
// 多人世界group附近没有任何玩家则卸载
remove := true
for _, otherPlayer := range scene.GetAllPlayer() {
for otherPlayerGroupId := range g.GetNeighborGroup(otherPlayer.SceneId, otherPlayer.Pos) {
if otherPlayerGroupId == groupId {
remove = false
break
}
}
if !remove {
break
}
}
if remove {
g.RemoveSceneGroup(player, scene, groupConfig)
}
} }
} }
for groupId, groupConfig := range newNeighborGroupMap { for groupId, groupConfig := range newNeighborGroupMap {

View File

@@ -56,10 +56,27 @@ func (g *Game) EnterSceneReadyReq(player *model.Player, payloadMsg pb.Message) {
} }
g.RemoveSceneEntityNotifyToPlayer(player, proto.VisionType_VISION_MISS, delEntityIdList) g.RemoveSceneEntityNotifyToPlayer(player, proto.VisionType_VISION_MISS, delEntityIdList)
// 卸载旧位置附近的group // 卸载旧位置附近的group
for _, groupConfig := range g.GetNeighborGroup(ctx.OldSceneId, ctx.OldPos) { for groupId, groupConfig := range g.GetNeighborGroup(ctx.OldSceneId, ctx.OldPos) {
if !world.GetMultiplayer() { if !world.GetMultiplayer() {
// 处理多人世界不同玩家不同位置的group卸载情况 // 单人世界直接卸载group
g.RemoveSceneGroup(player, oldScene, groupConfig) g.RemoveSceneGroup(player, oldScene, groupConfig)
} else {
// 多人世界group附近没有任何玩家则卸载
remove := true
for _, otherPlayer := range oldScene.GetAllPlayer() {
for otherPlayerGroupId := range g.GetNeighborGroup(otherPlayer.SceneId, otherPlayer.Pos) {
if otherPlayerGroupId == groupId {
remove = false
break
}
}
if !remove {
break
}
}
if remove {
g.RemoveSceneGroup(player, oldScene, groupConfig)
}
} }
} }
} }

View File

@@ -69,7 +69,6 @@ func (s *Session) recvHandle() {
conn := s.Conn conn := s.Conn
convId := conn.GetConv() convId := conn.GetConv()
recvBuf := make([]byte, hk4egatenet.PacketMaxLen) recvBuf := make([]byte, hk4egatenet.PacketMaxLen)
dataBuf := make([]byte, 0, 1500)
for { for {
_ = conn.SetReadDeadline(time.Now().Add(time.Second * hk4egatenet.ConnRecvTimeout)) _ = conn.SetReadDeadline(time.Now().Add(time.Second * hk4egatenet.ConnRecvTimeout))
recvLen, err := conn.Read(recvBuf) recvLen, err := conn.Read(recvBuf)
@@ -80,7 +79,7 @@ func (s *Session) recvHandle() {
} }
recvData := recvBuf[:recvLen] recvData := recvBuf[:recvLen]
kcpMsgList := make([]*hk4egatenet.KcpMsg, 0) kcpMsgList := make([]*hk4egatenet.KcpMsg, 0)
hk4egatenet.DecodeBinToPayload(recvData, &dataBuf, convId, &kcpMsgList, s.XorKey) hk4egatenet.DecodeBinToPayload(recvData, convId, &kcpMsgList, s.XorKey)
for _, v := range kcpMsgList { for _, v := range kcpMsgList {
protoMsgList := hk4egatenet.ProtoDecode(v, s.ServerCmdProtoMap, s.ClientCmdProtoMap) protoMsgList := hk4egatenet.ProtoDecode(v, s.ServerCmdProtoMap, s.ClientCmdProtoMap)
for _, vv := range protoMsgList { for _, vv := range protoMsgList {