From 0dc45708d69d98a92e8bc15ba650611b1c5bd0ad Mon Sep 17 00:00:00 2001 From: huangxiaolei <1782360262@qq.com> Date: Mon, 19 Dec 2022 22:11:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=88=86=E6=88=98=E6=96=97=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/fight/application.toml | 8 + cmd/fight/main.go | 25 ++++ cmd/hk4e/fight.go | 23 +++ common/mq/nats.go | 115 ++++++++++++++ common/mq/net_msg.go | 50 +++++++ common/mq/topic.go | 31 ++++ fight/app/app.go | 50 +++++++ fight/engine/fight_engine.go | 258 ++++++++++++++++++++++++++++++++ gate/app/app.go | 12 +- gate/mq/mq.go | 98 ------------ gate/net/forward.go | 123 +++++++++------ gate/net/kcp_connect_manager.go | 41 ++--- gs/app/app.go | 11 +- gs/game/game_manager.go | 70 +++++++-- gs/game/route_manager.go | 31 ++-- gs/game/tick_manager.go | 40 ++--- gs/game/user_fight_sync.go | 2 +- gs/game/user_map.go | 2 +- gs/game/world_manager.go | 43 ++++++ gs/mq/mq.go | 86 ----------- protocol/cmd/net_msg.go | 23 --- 21 files changed, 793 insertions(+), 349 deletions(-) create mode 100644 cmd/fight/application.toml create mode 100644 cmd/fight/main.go create mode 100644 cmd/hk4e/fight.go create mode 100644 common/mq/nats.go create mode 100644 common/mq/net_msg.go create mode 100644 common/mq/topic.go create mode 100644 fight/app/app.go create mode 100644 fight/engine/fight_engine.go delete mode 100644 gate/mq/mq.go delete mode 100644 gs/mq/mq.go delete mode 100644 protocol/cmd/net_msg.go diff --git a/cmd/fight/application.toml b/cmd/fight/application.toml new file mode 100644 index 00000000..c4df3002 --- /dev/null +++ b/cmd/fight/application.toml @@ -0,0 +1,8 @@ +[logger] +level = "DEBUG" +mode = "BOTH" +track = true +max_size = 10485760 + +[mq] +nats_url = "nats://nats:4222" diff --git a/cmd/fight/main.go b/cmd/fight/main.go new file mode 100644 index 00000000..e3424363 --- /dev/null +++ b/cmd/fight/main.go @@ -0,0 +1,25 @@ +package main + +import ( + "context" + "flag" + "fmt" + _ "net/http/pprof" + "os" + + "hk4e/fight/app" +) + +var ( + config = flag.String("config", "application.toml", "config file") +) + +func main() { + flag.Parse() + // go statsviz_serve.Serve("0.0.0.0:2345") + err := app.Run(context.TODO(), *config) + if err != nil { + fmt.Println(err) + os.Exit(1) + } +} diff --git a/cmd/hk4e/fight.go b/cmd/hk4e/fight.go new file mode 100644 index 00000000..8c2198bb --- /dev/null +++ b/cmd/hk4e/fight.go @@ -0,0 +1,23 @@ +package main + +import ( + "context" + + "hk4e/fight/app" + + "github.com/spf13/cobra" +) + +// FightCmd 检查配表命令 +func FightCmd() *cobra.Command { + var cfg string + c := &cobra.Command{ + Use: "fight", + Short: "fight server", + RunE: func(cmd *cobra.Command, args []string) error { + return app.Run(context.Background(), cfg) + }, + } + c.Flags().StringVar(&cfg, "config", "application.toml", "config file") + return c +} diff --git a/common/mq/nats.go b/common/mq/nats.go new file mode 100644 index 00000000..696d5dc6 --- /dev/null +++ b/common/mq/nats.go @@ -0,0 +1,115 @@ +package mq + +import ( + "hk4e/common/config" + "hk4e/pkg/logger" + "hk4e/protocol/cmd" + + "github.com/nats-io/nats.go" + "github.com/vmihailenco/msgpack/v5" + pb "google.golang.org/protobuf/proto" +) + +type MessageQueue struct { + natsConn *nats.Conn + natsMsgChan chan *nats.Msg + netMsgInput chan *NetMsg + netMsgOutput chan *NetMsg + cmdProtoMap *cmd.CmdProtoMap +} + +func NewMessageQueue(serverType string, appId string) (r *MessageQueue) { + r = new(MessageQueue) + conn, err := nats.Connect(config.CONF.MQ.NatsUrl) + if err != nil { + logger.Error("connect nats error: %v", err) + return nil + } + r.natsConn = conn + r.natsMsgChan = make(chan *nats.Msg, 1000) + _, err = r.natsConn.ChanSubscribe(r.getTopic(serverType, appId), r.natsMsgChan) + if err != nil { + logger.Error("nats subscribe error: %v", err) + return nil + } + r.netMsgInput = make(chan *NetMsg, 1000) + r.netMsgOutput = make(chan *NetMsg, 1000) + r.cmdProtoMap = cmd.NewCmdProtoMap() + go r.recvHandler() + go r.sendHandler() + return r +} + +func (m *MessageQueue) Close() { + m.natsConn.Close() +} + +func (m *MessageQueue) GetNetMsg() chan *NetMsg { + return m.netMsgOutput +} + +func (m *MessageQueue) recvHandler() { + for { + natsMsg := <-m.natsMsgChan + // msgpack NetMsg + netMsg := new(NetMsg) + err := msgpack.Unmarshal(natsMsg.Data, netMsg) + if err != nil { + logger.Error("parse bin to net msg error: %v", err) + continue + } + switch netMsg.MsgType { + case MsgTypeGame: + gameMsg := netMsg.GameMsg + if netMsg.EventId == NormalMsg || netMsg.EventId == UserRegNotify { + // protobuf PayloadMessage + payloadMessage := m.cmdProtoMap.GetProtoObjByCmdId(gameMsg.CmdId) + if payloadMessage == nil { + logger.Error("get protobuf obj by cmd id error: %v", err) + continue + } + err = pb.Unmarshal(gameMsg.PayloadMessageData, payloadMessage) + if err != nil { + logger.Error("parse bin to payload msg error: %v", err) + continue + } + gameMsg.PayloadMessage = payloadMessage + } + case MsgTypeFight: + } + m.netMsgOutput <- netMsg + } +} + +func (m *MessageQueue) sendHandler() { + for { + netMsg := <-m.netMsgInput + switch netMsg.MsgType { + case MsgTypeGame: + gameMsg := netMsg.GameMsg + if gameMsg.PayloadMessageData == nil { + // protobuf PayloadMessage + payloadMessageData, err := pb.Marshal(gameMsg.PayloadMessage) + if err != nil { + logger.Error("parse payload msg to bin error: %v", err) + continue + } + gameMsg.PayloadMessageData = payloadMessageData + } + case MsgTypeFight: + } + // msgpack NetMsg + netMsgData, err := msgpack.Marshal(netMsg) + if err != nil { + logger.Error("parse net msg to bin error: %v", err) + continue + } + natsMsg := nats.NewMsg(netMsg.Topic) + natsMsg.Data = netMsgData + err = m.natsConn.PublishMsg(natsMsg) + if err != nil { + logger.Error("nats publish msg error: %v", err) + continue + } + } +} diff --git a/common/mq/net_msg.go b/common/mq/net_msg.go new file mode 100644 index 00000000..62789077 --- /dev/null +++ b/common/mq/net_msg.go @@ -0,0 +1,50 @@ +package mq + +import pb "google.golang.org/protobuf/proto" + +const ( + MsgTypeGame = iota + MsgTypeFight +) + +type NetMsg struct { + MsgType uint8 `msgpack:"MsgType"` + EventId uint16 `msgpack:"EventId"` + Topic string `msgpack:"-"` + GameMsg *GameMsg `msgpack:"GameMsg"` + FightMsg *FightMsg `msgpack:"FightMsg"` +} + +const ( + NormalMsg = iota + UserRegNotify + UserLoginNotify + UserOfflineNotify + ClientRttNotify + ClientTimeNotify +) + +type GameMsg struct { + UserId uint32 `msgpack:"UserId"` + CmdId uint16 `msgpack:"CmdId"` + ClientSeq uint32 `msgpack:"ClientSeq"` + ClientRtt uint32 `msgpack:"ClientRtt"` + ClientTime uint32 `msgpack:"ClientTime"` + PayloadMessage pb.Message `msgpack:"-"` + PayloadMessageData []byte `msgpack:"PayloadMessageData"` +} + +const ( + AddFightRoutine = iota + DelFightRoutine + FightRoutineAddEntity + FightRoutineDelEntity +) + +type FightMsg struct { + FightRoutineId uint32 `msgpack:"FightRoutineId"` + EntityId uint32 `msgpack:"EntityId"` + FightPropMap map[uint32]float32 `msgpack:"FightPropMap"` + Uid uint32 `msgpack:"Uid"` + AvatarGuid uint64 `msgpack:"AvatarGuid"` +} diff --git a/common/mq/topic.go b/common/mq/topic.go new file mode 100644 index 00000000..37e349d8 --- /dev/null +++ b/common/mq/topic.go @@ -0,0 +1,31 @@ +package mq + +import ( + "strings" +) + +const ( + GATE = "GATE_${APPID}_HK4E" + GS = "GS_${APPID}_HK4E" + FIGHT = "FIGHT_${APPID}_HK4E" +) + +func (m *MessageQueue) getTopic(serverType string, appId string) string { + topic := strings.ReplaceAll(serverType, "${APPID}", appId) + return topic +} + +func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) { + netMsg.Topic = m.getTopic(GATE, appId) + m.netMsgInput <- netMsg +} + +func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) { + netMsg.Topic = m.getTopic(GS, appId) + m.netMsgInput <- netMsg +} + +func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) { + netMsg.Topic = m.getTopic(FIGHT, appId) + m.netMsgInput <- netMsg +} diff --git a/fight/app/app.go b/fight/app/app.go new file mode 100644 index 00000000..9c392ca3 --- /dev/null +++ b/fight/app/app.go @@ -0,0 +1,50 @@ +package app + +import ( + "context" + _ "net/http/pprof" + "os" + "os/signal" + "syscall" + "time" + + "hk4e/common/config" + "hk4e/common/mq" + "hk4e/fight/engine" + "hk4e/gs/constant" + "hk4e/pkg/logger" +) + +func Run(ctx context.Context, configFile string) error { + config.InitConfig(configFile) + + logger.InitLogger("fight") + logger.Warn("fight start") + + constant.InitConstant() + + messageQueue := mq.NewMessageQueue(mq.FIGHT, "1") + defer messageQueue.Close() + + _ = engine.NewFightEngine(messageQueue) + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) + for { + select { + case <-ctx.Done(): + return nil + case s := <-c: + logger.Warn("get a signal %s", s.String()) + switch s { + case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: + logger.Warn("fight exit") + time.Sleep(time.Second) + return nil + case syscall.SIGHUP: + default: + return nil + } + } + } +} diff --git a/fight/engine/fight_engine.go b/fight/engine/fight_engine.go new file mode 100644 index 00000000..4c0c7aa6 --- /dev/null +++ b/fight/engine/fight_engine.go @@ -0,0 +1,258 @@ +package engine + +import ( + "time" + + "hk4e/common/mq" + "hk4e/gs/constant" + "hk4e/pkg/logger" + "hk4e/protocol/cmd" + "hk4e/protocol/proto" + + pb "google.golang.org/protobuf/proto" +) + +type FightEngine struct { + messageQueue *mq.MessageQueue +} + +func NewFightEngine(messageQueue *mq.MessageQueue) (r *FightEngine) { + r = new(FightEngine) + r.messageQueue = messageQueue + go r.fightHandle() + return r +} + +func (f *FightEngine) fightHandle() { + fightRoutineMsgChanMap := make(map[uint32]chan *mq.NetMsg) + fightRoutineCloseChanMap := make(map[uint32]chan bool) + userIdFightRoutineIdMap := make(map[uint32]uint32) + for { + netMsg := <-f.messageQueue.GetNetMsg() + // logger.Debug("recv net msg, netMsg: %v", netMsg) + switch netMsg.MsgType { + case mq.MsgTypeGame: + gameMsg := netMsg.GameMsg + if netMsg.EventId != mq.NormalMsg { + continue + } + // logger.Debug("recv game msg, gameMsg: %v", gameMsg) + fightRoutineId := userIdFightRoutineIdMap[gameMsg.UserId] + fightRoutineMsgChan := fightRoutineMsgChanMap[fightRoutineId] + fightRoutineMsgChan <- netMsg + case mq.MsgTypeFight: + fightMsg := netMsg.FightMsg + logger.Debug("recv fight msg, fightMsg: %v", fightMsg) + switch netMsg.EventId { + case mq.AddFightRoutine: + fightRoutineMsgChan := make(chan *mq.NetMsg, 1000) + fightRoutineMsgChanMap[fightMsg.FightRoutineId] = fightRoutineMsgChan + fightRoutineCloseChan := make(chan bool, 1) + fightRoutineCloseChanMap[fightMsg.FightRoutineId] = fightRoutineCloseChan + go runFightRoutine(fightMsg.FightRoutineId, fightRoutineMsgChan, fightRoutineCloseChan, f.messageQueue) + case mq.DelFightRoutine: + fightRoutineCloseChan := fightRoutineCloseChanMap[fightMsg.FightRoutineId] + fightRoutineCloseChan <- true + case mq.FightRoutineAddEntity: + if fightMsg.Uid != 0 { + userIdFightRoutineIdMap[fightMsg.Uid] = fightMsg.FightRoutineId + } + fightRoutineMsgChan := fightRoutineMsgChanMap[fightMsg.FightRoutineId] + fightRoutineMsgChan <- netMsg + case mq.FightRoutineDelEntity: + if fightMsg.Uid != 0 { + delete(userIdFightRoutineIdMap, fightMsg.Uid) + } + fightRoutineMsgChan := fightRoutineMsgChanMap[fightMsg.FightRoutineId] + fightRoutineMsgChan <- netMsg + } + } + } +} + +// SendMsg 发送消息给客户端 +func SendMsg(messageQueue *mq.MessageQueue, cmdId uint16, userId uint32, payloadMsg pb.Message) { + if userId < 100000000 || payloadMsg == nil { + return + } + gameMsg := new(mq.GameMsg) + gameMsg.UserId = userId + gameMsg.CmdId = cmdId + gameMsg.ClientSeq = 0 + // 在这里直接序列化成二进制数据 防止发送的消息内包含各种游戏数据指针 而造成并发读写的问题 + payloadMessageData, err := pb.Marshal(payloadMsg) + if err != nil { + logger.Error("parse payload msg to bin error: %v", err) + return + } + gameMsg.PayloadMessageData = payloadMessageData + messageQueue.SendToGate("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.NormalMsg, + GameMsg: gameMsg, + }) +} + +type Entity struct { + entityId uint32 + fightPropMap map[uint32]float32 + uid uint32 + avatarGuid uint64 +} + +// FightRoutine 战局例程 +type FightRoutine struct { + messageQueue *mq.MessageQueue + entityMap map[uint32]*Entity + combatInvokeEntryList []*proto.CombatInvokeEntry + tickCount uint64 +} + +func runFightRoutine(fightRoutineId uint32, fightRoutineMsgChan chan *mq.NetMsg, fightRoutineCloseChan chan bool, messageQueue *mq.MessageQueue) { + f := new(FightRoutine) + f.messageQueue = messageQueue + f.entityMap = make(map[uint32]*Entity) + f.combatInvokeEntryList = make([]*proto.CombatInvokeEntry, 0) + f.tickCount = 0 + logger.Debug("create fight routine, fightRoutineId: %v", fightRoutineId) + ticker := time.NewTicker(time.Millisecond * 10) + for { + select { + case netMsg := <-fightRoutineMsgChan: + switch netMsg.MsgType { + case mq.MsgTypeGame: + gameMsg := netMsg.GameMsg + f.attackHandle(gameMsg) + case mq.MsgTypeFight: + fightMsg := netMsg.FightMsg + switch netMsg.EventId { + case mq.FightRoutineAddEntity: + f.entityMap[fightMsg.EntityId] = &Entity{ + entityId: fightMsg.EntityId, + fightPropMap: fightMsg.FightPropMap, + uid: fightMsg.Uid, + avatarGuid: fightMsg.AvatarGuid, + } + case mq.FightRoutineDelEntity: + delete(f.entityMap, fightMsg.EntityId) + } + } + case <-ticker.C: + f.onTick() + case <-fightRoutineCloseChan: + logger.Debug("destroy fight routine, fightRoutineId: %v", fightRoutineId) + return + } + } +} + +func (f *FightRoutine) onTick() { + f.tickCount++ + now := time.Now().UnixMilli() + if f.tickCount%5 == 0 { + f.onTick50MilliSecond(now) + } + if f.tickCount%100 == 0 { + f.onTickSecond(now) + } +} + +func (f *FightRoutine) onTick50MilliSecond(now int64) { + if len(f.combatInvokeEntryList) > 0 { + combatInvocationsNotifyAll := new(proto.CombatInvocationsNotify) + combatInvocationsNotifyAll.InvokeList = f.combatInvokeEntryList + for _, uid := range f.getAllPlayer(f.entityMap) { + SendMsg(f.messageQueue, cmd.CombatInvocationsNotify, uid, combatInvocationsNotifyAll) + } + f.combatInvokeEntryList = make([]*proto.CombatInvokeEntry, 0) + } +} + +func (f *FightRoutine) onTickSecond(now int64) { + // 改面板 + for _, entity := range f.entityMap { + if entity.uid == 0 { + continue + } + entity.fightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_ATTACK)] = 1000000 + entity.fightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CRITICAL)] = 1.0 + avatarFightPropNotify := &proto.AvatarFightPropNotify{ + AvatarGuid: entity.avatarGuid, + FightPropMap: entity.fightPropMap, + } + SendMsg(f.messageQueue, cmd.AvatarFightPropNotify, entity.uid, avatarFightPropNotify) + } +} + +func (f *FightRoutine) attackHandle(gameMsg *mq.GameMsg) { + _ = gameMsg.UserId + cmdId := gameMsg.CmdId + _ = gameMsg.ClientSeq + payloadMsg := gameMsg.PayloadMessage + + switch cmdId { + case cmd.CombatInvocationsNotify: + req := payloadMsg.(*proto.CombatInvocationsNotify) + for _, entry := range req.InvokeList { + if entry.ForwardType != proto.ForwardType_FORWARD_TYPE_TO_ALL { + continue + } + if entry.ArgumentType != proto.CombatTypeArgument_COMBAT_TYPE_ARGUMENT_EVT_BEING_HIT { + continue + } + hitInfo := new(proto.EvtBeingHitInfo) + err := pb.Unmarshal(entry.CombatData, hitInfo) + if err != nil { + logger.Error("parse combat invocations entity hit info error: %v", err) + continue + } + attackResult := hitInfo.AttackResult + // logger.Debug("run attack handler, attackResult: %v", attackResult) + target := f.entityMap[attackResult.DefenseId] + if target == nil { + logger.Error("could not found target, defense id: %v", attackResult.DefenseId) + continue + } + attackResult.Damage *= 100 + damage := attackResult.Damage + attackerId := attackResult.AttackerId + _ = attackerId + currHp := float32(0) + if target.fightPropMap != nil { + currHp = target.fightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_HP)] + currHp -= damage + if currHp < 0 { + currHp = 0 + } + target.fightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_HP)] = currHp + } + entityFightPropUpdateNotify := new(proto.EntityFightPropUpdateNotify) + entityFightPropUpdateNotify.EntityId = target.entityId + entityFightPropUpdateNotify.FightPropMap = make(map[uint32]float32) + entityFightPropUpdateNotify.FightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_HP)] = currHp + for _, uid := range f.getAllPlayer(f.entityMap) { + SendMsg(f.messageQueue, cmd.EntityFightPropUpdateNotify, uid, entityFightPropUpdateNotify) + } + combatData, err := pb.Marshal(hitInfo) + if err != nil { + logger.Error("create combat invocations entity hit info error: %v", err) + } + entry.CombatData = combatData + f.combatInvokeEntryList = append(f.combatInvokeEntryList, entry) + } + } +} + +func (f *FightRoutine) getAllPlayer(entityMap map[uint32]*Entity) []uint32 { + uidMap := make(map[uint32]bool) + for _, entity := range entityMap { + if entity.uid != 0 { + uidMap[entity.uid] = true + } + } + uidList := make([]uint32, 0) + for uid := range uidMap { + uidList = append(uidList, uid) + } + return uidList +} diff --git a/gate/app/app.go b/gate/app/app.go index c8711c77..fbb818c8 100644 --- a/gate/app/app.go +++ b/gate/app/app.go @@ -9,10 +9,9 @@ import ( "time" "hk4e/common/config" - "hk4e/gate/mq" + "hk4e/common/mq" "hk4e/gate/net" "hk4e/pkg/logger" - "hk4e/protocol/cmd" ) func Run(ctx context.Context, configFile string) error { @@ -21,10 +20,9 @@ func Run(ctx context.Context, configFile string) error { logger.InitLogger("gate") logger.Warn("gate start") - netMsgInput := make(chan *cmd.NetMsg, 10000) - netMsgOutput := make(chan *cmd.NetMsg, 10000) + messageQueue := mq.NewMessageQueue(mq.GATE, "1") - connectManager := net.NewKcpConnectManager(netMsgInput, netMsgOutput) + connectManager := net.NewKcpConnectManager(messageQueue) connectManager.Start() go func() { @@ -34,10 +32,6 @@ func Run(ctx context.Context, configFile string) error { } }() - messageQueue := mq.NewMessageQueue(netMsgInput, netMsgOutput) - messageQueue.Start() - defer messageQueue.Close() - c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) for { diff --git a/gate/mq/mq.go b/gate/mq/mq.go deleted file mode 100644 index d8c6fc64..00000000 --- a/gate/mq/mq.go +++ /dev/null @@ -1,98 +0,0 @@ -package mq - -import ( - "hk4e/common/config" - "hk4e/pkg/logger" - "hk4e/protocol/cmd" - - "github.com/nats-io/nats.go" - "github.com/vmihailenco/msgpack/v5" - pb "google.golang.org/protobuf/proto" -) - -type MessageQueue struct { - natsConn *nats.Conn - natsMsgChan chan *nats.Msg - netMsgInput chan *cmd.NetMsg - netMsgOutput chan *cmd.NetMsg - cmdProtoMap *cmd.CmdProtoMap -} - -func NewMessageQueue(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *MessageQueue) { - r = new(MessageQueue) - conn, err := nats.Connect(config.CONF.MQ.NatsUrl) - if err != nil { - logger.Error("connect nats error: %v", err) - return nil - } - r.natsConn = conn - r.natsMsgChan = make(chan *nats.Msg, 10000) - _, err = r.natsConn.ChanSubscribe("GATE_CMD_HK4E", r.natsMsgChan) - if err != nil { - logger.Error("nats subscribe error: %v", err) - return nil - } - r.netMsgInput = netMsgInput - r.netMsgOutput = netMsgOutput - r.cmdProtoMap = cmd.NewCmdProtoMap() - return r -} - -func (m *MessageQueue) Start() { - go m.startRecvHandler() - go m.startSendHandler() -} - -func (m *MessageQueue) Close() { - m.natsConn.Close() -} - -func (m *MessageQueue) startRecvHandler() { - for { - natsMsg := <-m.natsMsgChan - // msgpack NetMsg - netMsg := new(cmd.NetMsg) - err := msgpack.Unmarshal(natsMsg.Data, netMsg) - if err != nil { - logger.Error("parse bin to net msg error: %v", err) - continue - } - if netMsg.EventId == cmd.NormalMsg { - // protobuf PayloadMessage - payloadMessage := m.cmdProtoMap.GetProtoObjByCmdId(netMsg.CmdId) - err = pb.Unmarshal(netMsg.PayloadMessageData, payloadMessage) - if err != nil { - logger.Error("parse bin to payload msg error: %v", err) - continue - } - netMsg.PayloadMessage = payloadMessage - } - m.netMsgOutput <- netMsg - } -} - -func (m *MessageQueue) startSendHandler() { - for { - netMsg := <-m.netMsgInput - // protobuf PayloadMessage - payloadMessageData, err := pb.Marshal(netMsg.PayloadMessage) - if err != nil { - logger.Error("parse payload msg to bin error: %v", err) - continue - } - netMsg.PayloadMessageData = payloadMessageData - // msgpack NetMsg - netMsgData, err := msgpack.Marshal(netMsg) - if err != nil { - logger.Error("parse net msg to bin error: %v", err) - continue - } - natsMsg := nats.NewMsg("GS_CMD_HK4E") - natsMsg.Data = netMsgData - err = m.natsConn.PublishMsg(natsMsg) - if err != nil { - logger.Error("nats publish msg error: %v", err) - continue - } - } -} diff --git a/gate/net/forward.go b/gate/net/forward.go index 7d1227b6..160addad 100644 --- a/gate/net/forward.go +++ b/gate/net/forward.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "hk4e/common/mq" "hk4e/dispatch/controller" "hk4e/gate/kcp" "hk4e/pkg/endec" @@ -73,24 +74,30 @@ func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session) rsp.PayloadMessage = playerLoginRsp k.localMsgOutput <- rsp // 登录成功 通知GS初始化相关数据 - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.UserLoginNotify - netMsg.ClientSeq = headMeta.seq - k.netMsgInput <- netMsg - logger.Info("send to gs user login ok, ConvId: %v, UserId: %v", protoMsg.ConvId, netMsg.UserId) + gameMsg := new(mq.GameMsg) + gameMsg.UserId = userId + gameMsg.ClientSeq = headMeta.seq + k.messageQueue.SendToGs("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.UserLoginNotify, + GameMsg: gameMsg, + }) + logger.Info("send to gs user login ok, ConvId: %v, UserId: %v", protoMsg.ConvId, gameMsg.UserId) case cmd.SetPlayerBornDataReq: // 玩家注册请求 if connState != ConnAlive { return } - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.UserRegNotify - netMsg.CmdId = cmd.SetPlayerBornDataReq - netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId - netMsg.PayloadMessage = protoMsg.PayloadMessage - k.netMsgInput <- netMsg + gameMsg := new(mq.GameMsg) + gameMsg.UserId = userId + gameMsg.CmdId = cmd.SetPlayerBornDataReq + gameMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId + gameMsg.PayloadMessage = protoMsg.PayloadMessage + k.messageQueue.SendToGs("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.UserRegNotify, + GameMsg: gameMsg, + }) case cmd.PlayerForceExitReq: // 玩家退出游戏请求 if connState != ConnAlive { @@ -119,34 +126,55 @@ func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session) rsp.PayloadMessage = pingRsp k.localMsgOutput <- rsp // 通知GS玩家客户端的本地时钟 - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.ClientTimeNotify - netMsg.ClientTime = pingReq.ClientTime - k.netMsgInput <- netMsg + gameMsg := new(mq.GameMsg) + gameMsg.UserId = userId + gameMsg.ClientTime = pingReq.ClientTime + k.messageQueue.SendToGs("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.ClientTimeNotify, + GameMsg: gameMsg, + }) // RTT logger.Debug("convId: %v, RTO: %v, SRTT: %v, RTTVar: %v", protoMsg.ConvId, session.conn.GetRTO(), session.conn.GetSRTT(), session.conn.GetSRTTVar()) - // 客户端往返时延通知 rtt := session.conn.GetSRTT() // 通知GS玩家客户端往返时延 - netMsg = new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.ClientRttNotify - netMsg.ClientRtt = uint32(rtt) - k.netMsgInput <- netMsg + gameMsg = new(mq.GameMsg) + gameMsg.UserId = userId + gameMsg.ClientRtt = uint32(rtt) + k.messageQueue.SendToGs("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.ClientRttNotify, + GameMsg: gameMsg, + }) default: - // 转发到GS - // 未登录禁止访问GS + // 未登录禁止访问 if connState != ConnAlive { return } - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.NormalMsg - netMsg.CmdId = protoMsg.CmdId - netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId - netMsg.PayloadMessage = protoMsg.PayloadMessage - k.netMsgInput <- netMsg + // 转发到FIGHT + if protoMsg.CmdId == cmd.CombatInvocationsNotify { + gameMsg := new(mq.GameMsg) + gameMsg.UserId = userId + gameMsg.CmdId = protoMsg.CmdId + gameMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId + gameMsg.PayloadMessage = protoMsg.PayloadMessage + k.messageQueue.SendToFight("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.NormalMsg, + GameMsg: gameMsg, + }) + } + // 转发到GS + gameMsg := new(mq.GameMsg) + gameMsg.UserId = userId + gameMsg.CmdId = protoMsg.CmdId + gameMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId + gameMsg.PayloadMessage = protoMsg.PayloadMessage + k.messageQueue.SendToGs("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.NormalMsg, + GameMsg: gameMsg, + }) } } @@ -179,22 +207,27 @@ func (k *KcpConnectManager) sendMsgHandle() { close(session.kcpRawSendChan) case protoMsg := <-k.localMsgOutput: sendToClientFn(protoMsg) - case netMsg := <-k.netMsgOutput: - convId, exist := userIdConvMap[netMsg.UserId] + case netMsg := <-k.messageQueue.GetNetMsg(): + if netMsg.MsgType != mq.MsgTypeGame { + logger.Error("recv unknown msg type from game server, msg type: %v", netMsg.MsgType) + continue + } + if netMsg.EventId != mq.NormalMsg { + logger.Error("recv unknown event from game server, event id: %v", netMsg.EventId) + continue + } + gameMsg := netMsg.GameMsg + convId, exist := userIdConvMap[gameMsg.UserId] if !exist { logger.Error("can not find convId by userId") continue } - if netMsg.EventId == cmd.NormalMsg { - protoMsg := new(ProtoMsg) - protoMsg.ConvId = convId - protoMsg.CmdId = netMsg.CmdId - protoMsg.HeadMessage = k.getHeadMsg(netMsg.ClientSeq) - protoMsg.PayloadMessage = netMsg.PayloadMessage - sendToClientFn(protoMsg) - } else { - logger.Error("recv unknown event from game server, event id: %v", netMsg.EventId) - } + protoMsg := new(ProtoMsg) + protoMsg.ConvId = convId + protoMsg.CmdId = gameMsg.CmdId + protoMsg.HeadMessage = k.getHeadMsg(gameMsg.ClientSeq) + protoMsg.PayloadMessage = gameMsg.PayloadMessage + sendToClientFn(protoMsg) } } } diff --git a/gate/net/kcp_connect_manager.go b/gate/net/kcp_connect_manager.go index dc963cfa..d793b8df 100644 --- a/gate/net/kcp_connect_manager.go +++ b/gate/net/kcp_connect_manager.go @@ -8,6 +8,7 @@ import ( "time" "hk4e/common/config" + "hk4e/common/mq" "hk4e/common/region" "hk4e/dispatch/controller" "hk4e/gate/kcp" @@ -18,6 +19,8 @@ import ( "hk4e/protocol/proto" ) +const PacketFreqLimit = 1000 + type KcpConnectManager struct { openState bool sessionConvIdMap map[uint64]*Session @@ -26,8 +29,7 @@ type KcpConnectManager struct { kcpEventInput chan *KcpEvent kcpEventOutput chan *KcpEvent cmdProtoMap *cmd.CmdProtoMap - netMsgInput chan *cmd.NetMsg - netMsgOutput chan *cmd.NetMsg + messageQueue *mq.MessageQueue localMsgOutput chan *ProtoMsg createSessionChan chan *Session destroySessionChan chan *Session @@ -38,7 +40,7 @@ type KcpConnectManager struct { encRsaKeyMap map[string][]byte } -func NewKcpConnectManager(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *KcpConnectManager) { +func NewKcpConnectManager(messageQueue *mq.MessageQueue) (r *KcpConnectManager) { r = new(KcpConnectManager) r.openState = true r.sessionConvIdMap = make(map[uint64]*Session) @@ -46,8 +48,7 @@ func NewKcpConnectManager(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.N r.kcpEventInput = make(chan *KcpEvent, 1000) r.kcpEventOutput = make(chan *KcpEvent, 1000) r.cmdProtoMap = cmd.NewCmdProtoMap() - r.netMsgInput = netMsgInput - r.netMsgOutput = netMsgOutput + r.messageQueue = messageQueue r.localMsgOutput = make(chan *ProtoMsg, 1000) r.createSessionChan = make(chan *Session, 1000) r.destroySessionChan = make(chan *Session, 1000) @@ -245,7 +246,7 @@ func (k *KcpConnectManager) recvHandle(session *Session) { pktFreqLimitCounter++ now := time.Now().UnixNano() if now-pktFreqLimitTimer > int64(time.Second) { - if pktFreqLimitCounter > 100 { + if pktFreqLimitCounter > PacketFreqLimit { logger.Error("exit recv loop, client packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter) k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh) break @@ -271,8 +272,6 @@ func (k *KcpConnectManager) sendHandle(session *Session) { // 发送 conn := session.conn convId := conn.GetConv() - pktFreqLimitCounter := 0 - pktFreqLimitTimer := time.Now().UnixNano() for { protoMsg, ok := <-session.kcpRawSendChan if !ok { @@ -293,19 +292,6 @@ func (k *KcpConnectManager) sendHandle(session *Session) { k.closeKcpConn(session, kcp.EnetServerKick) break } - // 发包频率限制 - pktFreqLimitCounter++ - now := time.Now().UnixNano() - if now-pktFreqLimitTimer > int64(time.Second) { - if pktFreqLimitCounter > 100 { - logger.Error("exit send loop, server packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter) - k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh) - break - } else { - pktFreqLimitCounter = 0 - } - pktFreqLimitTimer = now - } } } @@ -332,11 +318,14 @@ func (k *KcpConnectManager) closeKcpConn(session *Session, enetType uint32) { EventId: KcpConnCloseNotify, } // 通知GS玩家下线 - netMsg := new(cmd.NetMsg) - netMsg.UserId = session.userId - netMsg.EventId = cmd.UserOfflineNotify - k.netMsgInput <- netMsg - logger.Info("send to gs user offline, ConvId: %v, UserId: %v", convId, netMsg.UserId) + gameMsg := new(mq.GameMsg) + gameMsg.UserId = session.userId + k.messageQueue.SendToGs("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.UserOfflineNotify, + GameMsg: gameMsg, + }) + logger.Info("send to gs user offline, ConvId: %v, UserId: %v", convId, gameMsg.UserId) k.destroySessionChan <- session } diff --git a/gs/app/app.go b/gs/app/app.go index 180c1447..db26750a 100644 --- a/gs/app/app.go +++ b/gs/app/app.go @@ -9,15 +9,14 @@ import ( "time" "hk4e/common/config" + "hk4e/common/mq" "hk4e/gdconf" gdc "hk4e/gs/config" "hk4e/gs/constant" "hk4e/gs/dao" "hk4e/gs/game" - "hk4e/gs/mq" "hk4e/gs/service" "hk4e/pkg/logger" - "hk4e/protocol/cmd" "github.com/nats-io/nats.go" ) @@ -46,14 +45,10 @@ func Run(ctx context.Context, configFile string) error { } defer db.CloseDao() - netMsgInput := make(chan *cmd.NetMsg, 10000) - netMsgOutput := make(chan *cmd.NetMsg, 10000) - - messageQueue := mq.NewMessageQueue(conn, netMsgInput, netMsgOutput) - messageQueue.Start() + messageQueue := mq.NewMessageQueue(mq.GS, "1") defer messageQueue.Close() - gameManager := game.NewGameManager(db, netMsgInput, netMsgOutput) + gameManager := game.NewGameManager(db, messageQueue) gameManager.Start() defer gameManager.Stop() diff --git a/gs/game/game_manager.go b/gs/game/game_manager.go index c6ecb8e2..429c0e53 100644 --- a/gs/game/game_manager.go +++ b/gs/game/game_manager.go @@ -3,6 +3,7 @@ package game import ( "time" + "hk4e/common/mq" "hk4e/gs/dao" "hk4e/gs/model" "hk4e/pkg/alg" @@ -24,16 +25,14 @@ var COMMAND_MANAGER *CommandManager = nil type GameManager struct { dao *dao.Dao - netMsgInput chan *cmd.NetMsg - netMsgOutput chan *cmd.NetMsg + messageQueue *mq.MessageQueue snowflake *alg.SnowflakeWorker } -func NewGameManager(dao *dao.Dao, netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *GameManager) { +func NewGameManager(dao *dao.Dao, messageQueue *mq.MessageQueue) (r *GameManager) { r = new(GameManager) r.dao = dao - r.netMsgInput = netMsgInput - r.netMsgOutput = netMsgOutput + r.messageQueue = messageQueue r.snowflake = alg.NewSnowflakeWorker(1) GAME_MANAGER = r LOCAL_EVENT_MANAGER = NewLocalEventManager() @@ -49,20 +48,62 @@ func (g *GameManager) Start() { ROUTE_MANAGER.InitRoute() USER_MANAGER.StartAutoSaveUser() go func() { + intervalTime := time.Second.Nanoseconds() * 60 + lastTime := time.Now().UnixNano() + routeCost := int64(0) + tickCost := int64(0) + localEventCost := int64(0) + commandCost := int64(0) for { + now := time.Now().UnixNano() + if now-lastTime > intervalTime { + routeCost /= 1e6 + tickCost /= 1e6 + localEventCost /= 1e6 + commandCost /= 1e6 + logger.Info("[GAME MAIN LOOP] cpu time cost detail, routeCost: %vms, tickCost: %vms, localEventCost: %vms, commandCost: %vms", + routeCost, tickCost, localEventCost, commandCost) + totalCost := routeCost + tickCost + localEventCost + commandCost + logger.Info("[GAME MAIN LOOP] cpu time cost percent, routeCost: %v%%, tickCost: %v%%, localEventCost: %v%%, commandCost: %v%%", + float32(routeCost)/float32(totalCost)*100.0, + float32(tickCost)/float32(totalCost)*100.0, + float32(localEventCost)/float32(totalCost)*100.0, + float32(commandCost)/float32(totalCost)*100.0) + logger.Info("[GAME MAIN LOOP] total cpu time cost detail, totalCost: %vms", + totalCost) + logger.Info("[GAME MAIN LOOP] total cpu time cost percent, totalCost: %v%%", + float32(totalCost)/float32(intervalTime/1e6)*100.0) + lastTime = now + routeCost = 0 + tickCost = 0 + localEventCost = 0 + commandCost = 0 + } select { - case netMsg := <-g.netMsgOutput: + case netMsg := <-g.messageQueue.GetNetMsg(): // 接收客户端消息 + start := time.Now().UnixNano() ROUTE_MANAGER.RouteHandle(netMsg) + end := time.Now().UnixNano() + routeCost += end - start case <-TICK_MANAGER.ticker.C: // 游戏服务器定时帧 + start := time.Now().UnixNano() TICK_MANAGER.OnGameServerTick() + end := time.Now().UnixNano() + tickCost += end - start case localEvent := <-LOCAL_EVENT_MANAGER.localEventChan: // 处理本地事件 + start := time.Now().UnixNano() LOCAL_EVENT_MANAGER.LocalEventHandle(localEvent) + end := time.Now().UnixNano() + localEventCost += end - start case command := <-COMMAND_MANAGER.commandTextInput: // 处理传入的命令 (普通玩家 GM命令) + start := time.Now().UnixNano() COMMAND_MANAGER.HandleCommand(command) + end := time.Now().UnixNano() + commandCost += end - start } } }() @@ -82,19 +123,22 @@ func (g *GameManager) SendMsg(cmdId uint16, userId uint32, clientSeq uint32, pay if userId < 100000000 || payloadMsg == nil { return } - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.NormalMsg - netMsg.CmdId = cmdId - netMsg.ClientSeq = clientSeq + gameMsg := new(mq.GameMsg) + gameMsg.UserId = userId + gameMsg.CmdId = cmdId + gameMsg.ClientSeq = clientSeq // 在这里直接序列化成二进制数据 防止发送的消息内包含各种游戏数据指针 而造成并发读写的问题 payloadMessageData, err := pb.Marshal(payloadMsg) if err != nil { logger.Error("parse payload msg to bin error: %v", err) return } - netMsg.PayloadMessageData = payloadMessageData - g.netMsgInput <- netMsg + gameMsg.PayloadMessageData = payloadMessageData + g.messageQueue.SendToGate("1", &mq.NetMsg{ + MsgType: mq.MsgTypeGame, + EventId: mq.NormalMsg, + GameMsg: gameMsg, + }) } // CommonRetError 通用返回错误码 diff --git a/gs/game/route_manager.go b/gs/game/route_manager.go index cdb5c770..ba8ef0ac 100644 --- a/gs/game/route_manager.go +++ b/gs/game/route_manager.go @@ -1,6 +1,7 @@ package game import ( + "hk4e/common/mq" "hk4e/gs/model" "hk4e/pkg/logger" "hk4e/protocol/cmd" @@ -116,19 +117,23 @@ func (r *RouteManager) InitRoute() { r.registerRouter(cmd.VehicleInteractReq, GAME_MANAGER.VehicleInteractReq) } -func (r *RouteManager) RouteHandle(netMsg *cmd.NetMsg) { +func (r *RouteManager) RouteHandle(netMsg *mq.NetMsg) { + if netMsg.MsgType != mq.MsgTypeGame { + return + } + gameMsg := netMsg.GameMsg switch netMsg.EventId { - case cmd.NormalMsg: - r.doRoute(netMsg.CmdId, netMsg.UserId, netMsg.ClientSeq, netMsg.PayloadMessage) - case cmd.UserRegNotify: - GAME_MANAGER.OnReg(netMsg.UserId, netMsg.ClientSeq, netMsg.PayloadMessage) - case cmd.UserLoginNotify: - GAME_MANAGER.OnLogin(netMsg.UserId, netMsg.ClientSeq) - case cmd.UserOfflineNotify: - GAME_MANAGER.OnUserOffline(netMsg.UserId) - case cmd.ClientRttNotify: - GAME_MANAGER.ClientRttNotify(netMsg.UserId, netMsg.ClientRtt) - case cmd.ClientTimeNotify: - GAME_MANAGER.ClientTimeNotify(netMsg.UserId, netMsg.ClientTime) + case mq.NormalMsg: + r.doRoute(gameMsg.CmdId, gameMsg.UserId, gameMsg.ClientSeq, gameMsg.PayloadMessage) + case mq.UserRegNotify: + GAME_MANAGER.OnReg(gameMsg.UserId, gameMsg.ClientSeq, gameMsg.PayloadMessage) + case mq.UserLoginNotify: + GAME_MANAGER.OnLogin(gameMsg.UserId, gameMsg.ClientSeq) + case mq.UserOfflineNotify: + GAME_MANAGER.OnUserOffline(gameMsg.UserId) + case mq.ClientRttNotify: + GAME_MANAGER.ClientRttNotify(gameMsg.UserId, gameMsg.ClientRtt) + case mq.ClientTimeNotify: + GAME_MANAGER.ClientTimeNotify(gameMsg.UserId, gameMsg.ClientTime) } } diff --git a/gs/game/tick_manager.go b/gs/game/tick_manager.go index 3c9a93ff..e79bf63e 100644 --- a/gs/game/tick_manager.go +++ b/gs/game/tick_manager.go @@ -132,32 +132,6 @@ func (t *TickManager) onTick10Second(now int64) { GAME_MANAGER.SendMsg(cmd.PlayerTimeNotify, player.PlayerID, 0, playerTimeNotify) } } - if !world.IsBigWorld() && (world.multiplayer || !world.owner.Pause) { - // 刷怪 - scene := world.GetSceneById(3) - monsterEntityCount := 0 - for _, entity := range scene.entityMap { - if entity.entityType == uint32(proto.ProtEntityType_PROT_ENTITY_TYPE_MONSTER) { - monsterEntityCount++ - } - } - if monsterEntityCount < 30 { - monsterEntityId := t.createMonster(scene) - bigWorldOwner := USER_MANAGER.GetOnlineUser(1) - GAME_MANAGER.AddSceneEntityNotify(bigWorldOwner, proto.VisionType_VISION_TYPE_BORN, []uint32{monsterEntityId}, true, false) - } - } - for _, player := range world.playerMap { - if world.multiplayer || !world.owner.Pause { - // 改面板 - for _, worldAvatar := range world.GetPlayerWorldAvatarList(player) { - avatar := player.AvatarMap[worldAvatar.avatarId] - avatar.FightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_ATTACK)] = 1000000 - avatar.FightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CRITICAL)] = 1.0 - GAME_MANAGER.UpdateUserAvatarFightProp(player.PlayerID, worldAvatar.avatarId) - } - } - } } } @@ -236,6 +210,20 @@ func (t *TickManager) onTickSecond(now int64) { } GAME_MANAGER.SendMsg(cmd.WorldPlayerRTTNotify, player.PlayerID, 0, worldPlayerRTTNotify) } + if !world.IsBigWorld() && world.owner.SceneLoadState == model.SceneEnterDone { + // 刷怪 + scene := world.GetSceneById(3) + monsterEntityCount := 0 + for _, entity := range scene.entityMap { + if entity.entityType == uint32(proto.ProtEntityType_PROT_ENTITY_TYPE_MONSTER) { + monsterEntityCount++ + } + } + if monsterEntityCount < 30 { + monsterEntityId := t.createMonster(scene) + GAME_MANAGER.AddSceneEntityNotify(world.owner, proto.VisionType_VISION_TYPE_BORN, []uint32{monsterEntityId}, true, false) + } + } } } diff --git a/gs/game/user_fight_sync.go b/gs/game/user_fight_sync.go index 54a5a0a5..4b8ef5e8 100644 --- a/gs/game/user_fight_sync.go +++ b/gs/game/user_fight_sync.go @@ -94,7 +94,7 @@ func (g *GameManager) CombatInvocationsNotify(player *model.Player, payloadMsg p for _, entry := range req.InvokeList { switch entry.ArgumentType { case proto.CombatTypeArgument_COMBAT_TYPE_ARGUMENT_EVT_BEING_HIT: - player.CombatInvokeHandler.AddEntry(entry.ForwardType, entry) + continue case proto.CombatTypeArgument_COMBAT_TYPE_ARGUMENT_ENTITY_MOVE: entityMoveInfo := new(proto.EntityMoveInfo) err := pb.Unmarshal(entry.CombatData, entityMoveInfo) diff --git a/gs/game/user_map.go b/gs/game/user_map.go index fc92c974..d0b92737 100644 --- a/gs/game/user_map.go +++ b/gs/game/user_map.go @@ -115,7 +115,7 @@ func (g *GameManager) PathfindingEnterSceneReq(player *model.Player, payloadMsg } func (g *GameManager) QueryPathReq(player *model.Player, payloadMsg pb.Message) { - logger.Debug("user query path, uid: %v", player.PlayerID) + // logger.Debug("user query path, uid: %v", player.PlayerID) req := payloadMsg.(*proto.QueryPathReq) queryPathRsp := &proto.QueryPathRsp{ diff --git a/gs/game/world_manager.go b/gs/game/world_manager.go index 0a558be0..0370a186 100644 --- a/gs/game/world_manager.go +++ b/gs/game/world_manager.go @@ -4,6 +4,7 @@ import ( "math" "time" + "hk4e/common/mq" "hk4e/gs/constant" "hk4e/gs/game/aoi" "hk4e/gs/model" @@ -73,6 +74,13 @@ func (w *WorldManager) CreateWorld(owner *model.Player) *World { } world.mpLevelEntityId = world.GetNextWorldEntityId(constant.EntityIdTypeConst.MPLEVEL) w.worldMap[worldId] = world + GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{ + MsgType: mq.MsgTypeFight, + EventId: mq.AddFightRoutine, + FightMsg: &mq.FightMsg{ + FightRoutineId: world.id, + }, + }) return world } @@ -83,6 +91,13 @@ func (w *WorldManager) DestroyWorld(worldId uint32) { player.WorldId = 0 } delete(w.worldMap, worldId) + GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{ + MsgType: mq.MsgTypeFight, + EventId: mq.DelFightRoutine, + FightMsg: &mq.FightMsg{ + FightRoutineId: world.id, + }, + }) } // GetBigWorld 获取本服务器的AI世界 @@ -624,6 +639,17 @@ func (s *Scene) CreateEntityAvatar(player *model.Player, avatarId uint32) uint32 if avatarId == s.world.GetPlayerActiveAvatarId(player) { s.world.aoiManager.AddEntityIdToGridByPos(entity.id, float32(entity.pos.X), float32(entity.pos.Y), float32(entity.pos.Z)) } + GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{ + MsgType: mq.MsgTypeFight, + EventId: mq.FightRoutineAddEntity, + FightMsg: &mq.FightMsg{ + FightRoutineId: s.world.id, + EntityId: entity.id, + FightPropMap: entity.fightProp, + Uid: entity.avatarEntity.uid, + AvatarGuid: player.AvatarMap[avatarId].Guid, + }, + }) return entity.id } @@ -661,6 +687,15 @@ func (s *Scene) CreateEntityMonster(pos *model.Vector, level uint8, fightProp ma } s.entityMap[entity.id] = entity s.world.aoiManager.AddEntityIdToGridByPos(entity.id, float32(entity.pos.X), float32(entity.pos.Y), float32(entity.pos.Z)) + GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{ + MsgType: mq.MsgTypeFight, + EventId: mq.FightRoutineAddEntity, + FightMsg: &mq.FightMsg{ + FightRoutineId: s.world.id, + EntityId: entity.id, + FightPropMap: entity.fightProp, + }, + }) return entity.id } @@ -771,6 +806,14 @@ func (s *Scene) DestroyEntity(entityId uint32) { } s.world.aoiManager.RemoveEntityIdFromGridByPos(entity.id, float32(entity.pos.X), float32(entity.pos.Y), float32(entity.pos.Z)) delete(s.entityMap, entityId) + GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{ + MsgType: mq.MsgTypeFight, + EventId: mq.FightRoutineDelEntity, + FightMsg: &mq.FightMsg{ + FightRoutineId: s.world.id, + EntityId: entity.id, + }, + }) } func (s *Scene) GetEntity(entityId uint32) *Entity { diff --git a/gs/mq/mq.go b/gs/mq/mq.go deleted file mode 100644 index cd921dff..00000000 --- a/gs/mq/mq.go +++ /dev/null @@ -1,86 +0,0 @@ -package mq - -import ( - "hk4e/pkg/logger" - "hk4e/protocol/cmd" - - "github.com/nats-io/nats.go" - "github.com/vmihailenco/msgpack/v5" - pb "google.golang.org/protobuf/proto" -) - -type MessageQueue struct { - natsConn *nats.Conn - natsMsgChan chan *nats.Msg - netMsgInput chan *cmd.NetMsg - netMsgOutput chan *cmd.NetMsg - cmdProtoMap *cmd.CmdProtoMap -} - -func NewMessageQueue(conn *nats.Conn, netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *MessageQueue) { - r = new(MessageQueue) - r.natsConn = conn - r.natsMsgChan = make(chan *nats.Msg, 10000) - _, err := r.natsConn.ChanSubscribe("GS_CMD_HK4E", r.natsMsgChan) - if err != nil { - logger.Error("nats subscribe error: %v", err) - return nil - } - r.netMsgInput = netMsgInput - r.netMsgOutput = netMsgOutput - r.cmdProtoMap = cmd.NewCmdProtoMap() - return r -} - -func (m *MessageQueue) Start() { - go m.startRecvHandler() - go m.startSendHandler() -} - -func (m *MessageQueue) Close() { - m.natsConn.Close() -} - -func (m *MessageQueue) startRecvHandler() { - for { - natsMsg := <-m.natsMsgChan - // msgpack NetMsg - netMsg := new(cmd.NetMsg) - err := msgpack.Unmarshal(natsMsg.Data, netMsg) - if err != nil { - logger.Error("parse bin to net msg error: %v", err) - continue - } - if netMsg.EventId == cmd.NormalMsg || netMsg.EventId == cmd.UserRegNotify { - // protobuf PayloadMessage - payloadMessage := m.cmdProtoMap.GetProtoObjByCmdId(netMsg.CmdId) - err = pb.Unmarshal(netMsg.PayloadMessageData, payloadMessage) - if err != nil { - logger.Error("parse bin to payload msg error: %v", err) - continue - } - netMsg.PayloadMessage = payloadMessage - } - m.netMsgOutput <- netMsg - } -} - -func (m *MessageQueue) startSendHandler() { - for { - netMsg := <-m.netMsgInput - // protobuf PayloadMessage 已在上一层完成 - // msgpack NetMsg - netMsgData, err := msgpack.Marshal(netMsg) - if err != nil { - logger.Error("parse net msg to bin error: %v", err) - continue - } - natsMsg := nats.NewMsg("GATE_CMD_HK4E") - natsMsg.Data = netMsgData - err = m.natsConn.PublishMsg(natsMsg) - if err != nil { - logger.Error("nats publish msg error: %v", err) - continue - } - } -} diff --git a/protocol/cmd/net_msg.go b/protocol/cmd/net_msg.go deleted file mode 100644 index e8e8d549..00000000 --- a/protocol/cmd/net_msg.go +++ /dev/null @@ -1,23 +0,0 @@ -package cmd - -import pb "google.golang.org/protobuf/proto" - -const ( - NormalMsg = iota - UserRegNotify - UserLoginNotify - UserOfflineNotify - ClientRttNotify - ClientTimeNotify -) - -type NetMsg struct { - UserId uint32 `msgpack:"UserId"` - EventId uint16 `msgpack:"EventId"` - CmdId uint16 `msgpack:"CmdId"` - ClientSeq uint32 `msgpack:"ClientSeq"` - PayloadMessage pb.Message `msgpack:"-"` - PayloadMessageData []byte `msgpack:"PayloadMessageData"` - ClientRtt uint32 `msgpack:"ClientRtt"` - ClientTime uint32 `msgpack:"ClientTime"` -}