diff --git a/dispatch/controller/controller.go b/dispatch/controller/controller.go index ea4cc4eb..45735166 100644 --- a/dispatch/controller/controller.go +++ b/dispatch/controller/controller.go @@ -2,8 +2,7 @@ package controller import ( "encoding/base64" - "encoding/binary" - "github.com/nats-io/nats.go" + "hk4e/pkg/random" "net/http" "strconv" @@ -23,6 +22,7 @@ type Controller struct { signRsaKey []byte encRsaKeyMap map[string][]byte pwdRsaKey []byte + dispatchEc2b *random.Ec2b } func NewController(dao *dao.Dao) (r *Controller) { @@ -31,25 +31,7 @@ func NewController(dao *dao.Dao) (r *Controller) { r.regionListBase64 = "" r.regionCurrBase64 = "" regionCurr, regionList, dispatchEc2b := region.InitRegion(config.CONF.Hk4e.KcpAddr, config.CONF.Hk4e.KcpPort) - - // TODO 临时写一下用来传递新的密钥后面改RPC - conn, err := nats.Connect(config.CONF.MQ.NatsUrl) - if err != nil { - logger.LOG.Error("connect nats error: %v", err) - return nil - } - natsMsg := nats.NewMsg("GATE_KEY_HK4E") - natsMsg.Data = make([]byte, 8) - dispatchEc2bSeed := dispatchEc2b.Seed() - binary.BigEndian.PutUint64(natsMsg.Data, dispatchEc2bSeed) - err = conn.PublishMsg(natsMsg) - if err != nil { - logger.LOG.Error("nats publish msg error: %v", err) - return nil - } - conn.Close() - logger.LOG.Debug("send new dispatch ec2b seed: %v", dispatchEc2bSeed) - + r.dispatchEc2b = dispatchEc2b r.signRsaKey, r.encRsaKeyMap, r.pwdRsaKey = region.LoadRsaKey() regionCurrModify, err := pb.Marshal(regionCurr) if err != nil { @@ -161,6 +143,7 @@ func (c *Controller) registerRouter() { } engine.Use(c.authorize()) engine.POST("/gate/token/verify", c.gateTokenVerify) + engine.GET("/dispatch/ec2b/seed", c.getDispatchEc2bSeed) port := config.CONF.HttpPort addr := ":" + strconv.Itoa(int(port)) err := engine.Run(addr) diff --git a/dispatch/controller/gate_controller.go b/dispatch/controller/gate_controller.go index 8d2f8bdd..8429f638 100644 --- a/dispatch/controller/gate_controller.go +++ b/dispatch/controller/gate_controller.go @@ -47,3 +47,14 @@ func (c *Controller) gateTokenVerify(context *gin.Context) { PlayerID: uint32(account.PlayerID), }) } + +type DispatchEc2bSeedRsp struct { + Seed string `json:"seed"` +} + +func (c *Controller) getDispatchEc2bSeed(context *gin.Context) { + dispatchEc2bSeed := c.dispatchEc2b.Seed() + context.JSON(http.StatusOK, &DispatchEc2bSeedRsp{ + Seed: strconv.FormatUint(dispatchEc2bSeed, 10), + }) +} diff --git a/gate/app/app.go b/gate/app/app.go index 633c31ae..f8d892e0 100644 --- a/gate/app/app.go +++ b/gate/app/app.go @@ -9,7 +9,6 @@ import ( "time" "hk4e/common/config" - "hk4e/gate/forward" "hk4e/gate/mq" "hk4e/gate/net" "hk4e/pkg/logger" @@ -22,20 +21,20 @@ func Run(ctx context.Context, configFile string) error { logger.InitLogger("gate") logger.LOG.Info("gate start") - kcpEventInput := make(chan *net.KcpEvent) - kcpEventOutput := make(chan *net.KcpEvent) - protoMsgInput := make(chan *net.ProtoMsg, 10000) - protoMsgOutput := make(chan *net.ProtoMsg, 10000) netMsgInput := make(chan *cmd.NetMsg, 10000) netMsgOutput := make(chan *cmd.NetMsg, 10000) - connectManager := net.NewKcpConnectManager(protoMsgInput, protoMsgOutput, kcpEventInput, kcpEventOutput) + connectManager := net.NewKcpConnectManager(netMsgInput, netMsgOutput) connectManager.Start() - forwardManager := forward.NewForwardManager(protoMsgInput, protoMsgOutput, kcpEventInput, kcpEventOutput, netMsgInput, netMsgOutput) - forwardManager.Start() + go func() { + outputChan := connectManager.GetKcpEventOutputChan() + for { + <-outputChan + } + }() - messageQueue := mq.NewMessageQueue(netMsgInput, netMsgOutput, kcpEventInput) + messageQueue := mq.NewMessageQueue(netMsgInput, netMsgOutput) messageQueue.Start() defer messageQueue.Close() diff --git a/gate/entity/gm/kick_player_info.go b/gate/entity/gm/kick_player_info.go deleted file mode 100644 index 75b591d4..00000000 --- a/gate/entity/gm/kick_player_info.go +++ /dev/null @@ -1,6 +0,0 @@ -package gm - -type KickPlayerInfo struct { - UserId uint32 - Reason uint32 -} diff --git a/gate/entity/gm/online_user_info.go b/gate/entity/gm/online_user_info.go deleted file mode 100644 index c12c90c0..00000000 --- a/gate/entity/gm/online_user_info.go +++ /dev/null @@ -1,11 +0,0 @@ -package gm - -type OnlineUserList struct { - UserList []*OnlineUserInfo `json:"userList"` -} - -type OnlineUserInfo struct { - Uid uint32 `json:"uid"` - ConvId uint64 `json:"convId"` - Addr string `json:"addr"` -} diff --git a/gate/forward/forward.go b/gate/forward/forward.go deleted file mode 100644 index eed927fb..00000000 --- a/gate/forward/forward.go +++ /dev/null @@ -1,555 +0,0 @@ -package forward - -import ( - "hk4e/pkg/random" - "runtime" - "sync" - "time" - - "hk4e/common/config" - "hk4e/common/region" - "hk4e/gate/entity/gm" - "hk4e/gate/kcp" - "hk4e/gate/net" - "hk4e/pkg/logger" - "hk4e/protocol/cmd" - "hk4e/protocol/proto" -) - -const ( - ConnWaitToken = iota - ConnWaitLogin - ConnAlive - ConnClose -) - -type ClientHeadMeta struct { - seq uint32 -} - -type ForwardManager struct { - dao string - protoMsgInput chan *net.ProtoMsg - protoMsgOutput chan *net.ProtoMsg - netMsgInput chan *cmd.NetMsg - netMsgOutput chan *cmd.NetMsg - // 玩家登录相关 - connStateMap map[uint64]uint8 - connStateMapLock sync.RWMutex - // kcpConv -> userID - convUserIdMap map[uint64]uint32 - convUserIdMapLock sync.RWMutex - // userID -> kcpConv - userIdConvMap map[uint32]uint64 - userIdConvMapLock sync.RWMutex - // kcpConv -> ipAddr - convAddrMap map[uint64]string - convAddrMapLock sync.RWMutex - // kcpConv -> headMeta - convHeadMetaMap map[uint64]*ClientHeadMeta - convHeadMetaMapLock sync.RWMutex - // kcpConv -> seed - convSeedMap map[uint64]uint64 - convSeedMapLock sync.RWMutex - kcpEventInput chan *net.KcpEvent - kcpEventOutput chan *net.KcpEvent - regionCurr *proto.QueryCurrRegionHttpRsp - signRsaKey []byte - encRsaKeyMap map[string][]byte -} - -func NewForwardManager( - protoMsgInput chan *net.ProtoMsg, protoMsgOutput chan *net.ProtoMsg, - kcpEventInput chan *net.KcpEvent, kcpEventOutput chan *net.KcpEvent, - netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *ForwardManager) { - r = new(ForwardManager) - r.protoMsgInput = protoMsgInput - r.protoMsgOutput = protoMsgOutput - r.netMsgInput = netMsgInput - r.netMsgOutput = netMsgOutput - r.connStateMap = make(map[uint64]uint8) - r.convUserIdMap = make(map[uint64]uint32) - r.userIdConvMap = make(map[uint32]uint64) - r.convAddrMap = make(map[uint64]string) - r.convHeadMetaMap = make(map[uint64]*ClientHeadMeta) - r.convSeedMap = make(map[uint64]uint64) - r.kcpEventInput = kcpEventInput - r.kcpEventOutput = kcpEventOutput - return r -} - -func (f *ForwardManager) getHeadMsg(clientSeq uint32) (headMsg *proto.PacketHead) { - headMsg = new(proto.PacketHead) - if clientSeq != 0 { - headMsg.ClientSequenceId = clientSeq - headMsg.SentMs = uint64(time.Now().UnixMilli()) - } - return headMsg -} - -func (f *ForwardManager) kcpEventHandle() { - for { - event := <-f.kcpEventOutput - logger.LOG.Info("rpc manager recv event, ConvId: %v, EventId: %v", event.ConvId, event.EventId) - switch event.EventId { - case net.KcpPacketSendNotify: - // 发包通知 - // 关闭发包监听 - f.kcpEventInput <- &net.KcpEvent{ - ConvId: event.ConvId, - EventId: net.KcpPacketSendListen, - EventMessage: "Disable", - } - seed, exist := f.getSeedByConvId(event.ConvId) - if !exist { - logger.LOG.Error("can not find seed by convId") - continue - } - keyBlock := random.NewKeyBlock(seed) - xorKey := keyBlock.XorKey() - key := make([]byte, 4096) - copy(key, xorKey[:]) - // 改变密钥 - f.kcpEventInput <- &net.KcpEvent{ - ConvId: event.ConvId, - EventId: net.KcpXorKeyChange, - EventMessage: key, - } - case net.KcpConnCloseNotify: - // 连接断开通知 - userId, exist := f.getUserIdByConvId(event.ConvId) - if !exist { - logger.LOG.Error("can not find userId by convId") - continue - } - // 通知GS玩家下线 - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.UserOfflineNotify - f.netMsgInput <- netMsg - logger.LOG.Info("send to gs user offline, ConvId: %v, UserId: %v", event.ConvId, netMsg.UserId) - // 删除各种map数据 - f.deleteConnState(event.ConvId) - f.deleteUserIdByConvId(event.ConvId) - currConvId, currExist := f.getConvIdByUserId(userId) - if currExist && currConvId == event.ConvId { - // 防止误删顶号的新连接数据 - f.deleteConvIdByUserId(userId) - } - f.deleteAddrByConvId(event.ConvId) - f.deleteHeadMetaByConvId(event.ConvId) - f.deleteSeedByConvId(event.ConvId) - case net.KcpConnEstNotify: - // 连接建立通知 - addr, ok := event.EventMessage.(string) - if !ok { - logger.LOG.Error("event KcpConnEstNotify msg type error") - continue - } - f.setAddrByConvId(event.ConvId, addr) - case net.KcpConnRttNotify: - // 客户端往返时延通知 - rtt, ok := event.EventMessage.(int32) - if !ok { - logger.LOG.Error("event KcpConnRttNotify msg type error") - continue - } - // 通知GS玩家客户端往返时延 - userId, exist := f.getUserIdByConvId(event.ConvId) - if !exist { - logger.LOG.Error("can not find userId by convId") - continue - } - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.ClientRttNotify - netMsg.ClientRtt = uint32(rtt) - f.netMsgInput <- netMsg - case net.KcpConnAddrChangeNotify: - // 客户端网络地址改变通知 - f.convAddrMapLock.Lock() - _, exist := f.convAddrMap[event.ConvId] - if !exist { - f.convAddrMapLock.Unlock() - logger.LOG.Error("conn addr change but conn can not be found") - continue - } - addr := event.EventMessage.(string) - f.convAddrMap[event.ConvId] = addr - f.convAddrMapLock.Unlock() - } - } -} - -func (f *ForwardManager) Start() { - // 读取密钥相关文件 - f.signRsaKey, f.encRsaKeyMap, _ = region.LoadRsaKey() - // region - regionCurr, _, _ := region.InitRegion(config.CONF.Hk4e.KcpAddr, config.CONF.Hk4e.KcpPort) - f.regionCurr = regionCurr - // kcp事件监听 - go f.kcpEventHandle() - go f.recvNetMsgFromGameServer() - // 接收客户端消息 - cpuCoreNum := runtime.NumCPU() - _ = cpuCoreNum * 10 - for i := 0; i < 1; i++ { - go f.sendNetMsgToGameServer() - } -} - -// 发送消息到GS -func (f *ForwardManager) sendNetMsgToGameServer() { - for { - protoMsg := <-f.protoMsgOutput - if protoMsg.HeadMessage == nil { - logger.LOG.Error("recv null head msg: %v", protoMsg) - } - f.setHeadMetaByConvId(protoMsg.ConvId, &ClientHeadMeta{ - seq: protoMsg.HeadMessage.ClientSequenceId, - }) - connState := f.getConnState(protoMsg.ConvId) - // gate本地处理的请求 - switch protoMsg.CmdId { - case cmd.GetPlayerTokenReq: - // 获取玩家token请求 - if connState != ConnWaitToken { - continue - } - getPlayerTokenReq := protoMsg.PayloadMessage.(*proto.GetPlayerTokenReq) - getPlayerTokenRsp := f.getPlayerToken(protoMsg.ConvId, getPlayerTokenReq) - if getPlayerTokenRsp == nil { - continue - } - // 返回数据到客户端 - resp := new(net.ProtoMsg) - resp.ConvId = protoMsg.ConvId - resp.CmdId = cmd.GetPlayerTokenRsp - resp.HeadMessage = f.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId) - resp.PayloadMessage = getPlayerTokenRsp - f.protoMsgInput <- resp - case cmd.PlayerLoginReq: - // 玩家登录请求 - if connState != ConnWaitLogin { - continue - } - playerLoginReq := protoMsg.PayloadMessage.(*proto.PlayerLoginReq) - playerLoginRsp := f.playerLogin(protoMsg.ConvId, playerLoginReq) - if playerLoginRsp == nil { - continue - } - // 返回数据到客户端 - resp := new(net.ProtoMsg) - resp.ConvId = protoMsg.ConvId - resp.CmdId = cmd.PlayerLoginRsp - resp.HeadMessage = f.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId) - resp.PayloadMessage = playerLoginRsp - f.protoMsgInput <- resp - // 登录成功 通知GS初始化相关数据 - userId, exist := f.getUserIdByConvId(protoMsg.ConvId) - if !exist { - logger.LOG.Error("can not find userId by convId") - continue - } - headMeta, exist := f.getHeadMetaByConvId(protoMsg.ConvId) - if !exist { - logger.LOG.Error("can not find client head metadata by convId") - continue - } - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.UserLoginNotify - netMsg.ClientSeq = headMeta.seq - f.netMsgInput <- netMsg - logger.LOG.Info("send to gs user login ok, ConvId: %v, UserId: %v", protoMsg.ConvId, netMsg.UserId) - case cmd.SetPlayerBornDataReq: - // 玩家注册请求 - if connState != ConnAlive { - continue - } - userId, exist := f.getUserIdByConvId(protoMsg.ConvId) - if !exist { - logger.LOG.Error("can not find userId by convId") - continue - } - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.UserRegNotify - netMsg.CmdId = cmd.SetPlayerBornDataReq - netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId - netMsg.PayloadMessage = protoMsg.PayloadMessage - f.netMsgInput <- netMsg - case cmd.PlayerForceExitReq: - // 玩家退出游戏请求 - if connState != ConnAlive { - continue - } - userId, exist := f.getUserIdByConvId(protoMsg.ConvId) - if !exist { - logger.LOG.Error("can not find userId by convId") - continue - } - f.setConnState(protoMsg.ConvId, ConnClose) - info := new(gm.KickPlayerInfo) - info.UserId = userId - info.Reason = uint32(kcp.EnetClientClose) - f.KickPlayer(info) - case cmd.PingReq: - // ping请求 - if connState != ConnAlive { - continue - } - pingReq := protoMsg.PayloadMessage.(*proto.PingReq) - logger.LOG.Debug("user ping req, data: %v", pingReq.String()) - // 返回数据到客户端 - // TODO 记录客户端最后一次ping时间做超时下线处理 - pingRsp := new(proto.PingRsp) - pingRsp.ClientTime = pingReq.ClientTime - resp := new(net.ProtoMsg) - resp.ConvId = protoMsg.ConvId - resp.CmdId = cmd.PingRsp - resp.HeadMessage = f.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId) - resp.PayloadMessage = pingRsp - f.protoMsgInput <- resp - // 通知GS玩家客户端的本地时钟 - userId, exist := f.getUserIdByConvId(protoMsg.ConvId) - if !exist { - logger.LOG.Error("can not find userId by convId") - continue - } - netMsg := new(cmd.NetMsg) - netMsg.UserId = userId - netMsg.EventId = cmd.ClientTimeNotify - netMsg.ClientTime = pingReq.ClientTime - f.netMsgInput <- netMsg - default: - // 转发到GS - // 未登录禁止访问GS - if connState != ConnAlive { - continue - } - netMsg := new(cmd.NetMsg) - userId, exist := f.getUserIdByConvId(protoMsg.ConvId) - if exist { - netMsg.UserId = userId - } else { - logger.LOG.Error("can not find userId by convId") - continue - } - netMsg.EventId = cmd.NormalMsg - netMsg.CmdId = protoMsg.CmdId - netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId - netMsg.PayloadMessage = protoMsg.PayloadMessage - f.netMsgInput <- netMsg - } - } -} - -// 从GS接收消息 -func (f *ForwardManager) recvNetMsgFromGameServer() { - for { - netMsg := <-f.netMsgOutput - convId, exist := f.getConvIdByUserId(netMsg.UserId) - if !exist { - logger.LOG.Error("can not find convId by userId") - continue - } - if netMsg.EventId == cmd.NormalMsg { - protoMsg := new(net.ProtoMsg) - protoMsg.ConvId = convId - protoMsg.CmdId = netMsg.CmdId - protoMsg.HeadMessage = f.getHeadMsg(netMsg.ClientSeq) - protoMsg.PayloadMessage = netMsg.PayloadMessage - f.protoMsgInput <- protoMsg - } else { - logger.LOG.Error("recv unknown event from game server, event id: %v", netMsg.EventId) - } - } -} - -func (f *ForwardManager) getConnState(convId uint64) uint8 { - f.connStateMapLock.RLock() - connState, connStateExist := f.connStateMap[convId] - f.connStateMapLock.RUnlock() - if !connStateExist { - connState = ConnWaitToken - f.connStateMapLock.Lock() - f.connStateMap[convId] = ConnWaitToken - f.connStateMapLock.Unlock() - } - return connState -} - -func (f *ForwardManager) setConnState(convId uint64, state uint8) { - f.connStateMapLock.Lock() - f.connStateMap[convId] = state - f.connStateMapLock.Unlock() -} - -func (f *ForwardManager) deleteConnState(convId uint64) { - f.connStateMapLock.Lock() - delete(f.connStateMap, convId) - f.connStateMapLock.Unlock() -} - -func (f *ForwardManager) getUserIdByConvId(convId uint64) (userId uint32, exist bool) { - f.convUserIdMapLock.RLock() - userId, exist = f.convUserIdMap[convId] - f.convUserIdMapLock.RUnlock() - return userId, exist -} - -func (f *ForwardManager) setUserIdByConvId(convId uint64, userId uint32) { - f.convUserIdMapLock.Lock() - f.convUserIdMap[convId] = userId - f.convUserIdMapLock.Unlock() -} - -func (f *ForwardManager) deleteUserIdByConvId(convId uint64) { - f.convUserIdMapLock.Lock() - delete(f.convUserIdMap, convId) - f.convUserIdMapLock.Unlock() -} - -func (f *ForwardManager) getConvIdByUserId(userId uint32) (convId uint64, exist bool) { - f.userIdConvMapLock.RLock() - convId, exist = f.userIdConvMap[userId] - f.userIdConvMapLock.RUnlock() - return convId, exist -} - -func (f *ForwardManager) setConvIdByUserId(userId uint32, convId uint64) { - f.userIdConvMapLock.Lock() - f.userIdConvMap[userId] = convId - f.userIdConvMapLock.Unlock() -} - -func (f *ForwardManager) deleteConvIdByUserId(userId uint32) { - f.userIdConvMapLock.Lock() - delete(f.userIdConvMap, userId) - f.userIdConvMapLock.Unlock() -} - -func (f *ForwardManager) getAddrByConvId(convId uint64) (addr string, exist bool) { - f.convAddrMapLock.RLock() - addr, exist = f.convAddrMap[convId] - f.convAddrMapLock.RUnlock() - return addr, exist -} - -func (f *ForwardManager) setAddrByConvId(convId uint64, addr string) { - f.convAddrMapLock.Lock() - f.convAddrMap[convId] = addr - f.convAddrMapLock.Unlock() -} - -func (f *ForwardManager) deleteAddrByConvId(convId uint64) { - f.convAddrMapLock.Lock() - delete(f.convAddrMap, convId) - f.convAddrMapLock.Unlock() -} - -func (f *ForwardManager) getHeadMetaByConvId(convId uint64) (headMeta *ClientHeadMeta, exist bool) { - f.convHeadMetaMapLock.RLock() - headMeta, exist = f.convHeadMetaMap[convId] - f.convHeadMetaMapLock.RUnlock() - return headMeta, exist -} - -func (f *ForwardManager) setHeadMetaByConvId(convId uint64, headMeta *ClientHeadMeta) { - f.convHeadMetaMapLock.Lock() - f.convHeadMetaMap[convId] = headMeta - f.convHeadMetaMapLock.Unlock() -} - -func (f *ForwardManager) deleteHeadMetaByConvId(convId uint64) { - f.convHeadMetaMapLock.Lock() - delete(f.convHeadMetaMap, convId) - f.convHeadMetaMapLock.Unlock() -} - -func (f *ForwardManager) getSeedByConvId(convId uint64) (seed uint64, exist bool) { - f.convSeedMapLock.RLock() - seed, exist = f.convSeedMap[convId] - f.convSeedMapLock.RUnlock() - return seed, exist -} - -func (f *ForwardManager) setSeedByConvId(convId uint64, seed uint64) { - f.convSeedMapLock.Lock() - f.convSeedMap[convId] = seed - f.convSeedMapLock.Unlock() -} - -func (f *ForwardManager) deleteSeedByConvId(convId uint64) { - f.convSeedMapLock.Lock() - delete(f.convSeedMap, convId) - f.convSeedMapLock.Unlock() -} - -// 改变网关开放状态 -func (f *ForwardManager) ChangeGateOpenState(isOpen bool) bool { - f.kcpEventInput <- &net.KcpEvent{ - EventId: net.KcpGateOpenState, - EventMessage: isOpen, - } - logger.LOG.Info("change gate open state to: %v", isOpen) - return true -} - -// 踢出玩家下线 -func (f *ForwardManager) KickPlayer(info *gm.KickPlayerInfo) bool { - if info == nil { - return false - } - convId, exist := f.getConvIdByUserId(info.UserId) - if !exist { - return false - } - f.kcpEventInput <- &net.KcpEvent{ - ConvId: convId, - EventId: net.KcpConnForceClose, - EventMessage: info.Reason, - } - return true -} - -// 获取网关在线玩家信息 -func (f *ForwardManager) GetOnlineUser(uid uint32) (list *gm.OnlineUserList) { - list = &gm.OnlineUserList{ - UserList: make([]*gm.OnlineUserInfo, 0), - } - if uid == 0 { - // 获取全部玩家 - f.convUserIdMapLock.RLock() - f.convAddrMapLock.RLock() - for convId, userId := range f.convUserIdMap { - addr := f.convAddrMap[convId] - info := &gm.OnlineUserInfo{ - Uid: userId, - ConvId: convId, - Addr: addr, - } - list.UserList = append(list.UserList, info) - } - f.convAddrMapLock.RUnlock() - f.convUserIdMapLock.RUnlock() - } else { - // 获取指定uid玩家 - convId, exist := f.getConvIdByUserId(uid) - if !exist { - return list - } - addr, exist := f.getAddrByConvId(convId) - if !exist { - return list - } - info := &gm.OnlineUserInfo{ - Uid: uid, - ConvId: convId, - Addr: addr, - } - list.UserList = append(list.UserList, info) - } - return list -} diff --git a/gate/forward/login_hk4e.go b/gate/forward/login_hk4e.go deleted file mode 100644 index a7ec345e..00000000 --- a/gate/forward/login_hk4e.go +++ /dev/null @@ -1,190 +0,0 @@ -package forward - -import ( - "bytes" - "encoding/base64" - "encoding/binary" - "fmt" - "hk4e/dispatch/controller" - "hk4e/pkg/httpclient" - "hk4e/pkg/random" - "math/rand" - "strconv" - "strings" - - "hk4e/gate/kcp" - "hk4e/gate/net" - "hk4e/pkg/endec" - "hk4e/pkg/logger" - "hk4e/protocol/proto" -) - -func (f *ForwardManager) getPlayerToken(convId uint64, req *proto.GetPlayerTokenReq) (rsp *proto.GetPlayerTokenRsp) { - // TODO 请求sdk验证token - tokenVerifyRsp, err := httpclient.Post[controller.TokenVerifyRsp]("http://127.0.0.1:8080/gate/token/verify", &controller.TokenVerifyReq{ - AccountId: req.AccountUid, - AccountToken: req.AccountToken, - }, "") - if err != nil { - logger.LOG.Error("verify token error: %v", err) - return nil - } - if !tokenVerifyRsp.Valid { - logger.LOG.Error("token error") - return nil - } - // comboToken验证成功 - if tokenVerifyRsp.Forbid { - // 封号通知 - rsp = new(proto.GetPlayerTokenRsp) - rsp.Uid = tokenVerifyRsp.PlayerID - rsp.IsProficientPlayer = true - rsp.Retcode = 21 - rsp.Msg = "FORBID_CHEATING_PLUGINS" - //rsp.BlackUidEndTime = 2051193600 // 2035-01-01 00:00:00 - rsp.BlackUidEndTime = tokenVerifyRsp.ForbidEndTime - rsp.RegPlatform = 3 - rsp.CountryCode = "US" - addr, exist := f.getAddrByConvId(convId) - if !exist { - logger.LOG.Error("can not find addr by convId") - return nil - } - split := strings.Split(addr, ":") - rsp.ClientIpStr = split[0] - return rsp - } - oldConvId, oldExist := f.getConvIdByUserId(tokenVerifyRsp.PlayerID) - if oldExist { - // 顶号 - f.kcpEventInput <- &net.KcpEvent{ - ConvId: oldConvId, - EventId: net.KcpConnForceClose, - EventMessage: uint32(kcp.EnetServerRelogin), - } - } - // 关联玩家uid和连接信息 - f.setUserIdByConvId(convId, tokenVerifyRsp.PlayerID) - f.setConvIdByUserId(tokenVerifyRsp.PlayerID, convId) - f.setConnState(convId, ConnWaitLogin) - // 返回响应 - rsp = new(proto.GetPlayerTokenRsp) - rsp.Uid = tokenVerifyRsp.PlayerID - rsp.AccountUid = req.AccountUid - rsp.Token = req.AccountToken - data := make([]byte, 16+32) - rand.Read(data) - rsp.SecurityCmdBuffer = data[16:] - rsp.ClientVersionRandomKey = fmt.Sprintf("%03x-%012x", data[:3], data[4:16]) - rsp.AccountType = 1 - rsp.IsProficientPlayer = true - rsp.PlatformType = 3 - rsp.ChannelId = 1 - rsp.SubChannelId = 1 - rsp.RegPlatform = 2 - rsp.Birthday = "2000-01-01" - addr, exist := f.getAddrByConvId(convId) - if !exist { - logger.LOG.Error("can not find addr by convId") - return nil - } - split := strings.Split(addr, ":") - rsp.ClientIpStr = split[0] - if req.GetKeyId() != 0 { - logger.LOG.Debug("do hk4e 2.8 rsa logic") - keyId := strconv.Itoa(int(req.GetKeyId())) - encPubPrivKey, exist := f.encRsaKeyMap[keyId] - if !exist { - logger.LOG.Error("can not found key id: %v", keyId) - return - } - pubKey, err := endec.RsaParsePubKeyByPrivKey(encPubPrivKey) - if err != nil { - logger.LOG.Error("parse rsa pub key error: %v", err) - return nil - } - signPrivkey, err := endec.RsaParsePrivKey(f.signRsaKey) - if err != nil { - logger.LOG.Error("parse rsa priv key error: %v", err) - return nil - } - clientSeedBase64 := req.GetClientRandKey() - clientSeedEnc, err := base64.StdEncoding.DecodeString(clientSeedBase64) - if err != nil { - logger.LOG.Error("parse client seed base64 error: %v", err) - return nil - } - clientSeed, err := endec.RsaDecrypt(clientSeedEnc, signPrivkey) - if err != nil { - logger.LOG.Error("rsa dec error: %v", err) - return rsp - } - clientSeedUint64 := uint64(0) - err = binary.Read(bytes.NewReader(clientSeed), binary.BigEndian, &clientSeedUint64) - if err != nil { - logger.LOG.Error("parse client seed to uint64 error: %v", err) - return rsp - } - timeRand := random.GetTimeRand() - serverSeedUint64 := timeRand.Uint64() - f.setSeedByConvId(convId, serverSeedUint64) - seedUint64 := serverSeedUint64 ^ clientSeedUint64 - seedBuf := new(bytes.Buffer) - err = binary.Write(seedBuf, binary.BigEndian, seedUint64) - if err != nil { - logger.LOG.Error("conv seed uint64 to bytes error: %v", err) - return rsp - } - seed := seedBuf.Bytes() - seedEnc, err := endec.RsaEncrypt(seed, pubKey) - if err != nil { - logger.LOG.Error("rsa enc error: %v", err) - return rsp - } - seedSign, err := endec.RsaSign(seed, signPrivkey) - if err != nil { - logger.LOG.Error("rsa sign error: %v", err) - return rsp - } - rsp.KeyId = req.KeyId - rsp.ServerRandKey = base64.StdEncoding.EncodeToString(seedEnc) - rsp.Sign = base64.StdEncoding.EncodeToString(seedSign) - // 开启发包监听 - f.kcpEventInput <- &net.KcpEvent{ - ConvId: convId, - EventId: net.KcpPacketSendListen, - EventMessage: "Enable", - } - } - return rsp -} - -func (f *ForwardManager) playerLogin(convId uint64, req *proto.PlayerLoginReq) (rsp *proto.PlayerLoginRsp) { - tokenValid := true - if !tokenValid { - logger.LOG.Error("token error") - return nil - } - // token验证成功 - f.setConnState(convId, ConnAlive) - // 返回响应 - rsp = new(proto.PlayerLoginRsp) - rsp.IsUseAbilityHash = true - rsp.AbilityHashCode = -228935105 - rsp.GameBiz = "hk4e_cn" - rsp.IsScOpen = false - rsp.RegisterCps = "taptap" - rsp.CountryCode = "CN" - rsp.Birthday = "2000-01-01" - rsp.TotalTickTime = 1185941.871788 - - rsp.ClientDataVersion = f.regionCurr.RegionInfo.ClientDataVersion - rsp.ClientSilenceDataVersion = f.regionCurr.RegionInfo.ClientSilenceDataVersion - rsp.ClientMd5 = f.regionCurr.RegionInfo.ClientDataMd5 - rsp.ClientSilenceMd5 = f.regionCurr.RegionInfo.ClientSilenceDataMd5 - rsp.ResVersionConfig = f.regionCurr.RegionInfo.ResVersionConfig - rsp.ClientVersionSuffix = f.regionCurr.RegionInfo.ClientVersionSuffix - rsp.ClientSilenceVersionSuffix = f.regionCurr.RegionInfo.ClientSilenceVersionSuffix - - return rsp -} diff --git a/gate/mq/mq.go b/gate/mq/mq.go index e5283599..3ab90129 100644 --- a/gate/mq/mq.go +++ b/gate/mq/mq.go @@ -1,11 +1,8 @@ package mq import ( - "encoding/binary" "hk4e/common/config" - "hk4e/gate/net" "hk4e/pkg/logger" - "hk4e/pkg/random" "hk4e/protocol/cmd" "github.com/nats-io/nats.go" @@ -21,7 +18,7 @@ type MessageQueue struct { cmdProtoMap *cmd.CmdProtoMap } -func NewMessageQueue(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg, kcpEventInput chan *net.KcpEvent) (r *MessageQueue) { +func NewMessageQueue(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *MessageQueue) { r = new(MessageQueue) conn, err := nats.Connect(config.CONF.MQ.NatsUrl) if err != nil { @@ -35,30 +32,6 @@ func NewMessageQueue(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg logger.LOG.Error("nats subscribe error: %v", err) return nil } - - // TODO 临时写一下用来传递新的密钥后面改RPC - keyNatsMsgChan := make(chan *nats.Msg, 10000) - _, err = r.natsConn.ChanSubscribe("GATE_KEY_HK4E", keyNatsMsgChan) - if err != nil { - logger.LOG.Error("nats subscribe error: %v", err) - return nil - } - go func() { - for { - natsMsg := <-keyNatsMsgChan - dispatchEc2bSeed := binary.BigEndian.Uint64(natsMsg.Data) - logger.LOG.Debug("recv new dispatch ec2b seed: %v", dispatchEc2bSeed) - gateDispatchEc2b := random.NewEc2b() - gateDispatchEc2b.SetSeed(dispatchEc2bSeed) - gateDispatchXorKey := gateDispatchEc2b.XorKey() - // 改变密钥 - kcpEventInput <- &net.KcpEvent{ - EventId: net.KcpDispatchKeyChange, - EventMessage: gateDispatchXorKey, - } - } - }() - r.netMsgInput = netMsgInput r.netMsgOutput = netMsgOutput r.cmdProtoMap = cmd.NewCmdProtoMap() diff --git a/gate/net/forward.go b/gate/net/forward.go new file mode 100644 index 00000000..49cd66e0 --- /dev/null +++ b/gate/net/forward.go @@ -0,0 +1,361 @@ +package net + +import ( + "bytes" + "encoding/base64" + "encoding/binary" + "fmt" + "hk4e/dispatch/controller" + "hk4e/gate/kcp" + "hk4e/pkg/endec" + "hk4e/pkg/httpclient" + "hk4e/pkg/logger" + "hk4e/pkg/random" + "hk4e/protocol/cmd" + "hk4e/protocol/proto" + "math/rand" + "strconv" + "strings" + "time" +) + +const ( + ConnWaitToken = iota + ConnWaitLogin + ConnAlive + ConnClose +) + +// 发送消息到GS +func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session) { + userId := session.userId + headMeta := session.headMeta + connState := session.connState + if protoMsg.HeadMessage == nil { + logger.LOG.Error("recv null head msg: %v", protoMsg) + } + headMeta.seq = protoMsg.HeadMessage.ClientSequenceId + // gate本地处理的请求 + switch protoMsg.CmdId { + case cmd.GetPlayerTokenReq: + // 获取玩家token请求 + if connState != ConnWaitToken { + return + } + getPlayerTokenReq := protoMsg.PayloadMessage.(*proto.GetPlayerTokenReq) + getPlayerTokenRsp := k.getPlayerToken(getPlayerTokenReq, session) + if getPlayerTokenRsp == nil { + return + } + // 返回数据到客户端 + rsp := new(ProtoMsg) + rsp.ConvId = protoMsg.ConvId + rsp.CmdId = cmd.GetPlayerTokenRsp + rsp.HeadMessage = k.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId) + rsp.PayloadMessage = getPlayerTokenRsp + k.localMsgOutput <- rsp + case cmd.PlayerLoginReq: + // 玩家登录请求 + if connState != ConnWaitLogin { + return + } + playerLoginReq := protoMsg.PayloadMessage.(*proto.PlayerLoginReq) + playerLoginRsp := k.playerLogin(playerLoginReq, session) + if playerLoginRsp == nil { + return + } + // 返回数据到客户端 + rsp := new(ProtoMsg) + rsp.ConvId = protoMsg.ConvId + rsp.CmdId = cmd.PlayerLoginRsp + rsp.HeadMessage = k.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId) + rsp.PayloadMessage = playerLoginRsp + k.localMsgOutput <- rsp + // 登录成功 通知GS初始化相关数据 + netMsg := new(cmd.NetMsg) + netMsg.UserId = userId + netMsg.EventId = cmd.UserLoginNotify + netMsg.ClientSeq = headMeta.seq + k.netMsgInput <- netMsg + logger.LOG.Info("send to gs user login ok, ConvId: %v, UserId: %v", protoMsg.ConvId, netMsg.UserId) + case cmd.SetPlayerBornDataReq: + // 玩家注册请求 + if connState != ConnAlive { + return + } + netMsg := new(cmd.NetMsg) + netMsg.UserId = userId + netMsg.EventId = cmd.UserRegNotify + netMsg.CmdId = cmd.SetPlayerBornDataReq + netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId + netMsg.PayloadMessage = protoMsg.PayloadMessage + k.netMsgInput <- netMsg + case cmd.PlayerForceExitReq: + // 玩家退出游戏请求 + if connState != ConnAlive { + return + } + k.kcpEventInput <- &KcpEvent{ + ConvId: protoMsg.ConvId, + EventId: KcpConnForceClose, + EventMessage: uint32(kcp.EnetClientClose), + } + case cmd.PingReq: + // ping请求 + if connState != ConnAlive { + return + } + pingReq := protoMsg.PayloadMessage.(*proto.PingReq) + logger.LOG.Debug("user ping req, data: %v", pingReq.String()) + // 返回数据到客户端 + // TODO 记录客户端最后一次ping时间做超时下线处理 + pingRsp := new(proto.PingRsp) + pingRsp.ClientTime = pingReq.ClientTime + rsp := new(ProtoMsg) + rsp.ConvId = protoMsg.ConvId + rsp.CmdId = cmd.PingRsp + rsp.HeadMessage = k.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId) + rsp.PayloadMessage = pingRsp + k.localMsgOutput <- rsp + // 通知GS玩家客户端的本地时钟 + netMsg := new(cmd.NetMsg) + netMsg.UserId = userId + netMsg.EventId = cmd.ClientTimeNotify + netMsg.ClientTime = pingReq.ClientTime + k.netMsgInput <- netMsg + // RTT + logger.LOG.Debug("convId: %v, RTO: %v, SRTT: %v, RTTVar: %v", protoMsg.ConvId, session.conn.GetRTO(), session.conn.GetSRTT(), session.conn.GetSRTTVar()) + // 客户端往返时延通知 + rtt := session.conn.GetSRTT() + // 通知GS玩家客户端往返时延 + netMsg = new(cmd.NetMsg) + netMsg.UserId = userId + netMsg.EventId = cmd.ClientRttNotify + netMsg.ClientRtt = uint32(rtt) + k.netMsgInput <- netMsg + default: + // 转发到GS + // 未登录禁止访问GS + if connState != ConnAlive { + return + } + netMsg := new(cmd.NetMsg) + netMsg.UserId = userId + netMsg.EventId = cmd.NormalMsg + netMsg.CmdId = protoMsg.CmdId + netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId + netMsg.PayloadMessage = protoMsg.PayloadMessage + k.netMsgInput <- netMsg + } +} + +// 从GS接收消息 +func (k *KcpConnectManager) sendMsgHandle() { + logger.LOG.Debug("send msg handle start") + kcpRawSendChanMap := make(map[uint64]chan *ProtoMsg) + userIdConvMap := make(map[uint32]uint64) + sendToClientFn := func(protoMsg *ProtoMsg) { + // 分发到每个连接具体的发送协程 + kcpRawSendChan := kcpRawSendChanMap[protoMsg.ConvId] + if kcpRawSendChan != nil { + select { + case kcpRawSendChan <- protoMsg: + default: + logger.LOG.Error("kcpRawSendChan is full, convId: %v", protoMsg.ConvId) + } + } else { + logger.LOG.Error("kcpRawSendChan is nil, convId: %v", protoMsg.ConvId) + } + } + for { + select { + case session := <-k.createSessionChan: + kcpRawSendChanMap[session.conn.GetConv()] = session.kcpRawSendChan + userIdConvMap[session.userId] = session.conn.GetConv() + case session := <-k.destroySessionChan: + delete(kcpRawSendChanMap, session.conn.GetConv()) + delete(userIdConvMap, session.userId) + close(session.kcpRawSendChan) + case protoMsg := <-k.localMsgOutput: + sendToClientFn(protoMsg) + case netMsg := <-k.netMsgOutput: + convId, exist := userIdConvMap[netMsg.UserId] + if !exist { + logger.LOG.Error("can not find convId by userId") + continue + } + if netMsg.EventId == cmd.NormalMsg { + protoMsg := new(ProtoMsg) + protoMsg.ConvId = convId + protoMsg.CmdId = netMsg.CmdId + protoMsg.HeadMessage = k.getHeadMsg(netMsg.ClientSeq) + protoMsg.PayloadMessage = netMsg.PayloadMessage + sendToClientFn(protoMsg) + } else { + logger.LOG.Error("recv unknown event from game server, event id: %v", netMsg.EventId) + } + } + } +} + +func (k *KcpConnectManager) getHeadMsg(clientSeq uint32) (headMsg *proto.PacketHead) { + headMsg = new(proto.PacketHead) + if clientSeq != 0 { + headMsg.ClientSequenceId = clientSeq + headMsg.SentMs = uint64(time.Now().UnixMilli()) + } + return headMsg +} + +func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session *Session) (rsp *proto.GetPlayerTokenRsp) { + tokenVerifyRsp, err := httpclient.Post[controller.TokenVerifyRsp]("http://127.0.0.1:8080/gate/token/verify", &controller.TokenVerifyReq{ + AccountId: req.AccountUid, + AccountToken: req.AccountToken, + }, "") + if err != nil { + logger.LOG.Error("verify token error: %v", err) + return nil + } + if !tokenVerifyRsp.Valid { + logger.LOG.Error("token error") + return nil + } + // comboToken验证成功 + if tokenVerifyRsp.Forbid { + // 封号通知 + rsp = new(proto.GetPlayerTokenRsp) + rsp.Uid = tokenVerifyRsp.PlayerID + rsp.IsProficientPlayer = true + rsp.Retcode = 21 + rsp.Msg = "FORBID_CHEATING_PLUGINS" + rsp.BlackUidEndTime = tokenVerifyRsp.ForbidEndTime + if rsp.BlackUidEndTime == 0 { + rsp.BlackUidEndTime = 2051193600 // 2035-01-01 00:00:00 + } + rsp.RegPlatform = 3 + rsp.CountryCode = "US" + addr := session.conn.RemoteAddr().String() + split := strings.Split(addr, ":") + rsp.ClientIpStr = split[0] + return rsp + } + oldSession := k.GetSessionByUserId(tokenVerifyRsp.PlayerID) + if oldSession != nil { + // 顶号 + k.kcpEventInput <- &KcpEvent{ + ConvId: oldSession.conn.GetConv(), + EventId: KcpConnForceClose, + EventMessage: uint32(kcp.EnetServerRelogin), + } + } + // 关联玩家uid和连接信息 + session.userId = tokenVerifyRsp.PlayerID + session.connState = ConnWaitLogin + k.SetSession(session, session.conn.GetConv(), session.userId) + k.createSessionChan <- session + // 返回响应 + rsp = new(proto.GetPlayerTokenRsp) + rsp.Uid = tokenVerifyRsp.PlayerID + rsp.AccountUid = req.AccountUid + rsp.Token = req.AccountToken + data := make([]byte, 16+32) + rand.Read(data) + rsp.SecurityCmdBuffer = data[16:] + rsp.ClientVersionRandomKey = fmt.Sprintf("%03x-%012x", data[:3], data[4:16]) + rsp.AccountType = 1 + rsp.IsProficientPlayer = true + rsp.PlatformType = 3 + rsp.ChannelId = 1 + rsp.SubChannelId = 1 + rsp.RegPlatform = 2 + rsp.Birthday = "2000-01-01" + addr := session.conn.RemoteAddr().String() + split := strings.Split(addr, ":") + rsp.ClientIpStr = split[0] + if req.GetKeyId() != 0 { + logger.LOG.Debug("do hk4e 2.8 rsa logic") + keyId := strconv.Itoa(int(req.GetKeyId())) + encPubPrivKey, exist := k.encRsaKeyMap[keyId] + if !exist { + logger.LOG.Error("can not found key id: %v", keyId) + return + } + pubKey, err := endec.RsaParsePubKeyByPrivKey(encPubPrivKey) + if err != nil { + logger.LOG.Error("parse rsa pub key error: %v", err) + return nil + } + signPrivkey, err := endec.RsaParsePrivKey(k.signRsaKey) + if err != nil { + logger.LOG.Error("parse rsa priv key error: %v", err) + return nil + } + clientSeedBase64 := req.GetClientRandKey() + clientSeedEnc, err := base64.StdEncoding.DecodeString(clientSeedBase64) + if err != nil { + logger.LOG.Error("parse client seed base64 error: %v", err) + return nil + } + clientSeed, err := endec.RsaDecrypt(clientSeedEnc, signPrivkey) + if err != nil { + logger.LOG.Error("rsa dec error: %v", err) + return rsp + } + clientSeedUint64 := uint64(0) + err = binary.Read(bytes.NewReader(clientSeed), binary.BigEndian, &clientSeedUint64) + if err != nil { + logger.LOG.Error("parse client seed to uint64 error: %v", err) + return rsp + } + timeRand := random.GetTimeRand() + serverSeedUint64 := timeRand.Uint64() + session.seed = serverSeedUint64 + session.changeXorKey = true + seedUint64 := serverSeedUint64 ^ clientSeedUint64 + seedBuf := new(bytes.Buffer) + err = binary.Write(seedBuf, binary.BigEndian, seedUint64) + if err != nil { + logger.LOG.Error("conv seed uint64 to bytes error: %v", err) + return rsp + } + seed := seedBuf.Bytes() + seedEnc, err := endec.RsaEncrypt(seed, pubKey) + if err != nil { + logger.LOG.Error("rsa enc error: %v", err) + return rsp + } + seedSign, err := endec.RsaSign(seed, signPrivkey) + if err != nil { + logger.LOG.Error("rsa sign error: %v", err) + return rsp + } + rsp.KeyId = req.KeyId + rsp.ServerRandKey = base64.StdEncoding.EncodeToString(seedEnc) + rsp.Sign = base64.StdEncoding.EncodeToString(seedSign) + } + return rsp +} + +func (k *KcpConnectManager) playerLogin(req *proto.PlayerLoginReq, session *Session) (rsp *proto.PlayerLoginRsp) { + logger.LOG.Debug("player login, info: %v", req.String()) + // TODO 验证token + session.connState = ConnAlive + // 返回响应 + rsp = new(proto.PlayerLoginRsp) + rsp.IsUseAbilityHash = true + rsp.AbilityHashCode = -228935105 + rsp.GameBiz = "hk4e_cn" + rsp.IsScOpen = false + rsp.RegisterCps = "taptap" + rsp.CountryCode = "CN" + rsp.Birthday = "2000-01-01" + rsp.TotalTickTime = 1185941.871788 + rsp.ClientDataVersion = k.regionCurr.RegionInfo.ClientDataVersion + rsp.ClientSilenceDataVersion = k.regionCurr.RegionInfo.ClientSilenceDataVersion + rsp.ClientMd5 = k.regionCurr.RegionInfo.ClientDataMd5 + rsp.ClientSilenceMd5 = k.regionCurr.RegionInfo.ClientSilenceDataMd5 + rsp.ResVersionConfig = k.regionCurr.RegionInfo.ResVersionConfig + rsp.ClientVersionSuffix = k.regionCurr.RegionInfo.ClientVersionSuffix + rsp.ClientSilenceVersionSuffix = k.regionCurr.RegionInfo.ClientSilenceVersionSuffix + return rsp +} diff --git a/gate/net/kcp_connect_manager.go b/gate/net/kcp_connect_manager.go index 4388ed2f..3bd0418c 100644 --- a/gate/net/kcp_connect_manager.go +++ b/gate/net/kcp_connect_manager.go @@ -3,6 +3,11 @@ package net import ( "bytes" "encoding/binary" + "hk4e/common/region" + "hk4e/dispatch/controller" + "hk4e/pkg/httpclient" + "hk4e/protocol/cmd" + "hk4e/protocol/proto" "strconv" "sync" "time" @@ -14,152 +19,158 @@ import ( ) type KcpConnectManager struct { - openState bool - connMap map[uint64]*kcp.UDPSession - connMapLock sync.RWMutex - protoMsgInput chan *ProtoMsg - protoMsgOutput chan *ProtoMsg - kcpEventInput chan *KcpEvent - kcpEventOutput chan *KcpEvent - // 发送协程分发 - kcpRawSendChanMap map[uint64]chan *ProtoMsg - kcpRawSendChanMapLock sync.RWMutex - // 收包发包监听标志 - kcpRecvListenMap map[uint64]bool - kcpRecvListenMapLock sync.RWMutex - kcpSendListenMap map[uint64]bool - kcpSendListenMapLock sync.RWMutex - // key - dispatchKey []byte - dispatchKeyLock sync.RWMutex - kcpKeyMap map[uint64][]byte - kcpKeyMapLock sync.RWMutex - // conv短时间内唯一生成 - convGenMap map[uint64]int64 - convGenMapLock sync.RWMutex + openState bool + sessionConvIdMap map[uint64]*Session + sessionUserIdMap map[uint32]*Session + sessionMapLock sync.RWMutex + kcpEventInput chan *KcpEvent + kcpEventOutput chan *KcpEvent + cmdProtoMap *cmd.CmdProtoMap + netMsgInput chan *cmd.NetMsg + netMsgOutput chan *cmd.NetMsg + localMsgOutput chan *ProtoMsg + createSessionChan chan *Session + destroySessionChan chan *Session + // 密钥相关 + dispatchKey []byte + regionCurr *proto.QueryCurrRegionHttpRsp + signRsaKey []byte + encRsaKeyMap map[string][]byte } -func NewKcpConnectManager(protoMsgInput chan *ProtoMsg, protoMsgOutput chan *ProtoMsg, - kcpEventInput chan *KcpEvent, kcpEventOutput chan *KcpEvent) (r *KcpConnectManager) { +func NewKcpConnectManager(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *KcpConnectManager) { r = new(KcpConnectManager) r.openState = true - r.connMap = make(map[uint64]*kcp.UDPSession) - r.protoMsgInput = protoMsgInput - r.protoMsgOutput = protoMsgOutput - r.kcpEventInput = kcpEventInput - r.kcpEventOutput = kcpEventOutput - r.kcpRawSendChanMap = make(map[uint64]chan *ProtoMsg) - r.kcpRecvListenMap = make(map[uint64]bool) - r.kcpSendListenMap = make(map[uint64]bool) - r.kcpKeyMap = make(map[uint64][]byte) - r.convGenMap = make(map[uint64]int64) + r.sessionConvIdMap = make(map[uint64]*Session) + r.sessionUserIdMap = make(map[uint32]*Session) + r.kcpEventInput = make(chan *KcpEvent, 1000) + r.kcpEventOutput = make(chan *KcpEvent, 1000) + r.cmdProtoMap = cmd.NewCmdProtoMap() + r.netMsgInput = netMsgInput + r.netMsgOutput = netMsgOutput + r.localMsgOutput = make(chan *ProtoMsg, 1000) + r.createSessionChan = make(chan *Session, 1000) + r.destroySessionChan = make(chan *Session, 1000) return r } func (k *KcpConnectManager) Start() { - go func() { - // key - k.dispatchKey = make([]byte, 4096) - // kcp - port := strconv.Itoa(int(config.CONF.Hk4e.KcpPort)) - listener, err := kcp.ListenWithOptions(config.CONF.Hk4e.KcpAddr+":"+port, nil, 0, 0) - if err != nil { - logger.LOG.Error("listen kcp err: %v", err) - return - } else { - go k.enetHandle(listener) - go k.chanSendHandle() - go k.eventHandle() - for { - conn, err := listener.AcceptKCP() - if err != nil { - logger.LOG.Error("accept kcp err: %v", err) - return - } - if k.openState == false { - _ = conn.Close() - continue - } - conn.SetACKNoDelay(true) - conn.SetWriteDelay(false) - convId := conn.GetConv() - logger.LOG.Debug("client connect, convId: %v", convId) - // 连接建立成功通知 - k.kcpEventOutput <- &KcpEvent{ - ConvId: convId, - EventId: KcpConnEstNotify, - EventMessage: conn.RemoteAddr().String(), - } - k.connMapLock.Lock() - k.connMap[convId] = conn - k.connMapLock.Unlock() - k.kcpKeyMapLock.Lock() - k.dispatchKeyLock.RLock() - k.kcpKeyMap[convId] = k.dispatchKey - k.dispatchKeyLock.RUnlock() - k.kcpKeyMapLock.Unlock() - go k.recvHandle(convId) - kcpRawSendChan := make(chan *ProtoMsg, 10000) - k.kcpRawSendChanMapLock.Lock() - k.kcpRawSendChanMap[convId] = kcpRawSendChan - k.kcpRawSendChanMapLock.Unlock() - go k.sendHandle(convId, kcpRawSendChan) - go k.rttMonitor(convId) - } - } - }() - go k.clearDeadConv() + // 读取密钥相关文件 + k.signRsaKey, k.encRsaKeyMap, _ = region.LoadRsaKey() + // region + regionCurr, _, _ := region.InitRegion(config.CONF.Hk4e.KcpAddr, config.CONF.Hk4e.KcpPort) + k.regionCurr = regionCurr + // key + dispatchEc2bSeedRsp, err := httpclient.Get[controller.DispatchEc2bSeedRsp]("http://127.0.0.1:8080/dispatch/ec2b/seed", "") + if err != nil { + logger.LOG.Error("get dispatch ec2b seed error: %v", err) + return + } + dispatchEc2bSeed, err := strconv.ParseUint(dispatchEc2bSeedRsp.Seed, 10, 64) + if err != nil { + logger.LOG.Error("parse dispatch ec2b seed error: %v", err) + return + } + logger.LOG.Debug("get dispatch ec2b seed: %v", dispatchEc2bSeed) + gateDispatchEc2b := random.NewEc2b() + gateDispatchEc2b.SetSeed(dispatchEc2bSeed) + k.dispatchKey = gateDispatchEc2b.XorKey() + // kcp + port := strconv.Itoa(int(config.CONF.Hk4e.KcpPort)) + listener, err := kcp.ListenWithOptions(config.CONF.Hk4e.KcpAddr+":"+port, nil, 0, 0) + if err != nil { + logger.LOG.Error("listen kcp err: %v", err) + return + } + go k.enetHandle(listener) + go k.eventHandle() + go k.sendMsgHandle() + go k.acceptHandle(listener) } -func (k *KcpConnectManager) clearDeadConv() { - ticker := time.NewTicker(time.Minute) +func (k *KcpConnectManager) acceptHandle(listener *kcp.Listener) { + logger.LOG.Debug("accept handle start") for { - k.convGenMapLock.Lock() - now := time.Now().UnixNano() - oldConvList := make([]uint64, 0) - for conv, timestamp := range k.convGenMap { - if now-timestamp > int64(time.Hour) { - oldConvList = append(oldConvList, conv) - } + conn, err := listener.AcceptKCP() + if err != nil { + logger.LOG.Error("accept kcp err: %v", err) + return } - delConvList := make([]uint64, 0) - k.connMapLock.RLock() - for _, conv := range oldConvList { - _, exist := k.connMap[conv] - if !exist { - delConvList = append(delConvList, conv) - delete(k.convGenMap, conv) - } + if k.openState == false { + _ = conn.Close() + continue + } + conn.SetACKNoDelay(true) + conn.SetWriteDelay(false) + convId := conn.GetConv() + logger.LOG.Debug("client connect, convId: %v", convId) + kcpRawSendChan := make(chan *ProtoMsg, 1000) + session := &Session{ + conn: conn, + connState: ConnWaitToken, + userId: 0, + headMeta: &ClientHeadMeta{ + seq: 0, + }, + kcpRawSendChan: kcpRawSendChan, + seed: 0, + xorKey: k.dispatchKey, + changeXorKey: false, + } + go k.recvHandle(session) + go k.sendHandle(session) + // 连接建立成功通知 + k.kcpEventOutput <- &KcpEvent{ + ConvId: convId, + EventId: KcpConnEstNotify, + EventMessage: conn.RemoteAddr().String(), } - k.connMapLock.RUnlock() - k.convGenMapLock.Unlock() - logger.LOG.Info("clean dead conv list: %v", delConvList) - <-ticker.C } } func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) { + logger.LOG.Debug("enet handle start") + // conv短时间内唯一生成 + convGenMap := make(map[uint64]int64) for { enetNotify := <-listener.EnetNotify logger.LOG.Info("[Enet Notify], addr: %v, conv: %v, conn: %v, enet: %v", enetNotify.Addr, enetNotify.ConvId, enetNotify.ConnType, enetNotify.EnetType) switch enetNotify.ConnType { case kcp.ConnEnetSyn: if enetNotify.EnetType == kcp.EnetClientConnectKey { + // 清理老旧的conv + now := time.Now().UnixNano() + oldConvList := make([]uint64, 0) + for conv, timestamp := range convGenMap { + if now-timestamp > int64(time.Hour) { + oldConvList = append(oldConvList, conv) + } + } + delConvList := make([]uint64, 0) + k.sessionMapLock.RLock() + for _, conv := range oldConvList { + _, exist := k.sessionConvIdMap[conv] + if !exist { + delConvList = append(delConvList, conv) + delete(convGenMap, conv) + } + } + k.sessionMapLock.RUnlock() + logger.LOG.Info("clean dead conv list: %v", delConvList) + // 生成没用过的conv var conv uint64 - k.convGenMapLock.Lock() for { convData := random.GetRandomByte(8) convDataBuffer := bytes.NewBuffer(convData) _ = binary.Read(convDataBuffer, binary.LittleEndian, &conv) - _, exist := k.convGenMap[conv] + _, exist := convGenMap[conv] if exist { continue } else { - k.convGenMap[conv] = time.Now().UnixNano() + convGenMap[conv] = time.Now().UnixNano() break } } - k.convGenMapLock.Unlock() listener.SendEnetNotifyToClient(&kcp.Enet{ Addr: enetNotify.Addr, ConvId: conv, @@ -169,7 +180,16 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) { } case kcp.ConnEnetEst: case kcp.ConnEnetFin: - k.closeKcpConn(enetNotify.ConvId, enetNotify.EnetType) + session := k.GetSessionByConvId(enetNotify.ConvId) + if session == nil { + logger.LOG.Error("session not exist, convId: %v", enetNotify.ConvId) + continue + } + session.conn.SendEnetNotify(&kcp.Enet{ + ConnType: kcp.ConnEnetFin, + EnetType: enetNotify.EnetType, + }) + _ = session.conn.Close() case kcp.ConnEnetAddrChange: // 连接地址改变通知 k.kcpEventOutput <- &KcpEvent{ @@ -182,48 +202,52 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) { } } -func (k *KcpConnectManager) chanSendHandle() { - // 分发到每个连接具体的发送协程 - for { - protoMsg := <-k.protoMsgInput - k.kcpRawSendChanMapLock.RLock() - kcpRawSendChan := k.kcpRawSendChanMap[protoMsg.ConvId] - k.kcpRawSendChanMapLock.RUnlock() - if kcpRawSendChan != nil { - select { - case kcpRawSendChan <- protoMsg: - default: - logger.LOG.Error("kcpRawSendChan is full, convId: %v", protoMsg.ConvId) - } - } else { - logger.LOG.Error("kcpRawSendChan is nil, convId: %v", protoMsg.ConvId) - } - } +type ClientHeadMeta struct { + seq uint32 } -func (k *KcpConnectManager) recvHandle(convId uint64) { +type Session struct { + conn *kcp.UDPSession + connState uint8 + userId uint32 + headMeta *ClientHeadMeta + kcpRawSendChan chan *ProtoMsg + seed uint64 + xorKey []byte + changeXorKey bool +} + +func (k *KcpConnectManager) recvHandle(session *Session) { + logger.LOG.Debug("recv handle start") // 接收 - k.connMapLock.RLock() - conn := k.connMap[convId] - k.connMapLock.RUnlock() + conn := session.conn + convId := conn.GetConv() pktFreqLimitCounter := 0 pktFreqLimitTimer := time.Now().UnixNano() - protoEnDecode := NewProtoEnDecode() recvBuf := make([]byte, conn.GetMaxPayloadLen()) for { - _ = conn.SetReadDeadline(time.Now().Add(time.Second * 30)) + _ = conn.SetReadDeadline(time.Now().Add(time.Second * 15)) recvLen, err := conn.Read(recvBuf) if err != nil { logger.LOG.Error("exit recv loop, conn read err: %v, convId: %v", err, convId) - k.closeKcpConn(convId, kcp.EnetServerKick) + k.closeKcpConn(session, kcp.EnetServerKick) break } + if session.changeXorKey { + session.changeXorKey = false + keyBlock := random.NewKeyBlock(session.seed) + xorKey := keyBlock.XorKey() + key := make([]byte, 4096) + copy(key, xorKey[:]) + session.xorKey = key + } + // 收包频率限制 pktFreqLimitCounter++ now := time.Now().UnixNano() if now-pktFreqLimitTimer > int64(time.Second) { - if pktFreqLimitCounter > 1000 { + if pktFreqLimitCounter > 100 { logger.LOG.Error("exit recv loop, client packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter) - k.closeKcpConn(convId, kcp.EnetPacketFreqTooHigh) + k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh) break } else { pktFreqLimitCounter = 0 @@ -231,141 +255,132 @@ func (k *KcpConnectManager) recvHandle(convId uint64) { pktFreqLimitTimer = now } recvData := recvBuf[:recvLen] - k.kcpRecvListenMapLock.RLock() - flag := k.kcpRecvListenMap[convId] - k.kcpRecvListenMapLock.RUnlock() - if flag { - // 收包通知 - //recvMsg := make([]byte, len(recvData)) - //copy(recvMsg, recvData) - k.kcpEventOutput <- &KcpEvent{ - ConvId: convId, - EventId: KcpPacketRecvNotify, - EventMessage: recvData, - } - } kcpMsgList := make([]*KcpMsg, 0) - k.decodeBinToPayload(recvData, convId, &kcpMsgList) + k.decodeBinToPayload(recvData, convId, &kcpMsgList, session.xorKey) for _, v := range kcpMsgList { - protoMsgList := protoEnDecode.protoDecode(v) + protoMsgList := k.protoDecode(v) for _, vv := range protoMsgList { - k.protoMsgOutput <- vv + k.recvMsgHandle(vv, session) } } } } -func (k *KcpConnectManager) sendHandle(convId uint64, kcpRawSendChan chan *ProtoMsg) { +func (k *KcpConnectManager) sendHandle(session *Session) { + logger.LOG.Debug("send handle start") // 发送 - k.connMapLock.RLock() - conn := k.connMap[convId] - k.connMapLock.RUnlock() - protoEnDecode := NewProtoEnDecode() + conn := session.conn + convId := conn.GetConv() + pktFreqLimitCounter := 0 + pktFreqLimitTimer := time.Now().UnixNano() for { - protoMsg, ok := <-kcpRawSendChan + protoMsg, ok := <-session.kcpRawSendChan if !ok { logger.LOG.Error("exit send loop, send chan close, convId: %v", convId) - k.closeKcpConn(convId, kcp.EnetServerKick) + k.closeKcpConn(session, kcp.EnetServerKick) break } - kcpMsg := protoEnDecode.protoEncode(protoMsg) + kcpMsg := k.protoEncode(protoMsg) if kcpMsg == nil { logger.LOG.Error("decode kcp msg is nil, convId: %v", convId) continue } - bin := k.encodePayloadToBin(kcpMsg) - _ = conn.SetWriteDeadline(time.Now().Add(time.Second * 10)) + bin := k.encodePayloadToBin(kcpMsg, session.xorKey) + _ = conn.SetWriteDeadline(time.Now().Add(time.Second * 5)) _, err := conn.Write(bin) if err != nil { logger.LOG.Error("exit send loop, conn write err: %v, convId: %v", err, convId) - k.closeKcpConn(convId, kcp.EnetServerKick) + k.closeKcpConn(session, kcp.EnetServerKick) break } - k.kcpSendListenMapLock.RLock() - flag := k.kcpSendListenMap[convId] - k.kcpSendListenMapLock.RUnlock() - if flag { - // 发包通知 - k.kcpEventOutput <- &KcpEvent{ - ConvId: convId, - EventId: KcpPacketSendNotify, - EventMessage: bin, - } - } - } -} - -func (k *KcpConnectManager) rttMonitor(convId uint64) { - ticker := time.NewTicker(time.Second * 10) - for { - select { - case <-ticker.C: - k.connMapLock.RLock() - conn := k.connMap[convId] - k.connMapLock.RUnlock() - if conn == nil { + // 发包频率限制 + pktFreqLimitCounter++ + now := time.Now().UnixNano() + if now-pktFreqLimitTimer > int64(time.Second) { + if pktFreqLimitCounter > 100 { + logger.LOG.Error("exit send loop, server packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter) + k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh) break + } else { + pktFreqLimitCounter = 0 } - logger.LOG.Debug("convId: %v, RTO: %v, SRTT: %v, RTTVar: %v", convId, conn.GetRTO(), conn.GetSRTT(), conn.GetSRTTVar()) - k.kcpEventOutput <- &KcpEvent{ - ConvId: convId, - EventId: KcpConnRttNotify, - EventMessage: conn.GetSRTT(), - } + pktFreqLimitTimer = now } } } -func (k *KcpConnectManager) closeKcpConn(convId uint64, enetType uint32) { - k.connMapLock.RLock() - conn, exist := k.connMap[convId] - k.connMapLock.RUnlock() - if !exist { +func (k *KcpConnectManager) closeKcpConn(session *Session, enetType uint32) { + if session.connState == ConnClose { return } - // 获取待关闭的发送管道 - k.kcpRawSendChanMapLock.RLock() - kcpRawSendChan := k.kcpRawSendChanMap[convId] - k.kcpRawSendChanMapLock.RUnlock() + session.connState = ConnClose + conn := session.conn + convId := conn.GetConv() // 清理数据 - k.connMapLock.Lock() - delete(k.connMap, convId) - k.connMapLock.Unlock() - k.kcpRawSendChanMapLock.Lock() - delete(k.kcpRawSendChanMap, convId) - k.kcpRawSendChanMapLock.Unlock() - k.kcpRecvListenMapLock.Lock() - delete(k.kcpRecvListenMap, convId) - k.kcpRecvListenMapLock.Unlock() - k.kcpSendListenMapLock.Lock() - delete(k.kcpSendListenMap, convId) - k.kcpSendListenMapLock.Unlock() - k.kcpKeyMapLock.Lock() - delete(k.kcpKeyMap, convId) - k.kcpKeyMapLock.Unlock() + k.DeleteSession(session.conn.GetConv(), session.userId) // 关闭连接 - conn.SendEnetNotify(&kcp.Enet{ - ConnType: kcp.ConnEnetFin, - EnetType: enetType, - }) - _ = conn.Close() - // 关闭发送管道 - close(kcpRawSendChan) + err := conn.Close() + if err == nil { + conn.SendEnetNotify(&kcp.Enet{ + ConnType: kcp.ConnEnetFin, + EnetType: enetType, + }) + } // 连接关闭通知 k.kcpEventOutput <- &KcpEvent{ ConvId: convId, EventId: KcpConnCloseNotify, } + // 通知GS玩家下线 + netMsg := new(cmd.NetMsg) + netMsg.UserId = session.userId + netMsg.EventId = cmd.UserOfflineNotify + k.netMsgInput <- netMsg + logger.LOG.Info("send to gs user offline, ConvId: %v, UserId: %v", convId, netMsg.UserId) + k.destroySessionChan <- session } func (k *KcpConnectManager) closeAllKcpConn() { closeConnList := make([]*kcp.UDPSession, 0) - k.connMapLock.RLock() - for _, v := range k.connMap { - closeConnList = append(closeConnList, v) + k.sessionMapLock.RLock() + for _, v := range k.sessionConvIdMap { + closeConnList = append(closeConnList, v.conn) } - k.connMapLock.RUnlock() + k.sessionMapLock.RUnlock() for _, v := range closeConnList { - k.closeKcpConn(v.GetConv(), kcp.EnetServerShutdown) + // 关闭连接 + v.SendEnetNotify(&kcp.Enet{ + ConnType: kcp.ConnEnetFin, + EnetType: kcp.EnetServerShutdown, + }) + _ = v.Close() } } + +func (k *KcpConnectManager) GetSessionByConvId(convId uint64) *Session { + k.sessionMapLock.RLock() + session, _ := k.sessionConvIdMap[convId] + k.sessionMapLock.RUnlock() + return session +} + +func (k *KcpConnectManager) GetSessionByUserId(userId uint32) *Session { + k.sessionMapLock.RLock() + session, _ := k.sessionUserIdMap[userId] + k.sessionMapLock.RUnlock() + return session +} + +func (k *KcpConnectManager) SetSession(session *Session, convId uint64, userId uint32) { + k.sessionMapLock.Lock() + k.sessionConvIdMap[convId] = session + k.sessionUserIdMap[userId] = session + k.sessionMapLock.Unlock() +} + +func (k *KcpConnectManager) DeleteSession(convId uint64, userId uint32) { + k.sessionMapLock.RLock() + delete(k.sessionConvIdMap, convId) + delete(k.sessionUserIdMap, userId) + k.sessionMapLock.RUnlock() +} diff --git a/gate/net/kcp_endecode.go b/gate/net/kcp_endecode.go index 00630f17..8d7e99ee 100644 --- a/gate/net/kcp_endecode.go +++ b/gate/net/kcp_endecode.go @@ -33,26 +33,19 @@ type KcpMsg struct { ProtoData []byte } -func (k *KcpConnectManager) decodeBinToPayload(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) { +func (k *KcpConnectManager) decodeBinToPayload(data []byte, convId uint64, kcpMsgList *[]*KcpMsg, xorKey []byte) { // xor解密 - k.kcpKeyMapLock.RLock() - xorKey, exist := k.kcpKeyMap[convId] - k.kcpKeyMapLock.RUnlock() - if !exist { - logger.LOG.Error("kcp xor key not exist, convId: %v", convId) - return - } endec.Xor(data, xorKey) - k.decodeRecur(data, convId, kcpMsgList) + k.decodeLoop(data, convId, kcpMsgList) } -func (k *KcpConnectManager) decodeRecur(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) { +func (k *KcpConnectManager) decodeLoop(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) { // 长度太短 if len(data) < 12 { logger.LOG.Debug("packet len less 12 byte") return } - // 头部标志错误 + // 头部幻数错误 if data[0] != 0x45 || data[1] != 0x67 { logger.LOG.Error("packet head magic 0x4567 error") return @@ -97,7 +90,7 @@ func (k *KcpConnectManager) decodeRecur(data []byte, convId uint64, kcpMsgList * logger.LOG.Error("packet len error") return } - // 尾部标志错误 + // 尾部幻数错误 if data[headLen+protoLen+10] != 0x89 || data[headLen+protoLen+11] != 0xAB { logger.LOG.Error("packet tail magic 0x89AB error") return @@ -124,11 +117,11 @@ func (k *KcpConnectManager) decodeRecur(data []byte, convId uint64, kcpMsgList * *kcpMsgList = append(*kcpMsgList, kcpMsg) // 递归解析 if haveMoreData { - k.decodeRecur(data[int(headLen+protoLen)+12:], convId, kcpMsgList) + k.decodeLoop(data[int(headLen+protoLen)+12:], convId, kcpMsgList) } } -func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg) (bin []byte) { +func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg, xorKey []byte) (bin []byte) { if kcpMsg.HeadData == nil { kcpMsg.HeadData = make([]byte, 0) } @@ -136,7 +129,7 @@ func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg) (bin []byte) { kcpMsg.ProtoData = make([]byte, 0) } bin = make([]byte, len(kcpMsg.HeadData)+len(kcpMsg.ProtoData)+12) - // 头部标志 + // 头部幻数 bin[0] = 0x45 bin[1] = 0x67 // 协议号 @@ -172,17 +165,10 @@ func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg) (bin []byte) { copy(bin[10:], kcpMsg.HeadData) // proto数据 copy(bin[10+len(kcpMsg.HeadData):], kcpMsg.ProtoData) - // 尾部标志 + // 尾部幻数 bin[len(bin)-2] = 0x89 bin[len(bin)-1] = 0xAB // xor加密 - k.kcpKeyMapLock.RLock() - xorKey, exist := k.kcpKeyMap[kcpMsg.ConvId] - k.kcpKeyMapLock.RUnlock() - if !exist { - logger.LOG.Error("kcp xor key not exist, convId: %v", kcpMsg.ConvId) - return - } endec.Xor(bin, xorKey) return bin } diff --git a/gate/net/kcp_event.go b/gate/net/kcp_event.go index 3a842cf6..e0ede2ac 100644 --- a/gate/net/kcp_event.go +++ b/gate/net/kcp_event.go @@ -1,23 +1,17 @@ package net import ( + "hk4e/gate/kcp" "hk4e/pkg/logger" "reflect" ) const ( - KcpXorKeyChange = iota - KcpDispatchKeyChange - KcpPacketRecvListen - KcpPacketSendListen - KcpConnForceClose + KcpConnForceClose = iota KcpAllConnForceClose KcpGateOpenState - KcpPacketRecvNotify - KcpPacketSendNotify KcpConnCloseNotify KcpConnEstNotify - KcpConnRttNotify KcpConnAddrChangeNotify ) @@ -27,92 +21,26 @@ type KcpEvent struct { EventMessage any } +func (k *KcpConnectManager) GetKcpEventInputChan() chan *KcpEvent { + return k.kcpEventInput +} + +func (k *KcpConnectManager) GetKcpEventOutputChan() chan *KcpEvent { + return k.kcpEventOutput +} + func (k *KcpConnectManager) eventHandle() { + logger.LOG.Debug("event handle start") // 事件处理 for { event := <-k.kcpEventInput logger.LOG.Info("kcp manager recv event, ConvId: %v, EventId: %v, EventMessage Type: %v", event.ConvId, event.EventId, reflect.TypeOf(event.EventMessage)) switch event.EventId { - case KcpXorKeyChange: - // XOR密钥切换 - k.connMapLock.RLock() - _, exist := k.connMap[event.ConvId] - k.connMapLock.RUnlock() - if !exist { - logger.LOG.Error("conn not exist, convId: %v", event.ConvId) - continue - } - key, ok := event.EventMessage.([]byte) - if !ok { - logger.LOG.Error("event KcpXorKeyChange msg type error") - continue - } - k.kcpKeyMapLock.Lock() - k.kcpKeyMap[event.ConvId] = key - k.kcpKeyMapLock.Unlock() - case KcpDispatchKeyChange: - // 首包加密XOR密钥切换 - key, ok := event.EventMessage.([]byte) - if !ok { - logger.LOG.Error("event KcpXorKeyChange msg type error") - continue - } - k.dispatchKeyLock.Lock() - k.dispatchKey = key - k.dispatchKeyLock.Unlock() - case KcpPacketRecvListen: - // 收包监听 - k.connMapLock.RLock() - _, exist := k.connMap[event.ConvId] - k.connMapLock.RUnlock() - if !exist { - logger.LOG.Error("conn not exist, convId: %v", event.ConvId) - continue - } - flag, ok := event.EventMessage.(string) - if !ok { - logger.LOG.Error("event KcpXorKeyChange msg type error") - continue - } - if flag == "Enable" { - k.kcpRecvListenMapLock.Lock() - k.kcpRecvListenMap[event.ConvId] = true - k.kcpRecvListenMapLock.Unlock() - } else if flag == "Disable" { - k.kcpRecvListenMapLock.Lock() - k.kcpRecvListenMap[event.ConvId] = false - k.kcpRecvListenMapLock.Unlock() - } - case KcpPacketSendListen: - // 发包监听 - k.connMapLock.RLock() - _, exist := k.connMap[event.ConvId] - k.connMapLock.RUnlock() - if !exist { - logger.LOG.Error("conn not exist, convId: %v", event.ConvId) - continue - } - flag, ok := event.EventMessage.(string) - if !ok { - logger.LOG.Error("event KcpXorKeyChange msg type error") - continue - } - if flag == "Enable" { - k.kcpSendListenMapLock.Lock() - k.kcpSendListenMap[event.ConvId] = true - k.kcpSendListenMapLock.Unlock() - } else if flag == "Disable" { - k.kcpSendListenMapLock.Lock() - k.kcpSendListenMap[event.ConvId] = false - k.kcpSendListenMapLock.Unlock() - } case KcpConnForceClose: // 强制关闭某个连接 - k.connMapLock.RLock() - _, exist := k.connMap[event.ConvId] - k.connMapLock.RUnlock() - if !exist { - logger.LOG.Error("conn not exist, convId: %v", event.ConvId) + session := k.GetSessionByConvId(event.ConvId) + if session == nil { + logger.LOG.Error("session not exist, convId: %v", event.ConvId) continue } reason, ok := event.EventMessage.(uint32) @@ -120,7 +48,11 @@ func (k *KcpConnectManager) eventHandle() { logger.LOG.Error("event KcpConnForceClose msg type error") continue } - k.closeKcpConn(event.ConvId, reason) + session.conn.SendEnetNotify(&kcp.Enet{ + ConnType: kcp.ConnEnetFin, + EnetType: reason, + }) + _ = session.conn.Close() logger.LOG.Info("conn has been force close, convId: %v", event.ConvId) case KcpAllConnForceClose: // 强制关闭所有连接 diff --git a/gate/net/proto_endecode.go b/gate/net/proto_endecode.go index c5d14885..01a0ef5c 100644 --- a/gate/net/proto_endecode.go +++ b/gate/net/proto_endecode.go @@ -8,16 +8,6 @@ import ( pb "google.golang.org/protobuf/proto" ) -type ProtoEnDecode struct { - cmdProtoMap *cmd.CmdProtoMap -} - -func NewProtoEnDecode() (r *ProtoEnDecode) { - r = new(ProtoEnDecode) - r.cmdProtoMap = cmd.NewCmdProtoMap() - return r -} - type ProtoMsg struct { ConvId uint64 CmdId uint16 @@ -30,7 +20,7 @@ type ProtoMessage struct { message pb.Message } -func (p *ProtoEnDecode) protoDecode(kcpMsg *KcpMsg) (protoMsgList []*ProtoMsg) { +func (k *KcpConnectManager) protoDecode(kcpMsg *KcpMsg) (protoMsgList []*ProtoMsg) { protoMsgList = make([]*ProtoMsg, 0) protoMsg := new(ProtoMsg) protoMsg.ConvId = kcpMsg.ConvId @@ -49,7 +39,7 @@ func (p *ProtoEnDecode) protoDecode(kcpMsg *KcpMsg) (protoMsgList []*ProtoMsg) { } // payload msg protoMessageList := make([]*ProtoMessage, 0) - p.protoDecodePayloadLoop(kcpMsg.CmdId, kcpMsg.ProtoData, &protoMessageList) + k.protoDecodePayloadLoop(kcpMsg.CmdId, kcpMsg.ProtoData, &protoMessageList) if len(protoMessageList) == 0 { logger.LOG.Error("decode proto object is nil") return protoMsgList @@ -82,8 +72,8 @@ func (p *ProtoEnDecode) protoDecode(kcpMsg *KcpMsg) (protoMsgList []*ProtoMsg) { return protoMsgList } -func (p *ProtoEnDecode) protoDecodePayloadLoop(cmdId uint16, protoData []byte, protoMessageList *[]*ProtoMessage) { - protoObj := p.decodePayloadToProto(cmdId, protoData) +func (k *KcpConnectManager) protoDecodePayloadLoop(cmdId uint16, protoData []byte, protoMessageList *[]*ProtoMessage) { + protoObj := k.decodePayloadToProto(cmdId, protoData) if protoObj == nil { logger.LOG.Error("decode proto object is nil") return @@ -96,7 +86,7 @@ func (p *ProtoEnDecode) protoDecodePayloadLoop(cmdId uint16, protoData []byte, p return } for _, unionCmd := range unionCmdNotify.GetCmdList() { - p.protoDecodePayloadLoop(uint16(unionCmd.MessageId), unionCmd.Body, protoMessageList) + k.protoDecodePayloadLoop(uint16(unionCmd.MessageId), unionCmd.Body, protoMessageList) } } *protoMessageList = append(*protoMessageList, &ProtoMessage{ @@ -105,7 +95,7 @@ func (p *ProtoEnDecode) protoDecodePayloadLoop(cmdId uint16, protoData []byte, p }) } -func (p *ProtoEnDecode) protoEncode(protoMsg *ProtoMsg) (kcpMsg *KcpMsg) { +func (k *KcpConnectManager) protoEncode(protoMsg *ProtoMsg) (kcpMsg *KcpMsg) { cmdName := "" if protoMsg.PayloadMessage != nil { cmdName = string(protoMsg.PayloadMessage.ProtoReflect().Descriptor().FullName()) @@ -127,7 +117,7 @@ func (p *ProtoEnDecode) protoEncode(protoMsg *ProtoMsg) (kcpMsg *KcpMsg) { } // payload msg if protoMsg.PayloadMessage != nil { - cmdId, protoData := p.encodeProtoToPayload(protoMsg.PayloadMessage) + cmdId, protoData := k.encodeProtoToPayload(protoMsg.PayloadMessage) if cmdId == 0 || protoData == nil { logger.LOG.Error("encode proto data is nil") return nil @@ -143,8 +133,8 @@ func (p *ProtoEnDecode) protoEncode(protoMsg *ProtoMsg) (kcpMsg *KcpMsg) { return kcpMsg } -func (p *ProtoEnDecode) decodePayloadToProto(cmdId uint16, protoData []byte) (protoObj pb.Message) { - protoObj = p.cmdProtoMap.GetProtoObjByCmdId(cmdId) +func (k *KcpConnectManager) decodePayloadToProto(cmdId uint16, protoData []byte) (protoObj pb.Message) { + protoObj = k.cmdProtoMap.GetProtoObjByCmdId(cmdId) if protoObj == nil { logger.LOG.Error("get new proto object is nil") return nil @@ -157,8 +147,8 @@ func (p *ProtoEnDecode) decodePayloadToProto(cmdId uint16, protoData []byte) (pr return protoObj } -func (p *ProtoEnDecode) encodeProtoToPayload(protoObj pb.Message) (cmdId uint16, protoData []byte) { - cmdId = p.cmdProtoMap.GetCmdIdByProtoObj(protoObj) +func (k *KcpConnectManager) encodeProtoToPayload(protoObj pb.Message) (cmdId uint16, protoData []byte) { + cmdId = k.cmdProtoMap.GetCmdIdByProtoObj(protoObj) var err error = nil protoData, err = pb.Marshal(protoObj) if err != nil { diff --git a/gs/config/drop_group_data.go b/gdconf/drop_group_data.go similarity index 63% rename from gs/config/drop_group_data.go rename to gdconf/drop_group_data.go index 33a4955d..db75964f 100644 --- a/gs/config/drop_group_data.go +++ b/gdconf/drop_group_data.go @@ -1,12 +1,8 @@ -package config +package gdconf import ( - "os" - "strings" - - "hk4e/pkg/logger" - "github.com/jszwec/csvutil" + "hk4e/pkg/logger" ) type Drop struct { @@ -26,20 +22,9 @@ func (g *GameDataConfig) loadDropGroupData() { g.DropGroupDataMap = make(map[int32]*DropGroupData) fileNameList := []string{"DropGachaAvatarUp.csv", "DropGachaWeaponUp.csv", "DropGachaNormal.csv"} for _, fileName := range fileNameList { - fileData, err := os.ReadFile(g.csvPrefix + fileName) - if err != nil { - logger.LOG.Error("open file error: %v", err) - return - } - // 去除第二三行的内容变成标准格式的csv - index1 := strings.Index(string(fileData), "\n") - index2 := strings.Index(string(fileData[(index1+1):]), "\n") - index3 := strings.Index(string(fileData[(index2+1)+(index1+1):]), "\n") - standardCsvData := make([]byte, 0) - standardCsvData = append(standardCsvData, fileData[:index1]...) - standardCsvData = append(standardCsvData, fileData[index3+(index2+1)+(index1+1):]...) + data := g.readCsvFileData("ext/" + fileName) var dropList []*Drop - err = csvutil.Unmarshal(standardCsvData, &dropList) + err := csvutil.Unmarshal(data, &dropList) if err != nil { logger.LOG.Error("parse file error: %v", err) return diff --git a/gdconf/game_data_config.go b/gdconf/game_data_config.go index 77bd15c4..6cb178e8 100644 --- a/gdconf/game_data_config.go +++ b/gdconf/game_data_config.go @@ -19,6 +19,7 @@ type GameDataConfig struct { AvatarDataMap map[int32]*AvatarData // 角色 AvatarSkillDataMap map[int32]*AvatarSkillData // 角色技能 AvatarSkillDepotDataMap map[int32]*AvatarSkillDepotData // 角色技能库 + DropGroupDataMap map[int32]*DropGroupData // 掉落组 } func InitGameDataConfig() { @@ -59,6 +60,7 @@ func (g *GameDataConfig) load() { g.loadAvatarData() // 角色 g.loadAvatarSkillData() // 角色技能 g.loadAvatarSkillDepotData() // 角色技能库 + g.loadDropGroupData() // 掉落组 } func (g *GameDataConfig) readCsvFileData(fileName string) []byte { diff --git a/gs/config/game_data_config.go b/gs/config/game_data_config.go index 7a83790e..28c5076c 100644 --- a/gs/config/game_data_config.go +++ b/gs/config/game_data_config.go @@ -12,7 +12,6 @@ var CONF *GameDataConfig = nil type GameDataConfig struct { binPrefix string excelBinPrefix string - csvPrefix string GameDepot *GameDepot // 配置表 // BinOutput @@ -33,8 +32,6 @@ type GameDataConfig struct { // 角色技能 AvatarSkillDataMap map[int32]*AvatarSkillData AvatarSkillDepotDataMap map[int32]*AvatarSkillDepotData - // 掉落组配置表 - DropGroupDataMap map[int32]*DropGroupData // GG GadgetDataMap map[int32]*GadgetData // 采集物 @@ -45,7 +42,6 @@ func InitGameDataConfig() { CONF = new(GameDataConfig) CONF.binPrefix = "" CONF.excelBinPrefix = "" - CONF.csvPrefix = "" CONF.loadAll() } @@ -65,8 +61,6 @@ func (g *GameDataConfig) load() { // 角色技能 g.loadAvatarSkillData() g.loadAvatarSkillDepotData() - // 掉落组配置表 - g.loadDropGroupData() // GG g.loadGadgetData() // 采集物 @@ -87,7 +81,6 @@ func (g *GameDataConfig) loadAll() { } g.binPrefix = resourcePath + "/BinOutput" g.excelBinPrefix = resourcePath + "/ExcelBinOutput" - g.csvPrefix = resourcePath + "/Csv" dirInfo, err = os.Stat(g.binPrefix) if err != nil || !dirInfo.IsDir() { logger.LOG.Error("open game data bin output config dir error: %v", err) @@ -98,14 +91,8 @@ func (g *GameDataConfig) loadAll() { logger.LOG.Error("open game data excel bin output config dir error: %v", err) return } - dirInfo, err = os.Stat(g.csvPrefix) - if err != nil || !dirInfo.IsDir() { - logger.LOG.Error("open game data csv config dir error: %v", err) - return - } g.binPrefix += "/" g.excelBinPrefix += "/" - g.csvPrefix += "/" g.load() } diff --git a/gs/game/game_manager.go b/gs/game/game_manager.go index a3d736ae..52f3a5ba 100644 --- a/gs/game/game_manager.go +++ b/gs/game/game_manager.go @@ -2,8 +2,6 @@ package game import ( pb "google.golang.org/protobuf/proto" - "hk4e/gate/entity/gm" - "hk4e/gate/kcp" "hk4e/gs/dao" "hk4e/gs/model" "hk4e/pkg/alg" @@ -156,18 +154,3 @@ func (g *GameManager) ReconnectPlayer(userId uint32) { func (g *GameManager) DisconnectPlayer(userId uint32) { g.SendMsg(cmd.ServerDisconnectClientNotify, userId, 0, new(proto.ServerDisconnectClientNotify)) } - -// KickPlayer 踢出玩家 -func (g *GameManager) KickPlayer(userId uint32) { - info := new(gm.KickPlayerInfo) - info.UserId = userId - // 客户端提示信息为服务器断开连接 - info.Reason = uint32(kcp.EnetServerKick) - var result bool - ok := false - //ok := r.hk4eGatewayConsumer.CallFunction("RpcManager", "KickPlayer", &info, &result) - if ok == true && result == true { - return - } - return -} diff --git a/gs/game/tick_manager.go b/gs/game/tick_manager.go index 484e8290..ff74df75 100644 --- a/gs/game/tick_manager.go +++ b/gs/game/tick_manager.go @@ -241,7 +241,7 @@ func (t *TickManager) onTickSecond(now int64) { func (t *TickManager) onTick200MilliSecond(now int64) { // 耐力消耗 - for _, player := range USER_MANAGER.playerMap { + for _, player := range USER_MANAGER.GetAllOnlineUserList() { GAME_MANAGER.SustainStaminaHandler(player) GAME_MANAGER.VehicleRestoreStaminaHandler(player) } diff --git a/gs/game/user_gacha.go b/gs/game/user_gacha.go index 397b8498..732d7727 100644 --- a/gs/game/user_gacha.go +++ b/gs/game/user_gacha.go @@ -1,10 +1,10 @@ package game import ( + "hk4e/gdconf" "time" "hk4e/common/config" - gdc "hk4e/gs/config" "hk4e/gs/model" "hk4e/pkg/logger" "hk4e/pkg/random" @@ -385,7 +385,7 @@ func (g *GameManager) doGachaOnce(userId uint32, gachaType uint32, mustGetUpEnab } // 找到卡池对应的掉落组 - dropGroupDataConfig := gdc.CONF.DropGroupDataMap[int32(gachaType)] + dropGroupDataConfig := gdconf.CONF.DropGroupDataMap[int32(gachaType)] if dropGroupDataConfig == nil { logger.LOG.Error("drop group not found, drop id: %v", gachaType) return false, 0 @@ -419,7 +419,7 @@ func (g *GameManager) doGachaOnce(userId uint32, gachaType uint32, mustGetUpEnab PurpleTimesFixValue = WeaponPurpleTimesFixValue } if gachaPoolInfo.OrangeTimes >= OrangeTimesFixThreshold || gachaPoolInfo.PurpleTimes >= PurpleTimesFixThreshold { - fixDropGroupDataConfig := new(gdc.DropGroupData) + fixDropGroupDataConfig := new(gdconf.DropGroupData) fixDropGroupDataConfig.DropId = dropGroupDataConfig.DropId fixDropGroupDataConfig.WeightAll = dropGroupDataConfig.WeightAll // 计算4星和5星权重修正值 @@ -432,7 +432,7 @@ func (g *GameManager) doGachaOnce(userId uint32, gachaType uint32, mustGetUpEnab addPurpleWeight = 0 } for _, drop := range dropGroupDataConfig.DropConfig { - fixDrop := new(gdc.Drop) + fixDrop := new(gdconf.Drop) fixDrop.Result = drop.Result fixDrop.DropId = drop.DropId fixDrop.IsEnd = drop.IsEnd @@ -527,7 +527,7 @@ func (g *GameManager) doGachaOnce(userId uint32, gachaType uint32, mustGetUpEnab // 替换本次结果为5星大保底 if gachaPoolInfo.MustGetUpOrange { logger.LOG.Debug("trigger must get up orange, uid: %v", userId) - upOrangeDropGroupDataConfig := gdc.CONF.DropGroupDataMap[upOrangeDropId] + upOrangeDropGroupDataConfig := gdconf.CONF.DropGroupDataMap[upOrangeDropId] if upOrangeDropGroupDataConfig == nil { logger.LOG.Error("drop group not found, drop id: %v", upOrangeDropId) return false, 0 @@ -554,7 +554,7 @@ func (g *GameManager) doGachaOnce(userId uint32, gachaType uint32, mustGetUpEnab // 替换本次结果为4星大保底 if gachaPoolInfo.MustGetUpPurple { logger.LOG.Debug("trigger must get up purple, uid: %v", userId) - upPurpleDropGroupDataConfig := gdc.CONF.DropGroupDataMap[upPurpleDropId] + upPurpleDropGroupDataConfig := gdconf.CONF.DropGroupDataMap[upPurpleDropId] if upPurpleDropGroupDataConfig == nil { logger.LOG.Error("drop group not found, drop id: %v", upPurpleDropId) return false, 0 @@ -578,7 +578,7 @@ func (g *GameManager) doGachaOnce(userId uint32, gachaType uint32, mustGetUpEnab } // 走一次完整流程的掉落组 -func (g *GameManager) doFullRandDrop(dropGroupDataConfig *gdc.DropGroupData) (bool, *gdc.Drop) { +func (g *GameManager) doFullRandDrop(dropGroupDataConfig *gdconf.DropGroupData) (bool, *gdconf.Drop) { for { drop := g.doRandDropOnce(dropGroupDataConfig) if drop == nil { @@ -590,7 +590,7 @@ func (g *GameManager) doFullRandDrop(dropGroupDataConfig *gdc.DropGroupData) (bo return true, drop } // 进行下一步掉落流程 - dropGroupDataConfig = gdc.CONF.DropGroupDataMap[drop.Result] + dropGroupDataConfig = gdconf.CONF.DropGroupDataMap[drop.Result] if dropGroupDataConfig == nil { logger.LOG.Error("drop config tab exist error, invalid drop id: %v", drop.Result) return false, nil @@ -599,7 +599,7 @@ func (g *GameManager) doFullRandDrop(dropGroupDataConfig *gdc.DropGroupData) (bo } // 进行单次随机掉落 -func (g *GameManager) doRandDropOnce(dropGroupDataConfig *gdc.DropGroupData) *gdc.Drop { +func (g *GameManager) doRandDropOnce(dropGroupDataConfig *gdconf.DropGroupData) *gdconf.Drop { randNum := random.GetRandomInt32(0, dropGroupDataConfig.WeightAll-1) sumWeight := int32(0) // 轮盘选择法