internal

package
v0.0.0-...-0e4e1ec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Numeric error codes. Numbering starts at 1 (the zero slot is skipped) and
// increases by one per entry, so CodeUnknown is 5.
const (
	_ = iota // 0 is intentionally left unused
	CodeNotAuthorized
	CodeTopicNotAvailable
	CodeFailedToCreateStream
	CodeFailedToSendOffer
	CodeUnknown
)
View Source
// String labels "Distribute" and "Accept".
// NOTE(review): presumably these name the two kinds of work scheduled through
// Worker (AddDistributeWork / AddAcceptClientWork) — confirm against the
// worker implementation.
const (
	DistributeEvent = "Distribute"
	AcceptClient    = "Accept"
)
View Source
// DELIMITER is the newline character ('\n') used to separate messages in
// streams.
// NOTE(review): the all-caps name departs from Go's MixedCaps convention; it
// is exported, so renaming it would break callers.
const DELIMITER = '\n'

DELIMITER is the delimiter used to separate messages in streams.

View Source
// ErrorTopic is the name of the topic ("error") whose events are routed to
// the client's error handler rather than to a regular event handler.
const ErrorTopic = "error"

Variables

View Source
// Sentinel errors exported by the package. Each is a distinct value created
// with errors.New, so callers can test for a specific failure with errors.Is.
var (
	// ErrNotAuthorized reports that the peer was not authorized.
	ErrNotAuthorized        = errors.New("not authorized")
	// ErrFailedToCreateStream reports that a send/receive stream to the client
	// could not be created.
	ErrFailedToCreateStream = errors.New("failed to create send/receive stream to client")
	// ErrFailedToReadOffer reports that the client's offer could not be read.
	ErrFailedToReadOffer    = errors.New("failed to read offer from client")
	// ErrFailedToSendOffer reports that the offer could not be sent to the server.
	ErrFailedToSendOffer    = errors.New("failed to send offer to server")
	// ErrFailedToMarshal covers both marshalling and unmarshalling failures.
	ErrFailedToMarshal      = errors.New("failed to marshal/unmarshal data")
)
View Source
// DefaultOnError is the default error handler: it writes the error code and
// its accompanying data to the standard logger and does nothing else.
var DefaultOnError = func(code int, data map[string]any) {
	const format = "code: %d, data: %v"

	log.Printf(format, code, data)
}

DefaultOnError is the default handler for processing errors. It listens to the "error" topic.

View Source
// DefaultOnMessage is the default message handler: it writes the topic and
// the message payload (rendered as text) to the standard logger.
var DefaultOnMessage = func(topic string, message []byte) {
	const format = "topic: %s, message: %s\n"

	// %s formats a []byte as its text content, so no explicit conversion is needed.
	log.Printf(format, topic, message)
}

DefaultOnMessage is the default handler for processing incoming events that have no handler of their own.

Functions

func AppendIfMissing

func AppendIfMissing(topics []string, topic string) []string

AppendIfMissing appends the item to the list if it is not already present.

func CloseClientConnection

func CloseClientConnection(connection quic.Connection, code uint64, err error) error

func DefaultAuthenticationFunc

func DefaultAuthenticationFunc(_ string) bool

DefaultAuthenticationFunc is the default authentication function. It accepts all clients.

func DefaultAuthorizationFunc

func DefaultAuthorizationFunc(_, _ string) bool

DefaultAuthorizationFunc is the default authorization function. It accepts all clients.

func IsSubscribeTopicValid

func IsSubscribeTopicValid(topic string, topics []string) bool

IsSubscribeTopicValid checks whether the subscribed topic exists in, or matches one of, the client topics.

func NewLogger

func NewLogger() *zap.Logger

func SendError

func SendError(sendStream quic.SendStream, e *Error) error

SendError sends the given error to the client.

func TopicHasWildcard

func TopicHasWildcard(topic string) bool

TopicHasWildcard checks if the topic is a wildcard.

func WriteData

func WriteData(data any, sendStream quic.SendStream) error

WriteData writes data to stream.

Types

type Client

// Client bundles a QUIC connection with the handlers that AcceptEvents
// dispatches to.
type Client struct {
	Connection quic.Connection
	// NOTE(review): Token and Topics mirror the fields of Offer; presumably
	// they are what the client presents to the server — confirm.
	Token      string
	Topics     []string
	Logger     *zap.Logger
	Finder     Finder

	// OnEvent maps a topic to its handler (see SetEventHandler).
	OnEvent   map[string]func(event []byte)
	// OnMessage handles events for which no other handler applies
	// (see SetMessageHandler).
	OnMessage func(topic string, message []byte)
	// OnError handles events on the "error" topic (see SetErrorHandler).
	OnError   func(code int, data map[string]any)
}

func (*Client) AcceptEvents

func (c *Client) AcceptEvents(reader *bufio.Reader)

AcceptEvents reads events from the stream and calls the proper handler. Handlers are tried in the following order: 1. OnError, if the topic is "error"; 2. OnEvent[topic]; 3. OnMessage.

func (*Client) SetErrorHandler

func (c *Client) SetErrorHandler(handler func(code int, data map[string]any))

SetErrorHandler sets the handler for "error" topic.

func (*Client) SetEventHandler

func (c *Client) SetEventHandler(topic string, handler func([]byte))

SetEventHandler sets the handler for the given topic.

func (*Client) SetMessageHandler

func (c *Client) SetMessageHandler(handler func(topic string, message []byte))

SetMessageHandler sets the handler for all topics without handler and "error" topic.

type DistributeWork

// DistributeWork pairs an event payload with an EventSource. It is the
// argument type of Worker.AddDistributeWork.
// NOTE(review): presumably EventSource is the source whose subscribers should
// receive Event — confirm against the worker implementation.
type DistributeWork struct {
	Event       []byte
	EventSource *EventSource
}

func NewDistributeWork

func NewDistributeWork(event []byte, eventSource *EventSource) *DistributeWork

type Error

// Error is the JSON-serializable error payload: a numeric code plus free-form
// data. Both fields carry omitempty, so a zero code or an empty map is left
// out of the encoded JSON.
type Error struct {
	Code int            `json:"code,omitempty"`
	Data map[string]any `json:"data,omitempty"`
}

func NewErr

func NewErr(code int, data map[string]any) *Error

func UnmarshalError

func UnmarshalError(bytes []byte) (Error, error)

type Event

// Event is the JSON-serializable message: a topic name and a raw payload.
// Both fields carry omitempty and are left out of the encoded JSON when empty.
type Event struct {
	Topic string `json:"topic,omitempty"`
	Data  []byte `json:"data,omitempty"`
}

func NewEvent

func NewEvent(topic string, data []byte) *Event

type EventSource

// EventSource groups a topic, the channel carrying its events, and the
// subscribers of that topic.
type EventSource struct {
	Topic                 string
	// DataChannel carries the raw event payloads for this topic.
	DataChannel           chan []byte
	Subscribers           []Subscriber
	// NOTE(review): IncomingSubscribers and SubscriberWaitingList look like a
	// hand-off for subscribers that arrive while a clean-up is in progress
	// (see HandleNewSubscriber / CleanCorruptSubscribers) — confirm.
	IncomingSubscribers   chan Subscriber
	SubscriberWaitingList []Subscriber
	Metrics               Metrics
	// NOTE(review): presumably set while CleanCorruptSubscribers runs — confirm.
	Cleaning              *atomic.Bool
	CleaningInterval      time.Duration
}

EventSource is a struct for topic channel and its subscribers.

func NewEventSource

func NewEventSource(
	topic string,
	dataChannel chan []byte,
	subscribers []Subscriber,
	metric Metrics,
	cleaningInterval time.Duration,
) *EventSource

func (*EventSource) CleanCorruptSubscribers

func (e *EventSource) CleanCorruptSubscribers()

func (*EventSource) DistributeEvents

func (e *EventSource) DistributeEvents(worker Worker)

DistributeEvents distributes events from the channel among the subscribers.

func (*EventSource) HandleNewSubscriber

func (e *EventSource) HandleNewSubscriber()

type Finder

// Finder provides topic lookups (FindTopicsList, FindRelatedWildcardTopics).
// Its only state is the logger.
type Finder struct {
	Logger *zap.Logger
}

Finder for topics.

func (*Finder) FindRelatedWildcardTopics

func (f *Finder) FindRelatedWildcardTopics(topic string, topics []string) []string

FindRelatedWildcardTopics finds the topic patterns that are applicable to the given topic.

func (*Finder) FindTopicsList

func (f *Finder) FindTopicsList(topics []string, pattern string) []string

FindTopicsList finds the topics that match the given pattern.

type Metrics

// Metrics holds the two Prometheus gauge vectors used by the package: one for
// events and one for subscribers.
// NOTE(review): the Inc*/Dec* methods take a topic, so the gauges are
// presumably labelled by topic — confirm in NewMetrics.
type Metrics struct {
	EventCounter      *prometheus.GaugeVec
	SubscriberCounter *prometheus.GaugeVec
}

func NewMetrics

func NewMetrics(namespace, subSystem string) Metrics

func (Metrics) DecEvent

func (m Metrics) DecEvent(topic string)

func (Metrics) DecSubscriber

func (m Metrics) DecSubscriber(topic string)

func (Metrics) IncEvent

func (m Metrics) IncEvent(topic string)

func (Metrics) IncSubscriber

func (m Metrics) IncSubscriber(topic string)

type Offer

// Offer is the JSON-serializable pairing of a token with a list of topics.
// Both fields carry omitempty and are left out of the encoded JSON when empty.
// NOTE(review): presumably the first message a client sends after connecting
// (see AcceptOffer) — confirm.
type Offer struct {
	Token  string   `json:"token,omitempty"`
	Topics []string `json:"topics,omitempty"`
}

func AcceptOffer

func AcceptOffer(connection quic.Connection) (*Offer, error)

func NewOffer

func NewOffer(token string, topics []string) Offer

type Server

// Server holds the server-side state: the QUIC listener, the worker, the
// per-topic event sources, and the pluggable authentication/authorization
// hooks.
type Server struct {
	Worker       Worker
	Listener     *quic.Listener
	// EventSources is keyed by string.
	// NOTE(review): presumably the key is the topic name (see
	// GenerateEventSources) — confirm.
	EventSources map[string]*EventSource
	Topics       []string
	Logger       *zap.Logger
	Finder       Finder

	// Authenticator and Authorizer can be replaced through the Set* methods.
	Authenticator auth.Authenticator
	Authorizer    auth.Authorizer
	Metrics       Metrics

	CleaningInterval time.Duration
}

Server is the main struct for the server.

func (*Server) GenerateEventSources

func (s *Server) GenerateEventSources(topics []string)

GenerateEventSources generates eventSources for each topic.

func (*Server) MetricHandler

func (s *Server) MetricHandler() http.Handler

func (*Server) Publish

func (s *Server) Publish(topic string, event []byte)

Publish publishes an event to all the subscribers of the given topic.

func (*Server) SetAuthenticator

func (s *Server) SetAuthenticator(authenticator auth.Authenticator)

SetAuthenticator replaces the authentication function.

func (*Server) SetAuthenticatorFunc

func (s *Server) SetAuthenticatorFunc(authenticator auth.AuthenticatorFunc)

SetAuthenticatorFunc replaces the authentication function.

func (*Server) SetAuthorizer

func (s *Server) SetAuthorizer(authorizer auth.Authorizer)

SetAuthorizer replaces the authorizer.

func (*Server) SetAuthorizerFunc

func (s *Server) SetAuthorizerFunc(authorizer auth.AuthorizerFunc)

SetAuthorizerFunc replaces the authorization function.

type Subscriber

// Subscriber pairs a QUIC send stream with a flag marking it as corrupt.
// NOTE(review): presumably Corrupt is set when writing to Stream fails and is
// later consulted by EventSource.CleanCorruptSubscribers — confirm.
type Subscriber struct {
	Stream  quic.SendStream
	Corrupt *atomic.Bool
}

func NewSubscriber

func NewSubscriber(stream quic.SendStream) Subscriber

type Worker

// Worker wraps a koi.Pond together with a logger. Work is submitted through
// AddAcceptClientWork and AddDistributeWork.
type Worker struct {
	Pond   *koi.Pond
	Logger *zap.Logger
}

func NewWorker

func NewWorker(cfg WorkerConfig, l *zap.Logger) Worker

func (*Worker) AddAcceptClientWork

func (w *Worker) AddAcceptClientWork(server *Server, count int)

func (*Worker) AddDistributeWork

func (w *Worker) AddDistributeWork(work *DistributeWork)

type WorkerConfig

// WorkerConfig is the configuration passed to NewWorker.
// NOTE(review): judging by the names, it sets a worker count and a queue size
// for each of two kinds of work (accepting clients, distributing events) —
// confirm in NewWorker.
type WorkerConfig struct {
	ClientAcceptorCount       int64
	ClientAcceptorQueueSize   int
	EventDistributorCount     int64
	EventDistributorQueueSize int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL