From cc54dd7ec983b0985b86fcde996f45c57f501c49 Mon Sep 17 00:00:00 2001 From: dudaodong Date: Sat, 31 Dec 2022 13:12:16 +0800 Subject: [PATCH] doc: add example and update docment for channel --- concurrency/channel.go | 88 +++++++------ concurrency/channel_example_test.go | 196 ++++++++++++++++++++++++++++ concurrency/channel_test.go | 52 +++----- docs/concurrency.md | 150 ++++++++++++--------- docs/concurrency_zh-CN.md | 164 +++++++++++++---------- 5 files changed, 442 insertions(+), 208 deletions(-) create mode 100644 concurrency/channel_example_test.go diff --git a/concurrency/channel.go b/concurrency/channel.go index 9f40dd3..c803f12 100644 --- a/concurrency/channel.go +++ b/concurrency/channel.go @@ -11,17 +11,18 @@ import ( // Channel is a logic object which can generate or manipulate go channel // all methods of Channel are in the book tilted《Concurrency in Go》 -type Channel struct { +type Channel[T any] struct { } // NewChannel return a Channel instance -func NewChannel() *Channel { - return &Channel{} +func NewChannel[T any]() *Channel[T] { + return &Channel[T]{} } -// Generate a data of type any chan, put param `values` into the chan -func (c *Channel) Generate(ctx context.Context, values ...any) <-chan any { - dataStream := make(chan any) +// Generate a data of type any channel, put param values into the channel. +// Play: +func (c *Channel[T]) Generate(ctx context.Context, values ...T) <-chan T { + dataStream := make(chan T) go func() { defer close(dataStream) @@ -38,9 +39,10 @@ func (c *Channel) Generate(ctx context.Context, values ...any) <-chan any { return dataStream } -// Repeat return a data of type any chan, put param `values` into the chan repeatly until cancel the context. -func (c *Channel) Repeat(ctx context.Context, values ...any) <-chan any { - dataStream := make(chan any) +// Repeat return a data of type any channel, put param `values` into the channel repeatly until cancel the context. +// Play: +func (c *Channel[T]) Repeat(ctx context.Context, values ...T) <-chan T { + dataStream := make(chan T) go func() { defer close(dataStream) @@ -57,10 +59,11 @@ func (c *Channel) Repeat(ctx context.Context, values ...any) <-chan any { return dataStream } -// RepeatFn return a chan, excutes fn repeatly, and put the result into retruned chan -// until close the `done` channel -func (c *Channel) RepeatFn(ctx context.Context, fn func() any) <-chan any { - dataStream := make(chan any) +// RepeatFn return a channel, excutes fn repeatly, and put the result into retruned channel +// until close the `done` channel. +// Play: +func (c *Channel[T]) RepeatFn(ctx context.Context, fn func() T) <-chan T { + dataStream := make(chan T) go func() { defer close(dataStream) @@ -75,9 +78,10 @@ func (c *Channel) RepeatFn(ctx context.Context, fn func() any) <-chan any { return dataStream } -// Take return a chan whose values are tahken from another chan -func (c *Channel) Take(ctx context.Context, valueStream <-chan any, number int) <-chan any { - takeStream := make(chan any) +// Take return a channel whose values are taken from another channel. +// Play: +func (c *Channel[T]) Take(ctx context.Context, valueStream <-chan T, number int) <-chan T { + takeStream := make(chan T) go func() { defer close(takeStream) @@ -94,16 +98,17 @@ func (c *Channel) Take(ctx context.Context, valueStream <-chan any, number int) return takeStream } -// FanIn merge multiple channels into one channel -func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any { - out := make(chan any) +// FanIn merge multiple channels into one channel. +// Play: +func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T { + out := make(chan T) go func() { var wg sync.WaitGroup wg.Add(len(channels)) for _, c := range channels { - go func(c <-chan any) { + go func(c <-chan T) { defer wg.Done() for v := range c { select { @@ -121,10 +126,11 @@ func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any return out } -// Tee split one chanel into two channels -func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan any) { - out1 := make(chan any) - out2 := make(chan any) +// Tee split one chanel into two channels. +// Play: +func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T) { + out1 := make(chan T) + out2 := make(chan T) go func() { defer close(out1) @@ -147,15 +153,16 @@ func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan an return out1, out2 } -// Bridge link multiply channels into one channel -func (c *Channel) Bridge(ctx context.Context, chanStream <-chan <-chan any) <-chan any { - valStream := make(chan any) +// Bridge link multiply channels into one channel. +// Play: +func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T { + valStream := make(chan T) go func() { defer close(valStream) for { - var stream <-chan any + var stream <-chan T select { case maybeStream, ok := <-chanStream: if !ok { @@ -178,8 +185,9 @@ func (c *Channel) Bridge(ctx context.Context, chanStream <-chan <-chan any) <-ch return valStream } -// Or read one or more channels into one channel, will close when any readin channel is closed -func (c *Channel) Or(channels ...<-chan any) <-chan any { +// Or read one or more channels into one channel, will close when any readin channel is closed. +// Play: +func (c *Channel[T]) Or(channels ...<-chan T) <-chan T { switch len(channels) { case 0: return nil @@ -187,7 +195,7 @@ func (c *Channel) Or(channels ...<-chan any) <-chan any { return channels[0] } - orDone := make(chan any) + orDone := make(chan T) go func() { defer close(orDone) @@ -199,17 +207,12 @@ func (c *Channel) Or(channels ...<-chan any) <-chan any { case <-channels[1]: } default: - m := len(channels) / 2 select { - case <-c.Or(channels[:m]...): - case <-c.Or(channels[m:]...): + case <-channels[0]: + case <-channels[1]: + case <-channels[2]: + case <-c.Or(append(channels[3:], orDone)...): } - // select { - // case <-channels[0]: - // case <-channels[1]: - // case <-channels[2]: - // case <-c.Or(append(channels[3:], orDone)...): - // } } }() @@ -217,8 +220,9 @@ func (c *Channel) Or(channels ...<-chan any) <-chan any { } // OrDone read a channel into another channel, will close until cancel context. -func (c *Channel) OrDone(ctx context.Context, channel <-chan any) <-chan any { - valStream := make(chan any) +// Play: +func (c *Channel[T]) OrDone(ctx context.Context, channel <-chan T) <-chan T { + valStream := make(chan T) go func() { defer close(valStream) diff --git a/concurrency/channel_example_test.go b/concurrency/channel_example_test.go new file mode 100644 index 0000000..9d4686a --- /dev/null +++ b/concurrency/channel_example_test.go @@ -0,0 +1,196 @@ +package concurrency + +import ( + "context" + "fmt" + "time" +) + +func ExampleChannel_Generate() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel[int]() + intStream := c.Generate(ctx, 1, 2, 3) + + fmt.Println(<-intStream) + fmt.Println(<-intStream) + fmt.Println(<-intStream) + + // Output: + // 1 + // 2 + // 3 +} + +func ExampleChannel_Repeat() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel[int]() + intStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 4) + + for v := range intStream { + fmt.Println(v) + } + // Output: + // 1 + // 2 + // 1 + // 2 +} + +func ExampleChannel_RepeatFn() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fn := func() string { + return "hello" + } + + c := NewChannel[string]() + intStream := c.Take(ctx, c.RepeatFn(ctx, fn), 3) + + for v := range intStream { + fmt.Println(v) + } + // Output: + // hello + // hello + // hello +} + +func ExampleChannel_Take() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + numbers := make(chan int, 5) + numbers <- 1 + numbers <- 2 + numbers <- 3 + numbers <- 4 + numbers <- 5 + defer close(numbers) + + c := NewChannel[int]() + intStream := c.Take(ctx, numbers, 3) + + for v := range intStream { + fmt.Println(v) + } + // Output: + // 1 + // 2 + // 3 +} + +func ExampleChannel_FanIn() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel[int]() + channels := make([]<-chan int, 2) + + for i := 0; i < 2; i++ { + channels[i] = c.Take(ctx, c.Repeat(ctx, i), 2) + } + + chs := c.FanIn(ctx, channels...) + + for v := range chs { + fmt.Println(v) //1 1 0 0 or 0 0 1 1 + } + +} + +func ExampleChannel_Or() { + sig := func(after time.Duration) <-chan any { + c := make(chan any) + go func() { + defer close(c) + time.Sleep(after) + }() + return c + } + + start := time.Now() + + c := NewChannel[any]() + <-c.Or( + sig(1*time.Second), + sig(2*time.Second), + sig(3*time.Second), + ) + + if time.Since(start).Seconds() < 2 { + fmt.Println("ok") + } + // Output: + // ok +} + +func ExampleChannel_OrDone() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel[int]() + intStream := c.Take(ctx, c.Repeat(ctx, 1), 3) + + for v := range c.OrDone(ctx, intStream) { + fmt.Println(v) + } + // Output: + // 1 + // 1 + // 1 +} + +func ExampleChannel_Tee() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel[int]() + intStream := c.Take(ctx, c.Repeat(ctx, 1), 2) + + ch1, ch2 := c.Tee(ctx, intStream) + + for v := range ch1 { + fmt.Println(v) + fmt.Println(<-ch2) + } + // Output: + // 1 + // 1 + // 1 + // 1 +} + +func ExampleChannel_Bridge() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewChannel[int]() + genVals := func() <-chan <-chan int { + out := make(chan (<-chan int)) + go func() { + defer close(out) + for i := 1; i <= 5; i++ { + stream := make(chan int, 1) + stream <- i + close(stream) + out <- stream + } + }() + return out + } + + for v := range c.Bridge(ctx, genVals()) { + fmt.Println(v) + } + // Output: + // 1 + // 2 + // 3 + // 4 + // 5 +} diff --git a/concurrency/channel_test.go b/concurrency/channel_test.go index e6e073f..cc6c56d 100644 --- a/concurrency/channel_test.go +++ b/concurrency/channel_test.go @@ -14,12 +14,9 @@ func TestGenerate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := NewChannel() + c := NewChannel[int]() intStream := c.Generate(ctx, 1, 2, 3) - // for v := range intStream { - // t.Log(v) //1, 2, 3 - // } assert.Equal(1, <-intStream) assert.Equal(2, <-intStream) assert.Equal(3, <-intStream) @@ -31,12 +28,9 @@ func TestRepeat(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := NewChannel() + c := NewChannel[int]() intStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 5) - // for v := range intStream { - // t.Log(v) //1, 2, 1, 2, 1 - // } assert.Equal(1, <-intStream) assert.Equal(2, <-intStream) assert.Equal(1, <-intStream) @@ -50,17 +44,13 @@ func TestRepeatFn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fn := func() any { + fn := func() string { s := "a" return s } - c := NewChannel() + c := NewChannel[string]() dataStream := c.Take(ctx, c.RepeatFn(ctx, fn), 3) - // for v := range dataStream { - // t.Log(v) //a, a, a - // } - assert.Equal("a", <-dataStream) assert.Equal("a", <-dataStream) assert.Equal("a", <-dataStream) @@ -72,7 +62,7 @@ func TestTake(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - numbers := make(chan any, 5) + numbers := make(chan int, 5) numbers <- 1 numbers <- 2 numbers <- 3 @@ -80,7 +70,7 @@ func TestTake(t *testing.T) { numbers <- 5 defer close(numbers) - c := NewChannel() + c := NewChannel[int]() intStream := c.Take(ctx, numbers, 3) assert.Equal(1, <-intStream) @@ -94,8 +84,8 @@ func TestFanIn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := NewChannel() - channels := make([]<-chan any, 3) + c := NewChannel[int]() + channels := make([]<-chan int, 3) for i := 0; i < 3; i++ { channels[i] = c.Take(ctx, c.Repeat(ctx, i), 3) @@ -124,7 +114,7 @@ func TestOr(t *testing.T) { start := time.Now() - c := NewChannel() + c := NewChannel[any]() <-c.Or( sig(1*time.Second), sig(2*time.Second), @@ -133,9 +123,7 @@ func TestOr(t *testing.T) { sig(5*time.Second), ) - t.Logf("done after %v", time.Since(start)) - - assert.Equal(1, 1) + assert.Equal(true, time.Since(start).Seconds() < 2) } func TestOrDone(t *testing.T) { @@ -144,16 +132,12 @@ func TestOrDone(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := NewChannel() + c := NewChannel[int]() intStream := c.Take(ctx, c.Repeat(ctx, 1), 3) - var res any for val := range c.OrDone(ctx, intStream) { - t.Logf("%v", val) - res = val + assert.Equal(1, val) } - - assert.Equal(1, res) } func TestTee(t *testing.T) { @@ -162,15 +146,13 @@ func TestTee(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := NewChannel() + c := NewChannel[int]() inStream := c.Take(ctx, c.Repeat(ctx, 1), 4) out1, out2 := c.Tee(ctx, inStream) for val := range out1 { val1 := val val2 := <-out2 - // t.Log("val1 is", val1) - // t.Log("val2 is", val2) assert.Equal(1, val1) assert.Equal(1, val2) } @@ -182,13 +164,13 @@ func TestBridge(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := NewChannel() - genVals := func() <-chan <-chan any { - chanStream := make(chan (<-chan any)) + c := NewChannel[int]() + genVals := func() <-chan <-chan int { + chanStream := make(chan (<-chan int)) go func() { defer close(chanStream) for i := 0; i < 10; i++ { - stream := make(chan any, 1) + stream := make(chan int, 1) stream <- i close(stream) chanStream <- stream diff --git a/docs/concurrency.md b/docs/concurrency.md index 18b5abf..724c9e1 100644 --- a/docs/concurrency.md +++ b/docs/concurrency.md @@ -38,13 +38,13 @@ import ( ## Channel ### NewChannel -

return a Channel pointer instance.

+

Create a Channel pointer instance.

Signature: ```go -type Channel struct {} -func NewChannel() *Channel +type Channel[T any] struct +func NewChannel[T any]() *Channel[T] ``` Example: @@ -57,7 +57,7 @@ import ( ) func main() { - c := concurrency.NewChannel() + c := concurrency.NewChannel[int]() } ``` @@ -70,7 +70,7 @@ func main() { Signature: ```go -func (c *Channel) Bridge(ctx context.Context, chanStream <-chan <-chan any) <-chan any +func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T ``` Example: @@ -87,25 +87,30 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() - genVals := func() <-chan <-chan any { - chanStream := make(chan (<-chan any)) + c := concurrency.NewChannel[int]() + genVals := func() <-chan <-chan int { + out := make(chan (<-chan int)) go func() { - defer close(chanStream) - for i := 0; i < 10; i++ { - stream := make(chan any, 1) + defer close(out) + for i := 1; i <= 5; i++ { + stream := make(chan int, 1) stream <- i close(stream) - chanStream <- stream + out <- stream } }() - return chanStream + return out } - index := 0 - for val := range c.Bridge(ctx, genVals()) { - fmt.Printf("%v ", val) //0 1 2 3 4 5 6 7 8 9 + for v := range c.Bridge(ctx, genVals()) { + fmt.Println(v) } + // Output: + // 1 + // 2 + // 3 + // 4 + // 5 } ``` @@ -114,12 +119,12 @@ func main() { ### FanIn -

merge multiple channels into one channel until cancel the context.

+

Merge multiple channels into one channel until cancel the context.

Signature: ```go -func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any +func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T ``` Example: @@ -136,17 +141,17 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() - channels := make([]<-chan any, 3) + c := concurrency.NewChannel[int]() + channels := make([]<-chan int, 2) - for i := 0; i < 3; i++ { - channels[i] = c.Take(ctx, c.Repeat(ctx, i), 3) + for i := 0; i < 2; i++ { + channels[i] = c.Take(ctx, c.Repeat(ctx, i), 2) } - mergedChannel := c.FanIn(ctx, channels...) + chs := c.FanIn(ctx, channels...) - for val := range mergedChannel { - fmt.Println("\t%d\n", val) //1,2,1,0,0,1,0,2,2 (order not for sure) + for v := range chs { + fmt.Println(v) //1 1 0 0 or 0 0 1 1 } } ``` @@ -154,12 +159,12 @@ func main() { ### Repeat -

Return a chan, put param `values` into the chan repeatly until cancel the context.

+

Return a channel, put param `values` into the channel repeatly until cancel the context.

Signature: ```go -func (c *Channel) Repeat(ctx context.Context, values ...any) <-chan any +func (c *Channel[T]) Repeat(ctx context.Context, values ...T) <-chan T ``` Example: @@ -176,12 +181,17 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() - intStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 5) + c := concurrency.NewChannel[int]() + intStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 4) for v := range intStream { - fmt.Println(v) //1, 2, 1, 2, 1 + fmt.Println(v) } + // Output: + // 1 + // 2 + // 1 + // 2 } ``` @@ -190,12 +200,12 @@ func main() { ### RepeatFn -

Return a chan, excutes fn repeatly, and put the result into retruned chan until cancel context.

+

Return a channel, excutes fn repeatly, and put the result into retruned channel until cancel context.

Signature: ```go -func (c *Channel) RepeatFn(ctx context.Context, fn func() any) <-chan any +func (c *Channel[T]) RepeatFn(ctx context.Context, fn func() T) <-chan T ``` Example: @@ -212,16 +222,20 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fn := func() any { - s := "a" - return s + fn := func() string { + return "hello" } - c := concurrency.NewChannel() - dataStream := c.Take(ctx, c.RepeatFn(ctx, fn), 3) - for v := range dataStream { - fmt.Println(v) //a, a, a + c := concurrency.NewChannel[string]() + intStream := c.Take(ctx, c.RepeatFn(ctx, fn), 3) + + for v := range intStream { + fmt.Println(v) } + // Output: + // hello + // hello + // hello } ``` @@ -234,7 +248,7 @@ func main() { Signature: ```go -func (c *Channel) Or(channels ...<-chan any) <-chan any +func (c *Channel[T]) Or(channels ...<-chan T) <-chan T ``` Example: @@ -249,7 +263,7 @@ import ( func main() { sig := func(after time.Duration) <-chan any { - c := make(chan interface{}) + c := make(chan any) go func() { defer close(c) time.Sleep(after) @@ -259,13 +273,11 @@ func main() { start := time.Now() - c := concurrency.NewChannel() + c := concurrency.NewChannel[any]() <-c.Or( sig(1*time.Second), sig(2*time.Second), sig(3*time.Second), - sig(4*time.Second), - sig(5*time.Second), ) fmt.Println("done after %v", time.Since(start)) //1.003s @@ -282,7 +294,7 @@ func main() { Signature: ```go -func (c *Channel) OrDone(ctx context.Context, channel <-chan any) <-chan any +func (c *Channel[T]) OrDone(ctx context.Context, channel <-chan T) <-chan T ``` Example: @@ -299,12 +311,16 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() + c := concurrency.NewChannel[int]() intStream := c.Take(ctx, c.Repeat(ctx, 1), 3) - for val := range c.OrDone(ctx, intStream) { - fmt.Println(val) //1 + for v := range c.OrDone(ctx, intStream) { + fmt.Println(v) } + // Output: + // 1 + // 1 + // 1 } ``` @@ -313,12 +329,12 @@ func main() { ### Take -

Return a chan whose values are tahken from another chan until cancel context.

+

Return a channel whose values are tahken from another channel until cancel context.

Signature: ```go -func (c *Channel) Take(ctx context.Context, valueStream <-chan any, number int) <-chan any +func (c *Channel[T]) Take(ctx context.Context, valueStream <-chan T, number int) <-chan T ``` Example: @@ -335,7 +351,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - numbers := make(chan any, 5) + numbers := make(chan int, 5) numbers <- 1 numbers <- 2 numbers <- 3 @@ -343,12 +359,16 @@ func main() { numbers <- 5 defer close(numbers) - c := concurrency.NewChannel() + c := concurrency.NewChannel[int]() intStream := c.Take(ctx, numbers, 3) - for val := range intStream { - fmt.Println(val) //1, 2, 3 + for v := range intStream { + fmt.Println(v) } + // Output: + // 1 + // 2 + // 3 } ``` @@ -356,12 +376,12 @@ func main() { ### Tee -

Split one chanel into two channels until cancel context.

+

Split one channel into two channels until cancel context.

Signature: ```go -func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan any) +func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T) ``` Example: @@ -378,13 +398,19 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() - inStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 4) + c := concurrency.NewChannel[int]() + intStream := c.Take(ctx, c.Repeat(ctx, 1), 2) - out1, out2 := c.Tee(ctx, inStream) - for val := range out1 { - fmt.Println(val) //1 - fmt.Println(<-out2) //1 + ch1, ch2 := c.Tee(ctx, intStream) + + for v := range ch1 { + fmt.Println(v) + fmt.Println(<-ch2) } + // Output: + // 1 + // 1 + // 1 + // 1 } ``` \ No newline at end of file diff --git a/docs/concurrency_zh-CN.md b/docs/concurrency_zh-CN.md index 7331bf9..941d40e 100644 --- a/docs/concurrency_zh-CN.md +++ b/docs/concurrency_zh-CN.md @@ -38,13 +38,13 @@ import ( ### Channel ### NewChannel -

返回一个 Channel 指针实例

+

返回一个Channel指针实例

函数签名: ```go -type Channel struct {} -func NewChannel() *Channel +type Channel[T any] struct +func NewChannel[T any]() *Channel[T] ``` 例子: @@ -57,7 +57,7 @@ import ( ) func main() { - c := concurrency.NewChannel() + c := concurrency.NewChannel[int]() } ``` @@ -65,12 +65,12 @@ func main() { ### Bridge -

将多个通道链接到一个通道,直到取消上下文。

+

将多个channel链接到一个channel,直到取消上下文。

函数签名: ```go -func (c *Channel) Bridge(ctx context.Context, chanStream <-chan <-chan any) <-chan any +func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T ``` 例子: @@ -84,28 +84,33 @@ import ( ) func main() { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() - genVals := func() <-chan <-chan any { - chanStream := make(chan (<-chan any)) + c := concurrency.NewChannel[int]() + genVals := func() <-chan <-chan int { + out := make(chan (<-chan int)) go func() { - defer close(chanStream) - for i := 0; i < 10; i++ { - stream := make(chan any, 1) + defer close(out) + for i := 1; i <= 5; i++ { + stream := make(chan int, 1) stream <- i close(stream) - chanStream <- stream + out <- stream } }() - return chanStream + return out } - index := 0 - for val := range c.Bridge(ctx, genVals()) { - fmt.Printf("%v ", val) //0 1 2 3 4 5 6 7 8 9 + for v := range c.Bridge(ctx, genVals()) { + fmt.Println(v) } + // Output: + // 1 + // 2 + // 3 + // 4 + // 5 } ``` @@ -114,12 +119,12 @@ func main() { ### FanIn -

将多个通道合并为一个通道,直到取消上下文

+

将多个channel合并为一个channel,直到取消上下文。

函数签名: ```go -func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any +func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T ``` 例子: @@ -133,20 +138,20 @@ import ( ) func main() { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() - channels := make([]<-chan any, 3) + c := concurrency.NewChannel[int]() + channels := make([]<-chan int, 2) - for i := 0; i < 3; i++ { - channels[i] = c.Take(ctx, c.Repeat(ctx, i), 3) + for i := 0; i < 2; i++ { + channels[i] = c.Take(ctx, c.Repeat(ctx, i), 2) } - mergedChannel := c.FanIn(ctx, channels...) + chs := c.FanIn(ctx, channels...) - for val := range mergedChannel { - fmt.Println("\t%d\n", val) //1,2,1,0,0,1,0,2,2 (order not for sure) + for v := range chs { + fmt.Println(v) //1 1 0 0 or 0 0 1 1 } } ``` @@ -154,12 +159,12 @@ func main() { ### Repeat -

返回一个chan,将参数`values`重复放入chan,直到取消上下文。

+

返回一个channel,将参数`values`重复放入channel,直到取消上下文。

函数签名: ```go -func (c *Channel) Repeat(ctx context.Context, values ...any) <-chan any +func (c *Channel[T]) Repeat(ctx context.Context, values ...T) <-chan T ``` 例子: @@ -173,15 +178,20 @@ import ( ) func main() { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() - intStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 5) + c := concurrency.NewChannel[int]() + intStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 4) for v := range intStream { - fmt.Println(v) //1, 2, 1, 2, 1 + fmt.Println(v) } + // Output: + // 1 + // 2 + // 1 + // 2 } ``` @@ -190,12 +200,12 @@ func main() { ### RepeatFn -

返回一个chan,重复执行函数fn,并将结果放入返回的chan,直到取消上下文。

+

返回一个channel,重复执行函数fn,并将结果放入返回的chan,直到取消上下文。

函数签名: ```go -func (c *Channel) RepeatFn(ctx context.Context, fn func() any) <-chan any +func (c *Channel[T]) RepeatFn(ctx context.Context, fn func() T) <-chan T ``` 例子: @@ -209,19 +219,23 @@ import ( ) func main() { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fn := func() any { - s := "a" - return s + fn := func() string { + return "hello" } - c := concurrency.NewChannel() - dataStream := c.Take(ctx, c.RepeatFn(ctx, fn), 3) - for v := range dataStream { - fmt.Println(v) //a, a, a + c := concurrency.NewChannel[string]() + intStream := c.Take(ctx, c.RepeatFn(ctx, fn), 3) + + for v := range intStream { + fmt.Println(v) } + // Output: + // hello + // hello + // hello } ``` @@ -229,12 +243,12 @@ func main() { ### Or -

将一个或多个通道读取到一个通道中,当任何读取通道关闭时将结束读取。

+

将一个或多个channel读取到一个channel中,当任何读取channel关闭时将结束读取。

函数签名: ```go -func (c *Channel) Or(channels ...<-chan any) <-chan any +func (c *Channel[T]) Or(channels ...<-chan T) <-chan T ``` 例子: @@ -249,7 +263,7 @@ import ( func main() { sig := func(after time.Duration) <-chan any { - c := make(chan interface{}) + c := make(chan any) go func() { defer close(c) time.Sleep(after) @@ -259,13 +273,11 @@ func main() { start := time.Now() - c := concurrency.NewChannel() + c := concurrency.NewChannel[any]() <-c.Or( sig(1*time.Second), sig(2*time.Second), sig(3*time.Second), - sig(4*time.Second), - sig(5*time.Second), ) fmt.Println("done after %v", time.Since(start)) //1.003s @@ -277,12 +289,12 @@ func main() { ### OrDone -

将一个通道读入另一个通道,直到取消上下文。

+

将一个channel读入另一个channel,直到取消上下文。.

函数签名: ```go -func (c *Channel) OrDone(ctx context.Context, channel <-chan any) <-chan any +func (c *Channel[T]) OrDone(ctx context.Context, channel <-chan T) <-chan T ``` 例子: @@ -299,12 +311,16 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() + c := concurrency.NewChannel[int]() intStream := c.Take(ctx, c.Repeat(ctx, 1), 3) - for val := range c.OrDone(ctx, intStream) { - fmt.Println(val) //1 + for v := range c.OrDone(ctx, intStream) { + fmt.Println(v) } + // Output: + // 1 + // 1 + // 1 } ``` @@ -313,12 +329,12 @@ func main() { ### Take -

返回一个chan,其值从另一个chan获取,直到取消上下文。

+

返回一个channel,其值从另一个channel获取,直到取消上下文。

函数签名: ```go -func (c *Channel) Take(ctx context.Context, valueStream <-chan any, number int) <-chan any +func (c *Channel[T]) Take(ctx context.Context, valueStream <-chan T, number int) <-chan T ``` 例子: @@ -335,7 +351,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - numbers := make(chan any, 5) + numbers := make(chan int, 5) numbers <- 1 numbers <- 2 numbers <- 3 @@ -343,12 +359,16 @@ func main() { numbers <- 5 defer close(numbers) - c := concurrency.NewChannel() + c := concurrency.NewChannel[int]() intStream := c.Take(ctx, numbers, 3) - for val := range intStream { - fmt.Println(val) //1, 2, 3 + for v := range intStream { + fmt.Println(v) } + // Output: + // 1 + // 2 + // 3 } ``` @@ -356,12 +376,12 @@ func main() { ### Tee -

将一个通道分成两个通道,直到取消上下文。

+

将一个channel分成两个channel,直到取消上下文。

函数签名: ```go -func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan any) +func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T) ``` 例子: @@ -378,13 +398,19 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := concurrency.NewChannel() - inStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 4) + c := concurrency.NewChannel[int]() + intStream := c.Take(ctx, c.Repeat(ctx, 1), 2) - out1, out2 := c.Tee(ctx, inStream) - for val := range out1 { - fmt.Println(val) //1 - fmt.Println(<-out2) //1 + ch1, ch2 := c.Tee(ctx, intStream) + + for v := range ch1 { + fmt.Println(v) + fmt.Println(<-ch2) } + // Output: + // 1 + // 1 + // 1 + // 1 } ``` \ No newline at end of file