拆分战斗服务器

This commit is contained in:
huangxiaolei
2022-12-19 22:11:55 +08:00
parent cf4804c444
commit 0dc45708d6
21 changed files with 793 additions and 349 deletions

View File

@@ -0,0 +1,8 @@
[logger]
level = "DEBUG"
mode = "BOTH"
track = true
max_size = 10485760
[mq]
nats_url = "nats://nats:4222"

25
cmd/fight/main.go Normal file
View File

@@ -0,0 +1,25 @@
package main
import (
"context"
"flag"
"fmt"
_ "net/http/pprof"
"os"
"hk4e/fight/app"
)
var (
config = flag.String("config", "application.toml", "config file")
)
func main() {
flag.Parse()
// go statsviz_serve.Serve("0.0.0.0:2345")
err := app.Run(context.TODO(), *config)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}

23
cmd/hk4e/fight.go Normal file
View File

@@ -0,0 +1,23 @@
package main
import (
"context"
"hk4e/fight/app"
"github.com/spf13/cobra"
)
// FightCmd 检查配表命令
func FightCmd() *cobra.Command {
var cfg string
c := &cobra.Command{
Use: "fight",
Short: "fight server",
RunE: func(cmd *cobra.Command, args []string) error {
return app.Run(context.Background(), cfg)
},
}
c.Flags().StringVar(&cfg, "config", "application.toml", "config file")
return c
}

115
common/mq/nats.go Normal file
View File

@@ -0,0 +1,115 @@
package mq
import (
"hk4e/common/config"
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
"github.com/nats-io/nats.go"
"github.com/vmihailenco/msgpack/v5"
pb "google.golang.org/protobuf/proto"
)
type MessageQueue struct {
natsConn *nats.Conn
natsMsgChan chan *nats.Msg
netMsgInput chan *NetMsg
netMsgOutput chan *NetMsg
cmdProtoMap *cmd.CmdProtoMap
}
func NewMessageQueue(serverType string, appId string) (r *MessageQueue) {
r = new(MessageQueue)
conn, err := nats.Connect(config.CONF.MQ.NatsUrl)
if err != nil {
logger.Error("connect nats error: %v", err)
return nil
}
r.natsConn = conn
r.natsMsgChan = make(chan *nats.Msg, 1000)
_, err = r.natsConn.ChanSubscribe(r.getTopic(serverType, appId), r.natsMsgChan)
if err != nil {
logger.Error("nats subscribe error: %v", err)
return nil
}
r.netMsgInput = make(chan *NetMsg, 1000)
r.netMsgOutput = make(chan *NetMsg, 1000)
r.cmdProtoMap = cmd.NewCmdProtoMap()
go r.recvHandler()
go r.sendHandler()
return r
}
func (m *MessageQueue) Close() {
m.natsConn.Close()
}
func (m *MessageQueue) GetNetMsg() chan *NetMsg {
return m.netMsgOutput
}
func (m *MessageQueue) recvHandler() {
for {
natsMsg := <-m.natsMsgChan
// msgpack NetMsg
netMsg := new(NetMsg)
err := msgpack.Unmarshal(natsMsg.Data, netMsg)
if err != nil {
logger.Error("parse bin to net msg error: %v", err)
continue
}
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if netMsg.EventId == NormalMsg || netMsg.EventId == UserRegNotify {
// protobuf PayloadMessage
payloadMessage := m.cmdProtoMap.GetProtoObjByCmdId(gameMsg.CmdId)
if payloadMessage == nil {
logger.Error("get protobuf obj by cmd id error: %v", err)
continue
}
err = pb.Unmarshal(gameMsg.PayloadMessageData, payloadMessage)
if err != nil {
logger.Error("parse bin to payload msg error: %v", err)
continue
}
gameMsg.PayloadMessage = payloadMessage
}
case MsgTypeFight:
}
m.netMsgOutput <- netMsg
}
}
func (m *MessageQueue) sendHandler() {
for {
netMsg := <-m.netMsgInput
switch netMsg.MsgType {
case MsgTypeGame:
gameMsg := netMsg.GameMsg
if gameMsg.PayloadMessageData == nil {
// protobuf PayloadMessage
payloadMessageData, err := pb.Marshal(gameMsg.PayloadMessage)
if err != nil {
logger.Error("parse payload msg to bin error: %v", err)
continue
}
gameMsg.PayloadMessageData = payloadMessageData
}
case MsgTypeFight:
}
// msgpack NetMsg
netMsgData, err := msgpack.Marshal(netMsg)
if err != nil {
logger.Error("parse net msg to bin error: %v", err)
continue
}
natsMsg := nats.NewMsg(netMsg.Topic)
natsMsg.Data = netMsgData
err = m.natsConn.PublishMsg(natsMsg)
if err != nil {
logger.Error("nats publish msg error: %v", err)
continue
}
}
}

50
common/mq/net_msg.go Normal file
View File

@@ -0,0 +1,50 @@
package mq
import pb "google.golang.org/protobuf/proto"
const (
MsgTypeGame = iota
MsgTypeFight
)
type NetMsg struct {
MsgType uint8 `msgpack:"MsgType"`
EventId uint16 `msgpack:"EventId"`
Topic string `msgpack:"-"`
GameMsg *GameMsg `msgpack:"GameMsg"`
FightMsg *FightMsg `msgpack:"FightMsg"`
}
const (
NormalMsg = iota
UserRegNotify
UserLoginNotify
UserOfflineNotify
ClientRttNotify
ClientTimeNotify
)
type GameMsg struct {
UserId uint32 `msgpack:"UserId"`
CmdId uint16 `msgpack:"CmdId"`
ClientSeq uint32 `msgpack:"ClientSeq"`
ClientRtt uint32 `msgpack:"ClientRtt"`
ClientTime uint32 `msgpack:"ClientTime"`
PayloadMessage pb.Message `msgpack:"-"`
PayloadMessageData []byte `msgpack:"PayloadMessageData"`
}
const (
AddFightRoutine = iota
DelFightRoutine
FightRoutineAddEntity
FightRoutineDelEntity
)
type FightMsg struct {
FightRoutineId uint32 `msgpack:"FightRoutineId"`
EntityId uint32 `msgpack:"EntityId"`
FightPropMap map[uint32]float32 `msgpack:"FightPropMap"`
Uid uint32 `msgpack:"Uid"`
AvatarGuid uint64 `msgpack:"AvatarGuid"`
}

31
common/mq/topic.go Normal file
View File

@@ -0,0 +1,31 @@
package mq
import (
"strings"
)
const (
GATE = "GATE_${APPID}_HK4E"
GS = "GS_${APPID}_HK4E"
FIGHT = "FIGHT_${APPID}_HK4E"
)
func (m *MessageQueue) getTopic(serverType string, appId string) string {
topic := strings.ReplaceAll(serverType, "${APPID}", appId)
return topic
}
func (m *MessageQueue) SendToGate(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(GATE, appId)
m.netMsgInput <- netMsg
}
func (m *MessageQueue) SendToGs(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(GS, appId)
m.netMsgInput <- netMsg
}
func (m *MessageQueue) SendToFight(appId string, netMsg *NetMsg) {
netMsg.Topic = m.getTopic(FIGHT, appId)
m.netMsgInput <- netMsg
}

50
fight/app/app.go Normal file
View File

@@ -0,0 +1,50 @@
package app
import (
"context"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"hk4e/common/config"
"hk4e/common/mq"
"hk4e/fight/engine"
"hk4e/gs/constant"
"hk4e/pkg/logger"
)
func Run(ctx context.Context, configFile string) error {
config.InitConfig(configFile)
logger.InitLogger("fight")
logger.Warn("fight start")
constant.InitConstant()
messageQueue := mq.NewMessageQueue(mq.FIGHT, "1")
defer messageQueue.Close()
_ = engine.NewFightEngine(messageQueue)
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
select {
case <-ctx.Done():
return nil
case s := <-c:
logger.Warn("get a signal %s", s.String())
switch s {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
logger.Warn("fight exit")
time.Sleep(time.Second)
return nil
case syscall.SIGHUP:
default:
return nil
}
}
}
}

View File

@@ -0,0 +1,258 @@
package engine
import (
"time"
"hk4e/common/mq"
"hk4e/gs/constant"
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
"hk4e/protocol/proto"
pb "google.golang.org/protobuf/proto"
)
type FightEngine struct {
messageQueue *mq.MessageQueue
}
func NewFightEngine(messageQueue *mq.MessageQueue) (r *FightEngine) {
r = new(FightEngine)
r.messageQueue = messageQueue
go r.fightHandle()
return r
}
func (f *FightEngine) fightHandle() {
fightRoutineMsgChanMap := make(map[uint32]chan *mq.NetMsg)
fightRoutineCloseChanMap := make(map[uint32]chan bool)
userIdFightRoutineIdMap := make(map[uint32]uint32)
for {
netMsg := <-f.messageQueue.GetNetMsg()
// logger.Debug("recv net msg, netMsg: %v", netMsg)
switch netMsg.MsgType {
case mq.MsgTypeGame:
gameMsg := netMsg.GameMsg
if netMsg.EventId != mq.NormalMsg {
continue
}
// logger.Debug("recv game msg, gameMsg: %v", gameMsg)
fightRoutineId := userIdFightRoutineIdMap[gameMsg.UserId]
fightRoutineMsgChan := fightRoutineMsgChanMap[fightRoutineId]
fightRoutineMsgChan <- netMsg
case mq.MsgTypeFight:
fightMsg := netMsg.FightMsg
logger.Debug("recv fight msg, fightMsg: %v", fightMsg)
switch netMsg.EventId {
case mq.AddFightRoutine:
fightRoutineMsgChan := make(chan *mq.NetMsg, 1000)
fightRoutineMsgChanMap[fightMsg.FightRoutineId] = fightRoutineMsgChan
fightRoutineCloseChan := make(chan bool, 1)
fightRoutineCloseChanMap[fightMsg.FightRoutineId] = fightRoutineCloseChan
go runFightRoutine(fightMsg.FightRoutineId, fightRoutineMsgChan, fightRoutineCloseChan, f.messageQueue)
case mq.DelFightRoutine:
fightRoutineCloseChan := fightRoutineCloseChanMap[fightMsg.FightRoutineId]
fightRoutineCloseChan <- true
case mq.FightRoutineAddEntity:
if fightMsg.Uid != 0 {
userIdFightRoutineIdMap[fightMsg.Uid] = fightMsg.FightRoutineId
}
fightRoutineMsgChan := fightRoutineMsgChanMap[fightMsg.FightRoutineId]
fightRoutineMsgChan <- netMsg
case mq.FightRoutineDelEntity:
if fightMsg.Uid != 0 {
delete(userIdFightRoutineIdMap, fightMsg.Uid)
}
fightRoutineMsgChan := fightRoutineMsgChanMap[fightMsg.FightRoutineId]
fightRoutineMsgChan <- netMsg
}
}
}
}
// SendMsg 发送消息给客户端
func SendMsg(messageQueue *mq.MessageQueue, cmdId uint16, userId uint32, payloadMsg pb.Message) {
if userId < 100000000 || payloadMsg == nil {
return
}
gameMsg := new(mq.GameMsg)
gameMsg.UserId = userId
gameMsg.CmdId = cmdId
gameMsg.ClientSeq = 0
// 在这里直接序列化成二进制数据 防止发送的消息内包含各种游戏数据指针 而造成并发读写的问题
payloadMessageData, err := pb.Marshal(payloadMsg)
if err != nil {
logger.Error("parse payload msg to bin error: %v", err)
return
}
gameMsg.PayloadMessageData = payloadMessageData
messageQueue.SendToGate("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.NormalMsg,
GameMsg: gameMsg,
})
}
type Entity struct {
entityId uint32
fightPropMap map[uint32]float32
uid uint32
avatarGuid uint64
}
// FightRoutine 战局例程
type FightRoutine struct {
messageQueue *mq.MessageQueue
entityMap map[uint32]*Entity
combatInvokeEntryList []*proto.CombatInvokeEntry
tickCount uint64
}
func runFightRoutine(fightRoutineId uint32, fightRoutineMsgChan chan *mq.NetMsg, fightRoutineCloseChan chan bool, messageQueue *mq.MessageQueue) {
f := new(FightRoutine)
f.messageQueue = messageQueue
f.entityMap = make(map[uint32]*Entity)
f.combatInvokeEntryList = make([]*proto.CombatInvokeEntry, 0)
f.tickCount = 0
logger.Debug("create fight routine, fightRoutineId: %v", fightRoutineId)
ticker := time.NewTicker(time.Millisecond * 10)
for {
select {
case netMsg := <-fightRoutineMsgChan:
switch netMsg.MsgType {
case mq.MsgTypeGame:
gameMsg := netMsg.GameMsg
f.attackHandle(gameMsg)
case mq.MsgTypeFight:
fightMsg := netMsg.FightMsg
switch netMsg.EventId {
case mq.FightRoutineAddEntity:
f.entityMap[fightMsg.EntityId] = &Entity{
entityId: fightMsg.EntityId,
fightPropMap: fightMsg.FightPropMap,
uid: fightMsg.Uid,
avatarGuid: fightMsg.AvatarGuid,
}
case mq.FightRoutineDelEntity:
delete(f.entityMap, fightMsg.EntityId)
}
}
case <-ticker.C:
f.onTick()
case <-fightRoutineCloseChan:
logger.Debug("destroy fight routine, fightRoutineId: %v", fightRoutineId)
return
}
}
}
func (f *FightRoutine) onTick() {
f.tickCount++
now := time.Now().UnixMilli()
if f.tickCount%5 == 0 {
f.onTick50MilliSecond(now)
}
if f.tickCount%100 == 0 {
f.onTickSecond(now)
}
}
func (f *FightRoutine) onTick50MilliSecond(now int64) {
if len(f.combatInvokeEntryList) > 0 {
combatInvocationsNotifyAll := new(proto.CombatInvocationsNotify)
combatInvocationsNotifyAll.InvokeList = f.combatInvokeEntryList
for _, uid := range f.getAllPlayer(f.entityMap) {
SendMsg(f.messageQueue, cmd.CombatInvocationsNotify, uid, combatInvocationsNotifyAll)
}
f.combatInvokeEntryList = make([]*proto.CombatInvokeEntry, 0)
}
}
func (f *FightRoutine) onTickSecond(now int64) {
// 改面板
for _, entity := range f.entityMap {
if entity.uid == 0 {
continue
}
entity.fightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_ATTACK)] = 1000000
entity.fightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CRITICAL)] = 1.0
avatarFightPropNotify := &proto.AvatarFightPropNotify{
AvatarGuid: entity.avatarGuid,
FightPropMap: entity.fightPropMap,
}
SendMsg(f.messageQueue, cmd.AvatarFightPropNotify, entity.uid, avatarFightPropNotify)
}
}
func (f *FightRoutine) attackHandle(gameMsg *mq.GameMsg) {
_ = gameMsg.UserId
cmdId := gameMsg.CmdId
_ = gameMsg.ClientSeq
payloadMsg := gameMsg.PayloadMessage
switch cmdId {
case cmd.CombatInvocationsNotify:
req := payloadMsg.(*proto.CombatInvocationsNotify)
for _, entry := range req.InvokeList {
if entry.ForwardType != proto.ForwardType_FORWARD_TYPE_TO_ALL {
continue
}
if entry.ArgumentType != proto.CombatTypeArgument_COMBAT_TYPE_ARGUMENT_EVT_BEING_HIT {
continue
}
hitInfo := new(proto.EvtBeingHitInfo)
err := pb.Unmarshal(entry.CombatData, hitInfo)
if err != nil {
logger.Error("parse combat invocations entity hit info error: %v", err)
continue
}
attackResult := hitInfo.AttackResult
// logger.Debug("run attack handler, attackResult: %v", attackResult)
target := f.entityMap[attackResult.DefenseId]
if target == nil {
logger.Error("could not found target, defense id: %v", attackResult.DefenseId)
continue
}
attackResult.Damage *= 100
damage := attackResult.Damage
attackerId := attackResult.AttackerId
_ = attackerId
currHp := float32(0)
if target.fightPropMap != nil {
currHp = target.fightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_HP)]
currHp -= damage
if currHp < 0 {
currHp = 0
}
target.fightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_HP)] = currHp
}
entityFightPropUpdateNotify := new(proto.EntityFightPropUpdateNotify)
entityFightPropUpdateNotify.EntityId = target.entityId
entityFightPropUpdateNotify.FightPropMap = make(map[uint32]float32)
entityFightPropUpdateNotify.FightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_HP)] = currHp
for _, uid := range f.getAllPlayer(f.entityMap) {
SendMsg(f.messageQueue, cmd.EntityFightPropUpdateNotify, uid, entityFightPropUpdateNotify)
}
combatData, err := pb.Marshal(hitInfo)
if err != nil {
logger.Error("create combat invocations entity hit info error: %v", err)
}
entry.CombatData = combatData
f.combatInvokeEntryList = append(f.combatInvokeEntryList, entry)
}
}
}
func (f *FightRoutine) getAllPlayer(entityMap map[uint32]*Entity) []uint32 {
uidMap := make(map[uint32]bool)
for _, entity := range entityMap {
if entity.uid != 0 {
uidMap[entity.uid] = true
}
}
uidList := make([]uint32, 0)
for uid := range uidMap {
uidList = append(uidList, uid)
}
return uidList
}

View File

@@ -9,10 +9,9 @@ import (
"time"
"hk4e/common/config"
"hk4e/gate/mq"
"hk4e/common/mq"
"hk4e/gate/net"
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
)
func Run(ctx context.Context, configFile string) error {
@@ -21,10 +20,9 @@ func Run(ctx context.Context, configFile string) error {
logger.InitLogger("gate")
logger.Warn("gate start")
netMsgInput := make(chan *cmd.NetMsg, 10000)
netMsgOutput := make(chan *cmd.NetMsg, 10000)
messageQueue := mq.NewMessageQueue(mq.GATE, "1")
connectManager := net.NewKcpConnectManager(netMsgInput, netMsgOutput)
connectManager := net.NewKcpConnectManager(messageQueue)
connectManager.Start()
go func() {
@@ -34,10 +32,6 @@ func Run(ctx context.Context, configFile string) error {
}
}()
messageQueue := mq.NewMessageQueue(netMsgInput, netMsgOutput)
messageQueue.Start()
defer messageQueue.Close()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {

View File

@@ -1,98 +0,0 @@
package mq
import (
"hk4e/common/config"
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
"github.com/nats-io/nats.go"
"github.com/vmihailenco/msgpack/v5"
pb "google.golang.org/protobuf/proto"
)
type MessageQueue struct {
natsConn *nats.Conn
natsMsgChan chan *nats.Msg
netMsgInput chan *cmd.NetMsg
netMsgOutput chan *cmd.NetMsg
cmdProtoMap *cmd.CmdProtoMap
}
func NewMessageQueue(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *MessageQueue) {
r = new(MessageQueue)
conn, err := nats.Connect(config.CONF.MQ.NatsUrl)
if err != nil {
logger.Error("connect nats error: %v", err)
return nil
}
r.natsConn = conn
r.natsMsgChan = make(chan *nats.Msg, 10000)
_, err = r.natsConn.ChanSubscribe("GATE_CMD_HK4E", r.natsMsgChan)
if err != nil {
logger.Error("nats subscribe error: %v", err)
return nil
}
r.netMsgInput = netMsgInput
r.netMsgOutput = netMsgOutput
r.cmdProtoMap = cmd.NewCmdProtoMap()
return r
}
func (m *MessageQueue) Start() {
go m.startRecvHandler()
go m.startSendHandler()
}
func (m *MessageQueue) Close() {
m.natsConn.Close()
}
func (m *MessageQueue) startRecvHandler() {
for {
natsMsg := <-m.natsMsgChan
// msgpack NetMsg
netMsg := new(cmd.NetMsg)
err := msgpack.Unmarshal(natsMsg.Data, netMsg)
if err != nil {
logger.Error("parse bin to net msg error: %v", err)
continue
}
if netMsg.EventId == cmd.NormalMsg {
// protobuf PayloadMessage
payloadMessage := m.cmdProtoMap.GetProtoObjByCmdId(netMsg.CmdId)
err = pb.Unmarshal(netMsg.PayloadMessageData, payloadMessage)
if err != nil {
logger.Error("parse bin to payload msg error: %v", err)
continue
}
netMsg.PayloadMessage = payloadMessage
}
m.netMsgOutput <- netMsg
}
}
func (m *MessageQueue) startSendHandler() {
for {
netMsg := <-m.netMsgInput
// protobuf PayloadMessage
payloadMessageData, err := pb.Marshal(netMsg.PayloadMessage)
if err != nil {
logger.Error("parse payload msg to bin error: %v", err)
continue
}
netMsg.PayloadMessageData = payloadMessageData
// msgpack NetMsg
netMsgData, err := msgpack.Marshal(netMsg)
if err != nil {
logger.Error("parse net msg to bin error: %v", err)
continue
}
natsMsg := nats.NewMsg("GS_CMD_HK4E")
natsMsg.Data = netMsgData
err = m.natsConn.PublishMsg(natsMsg)
if err != nil {
logger.Error("nats publish msg error: %v", err)
continue
}
}
}

View File

@@ -10,6 +10,7 @@ import (
"strings"
"time"
"hk4e/common/mq"
"hk4e/dispatch/controller"
"hk4e/gate/kcp"
"hk4e/pkg/endec"
@@ -73,24 +74,30 @@ func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session)
rsp.PayloadMessage = playerLoginRsp
k.localMsgOutput <- rsp
// 登录成功 通知GS初始化相关数据
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.UserLoginNotify
netMsg.ClientSeq = headMeta.seq
k.netMsgInput <- netMsg
logger.Info("send to gs user login ok, ConvId: %v, UserId: %v", protoMsg.ConvId, netMsg.UserId)
gameMsg := new(mq.GameMsg)
gameMsg.UserId = userId
gameMsg.ClientSeq = headMeta.seq
k.messageQueue.SendToGs("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.UserLoginNotify,
GameMsg: gameMsg,
})
logger.Info("send to gs user login ok, ConvId: %v, UserId: %v", protoMsg.ConvId, gameMsg.UserId)
case cmd.SetPlayerBornDataReq:
// 玩家注册请求
if connState != ConnAlive {
return
}
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.UserRegNotify
netMsg.CmdId = cmd.SetPlayerBornDataReq
netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId
netMsg.PayloadMessage = protoMsg.PayloadMessage
k.netMsgInput <- netMsg
gameMsg := new(mq.GameMsg)
gameMsg.UserId = userId
gameMsg.CmdId = cmd.SetPlayerBornDataReq
gameMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId
gameMsg.PayloadMessage = protoMsg.PayloadMessage
k.messageQueue.SendToGs("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.UserRegNotify,
GameMsg: gameMsg,
})
case cmd.PlayerForceExitReq:
// 玩家退出游戏请求
if connState != ConnAlive {
@@ -119,34 +126,55 @@ func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session)
rsp.PayloadMessage = pingRsp
k.localMsgOutput <- rsp
// 通知GS玩家客户端的本地时钟
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.ClientTimeNotify
netMsg.ClientTime = pingReq.ClientTime
k.netMsgInput <- netMsg
gameMsg := new(mq.GameMsg)
gameMsg.UserId = userId
gameMsg.ClientTime = pingReq.ClientTime
k.messageQueue.SendToGs("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.ClientTimeNotify,
GameMsg: gameMsg,
})
// RTT
logger.Debug("convId: %v, RTO: %v, SRTT: %v, RTTVar: %v", protoMsg.ConvId, session.conn.GetRTO(), session.conn.GetSRTT(), session.conn.GetSRTTVar())
// 客户端往返时延通知
rtt := session.conn.GetSRTT()
// 通知GS玩家客户端往返时延
netMsg = new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.ClientRttNotify
netMsg.ClientRtt = uint32(rtt)
k.netMsgInput <- netMsg
gameMsg = new(mq.GameMsg)
gameMsg.UserId = userId
gameMsg.ClientRtt = uint32(rtt)
k.messageQueue.SendToGs("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.ClientRttNotify,
GameMsg: gameMsg,
})
default:
// 转发到GS
// 未登录禁止访问GS
// 未登录禁止访问
if connState != ConnAlive {
return
}
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.NormalMsg
netMsg.CmdId = protoMsg.CmdId
netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId
netMsg.PayloadMessage = protoMsg.PayloadMessage
k.netMsgInput <- netMsg
// 转发到FIGHT
if protoMsg.CmdId == cmd.CombatInvocationsNotify {
gameMsg := new(mq.GameMsg)
gameMsg.UserId = userId
gameMsg.CmdId = protoMsg.CmdId
gameMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId
gameMsg.PayloadMessage = protoMsg.PayloadMessage
k.messageQueue.SendToFight("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.NormalMsg,
GameMsg: gameMsg,
})
}
// 转发到GS
gameMsg := new(mq.GameMsg)
gameMsg.UserId = userId
gameMsg.CmdId = protoMsg.CmdId
gameMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId
gameMsg.PayloadMessage = protoMsg.PayloadMessage
k.messageQueue.SendToGs("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.NormalMsg,
GameMsg: gameMsg,
})
}
}
@@ -179,22 +207,27 @@ func (k *KcpConnectManager) sendMsgHandle() {
close(session.kcpRawSendChan)
case protoMsg := <-k.localMsgOutput:
sendToClientFn(protoMsg)
case netMsg := <-k.netMsgOutput:
convId, exist := userIdConvMap[netMsg.UserId]
case netMsg := <-k.messageQueue.GetNetMsg():
if netMsg.MsgType != mq.MsgTypeGame {
logger.Error("recv unknown msg type from game server, msg type: %v", netMsg.MsgType)
continue
}
if netMsg.EventId != mq.NormalMsg {
logger.Error("recv unknown event from game server, event id: %v", netMsg.EventId)
continue
}
gameMsg := netMsg.GameMsg
convId, exist := userIdConvMap[gameMsg.UserId]
if !exist {
logger.Error("can not find convId by userId")
continue
}
if netMsg.EventId == cmd.NormalMsg {
protoMsg := new(ProtoMsg)
protoMsg.ConvId = convId
protoMsg.CmdId = netMsg.CmdId
protoMsg.HeadMessage = k.getHeadMsg(netMsg.ClientSeq)
protoMsg.PayloadMessage = netMsg.PayloadMessage
sendToClientFn(protoMsg)
} else {
logger.Error("recv unknown event from game server, event id: %v", netMsg.EventId)
}
protoMsg := new(ProtoMsg)
protoMsg.ConvId = convId
protoMsg.CmdId = gameMsg.CmdId
protoMsg.HeadMessage = k.getHeadMsg(gameMsg.ClientSeq)
protoMsg.PayloadMessage = gameMsg.PayloadMessage
sendToClientFn(protoMsg)
}
}
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"hk4e/common/config"
"hk4e/common/mq"
"hk4e/common/region"
"hk4e/dispatch/controller"
"hk4e/gate/kcp"
@@ -18,6 +19,8 @@ import (
"hk4e/protocol/proto"
)
const PacketFreqLimit = 1000
type KcpConnectManager struct {
openState bool
sessionConvIdMap map[uint64]*Session
@@ -26,8 +29,7 @@ type KcpConnectManager struct {
kcpEventInput chan *KcpEvent
kcpEventOutput chan *KcpEvent
cmdProtoMap *cmd.CmdProtoMap
netMsgInput chan *cmd.NetMsg
netMsgOutput chan *cmd.NetMsg
messageQueue *mq.MessageQueue
localMsgOutput chan *ProtoMsg
createSessionChan chan *Session
destroySessionChan chan *Session
@@ -38,7 +40,7 @@ type KcpConnectManager struct {
encRsaKeyMap map[string][]byte
}
func NewKcpConnectManager(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *KcpConnectManager) {
func NewKcpConnectManager(messageQueue *mq.MessageQueue) (r *KcpConnectManager) {
r = new(KcpConnectManager)
r.openState = true
r.sessionConvIdMap = make(map[uint64]*Session)
@@ -46,8 +48,7 @@ func NewKcpConnectManager(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.N
r.kcpEventInput = make(chan *KcpEvent, 1000)
r.kcpEventOutput = make(chan *KcpEvent, 1000)
r.cmdProtoMap = cmd.NewCmdProtoMap()
r.netMsgInput = netMsgInput
r.netMsgOutput = netMsgOutput
r.messageQueue = messageQueue
r.localMsgOutput = make(chan *ProtoMsg, 1000)
r.createSessionChan = make(chan *Session, 1000)
r.destroySessionChan = make(chan *Session, 1000)
@@ -245,7 +246,7 @@ func (k *KcpConnectManager) recvHandle(session *Session) {
pktFreqLimitCounter++
now := time.Now().UnixNano()
if now-pktFreqLimitTimer > int64(time.Second) {
if pktFreqLimitCounter > 100 {
if pktFreqLimitCounter > PacketFreqLimit {
logger.Error("exit recv loop, client packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter)
k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh)
break
@@ -271,8 +272,6 @@ func (k *KcpConnectManager) sendHandle(session *Session) {
// 发送
conn := session.conn
convId := conn.GetConv()
pktFreqLimitCounter := 0
pktFreqLimitTimer := time.Now().UnixNano()
for {
protoMsg, ok := <-session.kcpRawSendChan
if !ok {
@@ -293,19 +292,6 @@ func (k *KcpConnectManager) sendHandle(session *Session) {
k.closeKcpConn(session, kcp.EnetServerKick)
break
}
// 发包频率限制
pktFreqLimitCounter++
now := time.Now().UnixNano()
if now-pktFreqLimitTimer > int64(time.Second) {
if pktFreqLimitCounter > 100 {
logger.Error("exit send loop, server packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter)
k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh)
break
} else {
pktFreqLimitCounter = 0
}
pktFreqLimitTimer = now
}
}
}
@@ -332,11 +318,14 @@ func (k *KcpConnectManager) closeKcpConn(session *Session, enetType uint32) {
EventId: KcpConnCloseNotify,
}
// 通知GS玩家下线
netMsg := new(cmd.NetMsg)
netMsg.UserId = session.userId
netMsg.EventId = cmd.UserOfflineNotify
k.netMsgInput <- netMsg
logger.Info("send to gs user offline, ConvId: %v, UserId: %v", convId, netMsg.UserId)
gameMsg := new(mq.GameMsg)
gameMsg.UserId = session.userId
k.messageQueue.SendToGs("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.UserOfflineNotify,
GameMsg: gameMsg,
})
logger.Info("send to gs user offline, ConvId: %v, UserId: %v", convId, gameMsg.UserId)
k.destroySessionChan <- session
}

View File

@@ -9,15 +9,14 @@ import (
"time"
"hk4e/common/config"
"hk4e/common/mq"
"hk4e/gdconf"
gdc "hk4e/gs/config"
"hk4e/gs/constant"
"hk4e/gs/dao"
"hk4e/gs/game"
"hk4e/gs/mq"
"hk4e/gs/service"
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
"github.com/nats-io/nats.go"
)
@@ -46,14 +45,10 @@ func Run(ctx context.Context, configFile string) error {
}
defer db.CloseDao()
netMsgInput := make(chan *cmd.NetMsg, 10000)
netMsgOutput := make(chan *cmd.NetMsg, 10000)
messageQueue := mq.NewMessageQueue(conn, netMsgInput, netMsgOutput)
messageQueue.Start()
messageQueue := mq.NewMessageQueue(mq.GS, "1")
defer messageQueue.Close()
gameManager := game.NewGameManager(db, netMsgInput, netMsgOutput)
gameManager := game.NewGameManager(db, messageQueue)
gameManager.Start()
defer gameManager.Stop()

View File

@@ -3,6 +3,7 @@ package game
import (
"time"
"hk4e/common/mq"
"hk4e/gs/dao"
"hk4e/gs/model"
"hk4e/pkg/alg"
@@ -24,16 +25,14 @@ var COMMAND_MANAGER *CommandManager = nil
type GameManager struct {
dao *dao.Dao
netMsgInput chan *cmd.NetMsg
netMsgOutput chan *cmd.NetMsg
messageQueue *mq.MessageQueue
snowflake *alg.SnowflakeWorker
}
func NewGameManager(dao *dao.Dao, netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *GameManager) {
func NewGameManager(dao *dao.Dao, messageQueue *mq.MessageQueue) (r *GameManager) {
r = new(GameManager)
r.dao = dao
r.netMsgInput = netMsgInput
r.netMsgOutput = netMsgOutput
r.messageQueue = messageQueue
r.snowflake = alg.NewSnowflakeWorker(1)
GAME_MANAGER = r
LOCAL_EVENT_MANAGER = NewLocalEventManager()
@@ -49,20 +48,62 @@ func (g *GameManager) Start() {
ROUTE_MANAGER.InitRoute()
USER_MANAGER.StartAutoSaveUser()
go func() {
intervalTime := time.Second.Nanoseconds() * 60
lastTime := time.Now().UnixNano()
routeCost := int64(0)
tickCost := int64(0)
localEventCost := int64(0)
commandCost := int64(0)
for {
now := time.Now().UnixNano()
if now-lastTime > intervalTime {
routeCost /= 1e6
tickCost /= 1e6
localEventCost /= 1e6
commandCost /= 1e6
logger.Info("[GAME MAIN LOOP] cpu time cost detail, routeCost: %vms, tickCost: %vms, localEventCost: %vms, commandCost: %vms",
routeCost, tickCost, localEventCost, commandCost)
totalCost := routeCost + tickCost + localEventCost + commandCost
logger.Info("[GAME MAIN LOOP] cpu time cost percent, routeCost: %v%%, tickCost: %v%%, localEventCost: %v%%, commandCost: %v%%",
float32(routeCost)/float32(totalCost)*100.0,
float32(tickCost)/float32(totalCost)*100.0,
float32(localEventCost)/float32(totalCost)*100.0,
float32(commandCost)/float32(totalCost)*100.0)
logger.Info("[GAME MAIN LOOP] total cpu time cost detail, totalCost: %vms",
totalCost)
logger.Info("[GAME MAIN LOOP] total cpu time cost percent, totalCost: %v%%",
float32(totalCost)/float32(intervalTime/1e6)*100.0)
lastTime = now
routeCost = 0
tickCost = 0
localEventCost = 0
commandCost = 0
}
select {
case netMsg := <-g.netMsgOutput:
case netMsg := <-g.messageQueue.GetNetMsg():
// 接收客户端消息
start := time.Now().UnixNano()
ROUTE_MANAGER.RouteHandle(netMsg)
end := time.Now().UnixNano()
routeCost += end - start
case <-TICK_MANAGER.ticker.C:
// 游戏服务器定时帧
start := time.Now().UnixNano()
TICK_MANAGER.OnGameServerTick()
end := time.Now().UnixNano()
tickCost += end - start
case localEvent := <-LOCAL_EVENT_MANAGER.localEventChan:
// 处理本地事件
start := time.Now().UnixNano()
LOCAL_EVENT_MANAGER.LocalEventHandle(localEvent)
end := time.Now().UnixNano()
localEventCost += end - start
case command := <-COMMAND_MANAGER.commandTextInput:
// 处理传入的命令 (普通玩家 GM命令)
start := time.Now().UnixNano()
COMMAND_MANAGER.HandleCommand(command)
end := time.Now().UnixNano()
commandCost += end - start
}
}
}()
@@ -82,19 +123,22 @@ func (g *GameManager) SendMsg(cmdId uint16, userId uint32, clientSeq uint32, pay
if userId < 100000000 || payloadMsg == nil {
return
}
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.NormalMsg
netMsg.CmdId = cmdId
netMsg.ClientSeq = clientSeq
gameMsg := new(mq.GameMsg)
gameMsg.UserId = userId
gameMsg.CmdId = cmdId
gameMsg.ClientSeq = clientSeq
// 在这里直接序列化成二进制数据 防止发送的消息内包含各种游戏数据指针 而造成并发读写的问题
payloadMessageData, err := pb.Marshal(payloadMsg)
if err != nil {
logger.Error("parse payload msg to bin error: %v", err)
return
}
netMsg.PayloadMessageData = payloadMessageData
g.netMsgInput <- netMsg
gameMsg.PayloadMessageData = payloadMessageData
g.messageQueue.SendToGate("1", &mq.NetMsg{
MsgType: mq.MsgTypeGame,
EventId: mq.NormalMsg,
GameMsg: gameMsg,
})
}
// CommonRetError 通用返回错误码

View File

@@ -1,6 +1,7 @@
package game
import (
"hk4e/common/mq"
"hk4e/gs/model"
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
@@ -116,19 +117,23 @@ func (r *RouteManager) InitRoute() {
r.registerRouter(cmd.VehicleInteractReq, GAME_MANAGER.VehicleInteractReq)
}
func (r *RouteManager) RouteHandle(netMsg *cmd.NetMsg) {
func (r *RouteManager) RouteHandle(netMsg *mq.NetMsg) {
if netMsg.MsgType != mq.MsgTypeGame {
return
}
gameMsg := netMsg.GameMsg
switch netMsg.EventId {
case cmd.NormalMsg:
r.doRoute(netMsg.CmdId, netMsg.UserId, netMsg.ClientSeq, netMsg.PayloadMessage)
case cmd.UserRegNotify:
GAME_MANAGER.OnReg(netMsg.UserId, netMsg.ClientSeq, netMsg.PayloadMessage)
case cmd.UserLoginNotify:
GAME_MANAGER.OnLogin(netMsg.UserId, netMsg.ClientSeq)
case cmd.UserOfflineNotify:
GAME_MANAGER.OnUserOffline(netMsg.UserId)
case cmd.ClientRttNotify:
GAME_MANAGER.ClientRttNotify(netMsg.UserId, netMsg.ClientRtt)
case cmd.ClientTimeNotify:
GAME_MANAGER.ClientTimeNotify(netMsg.UserId, netMsg.ClientTime)
case mq.NormalMsg:
r.doRoute(gameMsg.CmdId, gameMsg.UserId, gameMsg.ClientSeq, gameMsg.PayloadMessage)
case mq.UserRegNotify:
GAME_MANAGER.OnReg(gameMsg.UserId, gameMsg.ClientSeq, gameMsg.PayloadMessage)
case mq.UserLoginNotify:
GAME_MANAGER.OnLogin(gameMsg.UserId, gameMsg.ClientSeq)
case mq.UserOfflineNotify:
GAME_MANAGER.OnUserOffline(gameMsg.UserId)
case mq.ClientRttNotify:
GAME_MANAGER.ClientRttNotify(gameMsg.UserId, gameMsg.ClientRtt)
case mq.ClientTimeNotify:
GAME_MANAGER.ClientTimeNotify(gameMsg.UserId, gameMsg.ClientTime)
}
}

View File

@@ -132,32 +132,6 @@ func (t *TickManager) onTick10Second(now int64) {
GAME_MANAGER.SendMsg(cmd.PlayerTimeNotify, player.PlayerID, 0, playerTimeNotify)
}
}
if !world.IsBigWorld() && (world.multiplayer || !world.owner.Pause) {
// 刷怪
scene := world.GetSceneById(3)
monsterEntityCount := 0
for _, entity := range scene.entityMap {
if entity.entityType == uint32(proto.ProtEntityType_PROT_ENTITY_TYPE_MONSTER) {
monsterEntityCount++
}
}
if monsterEntityCount < 30 {
monsterEntityId := t.createMonster(scene)
bigWorldOwner := USER_MANAGER.GetOnlineUser(1)
GAME_MANAGER.AddSceneEntityNotify(bigWorldOwner, proto.VisionType_VISION_TYPE_BORN, []uint32{monsterEntityId}, true, false)
}
}
for _, player := range world.playerMap {
if world.multiplayer || !world.owner.Pause {
// 改面板
for _, worldAvatar := range world.GetPlayerWorldAvatarList(player) {
avatar := player.AvatarMap[worldAvatar.avatarId]
avatar.FightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CUR_ATTACK)] = 1000000
avatar.FightPropMap[uint32(constant.FightPropertyConst.FIGHT_PROP_CRITICAL)] = 1.0
GAME_MANAGER.UpdateUserAvatarFightProp(player.PlayerID, worldAvatar.avatarId)
}
}
}
}
}
@@ -236,6 +210,20 @@ func (t *TickManager) onTickSecond(now int64) {
}
GAME_MANAGER.SendMsg(cmd.WorldPlayerRTTNotify, player.PlayerID, 0, worldPlayerRTTNotify)
}
if !world.IsBigWorld() && world.owner.SceneLoadState == model.SceneEnterDone {
// 刷怪
scene := world.GetSceneById(3)
monsterEntityCount := 0
for _, entity := range scene.entityMap {
if entity.entityType == uint32(proto.ProtEntityType_PROT_ENTITY_TYPE_MONSTER) {
monsterEntityCount++
}
}
if monsterEntityCount < 30 {
monsterEntityId := t.createMonster(scene)
GAME_MANAGER.AddSceneEntityNotify(world.owner, proto.VisionType_VISION_TYPE_BORN, []uint32{monsterEntityId}, true, false)
}
}
}
}

View File

@@ -94,7 +94,7 @@ func (g *GameManager) CombatInvocationsNotify(player *model.Player, payloadMsg p
for _, entry := range req.InvokeList {
switch entry.ArgumentType {
case proto.CombatTypeArgument_COMBAT_TYPE_ARGUMENT_EVT_BEING_HIT:
player.CombatInvokeHandler.AddEntry(entry.ForwardType, entry)
continue
case proto.CombatTypeArgument_COMBAT_TYPE_ARGUMENT_ENTITY_MOVE:
entityMoveInfo := new(proto.EntityMoveInfo)
err := pb.Unmarshal(entry.CombatData, entityMoveInfo)

View File

@@ -115,7 +115,7 @@ func (g *GameManager) PathfindingEnterSceneReq(player *model.Player, payloadMsg
}
func (g *GameManager) QueryPathReq(player *model.Player, payloadMsg pb.Message) {
logger.Debug("user query path, uid: %v", player.PlayerID)
// logger.Debug("user query path, uid: %v", player.PlayerID)
req := payloadMsg.(*proto.QueryPathReq)
queryPathRsp := &proto.QueryPathRsp{

View File

@@ -4,6 +4,7 @@ import (
"math"
"time"
"hk4e/common/mq"
"hk4e/gs/constant"
"hk4e/gs/game/aoi"
"hk4e/gs/model"
@@ -73,6 +74,13 @@ func (w *WorldManager) CreateWorld(owner *model.Player) *World {
}
world.mpLevelEntityId = world.GetNextWorldEntityId(constant.EntityIdTypeConst.MPLEVEL)
w.worldMap[worldId] = world
GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{
MsgType: mq.MsgTypeFight,
EventId: mq.AddFightRoutine,
FightMsg: &mq.FightMsg{
FightRoutineId: world.id,
},
})
return world
}
@@ -83,6 +91,13 @@ func (w *WorldManager) DestroyWorld(worldId uint32) {
player.WorldId = 0
}
delete(w.worldMap, worldId)
GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{
MsgType: mq.MsgTypeFight,
EventId: mq.DelFightRoutine,
FightMsg: &mq.FightMsg{
FightRoutineId: world.id,
},
})
}
// GetBigWorld 获取本服务器的AI世界
@@ -624,6 +639,17 @@ func (s *Scene) CreateEntityAvatar(player *model.Player, avatarId uint32) uint32
if avatarId == s.world.GetPlayerActiveAvatarId(player) {
s.world.aoiManager.AddEntityIdToGridByPos(entity.id, float32(entity.pos.X), float32(entity.pos.Y), float32(entity.pos.Z))
}
GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{
MsgType: mq.MsgTypeFight,
EventId: mq.FightRoutineAddEntity,
FightMsg: &mq.FightMsg{
FightRoutineId: s.world.id,
EntityId: entity.id,
FightPropMap: entity.fightProp,
Uid: entity.avatarEntity.uid,
AvatarGuid: player.AvatarMap[avatarId].Guid,
},
})
return entity.id
}
@@ -661,6 +687,15 @@ func (s *Scene) CreateEntityMonster(pos *model.Vector, level uint8, fightProp ma
}
s.entityMap[entity.id] = entity
s.world.aoiManager.AddEntityIdToGridByPos(entity.id, float32(entity.pos.X), float32(entity.pos.Y), float32(entity.pos.Z))
GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{
MsgType: mq.MsgTypeFight,
EventId: mq.FightRoutineAddEntity,
FightMsg: &mq.FightMsg{
FightRoutineId: s.world.id,
EntityId: entity.id,
FightPropMap: entity.fightProp,
},
})
return entity.id
}
@@ -771,6 +806,14 @@ func (s *Scene) DestroyEntity(entityId uint32) {
}
s.world.aoiManager.RemoveEntityIdFromGridByPos(entity.id, float32(entity.pos.X), float32(entity.pos.Y), float32(entity.pos.Z))
delete(s.entityMap, entityId)
GAME_MANAGER.messageQueue.SendToFight("1", &mq.NetMsg{
MsgType: mq.MsgTypeFight,
EventId: mq.FightRoutineDelEntity,
FightMsg: &mq.FightMsg{
FightRoutineId: s.world.id,
EntityId: entity.id,
},
})
}
func (s *Scene) GetEntity(entityId uint32) *Entity {

View File

@@ -1,86 +0,0 @@
package mq
import (
"hk4e/pkg/logger"
"hk4e/protocol/cmd"
"github.com/nats-io/nats.go"
"github.com/vmihailenco/msgpack/v5"
pb "google.golang.org/protobuf/proto"
)
type MessageQueue struct {
natsConn *nats.Conn
natsMsgChan chan *nats.Msg
netMsgInput chan *cmd.NetMsg
netMsgOutput chan *cmd.NetMsg
cmdProtoMap *cmd.CmdProtoMap
}
func NewMessageQueue(conn *nats.Conn, netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *MessageQueue) {
r = new(MessageQueue)
r.natsConn = conn
r.natsMsgChan = make(chan *nats.Msg, 10000)
_, err := r.natsConn.ChanSubscribe("GS_CMD_HK4E", r.natsMsgChan)
if err != nil {
logger.Error("nats subscribe error: %v", err)
return nil
}
r.netMsgInput = netMsgInput
r.netMsgOutput = netMsgOutput
r.cmdProtoMap = cmd.NewCmdProtoMap()
return r
}
func (m *MessageQueue) Start() {
go m.startRecvHandler()
go m.startSendHandler()
}
func (m *MessageQueue) Close() {
m.natsConn.Close()
}
func (m *MessageQueue) startRecvHandler() {
for {
natsMsg := <-m.natsMsgChan
// msgpack NetMsg
netMsg := new(cmd.NetMsg)
err := msgpack.Unmarshal(natsMsg.Data, netMsg)
if err != nil {
logger.Error("parse bin to net msg error: %v", err)
continue
}
if netMsg.EventId == cmd.NormalMsg || netMsg.EventId == cmd.UserRegNotify {
// protobuf PayloadMessage
payloadMessage := m.cmdProtoMap.GetProtoObjByCmdId(netMsg.CmdId)
err = pb.Unmarshal(netMsg.PayloadMessageData, payloadMessage)
if err != nil {
logger.Error("parse bin to payload msg error: %v", err)
continue
}
netMsg.PayloadMessage = payloadMessage
}
m.netMsgOutput <- netMsg
}
}
func (m *MessageQueue) startSendHandler() {
for {
netMsg := <-m.netMsgInput
// protobuf PayloadMessage 已在上一层完成
// msgpack NetMsg
netMsgData, err := msgpack.Marshal(netMsg)
if err != nil {
logger.Error("parse net msg to bin error: %v", err)
continue
}
natsMsg := nats.NewMsg("GATE_CMD_HK4E")
natsMsg.Data = netMsgData
err = m.natsConn.PublishMsg(natsMsg)
if err != nil {
logger.Error("nats publish msg error: %v", err)
continue
}
}
}

View File

@@ -1,23 +0,0 @@
package cmd
import pb "google.golang.org/protobuf/proto"
const (
NormalMsg = iota
UserRegNotify
UserLoginNotify
UserOfflineNotify
ClientRttNotify
ClientTimeNotify
)
type NetMsg struct {
UserId uint32 `msgpack:"UserId"`
EventId uint16 `msgpack:"EventId"`
CmdId uint16 `msgpack:"CmdId"`
ClientSeq uint32 `msgpack:"ClientSeq"`
PayloadMessage pb.Message `msgpack:"-"`
PayloadMessageData []byte `msgpack:"PayloadMessageData"`
ClientRtt uint32 `msgpack:"ClientRtt"`
ClientTime uint32 `msgpack:"ClientTime"`
}