ro

package module
v0.3.0
Published: Mar 2, 2026 License: Apache-2.0 Imports: 17 Imported by: 42

README

ro - Streams & reactive programming for Go

A Go implementation of the ReactiveX spec.

The purpose of Reactive Programming is to simplify the development of event-driven and asynchronous applications by providing a declarative and composable way to handle streams of data or events.

See also:

  • samber/lo: A Lodash-style Go library based on Go 1.18+ Generics
  • samber/do: A dependency injection toolkit based on Go 1.18+ Generics
  • samber/mo: Monads based on Go 1.18+ Generics (Option, Result, Either...)

What makes it different from samber/lo?

  • lo: synchronous helpers across finite sequences (maps, slices...)
  • ro: processing of infinite data streams for event-driven scenarios

The Reactive Programming paradigm

Reactive Programming is focused on handling asynchronous data streams where values (like user input, API responses, or sensor data) are emitted over time. Instead of pulling data or waiting for events manually, you react to changes as they occur using Observable, Observer, and Operator. This approach simplifies building systems that are responsive, resilient, and scalable, especially in event-driven or real-time applications.

observable := ro.Pipe(
    ro.RangeWithInterval(0, 5, 1*time.Second),
    ro.Filter(func(x int64) bool {
        return x%2 == 0
    }),
    ro.Map(func(x int64) string {
        return fmt.Sprintf("even-%d", x)
    }),
)

// Start consuming on subscription
observable.Subscribe(ro.NewObserver(
    func(s string) { fmt.Println(s) },
    func(err error) { fmt.Println(err.Error()) },
    func() { fmt.Println("Completed!") },
))
// Output:
//   even-0
//   even-2
//   even-4
//   Completed!

// or:

values, err := ro.Collect(observable)
// []string{"even-0", "even-2", "even-4"}
// <nil>

Now that you have discovered the paradigm, follow the documentation and turn reactive: 🚀 Getting started

🚀 Install

go get github.com/samber/ro

This library is v0 and follows SemVer strictly.

Some breaking changes might be made to exported APIs before v1.0.0. Experimental packages under exp/ are considered unstable.

This library has minimal dependencies outside the Go standard library.

Core package

Full documentation here.

The ro library provides all basic operators:

  • Creation operators: The data source, usually the first argument of ro.Pipe
  • Chainable operators: They filter, validate, transform, enrich... messages
    • Transforming operators: They transform items emitted by an Observable
    • Filtering operators: They selectively emit items from a source Observable
    • Conditional operators: Boolean operators
    • Math and aggregation operators: They perform basic math operations
    • Error handling operators: They help to recover from error notifications from an Observable
    • Combining operators: Combine multiple Observables into one
    • Connectable operators: Convert a cold Observable into a hot one
    • Other: manipulation of context, utility, async scheduling...
  • Plugins: External operators (mostly IOs and library wrappers)

Plugins

The ro library provides a rich ecosystem of plugins for various use cases:

Full documentation here.

Data Manipulation
  • Bytes (plugins/bytes) - String and byte slice manipulation operators
  • Strings (plugins/strings) - String manipulation operators
  • Sort (plugins/sort) - Sorting operators
  • Type Conversion (plugins/strconv) - String conversion operators
  • SIMD (plugins/exp/simd) - SIMD-accelerated transformations
Encoding & Serialization
  • JSON (plugins/encoding/json) - JSON marshaling and unmarshaling
  • CSV (plugins/encoding/csv) - CSV reading and writing
  • Base64 (plugins/encoding/base64) - Base64 encoding and decoding
  • Gob (plugins/encoding/gob) - Go binary serialization
Scheduling & Timing
  • Cron (plugins/cron) - Schedule jobs using cron expressions or duration intervals
  • ICS (plugins/ics) - Read and parse ICS/iCal calendars
Network & I/O
  • HTTP (plugins/http) - HTTP request operators
  • I/O (plugins/io) - File and stream I/O operators
  • File System (plugins/fsnotify) - File system monitoring operators
Observability & Logging
  • Log (plugins/observability/log) - Standard logging operators
  • Zap (plugins/observability/zap) - Structured logging with zap
  • Logrus (plugins/observability/logrus) - Structured logging with logrus
  • Slog (plugins/observability/slog) - Structured logging with slog
  • Zerolog (plugins/observability/zerolog) - Structured logging with zerolog
  • Sentry (plugins/observability/sentry) - Error tracking with Sentry
  • Oops (plugins/samber/oops) - Structured error handling
Rate Limiting
  • Native (plugins/ratelimit/native) - Native rate limiting operators
  • Ulule (plugins/ratelimit/ulule) - Rate limiting with ulule/limiter
Text Processing
  • Regular Expressions (plugins/regexp) - Regular expression operators
  • Templates (plugins/template) - Template processing operators
System Integration
  • Process (plugins/proc) - Process execution operators
  • Signal (plugins/signal) - Signal handling operators
  • Iterators (plugins/iter) - Iterator operators
  • PSI (plugins/samber/psi) - Starvation notifier
Data Validation
  • Validation (plugins/ozzo/ozzo-validation) - Data validation operators
Testing
  • Testing (plugins/testify) - Testing utilities
Utilities
  • HyperLogLog (plugins/hyperloglog) - Cardinality estimation operators
  • Hot (plugins/samber/hot) - In-memory cache

📚 Documentation

👀 Examples

See the examples directory for complete working examples.

🤝 Contributing

Check the contribution guide.

Don't hesitate ;)

👀 Contributors

Contributors

💫 Show your support

Give a ⭐️ if this project helped you!

GitHub Sponsors

License

Copyright © 2025 Samuel Berthe.

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.

Note: The ee/ directory contains the Enterprise Edition of the library, which is subject to a custom license. Please refer to the ee/LICENSE.md file for the specific terms and conditions applicable to the Enterprise Edition.

Documentation

Constants

const ReplaySubjectUnlimitedBufferSize = -1

ReplaySubjectUnlimitedBufferSize is the unlimited buffer size for a ReplaySubject.

const UnicastSubjectUnlimitedBufferSize = -1

UnicastSubjectUnlimitedBufferSize is the unlimited buffer size for a UnicastSubject.

Variables

var (
	//nolint:revive
	ErrRangeWithStepWrongStep                       = errors.New("ro.RangeWithStep: step must be greater than 0")
	ErrRangeWithStepAndIntervalWrongStep            = errors.New("ro.RangeWithStepAndInterval: step must be greater than 0")
	ErrFirstEmpty                                   = errors.New("ro.First: empty")
	ErrLastEmpty                                    = errors.New("ro.Last: empty")
	ErrHeadEmpty                                    = errors.New("ro.First: empty")
	ErrTailEmpty                                    = errors.New("ro.Last: empty")
	ErrTakeWrongCount                               = errors.New("ro.Take: count must be greater or equal to 0")
	ErrTakeLastWrongCount                           = errors.New("ro.TakeLast: count must be greater than 0")
	ErrSkipWrongCount                               = errors.New("ro.Skip: count must be greater or equal to 0")
	ErrSkipLastWrongCount                           = errors.New("ro.SkipLast: count must be greater than 0")
	ErrElementAtWrongNth                            = errors.New("ro.ElementAt: nth must be greater or equal to 0")
	ErrElementAtNotFound                            = errors.New("ro.ElementAt: nth element not found")
	ErrElementAtOrDefaultWrongNth                   = errors.New("ro.ElementAtOrDefault: nth must be greater or equal to 0")
	ErrRepeatWrongCount                             = errors.New("ro.Repeat: count must be greater or equal to 0")
	ErrRepeatWithIntervalWrongCount                 = errors.New("ro.RepeatWithInterval: count must be greater or equal to 0")
	ErrRepeatWithWrongCount                         = errors.New("ro.RepeatWith: count must be greater or equal to 0")
	ErrBufferWithCountWrongSize                     = errors.New("ro.BufferWithCount: size must be greater than 0")
	ErrBufferWithTimeWrongDuration                  = errors.New("ro.BufferWithTime: duration must be greater than 0")
	ErrBufferWithTimeOrCountWrongSize               = errors.New("ro.BufferWithTimeOrCount: size must be greater than 0")
	ErrBufferWithTimeOrCountWrongDuration           = errors.New("ro.BufferWithTimeOrCount: duration must be greater than 0")
	ErrClampLowerLessThanUpper                      = errors.New("ro.Clamp: lower must be less than or equal to upper")
	ErrToChannelWrongSize                           = errors.New("ro.ErrToChannelWrongSize: size must be greater or equal to 0")
	ErrPoolWrongSize                                = errors.New("ro.Pool: size must be greater than 0")
	ErrSubscribeOnWrongBufferSize                   = errors.New("ro.SubscribeOn: buffer size must be greater than 0")
	ErrObserveOnWrongBufferSize                     = errors.New("ro.ObserveOn: buffer size must be greater than 0")
	ErrDetachOnWrongMode                            = errors.New("ro.detachOn: unexpected detach mode")
	ErrUnicastSubjectConcurrent                     = errors.New("ro.UnicastSubject: a single subscriber accepted")
	ErrConnectableObservableMissingConnectorFactory = errors.New("ro.ConnectableObservable: missing connector factory")
)
var (

	// OnUnhandledError is called when an error is emitted by an Observable and
	// no error handler is registered.
	OnUnhandledError = IgnoreOnUnhandledError
	// OnDroppedNotification is called when a notification is emitted by an Observable and
	// no notification handler is registered.
	OnDroppedNotification = IgnoreOnDroppedNotification
)

Functions

func Abs

func Abs() func(Observable[float64]) Observable[float64]

Abs emits the absolute values emitted by the source Observable. Play: https://go.dev/play/p/WCzxrucg7BC

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[float64]) Teardown {
		observer.Next(-3)
		observer.Next(-2)
		observer.Next(-1)
		observer.Next(0)
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Abs(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 3
Next: 2
Next: 1
Next: 0
Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just[float64](-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5),
	Abs(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 5
Next: 4
Next: 3
Next: 2
Next: 1
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Completed

func All

func All[T any](predicate func(T) bool) func(Observable[T]) Observable[bool]

All determines whether all elements of an observable sequence satisfy a condition. Play: https://go.dev/play/p/t22F_crlA-l

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	All(func(i int) bool { return i > 0 }),
)

subscription := observable.Subscribe(PrintObserver[bool]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable1 := Pipe1(
	Just(1, 2, 3, 4, 5),
	All(func(i int) bool { return i > 0 }),
)

subscription1 := observable1.Subscribe(PrintObserver[bool]())
defer subscription1.Unsubscribe()

observable2 := Pipe1(
	Just(1, 2, 3, 4, 5),
	All(func(i int) bool { return i%2 == 0 }),
)

subscription2 := observable2.Subscribe(PrintObserver[bool]())
defer subscription2.Unsubscribe()
Output:

Next: true
Completed
Next: false
Completed
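
The two results above can be reproduced without the library. The following is a minimal standard-library sketch, not ro's implementation: `emit` and `all` are hypothetical helpers, and a closed channel stands in for the completion notification of an Observable.

```go
package main

import "fmt"

// emit is a hypothetical helper: it buffers the items on a channel and closes
// it, which plays the role of the completion notification.
func emit[T any](items ...T) <-chan T {
	ch := make(chan T, len(items))
	for _, it := range items {
		ch <- it
	}
	close(ch)
	return ch
}

// all reports whether every value received before the channel closes
// satisfies the predicate. After the first failure it stops calling the
// predicate but keeps draining the channel so the producer is never blocked.
func all[T any](src <-chan T, predicate func(T) bool) bool {
	ok := true
	for v := range src {
		if ok && !predicate(v) {
			ok = false
		}
	}
	return ok
}

func main() {
	fmt.Println(all(emit(1, 2, 3, 4, 5), func(i int) bool { return i > 0 }))
	fmt.Println(all(emit(1, 2, 3, 4, 5), func(i int) bool { return i%2 == 0 }))
}
```

Like the operator, the sketch produces a single boolean once the stream has ended rather than one result per item.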

func AllI

func AllI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[bool]

AllI determines whether all elements of an observable sequence satisfy a condition. The predicate also receives the index of each element.

func AllIWithContext

func AllIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[bool]

AllIWithContext determines whether all elements of an observable sequence satisfy a condition. The predicate receives the context along with each element and its index. Play: https://go.dev/play/p/UkOzE4wQXPG

func AllWithContext

func AllWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[bool]

AllWithContext determines whether all elements of an observable sequence satisfy a condition. The predicate receives the context along with each element. Play: https://go.dev/play/p/NEA7Zi7yVNh

func Average

func Average[T constraints.Numeric]() func(Observable[T]) Observable[float64]

Average calculates the average of the values emitted by the source Observable. It emits the average when the source completes. If the source is empty, it emits NaN. Play: https://go.dev/play/p/B0IhFEsQAin

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Average[int](),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Average[int](),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 3
Completed
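
The aggregation described for Average (one value at completion, NaN for an empty source) can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code: `emit` and `average` are hypothetical names, and closing the channel represents completion.

```go
package main

import (
	"fmt"
	"math"
)

// emit is a hypothetical helper that buffers the items on a channel and
// closes it to signal completion.
func emit(items ...int) <-chan int {
	ch := make(chan int, len(items))
	for _, it := range items {
		ch <- it
	}
	close(ch)
	return ch
}

// average drains the channel and returns the arithmetic mean of everything it
// received. An empty stream yields NaN, matching the documented behaviour.
func average(values <-chan int) float64 {
	var sum float64
	var count int
	for v := range values {
		sum += float64(v)
		count++
	}
	if count == 0 {
		return math.NaN()
	}
	return sum / float64(count)
}

func main() {
	fmt.Println(average(emit(1, 2, 3, 4, 5)))
	fmt.Println(math.IsNaN(average(emit())))
}
```

Because NaN never compares equal to itself, callers should test the empty case with `math.IsNaN` rather than `==`.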

func BufferWhen

func BufferWhen[T, B any](boundary Observable[B]) func(Observable[T]) Observable[[]T]

BufferWhen buffers the items emitted by an Observable until a second Observable emits an item. Then it emits the buffer and starts a new buffer. It repeats this process until the source Observable completes. If the boundary Observable completes, the buffer is emitted and the source Observable completes. If the source Observable errors, the buffer is emitted and the error is propagated. Play: https://go.dev/play/p/w8c_zuaLl9l

Example (Error)
observable := Pipe1(
	Throw[int64](assert.AnError),
	BufferWhen[int64](Interval(50*time.Millisecond)),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
defer subscription.Unsubscribe()

time.Sleep(200 * time.Millisecond)
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Interval(30*time.Millisecond),
	BufferWhen[int64](Interval(100*time.Millisecond)),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
time.Sleep(250 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: [0 1 2]
Next: [3 4 5]

func BufferWithCount

func BufferWithCount[T any](size int) func(Observable[T]) Observable[[]T]

BufferWithCount buffers the items emitted by an Observable until the buffer holds size items, then emits the buffer and starts a new one. It repeats this process until the source Observable completes. When the source completes, any remaining items are emitted as a final, possibly shorter, buffer before the complete notification is propagated. When the source errors, the error is propagated; note that in the error example below the pending partial buffer ([3]) is not emitted. Play: https://go.dev/play/p/IXhDtSybE4R

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		go func() {
			observer.Next(1)
			observer.Next(2)
			observer.Next(3)
			observer.Error(assert.AnError)
			observer.Next(4)
		}()

		return nil
	}),
	BufferWithCount[int](2),
)

subscription := observable.Subscribe(PrintObserver[[]int]())

time.Sleep(10 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: [1 2]
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	BufferWithCount[int](2),
)

subscription := observable.Subscribe(PrintObserver[[]int]())

time.Sleep(10 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: [1 2]
Next: [3 4]
Next: [5]
Completed
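
The count-based grouping in the Ok example can be written as a plain slice function. The sketch below uses only the standard library and is not ro's implementation; `bufferWithCount` and `errWrongSize` are hypothetical names, and the size check simply mirrors the "size must be greater than 0" rule from ErrBufferWithCountWrongSize.

```go
package main

import (
	"errors"
	"fmt"
)

// errWrongSize is a hypothetical error mirroring the documented size rule.
var errWrongSize = errors.New("size must be greater than 0")

// bufferWithCount groups items into consecutive buffers of at most size
// elements. The last buffer may be shorter when the input length is not a
// multiple of size.
func bufferWithCount[T any](items []T, size int) ([][]T, error) {
	if size <= 0 {
		return nil, errWrongSize
	}
	var out [][]T
	buf := make([]T, 0, size)
	for _, it := range items {
		buf = append(buf, it)
		if len(buf) == size {
			// Buffer is full: emit it and start a fresh one.
			out = append(out, buf)
			buf = make([]T, 0, size)
		}
	}
	if len(buf) > 0 {
		// End of input: flush whatever is left.
		out = append(out, buf)
	}
	return out, nil
}

func main() {
	buffers, err := bufferWithCount([]int{1, 2, 3, 4, 5}, 2)
	fmt.Println(buffers, err)
}
```

Allocating a new slice after each flush keeps emitted buffers from sharing a backing array with later ones.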

func BufferWithTime

func BufferWithTime[T any](duration time.Duration) func(Observable[T]) Observable[[]T]

BufferWithTime buffers the items emitted by an Observable for the specified duration, then emits the buffer and starts a new one. A window in which nothing arrives yields an empty buffer. It repeats this process until the source Observable completes. When the source completes, the current buffer is emitted and the complete notification is propagated. When the source errors, the error is propagated; note that in the error example below the pending item (4) is not emitted. Play: https://go.dev/play/p/TfOhP-f_O45

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		go func() {
			observer.Next(1)
			time.Sleep(10 * time.Millisecond)
			observer.Next(2)
			time.Sleep(10 * time.Millisecond)
			observer.Next(3)

			time.Sleep(200 * time.Millisecond)
			// 1 empty buffer

			observer.Next(4)
			observer.Error(assert.AnError)
			observer.Next(5)
		}()

		return nil
	}),
	BufferWithTime[int](100*time.Millisecond),
)

subscription := observable.Subscribe(PrintObserver[[]int]())

time.Sleep(300 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: [1 2 3]
Next: []
Error: assert.AnError general error for testing
Example (Ok)

Commented because I get a weird conflict with other tests.

observable := Pipe1(
	RangeWithInterval(1, 6, 20*time.Millisecond),
	BufferWithTime[int64](70*time.Millisecond),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())

time.Sleep(200 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: [1 2 3]
Next: [4 5]
Completed

func BufferWithTimeOrCount

func BufferWithTimeOrCount[T any](size int, duration time.Duration) func(Observable[T]) Observable[[]T]

BufferWithTimeOrCount buffers the items emitted by an Observable until either size items have been collected or the specified duration has elapsed, whichever comes first. It then emits the buffer and starts a new one, repeating this process until the source Observable completes. When the source completes, the current buffer is emitted and the complete notification is propagated. When the source errors, the error is propagated; note that in the error example below the pending item (3) is not emitted. Play: https://go.dev/play/p/NyiF19jUdQD

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		go func() {
			observer.Next(1)
			observer.Next(2)
			observer.Next(3)
			observer.Error(assert.AnError)
			observer.Next(4)
		}()

		return nil
	}),
	BufferWithTimeOrCount[int](2, 100*time.Millisecond),
)

subscription := observable.Subscribe(PrintObserver[[]int]())

time.Sleep(10 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: [1 2]
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	BufferWithTimeOrCount[int](2, 100*time.Millisecond),
)

subscription := observable.Subscribe(PrintObserver[[]int]())

time.Sleep(10 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: [1 2]
Next: [3 4]
Next: [5]
Completed
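
The "whichever comes first" rule can be sketched with a channel, a ticker, and a select loop. This is a standard-library illustration, not the library's implementation: `bufferWithTimeOrCount` here is a hypothetical channel-based function, and restarting the timer after a count-triggered flush is an assumption about the intended semantics.

```go
package main

import (
	"fmt"
	"time"
)

// bufferWithTimeOrCount reads from src and emits a buffer whenever it holds
// size items or d has elapsed. size and d must both be positive.
// Assumption: the time window restarts after a count-triggered flush.
func bufferWithTimeOrCount[T any](src <-chan T, size int, d time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		ticker := time.NewTicker(d)
		defer ticker.Stop()

		buf := make([]T, 0, size)
		flush := func() {
			out <- buf
			buf = make([]T, 0, size)
		}
		for {
			select {
			case v, ok := <-src:
				if !ok {
					// Source completed: emit what is left, then stop.
					if len(buf) > 0 {
						flush()
					}
					return
				}
				buf = append(buf, v)
				if len(buf) == size {
					flush()
					ticker.Reset(d)
				}
			case <-ticker.C:
				// Window elapsed: emit the buffer even if it is empty.
				flush()
			}
		}
	}()
	return out
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= 5; i++ {
			src <- i
		}
	}()
	for b := range bufferWithTimeOrCount[int](src, 2, time.Hour) {
		fmt.Println(b)
	}
}
```

With a one-hour window the timer never fires in this run, so only the count rule and the final flush are exercised.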

func Cast

func Cast[T, U any]() func(Observable[T]) Observable[U]

Cast converts each value emitted by an Observable into a specified type. Play: https://go.dev/play/p/XUdqodfFyT6

func Catch

func Catch[T any](finally func(err error) Observable[T]) func(Observable[T]) Observable[T]

Catch catches errors on the observable to be handled by returning a new observable or throwing an error. Play: https://go.dev/play/p/0pVlxwjhdMT

Example
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)
		observer.Complete()

		return nil
	}),
	Catch(func(err error) Observable[int] {
		return Of(4, 5, 6)
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

func Ceil

func Ceil() func(Observable[float64]) Observable[float64]

Ceil emits the ceiling of the values emitted by the source Observable. Play: https://go.dev/play/p/BlpeIki-oMG

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[float64]) Teardown {
		observer.Next(1.1)
		observer.Next(2.5)
		observer.Next(3.9)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Ceil(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 3
Next: 4
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1.1, 2.4, 3.5, 4.9, 5.0),
	Ceil(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 3
Next: 4
Next: 5
Next: 5
Completed

func CeilWithPrecision

func CeilWithPrecision(places int) func(Observable[float64]) Observable[float64]

CeilWithPrecision emits the ceiling of the values emitted by the source Observable. It uses the provided decimal precision. Positive precisions apply the ceiling to the specified number of digits to the right of the decimal point, while negative precisions round to powers of ten.
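
No example accompanies this operator, so here is a standard-library sketch of the arithmetic on a single value. It is not the library's implementation: `ceilWithPrecision` is a hypothetical helper, and reading a negative precision as "round up to a multiple of 10^(-places)" is an assumption based on the sentence above.

```go
package main

import (
	"fmt"
	"math"
)

// ceilWithPrecision rounds x up at the given number of decimal places.
// places >= 0 keeps that many digits after the decimal point; places < 0
// rounds up to a multiple of 10^(-places) (assumed interpretation).
func ceilWithPrecision(x float64, places int) float64 {
	if places >= 0 {
		factor := math.Pow(10, float64(places))
		return math.Ceil(x*factor) / factor
	}
	// Divide first for negative places so the scaling factor stays an
	// exactly representable power of ten.
	factor := math.Pow(10, float64(-places))
	return math.Ceil(x/factor) * factor
}

func main() {
	fmt.Println(ceilWithPrecision(1.234, 2))
	fmt.Println(ceilWithPrecision(1234, -2))
	fmt.Println(ceilWithPrecision(-1.239, 2))
}
```

Scaling by a power of ten can introduce floating-point rounding error for some inputs, so a production version may need an epsilon or a decimal type.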

func Clamp

func Clamp[T constraints.Numeric](lower, upper T) func(Observable[T]) Observable[T]

Clamp restricts each number emitted by the source Observable to the inclusive range between lower and upper. Play: https://go.dev/play/p/fu8O-BixXPM

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Clamp(2, 4),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Clamp(2, 4),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 2
Next: 3
Next: 4
Next: 4
Completed
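
The per-item rule behind the output above fits in a few lines. The following standard-library sketch is not ro's code: `numeric`, `clampAll`, and `errBounds` are hypothetical names, and the bounds check only mirrors the message of ErrClampLowerLessThanUpper.

```go
package main

import (
	"errors"
	"fmt"
)

// numeric is a hypothetical constraint covering a few common number types.
type numeric interface {
	~int | ~int32 | ~int64 | ~float32 | ~float64
}

// errBounds mirrors the documented rule that lower must not exceed upper.
var errBounds = errors.New("lower must be less than or equal to upper")

// clampAll returns a copy of items where every value is limited to the
// inclusive range [lower, upper].
func clampAll[T numeric](items []T, lower, upper T) ([]T, error) {
	if lower > upper {
		return nil, errBounds
	}
	out := make([]T, len(items))
	for i, v := range items {
		switch {
		case v < lower:
			out[i] = lower
		case v > upper:
			out[i] = upper
		default:
			out[i] = v
		}
	}
	return out, nil
}

func main() {
	clamped, err := clampAll([]int{1, 2, 3, 4, 5}, 2, 4)
	fmt.Println(clamped, err)
}
```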

func Collect

func Collect[T any](obs Observable[T]) ([]T, error)

Collect collects all values emitted by the source Observable and returns them as a slice. It waits for the source Observable to complete before returning. If the source Observable emits an error, the error is returned along with the values collected so far.
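
The "values collected so far plus the error" contract can be illustrated with a channel of notifications. This is a standard-library sketch under assumptions, not the library's implementation: `notification`, `collect`, `stream`, and `errBoom` are hypothetical, and a closed channel stands in for completion.

```go
package main

import (
	"errors"
	"fmt"
)

// notification is a hypothetical envelope carrying either a value or an error.
type notification[T any] struct {
	value T
	err   error
}

// errBoom is a placeholder error used by the example.
var errBoom = errors.New("boom")

// collect gathers values until the channel closes. On the first error it
// returns that error together with the values received before it.
func collect[T any](src <-chan notification[T]) ([]T, error) {
	var values []T
	for n := range src {
		if n.err != nil {
			return values, n.err
		}
		values = append(values, n.value)
	}
	return values, nil
}

// stream builds a buffered, already-closed channel so that collect returning
// early never leaves a producer goroutine blocked.
func stream() <-chan notification[int] {
	ch := make(chan notification[int], 4)
	ch <- notification[int]{value: 1}
	ch <- notification[int]{value: 2}
	ch <- notification[int]{err: errBoom}
	ch <- notification[int]{value: 3}
	close(ch)
	return ch
}

func main() {
	values, err := collect(stream())
	fmt.Println(values, err)
}
```

Callers should therefore check the error before trusting that the returned slice is complete.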

func CollectWithContext

func CollectWithContext[T any](ctx context.Context, obs Observable[T]) ([]T, context.Context, error)

CollectWithContext collects all values emitted by the source Observable and returns them as a slice. It waits for the source Observable to complete before returning. If the source Observable emits an error, the error is returned along with the values collected so far. @TODO: return more values, such as (isCanceled bool) or (duration time.Duration) ?

func CombineLatestAll

func CombineLatestAll[T any]() func(Observable[Observable[T]]) Observable[[]T]

CombineLatestAll combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/nT1qq9ipwZL

Example (Error)
observable := Pipe1(
	Just(
		RangeWithInterval(1, 3, 50*time.Millisecond),
		Delay[int64](75*time.Millisecond)(Throw[int64](assert.AnError)),
		RangeWithInterval(5, 7, 50*time.Millisecond),
	),
	CombineLatestAll[int64](),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())

time.Sleep(200 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(
		RangeWithInterval(1, 3, 40*time.Millisecond),
		RangeWithInterval(3, 5, 60*time.Millisecond),
		Delay[int64](25*time.Millisecond)(RangeWithInterval(5, 7, 100*time.Millisecond)),
	),
	CombineLatestAll[int64](),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: [2 4 5]
Next: [2 4 6]
Completed
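
The rule "emit only once every source has produced a value" can be replayed deterministically with a list of timestamp-ordered events. The sketch below uses only the standard library and is not the library's implementation: `event` and `combineLatest` are hypothetical, and the event order in `main` is an assumption derived from the intervals in the Ok example (each RangeWithInterval is assumed to emit its first value after one interval).

```go
package main

import "fmt"

// event is a hypothetical record: which source emitted, and what value.
type event struct {
	source int
	value  int64
}

// combineLatest replays events in order and returns one snapshot of the latest
// values per event, starting once all n sources have emitted at least once.
// Every event.source must be in the range [0, n).
func combineLatest(n int, events []event) [][]int64 {
	latest := make([]int64, n)
	seen := make([]bool, n)
	missing := n
	var out [][]int64
	for _, ev := range events {
		if !seen[ev.source] {
			seen[ev.source] = true
			missing--
		}
		latest[ev.source] = ev.value
		if missing == 0 {
			// Copy so later updates do not alter emitted snapshots.
			snapshot := make([]int64, n)
			copy(snapshot, latest)
			out = append(out, snapshot)
		}
	}
	return out
}

func main() {
	// Assumed timeline: 40ms, 60ms, 80ms, 120ms, 125ms, 225ms.
	events := []event{{0, 1}, {1, 3}, {0, 2}, {1, 4}, {2, 5}, {2, 6}}
	for _, snapshot := range combineLatest(3, events) {
		fmt.Println(snapshot)
	}
}
```

Under that assumed timeline the third source is the last to emit, which is why the first snapshot already contains the second value of the other two sources.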

func CombineLatestAllAny

func CombineLatestAllAny() func(Observable[Observable[any]]) Observable[[]any]

CombineLatestAllAny combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes. Play: https://go.dev/play/p/nKMychGg9KH

Example (Error)
observable1 := Map(func(x int64) any { return x })(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := Delay[any](75 * time.Millisecond)(Throw[any](assert.AnError))
observable3 := Of[any]("a", "b")

combined := Just(observable1, observable2, observable3)
observable := CombineLatestAllAny()(combined)

subscription := observable.Subscribe(PrintObserver[[]any]())

time.Sleep(200 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable1 := Map(func(x int64) any { return x })(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := Of[any]("a", "b")
observable3 := Delay[any](25 * time.Millisecond)(Of[any]("c", "d"))

combined := Just(observable1, observable2, observable3)
observable := CombineLatestAllAny()(combined)

subscription := observable.Subscribe(PrintObserver[[]any]())

time.Sleep(200 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Next: [1 b d]
Next: [2 b d]
Completed

func CombineLatestWith

func CombineLatestWith[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]

CombineLatestWith combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/yq7G8eItuzO

Example (Ok)
observable1 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable := Pipe2(
	observable1,
	CombineLatestWith[int64](observable2),
	Map(func(snapshot lo.Tuple2[int64, int64]) []int64 {
		return []int64{snapshot.A, snapshot.B}
	}),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
time.Sleep(200 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: [1 3]
Next: [1 4]
Next: [2 4]
Completed

func CombineLatestWith1

func CombineLatestWith1[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]

CombineLatestWith1 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/KXb19PPjCb1

Example (Ok)
observable1 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable := Pipe1(
	CombineLatestWith1[int64](observable2)(observable1),
	Map(func(snapshot lo.Tuple2[int64, int64]) []int64 {
		return []int64{snapshot.A, snapshot.B}
	}),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())

time.Sleep(200 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Next: [1 3]
Next: [1 4]
Next: [2 4]
Completed

func CombineLatestWith2

func CombineLatestWith2[A, B, C any](obsB Observable[B], obsC Observable[C]) func(Observable[A]) Observable[lo.Tuple3[A, B, C]]

CombineLatestWith2 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/hPDCDwEOB84

Example (Ok)
observable1 := Delay[int64](150 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond))

combined := CombineLatestWith2[int64](observable2, observable3)(observable1)
observable := Map(func(snapshot lo.Tuple3[int64, int64, int64]) []int64 {
	return []int64{snapshot.A, snapshot.B, snapshot.C}
})(combined)

subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: [1 4 6]
Next: [2 4 6]
Completed

func CombineLatestWith3

func CombineLatestWith3[A, B, C, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D]) func(Observable[A]) Observable[lo.Tuple4[A, B, C, D]]

CombineLatestWith3 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/PcMxo8yakQq

Example (Ok)
observable1 := Delay[int64](175 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond))
observable4 := Delay[int64](50 * time.Millisecond)(RangeWithInterval(7, 9, 50*time.Millisecond))

combined := CombineLatestWith3[int64](observable2, observable3, observable4)(observable1)
observable := Map(func(snapshot lo.Tuple4[int64, int64, int64, int64]) []int64 {
	return []int64{snapshot.A, snapshot.B, snapshot.C, snapshot.D}
})(combined)

subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: [1 4 6 8]
Next: [2 4 6 8]
Completed

func CombineLatestWith4

func CombineLatestWith4[A, B, C, D, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) func(Observable[A]) Observable[lo.Tuple5[A, B, C, D, E]]

CombineLatestWith4 combines the values from the source Observable with the latest values from the other Observables. It will only emit when all Observables have emitted at least one value. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument.

Example (Ok)
observable1 := Delay[int64](200 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond))
observable4 := Delay[int64](50 * time.Millisecond)(RangeWithInterval(7, 9, 50*time.Millisecond))
observable5 := Delay[int64](75 * time.Millisecond)(RangeWithInterval(9, 11, 50*time.Millisecond))

combined := CombineLatestWith4[int64](observable2, observable3, observable4, observable5)(observable1)
observable := Map(func(snapshot lo.Tuple5[int64, int64, int64, int64, int64]) []int64 {
	return []int64{snapshot.A, snapshot.B, snapshot.C, snapshot.D, snapshot.E}
})(combined)

subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: [1 4 6 8 10]
Next: [2 4 6 8 10]
Completed

func ConcatAll

func ConcatAll[T any]() func(Observable[Observable[T]]) Observable[T]

ConcatAll flattens an Observable of Observables by concatenating the inner Observables in order. It subscribes to each inner Observable only after the previous one completes. It completes when all inner Observables are done. Play: https://go.dev/play/p/zygV4Ld9tcv
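
The ordering rule can be modelled in plain Go without ro. In this minimal sketch each inner sequence is drained fully before the next one starts, and the first error ends the whole sequence. The `inner` type, `errBoom` and `concatAll` are hypothetical stand-ins for illustration, not library code.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a placeholder error used only by this sketch.
var errBoom = errors.New("boom")

// inner is a hypothetical stand-in for an inner Observable: it yields its
// values in order and then either completes (err == nil) or fails with err.
type inner struct {
	values []int
	err    error
}

// concatAll drains each inner sequence in order. It stops at the first
// error, so later sequences are never visited.
func concatAll(sources []inner) ([]int, error) {
	var out []int
	for _, src := range sources {
		out = append(out, src.values...)
		if src.err != nil {
			return out, src.err
		}
	}
	return out, nil
}

func main() {
	values, err := concatAll([]inner{
		{values: []int{1, 2, 3}},
		{err: errBoom},
		{values: []int{4, 5, 6}},
	})
	fmt.Println(values, err) // [1 2 3] boom
}
```

The model is synchronous and eager. It only illustrates the order of emissions, not subscription timing.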

Example (Error)
observable := Pipe1(
	Just(
		Just(1, 2, 3),
		Throw[int](assert.AnError),
		Just(4, 5, 6),
	),
	ConcatAll[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(30 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(
		Just(1, 2, 3),
		Just(4, 5, 6),
	),
	ConcatAll[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(30 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

func ConcatWith

func ConcatWith[T any](obs ...Observable[T]) func(Observable[T]) Observable[T]

ConcatWith concatenates the source Observable with other Observables. It subscribes to each inner Observable only after the previous one completes, maintaining their order. It completes when all inner Observables are done.

It is a curried function that takes the other Observables as arguments. Play: https://go.dev/play/p/nRHRSR2yNvd

Example (Error)
observable := Pipe1(
	Just(1, 2, 3),
	ConcatWith(Throw[int](assert.AnError)),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3),
	ConcatWith(Just(4, 5, 6)),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

func Contains

func Contains[T any](predicate func(item T) bool) func(Observable[T]) Observable[bool]

Contains determines whether any item emitted by the source Observable satisfies the predicate. It emits a single boolean and then completes. Play: https://go.dev/play/p/ldteqqGsMWM

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Contains(func(i int) bool { return i == 4 }),
)

subscription := observable.Subscribe(PrintObserver[bool]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable1 := Pipe1(
	Just(1, 2, 3, 4, 5),
	Contains(func(i int) bool { return i < 0 }),
)

subscription1 := observable1.Subscribe(PrintObserver[bool]())
defer subscription1.Unsubscribe()

observable2 := Pipe1(
	Just(1, 2, 3, 4, 5),
	Contains(func(i int) bool { return i%2 == 0 }),
)

subscription2 := observable2.Subscribe(PrintObserver[bool]())
defer subscription2.Unsubscribe()
Output:

Next: false
Completed
Next: true
Completed

func ContainsI

func ContainsI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[bool]

ContainsI determines whether any item emitted by the source Observable satisfies the predicate, which also receives the item's index.
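
As a rough illustration of an index-aware predicate, here is a plain-Go sketch over a slice. `containsI` is a hypothetical helper, and the zero-based index is an assumption of this sketch rather than something the text above states.

```go
package main

import "fmt"

// containsI reports whether any item satisfies the predicate. The predicate
// also receives the position of the item (zero-based in this sketch).
func containsI[T any](items []T, predicate func(item T, index int64) bool) bool {
	for i, item := range items {
		if predicate(item, int64(i)) {
			return true
		}
	}
	return false
}

func main() {
	items := []string{"a", "b", "c"}

	// Is there a "c" at position 2 or later?
	found := containsI(items, func(item string, index int64) bool {
		return index >= 2 && item == "c"
	})
	fmt.Println(found) // true
}
```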

func ContainsIWithContext

func ContainsIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[bool]

ContainsIWithContext determines whether any item emitted by the source Observable satisfies the predicate, which also receives the context and the item's index. Play: https://go.dev/play/p/TkLfujMVNJb

func ContainsWithContext

func ContainsWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[bool]

ContainsWithContext determines whether any item emitted by the source Observable satisfies the predicate, which also receives the context. Play: https://go.dev/play/p/RPHkyiLrFVW

func ContextMap

func ContextMap[T any](project func(ctx context.Context) context.Context) func(Observable[T]) Observable[T]

ContextMap returns an Observable that emits the same items as the source Observable, but with a new context. The project function is called for each item emitted by the source Observable, and the context is replaced with the context returned by the project function. Play: https://go.dev/play/p/jbshjD3sb6M

func ContextMapI

func ContextMapI[T any](project func(ctx context.Context, index int64) context.Context) func(Observable[T]) Observable[T]

ContextMapI returns an Observable that emits the same items as the source Observable, but with a new context. The project function is called for each item emitted by the source Observable, and the context is replaced with the context returned by the project function. Play: https://go.dev/play/p/jbshjD3sb6M

func ContextReset

func ContextReset[T any](newCtx context.Context) func(Observable[T]) Observable[T]

ContextReset returns an Observable that emits the same items as the source Observable, but with a new context. If the new context is nil, it uses context.Background(). Play: https://go.dev/play/p/PgvV0SejJpH

func ContextWithDeadline

func ContextWithDeadline[T any](deadline time.Time) func(Observable[T]) Observable[T]

ContextWithDeadline returns an Observable that emits the same items as the source Observable, but adds a deadline to the context of each item. This operator should be chained with ThrowOnContextCancel. Play: https://go.dev/play/p/NPYFzhI2YDK

func ContextWithTimeout

func ContextWithTimeout[T any](timeout time.Duration) func(Observable[T]) Observable[T]

ContextWithTimeout returns an Observable that emits the same items as the source Observable, but adds a timeout to the context of each item. This operator should be chained with ThrowOnContextCancel. Play: https://go.dev/play/p/1qijKGsyn0D
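
One way to see why a follow-up operator is needed uses only the standard library. A context that has timed out merely records that fact, and nothing stops until some code checks it. The sketch below uses `context` and `time` only. `processUntilExpired` and `expiredContext` are hypothetical helpers, not ro functions.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// processUntilExpired counts items and stops as soon as ctx reports an error.
// Without the ctx.Err() check the loop would run to the end even after the
// timeout has elapsed.
func processUntilExpired(ctx context.Context, items []int) (int, error) {
	processed := 0
	for range items {
		if err := ctx.Err(); err != nil {
			return processed, err
		}
		processed++
	}
	return processed, nil
}

// expiredContext returns a context whose timeout has already elapsed.
func expiredContext() (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	<-ctx.Done() // block until the timeout fires
	return ctx, cancel
}

func main() {
	items := []int{1, 2, 3}

	ctx, cancel := expiredContext()
	defer cancel()

	n, err := processUntilExpired(ctx, items)
	fmt.Println(n, err) // 0 context deadline exceeded

	n, err = processUntilExpired(context.Background(), items)
	fmt.Println(n, err) // 3 <nil>
}
```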

func ContextWithValue

func ContextWithValue[T any](k, v any) func(Observable[T]) Observable[T]

ContextWithValue returns an Observable that emits the same items as the source Observable, but adds a key-value pair to the context of each item. Play: https://go.dev/play/p/l70D6fuiVhK

Example
type contextValue struct{}

observable := Pipe2(
	Just(1, 2, 3, 4, 5),
	ContextWithValue[int](contextValue{}, 42),
	Filter(func(i int) bool {
		return i%2 == 0
	}),
)

subscription := observable.Subscribe(
	OnNextWithContext(func(ctx context.Context, value int) {
		fmt.Printf("Next: %v\n", value)
		fmt.Printf("Next context value: %v\n", ctx.Value(contextValue{}))
	}),
)
defer subscription.Unsubscribe()
Output:

Next: 2
Next context value: 42
Next: 4
Next context value: 42

func Count

func Count[T any]() func(Observable[T]) Observable[int64]

Count counts the number of values emitted by the source Observable. It emits the count when the source completes. Play: https://go.dev/play/p/igtOxOLeHPp

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Count[int](),
)

subscription := observable.Subscribe(PrintObserver[int64]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Count[int](),
)

subscription := observable.Subscribe(PrintObserver[int64]())
defer subscription.Unsubscribe()
Output:

Next: 5
Completed

func DefaultIfEmpty

func DefaultIfEmpty[T any](defaultValue T) func(Observable[T]) Observable[T]

DefaultIfEmpty emits a default value if the source observable emits no items. Play: https://go.dev/play/p/WDh807OLPWv

Example (Error)
observable := Pipe1(
	Throw[int](assert.AnError),
	DefaultIfEmpty(42),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable1 := Pipe1(
	Just(1, 2, 3),
	DefaultIfEmpty(42),
)

subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()

observable2 := Pipe1(
	Empty[int](),
	DefaultIfEmpty(42),
)

subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed
Next: 42
Completed

func DefaultIfEmptyWithContext

func DefaultIfEmptyWithContext[T any](defaultCtx context.Context, defaultValue T) func(Observable[T]) Observable[T]

DefaultIfEmptyWithContext emits a default value if the source observable emits no items.

func DefaultOnDroppedNotification

func DefaultOnDroppedNotification(ctx context.Context, notification fmt.Stringer)

DefaultOnDroppedNotification is the default implementation of `OnDroppedNotification`.

Since we cannot assign a generic callback to `OnDroppedNotification`, we had to use a `fmt.Stringer` instead of a `Notification[T any]`.

func DefaultOnUnhandledError

func DefaultOnUnhandledError(ctx context.Context, err error)

DefaultOnUnhandledError is the default implementation of `OnUnhandledError`.

func Delay

func Delay[T any](duration time.Duration) func(Observable[T]) Observable[T]

Delay delays the emissions of the source Observable by a given duration without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Error and Complete notifications are delayed as well.

@TODO: set queue size ? Play: https://go.dev/play/p/K3md7WPtZGI

Example (Cancel)
observable := Pipe1(
	Of(1),
	Delay[int](100*time.Millisecond),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(50 * time.Millisecond)
subscription.Unsubscribe() // canceled before first message
Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Delay[int](10*time.Millisecond),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3),
	Delay[int](10*time.Millisecond),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Completed

func DelayEach

func DelayEach[T any](duration time.Duration) func(Observable[T]) Observable[T]

DelayEach delays the emissions of the source Observable by a given duration without modifying the emitted items. Play: https://go.dev/play/p/dReP7-bffEU

func Dematerialize

func Dematerialize[T any]() func(Observable[Notification[T]]) Observable[T]

Dematerialize converts the source Observable of Notification instances back into a stream of items. Play: https://go.dev/play/p/oRymdDqkh25
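
The conversion can be pictured with a small plain-Go model. The `notification` and `kind` types below are local stand-ins that mirror the fields used in the examples (Kind, Value, Err). They are not the library types, and `dematerialize` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

type kind int

const (
	kindNext kind = iota
	kindError
	kindComplete
)

// notification is a local stand-in for illustration only.
type notification struct {
	kind  kind
	value int
	err   error
}

// errBoom is a placeholder error used only by this sketch.
var errBoom = errors.New("boom")

// dematerialize replays notifications as plain values. It stops at the first
// error or completion notification and ignores anything after it.
func dematerialize(notifications []notification) ([]int, error) {
	var out []int
	for _, n := range notifications {
		switch n.kind {
		case kindNext:
			out = append(out, n.value)
		case kindError:
			return out, n.err
		case kindComplete:
			return out, nil
		}
	}
	return out, nil
}

func main() {
	values, err := dematerialize([]notification{
		{kind: kindNext, value: 1},
		{kind: kindNext, value: 2},
		{kind: kindError, err: errBoom},
		{kind: kindNext, value: 3}, // never reached
	})
	fmt.Println(values, err) // [1 2] boom
}
```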

Example (Error)
observable := Pipe1(
	Just(
		Notification[int]{Kind: KindNext, Value: 1, Err: nil},
		Notification[int]{Kind: KindNext, Value: 2, Err: nil},
		Notification[int]{Kind: KindNext, Value: 3, Err: nil},
		Notification[int]{Kind: KindError, Value: 0, Err: assert.AnError},
	),
	Dematerialize[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(
		Notification[int]{Kind: KindNext, Value: 1, Err: nil},
		Notification[int]{Kind: KindNext, Value: 2, Err: nil},
		Notification[int]{Kind: KindNext, Value: 3, Err: nil},
		Notification[int]{Kind: KindComplete, Value: 0, Err: nil},
	),
	Dematerialize[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed

func Distinct

func Distinct[T comparable]() func(Observable[T]) Observable[T]

Distinct suppresses duplicate items in an Observable. Play: https://go.dev/play/p/szxp8gO0_I7

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(1)
		observer.Next(2)
		observer.Next(2)
		observer.Next(3)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)
		observer.Next(4)

		return nil
	}),
	Distinct[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 1, 2, 2, 3, 3, 4, 4, 5, 5),
	Distinct[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Completed

func DistinctBy

func DistinctBy[T any, K comparable](keySelector func(item T) K) func(Observable[T]) Observable[T]

DistinctBy suppresses duplicate items in an Observable based on a key selector.
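
A minimal plain-Go model of key-based de-duplication shows why the key type must be comparable. `distinctBy` is a hypothetical helper over a slice. Whether ro tracks keys the same way internally is an assumption.

```go
package main

import "fmt"

type user struct {
	id   int
	name string
}

// distinctBy keeps the first item seen for each key and drops later items
// with the same key. K must be comparable because keys are stored in a map.
func distinctBy[T any, K comparable](items []T, keySelector func(T) K) []T {
	seen := make(map[K]struct{})
	var out []T
	for _, item := range items {
		key := keySelector(item)
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		out = append(out, item)
	}
	return out
}

func main() {
	users := []user{{1, "John"}, {2, "Jane"}, {1, "Johnny"}, {3, "Jim"}}
	fmt.Println(distinctBy(users, func(u user) int { return u.id }))
	// [{1 John} {2 Jane} {3 Jim}]
}
```

In this model the set of seen keys grows with every new key, which is worth keeping in mind for long-lived streams.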

Example (Error)
type user struct {
	id   int
	name string
}

observable := Pipe1(
	NewObservable(func(observer Observer[user]) Teardown {
		observer.Next(user{id: 1, name: "John"})
		observer.Next(user{id: 2, name: "Jane"})
		observer.Next(user{id: 1, name: "John"})
		observer.Error(assert.AnError)
		observer.Next(user{id: 3, name: "Jim"})

		return nil
	}),
	DistinctBy(func(item user) int {
		return item.id
	}),
)
subscription := observable.Subscribe(PrintObserver[user]())
defer subscription.Unsubscribe()
Output:

Next: {1 John}
Next: {2 Jane}
Error: assert.AnError general error for testing
Example (Ok)
type user struct {
	id   int
	name string
}

observable := Pipe1(
	Just(
		user{id: 1, name: "John"},
		user{id: 2, name: "Jane"},
		user{id: 1, name: "John"},
		user{id: 3, name: "Jim"},
	),
	DistinctBy(func(item user) int {
		return item.id
	}),
)

subscription := observable.Subscribe(PrintObserver[user]())
defer subscription.Unsubscribe()
Output:

Next: {1 John}
Next: {2 Jane}
Next: {3 Jim}
Completed

func DistinctByWithContext

func DistinctByWithContext[T any, K comparable](keySelector func(ctx context.Context, item T) (context.Context, K)) func(Observable[T]) Observable[T]

DistinctByWithContext suppresses duplicate items in an Observable based on a key selector. The context is passed to the key selector function.

func Do

func Do[T any](onNext func(value T), onError func(err error), onComplete func()) func(Observable[T]) Observable[T]

Do is an alias to Tap. Play: https://go.dev/play/p/s_BSHgxdjUR

func DoOnComplete

func DoOnComplete[T any](onComplete func()) func(Observable[T]) Observable[T]

DoOnComplete is an alias to TapOnComplete.

func DoOnCompleteWithContext

func DoOnCompleteWithContext[T any](onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]

DoOnCompleteWithContext is an alias to TapOnCompleteWithContext.

func DoOnError

func DoOnError[T any](onError func(err error)) func(Observable[T]) Observable[T]

DoOnError is an alias to TapOnError.

func DoOnErrorWithContext

func DoOnErrorWithContext[T any](onError func(ctx context.Context, err error)) func(Observable[T]) Observable[T]

DoOnErrorWithContext is an alias to TapOnErrorWithContext.

func DoOnFinalize

func DoOnFinalize[T any](onFinalize func()) func(Observable[T]) Observable[T]

DoOnFinalize is an alias to TapOnFinalize. Play: https://go.dev/play/p/7en6T1q33WF

func DoOnNext

func DoOnNext[T any](onNext func(value T)) func(Observable[T]) Observable[T]

DoOnNext is an alias to TapOnNext.

func DoOnNextWithContext

func DoOnNextWithContext[T any](onNext func(ctx context.Context, value T)) func(Observable[T]) Observable[T]

DoOnNextWithContext is an alias to TapOnNextWithContext.

func DoOnSubscribe

func DoOnSubscribe[T any](onSubscribe func()) func(Observable[T]) Observable[T]

DoOnSubscribe is an alias to TapOnSubscribe.

func DoOnSubscribeWithContext

func DoOnSubscribeWithContext[T any](onSubscribe func(ctx context.Context)) func(Observable[T]) Observable[T]

DoOnSubscribeWithContext is an alias to TapOnSubscribeWithContext.

func DoWhile

func DoWhile[T any](condition func() bool) func(Observable[T]) Observable[T]

DoWhile repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/nEWabaItDpn
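
The repeat rule behaves like a do-while loop: the source runs once, then the condition decides whether to run it again. The plain-Go sketch below models this over a slice. `doWhile` is a hypothetical helper, not the operator itself.

```go
package main

import "fmt"

// doWhile replays source once, then keeps replaying it for as long as
// condition returns true. The condition is checked after each pass, so the
// source always runs at least once.
func doWhile(source []int, condition func() bool) []int {
	var out []int
	for {
		out = append(out, source...)
		if !condition() {
			return out
		}
	}
}

func main() {
	i := 0
	out := doWhile([]int{1, 2, 3}, func() bool {
		i++
		return i < 2 // true after the first pass, false after the second
	})
	fmt.Println(out) // [1 2 3 1 2 3]
}
```

With the same counter-based condition as the Example for this operator, the model yields the sequence twice, which matches the documented output.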

Example
i := 0

observable := Pipe1(
	Just(1, 2, 3),
	DoWhile[int](func() bool {
		i++
		return i < 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 1
Next: 2
Next: 3
Completed

func DoWhileI

func DoWhileI[T any](condition func(index int64) bool) func(Observable[T]) Observable[T]

DoWhileI repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/cxOA9gimkCq

func DoWhileIWithContext

func DoWhileIWithContext[T any](condition func(ctx context.Context, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]

DoWhileIWithContext repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/yMoCCnnvRRH

func DoWhileWithContext

func DoWhileWithContext[T any](condition func(ctx context.Context) (context.Context, bool)) func(Observable[T]) Observable[T]

DoWhileWithContext repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error.

func DoWithContext

func DoWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]

DoWithContext is an alias to TapWithContext.

func ElementAt

func ElementAt[T any](nth int) func(Observable[T]) Observable[T]

ElementAt emits only the item at index nth of the source Observable, counting from zero. If the source Observable completes before reaching that index, ElementAt will emit an error. Play: https://go.dev/play/p/0YE1tCbPaDg
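
The indexing convention can be sketched over a slice. The examples for this operator show index 2 selecting the third item, so the model counts from zero. `elementAt` and `errNotFound` are hypothetical names, and rejecting a negative index is a choice of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a placeholder error used only by this sketch.
var errNotFound = errors.New("nth element not found")

// elementAt returns the item at zero-based position nth, or an error when
// the sequence is too short to have one.
func elementAt[T any](items []T, nth int) (T, error) {
	var zero T
	if nth < 0 || nth >= len(items) {
		return zero, errNotFound
	}
	return items[nth], nil
}

func main() {
	items := []int{1, 2, 3, 4, 5}

	v, err := elementAt(items, 2)
	fmt.Println(v, err) // 3 <nil>

	_, err = elementAt(items, 10)
	fmt.Println(err) // nth element not found
}
```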

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	ElementAt[int](10),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (NotFound)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	ElementAt[int](10),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: ro.ElementAt: nth element not found
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	ElementAt[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 3
Completed

func ElementAtOrDefault

func ElementAtOrDefault[T any](nth int64, fallback T) func(Observable[T]) Observable[T]

ElementAtOrDefault emits only the item at index nth of the source Observable, counting from zero. If the source Observable completes before reaching that index, ElementAtOrDefault will emit the fallback value. Play: https://go.dev/play/p/DWMWPXkc8x4

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	ElementAtOrDefault(10, 100),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (NotFound)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	ElementAtOrDefault(10, 100),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 100
Completed
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	ElementAtOrDefault(2, 100),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 3
Completed

func EndWith

func EndWith[T any](suffixes ...T) func(Observable[T]) Observable[T]

EndWith emits the given values after emitting the values from the source Observable. Play: https://go.dev/play/p/9FPyf3bqJk_n

Example (Error)
observable := Pipe1(
	Throw[int](assert.AnError),
	EndWith(1, 2, 3),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3),
	EndWith(4, 5, 6),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

func Filter

func Filter[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]

Filter emits only those items from an Observable that pass a predicate test. Play: https://go.dev/play/p/gjk_wULxyEW

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Filter(func(i int) bool {
		return i%2 == 0
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 2
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Filter(func(i int) bool {
		return i%2 == 0
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 4
Completed

func FilterI

func FilterI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]

FilterI emits only those items from an Observable that pass a predicate test. Play: https://go.dev/play/p/Y5a2-AicBWO
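
The index argument makes position-based filtering possible. Here is a plain-Go sketch over a slice. `filterI` is a hypothetical helper, and the zero-based index is an assumption of the sketch.

```go
package main

import "fmt"

// filterI keeps the items for which predicate returns true. The predicate
// receives both the item and its position (zero-based in this sketch).
func filterI[T any](items []T, predicate func(item T, index int64) bool) []T {
	var out []T
	for i, item := range items {
		if predicate(item, int64(i)) {
			out = append(out, item)
		}
	}
	return out
}

func main() {
	letters := []string{"a", "b", "c", "d", "e"}

	// Keep every other item, starting with the first.
	kept := filterI(letters, func(_ string, index int64) bool {
		return index%2 == 0
	})
	fmt.Println(kept) // [a c e]
}
```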

func FilterIWithContext

func FilterIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]

FilterIWithContext emits only those items from an Observable that pass a predicate test. Play: https://go.dev/play/p/xjz-pViifdB

func FilterWithContext

func FilterWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]

FilterWithContext emits only those items from an Observable that pass a predicate test. Play: https://go.dev/play/p/y4gstlmx4KR

func Find

func Find[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]

Find returns the first element of an observable sequence that satisfies the condition. Play: https://go.dev/play/p/2f5rn0HoKeq

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Find(func(i int) bool { return i == 4 }),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable1 := Pipe1(
	Just(1, 2, 3, 4, 5),
	Find(func(i int) bool { return i < 0 }),
)

subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()

observable2 := Pipe1(
	Just(1, 2, 3, 4, 5),
	Find(func(i int) bool { return i%2 == 0 }),
)

subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output:

Completed
Next: 2
Completed

func FindI

func FindI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]

FindI returns the first element of an observable sequence that satisfies the condition.

func FindIWithContext

func FindIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool) func(Observable[T]) Observable[T]

FindIWithContext returns the first element of an observable sequence that satisfies the condition. Play: https://go.dev/play/p/X8oT_CF9IKM

func FindWithContext

func FindWithContext[T any](predicate func(ctx context.Context, item T) bool) func(Observable[T]) Observable[T]

FindWithContext returns the first element of an observable sequence that satisfies the condition. Play: https://go.dev/play/p/BVm-Grgv11w

func First

func First[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]

First emits only the first item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, First will emit an error. Play: https://go.dev/play/p/yneVKit6vh0

Example (Error)
observable1 := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	First(func(n int) bool {
		return n > 2
	}),
)

subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()

observable2 := Pipe1(
	Throw[int](assert.AnError), // no item transmitted
	First(func(n int) bool {
		return n > 2
	}),
)

subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output:

Next: 3
Completed
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	First(func(n int) bool {
		return n > 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 3
Completed

func FirstI

func FirstI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]

FirstI emits only the first item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, FirstI will emit an error.

func FirstIWithContext

func FirstIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]

FirstIWithContext emits only the first item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, FirstIWithContext will emit an error.

func FirstWithContext

func FirstWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]

FirstWithContext emits only the first item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, FirstWithContext will emit an error.

func FlatMap

func FlatMap[T, R any](project func(item T) Observable[R]) func(Observable[T]) Observable[R]

FlatMap transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. Play: https://go.dev/play/p/QBkDMwskibT
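
For inner sequences that emit synchronously, the result can be modelled as "project each item to a slice, then append the slices". `flatMap` below is a hypothetical plain-Go helper. It does not model how asynchronous inner Observables are scheduled.

```go
package main

import "fmt"

// flatMap projects every item to a slice and concatenates the results in
// source order.
func flatMap[T, R any](items []T, project func(T) []R) []R {
	var out []R
	for _, item := range items {
		out = append(out, project(item)...)
	}
	return out
}

func main() {
	doubled := flatMap([]int{1, 2, 3}, func(item int) []int {
		return []int{item, item}
	})
	fmt.Println(doubled) // [1 1 2 2 3 3]
}
```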

Example (Error)
observable1 := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	FlatMap(func(item int) Observable[int] {
		return Just(item, item)
	}),
)

subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()

observable2 := Pipe1(
	Just(1, 2, 3),
	FlatMap(func(item int) Observable[int] {
		if item == 2 {
			return Throw[int](assert.AnError)
		}

		return Just(item, item)
	}),
)

subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output:

Next: 1
Next: 1
Next: 2
Next: 2
Next: 3
Next: 3
Error: assert.AnError general error for testing
Next: 1
Next: 1
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3),
	FlatMap(func(item int) Observable[int] {
		return Just(item, item)
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 1
Next: 2
Next: 2
Next: 3
Next: 3
Completed

func FlatMapI

func FlatMapI[T, R any](project func(item T, index int64) Observable[R]) func(Observable[T]) Observable[R]

FlatMapI transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. Play: https://go.dev/play/p/H04QF1dltPI

func FlatMapIWithContext

func FlatMapIWithContext[T, R any](project func(ctx context.Context, item T, index int64) Observable[R]) func(Observable[T]) Observable[R]

FlatMapIWithContext transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. Play: https://go.dev/play/p/BCv4krqHEhI

func FlatMapWithContext

func FlatMapWithContext[T, R any](project func(ctx context.Context, item T) Observable[R]) func(Observable[T]) Observable[R]

FlatMapWithContext transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. Play: https://go.dev/play/p/lE04v4_lJ7M

func Flatten

func Flatten[T any]() func(Observable[[]T]) Observable[T]

Flatten flattens an Observable of slices into a single Observable that emits the elements of each slice. Play: https://go.dev/play/p/vUyrQ4GO87S
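
The signature takes an `Observable[[]T]`, so each upstream item is a slice. A plain-Go model of the resulting sequence is shown below. `flatten` is a hypothetical helper, and preserving element order is an assumption of the sketch.

```go
package main

import "fmt"

// flatten concatenates the batches into one sequence, keeping the order of
// batches and of elements within each batch. Empty batches contribute
// nothing.
func flatten[T any](batches [][]T) []T {
	var out []T
	for _, batch := range batches {
		out = append(out, batch...)
	}
	return out
}

func main() {
	fmt.Println(flatten([][]int{{1, 2}, {3}, {}, {4, 5}})) // [1 2 3 4 5]
}
```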

func Floor

func Floor() func(Observable[float64]) Observable[float64]

Floor emits the floor of the values emitted by the source Observable. Play: https://go.dev/play/p/UulGlomv9K5

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[float64]) Teardown {
		observer.Next(1.1)
		observer.Next(2.5)
		observer.Next(3.9)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Floor(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1.1, 2.4, 3.5, 4.9, 5.0),
	Floor(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Completed

func FloorWithPrecision

func FloorWithPrecision(places int) func(Observable[float64]) Observable[float64]

FloorWithPrecision emits the floored values with decimal precision applied before flooring. The `places` parameter controls the decimal precision:

  • positive `places` applies flooring to that many digits to the right of the decimal point (e.g. places=2 turns 1.234 -> 1.23),
  • zero behaves like `Floor()` (floor to integer),
  • negative `places` floors to powers of ten (e.g. places=-1 turns 123.45 -> 120).

For very large precision magnitudes the operator uses chunked big.Float arithmetic to avoid overflow. If the requested precision exceeds internal chunking caps the implementation will intentionally return the original source observable (no-op) to avoid unbounded allocations; callers should avoid extremely large `places` values for performance reasons.

Example
observable := Pipe1(
	Just(3.14159, 2.71828, -1.2345),
	FloorWithPrecision(2),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 3.14
Next: 2.71
Next: -1.24
Completed
Example (SmallNumbers)
observable := Pipe1(
	Just(0.000123, -0.000987, math.Inf(1), math.NaN()),
	FloorWithPrecision(4),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 0.0001
Next: -0.001
Next: +Inf
Next: NaN
Completed

func GroupBy

func GroupBy[T any, K comparable](iteratee func(item T) K) func(Observable[T]) Observable[Observable[T]]

GroupBy groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as Observables. Play: https://go.dev/play/p/GOL8imC0H5S
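
Grouping can be pictured with a plain-Go partition over a slice. `groupBy` is a hypothetical helper that collects groups eagerly. The operator emits each group as an Observable instead, so this only models which items end up together.

```go
package main

import "fmt"

// groupBy partitions items by key. Groups appear in the order their key is
// first seen, and items keep their original order within each group.
func groupBy[T any, K comparable](items []T, iteratee func(T) K) [][]T {
	index := make(map[K]int) // key -> position of its group in out
	var out [][]T
	for _, item := range items {
		key := iteratee(item)
		pos, ok := index[key]
		if !ok {
			pos = len(out)
			index[key] = pos
			out = append(out, nil)
		}
		out[pos] = append(out[pos], item)
	}
	return out
}

func main() {
	isEven := func(v int) bool { return v%2 == 0 }
	fmt.Println(groupBy([]int{1, 2, 3, 4}, isEven)) // [[1 3] [2 4]]
}
```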

Example (Error)
isEven := func(v int) bool { return v%2 == 0 }

observable := Pipe2(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		time.Sleep(5 * time.Millisecond)
		observer.Next(2)
		time.Sleep(5 * time.Millisecond)
		observer.Next(3)
		time.Sleep(5 * time.Millisecond)
		observer.Error(assert.AnError)
		time.Sleep(5 * time.Millisecond)
		observer.Next(4)

		return nil
	}),
	GroupBy(isEven),
	MergeAll[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
odd := func(v int64) bool { return v%2 != 0 }

observable := Pipe2(
	RangeWithInterval(1, 5, 10*time.Millisecond),
	GroupBy(odd),
	MergeAll[int64](),
)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Completed

func GroupByI

func GroupByI[T any, K comparable](iteratee func(item T, index int64) K) func(Observable[T]) Observable[Observable[T]]

GroupByI groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as Observables.

func GroupByIWithContext

func GroupByIWithContext[T any, K comparable](iteratee func(ctx context.Context, item T, index int64) (context.Context, K)) func(Observable[T]) Observable[Observable[T]]

GroupByIWithContext groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as Observables. Play: https://go.dev/play/p/h7vpeD0djre

func GroupByWithContext

func GroupByWithContext[T any, K comparable](iteratee func(ctx context.Context, item T) (context.Context, K)) func(Observable[T]) Observable[Observable[T]]

GroupByWithContext groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as Observables.

func Head

func Head[T any]() func(Observable[T]) Observable[T]

Head emits only the first item emitted by an Observable. If the source Observable is empty, Head will emit an error. Play: https://go.dev/play/p/TmhTvpuKAp_U

Example (Error)
observable1 := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Head[int](),
)

subscription1 := observable1.Subscribe(PrintObserver[int]())
defer subscription1.Unsubscribe()

observable2 := Pipe1(
	Throw[int](assert.AnError), // no item transmitted
	Head[int](),
)

subscription2 := observable2.Subscribe(PrintObserver[int]())
defer subscription2.Unsubscribe()
Output:

Next: 1
Completed
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Head[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Completed

func IgnoreElements

func IgnoreElements[T any]() func(Observable[T]) Observable[T]

IgnoreElements does not emit any items from an Observable but mirrors its termination notification. It is useful when you want to ignore all items from an Observable but still be notified when it completes or emits an error. Play: https://go.dev/play/p/glDG6E-gZ1V

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	IgnoreElements[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	IgnoreElements[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Completed

func IgnoreOnDroppedNotification

func IgnoreOnDroppedNotification(ctx context.Context, notification fmt.Stringer)

IgnoreOnDroppedNotification is the default implementation of `OnDroppedNotification`.

func IgnoreOnUnhandledError

func IgnoreOnUnhandledError(ctx context.Context, err error)

IgnoreOnUnhandledError is the default implementation of `OnUnhandledError`.

func Iif

func Iif[T any](predicate func() bool, source1, source2 Observable[T]) func() Observable[T]

Iif determines which one of two observables to return based on a condition. Play: https://go.dev/play/p/t-sNgL5EZA-
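
The signature returns a function rather than an Observable, so the choice is made when that function is called. The sketch below illustrates the idea with plain values; `iif` is a hypothetical helper, and evaluating the predicate on each call (rather than once at construction) is an assumption of this sketch.

```go
package main

import "fmt"

// iif is a hypothetical analogue (not part of the package). It returns a
// factory that picks one of two sources each time it is called.
func iif[T any](predicate func() bool, source1, source2 T) func() T {
	return func() T {
		if predicate() {
			return source1
		}

		return source2
	}
}

func main() {
	enabled := false
	pick := iif(func() bool { return enabled }, "primary", "fallback")

	fmt.Println(pick()) // fallback

	enabled = true
	fmt.Println(pick()) // primary
}
```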

Example (Error)
observable := Iif(
	func() bool {
		return false
	},
	Just(1, 2, 3),
	Throw[int](assert.AnError),
)()

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Iif(
	func() bool {
		return true
	},
	Just(1, 2, 3),
	Just(4, 5, 6),
)()

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed

func Last

func Last[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]

Last emits only the last item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, Last will emit an error. Play: https://go.dev/play/p/aMsvsTPbmHY

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Last(func(n int) bool {
		return n > 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Last(func(n int) bool {
		return n > 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 5
Completed

func LastI

func LastI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]

LastI emits only the last item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, LastI will emit an error.

func LastIWithContext

func LastIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]

LastIWithContext emits only the last item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, LastIWithContext will emit an error.

func LastWithContext

func LastWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]

LastWithContext emits only the last item emitted by an Observable that satisfies a specified condition. If the source Observable is empty, LastWithContext will emit an error.

func Map

func Map[T, R any](project func(item T) R) func(Observable[T]) Observable[R]

Map applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/JhTBEQFQGYr

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Map(func(x int) int {
		return x * 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 4
Next: 6
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x * 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 4
Next: 6
Next: 8
Next: 10
Completed

func MapErr

func MapErr[T, R any](project func(item T) (R, error)) func(Observable[T]) Observable[R]

MapErr applies a given project function to each item emitted by an Observable and emits the result. If the project function returns an error, that error is emitted and the sequence terminates. Play: https://go.dev/play/p/x7-KC-SDXr1
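
The stop-at-first-error behavior can be sketched over a slice. `mapErr` and `errBoom` are hypothetical names used only for illustration; the real operator works on Observables and reports the error through an Error notification.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical error value used only in this sketch.
var errBoom = errors.New("boom")

// mapErr is a hypothetical slice-based helper (not part of the package).
// It projects items in order and stops at the first error, returning the
// results produced so far.
func mapErr[T, R any](items []T, project func(item T) (R, error)) ([]R, error) {
	out := make([]R, 0, len(items))

	for _, item := range items {
		result, err := project(item)
		if err != nil {
			return out, err
		}

		out = append(out, result)
	}

	return out, nil
}

func main() {
	got, err := mapErr([]int{1, 2, 3}, func(item int) (string, error) {
		if item == 2 {
			return "", errBoom
		}

		return fmt.Sprintf("item-%d", item), nil
	})

	fmt.Println(got, err) // [item-1] boom
}
```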

Example (Error)
observable1 := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	MapErr(func(item int) (string, error) {
		return "Hey!", nil
	}),
)

subscription1 := observable1.Subscribe(PrintObserver[string]())
defer subscription1.Unsubscribe()

observable2 := Pipe1(
	Just(1, 2, 3, 4, 5),
	MapErr(func(item int) (string, error) {
		if item == 2 {
			return "Hey!", assert.AnError
		}

		return "Hey!", nil
	}),
)

subscription2 := observable2.Subscribe(PrintObserver[string]())
defer subscription2.Unsubscribe()
Output:

Next: Hey!
Next: Hey!
Next: Hey!
Error: assert.AnError general error for testing
Next: Hey!
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3),
	MapErr(func(item int) (string, error) {
		return "Hey!", nil
	}),
)

subscription := observable.Subscribe(PrintObserver[string]())
defer subscription.Unsubscribe()
Output:

Next: Hey!
Next: Hey!
Next: Hey!
Completed

func MapErrI

func MapErrI[T, R any](project func(item T, index int64) (R, error)) func(Observable[T]) Observable[R]

MapErrI applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/IAZJ9eQhNqN

func MapErrIWithContext

func MapErrIWithContext[T, R any](project func(ctx context.Context, item T, index int64) (R, context.Context, error)) func(Observable[T]) Observable[R]

MapErrIWithContext applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/OO8FayqJesp

func MapErrWithContext

func MapErrWithContext[T, R any](project func(ctx context.Context, item T) (R, context.Context, error)) func(Observable[T]) Observable[R]

MapErrWithContext applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/EIGtDpPq5y-

func MapI

func MapI[T, R any](project func(item T, index int64) R) func(Observable[T]) Observable[R]

MapI applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/F8IKEdyC4sl

func MapIWithContext

func MapIWithContext[T, R any](project func(ctx context.Context, item T, index int64) (context.Context, R)) func(Observable[T]) Observable[R]

MapIWithContext applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/dDFC9SU3FF1

func MapTo

func MapTo[T, R any](output R) func(Observable[T]) Observable[R]

MapTo emits a constant value for each item emitted by an Observable. Play: https://go.dev/play/p/Ghc5ar7GJag

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	MapTo[int]("Hey!"),
)

subscription := observable.Subscribe(PrintObserver[string]())
defer subscription.Unsubscribe()
Output:

Next: Hey!
Next: Hey!
Next: Hey!
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe2(
	Just(1, 2, 3, 4, 5),
	MapTo[int]("Hey!"),
	Take[string](3),
)

subscription := observable.Subscribe(PrintObserver[string]())
defer subscription.Unsubscribe()
Output:

Next: Hey!
Next: Hey!
Next: Hey!
Completed

func MapWithContext

func MapWithContext[T, R any](project func(ctx context.Context, item T) (context.Context, R)) func(Observable[T]) Observable[R]

MapWithContext applies a given project function to each item emitted by an Observable and emits the result. Play: https://go.dev/play/p/b6i0jQenObW

func Materialize

func Materialize[T any]() func(Observable[T]) Observable[Notification[T]]

Materialize converts the source Observable into a stream of Notification instances. Play: https://go.dev/play/p/ZHtPviPoqWK
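
As a rough picture of what "a stream of Notification instances" means, the sketch below turns a finished sequence into explicit notification values. The `notification` type, its fields, and `materialize` are hypothetical stand-ins; the package's own Notification type may be shaped differently.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical error value used only in this sketch.
var errBoom = errors.New("boom")

// notification is a hypothetical stand-in for Notification[T].
type notification[T any] struct {
	kind  string // "Next", "Error" or "Complete"
	value T
	err   error
}

// String renders the notification in a Next(1) / Error(...) / Complete() style.
func (n notification[T]) String() string {
	switch n.kind {
	case "Next":
		return fmt.Sprintf("Next(%v)", n.value)
	case "Error":
		return fmt.Sprintf("Error(%v)", n.err)
	default:
		return "Complete()"
	}
}

// materialize turns a finished sequence (its values plus an optional
// terminal error) into a list of notifications.
func materialize[T any](values []T, err error) []notification[T] {
	out := make([]notification[T], 0, len(values)+1)

	for _, v := range values {
		out = append(out, notification[T]{kind: "Next", value: v})
	}

	if err != nil {
		return append(out, notification[T]{kind: "Error", err: err})
	}

	return append(out, notification[T]{kind: "Complete"})
}

func main() {
	for _, n := range materialize([]int{1, 2, 3}, nil) {
		fmt.Println(n)
	}

	for _, n := range materialize([]int{1}, errBoom) {
		fmt.Println(n)
	}
}
```

In this sketch the terminal event becomes an ordinary value, so a consumer can handle errors and completion with the same code path as regular items.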

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Materialize[int](),
)

subscription := observable.Subscribe(PrintObserver[Notification[int]]())
defer subscription.Unsubscribe()
Output:

Next: Next(1)
Next: Next(2)
Next: Next(3)
Next: Error(assert.AnError general error for testing)
Completed
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3),
	Materialize[int](),
)

subscription := observable.Subscribe(PrintObserver[Notification[int]]())
defer subscription.Unsubscribe()
Output:

Next: Next(1)
Next: Next(2)
Next: Next(3)
Next: Complete()
Completed

func Max

func Max[T constraints.Numeric]() func(Observable[T]) Observable[T]

Max emits the maximum value emitted by the source Observable. It emits the maximum value when the source completes. If the source is empty, it emits no value. Play: https://go.dev/play/p/wWljVN6i1Ip
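
The "emit on completion, nothing when empty" rule can be sketched as a reduction that also reports whether a value exists. `maxOf` and the local `numeric` constraint are hypothetical; the package uses its own constraints.Numeric.

```go
package main

import "fmt"

// numeric is a hypothetical, simplified stand-in for constraints.Numeric.
type numeric interface {
	~int | ~int64 | ~float64
}

// maxOf is a hypothetical slice-based helper (not part of the package).
// It returns the largest value and true, or the zero value and false when
// the input is empty, mirroring "emits no value" for an empty source.
func maxOf[T numeric](items []T) (T, bool) {
	var best T
	if len(items) == 0 {
		return best, false
	}

	best = items[0]
	for _, item := range items[1:] {
		if item > best {
			best = item
		}
	}

	return best, true
}

func main() {
	fmt.Println(maxOf([]int{1, 2, 3, 4, 5})) // 5 true
	fmt.Println(maxOf([]int{}))              // 0 false
}
```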

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Max[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Max[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 5
Completed

func MergeAll

func MergeAll[T any]() func(Observable[Observable[T]]) Observable[T]

MergeAll converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables. It subscribes to each inner Observable as it arrives and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done. Play: https://go.dev/play/p/m3nHZZJbwMF
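
A channel-based analogue may help picture the flattening. This is a sketch under stated assumptions: `mergeAll` and `fromSlice` are hypothetical helpers, channels stand in for Observables, and error handling is left out. Each inner channel keeps its own order, but the interleaving between inner channels is not deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeAll is a hypothetical channel-based analogue (not part of the package).
// It forwards every value from every inner channel to one output channel and
// closes the output once the outer channel and all inner channels are closed.
func mergeAll[T any](outer <-chan (<-chan T)) <-chan T {
	out := make(chan T)

	go func() {
		var wg sync.WaitGroup

		for inner := range outer {
			wg.Add(1)
			go func(in <-chan T) {
				defer wg.Done()
				for v := range in {
					out <- v
				}
			}(inner)
		}

		wg.Wait() // all inner channels are drained
		close(out)
	}()

	return out
}

// fromSlice is a hypothetical helper that returns a closed, pre-filled channel.
func fromSlice[T any](items ...T) <-chan T {
	ch := make(chan T, len(items))
	for _, item := range items {
		ch <- item
	}
	close(ch)

	return ch
}

func main() {
	outer := make(chan (<-chan int), 3)
	outer <- fromSlice(1, 2)
	outer <- fromSlice(3)
	outer <- fromSlice(4, 5)
	close(outer)

	sum := 0
	for v := range mergeAll[int](outer) {
		sum += v
	}

	fmt.Println(sum) // 15
}
```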

Example (Error)
observable := Pipe1(
	Just(
		Delay[int](10*time.Millisecond)(Just(1)),
		Delay[int](50*time.Millisecond)(Throw[int](assert.AnError)),
		Delay[int](100*time.Millisecond)(Just(3)),
	),
	MergeAll[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(300 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Next: 1
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(
		Just(1),
		Delay[int](20*time.Millisecond)(Just(2)),
		Delay[int](40*time.Millisecond)(Just(3)),
	),
	MergeAll[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(60 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed

func MergeMap

func MergeMap[T, R any](projection func(item T) Observable[R]) func(Observable[T]) Observable[R]

MergeMap applies a projection function to each item emitted by the source Observable and then merges the results into a single Observable. Play: https://go.dev/play/p/NwEyrLITshG

Example (Error)
observable := Pipe1(
	Just("a", "bb", "ccc"),
	MergeMap(func(item string) Observable[string] {
		if item == "bb" {
			return Throw[string](assert.AnError)
		}
		return Delay[string](time.Duration(len(item)) * 50 * time.Millisecond)(Just(strings.ToUpper(item)))
	}),
)
subscription := observable.Subscribe(PrintObserver[string]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just("a", "bb", "ccc"),
	MergeMap(func(item string) Observable[string] {
		return Delay[string](time.Duration(len(item)) * 50 * time.Millisecond)(Just(strings.ToUpper(item)))
	}),
)
subscription := observable.Subscribe(PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
defer subscription.Unsubscribe()
Output:

Next: A
Next: BB
Next: CCC
Completed

func MergeMapI

func MergeMapI[T, R any](projection func(item T, index int64) Observable[R]) func(Observable[T]) Observable[R]

MergeMapI applies a projection function to each item emitted by the source Observable and then merges the results into a single Observable. Play: https://go.dev/play/p/dPDI7ch4g0i

func MergeMapIWithContext

func MergeMapIWithContext[T, R any](projection func(ctx context.Context, item T, index int64) (context.Context, Observable[R])) func(Observable[T]) Observable[R]

MergeMapIWithContext applies a projection function to each item emitted by the source Observable and then merges the results into a single Observable. Play: https://go.dev/play/p/8Ih5mCaDbB8

func MergeMapWithContext

func MergeMapWithContext[T, R any](projection func(ctx context.Context, item T) Observable[R]) func(Observable[T]) Observable[R]

MergeMapWithContext applies a projection function to each item emitted by the source Observable and then merges the results into a single Observable. Play: https://go.dev/play/p/i2Ru9sUdL-x

func MergeWith

func MergeWith[T any](observables ...Observable[T]) func(Observable[T]) Observable[T]

MergeWith merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.

It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/6QpUzcdRWJl

Example (Error)
observable := Pipe1(
	Throw[int](assert.AnError),
	MergeWith(Just(1, 2, 3, 4)),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2),
	MergeWith(
		Delay[int](20*time.Millisecond)(Just(3, 4)),
		Delay[int](40*time.Millisecond)(Just(5, 6)),
	),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

func MergeWith1

func MergeWith1[T any](obsB Observable[T]) func(Observable[T]) Observable[T]

MergeWith1 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.

It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/P47lkUFpYq7

Example (Error)
observable := Pipe1(
	Delay[int](20*time.Millisecond)(Throw[int](assert.AnError)),
	MergeWith1(Just(1)),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(30 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Next: 1
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Delay[int](20*time.Millisecond)(Just(2)),
	MergeWith1(Just(1)),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(30 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Completed

func MergeWith2

func MergeWith2[T any](obsB, obsC Observable[T]) func(Observable[T]) Observable[T]

MergeWith2 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.

It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/LOQ3YbuDyC9

Example (Error)
observable := Pipe1(
	Delay[int](50*time.Millisecond)(Throw[int](assert.AnError)),
	MergeWith2(
		Just(1, 2),
		Delay[int](25*time.Millisecond)(Just(3)),
	),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Delay[int](50*time.Millisecond)(Just(4, 5)),
	MergeWith2(
		Just(1, 2),
		Delay[int](25*time.Millisecond)(Just(3)),
	),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Completed

func MergeWith3

func MergeWith3[T any](obsB, obsC, obsD Observable[T]) func(Observable[T]) Observable[T]

MergeWith3 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.

It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/pMQ5bNOlWj9

Example (Error)
observable := Pipe1(
	Delay[int](75*time.Millisecond)(Throw[int](assert.AnError)),
	MergeWith3(
		Just(1, 2),
		Delay[int](25*time.Millisecond)(Just(3, 4)),
		Delay[int](50*time.Millisecond)(Just(5, 6)),
	),
)
subscription := observable.Subscribe(PrintObserver[int]())
time.Sleep(100 * time.Millisecond)
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Delay[int](75*time.Millisecond)(Just(7, 8)),
	MergeWith3(
		Just(1, 2),
		Delay[int](25*time.Millisecond)(Just(3, 4)),
		Delay[int](50*time.Millisecond)(Just(5, 6)),
	),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(100 * time.Millisecond)
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Completed

func MergeWith4

func MergeWith4[T any](obsB, obsC, obsD, obsE Observable[T]) func(Observable[T]) Observable[T]

MergeWith4 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.

It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/FvJTHVOe52s

Example (Error)
observable := Pipe1(
	Delay[int](100*time.Millisecond)(Throw[int](assert.AnError)),
	MergeWith4(
		Just(1, 2),
		Delay[int](25*time.Millisecond)(Just(3, 4)),
		Delay[int](50*time.Millisecond)(Just(5, 6)),
		Delay[int](75*time.Millisecond)(Just(7, 8)),
	),
)
subscription := observable.Subscribe(PrintObserver[int]())
time.Sleep(120 * time.Millisecond)
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Delay[int](100*time.Millisecond)(Just(9, 10)),
	MergeWith4(
		Just(1, 2),
		Delay[int](25*time.Millisecond)(Just(3, 4)),
		Delay[int](50*time.Millisecond)(Just(5, 6)),
		Delay[int](75*time.Millisecond)(Just(7, 8)),
	),
)
subscription := observable.Subscribe(PrintObserver[int]())
time.Sleep(120 * time.Millisecond)
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Next: 10
Completed

func MergeWith5

func MergeWith5[T any](obsB, obsC, obsD, obsE, obsF Observable[T]) func(Observable[T]) Observable[T]

MergeWith5 merges the values from all observables to a single observable result. It subscribes to each inner Observable, and emits all values from each inner Observable, maintaining their order. It completes when all inner Observables are done.

It is a curried function that takes the first Observable as an argument. Play: https://go.dev/play/p/kR3rFF7Bw-i

Example (Error)
observable := Pipe1(
	Delay[int](125*time.Millisecond)(Throw[int](assert.AnError)),
	MergeWith5(
		Just(1, 2),
		Delay[int](25*time.Millisecond)(Just(3, 4)),
		Delay[int](50*time.Millisecond)(Just(5, 6)),
		Delay[int](75*time.Millisecond)(Just(7, 8)),
		Delay[int](100*time.Millisecond)(Just(9, 10)),
	),
)
subscription := observable.Subscribe(PrintObserver[int]())
time.Sleep(150 * time.Millisecond)
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Next: 10
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Delay[int](125*time.Millisecond)(Just(11, 12)),
	MergeWith5(
		Just(1, 2),
		Delay[int](25*time.Millisecond)(Just(3, 4)),
		Delay[int](50*time.Millisecond)(Just(5, 6)),
		Delay[int](75*time.Millisecond)(Just(7, 8)),
		Delay[int](100*time.Millisecond)(Just(9, 10)),
	),
)
subscription := observable.Subscribe(PrintObserver[int]())
time.Sleep(150 * time.Millisecond)
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Next: 10
Next: 11
Next: 12
Completed

func Min

func Min[T constraints.Numeric]() func(Observable[T]) Observable[T]

Min emits the minimum value emitted by the source Observable. It emits the minimum value when the source completes. If the source is empty, it emits no value. Play: https://go.dev/play/p/SPK3L-NvZ98

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Min[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Min[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Completed

func NewScheduler

func NewScheduler()

NewScheduler just trolls other languages. 😈 https://reactivex.io/documentation/scheduler.html

func ObserveOn

func ObserveOn[T any](bufferSize int) func(Observable[T]) Observable[T]

ObserveOn schedules the downstream flow on a different goroutine. Next, Error and Complete notifications are sent to a queue first, then the consumer consumes this queue. ObserveOn converts a push-based Observable into a pullable stream with backpressure capabilities.

To schedule the upstream flow to a different goroutine, refer to SubscribeOn.

When an Observable emits values faster than they can be consumed, ObserveOn buffers these values in a queue of specified capacity. This allows downstream consumers to pull values at their own pace while managing backpressure from upstream emissions.

Note: Once the buffer reaches its capacity, upstream emissions will block until space becomes available, effectively implementing backpressure control.
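
The queueing and blocking described above closely resemble a buffered Go channel. The following is a minimal sketch of that idea only: `observeOn` is a hypothetical channel-based helper, and nothing here is taken from the operator's actual implementation.

```go
package main

import "fmt"

// observeOn is a hypothetical channel-based analogue (not part of the package).
// A separate goroutine moves values from src into a bounded queue; the
// consumer then pulls from that queue at its own pace.
func observeOn[T any](src <-chan T, bufferSize int) <-chan T {
	queue := make(chan T, bufferSize)

	go func() {
		defer close(queue) // propagate completion downstream
		for v := range src {
			queue <- v // blocks while the queue is full: backpressure
		}
	}()

	return queue
}

func main() {
	src := make(chan int, 5)
	for i := 0; i < 5; i++ {
		src <- i
	}
	close(src)

	// Values arrive in emission order, even though the queue holds at most 2.
	for v := range observeOn[int](src, 2) {
		fmt.Println(v)
	}
}
```

With a blocking send, a slow consumer slows the producer down instead of losing values. A drop policy would replace the blocking send with a non-blocking `select`.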

TODO: add a backpressure policy (drop vs. block). Play: https://go.dev/play/p/BpdKJ6Mya03

func OnErrorResumeNextWith

func OnErrorResumeNextWith[T any](finally ...Observable[T]) func(Observable[T]) Observable[T]

OnErrorResumeNextWith instructs an Observable to begin emitting a second Observable sequence if it encounters an error or completes. It immediately subscribes to the next one that was passed. Play: https://go.dev/play/p/9XLTAOginbK

Example
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)
		observer.Complete()

		return nil
	}),
	OnErrorResumeNextWith(Of(4, 5, 6)),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

func OnErrorReturn

func OnErrorReturn[T any](finally T) func(Observable[T]) Observable[T]

OnErrorReturn instructs an Observable to emit a particular item when it encounters an error. It will then complete the sequence. Play: https://go.dev/play/p/d_9xe1oedjU

Example
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)
		observer.Complete()

		return nil
	}),
	OnErrorReturn(42),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 42
Completed

func Pairwise

func Pairwise[T any]() func(Observable[T]) Observable[[]T]

Pairwise emits the previous and current values as a pair of two values. Play: https://go.dev/play/p/0YujgFTL4e0
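
The pairing rule is easy to see on a slice: N items yield N-1 overlapping pairs, and fewer than two items yield nothing. `pairwise` below is a hypothetical slice-based helper written for illustration, not the operator itself.

```go
package main

import "fmt"

// pairwise is a hypothetical slice-based helper (not part of the package).
// It returns each item together with its predecessor.
func pairwise[T any](items []T) [][]T {
	if len(items) < 2 {
		return nil // no predecessor exists for a single item
	}

	pairs := make([][]T, 0, len(items)-1)
	for i := 1; i < len(items); i++ {
		pairs = append(pairs, []T{items[i-1], items[i]})
	}

	return pairs
}

func main() {
	fmt.Println(pairwise([]int{1, 2, 3, 4, 5})) // [[1 2] [2 3] [3 4] [4 5]]
}
```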

Example (Error)
observable := Pipe1(
	Throw[int](assert.AnError),
	Pairwise[int](),
)

subscription := observable.Subscribe(PrintObserver[[]int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Pairwise[int](),
)

subscription := observable.Subscribe(PrintObserver[[]int]())
defer subscription.Unsubscribe()
Output:

Next: [1 2]
Next: [2 3]
Next: [3 4]
Next: [4 5]
Completed

func PipeOp

func PipeOp[First, Last any](operators ...any) func(Observable[First]) Observable[Last]

PipeOp is similar to Pipe, but can be used as an operator.

Example
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x + 1
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

func PipeOp1

func PipeOp1[A, B any](
	operator1 func(Observable[A]) Observable[B],
) func(Observable[A]) Observable[B]

PipeOp1 is similar to Pipe1, but can be used as an operator.

func PipeOp2

func PipeOp2[A, B, C any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
) func(Observable[A]) Observable[C]

PipeOp2 is similar to Pipe2, but can be used as an operator.
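
"Can be used as an operator" means the result is itself a function from one stream to another, so it can be built once and reused. The sketch below shows that shape with plain function composition; `stream`, `pipeOp2`, and `mapOp` are hypothetical stand-ins in which a slice plays the role of an Observable.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for Observable[T], kept as a plain slice.
type stream[T any] []T

// pipeOp2 composes two operators into a single operator. Nothing runs until
// the returned function is applied to a source.
func pipeOp2[A, B, C any](
	operator1 func(stream[A]) stream[B],
	operator2 func(stream[B]) stream[C],
) func(stream[A]) stream[C] {
	return func(source stream[A]) stream[C] {
		return operator2(operator1(source))
	}
}

// mapOp is a hypothetical operator that projects every item of a stream.
func mapOp[T, R any](project func(T) R) func(stream[T]) stream[R] {
	return func(source stream[T]) stream[R] {
		out := make(stream[R], 0, len(source))
		for _, item := range source {
			out = append(out, project(item))
		}

		return out
	}
}

func main() {
	doubleThenLabel := pipeOp2(
		mapOp(func(x int) int { return x * 2 }),
		mapOp(func(x int) string { return fmt.Sprintf("v%d", x) }),
	)

	fmt.Println(doubleThenLabel(stream[int]{1, 2, 3})) // [v2 v4 v6]
}
```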

func PipeOp3

func PipeOp3[A, B, C, D any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
) func(Observable[A]) Observable[D]

PipeOp3 is similar to Pipe3, but can be used as an operator.

func PipeOp4

func PipeOp4[A, B, C, D, E any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
) func(Observable[A]) Observable[E]

PipeOp4 is similar to Pipe4, but can be used as an operator.

Example
observable := Pipe3(
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x * 2
	}),
	Filter(func(x int) bool {
		return x%2 == 0
	}),
	Take[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 2
Next: 4
Completed

func PipeOp5

func PipeOp5[A, B, C, D, E, F any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
) func(Observable[A]) Observable[F]

PipeOp5 is similar to Pipe5, but can be used as an operator.

func PipeOp6

func PipeOp6[A, B, C, D, E, F, G any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
) func(Observable[A]) Observable[G]

PipeOp6 is similar to Pipe6, but can be used as an operator.

func PipeOp7

func PipeOp7[A, B, C, D, E, F, G, H any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
) func(Observable[A]) Observable[H]

PipeOp7 is similar to Pipe7, but can be used as an operator.

func PipeOp8

func PipeOp8[A, B, C, D, E, F, G, H, I any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
) func(Observable[A]) Observable[I]

PipeOp8 is similar to Pipe8, but can be used as an operator.

func PipeOp9

func PipeOp9[A, B, C, D, E, F, G, H, I, J any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
) func(Observable[A]) Observable[J]

PipeOp9 is similar to Pipe9, but can be used as an operator.

func PipeOp10

func PipeOp10[A, B, C, D, E, F, G, H, I, J, K any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
) func(Observable[A]) Observable[K]

PipeOp10 is similar to Pipe10, but can be used as an operator.

func PipeOp11

func PipeOp11[A, B, C, D, E, F, G, H, I, J, K, L any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
) func(Observable[A]) Observable[L]

PipeOp11 is similar to Pipe11, but can be used as an operator.

func PipeOp12

func PipeOp12[A, B, C, D, E, F, G, H, I, J, K, L, M any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
) func(Observable[A]) Observable[M]

PipeOp12 is similar to Pipe12, but can be used as an operator.

func PipeOp13

func PipeOp13[A, B, C, D, E, F, G, H, I, J, K, L, M, N any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
) func(Observable[A]) Observable[N]

PipeOp13 is similar to Pipe13, but can be used as an operator.

func PipeOp14

func PipeOp14[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
) func(Observable[A]) Observable[O]

PipeOp14 is similar to Pipe14, but can be used as an operator.

func PipeOp15

func PipeOp15[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
) func(Observable[A]) Observable[P]

PipeOp15 is similar to Pipe15, but can be used as an operator.

func PipeOp16

func PipeOp16[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
) func(Observable[A]) Observable[Q]

PipeOp16 is similar to Pipe16, but can be used as an operator.

func PipeOp17

func PipeOp17[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
) func(Observable[A]) Observable[R]

PipeOp17 is similar to Pipe17, but can be used as an operator.

func PipeOp18

func PipeOp18[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
) func(Observable[A]) Observable[S]

PipeOp18 is similar to Pipe18, but can be used as an operator.

func PipeOp19

func PipeOp19[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
) func(Observable[A]) Observable[T]

PipeOp19 is similar to Pipe19, but can be used as an operator.

func PipeOp20

func PipeOp20[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
) func(Observable[A]) Observable[U]

PipeOp20 is similar to Pipe20, but can be used as an operator.

func PipeOp21

func PipeOp21[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
) func(Observable[A]) Observable[V]

PipeOp21 is similar to Pipe21, but can be used as an operator.

func PipeOp22

func PipeOp22[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
	operator22 func(Observable[V]) Observable[W],
) func(Observable[A]) Observable[W]

PipeOp22 is similar to Pipe22, but can be used as an operator.

func PipeOp23

func PipeOp23[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
	operator22 func(Observable[V]) Observable[W],
	operator23 func(Observable[W]) Observable[X],
) func(Observable[A]) Observable[X]

PipeOp23 is similar to Pipe23, but can be used as an operator.

func PipeOp24

func PipeOp24[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
	operator22 func(Observable[V]) Observable[W],
	operator23 func(Observable[W]) Observable[X],
	operator24 func(Observable[X]) Observable[Y],
) func(Observable[A]) Observable[Y]

PipeOp24 is similar to Pipe24, but can be used as an operator.

func PipeOp25

func PipeOp25[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z any](
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
	operator22 func(Observable[V]) Observable[W],
	operator23 func(Observable[W]) Observable[X],
	operator24 func(Observable[X]) Observable[Y],
	operator25 func(Observable[Y]) Observable[Z],
) func(Observable[A]) Observable[Z]

PipeOp25 is similar to Pipe25, but can be used as an operator.

func RaceWith

func RaceWith[T any](sources ...Observable[T]) func(Observable[T]) Observable[T]

RaceWith creates an Observable that mirrors whichever Observable is first to emit a next, error, or complete notification, chosen among the Observable to which the operator is applied and the supplied Observables. It cancels the subscriptions to all the other Observables. It completes when the mirrored Observable completes. If the mirrored Observable errors, it errors with the same error.

It is a curried function that takes the other Observables as arguments. Play: https://go.dev/play/p/5VzGFd62SMC

Example (Error)
observable := Race(
	Delay[int](50*time.Millisecond)(Throw[int](assert.AnError)),
	Delay[int](20*time.Millisecond)(Just(4, 5, 6)),
	Delay[int](100*time.Millisecond)(Just(7, 8, 9)),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 4
Next: 5
Next: 6
Completed
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3),
	RaceWith(
		Delay[int](50*time.Millisecond)(Just(4, 5, 6)),
		Delay[int](100*time.Millisecond)(Just(7, 8, 9)),
	),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()

time.Sleep(150 * time.Millisecond)
Output:

Next: 1
Next: 2
Next: 3
Completed

func Reduce

func Reduce[T, R any](accumulator func(agg R, item T) R, seed R) func(Observable[T]) Observable[R]

Reduce applies an accumulator function over the source Observable, and emits the result when the source completes. It takes a seed value as the initial accumulator value. Play: https://go.dev/play/p/GpOF9eNpA5w

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Reduce(func(agg, current int) int {
		return agg + current
	}, 42),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Reduce(func(agg, current int) int {
		return agg + current
	}, 42),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 57
Completed

func ReduceI

func ReduceI[T, R any](accumulator func(agg R, item T, index int64) R, seed R) func(Observable[T]) Observable[R]

ReduceI applies an accumulator function over the source Observable, and emits the result when the source completes. It takes a seed value as the initial accumulator value.
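
What the extra `index` argument gives the accumulator can be sketched on a plain slice. This is only a model of the semantics, not the library's implementation: `reduceI` and `weightedSum` are hypothetical names, and a zero-based index that grows by one per item is an assumption.

```go
package main

import "fmt"

// reduceI is a hypothetical slice-based stand-in for ReduceI: it folds items
// into one value and passes each item's position to the accumulator.
// Assumption: the index starts at 0 and increases by one per item.
func reduceI[T, R any](items []T, accumulator func(agg R, item T, index int64) R, seed R) R {
	agg := seed
	for i, item := range items {
		agg = accumulator(agg, item, int64(i))
	}
	return agg
}

// weightedSum multiplies each item by its position before adding it.
func weightedSum(items []int) int {
	return reduceI(items, func(agg int, item int, index int64) int {
		return agg + item*int(index)
	}, 0)
}

func main() {
	fmt.Println(weightedSum([]int{10, 20, 30})) // 10*0 + 20*1 + 30*2 = 80
}
```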

func ReduceIWithContext

func ReduceIWithContext[T, R any](accumulator func(ctx context.Context, agg R, item T, index int64) (context.Context, R), seed R) func(Observable[T]) Observable[R]

ReduceIWithContext applies an accumulator function over the source Observable, and emits the result when the source completes. It takes a seed value as the initial accumulator value. Play: https://go.dev/play/p/WALnb341F4U

func ReduceWithContext

func ReduceWithContext[T, R any](accumulator func(ctx context.Context, agg R, item T) (context.Context, R), seed R) func(Observable[T]) Observable[R]

ReduceWithContext applies an accumulator function over the source Observable, and emits the result when the source completes. It takes a seed value as the initial accumulator value.

func RepeatWith

func RepeatWith[T any](count int64) func(Observable[T]) Observable[T]

RepeatWith repeats the source Observable a specified number of times. This is a pipeable operator. The creation operator equivalent is `Repeat`.

The destination is flattened. Play: https://go.dev/play/p/fEKtAX9_nYe

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	RepeatWith[int](3),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3),
	RepeatWith[int](3),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 1
Next: 2
Next: 3
Next: 1
Next: 2
Next: 3
Completed

func Retry

func Retry[T any]() func(Observable[T]) Observable[T]

Retry resubscribes to the source observable when it encounters an error. It retries indefinitely. To limit the number of retries, use RetryWithConfig. Play: https://go.dev/play/p/Llj9dT9Y3Z2

func RetryWithConfig

func RetryWithConfig[T any](opts RetryConfig) func(Observable[T]) Observable[T]

RetryWithConfig resubscribes to the source observable when it encounters an error. If a max number of retries is set, it will retry until the max number of retries is reached. If a delay is set, it will wait before retrying. If resetOnSuccess is set, it will reset the number of retries when a value is emitted. Play: https://go.dev/play/p/GilWi5xG0lr

Example
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)
		observer.Complete()

		return nil
	}),
	RetryWithConfig[int](RetryConfig{
		MaxRetries: 1,
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
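
The repeated `1, 2, 3` in the output above comes from resubscribing to the source after its first failure. The following is a minimal stdlib-only sketch of that loop, not the library's implementation: `retry`, `flaky`, `run`, and `errBoom` are hypothetical names, and the source is modelled as a function that calls `next` and returns an error.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a placeholder error for this sketch.
var errBoom = errors.New("boom")

// retry is a hypothetical model of resubscribing on error: it runs subscribe
// once, then up to maxRetries more times while it keeps failing.
func retry[T any](maxRetries int, subscribe func(next func(T)) error, next func(T)) error {
	err := subscribe(next)
	for attempt := 0; err != nil && attempt < maxRetries; attempt++ {
		err = subscribe(next)
	}
	return err
}

// flaky emits 1, 2, 3 and then fails on every subscription.
func flaky(next func(int)) error {
	next(1)
	next(2)
	next(3)
	return errBoom
}

// run collects everything delivered downstream plus the final error.
func run(maxRetries int) ([]int, error) {
	var seen []int
	err := retry(maxRetries, flaky, func(v int) { seen = append(seen, v) })
	return seen, err
}

func main() {
	seen, err := run(1)
	fmt.Println(seen, err) // [1 2 3 1 2 3] boom
}
```

Values already delivered before a failure are not withdrawn, so a consumer sees them again on each new attempt.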

func Round

func Round() func(Observable[float64]) Observable[float64]

Round emits the rounded values emitted by the source Observable. Play: https://go.dev/play/p/aXwxpsJq_BQ

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[float64]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Round(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just[float64](1, 2, 3, 4, 5),
	Round(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Completed

func SampleTime

func SampleTime[T any](interval time.Duration) func(Observable[T]) Observable[T]

SampleTime emits the most recently emitted value from the source Observable within periodic time intervals.

Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period. Play: https://go.dev/play/p/PcPo4lE9-_T

func SampleWhen

func SampleWhen[T, t any](tick Observable[t]) func(Observable[T]) Observable[T]

SampleWhen emits the most recently emitted value from the source Observable within a period determined by another Observable.

Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period. Play: https://go.dev/play/p/tr4FEd-CSce
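
The sampling rule, including the "no item for that period" case, can be modelled without a real clock. In this sketch, `event` and `sampleWhen` are hypothetical names, and times are plain integers on a virtual timeline. It illustrates the rule described above and is not the library's implementation.

```go
package main

import "fmt"

// event is a value stamped with a virtual time, so no real clock is needed.
type event struct {
	at    int
	value string
}

// sampleWhen is a hypothetical model of sampling: at every tick it emits the
// latest source value seen since the previous tick, or nothing if there was
// none. Both inputs must be sorted by time.
func sampleWhen(source []event, ticks []int) []string {
	var out []string
	i := 0
	for _, tick := range ticks {
		latest, fresh := "", false
		for i < len(source) && source[i].at <= tick {
			latest, fresh = source[i].value, true
			i++
		}
		if fresh {
			out = append(out, latest)
		}
	}
	return out
}

func main() {
	source := []event{{10, "a"}, {20, "b"}, {45, "c"}}
	// Tick 25 sees "a" then "b", tick 50 sees "c", tick 75 sees nothing new.
	fmt.Println(sampleWhen(source, []int{25, 50, 75})) // [b c]
}
```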

func Scan

func Scan[T, R any](reduce func(accumulator R, item T) R, seed R) func(Observable[T]) Observable[R]

Scan applies an accumulator function over an Observable and emits each intermediate result. Play: https://go.dev/play/p/gAzVq-a0Jiz

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Scan(func(agg, current int) int {
		return agg + current
	}, 42),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 43
Next: 45
Next: 48
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Scan(func(agg, current int) int {
		return agg + current
	}, 42),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 43
Next: 45
Next: 48
Next: 52
Next: 57
Completed

func ScanI

func ScanI[T, R any](reduce func(accumulator R, item T, index int64) R, seed R) func(Observable[T]) Observable[R]

ScanI applies an accumulator function over an Observable and emits each intermediate result.
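
Unlike a reduce, a scan emits every intermediate accumulator value. A slice-based sketch of the indexed variant follows. `scanI` and `runningWeightedSum` are hypothetical names, and the zero-based index is an assumption rather than something the text above states.

```go
package main

import "fmt"

// scanI is a hypothetical slice-based stand-in for ScanI: it returns every
// intermediate accumulator value instead of only the last one.
// Assumption: the index starts at 0 and increases by one per item.
func scanI[T, R any](items []T, reduce func(accumulator R, item T, index int64) R, seed R) []R {
	out := make([]R, 0, len(items))
	acc := seed
	for i, item := range items {
		acc = reduce(acc, item, int64(i))
		out = append(out, acc)
	}
	return out
}

// runningWeightedSum adds item*index to the running total at each step.
func runningWeightedSum(items []int) []int {
	return scanI(items, func(acc int, item int, index int64) int {
		return acc + item*int(index)
	}, 0)
}

func main() {
	fmt.Println(runningWeightedSum([]int{10, 20, 30})) // [0 20 80]
}
```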

func ScanIWithContext

func ScanIWithContext[T, R any](reduce func(ctx context.Context, accumulator R, item T, index int64) (context.Context, R), seed R) func(Observable[T]) Observable[R]

ScanIWithContext applies an accumulator function over an Observable and emits each intermediate result. Play: https://go.dev/play/p/BG6OmY35v4x

func ScanWithContext

func ScanWithContext[T, R any](reduce func(ctx context.Context, accumulator R, item T) (context.Context, R), seed R) func(Observable[T]) Observable[R]

ScanWithContext applies an accumulator function over an Observable and emits each intermediate result.

func SequenceEqual

func SequenceEqual[T comparable](obsB Observable[T]) func(Observable[T]) Observable[bool]

SequenceEqual determines whether two observable sequences are equal by comparing the elements pairwise. Play: https://go.dev/play/p/cBIQlH01byQ
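
As a rough illustration of pairwise comparison, here is a slice-based sketch. `sequenceEqual` is a hypothetical helper, and treating sequences of different lengths as unequal is an assumption about the intended semantics.

```go
package main

import "fmt"

// sequenceEqual is a hypothetical slice-based model of a pairwise comparison:
// two sequences are equal when they have the same length and the same items
// in the same order.
func sequenceEqual[T comparable](a, b []T) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(sequenceEqual([]int{1, 2, 3}, []int{1, 2, 3})) // true
	fmt.Println(sequenceEqual([]int{1, 2, 3}, []int{1, 3, 2})) // false
	fmt.Println(sequenceEqual([]int{1, 2}, []int{1, 2, 3}))    // false
}
```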

func Serialize

func Serialize[T any]() func(Observable[T]) Observable[T]

Serialize ensures thread-safe message passing by wrapping any observable in a ro.SafeObservable implementation.

func Share

func Share[T any]() func(Observable[T]) Observable[T]

Share creates a new Observable that multicasts (shares) the original Observable. As long as there is at least one subscription to the multicasted Observable, the source Observable will be subscribed and emitting data. When all subscribers have unsubscribed, the source Observable will be unsubscribed.

This is an alias for ShareWithConfig with default configuration. Play: https://go.dev/play/p/C34fv02jAIH
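
The reference-counting behaviour described above (connect on the first subscriber, disconnect when the last one leaves) can be sketched with a small single-goroutine model. `shared`, `newShared`, and `demo` are hypothetical names. The model is not thread-safe and is not the library's implementation.

```go
package main

import "fmt"

// shared is a hypothetical model of a ref-counted multicast source.
type shared struct {
	subscribers map[int]func(int)
	nextID      int
	connected   bool
	connects    int // how many times the upstream source was subscribed
}

func newShared() *shared {
	return &shared{subscribers: map[int]func(int){}}
}

// subscribe registers a listener, connects upstream on the first one, and
// returns an unsubscribe function.
func (s *shared) subscribe(next func(int)) func() {
	id := s.nextID
	s.nextID++
	s.subscribers[id] = next
	if !s.connected {
		s.connected = true
		s.connects++
	}
	return func() {
		delete(s.subscribers, id)
		if len(s.subscribers) == 0 {
			s.connected = false // last subscriber left: disconnect upstream
		}
	}
}

// emit delivers one upstream value to every current subscriber.
func (s *shared) emit(v int) {
	for _, next := range s.subscribers {
		next(v)
	}
}

// demo shows two overlapping subscribers sharing a single upstream connection.
func demo() (a, b []int, connects int) {
	s := newShared()
	unsubA := s.subscribe(func(v int) { a = append(a, v) })
	s.emit(1)
	unsubB := s.subscribe(func(v int) { b = append(b, v) })
	s.emit(2)
	unsubA()
	s.emit(3)
	unsubB()
	return a, b, s.connects
}

func main() {
	a, b, connects := demo()
	fmt.Println(a, b, connects) // [1 2] [2 3] 1
}
```

The late subscriber misses the value emitted before it joined, which is the gap a replay buffer is meant to fill.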

func ShareReplay

func ShareReplay[T any](bufferSize int) func(Observable[T]) Observable[T]

ShareReplay creates a new Observable that multicasts (shares) the original Observable and replays a specified number of items to any future subscribers. As long as there is at least one subscription to the multicasted Observable, the source Observable will be subscribed and emitting data. When all subscribers have unsubscribed, the source Observable will be unsubscribed.

This is an alias for ShareReplayWithConfig with default configuration. Play: https://go.dev/play/p/QmsDbChzRgu

func ShareReplayWithConfig

func ShareReplayWithConfig[T any](bufferSize int, config ShareReplayConfig) func(Observable[T]) Observable[T]

ShareReplayWithConfig creates a new Observable that multicasts (shares) the original Observable and replays a specified number of items to any future subscribers. As long as there is at least one subscription to the multicasted Observable, the source Observable will be subscribed and emitting data. When all subscribers have unsubscribed, the source Observable will be unsubscribed.

The configuration lets you customize the behavior of the shared Observable:

  • `bufferSize` is the number of items to replay to future subscribers.
  • `ResetOnRefCountZero` determines whether the shared Observable should be reset when the reference count reaches zero.

Play: https://go.dev/play/p/QmsDbChzRgu
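
The effect of `bufferSize` on a late subscriber can be sketched with a small model. `replay` and `lateSubscriber` are hypothetical names. The sketch covers only the replay buffer, leaves out reference counting and reset behaviour, and is not the library's implementation.

```go
package main

import "fmt"

// replay is a hypothetical model of a replay buffer: it remembers the last
// bufferSize values and hands them to every late subscriber first.
type replay struct {
	bufferSize  int
	buffer      []int
	subscribers []func(int)
}

// emit records v, trims the buffer to bufferSize, and notifies subscribers.
func (r *replay) emit(v int) {
	r.buffer = append(r.buffer, v)
	if len(r.buffer) > r.bufferSize {
		r.buffer = r.buffer[len(r.buffer)-r.bufferSize:]
	}
	for _, next := range r.subscribers {
		next(v)
	}
}

// subscribe replays the buffered values, then registers for live ones.
func (r *replay) subscribe(next func(int)) {
	for _, v := range r.buffer {
		next(v)
	}
	r.subscribers = append(r.subscribers, next)
}

// lateSubscriber joins after 1, 2, 3 were emitted and before 4.
func lateSubscriber(bufferSize int) []int {
	r := &replay{bufferSize: bufferSize}
	r.emit(1)
	r.emit(2)
	r.emit(3)
	var got []int
	r.subscribe(func(v int) { got = append(got, v) })
	r.emit(4)
	return got
}

func main() {
	fmt.Println(lateSubscriber(2)) // [2 3 4]
	fmt.Println(lateSubscriber(0)) // [4]
}
```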

func ShareWithConfig

func ShareWithConfig[T any](config ShareConfig[T]) func(Observable[T]) Observable[T]

ShareWithConfig creates a new Observable that multicasts (shares) the original Observable. As long as there is at least one subscription to the multicasted Observable, the source Observable will be subscribed and emitting data. When all subscribers have unsubscribed, the source Observable will be unsubscribed.

The configuration lets you customize the behavior of the shared Observable:

  • `Connector` is a factory function that creates a new Subject for each subscription. The Subject can be any type of Subject, such as a BehaviorSubject or a ReplaySubject.
  • `ResetOnError` determines whether the shared Observable should be reset when an error is emitted.
  • `ResetOnComplete` determines whether the shared Observable should be reset when it completes.
  • `ResetOnRefCountZero` determines whether the shared Observable should be reset when the reference count reaches zero.

Play: https://go.dev/play/p/C34fv02jAIH

func Skip

func Skip[T any](count int64) func(Observable[T]) Observable[T]

Skip suppresses the first `count` items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, Skip will not emit any items. If the count is zero, Skip will emit all items. Play: https://go.dev/play/p/AAEJaUZJuIj

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Skip[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Skip[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 3
Next: 4
Next: 5
Completed

func SkipLast

func SkipLast[T any](count int) func(Observable[T]) Observable[T]

SkipLast suppresses the last `count` items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, SkipLast will not emit any items. If the count is zero, SkipLast will emit all items. Play: https://go.dev/play/p/gire30ONRBB

Example (Empty)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	SkipLast[int](10),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Completed
Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	SkipLast[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	SkipLast[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed
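
The examples above are consistent with a streaming strategy in which the operator holds back the most recent `count` items and releases an item only once `count` newer ones have arrived. That would explain why the error example emits `1` before the error. The sketch below models that strategy. The `skipLast` type and `run` helper are hypothetical, and the buffering approach is an assumption rather than the library's confirmed implementation.

```go
package main

import "fmt"

// skipLast is a hypothetical streaming model: it holds back the most recent
// count items and releases an item only once count newer ones have arrived.
type skipLast struct {
	count  int
	buffer []int
	next   func(int)
}

// push buffers v and forwards the oldest buffered item once the buffer
// holds more than count items.
func (s *skipLast) push(v int) {
	s.buffer = append(s.buffer, v)
	if len(s.buffer) > s.count {
		s.next(s.buffer[0])
		s.buffer = s.buffer[1:]
	}
}

// run feeds items through the model and returns what was forwarded.
// Whatever is still buffered when the input ends is dropped.
func run(count int, items ...int) []int {
	var out []int
	s := &skipLast{count: count, next: func(v int) { out = append(out, v) }}
	for _, v := range items {
		s.push(v)
	}
	return out
}

func main() {
	fmt.Println(run(2, 1, 2, 3, 4, 5))  // [1 2 3]
	fmt.Println(run(2, 1, 2, 3))        // [1]
	fmt.Println(run(10, 1, 2, 3, 4, 5)) // []
}
```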

func SkipUntil

func SkipUntil[T, S any](signal Observable[S]) func(Observable[T]) Observable[T]

SkipUntil suppresses items emitted by an Observable until a second Observable emits an item or completes. It will then emit all the subsequent items. If the second Observable is empty, SkipUntil will not emit any items. If the second Observable emits an item or completes, SkipUntil will emit all items.

Example (Empty)
observable := Pipe1(
	RangeWithInterval(0, 5, 10*time.Millisecond),
	SkipUntil[int64](Interval(100*time.Millisecond)),
)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Completed
Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		go func() {
			time.Sleep(30 * time.Millisecond)
			observer.Next(1)
			time.Sleep(30 * time.Millisecond)
			observer.Next(2)
			time.Sleep(30 * time.Millisecond)
			observer.Next(3)
			observer.Error(assert.AnError)
			observer.Next(4)
		}()

		return nil
	}),
	SkipUntil[int](Interval(45*time.Millisecond)),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	RangeWithInterval(0, 5, 40*time.Millisecond),
	SkipUntil[int64](Interval(100*time.Millisecond)),
)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 2
Next: 3
Next: 4
Completed

func SkipWhile

func SkipWhile[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]

SkipWhile skips items emitted by an Observable until a specified condition becomes false. It will then emit all the subsequent items. If the condition is never false, SkipWhile will not emit any items. If the condition is false on the first item, SkipWhile will emit all items.

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	SkipWhile(func(v int) bool {
		return v <= 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	SkipWhile(func(v int) bool {
		return v <= 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 3
Next: 4
Next: 5
Completed

func SkipWhileI

func SkipWhileI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]

SkipWhileI skips items emitted by an Observable until a specified condition becomes false. It will then emit all the subsequent items. The predicate also receives the index of each item. If the condition is never false, SkipWhileI will not emit any items. If the condition is false on the first item, SkipWhileI will emit all items.
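
Since no example accompanies the indexed variant, here is a minimal standard-library sketch of the behaviour described above. It is not ro's implementation: `skipWhileI` is a hypothetical slice-based stand-in, and the zero-based index is an assumption.

```go
package main

import "fmt"

// skipWhileI is a hypothetical slice-based stand-in for the operator:
// it drops items while predicate(item, index) is true, then keeps the rest.
// The index is assumed to start at zero.
func skipWhileI[T any](items []T, predicate func(item T, index int64) bool) []T {
	result := []T{}
	skipping := true
	for i, item := range items {
		if skipping && predicate(item, int64(i)) {
			continue
		}
		skipping = false // once false, the predicate is never consulted again
		result = append(result, item)
	}
	return result
}

func main() {
	// Skip the first two positions regardless of value.
	out := skipWhileI([]string{"a", "b", "c", "d"}, func(_ string, index int64) bool {
		return index < 2
	})
	fmt.Println(out) // [c d]
}
```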

func SkipWhileIWithContext

func SkipWhileIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]

SkipWhileIWithContext skips items emitted by an Observable until a specified condition becomes false. It will then emit all the subsequent items. If the condition is never false, SkipWhileIWithContext will not emit any items. If the condition is false on the first item, SkipWhileIWithContext will emit all items. Play: https://go.dev/play/p/oYUQuPWIytL

func SkipWhileWithContext

func SkipWhileWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]

SkipWhileWithContext skips items emitted by an Observable until a specified condition becomes false. It will then emit all the subsequent items. If the condition is never false, SkipWhileWithContext will not emit any items. If the condition is false on the first item, SkipWhileWithContext will emit all items.

func StartWith

func StartWith[T any](prefixes ...T) func(Observable[T]) Observable[T]

StartWith emits the given values before emitting the values from the source Observable. Play: https://go.dev/play/p/vS_gIw8Ce1C

Example (Error)
observable := Pipe1(
	Throw[int](assert.AnError),
	StartWith(1, 2, 3),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(4, 5, 6),
	StartWith(1, 2, 3),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed

func SubscribeOn

func SubscribeOn[T any](bufferSize int) func(Observable[T]) Observable[T]

SubscribeOn schedules the upstream flow on a different goroutine. Next, Error and Complete notifications are first sent to a queue, which the consumer then drains. SubscribeOn converts a push-based Observable into a pullable stream with backpressure capabilities.

To schedule the downstream flow on a different goroutine, refer to ObserveOn.

When an Observable emits values faster than they can be consumed, SubscribeOn buffers these values in a queue of specified capacity. This allows downstream consumers to pull values at their own pace while managing backpressure from upstream emissions.

Note: Once the buffer reaches its capacity, upstream emissions will block until space becomes available, effectively implementing backpressure control.

@TODO: add a backpressure policy (drop vs. block)? Play: https://go.dev/play/p/WrsTUq6yxtO
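
The blocking behaviour described in the note above is the same one a buffered Go channel exhibits. The following is a minimal sketch with the standard library only. It does not show how ro implements the queue; `tryEmit` is a hypothetical helper that makes the "would block" moment observable without actually blocking.

```go
package main

import "fmt"

// tryEmit is a hypothetical helper: it attempts a non-blocking send so the
// example can observe when a full buffer would make the producer block.
func tryEmit(queue chan int, v int) bool {
	select {
	case queue <- v:
		return true
	default:
		return false // buffer full: a plain `queue <- v` would block here
	}
}

func main() {
	queue := make(chan int, 2) // plays the role of bufferSize

	fmt.Println(tryEmit(queue, 1)) // true
	fmt.Println(tryEmit(queue, 2)) // true
	fmt.Println(tryEmit(queue, 3)) // false: capacity reached

	<-queue                        // the consumer pulls one value
	fmt.Println(tryEmit(queue, 3)) // true: space is available again
}
```

With a blocking send in place of `tryEmit`, the producer would simply wait at the third emission until the consumer catches up, which is the backpressure effect the note describes.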

func Sum

func Sum[T constraints.Numeric]() func(Observable[T]) Observable[T]

Sum calculates the sum of the values emitted by the source Observable. It emits the sum when the source completes. Play: https://go.dev/play/p/b3rRlI80igo

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Sum[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Sum[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 15
Completed

func Tail

func Tail[T any]() func(Observable[T]) Observable[T]

Tail emits only the last item emitted by an Observable. If the source Observable is empty, Tail will emit an error.

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Tail[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Tail[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 5
Completed

func Take

func Take[T any](count int64) func(Observable[T]) Observable[T]

Take emits only the first n items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, Take will emit all items. If the count is zero, Take will not emit any items. Play: https://go.dev/play/p/IC_hJMsg7yk

Example (Error1)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Take[int](5),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Error2)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Take[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Completed
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Take[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Completed

func TakeLast

func TakeLast[T any](count int) func(Observable[T]) Observable[T]

TakeLast emits only the last n items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, TakeLast will emit all items. If the count is zero, TakeLast will not emit any items. Play: https://go.dev/play/p/J0mX3NpEHzy

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	TakeLast[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	TakeLast[int](2),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 4
Next: 5
Completed
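
The Error example above prints no values because the last n items are only known once the source completes. A minimal standard-library sketch of that idea follows. It is not ro's implementation; `takeLast` is a hypothetical slice-based stand-in.

```go
package main

import "fmt"

// takeLast is a hypothetical slice-based stand-in: it keeps a sliding
// window of at most `count` items and returns it once the input is exhausted.
func takeLast[T any](items []T, count int) []T {
	window := []T{}
	if count <= 0 {
		return window
	}
	for _, item := range items {
		if len(window) == count {
			window = window[1:] // drop the oldest item
		}
		window = append(window, item)
	}
	return window
}

func main() {
	fmt.Println(takeLast([]int{1, 2, 3, 4, 5}, 2))  // [4 5]
	fmt.Println(takeLast([]int{1, 2, 3, 4, 5}, 10)) // [1 2 3 4 5]
	fmt.Println(takeLast([]int{1, 2, 3, 4, 5}, 0))  // []
}
```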

func TakeUntil

func TakeUntil[T, S any](signal Observable[S]) func(Observable[T]) Observable[T]

TakeUntil emits items emitted by an Observable until a second Observable emits an item or completes. It will then complete. If the second Observable is empty, TakeUntil will emit all items. Play: https://go.dev/play/p/nhgYGyREW1r

Example (Empty)
observable := Pipe1(
	RangeWithInterval(0, 5, 50*time.Millisecond),
	TakeUntil[int64](Interval(10*time.Millisecond)),
)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Completed
Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		go func() {
			time.Sleep(20 * time.Millisecond)
			observer.Next(1)
			time.Sleep(20 * time.Millisecond)
			observer.Next(2)
			time.Sleep(20 * time.Millisecond)
			observer.Next(3)
			observer.Error(assert.AnError)
			observer.Next(4)
		}()

		return nil
	}),
	TakeUntil[int](Interval(50*time.Millisecond)),
)

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Completed
Example (Ok)
observable := Pipe1(
	RangeWithInterval(0, 5, 40*time.Millisecond),
	TakeUntil[int64](Interval(100*time.Millisecond)),
)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 0
Next: 1
Completed

func TakeWhile

func TakeWhile[T any](predicate func(item T) bool) func(Observable[T]) Observable[T]

TakeWhile emits items emitted by an Observable as long as a specified condition is true. It completes as soon as the condition becomes false. If the condition is true for every item, TakeWhile will emit all items. If the condition is false on the first item, TakeWhile will not emit any items. Play: https://go.dev/play/p/lxV03GzOa2J

Example (Error1)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	TakeWhile(func(n int) bool {
		return n < 5
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Error2)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	TakeWhile(func(n int) bool {
		return n < 3
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Completed
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	TakeWhile(func(n int) bool {
		return n < 3
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Completed

func TakeWhileI

func TakeWhileI[T any](predicate func(item T, index int64) bool) func(Observable[T]) Observable[T]

TakeWhileI emits items emitted by an Observable as long as a specified condition is true. It completes as soon as the condition becomes false. The predicate also receives the index of each item. If the condition is true for every item, TakeWhileI will emit all items. If the condition is false on the first item, TakeWhileI will not emit any items.
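
As a rough illustration of the indexed predicate, here is a minimal standard-library sketch. It is not ro's implementation: `takeWhileI` is a hypothetical slice-based stand-in, and the zero-based index is an assumption.

```go
package main

import "fmt"

// takeWhileI is a hypothetical slice-based stand-in for the operator:
// it keeps items while predicate(item, index) is true and stops at the
// first item for which it is false. The index is assumed to start at zero.
func takeWhileI[T any](items []T, predicate func(item T, index int64) bool) []T {
	result := []T{}
	for i, item := range items {
		if !predicate(item, int64(i)) {
			break // later items are never inspected
		}
		result = append(result, item)
	}
	return result
}

func main() {
	// Keep at most the first two positions, and only while values stay below 100.
	out := takeWhileI([]int{10, 20, 30, 40}, func(v int, index int64) bool {
		return index < 2 && v < 100
	})
	fmt.Println(out) // [10 20]
}
```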

func TakeWhileIWithContext

func TakeWhileIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]

TakeWhileIWithContext emits items emitted by an Observable as long as a specified condition is true. It completes as soon as the condition becomes false. If the condition is true for every item, TakeWhileIWithContext will emit all items. If the condition is false on the first item, TakeWhileIWithContext will not emit any items.

func TakeWhileWithContext

func TakeWhileWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool)) func(Observable[T]) Observable[T]

TakeWhileWithContext emits items emitted by an Observable as long as a specified condition is true. It completes as soon as the condition becomes false. If the condition is true for every item, TakeWhileWithContext will emit all items. If the condition is false on the first item, TakeWhileWithContext will not emit any items.

func Tap

func Tap[T any](onNext func(value T), onError func(err error), onComplete func()) func(Observable[T]) Observable[T]

Tap allows you to perform side effects for notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Tap(
		func(value int) {
			fmt.Printf("Next: %v\n", value)
		},
		func(err error) {
			fmt.Printf("Error: %s\n", err.Error())
		},
		func() {
			fmt.Printf("Completed\n")
		},
	),
)

subscription := observable.Subscribe(NoopObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Range(1, 4),
	Tap(
		func(value int64) {
			fmt.Printf("Next: %v\n", value)
		},
		func(err error) {
			fmt.Printf("Error: %s\n", err.Error())
		},
		func() {
			fmt.Printf("Completed\n")
		},
	),
)

subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed

func TapOnComplete

func TapOnComplete[T any](onComplete func()) func(Observable[T]) Observable[T]

TapOnComplete allows you to perform side effects for Complete notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/z1sntT6bplM

Example (Error)
observable := Pipe2(
	Throw[int](assert.AnError),
	Delay[int](10*time.Millisecond),
	TapOnComplete[int](func() { fmt.Printf("Completed") }),
)

subscription := observable.Subscribe(NoopObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Example (Ok)
observable := Pipe1(
	Range(1, 4),
	TapOnComplete[int64](func() { fmt.Printf("Completed") }),
)

subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
Output:

Completed

func TapOnCompleteWithContext

func TapOnCompleteWithContext[T any](onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]

TapOnCompleteWithContext allows you to perform side effects for Complete notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/3k25j_D1OTW

func TapOnError

func TapOnError[T any](onError func(err error)) func(Observable[T]) Observable[T]

TapOnError allows you to perform side effects for Error notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	TapOnError[int](func(err error) { fmt.Printf("Error: %s\n", err.Error()) }),
)

subscription := observable.Subscribe(NoopObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Range(1, 4),
	TapOnError[int64](func(err error) { fmt.Printf("Error: %s\n", err.Error()) }),
)

subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()

func TapOnErrorWithContext

func TapOnErrorWithContext[T any](onError func(ctx context.Context, err error)) func(Observable[T]) Observable[T]

TapOnErrorWithContext allows you to perform side effects for Error notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer.

func TapOnFinalize

func TapOnFinalize[T any](onFinalize func()) func(Observable[T]) Observable[T]

TapOnFinalize allows you to perform side effects when the source Observable is unsubscribed from without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/VEACE_KhdvU

func TapOnNext

func TapOnNext[T any](onNext func(value T)) func(Observable[T]) Observable[T]

TapOnNext allows you to perform side effects for Next notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int64]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	TapOnNext(func(v int64) { fmt.Println("Next:", v) }),
)

subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Example (Ok)
observable := Pipe1(
	Range(1, 4),
	TapOnNext(func(v int64) { fmt.Println("Next:", v) }),
)

subscription := observable.Subscribe(NoopObserver[int64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3

func TapOnNextWithContext

func TapOnNextWithContext[T any](onNext func(ctx context.Context, value T)) func(Observable[T]) Observable[T]

TapOnNextWithContext allows you to perform side effects for Next notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI

func TapOnSubscribe

func TapOnSubscribe[T any](onSubscribe func()) func(Observable[T]) Observable[T]

TapOnSubscribe allows you to perform side effects when the source Observable is subscribed to without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/0YzsxpRkO4T

func TapOnSubscribeWithContext

func TapOnSubscribeWithContext[T any](onSubscribe func(ctx context.Context)) func(Observable[T]) Observable[T]

TapOnSubscribeWithContext allows you to perform side effects when the source Observable is subscribed to without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer.

func TapWithContext

func TapWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context)) func(Observable[T]) Observable[T]

TapWithContext allows you to perform side effects for notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer. Play: https://go.dev/play/p/oDI3d6553MI

func ThrottleTime

func ThrottleTime[T any](interval time.Duration) func(Observable[T]) Observable[T]

ThrottleTime emits a value from the source Observable, then ignores subsequent source values for the given interval, then repeats this process. Play: https://go.dev/play/p/ITogsevmh88
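
The emit-then-ignore cycle can be traced on a fixed timeline. The sketch below uses only the standard library and is not ro's implementation: `event` and `throttle` are hypothetical names, arrival times are given explicitly so the result is deterministic, and treating an event that arrives exactly at the end of the interval as accepted is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical pairing of a value with the offset at which it arrived.
type event struct {
	at    time.Duration
	value string
}

// throttle keeps the first event, then ignores events arriving before
// `interval` has elapsed since the last kept event.
func throttle(events []event, interval time.Duration) []string {
	kept := []string{}
	var lastKept time.Duration
	first := true
	for _, e := range events {
		if first || e.at-lastKept >= interval {
			kept = append(kept, e.value)
			lastKept = e.at
			first = false
		}
	}
	return kept
}

func main() {
	events := []event{
		{0 * time.Millisecond, "a"},
		{30 * time.Millisecond, "b"},
		{120 * time.Millisecond, "c"},
		{150 * time.Millisecond, "d"},
		{260 * time.Millisecond, "e"},
	}
	fmt.Println(throttle(events, 100*time.Millisecond)) // [a c e]
}
```

Here "b" and "d" fall inside the 100ms window opened by "a" and "c" respectively, so they are dropped.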

func ThrottleWhen

func ThrottleWhen[T, t any](tick Observable[t]) func(Observable[T]) Observable[T]

ThrottleWhen emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process. Play: https://go.dev/play/p/q3ISV03EL3q

func ThrowIfEmpty

func ThrowIfEmpty[T any](throw func() error) func(Observable[T]) Observable[T]

ThrowIfEmpty emits an error if the source Observable completes without emitting any value. The error is the one returned by the throw function. If the source Observable emits at least one value, it completes normally. If the source Observable emits an error, that error is propagated. Play: https://go.dev/play/p/mLCaC7p_6p4

Example
observable := Pipe1(
	Empty[int](),
	ThrowIfEmpty[int](func() error {
		return errors.New("empty")
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: empty

func ThrowOnContextCancel

func ThrowOnContextCancel[T any]() func(Observable[T]) Observable[T]

ThrowOnContextCancel returns an Observable that emits the same items as the source Observable, but throws an error if the context is canceled. This operator should be chained after an operator such as ContextWithTimeout or ContextWithDeadline. Play: https://go.dev/play/p/K9oGdZFa-b1

func TimeInterval

func TimeInterval[T any]() func(Observable[T]) Observable[IntervalValue[T]]

TimeInterval emits the values emitted by the source Observable with the time elapsed between each emission. Play: https://go.dev/play/p/VX73ZL74hPk

Example
observable := Pipe1(
	RangeWithInterval(0, 3, 10*time.Millisecond),
	TimeInterval[int64](),
)

subscription := observable.Subscribe(NoopObserver[IntervalValue[int64]]())
defer subscription.Unsubscribe()

func Timeout

func Timeout[T any](duration time.Duration) func(Observable[T]) Observable[T]

Timeout raises an error if the source Observable does not emit any item within the specified duration. Play: https://go.dev/play/p/t0xKoj-_AqZ

Example (Error)
subscription := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		go func() {
			observer.Next(1)
			time.Sleep(100 * time.Millisecond)
			observer.Next(2)
			time.Sleep(100 * time.Millisecond)
			observer.Next(3)
			observer.Error(assert.AnError)
			observer.Next(4)
		}()
		return nil
	}),
	Timeout[int](50*time.Millisecond),
).Subscribe(PrintObserver[int]())

subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Error: ro.Timeout: timeout after 50ms
Example (Ok)
observable := Pipe1(
	Range(1, 4),
	Timeout[int64](20*time.Millisecond),
)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Completed
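
The timeout in the Error example above amounts to a race between the next value and a timer. Here is a minimal sketch of that race with the standard library only. It is not ro's implementation; `receiveWithTimeout` and `errTimeout` are hypothetical names, and the sketch does not handle a closed channel.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel error for this sketch.
var errTimeout = errors.New("timeout")

// receiveWithTimeout waits for the next value on ch and fails if nothing
// arrives within d.
func receiveWithTimeout(ch <-chan int, d time.Duration) (int, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case v := <-ch:
		return v, nil
	case <-timer.C:
		return 0, errTimeout
	}
}

func main() {
	ch := make(chan int, 1)
	ch <- 1

	v, err := receiveWithTimeout(ch, 50*time.Millisecond)
	fmt.Println(v, err) // 1 <nil>

	// Nothing else is ever sent, so the second wait runs into the timer.
	_, err = receiveWithTimeout(ch, 50*time.Millisecond)
	fmt.Println(err) // timeout
}
```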

func Timestamp

func Timestamp[T any]() func(Observable[T]) Observable[TimestampValue[T]]

Timestamp emits the values emitted by the source Observable with the time elapsed since the source Observable was subscribed to. Play: https://go.dev/play/p/cDiCr6qIE2P

Example
observable := Pipe1(
	RangeWithInterval(0, 3, 10*time.Millisecond),
	Timestamp[int64](),
)

subscription := observable.Subscribe(NoopObserver[TimestampValue[int64]]())
defer subscription.Unsubscribe()

func ToChannel

func ToChannel[T any](size int) func(Observable[T]) Observable[<-chan Notification[T]]

ToChannel materializes and forwards all items from the Observable into a channel. It is a sink operator, so it emits a single value. It emits the channel when the source completes. If the source is empty, it emits an empty channel. The channel will be closed when the source completes or emits an error. Play: https://go.dev/play/p/WMKa26sirV0

Example (Error)
observable := Pipe3(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	ToChannel[int](42),
	Map(lo.ChannelToSlice[Notification[int]]),
	Flatten[Notification[int]](),
)

subscription := observable.Subscribe(PrintObserver[Notification[int]]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: Next(1)
Next: Next(2)
Next: Next(3)
Next: Error(assert.AnError general error for testing)
Completed
Example (Ok)
observable := Pipe3(
	Just(1, 2, 3, 4, 5),
	ToChannel[int](42),
	Map(lo.ChannelToSlice[Notification[int]]),
	Flatten[Notification[int]](),
)

subscription := observable.Subscribe(PrintObserver[Notification[int]]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: Next(1)
Next: Next(2)
Next: Next(3)
Next: Next(4)
Next: Next(5)
Next: Complete()
Completed

func ToMap

func ToMap[T any, K comparable, V any](project func(item T) (K, V)) func(Observable[T]) Observable[map[K]V]

ToMap collects all items from the Observable into a map. It is a sink operator, so it emits a single value. It emits the map when the source completes. If the source is empty, it emits an empty map. Play: https://go.dev/play/p/FiF83XYB0ba

Example (Error)
mapper := func(v int) (string, string) {
	return strconv.FormatInt(int64(v), 10), strconv.FormatInt(int64(v), 10)
}

observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	ToMap(mapper),
)

subscription := observable.Subscribe(PrintObserver[map[string]string]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
mapper := func(v int) (string, string) {
	return strconv.FormatInt(int64(v), 10), strconv.FormatInt(int64(v), 10)
}

observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	ToMap(mapper),
)

subscription := observable.Subscribe(PrintObserver[map[string]string]())
defer subscription.Unsubscribe()
Output:

Next: map[1:1 2:2 3:3 4:4 5:5]
Completed

func ToMapI

func ToMapI[T any, K comparable, V any](mapper func(item T, index int64) (K, V)) func(Observable[T]) Observable[map[K]V]

ToMapI collects all items from the Observable into a map. It is a sink operator, so it emits a single value. It emits the map when the source completes. If the source is empty, it emits an empty map. Play: https://go.dev/play/p/FiF83XYB0ba
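
To show what the index argument of the mapper can be used for, here is a minimal standard-library sketch. It is not ro's implementation: `toMapI` is a hypothetical slice-based stand-in, the zero-based index is an assumption, and letting a repeated key overwrite the earlier entry is a choice made for this sketch only.

```go
package main

import "fmt"

// toMapI is a hypothetical slice-based stand-in for the operator: it calls
// mapper(item, index) for every item and stores the returned key/value pair.
func toMapI[T any, K comparable, V any](items []T, mapper func(item T, index int64) (K, V)) map[K]V {
	result := make(map[K]V, len(items))
	for i, item := range items {
		k, v := mapper(item, int64(i))
		result[k] = v // in this sketch, a repeated key overwrites the earlier entry
	}
	return result
}

func main() {
	// Record the last position at which each value was seen.
	positions := toMapI([]string{"a", "b", "a"}, func(item string, index int64) (string, int64) {
		return item, index
	})
	fmt.Println(positions) // map[a:2 b:1]
}
```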

func ToMapIWithContext

func ToMapIWithContext[T any, K comparable, V any](mapper func(ctx context.Context, item T, index int64) (K, V)) func(Observable[T]) Observable[map[K]V]

ToMapIWithContext collects all items from the Observable into a map. It is a sink operator, so it emits a single value. It emits the map when the source completes. If the source is empty, it emits an empty map. Play: https://go.dev/play/p/FiF83XYB0ba

func ToMapWithContext

func ToMapWithContext[T any, K comparable, V any](project func(ctx context.Context, item T) (K, V)) func(Observable[T]) Observable[map[K]V]

ToMapWithContext collects all items from the Observable into a map. It is a sink operator, so it emits a single value. It emits the map when the source completes. If the source is empty, it emits an empty map. Play: https://go.dev/play/p/FiF83XYB0ba

func ToSlice

func ToSlice[T any]() func(Observable[T]) Observable[[]T]

ToSlice collects all items from the Observable into a slice. It is a sink operator, so it emits a single value. It emits the slice when the source completes. If the source is empty, it emits an empty slice. Play: https://go.dev/play/p/kxbU_PzpN6t

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[int]) Teardown {
		observer.Next(1)
		observer.Next(2)
		observer.Next(3)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	ToSlice[int](),
)

subscription := observable.Subscribe(PrintObserver[[]int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	ToSlice[int](),
)

subscription := observable.Subscribe(PrintObserver[[]int]())
defer subscription.Unsubscribe()
Output:

Next: [1 2 3 4 5]
Completed

func Trunc

func Trunc() func(Observable[float64]) Observable[float64]

Trunc emits the truncated values emitted by the source Observable. Play: https://go.dev/play/p/iYc9oGDgRZJ

Example (Error)
observable := Pipe1(
	NewObservable(func(observer Observer[float64]) Teardown {
		observer.Next(1.1)
		observer.Next(2.5)
		observer.Next(3.9)
		observer.Error(assert.AnError)
		observer.Next(4)

		return nil
	}),
	Trunc(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(1.1, 2.4, 3.5, 4.9, 5.0),
	Trunc(),
)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Completed

func While

func While[T any](condition func() bool) func(Observable[T]) Observable[T]

While repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/hMj3DBVtp73

Example
i := 0

observable := Pipe1(
	Just(1, 2, 3),
	While[int](func() bool {
		i++
		return i < 2
	}),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed
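
In the example above the values are printed once because the condition holds only on its first call. A minimal standard-library sketch of that loop follows. It is not ro's implementation: `repeatWhile` is a hypothetical slice-based stand-in, and checking the condition before every pass, including the first, is an assumption inferred from the example output.

```go
package main

import "fmt"

// repeatWhile is a hypothetical slice-based stand-in for the operator:
// it replays `source` for as long as condition() returns true, checking
// the condition before every pass (an assumption, see the lead-in).
func repeatWhile[T any](source []T, condition func() bool) []T {
	result := []T{}
	for condition() {
		result = append(result, source...)
	}
	return result
}

func main() {
	i := 0
	out := repeatWhile([]int{1, 2, 3}, func() bool {
		i++
		return i < 3 // true on the first two calls, false on the third
	})
	fmt.Println(out) // [1 2 3 1 2 3]
}
```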

func WhileI

func WhileI[T any](condition func(index int64) bool) func(Observable[T]) Observable[T]

WhileI repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/9aAuzAspyMc

func WhileIWithContext

func WhileIWithContext[T any](condition func(ctx context.Context, index int64) (context.Context, bool)) func(Observable[T]) Observable[T]

WhileIWithContext repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error. Play: https://go.dev/play/p/xTpqdGSxOxw

func WhileWithContext

func WhileWithContext[T any](condition func(ctx context.Context) (context.Context, bool)) func(Observable[T]) Observable[T]

WhileWithContext repeats the source observable while the condition is true. It will complete when the condition is false. It will not emit any values if the source observable is empty. It will not emit any values if the source observable emits an error.

func WindowWhen

func WindowWhen[T, B any](boundary Observable[B]) func(Observable[T]) Observable[Observable[T]]

WindowWhen splits the source Observable into consecutive windows, each emitted as its own Observable. Every time the boundary Observable emits an item, the current window closes and a new window opens. If the source Observable or the boundary Observable completes, the current window completes and the complete notification is propagated. Play: https://go.dev/play/p/vK0elE-rPbl

func ZipAll

func ZipAll[T any]() func(Observable[Observable[T]]) Observable[[]T]

ZipAll collects the inner Observables emitted by the source Observable and combines their values by position: the n-th emitted slice holds the n-th value of each inner Observable. It emits only once every inner Observable has produced a value for that position. It completes when the source Observable completes. Play: https://go.dev/play/p/FcpgTItKX-Q

Example (Error)
observable := Pipe1(
	Just(
		Range(1, 3),
		Throw[int64](assert.AnError),
		Range(100, 3),
	),
	ZipAll[int64](),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Pipe1(
	Just(
		Range(1, 3),
		Range(10, 13),
		Range(100, 103),
	),
	ZipAll[int64](),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
defer subscription.Unsubscribe()
Output:

Next: [1 10 100]
Next: [2 11 101]
Completed
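
The pairing by position shown in the output above can be modelled on plain slices. This is only a sketch of the semantics, not the library's implementation: `zipByIndex` is a hypothetical helper, and truncating at the shortest input is an assumption drawn from the example output.

```go
package main

import "fmt"

// zipByIndex is a hypothetical helper modelling zip semantics on slices:
// the n-th output groups the n-th value of every source, and the result
// stops at the length of the shortest source.
func zipByIndex[T any](sources ...[]T) [][]T {
	if len(sources) == 0 {
		return nil
	}
	shortest := len(sources[0])
	for _, s := range sources[1:] {
		if len(s) < shortest {
			shortest = len(s)
		}
	}
	out := make([][]T, 0, shortest)
	for i := 0; i < shortest; i++ {
		row := make([]T, 0, len(sources))
		for _, s := range sources {
			row = append(row, s[i])
		}
		out = append(out, row)
	}
	return out
}

func main() {
	rows := zipByIndex(
		[]int64{1, 2},
		[]int64{10, 11, 12},
		[]int64{100, 101, 102},
	)
	fmt.Println(rows) // [[1 10 100] [2 11 101]]
}
```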

func ZipWith

func ZipWith[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]

ZipWith combines each value from the source Observable with the value at the same position in the other Observable, emitting them as a tuple. It emits only once every Observable has produced a value for that position. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/RmErtE3pHjb

Example (Error)
observable := ZipWith2[int](
	Throw[int64](assert.AnError),
	Range(20, 23),
)(Just(1, 2, 3))

subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := ZipWith2[int](
	Range(10, 13),
	Range(20, 23),
)(Just(1, 2, 3))

subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 20}
Next: {2 11 21}
Next: {3 12 22}
Completed

func ZipWith1

func ZipWith1[A, B any](obsB Observable[B]) func(Observable[A]) Observable[lo.Tuple2[A, B]]

ZipWith1 combines each value from the source Observable with the value at the same position in the other Observable, emitting them as a tuple. It emits only once every Observable has produced a value for that position. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument.

Example (Error)
observable := ZipWith1[int](Throw[int64](assert.AnError))(Just(1, 2, 3))

subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := ZipWith1[int](Range(10, 13))(Just(1, 2, 3))

subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10}
Next: {2 11}
Next: {3 12}
Completed

func ZipWith2

func ZipWith2[A, B, C any](obsB Observable[B], obsC Observable[C]) func(Observable[A]) Observable[lo.Tuple3[A, B, C]]

ZipWith2 combines each value from the source Observable with the values at the same position in the other Observables, emitting them as a tuple. It emits only once every Observable has produced a value for that position. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/MMq82Rkb0oh

Example (Error)
observable := ZipWith2[int](Throw[int64](assert.AnError), Range(20, 23))(Just(1, 2))

subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := ZipWith2[int](Range(10, 13), Range(20, 23))(Just(1, 2))

subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 20}
Next: {2 11 21}
Completed

func ZipWith3

func ZipWith3[A, B, C, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D]) func(Observable[A]) Observable[lo.Tuple4[A, B, C, D]]

ZipWith3 combines each value from the source Observable with the values at the same position in the other Observables, emitting them as a tuple. It emits only once every Observable has produced a value for that position. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument.

Example (Error)
observable := ZipWith3[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33))(Just(1))

subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := ZipWith3[int](Range(10, 13), Range(20, 23), Range(30, 33))(Just(1))

subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 20 30}
Completed

func ZipWith4

func ZipWith4[A, B, C, D, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) func(Observable[A]) Observable[lo.Tuple5[A, B, C, D, E]]

ZipWith4 combines each value from the source Observable with the values at the same position in the other Observables, emitting them as a tuple. It emits only once every Observable has produced a value for that position. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument.

Example (Error)
observable := ZipWith4[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33), Range(40, 43))(Just(1))

subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int, int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := ZipWith4[int](Range(10, 13), Range(20, 23), Range(30, 33), Range(40, 43))(Just(1))

subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int, int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 20 30 40}
Completed

func ZipWith5

func ZipWith5[A, B, C, D, E, F any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], obsF Observable[F]) func(Observable[A]) Observable[lo.Tuple6[A, B, C, D, E, F]]

ZipWith5 combines each value from the source Observable with the values at the same position in the other Observables, emitting them as a tuple. It emits only once every Observable has produced a value for that position. It completes when the source Observable completes.

It is a curried function that takes the other Observable as an argument. Play: https://go.dev/play/p/OJz-AVo0-hY

Example (Error)
observable := ZipWith5[int](Throw[int64](assert.AnError), Range(20, 23), Range(30, 33), Range(40, 43), Range(50, 53))(Just(1))

subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int, int64, int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := ZipWith5[int](Range(10, 13), Range(20, 23), Range(30, 33), Range(40, 43), Range(50, 53))(Just(1))

subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int, int64, int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 20 30 40 50}
Completed

Types

type Backpressure

type Backpressure int8

Backpressure is a type that represents the backpressure strategy to use.

const (
	// BackpressureBlock blocks the source observable when the destination is not ready to receive more values.
	BackpressureBlock Backpressure = iota
	// BackpressureDrop drops values from the source observable when the destination is not ready to receive more values.
	BackpressureDrop
)
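
The difference between the two strategies can be sketched with a plain Go channel standing in for the destination. This is a stdlib-only illustration under stated assumptions: `strategy`, `block`, `drop`, and `offer` are hypothetical names, and the library's own mechanism may differ.

```go
package main

import "fmt"

// strategy is a local stand-in for the documented Backpressure type.
type strategy int8

const (
	block strategy = iota // wait until the destination can accept the value
	drop                  // discard the value if the destination is busy
)

// offer delivers v to ch according to the strategy and reports whether the
// value was delivered.
func offer(ch chan<- int, v int, s strategy) bool {
	if s == drop {
		select {
		case ch <- v:
			return true
		default:
			return false // destination not ready: the value is lost
		}
	}
	ch <- v // blocks until a receiver or a buffer slot is available
	return true
}

func main() {
	ch := make(chan int, 1) // room for exactly one pending value

	fmt.Println(offer(ch, 1, drop)) // true: the buffer slot was free
	fmt.Println(offer(ch, 2, drop)) // false: buffer full, value dropped

	<-ch // the consumer catches up
	fmt.Println(offer(ch, 3, block)) // true: the slot is free again
}
```

Blocking preserves every value at the cost of slowing the producer; dropping keeps the producer running at the cost of losing values.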

type ConcurrencyMode

type ConcurrencyMode int8

ConcurrencyMode is a type that represents the concurrency mode to use.

const (
	ConcurrencyModeSafe ConcurrencyMode = iota
	ConcurrencyModeUnsafe
	ConcurrencyModeEventuallySafe
)

ConcurrencyModeSafe is a concurrency mode that is safe to use. Spinlock is ignored because it is too slow when chaining operators. Spinlock should be used only for short-lived local locks.

type ConnectableConfig

type ConnectableConfig[T any] struct {
	Connector         func() Subject[T]
	ResetOnDisconnect bool
}

ConnectableConfig is the configuration for a ConnectableObservable.

type ConnectableObservable

type ConnectableObservable[T any] interface {
	Observable[T]

	// Connect connects the ConnectableObservable. When connected, the ConnectableObservable
	// will emit values to its observers. If the ConnectableObservable is already connected,
	// this method creates a new subscription and starts emitting values to its observers.
	//
	// The Connect method returns a Subscription that can be used to disconnect the
	// ConnectableObservable. The Subscription may be used to cancel the connection,
	// and to wait for the connection to complete.
	//
	// The Subscription might be already disposed when the Connect method returns.
	Connect() Subscription
	ConnectWithContext(ctx context.Context) Subscription
}

ConnectableObservable is an Observable that can be connected and disconnected. When connected, it will emit values to its observers.

ConnectableObservable is useful when you want to share a single subscription to an Observable among multiple observers, that is, to multicast the values of an Observable.
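
The multicast idea can be sketched without the library. The model below is hypothetical and single-goroutine (`connectable`, `subscribe`, and `connect` are illustrative names, not ro's API): observers register first, and the producer runs exactly once when `connect` is called.

```go
package main

import "fmt"

// connectable is a hypothetical, single-goroutine model of a connectable
// source: observers register first, and the producer runs only on connect.
type connectable struct {
	producer  func(emit func(int))
	observers []func(int)
}

// subscribe registers an observer without starting the producer.
func (c *connectable) subscribe(observer func(int)) {
	c.observers = append(c.observers, observer)
}

// connect runs the producer once and fans each value out to all observers.
func (c *connectable) connect() {
	c.producer(func(v int) {
		for _, observer := range c.observers {
			observer(v)
		}
	})
}

func main() {
	runs := 0
	c := &connectable{producer: func(emit func(int)) {
		runs++
		emit(1)
		emit(2)
	}}

	var a, b []int
	c.subscribe(func(v int) { a = append(a, v) })
	c.subscribe(func(v int) { b = append(b, v) })
	fmt.Println(runs) // 0: nothing happens before connect

	c.connect()
	fmt.Println(runs, a, b) // 1 [1 2] [1 2]
}
```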

func Connectable

func Connectable[T any](source Observable[T]) ConnectableObservable[T]

Connectable creates a new ConnectableObservable from an Observable. The ConnectableObservable will use the default connector, which is a PublishSubject. The ConnectableObservable will reset the source when disconnected. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.

If you want to use a different connector or change the reset behavior, use ConnectableWithConfig.

func ConnectableWithConfig

func ConnectableWithConfig[T any](source Observable[T], config ConnectableConfig[T]) ConnectableObservable[T]

ConnectableWithConfig creates a new ConnectableObservable from an Observable. The ConnectableObservable will use the given connector. The ConnectableObservable will reset the source when disconnected if ResetOnDisconnect is true. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.

func NewConnectableObservable

func NewConnectableObservable[T any](subscribe func(destination Observer[T]) Teardown) ConnectableObservable[T]

NewConnectableObservable creates a new ConnectableObservable. The subscribe function is called when the ConnectableObservable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the ConnectableObservable will not emit any more items.

The ConnectableObservable will use the default connector, which is a PublishSubject. The ConnectableObservable will reset the source when disconnected. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.

If you want to use a different connector or change the reset behavior, use NewConnectableObservableWithConfig.

func NewConnectableObservableWithConfig

func NewConnectableObservableWithConfig[T any](subscribe func(destination Observer[T]) Teardown, config ConnectableConfig[T]) ConnectableObservable[T]

NewConnectableObservableWithConfig creates a new ConnectableObservable. The subscribe function is called when the ConnectableObservable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the ConnectableObservable will not emit any more items.

The ConnectableObservable will use the given connector. The ConnectableObservable will reset the source when disconnected if ResetOnDisconnect is true. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.

func NewConnectableObservableWithConfigAndContext

func NewConnectableObservableWithConfigAndContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown, config ConnectableConfig[T]) ConnectableObservable[T]

NewConnectableObservableWithConfigAndContext creates a new ConnectableObservable. The subscribe function is called when the ConnectableObservable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the ConnectableObservable will not emit any more items.

The ConnectableObservable will use the given connector. The ConnectableObservable will reset the source when disconnected if ResetOnDisconnect is true. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.

func NewConnectableObservableWithContext

func NewConnectableObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) ConnectableObservable[T]

NewConnectableObservableWithContext creates a new ConnectableObservable. The subscribe function is called when the ConnectableObservable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the ConnectableObservable will not emit any more items.

The ConnectableObservable will use the default connector, which is a PublishSubject. The ConnectableObservable will reset the source when disconnected. This means that when the ConnectableObservable is disconnected, it will create a new source when reconnected.

If you want to use a different connector or change the reset behavior, use NewConnectableObservableWithConfig.

type IntervalValue

type IntervalValue[T any] struct {
	Value    T
	Interval time.Duration
}

IntervalValue is a value emitted by the `TimeInterval` operator.

type Kind

type Kind uint8

Kind represents the kind of a Notification. It can be Next, Error, or Complete.

const (
	KindNext Kind = iota
	KindError
	KindComplete
)

Kind constants.

func (Kind) String

func (k Kind) String() string

String returns the string representation of a Kind.

type Notification

type Notification[T any] struct {
	Kind  Kind
	Value T
	Err   error
}

Notification represents a value emitted by an Observable. It can be a Next value, an Error, or a Complete signal. It is used to communicate between Observables and Observers. It is a generic type, so it can hold any value.

func NewNotificationComplete

func NewNotificationComplete[T any]() Notification[T]

NewNotificationComplete creates a new Notification with a Complete signal.

func NewNotificationError

func NewNotificationError[T any](err error) Notification[T]

NewNotificationError creates a new Notification with an Error.

func NewNotificationNext

func NewNotificationNext[T any](value T) Notification[T]

NewNotificationNext creates a new Notification with a Next value.
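
A Notification behaves like a tagged union: Kind says which of the other fields is meaningful. The sketch below redeclares local mirrors of the documented types (`kind`, `notification`) so that it compiles on its own; `describe` is a hypothetical helper, and its output format is an assumption modelled on the PrintObserver examples.

```go
package main

import (
	"errors"
	"fmt"
)

// kind and notification are local mirrors of the documented Kind and
// Notification types, redeclared so the sketch compiles on its own.
type kind uint8

const (
	kindNext kind = iota
	kindError
	kindComplete
)

type notification[T any] struct {
	Kind  kind
	Value T
	Err   error
}

// describe renders a notification by switching on its kind; only the field
// that matches the kind is meaningful.
func describe[T any](n notification[T]) string {
	switch n.Kind {
	case kindNext:
		return fmt.Sprintf("Next: %v", n.Value)
	case kindError:
		return fmt.Sprintf("Error: %v", n.Err)
	default:
		return "Completed"
	}
}

func main() {
	events := []notification[int]{
		{Kind: kindNext, Value: 42},
		{Kind: kindError, Err: errors.New("boom")},
		{Kind: kindComplete},
	}
	for _, e := range events {
		fmt.Println(describe(e))
	}
}
```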

func (Notification[T]) String

func (n Notification[T]) String() string

type Observable

type Observable[T any] interface {
	// Subscribe subscribes an Observer to the Observable. The Observer will begin
	// to receive items emitted by the Observable. The Observer may receive any
	// number of items (including zero items), then may either complete or error,
	// but not both. Upon completion or error, the Observer will not receive any
	// more items.
	//
	// The Subscribe method returns a Subscription that can be used to unsubscribe
	// the Observer from the Observable. The Subscription may be used to cancel the
	// subscription, and to wait for the subscription to complete.
	//
	// The Subscription might be already disposed when the Subscribe method returns.
	// In this case, the Teardown function is not called.
	//
	// The Subscribe method may call the Observer's methods synchronously or
	// asynchronously. The Observer is responsible for handling concurrency and
	// synchronization.
	Subscribe(destination Observer[T]) Subscription
	SubscribeWithContext(ctx context.Context, destination Observer[T]) Subscription
}

Observable is the producer of values. It is the source of values that are emitted to Observers. Observable is a representation of any set of values over any amount of time.

The primary method of an Observable is Subscribe, which is used to attach an Observer to the Observable. Once an Observer is subscribed, the Observable may begin to emit items to the Observer. An Observable may emit any number of items (including zero items), then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

An Observable may call an Observer's methods synchronously or asynchronously.

An Observable is not a stream. It is a factory for streams.
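
The "factory for streams" idea can be illustrated with a deliberately tiny model in which an observable is just a function that pushes values into a callback. Everything here (`observable`, `counterFrom`, `collect`) is hypothetical and only meant to show that each subscription triggers an independent run.

```go
package main

import "fmt"

// observable is a hypothetical minimal model: a function that, when
// subscribed to, pushes values into the given callback.
type observable func(next func(int))

// counterFrom returns an observable whose state is created per subscription.
func counterFrom(start int) observable {
	return func(next func(int)) {
		n := start // fresh state for every subscriber
		for i := 0; i < 3; i++ {
			next(n)
			n++
		}
	}
}

// collect subscribes and gathers everything the observable emits.
func collect(o observable) []int {
	var out []int
	o(func(v int) { out = append(out, v) })
	return out
}

func main() {
	o := counterFrom(10)
	fmt.Println(collect(o)) // [10 11 12]
	fmt.Println(collect(o)) // [10 11 12]: a second, independent run
}
```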

func Amb

func Amb[T any](sources ...Observable[T]) Observable[T]

Amb is an alias for Race. Play: https://go.dev/play/p/-YvhnpQFVNS

Example (Error)
observable := Amb(
	Delay[int](25*time.Millisecond)(Throw[int](assert.AnError)),
	Delay[int](50*time.Millisecond)(Just(4, 5, 6)),
	Delay[int](100*time.Millisecond)(Just(7, 8, 9)),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(75 * time.Millisecond)
subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Amb(
	Delay[int](100*time.Millisecond)(Just(1, 2, 3)),
	Delay[int](25*time.Millisecond)(Just(4, 5, 6)),
	Delay[int](50*time.Millisecond)(Just(7, 8, 9)),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(150 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: 4
Next: 5
Next: 6
Completed

func CombineLatest2

func CombineLatest2[A, B any](obsA Observable[A], obsB Observable[B]) Observable[lo.Tuple2[A, B]]

CombineLatest2 combines the latest values from two Observables into a tuple. It emits only once every Observable has emitted at least one value, and then again each time any of them emits. It completes when the input Observables complete. Play: https://go.dev/play/p/mzpJyg7plnm

Example (Error)
observable1 := NewObservable(func(observer Observer[int]) Teardown {
	go func() {
		time.Sleep(10 * time.Millisecond)
		observer.Next(1)
		observer.Error(assert.AnError)
	}()

	return nil
})

observable2 := NewObservable(func(observer Observer[int]) Teardown {
	go func() {
		observer.Next(2)
		observer.Complete()
	}()

	return nil
})

observable := Pipe1(
	CombineLatest2(
		observable1,
		observable2,
	),
	Map(func(snapshot lo.Tuple2[int, int]) []int {
		return []int{snapshot.A, snapshot.B}
	}),
)

subscription := observable.Subscribe(PrintObserver[[]int]())

time.Sleep(50 * time.Millisecond)

defer subscription.Unsubscribe()
Output:

Next: [1 2]
Error: assert.AnError general error for testing
Example (Ok)
observable1 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)

observable := Pipe1(
	CombineLatest2(
		observable1,
		observable2,
	),
	Map(func(snapshot lo.Tuple2[int64, int64]) []int64 {
		return []int64{snapshot.A, snapshot.B}
	}),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: [1 3]
Next: [1 4]
Next: [2 4]
Completed
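
The output above can be reproduced with a small model that replays a merged timeline of emissions. This is a sketch only: `event` and `combineLatest2` are hypothetical names, and the arrival order B, A, B, A is an assumption inferred from the delays in the example.

```go
package main

import "fmt"

// event is one emission on a merged timeline: which input produced it
// (0 for A, 1 for B) and the value.
type event struct {
	source int
	value  int64
}

// combineLatest2 is a hypothetical model of combine-latest semantics: it
// remembers the most recent value of each input and emits a pair on every
// event once both inputs have produced at least one value.
func combineLatest2(timeline []event) [][2]int64 {
	var latest [2]int64
	var seen [2]bool
	var out [][2]int64
	for _, e := range timeline {
		latest[e.source] = e.value
		seen[e.source] = true
		if seen[0] && seen[1] {
			out = append(out, latest) // arrays are copied by value
		}
	}
	return out
}

func main() {
	// Assumed arrival order for the example above: B, A, B, A.
	timeline := []event{{1, 3}, {0, 1}, {1, 4}, {0, 2}}
	fmt.Println(combineLatest2(timeline)) // [[1 3] [1 4] [2 4]]
}
```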

func CombineLatest3

func CombineLatest3[A, B, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C]) Observable[lo.Tuple3[A, B, C]]

CombineLatest3 combines the latest values from three Observables into a tuple. It emits only once every Observable has emitted at least one value, and then again each time any of them emits. It completes when the input Observables complete.

Example (Ok)
observable1 := Delay[int64](100 * time.Millisecond)(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := RangeWithInterval(3, 5, 50*time.Millisecond)
observable3 := Delay[int64](25 * time.Millisecond)(RangeWithInterval(5, 7, 50*time.Millisecond))

combined := CombineLatest3(observable1, observable2, observable3)
observable := Map(func(snapshot lo.Tuple3[int64, int64, int64]) []int64 {
	return []int64{snapshot.A, snapshot.B, snapshot.C}
})(combined)

subscription := observable.Subscribe(PrintObserver[[]int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: [1 4 6]
Next: [2 4 6]
Completed

func CombineLatest4

func CombineLatest4[A, B, C, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D]) Observable[lo.Tuple4[A, B, C, D]]

CombineLatest4 combines the latest values from four Observables into a tuple. It emits only once every Observable has emitted at least one value, and then again each time any of them emits. It completes when the input Observables complete. Play: https://go.dev/play/p/mzpJyg7plnm

func CombineLatest5

func CombineLatest5[A, B, C, D, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) Observable[lo.Tuple5[A, B, C, D, E]]

CombineLatest5 combines the latest values from five Observables into a tuple. It emits only once every Observable has emitted at least one value, and then again each time any of them emits. It completes when the input Observables complete. Play: https://go.dev/play/p/mzpJyg7plnm

func CombineLatestAny

func CombineLatestAny(sources ...Observable[any]) Observable[[]any]

CombineLatestAny combines the latest values from any number of Observables into a slice. It emits only once every Observable has emitted at least one value, and then again each time any of them emits. It completes when the input Observables complete. Play: https://go.dev/play/p/mzpJyg7plnm

Example (Error)
observable1 := Cast[int64, any]()(RangeWithInterval(1, 3, 50*time.Millisecond))
observable2 := Delay[any](75 * time.Millisecond)(Throw[any](assert.AnError))
observable3 := Just[any]("a", "b")

combined := Just(observable1, observable2, observable3)
observable := CombineLatestAllAny()(combined)

subscription := observable.Subscribe(PrintObserver[[]any]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable1 := Cast[int64, any]()(RangeWithInterval(1, 3, 40*time.Millisecond))
observable2 := Cast[string, any]()(Just("a", "b"))
observable3 := Delay[any](25 * time.Millisecond)(Just[any]("c", "d"))
observable4 := Delay[any](60 * time.Millisecond)(Cast[int64, any]()(Range(100, 102)))

combined := Just(observable1, observable2, observable3, observable4)
observable := CombineLatestAllAny()(combined)

subscription := observable.Subscribe(PrintObserver[[]any]())

time.Sleep(220 * time.Millisecond)
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: [1 b d 100]
Next: [1 b d 101]
Next: [2 b d 101]
Completed

func Concat

func Concat[T any](obs ...Observable[T]) Observable[T]

Concat concatenates the source Observable with other Observables. It subscribes to each inner Observable only after the previous one completes, maintaining their order. It completes when all inner Observables are done. Play: https://go.dev/play/p/DFokqIXIguM

Example (Error)
observable := Concat(
	Just(1, 2, 3),
	Throw[int](assert.AnError),
	Just(4, 5, 6),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := Concat(
	Just(1, 2, 3),
	Just(4, 5, 6),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Completed
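
The sequential behaviour, including the early stop on error seen in the first example, can be modelled on finite inputs. The `source` type and `concat` function below are hypothetical stand-ins used only to illustrate the ordering rule.

```go
package main

import (
	"errors"
	"fmt"
)

// source is a hypothetical finite input: some values, then either normal
// completion (err == nil) or an error.
type source struct {
	values []int
	err    error
}

// concat replays the sources one after another and stops at the first error,
// so later sources are never started.
func concat(sources ...source) ([]int, error) {
	var out []int
	for _, s := range sources {
		out = append(out, s.values...)
		if s.err != nil {
			return out, s.err
		}
	}
	return out, nil
}

func main() {
	values, err := concat(
		source{values: []int{1, 2, 3}},
		source{err: errors.New("boom")},
		source{values: []int{4, 5, 6}},
	)
	fmt.Println(values, err) // [1 2 3] boom
}
```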

func Defer

func Defer[T any](factory func() Observable[T]) Observable[T]

Defer creates an Observable that waits until an Observer subscribes to it, and then it creates an Observable for each Observer. This is useful for creating Observables that depend on some external state that is not available at the time of creation. The `factory` function is called for each Observer that subscribes to the Observable. Play: https://go.dev/play/p/wyVzordmkK0

Example
// will capture current date time
observable1 := Of(time.Now())

// will capture date time at the moment of subscription
observable2 := Defer(func() Observable[time.Time] {
	return Of(time.Now())
})

subscription := Concat(observable1, observable2).Subscribe(NoopObserver[time.Time]())
subscription.Wait() // Note: using .Wait() is not recommended.

func Empty

func Empty[T any]() Observable[T]

Empty creates an Observable that emits no values and completes immediately. Play: https://go.dev/play/p/D1JWkPG4NFK

Example
observable := Empty[int]()

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Completed

func FromChannel

func FromChannel[T any](in <-chan T) Observable[T]

FromChannel creates an Observable from a channel. Closing the channel will complete the Observable. Play: https://go.dev/play/p/x0u4eaOzYln

Example
ch := make(chan int, 10)
observable := FromChannel(ch)

subscription := observable.Subscribe(PrintObserver[int]())

ch <- 1

ch <- 2

ch <- 3

close(ch)

subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 1
Next: 2
Next: 3
Completed
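
The relationship between closing the channel and completion follows from how Go channels work: a `range` loop over a channel ends only after the channel is closed and drained. A stdlib-only sketch, where `drain` is a hypothetical helper rather than the library's implementation:

```go
package main

import "fmt"

// drain is a hypothetical helper that forwards every value received from in
// to next, then calls complete once the channel is closed and empty.
func drain[T any](in <-chan T, next func(T), complete func()) {
	for v := range in {
		next(v)
	}
	complete()
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // buffered values are still delivered after close

	drain(ch,
		func(v int) { fmt.Println("Next:", v) },
		func() { fmt.Println("Completed") },
	)
}
```

If the channel is never closed, the loop never ends and the completion callback is never reached.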

func FromSlice

func FromSlice[T any](collections ...[]T) Observable[T]

FromSlice creates an Observable from one or more slices. The values are emitted in the order they appear in each slice. Play: https://go.dev/play/p/BNhnqoQn0tP

Example
observable := FromSlice([]int{1, 2, 3})

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed

func Future

func Future[T any](factory func() (T, error)) Observable[T]

Future creates an Observable that waits until an Observer subscribes to it, and then it emits either a value or an error, returned by the `factory` function.

This is useful for creating Observables that depend on some external state that is not available at the time of creation. The `factory` function is called for each Observer that subscribes to the Observable.

Example (Error)
observable := Future(func() (int, error) {
	req, err := http.NewRequest("", "", nil)
	if err != nil {
		return 0, err
	}

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}

	defer res.Body.Close()

	// For some reason, removing the 2 following lines causes
	// the example to fail (see goleak).
	// See https://github.com/uber-go/goleak/issues/102
	_, _ = io.ReadAll(res.Body)

	defer http.DefaultClient.CloseIdleConnections()

	return 42, nil
})

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Error: Get "": unsupported protocol scheme ""
Example (Ok)
observable := Future(func() (int, error) {
	req, err := http.NewRequest("GET", "https://postman-echo.com/get", nil)
	if err != nil {
		return 0, err
	}

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}

	defer res.Body.Close()

	// For some reason, removing the 2 following lines causes
	// the example to fail (see goleak).
	// See https://github.com/uber-go/goleak/issues/102
	_, _ = io.ReadAll(res.Body)

	defer http.DefaultClient.CloseIdleConnections()

	return 42, nil
})

subscription := observable.Subscribe(PrintObserver[int]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 42
Completed

func Interval

func Interval(interval time.Duration) Observable[int64]

Interval creates an Observable that emits an infinite sequence of ascending integers, with a constant interval between them. The first value is not emitted immediately, but after the first interval has passed. Play: https://go.dev/play/p/7yskMPPFHA7

Example
observable := Interval(100 * time.Millisecond)

subscription := observable.Subscribe(PrintObserver[int64]())

time.Sleep(250 * time.Millisecond)
subscription.Unsubscribe() // "Completed" event is not transmitted
Output:

Next: 0
Next: 1

func IntervalWithInitial

func IntervalWithInitial(initial, interval time.Duration) Observable[int64]

IntervalWithInitial creates an Observable that emits an infinite sequence of ascending integers. The first value is emitted after `initial` has passed, and each subsequent value follows after a constant `interval`. Play: https://go.dev/play/p/Xhi6c336ldy

Example
observable := IntervalWithInitial(50*time.Millisecond, 100*time.Millisecond)

subscription := observable.Subscribe(PrintObserver[int64]())

time.Sleep(300 * time.Millisecond)
subscription.Unsubscribe() // "Completed" event is not transmitted
Output:

Next: 0
Next: 1
Next: 2
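
The number of values seen in the Interval and IntervalWithInitial examples follows from simple arithmetic on the timer. The sketch below computes the expected emission times inside an observation window; `emissionTimes` is a hypothetical helper, and it ignores scheduling jitter.

```go
package main

import (
	"fmt"
	"time"
)

// emissionTimes is a hypothetical helper that lists when a timer with a
// first delay of initial and a period of interval fires within window.
func emissionTimes(initial, interval, window time.Duration) []time.Duration {
	var out []time.Duration
	if interval <= 0 {
		return out // guard against a loop that never advances
	}
	for t := initial; t < window; t += interval {
		out = append(out, t)
	}
	return out
}

func main() {
	ms := time.Millisecond
	// Interval(100ms) observed for 250ms: the first delay equals the interval.
	fmt.Println(emissionTimes(100*ms, 100*ms, 250*ms)) // [100ms 200ms]
	// IntervalWithInitial(50ms, 100ms) observed for 300ms.
	fmt.Println(emissionTimes(50*ms, 100*ms, 300*ms)) // [50ms 150ms 250ms]
}
```

Two and three emission times respectively match the two and three `Next` lines in the example outputs.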

func Just

func Just[T any](values ...T) Observable[T]

Just is an alias for Of. Play: https://go.dev/play/p/A5S2McqqfqE

Example
observable := Just(1, 2, 3)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed

func Merge

func Merge[T any](sources ...Observable[T]) Observable[T]

Merge merges the values from all observables into a single observable result. It subscribes to each inner Observable and emits values as they arrive, preserving the order of values within each inner Observable. It completes when all inner Observables are done. Play: https://go.dev/play/p/hX2xPyeO3M9

Example (Error)
observable := Merge(
	RangeWithInterval(0, 2, 50*time.Millisecond),
	Pipe1(
		Throw[int64](assert.AnError),
		Delay[int64](75*time.Millisecond),
	),
)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 0
Error: assert.AnError general error for testing
Example (Ok)
observable := Merge(
	RangeWithInterval(0, 2, 50*time.Millisecond),
	Pipe1(
		RangeWithInterval(10, 12, 50*time.Millisecond),
		Delay[int64](25*time.Millisecond),
	),
)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 0
Next: 10
Next: 1
Next: 11
Completed
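
The same fan-in idea is commonly written with goroutines and channels in plain Go. The sketch below is not how the library is implemented; `merge` and `fromSlice` are hypothetical helpers that show why the output closes only after every input is done and why ordering across inputs is not fixed.

```go
package main

import (
	"fmt"
	"sync"
)

// merge is a hypothetical fan-in helper: it forwards values from every input
// channel to one output channel as they arrive and closes the output once
// all inputs are closed. Order across inputs is not deterministic.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice emits the given values on a fresh channel, then closes it.
func fromSlice[T any](values ...T) <-chan T {
	ch := make(chan T)
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	return ch
}

func main() {
	sum := 0
	for v := range merge(fromSlice(0, 1), fromSlice(10, 11)) {
		sum += v
	}
	fmt.Println(sum) // 22, regardless of interleaving
}
```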

func Never

func Never() Observable[struct{}]

Never creates an Observable that emits no values and never completes. This is useful for testing or when combining with other Observables. Play: https://go.dev/play/p/GHzcVYaEvN8

Example
observable := Never()

subscription := observable.Subscribe(PrintObserver[struct{}]())

time.Sleep(10 * time.Millisecond)
subscription.Unsubscribe()

func NewEventuallySafeObservable

func NewEventuallySafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]

NewEventuallySafeObservable creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function.

This method is safe for concurrent use, but concurrent messages are dropped.
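
One way to read "safe for concurrent use, but concurrent messages are dropped" is a sink that never blocks: a message that arrives while another is being handled is discarded instead of queued. The sketch below illustrates that idea with `sync.Mutex.TryLock`. It is an assumption about the semantics, not the library's code, and `droppingSink` and `sendConcurrently` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// droppingSink is a hypothetical sink that handles at most one message at a
// time. A message arriving while another is in flight is dropped, not queued.
type droppingSink struct {
	delivered int64
	dropped   int64
	mu        sync.Mutex
}

// Next runs handle only if no other call is currently inside Next.
func (s *droppingSink) Next(handle func()) {
	if !s.mu.TryLock() {
		atomic.AddInt64(&s.dropped, 1)
		return
	}
	defer s.mu.Unlock()
	handle()
	atomic.AddInt64(&s.delivered, 1)
}

// sendConcurrently fires messages from several goroutines and reports how
// many were delivered and how many were dropped.
func sendConcurrently(producers, perProducer int) (delivered, dropped int64) {
	s := &droppingSink{}
	var wg sync.WaitGroup
	wg.Add(producers)
	for p := 0; p < producers; p++ {
		go func() {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				s.Next(func() {})
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&s.delivered), atomic.LoadInt64(&s.dropped)
}

func main() {
	delivered, dropped := sendConcurrently(8, 1000)
	// The split varies from run to run, but every message is accounted for.
	fmt.Println(delivered+dropped == 8000) // true
}
```

The trade-off in this model is that producers never wait on each other, at the cost of losing messages under contention.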

func NewEventuallySafeObservableWithContext

func NewEventuallySafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]

NewEventuallySafeObservableWithContext creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function.

This method is safe for concurrent use, but concurrent messages are dropped.

func NewObservable

func NewObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]

NewObservable creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function, as the examples below do.

This method is not safe for concurrent use.

Example (Error)
observable := NewObservable(func(observer Observer[int]) Teardown {
	observer.Next(1)
	observer.Next(2)
	observer.Next(3)
	observer.Error(assert.AnError)
	observer.Next(4)

	return nil
})

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Error: assert.AnError general error for testing
Example (Ok)
observable := NewObservable(func(observer Observer[int]) Teardown {
	observer.Next(1)
	observer.Next(2)
	observer.Next(3)
	observer.Next(4)
	observer.Complete()

	return nil
})

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 4
Completed
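
In the Error example above, `observer.Next(4)` produces no output because it follows a terminal notification. A minimal standard-library sketch of that rule is shown below. `guardedObserver` and `runErrorSequence` are hypothetical names, and the guard is an illustration of the documented contract, not the library's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// guardedObserver is a hypothetical observer that records notifications and
// ignores everything after the first Error or Complete.
type guardedObserver struct {
	closed bool
	lines  []string
}

func (o *guardedObserver) Next(v int) {
	if o.closed {
		return // dropped: the stream already terminated
	}
	o.lines = append(o.lines, fmt.Sprintf("Next: %d", v))
}

func (o *guardedObserver) Error(err error) {
	if o.closed {
		return
	}
	o.closed = true
	o.lines = append(o.lines, "Error: "+err.Error())
}

func (o *guardedObserver) Complete() {
	if o.closed {
		return
	}
	o.closed = true
	o.lines = append(o.lines, "Completed")
}

// runErrorSequence replays the notification order of the Error example.
func runErrorSequence() []string {
	o := &guardedObserver{}
	o.Next(1)
	o.Next(2)
	o.Next(3)
	o.Error(errors.New("boom"))
	o.Next(4)    // ignored
	o.Complete() // ignored: error and completion are mutually exclusive
	return o.lines
}

func main() {
	for _, line := range runErrorSequence() {
		fmt.Println(line)
	}
}
```

Running the sketch prints three `Next` lines followed by `Error: boom`, mirroring the shape of the example output.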

func NewObservableWithConcurrencyMode

func NewObservableWithConcurrencyMode[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown, mode ConcurrencyMode) Observable[T]

NewObservableWithConcurrencyMode creates a new Observable with the given concurrency mode. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function.

The Observable will use the given concurrency mode.

It is rarely used as a public API.

func NewObservableWithContext

func NewObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]

NewObservableWithContext creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function.

This method is not safe for concurrent use.
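
The teardown contract can be pictured as a subscription handle that stores whatever the subscribe function returned and runs it on unsubscribe. The sketch below is a standard-library model under two assumptions that the text above does not state: that a nil teardown is treated as a no-op, and that cleanup runs at most once. `subscription`, `subscribe`, and `countTeardowns` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical handle that owns an optional teardown.
type subscription struct {
	once     sync.Once
	teardown func()
}

// Unsubscribe runs the teardown at most once; a nil teardown is a no-op.
func (s *subscription) Unsubscribe() {
	s.once.Do(func() {
		if s.teardown != nil {
			s.teardown()
		}
	})
}

// subscribe calls the producer and keeps whatever teardown it returns.
func subscribe(producer func() func()) *subscription {
	return &subscription{teardown: producer()}
}

// countTeardowns unsubscribes twice and reports how often cleanup ran.
func countTeardowns(withCleanup bool) int {
	calls := 0
	sub := subscribe(func() func() {
		if !withCleanup {
			return nil // nothing to release
		}
		return func() { calls++ }
	})
	sub.Unsubscribe()
	sub.Unsubscribe()
	return calls
}

func main() {
	fmt.Println(countTeardowns(true))  // 1
	fmt.Println(countTeardowns(false)) // 0
}
```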

func NewSafeObservable

func NewSafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]

NewSafeObservable creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function.

This method is safe for concurrent use.

func NewSafeObservableWithContext

func NewSafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]

NewSafeObservableWithContext creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function.

This method is safe for concurrent use.

func NewUnsafeObservable

func NewUnsafeObservable[T any](subscribe func(destination Observer[T]) Teardown) Observable[T]

NewUnsafeObservable creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function.

This method is not safe for concurrent use.

func NewUnsafeObservableWithContext

func NewUnsafeObservableWithContext[T any](subscribe func(ctx context.Context, destination Observer[T]) Teardown) Observable[T]

NewUnsafeObservableWithContext creates a new Observable. The subscribe function is called when the Observable is subscribed to. The subscribe function is given an Observer, to which it may emit any number of items, then may either complete or error, but not both. Upon completion or error, the Observable will not emit any more items.

The subscribe function should return a Teardown function that will be called when the Subscription is unsubscribed. The Teardown function should clean up any resources created during the subscription.

If no cleanup is necessary, the subscribe function may return nil instead of a Teardown function.

This method is not safe for concurrent use.

func Of

func Of[T any](values ...T) Observable[T]

Of creates an Observable that emits the values you specify, in order, and then completes. Play: https://go.dev/play/p/Zp5LgHgvJ59

Example
observable := Of(1, 2, 3)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Completed

func Pipe

func Pipe[First, Last any](source Observable[First], operators ...any) Observable[Last]

Pipe builds a composition of operators that will be chained to transform an observable stream. It provides a clean, declarative way to describe complex asynchronous operations.

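`PipeX()` (`Pipe1()` through `Pipe25()`) should be favored over `Pipe()`: `Pipe()` accepts its operators as `any`, so only the `PipeX()` variants let the compiler check that each operator's output type matches the next operator's input type.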

`PipeOp()` is the operator version of `Pipe()`.

Example
observable := Pipe[int, int](
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x * 2
	}),
	Skip[int](2),
	Sum[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 24
Completed
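
The recommendation to favor `PipeX()` follows from the signatures: `Pipe` takes `operators ...any`, while `Pipe2` and its siblings spell out each operator's input and output type. The sketch below models that difference on plain slices with the standard library only. `pipeAny`, `pipe2`, `double`, and `sum` are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// pipeAny mimics a variadic, untyped pipeline: stages are passed as `any`
// and can only be checked one by one at run time.
func pipeAny(source []int, stages ...any) ([]int, error) {
	current := source
	for i, stage := range stages {
		fn, ok := stage.(func([]int) []int)
		if !ok {
			return nil, fmt.Errorf("stage %d has unexpected type %T", i, stage)
		}
		current = fn(current)
	}
	return current, nil
}

// pipe2 mimics a fixed-arity, generic pipeline: the compiler checks that the
// output type of op1 matches the input type of op2.
func pipe2[A, B, C any](source A, op1 func(A) B, op2 func(B) C) C {
	return op2(op1(source))
}

// double multiplies every element by two.
func double(xs []int) []int {
	out := make([]int, len(xs))
	for i, x := range xs {
		out[i] = x * 2
	}
	return out
}

// sum adds all elements.
func sum(xs []int) int {
	total := 0
	for _, x := range xs {
		total += x
	}
	return total
}

func main() {
	// Typed: double then sum, verified at compile time.
	fmt.Println(pipe2([]int{1, 2, 3, 4, 5}, double, sum)) // 30

	// Untyped: sum has the wrong shape for this pipeline, yet the call
	// compiles and the mismatch only surfaces as a run-time error.
	_, err := pipeAny([]int{1, 2, 3}, double, sum)
	fmt.Println(err != nil) // true
}
```

Swapping the two operators in the `pipe2` call would be rejected by the compiler, which is the property the typed variants are meant to provide.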

func Pipe1

func Pipe1[A, B any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
) Observable[B]

Pipe1 is a typesafe 🎉 implementation of Pipe that takes a source and 1 operator.

`PipeOp1()` is the operator version of `Pipe1()`.

Example
observable := Pipe1(
	Just(1, 2, 3, 4, 5),
	Sum[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 15
Completed

func Pipe2

func Pipe2[A, B, C any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
) Observable[C]

Pipe2 is a typesafe 🎉 implementation of Pipe that takes a source and 2 operators.

`PipeOp2()` is the operator version of `Pipe2()`.

Example
observable := Pipe2(
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x * 2
	}),
	Sum[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 30
Completed

func Pipe3

func Pipe3[A, B, C, D any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
) Observable[D]

Pipe3 is a typesafe 🎉 implementation of Pipe that takes a source and 3 operators.

`PipeOp3()` is the operator version of `Pipe3()`.

Example
observable := Pipe3(
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x * 2
	}),
	Skip[int](2),
	Sum[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 24
Completed

func Pipe4

func Pipe4[A, B, C, D, E any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
) Observable[E]

Pipe4 is a typesafe 🎉 implementation of Pipe that takes a source and 4 operators.

`PipeOp4()` is the operator version of `Pipe4()`.

Example
observable := Pipe4(
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x * 2
	}),
	Skip[int](2),
	Take[int](2),
	Sum[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 14
Completed

func Pipe5

func Pipe5[A, B, C, D, E, F any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
) Observable[F]

Pipe5 is a typesafe 🎉 implementation of Pipe that takes a source and 5 operators.

`PipeOp5()` is the operator version of `Pipe5()`.

Example
observable := Pipe5(
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x * 2
	}),
	Skip[int](2),
	Take[int](2),
	Sum[int](),
	Max[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 14
Completed

func Pipe6

func Pipe6[A, B, C, D, E, F, G any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
) Observable[G]

Pipe6 is a typesafe 🎉 implementation of Pipe that takes a source and 6 operators.

`PipeOp6()` is the operator version of `Pipe6()`.

Example
observable := Pipe6(
	Just(1, 2, 3, 4, 5),
	Map(func(x int) int {
		return x * 2
	}),
	Skip[int](2),
	Take[int](2),
	Sum[int](),
	Map(func(x int) int {
		return x / 2
	}),
	Max[int](),
)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 7
Completed

func Pipe7

func Pipe7[A, B, C, D, E, F, G, H any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
) Observable[H]

Pipe7 is a typesafe 🎉 implementation of Pipe that takes a source and 7 operators.

`PipeOp7()` is the operator version of `Pipe7()`.

func Pipe8

func Pipe8[A, B, C, D, E, F, G, H, I any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
) Observable[I]

Pipe8 is a typesafe 🎉 implementation of Pipe that takes a source and 8 operators.

`PipeOp8()` is the operator version of `Pipe8()`.

func Pipe9

func Pipe9[A, B, C, D, E, F, G, H, I, J any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
) Observable[J]

Pipe9 is a typesafe 🎉 implementation of Pipe that takes a source and 9 operators.

`PipeOp9()` is the operator version of `Pipe9()`.

func Pipe10

func Pipe10[A, B, C, D, E, F, G, H, I, J, K any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
) Observable[K]

Pipe10 is a typesafe 🎉 implementation of Pipe that takes a source and 10 operators.

`PipeOp10()` is the operator version of `Pipe10()`.

func Pipe11

func Pipe11[A, B, C, D, E, F, G, H, I, J, K, L any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
) Observable[L]

Pipe11 is a typesafe 🎉 implementation of Pipe that takes a source and 11 operators.

`PipeOp11()` is the operator version of `Pipe11()`.

func Pipe12

func Pipe12[A, B, C, D, E, F, G, H, I, J, K, L, M any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
) Observable[M]

Pipe12 is a typesafe 🎉 implementation of Pipe that takes a source and 12 operators.

`PipeOp12()` is the operator version of `Pipe12()`.

func Pipe13

func Pipe13[A, B, C, D, E, F, G, H, I, J, K, L, M, N any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
) Observable[N]

Pipe13 is a typesafe 🎉 implementation of Pipe that takes a source and 13 operators.

`PipeOp13()` is the operator version of `Pipe13()`.

func Pipe14

func Pipe14[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
) Observable[O]

Pipe14 is a typesafe 🎉 implementation of Pipe that takes a source and 14 operators.

`PipeOp14()` is the operator version of `Pipe14()`.

func Pipe15

func Pipe15[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
) Observable[P]

Pipe15 is a typesafe 🎉 implementation of Pipe that takes a source and 15 operators.

`PipeOp15()` is the operator version of `Pipe15()`.

func Pipe16

func Pipe16[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
) Observable[Q]

Pipe16 is a typesafe 🎉 implementation of Pipe that takes a source and 16 operators.

`PipeOp16()` is the operator version of `Pipe16()`.

func Pipe17

func Pipe17[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
) Observable[R]

Pipe17 is a typesafe 🎉 implementation of Pipe that takes a source and 17 operators.

`PipeOp17()` is the operator version of `Pipe17()`.

func Pipe18

func Pipe18[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
) Observable[S]

Pipe18 is a typesafe 🎉 implementation of Pipe that takes a source and 18 operators.

`PipeOp18()` is the operator version of `Pipe18()`.

func Pipe19

func Pipe19[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
) Observable[T]

Pipe19 is a typesafe 🎉 implementation of Pipe that takes a source and 19 operators.

`PipeOp19()` is the operator version of `Pipe19()`.

func Pipe20

func Pipe20[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
) Observable[U]

Pipe20 is a typesafe 🎉 implementation of Pipe that takes a source and 20 operators.

`PipeOp20()` is the operator version of `Pipe20()`.

func Pipe21

func Pipe21[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
) Observable[V]

Pipe21 is a typesafe 🎉 implementation of Pipe that takes a source and 21 operators.

`PipeOp21()` is the operator version of `Pipe21()`.

func Pipe22

func Pipe22[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
	operator22 func(Observable[V]) Observable[W],
) Observable[W]

Pipe22 is a typesafe 🎉 implementation of Pipe that takes a source and 22 operators.

`PipeOp22()` is the operator version of `Pipe22()`.

func Pipe23

func Pipe23[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
	operator22 func(Observable[V]) Observable[W],
	operator23 func(Observable[W]) Observable[X],
) Observable[X]

Pipe23 is a typesafe 🎉 implementation of Pipe that takes a source and 23 operators.

`PipeOp23()` is the operator version of `Pipe23()`.

func Pipe24

func Pipe24[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
	operator22 func(Observable[V]) Observable[W],
	operator23 func(Observable[W]) Observable[X],
	operator24 func(Observable[X]) Observable[Y],
) Observable[Y]

Pipe24 is a typesafe 🎉 implementation of Pipe that takes a source and 24 operators.

`PipeOp24()` is the operator version of `Pipe24()`.

func Pipe25

func Pipe25[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z any](
	source Observable[A],
	operator1 func(Observable[A]) Observable[B],
	operator2 func(Observable[B]) Observable[C],
	operator3 func(Observable[C]) Observable[D],
	operator4 func(Observable[D]) Observable[E],
	operator5 func(Observable[E]) Observable[F],
	operator6 func(Observable[F]) Observable[G],
	operator7 func(Observable[G]) Observable[H],
	operator8 func(Observable[H]) Observable[I],
	operator9 func(Observable[I]) Observable[J],
	operator10 func(Observable[J]) Observable[K],
	operator11 func(Observable[K]) Observable[L],
	operator12 func(Observable[L]) Observable[M],
	operator13 func(Observable[M]) Observable[N],
	operator14 func(Observable[N]) Observable[O],
	operator15 func(Observable[O]) Observable[P],
	operator16 func(Observable[P]) Observable[Q],
	operator17 func(Observable[Q]) Observable[R],
	operator18 func(Observable[R]) Observable[S],
	operator19 func(Observable[S]) Observable[T],
	operator20 func(Observable[T]) Observable[U],
	operator21 func(Observable[U]) Observable[V],
	operator22 func(Observable[V]) Observable[W],
	operator23 func(Observable[W]) Observable[X],
	operator24 func(Observable[X]) Observable[Y],
	operator25 func(Observable[Y]) Observable[Z],
) Observable[Z]

Pipe25 is a typesafe 🎉 implementation of Pipe, that takes a source and 25 operators.

`PipeOp25()` is the operator version of `Pipe25()`.

func Race

func Race[T any](sources ...Observable[T]) Observable[T]

Race creates an Observable that mirrors the first source Observable to emit a next, error, or complete notification. It cancels the subscriptions to all other sources. It completes when the winning source completes. If the winning source emits an error, the resulting Observable emits that error. Play: https://go.dev/play/p/5VzGFd62SMC

Example (Error)
observable := Race(
	Delay[int](75*time.Millisecond)(Just(1, 2, 3)),
	Delay[int](25*time.Millisecond)(Throw[int](assert.AnError)),
	Delay[int](100*time.Millisecond)(Just(7, 8, 9)),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(50 * time.Millisecond)
subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Race(
	Delay[int](75*time.Millisecond)(Just(1, 2, 3)),
	Delay[int](25*time.Millisecond)(Just(4, 5, 6)),
	Delay[int](100*time.Millisecond)(Just(7, 8, 9)),
)

subscription := observable.Subscribe(PrintObserver[int]())

time.Sleep(50 * time.Millisecond)
subscription.Unsubscribe()
Output:

Next: 4
Next: 5
Next: 6
Completed
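
The first-to-emit rule can be modelled without ro, using only goroutines and channels. The sketch below is an illustration under stated assumptions, not ro's implementation: `firstToFire` is a hypothetical helper, and each source is reduced to a single signal sent after a delay given in milliseconds.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// firstToFire is a hypothetical helper: it starts one goroutine per delay
// (in milliseconds) and returns the index of the source that signals first.
// The losers are cancelled through the shared context, mirroring how Race
// unsubscribes from the other sources. It returns -1 when no source is given.
func firstToFire(delaysMs ...int) int {
	if len(delaysMs) == 0 {
		return -1
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops every goroutine that has not fired yet

	winner := make(chan int, len(delaysMs)) // buffered: late senders never block
	for i, ms := range delaysMs {
		go func(i, ms int) {
			select {
			case <-time.After(time.Duration(ms) * time.Millisecond):
				winner <- i
			case <-ctx.Done():
			}
		}(i, ms)
	}
	return <-winner
}

func main() {
	fmt.Println("winner:", firstToFire(75, 25, 100))
}
```

With the same delays as the examples above (75 ms, 25 ms, 100 ms), the second source (index 1) is expected to win.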

func RandFloat64

func RandFloat64(count int) Observable[float64]

RandFloat64 creates an Observable that emits random float64 values in the range [0, 1). The count is the number of values to emit. Play: https://go.dev/play/p/MRuy8rUpTve

func RandIntN

func RandIntN(n, count int) Observable[int]

RandIntN creates an Observable that emits random int values in the range [0, n). The count is the number of values to emit. Play: https://go.dev/play/p/4m7T5j-7i3a

func Range

func Range(start, end int64) Observable[int64]

Range creates an Observable that emits a range of integers. The range is [start:end), so `start` is emitted but not `end`. If `start` is equal to `end`, an empty Observable is returned. If `start` is greater than `end`, the emitted values are in descending order. The step is 1. Play: https://go.dev/play/p/5XAXfNrtJm2

Example
observable := Range(0, 5)

subscription := observable.Subscribe(PrintObserver[int64]())
defer subscription.Unsubscribe()
Output:

Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed

func RangeWithInterval

func RangeWithInterval(start, end int64, interval time.Duration) Observable[int64]

RangeWithInterval creates an Observable that emits a range of integers. The range is [start:end), so `start` is emitted but not `end`. If `start` is equal to `end`, an empty Observable is returned. If `start` is greater than `end`, the emitted values are in descending order. The interval is the time between each value. The first value is emitted after the first interval has passed. The step is 1. Play: https://go.dev/play/p/Y_1l6BDbMSi

Example
observable := RangeWithInterval(0, 5, 10*time.Millisecond)

subscription := observable.Subscribe(PrintObserver[int64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Completed

func RangeWithStep

func RangeWithStep(start, end, step float64) Observable[float64]

RangeWithStep creates an Observable that emits a range of floats. The range is [start:end), so `start` is emitted but not `end`. If `start` is equal to `end`, an empty Observable is returned. If `start` is greater than `end`, the emitted values are in descending order. The step must be greater than 0. Play: https://go.dev/play/p/EOG0tIVjUKC

Example
observable := RangeWithStep(0, 5, 0.5)

subscription := observable.Subscribe(PrintObserver[float64]())
defer subscription.Unsubscribe()
Output:

Next: 0
Next: 0.5
Next: 1
Next: 1.5
Next: 2
Next: 2.5
Next: 3
Next: 3.5
Next: 4
Next: 4.5
Completed
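
The half-open bounds and the descending case can be checked with a small standard-library model. This is a sketch of the documented semantics, not ro's code: `stepRange` is a hypothetical helper, and it assumes that a descending range (start greater than end) moves by `-step`.

```go
package main

import (
	"fmt"
	"math"
)

// stepRange is a hypothetical helper that lists the values of the half-open
// range [start:end) for a positive step. Assumption: when start > end, the
// values decrease by step.
func stepRange(start, end, step float64) []float64 {
	if step <= 0 || start == end {
		return nil
	}
	n := int(math.Ceil(math.Abs(end-start) / step))
	dir := 1.0
	if start > end {
		dir = -1.0
	}
	out := make([]float64, 0, n)
	for i := 0; i < n; i++ {
		// Multiply rather than accumulate to limit floating-point drift.
		out = append(out, start+dir*float64(i)*step)
	}
	return out
}

func main() {
	fmt.Println(stepRange(0, 5, 0.5)) // [0 0.5 1 1.5 2 2.5 3 3.5 4 4.5]
	fmt.Println(stepRange(3, 0, 1))   // [3 2 1]
	fmt.Println(stepRange(2, 2, 1))   // []
}
```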

func RangeWithStepAndInterval

func RangeWithStepAndInterval(start, end, step float64, interval time.Duration) Observable[float64]

RangeWithStepAndInterval creates an Observable that emits a range of floats. The range is [start:end), so `start` is emitted but not `end`. If `start` is equal to `end`, an empty Observable is returned. If `start` is greater than `end`, the emitted values are in descending order. The step must be greater than 0. The interval is the time between each value. The first value is emitted after the first interval has passed. Play: https://go.dev/play/p/kdAEsGwfqw9

Example
observable := RangeWithStepAndInterval(0, 5, 0.5, 10*time.Millisecond)

subscription := observable.Subscribe(PrintObserver[float64]())
subscription.Wait() // Note: using .Wait() is not recommended.
Output:

Next: 0
Next: 0.5
Next: 1
Next: 1.5
Next: 2
Next: 2.5
Next: 3
Next: 3.5
Next: 4
Next: 4.5
Completed

func Repeat

func Repeat[T any](item T, count int64) Observable[T]

Repeat creates an Observable that emits a single value multiple times. This is a creation operator. The pipeable equivalent is `RepeatWith`. Play: https://go.dev/play/p/CUvh_TYALNe

Example
observable := Repeat(42, 3)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Next: 42
Next: 42
Next: 42
Completed

func RepeatWithInterval

func RepeatWithInterval[T any](item T, count int64, interval time.Duration) Observable[T]

RepeatWithInterval creates an Observable that emits a single value multiple times. The interval is the time between each value. The first value is emitted after the first interval has passed. Play: https://go.dev/play/p/4PK5Zt2sGze

Example
observable := RepeatWithInterval(42, 3, 50*time.Millisecond)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()

time.Sleep(200 * time.Millisecond)
Output:

Next: 42
Next: 42
Next: 42
Completed

func Start

func Start[T any](cb func() T) Observable[T]

Start creates an Observable that lazily emits a single value: the callback runs on each subscription, not when the Observable is created. Play: https://go.dev/play/p/Jz7oyagu07u

Example
observable := Start(func() int {
	fmt.Println("Start!")
	return 42
})

subscription1 := observable.Subscribe(PrintObserver[int]())
subscription2 := observable.Subscribe(PrintObserver[int]())

subscription1.Wait() // Note: using .Wait() is not recommended.
subscription2.Wait() // Note: using .Wait() is not recommended.
Output:

Start!
Next: 42
Completed
Start!
Next: 42
Completed
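
The laziness shown in this output can be reproduced with a few lines of plain Go. The sketch is a simplified, synchronous model rather than ro's implementation; `lazyValue` and its `subscribe` method are hypothetical names.

```go
package main

import "fmt"

// lazyValue is a hypothetical stand-in for an Observable built by Start:
// it stores the callback and runs it only when subscribe is called.
type lazyValue[T any] struct {
	cb func() T
}

// subscribe invokes the callback, then delivers the value and the completion.
func (l lazyValue[T]) subscribe(onNext func(T), onComplete func()) {
	onNext(l.cb())
	onComplete()
}

func main() {
	calls := 0
	source := lazyValue[int]{cb: func() int {
		calls++
		return 42
	}}
	fmt.Println("calls before subscribing:", calls)

	for i := 0; i < 2; i++ {
		source.subscribe(
			func(v int) { fmt.Println("Next:", v) },
			func() { fmt.Println("Completed") },
		)
	}
	fmt.Println("calls after two subscriptions:", calls)
}
```

The counter stays at 0 until the first subscription and reaches 2 after the second, which matches the two "Start!" lines in the output above.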

func Throw

func Throw[T any](err error) Observable[T]

Throw creates an Observable that emits an error and terminates immediately, without emitting any value or completion notification. Play: https://go.dev/play/p/1TBK8LdDRJF

Example
observable := Throw[int](assert.AnError)

subscription := observable.Subscribe(PrintObserver[int]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing

func Timer

func Timer(duration time.Duration) Observable[time.Duration]

Timer creates an Observable that emits a value after a specified duration. Play: https://go.dev/play/p/hMkNLEqpcy3

Example
observable := Timer(10 * time.Millisecond)

subscription := observable.Subscribe(PrintObserver[time.Duration]())
defer subscription.Unsubscribe()
Output:

Next: 10ms
Completed

func Zip

func Zip[T any](sources ...Observable[T]) Observable[[]T]

Zip combines the values of the source Observables by position: the n-th emitted slice holds the n-th value of each source. It emits only once every source has produced its n-th value. As the examples show, it completes once the shortest source is exhausted. Play: https://go.dev/play/p/5YxbQ5jNzjQ

Example (Error)
observable := Zip(
	Range(1, 3),
	Throw[int64](assert.AnError),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Zip(
	Range(1, 3),
	Range(10, 13),
)

subscription := observable.Subscribe(PrintObserver[[]int64]())
defer subscription.Unsubscribe()
Output:

Next: [1 10]
Next: [2 11]
Completed
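
The pairing seen in this output (first with first, second with second, stopping at the shortest input) can be modelled on finite slices. This is a sketch using only the standard library; `zipSlices` is a hypothetical helper and says nothing about how ro buffers asynchronous sources.

```go
package main

import "fmt"

// zipSlices is a hypothetical helper modelling index-wise pairing on finite
// slices: the n-th row holds the n-th value of every input, and the result
// stops at the shortest input.
func zipSlices[T any](sources ...[]T) [][]T {
	if len(sources) == 0 {
		return nil
	}
	shortest := len(sources[0])
	for _, s := range sources[1:] {
		if len(s) < shortest {
			shortest = len(s)
		}
	}
	out := make([][]T, 0, shortest)
	for i := 0; i < shortest; i++ {
		row := make([]T, 0, len(sources))
		for _, s := range sources {
			row = append(row, s[i])
		}
		out = append(out, row)
	}
	return out
}

func main() {
	// Same inputs as Range(1, 3) and Range(10, 13) in the example above.
	fmt.Println(zipSlices([]int64{1, 2}, []int64{10, 11, 12})) // [[1 10] [2 11]]
}
```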

func Zip2

func Zip2[A, B any](obsA Observable[A], obsB Observable[B]) Observable[lo.Tuple2[A, B]]

Zip2 combines the values of the source Observables by position: the n-th emitted tuple holds the n-th value of each source. It emits only once every source has produced its n-th value. As the examples show, it completes once the shortest source is exhausted. Play: https://go.dev/play/p/5YxbQ5jNzjQ

Example (Error)
observable := Zip2(
	Range(0, 10),
	Throw[int64](assert.AnError),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Zip2(
	Range(0, 10),
	Skip[int64](1)(Range(0, 4)),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple2[int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {0 1}
Next: {1 2}
Next: {2 3}
Completed

func Zip3

func Zip3[A, B, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C]) Observable[lo.Tuple3[A, B, C]]

Zip3 combines the values of the source Observables by position: the n-th emitted tuple holds the n-th value of each source. It emits only once every source has produced its n-th value. As the examples show, it completes once the shortest source is exhausted. Play: https://go.dev/play/p/5YxbQ5jNzjQ

Example (Error)
observable := Zip3(
	Range(1, 3),
	Throw[int64](assert.AnError),
	Range(100, 103),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Zip3(
	Range(1, 3),
	Range(10, 13),
	Range(100, 103),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple3[int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 100}
Next: {2 11 101}
Completed

func Zip4

func Zip4[A, B, C, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D]) Observable[lo.Tuple4[A, B, C, D]]

Zip4 combines the values of the source Observables by position: the n-th emitted tuple holds the n-th value of each source. It emits only once every source has produced its n-th value. As the examples show, it completes once the shortest source is exhausted. Play: https://go.dev/play/p/5YxbQ5jNzjQ

Example (Error)
observable := Zip4(
	Range(1, 3),
	Throw[int64](assert.AnError),
	Range(100, 103),
	Range(1000, 1003),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Zip4(
	Range(1, 3),
	Range(10, 13),
	Range(100, 103),
	Range(1000, 1003),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple4[int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 100 1000}
Next: {2 11 101 1001}
Completed

func Zip5

func Zip5[A, B, C, D, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E]) Observable[lo.Tuple5[A, B, C, D, E]]

Zip5 combines the values of the source Observables by position: the n-th emitted tuple holds the n-th value of each source. It emits only once every source has produced its n-th value. As the examples show, it completes once the shortest source is exhausted. Play: https://go.dev/play/p/5YxbQ5jNzjQ

Example (Error)
observable := Zip5(
	Range(1, 3),
	Throw[int64](assert.AnError),
	Range(100, 103),
	Range(1000, 1003),
	Range(10000, 10003),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int64, int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Zip5(
	Range(1, 3),
	Range(10, 13),
	Range(100, 103),
	Range(1000, 1003),
	Range(10000, 10003),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple5[int64, int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 100 1000 10000}
Next: {2 11 101 1001 10001}
Completed

func Zip6

func Zip6[A, B, C, D, E, F any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], obsF Observable[F]) Observable[lo.Tuple6[A, B, C, D, E, F]]

Zip6 combines the values of the source Observables by position: the n-th emitted tuple holds the n-th value of each source. It emits only once every source has produced its n-th value. As the examples show, it completes once the shortest source is exhausted. Play: https://go.dev/play/p/5YxbQ5jNzjQ

Example (Error)
observable := Zip6(
	Range(1, 3),
	Throw[int64](assert.AnError),
	Range(100, 103),
	Range(1000, 1003),
	Range(10000, 10003),
	Range(100000, 100003),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int64, int64, int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Error: assert.AnError general error for testing
Example (Ok)
observable := Zip6(
	Range(1, 3),
	Range(10, 13),
	Range(100, 103),
	Range(1000, 1003),
	Range(10000, 10003),
	Range(100000, 100003),
)

subscription := observable.Subscribe(PrintObserver[lo.Tuple6[int64, int64, int64, int64, int64, int64]]())
defer subscription.Unsubscribe()
Output:

Next: {1 10 100 1000 10000 100000}
Next: {2 11 101 1001 10001 100001}
Completed

type Observer

type Observer[T any] interface {
	// Next receives the next value from the Observable. It is called zero or
	// more times by the Observable. The Observable may call Next synchronously
	// or asynchronously. If Next is called after the Observer has been closed,
	// the value will be dropped.
	Next(value T)
	NextWithContext(ctx context.Context, value T)
	// Error receives an error from the Observable. It is called at most once by
	// the Observable. The Observable may call Error synchronously or
	// asynchronously. If Error is called after the Observer has been closed, the
	// error will be dropped.
	Error(err error)
	ErrorWithContext(ctx context.Context, err error)
	// Complete receives a completion notification from the Observable. It is called
	// at most once by the Observable. The Observable may call Complete
	// synchronously or asynchronously. If Complete is called after the Observer has
	// been closed, the completion notification will be dropped.
	Complete()
	CompleteWithContext(ctx context.Context)

	// IsClosed returns true if the Observer has been closed, either by an error
	// or completion notification. If the Observer is closed, it will not receive
	// any more notifications.
	IsClosed() bool
	// HasThrown returns true if the Observer has received an error notification.
	HasThrown() bool
	// IsCompleted returns true if the Observer has received a completion notification.
	IsCompleted() bool
}

Observer is the consumer of an Observable. It receives notifications: Next, Error, and Complete. Observers are safe for concurrent calls to Next, Error, and Complete. It is the responsibility of the Observer to ensure that notifications are not forwarded after it has been closed.

func NewObserver

func NewObserver[T any](onNext func(value T), onError func(err error), onComplete func()) Observer[T]

NewObserver creates a new Observer with the provided callbacks. No context is provided.

Example
observer := NewObserver(
	func(value int) {
		fmt.Printf("Next: %d\n", value)
	},
	func(err error) {
		fmt.Printf("Error: %s\n", err.Error())
	},
	func() {
		fmt.Printf("Completed\n")
	},
)

observer.Next(123)  // 123 logged
observer.Next(456)  // 456 logged
observer.Complete() // Completed logged

observer.Next(789) // nothing logged
Output:

Next: 123
Next: 456
Completed
Example (Empty)
observer := NewObserver(
	func(value int) {
		fmt.Printf("Next: %d\n", value)
	},
	func(err error) {
		fmt.Printf("Error: %s\n", err.Error())
	},
	func() {
		fmt.Printf("Completed\n")
	},
)

observer.Complete() // Completed logged

observer.Next(123) // nothing logged
Output:

Completed
Example (Error)
observer := NewObserver(
	func(value int) {
		fmt.Printf("Next: %d\n", value)
	},
	func(err error) {
		fmt.Printf("Error: %s\n", err.Error())
	},
	func() {
		fmt.Printf("Completed\n")
	},
)

observer.Next(123)             // 123 logged
observer.Next(456)             // 456 logged
observer.Error(assert.AnError) // error logged

observer.Next(789) // nothing logged
Output:

Next: 123
Next: 456
Error: assert.AnError general error for testing
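
The "dropped after close" behaviour in these examples can be modelled with a flag guarded by a mutex. The sketch below is a simplified illustration, not ro's Observer: `closingObserver` is a hypothetical type, and it assumes callbacks never call back into the same observer (the mutex is held while they run).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// closingObserver is a hypothetical, simplified observer: once Error or
// Complete has been delivered, every later notification is dropped.
type closingObserver[T any] struct {
	mu         sync.Mutex
	closed     bool
	onNext     func(T)
	onError    func(error)
	onComplete func()
}

// Next forwards the value unless the observer is closed.
func (o *closingObserver[T]) Next(v T) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if !o.closed {
		o.onNext(v)
	}
}

// Error closes the observer and forwards the error, at most once.
func (o *closingObserver[T]) Error(err error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if !o.closed {
		o.closed = true
		o.onError(err)
	}
}

// Complete closes the observer and forwards the completion, at most once.
func (o *closingObserver[T]) Complete() {
	o.mu.Lock()
	defer o.mu.Unlock()
	if !o.closed {
		o.closed = true
		o.onComplete()
	}
}

func main() {
	o := &closingObserver[int]{
		onNext:     func(v int) { fmt.Println("Next:", v) },
		onError:    func(err error) { fmt.Println("Error:", err) },
		onComplete: func() { fmt.Println("Completed") },
	}
	o.Next(123)
	o.Error(errors.New("boom"))
	// Both calls below are dropped because the observer is closed.
	o.Next(456)
	o.Complete()
}
```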

func NewObserverWithContext

func NewObserverWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context)) Observer[T]

NewObserverWithContext creates a new Observer with the provided callbacks. A context is provided to each callback.

func NoopObserver

func NoopObserver[T any]() Observer[T]

NoopObserver is an Observer that does nothing. Warning: This observer will silence errors.

func OnComplete

func OnComplete[T any](onComplete func()) Observer[T]

OnComplete is a partial Observer with only the Complete method implemented. Warning: This observer will silence errors.

func OnCompleteWithContext

func OnCompleteWithContext[T any](onComplete func(ctx context.Context)) Observer[T]

OnCompleteWithContext is a partial Observer with only the Complete method implemented. Warning: This observer will silence errors.

func OnError

func OnError[T any](onError func(err error)) Observer[T]

OnError is a partial Observer with only the Error method implemented.

func OnErrorWithContext

func OnErrorWithContext[T any](onError func(ctx context.Context, err error)) Observer[T]

OnErrorWithContext is a partial Observer with only the Error method implemented.

func OnNext

func OnNext[T any](onNext func(value T)) Observer[T]

OnNext is a partial Observer with only the Next method implemented. Warning: This observer will silence errors.

func OnNextWithContext

func OnNextWithContext[T any](onNext func(ctx context.Context, value T)) Observer[T]

OnNextWithContext is a partial Observer with only the Next method implemented. Warning: This observer will silence errors.

func PrintObserver

func PrintObserver[T any]() Observer[T]

PrintObserver is a utility Observer that prints notifications for debugging purposes.

type RetryConfig

type RetryConfig struct {
	MaxRetries     uint64
	Delay          time.Duration
	ResetOnSuccess bool
}

RetryConfig is the configuration for the Retry operator.

type ShareConfig

type ShareConfig[T any] struct {
	Connector           func() Subject[T]
	ResetOnError        bool
	ResetOnComplete     bool
	ResetOnRefCountZero bool
}

ShareConfig is the configuration for the Share operator.

type ShareReplayConfig

type ShareReplayConfig struct {
	ResetOnRefCountZero bool
}

ShareReplayConfig is the configuration for the ShareReplay operator.

type Subject

type Subject[T any] interface {
	Observable[T]
	Observer[T]

	HasObserver() bool
	CountObservers() int

	IsClosed() bool
	HasThrown() bool
	IsCompleted() bool

	AsObservable() Observable[T]
	AsObserver() Observer[T]
}

Subject is a sort of bridge or proxy, that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

func NewAsyncSubject

func NewAsyncSubject[T any]() Subject[T]

NewAsyncSubject creates a Subject that emits its last value on completion. If no value has been received, observers are completed without receiving a value. The final value or error is stored for future subscriptions. At most one value is emitted.

Example
subject := NewAsyncSubject[int]()

subject.Subscribe(PrintObserver[int]())

subject.Next(123) // nothing logged

sub := Pipe1(
	subject.AsObservable(),
	Delay[int](25*time.Millisecond),
).Subscribe(PrintObserver[int]())
defer sub.Unsubscribe()

subject.Next(456) // nothing logged

subject.Complete() // 456 logged by both subscribers

time.Sleep(50 * time.Millisecond)

subject.Next(789)                       // nothing logged
subject.Subscribe(PrintObserver[int]()) // 456 logged by third subscriber
Output:

Next: 456
Completed
Next: 456
Completed
Next: 456
Completed
Example (Empty)
subject := NewAsyncSubject[int]()

subject.Subscribe(PrintObserver[int]())

subject.Complete() // Completed logged by first subscriber, without any value

subject.Subscribe(PrintObserver[int]()) // Completed logged by second subscriber

subject.Next(123) // nothing logged
Output:

Completed
Completed
Example (Error)
subject := NewAsyncSubject[int]()

subject.Subscribe(PrintObserver[int]())

subject.Next(123) // nothing logged

subject.Subscribe(PrintObserver[int]())

subject.Next(456) // nothing logged

subject.Error(assert.AnError) // error logged by both subscribers

subject.Subscribe(PrintObserver[int]()) // error logged by last subscriber

subject.Next(789)  // nothing logged
subject.Complete() // nothing logged
Output:

Error: assert.AnError general error for testing
Error: assert.AnError general error for testing
Error: assert.AnError general error for testing
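
The "remember the last value, deliver it on completion" rule can be sketched in plain Go. This is a single-goroutine model written for illustration, not ro's implementation: `asyncCell` and its methods are hypothetical names, and errors and completion callbacks are left out to keep it short.

```go
package main

import "fmt"

// asyncCell is a hypothetical, single-goroutine model of the behaviour
// described above: values are only remembered, and the most recent one is
// delivered when complete is called.
type asyncCell[T any] struct {
	last      T
	hasValue  bool
	completed bool
	observers []func(T)
}

// next remembers v; nothing is delivered yet.
func (a *asyncCell[T]) next(v T) {
	if !a.completed {
		a.last, a.hasValue = v, true
	}
}

// subscribe registers onNext, or replays the stored value after completion.
func (a *asyncCell[T]) subscribe(onNext func(T)) {
	if !a.completed {
		a.observers = append(a.observers, onNext)
		return
	}
	if a.hasValue {
		onNext(a.last)
	}
}

// complete delivers the last value, if any, to every registered observer.
func (a *asyncCell[T]) complete() {
	if a.completed {
		return
	}
	a.completed = true
	if a.hasValue {
		for _, onNext := range a.observers {
			onNext(a.last)
		}
	}
	a.observers = nil
}

func main() {
	cell := &asyncCell[int]{}
	cell.subscribe(func(v int) { fmt.Println("first:", v) })
	cell.next(123)
	cell.next(456)
	// Only now is a value delivered: "first: 456".
	cell.complete()
	// Ignored: the cell is already completed.
	cell.next(789)
	// A late subscriber receives the stored value: "late: 456".
	cell.subscribe(func(v int) { fmt.Println("late:", v) })
}
```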

func NewBehaviorSubject

func NewBehaviorSubject[T any](initial T) Subject[T]

NewBehaviorSubject creates a Subject that emits its current value to new subscribers, starting with the initial value. After completion, new subscriptions do not receive the last value; after an error, they still receive the error.

Example
subject := NewBehaviorSubject(42)

subject.Subscribe(PrintObserver[int]()) // 42 logged by first subscriber

subject.Next(123) // 123 logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // 123 logged by second subscriber

subject.Next(456) // 456 logged by both subscribers

subject.Complete() // Completed logged by both subscribers

subject.Next(789)                       // nothing logged
subject.Subscribe(PrintObserver[int]()) // Completed logged by third subscriber
Output:

Next: 42
Next: 123
Next: 123
Next: 456
Next: 456
Completed
Completed
Completed
Example (Empty)
subject := NewBehaviorSubject(42)

subject.Complete() // nothing logged

subject.Subscribe(PrintObserver[int]()) // Completed logged
subject.Subscribe(PrintObserver[int]()) // Completed logged

subject.Next(123) // nothing logged
Output:

Completed
Completed
Example (Error)
subject := NewBehaviorSubject(42)

subject.Subscribe(PrintObserver[int]()) // 42 logged by first subscriber

subject.Next(123) // 123 logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // 123 logged by second subscriber

subject.Next(456) // 456 logged by both subscribers

subject.Error(assert.AnError) // error logged by both subscribers

subject.Subscribe(PrintObserver[int]()) // error logged by last subscriber

subject.Next(789) // nothing logged
Output:

Next: 42
Next: 123
Next: 123
Next: 456
Next: 456
Error: assert.AnError general error for testing
Error: assert.AnError general error for testing
Error: assert.AnError general error for testing
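
The replay of the current value to each new subscriber can be sketched in a few lines. This is a single-goroutine model for illustration, not ro's implementation: `behaviorCell` and its methods are hypothetical names, and completion and errors are left out.

```go
package main

import "fmt"

// behaviorCell is a hypothetical, single-goroutine model: it always holds a
// current value and hands it to each new subscriber before any live value.
type behaviorCell[T any] struct {
	current   T
	observers []func(T)
}

// subscribe replays the current (or initial) value, then registers onNext.
func (b *behaviorCell[T]) subscribe(onNext func(T)) {
	onNext(b.current)
	b.observers = append(b.observers, onNext)
}

// next stores v as the current value and broadcasts it.
func (b *behaviorCell[T]) next(v T) {
	b.current = v
	for _, onNext := range b.observers {
		onNext(v)
	}
}

func main() {
	cell := &behaviorCell[int]{current: 42}
	cell.subscribe(func(v int) { fmt.Println("first:", v) })
	cell.next(123)
	cell.subscribe(func(v int) { fmt.Println("second:", v) })
	cell.next(456)
}
```

The first subscriber sees 42, 123, and 456, while the second starts at 123, in the same order as the Next lines of the first example above.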

func NewPublishSubject

func NewPublishSubject[T any]() Subject[T]

NewPublishSubject broadcasts a value to observers (fanout). Values received before subscription are not transmitted.

Example
subject := NewPublishSubject[int]()

subject.Subscribe(PrintObserver[int]())

subject.Next(123) // 123 logged by first subscriber

subject.Subscribe(PrintObserver[int]())

subject.Next(456) // 456 logged by both subscribers

subject.Complete()

subject.Next(789) // nothing logged
Output:

Next: 123
Next: 456
Next: 456
Completed
Completed
Example (Empty)
subject := NewPublishSubject[int]()

subject.Subscribe(PrintObserver[int]())

subject.Complete() // Completed logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // Completed logged by second subscriber

subject.Next(123) // nothing logged
Output:

Completed
Completed
Example (Error)
subject := NewPublishSubject[int]()

subject.Subscribe(PrintObserver[int]())

subject.Next(123) // 123 logged by first subscriber

subject.Subscribe(PrintObserver[int]())

subject.Next(456) // 456 logged by both subscribers

subject.Error(assert.AnError) // error logged by both subscribers

subject.Subscribe(PrintObserver[int]()) // error logged by last subscriber

subject.Next(789)  // nothing logged
subject.Complete() // nothing logged
Output:

Next: 123
Next: 456
Next: 456
Error: assert.AnError general error for testing
Error: assert.AnError general error for testing
Error: assert.AnError general error for testing

func NewReplaySubject

func NewReplaySubject[T any](bufferSize int) Subject[T]

NewReplaySubject emits old values to new subscribers. After error or completion, new subscriptions receive values from the buffer then the error or the completion.

Example
subject := NewReplaySubject[int](42)

subject.Subscribe(PrintObserver[int]())

subject.Next(123) // 123 logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // 123 logged by second subscriber

subject.Next(456) // 456 logged by both subscribers

subject.Complete()

subject.Subscribe(PrintObserver[int]()) // 123 and 456 logged by third subscriber

subject.Next(789) // nothing logged
Output:

Next: 123
Next: 123
Next: 456
Next: 456
Completed
Completed
Next: 123
Next: 456
Completed
Example (Empty)
subject := NewReplaySubject[int](42)

subject.Subscribe(PrintObserver[int]())

subject.Complete() // Completed logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // Completed logged by second subscriber

subject.Next(123) // nothing logged
Output:

Completed
Completed
Example (Error)
subject := NewReplaySubject[int](42)

subject.Subscribe(PrintObserver[int]())

subject.Next(123) // 123 logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // 123 logged by second subscriber

subject.Next(456) // 456 logged by both subscribers

subject.Error(assert.AnError) // error logged by both subscribers

subject.Subscribe(PrintObserver[int]()) // 123 and 456, then the error, logged by last subscriber

subject.Next(789)  // nothing logged
subject.Complete() // nothing logged
Output:

Next: 123
Next: 123
Next: 456
Next: 456
Error: assert.AnError general error for testing
Error: assert.AnError general error for testing
Next: 123
Next: 456
Error: assert.AnError general error for testing
Example (Overflow)
subject := NewReplaySubject[int](2)

subject.Next(123)  // nothing logged
subject.Next(456)  // nothing logged
subject.Next(789)  // nothing logged
subject.Complete() // nothing logged

subject.Subscribe(PrintObserver[int]()) // 456 and 789 logged
Output:

Next: 456
Next: 789
Completed
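
The overflow behaviour above (a buffer of 2 keeps only 456 and 789) corresponds to a bounded buffer that evicts its oldest entry. The sketch below models just that buffer with the standard library; `replayBuffer` is a hypothetical type and not ro's internal data structure.

```go
package main

import "fmt"

// replayBuffer is a hypothetical bounded buffer that keeps at most size of
// the most recent values.
type replayBuffer[T any] struct {
	size   int
	values []T
}

// push stores v and evicts the oldest value once the buffer is full.
func (r *replayBuffer[T]) push(v T) {
	if r.size <= 0 {
		return
	}
	if len(r.values) == r.size {
		copy(r.values, r.values[1:]) // shift left, dropping the oldest value
		r.values[r.size-1] = v
		return
	}
	r.values = append(r.values, v)
}

// replay hands the buffered values, oldest first, to a late subscriber.
func (r *replayBuffer[T]) replay(onNext func(T)) {
	for _, v := range r.values {
		onNext(v)
	}
}

func main() {
	buf := &replayBuffer[int]{size: 2}
	for _, v := range []int{123, 456, 789} {
		buf.push(v)
	}
	// Prints "Next: 456" then "Next: 789"; 123 was evicted.
	buf.replay(func(v int) { fmt.Println("Next:", v) })
}
```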

func NewSubject

func NewSubject[T any]() Subject[T]

NewSubject is an alias to NewPublishSubject.

func NewUnicastSubject

func NewUnicastSubject[T any](bufferSize int) Subject[T]

NewUnicastSubject queues up events until a single Observer subscribes to it, replays those events to it until the Observer catches up and then switches to relaying events live to this single Observer.

Example
subject := NewUnicastSubject[int](42)

subject.Subscribe(PrintObserver[int]())

subject.Next(123) // 123 logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // rejected: second subscriber receives an error

subject.Next(456) // 456 logged by first subscriber

subject.Complete() // Completed logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // Completed logged by third subscriber

subject.Next(789) // nothing logged
Output:

Next: 123
Error: ro.UnicastSubject: a single subscriber accepted
Next: 456
Completed
Completed
Example (Empty)
subject := NewUnicastSubject[int](42)

subject.Subscribe(PrintObserver[int]())

subject.Complete() // Completed logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // Completed logged by second subscriber

subject.Next(123) // nothing logged
Output:

Completed
Completed
Example (Error)
subject := NewUnicastSubject[int](42)

subject.Subscribe(PrintObserver[int]())

subject.Next(123) // 123 logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // rejected: second subscriber receives an error

subject.Next(456) // 456 logged by first subscriber

subject.Error(assert.AnError) // error logged by first subscriber

subject.Subscribe(PrintObserver[int]()) // error logged by last subscriber

subject.Next(789)  // nothing logged
subject.Complete() // nothing logged
Output:

Next: 123
Error: ro.UnicastSubject: a single subscriber accepted
Next: 456
Error: assert.AnError general error for testing
Error: assert.AnError general error for testing
Example (Overflow)
subject := NewUnicastSubject[int](2)

subject.Next(123)  // nothing logged
subject.Next(456)  // nothing logged
subject.Next(789)  // nothing logged
subject.Complete() // nothing logged

subject.Subscribe(PrintObserver[int]()) // only Completed logged
Output:

Completed

type Subscriber

type Subscriber[T any] interface {
	Subscription
	Observer[T]
}

Subscriber implements the Observer and Subscription interfaces. While the Observer is the public API for consuming the values of an Observable, all Observers get converted to a Subscriber, in order to provide Subscription-like capabilities such as `Unsubscribe()`. Subscriber is a common type in samber/ro, and crucial for implementing operators, but it is rarely used as a public API.

func NewEventuallySafeSubscriber

func NewEventuallySafeSubscriber[T any](destination Observer[T]) Subscriber[T]

NewEventuallySafeSubscriber creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.

The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.

This method is safe for concurrent use, but concurrent messages are dropped.

It is rarely used as a public API.
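
One way to picture "safe for concurrent use, but concurrent messages are dropped" is a gate that refuses a delivery while another one is in progress. The sketch below is only an assumption about what such a policy can look like, built on `sync.Mutex.TryLock` from the standard library (Go 1.18+); `droppingGate` is a hypothetical type and may differ from how ro implements this subscriber.

```go
package main

import (
	"fmt"
	"sync"
)

// droppingGate is a hypothetical type: it forwards a value only when no other
// delivery is in progress; otherwise the value is dropped instead of queued.
type droppingGate[T any] struct {
	mu     sync.Mutex
	onNext func(T)
}

// Next reports whether the value was delivered.
func (g *droppingGate[T]) Next(v T) bool {
	if !g.mu.TryLock() {
		// Another delivery holds the lock: drop the value.
		return false
	}
	defer g.mu.Unlock()
	g.onNext(v)
	return true
}

func main() {
	var gate *droppingGate[int]
	gate = &droppingGate[int]{onNext: func(v int) {
		fmt.Println("delivered:", v)
		// A delivery attempted while this one is running is dropped.
		fmt.Println("nested delivered:", gate.Next(v+1))
	}}
	fmt.Println("outer delivered:", gate.Next(1))
}
```

The nested call stands in for a second goroutine arriving mid-delivery: it returns false without blocking, while the outer call returns true.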

func NewSafeSubscriber

func NewSafeSubscriber[T any](destination Observer[T]) Subscriber[T]

NewSafeSubscriber creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.

The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.

This method is safe for concurrent use.

It is rarely used as a public API.

func NewSubscriber

func NewSubscriber[T any](destination Observer[T]) Subscriber[T]

NewSubscriber creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.

The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.

This method is safe for concurrent use.

It is rarely used as a public API.

func NewSubscriberWithConcurrencyMode

func NewSubscriberWithConcurrencyMode[T any](destination Observer[T], mode ConcurrencyMode) Subscriber[T]

NewSubscriberWithConcurrencyMode creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.

The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.

It is rarely used as a public API.

func NewUnsafeSubscriber

func NewUnsafeSubscriber[T any](destination Observer[T]) Subscriber[T]

NewUnsafeSubscriber creates a new Subscriber from an Observer. If the Observer is already a Subscriber, it is returned as is. Otherwise, a new Subscriber is created that wraps the Observer.

The returned Subscriber will unsubscribe from the destination Observer when Unsubscribe() is called.

This method is not safe for concurrent use.

It is rarely used as a public API.

type Subscription

type Subscription interface {
	Unsubscribable

	Add(teardown Teardown)
	AddUnsubscribable(unsubscribable Unsubscribable)
	IsClosed() bool
	Wait() // Note: using .Wait() is not recommended.
}

Subscription represents an ongoing execution of an `Observable`, and has a minimal API which allows you to cancel that execution.

func NewSubscription

func NewSubscription(teardown Teardown) Subscription

NewSubscription creates a new Subscription. When `teardown` is nil, nothing is added. When the subscription is already disposed, the `teardown` callback is triggered immediately.

type Teardown

type Teardown func()

Teardown is a function that cleans up resources, such as closing a file or a network connection. It is called when the Subscription is closed. It is part of a Subscription, and is returned by the Observable creation. It will be called only once, when the Subscription is canceled.
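
The teardown rules stated above (a nil teardown is ignored, each teardown runs once, and a teardown added after disposal runs immediately) can be sketched with the standard library. `miniSubscription` is a hypothetical type written for illustration; it is not ro's Subscription.

```go
package main

import (
	"fmt"
	"sync"
)

// miniSubscription is a hypothetical, reduced subscription that only tracks
// teardown callbacks.
type miniSubscription struct {
	mu        sync.Mutex
	closed    bool
	teardowns []func()
}

// Add registers a teardown. A nil teardown is ignored; if the subscription
// is already closed, the teardown runs immediately.
func (s *miniSubscription) Add(teardown func()) {
	if teardown == nil {
		return
	}
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		teardown()
		return
	}
	s.teardowns = append(s.teardowns, teardown)
	s.mu.Unlock()
}

// Unsubscribe runs every pending teardown once; later calls are no-ops.
func (s *miniSubscription) Unsubscribe() {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return
	}
	s.closed = true
	pending := s.teardowns
	s.teardowns = nil
	s.mu.Unlock()

	// Run the callbacks outside the lock so they may call Add safely.
	for _, teardown := range pending {
		teardown()
	}
}

func main() {
	sub := &miniSubscription{}
	sub.Add(func() { fmt.Println("teardown: close connection") })
	sub.Add(nil)

	sub.Unsubscribe()
	sub.Unsubscribe()

	// Already closed: this teardown runs right away.
	sub.Add(func() { fmt.Println("teardown: late registration") })
}
```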

type TimestampValue

type TimestampValue[T any] struct {
	Value     T
	Timestamp time.Duration
}

TimestampValue is a value emitted by the `TimeInterval` operator.

type Unsubscribable

type Unsubscribable interface {
	Unsubscribe()
}

Unsubscribable represents any type that can be unsubscribed from. It provides a common interface for cancellation operations.

Directories

Path Synopsis
ee (module)
  plugins/otel (module)
examples
  connectable (module)
  ee-otel-log (module)
  ee-prometheus (module)
  ics-to-csv (module)
  sql-to-csv (module)
internal
plugins
  bytes (module)
  cron (module)
  encoding/csv (module)
  encoding/gob (module)
  encoding/json (module)
  fsnotify (module)
  http/client (module)
  hyperloglog (module)
  ics (module)
  io (module)
  iter (module)
  proc (module)
  regexp (module)
  signal (module)
  sort (module)
  stdio (module)
  strconv (module)
  strings (module)
  template (module)
  testify (module)
  time (module)
