From 356351896dcdc28b4895814894356a8dcb6c3bcd Mon Sep 17 00:00:00 2001 From: dudaodong Date: Thu, 8 Aug 2024 10:20:52 +0800 Subject: [PATCH] feat: add UniqueByParallel for slice --- slice/slice_example_test.go | 12 +++++ slice/slice_parallel.go | 96 +++++++++++++++++++++++++++++++++++++ slice/slice_test.go | 14 ++++++ 3 files changed, 122 insertions(+) create mode 100644 slice/slice_parallel.go diff --git a/slice/slice_example_test.go b/slice/slice_example_test.go index 8fa8c13..9bd9803 100644 --- a/slice/slice_example_test.go +++ b/slice/slice_example_test.go @@ -1163,3 +1163,15 @@ func ExampleLeftPadding() { // Output: // [0 0 0 1 2 3 4 5] } + +func ExampleUniqueByParallel() { + nums := []int{1, 2, 3, 1, 2, 4, 5, 6, 4, 7} + numOfThreads := 4 + comparator := func(item int, other int) bool { return item == other } + + result := UniqueByParallel(nums, numOfThreads, comparator) + + fmt.Println(result) + // Output: + // [1 2 3 4 5 6 7] +} diff --git a/slice/slice_parallel.go b/slice/slice_parallel.go new file mode 100644 index 0000000..d485d8c --- /dev/null +++ b/slice/slice_parallel.go @@ -0,0 +1,96 @@ +// Copyright 2024 dudaodong@gmail.com. All rights resulterved. +// Use of this source code is governed by MIT license + +package slice + +import ( + "runtime" + "sync" +) + +// UniqueByParallel removes duplicate elements from the slice by parallel +// The comparator function is used to compare the elements +// The numOfThreads parameter specifies the number of threads to use +// If numOfThreads is less than or equal to 0, it will be set to 1 +// The comparator function should return true if the two elements are equal +// Play: todo +func UniqueByParallel[T comparable](slice []T, numOfThreads int, comparator func(item T, other T) bool) []T { + if numOfThreads <= 0 { + numOfThreads = 1 + } else if numOfThreads > len(slice) { + numOfThreads = len(slice) + } + + maxThreads := runtime.NumCPU() + if numOfThreads > maxThreads { + numOfThreads = maxThreads + } + + removeDuplicate := func(items []T, comparator func(item T, other T) bool) []T { + var result []T + for _, item := range items { + seen := false + for _, r := range result { + if comparator(item, r) { + seen = true + break + } + } + if !seen { + result = append(result, item) + } + } + return result + } + + chunkSize := (len(slice) + numOfThreads - 1) / numOfThreads + + chunks := make([][]T, 0, numOfThreads) + for i := 0; i < len(slice); i += chunkSize { + end := i + chunkSize + if end > len(slice) { + end = len(slice) + } + chunks = append(chunks, slice[i:end]) + } + + type resultChunk struct { + index int + data []T + } + resultCh := make(chan resultChunk, numOfThreads) + var wg sync.WaitGroup + + for i, chunk := range chunks { + wg.Add(1) + go func(index int, chunk []T) { + defer wg.Done() + resultCh <- resultChunk{index, removeDuplicate(chunk, comparator)} + }(i, chunk) + } + + go func() { + wg.Wait() + close(resultCh) + }() + + results := make([][]T, len(chunks)) + for r := range resultCh { + results[r.index] = r.data + } + + result := []T{} + seen := make(map[T]bool) + + for _, chunk := range results { + for _, item := range chunk { + if !seen[item] { + seen[item] = true + result = append(result, item) + } + } + + } + + return result +} diff --git a/slice/slice_test.go b/slice/slice_test.go index c1d4d19..7a543f0 100644 --- a/slice/slice_test.go +++ b/slice/slice_test.go @@ -1448,3 +1448,17 @@ func TestRightPaddingAndLeftPadding(t *testing.T) { padded := LeftPadding(RightPadding(nums, 0, 3), 0, 3) assert.Equal([]int{0, 0, 0, 1, 2, 3, 4, 5, 0, 0, 0}, padded) } + +func TestUniqueByParallel(t *testing.T) { + t.Parallel() + + assert := internal.NewAssert(t, "TestUniqueByParallel") + + nums := []int{1, 2, 3, 1, 2, 4, 5, 6, 4, 7} + numOfThreads := 4 + comparator := func(item int, other int) bool { return item == other } + + result := UniqueByParallel(nums, numOfThreads, comparator) + + assert.Equal([]int{1, 2, 3, 4, 5, 6, 7}, result) +}