1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-03-01 00:35:28 +08:00

refactor: refact FanIn func in channel.go

This commit is contained in:
dudaodong
2022-04-21 14:31:43 +08:00
parent 9444582e44
commit 9f1c89bf0e
+11 -14
View File
@@ -97,32 +97,29 @@ func (c *Channel) Take(ctx context.Context, valueStream <-chan any, number int)
// FanIn merge multiple channels into one channel // FanIn merge multiple channels into one channel
func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any { func (c *Channel) FanIn(ctx context.Context, channels ...<-chan any) <-chan any {
out := make(chan any)
go func() {
var wg sync.WaitGroup var wg sync.WaitGroup
multiplexedStream := make(chan any) wg.Add(len(channels))
multiplex := func(c <-chan any) { for _, c := range channels {
go func(c <-chan any) {
defer wg.Done() defer wg.Done()
for v := range c {
for i := range c {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case multiplexedStream <- i: case out <- v:
} }
} }
}(c)
} }
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
go func() {
wg.Wait() wg.Wait()
close(multiplexedStream) close(out)
}() }()
return multiplexedStream return out
} }
// Tee split one chanel into two channels // Tee split one chanel into two channels