1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-02-04 12:52:28 +08:00

doc: add doc for ChunkRead and ParallelChunkRead

This commit is contained in:
dudaodong
2024-03-05 11:42:03 +08:00
parent 9bfdc686f8
commit c58c50327c
8 changed files with 386 additions and 25 deletions

View File

@@ -729,6 +729,10 @@ import "github.com/duke-git/lancet/v2/fileutil"
[[play](https://go.dev/play/p/GhLS6d8lH_g)] [[play](https://go.dev/play/p/GhLS6d8lH_g)]
- **<big>ReadFile</big>** : read file or url. - **<big>ReadFile</big>** : read file or url.
[[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ReadFile)] [[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ReadFile)]
- **<big>ChunkRead</big>** : reads a block from the file at the specified offset and returns all lines within the block.
[[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ChunkRead)]
- **<big>ParallelChunkRead</big>** : reads the file in parallel and sends each chunk of lines to the specified channel.
[[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ParallelChunkRead)]
<h3 id="formatter"> 10. Formatter contains some functions for data formatting. &nbsp; &nbsp; &nbsp; &nbsp;<a href="#index">index</a></h3> <h3 id="formatter"> 10. Formatter contains some functions for data formatting. &nbsp; &nbsp; &nbsp; &nbsp;<a href="#index">index</a></h3>

View File

@@ -728,6 +728,12 @@ import "github.com/duke-git/lancet/v2/fileutil"
[[play](https://go.dev/play/p/GhLS6d8lH_g)] [[play](https://go.dev/play/p/GhLS6d8lH_g)]
- **<big>ReadFile</big>** : 读取文件或者URL。 - **<big>ReadFile</big>** : 读取文件或者URL。
[[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ReadFile)] [[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ReadFile)]
- **<big>ChunkRead</big>** : 从文件的指定偏移读取块并返回块内所有行。
[[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ChunkRead)]
- **<big>ParallelChunkRead</big>** : 并行读取文件并将每个块的行发送到指定通道。
[[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ParallelChunkRead)]
<h3 id="formatter"> 10. formatter 格式化器包含一些数据格式化处理方法。&nbsp; &nbsp; &nbsp; &nbsp;<a href="#index">回到目录</a></h3> <h3 id="formatter"> 10. formatter 格式化器包含一些数据格式化处理方法。&nbsp; &nbsp; &nbsp; &nbsp;<a href="#index">回到目录</a></h3>

View File

@@ -50,6 +50,8 @@ import (
- [WriteStringToFile](#WriteStringToFile) - [WriteStringToFile](#WriteStringToFile)
- [WriteBytesToFile](#WriteBytesToFile) - [WriteBytesToFile](#WriteBytesToFile)
- [ReadFile](#ReadFile) - [ReadFile](#ReadFile)
- [ChunkRead](#ChunkRead)
- [ParallelChunkRead](#ParallelChunkRead)
<div STYLE="page-break-after: always;"></div> <div STYLE="page-break-after: always;"></div>
@@ -955,9 +957,123 @@ func main() {
if err != nil { if err != nil {
return return
} }
fmt.Println(string(dat)) fmt.Println(string(dat))
// Output: // Output:
// User-agent: * // User-agent: *
// Disallow: /deny // Disallow: /deny
} }
``` ```
### <span id="ChunkRead">ChunkRead</span>
<p>从文件的指定偏移读取块并返回块内所有行。</p>
<b>函数签名:</b>
```go
func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error)
```
<b>示例:</b>
```go
package main
import (
"fmt"
"github.com/duke-git/lancet/v2/fileutil"
)
func main() {
const mb = 1024 * 1024
const defaultChunkSizeMB = 100
// test1.csv file content:
// Lili,22,female
// Jim,21,male
filePath := "./testdata/test1.csv" // 替换为你的文件路径
f, err := os.Open(filePath)
if err != nil {
return
}
defer f.Close()
var bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 0, defaultChunkSizeMB*mb)
},
}
lines, err := fileutil.ChunkRead(f, 0, 100, &bufPool)
if err != nil {
return
}
fmt.Println(lines[0])
fmt.Println(lines[1])
// Output:
// Lili,22,female
// Jim,21,male
}
```
### <span id="ParallelChunkRead">ParallelChunkRead</span>
<p>并行读取文件并将每个块的行发送到指定通道。</p>
<b>函数签名:</b>
```go
// filePath:文件路径
// chunkSizeMB: 分块的大小单位MB设置为0时使用默认100MB,设置过大反而不利,视情调整
// maxGoroutine: 并发读取分块的数量设置为0时使用CPU核心数
// linesCh: 用于接收返回结果的通道。
func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error
```
<b>示例:</b>
```go
package main
import (
"fmt"
"github.com/duke-git/lancet/v2/fileutil"
)
func main() {
const mb = 1024 * 1024
const defaultChunkSizeMB = 100 // 默认值
numParsers := runtime.NumCPU()
linesCh := make(chan []string, numParsers)
// test1.csv file content:
// Lili,22,female
// Jim,21,male
filePath := "./testdata/test1.csv"
go fileutil.ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers)
var totalLines int
for lines := range linesCh {
totalLines += len(lines)
for _, line := range lines {
fmt.Println(line)
}
}
fmt.Println(totalLines)
// Output:
// Lili,22,female
// Jim,21,male
// 2
}
```

View File

@@ -50,6 +50,8 @@ import (
- [WriteStringToFile](#WriteStringToFile) - [WriteStringToFile](#WriteStringToFile)
- [WriteBytesToFile](#WriteBytesToFile) - [WriteBytesToFile](#WriteBytesToFile)
- [ReadFile](#ReadFile) - [ReadFile](#ReadFile)
- [ChunkRead](#ChunkRead)
- [ParallelChunkRead](#ParallelChunkRead)
<div STYLE="page-break-after: always;"></div> <div STYLE="page-break-after: always;"></div>
@@ -961,3 +963,115 @@ func main() {
// Disallow: /deny // Disallow: /deny
} }
``` ```
### <span id="ChunkRead">ChunkRead</span>
<p>Reads a block from the file at the specified offset and returns all lines within the block.</p>
<b>Signature :</b>
```go
func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error)
```
<b>Example:</b>
```go
package main
import (
"fmt"
"github.com/duke-git/lancet/v2/fileutil"
)
func main() {
const mb = 1024 * 1024
const defaultChunkSizeMB = 100
// test1.csv file content:
// Lili,22,female
// Jim,21,male
filePath := "./testdata/test1.csv"
f, err := os.Open(filePath)
if err != nil {
return
}
defer f.Close()
var bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 0, defaultChunkSizeMB*mb)
},
}
lines, err := fileutil.ChunkRead(f, 0, 100, &bufPool)
if err != nil {
return
}
fmt.Println(lines[0])
fmt.Println(lines[1])
// Output:
// Lili,22,female
// Jim,21,male
}
```
### <span id="ParallelChunkRead">ParallelChunkRead</span>
<p>Reads the file in parallel and sends each chunk of lines to the specified channel.</p>
<b>Signature :</b>
```go
// filePath: path of the file to read.
// chunkSizeMB: size of each chunk in MB; when set to 0 the default of 100MB is used. Setting it too large is counterproductive, so tune it as needed.
// maxGoroutine: number of goroutines that read chunks concurrently; when set to 0 the number of CPU cores is used.
// linesCh: channel on which the lines of each chunk are delivered.
func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error
```
<b>Example:</b>
```go
package main
import (
"fmt"
"github.com/duke-git/lancet/v2/fileutil"
)
func main() {
const mb = 1024 * 1024
const defaultChunkSizeMB = 100 // 默认值
numParsers := runtime.NumCPU()
linesCh := make(chan []string, numParsers)
// test1.csv file content:
// Lili,22,female
// Jim,21,male
filePath := "./testdata/test1.csv"
go fileutil.ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers)
var totalLines int
for lines := range linesCh {
totalLines += len(lines)
for _, line := range lines {
fmt.Println(line)
}
}
fmt.Println(totalLines)
// Output:
// Lili,22,female
// Jim,21,male
// 2
}
```

View File

@@ -16,7 +16,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/fs" "io/fs"
"log"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
@@ -869,12 +868,13 @@ func isCsvSupportedType(v interface{}) bool {
} }
} }
// ChunkRead 从文件的指定偏移读取块并返回块内所有行 // ChunkRead reads a block from the file at the specified offset and returns all lines within the block
func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string { // Play: todo
func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error) {
buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小 buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小
n, err := f.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区 n, err := file.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
log.Fatal(err) return nil, err
} }
buf = buf[:n] // 调整切片以匹配实际读取的字节数 buf = buf[:n] // 调整切片以匹配实际读取的字节数
@@ -893,58 +893,64 @@ func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string
lines = append(lines, line) lines = append(lines, line)
} }
bufPool.Put(buf) // 读取完成后将缓冲区放回Pool bufPool.Put(buf) // 读取完成后将缓冲区放回Pool
return lines return lines, nil
} }
// 并行读取文件并将每个块的行发送到指定通道 // ParallelChunkRead reads the file in parallel and send each chunk of lines to the specified channel.
// filePath 文件路径 // filePath 文件路径
// ChunkSizeMB 分块的大小单位MB设置为0时使用默认100MB,设置过大反而不利,视情调整 // chunkSizeMB 分块的大小单位MB设置为0时使用默认100MB,设置过大反而不利,视情调整
// MaxGoroutine 并发读取分块的数量设置为0时使用CPU核心数 // maxGoroutine 并发读取分块的数量设置为0时使用CPU核心数
// linesCh用于接收返回结果的通道。 // linesCh用于接收返回结果的通道。
func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) { // Play: todo
if ChunkSizeMB == 0 { func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error {
ChunkSizeMB = 100 if chunkSizeMB == 0 {
chunkSizeMB = 100
} }
ChunkSize := ChunkSizeMB * 1024 * 1024 chunkSize := chunkSizeMB * 1024 * 1024
// 内存复用 // 内存复用
bufPool := sync.Pool{ bufPool := sync.Pool{
New: func() interface{} { New: func() interface{} {
return make([]byte, 0, ChunkSize) return make([]byte, 0, chunkSize)
}, },
} }
if MaxGoroutine == 0 { if maxGoroutine == 0 {
MaxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数 maxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数
} }
f, err := os.Open(filePath) f, err := os.Open(filePath)
if err != nil { if err != nil {
log.Fatalf("failed to open file: %v", err) return err
} }
defer f.Close() defer f.Close()
info, err := f.Stat() info, err := f.Stat()
if err != nil { if err != nil {
log.Fatalf("failed to get file info: %v", err) return err
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
chunkOffsetCh := make(chan int64, MaxGoroutine) chunkOffsetCh := make(chan int64, maxGoroutine)
// 分配工作 // 分配工作
go func() { go func() {
for i := int64(0); i < info.Size(); i += int64(ChunkSize) { for i := int64(0); i < info.Size(); i += int64(chunkSize) {
chunkOffsetCh <- i chunkOffsetCh <- i
} }
close(chunkOffsetCh) close(chunkOffsetCh)
}() }()
// 启动工作协程 // 启动工作协程
for i := 0; i < MaxGoroutine; i++ { for i := 0; i < maxGoroutine; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
for chunkOffset := range chunkOffsetCh { for chunkOffset := range chunkOffsetCh {
linesCh <- ChunkRead(f, chunkOffset, ChunkSize, &bufPool) chunk, err := ChunkRead(f, chunkOffset, chunkSize, &bufPool)
if err == nil {
linesCh <- chunk
}
} }
wg.Done() wg.Done()
}() }()
@@ -953,4 +959,6 @@ func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, Ma
// 等待所有解析完成后关闭行通道 // 等待所有解析完成后关闭行通道
wg.Wait() wg.Wait()
close(linesCh) close(linesCh)
return nil
} }

View File

@@ -5,6 +5,8 @@ import (
"io" "io"
"log" "log"
"os" "os"
"runtime"
"sync"
) )
func ExampleIsExist() { func ExampleIsExist() {
@@ -421,8 +423,69 @@ func ExampleReadFile() {
if err != nil { if err != nil {
return return
} }
fmt.Println(string(dat)) fmt.Println(string(dat))
// Output: // Output:
// User-agent: * // User-agent: *
// Disallow: /deny // Disallow: /deny
} }
// ExampleChunkRead reads up to 100 bytes from the start of the CSV fixture
// and prints the first two lines found in that block.
func ExampleChunkRead() {
	const (
		mb                 = 1024 * 1024
		defaultChunkSizeMB = 100
	)

	file, err := os.Open("./testdata/test1.csv")
	if err != nil {
		return
	}
	defer file.Close()

	// Buffers handed out by the pool are empty but carry 100 MB of capacity,
	// which ChunkRead reslices to the requested read size.
	pool := &sync.Pool{
		New: func() interface{} {
			return make([]byte, 0, defaultChunkSizeMB*mb)
		},
	}

	lines, err := ChunkRead(file, 0, 100, pool)
	if err != nil {
		return
	}

	fmt.Println(lines[0])
	fmt.Println(lines[1])

	// Output:
	// Lili,22,female
	// Jim,21,male
}
// ExampleParallelChunkRead streams the CSV fixture through ParallelChunkRead,
// printing every received line and then the total number of lines.
func ExampleParallelChunkRead() {
	const chunkSizeMB = 100 // same value ParallelChunkRead falls back to when given 0

	workers := runtime.NumCPU()
	linesCh := make(chan []string, workers)

	go ParallelChunkRead("./testdata/test1.csv", linesCh, chunkSizeMB, workers)

	// Ranging stops once ParallelChunkRead closes linesCh.
	total := 0
	for chunk := range linesCh {
		for i := range chunk {
			fmt.Println(chunk[i])
		}
		total += len(chunk)
	}
	fmt.Println(total)

	// Output:
	// Lili,22,female
	// Jim,21,male
	// 2
}

View File

@@ -4,7 +4,9 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"strings" "strings"
"sync"
"testing" "testing"
"github.com/duke-git/lancet/v2/internal" "github.com/duke-git/lancet/v2/internal"
@@ -566,3 +568,54 @@ func TestCopyDir(t *testing.T) {
os.RemoveAll(dest) os.RemoveAll(dest)
} }
// TestParallelChunkRead reads the two-line CSV fixture through
// ParallelChunkRead and verifies the lines delivered on the channel.
func TestParallelChunkRead(t *testing.T) {
	assert := internal.NewAssert(t, "TestParallelChunkRead")

	const defaultChunkSizeMB = 100 // default chunk size in MB
	numParsers := runtime.NumCPU()
	linesCh := make(chan []string, numParsers)
	filePath := "./testdata/test1.csv"

	// Capture the returned error instead of discarding it. ParallelChunkRead
	// returns early on open/stat failures without closing linesCh, so ranging
	// over linesCh alone would block forever in that case.
	errCh := make(chan error, 1)
	go func() {
		errCh <- ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers)
	}()

	var got []string
collect:
	for {
		select {
		case lines, ok := <-linesCh:
			if !ok {
				break collect
			}
			got = append(got, lines...)
		case err := <-errCh:
			if err != nil {
				t.Fatalf("ParallelChunkRead(%q): %v", filePath, err)
			}
			// Success: stop selecting on errCh and keep draining linesCh
			// until it is reported closed.
			errCh = nil
		}
	}

	// Check the length first so a short result fails the test instead of
	// panicking on an out-of-range index.
	if len(got) != 2 {
		t.Fatalf("expected 2 lines, got %d: %q", len(got), got)
	}
	assert.Equal("Lili,22,female", got[0])
	assert.Equal("Jim,21,male", got[1])
}
// TestChunkRead reads up to 100 bytes from the start of the CSV fixture with
// ChunkRead and verifies the two lines it contains.
func TestChunkRead(t *testing.T) {
	assert := internal.NewAssert(t, "TestChunkRead")

	const mb = 1024 * 1024
	const defaultChunkSizeMB = 100 // default chunk size in MB

	filePath := "./testdata/test1.csv"
	f, err := os.Open(filePath)
	if err != nil {
		// A missing fixture must fail the test rather than let it pass vacuously.
		t.Fatalf("open %q: %v", filePath, err)
	}
	defer f.Close()

	// ChunkRead reslices the pooled buffer to the requested size, so the
	// pooled buffers need at least that much capacity.
	bufPool := sync.Pool{
		New: func() interface{} {
			return make([]byte, 0, defaultChunkSizeMB*mb)
		},
	}

	lines, err := ChunkRead(f, 0, 100, &bufPool)
	if err != nil {
		t.Fatalf("ChunkRead(%q): %v", filePath, err)
	}

	// Check the length first so a short result fails the test instead of
	// panicking on an out-of-range index.
	if len(lines) < 2 {
		t.Fatalf("expected at least 2 lines, got %d: %q", len(lines), lines)
	}
	assert.Equal("Lili,22,female", lines[0])
	assert.Equal("Jim,21,male", lines[1])
}

View File

@@ -1,5 +1,2 @@
Lili,22,female Lili,22,female
Jim,21,male Jim,21,male
1 Lili 22 female
2 Jim 21 male