Documentation
¶
Index ¶
- Constants
- Variables
- func AppendIfMissing(topics []string, topic string) []string
- func CloseClientConnection(connection quic.Connection, code uint64, err error) error
- func DefaultAuthenticationFunc(_ string) bool
- func DefaultAuthorizationFunc(_, _ string) bool
- func IsSubscribeTopicValid(topic string, topics []string) bool
- func NewLogger() *zap.Logger
- func SendError(sendStream quic.SendStream, e *Error) error
- func TopicHasWildcard(topic string) bool
- func WriteData(data any, sendStream quic.SendStream) error
- type Client
- type DistributeWork
- type Error
- type Event
- type EventSource
- type Finder
- type Metrics
- type Offer
- type Server
- func (s *Server) GenerateEventSources(topics []string)
- func (s *Server) MetricHandler() http.Handler
- func (s *Server) Publish(topic string, event []byte)
- func (s *Server) SetAuthenticator(authenticator auth.Authenticator)
- func (s *Server) SetAuthenticatorFunc(authenticator auth.AuthenticatorFunc)
- func (s *Server) SetAuthorizer(authorizer auth.Authorizer)
- func (s *Server) SetAuthorizerFunc(authorizer auth.AuthorizerFunc)
- type Subscriber
- type Worker
- type WorkerConfig
Constants ¶
const ( CodeNotAuthorized = iota + 1 CodeTopicNotAvailable CodeFailedToCreateStream CodeFailedToSendOffer CodeUnknown )
const ( DistributeEvent = "Distribute" AcceptClient = "Accept" )
const DELIMITER = '\n'
DELIMITER is the delimiter used to separate messages in streams.
const ErrorTopic = "error"
Variables ¶
var ( ErrNotAuthorized = errors.New("not authorized") ErrFailedToCreateStream = errors.New("failed to create send/receive stream to client") ErrFailedToReadOffer = errors.New("failed to read offer from client") ErrFailedToSendOffer = errors.New("failed to send offer to server") ErrFailedToMarshal = errors.New("failed to marshal/unmarshal data") )
var DefaultOnError = func(code int, data map[string]any) { log.Printf("code: %d, data: %v", code, data) }
DefaultOnError Default handler for processing errors. it listen to topic "error".
var DefaultOnMessage = func(topic string, message []byte) { log.Printf("topic: %s, message: %s\n", topic, string(message)) }
DefaultOnMessage Default handler for processing incoming events without a handler.
Functions ¶
func AppendIfMissing ¶
AppendIfMissing check if item missing append item to list.
func CloseClientConnection ¶
func CloseClientConnection(connection quic.Connection, code uint64, err error) error
func DefaultAuthenticationFunc ¶
DefaultAuthenticationFunc is the default authentication function. it accepts all clients.
func DefaultAuthorizationFunc ¶
DefaultAuthorizationFunc is the default authorization function. it accepts all clients.
func IsSubscribeTopicValid ¶
IsSubscribeTopicValid check the subscribed topic exist or matched with client topics.
func SendError ¶
func SendError(sendStream quic.SendStream, e *Error) error
SendError send input error to client.
func TopicHasWildcard ¶
TopicHasWildcard checks if the topic is a wildcard.
Types ¶
type Client ¶
type Client struct {
Connection quic.Connection
Token string
Topics []string
Logger *zap.Logger
Finder Finder
OnEvent map[string]func(event []byte)
OnMessage func(topic string, message []byte)
OnError func(code int, data map[string]any)
}
func (*Client) AcceptEvents ¶
AcceptEvents reads events from the stream and calls the proper handler. order of calling handlers is as follows: 1. OnError if topic is "error" 2. OnEvent[topic] 3. OnMessage.
func (*Client) SetErrorHandler ¶
SetErrorHandler sets the handler for "error" topic.
func (*Client) SetEventHandler ¶
SetEventHandler sets the handler for the given topic.
func (*Client) SetMessageHandler ¶
SetMessageHandler sets the handler for all topics without handler and "error" topic.
type DistributeWork ¶
type DistributeWork struct {
Event []byte
EventSource *EventSource
}
func NewDistributeWork ¶
func NewDistributeWork(event []byte, eventSource *EventSource) *DistributeWork
type Error ¶
func UnmarshalError ¶
type EventSource ¶
type EventSource struct {
Topic string
DataChannel chan []byte
Subscribers []Subscriber
IncomingSubscribers chan Subscriber
SubscriberWaitingList []Subscriber
Metrics Metrics
Cleaning *atomic.Bool
CleaningInterval time.Duration
}
EventSource is a struct for topic channel and its subscribers.
func NewEventSource ¶
func NewEventSource( topic string, dataChannel chan []byte, subscribers []Subscriber, metric Metrics, cleaningInterval time.Duration, ) *EventSource
func (*EventSource) CleanCorruptSubscribers ¶
func (e *EventSource) CleanCorruptSubscribers()
func (*EventSource) DistributeEvents ¶
func (e *EventSource) DistributeEvents(worker Worker)
DistributeEvents distribute events from channel between subscribers.
func (*EventSource) HandleNewSubscriber ¶
func (e *EventSource) HandleNewSubscriber()
type Finder ¶
Finder for topics.
func (*Finder) FindRelatedWildcardTopics ¶
FindRelatedWildcardTopics find topics patterns that are applicable to the given topic.
type Metrics ¶
type Metrics struct {
EventCounter *prometheus.GaugeVec
SubscriberCounter *prometheus.GaugeVec
}
func NewMetrics ¶
func (Metrics) DecSubscriber ¶
func (Metrics) IncSubscriber ¶
type Offer ¶
type Offer struct {
Token string `json:"token,omitempty"`
Topics []string `json:"topics,omitempty"`
}
func AcceptOffer ¶
func AcceptOffer(connection quic.Connection) (*Offer, error)
type Server ¶
type Server struct {
Worker Worker
Listener *quic.Listener
EventSources map[string]*EventSource
Topics []string
Logger *zap.Logger
Finder Finder
Authenticator auth.Authenticator
Authorizer auth.Authorizer
Metrics Metrics
CleaningInterval time.Duration
}
Server is the main struct for the server.
func (*Server) GenerateEventSources ¶
GenerateEventSources generates eventSources for each topic.
func (*Server) MetricHandler ¶
func (*Server) SetAuthenticator ¶
func (s *Server) SetAuthenticator(authenticator auth.Authenticator)
SetAuthenticator replaces the authentication function.
func (*Server) SetAuthenticatorFunc ¶
func (s *Server) SetAuthenticatorFunc(authenticator auth.AuthenticatorFunc)
SetAuthenticatorFunc replaces the authentication function.
func (*Server) SetAuthorizer ¶
func (s *Server) SetAuthorizer(authorizer auth.Authorizer)
SetAuthorizer replaces the authentication function.
func (*Server) SetAuthorizerFunc ¶
func (s *Server) SetAuthorizerFunc(authorizer auth.AuthorizerFunc)
SetAuthorizerFunc replaces the authentication function.
type Subscriber ¶
type Subscriber struct {
Stream quic.SendStream
Corrupt *atomic.Bool
}
func NewSubscriber ¶
func NewSubscriber(stream quic.SendStream) Subscriber
type Worker ¶
func (*Worker) AddAcceptClientWork ¶
func (*Worker) AddDistributeWork ¶
func (w *Worker) AddDistributeWork(work *DistributeWork)