优化机器人

This commit is contained in:
flswld
2023-02-14 22:57:35 +08:00
parent 804e0dfc1a
commit 02c73f8444
10 changed files with 326 additions and 484 deletions

View File

@@ -1,12 +0,0 @@
package kcp
import "golang.org/x/net/ipv4"
const (
batchSize = 16
)
type batchConn interface {
WriteBatch(ms []ipv4.Message, flags int) (int, error)
ReadBatch(ms []ipv4.Message, flags int) (int, error)
}

134
gate/kcp/enet.go Normal file
View File

@@ -0,0 +1,134 @@
package kcp
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
)
// 原神Enet连接控制协议
// MM MM MM MM | LL LL LL LL | HH HH HH HH | EE EE EE EE | MM MM MM MM
// MM为表示连接状态的幻数 在开头的4字节和结尾的4字节
// LL和HH分别为convId的低4字节和高4字节
// EE为Enet事件类型 4字节
// Enet协议上报结构体
type Enet struct {
Addr string
ConvId uint64
ConnType uint8
EnetType uint32
}
// Enet连接状态类型
const (
ConnEnetSyn = 1
ConnEnetEst = 2
ConnEnetFin = 3
ConnEnetAddrChange = 4
)
// Enet连接状态类型幻数
var MagicEnetSynHead, _ = hex.DecodeString("000000ff")
var MagicEnetSynTail, _ = hex.DecodeString("ffffffff")
var MagicEnetEstHead, _ = hex.DecodeString("00000145")
var MagicEnetEstTail, _ = hex.DecodeString("14514545")
var MagicEnetFinHead, _ = hex.DecodeString("00000194")
var MagicEnetFinTail, _ = hex.DecodeString("19419494")
// Enet事件类型
const (
EnetTimeout = 0
EnetClientClose = 1
EnetClientRebindFail = 2
EnetClientShutdown = 3
EnetServerRelogin = 4
EnetServerKick = 5
EnetServerShutdown = 6
EnetNotFoundSession = 7
EnetLoginUnfinished = 8
EnetPacketFreqTooHigh = 9
EnetPingTimeout = 10
EnetTranferFailed = 11
EnetServerKillClient = 12
EnetCheckMoveSpeed = 13
EnetAccountPasswordChange = 14
EnetClientEditorConnectKey = 987654321
EnetClientConnectKey = 1234567890
)
func BuildEnet(connType uint8, enetType uint32, conv uint64) []byte {
data := make([]byte, 20)
if connType == ConnEnetSyn {
copy(data[0:4], MagicEnetSynHead)
copy(data[16:20], MagicEnetSynTail)
} else if connType == ConnEnetEst {
copy(data[0:4], MagicEnetEstHead)
copy(data[16:20], MagicEnetEstTail)
} else if connType == ConnEnetFin {
copy(data[0:4], MagicEnetFinHead)
copy(data[16:20], MagicEnetFinTail)
} else {
return nil
}
// conv的高四个字节和低四个字节分开
// 例如 00 00 01 45 | LL LL LL LL | HH HH HH HH | 49 96 02 d2 | 14 51 45 45
data[4] = uint8(conv >> 24)
data[5] = uint8(conv >> 16)
data[6] = uint8(conv >> 8)
data[7] = uint8(conv >> 0)
data[8] = uint8(conv >> 56)
data[9] = uint8(conv >> 48)
data[10] = uint8(conv >> 40)
data[11] = uint8(conv >> 32)
// Enet
data[12] = uint8(enetType >> 24)
data[13] = uint8(enetType >> 16)
data[14] = uint8(enetType >> 8)
data[15] = uint8(enetType >> 0)
return data
}
func ParseEnet(data []byte) (connType uint8, enetType uint32, conv uint64, err error) {
// 提取convId
conv = uint64(0)
conv += uint64(data[4]) << 24
conv += uint64(data[5]) << 16
conv += uint64(data[6]) << 8
conv += uint64(data[7]) << 0
conv += uint64(data[8]) << 56
conv += uint64(data[9]) << 48
conv += uint64(data[10]) << 40
conv += uint64(data[11]) << 32
// 提取Enet协议头部和尾部幻数
udpPayloadEnetHead := data[:4]
udpPayloadEnetTail := data[len(data)-4:]
// 提取Enet协议类型
enetTypeData := data[12:16]
enetTypeDataBuffer := bytes.NewBuffer(enetTypeData)
enetType = uint32(0)
_ = binary.Read(enetTypeDataBuffer, binary.BigEndian, &enetType)
equalHead := bytes.Equal(udpPayloadEnetHead, MagicEnetSynHead)
equalTail := bytes.Equal(udpPayloadEnetTail, MagicEnetSynTail)
if equalHead && equalTail {
// 客户端前置握手获取conv
connType = ConnEnetSyn
return connType, enetType, conv, nil
}
equalHead = bytes.Equal(udpPayloadEnetHead, MagicEnetEstHead)
equalTail = bytes.Equal(udpPayloadEnetTail, MagicEnetEstTail)
if equalHead && equalTail {
// 连接建立
connType = ConnEnetEst
return connType, enetType, conv, nil
}
equalHead = bytes.Equal(udpPayloadEnetHead, MagicEnetFinHead)
equalTail = bytes.Equal(udpPayloadEnetTail, MagicEnetFinTail)
if equalHead && equalTail {
// 连接断开
connType = ConnEnetFin
return connType, enetType, conv, nil
}
return 0, 0, 0, errors.New("unknown conn type")
}

View File

@@ -10,7 +10,6 @@ package kcp
import (
"crypto/rand"
"encoding/binary"
"encoding/hex"
"io"
"net"
"sync"
@@ -64,17 +63,12 @@ type (
ownConn bool // true if we created conn internally, false if provided by caller
kcp *KCP // KCP ARQ protocol
l *Listener // pointing to the Listener object if it's been accepted by a Listener
// block BlockCrypt // block encryption object
// kcp receiving is based on packets
// recvbuf turns packets into stream
recvbuf []byte
bufptr []byte
// // FEC codec
// fecDecoder *fecDecoder
// fecEncoder *fecEncoder
// settings
remote net.Addr // remote peer address
rd time.Time // read deadline
@@ -98,9 +92,6 @@ type (
socketReadErrorOnce sync.Once
socketWriteErrorOnce sync.Once
// // nonce generator
// nonce Entropy
// packets waiting to be sent on wire
txqueue []ipv4.Message
xconn batchConn // for x/net
@@ -126,8 +117,6 @@ type (
func newUDPSession(conv uint64, l *Listener, conn net.PacketConn, ownConn bool, remote net.Addr) *UDPSession {
sess := new(UDPSession)
sess.die = make(chan struct{})
// sess.nonce = new(nonceAES128)
// sess.nonce.Init()
sess.chReadEvent = make(chan struct{}, 1)
sess.chWriteEvent = make(chan struct{}, 1)
sess.chSocketReadError = make(chan struct{})
@@ -136,7 +125,6 @@ func newUDPSession(conv uint64, l *Listener, conn net.PacketConn, ownConn bool,
sess.conn = conn
sess.ownConn = ownConn
sess.l = l
// sess.block = block
sess.recvbuf = make([]byte, mtuLimit)
// cast to writebatch conn
@@ -151,22 +139,6 @@ func newUDPSession(conv uint64, l *Listener, conn net.PacketConn, ownConn bool,
}
}
// // FEC codec initialization
// sess.fecDecoder = newFECDecoder(dataShards, parityShards)
// if sess.block != nil {
// sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
// } else {
// sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
// }
// // calculate additional header size introduced by FEC and encryption
// if sess.block != nil {
// sess.headerSize += cryptHeaderSize
// }
// if sess.fecEncoder != nil {
// sess.headerSize += fecHeaderSizePlus2
// }
sess.kcp = NewKCP(conv, func(buf []byte, size int) {
if size >= IKCP_OVERHEAD+sess.headerSize {
sess.output(buf[:size])
@@ -370,9 +342,6 @@ func (s *UDPSession) Close() error {
s.uncork()
// release pending segments
s.kcp.ReleaseTX()
// if s.fecDecoder != nil {
// s.fecDecoder.release()
// }
s.mu.Unlock()
if s.l != nil { // belongs to listener
@@ -551,26 +520,6 @@ func (s *UDPSession) SetWriteBuffer(bytes int) error {
func (s *UDPSession) output(buf []byte) {
var ecc [][]byte
// // 1. FEC encoding
// if s.fecEncoder != nil {
// ecc = s.fecEncoder.encode(buf)
// }
// // 2&3. crc32 & encryption
// if s.block != nil {
// s.nonce.Fill(buf[:nonceSize])
// checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:])
// binary.LittleEndian.PutUint32(buf[nonceSize:], checksum)
// s.block.Encrypt(buf, buf)
//
// for k := range ecc {
// s.nonce.Fill(ecc[k][:nonceSize])
// checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
// binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
// s.block.Encrypt(ecc[k], ecc[k])
// }
// }
// 4. TxQueue
var msg ipv4.Message
for i := 0; i < s.dup+1; i++ {
@@ -662,101 +611,12 @@ func (s *UDPSession) notifyWriteError(err error) {
// packet input stage
func (s *UDPSession) packetInput(data []byte) {
// decrypted := false
// if s.block != nil && len(data) >= cryptHeaderSize {
// s.block.Decrypt(data, data)
// data = data[nonceSize:]
// checksum := crc32.ChecksumIEEE(data[crcSize:])
// if checksum == binary.LittleEndian.Uint32(data) {
// data = data[crcSize:]
// decrypted = true
// } else {
// atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
// }
// } else if s.block == nil {
// decrypted = true
// }
decrypted := true
if decrypted && len(data) >= IKCP_OVERHEAD {
if len(data) >= IKCP_OVERHEAD {
s.kcpInput(data)
}
}
func (s *UDPSession) kcpInput(data []byte) {
// var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
//
// fecFlag := binary.LittleEndian.Uint16(data[8:])
// if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2
// if len(data) >= fecHeaderSizePlus2 {
// f := fecPacket(data)
// if f.flag() == typeParity {
// fecParityShards++
// }
//
// // lock
// s.mu.Lock()
// // if fecDecoder is not initialized, create one with default parameter
// if s.fecDecoder == nil {
// s.fecDecoder = newFECDecoder(1, 1)
// }
// recovers := s.fecDecoder.decode(f)
// if f.flag() == typeData {
// if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
// kcpInErrors++
// }
// }
//
// for _, r := range recovers {
// if len(r) >= 2 { // must be larger than 2bytes
// sz := binary.LittleEndian.Uint16(r)
// if int(sz) <= len(r) && sz >= 2 {
// if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
// fecRecovered++
// } else {
// kcpInErrors++
// }
// } else {
// fecErrs++
// }
// } else {
// fecErrs++
// }
// // recycle the recovers
// xmitBuf.Put(r)
// }
//
// // to notify the readers to receive the data
// if n := s.kcp.PeekSize(); n > 0 {
// s.notifyReadEvent()
// }
// // to notify the writers
// waitsnd := s.kcp.WaitSnd()
// if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
// s.notifyWriteEvent()
// }
//
// s.uncork()
// s.mu.Unlock()
// } else {
// atomic.AddUint64(&DefaultSnmp.InErrs, 1)
// }
// } else {
// s.mu.Lock()
// if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
// kcpInErrors++
// }
// if n := s.kcp.PeekSize(); n > 0 {
// s.notifyReadEvent()
// }
// waitsnd := s.kcp.WaitSnd()
// if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
// s.notifyWriteEvent()
// }
// s.uncork()
// s.mu.Unlock()
// }
var kcpInErrors uint64
s.mu.Lock()
if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
@@ -777,75 +637,11 @@ func (s *UDPSession) kcpInput(data []byte) {
if kcpInErrors > 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
}
// if fecParityShards > 0 {
// atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
// }
// if fecErrs > 0 {
// atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
// }
// if fecRecovered > 0 {
// atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
// }
}
// 原神Enet连接控制协议
// MM MM MM MM | LL LL LL LL | HH HH HH HH | EE EE EE EE | MM MM MM MM
// MM为表示连接状态的幻数 在开头的4字节和结尾的4字节
// LL和HH分别为convId的低4字节和高4字节
// EE为Enet事件类型 4字节
// Enet协议上报结构体
type Enet struct {
Addr string
ConvId uint64
ConnType uint8
EnetType uint32
}
// Enet连接状态类型
const (
ConnEnetSyn = 1
ConnEnetEst = 2
ConnEnetFin = 3
ConnEnetAddrChange = 4
)
// Enet连接状态类型幻数
var MagicEnetSynHead, _ = hex.DecodeString("000000ff")
var MagicEnetSynTail, _ = hex.DecodeString("ffffffff")
var MagicEnetEstHead, _ = hex.DecodeString("00000145")
var MagicEnetEstTail, _ = hex.DecodeString("14514545")
var MagicEnetFinHead, _ = hex.DecodeString("00000194")
var MagicEnetFinTail, _ = hex.DecodeString("19419494")
// Enet事件类型
const (
EnetTimeout = 0
EnetClientClose = 1
EnetClientRebindFail = 2
EnetClientShutdown = 3
EnetServerRelogin = 4
EnetServerKick = 5
EnetServerShutdown = 6
EnetNotFoundSession = 7
EnetLoginUnfinished = 8
EnetPacketFreqTooHigh = 9
EnetPingTimeout = 10
EnetTranferFailed = 11
EnetServerKillClient = 12
EnetCheckMoveSpeed = 13
EnetAccountPasswordChange = 14
EnetClientEditorConnectKey = 987654321
EnetClientConnectKey = 1234567890
)
type (
// Listener defines a server which will be waiting to accept incoming connections
Listener struct {
// block BlockCrypt // block encryption
// dataShards int // FEC data shard
// parityShards int // FEC parity shard
conn net.PacketConn // the underlying packet connection
ownConn bool // true if we created conn internally, false if provided by caller
@@ -871,23 +667,7 @@ type (
// packet input stage
func (l *Listener) packetInput(data []byte, addr net.Addr, convId uint64) {
// decrypted := false
// if l.block != nil && len(data) >= cryptHeaderSize {
// l.block.Decrypt(data, data)
// data = data[nonceSize:]
// checksum := crc32.ChecksumIEEE(data[crcSize:])
// if checksum == binary.LittleEndian.Uint32(data) {
// data = data[crcSize:]
// decrypted = true
// } else {
// atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
// }
// } else if l.block == nil {
// decrypted = true
// }
decrypted := true
if decrypted && len(data) >= IKCP_OVERHEAD {
if len(data) >= IKCP_OVERHEAD {
l.sessionLock.RLock()
s, ok := l.sessions[convId]
l.sessionLock.RUnlock()
@@ -895,20 +675,6 @@ func (l *Listener) packetInput(data []byte, addr net.Addr, convId uint64) {
var conv uint64
var sn uint32
convRecovered := false
// fecFlag := binary.LittleEndian.Uint16(data[8:])
// if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2
// // packet with FEC
// if fecFlag == typeData && len(data) >= fecHeaderSizePlus2+IKCP_OVERHEAD {
// conv = binary.LittleEndian.Uint64(data[fecHeaderSizePlus2:])
// sn = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2+IKCP_SN_OFFSET:])
// convRecovered = true
// }
// } else {
// // packet without FEC
// conv = binary.LittleEndian.Uint64(data)
// sn = binary.LittleEndian.Uint32(data[IKCP_SN_OFFSET:])
// convRecovered = true
// }
// packet without FEC
conv = binary.LittleEndian.Uint64(data)
@@ -1103,9 +869,6 @@ func serveConn(conn net.PacketConn, ownConn bool) (*Listener, error) {
l.chAccepts = make(chan *UDPSession, acceptBacklog)
l.chSessionClosed = make(chan net.Addr)
l.die = make(chan struct{})
// l.dataShards = dataShards
// l.parityShards = parityShards
// l.block = block
l.chSocketReadError = make(chan struct{})
l.EnetNotify = make(chan *Enet, 1000)
go l.monitor()
@@ -1138,9 +901,36 @@ func DialWithOptions(raddr string) (*UDPSession, error) {
return nil, errors.WithStack(err)
}
var convid uint64
binary.Read(rand.Reader, binary.LittleEndian, &convid)
return newUDPSession(convid, nil, conn, true, udpaddr), nil
hsconn, err := net.DialUDP(network, nil, udpaddr)
if err != nil {
return nil, err
}
enet := &Enet{
Addr: raddr,
ConvId: 0,
ConnType: ConnEnetSyn,
EnetType: EnetClientConnectKey,
}
data := BuildEnet(enet.ConnType, enet.EnetType, enet.ConvId)
_, err = hsconn.Write(data)
if err != nil {
return nil, err
}
buf := make([]byte, mtuLimit)
n, addr, err := hsconn.ReadFrom(buf)
if err != nil {
return nil, err
}
if addr.String() != raddr {
return nil, errors.New("recv packet remote addr not match")
}
udpPayload := buf[:n]
connType, enetType, conv, err := ParseEnet(udpPayload)
if err != nil || connType != ConnEnetEst || enetType != EnetClientConnectKey {
return nil, errors.New("recv packet format error")
}
return newUDPSession(conv, nil, conn, true, udpaddr), nil
}
// NewConn3 establishes a session and talks KCP protocol over a packet connection.

View File

@@ -7,14 +7,12 @@ import (
// Snmp defines network statistics indicator
type Snmp struct {
BytesSent uint64 // bytes sent from upper level
BytesReceived uint64 // bytes received to upper level
MaxConn uint64 // max number of connections ever reached
ActiveOpens uint64 // accumulated active open connections
PassiveOpens uint64 // accumulated passive open connections
CurrEstab uint64 // current number of established connections
// InErrs uint64 // UDP read errors reported from net.PacketConn
// InCsumErrors uint64 // checksum errors from CRC32
BytesSent uint64 // bytes sent from upper level
BytesReceived uint64 // bytes received to upper level
MaxConn uint64 // max number of connections ever reached
ActiveOpens uint64 // accumulated active open connections
PassiveOpens uint64 // accumulated passive open connections
CurrEstab uint64 // current number of established connections
KCPInErrors uint64 // packet iput errors reported from KCP
InPkts uint64 // incoming packets count
OutPkts uint64 // outgoing packets count
@@ -27,10 +25,6 @@ type Snmp struct {
EarlyRetransSegs uint64 // accmulated early retransmitted segments
LostSegs uint64 // number of segs inferred as lost
RepeatSegs uint64 // number of segs duplicated
// FECRecovered uint64 // correct packets recovered from FEC
// FECErrs uint64 // incorrect packets recovered from FEC
// FECParityShards uint64 // FEC segments received
// FECShortShards uint64 // number of data shards that's not enough for recovery
}
func newSnmp() *Snmp {
@@ -46,8 +40,6 @@ func (s *Snmp) Header() []string {
"ActiveOpens",
"PassiveOpens",
"CurrEstab",
// "InErrs",
// "InCsumErrors",
"KCPInErrors",
"InPkts",
"OutPkts",
@@ -60,10 +52,6 @@ func (s *Snmp) Header() []string {
"EarlyRetransSegs",
"LostSegs",
"RepeatSegs",
// "FECParityShards",
// "FECErrs",
// "FECRecovered",
// "FECShortShards",
}
}
@@ -77,8 +65,6 @@ func (s *Snmp) ToSlice() []string {
fmt.Sprint(snmp.ActiveOpens),
fmt.Sprint(snmp.PassiveOpens),
fmt.Sprint(snmp.CurrEstab),
// fmt.Sprint(snmp.InErrs),
// fmt.Sprint(snmp.InCsumErrors),
fmt.Sprint(snmp.KCPInErrors),
fmt.Sprint(snmp.InPkts),
fmt.Sprint(snmp.OutPkts),
@@ -91,10 +77,6 @@ func (s *Snmp) ToSlice() []string {
fmt.Sprint(snmp.EarlyRetransSegs),
fmt.Sprint(snmp.LostSegs),
fmt.Sprint(snmp.RepeatSegs),
// fmt.Sprint(snmp.FECParityShards),
// fmt.Sprint(snmp.FECErrs),
// fmt.Sprint(snmp.FECRecovered),
// fmt.Sprint(snmp.FECShortShards),
}
}
@@ -107,8 +89,6 @@ func (s *Snmp) Copy() *Snmp {
d.ActiveOpens = atomic.LoadUint64(&s.ActiveOpens)
d.PassiveOpens = atomic.LoadUint64(&s.PassiveOpens)
d.CurrEstab = atomic.LoadUint64(&s.CurrEstab)
// d.InErrs = atomic.LoadUint64(&s.InErrs)
// d.InCsumErrors = atomic.LoadUint64(&s.InCsumErrors)
d.KCPInErrors = atomic.LoadUint64(&s.KCPInErrors)
d.InPkts = atomic.LoadUint64(&s.InPkts)
d.OutPkts = atomic.LoadUint64(&s.OutPkts)
@@ -121,10 +101,6 @@ func (s *Snmp) Copy() *Snmp {
d.EarlyRetransSegs = atomic.LoadUint64(&s.EarlyRetransSegs)
d.LostSegs = atomic.LoadUint64(&s.LostSegs)
d.RepeatSegs = atomic.LoadUint64(&s.RepeatSegs)
// d.FECParityShards = atomic.LoadUint64(&s.FECParityShards)
// d.FECErrs = atomic.LoadUint64(&s.FECErrs)
// d.FECRecovered = atomic.LoadUint64(&s.FECRecovered)
// d.FECShortShards = atomic.LoadUint64(&s.FECShortShards)
return d
}
@@ -136,8 +112,6 @@ func (s *Snmp) Reset() {
atomic.StoreUint64(&s.ActiveOpens, 0)
atomic.StoreUint64(&s.PassiveOpens, 0)
atomic.StoreUint64(&s.CurrEstab, 0)
// atomic.StoreUint64(&s.InErrs, 0)
// atomic.StoreUint64(&s.InCsumErrors, 0)
atomic.StoreUint64(&s.KCPInErrors, 0)
atomic.StoreUint64(&s.InPkts, 0)
atomic.StoreUint64(&s.OutPkts, 0)
@@ -150,10 +124,6 @@ func (s *Snmp) Reset() {
atomic.StoreUint64(&s.EarlyRetransSegs, 0)
atomic.StoreUint64(&s.LostSegs, 0)
atomic.StoreUint64(&s.RepeatSegs, 0)
// atomic.StoreUint64(&s.FECParityShards, 0)
// atomic.StoreUint64(&s.FECErrs, 0)
// atomic.StoreUint64(&s.FECRecovered, 0)
// atomic.StoreUint64(&s.FECShortShards, 0)
}
// DefaultSnmp is the global KCP connection statistics collector

View File

@@ -1,8 +1,6 @@
package kcp
import (
"bytes"
"encoding/binary"
"net"
"sync/atomic"
@@ -21,12 +19,24 @@ func (s *UDPSession) defaultReadLoop() {
if src == "" { // set source address
src = addr.String()
} else if addr.String() != src {
// atomic.AddUint64(&DefaultSnmp.InErrs, 1)
// continue
s.remote = addr
src = addr.String()
}
if n == 20 {
connType, _, conv, err := ParseEnet(udpPayload)
if err != nil {
continue
}
if conv != s.GetConv() {
continue
}
if connType == ConnEnetFin {
s.Close()
continue
}
}
s.packetInput(udpPayload)
} else {
s.notifyReadError(errors.WithStack(err))
@@ -42,27 +52,13 @@ func (l *Listener) defaultMonitor() {
udpPayload := buf[:n]
var convId uint64 = 0
if n == 20 {
// 原神KCP的Enet协议
// 提取convId
convId += uint64(udpPayload[4]) << 24
convId += uint64(udpPayload[5]) << 16
convId += uint64(udpPayload[6]) << 8
convId += uint64(udpPayload[7]) << 0
convId += uint64(udpPayload[8]) << 56
convId += uint64(udpPayload[9]) << 48
convId += uint64(udpPayload[10]) << 40
convId += uint64(udpPayload[11]) << 32
// 提取Enet协议头部和尾部幻数
udpPayloadEnetHead := udpPayload[:4]
udpPayloadEnetTail := udpPayload[len(udpPayload)-4:]
// 提取Enet协议类型
enetTypeData := udpPayload[12:16]
enetTypeDataBuffer := bytes.NewBuffer(enetTypeData)
var enetType uint32
_ = binary.Read(enetTypeDataBuffer, binary.BigEndian, &enetType)
equalHead := bytes.Compare(udpPayloadEnetHead, MagicEnetSynHead)
equalTail := bytes.Compare(udpPayloadEnetTail, MagicEnetSynTail)
if equalHead == 0 && equalTail == 0 {
connType, enetType, conv, err := ParseEnet(udpPayload)
if err != nil {
continue
}
convId = conv
switch connType {
case ConnEnetSyn:
// 客户端前置握手获取conv
l.EnetNotify <- &Enet{
Addr: from.String(),
@@ -70,11 +66,7 @@ func (l *Listener) defaultMonitor() {
ConnType: ConnEnetSyn,
EnetType: enetType,
}
continue
}
equalHead = bytes.Compare(udpPayloadEnetHead, MagicEnetEstHead)
equalTail = bytes.Compare(udpPayloadEnetTail, MagicEnetEstTail)
if equalHead == 0 && equalTail == 0 {
case ConnEnetEst:
// 连接建立
l.EnetNotify <- &Enet{
Addr: from.String(),
@@ -82,11 +74,7 @@ func (l *Listener) defaultMonitor() {
ConnType: ConnEnetEst,
EnetType: enetType,
}
continue
}
equalHead = bytes.Compare(udpPayloadEnetHead, MagicEnetFinHead)
equalTail = bytes.Compare(udpPayloadEnetTail, MagicEnetFinTail)
if equalHead == 0 && equalTail == 0 {
case ConnEnetFin:
// 连接断开
l.EnetNotify <- &Enet{
Addr: from.String(),
@@ -94,6 +82,7 @@ func (l *Listener) defaultMonitor() {
ConnType: ConnEnetFin,
EnetType: enetType,
}
default:
continue
}
} else {
@@ -129,61 +118,6 @@ func (l *Listener) defaultMonitor() {
}
}
func buildEnet(connType uint8, enetType uint32, conv uint64) []byte {
data := make([]byte, 20)
if connType == ConnEnetSyn {
copy(data[0:4], MagicEnetSynHead)
copy(data[16:20], MagicEnetSynTail)
} else if connType == ConnEnetEst {
copy(data[0:4], MagicEnetEstHead)
copy(data[16:20], MagicEnetEstTail)
} else if connType == ConnEnetFin {
copy(data[0:4], MagicEnetFinHead)
copy(data[16:20], MagicEnetFinTail)
} else {
return nil
}
// conv的高四个字节和低四个字节分开
// 例如 00 00 01 45 | LL LL LL LL | HH HH HH HH | 49 96 02 d2 | 14 51 45 45
data[4] = uint8(conv >> 24)
data[5] = uint8(conv >> 16)
data[6] = uint8(conv >> 8)
data[7] = uint8(conv >> 0)
data[8] = uint8(conv >> 56)
data[9] = uint8(conv >> 48)
data[10] = uint8(conv >> 40)
data[11] = uint8(conv >> 32)
// Enet
data[12] = uint8(enetType >> 24)
data[13] = uint8(enetType >> 16)
data[14] = uint8(enetType >> 8)
data[15] = uint8(enetType >> 0)
return data
}
func (l *Listener) defaultSendEnetNotifyToClient(enet *Enet) {
remoteAddr, err := net.ResolveUDPAddr("udp", enet.Addr)
if err != nil {
return
}
data := buildEnet(enet.ConnType, enet.EnetType, enet.ConvId)
if data == nil {
return
}
_, _ = l.conn.WriteTo(data, remoteAddr)
}
func (s *UDPSession) defaultSendEnetNotify(enet *Enet) {
data := buildEnet(enet.ConnType, enet.EnetType, s.GetConv())
if data == nil {
return
}
s.defaultTx([]ipv4.Message{{
Buffers: [][]byte{data},
Addr: s.remote,
}})
}
func (s *UDPSession) defaultTx(txqueue []ipv4.Message) {
nbytes := 0
npkts := 0
@@ -199,3 +133,26 @@ func (s *UDPSession) defaultTx(txqueue []ipv4.Message) {
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
}
func (l *Listener) defaultSendEnetNotifyToPeer(enet *Enet) {
remoteAddr, err := net.ResolveUDPAddr("udp", enet.Addr)
if err != nil {
return
}
data := BuildEnet(enet.ConnType, enet.EnetType, enet.ConvId)
if data == nil {
return
}
_, _ = l.conn.WriteTo(data, remoteAddr)
}
func (s *UDPSession) defaultSendEnetNotifyToPeer(enet *Enet) {
data := BuildEnet(enet.ConnType, enet.EnetType, s.GetConv())
if data == nil {
return
}
s.defaultTx([]ipv4.Message{{
Buffers: [][]byte{data},
Addr: s.remote,
}})
}

View File

@@ -4,8 +4,6 @@
package kcp
import (
"bytes"
"encoding/binary"
"net"
"os"
"sync/atomic"
@@ -15,6 +13,15 @@ import (
"golang.org/x/net/ipv6"
)
const (
batchSize = 16
)
type batchConn interface {
WriteBatch(ms []ipv4.Message, flags int) (int, error)
ReadBatch(ms []ipv4.Message, flags int) (int, error)
}
// the read loop for a client session
func (s *UDPSession) readLoop() {
// default version
@@ -39,14 +46,26 @@ func (s *UDPSession) readLoop() {
if src == "" { // set source address if nil
src = msg.Addr.String()
} else if msg.Addr.String() != src {
// atomic.AddUint64(&DefaultSnmp.InErrs, 1)
// continue
s.remote = msg.Addr
src = msg.Addr.String()
}
udpPayload := msg.Buffers[0][:msg.N]
if msg.N == 20 {
connType, _, conv, err := ParseEnet(udpPayload)
if err != nil {
continue
}
if conv != s.GetConv() {
continue
}
if connType == ConnEnetFin {
s.Close()
continue
}
}
// source and size has validated
s.packetInput(udpPayload)
}
@@ -101,27 +120,13 @@ func (l *Listener) monitor() {
udpPayload := msg.Buffers[0][:msg.N]
var convId uint64 = 0
if msg.N == 20 {
// 原神KCP的Enet协议
// 提取convId
convId += uint64(udpPayload[4]) << 24
convId += uint64(udpPayload[5]) << 16
convId += uint64(udpPayload[6]) << 8
convId += uint64(udpPayload[7]) << 0
convId += uint64(udpPayload[8]) << 56
convId += uint64(udpPayload[9]) << 48
convId += uint64(udpPayload[10]) << 40
convId += uint64(udpPayload[11]) << 32
// 提取Enet协议头部和尾部幻数
udpPayloadEnetHead := udpPayload[:4]
udpPayloadEnetTail := udpPayload[len(udpPayload)-4:]
// 提取Enet协议类型
enetTypeData := udpPayload[12:16]
enetTypeDataBuffer := bytes.NewBuffer(enetTypeData)
var enetType uint32
_ = binary.Read(enetTypeDataBuffer, binary.BigEndian, &enetType)
equalHead := bytes.Compare(udpPayloadEnetHead, MagicEnetSynHead)
equalTail := bytes.Compare(udpPayloadEnetTail, MagicEnetSynTail)
if equalHead == 0 && equalTail == 0 {
connType, enetType, conv, err := ParseEnet(udpPayload)
if err != nil {
continue
}
convId = conv
switch connType {
case ConnEnetSyn:
// 客户端前置握手获取conv
l.EnetNotify <- &Enet{
Addr: msg.Addr.String(),
@@ -129,11 +134,7 @@ func (l *Listener) monitor() {
ConnType: ConnEnetSyn,
EnetType: enetType,
}
continue
}
equalHead = bytes.Compare(udpPayloadEnetHead, MagicEnetEstHead)
equalTail = bytes.Compare(udpPayloadEnetTail, MagicEnetEstTail)
if equalHead == 0 && equalTail == 0 {
case ConnEnetEst:
// 连接建立
l.EnetNotify <- &Enet{
Addr: msg.Addr.String(),
@@ -141,11 +142,7 @@ func (l *Listener) monitor() {
ConnType: ConnEnetEst,
EnetType: enetType,
}
continue
}
equalHead = bytes.Compare(udpPayloadEnetHead, MagicEnetFinHead)
equalTail = bytes.Compare(udpPayloadEnetTail, MagicEnetFinTail)
if equalHead == 0 && equalTail == 0 {
case ConnEnetFin:
// 连接断开
l.EnetNotify <- &Enet{
Addr: msg.Addr.String(),
@@ -153,6 +150,7 @@ func (l *Listener) monitor() {
ConnType: ConnEnetFin,
EnetType: enetType,
}
default:
continue
}
} else {
@@ -200,55 +198,6 @@ func (l *Listener) monitor() {
}
}
func (l *Listener) SendEnetNotifyToClient(enet *Enet) {
var xconn batchConn
_, ok := l.conn.(*net.UDPConn)
if !ok {
return
}
localAddr, err := net.ResolveUDPAddr("udp", l.conn.LocalAddr().String())
if err != nil {
return
}
if localAddr.IP.To4() != nil {
xconn = ipv4.NewPacketConn(l.conn)
} else {
xconn = ipv6.NewPacketConn(l.conn)
}
// default version
if xconn == nil {
l.defaultSendEnetNotifyToClient(enet)
return
}
remoteAddr, err := net.ResolveUDPAddr("udp", enet.Addr)
if err != nil {
return
}
data := buildEnet(enet.ConnType, enet.EnetType, enet.ConvId)
if data == nil {
return
}
_, _ = xconn.WriteBatch([]ipv4.Message{{
Buffers: [][]byte{data},
Addr: remoteAddr,
}}, 0)
}
func (s *UDPSession) SendEnetNotify(enet *Enet) {
data := buildEnet(enet.ConnType, enet.EnetType, s.GetConv())
if data == nil {
return
}
s.tx([]ipv4.Message{{
Buffers: [][]byte{data},
Addr: s.remote,
}})
}
func (s *UDPSession) tx(txqueue []ipv4.Message) {
// default version
if s.xconn == nil || s.xconnWriteError != nil {
@@ -287,3 +236,52 @@ func (s *UDPSession) tx(txqueue []ipv4.Message) {
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
}
func (l *Listener) SendEnetNotifyToPeer(enet *Enet) {
var xconn batchConn
_, ok := l.conn.(*net.UDPConn)
if !ok {
return
}
localAddr, err := net.ResolveUDPAddr("udp", l.conn.LocalAddr().String())
if err != nil {
return
}
if localAddr.IP.To4() != nil {
xconn = ipv4.NewPacketConn(l.conn)
} else {
xconn = ipv6.NewPacketConn(l.conn)
}
// default version
if xconn == nil {
l.defaultSendEnetNotifyToPeer(enet)
return
}
remoteAddr, err := net.ResolveUDPAddr("udp", enet.Addr)
if err != nil {
return
}
data := BuildEnet(enet.ConnType, enet.EnetType, enet.ConvId)
if data == nil {
return
}
_, _ = xconn.WriteBatch([]ipv4.Message{{
Buffers: [][]byte{data},
Addr: remoteAddr,
}}, 0)
}
func (s *UDPSession) SendEnetNotifyToPeer(enet *Enet) {
data := BuildEnet(enet.ConnType, enet.EnetType, s.GetConv())
if data == nil {
return
}
s.tx([]ipv4.Message{{
Buffers: [][]byte{data},
Addr: s.remote,
}})
}

View File

@@ -15,12 +15,12 @@ func (l *Listener) monitor() {
l.defaultMonitor()
}
func (l *Listener) SendEnetNotifyToClient(enet *Enet) {
l.defaultSendEnetNotifyToClient(enet)
func (l *Listener) SendEnetNotifyToPeer(enet *Enet) {
l.defaultSendEnetNotifyToPeer(enet)
}
func (s *UDPSession) SendEnetNotify(enet *Enet) {
s.defaultSendEnetNotify(enet)
func (s *UDPSession) SendEnetNotifyToPeer(enet *Enet) {
s.defaultSendEnetNotifyToPeer(enet)
}
func (s *UDPSession) tx(txqueue []ipv4.Message) {

View File

@@ -238,7 +238,7 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) {
break
}
}
listener.SendEnetNotifyToClient(&kcp.Enet{
listener.SendEnetNotifyToPeer(&kcp.Enet{
Addr: enetNotify.Addr,
ConvId: conv,
ConnType: kcp.ConnEnetEst,
@@ -251,7 +251,7 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) {
logger.Error("session not exist, conv: %v", enetNotify.ConvId)
continue
}
session.conn.SendEnetNotify(&kcp.Enet{
session.conn.SendEnetNotifyToPeer(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: enetNotify.EnetType,
})
@@ -404,7 +404,7 @@ func (k *KcpConnectManager) closeKcpConn(session *Session, enetType uint32) {
// 关闭连接
err := conn.Close()
if err == nil {
conn.SendEnetNotify(&kcp.Enet{
conn.SendEnetNotifyToPeer(&kcp.Enet{
ConnType: kcp.ConnEnetFin,
EnetType: enetType,
})

View File

@@ -143,7 +143,9 @@ func AccountLogin(url string, account string, password string) (*AccountInfo, er
return nil, err
}
comboTokenReq := &api.ComboTokenReq{
Data: string(loginTokenDataJson),
AppID: 4,
ChannelID: 1,
Data: string(loginTokenDataJson),
}
logger.Info("http post url: %v", url+"/hk4e_global/combo/granter/login/v2/login")
comboTokenRsp, err := httpclient.PostJson[api.ComboTokenRsp](url+"/hk4e_global/combo/granter/login/v2/login", comboTokenReq, "")

View File

@@ -3,6 +3,7 @@ package net
import (
"time"
"hk4e/gate/client_proto"
"hk4e/gate/kcp"
hk4egatenet "hk4e/gate/net"
"hk4e/pkg/logger"
@@ -10,10 +11,12 @@ import (
)
type Session struct {
Conn *kcp.UDPSession
XorKey []byte
SendChan chan *hk4egatenet.ProtoMsg
RecvChan chan *hk4egatenet.ProtoMsg
Conn *kcp.UDPSession
XorKey []byte
SendChan chan *hk4egatenet.ProtoMsg
RecvChan chan *hk4egatenet.ProtoMsg
ServerCmdProtoMap *cmd.CmdProtoMap
ClientCmdProtoMap *client_proto.ClientCmdProtoMap
}
func NewSession(gateAddr string, dispatchKey []byte, localPort int) (*Session, error) {
@@ -27,13 +30,13 @@ func NewSession(gateAddr string, dispatchKey []byte, localPort int) (*Session, e
}
conn.SetACKNoDelay(true)
conn.SetWriteDelay(false)
sendChan := make(chan *hk4egatenet.ProtoMsg, 1000)
recvChan := make(chan *hk4egatenet.ProtoMsg, 1000)
r := &Session{
Conn: conn,
XorKey: dispatchKey,
SendChan: sendChan,
RecvChan: recvChan,
Conn: conn,
XorKey: dispatchKey,
SendChan: make(chan *hk4egatenet.ProtoMsg, 1000),
RecvChan: make(chan *hk4egatenet.ProtoMsg, 1000),
ServerCmdProtoMap: cmd.NewCmdProtoMap(),
ClientCmdProtoMap: client_proto.NewClientCmdProtoMap(),
}
go r.recvHandle()
go r.sendHandle()
@@ -58,7 +61,7 @@ func (s *Session) recvHandle() {
kcpMsgList := make([]*hk4egatenet.KcpMsg, 0)
hk4egatenet.DecodeBinToPayload(recvData, &dataBuf, convId, &kcpMsgList, s.XorKey)
for _, v := range kcpMsgList {
protoMsgList := hk4egatenet.ProtoDecode(v, cmd.NewCmdProtoMap(), nil)
protoMsgList := hk4egatenet.ProtoDecode(v, s.ServerCmdProtoMap, s.ClientCmdProtoMap)
for _, vv := range protoMsgList {
s.RecvChan <- vv
}
@@ -77,7 +80,7 @@ func (s *Session) sendHandle() {
_ = conn.Close()
break
}
kcpMsg := hk4egatenet.ProtoEncode(protoMsg, cmd.NewCmdProtoMap(), nil)
kcpMsg := hk4egatenet.ProtoEncode(protoMsg, s.ServerCmdProtoMap, s.ClientCmdProtoMap)
if kcpMsg == nil {
logger.Error("decode kcp msg is nil, convId: %v", convId)
continue