// Copyright 2023 dudaodong@gmail.com. All rights reserved.
// Use of this source code is governed by MIT license

// Package stream implements a sequence of elements supporting sequential aggregate operations.
// This package is an experiment to explore if stream in go can work as the way java does. Its function is very limited.
package stream

import (
	"bytes"
	"encoding/gob"

	"github.com/duke-git/lancet/v2/slice"
	"golang.org/x/exp/constraints"
)

// A stream should implement methods:
// type StreamI[T any] interface {
//     // part methods of Java Stream Specification.
//     Distinct() StreamI[T]
//     Filter(predicate func(item T) bool) StreamI[T]
//     FlatMap(mapper func(item T) StreamI[T]) StreamI[T]
//     Map(mapper func(item T) T) StreamI[T]
//     Peek(consumer func(item T)) StreamI[T]
//     Sorted(less func(a, b T) bool) StreamI[T]
//     Max(less func(a, b T) bool) (T, bool)
//     Min(less func(a, b T) bool) (T, bool)
//     Limit(maxSize int) StreamI[T]
//     Skip(n int) StreamI[T]
//     AllMatch(predicate func(item T) bool) bool
//     AnyMatch(predicate func(item T) bool) bool
//     NoneMatch(predicate func(item T) bool) bool
//     ForEach(consumer func(item T))
//     Reduce(init T, accumulator func(a, b T) T) T
//     Count() int
//     FindFirst() (T, bool)
//     ToSlice() []T
//     // part of methods custom extension
//     Reverse() StreamI[T]
//     Range(start, end int) StreamI[T]
//     Concat(streams ...StreamI[T]) StreamI[T]
// }

// Stream is a sequence of elements of type T held in an in-memory slice.
type Stream[T any] struct {
	// source is the backing slice holding the stream's elements in order.
	source []T
}

// Of creates a stream whose elements are the specified values.
// Play: https://go.dev/play/p/jI6_iZZuVFE func Of[T any](elems ...T) Stream[T] { return FromSlice(elems) } // Generate stream where each element is generated by the provided generater function // Play: https://go.dev/play/p/rkOWL1yA3j9 func Generate[T any](generator func() func() (item T, ok bool)) Stream[T] { source := make([]T, 0) var zeroValue T for next, item, ok := generator(), zeroValue, true; ok; { item, ok = next() if ok { source = append(source, item) } } return FromSlice(source) } // FromSlice creates stream from slice. // Play: https://go.dev/play/p/wywTO0XZtI4 func FromSlice[T any](source []T) Stream[T] { return Stream[T]{source: source} } // FromChannel creates stream from channel. // Play: https://go.dev/play/p/9TZYugGMhXZ func FromChannel[T any](source <-chan T) Stream[T] { s := make([]T, 0) for v := range source { s = append(s, v) } return FromSlice(s) } // FromRange creates a number stream from start to end. both start and end are included. [start, end] // Play: https://go.dev/play/p/9Ex1-zcg-B- func FromRange[T constraints.Integer | constraints.Float](start, end, step T) Stream[T] { if end < start { panic("stream.FromRange: param start should be before param end") } else if step <= 0 { panic("stream.FromRange: param step should be positive") } l := int((end-start)/step) + 1 source := make([]T, l) for i := 0; i < l; i++ { source[i] = start + (T(i) * step) } return FromSlice(source) } // Concat creates a lazily concatenated stream whose elements are all the elements of the first stream followed by all the elements of the second stream. // Play: https://go.dev/play/p/HM4OlYk_OUC func Concat[T any](a, b Stream[T]) Stream[T] { source := make([]T, 0) source = append(source, a.source...) source = append(source, b.source...) return FromSlice(source) } // Distinct returns a stream that removes the duplicated items. 
// Play: https://go.dev/play/p/eGkOSrm64cB func (s Stream[T]) Distinct() Stream[T] { source := make([]T, 0) distinct := map[string]bool{} for _, v := range s.source { k := hashKey(v) if _, ok := distinct[k]; !ok { distinct[k] = true source = append(source, v) } } return FromSlice(source) } func hashKey(data any) string { buffer := bytes.NewBuffer(nil) encoder := gob.NewEncoder(buffer) err := encoder.Encode(data) if err != nil { panic("stream.hashKey: get hashkey failed") } return buffer.String() } // Filter returns a stream consisting of the elements of this stream that match the given predicate. // Play: https://go.dev/play/p/MFlSANo-buc func (s Stream[T]) Filter(predicate func(item T) bool) Stream[T] { source := make([]T, 0) for _, v := range s.source { if predicate(v) { source = append(source, v) } } return FromSlice(source) } // Map returns a stream consisting of the elements of this stream that apply the given function to elements of stream. // Play: https://go.dev/play/p/OtNQUImdYko func (s Stream[T]) Map(mapper func(item T) T) Stream[T] { source := make([]T, s.Count()) for i, v := range s.source { source[i] = mapper(v) } return FromSlice(source) } // Peek returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream. // Play: https://go.dev/play/p/u1VNzHs6cb2 func (s Stream[T]) Peek(consumer func(item T)) Stream[T] { for _, v := range s.source { consumer(v) } return s } // Skip returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream. // If this stream contains fewer than n elements then an empty stream will be returned. 
// Play: https://go.dev/play/p/fNdHbqjahum func (s Stream[T]) Skip(n int) Stream[T] { if n <= 0 { return s } source := make([]T, 0) l := len(s.source) if n > l { return FromSlice(source) } source = make([]T, 0, l-n) for i := n; i < l; i++ { source = append(source, s.source[i]) } return FromSlice(source) } // Limit returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length. // Play: https://go.dev/play/p/qsO4aniDcGf func (s Stream[T]) Limit(maxSize int) Stream[T] { if s.source == nil { return s } if maxSize < 0 { return FromSlice([]T{}) } source := make([]T, 0, maxSize) for i := 0; i < len(s.source) && i < maxSize; i++ { source = append(source, s.source[i]) } return FromSlice(source) } // AllMatch returns whether all elements of this stream match the provided predicate. // Play: https://go.dev/play/p/V5TBpVRs-Cx func (s Stream[T]) AllMatch(predicate func(item T) bool) bool { for _, v := range s.source { if !predicate(v) { return false } } return true } // AnyMatch returns whether any elements of this stream match the provided predicate. // Play: https://go.dev/play/p/PTCnWn4OxSn func (s Stream[T]) AnyMatch(predicate func(item T) bool) bool { for _, v := range s.source { if predicate(v) { return true } } return false } // NoneMatch returns whether no elements of this stream match the provided predicate. // Play: https://go.dev/play/p/iWS64pL1oo3 func (s Stream[T]) NoneMatch(predicate func(item T) bool) bool { return !s.AnyMatch(predicate) } // ForEach performs an action for each element of this stream. // Play: https://go.dev/play/p/Dsm0fPqcidk func (s Stream[T]) ForEach(action func(item T)) { for _, v := range s.source { action(v) } } // Reduce performs a reduction on the elements of this stream, using an associative accumulation function, and returns an Optional describing the reduced value, if any. 
// Play: https://go.dev/play/p/6uzZjq_DJLU func (s Stream[T]) Reduce(initial T, accumulator func(a, b T) T) T { for _, v := range s.source { initial = accumulator(initial, v) } return initial } // Count returns the count of elements in the stream. // Play: https://go.dev/play/p/r3koY6y_Xo- func (s Stream[T]) Count() int { return len(s.source) } // FindFirst returns the first element of this stream and true, or zero value and false if the stream is empty. // Play: https://go.dev/play/p/9xEf0-6C1e3 func (s Stream[T]) FindFirst() (T, bool) { var result T if s.source == nil || len(s.source) == 0 { return result, false } return s.source[0], true } // FindLast returns the last element of this stream and true, or zero value and false if the stream is empty. // Play: https://go.dev/play/p/WZD2rDAW-2h func (s Stream[T]) FindLast() (T, bool) { var result T if s.source == nil || len(s.source) == 0 { return result, false } return s.source[len(s.source)-1], true } // Reverse returns a stream whose elements are reverse order of given stream. // Play: https://go.dev/play/p/A8_zkJnLHm4 func (s Stream[T]) Reverse() Stream[T] { l := len(s.source) source := make([]T, l) for i := 0; i < l; i++ { source[i] = s.source[l-1-i] } return FromSlice(source) } // Range returns a stream whose elements are in the range from start(included) to end(excluded) original stream. // Play: https://go.dev/play/p/indZY5V2f4j func (s Stream[T]) Range(start, end int) Stream[T] { if start < 0 { start = 0 } if end < 0 { end = 0 } if start >= end { return FromSlice([]T{}) } source := make([]T, 0) if end > len(s.source) { end = len(s.source) } for i := start; i < end; i++ { source = append(source, s.source[i]) } return FromSlice(source) } // Sorted returns a stream consisting of the elements of this stream, sorted according to the provided less function. // Play: https://go.dev/play/p/XXtng5uonFj func (s Stream[T]) Sorted(less func(a, b T) bool) Stream[T] { source := []T{} source = append(source, s.source...) 
slice.SortBy(source, less) return FromSlice(source) } // Max returns the maximum element of this stream according to the provided less function. // less: a > b // Play: https://go.dev/play/p/fm-1KOPtGzn func (s Stream[T]) Max(less func(a, b T) bool) (T, bool) { var max T if len(s.source) == 0 { return max, false } for i, v := range s.source { if less(v, max) || i == 0 { max = v } } return max, true } // Min returns the minimum element of this stream according to the provided less function. // less: a < b // Play: https://go.dev/play/p/vZfIDgGNRe_0 func (s Stream[T]) Min(less func(a, b T) bool) (T, bool) { var min T if len(s.source) == 0 { return min, false } for i, v := range s.source { if less(v, min) || i == 0 { min = v } } return min, true } // IndexOf returns the index of the first occurrence of the specified element in this stream, or -1 if this stream does not contain the element. // Play: https://go.dev/play/p/tBV5Nc-XDX2 func (s Stream[T]) IndexOf(target T, equal func(a, b T) bool) int { for i, v := range s.source { if equal(v, target) { return i } } return -1 } // LastIndexOf returns the index of the last occurrence of the specified element in this stream, or -1 if this stream does not contain the element. // Play: https://go.dev/play/p/CjeoNw2eac_G func (s Stream[T]) LastIndexOf(target T, equal func(a, b T) bool) int { for i := len(s.source) - 1; i >= 0; i-- { if equal(s.source[i], target) { return i } } return -1 } // ToSlice return the elements in the stream. // Play: https://go.dev/play/p/jI6_iZZuVFE func (s Stream[T]) ToSlice() []T { return s.source } func ToMap[T any, K comparable, V any](s Stream[T], mapper func(item T) (K, V)) map[K]V { result := map[K]V{} for _, v := range s.source { key, value := mapper(v) result[key] = value } return result }