From fc6dee9e77162ee7552df7ca13dc2bf8f85b4e0e Mon Sep 17 00:00:00 2001 From: dudaodong Date: Tue, 19 Apr 2022 16:03:12 +0800 Subject: [PATCH] feat: add func in channel.go --- concurrency/channel.go | 33 +++++++++++++++++++++++++++++++++ concurrency/channel_test.go | 23 +++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/concurrency/channel.go b/concurrency/channel.go index 418e7d3..d5468e9 100644 --- a/concurrency/channel.go +++ b/concurrency/channel.go @@ -124,3 +124,36 @@ func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any return multiplexedStream } + +// Or merge one or more done channels into one done channel, which is closed when any done channel is closed +func (c *Channel) Or(channels ...<-chan any) <-chan any { + switch len(channels) { + case 0: + return nil + case 1: + return channels[0] + } + + orDone := make(chan any) + + go func() { + defer close(orDone) + + switch len(channels) { + case 2: + select { + case <-channels[0]: + case <-channels[1]: + } + default: + select { + case <-channels[0]: + case <-channels[1]: + case <-channels[2]: + case <-c.Or(append(channels[3:], orDone)...): + } + } + }() + + return orDone +} diff --git a/concurrency/channel_test.go b/concurrency/channel_test.go index e8cc0cc..74f8712 100644 --- a/concurrency/channel_test.go +++ b/concurrency/channel_test.go @@ -3,6 +3,7 @@ package concurrency import ( "context" "testing" + "time" "github.com/duke-git/lancet/v2/internal" ) @@ -108,3 +109,25 @@ func TestFanIn(t *testing.T) { assert.Equal(1, 1) } + +func TestOr(t *testing.T) { + assert := internal.NewAssert(t, "TestOr") + + sig := func(after time.Duration) <-chan any { + c := make(chan interface{}) + go func() { + defer close(c) + time.Sleep(after) + }() + return c + } + + start := time.Now() + + c := NewChannel() + <-c.Or(sig(2*time.Hour), sig(5*time.Minute), sig(1*time.Second), sig(1*time.Hour), sig(1*time.Minute)) + + t.Logf("done after %v", time.Since(start)) + + assert.Equal(1, 1) +}