diff --git a/docs/api/packages/slice.md b/docs/api/packages/slice.md index ebbcdf3..e81b531 100644 --- a/docs/api/packages/slice.md +++ b/docs/api/packages/slice.md @@ -53,6 +53,7 @@ import ( - [Flatten](#Flatten) - [FlattenDeep](#FlattenDeep) - [ForEach](#ForEach) +- [ForEachConcurrent](#ForEachConcurrent) - [ForEachWithBreak](#ForEachWithBreak) - [GroupBy](#GroupBy) - [GroupWith](#GroupWith) @@ -1179,6 +1180,43 @@ func main() { } ``` +### ForEachConcurrent + +

对slice并发执行foreach操作。

+ +函数签名: + +```go +func ForEachConcurrent[T any](slice []T, iteratee func(index int, item T), numThreads int) +``` + +示例:[运行]() + +```go +import ( + "fmt" + "github.com/duke-git/lancet/v2/slice" +) + +func main() { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8} + + result := make([]int, len(nums)) + + addOne := func(index int, value int) { + result[index] = value + 1 + } + + slice.ForEachConcurrent(nums, addOne, 4) + + fmt.Println(result) + + // Output: + // [2 3 4 5 6 7 8 9] +} +``` + + ### ForEachWithBreak

遍历切片的元素并为每个元素调用iteratee函数,当iteratee函数返回false时,终止遍历。

diff --git a/docs/en/api/packages/slice.md b/docs/en/api/packages/slice.md index 64b2615..b739fd4 100644 --- a/docs/en/api/packages/slice.md +++ b/docs/en/api/packages/slice.md @@ -53,6 +53,7 @@ import ( - [Flatten](#Flatten) - [FlattenDeep](#FlattenDeep) - [ForEach](#ForEach) +- [ForEachConcurrent](#ForEachConcurrent) - [ForEachWithBreak](#ForEachWithBreak) - [GroupBy](#GroupBy) - [GroupWith](#GroupWith) @@ -1177,6 +1178,42 @@ func main() { } ``` +### ForEachConcurrent + +

Applies the iteratee function to each item in the slice concurrently.

+ +Signature: + +```go +func ForEachConcurrent[T any](slice []T, iteratee func(index int, item T), numThreads int) +``` + +Example:[Run]() + +```go +import ( + "fmt" + "github.com/duke-git/lancet/v2/slice" +) + +func main() { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8} + + result := make([]int, len(nums)) + + addOne := func(index int, value int) { + result[index] = value + 1 + } + + slice.ForEachConcurrent(nums, addOne, 4) + + fmt.Println(result) + + // Output: + // [2 3 4 5 6 7 8 9] +} +``` + ### ForEachWithBreak

Iterates over elements of slice and invokes function for each element, when iteratee return false, will break the for each loop.

diff --git a/slice/slice_concurrent.go b/slice/slice_concurrent.go index 5ba4111..7ff4788 100644 --- a/slice/slice_concurrent.go +++ b/slice/slice_concurrent.go @@ -8,6 +8,47 @@ import ( "sync" ) +// ForEachConcurrent applies the iteratee function to each item in the slice concurrently. +// Play: todo +func ForEachConcurrent[T any](slice []T, iteratee func(index int, item T), numThreads int) { + sliceLen := len(slice) + if sliceLen == 0 { + return + } + + if numThreads <= 0 { + numThreads = 1 + } + + var wg sync.WaitGroup + + chunkSize := (sliceLen + numThreads - 1) / numThreads + + for i := 0; i < numThreads; i++ { + start := i * chunkSize + end := start + chunkSize + + if start >= sliceLen { + break + } + + if end > sliceLen { + end = sliceLen + } + + wg.Add(1) + go func(start, end int) { + defer wg.Done() + + for j := start; j < end; j++ { + iteratee(j, slice[j]) + } + }(start, end) + } + + wg.Wait() +} + // MapConcurrent applies the iteratee function to each item in the slice concurrently. // Play: todo func MapConcurrent[T any, U any](slice []T, iteratee func(index int, item T) U, numThreads int) []U { diff --git a/slice/slice_example_test.go b/slice/slice_example_test.go index d8e7b92..c9bde20 100644 --- a/slice/slice_example_test.go +++ b/slice/slice_example_test.go @@ -429,6 +429,23 @@ func ExampleForEach() { // [2 3 4] } +func ExampleForEachConcurrent() { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8} + + result := make([]int, len(nums)) + + addOne := func(index int, value int) { + result[index] = value + 1 + } + + ForEachConcurrent(nums, addOne, 4) + + fmt.Println(result) + + // Output: + // [2 3 4 5 6 7 8 9] +} + func ExampleForEachWithBreak() { numbers := []int{1, 2, 3, 4, 5} diff --git a/slice/slice_test.go b/slice/slice_test.go index f841946..21b00c2 100644 --- a/slice/slice_test.go +++ b/slice/slice_test.go @@ -410,6 +410,73 @@ func TestForEach(t *testing.T) { assert.Equal([]int{3, 4, 5, 6, 7}, result) } +func TestForEachConcurrent(t *testing.T) { + t.Parallel() + + assert := internal.NewAssert(t, "TestForEachConcurrent") + + t.Run("single thread", func(t *testing.T) { + numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + result := make([]int, len(numbers)) + + addOne := func(index int, value int) { + result[index] = value + 1 + } + + ForEachConcurrent(numbers, addOne, 1) + + expected := []int{2, 3, 4, 5, 6, 7, 8, 9, 10} + assert.Equal(expected, result) + }) + + t.Run("normal", func(t *testing.T) { + numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + result := make([]int, len(numbers)) + + addOne := func(index int, value int) { + result[index] = value + 1 + } + + ForEachConcurrent(numbers, addOne, 4) + + expected := []int{2, 3, 4, 5, 6, 7, 8, 9, 10} + assert.Equal(expected, result) + }) + + t.Run("negative threads", func(t *testing.T) { + numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + result := make([]int, len(numbers)) + + addOne := func(index int, value int) { + result[index] = value + 1 + } + + ForEachConcurrent(numbers, addOne, -4) + + expected := []int{2, 3, 4, 5, 6, 7, 8, 9, 10} + assert.Equal(expected, result) + }) + + t.Run("high number threads", func(t *testing.T) { + numbers := make([]int, 1000) + for i := range numbers { + numbers[i] = i + } + + result := make([]int, len(numbers)) + + addOne := func(index int, value int) { + result[index] = value + 1 + } + + ForEachConcurrent(numbers, addOne, 50) + + for i, item := range numbers { + assert.Equal(item+1, result[i]) + } + }) +} + func TestForEachWithBreak(t *testing.T) { t.Parallel()