From e27df00fa8016550c454bd25a634a5b4b8b68ba5 Mon Sep 17 00:00:00 2001 From: Tuuuuuuuuu Date: Tue, 14 Jan 2025 16:19:45 +0800 Subject: [PATCH 1/3] make Bridge not block in the first stream that not closed (#288) * not block in the first channel * make Bridge not block in the first stream that not closed * Bridge with test --- concurrency/channel.go | 19 +++++++++++-------- concurrency/channel_example_test.go | 19 ++++++++++++------- concurrency/channel_test.go | 12 ++++++++---- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/concurrency/channel.go b/concurrency/channel.go index c610971..876d14e 100644 --- a/concurrency/channel.go +++ b/concurrency/channel.go @@ -157,10 +157,10 @@ func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T) // Play: https://go.dev/play/p/qmWSy1NVF-Y func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T { valStream := make(chan T) - go func() { defer close(valStream) - + wg := sync.WaitGroup{} + defer wg.Wait() for { var stream <-chan T select { @@ -169,19 +169,22 @@ func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-c return } stream = maybeStream + wg.Add(1) case <-ctx.Done(): return } - for val := range c.OrDone(ctx, stream) { - select { - case valStream <- val: - case <-ctx.Done(): + go func() { + defer wg.Done() + for val := range c.OrDone(ctx, stream) { + select { + case valStream <- val: + case <-ctx.Done(): + } } - } + }() } }() - return valStream } diff --git a/concurrency/channel_example_test.go b/concurrency/channel_example_test.go index 9d4686a..dc72281 100644 --- a/concurrency/channel_example_test.go +++ b/concurrency/channel_example_test.go @@ -168,7 +168,8 @@ func ExampleChannel_Tee() { func ExampleChannel_Bridge() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - + m1 := make(map[int]int) + m2 := make(map[int]int) c := NewChannel[int]() genVals := func() <-chan <-chan int { out := make(chan (<-chan int)) @@ -177,6 +178,7 @@ func ExampleChannel_Bridge() { for i := 1; i <= 5; i++ { stream := make(chan int, 1) stream <- i + m1[i]++ close(stream) out <- stream } @@ -185,12 +187,15 @@ func ExampleChannel_Bridge() { } for v := range c.Bridge(ctx, genVals()) { - fmt.Println(v) + m2[v]++ + } + for k, v := range m1 { + fmt.Println(m2[k] == v) } // Output: - // 1 - // 2 - // 3 - // 4 - // 5 + // true + // true + // true + // true + // true } diff --git a/concurrency/channel_test.go b/concurrency/channel_test.go index e977bcf..4c9da23 100644 --- a/concurrency/channel_test.go +++ b/concurrency/channel_test.go @@ -169,7 +169,8 @@ func TestTee(t *testing.T) { func TestBridge(t *testing.T) { t.Parallel() assert := internal.NewAssert(t, "TestBridge") - + m1 := make(map[int]int) + m2 := make(map[int]int) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -181,6 +182,7 @@ func TestBridge(t *testing.T) { for i := 0; i < 10; i++ { stream := make(chan int, 1) stream <- i + m1[i]++ close(stream) chanStream <- stream } @@ -188,9 +190,11 @@ func TestBridge(t *testing.T) { return chanStream } - index := 0 for val := range c.Bridge(ctx, genVals()) { - assert.Equal(index, val) - index++ + m2[val]++ + } + + for k, v := range m1 { + assert.Equal(m2[k], v) } } From f861e18bc3b78676c7d0377b0d4d990649a64c54 Mon Sep 17 00:00:00 2001 From: guanhongli Date: Sun, 26 Jan 2025 12:04:55 +0800 Subject: [PATCH 2/3] compatible with pointer in convert.ToString --- convertor/convertor.go | 7 +++++++ convertor/convertor_test.go | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/convertor/convertor.go b/convertor/convertor.go index c3f36a5..e8d4690 100644 --- a/convertor/convertor.go +++ b/convertor/convertor.go @@ -108,6 +108,13 @@ func ToString(value any) string { if value == nil { return "" } + rv := reflect.ValueOf(value) + if rv.Kind() == reflect.Ptr { + if rv.IsNil() { + return "" + } + return ToString(rv.Elem().Interface()) + } switch val := value.(type) { case float32: diff --git a/convertor/convertor_test.go b/convertor/convertor_test.go index bf1c524..21d97ec 100644 --- a/convertor/convertor_test.go +++ b/convertor/convertor_test.go @@ -142,13 +142,24 @@ func TestToString(t *testing.T) { } aStruct := TestStruct{Name: "TestStruct"} + i32Val := int32(123) + i64Val := int64(123) + iZeroVal := 0 + fVal := 12.3 + sVal := "abc" + var iNilPointer *int + var sNilPointer *string + cases := []any{ "", nil, int(0), int8(1), int16(-1), int32(123), int64(123), uint(123), uint8(123), uint16(123), uint32(123), uint64(123), float64(12.3), float32(12.3), true, false, - []int{1, 2, 3}, aMap, aStruct, []byte{104, 101, 108, 108, 111}} + []int{1, 2, 3}, aMap, aStruct, []byte{104, 101, 108, 108, 111}, + &i32Val, &i64Val, &fVal, &sVal, &aStruct, iNilPointer, sNilPointer, + &iZeroVal, + } expected := []string{ "", "", @@ -157,6 +168,8 @@ func TestToString(t *testing.T) { "12.3", "12.3", "true", "false", "[1,2,3]", "{\"a\":1,\"b\":2,\"c\":3}", "{\"Name\":\"TestStruct\"}", "hello", + "123", "123", "12.3", "abc", "{\"Name\":\"TestStruct\"}", "", "", + "0", } for i := 0; i < len(cases); i++ { From fc43138a0ea5da02923ad36cbb8d2538f44f6ccc Mon Sep 17 00:00:00 2001 From: jialulu Date: Sun, 9 Feb 2025 22:23:35 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20fix=20slice=5Ftest.go=20,We=20should?= =?UTF-8?q?=20not=20assume=20the=20order=20of=20the=20slice=20when=20using?= =?UTF-8?q?=20multithreads=EF=BC=8Csort=20them=20before=20compare?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- slice/slice_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/slice/slice_test.go b/slice/slice_test.go index 9191c73..06d7f03 100644 --- a/slice/slice_test.go +++ b/slice/slice_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "reflect" + "sort" "strconv" "strings" "sync" @@ -1794,6 +1795,8 @@ func TestFilterConcurrent(t *testing.T) { nums := []int{1, 2, 3, 4, 5, 6} expected := []int{4, 5, 6} actual := FilterConcurrent(nums, func(_, n int) bool { return n > 3 }, 4) + sort.Ints(actual) + sort.Ints(expected) assert.Equal(expected, actual) }) }