From d1c6c577006b7af3e9d7ee41393c79de9d1e00b4 Mon Sep 17 00:00:00 2001 From: dudaodong Date: Thu, 21 Apr 2022 11:26:02 +0800 Subject: [PATCH] feat: add func Bridge --- concurrency/channel.go | 31 +++++++++++++++++++++++++++++++ concurrency/channel_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/concurrency/channel.go b/concurrency/channel.go index debca1f..58bffdf 100644 --- a/concurrency/channel.go +++ b/concurrency/channel.go @@ -151,6 +151,37 @@ func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan an return out1, out2 } +// Bridge link mutiply channels into one channel +func (c *Channel) Bridge(ctx context.Context, chanStream <-chan <-chan any) <-chan any { + valStream := make(chan any) + + go func() { + defer close(valStream) + + for { + var stream <-chan any + select { + case maybeStream, ok := <-chanStream: + if ok == false { + return + } + stream = maybeStream + case <-ctx.Done(): + return + } + + for val := range c.OrDone(ctx, stream) { + select { + case valStream <- val: + case <-ctx.Done(): + } + } + } + }() + + return valStream +} + // Or merge one or more done channels into one done channel, which is closed when any done channel is closed func (c *Channel) Or(channels ...<-chan any) <-chan any { switch len(channels) { diff --git a/concurrency/channel_test.go b/concurrency/channel_test.go index 70870ab..26616ed 100644 --- a/concurrency/channel_test.go +++ b/concurrency/channel_test.go @@ -165,3 +165,32 @@ func TestTee(t *testing.T) { assert.Equal(1, <-out2) } } + +func TestBridge(t *testing.T) { + assert := internal.NewAssert(t, "TestBridge") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel() + genVals := func() <-chan <-chan any { + chanStream := make(chan (<-chan any)) + go func() { + defer close(chanStream) + for i := 0; i < 10; i++ { + stream := make(chan any, 1) + stream <- i + close(stream) + chanStream <- stream + } + }() + return chanStream + } + + index := 0 + for val := range c.Bridge(ctx, genVals()) { + // t.Logf("%v ", val) //0 1 2 3 4 5 6 7 8 9 + assert.Equal(index, val) + index++ + } +}