系统架构层面流量控制功能完善

This commit is contained in:
flswld
2023-02-05 07:18:43 +08:00
parent cfb001c18a
commit 94c8db402a
51 changed files with 1049 additions and 2408 deletions

View File

@@ -11,7 +11,6 @@ import (
"crypto/rand"
"encoding/binary"
"encoding/hex"
"hash/crc32"
"io"
"net"
"sync"
@@ -65,16 +64,16 @@ type (
ownConn bool // true if we created conn internally, false if provided by caller
kcp *KCP // KCP ARQ protocol
l *Listener // pointing to the Listener object if it's been accepted by a Listener
block BlockCrypt // block encryption object
// block BlockCrypt // block encryption object
// kcp receiving is based on packets
// recvbuf turns packets into stream
recvbuf []byte
bufptr []byte
// FEC codec
fecDecoder *fecDecoder
fecEncoder *fecEncoder
// // FEC codec
// fecDecoder *fecDecoder
// fecEncoder *fecEncoder
// settings
remote net.Addr // remote peer address
@@ -99,8 +98,8 @@ type (
socketReadErrorOnce sync.Once
socketWriteErrorOnce sync.Once
// nonce generator
nonce Entropy
// // nonce generator
// nonce Entropy
// packets waiting to be sent on wire
txqueue []ipv4.Message
@@ -124,11 +123,11 @@ type (
)
// newUDPSession create a new udp session for client or server
func newUDPSession(conv uint64, dataShards, parityShards int, l *Listener, conn net.PacketConn, ownConn bool, remote net.Addr, block BlockCrypt) *UDPSession {
func newUDPSession(conv uint64, l *Listener, conn net.PacketConn, ownConn bool, remote net.Addr) *UDPSession {
sess := new(UDPSession)
sess.die = make(chan struct{})
sess.nonce = new(nonceAES128)
sess.nonce.Init()
// sess.nonce = new(nonceAES128)
// sess.nonce.Init()
sess.chReadEvent = make(chan struct{}, 1)
sess.chWriteEvent = make(chan struct{}, 1)
sess.chSocketReadError = make(chan struct{})
@@ -137,7 +136,7 @@ func newUDPSession(conv uint64, dataShards, parityShards int, l *Listener, conn
sess.conn = conn
sess.ownConn = ownConn
sess.l = l
sess.block = block
// sess.block = block
sess.recvbuf = make([]byte, mtuLimit)
// cast to writebatch conn
@@ -152,21 +151,21 @@ func newUDPSession(conv uint64, dataShards, parityShards int, l *Listener, conn
}
}
// FEC codec initialization
sess.fecDecoder = newFECDecoder(dataShards, parityShards)
if sess.block != nil {
sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
} else {
sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
}
// // FEC codec initialization
// sess.fecDecoder = newFECDecoder(dataShards, parityShards)
// if sess.block != nil {
// sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
// } else {
// sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
// }
// calculate additional header size introduced by FEC and encryption
if sess.block != nil {
sess.headerSize += cryptHeaderSize
}
if sess.fecEncoder != nil {
sess.headerSize += fecHeaderSizePlus2
}
// // calculate additional header size introduced by FEC and encryption
// if sess.block != nil {
// sess.headerSize += cryptHeaderSize
// }
// if sess.fecEncoder != nil {
// sess.headerSize += fecHeaderSizePlus2
// }
sess.kcp = NewKCP(conv, func(buf []byte, size int) {
if size >= IKCP_OVERHEAD+sess.headerSize {
@@ -371,9 +370,9 @@ func (s *UDPSession) Close() error {
s.uncork()
// release pending segments
s.kcp.ReleaseTX()
if s.fecDecoder != nil {
s.fecDecoder.release()
}
// if s.fecDecoder != nil {
// s.fecDecoder.release()
// }
s.mu.Unlock()
if s.l != nil { // belongs to listener
@@ -552,25 +551,25 @@ func (s *UDPSession) SetWriteBuffer(bytes int) error {
func (s *UDPSession) output(buf []byte) {
var ecc [][]byte
// 1. FEC encoding
if s.fecEncoder != nil {
ecc = s.fecEncoder.encode(buf)
}
// // 1. FEC encoding
// if s.fecEncoder != nil {
// ecc = s.fecEncoder.encode(buf)
// }
// 2&3. crc32 & encryption
if s.block != nil {
s.nonce.Fill(buf[:nonceSize])
checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:])
binary.LittleEndian.PutUint32(buf[nonceSize:], checksum)
s.block.Encrypt(buf, buf)
for k := range ecc {
s.nonce.Fill(ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}
// // 2&3. crc32 & encryption
// if s.block != nil {
// s.nonce.Fill(buf[:nonceSize])
// checksum := crc32.ChecksumIEEE(buf[cryptHeaderSize:])
// binary.LittleEndian.PutUint32(buf[nonceSize:], checksum)
// s.block.Encrypt(buf, buf)
//
// for k := range ecc {
// s.nonce.Fill(ecc[k][:nonceSize])
// checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
// binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
// s.block.Encrypt(ecc[k], ecc[k])
// }
// }
// 4. TxQueue
var msg ipv4.Message
@@ -663,114 +662,130 @@ func (s *UDPSession) notifyWriteError(err error) {
// packet input stage
func (s *UDPSession) packetInput(data []byte) {
decrypted := false
if s.block != nil && len(data) >= cryptHeaderSize {
s.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
decrypted = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if s.block == nil {
decrypted = true
}
// decrypted := false
// if s.block != nil && len(data) >= cryptHeaderSize {
// s.block.Decrypt(data, data)
// data = data[nonceSize:]
// checksum := crc32.ChecksumIEEE(data[crcSize:])
// if checksum == binary.LittleEndian.Uint32(data) {
// data = data[crcSize:]
// decrypted = true
// } else {
// atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
// }
// } else if s.block == nil {
// decrypted = true
// }
decrypted := true
if decrypted && len(data) >= IKCP_OVERHEAD {
s.kcpInput(data)
}
}
func (s *UDPSession) kcpInput(data []byte) {
var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
// var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
//
// fecFlag := binary.LittleEndian.Uint16(data[8:])
// if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2
// if len(data) >= fecHeaderSizePlus2 {
// f := fecPacket(data)
// if f.flag() == typeParity {
// fecParityShards++
// }
//
// // lock
// s.mu.Lock()
// // if fecDecoder is not initialized, create one with default parameter
// if s.fecDecoder == nil {
// s.fecDecoder = newFECDecoder(1, 1)
// }
// recovers := s.fecDecoder.decode(f)
// if f.flag() == typeData {
// if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
// kcpInErrors++
// }
// }
//
// for _, r := range recovers {
// if len(r) >= 2 { // must be larger than 2bytes
// sz := binary.LittleEndian.Uint16(r)
// if int(sz) <= len(r) && sz >= 2 {
// if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
// fecRecovered++
// } else {
// kcpInErrors++
// }
// } else {
// fecErrs++
// }
// } else {
// fecErrs++
// }
// // recycle the recovers
// xmitBuf.Put(r)
// }
//
// // to notify the readers to receive the data
// if n := s.kcp.PeekSize(); n > 0 {
// s.notifyReadEvent()
// }
// // to notify the writers
// waitsnd := s.kcp.WaitSnd()
// if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
// s.notifyWriteEvent()
// }
//
// s.uncork()
// s.mu.Unlock()
// } else {
// atomic.AddUint64(&DefaultSnmp.InErrs, 1)
// }
// } else {
// s.mu.Lock()
// if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
// kcpInErrors++
// }
// if n := s.kcp.PeekSize(); n > 0 {
// s.notifyReadEvent()
// }
// waitsnd := s.kcp.WaitSnd()
// if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
// s.notifyWriteEvent()
// }
// s.uncork()
// s.mu.Unlock()
// }
fecFlag := binary.LittleEndian.Uint16(data[8:])
if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2
if len(data) >= fecHeaderSizePlus2 {
f := fecPacket(data)
if f.flag() == typeParity {
fecParityShards++
}
// lock
s.mu.Lock()
// if fecDecoder is not initialized, create one with default parameter
if s.fecDecoder == nil {
s.fecDecoder = newFECDecoder(1, 1)
}
recovers := s.fecDecoder.decode(f)
if f.flag() == typeData {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
}
for _, r := range recovers {
if len(r) >= 2 { // must be larger than 2bytes
sz := binary.LittleEndian.Uint16(r)
if int(sz) <= len(r) && sz >= 2 {
if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
fecRecovered++
} else {
kcpInErrors++
}
} else {
fecErrs++
}
} else {
fecErrs++
}
// recycle the recovers
xmitBuf.Put(r)
}
// to notify the readers to receive the data
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
// to notify the writers
waitsnd := s.kcp.WaitSnd()
if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
s.notifyWriteEvent()
}
s.uncork()
s.mu.Unlock()
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
} else {
s.mu.Lock()
if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
waitsnd := s.kcp.WaitSnd()
if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
s.notifyWriteEvent()
}
s.uncork()
s.mu.Unlock()
var kcpInErrors uint64
s.mu.Lock()
if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
waitsnd := s.kcp.WaitSnd()
if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
s.notifyWriteEvent()
}
s.uncork()
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.InPkts, 1)
atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
if fecParityShards > 0 {
atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
}
if kcpInErrors > 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
}
if fecErrs > 0 {
atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
}
if fecRecovered > 0 {
atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
}
// if fecParityShards > 0 {
// atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
// }
// if fecErrs > 0 {
// atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
// }
// if fecRecovered > 0 {
// atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
// }
}
@@ -828,11 +843,11 @@ const (
type (
// Listener defines a server which will be waiting to accept incoming connections
Listener struct {
block BlockCrypt // block encryption
dataShards int // FEC data shard
parityShards int // FEC parity shard
conn net.PacketConn // the underlying packet connection
ownConn bool // true if we created conn internally, false if provided by caller
// block BlockCrypt // block encryption
// dataShards int // FEC data shard
// parityShards int // FEC parity shard
conn net.PacketConn // the underlying packet connection
ownConn bool // true if we created conn internally, false if provided by caller
// 网络切换会话保持改造 将convId作为会话的唯一标识 不再校验源地址
sessions map[uint64]*UDPSession // all sessions accepted by this Listener
@@ -856,21 +871,22 @@ type (
// packet input stage
func (l *Listener) packetInput(data []byte, addr net.Addr, convId uint64) {
decrypted := false
if l.block != nil && len(data) >= cryptHeaderSize {
l.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
decrypted = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if l.block == nil {
decrypted = true
}
// decrypted := false
// if l.block != nil && len(data) >= cryptHeaderSize {
// l.block.Decrypt(data, data)
// data = data[nonceSize:]
// checksum := crc32.ChecksumIEEE(data[crcSize:])
// if checksum == binary.LittleEndian.Uint32(data) {
// data = data[crcSize:]
// decrypted = true
// } else {
// atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
// }
// } else if l.block == nil {
// decrypted = true
// }
decrypted := true
if decrypted && len(data) >= IKCP_OVERHEAD {
l.sessionLock.RLock()
s, ok := l.sessions[convId]
@@ -879,20 +895,25 @@ func (l *Listener) packetInput(data []byte, addr net.Addr, convId uint64) {
var conv uint64
var sn uint32
convRecovered := false
fecFlag := binary.LittleEndian.Uint16(data[8:])
if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2
// packet with FEC
if fecFlag == typeData && len(data) >= fecHeaderSizePlus2+IKCP_OVERHEAD {
conv = binary.LittleEndian.Uint64(data[fecHeaderSizePlus2:])
sn = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2+IKCP_SN_OFFSET:])
convRecovered = true
}
} else {
// packet without FEC
conv = binary.LittleEndian.Uint64(data)
sn = binary.LittleEndian.Uint32(data[IKCP_SN_OFFSET:])
convRecovered = true
}
// fecFlag := binary.LittleEndian.Uint16(data[8:])
// if fecFlag == typeData || fecFlag == typeParity { // 16bit kcp cmd [81-84] and frg [0-255] will not overlap with FEC type 0x00f1 0x00f2
// // packet with FEC
// if fecFlag == typeData && len(data) >= fecHeaderSizePlus2+IKCP_OVERHEAD {
// conv = binary.LittleEndian.Uint64(data[fecHeaderSizePlus2:])
// sn = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2+IKCP_SN_OFFSET:])
// convRecovered = true
// }
// } else {
// // packet without FEC
// conv = binary.LittleEndian.Uint64(data)
// sn = binary.LittleEndian.Uint32(data[IKCP_SN_OFFSET:])
// convRecovered = true
// }
// packet without FEC
conv = binary.LittleEndian.Uint64(data)
sn = binary.LittleEndian.Uint32(data[IKCP_SN_OFFSET:])
convRecovered = true
if ok { // existing connection
if !convRecovered || conv == s.kcp.conv { // parity data or valid conversation
@@ -906,7 +927,7 @@ func (l *Listener) packetInput(data []byte, addr net.Addr, convId uint64) {
if s == nil && convRecovered { // new session
if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, false, addr, l.block)
s := newUDPSession(conv, l, l.conn, false, addr)
s.kcpInput(data)
l.sessionLock.Lock()
l.sessions[convId] = s
@@ -1047,7 +1068,7 @@ func (l *Listener) closeSession(convId uint64) (ret bool) {
func (l *Listener) Addr() net.Addr { return l.conn.LocalAddr() }
// Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr) }
// ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption.
//
@@ -1056,7 +1077,7 @@ func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr
// 'dataShards', 'parityShards' specify how many parity packets will be generated following the data packets.
//
// Check https://github.com/klauspost/reedsolomon for details
func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
func ListenWithOptions(laddr string) (*Listener, error) {
udpaddr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, errors.WithStack(err)
@@ -1066,15 +1087,15 @@ func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards
return nil, errors.WithStack(err)
}
return serveConn(block, dataShards, parityShards, conn, true)
return serveConn(conn, true)
}
// ServeConn serves KCP protocol for a single packet connection.
func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
return serveConn(block, dataShards, parityShards, conn, false)
func ServeConn(conn net.PacketConn) (*Listener, error) {
return serveConn(conn, false)
}
func serveConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn, ownConn bool) (*Listener, error) {
func serveConn(conn net.PacketConn, ownConn bool) (*Listener, error) {
l := new(Listener)
l.conn = conn
l.ownConn = ownConn
@@ -1082,9 +1103,9 @@ func serveConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketCo
l.chAccepts = make(chan *UDPSession, acceptBacklog)
l.chSessionClosed = make(chan net.Addr)
l.die = make(chan struct{})
l.dataShards = dataShards
l.parityShards = parityShards
l.block = block
// l.dataShards = dataShards
// l.parityShards = parityShards
// l.block = block
l.chSocketReadError = make(chan struct{})
l.EnetNotify = make(chan *Enet, 1000)
go l.monitor()
@@ -1092,7 +1113,7 @@ func serveConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketCo
}
// Dial connects to the remote address "raddr" on the network "udp" without encryption and FEC
func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr) }
// DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
//
@@ -1101,7 +1122,7 @@ func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0
// 'dataShards', 'parityShards' specify how many parity packets will be generated following the data packets.
//
// Check https://github.com/klauspost/reedsolomon for details
func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
func DialWithOptions(raddr string) (*UDPSession, error) {
// network type detection
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
if err != nil {
@@ -1119,26 +1140,26 @@ func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards in
var convid uint64
binary.Read(rand.Reader, binary.LittleEndian, &convid)
return newUDPSession(convid, dataShards, parityShards, nil, conn, true, udpaddr, block), nil
return newUDPSession(convid, nil, conn, true, udpaddr), nil
}
// NewConn3 establishes a session and talks KCP protocol over a packet connection.
func NewConn3(convid uint64, raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
return newUDPSession(convid, dataShards, parityShards, nil, conn, false, raddr, block), nil
func NewConn3(convid uint64, raddr net.Addr, conn net.PacketConn) (*UDPSession, error) {
return newUDPSession(convid, nil, conn, false, raddr), nil
}
// NewConn2 establishes a session and talks KCP protocol over a packet connection.
func NewConn2(raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
func NewConn2(raddr net.Addr, conn net.PacketConn) (*UDPSession, error) {
var convid uint64
binary.Read(rand.Reader, binary.LittleEndian, &convid)
return NewConn3(convid, raddr, block, dataShards, parityShards, conn)
return NewConn3(convid, raddr, conn)
}
// NewConn establishes a session and talks KCP protocol over a packet connection.
func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
func NewConn(raddr string, conn net.PacketConn) (*UDPSession, error) {
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
if err != nil {
return nil, errors.WithStack(err)
}
return NewConn2(udpaddr, block, dataShards, parityShards, conn)
return NewConn2(udpaddr, conn)
}