mirror of
https://github.com/duke-git/lancet.git
synced 2026-03-01 00:35:28 +08:00
Compare commits
6 Commits
5e66bc6227
...
930bb9c839
| Author | SHA1 | Date | |
|---|---|---|---|
| 930bb9c839 | |||
| 3d8f1be212 | |||
| 13a4ed59fa | |||
| c799d10ce9 | |||
| 5ab322ade2 | |||
| d0ffc61842 |
@@ -0,0 +1,278 @@
|
||||
// Copyright 2023 dudaodong@gmail.com. All rights reserved.
|
||||
// Use of this source code is governed by MIT license
|
||||
|
||||
// Package async contain some featurese to support async programming. eg, promise, asycn/await, eventbus.
|
||||
package async
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/duke-git/lancet/v2/internal"
|
||||
)
|
||||
|
||||
// Promise represents the eventual completion (or failure) of an asynchronous operation and its resulting value.
|
||||
// ref : chebyrash/promise (https://github.com/chebyrash/promise)
|
||||
// see js promise: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
|
||||
type Promise[T any] struct {
|
||||
runnable func(resolve func(T), reject func(error))
|
||||
result T
|
||||
err error
|
||||
|
||||
pending bool
|
||||
|
||||
mu *sync.Mutex
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
// New create a new promise instance.
|
||||
func New[T any](runnable func(resolve func(T), reject func(error))) *Promise[T] {
|
||||
if runnable == nil {
|
||||
panic("runnable function should not be nil")
|
||||
}
|
||||
|
||||
p := &Promise[T]{
|
||||
pending: true,
|
||||
mu: &sync.Mutex{},
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
|
||||
defer p.run()
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *Promise[T]) run() {
|
||||
p.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if !p.pending {
|
||||
return
|
||||
}
|
||||
|
||||
if err := recover(); err != nil {
|
||||
p.reject(errors.New(fmt.Sprint(err)))
|
||||
}
|
||||
}()
|
||||
|
||||
p.runnable(p.resolve, p.reject)
|
||||
}()
|
||||
}
|
||||
|
||||
// Resolve returns a Promise that has been resolved with a given value.
|
||||
func Resolve[T any](resolution T) *Promise[T] {
|
||||
return &Promise[T]{
|
||||
result: resolution,
|
||||
pending: false,
|
||||
mu: &sync.Mutex{},
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Promise[T]) resolve(value T) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if !p.pending {
|
||||
return
|
||||
}
|
||||
|
||||
p.result = value
|
||||
p.pending = false
|
||||
|
||||
p.wg.Done()
|
||||
}
|
||||
|
||||
// Reject returns a Promise that has been rejected with a given error.
|
||||
func Reject[T any](err error) *Promise[T] {
|
||||
return &Promise[T]{
|
||||
err: err,
|
||||
pending: false,
|
||||
mu: &sync.Mutex{},
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Promise[T]) reject(err error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if !p.pending {
|
||||
return
|
||||
}
|
||||
|
||||
p.err = err
|
||||
p.pending = false
|
||||
|
||||
p.wg.Done()
|
||||
}
|
||||
|
||||
// Then allows chain calls to other promise methods.
|
||||
func Then[T1, T2 any](promise *Promise[T1], resolve1 func(value T1) T2) *Promise[T2] {
|
||||
return New(func(resolve2 func(T2), reject func(error)) {
|
||||
result, err := promise.Await()
|
||||
if err != nil {
|
||||
reject(err)
|
||||
return
|
||||
}
|
||||
resolve2(resolve1(result))
|
||||
})
|
||||
}
|
||||
|
||||
// Then allows chain calls to other promise methods.
|
||||
func (p *Promise[T]) Then(resolve func(value T) T) *Promise[T] {
|
||||
return New(func(_resolve func(T), reject func(error)) {
|
||||
result, err := p.Await()
|
||||
if err != nil {
|
||||
reject(err)
|
||||
return
|
||||
}
|
||||
_resolve(resolve(result))
|
||||
})
|
||||
}
|
||||
|
||||
// Catch allows to chain promises.
|
||||
func Catch[T any](promise *Promise[T], rejection func(err error) error) *Promise[T] {
|
||||
return New(func(resolve func(T), reject func(error)) {
|
||||
result, err := promise.Await()
|
||||
if err != nil {
|
||||
reject(rejection(err))
|
||||
return
|
||||
}
|
||||
resolve(result)
|
||||
})
|
||||
}
|
||||
|
||||
// Catch chain an existing promise with an intermediate reject function.
|
||||
func (p *Promise[T]) Catch(reject func(error) error) *Promise[T] {
|
||||
return New(func(resolve func(T), rej func(error)) {
|
||||
resutl, err := p.Await()
|
||||
if err != nil {
|
||||
rej(reject(err))
|
||||
return
|
||||
}
|
||||
resolve(resutl)
|
||||
})
|
||||
}
|
||||
|
||||
// Await blocks until the 'runable' to finish execution.
|
||||
func (p *Promise[T]) Await() (T, error) {
|
||||
p.wg.Wait()
|
||||
return p.result, p.err
|
||||
}
|
||||
|
||||
type tuple[T1, T2 any] struct {
|
||||
_1 T1
|
||||
_2 T2
|
||||
}
|
||||
|
||||
// All resolves when all of the promises have resolved, reject immediately upon any of the input promises rejecting.
|
||||
func All[T any](promises []*Promise[T]) *Promise[[]T] {
|
||||
if len(promises) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return New(func(resolve func([]T), reject func(error)) {
|
||||
valsChan := make(chan tuple[T, int], len(promises))
|
||||
errsChan := make(chan error, 1)
|
||||
|
||||
for idx, p := range promises {
|
||||
idx := idx
|
||||
_ = Then(p, func(data T) T {
|
||||
valsChan <- tuple[T, int]{_1: data, _2: idx}
|
||||
return data
|
||||
})
|
||||
_ = Catch(p, func(err error) error {
|
||||
errsChan <- err
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
resolutions := make([]T, len(promises))
|
||||
for idx := 0; idx < len(promises); idx++ {
|
||||
select {
|
||||
case val := <-valsChan:
|
||||
resolutions[val._2] = val._1
|
||||
case err := <-errsChan:
|
||||
reject(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
resolve(resolutions)
|
||||
})
|
||||
}
|
||||
|
||||
// Race will settle the first fullfiled promise among muti promises.
|
||||
func Race[T any](promises []*Promise[T]) *Promise[T] {
|
||||
if len(promises) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return New(func(resolve func(T), reject func(error)) {
|
||||
valsChan := make(chan T, 1)
|
||||
errsChan := make(chan error, 1)
|
||||
|
||||
for _, p := range promises {
|
||||
_ = Then(p, func(data T) T {
|
||||
valsChan <- data
|
||||
return data
|
||||
})
|
||||
_ = Catch(p, func(err error) error {
|
||||
errsChan <- err
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
select {
|
||||
case val := <-valsChan:
|
||||
resolve(val)
|
||||
case err := <-errsChan:
|
||||
reject(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Any resolves as soon as any of the input's Promises resolve, with the value of the resolved Promise.
|
||||
// Any rejects if all of the given Promises are rejected with a combination of all errors.
|
||||
func Any[T any](promises ...*Promise[T]) *Promise[T] {
|
||||
if len(promises) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return New(func(resolve func(T), reject func(error)) {
|
||||
valsChan := make(chan T, 1)
|
||||
errsChan := make(chan tuple[error, int], len(promises))
|
||||
|
||||
for idx, p := range promises {
|
||||
idx := idx
|
||||
_ = Then(p, func(data T) T {
|
||||
valsChan <- data
|
||||
return data
|
||||
})
|
||||
_ = Catch(p, func(err error) error {
|
||||
errsChan <- tuple[error, int]{_1: err, _2: idx}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
errs := make([]error, len(promises))
|
||||
for idx := 0; idx < len(promises); idx++ {
|
||||
select {
|
||||
case val := <-valsChan:
|
||||
resolve(val)
|
||||
return
|
||||
case err := <-errsChan:
|
||||
errs[err._2] = err._1
|
||||
}
|
||||
}
|
||||
|
||||
errCombo := errs[0]
|
||||
for _, err := range errs[1:] {
|
||||
errCombo = internal.JoinError(err)
|
||||
}
|
||||
|
||||
reject(errCombo)
|
||||
})
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
// Copyright 2021 dudaodong@gmail.com. All rights reserved.
|
||||
// Use of this source code is governed by MIT license
|
||||
|
||||
// Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel, async.
|
||||
// Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel.
|
||||
package concurrency
|
||||
|
||||
import (
|
||||
|
||||
+1
-1
@@ -1,5 +1,5 @@
|
||||
# Concurrency
|
||||
Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel, async.
|
||||
Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel.
|
||||
|
||||
<div STYLE="page-break-after: always;"></div>
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Concurrency
|
||||
并发包包含一些支持并发编程的功能。例如:goroutine, channel, async等。
|
||||
并发包包含一些支持并发编程的功能。例如:goroutine, channel等。
|
||||
|
||||
<div STYLE="page-break-after: always;"></div>
|
||||
|
||||
|
||||
+1
-2
@@ -1002,10 +1002,9 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
### <span id="ForEachWithBreak">ForEachWithBreak</span>
|
||||
|
||||
<p>Iterates over elements of slice and invokes function for each element, when iteratee return true, will break the for each loop.</p>
|
||||
<p>Iterates over elements of slice and invokes function for each element, when iteratee return false, will break the for each loop.</p>
|
||||
|
||||
<b>Signature:</b>
|
||||
|
||||
|
||||
@@ -542,7 +542,6 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
### <span id="Drop">Drop</span>
|
||||
|
||||
<p>从切片的头部删除n个元素。</p>
|
||||
@@ -1004,7 +1003,6 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
### <span id="ForEachWithBreak">ForEachWithBreak</span>
|
||||
|
||||
<p>遍历切片的元素并为每个元素调用iteratee函数,当iteratee函数返回false时,终止遍历。</p>
|
||||
@@ -1043,7 +1041,6 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
### <span id="GroupBy">GroupBy</span>
|
||||
|
||||
<p>迭代切片的元素,每个元素将按条件分组,返回两个切片</p>
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
package internal
|
||||
|
||||
// Note: this file is copyed from the go standart repo (https://github.com/golang/go/blob/master/src/errors/join.go).
|
||||
// just in order to adapt under go1.9
|
||||
// do not use it outside lancet lib.
|
||||
|
||||
// Join returns an error that wraps the given errors.
|
||||
// Any nil error values are discarded.
|
||||
// Join returns nil if errs contains no non-nil values.
|
||||
// The error formats as the concatenation of the strings obtained
|
||||
// by calling the Error method of each element of errs, with a newline
|
||||
// between each string.
|
||||
func JoinError(errs ...error) error {
|
||||
n := 0
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
n++
|
||||
}
|
||||
}
|
||||
if n == 0 {
|
||||
return nil
|
||||
}
|
||||
e := &joinError{
|
||||
errs: make([]error, 0, n),
|
||||
}
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
e.errs = append(e.errs, err)
|
||||
}
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
type joinError struct {
|
||||
errs []error
|
||||
}
|
||||
|
||||
func (e *joinError) Error() string {
|
||||
var b []byte
|
||||
for i, err := range e.errs {
|
||||
if i > 0 {
|
||||
b = append(b, '\n')
|
||||
}
|
||||
b = append(b, err.Error()...)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func (e *joinError) Unwrap() []error {
|
||||
return e.errs
|
||||
}
|
||||
+1
-1
@@ -422,7 +422,7 @@ func ForEach[T any](slice []T, iteratee func(index int, item T)) {
|
||||
}
|
||||
|
||||
// ForEachWithBreak iterates over elements of slice and invokes function for each element,
|
||||
// when iteratee return true, will break the for each loop.
|
||||
// when iteratee return false, will break the for each loop.
|
||||
func ForEachWithBreak[T any](slice []T, iteratee func(index int, item T) bool) {
|
||||
loop:
|
||||
for i, v := range slice {
|
||||
|
||||
Reference in New Issue
Block a user