1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-03-01 00:35:28 +08:00

feat: add func Tee

This commit is contained in:
dudaodong
2022-04-21 11:18:45 +08:00
parent 046e90486d
commit 922999037f
2 changed files with 42 additions and 0 deletions

View File

@@ -125,6 +125,32 @@ func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any
return multiplexedStream
}
// Tee split one chanel into two channels
func (c *Channel) Tee(ctx context.Context, in <-chan any) (<-chan any, <-chan any) {
out1 := make(chan any)
out2 := make(chan any)
go func() {
defer close(out1)
defer close(out2)
for val := range c.OrDone(ctx, in) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
// Or merge one or more done channels into one done channel, which is closed when any done channel is closed
func (c *Channel) Or(channels ...<-chan any) <-chan any {
switch len(channels) {