Documentation
¶
Index ¶
- Constants
- Variables
- func Abs() func(Observable[float64]) Observable[float64]
- func All[T any](predicate func(T) bool) func(Observable[T]) Observable[bool]
- func AllI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[bool]
- func AllIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[bool]
- func AllWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[bool]
- func Average[T constraints.Numeric]() func(Observable[T]) Observable[float64]
- func BufferWhen[T, B any](boundary Observable[B]) func(Observable[T]) Observable[[]T]
- func BufferWithCount[T any](size int) func(Observable[T]) Observable[[]T]
- func BufferWithTime[T any](duration time.Duration) func(Observable[T]) Observable[[]T]
- func BufferWithTimeOrCount[T any](size int, duration time.Duration) func(Observable[T]) Observable[[]T]
- func Cast[T, U any]() func(Observable[T]) Observable[U]
- func Catch[T any](finally func(err error) Observable[T]) func(Observable[T]) Observable[T]
- func Ceil() func(Observable[float64]) Observable[float64]
- func CeilWithPrecision(places int) func(Observable[float64]) Observable[float64]
- func Clamp[T constraints.Numeric](lower, upper T) func(Observable[T]) Observable[T]
- func Collect[T any](obs Observable[T]) ([]T, error)
- func CollectWithContext[T any](ctx context.Context, obs Observable[T]) ([]T, context.Context, error)
- func CombineLatestAll[T any]() func(Observable[Observable[T]]) Observable[[]T]
- func CombineLatestAllAny() func(Observable[Observable[any]]) Observable[[]any]
- func CombineLatestWith[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]
- func CombineLatestWith1[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]
- func CombineLatestWith2[A, B, C any](obsB Observable[B], obsC Observable[C]) func(Observable[A]) Observable[lo.Tuple3[A, B, C]]
- func CombineLatestWith3[A, B, C, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D]) func(Observable[A]) Observable[lo.Tuple4[A, B, C, D]]
- func CombineLatestWith4[A, B, C, D, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) func(Observable[A]) Observable[lo.Tuple5[A, B, C, D, E]]
- func ConcatAll[T any]() func(Observable[Observable[T]]) Observable[T]
- func ConcatWith[T any](obs ...Observable[T]) func(Observable[T]) Observable[T]
- func Contains[T any](predicate func(item T) bool) func(Observable[T]) Observable[bool]
- func ContainsI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[bool]
- func ContainsIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[bool]
- func ContainsWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[bool]
- func ContextMap[T any](project func(ctx context.Context) context.Context) func(Observable[T]) Observable[T]
- func ContextMapI[T any](project func(ctx context.Context, index int64) context.Context) func(Observable[T]) Observable[T]
- func ContextReset[T any](newCtx context.Context) func(Observable[T]) Observable[T]
- func ContextWithDeadline[T any](deadline time.Time) func(Observable[T]) Observable[T]
- func ContextWithTimeout[T any](timeout time.Duration) func(Observable[T]) Observable[T]
- func ContextWithValue[T any](k, v any) func(Observable[T]) Observable[T]
- func Count[T any]() func(Observable[T]) Observable[int64]
- func DefaultIfEmpty[T any](defaultValue T) func(Observable[T]) Observable[T]
- func DefaultIfEmptyWithContext[T any](defaultCtx context.Context, defaultValue T) func(Observable[T]) Observable[T]
- func DefaultOnDroppedNotification(ctx context.Context, notification fmt.Stringer)
- func DefaultOnUnhandledError(ctx context.Context, err error)
- func Delay[T any](duration time.Duration) func(Observable[T]) Observable[T]
- func DelayEach[T any](duration time.Duration) func(Observable[T]) Observable[T]
- func Dematerialize[T any]() func(Observable[Notification[T]]) Observable[T]
- func Distinct[T comparable]() func(Observable[T]) Observable[T]
- func DistinctBy[T any, K comparable](keySelector func(item T) K) func(Observable[T]) Observable[T]
- func DistinctByWithContext[T any, K comparable](keySelector func(ctx context.Context, item T) (context.Context, K)) func(Observable[T]) Observable[T]
- func Do[T any](onNext func(value T), onError func(err error), onComplete func()) func(Observable[T]) Observable[T]
- func DoOnComplete[T any](onComplete func()) func(Observable[T]) Observable[T]
- func DoOnCompleteWithContext[T any](onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]
- func DoOnError[T any](onError func(err error)) func(Observable[T]) Observable[T]
- func DoOnErrorWithContext[T any](onError func(ctx context.Context, err error)) func(Observable[T]) Observable[T]
- func DoOnFinalize[T any](onFinalize func()) func(Observable[T]) Observable[T]
- func DoOnNext[T any](onNext func(value T)) func(Observable[T]) Observable[T]
- func DoOnNextWithContext[T any](onNext func(ctx context.Context, value T)) func(Observable[T]) Observable[T]
- func DoOnSubscribe[T any](onSubscribe func()) func(Observable[T]) Observable[T]
- func DoOnSubscribeWithContext[T any](onSubscribe func(ctx context.Context)) func(Observable[T]) Observable[T]
- func DoWhile[T any](condition func() bool) func(Observable[T]) Observable[T]
- func DoWhileI[T any](condition func(index int64) bool) func(Observable[T]) Observable[T]
- func DoWhileIWithContext[T any](condition func(ctx context.Context, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
- func DoWhileWithContext[T any](condition func(ctx context.Context) (context.Context, bool)) func(Observable[T]) Observable[T]
- func DoWithContext[T any](onNext func(ctx context.Context, value T), ...) func(Observable[T]) Observable[T]
- func ElementAt[T any](nth int) func(Observable[T]) Observable[T]
- func ElementAtOrDefault[T any](nth int64, fallback T) func(Observable[T]) Observable[T]
- func EndWith[T any](suffixes ...T) func(Observable[T]) Observable[T]
- func Filter[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
- func FilterI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
- func FilterIWithContext[T any](...) func(Observable[T]) Observable[T]
- func FilterWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
- func Find[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
- func FindI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
- func FindIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[T]
- func FindWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[T]
- func First[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
- func FirstI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
- func FirstIWithContext[T any](...) func(Observable[T]) Observable[T]
- func FirstWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
- func FlatMap[T, R any](project func(item T) Observable[R]) func(Observable[T]) Observable[R]
- func FlatMapI[T, R any](project func(item T, index int64) Observable[R]) func(Observable[T]) Observable[R]
- func FlatMapIWithContext[T, R any](project func(ctx context.Context, item T, index int64) Observable[R]) func(Observable[T]) Observable[R]
- func FlatMapWithContext[T, R any](project func(ctx context.Context, item T) Observable[R]) func(Observable[T]) Observable[R]
- func Flatten[T any]() func(Observable[[]T]) Observable[T]
- func Floor() func(Observable[float64]) Observable[float64]
- func FloorWithPrecision(places int) func(Observable[float64]) Observable[float64]
- func GroupBy[T any, K comparable](iteratee func(item T) K) func(Observable[T]) Observable[Observable[T]]
- func GroupByI[T any, K comparable](iteratee func(item T, index int64) K) func(Observable[T]) Observable[Observable[T]]
- func GroupByIWithContext[T any, K comparable](iteratee func(ctx context.Context, item T, index int64) (context.Context, K)) func(Observable[T]) Observable[Observable[T]]
- func GroupByWithContext[T any, K comparable](iteratee func(ctx context.Context, item T) (context.Context, K)) func(Observable[T]) Observable[Observable[T]]
- func Head[T any]() func(Observable[T]) Observable[T]
- func IgnoreElements[T any]() func(Observable[T]) Observable[T]
- func IgnoreOnDroppedNotification(ctx context.Context, notification fmt.Stringer)
- func IgnoreOnUnhandledError(ctx context.Context, err error)
- func Iif[T any](predicate func() bool, source1, source2 Observable[T]) func() Observable[T]
- func Last[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
- func LastI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
- func LastIWithContext[T any](...) func(Observable[T]) Observable[T]
- func LastWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
- func Map[T, R any](project func(item T) R) func(Observable[T]) Observable[R]
- func MapErr[T, R any](project func(item T) (R, error)) func(Observable[T]) Observable[R]
- func MapErrI[T, R any](project func(item T, index int64) (R, error)) func(Observable[T]) Observable[R]
- func MapErrIWithContext[T, R any](...) func(Observable[T]) Observable[R]
- func MapErrWithContext[T, R any](project func(ctx context.Context, item T) (R, context.Context, error)) func(Observable[T]) Observable[R]
- func MapI[T, R any](project func(item T, index int64) R) func(Observable[T]) Observable[R]
- func MapIWithContext[T, R any](project func(ctx context.Context, item T, index int64) (context.Context, R)) func(Observable[T]) Observable[R]
- func MapTo[T, R any](output R) func(Observable[T]) Observable[R]
- func MapWithContext[T, R any](project func(ctx context.Context, item T) (context.Context, R)) func(Observable[T]) Observable[R]
- func Materialize[T any]() func(Observable[T]) Observable[Notification[T]]
- func Max[T constraints.Numeric]() func(Observable[T]) Observable[T]
- func MergeAll[T any]() func(Observable[Observable[T]]) Observable[T]
- func MergeMap[T, R any](projection func(item T) Observable[R]) func(Observable[T]) Observable[R]
- func MergeMapI[T, R any](projection func(item T, index int64) Observable[R]) func(Observable[T]) Observable[R]
- func MergeMapIWithContext[T, R any](...) func(Observable[T]) Observable[R]
- func MergeMapWithContext[T, R any](projection func(ctx context.Context, item T) Observable[R]) func(Observable[T]) Observable[R]
- func MergeWith[T any](observables ...Observable[T]) func(Observable[T]) Observable[T]
- func MergeWith1[T any](obsB Observable[T]) func(Observable[T]) Observable[T]
- func MergeWith2[T any](obsB, obsC Observable[T]) func(Observable[T]) Observable[T]
- func MergeWith3[T any](obsB, obsC, obsD Observable[T]) func(Observable[T]) Observable[T]
- func MergeWith4[T any](obsB, obsC, obsD, obsE Observable[T]) func(Observable[T]) Observable[T]
- func MergeWith5[T any](obsB, obsC, obsD, obsE, obsF Observable[T]) func(Observable[T]) Observable[T]
- func Min[T constraints.Numeric]() func(Observable[T]) Observable[T]
- func NewScheduler()
- func ObserveOn[T any](bufferSize int) func(Observable[T]) Observable[T]
- func OnErrorResumeNextWith[T any](finally ...Observable[T]) func(Observable[T]) Observable[T]
- func OnErrorReturn[T any](finally T) func(Observable[T]) Observable[T]
- func Pairwise[T any]() func(Observable[T]) Observable[[]T]
- func PipeOp[First, Last any](operators ...any) func(Observable[First]) Observable[Last]
- func PipeOp1[A, B any](operator1 func(Observable[A]) Observable[B]) func(Observable[A]) Observable[B]
- func PipeOp2[A, B, C any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[C]
- func PipeOp3[A, B, C, D any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[D]
- func PipeOp4[A, B, C, D, E any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[E]
- func PipeOp5[A, B, C, D, E, F any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[F]
- func PipeOp6[A, B, C, D, E, F, G any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[G]
- func PipeOp7[A, B, C, D, E, F, G, H any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[H]
- func PipeOp8[A, B, C, D, E, F, G, H, I any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[I]
- func PipeOp9[A, B, C, D, E, F, G, H, I, J any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[J]
- func PipeOp10[A, B, C, D, E, F, G, H, I, J, K any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[K]
- func PipeOp11[A, B, C, D, E, F, G, H, I, J, K, L any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[L]
- func PipeOp12[A, B, C, D, E, F, G, H, I, J, K, L, M any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[M]
- func PipeOp13[A, B, C, D, E, F, G, H, I, J, K, L, M, N any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[N]
- func PipeOp14[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[O]
- func PipeOp15[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[P]
- func PipeOp16[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[Q]
- func PipeOp17[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[R]
- func PipeOp18[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[S]
- func PipeOp19[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[T]
- func PipeOp20[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[U]
- func PipeOp21[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[V]
- func PipeOp22[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[W]
- func PipeOp23[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[X]
- func PipeOp24[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y any](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[Y]
- func PipeOp25[...](operator1 func(Observable[A]) Observable[B], ...) func(Observable[A]) Observable[Z]
- func RaceWith[T any](sources ...Observable[T]) func(Observable[T]) Observable[T]
- func Reduce[T, R any](accumulator func(agg R, item T) R, seed R) func(Observable[T]) Observable[R]
- func ReduceI[T, R any](accumulator func(agg R, item T, index int64) R, seed R) func(Observable[T]) Observable[R]
- func ReduceIWithContext[T, R any](...) func(Observable[T]) Observable[R]
- func ReduceWithContext[T, R any](accumulator func(ctx context.Context, agg R, item T) (context.Context, R), ...) func(Observable[T]) Observable[R]
- func RepeatWith[T any](count int64) func(Observable[T]) Observable[T]
- func Retry[T any]() func(Observable[T]) Observable[T]
- func RetryWithConfig[T any](opts RetryConfig) func(Observable[T]) Observable[T]
- func Round() func(Observable[float64]) Observable[float64]
- func SampleTime[T any](interval time.Duration) func(Observable[T]) Observable[T]
- func SampleWhen[T, t any](tick Observable[t]) func(Observable[T]) Observable[T]
- func Scan[T, R any](reduce func(accumulator R, item T) R, seed R) func(Observable[T]) Observable[R]
- func ScanI[T, R any](reduce func(accumulator R, item T, index int64) R, seed R) func(Observable[T]) Observable[R]
- func ScanIWithContext[T, R any](...) func(Observable[T]) Observable[R]
- func ScanWithContext[T, R any](reduce func(ctx context.Context, accumulator R, item T) (context.Context, R), ...) func(Observable[T]) Observable[R]
- func SequenceEqual[T comparable](obsB Observable[T]) func(Observable[T]) Observable[bool]
- func Serialize[T any]() func(Observable[T]) Observable[T]
- func Share[T any]() func(Observable[T]) Observable[T]
- func ShareReplay[T any](bufferSize int) func(Observable[T]) Observable[T]
- func ShareReplayWithConfig[T any](bufferSize int, config ShareReplayConfig) func(Observable[T]) Observable[T]
- func ShareWithConfig[T any](config ShareConfig[T]) func(Observable[T]) Observable[T]
- func Skip[T any](count int64) func(Observable[T]) Observable[T]
- func SkipLast[T any](count int) func(Observable[T]) Observable[T]
- func SkipUntil[T, S any](signal Observable[S]) func(Observable[T]) Observable[T]
- func SkipWhile[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
- func SkipWhileI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
- func SkipWhileIWithContext[T any](...) func(Observable[T]) Observable[T]
- func SkipWhileWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
- func StartWith[T any](prefixes ...T) func(Observable[T]) Observable[T]
- func SubscribeOn[T any](bufferSize int) func(Observable[T]) Observable[T]
- func Sum[T constraints.Numeric]() func(Observable[T]) Observable[T]
- func Tail[T any]() func(Observable[T]) Observable[T]
- func Take[T any](count int64) func(Observable[T]) Observable[T]
- func TakeLast[T any](count int) func(Observable[T]) Observable[T]
- func TakeUntil[T, S any](signal Observable[S]) func(Observable[T]) Observable[T]
- func TakeWhile[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
- func TakeWhileI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
- func TakeWhileIWithContext[T any](...) func(Observable[T]) Observable[T]
- func TakeWhileWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
- func Tap[T any](onNext func(value T), onError func(err error), onComplete func()) func(Observable[T]) Observable[T]
- func TapOnComplete[T any](onComplete func()) func(Observable[T]) Observable[T]
- func TapOnCompleteWithContext[T any](onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]
- func TapOnError[T any](onError func(err error)) func(Observable[T]) Observable[T]
- func TapOnErrorWithContext[T any](onError func(ctx context.Context, err error)) func(Observable[T]) Observable[T]
- func TapOnFinalize[T any](onFinalize func()) func(Observable[T]) Observable[T]
- func TapOnNext[T any](onNext func(value T)) func(Observable[T]) Observable[T]
- func TapOnNextWithContext[T any](onNext func(ctx context.Context, value T)) func(Observable[T]) Observable[T]
- func TapOnSubscribe[T any](onSubscribe func()) func(Observable[T]) Observable[T]
- func TapOnSubscribeWithContext[T any](onSubscribe func(ctx context.Context)) func(Observable[T]) Observable[T]
- func TapWithContext[T any](onNext func(ctx context.Context, value T), ...) func(Observable[T]) Observable[T]
- func ThrottleTime[T any](interval time.Duration) func(Observable[T]) Observable[T]
- func ThrottleWhen[T, t any](tick Observable[t]) func(Observable[T]) Observable[T]
- func ThrowIfEmpty[T any](throw func() error) func(Observable[T]) Observable[T]
- func ThrowOnContextCancel[T any]() func(Observable[T]) Observable[T]
- func TimeInterval[T any]() func(Observable[T]) Observable[IntervalValue[T]]
- func Timeout[T any](duration time.Duration) func(Observable[T]) Observable[T]
- func Timestamp[T any]() func(Observable[T]) Observable[TimestampValue[T]]
- func ToChannel[T any](size int) func(Observable[T]) Observable[<-chan Notification[T]]
- func ToMap[T any, K comparable, V any](project func(item T) (K, V)) func(Observable[T]) Observable[map[K]V]
- func ToMapI[T any, K comparable, V any](mapper func(item T, index int64) (K, V)) func(Observable[T]) Observable[map[K]V]
- func ToMapIWithContext[T any, K comparable, V any](mapper func(ctx context.Context, item T, index int64) (K, V)) func(Observable[T]) Observable[map[K]V]
- func ToMapWithContext[T any, K comparable, V any](project func(ctx context.Context, item T) (K, V)) func(Observable[T]) Observable[map[K]V]
- func ToSlice[T any]() func(Observable[T]) Observable[[]T]
- func Trunc() func(Observable[float64]) Observable[float64]
- func While[T any](condition func() bool) func(Observable[T]) Observable[T]
- func WhileI[T any](condition func(index int64) bool) func(Observable[T]) Observable[T]
- func WhileIWithContext[T any](condition func(ctx context.Context, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
- func WhileWithContext[T any](condition func(ctx context.Context) (context.Context, bool)) func(Observable[T]) Observable[T]
- func WindowWhen[T, B any](boundary Observable[B]) func(Observable[T]) Observable[Observable[T]]
- func ZipAll[T any]() func(Observable[Observable[T]]) Observable[[]T]
- func ZipWith[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]
- func ZipWith1[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]
- func ZipWith2[A, B, C any](obsB Observable[B], obsC Observable[C]) func(Observable[A]) Observable[lo.Tuple3[A, B, C]]
- func ZipWith3[A, B, C, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D]) func(Observable[A]) Observable[lo.Tuple4[A, B, C, D]]
- func ZipWith4[A, B, C, D, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) func(Observable[A]) Observable[lo.Tuple5[A, B, C, D, E]]
- func ZipWith5[A, B, C, D, E, F any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], ...) func(Observable[A]) Observable[lo.Tuple6[A, B, C, D, E, F]]
- type Backpressure
- type ConcurrencyMode
- type ConnectableConfig
- type ConnectableObservable
- func Connectable[T any](source Observable[T]) ConnectableObservable[T]
- func ConnectableWithConfig[T any](source Observable[T], config ConnectableConfig[T]) ConnectableObservable[T]
- func NewConnectableObservable[T any](subscribe func(destination Observer[T]) Teardown) ConnectableObservable[T]
- func NewConnectableObservableWithConfig[T any](subscribe func(destination Observer[T]) Teardown, config ConnectableConfig[T]) ConnectableObservable[T]
- func NewConnectableObservableWithConfigAndContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown, ...) ConnectableObservable[T]
- func NewConnectableObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) ConnectableObservable[T]
- type IntervalValue
- type Kind
- type Notification
- type Observable
- func Amb[T any](sources ...Observable[T]) Observable[T]
- func CombineLatest2[A, B any](obsA Observable[A], obsB Observable[B]) Observable[lo.Tuple2[A, B]]
- func CombineLatest3[A, B, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C]) Observable[lo.Tuple3[A, B, C]]
- func CombineLatest4[A, B, C, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D]) Observable[lo.Tuple4[A, B, C, D]]
- func CombineLatest5[A, B, C, D, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], ...) Observable[lo.Tuple5[A, B, C, D, E]]
- func CombineLatestAny(sources ...Observable[any]) Observable[[]any]
- func Concat[T any](obs ...Observable[T]) Observable[T]
- func Defer[T any](factory func() Observable[T]) Observable[T]
- func Empty[T any]() Observable[T]
- func FromChannel[T any](in <-chan T) Observable[T]
- func FromSlice[T any](collections ...[]T) Observable[T]
- func Future[T any](factory func() (T, error)) Observable[T]
- func Interval(interval time.Duration) Observable[int64]
- func IntervalWithInitial(initial, interval time.Duration) Observable[int64]
- func Just[T any](values ...T) Observable[T]
- func Merge[T any](sources ...Observable[T]) Observable[T]
- func Never() Observable[struct{}]
- func NewEventuallySafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]
- func NewEventuallySafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]
- func NewObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]
- func NewObservableWithConcurrencyMode[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown, ...) Observable[T]
- func NewObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]
- func NewSafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]
- func NewSafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]
- func NewUnsafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]
- func NewUnsafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]
- func Of[T any](values ...T) Observable[T]
- func Pipe[First, Last any](source Observable[First], operators ...any) Observable[Last]
- func Pipe1[A, B any](source Observable[A], operator1 func(Observable[A]) Observable[B]) Observable[B]
- func Pipe2[A, B, C any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[C]
- func Pipe3[A, B, C, D any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[D]
- func Pipe4[A, B, C, D, E any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[E]
- func Pipe5[A, B, C, D, E, F any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[F]
- func Pipe6[A, B, C, D, E, F, G any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[G]
- func Pipe7[A, B, C, D, E, F, G, H any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[H]
- func Pipe8[A, B, C, D, E, F, G, H, I any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[I]
- func Pipe9[A, B, C, D, E, F, G, H, I, J any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[J]
- func Pipe10[A, B, C, D, E, F, G, H, I, J, K any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[K]
- func Pipe11[A, B, C, D, E, F, G, H, I, J, K, L any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[L]
- func Pipe12[A, B, C, D, E, F, G, H, I, J, K, L, M any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[M]
- func Pipe13[A, B, C, D, E, F, G, H, I, J, K, L, M, N any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[N]
- func Pipe14[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[O]
- func Pipe15[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[P]
- func Pipe16[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[Q]
- func Pipe17[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[R]
- func Pipe18[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[S]
- func Pipe19[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[T]
- func Pipe20[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[U]
- func Pipe21[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[V]
- func Pipe22[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[W]
- func Pipe23[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[X]
- func Pipe24[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y any](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[Y]
- func Pipe25[...](source Observable[A], operator1 func(Observable[A]) Observable[B], ...) Observable[Z]
- func Race[T any](sources ...Observable[T]) Observable[T]
- func RandFloat64(count int) Observable[float64]
- func RandIntN(n, count int) Observable[int]
- func Range(start, end int64) Observable[int64]
- func RangeWithInterval(start, end int64, interval time.Duration) Observable[int64]
- func RangeWithStep(start, end, step float64) Observable[float64]
- func RangeWithStepAndInterval(start, end, step float64, interval time.Duration) Observable[float64]
- func Repeat[T any](item T, count int64) Observable[T]
- func RepeatWithInterval[T any](item T, count int64, interval time.Duration) Observable[T]
- func Start[T any](cb func() T) Observable[T]
- func Throw[T any](err error) Observable[T]
- func Timer(duration time.Duration) Observable[time.Duration]
- func Zip[T any](sources ...Observable[T]) Observable[[]T]
- func Zip2[A, B any](obsA Observable[A], obsB Observable[B]) Observable[lo.Tuple2[A, B]]
- func Zip3[A, B, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C]) Observable[lo.Tuple3[A, B, C]]
- func Zip4[A, B, C, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D]) Observable[lo.Tuple4[A, B, C, D]]
- func Zip5[A, B, C, D, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], ...) Observable[lo.Tuple5[A, B, C, D, E]]
- func Zip6[A, B, C, D, E, F any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], ...) Observable[lo.Tuple6[A, B, C, D, E, F]]
- type Observer
- func NewObserver[T any](onNext func(value T), onError func(err error), onComplete func()) Observer[T]
- func NewObserverWithContext[T any](onNext func(ctx context.Context, value T), ...) Observer[T]
- func NoopObserver[T any]() Observer[T]
- func OnComplete[T any](onComplete func()) Observer[T]
- func OnCompleteWithContext[T any](onComplete func(ctx context.Context)) Observer[T]
- func OnError[T any](onError func(err error)) Observer[T]
- func OnErrorWithContext[T any](onError func(ctx context.Context, err error)) Observer[T]
- func OnNext[T any](onNext func(value T)) Observer[T]
- func OnNextWithContext[T any](onNext func(ctx context.Context, value T)) Observer[T]
- func PrintObserver[T any]() Observer[T]
- type RetryConfig
- type ShareConfig
- type ShareReplayConfig
- type Subject
- type Subscriber
- func NewEventuallySafeSubscriber[T any](destination Observer[T]) Subscriber[T]
- func NewSafeSubscriber[T any](destination Observer[T]) Subscriber[T]
- func NewSubscriber[T any](destination Observer[T]) Subscriber[T]
- func NewSubscriberWithConcurrencyMode[T any](destination Observer[T], mode ConcurrencyMode) Subscriber[T]
- func NewUnsafeSubscriber[T any](destination Observer[T]) Subscriber[T]
- type Subscription
- type Teardown
- type TimestampValue
- type Unsubscribable
Examples ¶
- Abs (Error)
- Abs (Ok)
- All (Error)
- All (Ok)
- Amb (Error)
- Amb (Ok)
- Average (Error)
- Average (Ok)
- BufferWhen (Error)
- BufferWhen (Ok)
- BufferWithCount (Error)
- BufferWithCount (Ok)
- BufferWithTime (Error)
- BufferWithTime (Ok)
- BufferWithTimeOrCount (Error)
- BufferWithTimeOrCount (Ok)
- Catch
- Ceil (Error)
- Ceil (Ok)
- Clamp (Error)
- Clamp (Ok)
- CombineLatest2 (Error)
- CombineLatest2 (Ok)
- CombineLatest3 (Ok)
- CombineLatestAll (Error)
- CombineLatestAll (Ok)
- CombineLatestAllAny (Error)
- CombineLatestAllAny (Ok)
- CombineLatestAny (Error)
- CombineLatestAny (Ok)
- CombineLatestWith (Ok)
- CombineLatestWith1 (Ok)
- CombineLatestWith2 (Ok)
- CombineLatestWith3 (Ok)
- CombineLatestWith4 (Ok)
- Concat (Error)
- Concat (Ok)
- ConcatAll (Error)
- ConcatAll (Ok)
- ConcatWith (Error)
- ConcatWith (Ok)
- Contains (Error)
- Contains (Ok)
- ContextWithValue
- Count (Error)
- Count (Ok)
- DefaultIfEmpty (Error)
- DefaultIfEmpty (Ok)
- Defer
- Delay (Cancel)
- Delay (Error)
- Delay (Ok)
- Dematerialize (Error)
- Dematerialize (Ok)
- Distinct (Error)
- Distinct (Ok)
- DistinctBy (Error)
- DistinctBy (Ok)
- DoWhile
- ElementAt (Error)
- ElementAt (NotFound)
- ElementAt (Ok)
- ElementAtOrDefault (Error)
- ElementAtOrDefault (NotFound)
- ElementAtOrDefault (Ok)
- Empty
- EndWith (Error)
- EndWith (Ok)
- Filter (Error)
- Filter (Ok)
- Find (Error)
- Find (Ok)
- First (Error)
- First (Ok)
- FlatMap (Error)
- FlatMap (Ok)
- Floor (Error)
- Floor (Ok)
- FloorWithPrecision
- FloorWithPrecision (SmallNumbers)
- FromChannel
- FromSlice
- Future (Error)
- Future (Ok)
- GroupBy (Error)
- GroupBy (Ok)
- Head (Error)
- Head (Ok)
- IgnoreElements (Error)
- IgnoreElements (Ok)
- Iif (Error)
- Iif (Ok)
- Interval
- IntervalWithInitial
- Just
- Last (Error)
- Last (Ok)
- Map (Error)
- Map (Ok)
- MapErr (Error)
- MapErr (Ok)
- MapTo (Error)
- MapTo (Ok)
- Materialize (Error)
- Materialize (Ok)
- Max (Error)
- Max (Ok)
- Merge (Error)
- Merge (Ok)
- MergeAll (Error)
- MergeAll (Ok)
- MergeMap (Error)
- MergeMap (Ok)
- MergeWith (Error)
- MergeWith (Ok)
- MergeWith1 (Error)
- MergeWith1 (Ok)
- MergeWith2 (Error)
- MergeWith2 (Ok)
- MergeWith3 (Error)
- MergeWith3 (Ok)
- MergeWith4 (Error)
- MergeWith4 (Ok)
- MergeWith5 (Error)
- MergeWith5 (Ok)
- Min (Error)
- Min (Ok)
- Never
- NewAsyncSubject
- NewAsyncSubject (Empty)
- NewAsyncSubject (Error)
- NewBehaviorSubject
- NewBehaviorSubject (Empty)
- NewBehaviorSubject (Error)
- NewObservable (Error)
- NewObservable (Ok)
- NewObserver
- NewObserver (Empty)
- NewObserver (Error)
- NewPublishSubject
- NewPublishSubject (Empty)
- NewPublishSubject (Error)
- NewReplaySubject
- NewReplaySubject (Empty)
- NewReplaySubject (Error)
- NewReplaySubject (Overflow)
- NewUnicastSubject
- NewUnicastSubject (Empty)
- NewUnicastSubject (Error)
- NewUnicastSubject (Overflow)
- Of
- OnErrorResumeNextWith
- OnErrorReturn
- Pairwise (Error)
- Pairwise (Ok)
- Pipe
- Pipe1
- Pipe2
- Pipe3
- Pipe4
- Pipe5
- Pipe6
- PipeOp
- PipeOp4
- Race (Error)
- Race (Ok)
- RaceWith (Error)
- RaceWith (Ok)
- Range
- RangeWithInterval
- RangeWithStep
- RangeWithStepAndInterval
- Reduce (Error)
- Reduce (Ok)
- Repeat
- RepeatWith (Error)
- RepeatWith (Ok)
- RepeatWithInterval
- RetryWithConfig
- Round (Error)
- Round (Ok)
- Scan (Error)
- Scan (Ok)
- Skip (Error)
- Skip (Ok)
- SkipLast (Empty)
- SkipLast (Error)
- SkipLast (Ok)
- SkipUntil (Empty)
- SkipUntil (Error)
- SkipUntil (Ok)
- SkipWhile (Error)
- SkipWhile (Ok)
- Start
- StartWith (Error)
- StartWith (Ok)
- Sum (Error)
- Sum (Ok)
- Tail (Error)
- Tail (Ok)
- Take (Error1)
- Take (Error2)
- Take (Ok)
- TakeLast (Error)
- TakeLast (Ok)
- TakeUntil (Empty)
- TakeUntil (Error)
- TakeUntil (Ok)
- TakeWhile (Error1)
- TakeWhile (Error2)
- TakeWhile (Ok)
- Tap (Error)
- Tap (Ok)
- TapOnComplete (Error)
- TapOnComplete (Ok)
- TapOnError (Error)
- TapOnError (Ok)
- TapOnNext (Error)
- TapOnNext (Ok)
- Throw
- ThrowIfEmpty
- TimeInterval
- Timeout (Error)
- Timeout (Ok)
- Timer
- Timestamp
- ToChannel (Error)
- ToChannel (Ok)
- ToMap (Error)
- ToMap (Ok)
- ToSlice (Error)
- ToSlice (Ok)
- Trunc (Error)
- Trunc (Ok)
- While
- Zip (Error)
- Zip (Ok)
- Zip2 (Error)
- Zip2 (Ok)
- Zip3 (Error)
- Zip3 (Ok)
- Zip4 (Error)
- Zip4 (Ok)
- Zip5 (Error)
- Zip5 (Ok)
- Zip6 (Error)
- Zip6 (Ok)
- ZipAll (Error)
- ZipAll (Ok)
- ZipWith (Error)
- ZipWith (Ok)
- ZipWith1 (Error)
- ZipWith1 (Ok)
- ZipWith2 (Error)
- ZipWith2 (Ok)
- ZipWith3 (Error)
- ZipWith3 (Ok)
- ZipWith4 (Error)
- ZipWith4 (Ok)
- ZipWith5 (Error)
- ZipWith5 (Ok)
Constants ¶
const ReplaySubjectUnlimitedBufferSize = -1
ReplaySubjectUnlimitedBufferSize is the unlimited buffer size for a ReplaySubject.
const UnicastSubjectUnlimitedBufferSize = -1
UnicastSubjectUnlimitedBufferSize is the unlimited buffer size for a UnicastSubject.
Variables ¶
var ( //nolint:revive ErrRangeWithStepWrongStep = errors.New("ro.RangeWithStep: step must be greater than 0") ErrRangeWithStepAndIntervalWrongStep = errors.New("ro.RangeWithStepAndInterval: step must be greater than 0") ErrFirstEmpty = errors.New("ro.First: empty") ErrLastEmpty = errors.New("ro.Last: empty") ErrHeadEmpty = errors.New("ro.First: empty") ErrTailEmpty = errors.New("ro.Last: empty") ErrTakeWrongCount = errors.New("ro.Take: count must be greater or equal to 0") ErrTakeLastWrongCount = errors.New("ro.TakeLast: count must be greater than 0") ErrSkipWrongCount = errors.New("ro.Skip: count must be greater or equal to 0") ErrSkipLastWrongCount = errors.New("ro.SkipLast: count must be greater than 0") ErrElementAtWrongNth = errors.New("ro.ElementAt: nth must be greater or equal to 0") ErrElementAtNotFound = errors.New("ro.ElementAt: nth element not found") ErrElementAtOrDefaultWrongNth = errors.New("ro.ElementAtOrDefault: nth must be greater or equal to 0") ErrRepeatWrongCount = errors.New("ro.Repeat: count must be greater or equal to 0") ErrRepeatWithIntervalWrongCount = errors.New("ro.RepeatWithInterval: count must be greater or equal to 0") ErrRepeatWithWrongCount = errors.New("ro.RepeatWith: count must be greater or equal to 0") ErrBufferWithCountWrongSize = errors.New("ro.BufferWithCount: size must be greater than 0") ErrBufferWithTimeWrongDuration = errors.New("ro.BufferWithTime: duration must be greater than 0") ErrBufferWithTimeOrCountWrongSize = errors.New("ro.BufferWithTimeOrCount: size must be greater than 0") ErrBufferWithTimeOrCountWrongDuration = errors.New("ro.BufferWithTimeOrCount: duration must be greater than 0") ErrClampLowerLessThanUpper = errors.New("ro.Clamp: lower must be less than or equal to upper") ErrToChannelWrongSize = errors.New("ro.ErrToChannelWrongSize: size must be greater or equal to 0") ErrPoolWrongSize = errors.New("ro.Pool: size must be greater than 0") ErrSubscribeOnWrongBufferSize = errors.New("ro.SubscribeOn: buffer 
size must be greater than 0") ErrObserveOnWrongBufferSize = errors.New("ro.ObserveOn: buffer size must be greater than 0") ErrDetachOnWrongMode = errors.New("ro.detachOn: unexpected detach mode") ErrUnicastSubjectConcurrent = errors.New("ro.UnicastSubject: a single subscriber accepted") ErrConnectableObservableMissingConnectorFactory = errors.New("ro.ConnectableObservable: missing connector factory") )
var ( // OnUnhandledError is called when an error is emitted by an Observable and // no error handler is registered. OnUnhandledError = IgnoreOnUnhandledError // OnDroppedNotification is called when a notification is emitted by an Observable and // no notification handler is registered. OnDroppedNotification = IgnoreOnDroppedNotification )
Functions ¶
func Abs ΒΆ
func Abs() func(Observable[float64]) Observable[float64]
Abs emits the absolute values emitted by the source Observable. Play: https://go.dev/play/p/WCzxrucg7BC
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[float64]) Teardown {
observer.Next(-3)
observer.Next(-2)
observer.Next(-1)
observer.Next(0)
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Abs(),
)
subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output: Next: 3 Next: 2 Next: 1 Next: 0 Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just[float64](-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5), Abs(), ) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 5 Next: 4 Next: 3 Next: 2 Next: 1 Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Completed
func All ΒΆ
func All[T any](predicate func(T) bool) func(Observable[T]) Observable[bool]
All determines whether all elements of an observable sequence satisfy a condition. Play: https://go.dev/play/p/t22F_crlA-l
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
All(func(i int) bool { return i > 0 }),
)
subscription := observable.Subscribe(PrintObserver[bool]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable1 := Pipe1(
Just(1, 2, 3, 4, 5),
All(func(i int) bool { return i > 0 }),
)
subscription1 := observable1.Subscribe(PrintObserver[bool]())
defer subscription1.Unsubscribe()
observable2 := Pipe1(
Just(1, 2, 3, 4, 5),
All(func(i int) bool { return i%2 == 0 }),
)
subscription2 := observable2.Subscribe(PrintObserver[bool]())
defer subscription2.Unsubscribe()
Output: Next: true Completed Next: false Completed
func AllI ΒΆ
func AllI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[bool]
AllI determines whether all elements of an observable sequence satisfy a condition.
func AllIWithContext ΒΆ
func AllIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[bool]
AllIWithContext determines whether all elements of an observable sequence satisfy a condition. Play: https://go.dev/play/p/UkOzE4wQXPG
func AllWithContext ΒΆ
func AllWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[bool]
AllWithContext determines whether all elements of an observable sequence satisfy a condition. Play: https://go.dev/play/p/NEA7Zi7yVNh
func Average ΒΆ
func Average[T constraints.Numeric]() func(Observable[T]) Observable[float64]
Average calculates the average of the values emitted by the source Observable. It emits the average when the source completes. If the source is empty, it emits NaN. Play: https://go.dev/play/p/B0IhFEsQAin
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Average[int](),
)
subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Average[int](), ) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 3 Completed
func BufferWhen ΒΆ
func BufferWhen[T, B any](boundary Observable[B]) func(Observable[T]) Observable[[]T]
BufferWhen buffers the items emitted by an Observable until a second Observable emits an item. Then it emits the buffer and starts a new buffer. It repeats this process until the source Observable completes. If the boundary Observable completes, the buffer is emitted and the source Observable completes. If the source Observable errors, the buffer is emitted and the error is propagated. Play: https://go.dev/play/p/w8c_zuaLl9l
Example (Error) ΒΆ
observable := Pipe1( Throw[int64](assert.AnError), BufferWhen[int64](Interval(50*time.Millisecond)), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) defer subscription.Unsubscribe() time.Sleep(200 * time.Millisecond)
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Interval(30*time.Millisecond), BufferWhen[int64](Interval(100*time.Millisecond)), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) time.Sleep(250 * time.Millisecond) subscription.Unsubscribe()
Output: Next: [0 1 2] Next: [3 4 5]
func BufferWithCount ΒΆ
func BufferWithCount[T any](size int) func(Observable[T]) Observable[[]T]
BufferWithCount buffers the items emitted by an Observable until the buffer is full. Then it emits the buffer and starts a new buffer. It repeats this process until the source Observable completes. If the source Observable errors, the buffer is emitted and the error is propagated. If the source Observable completes, the buffer is emitted and the complete notification is propagated. If the specified count is reached, the buffer is emitted and a new buffer is started. Play: https://go.dev/play/p/IXhDtSybE4R
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
go func() {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
}()
return nil
}),
BufferWithCount[int](2),
)
subscription := observable.Subscribe(PrintObserver[[]int]())
time.Sleep(10 * time.Millisecond)
subscription.Unsubscribe()
Output: Next: [1 2] Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), BufferWithCount[int](2), ) subscription := observable.Subscribe(PrintObserver[[]int]()) time.Sleep(10 * time.Millisecond) subscription.Unsubscribe()
Output: Next: [1 2] Next: [3 4] Next: [5] Completed
func BufferWithTime ΒΆ
func BufferWithTime[T any](duration time.Duration) func(Observable[T]) Observable[[]T]
BufferWithTime buffers the items emitted by an Observable for a specified time. It emits the buffer and starts a new buffer. It repeats this process until the source Observable completes. If the source Observable errors, the buffer is emitted and the error is propagated. If the source Observable completes, the buffer is emitted and the complete notification is propagated. If the specified time is reached, the buffer is emitted and a new buffer is started. Play: https://go.dev/play/p/TfOhP-f_O45
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
go func() {
observer.Next(1)
time.Sleep(10 * time.Millisecond)
observer.Next(2)
time.Sleep(10 * time.Millisecond)
observer.Next(3)
time.Sleep(200 * time.Millisecond)
// 1 empty buffer
observer.Next(4)
observer.Error(assert.AnError)
observer.Next(5)
}()
return nil
}),
BufferWithTime[int](100*time.Millisecond),
)
subscription := observable.Subscribe(PrintObserver[[]int]())
time.Sleep(300 * time.Millisecond)
subscription.Unsubscribe()
Output: Next: [1 2 3] Next: [] Error: assert.AnError general error for testing
Example (Ok) ΒΆ
Commented out because I get a weird conflict with other tests.
observable := Pipe1( RangeWithInterval(1, 6, 20*time.Millisecond), BufferWithTime[int64](70*time.Millisecond), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) time.Sleep(200 * time.Millisecond) subscription.Unsubscribe()
Output: Next: [1 2 3] Next: [4 5] Completed
func BufferWithTimeOrCount ΒΆ
func BufferWithTimeOrCount[T any](size int, duration time.Duration) func(Observable[T]) Observable[[]T]
BufferWithTimeOrCount buffers the items emitted by an Observable for a specified time or count. It emits the buffer and starts a new buffer. It repeats this process until the source Observable completes. If the source Observable errors, the buffer is emitted and the error is propagated. If the source Observable completes, the buffer is emitted and the complete notification is propagated. If the specified time or count is reached, the buffer is emitted and a new buffer is started. Play: https://go.dev/play/p/NyiF19jUdQD
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
go func() {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
}()
return nil
}),
BufferWithTimeOrCount[int](2, 100*time.Millisecond),
)
subscription := observable.Subscribe(PrintObserver[[]int]())
time.Sleep(10 * time.Millisecond)
subscription.Unsubscribe()
Output: Next: [1 2] Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), BufferWithTimeOrCount[int](2, 100*time.Millisecond), ) subscription := observable.Subscribe(PrintObserver[[]int]()) time.Sleep(10 * time.Millisecond) subscription.Unsubscribe()
Output: Next: [1 2] Next: [3 4] Next: [5] Completed
func Cast ΒΆ
func Cast[T, U any]() func(Observable[T]) Observable[U]
Cast converts each value emitted by an Observable into a specified type. Play: https://go.dev/play/p/XUdqodfFyT6
func Catch ΒΆ
func Catch[T any](finally func(err error) Observable[T]) func(Observable[T]) Observable[T]
Catch catches errors on the observable to be handled by returning a new observable or throwing an error. Play: https://go.dev/play/p/0pVlxwjhdMT
Example ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
observer.Complete()
return nil
}),
Catch(func(err error) Observable[int] {
return Of(4, 5, 6)
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func Ceil ΒΆ
func Ceil() func(Observable[float64]) Observable[float64]
Ceil emits the ceiling of the values emitted by the source Observable. Play: https://go.dev/play/p/BlpeIki-oMG
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[float64]) Teardown {
observer.Next(1.1)
observer.Next(2.5)
observer.Next(3.9)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Ceil(),
)
subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output: Next: 2 Next: 3 Next: 4 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1.1, 2.4, 3.5, 4.9, 5.0), Ceil(), ) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 2 Next: 3 Next: 4 Next: 5 Next: 5 Completed
func CeilWithPrecision ΒΆ
func CeilWithPrecision(places int) func(Observable[float64]) Observable[float64]
CeilWithPrecision emits the ceiling of the values emitted by the source Observable. It uses the provided decimal precision. Positive precisions apply the ceiling to the specified number of digits to the right of the decimal point, while negative precisions round to powers of ten.
func Clamp ΒΆ
func Clamp[T constraints.Numeric](lower, upper T) func(Observable[T]) Observable[T]
Clamp emits the number within the inclusive lower and upper bounds. Play: https://go.dev/play/p/fu8O-BixXPM
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Clamp(2, 4),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 2 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Clamp(2, 4), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 2 Next: 2 Next: 3 Next: 4 Next: 4 Completed
func Collect ΒΆ
func Collect[T any](obs Observable[T]) ([]T, error)
Collect collects all values emitted by the source Observable and returns them as a slice. It waits for the source Observable to complete before returning. If the source Observable emits an error, the error is returned along with the values collected so far.
func CollectWithContext ΒΆ
func CollectWithContext[T any](ctx context.Context, obs Observable[T]) ([]T, context.Context, error)
CollectWithContext collects all values emitted by the source Observable and returns them as a slice. It waits for the source Observable to complete before returning. If the source Observable emits an error, the error is returned along with the values collected so far. @TODO: return more values, such as (isCanceled bool) or (duration time.Duration) ?
func CombineLatestAll ΒΆ
func CombineLatestAll[T any]() func(Observable[Observable[T]]) Observable[[]T]
CombineLatestAll combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/nT1qq9ipwZL
Example (Error) ΒΆ
observable := Pipe1( Just( RangeWithInterval(1, 3, 50*time.Millisecond), Delay[int64](75*time.Millisecond)(Throw[int64](assert.AnError)), RangeWithInterval(5, 7, 50*time.Millisecond), ), CombineLatestAll[int64](), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) time.Sleep(200 * time.Millisecond) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just( RangeWithInterval(1, 3, 40*time.Millisecond), RangeWithInterval(3, 5, 60*time.Millisecond), Delay[int64](25*time.Millisecond)(RangeWithInterval(5, 7, 100*time.Millisecond)), ), CombineLatestAll[int64](), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: [2 4 5] Next: [2 4 6] Completed
func CombineLatestAllAny ΒΆ
func CombineLatestAllAny() func(Observable[Observable[any]]) Observable[[]any]
CombineLatestAllAny combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/nKMychGg9KH
Example (Error) ΒΆ
observable1 := Map(func(x int64) any { return x })(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := Delay[any](75 * time.Millisecond)(Throw[any](assert.AnError))
observable3 := Of[any]("a", "b")
combined := Just(observable1, observable2, observable3)
observable := CombineLatestAllAny()(combined)
subscription := observable.Subscribe(PrintObserver[[]any]())
time.Sleep(200 * time.Millisecond)
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable1 := Map(func(x int64) any { return x })(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := Of[any]("a", "b")
observable3 := Delay[any](25 * time.Millisecond)(Of[any]("c", "d"))
combined := Just(observable1, observable2, observable3)
observable := CombineLatestAllAny()(combined)
subscription := observable.Subscribe(PrintObserver[[]any]())
time.Sleep(200 * time.Millisecond)
defer subscription.Unsubscribe()
Output: Next: [1 b d] Next: [2 b d] Completed
func CombineLatestWith ΒΆ
func CombineLatestWith[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]
CombineLatestWith combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/yq7G8eItuzO
Example (Ok) ΒΆ
observable1 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable := Pipe2(
observable1,
CombineLatestWith[int64](observable2),
Map(func(snapshot lo.Tuple2[int64, int64]) []int64 {
return []int64{snapshot.A, snapshot.B}
}),
)
subscription := observable.Subscribe(PrintObserver[[]int64]())
time.Sleep(200 * time.Millisecond)
subscription.Unsubscribe()
Output: Next: [1 3] Next: [1 4] Next: [2 4] Completed
func CombineLatestWith1 ΒΆ
func CombineLatestWith1[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]
CombineLatestWith1 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/KXb19PPjCb1
Example (Ok) ΒΆ
observable1 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable := Pipe1(
CombineLatestWith1[int64](observable2)(observable1),
Map(func(snapshot lo.Tuple2[int64, int64]) []int64 {
return []int64{snapshot.A, snapshot.B}
}),
)
subscription := observable.Subscribe(PrintObserver[[]int64]())
time.Sleep(200 * time.Millisecond)
defer subscription.Unsubscribe()
Output: Next: [1 3] Next: [1 4] Next: [2 4] Completed
func CombineLatestWith2 ΒΆ
func CombineLatestWith2[A, B, C any](obsB Observable[B], obsC Observable[C]) func(Observable[A]) Observable[lo.Tuple3[A, B, C]]
CombineLatestWith2 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/hPDCDwEOB84
Example (Ok) ΒΆ
observable1 := Delay[int64](150 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond))
combined := CombineLatestWith2[int64](observable2, observable3)(observable1)
observable := Map(func(snapshot lo.Tuple3[int64, int64, int64]) []int64 {
return []int64{snapshot.A, snapshot.B, snapshot.C}
})(combined)
subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: [1 4 6] Next: [2 4 6] Completed
func CombineLatestWith3 ΒΆ
func CombineLatestWith3[A, B, C, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D]) func(Observable[A]) Observable[lo.Tuple4[A, B, C, D]]
CombineLatestWith3 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/PcMxo8yakQq
Example (Ok) ΒΆ
observable1 := Delay[int64](175 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond))
observable4 := Delay[int64](50 * time.Millisecond)(RangeWithInterval(7, 9, 50*time.Millisecond))
combined := CombineLatestWith3[int64](observable2, observable3, observable4)(observable1)
observable := Map(func(snapshot lo.Tuple4[int64, int64, int64, int64]) []int64 {
return []int64{snapshot.A, snapshot.B, snapshot.C, snapshot.D}
})(combined)
subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: [1 4 6 8] Next: [2 4 6 8] Completed
func CombineLatestWith4 ΒΆ
func CombineLatestWith4[A, B, C, D, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) func(Observable[A]) Observable[lo.Tuple5[A, B, C, D, E]]
CombineLatestWith4 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument.
Example (Ok) ΒΆ
observable1 := Delay[int64](200 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond))
observable4 := Delay[int64](50 * time.Millisecond)(RangeWithInterval(7, 9, 50*time.Millisecond))
observable5 := Delay[int64](75 * time.Millisecond)(RangeWithInterval(9, 11, 50*time.Millisecond))
combined := CombineLatestWith4[int64](observable2, observable3, observable4, observable5)(observable1)
observable := Map(func(snapshot lo.Tuple5[int64, int64, int64, int64, int64]) []int64 {
return []int64{snapshot.A, snapshot.B, snapshot.C, snapshot.D, snapshot.E}
})(combined)
subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: [1 4 6 8 10] Next: [2 4 6 8 10] Completed
func ConcatAll ΒΆ
func ConcatAll[T any]() func(Observable[Observable[T]]) Observable[T]
ConcatAll concatenates the source Observable with other Observables. It subscribes to each inner Observable only after the previous one completes, maintaining their order. It completes when all inner Observables are done. Play: https://go.dev/play/p/zygV4Ld9tcv
Example (Error) ΒΆ
observable := Pipe1( Just( Just(1, 2, 3), Throw[int](assert.AnError), Just(4, 5, 6), ), ConcatAll[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(30 * time.Millisecond) subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just( Just(1, 2, 3), Just(4, 5, 6), ), ConcatAll[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(30 * time.Millisecond) subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func ConcatWith ΒΆ
func ConcatWith[T any](obs ...Observable[T]) func(Observable[T]) Observable[T]
ConcatWith concatenates the source Observable with other Observables. It subscribes to each inner Observable only after the previous one completes, maintaining their order. It completes when all inner Observables are done.
It is a curried function that takes the other Observables as arguments. Play: https://go.dev/play/p/nRHRSR2yNvd
Example (Error) ΒΆ
observable := Pipe1( Just(1, 2, 3), ConcatWith(Throw[int](assert.AnError)), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3), ConcatWith(Just(4, 5, 6)), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func Contains ΒΆ
func Contains[T any](predicate func(item T) bool) func(Observable[T]) Observable[bool]
Contains determines whether any element of an observable sequence satisfies the given predicate. It emits a single boolean (true if at least one element matched, false otherwise) and then completes. Play: https://go.dev/play/p/ldteqqGsMWM
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Contains(func(i int) bool { return i == 4 }),
)
subscription := observable.Subscribe(PrintObserver[bool]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable1 := Pipe1(
Just(1, 2, 3, 4, 5),
Contains(func(i int) bool { return i < 0 }),
)
subscription1 := observable1.Subscribe(PrintObserver[bool]())
defer subscription1.Unsubscribe()
observable2 := Pipe1(
Just(1, 2, 3, 4, 5),
Contains(func(i int) bool { return i%2 == 0 }),
)
subscription2 := observable2.Subscribe(PrintObserver[bool]())
defer subscription2.Unsubscribe()
Output: Next: false Completed Next: true Completed
func ContainsI ¶
func ContainsI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[bool]
ContainsI determines whether an observable sequence contains at least one item that satisfies the predicate. The predicate receives the item and its index.
func ContainsIWithContext ¶
func ContainsIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[bool]
ContainsIWithContext determines whether an observable sequence contains at least one item that satisfies the predicate. The predicate receives the context, the item and its index. Play: https://go.dev/play/p/TkLfujMVNJb
func ContainsWithContext ¶
func ContainsWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[bool]
ContainsWithContext determines whether an observable sequence contains at least one item that satisfies the predicate. The predicate receives the context and the item. Play: https://go.dev/play/p/RPHkyiLrFVW
func ContextMap ΒΆ
func ContextMap[T any](project func(ctx context.Context) context.Context) func(Observable[T]) Observable[T]
ContextMap returns an Observable that emits the same items as the source Observable, but with a new context. The project function is called for each item emitted by the source Observable, and the context is replaced with the context returned by the project function. Play: https://go.dev/play/p/jbshjD3sb6M
func ContextMapI ΒΆ
func ContextMapI[T any](project func(ctx context.Context, index int64) context.Context) func(Observable[T]) Observable[T]
ContextMapI returns an Observable that emits the same items as the source Observable, but with a new context. The project function is called for each item emitted by the source Observable, and the context is replaced with the context returned by the project function. Play: https://go.dev/play/p/jbshjD3sb6M
func ContextReset ΒΆ
func ContextReset[T any](newCtx context.Context) func(Observable[T]) Observable[T]
ContextReset returns an Observable that emits the same items as the source Observable, but with a new context. If the new context is nil, it uses context.Background(). Play: https://go.dev/play/p/PgvV0SejJpH
func ContextWithDeadline ΒΆ
func ContextWithDeadline[T any](deadline time.Time) func(Observable[T]) Observable[T]
ContextWithDeadline returns an Observable that emits the same items as the source Observable, but adds a deadline to the context of each item. This operator should be chained with ThrowOnContextCancel. Play: https://go.dev/play/p/NPYFzhI2YDK
func ContextWithTimeout ¶
func ContextWithTimeout[T any](timeout time.Duration) func(Observable[T]) Observable[T]
ContextWithTimeout returns an Observable that emits the same items as the source Observable, but adds a timeout to the context of each item. This operator should be chained with ThrowOnContextCancel. Play: https://go.dev/play/p/1qijKGsyn0D
func ContextWithValue ΒΆ
func ContextWithValue[T any](k, v any) func(Observable[T]) Observable[T]
ContextWithValue returns an Observable that emits the same items as the source Observable, but adds a key-value pair to the context of each item. Play: https://go.dev/play/p/l70D6fuiVhK
Example ΒΆ
type contextValue struct{}
observable := Pipe2(
Just(1, 2, 3, 4, 5),
ContextWithValue[int](contextValue{}, 42),
Filter(func(i int) bool {
return i%2 == 0
}),
)
subscription := observable.Subscribe(
OnNextWithContext(func(ctx context.Context, value int) {
fmt.Printf("Next: %v\n", value)
fmt.Printf("Next context value: %v\n", ctx.Value(contextValue{}))
}),
)
defer subscription.Unsubscribe()
Output: Next: 2 Next context value: 42 Next: 4 Next context value: 42
func Count ΒΆ
func Count[T any]() func(Observable[T]) Observable[int64]
Count counts the number of values emitted by the source Observable. It emits the count when the source completes. Play: https://go.dev/play/p/igtOxOLeHPp
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Count[int](),
)
subscription := observable.Subscribe(PrintObserver[int64]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Count[int](), ) subscription := observable.Subscribe(PrintObserver[int64]()) defer subscription.Unsubscribe()
Output: Next: 5 Completed
func DefaultIfEmpty ΒΆ
func DefaultIfEmpty[T any](defaultValue T) func(Observable[T]) Observable[T]
DefaultIfEmpty emits a default value if the source observable emits no items. Play: https://go.dev/play/p/WDh807OLPWv
Example (Error) ΒΆ
observable := Pipe1( Throw[int](assert.AnError), DefaultIfEmpty(42), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable1 := Pipe1( Just(1, 2, 3), DefaultIfEmpty(42), ) subscription1 := observable1.Subscribe(PrintObserver[int]()) defer subscription1.Unsubscribe() observable2 := Pipe1( Empty[int](), DefaultIfEmpty(42), ) subscription2 := observable2.Subscribe(PrintObserver[int]()) defer subscription2.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed Next: 42 Completed
func DefaultIfEmptyWithContext ΒΆ
func DefaultIfEmptyWithContext[T any](defaultCtx context.Context, defaultValue T) func(Observable[T]) Observable[T]
DefaultIfEmptyWithContext emits a default value if the source observable emits no items.
func DefaultOnDroppedNotification ¶
DefaultOnDroppedNotification is the default implementation of `OnDroppedNotification`.
Since we cannot assign a generic callback to `OnDroppedNotification`, we had to use a `fmt.Stringer` instead of a `Notification[T any]`.
func DefaultOnUnhandledError ΒΆ
DefaultOnUnhandledError is the default implementation of `OnUnhandledError`.
func Delay ΒΆ
func Delay[T any](duration time.Duration) func(Observable[T]) Observable[T]
Delay delays the emissions of the source Observable by a given duration without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Error and Complete notifications are delayed as well.
@TODO: set queue size ? Play: https://go.dev/play/p/K3md7WPtZGI
Example (Cancel) ΒΆ
observable := Pipe1( Of(1), Delay[int](100*time.Millisecond), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(50 * time.Millisecond) subscription.Unsubscribe() // canceled before first message
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Delay[int](10*time.Millisecond),
)
subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3), Delay[int](10*time.Millisecond), ) subscription := observable.Subscribe(PrintObserver[int]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Completed
func DelayEach ΒΆ
func DelayEach[T any](duration time.Duration) func(Observable[T]) Observable[T]
DelayEach delays the emissions of the source Observable by a given duration without modifying the emitted items. Play: https://go.dev/play/p/dReP7-bffEU
func Dematerialize ΒΆ
func Dematerialize[T any]() func(Observable[Notification[T]]) Observable[T]
Dematerialize converts the source Observable of Notification instances back into a stream of items. Play: https://go.dev/play/p/oRymdDqkh25
Example (Error) ΒΆ
observable := Pipe1(
Just(
Notification[int]{Kind: KindNext, Value: 1, Err: nil},
Notification[int]{Kind: KindNext, Value: 2, Err: nil},
Notification[int]{Kind: KindNext, Value: 3, Err: nil},
Notification[int]{Kind: KindError, Value: 0, Err: assert.AnError},
),
Dematerialize[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Just(
Notification[int]{Kind: KindNext, Value: 1, Err: nil},
Notification[int]{Kind: KindNext, Value: 2, Err: nil},
Notification[int]{Kind: KindNext, Value: 3, Err: nil},
Notification[int]{Kind: KindComplete, Value: 0, Err: nil},
),
Dematerialize[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func Distinct ΒΆ
func Distinct[T comparable]() func(Observable[T]) Observable[T]
Distinct suppresses duplicate items in an Observable. Play: https://go.dev/play/p/szxp8gO0_I7
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(1)
observer.Next(2)
observer.Next(2)
observer.Next(3)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
observer.Next(4)
return nil
}),
Distinct[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 1, 2, 2, 3, 3, 4, 4, 5, 5), Distinct[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Completed
func DistinctBy ΒΆ
func DistinctBy[T any, K comparable](keySelector func(item T) K) func(Observable[T]) Observable[T]
DistinctBy suppresses duplicate items in an Observable based on a key selector.
Example (Error) ΒΆ
type user struct {
id int
name string
}
observable := Pipe1(
NewObservable(func(observer Observer[user]) Teardown {
observer.Next(user{id: 1, name: "John"})
observer.Next(user{id: 2, name: "Jane"})
observer.Next(user{id: 1, name: "John"})
observer.Error(assert.AnError)
observer.Next(user{id: 3, name: "Jim"})
return nil
}),
DistinctBy(func(item user) int {
return item.id
}),
)
subscription := observable.Subscribe(PrintObserver[user]())
defer subscription.Unsubscribe()
Output: Next: {1 John} Next: {2 Jane} Error: assert.AnError general error for testing
Example (Ok) ΒΆ
type user struct {
id int
name string
}
observable := Pipe1(
Just(
user{id: 1, name: "John"},
user{id: 2, name: "Jane"},
user{id: 1, name: "John"},
user{id: 3, name: "Jim"},
),
DistinctBy(func(item user) int {
return item.id
}),
)
subscription := observable.Subscribe(PrintObserver[user]())
defer subscription.Unsubscribe()
Output: Next: {1 John} Next: {2 Jane} Next: {3 Jim} Completed
func DistinctByWithContext ΒΆ
func DistinctByWithContext[T any, K comparable](keySelector func(ctx context.Context, item T) (context.Context, K)) func(Observable[T]) Observable[T]
DistinctByWithContext suppresses duplicate items in an Observable based on a key selector. The context is passed to the key selector function.
func Do ΒΆ
func Do[T any](onNext func(value T), onError func(err error), onComplete func()) func(Observable[T]) Observable[T]
Do is an alias to Tap. Play: https://go.dev/play/p/s_BSHgxdjUR
func DoOnComplete ΒΆ
func DoOnComplete[T any](onComplete func()) func(Observable[T]) Observable[T]
DoOnComplete is an alias to TapOnComplete.
func DoOnCompleteWithContext ΒΆ
func DoOnCompleteWithContext[T any](onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]
DoOnCompleteWithContext is an alias to TapOnCompleteWithContext.
func DoOnError ΒΆ
func DoOnError[T any](onError func(err error)) func(Observable[T]) Observable[T]
DoOnError is an alias to TapOnError.
func DoOnErrorWithContext ΒΆ
func DoOnErrorWithContext[T any](onError func(ctx context.Context, err error)) func(Observable[T]) Observable[T]
DoOnErrorWithContext is an alias to TapOnErrorWithContext.
func DoOnFinalize ΒΆ
func DoOnFinalize[T any](onFinalize func()) func(Observable[T]) Observable[T]
DoOnFinalize is an alias to TapOnFinalize. Play: https://go.dev/play/p/7en6T1q33WF
func DoOnNext ΒΆ
func DoOnNext[T any](onNext func(value T)) func(Observable[T]) Observable[T]
DoOnNext is an alias to TapOnNext.
func DoOnNextWithContext ΒΆ
func DoOnNextWithContext[T any](onNext func(ctx context.Context, value T)) func(Observable[T]) Observable[T]
DoOnNextWithContext is an alias to TapOnNextWithContext.
func DoOnSubscribe ΒΆ
func DoOnSubscribe[T any](onSubscribe func()) func(Observable[T]) Observable[T]
DoOnSubscribe is an alias to TapOnSubscribe.
func DoOnSubscribeWithContext ¶
func DoOnSubscribeWithContext[T any](onSubscribe func(ctx context.Context)) func(Observable[T]) Observable[T]
DoOnSubscribeWithContext is an alias to TapOnSubscribeWithContext.
func DoWhile ΒΆ
func DoWhile[T any](condition func() bool) func(Observable[T]) Observable[T]
DoWhile repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/nEWabaItDpn
Example ΒΆ
i := 0
observable := Pipe1(
Just(1, 2, 3),
DoWhile[int](func() bool {
i++
return i < 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 1 Next: 2 Next: 3 Completed
func DoWhileI ΒΆ
func DoWhileI[T any](condition func(index int64) bool) func(Observable[T]) Observable[T]
DoWhileI repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/cxOA9gimkCq
func DoWhileIWithContext ΒΆ
func DoWhileIWithContext[T any](condition func(ctx context.Context, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
DoWhileIWithContext repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/yMoCCnnvRRH
func DoWhileWithContext ΒΆ
func DoWhileWithContext[T any](condition func(ctx context.Context) (context.Context, bool)) func(Observable[T]) Observable[T]
DoWhileWithContext repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error.
func DoWithContext ¶
func DoWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]
DoWithContext is an alias to TapWithContext.
func ElementAt ¶
func ElementAt[T any](nth int) func(Observable[T]) Observable[T]
ElementAt emits only the item at the zero-based index nth of the items emitted by an Observable (for example, nth=2 selects the third item). If the source Observable completes before emitting that item, ElementAt will emit an error. Play: https://go.dev/play/p/0YE1tCbPaDg
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
ElementAt[int](10),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (NotFound) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), ElementAt[int](10), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Error: ro.ElementAt: nth element not found
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), ElementAt[int](2), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 3 Completed
func ElementAtOrDefault ¶
func ElementAtOrDefault[T any](nth int64, fallback T) func(Observable[T]) Observable[T]
ElementAtOrDefault emits only the item at the zero-based index nth of the items emitted by an Observable (for example, nth=2 selects the third item). If the source Observable completes before emitting that item, ElementAtOrDefault will emit the fallback value. Play: https://go.dev/play/p/DWMWPXkc8x4
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
ElementAtOrDefault(10, 100),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (NotFound) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), ElementAtOrDefault(10, 100), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 100 Completed
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), ElementAtOrDefault(2, 100), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 3 Completed
func EndWith ΒΆ
func EndWith[T any](suffixes ...T) func(Observable[T]) Observable[T]
EndWith emits the given values after emitting the values from the source Observable. Play: https://go.dev/play/p/9FPyf3bqJk_n
Example (Error) ΒΆ
observable := Pipe1( Throw[int](assert.AnError), EndWith(1, 2, 3), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3), EndWith(4, 5, 6), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func Filter ΒΆ
func Filter[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
Filter emits only those items from an Observable that pass a predicate test. Play: https://go.dev/play/p/gjk_wULxyEW
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Filter(func(i int) bool {
return i%2 == 0
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 2 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Just(1, 2, 3, 4, 5),
Filter(func(i int) bool {
return i%2 == 0
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 2 Next: 4 Completed
func FilterI ΒΆ
func FilterI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
FilterI emits only those items from an Observable that pass a predicate test. Play: https://go.dev/play/p/Y5a2-AicBWO
func FilterIWithContext ΒΆ
func FilterIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
FilterIWithContext emits only those items from an Observable that pass a predicate test. Play: https://go.dev/play/p/xjz-pViifdB
func FilterWithContext ΒΆ
func FilterWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
FilterWithContext emits only those items from an Observable that pass a predicate test. Play: https://go.dev/play/p/y4gstlmx4KR
func Find ΒΆ
func Find[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
Find returns the first element of an observable sequence that satisfies the condition. Play: https://go.dev/play/p/2f5rn0HoKeq
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Find(func(i int) bool { return i == 4 }),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable1 := Pipe1(
Just(1, 2, 3, 4, 5),
Find(func(i int) bool { return i < 0 }),
)
subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()
observable2 := Pipe1(
Just(1, 2, 3, 4, 5),
Find(func(i int) bool { return i%2 == 0 }),
)
subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output: Completed Next: 2 Completed
func FindI ΒΆ
func FindI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
FindI returns the first element of an observable sequence that satisfies the condition.
func FindIWithContext ΒΆ
func FindIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[T]
FindIWithContext returns the first element of an observable sequence that satisfies the condition. Play: https://go.dev/play/p/X8oT_CF9IKM
func FindWithContext ΒΆ
func FindWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[T]
FindWithContext returns the first element of an observable sequence that satisfies the condition. Play: https://go.dev/play/p/BVm-Grgv11w
func First ΒΆ
func First[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
First emits only the first item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, First will emit an error. Play: https://go.dev/play/p/yneVKit6vh0
Example (Error) ΒΆ
observable1 := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
First(func(n int) bool {
return n > 2
}),
)
subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()
observable2 := Pipe1(
Throw[int](assert.AnError), // no item transmitted
First(func(n int) bool {
return n > 2
}),
)
subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output: Next: 3 Completed Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Just(1, 2, 3, 4, 5),
First(func(n int) bool {
return n > 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 3 Completed
func FirstI ΒΆ
func FirstI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
FirstI emits only the first item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, FirstI will emit an error.
func FirstIWithContext ¶
func FirstIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
FirstIWithContext emits only the first item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, FirstIWithContext will emit an error.
func FirstWithContext ¶
func FirstWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
FirstWithContext emits only the first item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, FirstWithContext will emit an error.
func FlatMap ¶
func FlatMap[T, R any](project func(item T) Observable[R]) func(Observable[T]) Observable[R]
FlatMap transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. Play: https://go.dev/play/p/QBkDMwskibT
Example (Error) ΒΆ
observable1 := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
FlatMap(func(item int) Observable[int] {
return Just(item, item)
}),
)
subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()
observable2 := Pipe1(
Just(1, 2, 3),
FlatMap(func(item int) Observable[int] {
if item == 2 {
return Throw[int](assert.AnError)
}
return Just(item, item)
}),
)
subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output: Next: 1 Next: 1 Next: 2 Next: 2 Next: 3 Next: 3 Error: assert.AnError general error for testing Next: 1 Next: 1 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Just(1, 2, 3),
FlatMap(func(item int) Observable[int] {
return Just(item, item)
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 1 Next: 2 Next: 2 Next: 3 Next: 3 Completed
func FlatMapI ¶
func FlatMapI[T, R any](project func(item T, index int64) Observable[R]) func(Observable[T]) Observable[R]
FlatMapI transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. Play: https://go.dev/play/p/H04QF1dltPI
func FlatMapIWithContext ¶
func FlatMapIWithContext[T, R any](project func(ctx context.Context, item T, index int64) Observable[R]) func(Observable[T]) Observable[R]
FlatMapIWithContext transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. Play: https://go.dev/play/p/BCv4krqHEhI
func FlatMapWithContext ¶
func FlatMapWithContext[T, R any](project func(ctx context.Context, item T) Observable[R]) func(Observable[T]) Observable[R]
FlatMapWithContext transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. Play: https://go.dev/play/p/lE04v4_lJ7M
func Flatten ¶
func Flatten[T any]() func(Observable[[]T]) Observable[T]
Flatten flattens an Observable of slices into a single Observable of their items. Play: https://go.dev/play/p/vUyrQ4GO87S
func Floor ΒΆ
func Floor() func(Observable[float64]) Observable[float64]
Floor emits the floor of the values emitted by the source Observable. Play: https://go.dev/play/p/UulGlomv9K5
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[float64]) Teardown {
observer.Next(1.1)
observer.Next(2.5)
observer.Next(3.9)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Floor(),
)
subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1.1, 2.4, 3.5, 4.9, 5.0), Floor(), ) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Completed
func FloorWithPrecision ΒΆ
func FloorWithPrecision(places int) func(Observable[float64]) Observable[float64]
FloorWithPrecision emits the floored values with decimal precision applied before flooring. The `places` parameter controls the decimal precision:
- positive `places` applies flooring to that many digits to the right of the decimal point (e.g. places=2 turns 1.234 -> 1.23),
- zero behaves like `Floor()` (floor to integer),
- negative `places` floors to powers of ten (e.g. places=-1 turns 123.45 -> 120).
For very large precision magnitudes the operator uses chunked big.Float arithmetic to avoid overflow. If the requested precision exceeds internal chunking caps the implementation will intentionally return the original source observable (no-op) to avoid unbounded allocations; callers should avoid extremely large `places` values for performance reasons.
Example ΒΆ
observable := Pipe1( Just(3.14159, 2.71828, -1.2345), FloorWithPrecision(2), ) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 3.14 Next: 2.71 Next: -1.24 Completed
Example (SmallNumbers) ΒΆ
observable := Pipe1( Just(0.000123, -0.000987, math.Inf(1), math.NaN()), FloorWithPrecision(4), ) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 0.0001 Next: -0.001 Next: +Inf Next: NaN Completed
func GroupBy ΒΆ
func GroupBy[T any, K comparable](iteratee func(item T) K) func(Observable[T]) Observable[Observable[T]]
GroupBy groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as Observables. Play: https://go.dev/play/p/GOL8imC0H5S
Example (Error) ΒΆ
odd := func(v int) bool { return v%2 == 0 }
observable := Pipe2(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
time.Sleep(5 * time.Millisecond)
observer.Next(2)
time.Sleep(5 * time.Millisecond)
observer.Next(3)
time.Sleep(5 * time.Millisecond)
observer.Error(assert.AnError)
time.Sleep(5 * time.Millisecond)
observer.Next(4)
return nil
}),
GroupBy(odd),
MergeAll[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
odd := func(v int64) bool { return v%2 == 0 }
observable := Pipe2(
RangeWithInterval(1, 5, 10*time.Millisecond),
GroupBy(odd),
MergeAll[int64](),
)
subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Next: 4 Completed
func GroupByI ΒΆ
func GroupByI[T any, K comparable](iteratee func(item T, index int64) K) func(Observable[T]) Observable[Observable[T]]
GroupByI groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as Observables.
func GroupByIWithContext ΒΆ
func GroupByIWithContext[T any, K comparable](iteratee func(ctx context.Context, item T, index int64) (context.Context, K)) func(Observable[T]) Observable[Observable[T]]
GroupByIWithContext groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as Observables. Play: https://go.dev/play/p/h7vpeD0djre
func GroupByWithContext ΒΆ
func GroupByWithContext[T any, K comparable](iteratee func(ctx context.Context, item T) (context.Context, K)) func(Observable[T]) Observable[Observable[T]]
GroupByWithContext groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as Observables.
func Head ΒΆ
func Head[T any]() func(Observable[T]) Observable[T]
Head emits only the first item emitted by an Observable. If the source Observable is empty, Head will emit an error. Play: https://go.dev/play/p/TmhTvpuKAp_U
Example (Error) ΒΆ
observable1 := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Head[int](),
)
subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()
observable2 := Pipe1(
Throw[int](assert.AnError), // no item transmitted
Head[int](),
)
subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output: Next: 1 Completed Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Head[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Completed
func IgnoreElements ΒΆ
func IgnoreElements[T any]() func(Observable[T]) Observable[T]
IgnoreElements does not emit any items from an Observable but mirrors its termination notification. It is useful for ignoring all the items from an Observable but you want to be notified when it completes or when it throws an error. Play: https://go.dev/play/p/glDG6E-gZ1V
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
IgnoreElements[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), IgnoreElements[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Completed
func IgnoreOnDroppedNotification ¶
IgnoreOnDroppedNotification is the default implementation of `OnDroppedNotification`.
func IgnoreOnUnhandledError ¶
IgnoreOnUnhandledError is the default implementation of `OnUnhandledError`.
func Iif ¶
func Iif[T any](predicate func() bool, source1, source2 Observable[T]) func() Observable[T]
Iif determines which one of two observables to return based on a condition. Play: https://go.dev/play/p/t-sNgL5EZA-
Example (Error) ΒΆ
observable := Iif(
func() bool {
return false
},
Just(1, 2, 3),
Throw[int](assert.AnError),
)()
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Iif(
func() bool {
return true
},
Just(1, 2, 3),
Just(4, 5, 6),
)()
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func Last ¶
func Last[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
Last emits only the last item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, Last will emit an error. Play: https://go.dev/play/p/aMsvsTPbmHY
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Last(func(n int) bool {
return n > 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Just(1, 2, 3, 4, 5),
Last(func(n int) bool {
return n > 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 5 Completed
func LastI ¶
func LastI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
LastI emits only the last item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, LastI will emit an error.
func LastIWithContext ¶
func LastIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
LastIWithContext emits only the last item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, LastIWithContext will emit an error.
func LastWithContext ¶
func LastWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
LastWithContext emits only the last item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, LastWithContext will emit an error.
func Map ΒΆ
func Map[T, R any](project func(item T) R) func(Observable[T]) Observable[R]
Map applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/JhTBEQFQGYr
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Map(func(x int) int {
return x * 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 2 Next: 4 Next: 6 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x * 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 2 Next: 4 Next: 6 Next: 8 Next: 10 Completed
func MapErr ΒΆ
func MapErr[T, R any](project func(item T) (R, error)) func(Observable[T]) Observable[R]
MapErr applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/x7-KC-SDXr1
Example (Error) ΒΆ
observable1 := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
MapErr(func(item int) (string, error) {
return "Hey!", nil
}),
)
subscription1 := observable1.Subscribe(PrintObserver[string]())
defer subscription1.Unsubscribe()
observable2 := Pipe1(
Just(1, 2, 3, 4, 5),
MapErr(func(item int) (string, error) {
if item == 2 {
return "Hey!", assert.AnError
}
return "Hey!", nil
}),
)
subscription2 := observable2.Subscribe(PrintObserver[string]())
defer subscription2.Unsubscribe()
Output: Next: Hey! Next: Hey! Next: Hey! Error: assert.AnError general error for testing Next: Hey! Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Just(1, 2, 3),
MapErr(func(item int) (string, error) {
return "Hey!", nil
}),
)
subscription := observable.Subscribe(PrintObserver[string]())
defer subscription.Unsubscribe()
Output: Next: Hey! Next: Hey! Next: Hey! Completed
func MapErrI ΒΆ
func MapErrI[T, R any](project func(item T, index int64) (R, error)) func(Observable[T]) Observable[R]
MapErrI applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/IAZJ9eQhNqN
func MapErrIWithContext ΒΆ
func MapErrIWithContext[T, R any](project func(ctx context.Context, item T, index int64) (R, context.Context, error)) func(Observable[T]) Observable[R]
MapErrIWithContext applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/OO8FayqJesp
func MapErrWithContext ΒΆ
func MapErrWithContext[T, R any](project func(ctx context.Context, item T) (R, context.Context, error)) func(Observable[T]) Observable[R]
MapErrWithContext applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/EIGtDpPq5y-
func MapI ΒΆ
func MapI[T, R any](project func(item T, index int64) R) func(Observable[T]) Observable[R]
MapI applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/F8IKEdyC4sl
func MapIWithContext ΒΆ
func MapIWithContext[T, R any](project func(ctx context.Context, item T, index int64) (context.Context, R)) func(Observable[T]) Observable[R]
MapIWithContext applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/dDFC9SU3FF1
func MapTo ΒΆ
func MapTo[T, R any](output R) func(Observable[T]) Observable[R]
MapTo emits a constant value for each item emitted by an Observable. Play: https://go.dev/play/p/Ghc5ar7GJag
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
MapTo[int]("Hey!"),
)
subscription := observable.Subscribe(PrintObserver[string]())
defer subscription.Unsubscribe()
Output: Next: Hey! Next: Hey! Next: Hey! Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe2(
Just(1, 2, 3, 4, 5),
MapTo[int]("Hey!"),
Take[string](3),
)
subscription := observable.Subscribe(PrintObserver[string]())
defer subscription.Unsubscribe()
Output: Next: Hey! Next: Hey! Next: Hey! Completed
func MapWithContext ΒΆ
func MapWithContext[T, R any](project func(ctx context.Context, item T) (context.Context, R)) func(Observable[T]) Observable[R]
MapWithContext applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/b6i0jQenObW
func Materialize ΒΆ
func Materialize[T any]() func(Observable[T]) Observable[Notification[T]]
Materialize converts the source Observable into a stream of Notification instances. Play: https://go.dev/play/p/ZHtPviPoqWK
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Materialize[int](),
)
subscription := observable.Subscribe(PrintObserver[Notification[int]]())
defer subscription.Unsubscribe()
Output: Next: Next(1) Next: Next(2) Next: Next(3) Next: Error(assert.AnError general error for testing) Completed
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3), Materialize[int](), ) subscription := observable.Subscribe(PrintObserver[Notification[int]]()) defer subscription.Unsubscribe()
Output: Next: Next(1) Next: Next(2) Next: Next(3) Next: Complete() Completed
func Max ΒΆ
func Max[T constraints.Numeric]() func(Observable[T]) Observable[T]
Max emits the maximum value emitted by the source Observable. It emits the maximum value when the source completes. If the source is empty, it emits no value. Play: https://go.dev/play/p/wWljVN6i1Ip
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Max[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Max[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 5 Completed
func MergeAll ¶
func MergeAll[T any]() func(Observable[Observable[T]]) Observable[T]
MergeAll converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables. It subscribes to each inner Observable as it arrives, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done. Play: https://go.dev/play/p/m3nHZZJbwMF
Example (Error) ΒΆ
observable := Pipe1( Just( Delay[int](10*time.Millisecond)(Just(1)), Delay[int](50*time.Millisecond)(Throw[int](assert.AnError)), Delay[int](100*time.Millisecond)(Just(3)), ), MergeAll[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(300 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just( Just(1), Delay[int](20*time.Millisecond)(Just(2)), Delay[int](40*time.Millisecond)(Just(3)), ), MergeAll[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(60 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func MergeMap ΒΆ
func MergeMap[T, R any](projection func(item T) Observable[R]) func(Observable[T]) Observable[R]
MergeMap applies a projection function to each item emitted by the source Observable and then merges the results into a single Observable. Play: https://go.dev/play/p/NwEyrLITshG
Example (Error) ΒΆ
observable := Pipe1(
Just("a", "bb", "ccc"),
MergeMap(func(item string) Observable[string] {
if item == "bb" {
return Throw[string](assert.AnError)
}
return Delay[string](time.Duration(len(item)) * 50 * time.Millisecond)(Just(strings.ToUpper(item)))
}),
)
subscription := observable.Subscribe(PrintObserver[string]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Just("a", "bb", "ccc"),
MergeMap(func(item string) Observable[string] {
return Delay[string](time.Duration(len(item)) * 50 * time.Millisecond)(Just(strings.ToUpper(item)))
}),
)
subscription := observable.Subscribe(PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
defer subscription.Unsubscribe()
Output: Next: A Next: BB Next: CCC Completed
func MergeMapI ΒΆ
func MergeMapI[T, R any](projection func(item T, index int64) Observable[R]) func(Observable[T]) Observable[R]
MergeMapI applies a projection function to each item emitted by the source Observable and then merges the results into a single Observable. Play: https://go.dev/play/p/dPDI7ch4g0i
func MergeMapIWithContext ΒΆ
func MergeMapIWithContext[T, R any](projection func(ctx context.Context, item T, index int64) (context.Context, Observable[R])) func(Observable[T]) Observable[R]
MergeMapIWithContext applies a projection function to each item emitted by the source Observable and then merges the results into a single Observable. Play: https://go.dev/play/p/8Ih5mCaDbB8
func MergeMapWithContext ΒΆ
func MergeMapWithContext[T, R any](projection func(ctx context.Context, item T) Observable[R]) func(Observable[T]) Observable[R]
MergeMapWithContext applies a projection function to each item emitted by the source Observable and then merges the results into a single Observable. Play: https://go.dev/play/p/i2Ru9sUdL-x
func MergeWith ΒΆ
func MergeWith[T any](observables ...Observable[T]) func(Observable[T]) Observable[T]
MergeWith merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.
It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/6QpUzcdRWJl
Example (Error) ΒΆ
observable := Pipe1( Throw[int](assert.AnError), MergeWith(Just(1, 2, 3, 4)), ) subscription := observable.Subscribe(PrintObserver[int]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2), MergeWith( Delay[int](20*time.Millisecond)(Just(3, 4)), Delay[int](40*time.Millisecond)(Just(5, 6)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func MergeWith1 ΒΆ
func MergeWith1[T any](obsB Observable[T]) func(Observable[T]) Observable[T]
MergeWith1 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.
It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/P47lkUFpYq7
Example (Error) ΒΆ
observable := Pipe1( Delay[int](20*time.Millisecond)(Throw[int](assert.AnError)), MergeWith(Just(1)), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(30 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Delay[int](20*time.Millisecond)(Just(2)), MergeWith(Just(1)), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(30 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Completed
func MergeWith2 ΒΆ
func MergeWith2[T any](obsB, obsC Observable[T]) func(Observable[T]) Observable[T]
MergeWith2 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.
It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/LOQ3YbuDyC9
Example (Error) ΒΆ
observable := Pipe1( Delay[int](50*time.Millisecond)(Throw[int](assert.AnError)), MergeWith2( Just(1, 2), Delay[int](25*time.Millisecond)(Just(3)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Delay[int](50*time.Millisecond)(Just(4, 5)), MergeWith2( Just(1, 2), Delay[int](25*time.Millisecond)(Just(3)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Completed
func MergeWith3 ΒΆ
func MergeWith3[T any](obsB, obsC, obsD Observable[T]) func(Observable[T]) Observable[T]
MergeWith3 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.
It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/pMQ5bNOlWj9
Example (Error) ΒΆ
observable := Pipe1( Delay[int](75*time.Millisecond)(Throw[int](assert.AnError)), MergeWith3( Just(1, 2), Delay[int](25*time.Millisecond)(Just(3, 4)), Delay[int](50*time.Millisecond)(Just(5, 6)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(100 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Delay[int](75*time.Millisecond)(Just(7, 8)), MergeWith3( Just(1, 2), Delay[int](25*time.Millisecond)(Just(3, 4)), Delay[int](50*time.Millisecond)(Just(5, 6)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(100 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Completed
func MergeWith4 ΒΆ
func MergeWith4[T any](obsB, obsC, obsD, obsE Observable[T]) func(Observable[T]) Observable[T]
MergeWith4 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.
It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/FvJTHVOe52s
Example (Error) ΒΆ
observable := Pipe1( Delay[int](100*time.Millisecond)(Throw[int](assert.AnError)), MergeWith4( Just(1, 2), Delay[int](25*time.Millisecond)(Just(3, 4)), Delay[int](50*time.Millisecond)(Just(5, 6)), Delay[int](75*time.Millisecond)(Just(7, 8)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(120 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Delay[int](100*time.Millisecond)(Just(9, 10)), MergeWith4( Just(1, 2), Delay[int](25*time.Millisecond)(Just(3, 4)), Delay[int](50*time.Millisecond)(Just(5, 6)), Delay[int](75*time.Millisecond)(Just(7, 8)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(120 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Next: 9 Next: 10 Completed
func MergeWith5 ΒΆ
func MergeWith5[T any](obsB, obsC, obsD, obsE, obsF Observable[T]) func(Observable[T]) Observable[T]
MergeWith5 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.
It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/kR3rFF7Bw-i
Example (Error) ΒΆ
observable := Pipe1( Delay[int](125*time.Millisecond)(Throw[int](assert.AnError)), MergeWith5( Just(1, 2), Delay[int](25*time.Millisecond)(Just(3, 4)), Delay[int](50*time.Millisecond)(Just(5, 6)), Delay[int](75*time.Millisecond)(Just(7, 8)), Delay[int](100*time.Millisecond)(Just(9, 10)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(150 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Next: 9 Next: 10 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Delay[int](125*time.Millisecond)(Just(11, 12)), MergeWith5( Just(1, 2), Delay[int](25*time.Millisecond)(Just(3, 4)), Delay[int](50*time.Millisecond)(Just(5, 6)), Delay[int](75*time.Millisecond)(Just(7, 8)), Delay[int](100*time.Millisecond)(Just(9, 10)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(150 * time.Millisecond) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Next: 9 Next: 10 Next: 11 Next: 12 Completed
func Min ΒΆ
func Min[T constraints.Numeric]() func(Observable[T]) Observable[T]
Min emits the minimum value emitted by the source Observable. It emits the minimum value when the source completes. If the source is empty, it emits no value. Play: https://go.dev/play/p/SPK3L-NvZ98
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Min[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Min[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Completed
func NewScheduler ¶
func NewScheduler()
NewScheduler just trolls other languages. See https://reactivex.io/documentation/scheduler.html
func ObserveOn ¶
func ObserveOn[T any](bufferSize int) func(Observable[T]) Observable[T]
ObserveOn schedules the downstream flow on a different goroutine. Next, Error and Complete notifications are first sent to a queue, and the consumer then consumes this queue. ObserveOn converts a push-based Observable into a pullable stream with backpressure capabilities.
To schedule the upstream flow to a different goroutine, refer to SubscribeOn.
When an Observable emits values faster than they can be consumed, ObserveOn buffers these values in a queue of specified capacity. This allows downstream consumers to pull values at their own pace while managing backpressure from upstream emissions.
Note: Once the buffer reaches its capacity, upstream emissions will block until space becomes available, effectively implementing backpressure control.
@TODO: add a backpressure policy ? drop vs block. Play: https://go.dev/play/p/BpdKJ6Mya03
func OnErrorResumeNextWith ΒΆ
func OnErrorResumeNextWith[T any](finally ...Observable[T]) func(Observable[T]) Observable[T]
OnErrorResumeNextWith instructs an Observable to begin emitting a second Observable sequence if it encounters an error or completes. It immediately subscribes to the next one that was passed. Play: https://go.dev/play/p/9XLTAOginbK
Example ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
observer.Complete()
return nil
}),
OnErrorResumeNextWith(Of(4, 5, 6)),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func OnErrorReturn ΒΆ
func OnErrorReturn[T any](finally T) func(Observable[T]) Observable[T]
OnErrorReturn instructs an Observable to emit a particular item when it encounters an error. It will then complete the sequence. Play: https://go.dev/play/p/d_9xe1oedjU
Example ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
observer.Complete()
return nil
}),
OnErrorReturn(42),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 42 Completed
func Pairwise ΒΆ
func Pairwise[T any]() func(Observable[T]) Observable[[]T]
Pairwise emits the previous and current values as a pair of two values. Play: https://go.dev/play/p/0YujgFTL4e0
Example (Error) ΒΆ
observable := Pipe1( Throw[int](assert.AnError), Pairwise[int](), ) subscription := observable.Subscribe(PrintObserver[[]int]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
obsercable := Pipe1( Just(1, 2, 3, 4, 5), Pairwise[int](), ) subscription := obsercable.Subscribe(PrintObserver[[]int]()) defer subscription.Unsubscribe()
Output: Next: [1 2] Next: [2 3] Next: [3 4] Next: [4 5] Completed
func PipeOp ΒΆ
func PipeOp[First, Last any](operators ...any) func(Observable[First]) Observable[Last]
PipeOp is similar to Pipe, but can be used as an operator.
Example ΒΆ
observable := Pipe1(
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x + 1
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func PipeOp1 ΒΆ
func PipeOp1[A, B any]( operator1 func(Observable[A]) Observable[B], ) func(Observable[A]) Observable[B]
PipeOp1 is similar to Pipe1, but can be used as an operator.
func PipeOp2 ΒΆ
func PipeOp2[A, B, C any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], ) func(Observable[A]) Observable[C]
PipeOp2 is similar to Pipe2, but can be used as an operator.
func PipeOp3 ΒΆ
func PipeOp3[A, B, C, D any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], ) func(Observable[A]) Observable[D]
PipeOp3 is similar to Pipe3, but can be used as an operator.
func PipeOp4 ΒΆ
func PipeOp4[A, B, C, D, E any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], ) func(Observable[A]) Observable[E]
PipeOp4 is similar to Pipe4, but can be used as an operator.
Example ΒΆ
observable := Pipe3(
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x * 2
}),
Filter(func(x int) bool {
return x%2 == 0
}),
Take[int](2),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 2 Next: 4 Completed
func PipeOp5 ΒΆ
func PipeOp5[A, B, C, D, E, F any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], ) func(Observable[A]) Observable[F]
PipeOp5 is similar to Pipe5, but can be used as an operator.
func PipeOp6 ΒΆ
func PipeOp6[A, B, C, D, E, F, G any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], ) func(Observable[A]) Observable[G]
PipeOp6 is similar to Pipe6, but can be used as an operator.
func PipeOp7 ΒΆ
func PipeOp7[A, B, C, D, E, F, G, H any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], ) func(Observable[A]) Observable[H]
PipeOp7 is similar to Pipe7, but can be used as an operator.
func PipeOp8 ΒΆ
func PipeOp8[A, B, C, D, E, F, G, H, I any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], ) func(Observable[A]) Observable[I]
PipeOp8 is similar to Pipe8, but can be used as an operator.
func PipeOp9 ΒΆ
func PipeOp9[A, B, C, D, E, F, G, H, I, J any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], ) func(Observable[A]) Observable[J]
PipeOp9 is similar to Pipe9, but can be used as an operator.
func PipeOp10 ΒΆ
func PipeOp10[A, B, C, D, E, F, G, H, I, J, K any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], ) func(Observable[A]) Observable[K]
PipeOp10 is similar to Pipe10, but can be used as an operator.
func PipeOp11 ΒΆ
func PipeOp11[A, B, C, D, E, F, G, H, I, J, K, L any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], ) func(Observable[A]) Observable[L]
PipeOp11 is similar to Pipe11, but can be used as an operator.
func PipeOp12 ΒΆ
func PipeOp12[A, B, C, D, E, F, G, H, I, J, K, L, M any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], ) func(Observable[A]) Observable[M]
PipeOp12 is similar to Pipe12, but can be used as an operator.
func PipeOp13 ΒΆ
func PipeOp13[A, B, C, D, E, F, G, H, I, J, K, L, M, N any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], ) func(Observable[A]) Observable[N]
PipeOp13 is similar to Pipe13, but can be used as an operator.
func PipeOp14 ΒΆ
func PipeOp14[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], ) func(Observable[A]) Observable[O]
PipeOp14 is similar to Pipe14, but can be used as an operator.
func PipeOp15 ΒΆ
func PipeOp15[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], ) func(Observable[A]) Observable[P]
PipeOp15 is similar to Pipe15, but can be used as an operator.
func PipeOp16 ¶
func PipeOp16[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], ) func(Observable[A]) Observable[Q]
PipeOp16 is similar to Pipe16, but can be used as an operator.
func PipeOp17 ¶
func PipeOp17[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], ) func(Observable[A]) Observable[R]
PipeOp17 is similar to Pipe17, but can be used as an operator.
func PipeOp18 ¶
func PipeOp18[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], ) func(Observable[A]) Observable[S]
PipeOp18 is similar to Pipe18, but can be used as an operator.
func PipeOp19 ¶
func PipeOp19[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], ) func(Observable[A]) Observable[T]
PipeOp19 is similar to Pipe19, but can be used as an operator.
func PipeOp20 ¶
func PipeOp20[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], ) func(Observable[A]) Observable[U]
PipeOp20 is similar to Pipe20, but can be used as an operator.
func PipeOp21 ¶
func PipeOp21[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], ) func(Observable[A]) Observable[V]
PipeOp21 is similar to Pipe21, but can be used as an operator.
func PipeOp22 ¶
func PipeOp22[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], operator22 func(Observable[V]) Observable[W], ) func(Observable[A]) Observable[W]
PipeOp22 is similar to Pipe22, but can be used as an operator.
func PipeOp23 ¶
func PipeOp23[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], operator22 func(Observable[V]) Observable[W], operator23 func(Observable[W]) Observable[X], ) func(Observable[A]) Observable[X]
PipeOp23 is similar to Pipe23, but can be used as an operator.
func PipeOp24 ¶
func PipeOp24[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], operator22 func(Observable[V]) Observable[W], operator23 func(Observable[W]) Observable[X], operator24 func(Observable[X]) Observable[Y], ) func(Observable[A]) Observable[Y]
PipeOp24 is similar to Pipe24, but can be used as an operator.
func PipeOp25 ¶
func PipeOp25[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z any]( operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], operator22 func(Observable[V]) Observable[W], operator23 func(Observable[W]) Observable[X], operator24 func(Observable[X]) Observable[Y], operator25 func(Observable[Y]) Observable[Z], ) func(Observable[A]) Observable[Z]
PipeOp25 is similar to Pipe25, but can be used as an operator.
func RaceWith ¶
func RaceWith[T any](sources ...Observable[T]) func(Observable[T]) Observable[T]
RaceWith creates an Observable that mirrors the first source Observable to emit a next, error or complete notification from the combination of the Observable to which the operator is applied and supplied Observables. It cancels the subscriptions to all other Observables. It completes when the source Observable completes. If the source Observable errors, it errors with the same error.
It is a curried function that takes the other Observables as arguments. Play: https://go.dev/play/p/5VzGFd62SMC
Example (Error) ¶
observable := Race( Delay[int](50*time.Millisecond)(Throw[int](assert.AnError)), Delay[int](20*time.Millisecond)(Just(4, 5, 6)), Delay[int](100*time.Millisecond)(Just(7, 8, 9)), ) subscription := observable.Subscribe(PrintObserver[int]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 4 Next: 5 Next: 6 Completed
Example (Ok) ¶
observable := Pipe1( Just(1, 2, 3), RaceWith( Delay[int](50*time.Millisecond)(Just(4, 5, 6)), Delay[int](100*time.Millisecond)(Just(7, 8, 9)), ), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe() time.Sleep(150 * time.Millisecond)
Output: Next: 1 Next: 2 Next: 3 Completed
func Reduce ¶
func Reduce[T, R any](accumulator func(agg R, item T) R, seed R) func(Observable[T]) Observable[R]
Reduce applies an accumulator function over the source Observable, and emits the result when the source completes. It takes a seed value as the initial accumulator value. Play: https://go.dev/play/p/GpOF9eNpA5w
Example (Error) ¶
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Reduce(func(agg, current int) int {
return agg + current
}, 42),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1(
Just(1, 2, 3, 4, 5),
Reduce(func(agg, current int) int {
return agg + current
}, 42),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 57 Completed
func ReduceI ¶
func ReduceI[T, R any](accumulator func(agg R, item T, index int64) R, seed R) func(Observable[T]) Observable[R]
ReduceI applies an accumulator function over the source Observable, and emits the result when the source completes. It takes a seed value as the initial accumulator value.
func ReduceIWithContext ¶
func ReduceIWithContext[T, R any](accumulator func(ctx context.Context, agg R, item T, index int64) (context.Context, R), seed R) func(Observable[T]) Observable[R]
ReduceIWithContext applies an accumulator function over the source Observable, and emits the result when the source completes. It takes a seed value as the initial accumulator value. Play: https://go.dev/play/p/WALnb341F4U
func ReduceWithContext ¶
func ReduceWithContext[T, R any](accumulator func(ctx context.Context, agg R, item T) (context.Context, R), seed R) func(Observable[T]) Observable[R]
ReduceWithContext applies an accumulator function over the source Observable, and emits the result when the source completes. It takes a seed value as the initial accumulator value.
func RepeatWith ¶
func RepeatWith[T any](count int64) func(Observable[T]) Observable[T]
RepeatWith repeats the source Observable a specified number of times. This is a pipeable operator. The creation operator equivalent is `Repeat`.
The destination is flattened. Play: https://go.dev/play/p/fEKtAX9_nYe
Example (Error) ¶
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
RepeatWith[int](3),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1( Just(1, 2, 3), RepeatWith[int](3), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 1 Next: 2 Next: 3 Next: 1 Next: 2 Next: 3 Completed
func Retry ¶
func Retry[T any]() func(Observable[T]) Observable[T]
Retry resubscribes to the source observable when it encounters an error. It will retry infinitely. If you want to limit the number of retries, use RetryWithConfig. Play: https://go.dev/play/p/Llj9dT9Y3Z2
func RetryWithConfig ¶
func RetryWithConfig[T any](opts RetryConfig) func(Observable[T]) Observable[T]
RetryWithConfig resubscribes to the source observable when it encounters an error. If a max number of retries is set, it will retry until the max number of retries is reached. If a delay is set, it will wait before retrying. If resetOnSuccess is set, it will reset the number of retries when a value is emitted. Play: https://go.dev/play/p/GilWi5xG0lr
Example ¶
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
observer.Complete()
return nil
}),
RetryWithConfig[int](RetryConfig{
MaxRetries: 1,
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
func Round ¶
func Round() func(Observable[float64]) Observable[float64]
Round emits the rounded values emitted by the source Observable. Play: https://go.dev/play/p/aXwxpsJq_BQ
Example (Error) ¶
observable := Pipe1(
NewObservable(func(observer Observer[float64]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Round(),
)
subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1( Just[float64](1, 2, 3, 4, 5), Round(), ) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Completed
func SampleTime ¶
func SampleTime[T any](interval time.Duration) func(Observable[T]) Observable[T]
SampleTime emits the most recently emitted value from the source Observable within periodic time intervals.
Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period. Play: https://go.dev/play/p/PcPo4lE9-_T
func SampleWhen ¶
func SampleWhen[T, t any](tick Observable[t]) func(Observable[T]) Observable[T]
SampleWhen emits the most recently emitted value from the source Observable within a period determined by another Observable.
Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period. Play: https://go.dev/play/p/tr4FEd-CSce
func Scan ¶
func Scan[T, R any](reduce func(accumulator R, item T) R, seed R) func(Observable[T]) Observable[R]
Scan applies an accumulator function over an Observable and emits each intermediate result. Play: https://go.dev/play/p/gAzVq-a0Jiz
Example (Error) ¶
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Scan(func(agg, current int) int {
return agg + current
}, 42),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 43 Next: 45 Next: 48 Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1(
Just(1, 2, 3, 4, 5),
Scan(func(agg, current int) int {
return agg + current
}, 42),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 43 Next: 45 Next: 48 Next: 52 Next: 57 Completed
func ScanI ¶
func ScanI[T, R any](reduce func(accumulator R, item T, index int64) R, seed R) func(Observable[T]) Observable[R]
ScanI applies an accumulator function over an Observable and emits each intermediate result.
func ScanIWithContext ¶
func ScanIWithContext[T, R any](reduce func(ctx context.Context, accumulator R, item T, index int64) (context.Context, R), seed R) func(Observable[T]) Observable[R]
ScanIWithContext applies an accumulator function over an Observable and emits each intermediate result. Play: https://go.dev/play/p/BG6OmY35v4x
func ScanWithContext ¶
func ScanWithContext[T, R any](reduce func(ctx context.Context, accumulator R, item T) (context.Context, R), seed R) func(Observable[T]) Observable[R]
ScanWithContext applies an accumulator function over an Observable and emits each intermediate result.
func SequenceEqual ¶
func SequenceEqual[T comparable](obsB Observable[T]) func(Observable[T]) Observable[bool]
SequenceEqual determines whether two observable sequences are equal by comparing the elements pairwise. Play: https://go.dev/play/p/cBIQlH01byQ
func Serialize ¶
func Serialize[T any]() func(Observable[T]) Observable[T]
Serialize ensures thread-safe message passing by wrapping any observable in a ro.SafeObservable implementation.
func Share ¶
func Share[T any]() func(Observable[T]) Observable[T]
Share creates a new Observable that multicasts (shares) the original Observable. As long as there is at least one subscription to the multicasted Observable, the source Observable will be subscribed and emitting data. When all subscribers have unsubscribed, the source Observable will be unsubscribed.
This is an alias for ShareWithConfig with default configuration. Play: https://go.dev/play/p/C34fv02jAIH
func ShareReplay ¶
func ShareReplay[T any](bufferSize int) func(Observable[T]) Observable[T]
ShareReplay creates a new Observable that multicasts (shares) the original Observable and replays a specified number of items to any future subscribers. As long as there is at least one subscription to the multicasted Observable, the source Observable will be subscribed and emitting data. When all subscribers have unsubscribed, the source Observable will be unsubscribed.
This is an alias for ShareReplayWithConfig with default configuration. Play: https://go.dev/play/p/QmsDbChzRgu
func ShareReplayWithConfig ¶
func ShareReplayWithConfig[T any](bufferSize int, config ShareReplayConfig) func(Observable[T]) Observable[T]
ShareReplayWithConfig creates a new Observable that multicasts (shares) the original Observable and replays a specified number of items to any future subscribers. As long as there is at least one subscription to the multicasted Observable, the source Observable will be subscribed and emitting data. When all subscribers have unsubscribed, the source Observable will be unsubscribed.
The configuration allows customizing the behavior of the shared Observable:
- `bufferSize` is the number of items to replay to future subscribers.
- `ResetOnRefCountZero` determines whether the shared Observable should be reset when the reference count reaches zero.
func ShareWithConfig ¶
func ShareWithConfig[T any](config ShareConfig[T]) func(Observable[T]) Observable[T]
ShareWithConfig creates a new Observable that multicasts (shares) the original Observable. As long as there is at least one subscription to the multicasted Observable, the source Observable will be subscribed and emitting data. When all subscribers have unsubscribed, the source Observable will be unsubscribed.
The configuration allows customizing the behavior of the shared Observable:
- `Connector` is a factory function that creates a new Subject for each subscription. The Subject can be any type of Subject, such as a ReplaySubject, a BehaviorSubject, etc.
- `ResetOnError` determines whether the shared Observable should be reset when an error is emitted.
- `ResetOnComplete` determines whether the shared Observable should be reset when it completes.
- `ResetOnRefCountZero` determines whether the shared Observable should be reset when the reference count reaches zero.
func Skip ¶
func Skip[T any](count int64) func(Observable[T]) Observable[T]
Skip suppresses the first n items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, Skip will not emit any items. If the count is zero, Skip will emit all items. Play: https://go.dev/play/p/AAEJaUZJuIj
Example (Error) ¶
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Skip[int](2),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 3 Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1( Just(1, 2, 3, 4, 5), Skip[int](2), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 3 Next: 4 Next: 5 Completed
func SkipLast ¶
func SkipLast[T any](count int) func(Observable[T]) Observable[T]
SkipLast suppresses the last n items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, SkipLast will not emit any items. If the count is zero, SkipLast will emit all items. Play: https://go.dev/play/p/gire30ONRBB
Example (Empty) ¶
observable := Pipe1( Just(1, 2, 3, 4, 5), SkipLast[int](10), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Completed
Example (Error) ¶
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
SkipLast[int](2),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1( Just(1, 2, 3, 4, 5), SkipLast[int](2), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func SkipUntil ¶
func SkipUntil[T, S any](signal Observable[S]) func(Observable[T]) Observable[T]
SkipUntil suppresses items emitted by an Observable until a second Observable emits an item or completes. It will then emit all the subsequent items. If the second Observable is empty, SkipUntil will not emit any items. If the second Observable emits an item or completes, SkipUntil will emit all items.
Example (Empty) ¶
observable := Pipe1( RangeWithInterval(0, 5, 10*time.Millisecond), SkipUntil[int64](Interval(100*time.Millisecond)), ) subscription := observable.Subscribe(PrintObserver[int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Completed
Example (Error) ¶
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
go func() {
time.Sleep(30 * time.Millisecond)
observer.Next(1)
time.Sleep(30 * time.Millisecond)
observer.Next(2)
time.Sleep(30 * time.Millisecond)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
}()
return nil
}),
SkipUntil[int](Interval(45*time.Millisecond)),
)
subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1( RangeWithInterval(0, 5, 40*time.Millisecond), SkipUntil[int64](Interval(100*time.Millisecond)), ) subscription := observable.Subscribe(PrintObserver[int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 2 Next: 3 Next: 4 Completed
func SkipWhile ¶
func SkipWhile[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
SkipWhile skips items emitted by an Observable until a specified condition becomes false. It will then emit all the subsequent items. If the condition is never false, SkipWhile will not emit any items. If the condition is false on the first item, SkipWhile will emit all items.
Example (Error) ¶
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
SkipWhile(func(v int) bool {
return v <= 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 3 Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1(
Just(1, 2, 3, 4, 5),
SkipWhile(func(v int) bool {
return v <= 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 3 Next: 4 Next: 5 Completed
func SkipWhileI ¶
func SkipWhileI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
SkipWhileI skips items emitted by an Observable until a specified condition becomes false. It will then emit all the subsequent items. If the condition is never false, SkipWhileI will not emit any items. If the condition is false on the first item, SkipWhileI will emit all items.
func SkipWhileIWithContext ¶
func SkipWhileIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
SkipWhileIWithContext skips items emitted by an Observable until a specified condition becomes false. It will then emit all the subsequent items. If the condition is never false, SkipWhileIWithContext will not emit any items. If the condition is false on the first item, SkipWhileIWithContext will emit all items. Play: https://go.dev/play/p/oYUQuPWIytL
func SkipWhileWithContext ¶
func SkipWhileWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
SkipWhileWithContext skips items emitted by an Observable until a specified condition becomes false. It will then emit all the subsequent items. If the condition is never false, SkipWhileWithContext will not emit any items. If the condition is false on the first item, SkipWhileWithContext will emit all items.
func StartWith ¶
func StartWith[T any](prefixes ...T) func(Observable[T]) Observable[T]
StartWith emits the given values before emitting the values from the source Observable. Play: https://go.dev/play/p/vS_gIw8Ce1C
Example (Error) ¶
observable := Pipe1( Throw[int](assert.AnError), StartWith(1, 2, 3), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Pipe1( Just(4, 5, 6), StartWith(1, 2, 3), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func SubscribeOn ¶
func SubscribeOn[T any](bufferSize int) func(Observable[T]) Observable[T]
SubscribeOn schedules the upstream flow to a different goroutine. Next, Error and Complete notifications are sent to a queue first, then the consumer consumes this queue. SubscribeOn converts a push-based Observable into a pullable stream with backpressure capabilities.
To schedule the downstream flow to a different goroutine, refer to ObserveOn.
When an Observable emits values faster than they can be consumed, SubscribeOn buffers these values in a queue of specified capacity. This allows downstream consumers to pull values at their own pace while managing backpressure from upstream emissions.
Note: Once the buffer reaches its capacity, upstream emissions will block until space becomes available, effectively implementing backpressure control.
@TODO: add a backpressure policy ? drop vs block. Play: https://go.dev/play/p/WrsTUq6yxtO
func Sum ΒΆ
func Sum[T constraints.Numeric]() func(Observable[T]) Observable[T]
Sum calculates the sum of the values emitted by the source Observable. It emits the sum when the source completes. Play: https://go.dev/play/p/b3rRlI80igo
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Sum[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Sum[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 15 Completed
func Tail ΒΆ
func Tail[T any]() func(Observable[T]) Observable[T]
Tail emits only the last item emitted by an Observable. If the source Observable is empty, Tail will emit an error.
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Tail[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Tail[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 5 Completed
func Take ΒΆ
func Take[T any](count int64) func(Observable[T]) Observable[T]
Take emits only the first n items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, Take will emit all items. If the count is zero, Take will not emit any items. Play: https://go.dev/play/p/IC_hJMsg7yk
Example (Error1) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Take[int](5),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Error2) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Take[int](2),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Completed
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), Take[int](2), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Completed
func TakeLast ΒΆ
func TakeLast[T any](count int) func(Observable[T]) Observable[T]
TakeLast emits only the last n items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, TakeLast will emit all items. If the count is zero, TakeLast will not emit any items. Play: https://go.dev/play/p/J0mX3NpEHzy
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
TakeLast[int](2),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), TakeLast[int](2), ) subscription := observable.Subscribe(PrintObserver[int]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 4 Next: 5 Completed
func TakeUntil ¶
func TakeUntil[T, S any](signal Observable[S]) func(Observable[T]) Observable[T]
TakeUntil emits items emitted by an Observable until a second Observable emits an item or completes. It will then complete. If the second Observable is empty, TakeUntil will emit all items. If the second Observable emits an item or completes, TakeUntil will stop emitting items and complete. Play: https://go.dev/play/p/nhgYGyREW1r
Example (Empty) ΒΆ
observable := Pipe1( RangeWithInterval(0, 5, 50*time.Millisecond), TakeUntil[int64](Interval(10*time.Millisecond)), ) subscription := observable.Subscribe(PrintObserver[int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Completed
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
go func() {
time.Sleep(20 * time.Millisecond)
observer.Next(1)
time.Sleep(20 * time.Millisecond)
observer.Next(2)
time.Sleep(20 * time.Millisecond)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
}()
return nil
}),
TakeUntil[int](Interval(50*time.Millisecond)),
)
subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Completed
Example (Ok) ΒΆ
observable := Pipe1( RangeWithInterval(0, 5, 40*time.Millisecond), TakeUntil[int64](Interval(100*time.Millisecond)), ) subscription := observable.Subscribe(PrintObserver[int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 0 Next: 1 Completed
func TakeWhile ¶
func TakeWhile[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]
TakeWhile emits items emitted by an Observable so long as a specified condition is true. It will then complete. If the condition is never true, TakeWhile will not emit any items. If the condition is always true, TakeWhile will emit all items. If the condition is false on the first item, TakeWhile will not emit any items. Play: https://go.dev/play/p/lxV03GzOa2J
Example (Error1) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
TakeWhile(func(n int) bool {
return n < 5
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Error2) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
TakeWhile(func(n int) bool {
return n < 3
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Completed
Example (Ok) ΒΆ
observable := Pipe1(
Just(1, 2, 3, 4, 5),
TakeWhile(func(n int) bool {
return n < 3
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Completed
func TakeWhileI ¶
func TakeWhileI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]
TakeWhileI emits items emitted by an Observable so long as a specified condition is true. It will then complete. If the condition is never true, TakeWhileI will not emit any items. If the condition is always true, TakeWhileI will emit all items. If the condition is false on the first item, TakeWhileI will not emit any items.
func TakeWhileIWithContext ¶
func TakeWhileIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
TakeWhileIWithContext emits items emitted by an Observable so long as a specified condition is true. It will then complete. If the condition is never true, TakeWhileIWithContext will not emit any items. If the condition is always true, TakeWhileIWithContext will emit all items. If the condition is false on the first item, TakeWhileIWithContext will not emit any items.
func TakeWhileWithContext ¶
func TakeWhileWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]
TakeWhileWithContext emits items emitted by an Observable so long as a specified condition is true. It will then complete. If the condition is never true, TakeWhileWithContext will not emit any items. If the condition is always true, TakeWhileWithContext will emit all items. If the condition is false on the first item, TakeWhileWithContext will not emit any items.
func Tap ΒΆ
func Tap[T any](onNext func(value T), onError func(err error), onComplete func()) func(Observable[T]) Observable[T]
Tap allows you to perform side effects for notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Tap(
func(value int) {
fmt.Printf("Next: %v\n", value)
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
),
)
subscription := observable.Subscribe(NoopObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Range(1, 4),
Tap(
func(value int64) {
fmt.Printf("Next: %v\n", value)
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
),
)
subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func TapOnComplete ΒΆ
func TapOnComplete[T any](onComplete func()) func(Observable[T]) Observable[T]
TapOnComplete allows you to perform side effects for Complete notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/z1sntT6bplM
Example (Error) ΒΆ
observable := Pipe2(
Throw[int](assert.AnError),
Delay[int](10*time.Millisecond),
TapOnComplete[int](func() { fmt.Printf("Completed") }),
)
subscription := observable.Subscribe(NoopObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Example (Ok) ΒΆ
observable := Pipe1(
Range(1, 4),
TapOnComplete[int64](func() { fmt.Printf("Completed") }),
)
subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
Output: Completed
func TapOnCompleteWithContext ΒΆ
func TapOnCompleteWithContext[T any](onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]
TapOnCompleteWithContext allows you to perform side effects for Complete notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/3k25j_D1OTW
func TapOnError ΒΆ
func TapOnError[T any](onError func(err error)) func(Observable[T]) Observable[T]
TapOnError allows you to perform side effects for Error notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
TapOnError[int](func(err error) { fmt.Printf("Error: %s\n", err.Error()) }),
)
subscription := observable.Subscribe(NoopObserver[int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1(
Range(1, 4),
TapOnError[int64](func(err error) { fmt.Printf("Error: %s\n", err.Error()) }),
)
subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
func TapOnErrorWithContext ΒΆ
func TapOnErrorWithContext[T any](onError func(ctx context.Context, err error)) func(Observable[T]) Observable[T]
TapOnErrorWithContext allows you to perform side effects for Error notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer.
func TapOnFinalize ΒΆ
func TapOnFinalize[T any](onFinalize func()) func(Observable[T]) Observable[T]
TapOnFinalize allows you to perform side effects when the source Observable is unsubscribed from without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/VEACE_KhdvU
func TapOnNext ΒΆ
func TapOnNext[T any](onNext func(value T)) func(Observable[T]) Observable[T]
TapOnNext allows you to perform side effects for Next notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int64]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
TapOnNext(func(v int64) { fmt.Println("Next:", v) }),
)
subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3
Example (Ok) ΒΆ
observable := Pipe1(
Range(1, 4),
TapOnNext(func(v int64) { fmt.Println("Next:", v) }),
)
subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3
func TapOnNextWithContext ¶
func TapOnNextWithContext[T any](onNext func(ctx context.Context, value T)) func(Observable[T]) Observable[T]
TapOnNextWithContext allows you to perform side effects for Next notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI
func TapOnSubscribe ΒΆ
func TapOnSubscribe[T any](onSubscribe func()) func(Observable[T]) Observable[T]
TapOnSubscribe allows you to perform side effects when the source Observable is subscribed to without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/0YzsxpRkO4T
func TapOnSubscribeWithContext ΒΆ
func TapOnSubscribeWithContext[T any](onSubscribe func(ctx context.Context)) func(Observable[T]) Observable[T]
TapOnSubscribeWithContext allows you to perform side effects when the source Observable is subscribed to without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer.
func TapWithContext ΒΆ
func TapWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]
TapWithContext allows you to perform side effects for notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI
func ThrottleTime ¶
func ThrottleTime[T any](interval time.Duration) func(Observable[T]) Observable[T]
ThrottleTime emits a value from the source Observable, then ignores subsequent source values for the given interval, then repeats this process. Play: https://go.dev/play/p/ITogsevmh88
func ThrottleWhen ΒΆ
func ThrottleWhen[T, t any](tick Observable[t]) func(Observable[T]) Observable[T]
ThrottleWhen emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process. Play: https://go.dev/play/p/q3ISV03EL3q
func ThrowIfEmpty ΒΆ
func ThrowIfEmpty[T any](throw func() error) func(Observable[T]) Observable[T]
ThrowIfEmpty throws an error if the source observable is empty. It will throw the error returned by the throw function. If the source observable emits a value, it will complete. If the source observable emits an error, it will propagate the error. Play: https://go.dev/play/p/mLCaC7p_6p4
Example ΒΆ
observable := Pipe1(
Empty[int](),
ThrowIfEmpty[int](func() error {
return errors.New("empty")
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Error: empty
func ThrowOnContextCancel ΒΆ
func ThrowOnContextCancel[T any]() func(Observable[T]) Observable[T]
ThrowOnContextCancel returns an Observable that emits the same items as the source Observable, but throws an error if the context is canceled. This operator should be chained after an operator such as ContextWithTimeout or ContextWithDeadline. Play: https://go.dev/play/p/K9oGdZFa-b1
func TimeInterval ΒΆ
func TimeInterval[T any]() func(Observable[T]) Observable[IntervalValue[T]]
TimeInterval emits the values emitted by the source Observable with the time elapsed between each emission. Play: https://go.dev/play/p/VX73ZL74hPk
Example ΒΆ
observable := Pipe1( RangeWithInterval(0, 3, 10*time.Millisecond), TimeInterval[int64](), ) subscription := observable.Subscribe(NoopObserver[IntervalValue[int64]]()) defer subscription.Unsubscribe()
func Timeout ΒΆ
func Timeout[T any](duration time.Duration) func(Observable[T]) Observable[T]
Timeout raises an error if the source Observable does not emit any item within the specified duration. Play: https://go.dev/play/p/t0xKoj-_AqZ
Example (Error) ΒΆ
subscription := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
go func() {
observer.Next(1)
time.Sleep(100 * time.Millisecond)
observer.Next(2)
time.Sleep(100 * time.Millisecond)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
}()
return nil
}),
Timeout[int](50*time.Millisecond),
).Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Error: ro.Timeout: timeout after 50ms
Example (Ok) ΒΆ
observable := Pipe1( Range(1, 4), Timeout[int64](20*time.Millisecond), ) subscription := observable.Subscribe(PrintObserver[int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Completed
func Timestamp ΒΆ
func Timestamp[T any]() func(Observable[T]) Observable[TimestampValue[T]]
Timestamp emits the values emitted by the source Observable with the time elapsed since the source Observable was subscribed to. Play: https://go.dev/play/p/cDiCr6qIE2P
Example ΒΆ
observable := Pipe1( RangeWithInterval(0, 3, 10*time.Millisecond), Timestamp[int64](), ) subscription := observable.Subscribe(NoopObserver[TimestampValue[int64]]()) defer subscription.Unsubscribe()
func ToChannel ¶
func ToChannel[T any](size int) func(Observable[T]) Observable[<-chan Notification[T]]
ToChannel materializes and forwards all items from the observable into a channel. It is a sink operator so it emits a single value. It emits the channel when the source completes. If the source is empty, it emits an empty channel. The channel will be closed when the source completes or emits an error. Play: https://go.dev/play/p/WMKa26sirV0
Example (Error) ΒΆ
observable := Pipe3(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
ToChannel[int](42),
Map(lo.ChannelToSlice[Notification[int]]),
Flatten[Notification[int]](),
)
subscription := observable.Subscribe(PrintObserver[Notification[int]]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: Next(1) Next: Next(2) Next: Next(3) Next: Error(assert.AnError general error for testing) Completed
Example (Ok) ΒΆ
observable := Pipe3( Just(1, 2, 3, 4, 5), ToChannel[int](42), Map(lo.ChannelToSlice[Notification[int]]), Flatten[Notification[int]](), ) subscription := observable.Subscribe(PrintObserver[Notification[int]]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: Next(1) Next: Next(2) Next: Next(3) Next: Next(4) Next: Next(5) Next: Complete() Completed
func ToMap ¶
func ToMap[T any, K comparable, V any](project func(item T) (K, V)) func(Observable[T]) Observable[map[K]V]
ToMap collects all items from the observable into a map. It is a sink operator so it emits a single value. It emits the map when the source completes. If the source is empty, it emits an empty map. Play: https://go.dev/play/p/FiF83XYB0ba
Example (Error) ΒΆ
mapper := func(v int) (string, string) {
return strconv.FormatInt(int64(v), 10), strconv.FormatInt(int64(v), 10)
}
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
ToMap(mapper),
)
subscription := observable.Subscribe(PrintObserver[map[string]string]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
mapper := func(v int) (string, string) {
return strconv.FormatInt(int64(v), 10), strconv.FormatInt(int64(v), 10)
}
observable := Pipe1(
Just(1, 2, 3, 4, 5),
ToMap(mapper),
)
subscription := observable.Subscribe(PrintObserver[map[string]string]())
defer subscription.Unsubscribe()
Output: Next: map[1:1 2:2 3:3 4:4 5:5] Completed
func ToMapI ¶
func ToMapI[T any, K comparable, V any](mapper func(item T, index int64) (K, V)) func(Observable[T]) Observable[map[K]V]
ToMapI collects all items from the observable into a map. It is a sink operator so it emits a single value. It emits the map when the source completes. If the source is empty, it emits an empty map. Play: https://go.dev/play/p/FiF83XYB0ba
func ToMapIWithContext ¶
func ToMapIWithContext[T any, K comparable, V any](mapper func(ctx context.Context, item T, index int64) (K, V)) func(Observable[T]) Observable[map[K]V]
ToMapIWithContext collects all items from the observable into a map. It is a sink operator so it emits a single value. It emits the map when the source completes. If the source is empty, it emits an empty map. Play: https://go.dev/play/p/FiF83XYB0ba
func ToMapWithContext ¶
func ToMapWithContext[T any, K comparable, V any](project func(ctx context.Context, item T) (K, V)) func(Observable[T]) Observable[map[K]V]
ToMapWithContext collects all items from the observable into a map. It is a sink operator so it emits a single value. It emits the map when the source completes. If the source is empty, it emits an empty map. Play: https://go.dev/play/p/FiF83XYB0ba
func ToSlice ¶
func ToSlice[T any]() func(Observable[T]) Observable[[]T]
ToSlice collects all items from the observable into a slice. It is a sink operator so it emits a single value. It emits the slice when the source completes. If the source is empty, it emits an empty slice. Play: https://go.dev/play/p/kxbU_PzpN6t
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
ToSlice[int](),
)
subscription := observable.Subscribe(PrintObserver[[]int]())
defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1, 2, 3, 4, 5), ToSlice[int](), ) subscription := observable.Subscribe(PrintObserver[[]int]()) defer subscription.Unsubscribe()
Output: Next: [1 2 3 4 5] Completed
func Trunc ΒΆ
func Trunc() func(Observable[float64]) Observable[float64]
Trunc emits the truncated values emitted by the source Observable. Play: https://go.dev/play/p/iYc9oGDgRZJ
Example (Error) ΒΆ
observable := Pipe1(
NewObservable(func(observer Observer[float64]) Teardown {
observer.Next(1.1)
observer.Next(2.5)
observer.Next(3.9)
observer.Error(assert.AnError)
observer.Next(4)
return nil
}),
Trunc(),
)
subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just(1.1, 2.4, 3.5, 4.9, 5.0), Trunc(), ) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Completed
func While ΒΆ
func While[T any](condition func() bool) func(Observable[T]) Observable[T]
While repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/hMj3DBVtp73
Example ΒΆ
i := 0
observable := Pipe1(
Just(1, 2, 3),
While[int](func() bool {
i++
return i < 2
}),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func WhileI ΒΆ
func WhileI[T any](condition func(index int64) bool) func(Observable[T]) Observable[T]
WhileI repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/9aAuzAspyMc
func WhileIWithContext ΒΆ
func WhileIWithContext[T any](condition func(ctx context.Context, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]
WhileIWithContext repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/xTpqdGSxOxw
func WhileWithContext ΒΆ
func WhileWithContext[T any](condition func(ctx context.Context) (context.Context, bool)) func(Observable[T]) Observable[T]
WhileWithContext repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error.
func WindowWhen ΒΆ
func WindowWhen[T, B any](boundary Observable[B]) func(Observable[T]) Observable[Observable[T]]
WindowWhen emits an Observable that represents a window of items emitted by the source Observable. The window emits items when the specified boundary Observable emits an item. The window closes and a new window opens when the boundary Observable emits an item. If the source Observable completes, the window emits the complete notification and the complete notification is propagated. If the boundary Observable completes, the window emits the complete notification and the complete notification is propagated. Play: https://go.dev/play/p/vK0elE-rPbl
func ZipAll ¶
func ZipAll[T any]() func(Observable[Observable[T]]) Observable[[]T]
ZipAll combines the values from the Observables emitted by the source Observable, pairing them in emission order (first with first, second with second, and so on). It emits only when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/FcpgTItKX-Q
Example (Error) ΒΆ
observable := Pipe1( Just( Range(1, 3), Throw[int64](assert.AnError), Range(100, 3), ), ZipAll[int64](), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Pipe1( Just( Range(1, 3), Range(10, 13), Range(100, 103), ), ZipAll[int64](), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) defer subscription.Unsubscribe()
Output: Next: [1 10 100] Next: [2 11 101] Completed
func ZipWith ¶
func ZipWith[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]
ZipWith combines the values from the source Observable with the values from the other Observable, pairing them in emission order (first with first, second with second, and so on). It emits only when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/RmErtE3pHjb
Example (Error) ΒΆ
observable := ZipWith2[int]( Throw[int64](assert.AnError), Range(20, 23), )(Just(1, 2, 3)) subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := ZipWith2[int]( Range(10, 13), Range(20, 23), )(Just(1, 2, 3)) subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 20} Next: {2 11 21} Next: {3 12 22} Completed
func ZipWith1 ¶
func ZipWith1[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]
ZipWith1 combines the values from the source Observable with the values from the other Observable, pairing them in emission order (first with first, second with second, and so on). It emits only when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument.
Example (Error) ΒΆ
observable := ZipWith1[int](Throw[int64](assert.AnError))(Just(1, 2, 3)) subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := ZipWith1[int](Range(10, 13))(Just(1, 2, 3)) subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10} Next: {2 11} Next: {3 12} Completed
func ZipWith2 ¶
func ZipWith2[A, B, C any](obsB Observable[B], obsC Observable[C]) func(Observable[A]) Observable[lo.Tuple3[A, B, C]]
ZipWith2 combines the values from the source Observable with the values from the other Observables, pairing them in emission order (first with first, second with second, and so on). It emits only when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/MMq82Rkb0oh
Example (Error) ΒΆ
observable := ZipWith2[int](Throw[int64](assert.AnError), Range(20, 23))(Just(1, 2)) subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := ZipWith2[int](Range(10, 13), Range(20, 23))(Just(1, 2)) subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 20} Next: {2 11 21} Completed
func ZipWith3 ¶
func ZipWith3[A, B, C, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D]) func(Observable[A]) Observable[lo.Tuple4[A, B, C, D]]
ZipWith3 combines the values from the source Observable with the values from the other Observables, pairing them in emission order (first with first, second with second, and so on). It emits only when all Observables have emitted at least one value. It completes when the source Observable completes.
It is a curried function that takes the other Observable as an argument.
Example (Error) ΒΆ
observable := ZipWith3[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33))(Just(1)) subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := ZipWith3[int](Range(10, 13), Range(20, 23), Range(30, 33))(Just(1)) subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 20 30} Completed
func ZipWith4 ΒΆ
func ZipWith4[A, B, C, D, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) func(Observable[A]) Observable[lo.Tuple5[A, B, C, D, E]]
ZipWith4 combines the values from the source Observable with the values from the other Observables, pairing them by position: the n-th emission groups the n-th value of each Observable. It emits only once every Observable has emitted a value for that position. It completes when the source Observable completes.
It is a curried function that takes the other Observables as arguments.
Example (Error) ΒΆ
observable := ZipWith4[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33), Range(40, 43))(Just(1)) subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int, int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := ZipWith4[int](Range(10, 13), Range(20, 23), Range(30, 33), Range(40, 43))(Just(1)) subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int, int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 20 30 40} Completed
func ZipWith5 ΒΆ
func ZipWith5[A, B, C, D, E, F any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], obsF Observable[F]) func(Observable[A]) Observable[lo.Tuple6[A, B, C, D, E, F]]
ZipWith5 combines the values from the source Observable with the values from the other Observables, pairing them by position: the n-th emission groups the n-th value of each Observable. It emits only once every Observable has emitted a value for that position. It completes when the source Observable completes.
It is a curried function that takes the other Observables as arguments. Play: https://go.dev/play/p/OJz-AVo0-hY
Example (Error) ΒΆ
observable := ZipWith5[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33), Range(40, 43), Range(50, 53))(Just(1)) subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int, int64, int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := ZipWith5[int](Range(10, 13), Range(20, 23), Range(30, 33), Range(40, 43), Range(50, 53))(Just(1)) subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int, int64, int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 20 30 40 50} Completed
Types ¶
type Backpressure ¶
type Backpressure int8
Backpressure is a type that represents the backpressure strategy to use.
const ( // BackpressureBlock blocks the source observable when the destination is not ready to receive more values. BackpressureBlock Backpressure = iota // BackpressureDrop drops the source observable when the destination is not ready to receive more values. BackpressureDrop )
type ConcurrencyMode ¶
type ConcurrencyMode int8
ConcurrencyMode is a type that represents the concurrency mode to use.
const ( ConcurrencyModeSafe ConcurrencyMode = iota ConcurrencyModeUnsafe ConcurrencyModeEventuallySafe )
ConcurrencyModeSafe is a concurrency mode that is safe to use. Spinlock is ignored because it is too slow when chaining operators. Spinlock should be used only for short-lived local locks.
type ConnectableConfig ¶
ConnectableConfig is the configuration for a ConnectableObservable.
type ConnectableObservable ¶
type ConnectableObservable[T any] interface { Observable[T] // Connect connects the ConnectableObservable. When connected, the ConnectableObservable // will emit values to its observers. If the ConnectableObservable is already connected, // this method creates a new subscription and starts emitting values to its observers. // // The Connect method returns a Subscription that can be used to disconnect the // ConnectableObservable. The Subscription may be used to cancel the connection, // and to wait for the connection to complete. // // The Subscription might be already disposed when the Connect method returns. Connect() Subscription ConnectWithContext(ctx context.Context) Subscription }
ConnectableObservable is an Observable that can be connected and disconnected. When connected, it will emit values to its observers.
ConnectableObservable is useful when you want to share a single subscription to an Observable among multiple observers. This is useful when you want to multicast the values of an Observable.
func Connectable ΒΆ
func Connectable[T any](source Observable[T]) ConnectableObservable[T]
Connectable creates a new ConnectableObservable from an Observable. The ConnectableObservable will use the default connector, which is a PublishSubject. The ConnectableObservable will reset the source when disconnected. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.
If you want to use a different connector or change the reset behavior, use ConnectableWithConfig.
func ConnectableWithConfig ΒΆ
func ConnectableWithConfig[T any](source Observable[T], config ConnectableConfig[T]) ConnectableObservable[T]
ConnectableWithConfig creates a new ConnectableObservable from an Observable. The ConnectableObservable will use the given connector. The ConnectableObservable will reset the source when disconnected if ResetOnDisconnect is true. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.
func NewConnectableObservable ΒΆ
func NewConnectableObservable[T any](subscribe func(destination Observer[T]) Teardown) ConnectableObservable[T]
NewConnectableObservable creates a new ConnectableObservable. The subscribe function is called when the ConnectableObservable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the ConnectableObservable will not emit any more items.
The ConnectableObservable will use the default connector, which is a PublishSubject. The ConnectableObservable will reset the source when disconnected. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.
If you want to use a different connector or change the reset behavior, use NewConnectableObservableWithConfig.
func NewConnectableObservableWithConfig ΒΆ
func NewConnectableObservableWithConfig[T any](subscribe func(destination Observer[T]) Teardown, config ConnectableConfig[T]) ConnectableObservable[T]
NewConnectableObservableWithConfig creates a new ConnectableObservable. The subscribe function is called when the ConnectableObservable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the ConnectableObservable will not emit any more items.
The ConnectableObservable will use the given connector. The ConnectableObservable will reset the source when disconnected if ResetOnDisconnect is true. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.
func NewConnectableObservableWithConfigAndContext ΒΆ
func NewConnectableObservableWithConfigAndContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown, config ConnectableConfig[T]) ConnectableObservable[T]
NewConnectableObservableWithConfigAndContext creates a new ConnectableObservable. The subscribe function is called when the ConnectableObservable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the ConnectableObservable will not emit any more items.
The ConnectableObservable will use the given connector. The ConnectableObservable will reset the source when disconnected if ResetOnDisconnect is true. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.
func NewConnectableObservableWithContext ΒΆ
func NewConnectableObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) ConnectableObservable[T]
NewConnectableObservableWithContext creates a new ConnectableObservable. The subscribe function is called when the ConnectableObservable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the ConnectableObservable will not emit any more items.
The ConnectableObservable will use the default connector, which is a PublishSubject. The ConnectableObservable will reset the source when disconnected. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.
If you want to use a different connector or change the reset behavior, use NewConnectableObservableWithConfig.
type IntervalValue ¶
IntervalValue is a value emitted by the `TimeInterval` operator.
type Kind ¶
type Kind uint8
Kind represents the kind of a Notification. It can be Next, Error, or Complete.
type Notification ¶
Notification represents a value emitted by an Observable. It can be a Next value, an Error, or a Complete signal. It is used to communicate between Observables and Observers. It is a generic type, so it can hold any value.
func NewNotificationComplete ΒΆ
func NewNotificationComplete[T any]() Notification[T]
NewNotificationComplete creates a new Notification with a Complete signal.
func NewNotificationError ΒΆ
func NewNotificationError[T any](err error) Notification[T]
NewNotificationError creates a new Notification with an Error.
func NewNotificationNext ΒΆ
func NewNotificationNext[T any](value T) Notification[T]
NewNotificationNext creates a new Notification with a Next value.
func (Notification[T]) String ΒΆ
func (n Notification[T]) String() string
type Observable ¶
type Observable[T any] interface { // Subscribe subscribes an Observer to the Observable. The Observer will begin // to receive items emitted by the Observable. The Observer may receive any // number of items (including zero items), then may either complete or error, // but not both. Upon completion or error, the Observer will not receive any // more items. // // The Subscribe method returns a Subscription that can be used to unsubscribe // the Observer from the Observable. The Subscription may be used to cancel the // subscription, and to wait for the subscription to complete. // // The Subscription might be already disposed when the Subscribe method returns. // In this case, the Teardown function is not called. // // The Subscribe method may call the Observer's methods synchronously or // asynchronously. The Observer is responsible for handling concurrency and // synchronization. Subscribe(destination Observer[T]) Subscription SubscribeWithContext(ctx context.Context, destination Observer[T]) Subscription }
Observable is the producer of values. It is the source of values that are emitted to Observers. Observable is a representation of any set of values over any amount of time.
The primary method of an Observable is Subscribe, which is used to attach an Observer to the Observable. Once an Observer is subscribed, the Observable may begin to emit items to the Observer. An Observable may emit any number of items (including zero items), then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
An Observable may call an Observer's methods synchronously or asynchronously.
An Observable is not a stream. It is a factory for streams.
func Amb ΒΆ
func Amb[T any](sources ...Observable[T]) Observable[T]
Amb is an alias for Race. Play: https://go.dev/play/p/-YvhnpQFVNS
Example (Error) ΒΆ
observable := Amb( Delay[int](25*time.Millisecond)(Throw[int](assert.AnError)), Delay[int](50*time.Millisecond)(Just(4, 5, 6)), Delay[int](100*time.Millisecond)(Just(7, 8, 9)), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(75 * time.Millisecond) subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Amb( Delay[int](100*time.Millisecond)(Just(1, 2, 3)), Delay[int](25*time.Millisecond)(Just(4, 5, 6)), Delay[int](50*time.Millisecond)(Just(7, 8, 9)), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(150 * time.Millisecond) subscription.Unsubscribe()
Output: Next: 4 Next: 5 Next: 6 Completed
func CombineLatest2 ΒΆ
func CombineLatest2[A, B any](obsA Observable[A], obsB Observable[B]) Observable[lo.Tuple2[A, B]]
CombineLatest2 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/mzpJyg7plnm
Example (Error) ΒΆ
observable1 := NewObservable(func(observer Observer[int]) Teardown {
go func() {
time.Sleep(10 * time.Millisecond)
observer.Next(1)
observer.Error(assert.AnError)
}()
return nil
})
observable2 := NewObservable(func(observer Observer[int]) Teardown {
go func() {
observer.Next(2)
observer.Complete()
}()
return nil
})
observable := Pipe1(
CombineLatest2(
observable1,
observable2,
),
Map(func(snapshot lo.Tuple2[int, int]) []int {
return []int{snapshot.A, snapshot.B}
}),
)
subscription := observable.Subscribe(PrintObserver[[]int]())
time.Sleep(50 * time.Millisecond)
defer subscription.Unsubscribe()
Output: Next: [1 2] Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable1 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable := Pipe1(
CombineLatest2(
observable1,
observable2,
),
Map(func(snapshot lo.Tuple2[int64, int64]) []int64 {
return []int64{snapshot.A, snapshot.B}
}),
)
subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: [1 3] Next: [1 4] Next: [2 4] Completed
func CombineLatest3 ΒΆ
func CombineLatest3[A, B, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C]) Observable[lo.Tuple3[A, B, C]]
CombineLatest3 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.
Example (Ok) ΒΆ
observable1 := Delay[int64](100 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond))
combined := CombineLatest3(observable1, observable2, observable3)
observable := Map(func(snapshot lo.Tuple3[int64, int64, int64]) []int64 {
return []int64{snapshot.A, snapshot.B, snapshot.C}
})(combined)
subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: [1 4 6] Next: [2 4 6] Completed
func CombineLatest4 ΒΆ
func CombineLatest4[A, B, C, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D]) Observable[lo.Tuple4[A, B, C, D]]
CombineLatest4 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/mzpJyg7plnm
func CombineLatest5 ΒΆ
func CombineLatest5[A, B, C, D, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) Observable[lo.Tuple5[A, B, C, D, E]]
CombineLatest5 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/mzpJyg7plnm
func CombineLatestAny ΒΆ
func CombineLatestAny(sources ...Observable[any]) Observable[[]any]
CombineLatestAny combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/mzpJyg7plnm
Example (Error) ΒΆ
observable1 := Cast[int64, any]()(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := Delay[any](75 * time.Millisecond)(Throw[any](assert.AnError))
observable3 := Just[any]("a", "b")
combined := Just(observable1, observable2, observable3)
observable := CombineLatestAllAny()(combined)
subscription := observable.Subscribe(PrintObserver[[]any]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable1 := Cast[int64, any]()(RangeWithInterval(1, 3, 40*time.Millisecond))
observable2 := Cast[string, any]()(Just("a", "b"))
observable3 := Delay[any](25 * time.Millisecond)(Just[any]("c", "d"))
observable4 := Delay[any](60 * time.Millisecond)(Cast[int64, any]()(Range(100, 102)))
combined := Just(observable1, observable2, observable3, observable4)
observable := CombineLatestAllAny()(combined)
subscription := observable.Subscribe(PrintObserver[[]any]())
time.Sleep(220 * time.Millisecond)
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: [1 b d 100] Next: [1 b d 101] Next: [2 b d 101] Completed
func Concat ΒΆ
func Concat[T any](obs ...Observable[T]) Observable[T]
Concat concatenates the source Observable with other Observables. It subscribes to each inner Observable only after the previous one completes, maintaining their order. It completes when all inner Observables are done. Play: https://go.dev/play/p/DFokqIXIguM
Example (Error) ΒΆ
observable := Concat( Just(1, 2, 3), Throw[int](assert.AnError), Just(4, 5, 6), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Concat( Just(1, 2, 3), Just(4, 5, 6), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Completed
func Defer ΒΆ
func Defer[T any](factory func() Observable[T]) Observable[T]
Defer creates an Observable that waits until an Observer subscribes to it, and then it creates an Observable for each Observer. This is useful for creating Observables that depend on some external state that is not available at the time of creation. The `factory` function is called for each Observer that subscribes to the Observable. Play: https://go.dev/play/p/wyVzordmkK0
Example ΒΆ
// will capture current date time
observable1 := Of(time.Now())
// will capture date time at the moment of subscription
observable2 := Defer(func() Observable[time.Time] {
return Of(time.Now())
})
subscription := Concat(observable1, observable2).Subscribe(NoopObserver[time.Time]())
subscription.Wait() // Note: using .Wait() is not recommended.
func Empty ΒΆ
func Empty[T any]() Observable[T]
Empty creates an Observable that emits no values and completes immediately. Play: https://go.dev/play/p/D1JWkPG4NFK
Example ΒΆ
observable := Empty[int]() subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Completed
func FromChannel ΒΆ
func FromChannel[T any](in <-chan T) Observable[T]
FromChannel creates an Observable from a channel. Closing the channel will complete the Observable. Play: https://go.dev/play/p/x0u4eaOzYln
Example ΒΆ
ch := make(chan int, 10) observable := FromChannel(ch) subscription := observable.Subscribe(PrintObserver[int]()) ch <- 1 ch <- 2 ch <- 3 close(ch) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 1 Next: 2 Next: 3 Completed
func FromSlice ΒΆ
func FromSlice[T any](collections ...[]T) Observable[T]
FromSlice creates an Observable from a slice. The values are emitted in the order they are in the slice. Play: https://go.dev/play/p/BNhnqoQn0tP
Example ΒΆ
observable := FromSlice([]int{1, 2, 3})
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func Future ΒΆ
func Future[T any](factory func() (T, error)) Observable[T]
Future creates an Observable that waits until an Observer subscribes to it, and then it emits either a value or an error, returned by the `factory` function.
This is useful for creating Observables that depend on some external state that is not available at the time of creation. The `factory` function is called for each Observer that subscribes to the Observable.
Example (Error) ΒΆ
observable := Future(func() (int, error) {
req, err := http.NewRequest("", "", nil)
if err != nil {
return 0, err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, err
}
defer res.Body.Close()
// For some reason, removing the 2 following lines causes
// the example to fail (see goleak).
// See https://github.com/uber-go/goleak/issues/102
_, _ = io.ReadAll(res.Body)
defer http.DefaultClient.CloseIdleConnections()
return 42, nil
})
subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Error: Get "": unsupported protocol scheme ""
Example (Ok) ΒΆ
observable := Future(func() (int, error) {
req, err := http.NewRequest("GET", "https://postman-echo.com/get", nil)
if err != nil {
return 0, err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, err
}
defer res.Body.Close()
// For some reason, removing the 2 following lines causes
// the example to fail (see goleak).
// See https://github.com/uber-go/goleak/issues/102
_, _ = io.ReadAll(res.Body)
defer http.DefaultClient.CloseIdleConnections()
return 42, nil
})
subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 42 Completed
func Interval ΒΆ
func Interval(interval time.Duration) Observable[int64]
Interval creates an Observable that emits an infinite sequence of ascending integers, with a constant interval between them. The first value is not emitted immediately, but after the first interval has passed. Play: https://go.dev/play/p/7yskMPPFHA7
Example ΒΆ
observable := Interval(100 * time.Millisecond) subscription := observable.Subscribe(PrintObserver[int64]()) time.Sleep(250 * time.Millisecond) subscription.Unsubscribe() // "Completed" event is not transmitted
Output: Next: 0 Next: 1
func IntervalWithInitial ΒΆ
func IntervalWithInitial(initial, interval time.Duration) Observable[int64]
IntervalWithInitial creates an Observable that emits an infinite sequence of ascending integers. The first value is not emitted immediately, but after the `initial` duration has passed; subsequent values are emitted with a constant `interval` between them. Play: https://go.dev/play/p/Xhi6c336ldy
Example ΒΆ
observable := IntervalWithInitial(50*time.Millisecond, 100*time.Millisecond) subscription := observable.Subscribe(PrintObserver[int64]()) time.Sleep(300 * time.Millisecond) subscription.Unsubscribe() // "Completed" event is not transmitted
Output: Next: 0 Next: 1 Next: 2
func Just ΒΆ
func Just[T any](values ...T) Observable[T]
Just is an alias for Of. Play: https://go.dev/play/p/A5S2McqqfqE
Example ΒΆ
observable := Just(1, 2, 3) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func Merge ΒΆ
func Merge[T any](sources ...Observable[T]) Observable[T]
Merge merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done. Play: https://go.dev/play/p/hX2xPyeO3M9
Example (Error) ΒΆ
observable := Merge( RangeWithInterval(0, 2, 50*time.Millisecond), Pipe1( Throw[int64](assert.AnError), Delay[int64](75*time.Millisecond), ), ) subscription := observable.Subscribe(PrintObserver[int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 0 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Merge( RangeWithInterval(0, 2, 50*time.Millisecond), Pipe1( RangeWithInterval(10, 12, 50*time.Millisecond), Delay[int64](25*time.Millisecond), ), ) subscription := observable.Subscribe(PrintObserver[int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 0 Next: 10 Next: 1 Next: 11 Completed
func Never ΒΆ
func Never() Observable[struct{}]
Never creates an Observable that emits no values and never completes. This is useful for testing or when combining with other Observables. Play: https://go.dev/play/p/GHzcVYaEvN8
Example ΒΆ
observable := Never()
subscription := observable.Subscribe(PrintObserver[struct{}]())
time.Sleep(10 * time.Millisecond)
subscription.Unsubscribe()
func NewEventuallySafeObservable ΒΆ
func NewEventuallySafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]
NewEventuallySafeObservable creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
If no cleanup is necessary, the subscribe function may return a Teardown function that does nothing, or simply return nil.
This method is safe for concurrent use, but concurrent messages are dropped.
func NewEventuallySafeObservableWithContext ΒΆ
func NewEventuallySafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]
NewEventuallySafeObservableWithContext creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
If no cleanup is necessary, the subscribe function may return a Teardown function that does nothing, or simply return nil.
This method is safe for concurrent use, but concurrent messages are dropped.
func NewObservable ΒΆ
func NewObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]
NewObservable creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
If no cleanup is necessary, the subscribe function may return a Teardown function that does nothing, or simply return nil.
This method is not safe for concurrent use.
Example (Error) ΒΆ
observable := NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Error(assert.AnError)
observer.Next(4)
return nil
})
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := NewObservable(func(observer Observer[int]) Teardown {
observer.Next(1)
observer.Next(2)
observer.Next(3)
observer.Next(4)
observer.Complete()
return nil
})
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Next: 4 Completed
func NewObservableWithConcurrencyMode ΒΆ
func NewObservableWithConcurrencyMode[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown, mode ConcurrencyMode) Observable[T]
NewObservableWithConcurrencyMode creates a new Observable with the given concurrency mode. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
If no cleanup is necessary, the subscribe function may return a Teardown function that does nothing, or simply return nil.
The Observable will use the given concurrency mode.
It is rarely used as a public API.
func NewObservableWithContext ΒΆ
func NewObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]
NewObservableWithContext creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
If no cleanup is necessary, the subscribe function may return a Teardown function that does nothing, or simply return nil.
This method is not safe for concurrent use.
func NewSafeObservable ΒΆ
func NewSafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]
NewSafeObservable creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
If no cleanup is necessary, the subscribe function may return a Teardown function that does nothing, or simply return nil.
This method is safe for concurrent use.
func NewSafeObservableWithContext ΒΆ
func NewSafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]
NewSafeObservableWithContext creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
If no cleanup is necessary, the subscribe function may return a Teardown function that does nothing, or simply return nil.
This method is safe for concurrent use.
func NewUnsafeObservable ¶
func NewUnsafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]
NewUnsafeObservable creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
The subscribe function may return a Teardown function that does nothing, if no cleanup is necessary. In this case, the Teardown function should return nil.
This method is not safe for concurrent use.
func NewUnsafeObservableWithContext ¶
func NewUnsafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]
NewUnsafeObservableWithContext creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.
The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.
The subscribe function may return a Teardown function that does nothing, if no cleanup is necessary. In this case, the Teardown function should return nil.
This method is not safe for concurrent use.
func Of ¶
func Of[T any](values ...T) Observable[T]
Of creates an Observable that emits some values you specify. Play: https://go.dev/play/p/Zp5LgHgvJ59
Example ¶
observable := Of(1, 2, 3) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 1 Next: 2 Next: 3 Completed
func Pipe ¶
func Pipe[First, Last any](source Observable[First], operators ...any) Observable[Last]
Pipe builds a composition of operators that will be chained to transform an observable stream. It provides a clean, declarative way to describe complex asynchronous operations.
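The typed variants `Pipe1()` through `Pipe25()` (collectively `PipeX()`) should be favored over `Pipe()`, because they offer more type safety: `Pipe()` accepts its operators as `any`.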
`PipeOp()` is the operator version of `Pipe()`.
Example ¶
observable := Pipe[int, int](
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x * 2
}),
Skip[int](2),
Sum[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 24 Completed
func Pipe1 ¶
func Pipe1[A, B any]( source Observable[A], operator1 func(Observable[A]) Observable[B], ) Observable[B]
Pipe1 is a typesafe 💪 implementation of Pipe, that takes a source and 1 operator.
`PipeOp1()` is the operator version of `Pipe1()`.
Example ¶
observable := Pipe1( Just(1, 2, 3, 4, 5), Sum[int](), ) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 15 Completed
func Pipe2 ¶
func Pipe2[A, B, C any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], ) Observable[C]
Pipe2 is a typesafe 💪 implementation of Pipe, that takes a source and 2 operators.
`PipeOp2()` is the operator version of `Pipe2()`.
Example ¶
observable := Pipe2(
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x * 2
}),
Sum[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 30 Completed
func Pipe3 ¶
func Pipe3[A, B, C, D any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], ) Observable[D]
Pipe3 is a typesafe 💪 implementation of Pipe, that takes a source and 3 operators.
`PipeOp3()` is the operator version of `Pipe3()`.
Example ¶
observable := Pipe3(
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x * 2
}),
Skip[int](2),
Sum[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 24 Completed
func Pipe4 ¶
func Pipe4[A, B, C, D, E any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], ) Observable[E]
Pipe4 is a typesafe 💪 implementation of Pipe, that takes a source and 4 operators.
`PipeOp4()` is the operator version of `Pipe4()`.
Example ¶
observable := Pipe4(
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x * 2
}),
Skip[int](2),
Take[int](2),
Sum[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 14 Completed
func Pipe5 ¶
func Pipe5[A, B, C, D, E, F any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], ) Observable[F]
Pipe5 is a typesafe 💪 implementation of Pipe, that takes a source and 5 operators.
`PipeOp5()` is the operator version of `Pipe5()`.
Example ¶
observable := Pipe5(
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x * 2
}),
Skip[int](2),
Take[int](2),
Sum[int](),
Max[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 14 Completed
func Pipe6 ¶
func Pipe6[A, B, C, D, E, F, G any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], ) Observable[G]
Pipe6 is a typesafe 💪 implementation of Pipe, that takes a source and 6 operators.
`PipeOp6()` is the operator version of `Pipe6()`.
Example ¶
observable := Pipe6(
Just(1, 2, 3, 4, 5),
Map(func(x int) int {
return x * 2
}),
Skip[int](2),
Take[int](2),
Sum[int](),
Map(func(x int) int {
return x / 2
}),
Max[int](),
)
subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output: Next: 7 Completed
func Pipe7 ¶
func Pipe7[A, B, C, D, E, F, G, H any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], ) Observable[H]
Pipe7 is a typesafe 💪 implementation of Pipe, that takes a source and 7 operators.
`PipeOp7()` is the operator version of `Pipe7()`.
func Pipe8 ¶
func Pipe8[A, B, C, D, E, F, G, H, I any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], ) Observable[I]
Pipe8 is a typesafe 💪 implementation of Pipe, that takes a source and 8 operators.
`PipeOp8()` is the operator version of `Pipe8()`.
func Pipe9 ¶
func Pipe9[A, B, C, D, E, F, G, H, I, J any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], ) Observable[J]
Pipe9 is a typesafe 💪 implementation of Pipe, that takes a source and 9 operators.
`PipeOp9()` is the operator version of `Pipe9()`.
func Pipe10 ¶
func Pipe10[A, B, C, D, E, F, G, H, I, J, K any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], ) Observable[K]
Pipe10 is a typesafe 💪 implementation of Pipe, that takes a source and 10 operators.
`PipeOp10()` is the operator version of `Pipe10()`.
func Pipe11 ¶
func Pipe11[A, B, C, D, E, F, G, H, I, J, K, L any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], ) Observable[L]
Pipe11 is a typesafe 💪 implementation of Pipe, that takes a source and 11 operators.
`PipeOp11()` is the operator version of `Pipe11()`.
func Pipe12 ¶
func Pipe12[A, B, C, D, E, F, G, H, I, J, K, L, M any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], ) Observable[M]
Pipe12 is a typesafe 💪 implementation of Pipe, that takes a source and 12 operators.
`PipeOp12()` is the operator version of `Pipe12()`.
func Pipe13 ¶
func Pipe13[A, B, C, D, E, F, G, H, I, J, K, L, M, N any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], ) Observable[N]
Pipe13 is a typesafe 💪 implementation of Pipe, that takes a source and 13 operators.
`PipeOp13()` is the operator version of `Pipe13()`.
func Pipe14 ¶
func Pipe14[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], ) Observable[O]
Pipe14 is a typesafe 💪 implementation of Pipe, that takes a source and 14 operators.
`PipeOp14()` is the operator version of `Pipe14()`.
func Pipe15 ¶
func Pipe15[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], ) Observable[P]
Pipe15 is a typesafe 💪 implementation of Pipe, that takes a source and 15 operators.
`PipeOp15()` is the operator version of `Pipe15()`.
func Pipe16 ¶
func Pipe16[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], ) Observable[Q]
Pipe16 is a typesafe 💪 implementation of Pipe, that takes a source and 16 operators.
`PipeOp16()` is the operator version of `Pipe16()`.
func Pipe17 ¶
func Pipe17[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], ) Observable[R]
Pipe17 is a typesafe 💪 implementation of Pipe, that takes a source and 17 operators.
`PipeOp17()` is the operator version of `Pipe17()`.
func Pipe18 ¶
func Pipe18[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], ) Observable[S]
Pipe18 is a typesafe 💪 implementation of Pipe, that takes a source and 18 operators.
`PipeOp18()` is the operator version of `Pipe18()`.
func Pipe19 ¶
func Pipe19[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], ) Observable[T]
Pipe19 is a typesafe 💪 implementation of Pipe, that takes a source and 19 operators.
`PipeOp19()` is the operator version of `Pipe19()`.
func Pipe20 ¶
func Pipe20[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], ) Observable[U]
Pipe20 is a typesafe 💪 implementation of Pipe, that takes a source and 20 operators.
`PipeOp20()` is the operator version of `Pipe20()`.
func Pipe21 ¶
func Pipe21[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], ) Observable[V]
Pipe21 is a typesafe 💪 implementation of Pipe, that takes a source and 21 operators.
`PipeOp21()` is the operator version of `Pipe21()`.
func Pipe22 ¶
func Pipe22[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], operator22 func(Observable[V]) Observable[W], ) Observable[W]
Pipe22 is a typesafe 💪 implementation of Pipe, that takes a source and 22 operators.
`PipeOp22()` is the operator version of `Pipe22()`.
func Pipe23 ¶
func Pipe23[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], operator22 func(Observable[V]) Observable[W], operator23 func(Observable[W]) Observable[X], ) Observable[X]
Pipe23 is a typesafe 💪 implementation of Pipe, that takes a source and 23 operators.
`PipeOp23()` is the operator version of `Pipe23()`.
func Pipe24 ¶
func Pipe24[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], operator22 func(Observable[V]) Observable[W], operator23 func(Observable[W]) Observable[X], operator24 func(Observable[X]) Observable[Y], ) Observable[Y]
Pipe24 is a typesafe 💪 implementation of Pipe, that takes a source and 24 operators.
`PipeOp24()` is the operator version of `Pipe24()`.
func Pipe25 ¶
func Pipe25[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z any]( source Observable[A], operator1 func(Observable[A]) Observable[B], operator2 func(Observable[B]) Observable[C], operator3 func(Observable[C]) Observable[D], operator4 func(Observable[D]) Observable[E], operator5 func(Observable[E]) Observable[F], operator6 func(Observable[F]) Observable[G], operator7 func(Observable[G]) Observable[H], operator8 func(Observable[H]) Observable[I], operator9 func(Observable[I]) Observable[J], operator10 func(Observable[J]) Observable[K], operator11 func(Observable[K]) Observable[L], operator12 func(Observable[L]) Observable[M], operator13 func(Observable[M]) Observable[N], operator14 func(Observable[N]) Observable[O], operator15 func(Observable[O]) Observable[P], operator16 func(Observable[P]) Observable[Q], operator17 func(Observable[Q]) Observable[R], operator18 func(Observable[R]) Observable[S], operator19 func(Observable[S]) Observable[T], operator20 func(Observable[T]) Observable[U], operator21 func(Observable[U]) Observable[V], operator22 func(Observable[V]) Observable[W], operator23 func(Observable[W]) Observable[X], operator24 func(Observable[X]) Observable[Y], operator25 func(Observable[Y]) Observable[Z], ) Observable[Z]
Pipe25 is a typesafe 💪 implementation of Pipe, that takes a source and 25 operators.
`PipeOp25()` is the operator version of `Pipe25()`.
func Race ¶
func Race[T any](sources ...Observable[T]) Observable[T]
Race creates an Observable that mirrors the first source Observable to emit a next, error or complete notification from the combination of the Observable sources. It cancels the subscriptions to all other Observables. It completes when the source Observable completes. If the source Observable emits an error, the error is emitted by the resulting Observable. Play: https://go.dev/play/p/5VzGFd62SMC
Example (Error) ¶
observable := Race( Delay[int](75*time.Millisecond)(Just(1, 2, 3)), Delay[int](25*time.Millisecond)(Throw[int](assert.AnError)), Delay[int](100*time.Millisecond)(Just(7, 8, 9)), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(50 * time.Millisecond) subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ¶
observable := Race( Delay[int](75*time.Millisecond)(Just(1, 2, 3)), Delay[int](25*time.Millisecond)(Just(4, 5, 6)), Delay[int](100*time.Millisecond)(Just(7, 8, 9)), ) subscription := observable.Subscribe(PrintObserver[int]()) time.Sleep(50 * time.Millisecond) subscription.Unsubscribe()
Output: Next: 4 Next: 5 Next: 6 Completed
func RandFloat64 ¶
func RandFloat64(count int) Observable[float64]
RandFloat64 creates an Observable that emits random float64 values in the range [0, 1). The count is the number of values to emit. Play: https://go.dev/play/p/MRuy8rUpTve
func RandIntN ¶
func RandIntN(n, count int) Observable[int]
RandIntN creates an Observable that emits random int values in the range [0, n). The count is the number of values to emit. Play: https://go.dev/play/p/4m7T5j-7i3a
func Range ¶
func Range(start, end int64) Observable[int64]
Range creates an Observable that emits a range of integers. The range is [start, end), so `start` is emitted but not `end`. If `start` is equal to `end`, an empty Observable is returned. If `start` is greater than `end`, the emitted values are in descending order. The step is 1. Play: https://go.dev/play/p/5XAXfNrtJm2
Example ¶
observable := Range(0, 5) subscription := observable.Subscribe(PrintObserver[int64]()) defer subscription.Unsubscribe()
Output: Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Completed
func RangeWithInterval ¶
func RangeWithInterval(start, end int64, interval time.Duration) Observable[int64]
RangeWithInterval creates an Observable that emits a range of integers. The range is [start, end), so `start` is emitted but not `end`. If `start` is equal to `end`, an empty Observable is returned. If `start` is greater than `end`, the emitted values are in descending order. The interval is the time between each value. The first value is emitted after the first interval has passed. The step is 1. Play: https://go.dev/play/p/Y_1l6BDbMSi
Example ¶
observable := RangeWithInterval(0, 5, 10*time.Millisecond) subscription := observable.Subscribe(PrintObserver[int64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Completed
func RangeWithStep ¶
func RangeWithStep(start, end, step float64) Observable[float64]
RangeWithStep creates an Observable that emits a range of floats. The range is [start, end), so `start` is emitted but not `end`. If `start` is equal to `end`, an empty Observable is returned. If `start` is greater than `end`, the emitted values are in descending order. The step must be greater than 0. Play: https://go.dev/play/p/EOG0tIVjUKC
Example ¶
observable := RangeWithStep(0, 5, 0.5) subscription := observable.Subscribe(PrintObserver[float64]()) defer subscription.Unsubscribe()
Output: Next: 0 Next: 0.5 Next: 1 Next: 1.5 Next: 2 Next: 2.5 Next: 3 Next: 3.5 Next: 4 Next: 4.5 Completed
func RangeWithStepAndInterval ¶
func RangeWithStepAndInterval(start, end, step float64, interval time.Duration) Observable[float64]
RangeWithStepAndInterval creates an Observable that emits a range of floats. The range is [start, end), so `start` is emitted but not `end`. If `start` is equal to `end`, an empty Observable is returned. If `start` is greater than `end`, the emitted values are in descending order. The step must be greater than 0. The interval is the time between each value. The first value is emitted after the first interval has passed. Play: https://go.dev/play/p/kdAEsGwfqw9
Example ¶
observable := RangeWithStepAndInterval(0, 5, 0.5, 10*time.Millisecond) subscription := observable.Subscribe(PrintObserver[float64]()) subscription.Wait() // Note: using .Wait() is not recommended.
Output: Next: 0 Next: 0.5 Next: 1 Next: 1.5 Next: 2 Next: 2.5 Next: 3 Next: 3.5 Next: 4 Next: 4.5 Completed
func Repeat ¶
func Repeat[T any](item T, count int64) Observable[T]
Repeat creates an Observable that emits a single value multiple times. This is a creation operator. The pipeable equivalent is `RepeatWith`. Play: https://go.dev/play/p/CUvh_TYALNe
Example ¶
observable := Repeat(42, 3) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Next: 42 Next: 42 Next: 42 Completed
func RepeatWithInterval ¶
func RepeatWithInterval[T any](item T, count int64, interval time.Duration) Observable[T]
RepeatWithInterval creates an Observable that emits a single value multiple times. The interval is the time between each value. The first value is emitted after the first interval has passed. Play: https://go.dev/play/p/4PK5Zt2sGze
Example ¶
observable := RepeatWithInterval(42, 3, 50*time.Millisecond) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe() time.Sleep(200 * time.Millisecond)
Output: Next: 42 Next: 42 Next: 42 Completed
func Start ¶
func Start[T any](cb func() T) Observable[T]
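Start creates an Observable that lazily emits a single value: `cb` is not called until the Observable is subscribed to, and it is called again for every new subscription. Play: https://go.dev/play/p/Jz7oyagu07u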
Example ¶
observable := Start(func() int {
fmt.Println("Start!")
return 42
})
subscription1 := observable.Subscribe(PrintObserver[int]())
subscription2 := observable.Subscribe(PrintObserver[int]())
subscription1.Wait() // Note: using .Wait() is not recommended.
subscription2.Wait() // Note: using .Wait() is not recommended.
Output: Start! Next: 42 Completed Start! Next: 42 Completed
func Throw ¶
func Throw[T any](err error) Observable[T]
Throw creates an Observable that emits no items and immediately terminates with the given error; no completion notification is sent. Play: https://go.dev/play/p/1TBK8LdDRJF
Example ¶
observable := Throw[int](assert.AnError) subscription := observable.Subscribe(PrintObserver[int]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
func Timer ¶
func Timer(duration time.Duration) Observable[time.Duration]
Timer creates an Observable that emits a value after a specified duration. Play: https://go.dev/play/p/hMkNLEqpcy3
Example ¶
observable := Timer(10 * time.Millisecond) subscription := observable.Subscribe(PrintObserver[time.Duration]()) defer subscription.Unsubscribe()
Output: Next: 10ms Completed
func Zip ΒΆ
func Zip[T any](sources ...Observable[T]) Observable[[]T]
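Zip combines the values of the given Observables by position: each emission is a slice holding one value from every Observable, in order. It only emits once all Observables have produced a value for that position, and it completes once no further complete slice can be formed (in the example, after the shorter range is exhausted). An error from any Observable is forwarded to the observer. Play: https://go.dev/play/p/5YxbQ5jNzjQ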
Example (Error) ΒΆ
observable := Zip( Range(1, 3), Throw[int64](assert.AnError), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Zip( Range(1, 3), Range(10, 13), ) subscription := observable.Subscribe(PrintObserver[[]int64]()) defer subscription.Unsubscribe()
Output: Next: [1 10] Next: [2 11] Completed
func Zip2 ΒΆ
func Zip2[A, B any](obsA Observable[A], obsB Observable[B]) Observable[lo.Tuple2[A, B]]
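Zip2 combines the values of the two given Observables by position: each emission is a tuple holding one value from every Observable, in order. It only emits once all Observables have produced a value for that position, and it completes once no further complete tuple can be formed (in the example, after the shorter input is exhausted). An error from any Observable is forwarded to the observer. Play: https://go.dev/play/p/5YxbQ5jNzjQ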
Example (Error) ΒΆ
observable := Zip2( Range(0, 10), Throw[int64](assert.AnError), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Zip2( Range(0, 10), Skip[int64](1)(Range(0, 4)), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {0 1} Next: {1 2} Next: {2 3} Completed
func Zip3 ΒΆ
func Zip3[A, B, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C]) Observable[lo.Tuple3[A, B, C]]
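Zip3 combines the values of the three given Observables by position: each emission is a tuple holding one value from every Observable, in order. It only emits once all Observables have produced a value for that position, and it completes once no further complete tuple can be formed (in the example, after the shortest input is exhausted). An error from any Observable is forwarded to the observer. Play: https://go.dev/play/p/5YxbQ5jNzjQ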
Example (Error) ΒΆ
observable := Zip3( Range(1, 3), Throw[int64](assert.AnError), Range(100, 103), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Zip3( Range(1, 3), Range(10, 13), Range(100, 103), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 100} Next: {2 11 101} Completed
func Zip4 ΒΆ
func Zip4[A, B, C, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D]) Observable[lo.Tuple4[A, B, C, D]]
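Zip4 combines the values of the four given Observables by position: each emission is a tuple holding one value from every Observable, in order. It only emits once all Observables have produced a value for that position, and it completes once no further complete tuple can be formed (in the example, after the shortest input is exhausted). An error from any Observable is forwarded to the observer. Play: https://go.dev/play/p/5YxbQ5jNzjQ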
Example (Error) ΒΆ
observable := Zip4( Range(1, 3), Throw[int64](assert.AnError), Range(100, 103), Range(1000, 1003), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Zip4( Range(1, 3), Range(10, 13), Range(100, 103), Range(1000, 1003), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 100 1000} Next: {2 11 101 1001} Completed
func Zip5 ΒΆ
func Zip5[A, B, C, D, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) Observable[lo.Tuple5[A, B, C, D, E]]
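Zip5 combines the values of the five given Observables by position: each emission is a tuple holding one value from every Observable, in order. It only emits once all Observables have produced a value for that position, and it completes once no further complete tuple can be formed (in the example, after the shortest input is exhausted). An error from any Observable is forwarded to the observer. Play: https://go.dev/play/p/5YxbQ5jNzjQ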
Example (Error) ΒΆ
observable := Zip5( Range(1, 3), Throw[int64](assert.AnError), Range(100, 103), Range(1000, 1003), Range(10000, 10003), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int64, int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Zip5( Range(1, 3), Range(10, 13), Range(100, 103), Range(1000, 1003), Range(10000, 10003), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int64, int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 100 1000 10000} Next: {2 11 101 1001 10001} Completed
func Zip6 ΒΆ
func Zip6[A, B, C, D, E, F any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], obsF Observable[F]) Observable[lo.Tuple6[A, B, C, D, E, F]]
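Zip6 combines the values of the six given Observables by position: each emission is a tuple holding one value from every Observable, in order. It only emits once all Observables have produced a value for that position, and it completes once no further complete tuple can be formed (in the example, after the shortest input is exhausted). An error from any Observable is forwarded to the observer. Play: https://go.dev/play/p/5YxbQ5jNzjQ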
Example (Error) ΒΆ
observable := Zip6( Range(1, 3), Throw[int64](assert.AnError), Range(100, 103), Range(1000, 1003), Range(10000, 10003), Range(100000, 100003), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int64, int64, int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Error: assert.AnError general error for testing
Example (Ok) ΒΆ
observable := Zip6( Range(1, 3), Range(10, 13), Range(100, 103), Range(1000, 1003), Range(10000, 10003), Range(100000, 100003), ) subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int64, int64, int64, int64, int64, int64]]()) defer subscription.Unsubscribe()
Output: Next: {1 10 100 1000 10000 100000} Next: {2 11 101 1001 10001 100001} Completed
type Observer ¶
type Observer[T any] interface { // Next receives the next value from the Observable. It is called zero or // more times by the Observable. The Observable may call Next synchronously // or asynchronously. If Next is called after the Observer has been closed, // the value will be dropped. Next(value T) NextWithContext(ctx context.Context, value T) // Error receives an error from the Observable. It is called at most once by // the Observable. The Observable may call Error synchronously or // asynchronously. If Error is called after the Observer has been closed, the // error will be dropped. Error(err error) ErrorWithContext(ctx context.Context, err error) // Complete receives a completion notification from the Observable. It is called // at most once by the Observable. The Observable may call Complete // synchronously or asynchronously. If Complete is called after the Observer has // been closed, the completion notification will be dropped. Complete() CompleteWithContext(ctx context.Context) // IsClosed returns true if the Observer has been closed, either by an error // or completion notification. If the Observer is closed, it will not receive // any more notifications. IsClosed() bool // HasThrown returns true if the Observer has received an error notification. HasThrown() bool // IsCompleted returns true if the Observer has received a completion notification. IsCompleted() bool }
Observer is the consumer of an Observable. It receives notifications: Next, Error, and Complete. Observers are safe for concurrent calls to Next, Error, and Complete. It is the responsibility of the Observer to ensure that notifications are not forwarded after it has been closed.
func NewObserver ΒΆ
func NewObserver[T any](onNext func(value T), onError func(err error), onComplete func()) Observer[T]
NewObserver creates a new Observer with the provided callbacks. No context is provided.
Example ΒΆ
observer := NewObserver(
func(value int) {
fmt.Printf("Next: %d\n", value)
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
)
observer.Next(123) // 123 logged
observer.Next(456) // 456 logged
observer.Complete() // Completed logged
observer.Next(789) // nothing logged
Output: Next: 123 Next: 456 Completed
Example (Empty) ΒΆ
observer := NewObserver(
func(value int) {
fmt.Printf("Next: %d\n", value)
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
)
observer.Complete() // Completed logged
observer.Next(123) // nothing logged
Output: Completed
Example (Error) ΒΆ
observer := NewObserver(
func(value int) {
fmt.Printf("Next: %d\n", value)
},
func(err error) {
fmt.Printf("Error: %s\n", err.Error())
},
func() {
fmt.Printf("Completed\n")
},
)
observer.Next(123) // 123 logged
observer.Next(456) // 456 logged
observer.Error(assert.AnError) // error logged
observer.Next(789) // nothing logged
Output: Next: 123 Next: 456 Error: assert.AnError general error for testing
func NewObserverWithContext ΒΆ
func NewObserverWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context)) Observer[T]
NewObserverWithContext creates a new Observer with the provided callbacks. A context is provided to each callback.
func NoopObserver ΒΆ
NoopObserver is an Observer that does nothing. Warning: This observer will silence errors.
func OnComplete ΒΆ
OnComplete is a partial Observer with only the Complete method implemented. Warning: This observer will silence errors.
func OnCompleteWithContext ΒΆ
OnCompleteWithContext is a partial Observer with only the Complete method implemented. Warning: This observer will silence errors.
func OnErrorWithContext ΒΆ
OnErrorWithContext is a partial Observer with only the Error method implemented.
func OnNext ΒΆ
OnNext is a partial Observer with only the Next method implemented. Warning: This observer will silence errors.
func OnNextWithContext ΒΆ
OnNextWithContext is a partial Observer with only the Next method implemented. Warning: This observer will silence errors.
func PrintObserver ΒΆ
PrintObserver is a utility Observer that dumps notifications for debugging purposes.
type RetryConfig ΒΆ
RetryConfig is the configuration for the Retry operator.
type ShareConfig ΒΆ
type ShareConfig[T any] struct { }
ShareConfig is the configuration for the Share operator.
type ShareReplayConfig ΒΆ
type ShareReplayConfig struct {
}
ShareReplayConfig is the configuration for the ShareReplay operator.
type Subject ¶
type Subject[T any] interface { Observable[T] Observer[T] HasObserver() bool CountObservers() int IsClosed() bool HasThrown() bool IsCompleted() bool AsObservable() Observable[T] AsObserver() Observer[T] }
Subject is a sort of bridge or proxy, that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
func NewAsyncSubject ΒΆ
NewAsyncSubject creates a Subject that emits only its last value, and only on completion. If no value has been received, observers are completed without receiving any value. The emitted value or error is stored and delivered to future subscribers. At most one value is emitted.
Example ΒΆ
subject := NewAsyncSubject[int]() subject.Subscribe(PrintObserver[int]()) subject.Next(123) // nothing logged sub := Pipe1( subject.AsObservable(), Delay[int](25*time.Millisecond), ).Subscribe(PrintObserver[int]()) defer sub.Unsubscribe() subject.Next(456) // nothing logged subject.Complete() // 456 logged by both subscribers time.Sleep(50 * time.Millisecond) subject.Next(789) // nothing logged subject.Subscribe(PrintObserver[int]()) // 456 logged by the third subscriber
Output: Next: 456 Completed Next: 456 Completed Next: 456 Completed
Example (Empty) ΒΆ
subject := NewAsyncSubject[int]() subject.Subscribe(PrintObserver[int]()) subject.Complete() // no value logged, completion logged by first subscriber subject.Subscribe(PrintObserver[int]()) // completion logged by second subscriber subject.Next(123) // nothing logged
Output: Completed Completed
Example (Error) ΒΆ
subject := NewAsyncSubject[int]() subject.Subscribe(PrintObserver[int]()) subject.Next(123) // nothing logged subject.Subscribe(PrintObserver[int]()) subject.Next(456) // nothing logged subject.Error(assert.AnError) // error logged by both subscribers subject.Subscribe(PrintObserver[int]()) // error logged by last subscriber subject.Next(789) // nothing logged subject.Complete() // nothing logged
Output: Error: assert.AnError general error for testing Error: assert.AnError general error for testing Error: assert.AnError general error for testing
func NewBehaviorSubject ΒΆ
NewBehaviorSubject creates a Subject that emits its current value (or the initial value, if none has been received yet) to new subscribers. After completion, new subscribers won't receive the last value; after an error, new subscribers still receive the error.
Example ΒΆ
subject := NewBehaviorSubject(42) subject.Subscribe(PrintObserver[int]()) // 42 logged by first subscriber subject.Next(123) // 123 logged by first subscriber subject.Subscribe(PrintObserver[int]()) // 123 logged by second subscriber subject.Next(456) // 456 logged by both subscribers subject.Complete() // completion logged by both subscribers subject.Next(789) // nothing logged subject.Subscribe(PrintObserver[int]()) // completion logged by third subscriber
Output: Next: 42 Next: 123 Next: 123 Next: 456 Next: 456 Completed Completed Completed
Example (Empty) ΒΆ
subject := NewBehaviorSubject(42) subject.Complete() // nothing logged (no subscriber yet) subject.Subscribe(PrintObserver[int]()) // completion logged by first subscriber subject.Subscribe(PrintObserver[int]()) // completion logged by second subscriber subject.Next(123) // nothing logged
Output: Completed Completed
Example (Error) ΒΆ
subject := NewBehaviorSubject(42) subject.Subscribe(PrintObserver[int]()) // 42 logged by first subscriber subject.Next(123) // 123 logged by first subscriber subject.Subscribe(PrintObserver[int]()) // 123 logged by second subscriber subject.Next(456) // 456 logged by both subscribers subject.Error(assert.AnError) // error logged by both subscribers subject.Subscribe(PrintObserver[int]()) // error logged by last subscriber subject.Next(789) // nothing logged
Output: Next: 42 Next: 123 Next: 123 Next: 456 Next: 456 Error: assert.AnError general error for testing Error: assert.AnError general error for testing Error: assert.AnError general error for testing
func NewPublishSubject ΒΆ
NewPublishSubject broadcasts a value to observers (fanout). Values received before subscription are not transmitted.
Example ΒΆ
subject := NewPublishSubject[int]() subject.Subscribe(PrintObserver[int]()) subject.Next(123) // 123 logged by first subscriber subject.Subscribe(PrintObserver[int]()) subject.Next(456) // 456 logged by both subscribers subject.Complete() subject.Next(789) // nothing logged
Output: Next: 123 Next: 456 Next: 456 Completed Completed
Example (Empty) ΒΆ
subject := NewPublishSubject[int]() subject.Subscribe(PrintObserver[int]()) subject.Complete() // no value logged, completion logged by first subscriber subject.Subscribe(PrintObserver[int]()) // completion logged by second subscriber subject.Next(123) // nothing logged
Output: Completed Completed
Example (Error) ΒΆ
subject := NewPublishSubject[int]() subject.Subscribe(PrintObserver[int]()) subject.Next(123) // 123 logged by first subscriber subject.Subscribe(PrintObserver[int]()) subject.Next(456) // 456 logged by both subscribers subject.Error(assert.AnError) // error logged by both subscribers subject.Subscribe(PrintObserver[int]()) // error logged by last subscriber subject.Next(789) // nothing logged subject.Complete() // nothing logged
Output: Next: 123 Next: 456 Next: 456 Error: assert.AnError general error for testing Error: assert.AnError general error for testing Error: assert.AnError general error for testing
func NewReplaySubject ΒΆ
NewReplaySubject emits old values to new subscribers. After error or completion, new subscriptions receive values from the buffer then the error or the completion.
Example ΒΆ
subject := NewReplaySubject[int](42) subject.Subscribe(PrintObserver[int]()) subject.Next(123) // 123 logged by first subscriber subject.Subscribe(PrintObserver[int]()) // 123 logged by second subscriber subject.Next(456) // 456 logged by both subscribers subject.Complete() // completion logged by both subscribers subject.Subscribe(PrintObserver[int]()) // 123, 456 and completion logged by third subscriber subject.Next(789) // nothing logged
Output: Next: 123 Next: 123 Next: 456 Next: 456 Completed Completed Next: 123 Next: 456 Completed
Example (Empty) ΒΆ
subject := NewReplaySubject[int](42) subject.Subscribe(PrintObserver[int]()) subject.Complete() // no value logged, completion logged by first subscriber subject.Subscribe(PrintObserver[int]()) // completion logged by second subscriber subject.Next(123) // nothing logged
Output: Completed Completed
Example (Error) ΒΆ
subject := NewReplaySubject[int](42) subject.Subscribe(PrintObserver[int]()) subject.Next(123) // 123 logged by first subscriber subject.Subscribe(PrintObserver[int]()) // 123 logged by second subscriber subject.Next(456) // 456 logged by both subscribers subject.Error(assert.AnError) // error logged by both subscribers subject.Subscribe(PrintObserver[int]()) // 123, 456 and error logged by last subscriber subject.Next(789) // nothing logged subject.Complete() // nothing logged
Output: Next: 123 Next: 123 Next: 456 Next: 456 Error: assert.AnError general error for testing Error: assert.AnError general error for testing Next: 123 Next: 456 Error: assert.AnError general error for testing
Example (Overflow) ΒΆ
subject := NewReplaySubject[int](2) subject.Next(123) // nothing logged subject.Next(456) // nothing logged subject.Next(789) // nothing logged subject.Complete() // nothing logged subject.Subscribe(PrintObserver[int]()) // 456 and 789 logged
Output: Next: 456 Next: 789 Completed
func NewUnicastSubject ΒΆ
NewUnicastSubject queues up events until a single Observer subscribes to it, replays those events to it until the Observer catches up and then switches to relaying events live to this single Observer.
Example ΒΆ
subject := NewUnicastSubject[int](42) subject.Subscribe(PrintObserver[int]()) subject.Next(123) // 123 logged by first subscriber subject.Subscribe(PrintObserver[int]()) // error: a single subscriber accepted subject.Next(456) // 456 logged by first subscriber subject.Complete() // completion logged by first subscriber subject.Subscribe(PrintObserver[int]()) // completion logged by third subscriber subject.Next(789) // nothing logged
Output: Next: 123 Error: ro.UnicastSubject: a single subscriber accepted Next: 456 Completed Completed
Example (Empty) ΒΆ
subject := NewUnicastSubject[int](42) subject.Subscribe(PrintObserver[int]()) subject.Complete() // no value logged, completion logged by first subscriber subject.Subscribe(PrintObserver[int]()) // completion logged by second subscriber subject.Next(123) // nothing logged
Output: Completed Completed
Example (Error) ΒΆ
subject := NewUnicastSubject[int](42) subject.Subscribe(PrintObserver[int]()) subject.Next(123) // 123 logged by first subscriber subject.Subscribe(PrintObserver[int]()) // error: a single subscriber accepted subject.Next(456) // 456 logged by first subscriber subject.Error(assert.AnError) // error logged by first subscriber subject.Subscribe(PrintObserver[int]()) // error logged by last subscriber subject.Next(789) // nothing logged subject.Complete() // nothing logged
Output: Next: 123 Error: ro.UnicastSubject: a single subscriber accepted Next: 456 Error: assert.AnError general error for testing Error: assert.AnError general error for testing
Example (Overflow) ΒΆ
subject := NewUnicastSubject[int](2) subject.Next(123) // nothing logged subject.Next(456) // nothing logged subject.Next(789) // nothing logged subject.Complete() // nothing logged subject.Subscribe(PrintObserver[int]()) // only the completion is logged (see output)
Output: Completed
type Subscriber ¶
type Subscriber[T any] interface { Subscription Observer[T] }
Subscriber implements the Observer and Subscription interfaces. While the Observer is the public API for consuming the values of an Observable, all Observers get converted to a Subscriber, in order to provide Subscription-like capabilities such as `Unsubscribe()`. Subscriber is a common type in samber/ro, and crucial for implementing operators, but it is rarely used as a public API.
func NewEventuallySafeSubscriber ΒΆ
func NewEventuallySafeSubscriber[T any](destination Observer[T]) Subscriber[T]
NewEventuallySafeSubscriber creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.
The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.
This method is safe for concurrent use, but concurrent messages are dropped.
It is rarely used as a public API.
func NewSafeSubscriber ΒΆ
func NewSafeSubscriber[T any](destination Observer[T]) Subscriber[T]
NewSafeSubscriber creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.
The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.
This method is safe for concurrent use.
It is rarely used as a public API.
func NewSubscriber ΒΆ
func NewSubscriber[T any](destination Observer[T]) Subscriber[T]
NewSubscriber creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.
The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.
This method is safe for concurrent use.
It is rarely used as a public API.
func NewSubscriberWithConcurrencyMode ΒΆ
func NewSubscriberWithConcurrencyMode[T any](destination Observer[T], mode ConcurrencyMode) Subscriber[T]
NewSubscriberWithConcurrencyMode creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.
The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.
It is rarely used as a public API.
func NewUnsafeSubscriber ΒΆ
func NewUnsafeSubscriber[T any](destination Observer[T]) Subscriber[T]
NewUnsafeSubscriber creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.
The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.
This method is not safe for concurrent use.
It is rarely used as a public API.
type Subscription ¶
type Subscription interface {
Unsubscribable
Add(teardown Teardown)
AddUnsubscribable(unsubscribable Unsubscribable)
IsClosed() bool
Wait() // Note: using .Wait() is not recommended.
}
Subscription represents an ongoing execution of an `Observable`, and has a minimal API which allows you to cancel that execution.
func NewSubscription ΒΆ
func NewSubscription(teardown Teardown) Subscription
NewSubscription creates a new Subscription. When `teardown` is nil, nothing is added. When the subscription is already disposed, the `teardown` callback is triggered immediately.
type Teardown ΒΆ
type Teardown func()
Teardown is a function that cleans up resources, such as closing a file or a network connection. It is called when the Subscription is closed. It is part of a Subscription, and is returned by the Observable creation. It will be called only once, when the Subscription is canceled.
type TimestampValue ΒΆ
TimestampValue is a value emitted by the `TimeInterval` operator.
type Unsubscribable ΒΆ
type Unsubscribable interface {
Unsubscribe()
}
Unsubscribable represents any type that can be unsubscribed from. It provides a common interface for cancellation operations.
Source Files
¶
- errors.go
- observable.go
- observer.go
- operator_combining.go
- operator_conditional.go
- operator_connectable.go
- operator_context.go
- operator_creation.go
- operator_error_handling.go
- operator_filter.go
- operator_math.go
- operator_sink.go
- operator_transformations.go
- operator_utility.go
- pipe.go
- ro.go
- scheduler.go
- subject.go
- subject_async.go
- subject_behavior.go
- subject_publish.go
- subject_replay.go
- subject_unicast.go
- subscriber.go
- subscription.go
Directories
¶
| Path | Synopsis |
|---|---|
|
ee
module
|
|
|
plugins/otel
module
|
|
|
plugins/prometheus
module
|
|
|
examples
|
|
|
connectable
module
|
|
|
ee-otel-log
module
|
|
|
ee-otel-metrics
module
|
|
|
ee-otel-tracing
module
|
|
|
ee-prometheus
module
|
|
|
ics-to-csv
module
|
|
|
parallel-api-requests
module
|
|
|
sql-to-csv
module
|
|
|
stock-price-enrichment
module
|
|
|
internal
|
|
|
plugins
|
|
|
bytes
module
|
|
|
cron
module
|
|
|
encoding/base64
module
|
|
|
encoding/csv
module
|
|
|
encoding/gob
module
|
|
|
encoding/json
module
|
|
|
fsnotify
module
|
|
|
http/client
module
|
|
|
hyperloglog
module
|
|
|
ics
module
|
|
|
io
module
|
|
|
iter
module
|
|
|
observability/log
module
|
|
|
observability/logrus
module
|
|
|
observability/sentry
module
|
|
|
observability/slog
module
|
|
|
observability/zap
module
|
|
|
observability/zerolog
module
|
|
|
ozzo/ozzo-validation
module
|
|
|
proc
module
|
|
|
ratelimit/native
module
|
|
|
ratelimit/ulule
module
|
|
|
regexp
module
|
|
|
signal
module
|
|
|
sort
module
|
|
|
stdio
module
|
|
|
strconv
module
|
|
|
strings
module
|
|
|
template
module
|
|
|
testify
module
|
|
|
time
module
|
|
|
websocket/client
module
|
|
