优化服务器之间的底层连接方式

This commit is contained in:
flswld
2023-01-08 22:11:14 +08:00
parent 99408a745f
commit 6779ed8f20
21 changed files with 742 additions and 216 deletions

View File

@@ -45,6 +45,8 @@ type Hk4e struct {
GachaHistoryServer string `toml:"gacha_history_server"`
ClientProtoProxyEnable bool `toml:"client_proto_proxy_enable"`
Version string `toml:"version"`
GateTcpMqAddr string `toml:"gate_tcp_mq_addr"`
GateTcpMqPort int32 `toml:"gate_tcp_mq_port"`
}
// MQ 消息队列

View File

@@ -1,7 +1,15 @@
package mq
import (
"context"
"encoding/binary"
"net"
"strconv"
"strings"
"hk4e/common/config"
"hk4e/common/rpc"
"hk4e/node/api"
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
@@ -12,21 +20,24 @@ import (
// 用于服务器之间传输游戏协议
// 仅用于传递数据平面(client<--->server)和控制平面(server<--->server)的消息
// 目前是全部消息都走NATS 之后可以做优化服务器之间socket直连
// 服务器之间消息优先走tcp socket直连 tcp连接断开或不存在时降级回NATS
// 请不要用这个来搞RPC写一大堆异步回调!!!
// 要用RPC有专门的NATSRPC
type MessageQueue struct {
natsConn *nats.Conn
natsMsgChan chan *nats.Msg
netMsgInput chan *NetMsg
netMsgOutput chan *NetMsg
cmdProtoMap *cmd.CmdProtoMap
serverType string
appId string
natsConn *nats.Conn
natsMsgChan chan *nats.Msg
netMsgInput chan *NetMsg
netMsgOutput chan *NetMsg
cmdProtoMap *cmd.CmdProtoMap
serverType string
appId string
gateTcpMqChan chan []byte
gateTcpMqEventChan chan *GateTcpMqEvent
rpcClient *rpc.Client
}
func NewMessageQueue(serverType string, appId string) (r *MessageQueue) {
func NewMessageQueue(serverType string, appId string, rpcClient *rpc.Client) (r *MessageQueue) {
r = new(MessageQueue)
conn, err := nats.Connect(config.CONF.MQ.NatsUrl)
if err != nil {
@@ -52,6 +63,14 @@ func NewMessageQueue(serverType string, appId string) (r *MessageQueue) {
r.appId = appId
go r.recvHandler()
go r.sendHandler()
r.gateTcpMqChan = make(chan []byte, 1000)
r.gateTcpMqEventChan = make(chan *GateTcpMqEvent, 1000)
r.rpcClient = rpcClient
if serverType == api.GATE {
r.initGateTcpMqServer()
} else {
r.initGateTcpMqClient()
}
return r
}
@@ -65,10 +84,16 @@ func (m *MessageQueue) GetNetMsg() chan *NetMsg {
func (m *MessageQueue) recvHandler() {
for {
natsMsg := <-m.natsMsgChan
var rawData []byte = nil
select {
case natsMsg := <-m.natsMsgChan:
rawData = natsMsg.Data
case gateTcpMqMsg := <-m.gateTcpMqChan:
rawData = gateTcpMqMsg
}
// msgpack NetMsg
netMsg := new(NetMsg)
err := msgpack.Unmarshal(natsMsg.Data, netMsg)
err := msgpack.Unmarshal(rawData, netMsg)
if err != nil {
logger.Error("parse bin to net msg error: %v", err)
continue
@@ -76,6 +101,10 @@ func (m *MessageQueue) recvHandler() {
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if gameMsg == nil {
logger.Error("recv game msg is nil")
continue
}
if netMsg.EventId == NormalMsg {
// protobuf PayloadMessage
payloadMessage := m.cmdProtoMap.GetProtoObjByCmdId(gameMsg.CmdId)
@@ -96,33 +125,257 @@ func (m *MessageQueue) recvHandler() {
}
func (m *MessageQueue) sendHandler() {
// 网关tcp连接消息收发快速通道 key1:服务器类型 key2:服务器appid value:连接实例
gateTcpMqInstMap := map[string]map[string]*GateTcpMqInst{
api.GATE: make(map[string]*GateTcpMqInst),
api.GS: make(map[string]*GateTcpMqInst),
api.FIGHT: make(map[string]*GateTcpMqInst),
api.PATHFINDING: make(map[string]*GateTcpMqInst),
}
for {
netMsg := <-m.netMsgInput
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if gameMsg.PayloadMessageData == nil {
// protobuf PayloadMessage
payloadMessageData, err := pb.Marshal(gameMsg.PayloadMessage)
if err != nil {
logger.Error("parse payload msg to bin error: %v", err)
select {
case netMsg := <-m.netMsgInput:
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if gameMsg == nil {
logger.Error("send game msg is nil")
continue
}
gameMsg.PayloadMessageData = payloadMessageData
if gameMsg.PayloadMessageData == nil {
// protobuf PayloadMessage
payloadMessageData, err := pb.Marshal(gameMsg.PayloadMessage)
if err != nil {
logger.Error("parse payload msg to bin error: %v", err)
continue
}
gameMsg.PayloadMessageData = payloadMessageData
}
}
// msgpack NetMsg
netMsgData, err := msgpack.Marshal(netMsg)
if err != nil {
logger.Error("parse net msg to bin error: %v", err)
continue
}
fallbackNatsMqSend := func() {
// 找不到tcp快速通道就fallback回nats
natsMsg := nats.NewMsg(netMsg.Topic)
natsMsg.Data = netMsgData
err = m.natsConn.PublishMsg(natsMsg)
if err != nil {
logger.Error("nats publish msg error: %v", err)
return
}
}
// 有tcp快速通道就走快速通道
instMap, exist := gateTcpMqInstMap[netMsg.ServerType]
if !exist {
fallbackNatsMqSend()
continue
}
inst, exist := instMap[netMsg.AppId]
if !exist {
fallbackNatsMqSend()
continue
}
// 前4个字节为消息的载荷部分长度
netMsgDataTcp := make([]byte, 4+len(netMsgData))
binary.BigEndian.PutUint32(netMsgDataTcp, uint32(len(netMsgData)))
copy(netMsgDataTcp[4:], netMsgData)
_, err = inst.conn.Write(netMsgDataTcp)
if err != nil {
// 发送失败关闭连接fallback回nats
logger.Error("gate tcp mq send error: %v", err)
_ = inst.conn.Close()
m.gateTcpMqEventChan <- &GateTcpMqEvent{
event: EventDisconnect,
inst: inst,
}
fallbackNatsMqSend()
continue
}
case gateTcpMqEvent := <-m.gateTcpMqEventChan:
inst := gateTcpMqEvent.inst
switch gateTcpMqEvent.event {
case EventConnect:
logger.Warn("gate tcp mq connect, addr: %v, server type: %v, appid: %v", inst.conn.RemoteAddr().String(), inst.serverType, inst.appId)
gateTcpMqInstMap[inst.serverType][inst.appId] = inst
case EventDisconnect:
logger.Warn("gate tcp mq disconnect, addr: %v, server type: %v, appid: %v", inst.conn.RemoteAddr().String(), inst.serverType, inst.appId)
delete(gateTcpMqInstMap[inst.serverType], inst.appId)
}
}
// msgpack NetMsg
netMsgData, err := msgpack.Marshal(netMsg)
if err != nil {
logger.Error("parse net msg to bin error: %v", err)
continue
}
natsMsg := nats.NewMsg(netMsg.Topic)
natsMsg.Data = netMsgData
err = m.natsConn.PublishMsg(natsMsg)
if err != nil {
logger.Error("nats publish msg error: %v", err)
continue
}
}
}
type GateTcpMqInst struct {
conn net.Conn
serverType string
appId string
}
const (
EventConnect = iota
EventDisconnect
)
type GateTcpMqEvent struct {
event int
inst *GateTcpMqInst
}
func (m *MessageQueue) initGateTcpMqServer() {
addr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:"+strconv.Itoa(int(config.CONF.Hk4e.GateTcpMqPort)))
if err != nil {
logger.Error("gate tcp mq parse port error: %v", err)
return
}
listener, err := net.ListenTCP("tcp4", addr)
if err != nil {
logger.Error("gate tcp mq listen error: %v", err)
return
}
go m.gateTcpMqAcceptHandle(listener)
}
func (m *MessageQueue) gateTcpMqAcceptHandle(listener *net.TCPListener) {
for {
conn, err := listener.Accept()
if err != nil {
logger.Error("gate tcp mq accept error: %v", err)
return
}
logger.Info("server connect gate tcp mq, addr: %v", conn.RemoteAddr().String())
go m.gateTcpMqHandshake(conn)
}
}
func (m *MessageQueue) gateTcpMqHandshake(conn net.Conn) {
recvBuf := make([]byte, 1500)
recvLen, err := conn.Read(recvBuf)
if err != nil {
logger.Error("handshake packet recv error: %v", err)
return
}
recvBuf = recvBuf[:recvLen]
serverMetaData := string(recvBuf)
// 握手包格式 服务器类型@appid
split := strings.Split(serverMetaData, "@")
if len(split) != 2 {
logger.Error("handshake packet format error")
return
}
inst := &GateTcpMqInst{
conn: conn,
serverType: "",
appId: "",
}
switch split[0] {
case api.GATE:
inst.serverType = api.GATE
case api.GS:
inst.serverType = api.GS
case api.FIGHT:
inst.serverType = api.FIGHT
case api.PATHFINDING:
inst.serverType = api.PATHFINDING
default:
logger.Error("invalid server type")
return
}
if len(split[1]) != 8 {
logger.Error("invalid appid")
return
}
inst.appId = split[1]
go m.gateTcpMqRecvHandle(inst)
m.gateTcpMqEventChan <- &GateTcpMqEvent{
event: EventConnect,
inst: inst,
}
}
func (m *MessageQueue) initGateTcpMqClient() {
rsp, err := m.rpcClient.Discovery.GetAllGateServerInfoList(context.TODO(), new(api.NullMsg))
if err != nil {
logger.Error("gate tcp mq get gate list error: %v", err)
return
}
for _, gateServerInfo := range rsp.GateServerInfoList {
addr, err := net.ResolveTCPAddr("tcp4", gateServerInfo.MqAddr+":"+strconv.Itoa(int(gateServerInfo.MqPort)))
if err != nil {
logger.Error("gate tcp mq parse addr error: %v", err)
return
}
conn, err := net.DialTCP("tcp4", nil, addr)
if err != nil {
logger.Error("gate tcp mq conn error: %v", err)
return
}
_, err = conn.Write([]byte(m.serverType + "@" + m.appId))
if err != nil {
logger.Error("gate tcp mq handshake send error: %v", err)
return
}
inst := &GateTcpMqInst{
conn: conn,
serverType: api.GATE,
appId: gateServerInfo.AppId,
}
go m.gateTcpMqRecvHandle(inst)
m.gateTcpMqEventChan <- &GateTcpMqEvent{
event: EventConnect,
inst: inst,
}
}
}
func (m *MessageQueue) gateTcpMqRecvHandle(inst *GateTcpMqInst) {
dataBuf := make([]byte, 0, 1500)
for {
recvBuf := make([]byte, 1500)
recvLen, err := inst.conn.Read(recvBuf)
if err != nil {
logger.Error("gate tcp mq recv error: %v", err)
m.gateTcpMqEventChan <- &GateTcpMqEvent{
event: EventDisconnect,
inst: inst,
}
_ = inst.conn.Close()
return
}
recvBuf = recvBuf[:recvLen]
m.gateTcpMqRecvHandleLoop(recvBuf, &dataBuf)
}
}
func (m *MessageQueue) gateTcpMqRecvHandleLoop(data []byte, dataBuf *[]byte) {
if len(*dataBuf) != 0 {
// 取出之前的缓冲区数据
data = append(*dataBuf, data...)
*dataBuf = make([]byte, 0, 1500)
}
// 长度太短
if len(data) < 4 {
logger.Debug("packet len less 4 byte")
return
}
// 消息的载荷部分长度
msgPayloadLen := binary.BigEndian.Uint32(data[0:4])
// 检查长度
packetLen := int(msgPayloadLen) + 4
haveMorePacket := false
if len(data) > packetLen {
// 有不止一个包
haveMorePacket = true
} else if len(data) < packetLen {
// 这一次没收够 放入缓冲区
*dataBuf = append(*dataBuf, data...)
return
}
m.gateTcpMqChan <- data[4 : 4+msgPayloadLen]
if haveMorePacket {
m.gateTcpMqRecvHandleLoop(data[packetLen:], dataBuf)
}
}

View File

@@ -12,6 +12,8 @@ const (
type NetMsg struct {
MsgType uint8 `msgpack:"MsgType"`
EventId uint16 `msgpack:"EventId"`
ServerType string `msgpack:"-"`
AppId string `msgpack:"-"`
Topic string `msgpack:"-"`
GameMsg *GameMsg `msgpack:"GameMsg"`
FightMsg *FightMsg `msgpack:"FightMsg"`

View File

@@ -18,6 +18,8 @@ func (m *MessageQueue) getTopic(serverType string, appId string) string {
func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(api.GATE, appId)
originServerType, originServerAppId := m.getOriginServer()
netMsg.ServerType = api.GATE
netMsg.AppId = appId
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId
m.netMsgInput <- netMsg
@@ -26,6 +28,8 @@ func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) {
func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(api.GS, appId)
originServerType, originServerAppId := m.getOriginServer()
netMsg.ServerType = api.GS
netMsg.AppId = appId
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId
m.netMsgInput <- netMsg
@@ -34,6 +38,8 @@ func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) {
func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(api.FIGHT, appId)
originServerType, originServerAppId := m.getOriginServer()
netMsg.ServerType = api.FIGHT
netMsg.AppId = appId
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId
m.netMsgInput <- netMsg
@@ -42,6 +48,8 @@ func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) {
func (m *MessageQueue) SendToPathfinding(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(api.PATHFINDING, appId)
originServerType, originServerAppId := m.getOriginServer()
netMsg.ServerType = api.PATHFINDING
netMsg.AppId = appId
netMsg.OriginServerType = originServerType
netMsg.OriginServerAppId = originServerAppId
m.netMsgInput <- netMsg