1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-02-04 12:52:28 +08:00

feat: add ForEachConcurrent

This commit is contained in:
dudaodong
2024-08-15 16:44:22 +08:00
parent f5d70728c3
commit 305847993c
5 changed files with 200 additions and 0 deletions

View File

@@ -8,6 +8,47 @@ import (
"sync"
)
// ForEachConcurrent applies the iteratee function to each item in the slice concurrently.
// Play: todo
func ForEachConcurrent[T any](slice []T, iteratee func(index int, item T), numThreads int) {
sliceLen := len(slice)
if sliceLen == 0 {
return
}
if numThreads <= 0 {
numThreads = 1
}
var wg sync.WaitGroup
chunkSize := (sliceLen + numThreads - 1) / numThreads
for i := 0; i < numThreads; i++ {
start := i * chunkSize
end := start + chunkSize
if start >= sliceLen {
break
}
if end > sliceLen {
end = sliceLen
}
wg.Add(1)
go func(start, end int) {
defer wg.Done()
for j := start; j < end; j++ {
iteratee(j, slice[j])
}
}(start, end)
}
wg.Wait()
}
// MapConcurrent applies the iteratee function to each item in the slice concurrently.
// Play: todo
func MapConcurrent[T any, U any](slice []T, iteratee func(index int, item T) U, numThreads int) []U {

View File

@@ -429,6 +429,23 @@ func ExampleForEach() {
// [2 3 4]
}
func ExampleForEachConcurrent() {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8}
result := make([]int, len(nums))
addOne := func(index int, value int) {
result[index] = value + 1
}
ForEachConcurrent(nums, addOne, 4)
fmt.Println(result)
// Output:
// [2 3 4 5 6 7 8 9]
}
func ExampleForEachWithBreak() {
numbers := []int{1, 2, 3, 4, 5}

View File

@@ -410,6 +410,73 @@ func TestForEach(t *testing.T) {
assert.Equal([]int{3, 4, 5, 6, 7}, result)
}
func TestForEachConcurrent(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestForEachConcurrent")
t.Run("single thread", func(t *testing.T) {
numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
result := make([]int, len(numbers))
addOne := func(index int, value int) {
result[index] = value + 1
}
ForEachConcurrent(numbers, addOne, 1)
expected := []int{2, 3, 4, 5, 6, 7, 8, 9, 10}
assert.Equal(expected, result)
})
t.Run("normal", func(t *testing.T) {
numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
result := make([]int, len(numbers))
addOne := func(index int, value int) {
result[index] = value + 1
}
ForEachConcurrent(numbers, addOne, 4)
expected := []int{2, 3, 4, 5, 6, 7, 8, 9, 10}
assert.Equal(expected, result)
})
t.Run("negative threads", func(t *testing.T) {
numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
result := make([]int, len(numbers))
addOne := func(index int, value int) {
result[index] = value + 1
}
ForEachConcurrent(numbers, addOne, -4)
expected := []int{2, 3, 4, 5, 6, 7, 8, 9, 10}
assert.Equal(expected, result)
})
t.Run("high number threads", func(t *testing.T) {
numbers := make([]int, 1000)
for i := range numbers {
numbers[i] = i
}
result := make([]int, len(numbers))
addOne := func(index int, value int) {
result[index] = value + 1
}
ForEachConcurrent(numbers, addOne, 50)
for i, item := range numbers {
assert.Equal(item+1, result[i])
}
})
}
func TestForEachWithBreak(t *testing.T) {
t.Parallel()