mirror of
https://github.com/duke-git/lancet.git
synced 2026-02-04 12:52:28 +08:00
Merge branch 'main' of github.com:duke-git/lancet
This commit is contained in:
@@ -16,12 +16,14 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/duke-git/lancet/v2/validator"
|
||||
)
|
||||
@@ -866,3 +868,89 @@ func isCsvSupportedType(v interface{}) bool {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// ChunkRead 从文件的指定偏移读取块并返回块内所有行
|
||||
func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string {
|
||||
buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小
|
||||
n, err := f.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区
|
||||
if err != nil && err != io.EOF {
|
||||
log.Fatal(err)
|
||||
}
|
||||
buf = buf[:n] // 调整切片以匹配实际读取的字节数
|
||||
|
||||
var lines []string
|
||||
var lineStart int
|
||||
for i, b := range buf {
|
||||
if b == '\n' {
|
||||
line := string(buf[lineStart:i]) // 不包括换行符
|
||||
lines = append(lines, line)
|
||||
lineStart = i + 1 // 设置下一行的开始
|
||||
}
|
||||
}
|
||||
|
||||
if lineStart < len(buf) { // 处理块末尾的行
|
||||
line := string(buf[lineStart:])
|
||||
lines = append(lines, line)
|
||||
}
|
||||
bufPool.Put(buf) // 读取完成后,将缓冲区放回Pool
|
||||
return lines
|
||||
}
|
||||
|
||||
// 并行读取文件并将每个块的行发送到指定通道
|
||||
// filePath 文件路径
|
||||
// ChunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整
|
||||
// MaxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数
|
||||
// linesCh用于接收返回结果的通道。
|
||||
func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) {
|
||||
if ChunkSizeMB == 0 {
|
||||
ChunkSizeMB = 100
|
||||
}
|
||||
ChunkSize := ChunkSizeMB * 1024 * 1024
|
||||
// 内存复用
|
||||
bufPool := sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 0, ChunkSize)
|
||||
},
|
||||
}
|
||||
|
||||
if MaxGoroutine == 0 {
|
||||
MaxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数
|
||||
}
|
||||
|
||||
f, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to open file: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
info, err := f.Stat()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get file info: %v", err)
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
chunkOffsetCh := make(chan int64, MaxGoroutine)
|
||||
|
||||
// 分配工作
|
||||
go func() {
|
||||
for i := int64(0); i < info.Size(); i += int64(ChunkSize) {
|
||||
chunkOffsetCh <- i
|
||||
}
|
||||
close(chunkOffsetCh)
|
||||
}()
|
||||
|
||||
// 启动工作协程
|
||||
for i := 0; i < MaxGoroutine; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for chunkOffset := range chunkOffsetCh {
|
||||
linesCh <- ChunkRead(f, chunkOffset, ChunkSize, &bufPool)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// 等待所有解析完成后关闭行通道
|
||||
wg.Wait()
|
||||
close(linesCh)
|
||||
}
|
||||
|
||||
@@ -27,7 +27,15 @@ type Iterator[T any] interface {
|
||||
Next() (item T, ok bool)
|
||||
}
|
||||
|
||||
// StopIterator is an interface for stopping Iterator.
|
||||
// ResettableIterator supports to reset the iterator
|
||||
type ResettableIterator[T any] interface {
|
||||
Iterator[T]
|
||||
// Reset allows for the iteration process over a sequence to be restarted from the beginning.
|
||||
// It enables reusing the iterator for multiple traversals without needing to recreate it.
|
||||
Reset()
|
||||
}
|
||||
|
||||
// StopIterator is an interface for stopping Iterator.
|
||||
type StopIterator[T any] interface {
|
||||
Iterator[T]
|
||||
|
||||
@@ -81,8 +89,8 @@ type PrevIterator[T any] interface {
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// FromSlice returns an iterator over a slice of data.
|
||||
func FromSlice[T any](slice []T) Iterator[T] {
|
||||
return &sliceIterator[T]{slice: slice, index: -1}
|
||||
func FromSlice[T any](slice []T) *SliceIterator[T] {
|
||||
return &SliceIterator[T]{slice: slice, index: -1}
|
||||
}
|
||||
|
||||
func ToSlice[T any](iter Iterator[T]) []T {
|
||||
@@ -93,16 +101,16 @@ func ToSlice[T any](iter Iterator[T]) []T {
|
||||
return result
|
||||
}
|
||||
|
||||
type sliceIterator[T any] struct {
|
||||
type SliceIterator[T any] struct {
|
||||
slice []T
|
||||
index int
|
||||
}
|
||||
|
||||
func (iter *sliceIterator[T]) HasNext() bool {
|
||||
func (iter *SliceIterator[T]) HasNext() bool {
|
||||
return iter.index < len(iter.slice)-1
|
||||
}
|
||||
|
||||
func (iter *sliceIterator[T]) Next() (T, bool) {
|
||||
func (iter *SliceIterator[T]) Next() (T, bool) {
|
||||
iter.index++
|
||||
|
||||
ok := iter.index >= 0 && iter.index < len(iter.slice)
|
||||
@@ -116,7 +124,7 @@ func (iter *sliceIterator[T]) Next() (T, bool) {
|
||||
}
|
||||
|
||||
// Prev implements PrevIterator.
|
||||
func (iter *sliceIterator[T]) Prev() {
|
||||
func (iter *SliceIterator[T]) Prev() {
|
||||
if iter.index == -1 {
|
||||
panic("Next function should be called Prev")
|
||||
}
|
||||
@@ -128,7 +136,7 @@ func (iter *sliceIterator[T]) Prev() {
|
||||
}
|
||||
|
||||
// Set implements SetIterator.
|
||||
func (iter *sliceIterator[T]) Set(value T) {
|
||||
func (iter *SliceIterator[T]) Set(value T) {
|
||||
if iter.index == -1 {
|
||||
panic("Next function should be called Set")
|
||||
}
|
||||
@@ -138,52 +146,60 @@ func (iter *sliceIterator[T]) Set(value T) {
|
||||
iter.slice[iter.index] = value
|
||||
}
|
||||
|
||||
func (iter *SliceIterator[T]) Reset() {
|
||||
iter.index = -1
|
||||
}
|
||||
|
||||
// FromRange creates a iterator which returns the numeric range between start inclusive and end
|
||||
// exclusive by the step size. start should be less than end, step shoud be positive.
|
||||
func FromRange[T constraints.Integer | constraints.Float](start, end, step T) Iterator[T] {
|
||||
func FromRange[T constraints.Integer | constraints.Float](start, end, step T) *RangeIterator[T] {
|
||||
if end < start {
|
||||
panic("RangeIterator: start should be before end")
|
||||
} else if step <= 0 {
|
||||
panic("RangeIterator: step should be positive")
|
||||
}
|
||||
|
||||
return &rangeIterator[T]{start: start, end: end, step: step}
|
||||
return &RangeIterator[T]{start: start, end: end, step: step, current: start}
|
||||
}
|
||||
|
||||
type rangeIterator[T constraints.Integer | constraints.Float] struct {
|
||||
start, end, step T
|
||||
type RangeIterator[T constraints.Integer | constraints.Float] struct {
|
||||
start, end, step, current T
|
||||
}
|
||||
|
||||
func (iter *rangeIterator[T]) HasNext() bool {
|
||||
return iter.start < iter.end
|
||||
func (iter *RangeIterator[T]) HasNext() bool {
|
||||
return iter.current < iter.end
|
||||
}
|
||||
|
||||
func (iter *rangeIterator[T]) Next() (T, bool) {
|
||||
if iter.start >= iter.end {
|
||||
func (iter *RangeIterator[T]) Next() (T, bool) {
|
||||
if iter.current >= iter.end {
|
||||
var zero T
|
||||
return zero, false
|
||||
}
|
||||
num := iter.start
|
||||
iter.start += iter.step
|
||||
num := iter.current
|
||||
iter.current += iter.step
|
||||
return num, true
|
||||
}
|
||||
|
||||
// FromRange creates a iterator which returns the numeric range between start inclusive and end
|
||||
// exclusive by the step size. start should be less than end, step shoud be positive.
|
||||
func FromChannel[T any](channel <-chan T) Iterator[T] {
|
||||
return &channelIterator[T]{channel: channel}
|
||||
func (iter *RangeIterator[T]) Reset() {
|
||||
iter.current = iter.start
|
||||
}
|
||||
|
||||
type channelIterator[T any] struct {
|
||||
// FromChannel creates an iterator which returns items received from the provided channel.
|
||||
// The iteration continues until the channel is closed.
|
||||
func FromChannel[T any](channel <-chan T) *ChannelIterator[T] {
|
||||
return &ChannelIterator[T]{channel: channel}
|
||||
}
|
||||
|
||||
type ChannelIterator[T any] struct {
|
||||
channel <-chan T
|
||||
}
|
||||
|
||||
func (iter *channelIterator[T]) Next() (T, bool) {
|
||||
func (iter *ChannelIterator[T]) Next() (T, bool) {
|
||||
item, ok := <-iter.channel
|
||||
return item, ok
|
||||
}
|
||||
|
||||
func (iter *channelIterator[T]) HasNext() bool {
|
||||
func (iter *ChannelIterator[T]) HasNext() bool {
|
||||
return len(iter.channel) == 0
|
||||
}
|
||||
|
||||
|
||||
@@ -50,15 +50,36 @@ func TestSliceIterator(t *testing.T) {
|
||||
assert.Equal(false, ok)
|
||||
})
|
||||
|
||||
// Reset
|
||||
t.Run("slice iterator Reset: ", func(t *testing.T) {
|
||||
iter1 := FromSlice([]int{1, 2, 3, 4})
|
||||
for i := 0; i < 4; i++ {
|
||||
item, ok := iter1.Next()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
assert.Equal(i+1, item)
|
||||
}
|
||||
|
||||
iter1.Reset()
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
item, ok := iter1.Next()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
assert.Equal(i+1, item)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("slice iterator ToSlice: ", func(t *testing.T) {
|
||||
iter := FromSlice([]int{1, 2, 3, 4})
|
||||
item, _ := iter.Next()
|
||||
assert.Equal(1, item)
|
||||
|
||||
data := ToSlice(iter)
|
||||
data := ToSlice[int](iter)
|
||||
assert.Equal([]int{2, 3, 4}, data)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestRangeIterator(t *testing.T) {
|
||||
@@ -84,6 +105,54 @@ func TestRangeIterator(t *testing.T) {
|
||||
_, ok = iter.Next()
|
||||
assert.Equal(false, ok)
|
||||
assert.Equal(false, iter.HasNext())
|
||||
|
||||
iter.Reset()
|
||||
|
||||
item, ok = iter.Next()
|
||||
assert.Equal(1, item)
|
||||
assert.Equal(true, ok)
|
||||
|
||||
item, ok = iter.Next()
|
||||
assert.Equal(2, item)
|
||||
assert.Equal(true, ok)
|
||||
|
||||
item, ok = iter.Next()
|
||||
assert.Equal(3, item)
|
||||
assert.Equal(true, ok)
|
||||
|
||||
_, ok = iter.Next()
|
||||
assert.Equal(false, ok)
|
||||
assert.Equal(false, iter.HasNext())
|
||||
})
|
||||
|
||||
t.Run("range iterator reset: ", func(t *testing.T) {
|
||||
iter := FromRange(1, 4, 1)
|
||||
|
||||
item, ok := iter.Next()
|
||||
assert.Equal(1, item)
|
||||
assert.Equal(true, ok)
|
||||
|
||||
item, ok = iter.Next()
|
||||
assert.Equal(2, item)
|
||||
assert.Equal(true, ok)
|
||||
|
||||
iter.Reset()
|
||||
|
||||
item, ok = iter.Next()
|
||||
assert.Equal(1, item)
|
||||
assert.Equal(true, ok)
|
||||
|
||||
item, ok = iter.Next()
|
||||
assert.Equal(2, item)
|
||||
assert.Equal(true, ok)
|
||||
|
||||
item, ok = iter.Next()
|
||||
assert.Equal(3, item)
|
||||
assert.Equal(true, ok)
|
||||
|
||||
_, ok = iter.Next()
|
||||
assert.Equal(false, ok)
|
||||
assert.Equal(false, iter.HasNext())
|
||||
})
|
||||
|
||||
}
|
||||
@@ -93,7 +162,7 @@ func TestChannelIterator(t *testing.T) {
|
||||
|
||||
assert := internal.NewAssert(t, "TestRangeIterator")
|
||||
|
||||
iter := FromSlice([]int{1, 2, 3, 4})
|
||||
var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
iter = FromChannel(ToChannel(ctx, iter, 0))
|
||||
|
||||
@@ -21,7 +21,7 @@ func TestMapIterator(t *testing.T) {
|
||||
|
||||
assert := internal.NewAssert(t, "TestMapIterator")
|
||||
|
||||
iter := FromSlice([]int{1, 2, 3, 4})
|
||||
var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4})
|
||||
|
||||
iter = Map(iter, func(n int) int { return n / 2 })
|
||||
|
||||
@@ -34,7 +34,7 @@ func TestFilterIterator(t *testing.T) {
|
||||
|
||||
assert := internal.NewAssert(t, "TestFilterIterator")
|
||||
|
||||
iter := FromSlice([]int{1, 2, 3, 4})
|
||||
var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4})
|
||||
|
||||
iter = Filter(iter, func(n int) bool { return n < 3 })
|
||||
|
||||
@@ -47,10 +47,10 @@ func TestJoinIterator(t *testing.T) {
|
||||
|
||||
assert := internal.NewAssert(t, "TestJoinIterator")
|
||||
|
||||
iter1 := FromSlice([]int{1, 2})
|
||||
iter2 := FromSlice([]int{3, 4})
|
||||
var iter1 Iterator[int] = FromSlice([]int{1, 2})
|
||||
var iter2 Iterator[int] = FromSlice([]int{3, 4})
|
||||
|
||||
iter := Join(iter1, iter2)
|
||||
var iter Iterator[int] = Join(iter1, iter2)
|
||||
|
||||
item, ok := iter.Next()
|
||||
assert.Equal(1, item)
|
||||
@@ -64,7 +64,7 @@ func TestReduce(t *testing.T) {
|
||||
|
||||
assert := internal.NewAssert(t, "TestReduce")
|
||||
|
||||
iter := FromSlice([]int{1, 2, 3, 4})
|
||||
var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4})
|
||||
sum := Reduce(iter, 0, func(a, b int) int { return a + b })
|
||||
assert.Equal(10, sum)
|
||||
}
|
||||
@@ -74,7 +74,7 @@ func TestTakeIterator(t *testing.T) {
|
||||
|
||||
assert := internal.NewAssert(t, "TestTakeIterator")
|
||||
|
||||
iter := FromSlice([]int{1, 2, 3, 4, 5})
|
||||
var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4, 5})
|
||||
|
||||
iter = Take(iter, 3)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user