diff --git a/concurrency/channel.go b/concurrency/channel.go index debca1f..58bffdf 100644 --- a/concurrency/channel.go +++ b/concurrency/channel.go @@ -151,6 +151,37 @@ func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan an return out1, out2 } +// Bridge link mutiply channels into one channel +func (c *Channel) Bridge(ctx context.Context, chanStream <-chan <-chan any) <-chan any { + valStream := make(chan any) + + go func() { + defer close(valStream) + + for { + var stream <-chan any + select { + case maybeStream, ok := <-chanStream: + if ok == false { + return + } + stream = maybeStream + case <-ctx.Done(): + return + } + + for val := range c.OrDone(ctx, stream) { + select { + case valStream <- val: + case <-ctx.Done(): + } + } + } + }() + + return valStream +} + // Or merge one or more done channels into one done channel, which is closed when any done channel is closed func (c *Channel) Or(channels ...<-chan any) <-chan any { switch len(channels) { diff --git a/concurrency/channel_test.go b/concurrency/channel_test.go index 70870ab..26616ed 100644 --- a/concurrency/channel_test.go +++ b/concurrency/channel_test.go @@ -165,3 +165,32 @@ func TestTee(t *testing.T) { assert.Equal(1, <-out2) } } + +func TestBridge(t *testing.T) { + assert := internal.NewAssert(t, "TestBridge") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel() + genVals := func() <-chan <-chan any { + chanStream := make(chan (<-chan any)) + go func() { + defer close(chanStream) + for i := 0; i < 10; i++ { + stream := make(chan any, 1) + stream <- i + close(stream) + chanStream <- stream + } + }() + return chanStream + } + + index := 0 + for val := range c.Bridge(ctx, genVals()) { + // t.Logf("%v ", val) //0 1 2 3 4 5 6 7 8 9 + assert.Equal(index, val) + index++ + } +}