diff --git a/concurrency/channel.go b/concurrency/channel.go index 092bece..debca1f 100644 --- a/concurrency/channel.go +++ b/concurrency/channel.go @@ -125,6 +125,32 @@ func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any return multiplexedStream } +// Tee split one chanel into two channels +func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan any) { + out1 := make(chan any) + out2 := make(chan any) + + go func() { + defer close(out1) + defer close(out2) + + for val := range c.OrDone(ctx, in) { + var out1, out2 = out1, out2 + for i := 0; i < 2; i++ { + select { + case <-ctx.Done(): + case out1 <- val: + out1 = nil + case out2 <- val: + out2 = nil + } + } + } + }() + + return out1, out2 +} + // Or merge one or more done channels into one done channel, which is closed when any done channel is closed func (c *Channel) Or(channels ...<-chan any) <-chan any { switch len(channels) { diff --git a/concurrency/channel_test.go b/concurrency/channel_test.go index 22b81d1..70870ab 100644 --- a/concurrency/channel_test.go +++ b/concurrency/channel_test.go @@ -149,3 +149,19 @@ func TestOrDone(t *testing.T) { assert.Equal(1, res) } + +func TestTee(t *testing.T) { + assert := internal.NewAssert(t, "TestTee") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel() + inStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 4) + + out1, out2 := c.Tee(ctx, inStream) + for val := range out1 { + assert.Equal(1, val) + assert.Equal(1, <-out2) + } +}