diff --git a/concurrency/channel.go b/concurrency/channel.go index d9d592f..b0e71ac 100644 --- a/concurrency/channel.go +++ b/concurrency/channel.go @@ -53,6 +53,24 @@ func (c *Channel) Repeat(done <-chan any, values ...any) <-chan any { return dataStream } +// RepeatFn return a chan, excutes fn repeatly, and put the result into retruned chan +// until close the `done` channel +func (c *Channel) RepeatFn(done <-chan any, fn func() any) <-chan any { + dataStream := make(chan any) + + go func() { + defer close(dataStream) + for { + select { + case <-done: + return + case dataStream <- fn(): + } + } + }() + return dataStream +} + // Take return a chan whose values are tahken from another chan func (c *Channel) Take(done <-chan any, valueStream <-chan any, number int) <-chan any { takeStream := make(chan any) diff --git a/concurrency/channel_test.go b/concurrency/channel_test.go index 6ee33bd..289e07d 100644 --- a/concurrency/channel_test.go +++ b/concurrency/channel_test.go @@ -42,6 +42,28 @@ func TestRepeat(t *testing.T) { assert.Equal(1, <-intStream) } +func TestRepeatFn(t *testing.T) { + assert := internal.NewAssert(t, "TestRepeatFn") + + done := make(chan any) + defer close(done) + + fn := func() any { + s := "a" + return s + } + c := NewChannel() + dataStream := c.Take(done, c.RepeatFn(done, fn), 3) + + // for v := range dataStream { + // t.Log(v) //a, a, a + // } + + assert.Equal("a", <-dataStream) + assert.Equal("a", <-dataStream) + assert.Equal("a", <-dataStream) +} + func TestTake(t *testing.T) { assert := internal.NewAssert(t, "TestRepeat")