重构网关服务器

This commit is contained in:
flswld
2022-12-19 01:35:15 +08:00
parent d337799fd8
commit f773ba8df7
19 changed files with 686 additions and 1241 deletions

361
gate/net/forward.go Normal file
View File

@@ -0,0 +1,361 @@
package net
import (
"bytes"
"encoding/base64"
"encoding/binary"
"fmt"
"hk4e/dispatch/controller"
"hk4e/gate/kcp"
"hk4e/pkg/endec"
"hk4e/pkg/httpclient"
"hk4e/pkg/logger"
"hk4e/pkg/random"
"hk4e/protocol/cmd"
"hk4e/protocol/proto"
"math/rand"
"strconv"
"strings"
"time"
)
const (
ConnWaitToken = iota
ConnWaitLogin
ConnAlive
ConnClose
)
// 发送消息到GS
func (k *KcpConnectManager) recvMsgHandle(protoMsg *ProtoMsg, session *Session) {
userId := session.userId
headMeta := session.headMeta
connState := session.connState
if protoMsg.HeadMessage == nil {
logger.LOG.Error("recv null head msg: %v", protoMsg)
}
headMeta.seq = protoMsg.HeadMessage.ClientSequenceId
// gate本地处理的请求
switch protoMsg.CmdId {
case cmd.GetPlayerTokenReq:
// 获取玩家token请求
if connState != ConnWaitToken {
return
}
getPlayerTokenReq := protoMsg.PayloadMessage.(*proto.GetPlayerTokenReq)
getPlayerTokenRsp := k.getPlayerToken(getPlayerTokenReq, session)
if getPlayerTokenRsp == nil {
return
}
// 返回数据到客户端
rsp := new(ProtoMsg)
rsp.ConvId = protoMsg.ConvId
rsp.CmdId = cmd.GetPlayerTokenRsp
rsp.HeadMessage = k.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId)
rsp.PayloadMessage = getPlayerTokenRsp
k.localMsgOutput <- rsp
case cmd.PlayerLoginReq:
// 玩家登录请求
if connState != ConnWaitLogin {
return
}
playerLoginReq := protoMsg.PayloadMessage.(*proto.PlayerLoginReq)
playerLoginRsp := k.playerLogin(playerLoginReq, session)
if playerLoginRsp == nil {
return
}
// 返回数据到客户端
rsp := new(ProtoMsg)
rsp.ConvId = protoMsg.ConvId
rsp.CmdId = cmd.PlayerLoginRsp
rsp.HeadMessage = k.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId)
rsp.PayloadMessage = playerLoginRsp
k.localMsgOutput <- rsp
// 登录成功 通知GS初始化相关数据
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.UserLoginNotify
netMsg.ClientSeq = headMeta.seq
k.netMsgInput <- netMsg
logger.LOG.Info("send to gs user login ok, ConvId: %v, UserId: %v", protoMsg.ConvId, netMsg.UserId)
case cmd.SetPlayerBornDataReq:
// 玩家注册请求
if connState != ConnAlive {
return
}
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.UserRegNotify
netMsg.CmdId = cmd.SetPlayerBornDataReq
netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId
netMsg.PayloadMessage = protoMsg.PayloadMessage
k.netMsgInput <- netMsg
case cmd.PlayerForceExitReq:
// 玩家退出游戏请求
if connState != ConnAlive {
return
}
k.kcpEventInput <- &KcpEvent{
ConvId: protoMsg.ConvId,
EventId: KcpConnForceClose,
EventMessage: uint32(kcp.EnetClientClose),
}
case cmd.PingReq:
// ping请求
if connState != ConnAlive {
return
}
pingReq := protoMsg.PayloadMessage.(*proto.PingReq)
logger.LOG.Debug("user ping req, data: %v", pingReq.String())
// 返回数据到客户端
// TODO 记录客户端最后一次ping时间做超时下线处理
pingRsp := new(proto.PingRsp)
pingRsp.ClientTime = pingReq.ClientTime
rsp := new(ProtoMsg)
rsp.ConvId = protoMsg.ConvId
rsp.CmdId = cmd.PingRsp
rsp.HeadMessage = k.getHeadMsg(protoMsg.HeadMessage.ClientSequenceId)
rsp.PayloadMessage = pingRsp
k.localMsgOutput <- rsp
// 通知GS玩家客户端的本地时钟
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.ClientTimeNotify
netMsg.ClientTime = pingReq.ClientTime
k.netMsgInput <- netMsg
// RTT
logger.LOG.Debug("convId: %v, RTO: %v, SRTT: %v, RTTVar: %v", protoMsg.ConvId, session.conn.GetRTO(), session.conn.GetSRTT(), session.conn.GetSRTTVar())
// 客户端往返时延通知
rtt := session.conn.GetSRTT()
// 通知GS玩家客户端往返时延
netMsg = new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.ClientRttNotify
netMsg.ClientRtt = uint32(rtt)
k.netMsgInput <- netMsg
default:
// 转发到GS
// 未登录禁止访问GS
if connState != ConnAlive {
return
}
netMsg := new(cmd.NetMsg)
netMsg.UserId = userId
netMsg.EventId = cmd.NormalMsg
netMsg.CmdId = protoMsg.CmdId
netMsg.ClientSeq = protoMsg.HeadMessage.ClientSequenceId
netMsg.PayloadMessage = protoMsg.PayloadMessage
k.netMsgInput <- netMsg
}
}
// 从GS接收消息
func (k *KcpConnectManager) sendMsgHandle() {
logger.LOG.Debug("send msg handle start")
kcpRawSendChanMap := make(map[uint64]chan *ProtoMsg)
userIdConvMap := make(map[uint32]uint64)
sendToClientFn := func(protoMsg *ProtoMsg) {
// 分发到每个连接具体的发送协程
kcpRawSendChan := kcpRawSendChanMap[protoMsg.ConvId]
if kcpRawSendChan != nil {
select {
case kcpRawSendChan <- protoMsg:
default:
logger.LOG.Error("kcpRawSendChan is full, convId: %v", protoMsg.ConvId)
}
} else {
logger.LOG.Error("kcpRawSendChan is nil, convId: %v", protoMsg.ConvId)
}
}
for {
select {
case session := <-k.createSessionChan:
kcpRawSendChanMap[session.conn.GetConv()] = session.kcpRawSendChan
userIdConvMap[session.userId] = session.conn.GetConv()
case session := <-k.destroySessionChan:
delete(kcpRawSendChanMap, session.conn.GetConv())
delete(userIdConvMap, session.userId)
close(session.kcpRawSendChan)
case protoMsg := <-k.localMsgOutput:
sendToClientFn(protoMsg)
case netMsg := <-k.netMsgOutput:
convId, exist := userIdConvMap[netMsg.UserId]
if !exist {
logger.LOG.Error("can not find convId by userId")
continue
}
if netMsg.EventId == cmd.NormalMsg {
protoMsg := new(ProtoMsg)
protoMsg.ConvId = convId
protoMsg.CmdId = netMsg.CmdId
protoMsg.HeadMessage = k.getHeadMsg(netMsg.ClientSeq)
protoMsg.PayloadMessage = netMsg.PayloadMessage
sendToClientFn(protoMsg)
} else {
logger.LOG.Error("recv unknown event from game server, event id: %v", netMsg.EventId)
}
}
}
}
func (k *KcpConnectManager) getHeadMsg(clientSeq uint32) (headMsg *proto.PacketHead) {
headMsg = new(proto.PacketHead)
if clientSeq != 0 {
headMsg.ClientSequenceId = clientSeq
headMsg.SentMs = uint64(time.Now().UnixMilli())
}
return headMsg
}
func (k *KcpConnectManager) getPlayerToken(req *proto.GetPlayerTokenReq, session *Session) (rsp *proto.GetPlayerTokenRsp) {
tokenVerifyRsp, err := httpclient.Post[controller.TokenVerifyRsp]("http://127.0.0.1:8080/gate/token/verify", &controller.TokenVerifyReq{
AccountId: req.AccountUid,
AccountToken: req.AccountToken,
}, "")
if err != nil {
logger.LOG.Error("verify token error: %v", err)
return nil
}
if !tokenVerifyRsp.Valid {
logger.LOG.Error("token error")
return nil
}
// comboToken验证成功
if tokenVerifyRsp.Forbid {
// 封号通知
rsp = new(proto.GetPlayerTokenRsp)
rsp.Uid = tokenVerifyRsp.PlayerID
rsp.IsProficientPlayer = true
rsp.Retcode = 21
rsp.Msg = "FORBID_CHEATING_PLUGINS"
rsp.BlackUidEndTime = tokenVerifyRsp.ForbidEndTime
if rsp.BlackUidEndTime == 0 {
rsp.BlackUidEndTime = 2051193600 // 2035-01-01 00:00:00
}
rsp.RegPlatform = 3
rsp.CountryCode = "US"
addr := session.conn.RemoteAddr().String()
split := strings.Split(addr, ":")
rsp.ClientIpStr = split[0]
return rsp
}
oldSession := k.GetSessionByUserId(tokenVerifyRsp.PlayerID)
if oldSession != nil {
// 顶号
k.kcpEventInput <- &KcpEvent{
ConvId: oldSession.conn.GetConv(),
EventId: KcpConnForceClose,
EventMessage: uint32(kcp.EnetServerRelogin),
}
}
// 关联玩家uid和连接信息
session.userId = tokenVerifyRsp.PlayerID
session.connState = ConnWaitLogin
k.SetSession(session, session.conn.GetConv(), session.userId)
k.createSessionChan <- session
// 返回响应
rsp = new(proto.GetPlayerTokenRsp)
rsp.Uid = tokenVerifyRsp.PlayerID
rsp.AccountUid = req.AccountUid
rsp.Token = req.AccountToken
data := make([]byte, 16+32)
rand.Read(data)
rsp.SecurityCmdBuffer = data[16:]
rsp.ClientVersionRandomKey = fmt.Sprintf("%03x-%012x", data[:3], data[4:16])
rsp.AccountType = 1
rsp.IsProficientPlayer = true
rsp.PlatformType = 3
rsp.ChannelId = 1
rsp.SubChannelId = 1
rsp.RegPlatform = 2
rsp.Birthday = "2000-01-01"
addr := session.conn.RemoteAddr().String()
split := strings.Split(addr, ":")
rsp.ClientIpStr = split[0]
if req.GetKeyId() != 0 {
logger.LOG.Debug("do hk4e 2.8 rsa logic")
keyId := strconv.Itoa(int(req.GetKeyId()))
encPubPrivKey, exist := k.encRsaKeyMap[keyId]
if !exist {
logger.LOG.Error("can not found key id: %v", keyId)
return
}
pubKey, err := endec.RsaParsePubKeyByPrivKey(encPubPrivKey)
if err != nil {
logger.LOG.Error("parse rsa pub key error: %v", err)
return nil
}
signPrivkey, err := endec.RsaParsePrivKey(k.signRsaKey)
if err != nil {
logger.LOG.Error("parse rsa priv key error: %v", err)
return nil
}
clientSeedBase64 := req.GetClientRandKey()
clientSeedEnc, err := base64.StdEncoding.DecodeString(clientSeedBase64)
if err != nil {
logger.LOG.Error("parse client seed base64 error: %v", err)
return nil
}
clientSeed, err := endec.RsaDecrypt(clientSeedEnc, signPrivkey)
if err != nil {
logger.LOG.Error("rsa dec error: %v", err)
return rsp
}
clientSeedUint64 := uint64(0)
err = binary.Read(bytes.NewReader(clientSeed), binary.BigEndian, &clientSeedUint64)
if err != nil {
logger.LOG.Error("parse client seed to uint64 error: %v", err)
return rsp
}
timeRand := random.GetTimeRand()
serverSeedUint64 := timeRand.Uint64()
session.seed = serverSeedUint64
session.changeXorKey = true
seedUint64 := serverSeedUint64 ^ clientSeedUint64
seedBuf := new(bytes.Buffer)
err = binary.Write(seedBuf, binary.BigEndian, seedUint64)
if err != nil {
logger.LOG.Error("conv seed uint64 to bytes error: %v", err)
return rsp
}
seed := seedBuf.Bytes()
seedEnc, err := endec.RsaEncrypt(seed, pubKey)
if err != nil {
logger.LOG.Error("rsa enc error: %v", err)
return rsp
}
seedSign, err := endec.RsaSign(seed, signPrivkey)
if err != nil {
logger.LOG.Error("rsa sign error: %v", err)
return rsp
}
rsp.KeyId = req.KeyId
rsp.ServerRandKey = base64.StdEncoding.EncodeToString(seedEnc)
rsp.Sign = base64.StdEncoding.EncodeToString(seedSign)
}
return rsp
}
func (k *KcpConnectManager) playerLogin(req *proto.PlayerLoginReq, session *Session) (rsp *proto.PlayerLoginRsp) {
logger.LOG.Debug("player login, info: %v", req.String())
// TODO 验证token
session.connState = ConnAlive
// 返回响应
rsp = new(proto.PlayerLoginRsp)
rsp.IsUseAbilityHash = true
rsp.AbilityHashCode = -228935105
rsp.GameBiz = "hk4e_cn"
rsp.IsScOpen = false
rsp.RegisterCps = "taptap"
rsp.CountryCode = "CN"
rsp.Birthday = "2000-01-01"
rsp.TotalTickTime = 1185941.871788
rsp.ClientDataVersion = k.regionCurr.RegionInfo.ClientDataVersion
rsp.ClientSilenceDataVersion = k.regionCurr.RegionInfo.ClientSilenceDataVersion
rsp.ClientMd5 = k.regionCurr.RegionInfo.ClientDataMd5
rsp.ClientSilenceMd5 = k.regionCurr.RegionInfo.ClientSilenceDataMd5
rsp.ResVersionConfig = k.regionCurr.RegionInfo.ResVersionConfig
rsp.ClientVersionSuffix = k.regionCurr.RegionInfo.ClientVersionSuffix
rsp.ClientSilenceVersionSuffix = k.regionCurr.RegionInfo.ClientSilenceVersionSuffix
return rsp
}

View File

@@ -3,6 +3,11 @@ package net
import (
"bytes"
"encoding/binary"
"hk4e/common/region"
"hk4e/dispatch/controller"
"hk4e/pkg/httpclient"
"hk4e/protocol/cmd"
"hk4e/protocol/proto"
"strconv"
"sync"
"time"
@@ -14,152 +19,158 @@ import (
)
type KcpConnectManager struct {
openState bool
connMap map[uint64]*kcp.UDPSession
connMapLock sync.RWMutex
protoMsgInput chan *ProtoMsg
protoMsgOutput chan *ProtoMsg
kcpEventInput chan *KcpEvent
kcpEventOutput chan *KcpEvent
// 发送协程分发
kcpRawSendChanMap map[uint64]chan *ProtoMsg
kcpRawSendChanMapLock sync.RWMutex
// 收包发包监听标志
kcpRecvListenMap map[uint64]bool
kcpRecvListenMapLock sync.RWMutex
kcpSendListenMap map[uint64]bool
kcpSendListenMapLock sync.RWMutex
// key
dispatchKey []byte
dispatchKeyLock sync.RWMutex
kcpKeyMap map[uint64][]byte
kcpKeyMapLock sync.RWMutex
// conv短时间内唯一生成
convGenMap map[uint64]int64
convGenMapLock sync.RWMutex
openState bool
sessionConvIdMap map[uint64]*Session
sessionUserIdMap map[uint32]*Session
sessionMapLock sync.RWMutex
kcpEventInput chan *KcpEvent
kcpEventOutput chan *KcpEvent
cmdProtoMap *cmd.CmdProtoMap
netMsgInput chan *cmd.NetMsg
netMsgOutput chan *cmd.NetMsg
localMsgOutput chan *ProtoMsg
createSessionChan chan *Session
destroySessionChan chan *Session
// 密钥相关
dispatchKey []byte
regionCurr *proto.QueryCurrRegionHttpRsp
signRsaKey []byte
encRsaKeyMap map[string][]byte
}
func NewKcpConnectManager(protoMsgInput chan *ProtoMsg, protoMsgOutput chan *ProtoMsg,
kcpEventInput chan *KcpEvent, kcpEventOutput chan *KcpEvent) (r *KcpConnectManager) {
func NewKcpConnectManager(netMsgInput chan *cmd.NetMsg, netMsgOutput chan *cmd.NetMsg) (r *KcpConnectManager) {
r = new(KcpConnectManager)
r.openState = true
r.connMap = make(map[uint64]*kcp.UDPSession)
r.protoMsgInput = protoMsgInput
r.protoMsgOutput = protoMsgOutput
r.kcpEventInput = kcpEventInput
r.kcpEventOutput = kcpEventOutput
r.kcpRawSendChanMap = make(map[uint64]chan *ProtoMsg)
r.kcpRecvListenMap = make(map[uint64]bool)
r.kcpSendListenMap = make(map[uint64]bool)
r.kcpKeyMap = make(map[uint64][]byte)
r.convGenMap = make(map[uint64]int64)
r.sessionConvIdMap = make(map[uint64]*Session)
r.sessionUserIdMap = make(map[uint32]*Session)
r.kcpEventInput = make(chan *KcpEvent, 1000)
r.kcpEventOutput = make(chan *KcpEvent, 1000)
r.cmdProtoMap = cmd.NewCmdProtoMap()
r.netMsgInput = netMsgInput
r.netMsgOutput = netMsgOutput
r.localMsgOutput = make(chan *ProtoMsg, 1000)
r.createSessionChan = make(chan *Session, 1000)
r.destroySessionChan = make(chan *Session, 1000)
return r
}
func (k *KcpConnectManager) Start() {
go func() {
// key
k.dispatchKey = make([]byte, 4096)
// kcp
port := strconv.Itoa(int(config.CONF.Hk4e.KcpPort))
listener, err := kcp.ListenWithOptions(config.CONF.Hk4e.KcpAddr+":"+port, nil, 0, 0)
if err != nil {
logger.LOG.Error("listen kcp err: %v", err)
return
} else {
go k.enetHandle(listener)
go k.chanSendHandle()
go k.eventHandle()
for {
conn, err := listener.AcceptKCP()
if err != nil {
logger.LOG.Error("accept kcp err: %v", err)
return
}
if k.openState == false {
_ = conn.Close()
continue
}
conn.SetACKNoDelay(true)
conn.SetWriteDelay(false)
convId := conn.GetConv()
logger.LOG.Debug("client connect, convId: %v", convId)
// 连接建立成功通知
k.kcpEventOutput <- &KcpEvent{
ConvId: convId,
EventId: KcpConnEstNotify,
EventMessage: conn.RemoteAddr().String(),
}
k.connMapLock.Lock()
k.connMap[convId] = conn
k.connMapLock.Unlock()
k.kcpKeyMapLock.Lock()
k.dispatchKeyLock.RLock()
k.kcpKeyMap[convId] = k.dispatchKey
k.dispatchKeyLock.RUnlock()
k.kcpKeyMapLock.Unlock()
go k.recvHandle(convId)
kcpRawSendChan := make(chan *ProtoMsg, 10000)
k.kcpRawSendChanMapLock.Lock()
k.kcpRawSendChanMap[convId] = kcpRawSendChan
k.kcpRawSendChanMapLock.Unlock()
go k.sendHandle(convId, kcpRawSendChan)
go k.rttMonitor(convId)
}
}
}()
go k.clearDeadConv()
// 读取密钥相关文件
k.signRsaKey, k.encRsaKeyMap, _ = region.LoadRsaKey()
// region
regionCurr, _, _ := region.InitRegion(config.CONF.Hk4e.KcpAddr, config.CONF.Hk4e.KcpPort)
k.regionCurr = regionCurr
// key
dispatchEc2bSeedRsp, err := httpclient.Get[controller.DispatchEc2bSeedRsp]("http://127.0.0.1:8080/dispatch/ec2b/seed", "")
if err != nil {
logger.LOG.Error("get dispatch ec2b seed error: %v", err)
return
}
dispatchEc2bSeed, err := strconv.ParseUint(dispatchEc2bSeedRsp.Seed, 10, 64)
if err != nil {
logger.LOG.Error("parse dispatch ec2b seed error: %v", err)
return
}
logger.LOG.Debug("get dispatch ec2b seed: %v", dispatchEc2bSeed)
gateDispatchEc2b := random.NewEc2b()
gateDispatchEc2b.SetSeed(dispatchEc2bSeed)
k.dispatchKey = gateDispatchEc2b.XorKey()
// kcp
port := strconv.Itoa(int(config.CONF.Hk4e.KcpPort))
listener, err := kcp.ListenWithOptions(config.CONF.Hk4e.KcpAddr+":"+port, nil, 0, 0)
if err != nil {
logger.LOG.Error("listen kcp err: %v", err)
return
}
go k.enetHandle(listener)
go k.eventHandle()
go k.sendMsgHandle()
go k.acceptHandle(listener)
}
func (k *KcpConnectManager) clearDeadConv() {
ticker := time.NewTicker(time.Minute)
func (k *KcpConnectManager) acceptHandle(listener *kcp.Listener) {
logger.LOG.Debug("accept handle start")
for {
k.convGenMapLock.Lock()
now := time.Now().UnixNano()
oldConvList := make([]uint64, 0)
for conv, timestamp := range k.convGenMap {
if now-timestamp > int64(time.Hour) {
oldConvList = append(oldConvList, conv)
}
conn, err := listener.AcceptKCP()
if err != nil {
logger.LOG.Error("accept kcp err: %v", err)
return
}
delConvList := make([]uint64, 0)
k.connMapLock.RLock()
for _, conv := range oldConvList {
_, exist := k.connMap[conv]
if !exist {
delConvList = append(delConvList, conv)
delete(k.convGenMap, conv)
}
if k.openState == false {
_ = conn.Close()
continue
}
conn.SetACKNoDelay(true)
conn.SetWriteDelay(false)
convId := conn.GetConv()
logger.LOG.Debug("client connect, convId: %v", convId)
kcpRawSendChan := make(chan *ProtoMsg, 1000)
session := &Session{
conn: conn,
connState: ConnWaitToken,
userId: 0,
headMeta: &ClientHeadMeta{
seq: 0,
},
kcpRawSendChan: kcpRawSendChan,
seed: 0,
xorKey: k.dispatchKey,
changeXorKey: false,
}
go k.recvHandle(session)
go k.sendHandle(session)
// 连接建立成功通知
k.kcpEventOutput <- &KcpEvent{
ConvId: convId,
EventId: KcpConnEstNotify,
EventMessage: conn.RemoteAddr().String(),
}
k.connMapLock.RUnlock()
k.convGenMapLock.Unlock()
logger.LOG.Info("clean dead conv list: %v", delConvList)
<-ticker.C
}
}
func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) {
logger.LOG.Debug("enet handle start")
// conv短时间内唯一生成
convGenMap := make(map[uint64]int64)
for {
enetNotify := <-listener.EnetNotify
logger.LOG.Info("[Enet Notify], addr: %v, conv: %v, conn: %v, enet: %v", enetNotify.Addr, enetNotify.ConvId, enetNotify.ConnType, enetNotify.EnetType)
switch enetNotify.ConnType {
case kcp.ConnEnetSyn:
if enetNotify.EnetType == kcp.EnetClientConnectKey {
// 清理老旧的conv
now := time.Now().UnixNano()
oldConvList := make([]uint64, 0)
for conv, timestamp := range convGenMap {
if now-timestamp > int64(time.Hour) {
oldConvList = append(oldConvList, conv)
}
}
delConvList := make([]uint64, 0)
k.sessionMapLock.RLock()
for _, conv := range oldConvList {
_, exist := k.sessionConvIdMap[conv]
if !exist {
delConvList = append(delConvList, conv)
delete(convGenMap, conv)
}
}
k.sessionMapLock.RUnlock()
logger.LOG.Info("clean dead conv list: %v", delConvList)
// 生成没用过的conv
var conv uint64
k.convGenMapLock.Lock()
for {
convData := random.GetRandomByte(8)
convDataBuffer := bytes.NewBuffer(convData)
_ = binary.Read(convDataBuffer, binary.LittleEndian, &conv)
_, exist := k.convGenMap[conv]
_, exist := convGenMap[conv]
if exist {
continue
} else {
k.convGenMap[conv] = time.Now().UnixNano()
convGenMap[conv] = time.Now().UnixNano()
break
}
}
k.convGenMapLock.Unlock()
listener.SendEnetNotifyToClient(&kcp.Enet{
Addr: enetNotify.Addr,
ConvId: conv,
@@ -169,7 +180,16 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) {
}
case kcp.ConnEnetEst:
case kcp.ConnEnetFin:
k.closeKcpConn(enetNotify.ConvId, enetNotify.EnetType)
session := k.GetSessionByConvId(enetNotify.ConvId)
if session == nil {
logger.LOG.Error("session not exist, convId: %v", enetNotify.ConvId)
continue
}
session.conn.SendEnetNotify(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: enetNotify.EnetType,
})
_ = session.conn.Close()
case kcp.ConnEnetAddrChange:
// 连接地址改变通知
k.kcpEventOutput <- &KcpEvent{
@@ -182,48 +202,52 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) {
}
}
func (k *KcpConnectManager) chanSendHandle() {
// 分发到每个连接具体的发送协程
for {
protoMsg := <-k.protoMsgInput
k.kcpRawSendChanMapLock.RLock()
kcpRawSendChan := k.kcpRawSendChanMap[protoMsg.ConvId]
k.kcpRawSendChanMapLock.RUnlock()
if kcpRawSendChan != nil {
select {
case kcpRawSendChan <- protoMsg:
default:
logger.LOG.Error("kcpRawSendChan is full, convId: %v", protoMsg.ConvId)
}
} else {
logger.LOG.Error("kcpRawSendChan is nil, convId: %v", protoMsg.ConvId)
}
}
type ClientHeadMeta struct {
seq uint32
}
func (k *KcpConnectManager) recvHandle(convId uint64) {
type Session struct {
conn *kcp.UDPSession
connState uint8
userId uint32
headMeta *ClientHeadMeta
kcpRawSendChan chan *ProtoMsg
seed uint64
xorKey []byte
changeXorKey bool
}
func (k *KcpConnectManager) recvHandle(session *Session) {
logger.LOG.Debug("recv handle start")
// 接收
k.connMapLock.RLock()
conn := k.connMap[convId]
k.connMapLock.RUnlock()
conn := session.conn
convId := conn.GetConv()
pktFreqLimitCounter := 0
pktFreqLimitTimer := time.Now().UnixNano()
protoEnDecode := NewProtoEnDecode()
recvBuf := make([]byte, conn.GetMaxPayloadLen())
for {
_ = conn.SetReadDeadline(time.Now().Add(time.Second * 30))
_ = conn.SetReadDeadline(time.Now().Add(time.Second * 15))
recvLen, err := conn.Read(recvBuf)
if err != nil {
logger.LOG.Error("exit recv loop, conn read err: %v, convId: %v", err, convId)
k.closeKcpConn(convId, kcp.EnetServerKick)
k.closeKcpConn(session, kcp.EnetServerKick)
break
}
if session.changeXorKey {
session.changeXorKey = false
keyBlock := random.NewKeyBlock(session.seed)
xorKey := keyBlock.XorKey()
key := make([]byte, 4096)
copy(key, xorKey[:])
session.xorKey = key
}
// 收包频率限制
pktFreqLimitCounter++
now := time.Now().UnixNano()
if now-pktFreqLimitTimer > int64(time.Second) {
if pktFreqLimitCounter > 1000 {
if pktFreqLimitCounter > 100 {
logger.LOG.Error("exit recv loop, client packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter)
k.closeKcpConn(convId, kcp.EnetPacketFreqTooHigh)
k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh)
break
} else {
pktFreqLimitCounter = 0
@@ -231,141 +255,132 @@ func (k *KcpConnectManager) recvHandle(convId uint64) {
pktFreqLimitTimer = now
}
recvData := recvBuf[:recvLen]
k.kcpRecvListenMapLock.RLock()
flag := k.kcpRecvListenMap[convId]
k.kcpRecvListenMapLock.RUnlock()
if flag {
// 收包通知
//recvMsg := make([]byte, len(recvData))
//copy(recvMsg, recvData)
k.kcpEventOutput <- &KcpEvent{
ConvId: convId,
EventId: KcpPacketRecvNotify,
EventMessage: recvData,
}
}
kcpMsgList := make([]*KcpMsg, 0)
k.decodeBinToPayload(recvData, convId, &kcpMsgList)
k.decodeBinToPayload(recvData, convId, &kcpMsgList, session.xorKey)
for _, v := range kcpMsgList {
protoMsgList := protoEnDecode.protoDecode(v)
protoMsgList := k.protoDecode(v)
for _, vv := range protoMsgList {
k.protoMsgOutput <- vv
k.recvMsgHandle(vv, session)
}
}
}
}
func (k *KcpConnectManager) sendHandle(convId uint64, kcpRawSendChan chan *ProtoMsg) {
func (k *KcpConnectManager) sendHandle(session *Session) {
logger.LOG.Debug("send handle start")
// 发送
k.connMapLock.RLock()
conn := k.connMap[convId]
k.connMapLock.RUnlock()
protoEnDecode := NewProtoEnDecode()
conn := session.conn
convId := conn.GetConv()
pktFreqLimitCounter := 0
pktFreqLimitTimer := time.Now().UnixNano()
for {
protoMsg, ok := <-kcpRawSendChan
protoMsg, ok := <-session.kcpRawSendChan
if !ok {
logger.LOG.Error("exit send loop, send chan close, convId: %v", convId)
k.closeKcpConn(convId, kcp.EnetServerKick)
k.closeKcpConn(session, kcp.EnetServerKick)
break
}
kcpMsg := protoEnDecode.protoEncode(protoMsg)
kcpMsg := k.protoEncode(protoMsg)
if kcpMsg == nil {
logger.LOG.Error("decode kcp msg is nil, convId: %v", convId)
continue
}
bin := k.encodePayloadToBin(kcpMsg)
_ = conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
bin := k.encodePayloadToBin(kcpMsg, session.xorKey)
_ = conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
_, err := conn.Write(bin)
if err != nil {
logger.LOG.Error("exit send loop, conn write err: %v, convId: %v", err, convId)
k.closeKcpConn(convId, kcp.EnetServerKick)
k.closeKcpConn(session, kcp.EnetServerKick)
break
}
k.kcpSendListenMapLock.RLock()
flag := k.kcpSendListenMap[convId]
k.kcpSendListenMapLock.RUnlock()
if flag {
// 发包通知
k.kcpEventOutput <- &KcpEvent{
ConvId: convId,
EventId: KcpPacketSendNotify,
EventMessage: bin,
}
}
}
}
func (k *KcpConnectManager) rttMonitor(convId uint64) {
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ticker.C:
k.connMapLock.RLock()
conn := k.connMap[convId]
k.connMapLock.RUnlock()
if conn == nil {
// 发包频率限制
pktFreqLimitCounter++
now := time.Now().UnixNano()
if now-pktFreqLimitTimer > int64(time.Second) {
if pktFreqLimitCounter > 100 {
logger.LOG.Error("exit send loop, server packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter)
k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh)
break
} else {
pktFreqLimitCounter = 0
}
logger.LOG.Debug("convId: %v, RTO: %v, SRTT: %v, RTTVar: %v", convId, conn.GetRTO(), conn.GetSRTT(), conn.GetSRTTVar())
k.kcpEventOutput <- &KcpEvent{
ConvId: convId,
EventId: KcpConnRttNotify,
EventMessage: conn.GetSRTT(),
}
pktFreqLimitTimer = now
}
}
}
func (k *KcpConnectManager) closeKcpConn(convId uint64, enetType uint32) {
k.connMapLock.RLock()
conn, exist := k.connMap[convId]
k.connMapLock.RUnlock()
if !exist {
func (k *KcpConnectManager) closeKcpConn(session *Session, enetType uint32) {
if session.connState == ConnClose {
return
}
// 获取待关闭的发送管道
k.kcpRawSendChanMapLock.RLock()
kcpRawSendChan := k.kcpRawSendChanMap[convId]
k.kcpRawSendChanMapLock.RUnlock()
session.connState = ConnClose
conn := session.conn
convId := conn.GetConv()
// 清理数据
k.connMapLock.Lock()
delete(k.connMap, convId)
k.connMapLock.Unlock()
k.kcpRawSendChanMapLock.Lock()
delete(k.kcpRawSendChanMap, convId)
k.kcpRawSendChanMapLock.Unlock()
k.kcpRecvListenMapLock.Lock()
delete(k.kcpRecvListenMap, convId)
k.kcpRecvListenMapLock.Unlock()
k.kcpSendListenMapLock.Lock()
delete(k.kcpSendListenMap, convId)
k.kcpSendListenMapLock.Unlock()
k.kcpKeyMapLock.Lock()
delete(k.kcpKeyMap, convId)
k.kcpKeyMapLock.Unlock()
k.DeleteSession(session.conn.GetConv(), session.userId)
// 关闭连接
conn.SendEnetNotify(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: enetType,
})
_ = conn.Close()
// 关闭发送管道
close(kcpRawSendChan)
err := conn.Close()
if err == nil {
conn.SendEnetNotify(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: enetType,
})
}
// 连接关闭通知
k.kcpEventOutput <- &KcpEvent{
ConvId: convId,
EventId: KcpConnCloseNotify,
}
// 通知GS玩家下线
netMsg := new(cmd.NetMsg)
netMsg.UserId = session.userId
netMsg.EventId = cmd.UserOfflineNotify
k.netMsgInput <- netMsg
logger.LOG.Info("send to gs user offline, ConvId: %v, UserId: %v", convId, netMsg.UserId)
k.destroySessionChan <- session
}
func (k *KcpConnectManager) closeAllKcpConn() {
closeConnList := make([]*kcp.UDPSession, 0)
k.connMapLock.RLock()
for _, v := range k.connMap {
closeConnList = append(closeConnList, v)
k.sessionMapLock.RLock()
for _, v := range k.sessionConvIdMap {
closeConnList = append(closeConnList, v.conn)
}
k.connMapLock.RUnlock()
k.sessionMapLock.RUnlock()
for _, v := range closeConnList {
k.closeKcpConn(v.GetConv(), kcp.EnetServerShutdown)
// 关闭连接
v.SendEnetNotify(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: kcp.EnetServerShutdown,
})
_ = v.Close()
}
}
func (k *KcpConnectManager) GetSessionByConvId(convId uint64) *Session {
k.sessionMapLock.RLock()
session, _ := k.sessionConvIdMap[convId]
k.sessionMapLock.RUnlock()
return session
}
func (k *KcpConnectManager) GetSessionByUserId(userId uint32) *Session {
k.sessionMapLock.RLock()
session, _ := k.sessionUserIdMap[userId]
k.sessionMapLock.RUnlock()
return session
}
func (k *KcpConnectManager) SetSession(session *Session, convId uint64, userId uint32) {
k.sessionMapLock.Lock()
k.sessionConvIdMap[convId] = session
k.sessionUserIdMap[userId] = session
k.sessionMapLock.Unlock()
}
func (k *KcpConnectManager) DeleteSession(convId uint64, userId uint32) {
k.sessionMapLock.RLock()
delete(k.sessionConvIdMap, convId)
delete(k.sessionUserIdMap, userId)
k.sessionMapLock.RUnlock()
}

View File

@@ -33,26 +33,19 @@ type KcpMsg struct {
ProtoData []byte
}
func (k *KcpConnectManager) decodeBinToPayload(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) {
func (k *KcpConnectManager) decodeBinToPayload(data []byte, convId uint64, kcpMsgList *[]*KcpMsg, xorKey []byte) {
// xor解密
k.kcpKeyMapLock.RLock()
xorKey, exist := k.kcpKeyMap[convId]
k.kcpKeyMapLock.RUnlock()
if !exist {
logger.LOG.Error("kcp xor key not exist, convId: %v", convId)
return
}
endec.Xor(data, xorKey)
k.decodeRecur(data, convId, kcpMsgList)
k.decodeLoop(data, convId, kcpMsgList)
}
func (k *KcpConnectManager) decodeRecur(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) {
func (k *KcpConnectManager) decodeLoop(data []byte, convId uint64, kcpMsgList *[]*KcpMsg) {
// 长度太短
if len(data) < 12 {
logger.LOG.Debug("packet len less 12 byte")
return
}
// 头部标志错误
// 头部幻数错误
if data[0] != 0x45 || data[1] != 0x67 {
logger.LOG.Error("packet head magic 0x4567 error")
return
@@ -97,7 +90,7 @@ func (k *KcpConnectManager) decodeRecur(data []byte, convId uint64, kcpMsgList *
logger.LOG.Error("packet len error")
return
}
// 尾部标志错误
// 尾部幻数错误
if data[headLen+protoLen+10] != 0x89 || data[headLen+protoLen+11] != 0xAB {
logger.LOG.Error("packet tail magic 0x89AB error")
return
@@ -124,11 +117,11 @@ func (k *KcpConnectManager) decodeRecur(data []byte, convId uint64, kcpMsgList *
*kcpMsgList = append(*kcpMsgList, kcpMsg)
// 递归解析
if haveMoreData {
k.decodeRecur(data[int(headLen+protoLen)+12:], convId, kcpMsgList)
k.decodeLoop(data[int(headLen+protoLen)+12:], convId, kcpMsgList)
}
}
func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg) (bin []byte) {
func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg, xorKey []byte) (bin []byte) {
if kcpMsg.HeadData == nil {
kcpMsg.HeadData = make([]byte, 0)
}
@@ -136,7 +129,7 @@ func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg) (bin []byte) {
kcpMsg.ProtoData = make([]byte, 0)
}
bin = make([]byte, len(kcpMsg.HeadData)+len(kcpMsg.ProtoData)+12)
// 头部标志
// 头部幻数
bin[0] = 0x45
bin[1] = 0x67
// 协议号
@@ -172,17 +165,10 @@ func (k *KcpConnectManager) encodePayloadToBin(kcpMsg *KcpMsg) (bin []byte) {
copy(bin[10:], kcpMsg.HeadData)
// proto数据
copy(bin[10+len(kcpMsg.HeadData):], kcpMsg.ProtoData)
// 尾部标志
// 尾部幻数
bin[len(bin)-2] = 0x89
bin[len(bin)-1] = 0xAB
// xor加密
k.kcpKeyMapLock.RLock()
xorKey, exist := k.kcpKeyMap[kcpMsg.ConvId]
k.kcpKeyMapLock.RUnlock()
if !exist {
logger.LOG.Error("kcp xor key not exist, convId: %v", kcpMsg.ConvId)
return
}
endec.Xor(bin, xorKey)
return bin
}

View File

@@ -1,23 +1,17 @@
package net
import (
"hk4e/gate/kcp"
"hk4e/pkg/logger"
"reflect"
)
const (
KcpXorKeyChange = iota
KcpDispatchKeyChange
KcpPacketRecvListen
KcpPacketSendListen
KcpConnForceClose
KcpConnForceClose = iota
KcpAllConnForceClose
KcpGateOpenState
KcpPacketRecvNotify
KcpPacketSendNotify
KcpConnCloseNotify
KcpConnEstNotify
KcpConnRttNotify
KcpConnAddrChangeNotify
)
@@ -27,92 +21,26 @@ type KcpEvent struct {
EventMessage any
}
func (k *KcpConnectManager) GetKcpEventInputChan() chan *KcpEvent {
return k.kcpEventInput
}
func (k *KcpConnectManager) GetKcpEventOutputChan() chan *KcpEvent {
return k.kcpEventOutput
}
func (k *KcpConnectManager) eventHandle() {
logger.LOG.Debug("event handle start")
// 事件处理
for {
event := <-k.kcpEventInput
logger.LOG.Info("kcp manager recv event, ConvId: %v, EventId: %v, EventMessage Type: %v", event.ConvId, event.EventId, reflect.TypeOf(event.EventMessage))
switch event.EventId {
case KcpXorKeyChange:
// XOR密钥切换
k.connMapLock.RLock()
_, exist := k.connMap[event.ConvId]
k.connMapLock.RUnlock()
if !exist {
logger.LOG.Error("conn not exist, convId: %v", event.ConvId)
continue
}
key, ok := event.EventMessage.([]byte)
if !ok {
logger.LOG.Error("event KcpXorKeyChange msg type error")
continue
}
k.kcpKeyMapLock.Lock()
k.kcpKeyMap[event.ConvId] = key
k.kcpKeyMapLock.Unlock()
case KcpDispatchKeyChange:
// 首包加密XOR密钥切换
key, ok := event.EventMessage.([]byte)
if !ok {
logger.LOG.Error("event KcpXorKeyChange msg type error")
continue
}
k.dispatchKeyLock.Lock()
k.dispatchKey = key
k.dispatchKeyLock.Unlock()
case KcpPacketRecvListen:
// 收包监听
k.connMapLock.RLock()
_, exist := k.connMap[event.ConvId]
k.connMapLock.RUnlock()
if !exist {
logger.LOG.Error("conn not exist, convId: %v", event.ConvId)
continue
}
flag, ok := event.EventMessage.(string)
if !ok {
logger.LOG.Error("event KcpXorKeyChange msg type error")
continue
}
if flag == "Enable" {
k.kcpRecvListenMapLock.Lock()
k.kcpRecvListenMap[event.ConvId] = true
k.kcpRecvListenMapLock.Unlock()
} else if flag == "Disable" {
k.kcpRecvListenMapLock.Lock()
k.kcpRecvListenMap[event.ConvId] = false
k.kcpRecvListenMapLock.Unlock()
}
case KcpPacketSendListen:
// 发包监听
k.connMapLock.RLock()
_, exist := k.connMap[event.ConvId]
k.connMapLock.RUnlock()
if !exist {
logger.LOG.Error("conn not exist, convId: %v", event.ConvId)
continue
}
flag, ok := event.EventMessage.(string)
if !ok {
logger.LOG.Error("event KcpXorKeyChange msg type error")
continue
}
if flag == "Enable" {
k.kcpSendListenMapLock.Lock()
k.kcpSendListenMap[event.ConvId] = true
k.kcpSendListenMapLock.Unlock()
} else if flag == "Disable" {
k.kcpSendListenMapLock.Lock()
k.kcpSendListenMap[event.ConvId] = false
k.kcpSendListenMapLock.Unlock()
}
case KcpConnForceClose:
// 强制关闭某个连接
k.connMapLock.RLock()
_, exist := k.connMap[event.ConvId]
k.connMapLock.RUnlock()
if !exist {
logger.LOG.Error("conn not exist, convId: %v", event.ConvId)
session := k.GetSessionByConvId(event.ConvId)
if session == nil {
logger.LOG.Error("session not exist, convId: %v", event.ConvId)
continue
}
reason, ok := event.EventMessage.(uint32)
@@ -120,7 +48,11 @@ func (k *KcpConnectManager) eventHandle() {
logger.LOG.Error("event KcpConnForceClose msg type error")
continue
}
k.closeKcpConn(event.ConvId, reason)
session.conn.SendEnetNotify(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: reason,
})
_ = session.conn.Close()
logger.LOG.Info("conn has been force close, convId: %v", event.ConvId)
case KcpAllConnForceClose:
// 强制关闭所有连接

View File

@@ -8,16 +8,6 @@ import (
pb "google.golang.org/protobuf/proto"
)
type ProtoEnDecode struct {
cmdProtoMap *cmd.CmdProtoMap
}
func NewProtoEnDecode() (r *ProtoEnDecode) {
r = new(ProtoEnDecode)
r.cmdProtoMap = cmd.NewCmdProtoMap()
return r
}
type ProtoMsg struct {
ConvId uint64
CmdId uint16
@@ -30,7 +20,7 @@ type ProtoMessage struct {
message pb.Message
}
func (p *ProtoEnDecode) protoDecode(kcpMsg *KcpMsg) (protoMsgList []*ProtoMsg) {
func (k *KcpConnectManager) protoDecode(kcpMsg *KcpMsg) (protoMsgList []*ProtoMsg) {
protoMsgList = make([]*ProtoMsg, 0)
protoMsg := new(ProtoMsg)
protoMsg.ConvId = kcpMsg.ConvId
@@ -49,7 +39,7 @@ func (p *ProtoEnDecode) protoDecode(kcpMsg *KcpMsg) (protoMsgList []*ProtoMsg) {
}
// payload msg
protoMessageList := make([]*ProtoMessage, 0)
p.protoDecodePayloadLoop(kcpMsg.CmdId, kcpMsg.ProtoData, &protoMessageList)
k.protoDecodePayloadLoop(kcpMsg.CmdId, kcpMsg.ProtoData, &protoMessageList)
if len(protoMessageList) == 0 {
logger.LOG.Error("decode proto object is nil")
return protoMsgList
@@ -82,8 +72,8 @@ func (p *ProtoEnDecode) protoDecode(kcpMsg *KcpMsg) (protoMsgList []*ProtoMsg) {
return protoMsgList
}
func (p *ProtoEnDecode) protoDecodePayloadLoop(cmdId uint16, protoData []byte, protoMessageList *[]*ProtoMessage) {
protoObj := p.decodePayloadToProto(cmdId, protoData)
func (k *KcpConnectManager) protoDecodePayloadLoop(cmdId uint16, protoData []byte, protoMessageList *[]*ProtoMessage) {
protoObj := k.decodePayloadToProto(cmdId, protoData)
if protoObj == nil {
logger.LOG.Error("decode proto object is nil")
return
@@ -96,7 +86,7 @@ func (p *ProtoEnDecode) protoDecodePayloadLoop(cmdId uint16, protoData []byte, p
return
}
for _, unionCmd := range unionCmdNotify.GetCmdList() {
p.protoDecodePayloadLoop(uint16(unionCmd.MessageId), unionCmd.Body, protoMessageList)
k.protoDecodePayloadLoop(uint16(unionCmd.MessageId), unionCmd.Body, protoMessageList)
}
}
*protoMessageList = append(*protoMessageList, &ProtoMessage{
@@ -105,7 +95,7 @@ func (p *ProtoEnDecode) protoDecodePayloadLoop(cmdId uint16, protoData []byte, p
})
}
func (p *ProtoEnDecode) protoEncode(protoMsg *ProtoMsg) (kcpMsg *KcpMsg) {
func (k *KcpConnectManager) protoEncode(protoMsg *ProtoMsg) (kcpMsg *KcpMsg) {
cmdName := ""
if protoMsg.PayloadMessage != nil {
cmdName = string(protoMsg.PayloadMessage.ProtoReflect().Descriptor().FullName())
@@ -127,7 +117,7 @@ func (p *ProtoEnDecode) protoEncode(protoMsg *ProtoMsg) (kcpMsg *KcpMsg) {
}
// payload msg
if protoMsg.PayloadMessage != nil {
cmdId, protoData := p.encodeProtoToPayload(protoMsg.PayloadMessage)
cmdId, protoData := k.encodeProtoToPayload(protoMsg.PayloadMessage)
if cmdId == 0 || protoData == nil {
logger.LOG.Error("encode proto data is nil")
return nil
@@ -143,8 +133,8 @@ func (p *ProtoEnDecode) protoEncode(protoMsg *ProtoMsg) (kcpMsg *KcpMsg) {
return kcpMsg
}
func (p *ProtoEnDecode) decodePayloadToProto(cmdId uint16, protoData []byte) (protoObj pb.Message) {
protoObj = p.cmdProtoMap.GetProtoObjByCmdId(cmdId)
func (k *KcpConnectManager) decodePayloadToProto(cmdId uint16, protoData []byte) (protoObj pb.Message) {
protoObj = k.cmdProtoMap.GetProtoObjByCmdId(cmdId)
if protoObj == nil {
logger.LOG.Error("get new proto object is nil")
return nil
@@ -157,8 +147,8 @@ func (p *ProtoEnDecode) decodePayloadToProto(cmdId uint16, protoData []byte) (pr
return protoObj
}
func (p *ProtoEnDecode) encodeProtoToPayload(protoObj pb.Message) (cmdId uint16, protoData []byte) {
cmdId = p.cmdProtoMap.GetCmdIdByProtoObj(protoObj)
func (k *KcpConnectManager) encodeProtoToPayload(protoObj pb.Message) (cmdId uint16, protoData []byte) {
cmdId = k.cmdProtoMap.GetCmdIdByProtoObj(protoObj)
var err error = nil
protoData, err = pb.Marshal(protoObj)
if err != nil {