// Copyright 2021 dudaodong@gmail.com. All rights reserved. // Use of this source code is governed by MIT license // Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel, async. package concurrency import ( "context" "sync" ) // Channel is a logic object which can generate or manipulate go channel // all methods of Channel are in the book tilted《Concurrency in Go》 type Channel struct { } // NewChannel return a Channel instance func NewChannel() *Channel { return &Channel{} } // Generate a data of type any chan, put param `values` into the chan func (c *Channel) Generate(ctx context.Context, values ...any) <-chan any { dataStream := make(chan any) go func() { defer close(dataStream) for _, v := range values { select { case <-ctx.Done(): return case dataStream <- v: } } }() return dataStream } // Repeat return a data of type any chan, put param `values` into the chan repeatly, // until close the `done` chan func (c *Channel) Repeat(ctx context.Context, values ...any) <-chan any { dataStream := make(chan any) go func() { defer close(dataStream) for { for _, v := range values { select { case <-ctx.Done(): return case dataStream <- v: } } } }() return dataStream } // RepeatFn return a chan, excutes fn repeatly, and put the result into retruned chan // until close the `done` channel func (c *Channel) RepeatFn(ctx context.Context, fn func() any) <-chan any { dataStream := make(chan any) go func() { defer close(dataStream) for { select { case <-ctx.Done(): return case dataStream <- fn(): } } }() return dataStream } // Take return a chan whose values are tahken from another chan func (c *Channel) Take(ctx context.Context, valueStream <-chan any, number int) <-chan any { takeStream := make(chan any) go func() { defer close(takeStream) for i := 0; i < number; i++ { select { case <-ctx.Done(): return case takeStream <- <-valueStream: } } }() return takeStream } // FanIn merge multiple channels into one channel func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any { var wg sync.WaitGroup multiplexedStream := make(chan any) multiplex := func(c <-chan any) { defer wg.Done() for i := range c { select { case <-ctx.Done(): return case multiplexedStream <- i: } } } wg.Add(len(channels)) for _, c := range channels { go multiplex(c) } go func() { wg.Wait() close(multiplexedStream) }() return multiplexedStream } // Or merge one or more done channels into one done channel, which is closed when any done channel is closed func (c *Channel) Or(channels ...<-chan any) <-chan any { switch len(channels) { case 0: return nil case 1: return channels[0] } orDone := make(chan any) go func() { defer close(orDone) switch len(channels) { case 2: select { case <-channels[0]: case <-channels[1]: } default: select { case <-channels[0]: case <-channels[1]: case <-channels[2]: case <-c.Or(append(channels[3:], orDone)...): } } }() return orDone }