mirror of
https://github.com/duke-git/lancet.git
synced 2026-03-01 00:35:28 +08:00
增加ParallelChunkRead方法,分块并发读取超大文本 (#192)
This commit is contained in:
@@ -16,12 +16,14 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/duke-git/lancet/v2/validator"
|
"github.com/duke-git/lancet/v2/validator"
|
||||||
)
|
)
|
||||||
@@ -866,3 +868,89 @@ func isCsvSupportedType(v interface{}) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ChunkRead 从文件的指定偏移读取块并返回块内所有行
|
||||||
|
func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string {
|
||||||
|
buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小
|
||||||
|
n, err := f.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
buf = buf[:n] // 调整切片以匹配实际读取的字节数
|
||||||
|
|
||||||
|
var lines []string
|
||||||
|
var lineStart int
|
||||||
|
for i, b := range buf {
|
||||||
|
if b == '\n' {
|
||||||
|
line := string(buf[lineStart:i]) // 不包括换行符
|
||||||
|
lines = append(lines, line)
|
||||||
|
lineStart = i + 1 // 设置下一行的开始
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if lineStart < len(buf) { // 处理块末尾的行
|
||||||
|
line := string(buf[lineStart:])
|
||||||
|
lines = append(lines, line)
|
||||||
|
}
|
||||||
|
bufPool.Put(buf) // 读取完成后,将缓冲区放回Pool
|
||||||
|
return lines
|
||||||
|
}
|
||||||
|
|
||||||
|
// 并行读取文件并将每个块的行发送到指定通道
|
||||||
|
// filePath 文件路径
|
||||||
|
// ChunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整
|
||||||
|
// MaxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数
|
||||||
|
// linesCh用于接收返回结果的通道。
|
||||||
|
func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) {
|
||||||
|
if ChunkSizeMB == 0 {
|
||||||
|
ChunkSizeMB = 100
|
||||||
|
}
|
||||||
|
ChunkSize := ChunkSizeMB * 1024 * 1024
|
||||||
|
// 内存复用
|
||||||
|
bufPool := sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make([]byte, 0, ChunkSize)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if MaxGoroutine == 0 {
|
||||||
|
MaxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Open(filePath)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed to open file: %v", err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
info, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed to get file info: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
chunkOffsetCh := make(chan int64, MaxGoroutine)
|
||||||
|
|
||||||
|
// 分配工作
|
||||||
|
go func() {
|
||||||
|
for i := int64(0); i < info.Size(); i += int64(ChunkSize) {
|
||||||
|
chunkOffsetCh <- i
|
||||||
|
}
|
||||||
|
close(chunkOffsetCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 启动工作协程
|
||||||
|
for i := 0; i < MaxGoroutine; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
for chunkOffset := range chunkOffsetCh {
|
||||||
|
linesCh <- ChunkRead(f, chunkOffset, ChunkSize, &bufPool)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 等待所有解析完成后关闭行通道
|
||||||
|
wg.Wait()
|
||||||
|
close(linesCh)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user