From 9f1c89bf0e1a1ada7e6fddb3469f91817c0b4043 Mon Sep 17 00:00:00 2001 From: dudaodong Date: Thu, 21 Apr 2022 14:31:43 +0800 Subject: [PATCH] refactor: refact FanIn func in channel.go --- concurrency/channel.go | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/concurrency/channel.go b/concurrency/channel.go index 18aa9fb..cd9f6c9 100644 --- a/concurrency/channel.go +++ b/concurrency/channel.go @@ -97,32 +97,29 @@ func (c *Channel) Take(ctx context.Context, valueStream <-chan any, number int) // FanIn merge multiple channels into one channel func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any { - var wg sync.WaitGroup - multiplexedStream := make(chan any) - - multiplex := func(c <-chan any) { - defer wg.Done() - - for i := range c { - select { - case <-ctx.Done(): - return - case multiplexedStream <- i: - } - } - } - - wg.Add(len(channels)) - for _, c := range channels { - go multiplex(c) - } + out := make(chan any) go func() { + var wg sync.WaitGroup + wg.Add(len(channels)) + + for _, c := range channels { + go func(c <-chan any) { + defer wg.Done() + for v := range c { + select { + case <-ctx.Done(): + return + case out <- v: + } + } + }(c) + } wg.Wait() - close(multiplexedStream) + close(out) }() - return multiplexedStream + return out } // Tee split one chanel into two channels