性能优化

This commit is contained in:
flswld
2023-04-13 19:46:20 +08:00
parent 0440a6b1ed
commit e7c5723f98
10 changed files with 215 additions and 182 deletions

View File

@@ -33,7 +33,6 @@ type MessageQueue struct {
cmdProtoMap *cmd.CmdProtoMap
serverType string
appId string
gateTcpMqChan chan []byte
gateTcpMqEventChan chan *GateTcpMqEvent
gateTcpMqDeadEventChan chan string
rpcClient *rpc.Client
@@ -63,7 +62,6 @@ func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r
r.cmdProtoMap = cmd.NewCmdProtoMap()
r.serverType = serverType
r.appId = appId
r.gateTcpMqChan = make(chan []byte, 1000)
r.gateTcpMqEventChan = make(chan *GateTcpMqEvent, 1000)
r.gateTcpMqDeadEventChan = make(chan string, 1000)
r.rpcClient = rpcClient
@@ -72,7 +70,7 @@ func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r
} else if serverType == api.GS || serverType == api.ANTICHEAT || serverType == api.PATHFINDING {
go r.runGateTcpMqClient()
}
go r.recvHandler()
go r.natsMsgRecvHandler()
go r.sendHandler()
return r
}
@@ -92,52 +90,82 @@ func (m *MessageQueue) GetNetMsg() chan *NetMsg {
return m.netMsgOutput
}
func (m *MessageQueue) recvHandler() {
func (m *MessageQueue) natsMsgRecvHandler() {
for {
var rawData []byte = nil
select {
case natsMsg := <-m.natsMsgChan:
rawData = natsMsg.Data
case gateTcpMqMsg := <-m.gateTcpMqChan:
rawData = gateTcpMqMsg
}
// msgpack NetMsg
netMsg := new(NetMsg)
err := msgpack.Unmarshal(rawData, netMsg)
if err != nil {
logger.Error("parse bin to net msg error: %v", err)
natsMsg := <-m.natsMsgChan
rawData := natsMsg.Data
netMsg := m.parseNetMsg(rawData)
if netMsg == nil {
continue
}
// 忽略自己发出的广播消息
if netMsg.OriginServerType == m.serverType && netMsg.OriginServerAppId == m.appId {
continue
}
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if gameMsg == nil {
logger.Error("recv game msg is nil")
continue
}
if netMsg.EventId == NormalMsg {
// protobuf PayloadMessage
payloadMessage := m.cmdProtoMap.GetProtoObjCacheByCmdId(gameMsg.CmdId)
if payloadMessage == nil {
logger.Error("get protobuf obj by cmd id error: %v", err)
continue
}
err = pb.Unmarshal(gameMsg.PayloadMessageData, payloadMessage)
if err != nil {
logger.Error("parse bin to payload msg error: %v", err)
continue
}
gameMsg.PayloadMessage = payloadMessage
}
}
m.netMsgOutput <- netMsg
}
}
func (m *MessageQueue) buildNetMsg(netMsg *NetMsg) []byte {
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if gameMsg == nil {
logger.Error("send game msg is nil")
return nil
}
if gameMsg.PayloadMessageData == nil {
// protobuf PayloadMessage
payloadMessageData, err := pb.Marshal(gameMsg.PayloadMessage)
if err != nil {
logger.Error("parse payload msg to bin error: %v", err)
return nil
}
gameMsg.PayloadMessageData = payloadMessageData
}
}
// msgpack NetMsg
rawData, err := msgpack.Marshal(netMsg)
if err != nil {
logger.Error("parse net msg to bin error: %v", err)
return nil
}
return rawData
}
func (m *MessageQueue) parseNetMsg(rawData []byte) *NetMsg {
// msgpack NetMsg
netMsg := new(NetMsg)
err := msgpack.Unmarshal(rawData, netMsg)
if err != nil {
logger.Error("parse bin to net msg error: %v", err)
return nil
}
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if gameMsg == nil {
logger.Error("recv game msg is nil")
return nil
}
if netMsg.EventId == NormalMsg {
// protobuf PayloadMessage
payloadMessage := m.cmdProtoMap.GetProtoObjCacheByCmdId(gameMsg.CmdId)
if payloadMessage == nil {
logger.Error("get protobuf obj by cmd id error: %v", err)
return nil
}
err = pb.Unmarshal(gameMsg.PayloadMessageData, payloadMessage)
if err != nil {
logger.Error("parse bin to payload msg error: %v", err)
return nil
}
gameMsg.PayloadMessage = payloadMessage
}
}
return netMsg
}
func (m *MessageQueue) sendHandler() {
// 网关tcp连接消息收发快速通道 key1:服务器类型 key2:服务器appid value:连接实例
gateTcpMqInstMap := map[string]map[string]*GateTcpMqInst{
@@ -149,34 +177,15 @@ func (m *MessageQueue) sendHandler() {
for {
select {
case netMsg := <-m.netMsgInput:
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if gameMsg == nil {
logger.Error("send game msg is nil")
continue
}
if gameMsg.PayloadMessageData == nil {
// protobuf PayloadMessage
payloadMessageData, err := pb.Marshal(gameMsg.PayloadMessage)
if err != nil {
logger.Error("parse payload msg to bin error: %v", err)
continue
}
gameMsg.PayloadMessageData = payloadMessageData
}
}
// msgpack NetMsg
netMsgData, err := msgpack.Marshal(netMsg)
if err != nil {
logger.Error("parse net msg to bin error: %v", err)
rawData := m.buildNetMsg(netMsg)
if rawData == nil {
continue
}
fallbackNatsMqSend := func() {
// 找不到tcp快速通道就fallback回nats
natsMsg := nats.NewMsg(netMsg.Topic)
natsMsg.Data = netMsgData
err = m.natsConn.PublishMsg(natsMsg)
natsMsg.Data = rawData
err := m.natsConn.PublishMsg(natsMsg)
if err != nil {
logger.Error("nats publish msg error: %v", err)
return
@@ -199,25 +208,35 @@ func (m *MessageQueue) sendHandler() {
fallbackNatsMqSend()
continue
}
gateTcpMqSend := func(data []byte) bool {
err := inst.conn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
fallbackNatsMqSend()
return false
}
_, err = inst.conn.Write(data)
if err != nil {
// 发送失败关闭连接fallback回nats
logger.Error("gate tcp mq send error: %v", err)
_ = inst.conn.Close()
m.gateTcpMqEventChan <- &GateTcpMqEvent{
event: EventDisconnect,
inst: inst,
}
fallbackNatsMqSend()
return false
}
return true
}
// 前4个字节为消息的载荷部分长度
netMsgDataTcp := make([]byte, 4+len(netMsgData))
binary.BigEndian.PutUint32(netMsgDataTcp, uint32(len(netMsgData)))
copy(netMsgDataTcp[4:], netMsgData)
err = inst.conn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
fallbackNatsMqSend()
headLenData := make([]byte, 4)
binary.BigEndian.PutUint32(headLenData, uint32(len(rawData)))
ok := gateTcpMqSend(headLenData)
if !ok {
continue
}
_, err = inst.conn.Write(netMsgDataTcp)
if err != nil {
// 发送失败关闭连接fallback回nats
logger.Error("gate tcp mq send error: %v", err)
_ = inst.conn.Close()
m.gateTcpMqEventChan <- &GateTcpMqEvent{
event: EventDisconnect,
inst: inst,
}
fallbackNatsMqSend()
ok = gateTcpMqSend(rawData)
if !ok {
continue
}
case gateTcpMqEvent := <-m.gateTcpMqEventChan:
@@ -381,8 +400,8 @@ func (m *MessageQueue) gateTcpMqConn(gateServerConnAddrMap map[string]bool) {
func (m *MessageQueue) gateTcpMqRecvHandle(inst *GateTcpMqInst) {
dataBuf := make([]byte, 0, 1500)
recvBuf := make([]byte, 1024*1024)
for {
recvBuf := make([]byte, 1500)
recvLen, err := inst.conn.Read(recvBuf)
if err != nil {
logger.Error("gate tcp mq recv error: %v", err)
@@ -423,7 +442,11 @@ func (m *MessageQueue) gateTcpMqRecvHandleLoop(data []byte, dataBuf *[]byte) {
*dataBuf = append(*dataBuf, data...)
return
}
m.gateTcpMqChan <- data[4 : 4+msgPayloadLen]
rawData := data[4 : 4+msgPayloadLen]
netMsg := m.parseNetMsg(rawData)
if netMsg != nil {
m.netMsgOutput <- netMsg
}
if haveMorePacket {
m.gateTcpMqRecvHandleLoop(data[packetLen:], dataBuf)
}