1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-03-01 00:35:28 +08:00

make Bridge not block in the first stream that not closed (#288)

* not block in the first channel

* make Bridge not block in the first stream that not closed

* Bridge with test
This commit is contained in:
Tuuuuuuuuu
2025-01-14 16:19:45 +08:00
committed by GitHub
parent a0d97cf38e
commit e27df00fa8
3 changed files with 31 additions and 19 deletions

View File

@@ -157,10 +157,10 @@ func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T)
// Play: https://go.dev/play/p/qmWSy1NVF-Y
func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T {
valStream := make(chan T)
go func() {
defer close(valStream)
wg := sync.WaitGroup{}
defer wg.Wait()
for {
var stream <-chan T
select {
@@ -169,19 +169,22 @@ func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-c
return
}
stream = maybeStream
wg.Add(1)
case <-ctx.Done():
return
}
for val := range c.OrDone(ctx, stream) {
select {
case valStream <- val:
case <-ctx.Done():
go func() {
defer wg.Done()
for val := range c.OrDone(ctx, stream) {
select {
case valStream <- val:
case <-ctx.Done():
}
}
}
}()
}
}()
return valStream
}