From 02c73f844401cd67b84ce20b1de119342d26a02c Mon Sep 17 00:00:00 2001 From: flswld Date: Tue, 14 Feb 2023 22:57:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=BA=E5=99=A8=E4=BA=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gate/kcp/batchconn.go | 12 -- gate/kcp/enet.go | 134 +++++++++++++++ gate/kcp/{sess.go => session.go} | 274 ++++--------------------------- gate/kcp/snmp.go | 42 +---- gate/kcp/udp_socket.go | 137 ++++++---------- gate/kcp/udp_socket_linux.go | 166 +++++++++---------- gate/kcp/udp_socket_windows.go | 8 +- gate/net/kcp_connect_manager.go | 6 +- robot/login/http_login.go | 4 +- robot/net/session.go | 27 +-- 10 files changed, 326 insertions(+), 484 deletions(-) delete mode 100644 gate/kcp/batchconn.go create mode 100644 gate/kcp/enet.go rename gate/kcp/{sess.go => session.go} (76%) diff --git a/gate/kcp/batchconn.go b/gate/kcp/batchconn.go deleted file mode 100644 index 6c307010..00000000 --- a/gate/kcp/batchconn.go +++ /dev/null @@ -1,12 +0,0 @@ -package kcp - -import "golang.org/x/net/ipv4" - -const ( - batchSize = 16 -) - -type batchConn interface { - WriteBatch(ms []ipv4.Message, flags int) (int, error) - ReadBatch(ms []ipv4.Message, flags int) (int, error) -} diff --git a/gate/kcp/enet.go b/gate/kcp/enet.go new file mode 100644 index 00000000..eef3fb80 --- /dev/null +++ b/gate/kcp/enet.go @@ -0,0 +1,134 @@ +package kcp + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "errors" +) + +// 原神Enet连接控制协议 +// MM MM MM MM | LL LL LL LL | HH HH HH HH | EE EE EE EE | MM MM MM MM +// MM为表示连接状态的幻数 在开头的4字节和结尾的4字节 +// LL和HH分别为convId的低4字节和高4字节 +// EE为Enet事件类型 4字节 + +// Enet协议上报结构体 +type Enet struct { + Addr string + ConvId uint64 + ConnType uint8 + EnetType uint32 +} + +// Enet连接状态类型 +const ( + ConnEnetSyn = 1 + ConnEnetEst = 2 + ConnEnetFin = 3 + ConnEnetAddrChange = 4 +) + +// Enet连接状态类型幻数 +var MagicEnetSynHead, _ = hex.DecodeString("000000ff") +var MagicEnetSynTail, _ = hex.DecodeString("ffffffff") +var MagicEnetEstHead, _ = hex.DecodeString("00000145") +var MagicEnetEstTail, _ = hex.DecodeString("14514545") +var MagicEnetFinHead, _ = hex.DecodeString("00000194") +var MagicEnetFinTail, _ = hex.DecodeString("19419494") + +// Enet事件类型 +const ( + EnetTimeout = 0 + EnetClientClose = 1 + EnetClientRebindFail = 2 + EnetClientShutdown = 3 + EnetServerRelogin = 4 + EnetServerKick = 5 + EnetServerShutdown = 6 + EnetNotFoundSession = 7 + EnetLoginUnfinished = 8 + EnetPacketFreqTooHigh = 9 + EnetPingTimeout = 10 + EnetTranferFailed = 11 + EnetServerKillClient = 12 + EnetCheckMoveSpeed = 13 + EnetAccountPasswordChange = 14 + EnetClientEditorConnectKey = 987654321 + EnetClientConnectKey = 1234567890 +) + +func BuildEnet(connType uint8, enetType uint32, conv uint64) []byte { + data := make([]byte, 20) + if connType == ConnEnetSyn { + copy(data[0:4], MagicEnetSynHead) + copy(data[16:20], MagicEnetSynTail) + } else if connType == ConnEnetEst { + copy(data[0:4], MagicEnetEstHead) + copy(data[16:20], MagicEnetEstTail) + } else if connType == ConnEnetFin { + copy(data[0:4], MagicEnetFinHead) + copy(data[16:20], MagicEnetFinTail) + } else { + return nil + } + // conv的高四个字节和低四个字节分开 + // 例如 00 00 01 45 | LL LL LL LL | HH HH HH HH | 49 96 02 d2 | 14 51 45 45 + data[4] = uint8(conv >> 24) + data[5] = uint8(conv >> 16) + data[6] = uint8(conv >> 8) + data[7] = uint8(conv >> 0) + data[8] = uint8(conv >> 56) + data[9] = uint8(conv >> 48) + data[10] = uint8(conv >> 40) + data[11] = uint8(conv >> 32) + // Enet + data[12] = uint8(enetType >> 24) + data[13] = uint8(enetType >> 16) + data[14] = uint8(enetType >> 8) + data[15] = uint8(enetType >> 0) + return data +} + +func ParseEnet(data []byte) (connType uint8, enetType uint32, conv uint64, err error) { + // 提取convId + conv = uint64(0) + conv += uint64(data[4]) << 24 + conv += uint64(data[5]) << 16 + conv += uint64(data[6]) << 8 + conv += uint64(data[7]) << 0 + conv += uint64(data[8]) << 56 + conv += uint64(data[9]) << 48 + conv += uint64(data[10]) << 40 + conv += uint64(data[11]) << 32 + // 提取Enet协议头部和尾部幻数 + udpPayloadEnetHead := data[:4] + udpPayloadEnetTail := data[len(data)-4:] + // 提取Enet协议类型 + enetTypeData := data[12:16] + enetTypeDataBuffer := bytes.NewBuffer(enetTypeData) + enetType = uint32(0) + _ = binary.Read(enetTypeDataBuffer, binary.BigEndian, &enetType) + equalHead := bytes.Equal(udpPayloadEnetHead, MagicEnetSynHead) + equalTail := bytes.Equal(udpPayloadEnetTail, MagicEnetSynTail) + if equalHead && equalTail { + // 客户端前置握手获取conv + connType = ConnEnetSyn + return connType, enetType, conv, nil + } + equalHead = bytes.Equal(udpPayloadEnetHead, MagicEnetEstHead) + equalTail = bytes.Equal(udpPayloadEnetTail, MagicEnetEstTail) + if equalHead && equalTail { + // 连接建立 + connType = ConnEnetEst + return connType, enetType, conv, nil + } + equalHead = bytes.Equal(udpPayloadEnetHead, MagicEnetFinHead) + equalTail = bytes.Equal(udpPayloadEnetTail, MagicEnetFinTail) + if equalHead && equalTail { + // 连接断开 + connType = ConnEnetFin + return connType, enetType, conv, nil + } + return 0, 0, 0, errors.New("unknown conn type") +} diff --git a/gate/kcp/sess.go b/gate/kcp/session.go similarity index 76% rename from gate/kcp/sess.go rename to gate/kcp/session.go index 4eae1ca5..b26431e6 100644 --- a/gate/kcp/sess.go +++ b/gate/kcp/session.go @@ -10,7 +10,6 @@ package kcp import ( "crypto/rand" "encoding/binary" - "encoding/hex" "io" "net" "sync" @@ -64,17 +63,12 @@ type ( ownConn bool // true if we created conn internally, false if provided by caller kcp *KCP // KCP ARQ protocol l *Listener // pointing to the Listener object if it's been accepted by a Listener - // block BlockCrypt // block encryption object // kcp receiving is based on packets // recvbuf turns packets into stream recvbuf []byte bufptr []byte - // // FEC codec - // fecDecoder *fecDecoder - // fecEncoder *fecEncoder - // settings remote net.Addr // remote peer address rd time.Time // read deadline @@ -98,9 +92,6 @@ type ( socketReadErrorOnce sync.Once socketWriteErrorOnce sync.Once - // // nonce generator - // nonce Entropy - // packets waiting to be sent on wire txqueue []ipv4.Message xconn batchConn // for x/net @@ -126,8 +117,6 @@ type ( func newUDPSession(conv uint64, l *Listener, conn net.PacketConn, ownConn bool, remote net.Addr) *UDPSession { sess := new(UDPSession) sess.die = make(chan struct{}) - // sess.nonce = new(nonceAES128) - // sess.nonce.Init() sess.chReadEvent = make(chan struct{}, 1) sess.chWriteEvent = make(chan struct{}, 1) sess.chSocketReadError = make(chan struct{}) @@ -136,7 +125,6 @@ func newUDPSession(conv uint64, l *Listener, conn net.PacketConn, ownConn bool, sess.conn = conn sess.ownConn = ownConn sess.l = l - // sess.block = block sess.recvbuf = make([]byte, mtuLimit) // cast to writebatch conn @@ -151,22 +139,6 @@ func newUDPSession(conv uint64, l *Listener, conn net.PacketConn, ownConn bool, } } - // // FEC codec initialization - // sess.fecDecoder = newFECDecoder(dataShards, parityShards) - // if sess.block != nil { - // sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize) - // } else { - // sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0) - // } - - // // calculate additional header size introduced by FEC and encryption - // if sess.block != nil { - // sess.headerSize += cryptHeaderSize - // } - // if sess.fecEncoder != nil { - // sess.headerSize += fecHeaderSizePlus2 - // } - sess.kcp = NewKCP(conv, func(buf []byte, size int) { if size >= IKCP_OVERHEAD+sess.headerSize { sess.output(buf[:size]) @@ -370,9 +342,6 @@ func (s *UDPSession) Close() error { s.uncork() // release pending segments s.kcp.ReleaseTX() - // if s.fecDecoder != nil { - // s.fecDecoder.release() - // } s.mu.Unlock() if s.l != nil { // belongs to listener @@ -551,26 +520,6 @@ func (s *UDPSession) SetWriteBuffer(bytes int) error { func (s *UDPSession) output(buf []byte) { var ecc [][]byte - // // 1. FEC encoding - // if s.fecEncoder != nil { - // ecc = s.fecEncoder.encode(buf) - // } - - // // 2&3. crc32 & encryption - // if s.block != nil { - // s.nonce.Fill(buf[:nonceSize]) - // checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:]) - // binary.LittleEndian.PutUint32(buf[nonceSize:], checksum) - // s.block.Encrypt(buf, buf) - // - // for k := range ecc { - // s.nonce.Fill(ecc[k][:nonceSize]) - // checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:]) - // binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum) - // s.block.Encrypt(ecc[k], ecc[k]) - // } - // } - // 4. TxQueue var msg ipv4.Message for i := 0; i < s.dup+1; i++ { @@ -662,101 +611,12 @@ func (s *UDPSession) notifyWriteError(err error) { // packet input stage func (s *UDPSession) packetInput(data []byte) { - // decrypted := false - // if s.block != nil && len(data) >= cryptHeaderSize { - // s.block.Decrypt(data, data) - // data = data[nonceSize:] - // checksum := crc32.ChecksumIEEE(data[crcSize:]) - // if checksum == binary.LittleEndian.Uint32(data) { - // data = data[crcSize:] - // decrypted = true - // } else { - // atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1) - // } - // } else if s.block == nil { - // decrypted = true - // } - - decrypted := true - if decrypted && len(data) >= IKCP_OVERHEAD { + if len(data) >= IKCP_OVERHEAD { s.kcpInput(data) } } func (s *UDPSession) kcpInput(data []byte) { - // var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64 - // - // fecFlag := binary.LittleEndian.Uint16(data[8:]) - // if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2 - // if len(data) >= fecHeaderSizePlus2 { - // f := fecPacket(data) - // if f.flag() == typeParity { - // fecParityShards++ - // } - // - // // lock - // s.mu.Lock() - // // if fecDecoder is not initialized, create one with default parameter - // if s.fecDecoder == nil { - // s.fecDecoder = newFECDecoder(1, 1) - // } - // recovers := s.fecDecoder.decode(f) - // if f.flag() == typeData { - // if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 { - // kcpInErrors++ - // } - // } - // - // for _, r := range recovers { - // if len(r) >= 2 { // must be larger than 2bytes - // sz := binary.LittleEndian.Uint16(r) - // if int(sz) <= len(r) && sz >= 2 { - // if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 { - // fecRecovered++ - // } else { - // kcpInErrors++ - // } - // } else { - // fecErrs++ - // } - // } else { - // fecErrs++ - // } - // // recycle the recovers - // xmitBuf.Put(r) - // } - // - // // to notify the readers to receive the data - // if n := s.kcp.PeekSize(); n > 0 { - // s.notifyReadEvent() - // } - // // to notify the writers - // waitsnd := s.kcp.WaitSnd() - // if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) { - // s.notifyWriteEvent() - // } - // - // s.uncork() - // s.mu.Unlock() - // } else { - // atomic.AddUint64(&DefaultSnmp.InErrs, 1) - // } - // } else { - // s.mu.Lock() - // if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 { - // kcpInErrors++ - // } - // if n := s.kcp.PeekSize(); n > 0 { - // s.notifyReadEvent() - // } - // waitsnd := s.kcp.WaitSnd() - // if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) { - // s.notifyWriteEvent() - // } - // s.uncork() - // s.mu.Unlock() - // } - var kcpInErrors uint64 s.mu.Lock() if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 { @@ -777,75 +637,11 @@ func (s *UDPSession) kcpInput(data []byte) { if kcpInErrors > 0 { atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors) } - // if fecParityShards > 0 { - // atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards) - // } - // if fecErrs > 0 { - // atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs) - // } - // if fecRecovered > 0 { - // atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered) - // } - } -// 原神Enet连接控制协议 -// MM MM MM MM | LL LL LL LL | HH HH HH HH | EE EE EE EE | MM MM MM MM -// MM为表示连接状态的幻数 在开头的4字节和结尾的4字节 -// LL和HH分别为convId的低4字节和高4字节 -// EE为Enet事件类型 4字节 - -// Enet协议上报结构体 -type Enet struct { - Addr string - ConvId uint64 - ConnType uint8 - EnetType uint32 -} - -// Enet连接状态类型 -const ( - ConnEnetSyn = 1 - ConnEnetEst = 2 - ConnEnetFin = 3 - ConnEnetAddrChange = 4 -) - -// Enet连接状态类型幻数 -var MagicEnetSynHead, _ = hex.DecodeString("000000ff") -var MagicEnetSynTail, _ = hex.DecodeString("ffffffff") -var MagicEnetEstHead, _ = hex.DecodeString("00000145") -var MagicEnetEstTail, _ = hex.DecodeString("14514545") -var MagicEnetFinHead, _ = hex.DecodeString("00000194") -var MagicEnetFinTail, _ = hex.DecodeString("19419494") - -// Enet事件类型 -const ( - EnetTimeout = 0 - EnetClientClose = 1 - EnetClientRebindFail = 2 - EnetClientShutdown = 3 - EnetServerRelogin = 4 - EnetServerKick = 5 - EnetServerShutdown = 6 - EnetNotFoundSession = 7 - EnetLoginUnfinished = 8 - EnetPacketFreqTooHigh = 9 - EnetPingTimeout = 10 - EnetTranferFailed = 11 - EnetServerKillClient = 12 - EnetCheckMoveSpeed = 13 - EnetAccountPasswordChange = 14 - EnetClientEditorConnectKey = 987654321 - EnetClientConnectKey = 1234567890 -) - type ( // Listener defines a server which will be waiting to accept incoming connections Listener struct { - // block BlockCrypt // block encryption - // dataShards int // FEC data shard - // parityShards int // FEC parity shard conn net.PacketConn // the underlying packet connection ownConn bool // true if we created conn internally, false if provided by caller @@ -871,23 +667,7 @@ type ( // packet input stage func (l *Listener) packetInput(data []byte, addr net.Addr, convId uint64) { - // decrypted := false - // if l.block != nil && len(data) >= cryptHeaderSize { - // l.block.Decrypt(data, data) - // data = data[nonceSize:] - // checksum := crc32.ChecksumIEEE(data[crcSize:]) - // if checksum == binary.LittleEndian.Uint32(data) { - // data = data[crcSize:] - // decrypted = true - // } else { - // atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1) - // } - // } else if l.block == nil { - // decrypted = true - // } - - decrypted := true - if decrypted && len(data) >= IKCP_OVERHEAD { + if len(data) >= IKCP_OVERHEAD { l.sessionLock.RLock() s, ok := l.sessions[convId] l.sessionLock.RUnlock() @@ -895,20 +675,6 @@ func (l *Listener) packetInput(data []byte, addr net.Addr, convId uint64) { var conv uint64 var sn uint32 convRecovered := false - // fecFlag := binary.LittleEndian.Uint16(data[8:]) - // if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2 - // // packet with FEC - // if fecFlag == typeData && len(data) >= fecHeaderSizePlus2+IKCP_OVERHEAD { - // conv = binary.LittleEndian.Uint64(data[fecHeaderSizePlus2:]) - // sn = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2+IKCP_SN_OFFSET:]) - // convRecovered = true - // } - // } else { - // // packet without FEC - // conv = binary.LittleEndian.Uint64(data) - // sn = binary.LittleEndian.Uint32(data[IKCP_SN_OFFSET:]) - // convRecovered = true - // } // packet without FEC conv = binary.LittleEndian.Uint64(data) @@ -1103,9 +869,6 @@ func serveConn(conn net.PacketConn, ownConn bool) (*Listener, error) { l.chAccepts = make(chan *UDPSession, acceptBacklog) l.chSessionClosed = make(chan net.Addr) l.die = make(chan struct{}) - // l.dataShards = dataShards - // l.parityShards = parityShards - // l.block = block l.chSocketReadError = make(chan struct{}) l.EnetNotify = make(chan *Enet, 1000) go l.monitor() @@ -1138,9 +901,36 @@ func DialWithOptions(raddr string) (*UDPSession, error) { return nil, errors.WithStack(err) } - var convid uint64 - binary.Read(rand.Reader, binary.LittleEndian, &convid) - return newUDPSession(convid, nil, conn, true, udpaddr), nil + hsconn, err := net.DialUDP(network, nil, udpaddr) + if err != nil { + return nil, err + } + enet := &Enet{ + Addr: raddr, + ConvId: 0, + ConnType: ConnEnetSyn, + EnetType: EnetClientConnectKey, + } + data := BuildEnet(enet.ConnType, enet.EnetType, enet.ConvId) + _, err = hsconn.Write(data) + if err != nil { + return nil, err + } + buf := make([]byte, mtuLimit) + n, addr, err := hsconn.ReadFrom(buf) + if err != nil { + return nil, err + } + if addr.String() != raddr { + return nil, errors.New("recv packet remote addr not match") + } + udpPayload := buf[:n] + connType, enetType, conv, err := ParseEnet(udpPayload) + if err != nil || connType != ConnEnetEst || enetType != EnetClientConnectKey { + return nil, errors.New("recv packet format error") + } + + return newUDPSession(conv, nil, conn, true, udpaddr), nil } // NewConn3 establishes a session and talks KCP protocol over a packet connection. diff --git a/gate/kcp/snmp.go b/gate/kcp/snmp.go index 4101b1c8..d63f661d 100644 --- a/gate/kcp/snmp.go +++ b/gate/kcp/snmp.go @@ -7,14 +7,12 @@ import ( // Snmp defines network statistics indicator type Snmp struct { - BytesSent uint64 // bytes sent from upper level - BytesReceived uint64 // bytes received to upper level - MaxConn uint64 // max number of connections ever reached - ActiveOpens uint64 // accumulated active open connections - PassiveOpens uint64 // accumulated passive open connections - CurrEstab uint64 // current number of established connections - // InErrs uint64 // UDP read errors reported from net.PacketConn - // InCsumErrors uint64 // checksum errors from CRC32 + BytesSent uint64 // bytes sent from upper level + BytesReceived uint64 // bytes received to upper level + MaxConn uint64 // max number of connections ever reached + ActiveOpens uint64 // accumulated active open connections + PassiveOpens uint64 // accumulated passive open connections + CurrEstab uint64 // current number of established connections KCPInErrors uint64 // packet iput errors reported from KCP InPkts uint64 // incoming packets count OutPkts uint64 // outgoing packets count @@ -27,10 +25,6 @@ type Snmp struct { EarlyRetransSegs uint64 // accmulated early retransmitted segments LostSegs uint64 // number of segs inferred as lost RepeatSegs uint64 // number of segs duplicated - // FECRecovered uint64 // correct packets recovered from FEC - // FECErrs uint64 // incorrect packets recovered from FEC - // FECParityShards uint64 // FEC segments received - // FECShortShards uint64 // number of data shards that's not enough for recovery } func newSnmp() *Snmp { @@ -46,8 +40,6 @@ func (s *Snmp) Header() []string { "ActiveOpens", "PassiveOpens", "CurrEstab", - // "InErrs", - // "InCsumErrors", "KCPInErrors", "InPkts", "OutPkts", @@ -60,10 +52,6 @@ func (s *Snmp) Header() []string { "EarlyRetransSegs", "LostSegs", "RepeatSegs", - // "FECParityShards", - // "FECErrs", - // "FECRecovered", - // "FECShortShards", } } @@ -77,8 +65,6 @@ func (s *Snmp) ToSlice() []string { fmt.Sprint(snmp.ActiveOpens), fmt.Sprint(snmp.PassiveOpens), fmt.Sprint(snmp.CurrEstab), - // fmt.Sprint(snmp.InErrs), - // fmt.Sprint(snmp.InCsumErrors), fmt.Sprint(snmp.KCPInErrors), fmt.Sprint(snmp.InPkts), fmt.Sprint(snmp.OutPkts), @@ -91,10 +77,6 @@ func (s *Snmp) ToSlice() []string { fmt.Sprint(snmp.EarlyRetransSegs), fmt.Sprint(snmp.LostSegs), fmt.Sprint(snmp.RepeatSegs), - // fmt.Sprint(snmp.FECParityShards), - // fmt.Sprint(snmp.FECErrs), - // fmt.Sprint(snmp.FECRecovered), - // fmt.Sprint(snmp.FECShortShards), } } @@ -107,8 +89,6 @@ func (s *Snmp) Copy() *Snmp { d.ActiveOpens = atomic.LoadUint64(&s.ActiveOpens) d.PassiveOpens = atomic.LoadUint64(&s.PassiveOpens) d.CurrEstab = atomic.LoadUint64(&s.CurrEstab) - // d.InErrs = atomic.LoadUint64(&s.InErrs) - // d.InCsumErrors = atomic.LoadUint64(&s.InCsumErrors) d.KCPInErrors = atomic.LoadUint64(&s.KCPInErrors) d.InPkts = atomic.LoadUint64(&s.InPkts) d.OutPkts = atomic.LoadUint64(&s.OutPkts) @@ -121,10 +101,6 @@ func (s *Snmp) Copy() *Snmp { d.EarlyRetransSegs = atomic.LoadUint64(&s.EarlyRetransSegs) d.LostSegs = atomic.LoadUint64(&s.LostSegs) d.RepeatSegs = atomic.LoadUint64(&s.RepeatSegs) - // d.FECParityShards = atomic.LoadUint64(&s.FECParityShards) - // d.FECErrs = atomic.LoadUint64(&s.FECErrs) - // d.FECRecovered = atomic.LoadUint64(&s.FECRecovered) - // d.FECShortShards = atomic.LoadUint64(&s.FECShortShards) return d } @@ -136,8 +112,6 @@ func (s *Snmp) Reset() { atomic.StoreUint64(&s.ActiveOpens, 0) atomic.StoreUint64(&s.PassiveOpens, 0) atomic.StoreUint64(&s.CurrEstab, 0) - // atomic.StoreUint64(&s.InErrs, 0) - // atomic.StoreUint64(&s.InCsumErrors, 0) atomic.StoreUint64(&s.KCPInErrors, 0) atomic.StoreUint64(&s.InPkts, 0) atomic.StoreUint64(&s.OutPkts, 0) @@ -150,10 +124,6 @@ func (s *Snmp) Reset() { atomic.StoreUint64(&s.EarlyRetransSegs, 0) atomic.StoreUint64(&s.LostSegs, 0) atomic.StoreUint64(&s.RepeatSegs, 0) - // atomic.StoreUint64(&s.FECParityShards, 0) - // atomic.StoreUint64(&s.FECErrs, 0) - // atomic.StoreUint64(&s.FECRecovered, 0) - // atomic.StoreUint64(&s.FECShortShards, 0) } // DefaultSnmp is the global KCP connection statistics collector diff --git a/gate/kcp/udp_socket.go b/gate/kcp/udp_socket.go index 734b0429..aa4e3298 100644 --- a/gate/kcp/udp_socket.go +++ b/gate/kcp/udp_socket.go @@ -1,8 +1,6 @@ package kcp import ( - "bytes" - "encoding/binary" "net" "sync/atomic" @@ -21,12 +19,24 @@ func (s *UDPSession) defaultReadLoop() { if src == "" { // set source address src = addr.String() } else if addr.String() != src { - // atomic.AddUint64(&DefaultSnmp.InErrs, 1) - // continue s.remote = addr src = addr.String() } + if n == 20 { + connType, _, conv, err := ParseEnet(udpPayload) + if err != nil { + continue + } + if conv != s.GetConv() { + continue + } + if connType == ConnEnetFin { + s.Close() + continue + } + } + s.packetInput(udpPayload) } else { s.notifyReadError(errors.WithStack(err)) @@ -42,27 +52,13 @@ func (l *Listener) defaultMonitor() { udpPayload := buf[:n] var convId uint64 = 0 if n == 20 { - // 原神KCP的Enet协议 - // 提取convId - convId += uint64(udpPayload[4]) << 24 - convId += uint64(udpPayload[5]) << 16 - convId += uint64(udpPayload[6]) << 8 - convId += uint64(udpPayload[7]) << 0 - convId += uint64(udpPayload[8]) << 56 - convId += uint64(udpPayload[9]) << 48 - convId += uint64(udpPayload[10]) << 40 - convId += uint64(udpPayload[11]) << 32 - // 提取Enet协议头部和尾部幻数 - udpPayloadEnetHead := udpPayload[:4] - udpPayloadEnetTail := udpPayload[len(udpPayload)-4:] - // 提取Enet协议类型 - enetTypeData := udpPayload[12:16] - enetTypeDataBuffer := bytes.NewBuffer(enetTypeData) - var enetType uint32 - _ = binary.Read(enetTypeDataBuffer, binary.BigEndian, &enetType) - equalHead := bytes.Compare(udpPayloadEnetHead, MagicEnetSynHead) - equalTail := bytes.Compare(udpPayloadEnetTail, MagicEnetSynTail) - if equalHead == 0 && equalTail == 0 { + connType, enetType, conv, err := ParseEnet(udpPayload) + if err != nil { + continue + } + convId = conv + switch connType { + case ConnEnetSyn: // 客户端前置握手获取conv l.EnetNotify <- &Enet{ Addr: from.String(), @@ -70,11 +66,7 @@ func (l *Listener) defaultMonitor() { ConnType: ConnEnetSyn, EnetType: enetType, } - continue - } - equalHead = bytes.Compare(udpPayloadEnetHead, MagicEnetEstHead) - equalTail = bytes.Compare(udpPayloadEnetTail, MagicEnetEstTail) - if equalHead == 0 && equalTail == 0 { + case ConnEnetEst: // 连接建立 l.EnetNotify <- &Enet{ Addr: from.String(), @@ -82,11 +74,7 @@ func (l *Listener) defaultMonitor() { ConnType: ConnEnetEst, EnetType: enetType, } - continue - } - equalHead = bytes.Compare(udpPayloadEnetHead, MagicEnetFinHead) - equalTail = bytes.Compare(udpPayloadEnetTail, MagicEnetFinTail) - if equalHead == 0 && equalTail == 0 { + case ConnEnetFin: // 连接断开 l.EnetNotify <- &Enet{ Addr: from.String(), @@ -94,6 +82,7 @@ func (l *Listener) defaultMonitor() { ConnType: ConnEnetFin, EnetType: enetType, } + default: continue } } else { @@ -129,61 +118,6 @@ func (l *Listener) defaultMonitor() { } } -func buildEnet(connType uint8, enetType uint32, conv uint64) []byte { - data := make([]byte, 20) - if connType == ConnEnetSyn { - copy(data[0:4], MagicEnetSynHead) - copy(data[16:20], MagicEnetSynTail) - } else if connType == ConnEnetEst { - copy(data[0:4], MagicEnetEstHead) - copy(data[16:20], MagicEnetEstTail) - } else if connType == ConnEnetFin { - copy(data[0:4], MagicEnetFinHead) - copy(data[16:20], MagicEnetFinTail) - } else { - return nil - } - // conv的高四个字节和低四个字节分开 - // 例如 00 00 01 45 | LL LL LL LL | HH HH HH HH | 49 96 02 d2 | 14 51 45 45 - data[4] = uint8(conv >> 24) - data[5] = uint8(conv >> 16) - data[6] = uint8(conv >> 8) - data[7] = uint8(conv >> 0) - data[8] = uint8(conv >> 56) - data[9] = uint8(conv >> 48) - data[10] = uint8(conv >> 40) - data[11] = uint8(conv >> 32) - // Enet - data[12] = uint8(enetType >> 24) - data[13] = uint8(enetType >> 16) - data[14] = uint8(enetType >> 8) - data[15] = uint8(enetType >> 0) - return data -} - -func (l *Listener) defaultSendEnetNotifyToClient(enet *Enet) { - remoteAddr, err := net.ResolveUDPAddr("udp", enet.Addr) - if err != nil { - return - } - data := buildEnet(enet.ConnType, enet.EnetType, enet.ConvId) - if data == nil { - return - } - _, _ = l.conn.WriteTo(data, remoteAddr) -} - -func (s *UDPSession) defaultSendEnetNotify(enet *Enet) { - data := buildEnet(enet.ConnType, enet.EnetType, s.GetConv()) - if data == nil { - return - } - s.defaultTx([]ipv4.Message{{ - Buffers: [][]byte{data}, - Addr: s.remote, - }}) -} - func (s *UDPSession) defaultTx(txqueue []ipv4.Message) { nbytes := 0 npkts := 0 @@ -199,3 +133,26 @@ func (s *UDPSession) defaultTx(txqueue []ipv4.Message) { atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts)) atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes)) } + +func (l *Listener) defaultSendEnetNotifyToPeer(enet *Enet) { + remoteAddr, err := net.ResolveUDPAddr("udp", enet.Addr) + if err != nil { + return + } + data := BuildEnet(enet.ConnType, enet.EnetType, enet.ConvId) + if data == nil { + return + } + _, _ = l.conn.WriteTo(data, remoteAddr) +} + +func (s *UDPSession) defaultSendEnetNotifyToPeer(enet *Enet) { + data := BuildEnet(enet.ConnType, enet.EnetType, s.GetConv()) + if data == nil { + return + } + s.defaultTx([]ipv4.Message{{ + Buffers: [][]byte{data}, + Addr: s.remote, + }}) +} diff --git a/gate/kcp/udp_socket_linux.go b/gate/kcp/udp_socket_linux.go index 32309b52..b1551d2e 100644 --- a/gate/kcp/udp_socket_linux.go +++ b/gate/kcp/udp_socket_linux.go @@ -4,8 +4,6 @@ package kcp import ( - "bytes" - "encoding/binary" "net" "os" "sync/atomic" @@ -15,6 +13,15 @@ import ( "golang.org/x/net/ipv6" ) +const ( + batchSize = 16 +) + +type batchConn interface { + WriteBatch(ms []ipv4.Message, flags int) (int, error) + ReadBatch(ms []ipv4.Message, flags int) (int, error) +} + // the read loop for a client session func (s *UDPSession) readLoop() { // default version @@ -39,14 +46,26 @@ func (s *UDPSession) readLoop() { if src == "" { // set source address if nil src = msg.Addr.String() } else if msg.Addr.String() != src { - // atomic.AddUint64(&DefaultSnmp.InErrs, 1) - // continue s.remote = msg.Addr src = msg.Addr.String() } udpPayload := msg.Buffers[0][:msg.N] + if msg.N == 20 { + connType, _, conv, err := ParseEnet(udpPayload) + if err != nil { + continue + } + if conv != s.GetConv() { + continue + } + if connType == ConnEnetFin { + s.Close() + continue + } + } + // source and size has validated s.packetInput(udpPayload) } @@ -101,27 +120,13 @@ func (l *Listener) monitor() { udpPayload := msg.Buffers[0][:msg.N] var convId uint64 = 0 if msg.N == 20 { - // 原神KCP的Enet协议 - // 提取convId - convId += uint64(udpPayload[4]) << 24 - convId += uint64(udpPayload[5]) << 16 - convId += uint64(udpPayload[6]) << 8 - convId += uint64(udpPayload[7]) << 0 - convId += uint64(udpPayload[8]) << 56 - convId += uint64(udpPayload[9]) << 48 - convId += uint64(udpPayload[10]) << 40 - convId += uint64(udpPayload[11]) << 32 - // 提取Enet协议头部和尾部幻数 - udpPayloadEnetHead := udpPayload[:4] - udpPayloadEnetTail := udpPayload[len(udpPayload)-4:] - // 提取Enet协议类型 - enetTypeData := udpPayload[12:16] - enetTypeDataBuffer := bytes.NewBuffer(enetTypeData) - var enetType uint32 - _ = binary.Read(enetTypeDataBuffer, binary.BigEndian, &enetType) - equalHead := bytes.Compare(udpPayloadEnetHead, MagicEnetSynHead) - equalTail := bytes.Compare(udpPayloadEnetTail, MagicEnetSynTail) - if equalHead == 0 && equalTail == 0 { + connType, enetType, conv, err := ParseEnet(udpPayload) + if err != nil { + continue + } + convId = conv + switch connType { + case ConnEnetSyn: // 客户端前置握手获取conv l.EnetNotify <- &Enet{ Addr: msg.Addr.String(), @@ -129,11 +134,7 @@ func (l *Listener) monitor() { ConnType: ConnEnetSyn, EnetType: enetType, } - continue - } - equalHead = bytes.Compare(udpPayloadEnetHead, MagicEnetEstHead) - equalTail = bytes.Compare(udpPayloadEnetTail, MagicEnetEstTail) - if equalHead == 0 && equalTail == 0 { + case ConnEnetEst: // 连接建立 l.EnetNotify <- &Enet{ Addr: msg.Addr.String(), @@ -141,11 +142,7 @@ func (l *Listener) monitor() { ConnType: ConnEnetEst, EnetType: enetType, } - continue - } - equalHead = bytes.Compare(udpPayloadEnetHead, MagicEnetFinHead) - equalTail = bytes.Compare(udpPayloadEnetTail, MagicEnetFinTail) - if equalHead == 0 && equalTail == 0 { + case ConnEnetFin: // 连接断开 l.EnetNotify <- &Enet{ Addr: msg.Addr.String(), @@ -153,6 +150,7 @@ func (l *Listener) monitor() { ConnType: ConnEnetFin, EnetType: enetType, } + default: continue } } else { @@ -200,55 +198,6 @@ func (l *Listener) monitor() { } } -func (l *Listener) SendEnetNotifyToClient(enet *Enet) { - var xconn batchConn - _, ok := l.conn.(*net.UDPConn) - if !ok { - return - } - localAddr, err := net.ResolveUDPAddr("udp", l.conn.LocalAddr().String()) - if err != nil { - return - } - if localAddr.IP.To4() != nil { - xconn = ipv4.NewPacketConn(l.conn) - } else { - xconn = ipv6.NewPacketConn(l.conn) - } - - // default version - if xconn == nil { - l.defaultSendEnetNotifyToClient(enet) - return - } - - remoteAddr, err := net.ResolveUDPAddr("udp", enet.Addr) - if err != nil { - return - } - - data := buildEnet(enet.ConnType, enet.EnetType, enet.ConvId) - if data == nil { - return - } - - _, _ = xconn.WriteBatch([]ipv4.Message{{ - Buffers: [][]byte{data}, - Addr: remoteAddr, - }}, 0) -} - -func (s *UDPSession) SendEnetNotify(enet *Enet) { - data := buildEnet(enet.ConnType, enet.EnetType, s.GetConv()) - if data == nil { - return - } - s.tx([]ipv4.Message{{ - Buffers: [][]byte{data}, - Addr: s.remote, - }}) -} - func (s *UDPSession) tx(txqueue []ipv4.Message) { // default version if s.xconn == nil || s.xconnWriteError != nil { @@ -287,3 +236,52 @@ func (s *UDPSession) tx(txqueue []ipv4.Message) { atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts)) atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes)) } + +func (l *Listener) SendEnetNotifyToPeer(enet *Enet) { + var xconn batchConn + _, ok := l.conn.(*net.UDPConn) + if !ok { + return + } + localAddr, err := net.ResolveUDPAddr("udp", l.conn.LocalAddr().String()) + if err != nil { + return + } + if localAddr.IP.To4() != nil { + xconn = ipv4.NewPacketConn(l.conn) + } else { + xconn = ipv6.NewPacketConn(l.conn) + } + + // default version + if xconn == nil { + l.defaultSendEnetNotifyToPeer(enet) + return + } + + remoteAddr, err := net.ResolveUDPAddr("udp", enet.Addr) + if err != nil { + return + } + + data := BuildEnet(enet.ConnType, enet.EnetType, enet.ConvId) + if data == nil { + return + } + + _, _ = xconn.WriteBatch([]ipv4.Message{{ + Buffers: [][]byte{data}, + Addr: remoteAddr, + }}, 0) +} + +func (s *UDPSession) SendEnetNotifyToPeer(enet *Enet) { + data := BuildEnet(enet.ConnType, enet.EnetType, s.GetConv()) + if data == nil { + return + } + s.tx([]ipv4.Message{{ + Buffers: [][]byte{data}, + Addr: s.remote, + }}) +} diff --git a/gate/kcp/udp_socket_windows.go b/gate/kcp/udp_socket_windows.go index aa76bd6d..d542cab2 100644 --- a/gate/kcp/udp_socket_windows.go +++ b/gate/kcp/udp_socket_windows.go @@ -15,12 +15,12 @@ func (l *Listener) monitor() { l.defaultMonitor() } -func (l *Listener) SendEnetNotifyToClient(enet *Enet) { - l.defaultSendEnetNotifyToClient(enet) +func (l *Listener) SendEnetNotifyToPeer(enet *Enet) { + l.defaultSendEnetNotifyToPeer(enet) } -func (s *UDPSession) SendEnetNotify(enet *Enet) { - s.defaultSendEnetNotify(enet) +func (s *UDPSession) SendEnetNotifyToPeer(enet *Enet) { + s.defaultSendEnetNotifyToPeer(enet) } func (s *UDPSession) tx(txqueue []ipv4.Message) { diff --git a/gate/net/kcp_connect_manager.go b/gate/net/kcp_connect_manager.go index 61d9b175..c0a236f8 100644 --- a/gate/net/kcp_connect_manager.go +++ b/gate/net/kcp_connect_manager.go @@ -238,7 +238,7 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) { break } } - listener.SendEnetNotifyToClient(&kcp.Enet{ + listener.SendEnetNotifyToPeer(&kcp.Enet{ Addr: enetNotify.Addr, ConvId: conv, ConnType: kcp.ConnEnetEst, @@ -251,7 +251,7 @@ func (k *KcpConnectManager) enetHandle(listener *kcp.Listener) { logger.Error("session not exist, conv: %v", enetNotify.ConvId) continue } - session.conn.SendEnetNotify(&kcp.Enet{ + session.conn.SendEnetNotifyToPeer(&kcp.Enet{ ConnType: kcp.ConnEnetFin, EnetType: enetNotify.EnetType, }) @@ -404,7 +404,7 @@ func (k *KcpConnectManager) closeKcpConn(session *Session, enetType uint32) { // 关闭连接 err := conn.Close() if err == nil { - conn.SendEnetNotify(&kcp.Enet{ + conn.SendEnetNotifyToPeer(&kcp.Enet{ ConnType: kcp.ConnEnetFin, EnetType: enetType, }) diff --git a/robot/login/http_login.go b/robot/login/http_login.go index cb19a36b..f58c9205 100644 --- a/robot/login/http_login.go +++ b/robot/login/http_login.go @@ -143,7 +143,9 @@ func AccountLogin(url string, account string, password string) (*AccountInfo, er return nil, err } comboTokenReq := &api.ComboTokenReq{ - Data: string(loginTokenDataJson), + AppID: 4, + ChannelID: 1, + Data: string(loginTokenDataJson), } logger.Info("http post url: %v", url+"/hk4e_global/combo/granter/login/v2/login") comboTokenRsp, err := httpclient.PostJson[api.ComboTokenRsp](url+"/hk4e_global/combo/granter/login/v2/login", comboTokenReq, "") diff --git a/robot/net/session.go b/robot/net/session.go index d077e457..8691bb9f 100644 --- a/robot/net/session.go +++ b/robot/net/session.go @@ -3,6 +3,7 @@ package net import ( "time" + "hk4e/gate/client_proto" "hk4e/gate/kcp" hk4egatenet "hk4e/gate/net" "hk4e/pkg/logger" @@ -10,10 +11,12 @@ import ( ) type Session struct { - Conn *kcp.UDPSession - XorKey []byte - SendChan chan *hk4egatenet.ProtoMsg - RecvChan chan *hk4egatenet.ProtoMsg + Conn *kcp.UDPSession + XorKey []byte + SendChan chan *hk4egatenet.ProtoMsg + RecvChan chan *hk4egatenet.ProtoMsg + ServerCmdProtoMap *cmd.CmdProtoMap + ClientCmdProtoMap *client_proto.ClientCmdProtoMap } func NewSession(gateAddr string, dispatchKey []byte, localPort int) (*Session, error) { @@ -27,13 +30,13 @@ func NewSession(gateAddr string, dispatchKey []byte, localPort int) (*Session, e } conn.SetACKNoDelay(true) conn.SetWriteDelay(false) - sendChan := make(chan *hk4egatenet.ProtoMsg, 1000) - recvChan := make(chan *hk4egatenet.ProtoMsg, 1000) r := &Session{ - Conn: conn, - XorKey: dispatchKey, - SendChan: sendChan, - RecvChan: recvChan, + Conn: conn, + XorKey: dispatchKey, + SendChan: make(chan *hk4egatenet.ProtoMsg, 1000), + RecvChan: make(chan *hk4egatenet.ProtoMsg, 1000), + ServerCmdProtoMap: cmd.NewCmdProtoMap(), + ClientCmdProtoMap: client_proto.NewClientCmdProtoMap(), } go r.recvHandle() go r.sendHandle() @@ -58,7 +61,7 @@ func (s *Session) recvHandle() { kcpMsgList := make([]*hk4egatenet.KcpMsg, 0) hk4egatenet.DecodeBinToPayload(recvData, &dataBuf, convId, &kcpMsgList, s.XorKey) for _, v := range kcpMsgList { - protoMsgList := hk4egatenet.ProtoDecode(v, cmd.NewCmdProtoMap(), nil) + protoMsgList := hk4egatenet.ProtoDecode(v, s.ServerCmdProtoMap, s.ClientCmdProtoMap) for _, vv := range protoMsgList { s.RecvChan <- vv } @@ -77,7 +80,7 @@ func (s *Session) sendHandle() { _ = conn.Close() break } - kcpMsg := hk4egatenet.ProtoEncode(protoMsg, cmd.NewCmdProtoMap(), nil) + kcpMsg := hk4egatenet.ProtoEncode(protoMsg, s.ServerCmdProtoMap, s.ClientCmdProtoMap) if kcpMsg == nil { logger.Error("decode kcp msg is nil, convId: %v", convId) continue