Documentation ¶
Index ¶
- Variables
- type Command
- type Decider
- type DeciderEvolver
- type Event
- type EventStore
- func (s *EventStore) Append(ctx context.Context, events []*Event) (uint64, error)
- func (s *EventStore) Decide(ctx context.Context, model Decider, cmd *Command) ([]*Event, uint64, error)
- func (s *EventStore) DecideAndEvolve(ctx context.Context, model DeciderEvolver, cmd *Command) ([]*Event, uint64, error)
- func (s *EventStore) Evolve(ctx context.Context, model Evolver, opts ...EvolveOption) (uint64, error)
- func (s *EventStore) Watch(ctx context.Context, model Evolver, opts ...WatchOption) (Watcher, error)
- type EventStoreConfig
- type EvolveOption
- type Evolver
- type Expect
- type Manager
- func (m *Manager) CreateEventStore(ctx context.Context, config EventStoreConfig) (*EventStore, error)
- func (m *Manager) DeleteEventStore(ctx context.Context, name string) error
- func (m *Manager) GetEventStore(ctx context.Context, name string) (*EventStore, error)
- func (m *Manager) UpdateEventStore(ctx context.Context, config EventStoreConfig) error
- type ManagerOption
- type Model
- type Viewer
- type WatchOption
- type Watcher
Constants ¶
This section is empty.
Variables ¶
var (
ErrSequenceConflict = errors.New("rita: sequence conflict")
ErrEventDataRequired = errors.New("rita: event data required")
ErrEventEntityRequired = errors.New("rita: event entity required")
ErrEventEntityInvalid = errors.New("rita: event entity invalid")
ErrEventTypeRequired = errors.New("rita: event type required")
ErrNoEvents = errors.New("rita: no events provided")
ErrEventStoreNameRequired = errors.New("rita: event store name is required")
ErrSubjectTooManyTokens = errors.New("rita: subject can have at most three tokens")
)
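Callers will usually branch on these sentinel values with errors.Is. The following is a minimal sketch and not part of the package: errSequenceConflict is a local stand-in for ErrSequenceConflict, and appendWithRetry and its appendFn parameter are hypothetical names, so the example runs without a NATS server. It assumes a returned error may be wrapped, which is why it uses errors.Is rather than a direct comparison.

```go
package main

import (
	"errors"
	"fmt"
)

// errSequenceConflict is a local stand-in for rita.ErrSequenceConflict
// (assumption: used here so the sketch runs without the library).
var errSequenceConflict = errors.New("rita: sequence conflict")

// appendWithRetry is a hypothetical helper. It calls appendFn until it
// succeeds, fails with an error other than a sequence conflict, or
// maxAttempts is reached. It returns the number of attempts made.
func appendWithRetry(maxAttempts int, appendFn func() error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = appendFn()
		if err == nil {
			return attempt, nil
		}
		// Only a sequence conflict is worth retrying; anything else is final.
		if !errors.Is(err, errSequenceConflict) {
			return attempt, err
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	attempts, err := appendWithRetry(3, func() error {
		calls++
		if calls < 2 {
			// Simulate a wrapped conflict on the first call.
			return fmt.Errorf("append: %w", errSequenceConflict)
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```

In a real retry loop the model would normally be re-evolved to the latest sequence before deciding again, since the conflict means the state used for the decision was stale.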
Functions ¶
This section is empty.
Types ¶
type Command ¶
type Command struct {
// ID is a unique identifier for the command.
ID string
// Time is the time of when the command was received.
Time time.Time
// Type is a unique name for the command. This can be omitted
// if a type registry is being used, otherwise it must be set explicitly
// to identify the encoded data.
Type string
// Data is the command data. This must be a byte slice (pre-encoded) or a value
// of a type registered in the type registry.
Data any
// Meta is application-defined metadata about the command.
Meta map[string]string
}
Command is a wrapper for application-defined commands.
type Decider ¶
Decider is an interface that application-defined models can implement to decide on state transitions. Zero or more events can be returned that represent the state transitions to be stored.
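A rough sketch of a model that decides on commands is shown here. The interface definition is not reproduced on this page, so the method shape `Decide(cmd) ([]*event, error)` is an assumption, and command, event, account, and positiveAmount are trimmed, hypothetical local stand-ins rather than the package's own types.

```go
package main

import (
	"errors"
	"fmt"
)

// command and event are trimmed local stand-ins for Command and Event.
type command struct {
	Type string
	Data any
}

type event struct {
	Entity string
	Type   string
	Data   any
}

// decider mirrors the assumed shape of the Decider interface.
type decider interface {
	Decide(cmd *command) ([]*event, error)
}

// account is a hypothetical model holding a balance.
type account struct {
	id      string
	balance int
}

// positiveAmount extracts a positive int amount from the command data.
func positiveAmount(cmd *command) (int, error) {
	amount, ok := cmd.Data.(int)
	if !ok || amount <= 0 {
		return 0, fmt.Errorf("%s: amount must be a positive int", cmd.Type)
	}
	return amount, nil
}

// Decide validates the command against the current state and returns the
// events that describe the transition. It does not mutate the model.
func (a *account) Decide(cmd *command) ([]*event, error) {
	amount, err := positiveAmount(cmd)
	if err != nil {
		return nil, err
	}
	entity := "account." + a.id // two tokens: <entity-type>.<entity-id>
	switch cmd.Type {
	case "deposit":
		return []*event{{Entity: entity, Type: "deposited", Data: amount}}, nil
	case "withdraw":
		if amount > a.balance {
			return nil, errors.New("withdraw: insufficient funds")
		}
		return []*event{{Entity: entity, Type: "withdrawn", Data: amount}}, nil
	default:
		return nil, fmt.Errorf("unknown command type %q", cmd.Type)
	}
}

func main() {
	var model decider = &account{id: "1", balance: 50}
	events, err := model.Decide(&command{Type: "withdraw", Data: 20})
	fmt.Println(len(events), err)
}
```

Keeping Decide free of side effects means the state only changes when the resulting events are applied, which matches the split between deciding and evolving described on this page.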
type DeciderEvolver ¶
DeciderEvolver combines Decider and Evolver for use with DecideAndEvolve.
type Event ¶
type Event struct {
// ID of the event. This will be used as the NATS msg ID
// for de-duplication.
ID string
// Identifier for specific entities. Can be used to determine if
// an event is related to a specific entity/node/endpoint/agent/etc.
// The format must be two tokens, e.g. "node.1234".
Entity string
// Time is the time of when the event occurred which may be different
// from the time the event is appended to the store. If no time is provided,
// the current local time will be used.
Time time.Time
// Type is a unique name for the event itself. This can be omitted
// if a type registry is being used, otherwise it must be set explicitly
// to identify the encoded data.
Type string
// Data is the event data. This must be a byte slice (pre-encoded) or a value
// of a type registered in the type registry.
Data any
// Meta is application-defined metadata about the event.
Meta map[string]string
// Expect can be set to specify optimistic concurrency control
// expectations for an append operation.
Expect *Expect
// contains filtered or unexported fields
}
Event is a wrapper for application-defined events.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore persists events to a JetStream stream and provides operations to append, evolve, and watch events.
func (*EventStore) Append ¶
Append appends one or more events to the subject's event sequence. It returns the resulting sequence number of the last appended event.
func (*EventStore) Decide ¶
func (s *EventStore) Decide(ctx context.Context, model Decider, cmd *Command) ([]*Event, uint64, error)
Decide is a convenience method that combines a model's Decide invocation followed by an Append. If either step fails, an error is returned.
func (*EventStore) DecideAndEvolve ¶
func (s *EventStore) DecideAndEvolve(ctx context.Context, model DeciderEvolver, cmd *Command) ([]*Event, uint64, error)
DecideAndEvolve is a convenience method that decides, stores, and evolves a model in one operation. If any step fails, an error is returned. Note that if the evolve step fails, the events have already been stored.
func (*EventStore) Evolve ¶
func (s *EventStore) Evolve(ctx context.Context, model Evolver, opts ...EvolveOption) (uint64, error)
Evolve loads events and evolves a model of state. The sequence of the last event that evolved the state is returned, including when an error occurs. The filter pattern can take several forms depending on the need. The full template is `<entity-type>.<entity-id>.<event-type>`. If only the entity type is provided, events for all entities of that type are loaded. If the entity type and entity ID are provided, all events for that specific entity are loaded. If the full subject is provided, only events of that type for that entity are loaded. Wildcards can be used as well.
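The three filter forms can be illustrated with a small standalone matcher. This is only a sketch of the semantics described above, not how the package evaluates filters (JetStream does the filtering server-side). matchFilter is a hypothetical name. It assumes the standard NATS wildcards, where `*` matches exactly one token and `>` matches one or more trailing tokens, and it treats a filter with fewer tokens than the subject as a prefix match.

```go
package main

import (
	"fmt"
	"strings"
)

// matchFilter reports whether subject, in the form
// <entity-type>.<entity-id>.<event-type>, is selected by filter.
// Assumptions: "*" matches exactly one token, ">" matches one or more
// remaining tokens, and a shorter filter selects every subject it prefixes.
func matchFilter(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" needs at least one remaining subject token.
			return i < len(s)
		}
		if i >= len(s) {
			// The filter is more specific than the subject.
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return true
}

func main() {
	subject := "order.42.placed"
	for _, filter := range []string{
		"order",           // all events for all orders
		"order.42",        // all events for order 42
		"order.42.placed", // only "placed" events for order 42
		"order.*.placed",  // "placed" events for any order
		"order.43",        // a different entity
	} {
		fmt.Printf("%-16s %v\n", filter, matchFilter(filter, subject))
	}
}
```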
func (*EventStore) Watch ¶
func (s *EventStore) Watch(ctx context.Context, model Evolver, opts ...WatchOption) (Watcher, error)
Watch creates a watcher that asynchronously consumes events from the event store and applies them to the provided Evolver. The watcher can be configured with various options such as error handling and subject patterns to filter events. Since this will update the Evolver asynchronously, the Evolver implementation must be thread-safe. Use the `NewModel()` helper to create a thread-safe model.
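For models that do not use the helper, one common way to meet the thread-safety requirement is to guard state with a mutex. The sketch below is illustrative only: the `Evolve(*event) error` method shape is an assumption, and event, counter, and applyConcurrently are hypothetical local names that simulate concurrent delivery without a NATS server.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a trimmed local stand-in for Event.
type event struct {
	Type string
}

// counter is a hypothetical model whose methods are safe for concurrent use.
type counter struct {
	mu    sync.Mutex
	total int
}

// Evolve applies one event while holding the lock.
func (c *counter) Evolve(e *event) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e.Type == "incremented" {
		c.total++
	}
	return nil
}

// Total reads the state under the same lock, so readers never observe a
// partially applied update.
func (c *counter) Total() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.total
}

// applyConcurrently simulates events arriving from n goroutines at once and
// waits for all of them to be applied.
func applyConcurrently(c *counter, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.Evolve(&event{Type: "incremented"})
		}()
	}
	wg.Wait()
}

func main() {
	c := &counter{}
	applyConcurrently(c, 100)
	fmt.Println(c.Total())
}
```

Every read path must take the same lock as Evolve, otherwise application code that reads the model while the watcher is running can still race.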
type EventStoreConfig ¶
type EvolveOption ¶
type EvolveOption interface {
// contains filtered or unexported methods
}
EvolveOption is an option for the event store Evolve operation.
func WithAfterSequence ¶
func WithAfterSequence(seq uint64) EvolveOption
WithAfterSequence specifies the sequence of the first event that should be fetched from the sequence up to the end of the sequence. This is useful when partially applied state has been derived up to a specific sequence and only the latest events need to be fetched. This can be passed in `Evolve` and `Watch`.
func WithFilters ¶
func WithFilters(filters ...string) EvolveOption
WithFilters specifies the subject filter to use when evolving state. The filter can be in the form of `<entity-type>`, `<entity-type>.<entity-id>`, or `<entity-type>.<entity-id>.<event-type>`. Wildcards can be used as well at any token position. This can be passed in `Evolve` and `Watch`.
func WithStopSequence ¶
func WithStopSequence(seq uint64) EvolveOption
WithStopSequence specifies the sequence of the last event that should be fetched. This is useful to control how much replay is performed when evolving a state.
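The effect of bounding a replay can be sketched with an in-memory log. This is not the package's implementation. The names event, evolver, sum, and replay are hypothetical, and the sketch assumes the lower bound is exclusive (as the option name suggests) and the stop bound is inclusive, which this page does not state precisely. A stop value of 0 means no upper bound here.

```go
package main

import "fmt"

// event is a trimmed local stand-in for Event with an explicit sequence.
type event struct {
	Seq  uint64
	Data int
}

// evolver mirrors the assumed shape of the Evolver interface.
type evolver interface {
	Evolve(e *event) error
}

// sum is a hypothetical model that adds up event data.
type sum struct {
	total int
}

func (s *sum) Evolve(e *event) error {
	s.total += e.Data
	return nil
}

// replay applies events with after < Seq <= stop to the model and returns
// the sequence of the last event applied. If nothing is applied it returns
// after, so the result can be fed back in to resume later (a choice made
// for this sketch).
func replay(log []*event, model evolver, after, stop uint64) (uint64, error) {
	last := after
	for _, e := range log {
		if e.Seq <= after {
			continue // already reflected in the model
		}
		if stop != 0 && e.Seq > stop {
			break // past the requested window
		}
		if err := model.Evolve(e); err != nil {
			return last, err
		}
		last = e.Seq
	}
	return last, nil
}

func main() {
	log := []*event{
		{Seq: 1, Data: 10}, {Seq: 2, Data: 20}, {Seq: 3, Data: 30},
		{Seq: 4, Data: 40}, {Seq: 5, Data: 50},
	}
	model := &sum{}
	last, err := replay(log, model, 2, 4)
	fmt.Println(last, model.total, err)
}
```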
type Evolver ¶
Evolver is an interface that application-defined models can implement to evolve their state based on events.
type Expect ¶
func ExpectSequence ¶
ExpectSequence can be set to specify the expected sequence number for optimistic concurrency control. If the current last sequence number does not match the provided value, the append will fail. If nil, no sequence check will be performed. The subject defaults to the entity's pattern.
func ExpectSequenceSubject ¶
ExpectSequenceSubject can be set to specify an alternative pattern, such as the top-level type.
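The optimistic concurrency check can be pictured with an in-memory stand-in. This sketch does not use the package or JetStream: memStore, newMemStore, and append are hypothetical names, errSequenceConflict stands in for ErrSequenceConflict, and the per-subject bookkeeping is an assumption about how the expected sequence is compared.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSequenceConflict is a local stand-in for rita.ErrSequenceConflict.
var errSequenceConflict = errors.New("rita: sequence conflict")

// memStore is a hypothetical in-memory log that tracks the last sequence
// written per subject.
type memStore struct {
	mu   sync.Mutex
	seq  uint64            // store-wide sequence counter
	last map[string]uint64 // last sequence written per subject
}

func newMemStore() *memStore {
	return &memStore{last: make(map[string]uint64)}
}

// append writes one event for subject. If expect is non-nil, the write only
// succeeds when the subject's last sequence equals *expect. A nil expect
// skips the check.
func (m *memStore) append(subject string, expect *uint64) (uint64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if expect != nil && m.last[subject] != *expect {
		return 0, errSequenceConflict
	}
	m.seq++
	m.last[subject] = m.seq
	return m.seq, nil
}

func main() {
	store := newMemStore()

	// The first write carries no expectation.
	first, _ := store.append("order.42", nil)

	// A writer that saw sequence `first` succeeds.
	second, err := store.append("order.42", &first)
	fmt.Println(second, err)

	// A second writer still holding `first` is now stale and is rejected.
	_, err = store.append("order.42", &first)
	fmt.Println(errors.Is(err, errSequenceConflict))
}
```

Widening the checked subject from one entity to the top-level type trades throughput for a stricter guarantee, since any write to any entity of that type then invalidates the expectation.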
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager creates and manages EventStore instances. It provides shared dependencies (type registry, ID generator, clock) to all stores it creates.
func New ¶
func New(nc *nats.Conn, opts ...ManagerOption) (*Manager, error)
New initializes a new Manager instance with a NATS connection.
func (*Manager) CreateEventStore ¶
func (m *Manager) CreateEventStore(ctx context.Context, config EventStoreConfig) (*EventStore, error)
CreateEventStore creates the event store given the configuration. The stream name is the name of the store and the subjects default to "{name}.>".
func (*Manager) DeleteEventStore ¶
DeleteEventStore deletes the event store.
func (*Manager) GetEventStore ¶
func (*Manager) UpdateEventStore ¶
func (m *Manager) UpdateEventStore(ctx context.Context, config EventStoreConfig) error
UpdateEventStore updates the event store configuration.
type ManagerOption ¶
type ManagerOption interface {
// contains filtered or unexported methods
}
ManagerOption models an option when creating a Manager.
func WithClock ¶
func WithClock(clock clock.Clock) ManagerOption
WithClock sets a clock implementation. Default is clock.Time.
func WithIDer ¶
func WithIDer(id id.ID) ManagerOption
WithIDer sets a unique ID generator implementation. Default is id.NUID.
func WithLogger ¶
func WithLogger(logger *slog.Logger) ManagerOption
func WithRegistry ¶
func WithRegistry(types *types.Registry) ManagerOption
WithRegistry sets an explicit type registry.
type Model ¶
type Model[T any] struct {
// contains filtered or unexported fields
}
Model combines an Evolver, Decider, and Viewer for a specific type T. It provides thread-safe access to the underlying interfaces and keeps track of the last sequence number of events applied to the model.
type WatchOption ¶
type WatchOption interface {
// contains filtered or unexported methods
}
WatchOption is an option for the event store Watch operation.
func WithErrHandler ¶
func WithErrHandler(fn func(error, *Event, jetstream.Msg)) WatchOption
WithErrHandler sets the error handler function for the watcher.
func WithNoWait ¶
func WithNoWait() WatchOption
WithNoWait configures the watcher to not wait for catch-up before returning.