diff --git a/fileutil/file.go b/fileutil/file.go index 878cdcd..a5631b0 100644 --- a/fileutil/file.go +++ b/fileutil/file.go @@ -16,12 +16,14 @@ import ( "fmt" "io" "io/fs" + "log" "net/http" "os" "path/filepath" "runtime" "sort" "strings" + "sync" "github.com/duke-git/lancet/v2/validator" ) @@ -866,3 +868,89 @@ func isCsvSupportedType(v interface{}) bool { return false } } + +// ChunkRead 从文件的指定偏移读取块并返回块内所有行 +func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string { + buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小 + n, err := f.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区 + if err != nil && err != io.EOF { + log.Fatal(err) + } + buf = buf[:n] // 调整切片以匹配实际读取的字节数 + + var lines []string + var lineStart int + for i, b := range buf { + if b == '\n' { + line := string(buf[lineStart:i]) // 不包括换行符 + lines = append(lines, line) + lineStart = i + 1 // 设置下一行的开始 + } + } + + if lineStart < len(buf) { // 处理块末尾的行 + line := string(buf[lineStart:]) + lines = append(lines, line) + } + bufPool.Put(buf) // 读取完成后,将缓冲区放回Pool + return lines +} + +// 并行读取文件并将每个块的行发送到指定通道 +// filePath 文件路径 +// ChunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整 +// MaxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数 +// linesCh用于接收返回结果的通道。 +func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) { + if ChunkSizeMB == 0 { + ChunkSizeMB = 100 + } + ChunkSize := ChunkSizeMB * 1024 * 1024 + // 内存复用 + bufPool := sync.Pool{ + New: func() interface{} { + return make([]byte, 0, ChunkSize) + }, + } + + if MaxGoroutine == 0 { + MaxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数 + } + + f, err := os.Open(filePath) + if err != nil { + log.Fatalf("failed to open file: %v", err) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + log.Fatalf("failed to get file info: %v", err) + } + + wg := sync.WaitGroup{} + chunkOffsetCh := make(chan int64, MaxGoroutine) + + // 分配工作 + go func() { + for i := int64(0); i < info.Size(); i += int64(ChunkSize) { + chunkOffsetCh <- i + } + close(chunkOffsetCh) + }() + + // 启动工作协程 + for i := 0; i < MaxGoroutine; i++ { + wg.Add(1) + go func() { + for chunkOffset := range chunkOffsetCh { + linesCh <- ChunkRead(f, chunkOffset, ChunkSize, &bufPool) + } + wg.Done() + }() + } + + // 等待所有解析完成后关闭行通道 + wg.Wait() + close(linesCh) +} diff --git a/iterator/iterator.go b/iterator/iterator.go index 1acda7e..949405e 100644 --- a/iterator/iterator.go +++ b/iterator/iterator.go @@ -27,7 +27,15 @@ type Iterator[T any] interface { Next() (item T, ok bool) } -// StopIterator is an interface for stopping Iterator. +// ResettableIterator supports to reset the iterator +type ResettableIterator[T any] interface { + Iterator[T] + // Reset allows for the iteration process over a sequence to be restarted from the beginning. + // It enables reusing the iterator for multiple traversals without needing to recreate it. + Reset() +} + +// StopIterator is an interface for stopping Iterator. type StopIterator[T any] interface { Iterator[T] @@ -81,8 +89,8 @@ type PrevIterator[T any] interface { //////////////////////////////////////////////////////////////////////////////////////////////////// // FromSlice returns an iterator over a slice of data. -func FromSlice[T any](slice []T) Iterator[T] { - return &sliceIterator[T]{slice: slice, index: -1} +func FromSlice[T any](slice []T) *SliceIterator[T] { + return &SliceIterator[T]{slice: slice, index: -1} } func ToSlice[T any](iter Iterator[T]) []T { @@ -93,16 +101,16 @@ func ToSlice[T any](iter Iterator[T]) []T { return result } -type sliceIterator[T any] struct { +type SliceIterator[T any] struct { slice []T index int } -func (iter *sliceIterator[T]) HasNext() bool { +func (iter *SliceIterator[T]) HasNext() bool { return iter.index < len(iter.slice)-1 } -func (iter *sliceIterator[T]) Next() (T, bool) { +func (iter *SliceIterator[T]) Next() (T, bool) { iter.index++ ok := iter.index >= 0 && iter.index < len(iter.slice) @@ -116,7 +124,7 @@ func (iter *sliceIterator[T]) Next() (T, bool) { } // Prev implements PrevIterator. -func (iter *sliceIterator[T]) Prev() { +func (iter *SliceIterator[T]) Prev() { if iter.index == -1 { panic("Next function should be called Prev") } @@ -128,7 +136,7 @@ func (iter *sliceIterator[T]) Prev() { } // Set implements SetIterator. -func (iter *sliceIterator[T]) Set(value T) { +func (iter *SliceIterator[T]) Set(value T) { if iter.index == -1 { panic("Next function should be called Set") } @@ -138,52 +146,60 @@ func (iter *sliceIterator[T]) Set(value T) { iter.slice[iter.index] = value } +func (iter *SliceIterator[T]) Reset() { + iter.index = -1 +} + // FromRange creates a iterator which returns the numeric range between start inclusive and end // exclusive by the step size. start should be less than end, step shoud be positive. -func FromRange[T constraints.Integer | constraints.Float](start, end, step T) Iterator[T] { +func FromRange[T constraints.Integer | constraints.Float](start, end, step T) *RangeIterator[T] { if end < start { panic("RangeIterator: start should be before end") } else if step <= 0 { panic("RangeIterator: step should be positive") } - return &rangeIterator[T]{start: start, end: end, step: step} + return &RangeIterator[T]{start: start, end: end, step: step, current: start} } -type rangeIterator[T constraints.Integer | constraints.Float] struct { - start, end, step T +type RangeIterator[T constraints.Integer | constraints.Float] struct { + start, end, step, current T } -func (iter *rangeIterator[T]) HasNext() bool { - return iter.start < iter.end +func (iter *RangeIterator[T]) HasNext() bool { + return iter.current < iter.end } -func (iter *rangeIterator[T]) Next() (T, bool) { - if iter.start >= iter.end { +func (iter *RangeIterator[T]) Next() (T, bool) { + if iter.current >= iter.end { var zero T return zero, false } - num := iter.start - iter.start += iter.step + num := iter.current + iter.current += iter.step return num, true } -// FromRange creates a iterator which returns the numeric range between start inclusive and end -// exclusive by the step size. start should be less than end, step shoud be positive. -func FromChannel[T any](channel <-chan T) Iterator[T] { - return &channelIterator[T]{channel: channel} +func (iter *RangeIterator[T]) Reset() { + iter.current = iter.start } -type channelIterator[T any] struct { +// FromChannel creates an iterator which returns items received from the provided channel. +// The iteration continues until the channel is closed. +func FromChannel[T any](channel <-chan T) *ChannelIterator[T] { + return &ChannelIterator[T]{channel: channel} +} + +type ChannelIterator[T any] struct { channel <-chan T } -func (iter *channelIterator[T]) Next() (T, bool) { +func (iter *ChannelIterator[T]) Next() (T, bool) { item, ok := <-iter.channel return item, ok } -func (iter *channelIterator[T]) HasNext() bool { +func (iter *ChannelIterator[T]) HasNext() bool { return len(iter.channel) == 0 } diff --git a/iterator/iterator_test.go b/iterator/iterator_test.go index 726175b..3a6e606 100644 --- a/iterator/iterator_test.go +++ b/iterator/iterator_test.go @@ -50,15 +50,36 @@ func TestSliceIterator(t *testing.T) { assert.Equal(false, ok) }) + // Reset + t.Run("slice iterator Reset: ", func(t *testing.T) { + iter1 := FromSlice([]int{1, 2, 3, 4}) + for i := 0; i < 4; i++ { + item, ok := iter1.Next() + if !ok { + break + } + assert.Equal(i+1, item) + } + + iter1.Reset() + + for i := 0; i < 4; i++ { + item, ok := iter1.Next() + if !ok { + break + } + assert.Equal(i+1, item) + } + }) + t.Run("slice iterator ToSlice: ", func(t *testing.T) { iter := FromSlice([]int{1, 2, 3, 4}) item, _ := iter.Next() assert.Equal(1, item) - data := ToSlice(iter) + data := ToSlice[int](iter) assert.Equal([]int{2, 3, 4}, data) }) - } func TestRangeIterator(t *testing.T) { @@ -84,6 +105,54 @@ func TestRangeIterator(t *testing.T) { _, ok = iter.Next() assert.Equal(false, ok) assert.Equal(false, iter.HasNext()) + + iter.Reset() + + item, ok = iter.Next() + assert.Equal(1, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(2, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(3, item) + assert.Equal(true, ok) + + _, ok = iter.Next() + assert.Equal(false, ok) + assert.Equal(false, iter.HasNext()) + }) + + t.Run("range iterator reset: ", func(t *testing.T) { + iter := FromRange(1, 4, 1) + + item, ok := iter.Next() + assert.Equal(1, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(2, item) + assert.Equal(true, ok) + + iter.Reset() + + item, ok = iter.Next() + assert.Equal(1, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(2, item) + assert.Equal(true, ok) + + item, ok = iter.Next() + assert.Equal(3, item) + assert.Equal(true, ok) + + _, ok = iter.Next() + assert.Equal(false, ok) + assert.Equal(false, iter.HasNext()) }) } @@ -93,7 +162,7 @@ func TestChannelIterator(t *testing.T) { assert := internal.NewAssert(t, "TestRangeIterator") - iter := FromSlice([]int{1, 2, 3, 4}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4}) ctx, cancel := context.WithCancel(context.Background()) iter = FromChannel(ToChannel(ctx, iter, 0)) diff --git a/iterator/operation_test.go b/iterator/operation_test.go index 4c2517e..e0889e9 100644 --- a/iterator/operation_test.go +++ b/iterator/operation_test.go @@ -21,7 +21,7 @@ func TestMapIterator(t *testing.T) { assert := internal.NewAssert(t, "TestMapIterator") - iter := FromSlice([]int{1, 2, 3, 4}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4}) iter = Map(iter, func(n int) int { return n / 2 }) @@ -34,7 +34,7 @@ func TestFilterIterator(t *testing.T) { assert := internal.NewAssert(t, "TestFilterIterator") - iter := FromSlice([]int{1, 2, 3, 4}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4}) iter = Filter(iter, func(n int) bool { return n < 3 }) @@ -47,10 +47,10 @@ func TestJoinIterator(t *testing.T) { assert := internal.NewAssert(t, "TestJoinIterator") - iter1 := FromSlice([]int{1, 2}) - iter2 := FromSlice([]int{3, 4}) + var iter1 Iterator[int] = FromSlice([]int{1, 2}) + var iter2 Iterator[int] = FromSlice([]int{3, 4}) - iter := Join(iter1, iter2) + var iter Iterator[int] = Join(iter1, iter2) item, ok := iter.Next() assert.Equal(1, item) @@ -64,7 +64,7 @@ func TestReduce(t *testing.T) { assert := internal.NewAssert(t, "TestReduce") - iter := FromSlice([]int{1, 2, 3, 4}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4}) sum := Reduce(iter, 0, func(a, b int) int { return a + b }) assert.Equal(10, sum) } @@ -74,7 +74,7 @@ func TestTakeIterator(t *testing.T) { assert := internal.NewAssert(t, "TestTakeIterator") - iter := FromSlice([]int{1, 2, 3, 4, 5}) + var iter Iterator[int] = FromSlice([]int{1, 2, 3, 4, 5}) iter = Take(iter, 3)