1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-02-04 12:52:28 +08:00

feat: add ReduceConcurrent

This commit is contained in:
dudaodong
2024-08-15 17:48:26 +08:00
parent 305847993c
commit c0b200f846
5 changed files with 165 additions and 4 deletions

View File

@@ -76,6 +76,50 @@ func MapConcurrent[T any, U any](slice []T, iteratee func(index int, item T) U,
return result
}
// ReduceConcurrent reduces the slice to a single value by applying the reducer function to each item in the slice concurrently.
// Play: todo
func ReduceConcurrent[T any](slice []T, initial T, reducer func(index int, item T, agg T) T, numThreads int) T {
if numThreads <= 0 {
numThreads = 1
}
var wg sync.WaitGroup
var mu sync.Mutex
sliceLen := len(slice)
chunkSize := (sliceLen + numThreads - 1) / numThreads
results := make([]T, numThreads)
for i := 0; i < numThreads; i++ {
start := i * chunkSize
end := start + chunkSize
if end > sliceLen {
end = sliceLen
}
wg.Add(1)
go func(i, start, end int) {
defer wg.Done()
tempResult := initial
for j := start; j < end; j++ {
tempResult = reducer(j, slice[j], tempResult)
}
mu.Lock()
results[i] = tempResult
mu.Unlock()
}(i, start, end)
}
wg.Wait()
result := initial
for i, r := range results {
result = reducer(i, result, r)
}
return result
}
// FilterConcurrent applies the provided filter function `predicate` to each element of the input slice concurrently.
// Play: todo
func FilterConcurrent[T any](slice []T, predicate func(index int, item T) bool, numThreads int) []T {

View File

@@ -527,6 +527,18 @@ func ExampleReduce() {
// 6
}
func ExampleReduceConcurrent() {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 1)
fmt.Println(result)
// Output:
// 55
}
func ExampleReduceBy() {
result1 := ReduceBy([]int{1, 2, 3, 4}, 0, func(_ int, item int, agg int) int {
return agg + item

View File

@@ -587,6 +587,44 @@ func TestReduce(t *testing.T) {
}
}
func TestReduceConcurrent(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestReduceConcurrent")
t.Run("basic", func(t *testing.T) {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 4)
assert.Equal(55, result)
})
t.Run("empty slice", func(t *testing.T) {
nums := []int{}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 4)
assert.Equal(0, result)
})
t.Run("single thread", func(t *testing.T) {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, 1)
assert.Equal(55, result)
})
t.Run("negative threads", func(t *testing.T) {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int {
return agg + item
}, -1)
assert.Equal(55, result)
})
}
func TestReduceBy(t *testing.T) {
t.Parallel()