1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-03-01 00:35:28 +08:00

Compare commits

...

6 Commits

Author SHA1 Message Date
dudaodong 930bb9c839 feat: complete promise Any function 2023-03-16 15:59:06 +08:00
dudaodong 3d8f1be212 feat: add error_join.go to support join error under go1.19, for internal use. 2023-03-16 15:56:54 +08:00
dudaodong 13a4ed59fa doc: update comment for concurrency package 2023-03-16 15:35:26 +08:00
dudaodong c799d10ce9 feat: add promise All, Race, Any methods 2023-03-16 15:34:28 +08:00
dudaodong 5ab322ade2 feat: add async package, promise implemention 2023-03-16 14:49:07 +08:00
dudaodong d0ffc61842 doc: fix doc error 2023-03-16 10:36:06 +08:00
8 changed files with 622 additions and 297 deletions
+278
View File
@@ -0,0 +1,278 @@
// Copyright 2023 dudaodong@gmail.com. All rights reserved.
// Use of this source code is governed by MIT license
// Package async contain some featurese to support async programming. eg, promise, asycn/await, eventbus.
package async
import (
"errors"
"fmt"
"sync"
"github.com/duke-git/lancet/v2/internal"
)
// Promise represents the eventual completion (or failure) of an asynchronous operation and its resulting value.
// ref : chebyrash/promise (https://github.com/chebyrash/promise)
// see js promise: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
type Promise[T any] struct {
runnable func(resolve func(T), reject func(error))
result T
err error
pending bool
mu *sync.Mutex
wg *sync.WaitGroup
}
// New create a new promise instance.
func New[T any](runnable func(resolve func(T), reject func(error))) *Promise[T] {
if runnable == nil {
panic("runnable function should not be nil")
}
p := &Promise[T]{
pending: true,
mu: &sync.Mutex{},
wg: &sync.WaitGroup{},
}
defer p.run()
return p
}
func (p *Promise[T]) run() {
p.wg.Add(1)
go func() {
defer func() {
if !p.pending {
return
}
if err := recover(); err != nil {
p.reject(errors.New(fmt.Sprint(err)))
}
}()
p.runnable(p.resolve, p.reject)
}()
}
// Resolve returns a Promise that has been resolved with a given value.
func Resolve[T any](resolution T) *Promise[T] {
return &Promise[T]{
result: resolution,
pending: false,
mu: &sync.Mutex{},
wg: &sync.WaitGroup{},
}
}
func (p *Promise[T]) resolve(value T) {
p.mu.Lock()
defer p.mu.Unlock()
if !p.pending {
return
}
p.result = value
p.pending = false
p.wg.Done()
}
// Reject returns a Promise that has been rejected with a given error.
func Reject[T any](err error) *Promise[T] {
return &Promise[T]{
err: err,
pending: false,
mu: &sync.Mutex{},
wg: &sync.WaitGroup{},
}
}
func (p *Promise[T]) reject(err error) {
p.mu.Lock()
defer p.mu.Unlock()
if !p.pending {
return
}
p.err = err
p.pending = false
p.wg.Done()
}
// Then allows chain calls to other promise methods.
func Then[T1, T2 any](promise *Promise[T1], resolve1 func(value T1) T2) *Promise[T2] {
return New(func(resolve2 func(T2), reject func(error)) {
result, err := promise.Await()
if err != nil {
reject(err)
return
}
resolve2(resolve1(result))
})
}
// Then allows chain calls to other promise methods.
func (p *Promise[T]) Then(resolve func(value T) T) *Promise[T] {
return New(func(_resolve func(T), reject func(error)) {
result, err := p.Await()
if err != nil {
reject(err)
return
}
_resolve(resolve(result))
})
}
// Catch allows to chain promises.
func Catch[T any](promise *Promise[T], rejection func(err error) error) *Promise[T] {
return New(func(resolve func(T), reject func(error)) {
result, err := promise.Await()
if err != nil {
reject(rejection(err))
return
}
resolve(result)
})
}
// Catch chain an existing promise with an intermediate reject function.
func (p *Promise[T]) Catch(reject func(error) error) *Promise[T] {
return New(func(resolve func(T), rej func(error)) {
resutl, err := p.Await()
if err != nil {
rej(reject(err))
return
}
resolve(resutl)
})
}
// Await blocks until the 'runable' to finish execution.
func (p *Promise[T]) Await() (T, error) {
p.wg.Wait()
return p.result, p.err
}
type tuple[T1, T2 any] struct {
_1 T1
_2 T2
}
// All resolves when all of the promises have resolved, reject immediately upon any of the input promises rejecting.
func All[T any](promises []*Promise[T]) *Promise[[]T] {
if len(promises) == 0 {
return nil
}
return New(func(resolve func([]T), reject func(error)) {
valsChan := make(chan tuple[T, int], len(promises))
errsChan := make(chan error, 1)
for idx, p := range promises {
idx := idx
_ = Then(p, func(data T) T {
valsChan <- tuple[T, int]{_1: data, _2: idx}
return data
})
_ = Catch(p, func(err error) error {
errsChan <- err
return err
})
}
resolutions := make([]T, len(promises))
for idx := 0; idx < len(promises); idx++ {
select {
case val := <-valsChan:
resolutions[val._2] = val._1
case err := <-errsChan:
reject(err)
return
}
}
resolve(resolutions)
})
}
// Race will settle the first fullfiled promise among muti promises.
func Race[T any](promises []*Promise[T]) *Promise[T] {
if len(promises) == 0 {
return nil
}
return New(func(resolve func(T), reject func(error)) {
valsChan := make(chan T, 1)
errsChan := make(chan error, 1)
for _, p := range promises {
_ = Then(p, func(data T) T {
valsChan <- data
return data
})
_ = Catch(p, func(err error) error {
errsChan <- err
return err
})
}
select {
case val := <-valsChan:
resolve(val)
case err := <-errsChan:
reject(err)
}
})
}
// Any resolves as soon as any of the input's Promises resolve, with the value of the resolved Promise.
// Any rejects if all of the given Promises are rejected with a combination of all errors.
func Any[T any](promises ...*Promise[T]) *Promise[T] {
if len(promises) == 0 {
return nil
}
return New(func(resolve func(T), reject func(error)) {
valsChan := make(chan T, 1)
errsChan := make(chan tuple[error, int], len(promises))
for idx, p := range promises {
idx := idx
_ = Then(p, func(data T) T {
valsChan <- data
return data
})
_ = Catch(p, func(err error) error {
errsChan <- tuple[error, int]{_1: err, _2: idx}
return err
})
}
errs := make([]error, len(promises))
for idx := 0; idx < len(promises); idx++ {
select {
case val := <-valsChan:
resolve(val)
return
case err := <-errsChan:
errs[err._2] = err._1
}
}
errCombo := errs[0]
for _, err := range errs[1:] {
errCombo = internal.JoinError(err)
}
reject(errCombo)
})
}
+1 -1
View File
@@ -1,7 +1,7 @@
// Copyright 2021 dudaodong@gmail.com. All rights reserved.
// Use of this source code is governed by MIT license
// Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel, async.
// Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel.
package concurrency
import (
+1 -1
View File
@@ -1,5 +1,5 @@
# Concurrency
Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel, async.
Package concurrency contain some functions to support concurrent programming. eg, goroutine, channel.
<div STYLE="page-break-after: always;"></div>
+1 -1
View File
@@ -1,5 +1,5 @@
# Concurrency
并发包包含一些支持并发编程的功能。例如:goroutine, channel, async等。
并发包包含一些支持并发编程的功能。例如:goroutine, channel等。
<div STYLE="page-break-after: always;"></div>
+1 -2
View File
@@ -1002,10 +1002,9 @@ func main() {
}
```
### <span id="ForEachWithBreak">ForEachWithBreak</span>
<p>Iterates over elements of slice and invokes function for each element, when iteratee return true, will break the for each loop.</p>
<p>Iterates over elements of slice and invokes function for each element, when iteratee return false, will break the for each loop.</p>
<b>Signature:</b>
-3
View File
@@ -542,7 +542,6 @@ func main() {
}
```
### <span id="Drop">Drop</span>
<p>从切片的头部删除n个元素。</p>
@@ -1004,7 +1003,6 @@ func main() {
}
```
### <span id="ForEachWithBreak">ForEachWithBreak</span>
<p>遍历切片的元素并为每个元素调用iteratee函数,当iteratee函数返回false时,终止遍历。</p>
@@ -1043,7 +1041,6 @@ func main() {
}
```
### <span id="GroupBy">GroupBy</span>
<p>迭代切片的元素,每个元素将按条件分组,返回两个切片</p>
+51
View File
@@ -0,0 +1,51 @@
package internal
// Note: this file is copyed from the go standart repo (https://github.com/golang/go/blob/master/src/errors/join.go).
// just in order to adapt under go1.9
// do not use it outside lancet lib.
// Join returns an error that wraps the given errors.
// Any nil error values are discarded.
// Join returns nil if errs contains no non-nil values.
// The error formats as the concatenation of the strings obtained
// by calling the Error method of each element of errs, with a newline
// between each string.
func JoinError(errs ...error) error {
n := 0
for _, err := range errs {
if err != nil {
n++
}
}
if n == 0 {
return nil
}
e := &joinError{
errs: make([]error, 0, n),
}
for _, err := range errs {
if err != nil {
e.errs = append(e.errs, err)
}
}
return e
}
type joinError struct {
errs []error
}
func (e *joinError) Error() string {
var b []byte
for i, err := range e.errs {
if i > 0 {
b = append(b, '\n')
}
b = append(b, err.Error()...)
}
return string(b)
}
func (e *joinError) Unwrap() []error {
return e.errs
}
+1 -1
View File
@@ -422,7 +422,7 @@ func ForEach[T any](slice []T, iteratee func(index int, item T)) {
}
// ForEachWithBreak iterates over elements of slice and invokes function for each element,
// when iteratee return true, will break the for each loop.
// when iteratee return false, will break the for each loop.
func ForEachWithBreak[T any](slice []T, iteratee func(index int, item T) bool) {
loop:
for i, v := range slice {