Files
hk4e/gate/net/kcp_connect_manager.go
UnKownOwO baea3b1334 补充
2023-06-01 00:10:09 +08:00

514 lines
15 KiB
Go

package net
import (
"bytes"
"context"
"encoding/binary"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"hk4e/common/config"
"hk4e/common/mq"
"hk4e/common/region"
"hk4e/common/rpc"
"hk4e/gate/client_proto"
"hk4e/gate/kcp"
"hk4e/node/api"
"hk4e/pkg/logger"
"hk4e/pkg/random"
"hk4e/protocol/cmd"
)
const (
ConnSynPacketFreqLimit = 100 // 连接建立握手包每秒发包频率限制
RecvPacketFreqLimit = 1000 // 客户端上行每秒发包频率限制
SendPacketFreqLimit = 1000 // 服务器下行每秒发包频率限制
PacketMaxLen = 343 * 1024 // 最大应用层包长度
ConnRecvTimeout = 30 // 收包超时时间 秒
ConnSendTimeout = 10 // 发包超时时间 秒
MaxClientConnNumLimit = 1000 // 最大客户端连接数限制
)
var CLIENT_CONN_NUM int32 = 0 // 当前客户端连接数
type KcpConnectManager struct {
discovery *rpc.DiscoveryClient // node服务器客户端
openState bool // 网关开放状态
// 会话
sessionConvIdMap map[uint64]*Session
sessionUserIdMap map[uint32]*Session
sessionMapLock sync.RWMutex
createSessionChan chan *Session
destroySessionChan chan *Session
globalGsOnlineMap map[uint32]string
globalGsOnlineMapLock sync.RWMutex
// 连接事件
kcpEventInput chan *KcpEvent
kcpEventOutput chan *KcpEvent
reLoginRemoteKickRegChan chan *RemoteKick
// 协议
serverCmdProtoMap *cmd.CmdProtoMap
clientCmdProtoMap *client_proto.ClientCmdProtoMap
// 输入输出管道
messageQueue *mq.MessageQueue
// 密钥
dispatchKey []byte
signRsaKey []byte
encRsaKeyMap map[string][]byte
}
func NewKcpConnectManager(messageQueue *mq.MessageQueue, discovery *rpc.DiscoveryClient) (r *KcpConnectManager) {
r = new(KcpConnectManager)
r.discovery = discovery
r.openState = true
r.sessionConvIdMap = make(map[uint64]*Session)
r.sessionUserIdMap = make(map[uint32]*Session)
r.createSessionChan = make(chan *Session, 1000)
r.destroySessionChan = make(chan *Session, 1000)
r.globalGsOnlineMap = make(map[uint32]string)
r.kcpEventInput = make(chan *KcpEvent, 1000)
r.kcpEventOutput = make(chan *KcpEvent, 1000)
r.reLoginRemoteKickRegChan = make(chan *RemoteKick, 1000)
r.serverCmdProtoMap = cmd.NewCmdProtoMap()
if config.GetConfig().Hk4e.ClientProtoProxyEnable {
r.clientCmdProtoMap = client_proto.NewClientCmdProtoMap()
}
r.messageQueue = messageQueue
r.run()
return r
}
func (k *KcpConnectManager) run() {
// 读取密钥相关文件
k.signRsaKey, k.encRsaKeyMap, _ = region.LoadRsaKey()
// key
rsp, err := k.discovery.GetRegionEc2B(context.TODO(), &api.NullMsg{})
if err != nil {
logger.Error("get region ec2b error: %v", err)
return
}
ec2b, err := random.LoadEc2bKey(rsp.Data)
if err != nil {
logger.Error("parse region ec2b error: %v", err)
return
}
regionEc2b := random.NewEc2b()
regionEc2b.SetSeed(ec2b.Seed())
// 3.7的时候修改了xor 首包无需使用加密密钥
if k.getGateMaxVersion() < 3.7 {
k.dispatchKey = regionEc2b.XorKey()
} else {
// 全部填充为0 不然会出问题
k.dispatchKey = make([]byte, 4096)
}
// kcp
port := strconv.Itoa(int(config.GetConfig().Hk4e.KcpPort))
listener, err := kcp.ListenWithOptions("0.0.0.0:" + port)
if err != nil {
logger.Error("listen kcp err: %v", err)
return
}
go k.enetHandle(listener)
go k.eventHandle()
go k.sendMsgHandle()
go k.acceptHandle(listener)
go k.gateNetInfo()
k.syncGlobalGsOnlineMap()
go k.autoSyncGlobalGsOnlineMap()
}
func (k *KcpConnectManager) Close() {
k.closeAllKcpConn()
}
// getGateMaxVersion 获取gate最大可兼容的版本
func (k *KcpConnectManager) getGateMaxVersion() (maxVersion float64) {
versionSplit := strings.Split(config.GetConfig().Hk4e.Version, ",")
for _, verStr := range versionSplit {
version, err := strconv.ParseFloat(verStr, 64)
if err != nil {
logger.Error("version a to i error: %v", err)
return
}
if version > maxVersion {
maxVersion = version
}
}
return
}
func (k *KcpConnectManager) gateNetInfo() {
ticker := time.NewTicker(time.Second * 60)
kcpErrorCount := uint64(0)
for {
<-ticker.C
snmp := kcp.DefaultSnmp.Copy()
kcpErrorCount += snmp.KCPInErrors
logger.Info("kcp send: %v B/s, kcp recv: %v B/s", snmp.BytesSent/60, snmp.BytesReceived/60)
logger.Info("udp send: %v B/s, udp recv: %v B/s", snmp.OutBytes/60, snmp.InBytes/60)
logger.Info("udp send: %v pps, udp recv: %v pps", snmp.OutPkts/60, snmp.InPkts/60)
clientConnNum := atomic.LoadInt32(&CLIENT_CONN_NUM)
logger.Info("conn num: %v, new conn num: %v, kcp error num: %v", clientConnNum, snmp.CurrEstab, kcpErrorCount)
kcp.DefaultSnmp.Reset()
}
}
// 接收并创建新连接处理函数
func (k *KcpConnectManager) acceptHandle(listener *kcp.Listener) {
logger.Info("accept handle start")
for {
conn, err := listener.AcceptKCP()
if err != nil {
logger.Error("accept kcp err: %v", err)
return
}
convId := conn.GetConv()
if k.openState == false {
logger.Error("gate not open, convId: %v", convId)
_ = conn.Close()
continue
}
conn.SetACKNoDelay(true)
conn.SetWriteDelay(false)
conn.SetWindowSize(255, 255)
atomic.AddInt32(&CLIENT_CONN_NUM, 1)
logger.Info("client connect, convId: %v", convId)
kcpRawSendChan := make(chan *ProtoMsg, 1000)
session := &Session{
conn: conn,
connState: ConnEst,
userId: 0,
kcpRawSendChan: kcpRawSendChan,
seed: 0,
xorKey: k.dispatchKey,
changeXorKeyFin: false,
gsServerAppId: "",
anticheatServerAppId: "",
pathfindingServerAppId: "",
useMagicSeed: false,
}
go k.recvHandle(session)
go k.sendHandle(session)
// 连接建立成功通知
k.kcpEventOutput <- &KcpEvent{
ConvId: convId,
EventId: KcpConnEstNotify,
EventMessage: conn.RemoteAddr().String(),
}
}
}
// 连接事件处理函数
func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) {
logger.Info("enet handle start")
// conv短时间内唯一生成
convGenMap := make(map[uint64]int64)
pktFreqLimitCounter := 0
pktFreqLimitTimer := time.Now().UnixNano()
for {
enetNotify := <-listener.EnetNotify
logger.Info("[Enet Notify], addr: %v, conv: %v, conn: %v, enet: %v", enetNotify.Addr, enetNotify.ConvId, enetNotify.ConnType, enetNotify.EnetType)
switch enetNotify.ConnType {
case kcp.ConnEnetSyn:
// 连接建立握手包频率限制
pktFreqLimitCounter++
if pktFreqLimitCounter > ConnSynPacketFreqLimit {
now := time.Now().UnixNano()
if now-pktFreqLimitTimer > int64(time.Second) {
pktFreqLimitCounter = 0
pktFreqLimitTimer = now
} else {
continue
}
}
if enetNotify.EnetType != kcp.EnetClientConnectKey {
continue
}
// 清理老旧的conv
now := time.Now().UnixNano()
oldConvList := make([]uint64, 0)
for conv, timestamp := range convGenMap {
if now-timestamp > int64(time.Hour) {
oldConvList = append(oldConvList, conv)
}
}
delConvList := make([]uint64, 0)
k.sessionMapLock.RLock()
for _, conv := range oldConvList {
_, exist := k.sessionConvIdMap[conv]
if !exist {
delConvList = append(delConvList, conv)
delete(convGenMap, conv)
}
}
k.sessionMapLock.RUnlock()
logger.Info("clean dead conv list: %v", delConvList)
// 生成没用过的conv
var conv uint64
for {
convData := random.GetRandomByte(8)
convDataBuffer := bytes.NewBuffer(convData)
_ = binary.Read(convDataBuffer, binary.LittleEndian, &conv)
_, exist := convGenMap[conv]
if exist {
continue
} else {
convGenMap[conv] = time.Now().UnixNano()
break
}
}
listener.SendEnetNotifyToPeer(&kcp.Enet{
Addr: enetNotify.Addr,
ConvId: conv,
ConnType: kcp.ConnEnetEst,
EnetType: enetNotify.EnetType,
})
case kcp.ConnEnetEst:
case kcp.ConnEnetFin:
session := k.GetSessionByConvId(enetNotify.ConvId)
if session == nil {
logger.Error("session not exist, conv: %v", enetNotify.ConvId)
continue
}
session.conn.SendEnetNotifyToPeer(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: enetNotify.EnetType,
})
_ = session.conn.Close()
case kcp.ConnEnetAddrChange:
// 连接地址改变通知
k.kcpEventOutput <- &KcpEvent{
ConvId: enetNotify.ConvId,
EventId: KcpConnAddrChangeNotify,
EventMessage: enetNotify.Addr,
}
default:
}
}
}
// Session 连接会话结构 只允许定义并发安全或者简单的基础数据结构
type Session struct {
conn *kcp.UDPSession
connState uint8
userId uint32
kcpRawSendChan chan *ProtoMsg
seed uint64
xorKey []byte
changeXorKeyFin bool
gsServerAppId string
anticheatServerAppId string
pathfindingServerAppId string
useMagicSeed bool
}
// 接收
func (k *KcpConnectManager) recvHandle(session *Session) {
logger.Info("recv handle start")
conn := session.conn
convId := conn.GetConv()
recvBuf := make([]byte, PacketMaxLen)
pktFreqLimitCounter := 0
pktFreqLimitTimer := time.Now().UnixNano()
for {
_ = conn.SetReadDeadline(time.Now().Add(time.Second * ConnRecvTimeout))
recvLen, err := conn.Read(recvBuf)
if err != nil {
logger.Error("exit recv loop, conn read err: %v, convId: %v", err, convId)
k.closeKcpConn(session, kcp.EnetServerKick)
break
}
// 收包频率限制
pktFreqLimitCounter++
if pktFreqLimitCounter > RecvPacketFreqLimit {
now := time.Now().UnixNano()
if now-pktFreqLimitTimer > int64(time.Second) {
pktFreqLimitCounter = 0
pktFreqLimitTimer = now
} else {
logger.Error("exit recv loop, client packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter)
k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh)
break
}
}
recvData := recvBuf[:recvLen]
kcpMsgList := make([]*KcpMsg, 0)
DecodeBinToPayload(recvData, convId, &kcpMsgList, session.xorKey)
for _, v := range kcpMsgList {
protoMsgList := ProtoDecode(v, k.serverCmdProtoMap, k.clientCmdProtoMap)
for _, vv := range protoMsgList {
k.recvMsgHandle(vv, session)
}
}
}
}
// 发送
func (k *KcpConnectManager) sendHandle(session *Session) {
logger.Info("send handle start")
conn := session.conn
convId := conn.GetConv()
pktFreqLimitCounter := 0
pktFreqLimitTimer := time.Now().UnixNano()
for {
protoMsg, ok := <-session.kcpRawSendChan
if !ok {
logger.Error("exit send loop, send chan close, convId: %v", convId)
k.closeKcpConn(session, kcp.EnetServerKick)
break
}
kcpMsg := ProtoEncode(protoMsg, k.serverCmdProtoMap, k.clientCmdProtoMap)
if kcpMsg == nil {
logger.Error("decode kcp msg is nil, convId: %v", convId)
continue
}
bin := EncodePayloadToBin(kcpMsg, session.xorKey)
_ = conn.SetWriteDeadline(time.Now().Add(time.Second * ConnSendTimeout))
_, err := conn.Write(bin)
if err != nil {
logger.Error("exit send loop, conn write err: %v, convId: %v", err, convId)
k.closeKcpConn(session, kcp.EnetServerKick)
break
}
// 发包频率限制
pktFreqLimitCounter++
if pktFreqLimitCounter > SendPacketFreqLimit {
now := time.Now().UnixNano()
if now-pktFreqLimitTimer > int64(time.Second) {
pktFreqLimitCounter = 0
pktFreqLimitTimer = now
} else {
logger.Error("exit send loop, server packet send freq too high, convId: %v, pps: %v", convId, pktFreqLimitCounter)
k.closeKcpConn(session, kcp.EnetPacketFreqTooHigh)
break
}
}
if session.changeXorKeyFin == false && protoMsg.CmdId == cmd.GetPlayerTokenRsp {
// XOR密钥切换
logger.Info("change session xor key, convId: %v", convId)
session.changeXorKeyFin = true
keyBlock := random.NewKeyBlock(session.seed, session.useMagicSeed)
xorKey := keyBlock.XorKey()
key := make([]byte, 4096)
copy(key, xorKey[:])
session.xorKey = key
}
}
}
// 强制关闭指定连接
func (k *KcpConnectManager) forceCloseKcpConn(convId uint64, reason uint32) {
session := k.GetSessionByConvId(convId)
if session == nil {
logger.Error("session not exist, convId: %v", convId)
return
}
k.closeKcpConn(session, reason)
logger.Info("conn has been force close, convId: %v", convId)
}
// 关闭指定连接
func (k *KcpConnectManager) closeKcpConn(session *Session, enetType uint32) {
if session.connState == ConnClose {
return
}
session.connState = ConnClose
conn := session.conn
convId := conn.GetConv()
// 清理数据
k.DeleteSession(session.conn.GetConv(), session.userId)
// 关闭连接
err := conn.Close()
if err == nil {
conn.SendEnetNotifyToPeer(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: enetType,
})
}
// 连接关闭通知
k.kcpEventOutput <- &KcpEvent{
ConvId: convId,
EventId: KcpConnCloseNotify,
}
// 通知GS玩家下线
connCtrlMsg := new(mq.ConnCtrlMsg)
connCtrlMsg.UserId = session.userId
k.messageQueue.SendToGs(session.gsServerAppId, &mq.NetMsg{
MsgType: mq.MsgTypeConnCtrl,
EventId: mq.UserOfflineNotify,
ConnCtrlMsg: connCtrlMsg,
})
logger.Info("send to gs user offline, ConvId: %v, UserId: %v", convId, connCtrlMsg.UserId)
k.destroySessionChan <- session
atomic.AddInt32(&CLIENT_CONN_NUM, -1)
}
// 关闭所有连接
func (k *KcpConnectManager) closeAllKcpConn() {
sessionList := make([]*Session, 0)
k.sessionMapLock.RLock()
for _, session := range k.sessionConvIdMap {
sessionList = append(sessionList, session)
}
k.sessionMapLock.RUnlock()
for _, session := range sessionList {
k.closeKcpConn(session, kcp.EnetServerShutdown)
}
}
func (k *KcpConnectManager) GetSessionByConvId(convId uint64) *Session {
k.sessionMapLock.RLock()
session, _ := k.sessionConvIdMap[convId]
k.sessionMapLock.RUnlock()
return session
}
func (k *KcpConnectManager) GetSessionByUserId(userId uint32) *Session {
k.sessionMapLock.RLock()
session, _ := k.sessionUserIdMap[userId]
k.sessionMapLock.RUnlock()
return session
}
func (k *KcpConnectManager) SetSession(session *Session, convId uint64, userId uint32) {
k.sessionMapLock.Lock()
k.sessionConvIdMap[convId] = session
k.sessionUserIdMap[userId] = session
k.sessionMapLock.Unlock()
}
func (k *KcpConnectManager) DeleteSession(convId uint64, userId uint32) {
k.sessionMapLock.Lock()
delete(k.sessionConvIdMap, convId)
delete(k.sessionUserIdMap, userId)
k.sessionMapLock.Unlock()
}
func (k *KcpConnectManager) autoSyncGlobalGsOnlineMap() {
ticker := time.NewTicker(time.Second * 60)
for {
<-ticker.C
k.syncGlobalGsOnlineMap()
}
}
func (k *KcpConnectManager) syncGlobalGsOnlineMap() {
rsp, err := k.discovery.GetGlobalGsOnlineMap(context.TODO(), nil)
if err != nil {
logger.Error("get global gs online map error: %v", err)
return
}
copyMap := make(map[uint32]string)
for k, v := range rsp.GlobalGsOnlineMap {
copyMap[k] = v
}
copyMapLen := len(copyMap)
k.globalGsOnlineMapLock.Lock()
k.globalGsOnlineMap = copyMap
k.globalGsOnlineMapLock.Unlock()
logger.Info("sync global gs online map finish, len: %v", copyMapLen)
}