diff --git a/fileutil/file.go b/fileutil/file.go index 878cdcd..a5631b0 100644 --- a/fileutil/file.go +++ b/fileutil/file.go @@ -16,12 +16,14 @@ import ( "fmt" "io" "io/fs" + "log" "net/http" "os" "path/filepath" "runtime" "sort" "strings" + "sync" "github.com/duke-git/lancet/v2/validator" ) @@ -866,3 +868,89 @@ func isCsvSupportedType(v interface{}) bool { return false } } + +// ChunkRead 从文件的指定偏移读取块并返回块内所有行 +func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string { + buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小 + n, err := f.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区 + if err != nil && err != io.EOF { + log.Fatal(err) + } + buf = buf[:n] // 调整切片以匹配实际读取的字节数 + + var lines []string + var lineStart int + for i, b := range buf { + if b == '\n' { + line := string(buf[lineStart:i]) // 不包括换行符 + lines = append(lines, line) + lineStart = i + 1 // 设置下一行的开始 + } + } + + if lineStart < len(buf) { // 处理块末尾的行 + line := string(buf[lineStart:]) + lines = append(lines, line) + } + bufPool.Put(buf) // 读取完成后,将缓冲区放回Pool + return lines +} + +// 并行读取文件并将每个块的行发送到指定通道 +// filePath 文件路径 +// ChunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整 +// MaxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数 +// linesCh用于接收返回结果的通道。 +func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) { + if ChunkSizeMB == 0 { + ChunkSizeMB = 100 + } + ChunkSize := ChunkSizeMB * 1024 * 1024 + // 内存复用 + bufPool := sync.Pool{ + New: func() interface{} { + return make([]byte, 0, ChunkSize) + }, + } + + if MaxGoroutine == 0 { + MaxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数 + } + + f, err := os.Open(filePath) + if err != nil { + log.Fatalf("failed to open file: %v", err) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + log.Fatalf("failed to get file info: %v", err) + } + + wg := sync.WaitGroup{} + chunkOffsetCh := make(chan int64, MaxGoroutine) + + // 分配工作 + go func() { + for i := int64(0); i < info.Size(); i += int64(ChunkSize) { + chunkOffsetCh <- i + } + close(chunkOffsetCh) + }() + + // 启动工作协程 + for i := 0; i < MaxGoroutine; i++ { + wg.Add(1) + go func() { + for chunkOffset := range chunkOffsetCh { + linesCh <- ChunkRead(f, chunkOffset, ChunkSize, &bufPool) + } + wg.Done() + }() + } + + // 等待所有解析完成后关闭行通道 + wg.Wait() + close(linesCh) +}