1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-02-04 12:52:28 +08:00

feat: add func Bridge

This commit is contained in:
dudaodong
2022-04-21 11:26:02 +08:00
parent 922999037f
commit d1c6c57700
2 changed files with 60 additions and 0 deletions

View File

@@ -151,6 +151,37 @@ func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan an
return out1, out2 return out1, out2
} }
// Bridge link mutiply channels into one channel
func (c *Channel) Bridge(ctx context.Context, chanStream <-chan <-chan any) <-chan any {
valStream := make(chan any)
go func() {
defer close(valStream)
for {
var stream <-chan any
select {
case maybeStream, ok := <-chanStream:
if ok == false {
return
}
stream = maybeStream
case <-ctx.Done():
return
}
for val := range c.OrDone(ctx, stream) {
select {
case valStream <- val:
case <-ctx.Done():
}
}
}
}()
return valStream
}
// Or merge one or more done channels into one done channel, which is closed when any done channel is closed // Or merge one or more done channels into one done channel, which is closed when any done channel is closed
func (c *Channel) Or(channels ...<-chan any) <-chan any { func (c *Channel) Or(channels ...<-chan any) <-chan any {
switch len(channels) { switch len(channels) {

View File

@@ -165,3 +165,32 @@ func TestTee(t *testing.T) {
assert.Equal(1, <-out2) assert.Equal(1, <-out2)
} }
} }
func TestBridge(t *testing.T) {
assert := internal.NewAssert(t, "TestBridge")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := NewChannel()
genVals := func() <-chan <-chan any {
chanStream := make(chan (<-chan any))
go func() {
defer close(chanStream)
for i := 0; i < 10; i++ {
stream := make(chan any, 1)
stream <- i
close(stream)
chanStream <- stream
}
}()
return chanStream
}
index := 0
for val := range c.Bridge(ctx, genVals()) {
// t.Logf("%v ", val) //0 1 2 3 4 5 6 7 8 9
assert.Equal(index, val)
index++
}
}