rita

package module
v0.0.0-...-b1a1457 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: Apache-2.0 Imports: 16 Imported by: 2

README

Rita

Rita is a library for building event-sourced applications on top of NATS.

NOTE: This package is under heavy development, so breaking changes may be introduced.

GoDoc ReportCard GitHub Actions

Install

Requires Go 1.24+ and NATS 2.12+

go get github.com/synadia-labs/rita

Getting Started

Coming soon!

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrSequenceConflict       = errors.New("rita: sequence conflict")
	ErrEventDataRequired      = errors.New("rita: event data required")
	ErrEventEntityRequired    = errors.New("rita: event entity required")
	ErrEventEntityInvalid     = errors.New("rita: event entity invalid")
	ErrEventTypeRequired      = errors.New("rita: event type required")
	ErrNoEvents               = errors.New("rita: no events provided")
	ErrEventStoreNameRequired = errors.New("rita: event store name is required")
	ErrSubjectTooManyTokens   = errors.New("rita: subject can have at most three tokens")
)
View Source
var (
	ErrEvolverNotImplemented = errors.New("evolver not implemented")
	ErrDeciderNotImplemented = errors.New("decider not implemented")
	ErrViewerNotImplemented  = errors.New("viewer not implemented")
)

Functions

This section is empty.

Types

type Command

type Command struct {
	// ID is a unique identifier for the command.
	ID string

	// Time is the time of when the command was received.
	Time time.Time

	// Type is a unique name for the command. This can be omitted
	// if a type registry is being used, otherwise it must be set explicitly
	// to identity the encoded data.
	Type string

	// Data is the command data. This must be a byte slice (pre-encoded) or a value
	// of a type registered in the type registry.
	Data any

	// Meta is application-defined metadata about the command.
	Meta map[string]string
}

Command is a wrapper for application-defined commands.

type Decider

type Decider interface {
	Decide(*Command) ([]*Event, error)
}

Decider is an interface that application-defined models can implement to decide on state transitions. Zero or more events can be returned that represents the state transitions to be stored.

type DeciderEvolver

type DeciderEvolver interface {
	Decider
	Evolver
}

DeciderEvolver combines Decider and Evolver for use with DecideAndEvolve.

type Event

type Event struct {
	// ID of the event. This will be used as the NATS msg ID
	// for de-duplication.
	ID string

	// Identifier for specific entities. Can be used to determine if
	// an event is related to a specific entity/node/endpoint/agent/etc.
	// The format must be two tokens, e.g. "node.1234".
	Entity string

	// Time is the time of when the event occurred which may be different
	// from the time the event is appended to the store. If no time is provided,
	// the current local time will be used.
	Time time.Time

	// Type is a unique name for the event itself. This can be omitted
	// if a type registry is being used, otherwise it must be set explicitly
	// to identity the encoded data.
	Type string

	// Data is the event data. This must be a byte slice (pre-encoded) or a value
	// of a type registered in the type registry.
	Data any

	// Metadata is application-defined metadata about the event.
	Meta map[string]string

	// Expect can be set to specify optimistic concurrency control
	// expectations for an append operation.
	Expect *Expect
	// contains filtered or unexported fields
}

Event is a wrapper for application-defined events.

func (*Event) Sequence

func (e *Event) Sequence() uint64

Sequence returns the stream sequence number of the event.

func (*Event) Subject

func (e *Event) Subject() string

Subject returns the full NATS subject the event was published to.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore persists events to a JetStream stream and provides operations to append, evolve, and watch events.

func (*EventStore) Append

func (s *EventStore) Append(ctx context.Context, events []*Event) (uint64, error)

Append appends a one or more events to the subject's event sequence. It returns the resulting sequence number of the last appended event.

func (*EventStore) Decide

func (s *EventStore) Decide(ctx context.Context, model Decider, cmd *Command) ([]*Event, uint64, error)

Decide is a convenience method that combines a model's Decide invocation followed by an Append. If either step fails, an error is returned.

func (*EventStore) DecideAndEvolve

func (s *EventStore) DecideAndEvolve(ctx context.Context, model DeciderEvolver, cmd *Command) ([]*Event, uint64, error)

DecideAndEvolve is a convenience method that decides, stores, and evolves a model in one operation. If any step fails, an error is returned. Note, that if the evolve step fails, the events have already been stored.

func (*EventStore) Evolve

func (s *EventStore) Evolve(ctx context.Context, model Evolver, opts ...EvolveOption) (uint64, error)

Evolve loads events and evolves a model of state. The sequence of the last event that evolved the state is returned, including when an error occurs. Note, the pattern can be several forms depending on the need. The full template is `<entity-type>.<entity-id>.<event-type>`. If only the entity type is provided, all events for all entities of that type will be loaded. If the entity type and entity ID are provided, all events for that specific entity will be loaded. If the full subject is provided, only events of that specific type for that specific entity will be loaded. Wildcards can be used as well.

func (*EventStore) Watch

func (s *EventStore) Watch(ctx context.Context, model Evolver, opts ...WatchOption) (Watcher, error)

Watch creates a watcher that asynchronously consumes events from the event store and applies them to the provided Evolver. The watcher can be configured with various options such as error handling and subject patterns to filter events. Since this will update the Evolver asynchronously, the Evolver implementation must be thread-safe. Use the `NewModel()` helper to create a thread-safe model.

type EventStoreConfig

type EventStoreConfig struct {
	Name        string
	Description string
	Metadata    map[string]string
	Replicas    int
	Storage     jetstream.StorageType
	Placement   *jetstream.Placement
	RePublish   *jetstream.RePublish
	MaxMsgs     int64
	MaxAge      time.Duration
	MaxBytes    int64
}

type EvolveOption

type EvolveOption interface {
	// contains filtered or unexported methods
}

EvolveOption is an option for the event store Evolve operation.

func WithAfterSequence

func WithAfterSequence(seq uint64) EvolveOption

WithAfterSequence specifies the sequence of the first event that should be fetched from the sequence up to the end of the sequence. This useful when partially applied state has been derived up to a specific sequence and only the latest events need to be fetched. This can be passed in `Evolve` and `Watch`.

func WithFilters

func WithFilters(filters ...string) EvolveOption

WithFilters specifies the subject filter to use when evolving state. The filter can be in the form of `<entity-type>`, `<entity-type>.<entity-id>`, or `<entity-type>.<entity-id>.<event-type>`. Wildcards can be used as well at any token position. This can be passed in `Evolve` and `Watch`.

func WithStopSequence

func WithStopSequence(seq uint64) EvolveOption

WithStopSequence specifies the sequence of the last event that should be fetched. This is useful to control how much replay is performed when evolving a state.

type Evolver

type Evolver interface {
	Evolve(*Event) error
}

Evolver is an interface that application-defined models can implement to evolve their state based on events.

type Expect

type Expect struct {
	Sequence uint64
	Pattern  string
}

func ExpectSequence

func ExpectSequence(seq uint64) *Expect

ExpectSequence can be set to specify the expected sequence number for optimistic concurrency control. If the current last sequence number does not match the provided value, the append will fail. If nil, no sequence check will be performed. The subject defaults to the entity's pattern.

func ExpectSequenceSubject

func ExpectSequenceSubject(seq uint64, pattern string) *Expect

ExpectSubject can be set to specify an alternative pattern, such as the top-level type.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager creates and manages EventStore instances. It provides shared dependencies (type registry, ID generator, clock) to all stores it creates.

func New

func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error)

New initializes a new Manager instance with a NATS connection.

func (*Manager) CreateEventStore

func (m *Manager) CreateEventStore(ctx context.Context, config EventStoreConfig) (*EventStore, error)

Create creates the event store given the configuration. The stream name is the name of the store and the subjects default to "{name}.>".

func (*Manager) DeleteEventStore

func (m *Manager) DeleteEventStore(ctx context.Context, name string) error

Delete deletes the event store.

func (*Manager) GetEventStore

func (m *Manager) GetEventStore(ctx context.Context, name string) (*EventStore, error)

func (*Manager) UpdateEventStore

func (m *Manager) UpdateEventStore(ctx context.Context, config EventStoreConfig) error

Update updates the event store configuration.

type ManagerOption

type ManagerOption interface {
	// contains filtered or unexported methods
}

ManagerOption models a option when creating a type registry.

func WithClock

func WithClock(clock clock.Clock) ManagerOption

WithClock sets a clock implementation. Default it clock.Time.

func WithIDer

func WithIDer(id id.ID) ManagerOption

WithIDer sets a unique ID generator implementation. Default is id.NUID.

func WithLogger

func WithLogger(logger *slog.Logger) ManagerOption

func WithRegistry

func WithRegistry(types *types.Registry) ManagerOption

WithRegistry sets an explicit type registry.

type Model

type Model[T any] struct {
	// contains filtered or unexported fields
}

Model combines an Evolver, Decider, and Viewer for a specific type T. It provides thread-safe access to the underlying interfaces and keeps track of the last sequence number of events applied to the model.

func NewModel

func NewModel[T any](t T) *Model[T]

func (*Model[T]) Decide

func (m *Model[T]) Decide(cmd *Command) ([]*Event, error)

func (*Model[T]) Evolve

func (m *Model[T]) Evolve(event *Event) error

func (*Model[T]) View

func (m *Model[T]) View(fn func(T) error) error

type Viewer

type Viewer[T any] interface {
	View(func(T) error) error
}

Viewer represents a read-only view of the state of an entity.

type WatchOption

type WatchOption interface {
	// contains filtered or unexported methods
}

WatchOption is an option for the event store Watch operation.

func WithErrHandler

func WithErrHandler(fn func(error, *Event, jetstream.Msg)) WatchOption

WithErrHandler sets the error handler function for the watcher.

func WithNoWait

func WithNoWait() WatchOption

WithNoWait configures the watcher to not wait for catch-up before returning.

type Watcher

type Watcher interface {
	Stop()
}

Watcher represents an active event subscription. Call Stop to drain pending messages and stop the underlying consumer.

Directories

Path Synopsis
internal
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL