Documentation
¶
Overview ¶
Package transport provides a comprehensive transport abstraction layer for the AG-UI Go SDK.
This package implements a robust, type-safe transport system that enables reliable, bidirectional communication between agents and front-end applications. The transport layer supports multiple protocols, connection management, error handling, and advanced features like streaming, compression, and security.
Key Features:
- Type-safe transport interfaces with comprehensive validation
- Support for multiple transport protocols (WebSocket, HTTP, gRPC)
- Streaming and reliable transport capabilities
- Simplified capabilities system for basic transport characteristics
- Comprehensive error handling and circuit breaker patterns
- Middleware and interceptor support for cross-cutting concerns
- Transport manager with load balancing and failover
- Health checking and performance monitoring
- Security features including TLS, JWT, API keys, and OAuth2
Core Interfaces:
The transport layer is built around several key interfaces:
- Transport: Basic transport operations (connect, send, receive, close)
- StreamingTransport: Real-time bidirectional streaming capabilities
- ReliableTransport: Guaranteed delivery with acknowledgments and retries
- TransportManager: Manages multiple transports with load balancing
- Config: Type-safe configuration with validation
- Middleware: Interceptors for cross-cutting concerns
Basic Capabilities:
The simplified capabilities system provides basic transport characteristics:
- Streaming: Whether the transport supports streaming
- Bidirectional: Whether the transport supports bidirectional communication
- MaxMessageSize: Maximum message size supported
- ProtocolVersion: Version of the transport protocol
Transport Protocols:
- WebSocket: Full-duplex, real-time communication
- HTTP: Request-response with SSE support for streaming
- gRPC: High-performance RPC with bidirectional streaming
- Mock: Testing and development transport
Advanced Features:
- Circuit breakers for fault tolerance
- Automatic reconnection with exponential backoff
- Event filtering and middleware chains
- Comprehensive metrics and health checking
- Load balancing strategies (round-robin, failover, performance-based)
Basic Transport Usage:
import (
"context"
"time"
"github.com/mattsp1290/ag-ui/go-sdk/pkg/transport"
"github.com/mattsp1290/ag-ui/go-sdk/pkg/core/events"
)
// Create type-safe configuration
config := &transport.BasicConfig{
Type: "websocket",
Endpoint: "ws://localhost:8080/ws",
Timeout: 30 * time.Second,
Headers: map[string]string{
"Authorization": "Bearer token123",
},
Secure: true,
}
// Validate configuration
if err := config.Validate(); err != nil {
log.Fatalf("Invalid config: %v", err)
}
// Create transport
transport := transport.NewWebSocketTransport(config)
// Connect
ctx := context.Background()
if err := transport.Connect(ctx); err != nil {
log.Fatalf("Connection failed: %v", err)
}
defer transport.Close(ctx)
// Send type-safe events
event := events.NewRunStartedEvent("thread-123", "run-456")
if err := transport.Send(ctx, event); err != nil {
log.Printf("Send failed: %v", err)
}
// Receive events
go func() {
for event := range transport.Receive() {
log.Printf("Received: %s", event.GetEventType())
}
}()
// Handle errors
go func() {
for err := range transport.Errors() {
log.Printf("Transport error: %v", err)
}
}()
Basic Capabilities:
// Create simple capabilities
capabilities := transport.Capabilities{
Streaming: true,
Bidirectional: true,
MaxMessageSize: 1024 * 1024,
ProtocolVersion: "1.0",
}
// Capabilities are used for basic transport characteristics
// and can be extended via the Extensions field for custom features
Streaming Transport:
streamingTransport := transport.NewGRPCStreamingTransport(config)
// Start bidirectional streaming
send, receive, errors, err := streamingTransport.StartStreaming(ctx)
if err != nil {
log.Fatalf("Streaming failed: %v", err)
}
// Send events via channel
go func() {
event := events.NewTextMessageContentEvent("msg-123", "Hello")
send <- event
}()
// Batch sending for performance
events := []transport.TransportEvent{
events.NewStepStartedEvent("step-1"),
events.NewStepFinishedEvent("step-1"),
}
err = streamingTransport.SendBatch(ctx, events)
Transport Manager with Load Balancing:
manager := transport.NewTransportManager()
// Add multiple transports
manager.AddTransport("primary", primaryTransport)
manager.AddTransport("backup", backupTransport)
// Configure load balancer
manager.SetLoadBalancer(transport.NewFailoverLoadBalancer())
// Send using best available transport
err = manager.SendEvent(ctx, event)
For comprehensive examples, see the examples/ directory and API documentation.
Package transport provides a comprehensive transport abstraction system with composable interfaces organized across multiple focused files.
The interfaces have been refactored into smaller, more focused files: - interfaces_core.go: Core transport interfaces - interfaces_stats.go: Statistics and metrics - interfaces_config.go: Configuration interfaces - interfaces_state.go: Connection state management - interfaces_middleware.go: Middleware and filtering - interfaces_manager.go: Transport management - interfaces_serialization.go: Serialization and compression - interfaces_health.go: Health checking - interfaces_metrics.go: Metrics collection - interfaces_auth.go: Authentication - interfaces_resilience.go: Retry and circuit breaker - interfaces_io.go: I/O abstractions - interfaces_events.go: Transport event types
This organization provides better maintainability while keeping the core Transport interface simple and composable.
Index ¶
- Constants
- Variables
- func AssertErrorReceived(t *testing.T, errorChan <-chan error, timeout time.Duration) error
- func AssertEventReceived(t *testing.T, eventChan <-chan events.Event, timeout time.Duration) events.Event
- func AssertNoError(t *testing.T, errorChan <-chan error, timeout time.Duration)
- func AssertNoEvent(t *testing.T, eventChan <-chan events.Event, timeout time.Duration)
- func AssertTransportConnected(t *testing.T, transport Transport)
- func AssertTransportNotConnected(t *testing.T, transport Transport)
- func BenchmarkConcurrentSend(b *testing.B, transport Transport, concurrency int)
- func BenchmarkMigrationPerformance(b *testing.B)
- func BenchmarkTransport(b *testing.B, transport Transport)
- func ContextAwareSleep(ctx context.Context, duration time.Duration) error
- func CreateMapCleanupFunc[K comparable, V any](m *sync.Map, getTimestamp func(V) time.Time, ttl time.Duration) func() (int, error)
- func CreateSliceCleanupFunc[T any](slice *[]T, mu *sync.RWMutex, isExpired func(T) bool) func() (int, error)
- func CreateTypedConfigError(field string, value interface{}, message string) error
- func EnsureTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
- func ExampleDocumentationGeneration()
- func ExampleMigrationTestUsage()
- func ExampleMigrationUsage()
- func FastSliceGrow[T any](slice []T, targetCap int) []T
- func FormatLogMessage(format string, args ...interface{}) string
- func GetConfigurationErrorField(err error) string
- func GetConfigurationErrorValue(err error) interface{}
- func GetRegisteredTypes() []string
- func GetValidationDepth(ctx context.Context) int
- func GetValidationPath(ctx context.Context) string
- func InternErrorMsg(msg string) string
- func InternEventType(eventType string) string
- func IsConfigurationError(err error) bool
- func IsRegistered(transportType string) bool
- func IsTransportError(err error) bool
- func IsValidationError(err error) bool
- func NewLegacyEvent(id, eventType string, data map[string]interface{}) *legacyEventImpl
- func PropagateDeadline(parent, child context.Context) (context.Context, context.CancelFunc)
- func Register(transportType string, factory TransportFactory) error
- func RetryWithContext(ctx context.Context, maxRetries int, backoff time.Duration, ...) error
- func SetLocalizationProvider(provider LocalizationProvider)
- func TestMigrationScenarios(t *testing.T)
- func TestMockTransportImplementation(t *testing.T)
- func WaitForCondition(t *testing.T, timeout time.Duration, condition func() bool)
- func WaitWithTimeout[T any](ctx context.Context, ch <-chan T, timeout time.Duration) (T, bool, error)
- func WithTimeout(t *testing.T, timeout time.Duration, fn func(ctx context.Context))
- func WithTimeoutExpected(t *testing.T, timeout time.Duration, fn func(ctx context.Context))
- func WithValidationDepth(ctx context.Context) context.Context
- func WithValidationPath(ctx context.Context, path string) context.Context
- type APIDocumentation
- type AckHandler
- type AckHandlerProvider
- type AdvancedMockTransport
- func (t *AdvancedMockTransport) AddMiddleware(mw Middleware)
- func (t *AdvancedMockTransport) Close(ctx context.Context) error
- func (t *AdvancedMockTransport) Connect(ctx context.Context) error
- func (t *AdvancedMockTransport) GetState() ConnectionState
- func (t *AdvancedMockTransport) Send(ctx context.Context, event TransportEvent) error
- func (t *AdvancedMockTransport) SetEventFilter(filter EventFilter)
- func (t *AdvancedMockTransport) SetNetworkConditions(latency, jitter time.Duration, packetLoss float64, bandwidth int64)
- type AggregatorOptions
- type AlertLevel
- type AllOfValidator
- type AnyOfValidator
- type AsynchronousValidator
- type AuthConfig
- type AuthProvider
- type BackpressureConfig
- type BackpressureHandler
- func (h *BackpressureHandler) Channels() (<-chan events.Event, <-chan error)
- func (h *BackpressureHandler) ErrorChan() <-chan error
- func (h *BackpressureHandler) EventChan() <-chan events.Event
- func (h *BackpressureHandler) GetMetrics() BackpressureMetrics
- func (h *BackpressureHandler) SendError(err error) error
- func (h *BackpressureHandler) SendEvent(event events.Event) error
- func (h *BackpressureHandler) Stop()
- type BackpressureMetrics
- type BackpressureStrategy
- type BaseConfig
- type BatchEvent
- type BatchSender
- type BatchValidator
- type BoolValue
- type CacheStats
- type CachedValidator
- func (cv *CachedValidator) ClearCache()
- func (cv *CachedValidator) GetCacheStats() CacheStats
- func (cv *CachedValidator) Validate(ctx context.Context, event TransportEvent) error
- func (cv *CachedValidator) ValidateIncoming(ctx context.Context, event TransportEvent) error
- func (cv *CachedValidator) ValidateOutgoing(ctx context.Context, event TransportEvent) error
- type Capabilities
- type ChaosTransport
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type CleanupCompletionRule
- type CleanupConfig
- type CleanupCoordinator
- func (cc *CleanupCoordinator) GetGroupStatus(groupID string) (GroupStatus, error)
- func (cc *CleanupCoordinator) GetStatus() CoordinatorStatus
- func (cc *CleanupCoordinator) RegisterGroup(group *CleanupGroup) error
- func (cc *CleanupCoordinator) Shutdown(ctx context.Context) error
- func (cc *CleanupCoordinator) Wait() error
- type CleanupGroup
- type CleanupManager
- func (cm *CleanupManager) GetMetrics() CleanupMetrics
- func (cm *CleanupManager) RegisterTask(name string, ttl time.Duration, cleanupFunc func() (int, error)) error
- func (cm *CleanupManager) RunTaskNow(name string) error
- func (cm *CleanupManager) Start() error
- func (cm *CleanupManager) Stop() error
- func (cm *CleanupManager) UnregisterTask(name string) error
- type CleanupManagerConfig
- type CleanupMetrics
- type CleanupOperation
- type CleanupPhase
- type CleanupStats
- type CleanupTask
- type CleanupTimeoutRule
- type CleanupTracker
- func (ct *CleanupTracker) Cleanup(ctx context.Context) error
- func (ct *CleanupTracker) GetPhase() CleanupPhase
- func (ct *CleanupTracker) GetStats() CleanupStats
- func (ct *CleanupTracker) Track(id string, resourceType ResourceType, description string, ...)
- func (ct *CleanupTracker) Untrack(id string)
- func (ct *CleanupTracker) Wait() error
- type CleanupValidationConfig
- type CleanupValidationError
- type CleanupValidationResult
- type CleanupValidationRule
- type CleanupValidator
- func (cv *CleanupValidator) AddRule(rule CleanupValidationRule)
- func (cv *CleanupValidator) GetAllValidationResults() map[string]*CleanupValidationResult
- func (cv *CleanupValidator) GetValidationErrors() []CleanupValidationError
- func (cv *CleanupValidator) GetValidationResult(componentID string) (*CleanupValidationResult, bool)
- func (cv *CleanupValidator) ValidateCleanup(ctx context.Context, componentID string, tracker *CleanupTracker) (*CleanupValidationResult, error)
- type CompositeEvent
- type Compressor
- type ConcurrentTest
- type ConditionalEvent
- type ConditionalRule
- type ConditionalValidator
- func (v *ConditionalValidator) AddFieldCondition(name, field string, condition func(interface{}) bool, ...) *ConditionalValidator
- func (v *ConditionalValidator) AddRule(rule ConditionalRule) *ConditionalValidator
- func (v *ConditionalValidator) IsEnabled() bool
- func (v *ConditionalValidator) Name() string
- func (v *ConditionalValidator) Priority() int
- func (v *ConditionalValidator) SetEnabled(enabled bool) *ConditionalValidator
- func (v *ConditionalValidator) SetPriority(priority int) *ConditionalValidator
- func (v *ConditionalValidator) Validate(ctx context.Context, value interface{}) ValidationResult
- type Config
- type ConfigBuilder
- func (b *ConfigBuilder) Build() (Config, error)
- func (b *ConfigBuilder) MustBuild() Config
- func (b *ConfigBuilder) WithAuth(auth *AuthConfig) *ConfigBuilder
- func (b *ConfigBuilder) WithEndpoint(endpoint string) *ConfigBuilder
- func (b *ConfigBuilder) WithHeaders(headers map[string]string) *ConfigBuilder
- func (b *ConfigBuilder) WithMetrics(metrics *MetricsConfig) *ConfigBuilder
- func (b *ConfigBuilder) WithRetry(retry *RetryConfig) *ConfigBuilder
- func (b *ConfigBuilder) WithTLS(tls *tls.Config) *ConfigBuilder
- func (b *ConfigBuilder) WithTimeout(timeout time.Duration) *ConfigBuilder
- type ConfigError
- type ConfigHeaders
- type ConfigImpact
- type ConfigMetadata
- type ConfigProvider
- type ConfigTimeouts
- type ConfigValidation
- type ConfigurationError
- func NewBoolConfigError(field string, value bool, message string) *ConfigurationError[BoolValue]
- func NewConfigurationError[T ErrorValue](field string, value T, message string) *ConfigurationError[T]
- func NewFloatConfigError(field string, value float64, message string) *ConfigurationError[FloatValue]
- func NewGenericConfigError(field string, value interface{}, message string) *ConfigurationError[GenericValue]
- func NewIntConfigError(field string, value int, message string) *ConfigurationError[IntValue]
- func NewNilConfigError(field, message string) *ConfigurationError[NilValue]
- func NewStringConfigError(field, value, message string) *ConfigurationError[StringValue]
- type ConfigurationEventData
- type ConnectionCallback
- type ConnectionCapabilities
- type ConnectionError
- type ConnectionEventData
- type ConnectionEventOption
- func WithAttemptNumber(attempt int) ConnectionEventOption
- func WithCapabilities(capabilities ConnectionCapabilities) ConnectionEventOption
- func WithConnectionError(err string) ConnectionEventOption
- func WithLocalAddress(address string) ConnectionEventOption
- func WithProtocol(protocol string) ConnectionEventOption
- func WithRemoteAddress(address string) ConnectionEventOption
- func WithVersion(version string) ConnectionEventOption
- type ConnectionFactory
- type ConnectionHandler
- type ConnectionPool
- type ConnectionPoolConfig
- type ConnectionState
- type Connector
- type ContextBestPractices
- type ContextConfig
- func (c *ContextConfig) Validate() error
- func (c *ContextConfig) WithConnectTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func (c *ContextConfig) WithDefaultTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func (c *ContextConfig) WithReceiveTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func (c *ContextConfig) WithRetryTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func (c *ContextConfig) WithSendTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func (c *ContextConfig) WithShutdownTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- type ContextualEvent
- type CoordinatorConfig
- type CoordinatorState
- type CoordinatorStatus
- type CrossFieldRule
- type CrossFieldValidator
- func (v *CrossFieldValidator) AddComparisonRule(name, field1, field2 string, compare func(a, b interface{}) bool, ...) *CrossFieldValidator
- func (v *CrossFieldValidator) AddRule(rule CrossFieldRule) *CrossFieldValidator
- func (v *CrossFieldValidator) IsEnabled() bool
- func (v *CrossFieldValidator) Name() string
- func (v *CrossFieldValidator) Priority() int
- func (v *CrossFieldValidator) SetCondition(condition func(map[string]interface{}) bool) *CrossFieldValidator
- func (v *CrossFieldValidator) SetEnabled(enabled bool) *CrossFieldValidator
- func (v *CrossFieldValidator) SetPriority(priority int) *CrossFieldValidator
- func (v *CrossFieldValidator) Validate(ctx context.Context, value map[string]interface{}) ValidationResult
- type DataEventData
- type DataEventOption
- func WithChecksum(checksum string) DataEventOption
- func WithCompressed(compressed bool) DataEventOption
- func WithContentType(contentType string) DataEventOption
- func WithEncoding(encoding string) DataEventOption
- func WithSequenceNumber(seq uint64) DataEventOption
- func WithStreamID(streamID string) DataEventOption
- type DataFormatRule
- type DeadlockConfig
- type DeadlockDetector
- func (dd *DeadlockDetector) AcquireResource(resourceID string, ownerID string)
- func (dd *DeadlockDetector) GenerateReport() string
- func (dd *DeadlockDetector) GetAllResources() map[string]*DeadlockResource
- func (dd *DeadlockDetector) GetDeadlockCount() int64
- func (dd *DeadlockDetector) GetResourceInfo(resourceID string) (*DeadlockResource, bool)
- func (dd *DeadlockDetector) GetWaitGraph() map[string][]string
- func (dd *DeadlockDetector) RegisterResource(id string, resourceType ResourceType)
- func (dd *DeadlockDetector) ReleaseResource(resourceID string)
- func (dd *DeadlockDetector) SetDeadlockHandler(handler func(DeadlockInfo))
- func (dd *DeadlockDetector) Start()
- func (dd *DeadlockDetector) Stop()
- func (dd *DeadlockDetector) StopWaitingForResource(resourceID string, waiterID string)
- func (dd *DeadlockDetector) UnregisterResource(id string)
- func (dd *DeadlockDetector) WaitForResource(resourceID string, waiterID string, waitingFor []string)
- type DeadlockInfo
- type DeadlockResource
- type DefaultLocalizationProvider
- type DefaultMiddlewareChain
- func (c *DefaultMiddlewareChain) Add(middleware Middleware)
- func (c *DefaultMiddlewareChain) Clear()
- func (c *DefaultMiddlewareChain) ProcessIncoming(ctx context.Context, event events.Event) (events.Event, error)
- func (c *DefaultMiddlewareChain) ProcessOutgoing(ctx context.Context, event TransportEvent) (TransportEvent, error)
- type DefaultTransportManager
- func (m *DefaultTransportManager) AddTransport(name string, transport Transport) error
- func (m *DefaultTransportManager) Close() error
- func (m *DefaultTransportManager) GetActiveTransports() map[string]Transport
- func (m *DefaultTransportManager) GetMapCleanupMetrics() MapCleanupMetrics
- func (m *DefaultTransportManager) GetTransport(name string) (Transport, error)
- func (m *DefaultTransportManager) ReceiveEvents(ctx context.Context) (<-chan events.Event, error)
- func (m *DefaultTransportManager) RemoveTransport(name string) error
- func (m *DefaultTransportManager) SendEvent(ctx context.Context, event TransportEvent) error
- func (m *DefaultTransportManager) SendEventToTransport(ctx context.Context, transportName string, event TransportEvent) error
- func (m *DefaultTransportManager) SetEventBus(eventBus EventBus)
- func (m *DefaultTransportManager) SetLoadBalancer(balancer LoadBalancer)
- func (m *DefaultTransportManager) SetMiddleware(middleware MiddlewareChain)
- func (m *DefaultTransportManager) Stats() map[string]TransportStats
- func (m *DefaultTransportManager) TriggerManualCleanup()
- type DefaultTransportRegistry
- func (r *DefaultTransportRegistry) Clear()
- func (r *DefaultTransportRegistry) Create(config Config) (Transport, error)
- func (r *DefaultTransportRegistry) CreateWithContext(ctx context.Context, config Config) (Transport, error)
- func (r *DefaultTransportRegistry) GetFactory(transportType string) (TransportFactory, error)
- func (r *DefaultTransportRegistry) GetRegisteredTypes() []string
- func (r *DefaultTransportRegistry) IsRegistered(transportType string) bool
- func (r *DefaultTransportRegistry) Register(transportType string, factory TransportFactory) error
- func (r *DefaultTransportRegistry) Unregister(transportType string) error
- type DefaultValidator
- type DependencyValidator
- func (v *DependencyValidator) AddDependency(field string, requiredFields ...string) *DependencyValidator
- func (v *DependencyValidator) IsEnabled() bool
- func (v *DependencyValidator) Name() string
- func (v *DependencyValidator) Priority() int
- func (v *DependencyValidator) SetCondition(condition func(map[string]interface{}) bool) *DependencyValidator
- func (v *DependencyValidator) SetEnabled(enabled bool) *DependencyValidator
- func (v *DependencyValidator) SetFieldValidator(field string, validator TypedValidator[any]) *DependencyValidator
- func (v *DependencyValidator) SetPriority(priority int) *DependencyValidator
- func (v *DependencyValidator) Validate(ctx context.Context, value map[string]interface{}) ValidationResult
- type DeprecationDoc
- type DeprecationInfo
- type DeprecationWarning
- type DocConfig
- type DocumentationGenerator
- type DynamicSchemaValidator
- func (v *DynamicSchemaValidator) InvalidateCache()
- func (v *DynamicSchemaValidator) IsEnabled() bool
- func (v *DynamicSchemaValidator) Name() string
- func (v *DynamicSchemaValidator) Priority() int
- func (v *DynamicSchemaValidator) SetCacheTimeout(timeout time.Duration) *DynamicSchemaValidator
- func (v *DynamicSchemaValidator) Validate(ctx context.Context, value interface{}) ValidationResult
- type EnvironmentContext
- type ErrorCause
- type ErrorContext
- type ErrorDetails
- type ErrorEventData
- type ErrorEventOption
- func WithErrorCategory(category string) ErrorEventOption
- func WithErrorCode(code string) ErrorEventOption
- func WithErrorDetails(details ErrorDetails) ErrorEventOption
- func WithErrorSeverity(severity string) ErrorEventOption
- func WithRequestID(requestID string) ErrorEventOption
- func WithRetryable(retryable bool) ErrorEventOption
- func WithStackTrace(stackTrace string) ErrorEventOption
- type ErrorField
- type ErrorRetryInfo
- type ErrorSeverity
- type ErrorSimulator
- type ErrorValue
- type EventAggregator
- type EventBus
- type EventCondition
- type EventContext
- type EventData
- type EventFilter
- type EventHandler
- type EventHandlerProvider
- type EventHandlerSlicePool
- type EventRetryPolicy
- type EventRouter
- type EventTypeRule
- type ExampleDoc
- type FastLogger
- type FastValidator
- type Field
- func Any(key string, value interface{}) Field
- func Bool(key string, value bool) Field
- func Duration(key string, value time.Duration) Field
- func Err(err error) Field
- func Error(err error) Field
- func Float64(key string, value float64) Field
- func Int(key string, value int) Field
- func Int64(key string, value int64) Field
- func String(key, value string) Field
- func Time(key string, value time.Time) Field
- type FieldDoc
- type FieldProvider
- type FieldValidatorRule
- type FloatValue
- type FunctionDoc
- type GenericValue
- type GoroutineLeak
- type GoroutineLeakRule
- type GroupState
- type GroupStatus
- type HTTPConfig
- type HTTPConnectionFactory
- type HealthCheckManager
- type HealthChecker
- type HealthStatus
- type Int64Field
- type IntValue
- type InterfaceDoc
- type KeepAliveConfig
- type LeakedResource
- type LegacyConfigurationError
- type LoadBalancer
- type LoadBalancerSetter
- type LocalizationProvider
- type LogEntry
- type LogLevel
- type LogValue
- type Logger
- type LoggerConfig
- type Manager
- func (m *Manager) AddMiddleware(middleware ...Middleware)
- func (m *Manager) Channels() (<-chan events.Event, <-chan error)
- func (m *Manager) Errors() <-chan error
- func (m *Manager) GetActiveTransport() Transport
- func (m *Manager) GetBackpressureMetrics() BackpressureMetrics
- func (m *Manager) GetMetrics() ManagerMetrics
- func (m *Manager) GetValidationConfig() *ValidationConfig
- func (m *Manager) IsValidationEnabled() bool
- func (m *Manager) Receive() <-chan events.Event
- func (m *Manager) Send(ctx context.Context, event TransportEvent) error
- func (m *Manager) SetTransport(transport Transport)
- func (m *Manager) SetValidationConfig(config *ValidationConfig)
- func (m *Manager) SetValidationEnabled(enabled bool)
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop(ctx context.Context) error
- type ManagerConfig
- type ManagerMetrics
- type MapCleanupMetrics
- type MapValidator
- func (v *MapValidator[K, V]) IsEnabled() bool
- func (v *MapValidator[K, V]) Name() string
- func (v *MapValidator[K, V]) Priority() int
- func (v *MapValidator[K, V]) SetCondition(condition func(map[K]V) bool) *MapValidator[K, V]
- func (v *MapValidator[K, V]) SetEnabled(enabled bool) *MapValidator[K, V]
- func (v *MapValidator[K, V]) SetKeyValidator(validator TypedValidator[K]) *MapValidator[K, V]
- func (v *MapValidator[K, V]) SetPriority(priority int) *MapValidator[K, V]
- func (v *MapValidator[K, V]) SetRequiredKeys(keys ...K) *MapValidator[K, V]
- func (v *MapValidator[K, V]) SetSizeRange(min, max *int) *MapValidator[K, V]
- func (v *MapValidator[K, V]) SetValueValidator(validator TypedValidator[V]) *MapValidator[K, V]
- func (v *MapValidator[K, V]) Validate(ctx context.Context, value map[K]V) ValidationResult
- type MemoryConfig
- func (c *MemoryConfig) Clone() Config
- func (c *MemoryConfig) GetEndpoint() string
- func (c *MemoryConfig) GetHeaders() map[string]string
- func (c *MemoryConfig) GetTimeout() time.Duration
- func (c *MemoryConfig) GetType() string
- func (c *MemoryConfig) IsSecure() bool
- func (c *MemoryConfig) Validate() error
- type MemoryLeak
- type MemoryLeakRule
- type MemoryManager
- func (mm *MemoryManager) ForceGC()
- func (mm *MemoryManager) GetAdaptiveBufferSize(key string, defaultSize int) int
- func (mm *MemoryManager) GetMemoryPressureLevel() MemoryPressureLevel
- func (mm *MemoryManager) GetMemoryStats() runtime.MemStats
- func (mm *MemoryManager) GetMetrics() MemoryMetrics
- func (mm *MemoryManager) OnMemoryPressure(callback func(MemoryPressureLevel))
- func (mm *MemoryManager) Start()
- func (mm *MemoryManager) Stop()
- type MemoryManagerConfig
- type MemoryMetrics
- type MemoryPressureLevel
- type MemoryTransport
- func (t *MemoryTransport) Channels() (<-chan events.Event, <-chan error)
- func (t *MemoryTransport) Close(ctx context.Context) error
- func (t *MemoryTransport) Config() Config
- func (t *MemoryTransport) Connect(ctx context.Context) error
- func (t *MemoryTransport) Context() context.Context
- func (t *MemoryTransport) Errors() <-chan error
- func (t *MemoryTransport) GetOption(key string) (interface{}, error)
- func (t *MemoryTransport) IsConnected() bool
- func (t *MemoryTransport) Receive() <-chan events.Event
- func (t *MemoryTransport) Send(ctx context.Context, event TransportEvent) error
- func (t *MemoryTransport) SetOption(key string, value interface{}) error
- func (t *MemoryTransport) Stats() TransportStats
- func (t *MemoryTransport) Type() string
- type MessageAttachment
- type MessageEventData
- type MessageQueueConfig
- type MessageSizeRule
- type MethodDoc
- type Metrics
- type MetricsCollector
- type MetricsConfig
- type MetricsEventData
- type MetricsEventOption
- type MetricsManager
- func (m *MetricsManager) AddTransport(name string, transport Transport)
- func (m *MetricsManager) Close() error
- func (m *MetricsManager) RecordError(transportName string, err error)
- func (m *MetricsManager) RecordEvent(transportName string, event any)
- func (m *MetricsManager) RemoveTransport(name string)
- type Middleware
- type MiddlewareChain
- type MigrationConfig
- type MigrationMockEvent
- type MigrationMockTransport
- func (mt *MigrationMockTransport) Channels() (<-chan events.Event, <-chan error)
- func (mt *MigrationMockTransport) Close(ctx context.Context) error
- func (mt *MigrationMockTransport) Config() Config
- func (mt *MigrationMockTransport) Connect(ctx context.Context) error
- func (mt *MigrationMockTransport) IsConnected() bool
- func (mt *MigrationMockTransport) Send(ctx context.Context, event TransportEvent) error
- func (mt *MigrationMockTransport) Stats() TransportStats
- type MigrationReport
- type MigrationRule
- type MigrationTestSuite
- func (mts *MigrationTestSuite) Cleanup()
- func (mts *MigrationTestSuite) TestBackwardCompatibility(oldCode, newCode string)
- func (mts *MigrationTestSuite) TestDeprecationDetection(code string, expectedWarnings []string)
- func (mts *MigrationTestSuite) TestInterfaceComposition()
- func (mts *MigrationTestSuite) TestTransformationRule(ruleName string, input, expected string)
- type MigrationValidator
- type MockEventHandler
- type MockHealthChecker
- type MockManager
- type MockTransport
- func (m *MockTransport) Channels() (<-chan events.Event, <-chan error)
- func (m *MockTransport) Close(ctx context.Context) error
- func (m *MockTransport) Config() Config
- func (m *MockTransport) Connect(ctx context.Context) error
- func (m *MockTransport) Errors() <-chan error
- func (m *MockTransport) GetCallCount(method string) int
- func (m *MockTransport) GetSentEvents() []TransportEvent
- func (m *MockTransport) IsConnected() bool
- func (m *MockTransport) Receive() <-chan events.Event
- func (m *MockTransport) Reset()
- func (m *MockTransport) Send(ctx context.Context, event TransportEvent) error
- func (m *MockTransport) SetCloseBehavior(fn func(ctx context.Context) error)
- func (m *MockTransport) SetConnectBehavior(fn func(ctx context.Context) error)
- func (m *MockTransport) SetConnectDelay(delay time.Duration)
- func (m *MockTransport) SetSendBehavior(fn func(ctx context.Context, event TransportEvent) error)
- func (m *MockTransport) SetSendDelay(delay time.Duration)
- func (m *MockTransport) SimulateError(err error) error
- func (m *MockTransport) SimulateEvent(event events.Event) error
- func (m *MockTransport) Stats() TransportStats
- func (m *MockTransport) WasCalled(method string) bool
- type NilValue
- type NoopLogger
- func (n *NoopLogger) Debug(message string, fields ...Field)
- func (n *NoopLogger) DebugTyped(message string, fields ...FieldProvider)
- func (n *NoopLogger) Error(message string, fields ...Field)
- func (n *NoopLogger) ErrorTyped(message string, fields ...FieldProvider)
- func (n *NoopLogger) Info(message string, fields ...Field)
- func (n *NoopLogger) InfoTyped(message string, fields ...FieldProvider)
- func (n *NoopLogger) Log(level LogLevel, message string, fields ...Field)
- func (n *NoopLogger) LogTyped(level LogLevel, message string, fields ...FieldProvider)
- func (n *NoopLogger) Warn(message string, fields ...Field)
- func (n *NoopLogger) WarnTyped(message string, fields ...FieldProvider)
- func (n *NoopLogger) WithContext(ctx context.Context) Logger
- func (n *NoopLogger) WithFields(fields ...Field) Logger
- func (n *NoopLogger) WithTypedFields(fields ...FieldProvider) Logger
- type Operation
- type OptimizedLogger
- func (l *OptimizedLogger) Debug(message string, fields ...Field)
- func (l *OptimizedLogger) DebugTyped(message string, fields ...FieldProvider)
- func (l *OptimizedLogger) Error(message string, fields ...Field)
- func (l *OptimizedLogger) ErrorTyped(message string, fields ...FieldProvider)
- func (l *OptimizedLogger) Info(message string, fields ...Field)
- func (l *OptimizedLogger) InfoTyped(message string, fields ...FieldProvider)
- func (l *OptimizedLogger) Log(level LogLevel, message string, fields ...Field)
- func (l *OptimizedLogger) LogTyped(level LogLevel, message string, fields ...FieldProvider)
- func (l *OptimizedLogger) Warn(message string, fields ...Field)
- func (l *OptimizedLogger) WarnTyped(message string, fields ...FieldProvider)
- func (l *OptimizedLogger) WithContext(ctx context.Context) Logger
- func (l *OptimizedLogger) WithFields(fields ...Field) Logger
- func (l *OptimizedLogger) WithTypedFields(fields ...FieldProvider) Logger
- type OverflowPolicy
- type ParamDoc
- type PatternValidator
- func MustNewEmailPatternValidator() *PatternValidator
- func MustNewPhonePatternValidator() *PatternValidator
- func MustNewURLPatternValidator() *PatternValidator
- func NewEmailPatternValidator() (*PatternValidator, error)
- func NewPatternValidator(name string) *PatternValidator
- func NewPhonePatternValidator() (*PatternValidator, error)
- func NewURLPatternValidator() (*PatternValidator, error)
- func (v *PatternValidator) AddPattern(name, pattern string, required bool) error
- func (v *PatternValidator) IsEnabled() bool
- func (v *PatternValidator) Name() string
- func (v *PatternValidator) Priority() int
- func (v *PatternValidator) SetAnyMatch(anyMatch bool) *PatternValidator
- func (v *PatternValidator) SetCaseInsensitive(caseInsensitive bool) *PatternValidator
- func (v *PatternValidator) SetCondition(condition func(string) bool) *PatternValidator
- func (v *PatternValidator) SetEnabled(enabled bool) *PatternValidator
- func (v *PatternValidator) SetPriority(priority int) *PatternValidator
- func (v *PatternValidator) Validate(ctx context.Context, value string) ValidationResult
- type PatternValidatorRule
- type PerformanceEventData
- type PerformanceMetricType
- type PerformanceTrend
- type PhaseResult
- type PoolStats
- type PooledConnection
- func (pc *PooledConnection) Close() error
- func (pc *PooledConnection) GetStats() map[string]interface{}
- func (pc *PooledConnection) IsExpired(maxLifetime, maxIdleTime time.Duration) bool
- func (pc *PooledConnection) Return()
- func (pc *PooledConnection) Use()
- func (pc *PooledConnection) Validate() bool
- type PreAllocatedSlices
- type RangeValidator
- func NewFloat64RangeValidator(name string) *RangeValidator[float64]
- func NewIntRangeValidator(name string) *RangeValidator[int]
- func NewRangeValidator[T comparable](name string, comparer func(a, b T) int) *RangeValidator[T]
- func NewStringLengthRangeValidator(name string) *RangeValidator[string]
- func NewTimeRangeValidator(name string) *RangeValidator[time.Time]
- func (v *RangeValidator[T]) IsEnabled() bool
- func (v *RangeValidator[T]) Name() string
- func (v *RangeValidator[T]) Priority() int
- func (v *RangeValidator[T]) SetCondition(condition func(T) bool) *RangeValidator[T]
- func (v *RangeValidator[T]) SetEnabled(enabled bool) *RangeValidator[T]
- func (v *RangeValidator[T]) SetInclusive(inclusive bool) *RangeValidator[T]
- func (v *RangeValidator[T]) SetPriority(priority int) *RangeValidator[T]
- func (v *RangeValidator[T]) SetRange(min, max *T) *RangeValidator[T]
- func (v *RangeValidator[T]) Validate(ctx context.Context, value T) ValidationResult
- type ReadWriter
- type Receiver
- type ReconnectStrategy
- type RecordingTransport
- func (rt *RecordingTransport) Clear()
- func (rt *RecordingTransport) Close(ctx context.Context) error
- func (rt *RecordingTransport) Connect(ctx context.Context) error
- func (rt *RecordingTransport) GetOperations() []Operation
- func (rt *RecordingTransport) Send(ctx context.Context, event TransportEvent) error
- func (rt *RecordingTransport) StartRecording()
- func (rt *RecordingTransport) StopRecording()
- type RecursiveValidator
- func (v *RecursiveValidator[T]) IsEnabled() bool
- func (v *RecursiveValidator[T]) Name() string
- func (v *RecursiveValidator[T]) Priority() int
- func (v *RecursiveValidator[T]) SetCondition(condition func(T) bool) *RecursiveValidator[T]
- func (v *RecursiveValidator[T]) SetEnabled(enabled bool) *RecursiveValidator[T]
- func (v *RecursiveValidator[T]) SetMaxDepth(maxDepth int) *RecursiveValidator[T]
- func (v *RecursiveValidator[T]) SetPriority(priority int) *RecursiveValidator[T]
- func (v *RecursiveValidator[T]) Validate(ctx context.Context, value T) ValidationResult
- type ReliabilityStats
- type ReliabilityStatsProvider
- type ReliableSender
- type ReliableTransport
- type RequestContext
- type RequiredFieldsRule
- type ResourceLeak
- type ResourceLeakRule
- type ResourceType
- type ResourceUsage
- type RetryConfig
- type RetryPolicy
- type ReturnDoc
- type RingBuffer
- func (rb *RingBuffer) Capacity() int
- func (rb *RingBuffer) Clear()
- func (rb *RingBuffer) Close()
- func (rb *RingBuffer) Drain() []events.Event
- func (rb *RingBuffer) GetMetrics() RingBufferMetrics
- func (rb *RingBuffer) IsEmpty() bool
- func (rb *RingBuffer) IsFull() bool
- func (rb *RingBuffer) Pop() (events.Event, error)
- func (rb *RingBuffer) PopWithContext(ctx context.Context) (events.Event, error)
- func (rb *RingBuffer) Push(event events.Event) error
- func (rb *RingBuffer) PushWithContext(ctx context.Context, event events.Event) error
- func (rb *RingBuffer) Size() int
- func (rb *RingBuffer) TryPop() (events.Event, bool)
- type RingBufferConfig
- type RingBufferMetrics
- type RoundRobinLoadBalancer
- type ScenarioTransport
- type Schema
- func (s *Schema) AddDefinition(name string, definition *Schema) *Schema
- func (s *Schema) AddProperty(name string, propertySchema *Schema) *Schema
- func (s *Schema) SetDefault(value interface{}) *Schema
- func (s *Schema) SetDescription(description string) *Schema
- func (s *Schema) SetEnum(values ...interface{}) *Schema
- func (s *Schema) SetFormat(format string) *Schema
- func (s *Schema) SetLengthRange(min, max *int) *Schema
- func (s *Schema) SetNumberRange(min, max *float64, exclusiveMin, exclusiveMax bool) *Schema
- func (s *Schema) SetPattern(pattern string) *Schema
- func (s *Schema) SetRequired(property string) *Schema
- func (s *Schema) SetTitle(title string) *Schema
- type SchemaComposer
- type SchemaMigrator
- func (m *SchemaMigrator) MigrateAndValidate(data interface{}, currentVersion, targetVersion string) ValidationResult
- func (m *SchemaMigrator) RegisterMigration(fromVersion, toVersion string, migrator func(interface{}) (interface{}, error))
- func (m *SchemaMigrator) RegisterSchema(version string, schema *VersionedSchema)
- type SchemaType
- type SchemaValidationOptions
- type SchemaValidator
- func (v *SchemaValidator) AddCustomValidator(name string, validator func(interface{}) error) *SchemaValidator
- func (v *SchemaValidator) AddFormatValidator(format string, validator func(string) bool) *SchemaValidator
- func (v *SchemaValidator) IsEnabled() bool
- func (v *SchemaValidator) Name() string
- func (v *SchemaValidator) Priority() int
- func (v *SchemaValidator) RegisterSchema(id string, schema *Schema) *SchemaValidator
- func (v *SchemaValidator) SetEnabled(enabled bool) *SchemaValidator
- func (v *SchemaValidator) SetOptions(options SchemaValidationOptions) *SchemaValidator
- func (v *SchemaValidator) SetPriority(priority int) *SchemaValidator
- func (v *SchemaValidator) SetSchema(schema *Schema) *SchemaValidator
- func (v *SchemaValidator) SetStrictMode(strict bool) *SchemaValidator
- func (v *SchemaValidator) Validate(ctx context.Context, value interface{}) ValidationResult
- type SecurityContext
- type SecurityEventData
- type SecurityEventType
- type SecuritySeverity
- type Sender
- type SequencedEvent
- type Serializer
- type SimpleManager
- func (m *SimpleManager) Channels() (<-chan events.Event, <-chan error)
- func (m *SimpleManager) Errors() <-chan error
- func (m *SimpleManager) GetBackpressureMetrics() BackpressureMetrics
- func (m *SimpleManager) GetValidationConfig() *ValidationConfig
- func (m *SimpleManager) GetValidationState() (*ValidationConfig, bool)
- func (m *SimpleManager) IsRunning() bool
- func (m *SimpleManager) IsValidationEnabled() bool
- func (m *SimpleManager) Receive() <-chan events.Event
- func (m *SimpleManager) Send(ctx context.Context, event TransportEvent) error
- func (m *SimpleManager) SetTransport(transport Transport)
- func (m *SimpleManager) SetValidationConfig(config *ValidationConfig)
- func (m *SimpleManager) SetValidationEnabled(enabled bool)
- func (m *SimpleManager) Start(ctx context.Context) error
- func (m *SimpleManager) Stop(ctx context.Context) error
- type SimpleTransportEvent
- type Slice
- func (s *Slice) All(f func(interface{}) bool) bool
- func (s *Slice) Any(f func(interface{}) bool) bool
- func (s *Slice) Append(item interface{})
- func (s *Slice) Clear()
- func (s *Slice) Count(f func(interface{}) bool) int
- func (s *Slice) Filter(f func(interface{}) bool) *Slice
- func (s *Slice) Find(f func(interface{}) bool) (interface{}, bool)
- func (s *Slice) FindIndex(f func(interface{}) bool) int
- func (s *Slice) Get(index int) (interface{}, bool)
- func (s *Slice) Len() int
- func (s *Slice) Map(f func(interface{}) interface{}) *Slice
- func (s *Slice) Range(f func(interface{}) bool)
- func (s *Slice) RemoveAt(index int) bool
- func (s *Slice) RemoveFunc(f func(interface{}) bool) bool
- func (s *Slice) ToSlice() []interface{}
- type SliceOps
- type SlicePool
- type SliceStats
- type SliceValidator
- func (v *SliceValidator[T]) IsEnabled() bool
- func (v *SliceValidator[T]) Name() string
- func (v *SliceValidator[T]) Priority() int
- func (v *SliceValidator[T]) SetCondition(condition func([]T) bool) *SliceValidator[T]
- func (v *SliceValidator[T]) SetEnabled(enabled bool) *SliceValidator[T]
- func (v *SliceValidator[T]) SetLengthRange(min, max *int) *SliceValidator[T]
- func (v *SliceValidator[T]) SetPriority(priority int) *SliceValidator[T]
- func (v *SliceValidator[T]) Validate(ctx context.Context, value []T) ValidationResult
- type StateChangeEventData
- type StatsProvider
- type StreamController
- type StreamEventData
- type StreamEventOption
- func WithDirection(direction string) StreamEventOption
- func WithHeaders(headers map[string]string) StreamEventOption
- func WithPriority(priority int) StreamEventOption
- func WithReason(reason string) StreamEventOption
- func WithState(state string) StreamEventOption
- func WithWindowSize(size uint32) StreamEventOption
- type StreamingStats
- type StreamingStatsProvider
- type StreamingTransport
- type StringValue
- type StructValidator
- func (v *StructValidator[T]) AddFieldValidator(field string, validator TypedValidator[any]) *StructValidator[T]
- func (v *StructValidator[T]) IsEnabled() bool
- func (v *StructValidator[T]) Name() string
- func (v *StructValidator[T]) Priority() int
- func (v *StructValidator[T]) SetCondition(condition func(T) bool) *StructValidator[T]
- func (v *StructValidator[T]) SetEnabled(enabled bool) *StructValidator[T]
- func (v *StructValidator[T]) SetPriority(priority int) *StructValidator[T]
- func (v *StructValidator[T]) SetRequired(field string, required bool) *StructValidator[T]
- func (v *StructValidator[T]) Validate(ctx context.Context, value T) ValidationResult
- type SystemEventData
- type SystemEventType
- type TaskMetrics
- type TestConfig
- type TestEvent
- type TestFixture
- type TestManagerHelper
- func (h *TestManagerHelper) Cleanup()
- func (h *TestManagerHelper) CreateAdvancedTransport() *AdvancedMockTransport
- func (h *TestManagerHelper) CreateManager() *SimpleManager
- func (h *TestManagerHelper) CreateManagerWithBackpressure(config BackpressureConfig) *SimpleManager
- func (h *TestManagerHelper) CreateTransport() Transport
- func (h *TestManagerHelper) RunWithTimeout(timeout time.Duration, testFunc func()) bool
- func (h *TestManagerHelper) WaitForCompletion(timeout time.Duration) bool
- type ThreatLevel
- type TimedEvent
- type TokenUsage
- type TrackedResource
- type Transport
- type TransportAggregatedStatsProvider
- type TransportConfiguration
- type TransportConnection
- type TransportError
- type TransportEvent
- type TransportEventAdapter
- type TransportEventHandler
- type TransportEventImpl
- type TransportEventReceiver
- type TransportEventSender
- type TransportEventStruct
- type TransportEventType
- type TransportFactory
- type TransportLifecycle
- type TransportLoadBalancerManager
- type TransportManager
- type TransportManagerConfig
- type TransportMetrics
- type TransportMigrator
- type TransportMultiManager
- type TransportRegistry
- type TransportRegistryInterface
- type TransportStatistics
- type TransportStats
- type TypeDoc
- type TypedField
- func SafeBool(key string, value bool) TypedField[bool]
- func SafeDuration(key string, value time.Duration) TypedField[time.Duration]
- func SafeFloat32(key string, value float32) TypedField[float32]
- func SafeFloat64(key string, value float64) TypedField[float64]
- func SafeInt(key string, value int) TypedField[int]
- func SafeInt16(key string, value int16) TypedField[int16]
- func SafeInt32(key string, value int32) TypedField[int32]
- func SafeInt8(key string, value int8) TypedField[int8]
- func SafeString(key, value string) TypedField[string]
- func SafeTime(key string, value time.Time) TypedField[time.Time]
- func SafeUint(key string, value uint) TypedField[uint]
- func SafeUint16(key string, value uint16) TypedField[uint16]
- func SafeUint32(key string, value uint32) TypedField[uint32]
- func SafeUint64(key string, value uint64) TypedField[uint64]
- func SafeUint8(key string, value uint8) TypedField[uint8]
- func TypedValue[T LogValue](key string, value T) TypedField[T]
- type TypedTransportEvent
- func CreateConfigurationEvent(id string, data *ConfigurationEventData) TypedTransportEvent[*ConfigurationEventData]
- func CreateConnectionEvent(id string, status string, options ...interface{}) TypedTransportEvent[ConnectionEventData]
- func CreateConnectionEventWithBuilder(id string, status string, builder func(*ConnectionEventData)) TypedTransportEvent[ConnectionEventData]
- func CreateDataEvent(id string, content []byte, options ...interface{}) TypedTransportEvent[DataEventData]
- func CreateDataEventWithBuilder(id string, content []byte, builder func(*DataEventData)) TypedTransportEvent[DataEventData]
- func CreateErrorEvent(id string, message string, options ...interface{}) TypedTransportEvent[ErrorEventData]
- func CreateMessageEvent(id string, data *MessageEventData) TypedTransportEvent[*MessageEventData]
- func CreateMetricsEvent(id string, metricName string, value float64, options ...MetricsEventOption) TypedTransportEvent[MetricsEventData]
- func CreatePerformanceEvent(id string, data *PerformanceEventData) TypedTransportEvent[*PerformanceEventData]
- func CreateSecurityEvent(id string, data *SecurityEventData) TypedTransportEvent[*SecurityEventData]
- func CreateStateChangeEvent(id string, data *StateChangeEventData) TypedTransportEvent[*StateChangeEventData]
- func CreateStreamEvent(id string, streamID string, action string, options ...StreamEventOption) TypedTransportEvent[StreamEventData]
- func CreateSystemEvent(id string, data *SystemEventData) TypedTransportEvent[*SystemEventData]
- func NewBatchEvent[T EventData](id string, data BatchEvent[T]) TypedTransportEvent[BatchEvent[T]]
- func NewConditionalEvent[T EventData](id string, data ConditionalEvent[T]) TypedTransportEvent[ConditionalEvent[T]]
- func NewContextualEvent[T EventData](id string, data ContextualEvent[T]) TypedTransportEvent[ContextualEvent[T]]
- func NewSequencedEvent[T EventData](id string, data SequencedEvent[T]) TypedTransportEvent[SequencedEvent[T]]
- func NewTimedEvent[T EventData](id string, data TimedEvent[T]) TypedTransportEvent[TimedEvent[T]]
- func NewTypedEvent[T EventData](id, eventType string, data T) TypedTransportEvent[T]
- func ToTypedEvent[T EventData](event TransportEvent, constructor func(map[string]interface{}) (T, error)) (TypedTransportEvent[T], error)
- func TryGetTypedEvent[T EventData](event TransportEvent) TypedTransportEvent[T]
- type TypedValidator
- type URLValidator
- func (v *URLValidator) IsEnabled() bool
- func (v *URLValidator) Name() string
- func (v *URLValidator) Priority() int
- func (v *URLValidator) SetEnabled(enabled bool) *URLValidator
- func (v *URLValidator) SetOptions(opts common.URLValidationOptions) *URLValidator
- func (v *URLValidator) SetPriority(priority int) *URLValidator
- func (v *URLValidator) Validate(ctx context.Context, value string) ValidationResult
- type UnionValidator
- func (v *UnionValidator[T]) AddValidator(validator TypedValidator[any]) *UnionValidator[T]
- func (v *UnionValidator[T]) IsEnabled() bool
- func (v *UnionValidator[T]) Name() string
- func (v *UnionValidator[T]) Priority() int
- func (v *UnionValidator[T]) SetCondition(condition func(T) bool) *UnionValidator[T]
- func (v *UnionValidator[T]) SetEnabled(enabled bool) *UnionValidator[T]
- func (v *UnionValidator[T]) SetPriority(priority int) *UnionValidator[T]
- func (v *UnionValidator[T]) Validate(ctx context.Context, value T) ValidationResult
- type UnsafeSliceOps
- type UserContext
- type ValidatedTransportEvent
- type ValidationConfig
- type ValidationContextKey
- type ValidationError
- type ValidationIssue
- func (i *ValidationIssue) AddTag(tag string) *ValidationIssue
- func (i *ValidationIssue) Error() string
- func (i *ValidationIssue) IsError() bool
- func (i *ValidationIssue) IsFatal() bool
- func (i *ValidationIssue) IsWarning() bool
- func (i *ValidationIssue) String() string
- func (i *ValidationIssue) WithCategory(category string) *ValidationIssue
- func (i *ValidationIssue) WithCode(code string) *ValidationIssue
- func (i *ValidationIssue) WithContext(key string, value interface{}) *ValidationIssue
- func (i *ValidationIssue) WithDetail(key string, value interface{}) *ValidationIssue
- func (i *ValidationIssue) WithExpectedValue(expected interface{}) *ValidationIssue
- func (i *ValidationIssue) WithField(field string) *ValidationIssue
- func (i *ValidationIssue) WithRuleID(ruleID string) *ValidationIssue
- func (i *ValidationIssue) WithSuggestion(suggestion string) *ValidationIssue
- func (i *ValidationIssue) WithTags(tags ...string) *ValidationIssue
- func (i *ValidationIssue) WithValidator(validator string) *ValidationIssue
- func (i *ValidationIssue) WithValue(value interface{}) *ValidationIssue
- type ValidationMetrics
- type ValidationMiddleware
- func (m *ValidationMiddleware) GetMetrics() ValidationMetrics
- func (m *ValidationMiddleware) IsEnabled() bool
- func (m *ValidationMiddleware) Name() string
- func (m *ValidationMiddleware) ProcessIncoming(ctx context.Context, event events.Event) (events.Event, error)
- func (m *ValidationMiddleware) ProcessOutgoing(ctx context.Context, event TransportEvent) (TransportEvent, error)
- func (m *ValidationMiddleware) ResetMetrics()
- func (m *ValidationMiddleware) SetEnabled(enabled bool)
- func (m *ValidationMiddleware) UpdateConfig(config *ValidationConfig)
- func (m *ValidationMiddleware) Wrap(transport Transport) Transport
- type ValidationPool
- type ValidationResult
- func (r *ValidationResult) AddError(err error)
- func (r *ValidationResult) AddFieldError(field string, err error)
- func (r *ValidationResult) AddFieldInfo(field, message string)
- func (r *ValidationResult) AddFieldWarning(field, message string)
- func (r *ValidationResult) AddInfo(message string)
- func (r *ValidationResult) AddIssue(issue *ValidationIssue)
- func (r *ValidationResult) AddWarning(message string)
- func (r *ValidationResult) Error() string
- func (r *ValidationResult) Errors() []error
- func (r *ValidationResult) FieldErrors() map[string][]error
- func (r *ValidationResult) FieldIssues(field string) []*ValidationIssue
- func (r *ValidationResult) Filter(filter func(*ValidationIssue) bool) ValidationResult
- func (r *ValidationResult) FilterByCategory(category string) []*ValidationIssue
- func (r *ValidationResult) FilterByField(field string) []*ValidationIssue
- func (r *ValidationResult) FilterBySeverity(severity ValidationSeverity) []*ValidationIssue
- func (r *ValidationResult) FilterByValidator(validator string) []*ValidationIssue
- func (r *ValidationResult) GetMetadata(key string) (interface{}, bool)
- func (r *ValidationResult) GetSummary() *ValidationSummary
- func (r *ValidationResult) IsValid() bool
- func (r *ValidationResult) Issues() []*ValidationIssue
- func (r *ValidationResult) Merge(other ValidationResult)
- func (r *ValidationResult) SetValid(valid bool)
- func (r *ValidationResult) ToJSON() ([]byte, error)
- func (r *ValidationResult) Warnings() []*ValidationIssue
- func (r *ValidationResult) WithMetadata(key string, value interface{}) *ValidationResult
- type ValidationResultAggregator
- type ValidationRule
- type ValidationSeverity
- type ValidationSummary
- type ValidationTransport
- func (vt *ValidationTransport) Channels() (<-chan events.Event, <-chan error)
- func (vt *ValidationTransport) GetValidationMetrics() ValidationMetrics
- func (vt *ValidationTransport) Send(ctx context.Context, event TransportEvent) error
- func (vt *ValidationTransport) UpdateValidationConfig(config *ValidationConfig)
- type Validator
- type VersionedSchema
- type WebSocketConfig
- type WebSocketConnectionFactory
- type WeightedLoadBalancer
Examples ¶
Constants ¶
const ( EventTypeBatch = "batch" EventTypeSequenced = "sequenced" EventTypeConditional = "conditional" EventTypeTimed = "timed" EventTypeContextual = "contextual" )
Event type constants for composite events
const ( BatchStatusPending = "pending" BatchStatusProcessing = "processing" BatchStatusCompleted = "completed" BatchStatusFailed = "failed" BatchStatusCancelled = "cancelled" )
BatchStatus constants
const ( ConditionalActionDiscard = "discard" ConditionalActionDelay = "delay" ConditionalActionLog = "log" ConditionalActionAlert = "alert" ConditionalActionFallback = "fallback" )
ConditionalAction constants
const ( ExpiryActionDiscard = "discard" ExpiryActionLog = "log" ExpiryActionAlert = "alert" ExpiryActionRetry = "retry" )
ExpiryAction constants
const ( CompressionGzip = "gzip" CompressionNone = "none" )
CompressionType constants
const ( SecurityTLS = "tls" SecurityNone = "none" )
SecurityFeature constants
Variables ¶
var ( // ErrNotConnected is returned when an operation is attempted on a disconnected transport ErrNotConnected = errors.New("transport not connected") // ErrAlreadyConnected is returned when Connect is called on an already connected transport ErrAlreadyConnected = errors.New("transport already connected") // ErrConnectionFailed is returned when a connection attempt fails ErrConnectionFailed = errors.New("failed to establish connection") // ErrConnectionClosed is returned when the connection is closed unexpectedly ErrConnectionClosed = errors.New("connection closed") // ErrTimeout is returned when an operation times out ErrTimeout = errors.New("operation timed out") // ErrMessageTooLarge is returned when a message exceeds the transport's size limit ErrMessageTooLarge = errors.New("message too large") // ErrUnsupportedCapability is returned when a requested capability is not supported ErrUnsupportedCapability = errors.New("unsupported capability") // ErrTransportNotFound is returned when a requested transport is not registered ErrTransportNotFound = errors.New("transport not found") // ErrInvalidConfiguration is returned when transport configuration is invalid ErrInvalidConfiguration = errors.New("invalid configuration") // ErrStreamNotFound is returned when a requested stream does not exist ErrStreamNotFound = errors.New("stream not found") // ErrStreamClosed is returned when an operation is attempted on a closed stream ErrStreamClosed = errors.New("stream closed") // ErrReconnectFailed is returned when all reconnection attempts fail ErrReconnectFailed = errors.New("reconnection failed") // ErrHealthCheckFailed is returned when a health check fails ErrHealthCheckFailed = errors.New("health check failed") // ErrBackpressureActive is returned when backpressure is active and blocking operations ErrBackpressureActive = errors.New("backpressure active") // ErrBackpressureTimeout is returned when backpressure timeout is exceeded ErrBackpressureTimeout = errors.New("backpressure timeout exceeded") // ErrValidationFailed is returned when message validation fails ErrValidationFailed = errors.New("message validation failed") // ErrInvalidMessageSize is returned when message size exceeds limits ErrInvalidMessageSize = errors.New("message size exceeds limits") // ErrMissingRequiredFields is returned when required fields are missing ErrMissingRequiredFields = errors.New("missing required fields") // ErrInvalidEventType is returned when event type is not allowed ErrInvalidEventType = errors.New("invalid event type") // ErrInvalidDataFormat is returned when data format is invalid ErrInvalidDataFormat = errors.New("invalid data format") // ErrFieldValidationFailed is returned when field validation fails ErrFieldValidationFailed = errors.New("field validation failed") // ErrPatternValidationFailed is returned when pattern validation fails ErrPatternValidationFailed = errors.New("pattern validation failed") // ErrAlreadyStarted is returned when trying to start an already started component ErrAlreadyStarted = errors.New("already started") // ErrInvalidTaskName is returned when a task name is invalid ErrInvalidTaskName = errors.New("invalid task name") // ErrInvalidCleanupFunc is returned when a cleanup function is invalid ErrInvalidCleanupFunc = errors.New("invalid cleanup function") // ErrTaskNotFound is returned when a cleanup task is not found ErrTaskNotFound = errors.New("task not found") )
Common transport errors
var ( StringSlicePool = NewSlicePool[string]() IntSlicePool = NewSlicePool[int]() ByteSlicePool = NewSlicePool[byte]() EventSlicePool = NewSlicePool[interface{}]() // For events )
Common slice pools for frequently used types
Functions ¶
func AssertErrorReceived ¶
AssertErrorReceived asserts that an error is received within the timeout
func AssertEventReceived ¶
func AssertEventReceived(t *testing.T, eventChan <-chan events.Event, timeout time.Duration) events.Event
AssertEventReceived asserts that an event is received within the timeout
func AssertNoError ¶
AssertNoError asserts that no error is received within the timeout
func AssertNoEvent ¶
AssertNoEvent asserts that no event is received within the timeout
func AssertTransportConnected ¶
AssertTransportConnected asserts that a transport is connected
func AssertTransportNotConnected ¶
AssertTransportNotConnected asserts that a transport is not connected
func BenchmarkConcurrentSend ¶
BenchmarkConcurrentSend benchmarks concurrent send operations
func BenchmarkMigrationPerformance ¶
BenchmarkMigrationPerformance benchmarks the migration tool performance
func BenchmarkTransport ¶
BenchmarkTransport runs standard transport benchmarks
func ContextAwareSleep ¶
ContextAwareSleep performs a context-aware sleep that can be cancelled
func CreateMapCleanupFunc ¶
func CreateMapCleanupFunc[K comparable, V any]( m *sync.Map, getTimestamp func(V) time.Time, ttl time.Duration, ) func() (int, error)
CreateMapCleanupFunc creates a cleanup function for maps with timestamps
func CreateSliceCleanupFunc ¶
func CreateSliceCleanupFunc[T any]( slice *[]T, mu *sync.RWMutex, isExpired func(T) bool, ) func() (int, error)
CreateSliceCleanupFunc creates a cleanup function for slices with expiration
func CreateTypedConfigError ¶
CreateTypedConfigError creates a type-safe configuration error from an interface{} value
func EnsureTimeout ¶
func EnsureTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
EnsureTimeout ensures that a context has a timeout, adding one if necessary
func ExampleDocumentationGeneration ¶
func ExampleDocumentationGeneration()
ExampleDocumentationGeneration demonstrates how to use the documentation generator
func ExampleMigrationTestUsage ¶
func ExampleMigrationTestUsage()
ExampleMigrationTestUsage demonstrates how to use the migration test helpers
func ExampleMigrationUsage ¶
func ExampleMigrationUsage()
Example usage functions for the migration tool
func FastSliceGrow ¶
FastSliceGrow grows a slice to the target capacity efficiently
func FormatLogMessage ¶
FormatLogMessage efficiently formats a log message using a pooled buffer
func GetConfigurationErrorField ¶
GetConfigurationErrorField extracts the field name from any configuration error type
func GetConfigurationErrorValue ¶
func GetConfigurationErrorValue(err error) interface{}
GetConfigurationErrorValue extracts the value from any configuration error type as interface{}
func GetRegisteredTypes ¶
func GetRegisteredTypes() []string
GetRegisteredTypes returns all registered transport types from the global registry.
func GetValidationDepth ¶
GetValidationDepth extracts validation depth from context
func GetValidationPath ¶
GetValidationPath extracts validation path from context
func InternErrorMsg ¶
InternErrorMsg interns an error message
func InternEventType ¶
InternEventType interns an event type string
func IsConfigurationError ¶
IsConfigurationError checks if an error is any type of configuration error
func IsRegistered ¶
IsRegistered checks if a transport type is registered in the global registry.
func IsTransportError ¶
IsTransportError checks if an error is a TransportError
func IsValidationError ¶
IsValidationError checks if an error is a validation error
func NewLegacyEvent ¶
NewLegacyEvent creates a new legacy transport event
func PropagateDeadline ¶
PropagateDeadline propagates the deadline from parent to child context if child has no deadline or a later deadline
func Register ¶
func Register(transportType string, factory TransportFactory) error
Register registers a transport factory globally.
func RetryWithContext ¶
func RetryWithContext(ctx context.Context, maxRetries int, backoff time.Duration, operation func(context.Context) error) error
RetryWithContext performs an operation with retries and context support
func SetLocalizationProvider ¶
func SetLocalizationProvider(provider LocalizationProvider)
SetLocalizationProvider sets the global localization provider
func TestMigrationScenarios ¶
TestMigrationScenarios tests various migration scenarios
func TestMockTransportImplementation ¶
TestMockTransportImplementation tests that mock transport correctly implements interfaces
func WaitForCondition ¶
WaitForCondition waits for a condition to be true
func WaitWithTimeout ¶
func WaitWithTimeout[T any](ctx context.Context, ch <-chan T, timeout time.Duration) (T, bool, error)
WaitWithTimeout waits for a channel with a timeout
func WithTimeout ¶
WithTimeout runs a function with a timeout
func WithTimeoutExpected ¶
WithTimeoutExpected runs a function with a timeout but does not fail the test on timeout. This is useful when testing timeout behavior where a timeout is the expected outcome. The function will be called with a context that has the specified timeout. The test can check ctx.Err() to verify if the timeout occurred.
func WithValidationDepth ¶
WithValidationDepth increments validation depth in context
Types ¶
type APIDocumentation ¶
type APIDocumentation struct {
PackageName string `json:"package_name"`
Synopsis string `json:"synopsis"`
Description string `json:"description"`
Interfaces []InterfaceDoc `json:"interfaces"`
Types []TypeDoc `json:"types"`
Functions []FunctionDoc `json:"functions"`
Examples []ExampleDoc `json:"examples"`
Deprecations []DeprecationDoc `json:"deprecations"`
GeneratedAt time.Time `json:"generated_at"`
Version string `json:"version"`
}
APIDocumentation represents the complete API documentation
type AckHandler ¶
AckHandler is a callback function for handling acknowledgments.
type AckHandlerProvider ¶
type AckHandlerProvider interface {
// SetAckHandler sets a callback for handling acknowledgments.
SetAckHandler(handler AckHandler)
}
AckHandlerProvider allows setting acknowledgment handlers
type AdvancedMockTransport ¶
type AdvancedMockTransport struct {
*MockTransport
// contains filtered or unexported fields
}
AdvancedMockTransport provides advanced testing capabilities with state machine, network simulation, and detailed behavior control
func NewAdvancedMockTransport ¶
func NewAdvancedMockTransport() *AdvancedMockTransport
NewAdvancedMockTransport creates a new advanced mock transport
func (*AdvancedMockTransport) AddMiddleware ¶
func (t *AdvancedMockTransport) AddMiddleware(mw Middleware)
AddMiddleware adds middleware to the transport
func (*AdvancedMockTransport) Close ¶
func (t *AdvancedMockTransport) Close(ctx context.Context) error
Close with state machine support
func (*AdvancedMockTransport) Connect ¶
func (t *AdvancedMockTransport) Connect(ctx context.Context) error
Connect with state machine support
func (*AdvancedMockTransport) GetState ¶
func (t *AdvancedMockTransport) GetState() ConnectionState
GetState returns the current connection state
func (*AdvancedMockTransport) Send ¶
func (t *AdvancedMockTransport) Send(ctx context.Context, event TransportEvent) error
Send with network simulation
func (*AdvancedMockTransport) SetEventFilter ¶
func (t *AdvancedMockTransport) SetEventFilter(filter EventFilter)
SetEventFilter sets an event filter
func (*AdvancedMockTransport) SetNetworkConditions ¶
func (t *AdvancedMockTransport) SetNetworkConditions(latency, jitter time.Duration, packetLoss float64, bandwidth int64)
SetNetworkConditions configures network simulation parameters
type AggregatorOptions ¶
type AggregatorOptions struct {
// StopOnFirstError stops aggregation when the first error is encountered
StopOnFirstError bool
// MaxIssues limits the total number of issues to collect
MaxIssues int
// IncludeWarnings includes warning-level issues in the aggregation
IncludeWarnings bool
// IncludeInfo includes info-level issues in the aggregation
IncludeInfo bool
// DuplicateDetection enables detection and removal of duplicate issues
DuplicateDetection bool
}
AggregatorOptions configures how validation results are aggregated
type AlertLevel ¶
type AlertLevel string
const ( AlertLevelNone AlertLevel = "none" AlertLevelInfo AlertLevel = "info" AlertLevelWarning AlertLevel = "warning" AlertLevelCritical AlertLevel = "critical" )
type AllOfValidator ¶
type AllOfValidator[T any] struct { // contains filtered or unexported fields }
AllOfValidator requires all validators to pass
func NewAllOfValidator ¶
func NewAllOfValidator[T any](name string, validators ...TypedValidator[T]) *AllOfValidator[T]
NewAllOfValidator creates a validator that requires all sub-validators to pass
func (*AllOfValidator[T]) IsEnabled ¶
func (v *AllOfValidator[T]) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*AllOfValidator[T]) Name ¶
func (v *AllOfValidator[T]) Name() string
Name returns the validator name
func (*AllOfValidator[T]) Priority ¶
func (v *AllOfValidator[T]) Priority() int
Priority returns the validator priority
func (*AllOfValidator[T]) Validate ¶
func (v *AllOfValidator[T]) Validate(ctx context.Context, value T) ValidationResult
Validate validates that all sub-validators pass
type AnyOfValidator ¶
type AnyOfValidator[T any] struct { // contains filtered or unexported fields }
AnyOfValidator requires at least one validator to pass
func NewAnyOfValidator ¶
func NewAnyOfValidator[T any](name string, validators ...TypedValidator[T]) *AnyOfValidator[T]
NewAnyOfValidator creates a validator that requires at least one sub-validator to pass
func (*AnyOfValidator[T]) IsEnabled ¶
func (v *AnyOfValidator[T]) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*AnyOfValidator[T]) Name ¶
func (v *AnyOfValidator[T]) Name() string
Name returns the validator name
func (*AnyOfValidator[T]) Priority ¶
func (v *AnyOfValidator[T]) Priority() int
Priority returns the validator priority
func (*AnyOfValidator[T]) Validate ¶
func (v *AnyOfValidator[T]) Validate(ctx context.Context, value T) ValidationResult
Validate validates that at least one sub-validator passes
type AsynchronousValidator ¶
type AsynchronousValidator struct {
// contains filtered or unexported fields
}
AsynchronousValidator validates events asynchronously
func NewAsynchronousValidator ¶
func NewAsynchronousValidator(validator Validator, workers int, queueSize int) *AsynchronousValidator
NewAsynchronousValidator creates a new asynchronous validator
func (*AsynchronousValidator) Close ¶
func (av *AsynchronousValidator) Close()
Close closes the asynchronous validator
func (*AsynchronousValidator) GetResult ¶
func (av *AsynchronousValidator) GetResult() (string, error, bool)
GetResult gets a validation result
func (*AsynchronousValidator) ValidateAsync ¶
func (av *AsynchronousValidator) ValidateAsync(event TransportEvent, id string) error
ValidateAsync validates an event asynchronously
type AuthConfig ¶
type AuthConfig struct {
// Type specifies the authentication type
Type string `json:"type"`
// Token specifies the authentication token
Token string `json:"token,omitempty"`
// Username specifies the username for basic auth
Username string `json:"username,omitempty"`
// Password specifies the password for basic auth
Password string `json:"password,omitempty"`
// APIKey specifies the API key
APIKey string `json:"api_key,omitempty"`
// APISecret specifies the API secret
APISecret string `json:"api_secret,omitempty"`
// TokenURL specifies the token endpoint URL
TokenURL string `json:"token_url,omitempty"`
// RefreshURL specifies the refresh endpoint URL
RefreshURL string `json:"refresh_url,omitempty"`
// RefreshToken specifies the refresh token
RefreshToken string `json:"refresh_token,omitempty"`
// ClientID specifies the OAuth client ID
ClientID string `json:"client_id,omitempty"`
// ClientSecret specifies the OAuth client secret
ClientSecret string `json:"client_secret,omitempty"`
// Scopes specifies the OAuth scopes
Scopes []string `json:"scopes,omitempty"`
// ExpiresAt specifies when the token expires
ExpiresAt time.Time `json:"expires_at,omitempty"`
// CustomHeaders specifies custom authentication headers
CustomHeaders map[string]string `json:"custom_headers,omitempty"`
}
AuthConfig contains authentication configuration.
func (*AuthConfig) Clone ¶
func (c *AuthConfig) Clone() *AuthConfig
Clone creates a deep copy of the authentication configuration.
func (*AuthConfig) Validate ¶
func (c *AuthConfig) Validate() error
Validate validates the authentication configuration.
type AuthProvider ¶
type AuthProvider interface {
// GetCredentials returns authentication credentials.
GetCredentials(ctx context.Context) (map[string]string, error)
// RefreshCredentials refreshes authentication credentials.
RefreshCredentials(ctx context.Context) error
// IsValid returns true if the credentials are valid.
IsValid() bool
// ExpiresAt returns when the credentials expire.
ExpiresAt() time.Time
}
AuthProvider handles authentication for transport connections.
type BackpressureConfig ¶
type BackpressureConfig struct {
// Strategy defines the backpressure strategy to use
Strategy BackpressureStrategy `yaml:"strategy" json:"strategy" default:"none"`
// BufferSize is the size of the event buffer
BufferSize int `yaml:"buffer_size" json:"buffer_size" default:"1024"`
// HighWaterMark is the percentage of buffer fullness that triggers backpressure
HighWaterMark float64 `yaml:"high_water_mark" json:"high_water_mark" default:"0.8"`
// LowWaterMark is the percentage of buffer fullness that releases backpressure
LowWaterMark float64 `yaml:"low_water_mark" json:"low_water_mark" default:"0.2"`
// BlockTimeout is the maximum time to block when using block_timeout strategy
BlockTimeout time.Duration `yaml:"block_timeout" json:"block_timeout" default:"5s"`
// EnableMetrics enables backpressure metrics collection
EnableMetrics bool `yaml:"enable_metrics" json:"enable_metrics" default:"true"`
}
BackpressureConfig configures backpressure handling
type BackpressureHandler ¶
type BackpressureHandler struct {
// contains filtered or unexported fields
}
BackpressureHandler manages backpressure for event channels
func NewBackpressureHandler ¶
func NewBackpressureHandler(config BackpressureConfig) *BackpressureHandler
NewBackpressureHandler creates a new backpressure handler
func (*BackpressureHandler) Channels ¶
func (h *BackpressureHandler) Channels() (<-chan events.Event, <-chan error)
Channels returns both event and error channels together
func (*BackpressureHandler) ErrorChan ¶
func (h *BackpressureHandler) ErrorChan() <-chan error
ErrorChan returns the error channel Deprecated: Use Channels() instead for consistency
func (*BackpressureHandler) EventChan ¶
func (h *BackpressureHandler) EventChan() <-chan events.Event
EventChan returns the event channel Deprecated: Use Channels() instead for consistency
func (*BackpressureHandler) GetMetrics ¶
func (h *BackpressureHandler) GetMetrics() BackpressureMetrics
GetMetrics returns the current backpressure metrics
func (*BackpressureHandler) SendError ¶
func (h *BackpressureHandler) SendError(err error) error
SendError sends an error through the backpressure handler
func (*BackpressureHandler) SendEvent ¶
func (h *BackpressureHandler) SendEvent(event events.Event) error
SendEvent sends an event through the backpressure handler
func (*BackpressureHandler) Stop ¶
func (h *BackpressureHandler) Stop()
Stop stops the backpressure handler
type BackpressureMetrics ¶
type BackpressureMetrics struct {
EventsDropped uint64 `json:"events_dropped"`
EventsBlocked uint64 `json:"events_blocked"`
BlockedDuration time.Duration `json:"blocked_duration"`
CurrentBufferSize int `json:"current_buffer_size"`
MaxBufferSize int `json:"max_buffer_size"`
HighWaterMarkHits uint64 `json:"high_water_mark_hits"`
LowWaterMarkHits uint64 `json:"low_water_mark_hits"`
LastDropTime time.Time `json:"last_drop_time"`
LastBlockTime time.Time `json:"last_block_time"`
BackpressureActive bool `json:"backpressure_active"`
// contains filtered or unexported fields
}
BackpressureMetrics contains metrics for backpressure handling
type BackpressureStrategy ¶
type BackpressureStrategy string
BackpressureStrategy defines how backpressure should be handled
const ( // BackpressureNone disables backpressure handling (default behavior) BackpressureNone BackpressureStrategy = "none" // BackpressureDropOldest drops the oldest events when buffer is full BackpressureDropOldest BackpressureStrategy = "drop_oldest" // BackpressureDropNewest drops the newest events when buffer is full BackpressureDropNewest BackpressureStrategy = "drop_newest" // BackpressureBlock blocks the producer when buffer is full BackpressureBlock BackpressureStrategy = "block" // BackpressureBlockWithTimeout blocks the producer with a timeout BackpressureBlockWithTimeout BackpressureStrategy = "block_timeout" )
type BaseConfig ¶
type BaseConfig struct {
// Transport type (e.g., "websocket", "http", "grpc")
Type string `json:"type"`
// Endpoint URL or address
Endpoint string `json:"endpoint"`
// Connection timeout
Timeout time.Duration `json:"timeout"`
// Custom headers
Headers map[string]string `json:"headers,omitempty"`
// TLS configuration for secure connections
TLS *tls.Config `json:"-"`
// EnableCompression enables data compression
EnableCompression bool `json:"enable_compression"`
// MaxMessageSize sets the maximum message size in bytes
MaxMessageSize int64 `json:"max_message_size"`
// ReadBufferSize sets the read buffer size
ReadBufferSize int `json:"read_buffer_size"`
// WriteBufferSize sets the write buffer size
WriteBufferSize int `json:"write_buffer_size"`
// KeepAlive configuration
KeepAlive *KeepAliveConfig `json:"keep_alive,omitempty"`
// Retry configuration
Retry *RetryConfig `json:"retry,omitempty"`
// Circuit breaker configuration
CircuitBreaker *CircuitBreakerConfig `json:"circuit_breaker,omitempty"`
// Authentication configuration
Auth *AuthConfig `json:"auth,omitempty"`
// Metrics configuration
Metrics *MetricsConfig `json:"metrics,omitempty"`
// Custom metadata
Metadata map[string]any `json:"metadata,omitempty"`
}
BaseConfig provides common configuration fields for all transport types.
func (*BaseConfig) Clone ¶
func (c *BaseConfig) Clone() Config
Clone creates a deep copy of the base configuration.
func (*BaseConfig) GetEndpoint ¶
func (c *BaseConfig) GetEndpoint() string
GetEndpoint returns the endpoint URL.
func (*BaseConfig) GetHeaders ¶
func (c *BaseConfig) GetHeaders() map[string]string
GetHeaders returns the custom headers.
func (*BaseConfig) GetTimeout ¶
func (c *BaseConfig) GetTimeout() time.Duration
GetTimeout returns the connection timeout.
func (*BaseConfig) GetType ¶
func (c *BaseConfig) GetType() string
GetType returns the transport type.
func (*BaseConfig) IsSecure ¶
func (c *BaseConfig) IsSecure() bool
IsSecure returns true if the connection uses TLS.
func (*BaseConfig) Validate ¶
func (c *BaseConfig) Validate() error
Validate validates the base configuration.
type BatchEvent ¶
type BatchEvent[T EventData] struct { // BatchID is the unique identifier for this batch BatchID string `json:"batch_id"` // Events contains the events in this batch Events []TypedTransportEvent[T] `json:"events"` // BatchSize is the number of events in this batch BatchSize int `json:"batch_size"` // MaxBatchSize is the maximum allowed batch size MaxBatchSize int `json:"max_batch_size"` // CreatedAt when the batch was created CreatedAt time.Time `json:"created_at"` // CompletedAt when the batch processing completed (nil if not completed) CompletedAt *time.Time `json:"completed_at,omitempty"` // ProcessingDuration how long the batch took to process ProcessingDuration time.Duration `json:"processing_duration,omitempty"` // Status indicates the batch status (pending, processing, completed, failed) Status string `json:"status"` // Errors contains any processing errors Errors []error `json:"errors,omitempty"` // SuccessCount number of successfully processed events SuccessCount int `json:"success_count"` // FailureCount number of failed events FailureCount int `json:"failure_count"` // Metadata contains additional batch metadata Metadata map[string]interface{} `json:"metadata,omitempty"` // ProcessorID identifies the processor that handled this batch ProcessorID string `json:"processor_id,omitempty"` // Priority indicates batch processing priority Priority int `json:"priority,omitempty"` // RetryAttempt for failed batch retries RetryAttempt int `json:"retry_attempt,omitempty"` // MaxRetries maximum number of retry attempts MaxRetries int `json:"max_retries,omitempty"` }
BatchEvent represents a collection of events processed together
func (BatchEvent[T]) ToMap ¶
func (b BatchEvent[T]) ToMap() map[string]interface{}
ToMap converts the batch event data to a map for backward compatibility
func (BatchEvent[T]) Validate ¶
func (b BatchEvent[T]) Validate() error
Validate ensures the batch event data is valid
type BatchSender ¶
type BatchSender interface {
// SendBatch sends multiple events in a single batch operation.
SendBatch(ctx context.Context, events []TransportEvent) error
}
BatchSender handles batch operations
type BatchValidator ¶
type BatchValidator struct {
// contains filtered or unexported fields
}
BatchValidator validates multiple events in batches
func NewBatchValidator ¶
func NewBatchValidator(validator Validator, batchSize int) *BatchValidator
NewBatchValidator creates a new batch validator
func (*BatchValidator) ValidateBatch ¶
func (bv *BatchValidator) ValidateBatch(ctx context.Context, events []TransportEvent) []error
ValidateBatch validates a batch of events
type BoolValue ¶
type BoolValue struct{ Value bool }
BoolValue wraps a boolean value
func (BoolValue) ErrorString ¶
type CacheStats ¶
type CacheStats struct {
Hits uint64
Misses uint64
Size int
MaxSize int
Evictions uint64
TotalOps uint64
HitRate float64
LastHitTime time.Time
LastMissTime time.Time
// contains filtered or unexported fields
}
CacheStats tracks cache performance metrics
type CachedValidator ¶
type CachedValidator struct {
// contains filtered or unexported fields
}
CachedValidator implements a validator with caching for performance
func NewCachedValidator ¶
func NewCachedValidator(validator Validator, maxCacheSize int, cacheTTL time.Duration) *CachedValidator
NewCachedValidator creates a new cached validator
func (*CachedValidator) ClearCache ¶
func (cv *CachedValidator) ClearCache()
ClearCache clears the validation cache
func (*CachedValidator) GetCacheStats ¶
func (cv *CachedValidator) GetCacheStats() CacheStats
GetCacheStats returns cache statistics
func (*CachedValidator) Validate ¶
func (cv *CachedValidator) Validate(ctx context.Context, event TransportEvent) error
Validate validates with caching
func (*CachedValidator) ValidateIncoming ¶
func (cv *CachedValidator) ValidateIncoming(ctx context.Context, event TransportEvent) error
ValidateIncoming validates incoming events with caching
func (*CachedValidator) ValidateOutgoing ¶
func (cv *CachedValidator) ValidateOutgoing(ctx context.Context, event TransportEvent) error
ValidateOutgoing validates outgoing events with caching
type Capabilities ¶
type Capabilities struct {
// Streaming indicates if the transport supports streaming
Streaming bool
// Bidirectional indicates if the transport supports bidirectional communication
Bidirectional bool
// MaxMessageSize is the maximum message size supported (0 for unlimited)
MaxMessageSize int64
// ProtocolVersion is the version of the transport protocol
ProtocolVersion string
// Features is a map of feature names to their values
Features map[string]interface{}
}
Capabilities describes basic transport characteristics. Simplified from the previous complex capability system.
type ChaosTransport ¶
type ChaosTransport struct {
*AdvancedMockTransport
// contains filtered or unexported fields
}
ChaosTransport introduces random failures and delays for chaos testing
func NewChaosTransport ¶
func NewChaosTransport(errorRate float64) *ChaosTransport
NewChaosTransport creates a new chaos transport
func (*ChaosTransport) Connect ¶
func (ct *ChaosTransport) Connect(ctx context.Context) error
Connect with chaos behavior
func (*ChaosTransport) Send ¶
func (ct *ChaosTransport) Send(ctx context.Context, event TransportEvent) error
Send with chaos behavior
type CircuitBreaker ¶
type CircuitBreaker interface {
// Execute executes an operation with circuit breaker protection.
Execute(ctx context.Context, operation func() error) error
// IsOpen returns true if the circuit breaker is open.
IsOpen() bool
// Reset resets the circuit breaker state.
Reset()
// GetState returns the current circuit breaker state.
GetState() CircuitBreakerState
}
CircuitBreaker provides circuit breaker functionality for transport operations.
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
// Enabled enables circuit breaker
Enabled bool `json:"enabled"`
// FailureThreshold specifies the failure threshold to open the circuit
FailureThreshold int `json:"failure_threshold"`
// RecoveryTimeout specifies the recovery timeout
RecoveryTimeout time.Duration `json:"recovery_timeout"`
// HalfOpenMaxCalls specifies the maximum calls in half-open state
HalfOpenMaxCalls int `json:"half_open_max_calls"`
// MinCalls specifies the minimum calls before circuit evaluation
MinCalls int `json:"min_calls"`
// SlidingWindowSize specifies the sliding window size
SlidingWindowSize int `json:"sliding_window_size"`
// FailureRateThreshold specifies the failure rate threshold (percentage)
FailureRateThreshold float64 `json:"failure_rate_threshold"`
}
CircuitBreakerConfig contains circuit breaker configuration.
func (*CircuitBreakerConfig) Clone ¶
func (c *CircuitBreakerConfig) Clone() *CircuitBreakerConfig
Clone creates a deep copy of the circuit breaker configuration.
func (*CircuitBreakerConfig) Validate ¶
func (c *CircuitBreakerConfig) Validate() error
Validate validates the circuit breaker configuration.
type CircuitBreakerState ¶
type CircuitBreakerState int
CircuitBreakerState represents the state of a circuit breaker.
const ( // CircuitClosed indicates the circuit breaker is closed (normal operation). CircuitClosed CircuitBreakerState = iota // CircuitOpen indicates the circuit breaker is open (rejecting requests). CircuitOpen // CircuitHalfOpen indicates the circuit breaker is half-open (testing). CircuitHalfOpen )
func (CircuitBreakerState) String ¶
func (s CircuitBreakerState) String() string
String returns the string representation of the circuit breaker state.
type CleanupCompletionRule ¶
type CleanupCompletionRule struct{}
CleanupCompletionRule validates cleanup completion
func (*CleanupCompletionRule) Description ¶
func (r *CleanupCompletionRule) Description() string
func (*CleanupCompletionRule) Name ¶
func (r *CleanupCompletionRule) Name() string
func (*CleanupCompletionRule) Validate ¶
func (r *CleanupCompletionRule) Validate(ctx context.Context, result *CleanupValidationResult) error
type CleanupConfig ¶
type CleanupConfig struct {
// MaxCleanupDuration is the maximum time allowed for cleanup
MaxCleanupDuration time.Duration
// PhaseTimeout is the timeout for each cleanup phase
PhaseTimeout time.Duration
// EnableStackTrace enables stack trace capture for debugging
EnableStackTrace bool
// Logger for cleanup events
Logger Logger
}
CleanupConfig configures cleanup behavior
func DefaultCleanupConfig ¶
func DefaultCleanupConfig() CleanupConfig
DefaultCleanupConfig returns default cleanup configuration
type CleanupCoordinator ¶
type CleanupCoordinator struct {
// contains filtered or unexported fields
}
CleanupCoordinator coordinates cleanup across multiple components to prevent deadlocks
func NewCleanupCoordinator ¶
func NewCleanupCoordinator(config CoordinatorConfig) *CleanupCoordinator
NewCleanupCoordinator creates a new cleanup coordinator
func (*CleanupCoordinator) GetGroupStatus ¶
func (cc *CleanupCoordinator) GetGroupStatus(groupID string) (GroupStatus, error)
GetGroupStatus returns the status of a cleanup group
func (*CleanupCoordinator) GetStatus ¶
func (cc *CleanupCoordinator) GetStatus() CoordinatorStatus
GetStatus returns the overall coordinator status
func (*CleanupCoordinator) RegisterGroup ¶
func (cc *CleanupCoordinator) RegisterGroup(group *CleanupGroup) error
RegisterGroup registers a cleanup group
func (*CleanupCoordinator) Shutdown ¶
func (cc *CleanupCoordinator) Shutdown(ctx context.Context) error
Shutdown initiates coordinated cleanup
func (*CleanupCoordinator) Wait ¶
func (cc *CleanupCoordinator) Wait() error
Wait waits for cleanup to complete
type CleanupGroup ¶
type CleanupGroup struct {
ID string
Name string
Priority int // Lower values = higher priority
Dependencies []string // IDs of groups that must complete first
Operations []CleanupOperation
// contains filtered or unexported fields
}
CleanupGroup represents a group of related cleanup operations
type CleanupManager ¶
type CleanupManager struct {
// contains filtered or unexported fields
}
CleanupManager handles periodic cleanup of growing maps and resources
func NewCleanupManager ¶
func NewCleanupManager(config *CleanupManagerConfig) *CleanupManager
NewCleanupManager creates a new cleanup manager
func (*CleanupManager) GetMetrics ¶
func (cm *CleanupManager) GetMetrics() CleanupMetrics
GetMetrics returns current cleanup metrics
func (*CleanupManager) RegisterTask ¶
func (cm *CleanupManager) RegisterTask(name string, ttl time.Duration, cleanupFunc func() (int, error)) error
RegisterTask registers a cleanup task
func (*CleanupManager) RunTaskNow ¶
func (cm *CleanupManager) RunTaskNow(name string) error
RunTaskNow runs a specific cleanup task immediately
func (*CleanupManager) Start ¶
func (cm *CleanupManager) Start() error
Start begins the cleanup manager
func (*CleanupManager) UnregisterTask ¶
func (cm *CleanupManager) UnregisterTask(name string) error
UnregisterTask removes a cleanup task
type CleanupManagerConfig ¶
type CleanupManagerConfig struct {
DefaultTTL time.Duration
CheckInterval time.Duration
Logger *zap.Logger
}
CleanupManagerConfig configures the cleanup manager
func DefaultCleanupManagerConfig ¶
func DefaultCleanupManagerConfig() *CleanupManagerConfig
DefaultCleanupManagerConfig returns default configuration
type CleanupMetrics ¶
type CleanupMetrics struct {
TotalTasks int
ActiveTasks int
TotalRuns uint64
TotalItemsCleaned uint64
TotalErrors uint64
LastCleanupTime time.Time
AverageCleanupDuration time.Duration
TaskMetrics map[string]*TaskMetrics
// contains filtered or unexported fields
}
CleanupMetrics tracks cleanup statistics
type CleanupOperation ¶
type CleanupOperation struct {
Name string
Description string
Timeout time.Duration
Func func(context.Context) error
// Options
ContinueOnError bool // Continue with next operation even if this fails
Retryable bool // Can retry if fails
MaxRetries int // Maximum retry attempts
}
CleanupOperation represents a single cleanup operation
type CleanupPhase ¶
type CleanupPhase int
CleanupPhase represents phases of cleanup process
const ( // CleanupPhaseNone indicates no cleanup in progress CleanupPhaseNone CleanupPhase = iota // CleanupPhaseInitiated indicates cleanup has been initiated CleanupPhaseInitiated // CleanupPhaseStoppingWorkers indicates stopping worker goroutines CleanupPhaseStoppingWorkers // CleanupPhaseClosingConnections indicates closing network connections CleanupPhaseClosingConnections // CleanupPhaseReleasingResources indicates releasing memory and file resources CleanupPhaseReleasingResources // CleanupPhaseFinalizing indicates final cleanup steps CleanupPhaseFinalizing // CleanupPhaseComplete indicates cleanup is complete CleanupPhaseComplete )
func (CleanupPhase) String ¶
func (p CleanupPhase) String() string
String returns string representation of cleanup phase
type CleanupStats ¶
type CleanupStats struct {
TotalTracked int64
TotalCleaned int64
CleanupErrors int64
CleanupDuration time.Duration
PhaseDurations map[CleanupPhase]time.Duration
CurrentPhase CleanupPhase
ResourcesLeaked []LeakedResource
}
CleanupStats contains cleanup statistics
type CleanupTask ¶
type CleanupTask struct {
Name string
TTL time.Duration
LastCleanup time.Time
CleanupFunc func() (itemsCleaned int, err error)
// Statistics - cache line padded to prevent false sharing
TotalRuns atomic.Uint64
TotalCleaned atomic.Uint64
TotalErrors atomic.Uint64
LastError error
LastErrorTime time.Time
// contains filtered or unexported fields
}
CleanupTask represents a cleanup task
type CleanupTimeoutRule ¶
type CleanupTimeoutRule struct {
// contains filtered or unexported fields
}
CleanupTimeoutRule validates cleanup timing
func (*CleanupTimeoutRule) Description ¶
func (r *CleanupTimeoutRule) Description() string
func (*CleanupTimeoutRule) Name ¶
func (r *CleanupTimeoutRule) Name() string
func (*CleanupTimeoutRule) Validate ¶
func (r *CleanupTimeoutRule) Validate(ctx context.Context, result *CleanupValidationResult) error
type CleanupTracker ¶
type CleanupTracker struct {
// contains filtered or unexported fields
}
CleanupTracker tracks resources and ensures proper cleanup order
func NewCleanupTracker ¶
func NewCleanupTracker(config CleanupConfig) *CleanupTracker
NewCleanupTracker creates a new cleanup tracker
func (*CleanupTracker) Cleanup ¶
func (ct *CleanupTracker) Cleanup(ctx context.Context) error
Cleanup performs ordered cleanup of all tracked resources
func (*CleanupTracker) GetPhase ¶
func (ct *CleanupTracker) GetPhase() CleanupPhase
GetPhase returns the current cleanup phase
func (*CleanupTracker) GetStats ¶
func (ct *CleanupTracker) GetStats() CleanupStats
GetStats returns cleanup statistics
func (*CleanupTracker) Track ¶
func (ct *CleanupTracker) Track(id string, resourceType ResourceType, description string, cleanupFunc func() error, dependencies ...string)
Track registers a resource for cleanup tracking
func (*CleanupTracker) Untrack ¶
func (ct *CleanupTracker) Untrack(id string)
Untrack removes a resource from tracking (used when resource is cleaned up normally)
func (*CleanupTracker) Wait ¶
func (ct *CleanupTracker) Wait() error
Wait waits for cleanup to complete
type CleanupValidationConfig ¶
type CleanupValidationConfig struct {
// EnableMemoryValidation enables memory leak detection
EnableMemoryValidation bool
// EnableGoroutineValidation enables goroutine leak detection
EnableGoroutineValidation bool
// EnableResourceValidation enables resource leak detection
EnableResourceValidation bool
// MemoryThreshold is the memory increase threshold to consider a leak
MemoryThreshold int64
// GoroutineThreshold is the goroutine increase threshold to consider a leak
GoroutineThreshold int
// ValidationTimeout is the timeout for validation operations
ValidationTimeout time.Duration
// Logger for validation events
Logger Logger
}
CleanupValidationConfig configures cleanup validation
func DefaultCleanupValidationConfig ¶
func DefaultCleanupValidationConfig() CleanupValidationConfig
DefaultCleanupValidationConfig returns default validation configuration
type CleanupValidationError ¶
type CleanupValidationError struct {
Rule string
Message string
Severity ErrorSeverity
Component string
Timestamp time.Time
Details map[string]interface{}
}
CleanupValidationError represents a validation error
func (CleanupValidationError) String ¶
func (ve CleanupValidationError) String() string
String returns string representation of validation error
type CleanupValidationResult ¶
type CleanupValidationResult struct {
ComponentID string
StartTime time.Time
EndTime time.Time
Duration time.Duration
Success bool
Errors []error
// Memory statistics
MemoryBefore runtime.MemStats
MemoryAfter runtime.MemStats
MemoryDelta int64
// Goroutine statistics
GoroutinesBefore int
GoroutinesAfter int
GoroutineDelta int
// Resource tracking
ResourcesTracked int
ResourcesCleaned int
ResourcesLeaked int
LeakedResources []string
// Cleanup phases
PhaseResults map[CleanupPhase]PhaseResult
// Validation results
ValidationErrors []CleanupValidationError
RulePassed map[string]bool
RuleErrors map[string]error
}
CleanupValidationResult contains the result of a cleanup validation
type CleanupValidationRule ¶
type CleanupValidationRule interface {
Name() string
Description() string
Validate(ctx context.Context, result *CleanupValidationResult) error
}
CleanupValidationRule defines a rule for validating cleanup
type CleanupValidator ¶
type CleanupValidator struct {
// contains filtered or unexported fields
}
CleanupValidator validates that cleanup operations complete successfully
func NewCleanupValidator ¶
func NewCleanupValidator(config CleanupValidationConfig) *CleanupValidator
NewCleanupValidator creates a new cleanup validator
func (*CleanupValidator) AddRule ¶
func (cv *CleanupValidator) AddRule(rule CleanupValidationRule)
AddRule adds a validation rule
func (*CleanupValidator) GetAllValidationResults ¶
func (cv *CleanupValidator) GetAllValidationResults() map[string]*CleanupValidationResult
GetAllValidationResults returns all validation results
func (*CleanupValidator) GetValidationErrors ¶
func (cv *CleanupValidator) GetValidationErrors() []CleanupValidationError
GetValidationErrors returns all validation errors
func (*CleanupValidator) GetValidationResult ¶
func (cv *CleanupValidator) GetValidationResult(componentID string) (*CleanupValidationResult, bool)
GetValidationResult returns the validation result for a component
func (*CleanupValidator) ValidateCleanup ¶
func (cv *CleanupValidator) ValidateCleanup(ctx context.Context, componentID string, tracker *CleanupTracker) (*CleanupValidationResult, error)
ValidateCleanup validates a cleanup operation
type CompositeEvent ¶
type CompositeEvent struct {
BaseEvent SimpleTransportEvent
Events []TransportEvent
}
CompositeEvent represents a transport event that contains multiple child events
func (*CompositeEvent) Data ¶
func (e *CompositeEvent) Data() map[string]interface{}
Data returns the composite event data including child events
func (*CompositeEvent) Timestamp ¶
func (e *CompositeEvent) Timestamp() time.Time
Timestamp returns the composite event timestamp
func (*CompositeEvent) Type ¶
func (e *CompositeEvent) Type() string
Type returns the composite event type
type Compressor ¶
type Compressor interface {
// Compress compresses the input data.
Compress(data []byte) ([]byte, error)
// Decompress decompresses the input data.
Decompress(data []byte) ([]byte, error)
// Algorithm returns the compression algorithm name.
Algorithm() string
// CompressionRatio returns the achieved compression ratio.
CompressionRatio() float64
}
Compressor handles compression and decompression of serialized data.
type ConcurrentTest ¶
type ConcurrentTest struct {
// contains filtered or unexported fields
}
ConcurrentTest helps with concurrent testing scenarios
func NewConcurrentTest ¶
func NewConcurrentTest() *ConcurrentTest
NewConcurrentTest creates a new concurrent test helper
func (*ConcurrentTest) Run ¶
func (ct *ConcurrentTest) Run(count int, fn func(id int) error)
Run runs a function concurrently N times
func (*ConcurrentTest) Wait ¶
func (ct *ConcurrentTest) Wait() []error
Wait waits for all concurrent operations to complete
type ConditionalEvent ¶
type ConditionalEvent[T EventData] struct { // ConditionID identifies the condition to evaluate ConditionID string `json:"condition_id"` // Event is the wrapped event that may be processed conditionally Event TypedTransportEvent[T] `json:"event"` // Condition defines the condition to evaluate Condition *EventCondition `json:"condition"` // IsConditionMet indicates if the condition has been evaluated and met IsConditionMet *bool `json:"is_condition_met,omitempty"` // EvaluatedAt when the condition was last evaluated EvaluatedAt *time.Time `json:"evaluated_at,omitempty"` // EvaluationContext contains context used for condition evaluation EvaluationContext map[string]interface{} `json:"evaluation_context,omitempty"` // AlternativeAction defines what to do if condition is not met AlternativeAction string `json:"alternative_action,omitempty"` // MaxEvaluationAttempts limits how many times condition can be evaluated MaxEvaluationAttempts int `json:"max_evaluation_attempts,omitempty"` // EvaluationAttempts tracks how many times condition has been evaluated EvaluationAttempts int `json:"evaluation_attempts"` // TimeoutAt when the conditional event expires TimeoutAt *time.Time `json:"timeout_at,omitempty"` // Dependencies lists other conditions this condition depends on Dependencies []string `json:"dependencies,omitempty"` // Priority for condition evaluation ordering Priority int `json:"priority,omitempty"` // RetryPolicy for condition evaluation failures RetryPolicy *EventRetryPolicy `json:"retry_policy,omitempty"` }
ConditionalEvent represents an event with conditional logic
func (ConditionalEvent[T]) ToMap ¶
func (c ConditionalEvent[T]) ToMap() map[string]interface{}
ToMap converts the conditional event data to a map for backward compatibility
func (ConditionalEvent[T]) Validate ¶
func (c ConditionalEvent[T]) Validate() error
Validate ensures the conditional event data is valid
type ConditionalRule ¶
type ConditionalRule struct {
Name string
Condition func(interface{}) bool
Validator TypedValidator[any]
Description string
}
ConditionalRule defines a condition and the validators to apply when it's met
type ConditionalValidator ¶
type ConditionalValidator struct {
// contains filtered or unexported fields
}
ConditionalValidator applies validation rules based on conditions
func NewConditionalValidator ¶
func NewConditionalValidator(name string) *ConditionalValidator
NewConditionalValidator creates a new conditional validator
func (*ConditionalValidator) AddFieldCondition ¶
func (v *ConditionalValidator) AddFieldCondition(name, field string, condition func(interface{}) bool, validator TypedValidator[any], description string) *ConditionalValidator
AddFieldCondition adds a rule that applies a validator when a specific field meets a condition
func (*ConditionalValidator) AddRule ¶
func (v *ConditionalValidator) AddRule(rule ConditionalRule) *ConditionalValidator
AddRule adds a conditional validation rule
func (*ConditionalValidator) IsEnabled ¶
func (v *ConditionalValidator) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*ConditionalValidator) Name ¶
func (v *ConditionalValidator) Name() string
Name returns the validator name
func (*ConditionalValidator) Priority ¶
func (v *ConditionalValidator) Priority() int
Priority returns the validator priority
func (*ConditionalValidator) SetEnabled ¶
func (v *ConditionalValidator) SetEnabled(enabled bool) *ConditionalValidator
SetEnabled enables or disables the validator
func (*ConditionalValidator) SetPriority ¶
func (v *ConditionalValidator) SetPriority(priority int) *ConditionalValidator
SetPriority sets the validator priority
func (*ConditionalValidator) Validate ¶
func (v *ConditionalValidator) Validate(ctx context.Context, value interface{}) ValidationResult
Validate applies conditional validation rules
type Config ¶
type Config interface {
ConfigValidation
ConfigMetadata
ConfigTimeouts
ConfigHeaders
}
Config represents the interface for transport configuration. Composed of focused interfaces following Interface Segregation Principle
type ConfigBuilder ¶
type ConfigBuilder struct {
// contains filtered or unexported fields
}
ConfigBuilder provides a fluent interface for building transport configurations.
func NewConfigBuilder ¶
func NewConfigBuilder(transportType string) *ConfigBuilder
NewConfigBuilder creates a new configuration builder.
func (*ConfigBuilder) Build ¶
func (b *ConfigBuilder) Build() (Config, error)
Build builds and validates the configuration.
func (*ConfigBuilder) MustBuild ¶
func (b *ConfigBuilder) MustBuild() Config
MustBuild builds the configuration and panics if validation fails.
func (*ConfigBuilder) WithAuth ¶
func (b *ConfigBuilder) WithAuth(auth *AuthConfig) *ConfigBuilder
WithAuth sets authentication configuration.
func (*ConfigBuilder) WithEndpoint ¶
func (b *ConfigBuilder) WithEndpoint(endpoint string) *ConfigBuilder
WithEndpoint sets the endpoint URL.
func (*ConfigBuilder) WithHeaders ¶
func (b *ConfigBuilder) WithHeaders(headers map[string]string) *ConfigBuilder
WithHeaders sets custom headers.
func (*ConfigBuilder) WithMetrics ¶
func (b *ConfigBuilder) WithMetrics(metrics *MetricsConfig) *ConfigBuilder
WithMetrics sets metrics configuration.
func (*ConfigBuilder) WithRetry ¶
func (b *ConfigBuilder) WithRetry(retry *RetryConfig) *ConfigBuilder
WithRetry sets retry configuration.
func (*ConfigBuilder) WithTLS ¶
func (b *ConfigBuilder) WithTLS(tls *tls.Config) *ConfigBuilder
WithTLS sets TLS configuration.
func (*ConfigBuilder) WithTimeout ¶
func (b *ConfigBuilder) WithTimeout(timeout time.Duration) *ConfigBuilder
WithTimeout sets the connection timeout.
type ConfigError ¶
type ConfigError = LegacyConfigurationError
ConfigError is an alias for LegacyConfigurationError to maintain compatibility
type ConfigHeaders ¶
type ConfigHeaders interface {
// GetHeaders returns custom headers for the transport.
GetHeaders() map[string]string
}
ConfigHeaders provides header configuration
type ConfigImpact ¶
type ConfigImpact string
const ( ConfigImpactNone ConfigImpact = "none" ConfigImpactLow ConfigImpact = "low" ConfigImpactMedium ConfigImpact = "medium" ConfigImpactHigh ConfigImpact = "high" ConfigImpactCritical ConfigImpact = "critical" )
type ConfigMetadata ¶
type ConfigMetadata interface {
// GetType returns the transport type (e.g., "websocket", "http", "grpc").
GetType() string
// GetEndpoint returns the endpoint URL or address.
GetEndpoint() string
// IsSecure returns true if the transport uses secure connections.
IsSecure() bool
}
ConfigMetadata provides configuration metadata
type ConfigProvider ¶
type ConfigProvider interface {
// Config returns the transport's configuration.
Config() Config
}
ConfigProvider provides configuration access
type ConfigTimeouts ¶
type ConfigTimeouts interface {
// GetTimeout returns the connection timeout.
GetTimeout() time.Duration
}
ConfigTimeouts provides timeout configuration
type ConfigValidation ¶
type ConfigValidation interface {
// Validate validates the configuration.
Validate() error
// Clone creates a deep copy of the configuration.
Clone() Config
}
ConfigValidation handles configuration validation
type ConfigurationError ¶
type ConfigurationError[T ErrorValue] struct { Field string Value T Message string }
ConfigurationError represents a type-safe configuration-related error
func NewBoolConfigError ¶
func NewBoolConfigError(field string, value bool, message string) *ConfigurationError[BoolValue]
NewBoolConfigError creates a configuration error for boolean values
func NewConfigurationError ¶
func NewConfigurationError[T ErrorValue](field string, value T, message string) *ConfigurationError[T]
NewConfigurationError creates a new type-safe configuration error
func NewFloatConfigError ¶
func NewFloatConfigError(field string, value float64, message string) *ConfigurationError[FloatValue]
NewFloatConfigError creates a configuration error for float values
func NewGenericConfigError ¶
func NewGenericConfigError(field string, value interface{}, message string) *ConfigurationError[GenericValue]
NewGenericConfigError creates a configuration error for any value type (fallback)
func NewIntConfigError ¶
func NewIntConfigError(field string, value int, message string) *ConfigurationError[IntValue]
NewIntConfigError creates a configuration error for integer values
func NewNilConfigError ¶
func NewNilConfigError(field, message string) *ConfigurationError[NilValue]
NewNilConfigError creates a configuration error for nil/missing values
func NewStringConfigError ¶
func NewStringConfigError(field, value, message string) *ConfigurationError[StringValue]
NewStringConfigError creates a configuration error for string values
func (*ConfigurationError[T]) Error ¶
func (e *ConfigurationError[T]) Error() string
type ConfigurationEventData ¶
type ConfigurationEventData struct {
// Configuration change details
Key string `json:"key" validate:"required"`
OldValue string `json:"old_value,omitempty"`
NewValue string `json:"new_value" validate:"required"`
ValueType string `json:"value_type" validate:"required"`
// Change metadata
ChangedBy string `json:"changed_by,omitempty"`
Reason string `json:"reason,omitempty"`
Source string `json:"source,omitempty"`
Namespace string `json:"namespace,omitempty"`
// Validation and rollback
Validated bool `json:"validated"`
Applied bool `json:"applied"`
CanRollback bool `json:"can_rollback"`
Impact ConfigImpact `json:"impact"`
}
ConfigurationEventData represents configuration change events
func (*ConfigurationEventData) ToMap ¶
func (c *ConfigurationEventData) ToMap() map[string]interface{}
func (*ConfigurationEventData) Validate ¶
func (c *ConfigurationEventData) Validate() error
type ConnectionCallback ¶
type ConnectionCallback func(state ConnectionState, err error)
ConnectionCallback is called when the connection state changes.
type ConnectionCapabilities ¶
type ConnectionCapabilities struct {
// Maximum message size supported
MaxMessageSize int64 `json:"max_message_size,omitempty"`
// Protocol version
ProtocolVersion string `json:"protocol_version,omitempty"`
// Custom extension capabilities for backward compatibility
Extensions map[string]string `json:"extensions,omitempty"`
}
ConnectionCapabilities represents basic connection information. Simplified from the previous complex capability negotiation system.
func (ConnectionCapabilities) IsEmpty ¶
func (c ConnectionCapabilities) IsEmpty() bool
IsEmpty returns true if the ConnectionCapabilities struct has no set values
func (ConnectionCapabilities) ToMap ¶
func (c ConnectionCapabilities) ToMap() map[string]interface{}
ToMap converts ConnectionCapabilities to map[string]interface{} for backward compatibility
type ConnectionError ¶
ConnectionError represents a connection-related error
func (*ConnectionError) Error ¶
func (e *ConnectionError) Error() string
func (*ConnectionError) Unwrap ¶
func (e *ConnectionError) Unwrap() error
type ConnectionEventData ¶
type ConnectionEventData struct {
// Status indicates the connection status (connected, disconnected, reconnecting, etc.)
Status string `json:"status"`
// RemoteAddress is the address of the remote endpoint
RemoteAddress string `json:"remote_address,omitempty"`
// LocalAddress is the local address used for the connection
LocalAddress string `json:"local_address,omitempty"`
// Protocol is the protocol used (http, websocket, grpc, etc.)
Protocol string `json:"protocol,omitempty"`
// Version is the protocol version
Version string `json:"version,omitempty"`
// Capabilities are the negotiated capabilities
Capabilities ConnectionCapabilities `json:"capabilities,omitempty"`
// Error contains error information if the connection failed
Error string `json:"error,omitempty"`
// AttemptNumber for reconnection events
AttemptNumber int `json:"attempt_number,omitempty"`
}
ConnectionEventData represents connection-related event data
func (ConnectionEventData) ToMap ¶
func (c ConnectionEventData) ToMap() map[string]interface{}
ToMap converts the connection event data to a map for backward compatibility
func (ConnectionEventData) Validate ¶
func (c ConnectionEventData) Validate() error
Validate ensures the connection event data is valid
type ConnectionEventOption ¶
type ConnectionEventOption func(*ConnectionEventData)
ConnectionEventOption is a functional option for configuring ConnectionEventData
func WithAttemptNumber ¶
func WithAttemptNumber(attempt int) ConnectionEventOption
WithAttemptNumber sets the attempt number for a connection event
func WithCapabilities ¶
func WithCapabilities(capabilities ConnectionCapabilities) ConnectionEventOption
WithCapabilities sets the capabilities for a connection event
func WithConnectionError ¶
func WithConnectionError(err string) ConnectionEventOption
WithConnectionError sets the error for a connection event
func WithLocalAddress ¶
func WithLocalAddress(address string) ConnectionEventOption
WithLocalAddress sets the local address for a connection event
func WithProtocol ¶
func WithProtocol(protocol string) ConnectionEventOption
WithProtocol sets the protocol for a connection event
func WithRemoteAddress ¶
func WithRemoteAddress(address string) ConnectionEventOption
WithRemoteAddress sets the remote address for a connection event
func WithVersion ¶
func WithVersion(version string) ConnectionEventOption
WithVersion sets the protocol version for a connection event
type ConnectionFactory ¶
type ConnectionFactory interface {
CreateConnection(ctx context.Context) (net.Conn, error)
ValidateConnection(conn net.Conn) bool
CloseConnection(conn net.Conn) error
}
ConnectionFactory creates new connections
type ConnectionHandler ¶
type ConnectionHandler func(state ConnectionState, err error)
ConnectionHandler is called when the connection state changes.
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages a pool of network connections
func NewConnectionPool ¶
func NewConnectionPool(config *ConnectionPoolConfig, factory ConnectionFactory) (*ConnectionPool, error)
NewConnectionPool creates a new connection pool
func (*ConnectionPool) Acquire ¶
func (p *ConnectionPool) Acquire(ctx context.Context) (*PooledConnection, error)
Acquire gets a connection from the pool
func (*ConnectionPool) Close ¶
func (p *ConnectionPool) Close() error
Close closes the connection pool with enhanced cleanup and leak prevention
func (*ConnectionPool) GetPoolInfo ¶
func (p *ConnectionPool) GetPoolInfo() map[string]interface{}
GetPoolInfo returns current pool information
func (*ConnectionPool) GetStats ¶
func (p *ConnectionPool) GetStats() *PoolStats
GetStats returns pool statistics
func (*ConnectionPool) Return ¶
func (p *ConnectionPool) Return(conn *PooledConnection) error
Return returns a connection to the pool
type ConnectionPoolConfig ¶
type ConnectionPoolConfig struct {
// Pool sizing
InitialSize int // Initial number of connections
MaxSize int // Maximum number of connections
MinIdleSize int // Minimum number of idle connections
MaxIdleSize int // Maximum number of idle connections
// Connection management
MaxIdleTime time.Duration // Maximum time a connection can be idle
MaxLifetime time.Duration // Maximum lifetime of a connection
AcquireTimeout time.Duration // Timeout for acquiring a connection
// Health checking
HealthCheckInterval time.Duration // Interval for health checks
HealthCheckTimeout time.Duration // Timeout for health checks
// Network settings
DialTimeout time.Duration // Timeout for dialing new connections
KeepAlive time.Duration // TCP keep-alive interval
MaxConnLifetime time.Duration // Maximum connection lifetime
// Validation
ValidateOnAcquire bool // Validate connection when acquired
ValidateOnReturn bool // Validate connection when returned
}
ConnectionPoolConfig contains configuration for connection pooling
func DefaultConnectionPoolConfig ¶
func DefaultConnectionPoolConfig() *ConnectionPoolConfig
DefaultConnectionPoolConfig returns default connection pool configuration
type ConnectionState ¶
type ConnectionState int
ConnectionState represents the current state of a transport connection.
const ( // StateDisconnected indicates the transport is not connected. StateDisconnected ConnectionState = iota // StateConnecting indicates the transport is attempting to connect. StateConnecting // StateConnected indicates the transport is connected and ready. StateConnected // StateReconnecting indicates the transport is attempting to reconnect. StateReconnecting // StateClosing indicates the transport is closing the connection. StateClosing // StateClosed indicates the transport is closed. StateClosed // StateError indicates the transport is in an error state. StateError )
func (ConnectionState) String ¶
func (s ConnectionState) String() string
String returns the string representation of the connection state.
type Connector ¶
type Connector = TransportConnection
Connector handles connection lifecycle Deprecated: Use TransportConnection instead for consistency
type ContextBestPractices ¶
type ContextBestPractices struct{}
ContextBestPractices provides documentation for context handling best practices
func (ContextBestPractices) Example ¶
func (ContextBestPractices) Example()
Example demonstrates best practices for context usage in transport operations
type ContextConfig ¶
type ContextConfig struct {
// DefaultTimeout is the default timeout for operations without explicit timeouts
DefaultTimeout time.Duration
// ConnectTimeout is the timeout for establishing connections
ConnectTimeout time.Duration
// SendTimeout is the timeout for send operations
SendTimeout time.Duration
// ReceiveTimeout is the timeout for receive operations
ReceiveTimeout time.Duration
// ShutdownTimeout is the timeout for graceful shutdown
ShutdownTimeout time.Duration
// RetryTimeout is the timeout for retry operations
RetryTimeout time.Duration
}
ContextConfig defines standard timeout configurations for transport operations
func DefaultContextConfig ¶
func DefaultContextConfig() *ContextConfig
DefaultContextConfig returns the default context configuration
func (*ContextConfig) Validate ¶
func (c *ContextConfig) Validate() error
Validate validates the context configuration
func (*ContextConfig) WithConnectTimeout ¶
func (c *ContextConfig) WithConnectTimeout(ctx context.Context) (context.Context, context.CancelFunc)
WithConnectTimeout creates a context with the connect timeout
func (*ContextConfig) WithDefaultTimeout ¶
func (c *ContextConfig) WithDefaultTimeout(ctx context.Context) (context.Context, context.CancelFunc)
WithDefaultTimeout creates a context with the default timeout
func (*ContextConfig) WithReceiveTimeout ¶
func (c *ContextConfig) WithReceiveTimeout(ctx context.Context) (context.Context, context.CancelFunc)
WithReceiveTimeout creates a context with the receive timeout
func (*ContextConfig) WithRetryTimeout ¶
func (c *ContextConfig) WithRetryTimeout(ctx context.Context) (context.Context, context.CancelFunc)
WithRetryTimeout creates a context with the retry timeout
func (*ContextConfig) WithSendTimeout ¶
func (c *ContextConfig) WithSendTimeout(ctx context.Context) (context.Context, context.CancelFunc)
WithSendTimeout creates a context with the send timeout
func (*ContextConfig) WithShutdownTimeout ¶
func (c *ContextConfig) WithShutdownTimeout(ctx context.Context) (context.Context, context.CancelFunc)
WithShutdownTimeout creates a context with the shutdown timeout
type ContextualEvent ¶
type ContextualEvent[T EventData] struct { // ContextID identifies the context ContextID string `json:"context_id"` // Event is the wrapped event Event TypedTransportEvent[T] `json:"event"` // Context contains rich contextual information Context *EventContext `json:"context"` // CorrelationID for event correlation across systems CorrelationID string `json:"correlation_id,omitempty"` // CausationID identifies the event that caused this one CausationID string `json:"causation_id,omitempty"` // TraceID for distributed tracing TraceID string `json:"trace_id,omitempty"` // SpanID for distributed tracing SpanID string `json:"span_id,omitempty"` // BusinessContext contains business-specific context BusinessContext map[string]interface{} `json:"business_context,omitempty"` // TechnicalContext contains technical context TechnicalContext map[string]interface{} `json:"technical_context,omitempty"` // UserContext contains user-specific context UserContext *UserContext `json:"user_context,omitempty"` // RequestContext contains request-specific context RequestContext *RequestContext `json:"request_context,omitempty"` // EnvironmentContext contains environment information EnvironmentContext *EnvironmentContext `json:"environment_context,omitempty"` // SecurityContext contains security-related context SecurityContext *SecurityContext `json:"security_context,omitempty"` // Tags for event categorization and filtering Tags map[string]string `json:"tags,omitempty"` // Annotations for additional metadata Annotations map[string]string `json:"annotations,omitempty"` }
ContextualEvent represents an event with rich context data
func (ContextualEvent[T]) ToMap ¶
func (c ContextualEvent[T]) ToMap() map[string]interface{}
ToMap converts the contextual event data to a map for backward compatibility
func (ContextualEvent[T]) Validate ¶
func (c ContextualEvent[T]) Validate() error
Validate ensures the contextual event data is valid
type CoordinatorConfig ¶
type CoordinatorConfig struct {
// MaxParallelCleanups limits concurrent cleanup operations
MaxParallelCleanups int
// GroupTimeout is the timeout for each cleanup group
GroupTimeout time.Duration
// EnableDeadlockDetection enables deadlock detection
EnableDeadlockDetection bool
// Logger for coordinator events
Logger Logger
}
CoordinatorConfig configures the cleanup coordinator
func DefaultCoordinatorConfig ¶
func DefaultCoordinatorConfig() CoordinatorConfig
DefaultCoordinatorConfig returns default coordinator configuration
type CoordinatorState ¶
type CoordinatorState int32
CoordinatorState represents the state of the cleanup coordinator
const ( CoordinatorStateActive CoordinatorState = iota CoordinatorStateShuttingDown CoordinatorStateStopped )
func (CoordinatorState) String ¶
func (s CoordinatorState) String() string
String returns string representation of coordinator state
type CoordinatorStatus ¶
type CoordinatorStatus struct {
State CoordinatorState
Groups map[string]GroupStatus
Sequence []string
TrackerStats CleanupStats
}
CoordinatorStatus represents the overall coordinator status
type CrossFieldRule ¶
type CrossFieldRule struct {
Name string
Fields []string
Validator func(values map[string]interface{}) error
Description string
}
CrossFieldRule defines a validation rule that compares multiple fields
type CrossFieldValidator ¶
type CrossFieldValidator struct {
// contains filtered or unexported fields
}
CrossFieldValidator validates relationships between multiple fields
func CreateDateRangeValidator ¶
func CreateDateRangeValidator(startField, endField string) *CrossFieldValidator
CreateDateRangeValidator creates a cross-field validator for date ranges
func CreatePasswordConfirmationValidator ¶
func CreatePasswordConfirmationValidator(passwordField, confirmField string) *CrossFieldValidator
CreatePasswordConfirmationValidator creates a cross-field validator for password confirmation
func NewCrossFieldValidator ¶
func NewCrossFieldValidator(name string) *CrossFieldValidator
NewCrossFieldValidator creates a new cross-field validator
func (*CrossFieldValidator) AddComparisonRule ¶
func (v *CrossFieldValidator) AddComparisonRule(name, field1, field2 string, compare func(a, b interface{}) bool, errorMsg string) *CrossFieldValidator
AddComparisonRule adds a rule to compare two fields using a comparison function
func (*CrossFieldValidator) AddRule ¶
func (v *CrossFieldValidator) AddRule(rule CrossFieldRule) *CrossFieldValidator
AddRule adds a cross-field validation rule
func (*CrossFieldValidator) IsEnabled ¶
func (v *CrossFieldValidator) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*CrossFieldValidator) Name ¶
func (v *CrossFieldValidator) Name() string
Name returns the validator name
func (*CrossFieldValidator) Priority ¶
func (v *CrossFieldValidator) Priority() int
Priority returns the validator priority
func (*CrossFieldValidator) SetCondition ¶
func (v *CrossFieldValidator) SetCondition(condition func(map[string]interface{}) bool) *CrossFieldValidator
SetCondition sets a condition that must be met for validation to apply
func (*CrossFieldValidator) SetEnabled ¶
func (v *CrossFieldValidator) SetEnabled(enabled bool) *CrossFieldValidator
SetEnabled enables or disables the validator
func (*CrossFieldValidator) SetPriority ¶
func (v *CrossFieldValidator) SetPriority(priority int) *CrossFieldValidator
SetPriority sets the validator priority
func (*CrossFieldValidator) Validate ¶
func (v *CrossFieldValidator) Validate(ctx context.Context, value map[string]interface{}) ValidationResult
Validate validates cross-field relationships
type DataEventData ¶
type DataEventData struct {
// Content is the actual data payload
Content []byte `json:"content"`
// ContentType indicates the MIME type or format of the content
ContentType string `json:"content_type,omitempty"`
// Encoding indicates the encoding used (utf-8, base64, etc.)
Encoding string `json:"encoding,omitempty"`
// Size is the size of the content in bytes
Size int64 `json:"size"`
// Checksum for data integrity verification
Checksum string `json:"checksum,omitempty"`
// Compressed indicates if the content is compressed
Compressed bool `json:"compressed,omitempty"`
// StreamID if this data belongs to a specific stream
StreamID string `json:"stream_id,omitempty"`
// SequenceNumber for ordered delivery
SequenceNumber uint64 `json:"sequence_number,omitempty"`
}
DataEventData represents message/data-related event data
func (DataEventData) ToMap ¶
func (d DataEventData) ToMap() map[string]interface{}
ToMap converts the data event data to a map for backward compatibility
func (DataEventData) Validate ¶
func (d DataEventData) Validate() error
Validate ensures the data event data is valid
type DataEventOption ¶
type DataEventOption func(*DataEventData)
DataEventOption is a functional option for configuring DataEventData
func WithChecksum ¶
func WithChecksum(checksum string) DataEventOption
WithChecksum sets the checksum for a data event
func WithCompressed ¶
func WithCompressed(compressed bool) DataEventOption
WithCompressed sets whether the data is compressed
func WithContentType ¶
func WithContentType(contentType string) DataEventOption
WithContentType sets the content type for a data event
func WithEncoding ¶
func WithEncoding(encoding string) DataEventOption
WithEncoding sets the encoding for a data event
func WithSequenceNumber ¶
func WithSequenceNumber(seq uint64) DataEventOption
WithSequenceNumber sets the sequence number for ordered delivery
func WithStreamID ¶
func WithStreamID(streamID string) DataEventOption
WithStreamID sets the stream ID for a data event
type DataFormatRule ¶
type DataFormatRule struct {
// contains filtered or unexported fields
}
DataFormatRule validates data format
func (*DataFormatRule) IsEnabled ¶
func (r *DataFormatRule) IsEnabled() bool
func (*DataFormatRule) Name ¶
func (r *DataFormatRule) Name() string
func (*DataFormatRule) Priority ¶
func (r *DataFormatRule) Priority() int
func (*DataFormatRule) Validate ¶
func (r *DataFormatRule) Validate(ctx context.Context, event TransportEvent) error
type DeadlockConfig ¶
type DeadlockConfig struct {
// DetectionInterval is how often to check for deadlocks
DetectionInterval time.Duration
// WaitTimeout is the maximum time to wait for a resource
WaitTimeout time.Duration
// EnableStackTrace captures stack traces for debugging
EnableStackTrace bool
// Logger for deadlock events
Logger Logger
}
DeadlockConfig configures deadlock detection
func DefaultDeadlockConfig ¶
func DefaultDeadlockConfig() DeadlockConfig
DefaultDeadlockConfig returns default deadlock detection configuration
type DeadlockDetector ¶
type DeadlockDetector struct {
// contains filtered or unexported fields
}
DeadlockDetector monitors for potential deadlocks in cleanup operations
func NewDeadlockDetector ¶
func NewDeadlockDetector(config DeadlockConfig) *DeadlockDetector
NewDeadlockDetector creates a new deadlock detector
func (*DeadlockDetector) AcquireResource ¶
func (dd *DeadlockDetector) AcquireResource(resourceID string, ownerID string)
AcquireResource records that a resource has been acquired
func (*DeadlockDetector) GenerateReport ¶
func (dd *DeadlockDetector) GenerateReport() string
GenerateReport generates a detailed report of the current state
func (*DeadlockDetector) GetAllResources ¶
func (dd *DeadlockDetector) GetAllResources() map[string]*DeadlockResource
GetAllResources returns information about all tracked resources
func (*DeadlockDetector) GetDeadlockCount ¶
func (dd *DeadlockDetector) GetDeadlockCount() int64
GetDeadlockCount returns the number of deadlocks detected
func (*DeadlockDetector) GetResourceInfo ¶
func (dd *DeadlockDetector) GetResourceInfo(resourceID string) (*DeadlockResource, bool)
GetResourceInfo returns information about a specific resource
func (*DeadlockDetector) GetWaitGraph ¶
func (dd *DeadlockDetector) GetWaitGraph() map[string][]string
GetWaitGraph returns a copy of the current wait graph
func (*DeadlockDetector) RegisterResource ¶
func (dd *DeadlockDetector) RegisterResource(id string, resourceType ResourceType)
RegisterResource registers a resource for deadlock tracking
func (*DeadlockDetector) ReleaseResource ¶
func (dd *DeadlockDetector) ReleaseResource(resourceID string)
ReleaseResource records that a resource has been released
func (*DeadlockDetector) SetDeadlockHandler ¶
func (dd *DeadlockDetector) SetDeadlockHandler(handler func(DeadlockInfo))
SetDeadlockHandler sets the callback for when deadlocks are detected
func (*DeadlockDetector) Start ¶
func (dd *DeadlockDetector) Start()
Start starts deadlock detection
func (*DeadlockDetector) StopWaitingForResource ¶
func (dd *DeadlockDetector) StopWaitingForResource(resourceID string, waiterID string)
StopWaitingForResource records that a goroutine is no longer waiting
func (*DeadlockDetector) UnregisterResource ¶
func (dd *DeadlockDetector) UnregisterResource(id string)
UnregisterResource removes a resource from deadlock tracking
func (*DeadlockDetector) WaitForResource ¶
func (dd *DeadlockDetector) WaitForResource(resourceID string, waiterID string, waitingFor []string)
WaitForResource records that a goroutine is waiting for a resource
type DeadlockInfo ¶
type DeadlockInfo struct {
Cycle []string // Resource IDs in the deadlock cycle
Resources map[string]*DeadlockResource
DetectedAt time.Time
Resolution string // How the deadlock was resolved
StackTraces map[string]string // Stack traces of involved goroutines
}
DeadlockInfo contains information about a detected deadlock
func (DeadlockInfo) String ¶
func (di DeadlockInfo) String() string
String returns a string representation of the deadlock info
type DeadlockResource ¶
type DeadlockResource struct {
ID string
Type ResourceType
Owner string // Goroutine ID that owns this resource
Waiters []string // Goroutine IDs waiting for this resource
AcquiredAt time.Time
StackTrace string // Stack trace of acquisition
// contains filtered or unexported fields
}
DeadlockResource represents a resource that can be involved in deadlocks
type DefaultLocalizationProvider ¶
type DefaultLocalizationProvider struct{}
DefaultLocalizationProvider provides English messages
func (*DefaultLocalizationProvider) GetMessage ¶
func (p *DefaultLocalizationProvider) GetMessage(key string, params map[string]interface{}) string
GetMessage returns the default English message
func (*DefaultLocalizationProvider) GetSuggestion ¶
func (p *DefaultLocalizationProvider) GetSuggestion(key string, params map[string]interface{}) string
GetSuggestion returns a default suggestion
func (*DefaultLocalizationProvider) SupportedLanguages ¶
func (p *DefaultLocalizationProvider) SupportedLanguages() []string
SupportedLanguages returns supported languages
type DefaultMiddlewareChain ¶
type DefaultMiddlewareChain struct {
// contains filtered or unexported fields
}
DefaultMiddlewareChain is the default implementation of MiddlewareChain.
func NewDefaultMiddlewareChain ¶
func NewDefaultMiddlewareChain() *DefaultMiddlewareChain
NewDefaultMiddlewareChain creates a new default middleware chain.
func (*DefaultMiddlewareChain) Add ¶
func (c *DefaultMiddlewareChain) Add(middleware Middleware)
Add adds middleware to the chain.
func (*DefaultMiddlewareChain) Clear ¶
func (c *DefaultMiddlewareChain) Clear()
Clear removes all middleware from the chain.
func (*DefaultMiddlewareChain) ProcessIncoming ¶
func (c *DefaultMiddlewareChain) ProcessIncoming(ctx context.Context, event events.Event) (events.Event, error)
ProcessIncoming processes an incoming event through the middleware chain.
func (*DefaultMiddlewareChain) ProcessOutgoing ¶
func (c *DefaultMiddlewareChain) ProcessOutgoing(ctx context.Context, event TransportEvent) (TransportEvent, error)
ProcessOutgoing processes an outgoing event through the middleware chain.
type DefaultTransportManager ¶
type DefaultTransportManager struct {
// contains filtered or unexported fields
}
TransportManager manages multiple transport instances and provides advanced features.
func NewDefaultTransportManager ¶
func NewDefaultTransportManager(registry TransportRegistry) *DefaultTransportManager
NewDefaultTransportManager creates a new default transport manager.
func NewDefaultTransportManagerWithConfig ¶
func NewDefaultTransportManagerWithConfig(registry TransportRegistry, config *TransportManagerConfig) *DefaultTransportManager
NewDefaultTransportManagerWithConfig creates a new default transport manager with custom configuration.
func (*DefaultTransportManager) AddTransport ¶
func (m *DefaultTransportManager) AddTransport(name string, transport Transport) error
AddTransport adds a transport to the manager.
func (*DefaultTransportManager) Close ¶
func (m *DefaultTransportManager) Close() error
Close closes all managed transports with improved race condition protection and timeout handling.
func (*DefaultTransportManager) GetActiveTransports ¶
func (m *DefaultTransportManager) GetActiveTransports() map[string]Transport
GetActiveTransports returns all active transports.
func (*DefaultTransportManager) GetMapCleanupMetrics ¶
func (m *DefaultTransportManager) GetMapCleanupMetrics() MapCleanupMetrics
GetMapCleanupMetrics returns the current cleanup metrics
func (*DefaultTransportManager) GetTransport ¶
func (m *DefaultTransportManager) GetTransport(name string) (Transport, error)
GetTransport retrieves a transport by name.
func (*DefaultTransportManager) ReceiveEvents ¶
ReceiveEvents returns a channel that receives events from all transports.
func (*DefaultTransportManager) RemoveTransport ¶
func (m *DefaultTransportManager) RemoveTransport(name string) error
RemoveTransport removes a transport from the manager.
func (*DefaultTransportManager) SendEvent ¶
func (m *DefaultTransportManager) SendEvent(ctx context.Context, event TransportEvent) error
SendEvent sends an event using the best available transport.
func (*DefaultTransportManager) SendEventToTransport ¶
func (m *DefaultTransportManager) SendEventToTransport(ctx context.Context, transportName string, event TransportEvent) error
SendEventToTransport sends an event to a specific transport.
func (*DefaultTransportManager) SetEventBus ¶
func (m *DefaultTransportManager) SetEventBus(eventBus EventBus)
SetEventBus sets the event bus.
func (*DefaultTransportManager) SetLoadBalancer ¶
func (m *DefaultTransportManager) SetLoadBalancer(balancer LoadBalancer)
SetLoadBalancer sets the load balancing strategy.
func (*DefaultTransportManager) SetMiddleware ¶
func (m *DefaultTransportManager) SetMiddleware(middleware MiddlewareChain)
SetMiddleware sets the middleware chain.
func (*DefaultTransportManager) Stats ¶
func (m *DefaultTransportManager) Stats() map[string]TransportStats
Stats returns aggregated statistics from all transports.
func (*DefaultTransportManager) TriggerManualCleanup ¶
func (m *DefaultTransportManager) TriggerManualCleanup()
TriggerManualCleanup manually triggers a cleanup operation
type DefaultTransportRegistry ¶
type DefaultTransportRegistry struct {
// contains filtered or unexported fields
}
DefaultTransportRegistry is the default implementation of TransportRegistry.
func NewDefaultTransportRegistry ¶
func NewDefaultTransportRegistry() *DefaultTransportRegistry
NewDefaultTransportRegistry creates a new default transport registry.
func (*DefaultTransportRegistry) Clear ¶
func (r *DefaultTransportRegistry) Clear()
Clear removes all registered factories.
func (*DefaultTransportRegistry) Create ¶
func (r *DefaultTransportRegistry) Create(config Config) (Transport, error)
Create creates a transport instance using the appropriate factory.
func (*DefaultTransportRegistry) CreateWithContext ¶
func (r *DefaultTransportRegistry) CreateWithContext(ctx context.Context, config Config) (Transport, error)
CreateWithContext creates a transport instance with context.
func (*DefaultTransportRegistry) GetFactory ¶
func (r *DefaultTransportRegistry) GetFactory(transportType string) (TransportFactory, error)
GetFactory returns the factory for a specific transport type.
func (*DefaultTransportRegistry) GetRegisteredTypes ¶
func (r *DefaultTransportRegistry) GetRegisteredTypes() []string
GetRegisteredTypes returns all registered transport types.
func (*DefaultTransportRegistry) IsRegistered ¶
func (r *DefaultTransportRegistry) IsRegistered(transportType string) bool
IsRegistered checks if a transport type is registered.
func (*DefaultTransportRegistry) Register ¶
func (r *DefaultTransportRegistry) Register(transportType string, factory TransportFactory) error
Register registers a transport factory for a specific type.
func (*DefaultTransportRegistry) Unregister ¶
func (r *DefaultTransportRegistry) Unregister(transportType string) error
Unregister removes a transport factory for a specific type.
type DefaultValidator ¶
type DefaultValidator struct {
// contains filtered or unexported fields
}
DefaultValidator is the default implementation of Validator
func NewValidator ¶
func NewValidator(config *ValidationConfig) *DefaultValidator
NewValidator creates a new validator with the given configuration
func (*DefaultValidator) Validate ¶
func (v *DefaultValidator) Validate(ctx context.Context, event TransportEvent) error
Validate validates a transport event
func (*DefaultValidator) ValidateIncoming ¶
func (v *DefaultValidator) ValidateIncoming(ctx context.Context, event TransportEvent) error
ValidateIncoming validates an incoming event
func (*DefaultValidator) ValidateOutgoing ¶
func (v *DefaultValidator) ValidateOutgoing(ctx context.Context, event TransportEvent) error
ValidateOutgoing validates an outgoing event
type DependencyValidator ¶
type DependencyValidator struct {
// contains filtered or unexported fields
}
DependencyValidator validates field dependencies (field A requires field B to be present/valid)
func NewDependencyValidator ¶
func NewDependencyValidator(name string) *DependencyValidator
NewDependencyValidator creates a new dependency validator
func (*DependencyValidator) AddDependency ¶
func (v *DependencyValidator) AddDependency(field string, requiredFields ...string) *DependencyValidator
AddDependency adds a dependency rule (if field exists, then requiredFields must also exist)
func (*DependencyValidator) IsEnabled ¶
func (v *DependencyValidator) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*DependencyValidator) Name ¶
func (v *DependencyValidator) Name() string
Name returns the validator name
func (*DependencyValidator) Priority ¶
func (v *DependencyValidator) Priority() int
Priority returns the validator priority
func (*DependencyValidator) SetCondition ¶
func (v *DependencyValidator) SetCondition(condition func(map[string]interface{}) bool) *DependencyValidator
SetCondition sets a condition that must be met for validation to apply
func (*DependencyValidator) SetEnabled ¶
func (v *DependencyValidator) SetEnabled(enabled bool) *DependencyValidator
SetEnabled enables or disables the validator
func (*DependencyValidator) SetFieldValidator ¶
func (v *DependencyValidator) SetFieldValidator(field string, validator TypedValidator[any]) *DependencyValidator
SetFieldValidator sets a validator for a specific field
func (*DependencyValidator) SetPriority ¶
func (v *DependencyValidator) SetPriority(priority int) *DependencyValidator
SetPriority sets the validator priority
func (*DependencyValidator) Validate ¶
func (v *DependencyValidator) Validate(ctx context.Context, value map[string]interface{}) ValidationResult
Validate validates field dependencies
type DeprecationDoc ¶
type DeprecationDoc struct {
Reason string `json:"reason"`
Alternative string `json:"alternative"`
RemovalDate time.Time `json:"removal_date"`
Version string `json:"version,omitempty"`
}
DeprecationDoc documents deprecation information
type DeprecationInfo ¶
DeprecationInfo contains information about a deprecated item
type DeprecationWarning ¶
type DeprecationWarning struct {
File string
Line int
Column int
Method string
Message string
Deadline time.Time
}
DeprecationWarning represents a deprecation notice
type DocConfig ¶
type DocConfig struct {
// OutputDir is where documentation files will be written
OutputDir string
// Format specifies the output format (markdown, html, json)
Format string
// IncludeExamples includes code examples in documentation
IncludeExamples bool
// IncludeDeprecated includes deprecated items with warnings
IncludeDeprecated bool
// GenerateIndex creates an index file
GenerateIndex bool
// CustomTemplates allows custom documentation templates
CustomTemplates map[string]string
}
DocConfig configures documentation generation
type DocumentationGenerator ¶
type DocumentationGenerator struct {
// contains filtered or unexported fields
}
DocumentationGenerator creates comprehensive API documentation
func NewDocumentationGenerator ¶
func NewDocumentationGenerator(config *DocConfig) *DocumentationGenerator
NewDocumentationGenerator creates a new documentation generator
func (*DocumentationGenerator) GenerateDocumentation ¶
func (dg *DocumentationGenerator) GenerateDocumentation(sourceDir string) (*APIDocumentation, error)
GenerateDocumentation generates comprehensive API documentation
func (*DocumentationGenerator) WriteDocumentation ¶
func (dg *DocumentationGenerator) WriteDocumentation(apiDoc *APIDocumentation) error
WriteDocumentation writes the documentation to files
type DynamicSchemaValidator ¶
type DynamicSchemaValidator struct {
// contains filtered or unexported fields
}
DynamicSchemaValidator validates against schemas loaded at runtime
func NewDynamicSchemaValidator ¶
func NewDynamicSchemaValidator(name string, provider func() (*Schema, error)) *DynamicSchemaValidator
NewDynamicSchemaValidator creates a new dynamic schema validator
func (*DynamicSchemaValidator) InvalidateCache ¶
func (v *DynamicSchemaValidator) InvalidateCache()
InvalidateCache invalidates the schema cache
func (*DynamicSchemaValidator) IsEnabled ¶
func (v *DynamicSchemaValidator) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*DynamicSchemaValidator) Name ¶
func (v *DynamicSchemaValidator) Name() string
Name returns the validator name
func (*DynamicSchemaValidator) Priority ¶
func (v *DynamicSchemaValidator) Priority() int
Priority returns the validator priority
func (*DynamicSchemaValidator) SetCacheTimeout ¶
func (v *DynamicSchemaValidator) SetCacheTimeout(timeout time.Duration) *DynamicSchemaValidator
SetCacheTimeout sets the schema cache timeout
func (*DynamicSchemaValidator) Validate ¶
func (v *DynamicSchemaValidator) Validate(ctx context.Context, value interface{}) ValidationResult
Validate validates against the dynamic schema
type EnvironmentContext ¶
type EnvironmentContext struct {
// Hostname of the machine
Hostname string `json:"hostname,omitempty"`
// Platform (linux, windows, darwin, etc.)
Platform string `json:"platform,omitempty"`
// Architecture (amd64, arm64, etc.)
Architecture string `json:"architecture,omitempty"`
// RuntimeVersion (Go version, etc.)
RuntimeVersion string `json:"runtime_version,omitempty"`
// ContainerID if running in container
ContainerID string `json:"container_id,omitempty"`
// PodName if running in Kubernetes
PodName string `json:"pod_name,omitempty"`
// Namespace Kubernetes namespace
Namespace string `json:"namespace,omitempty"`
// NodeName Kubernetes node
NodeName string `json:"node_name,omitempty"`
// Environment variables relevant to the event
EnvironmentVars map[string]string `json:"environment_vars,omitempty"`
}
EnvironmentContext represents environment information
type ErrorCause ¶
type ErrorCause struct {
Type string `json:"type"`
Message string `json:"message"`
Source string `json:"source,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
}
ErrorCause represents a related error or cause
type ErrorContext ¶
type ErrorContext struct {
RequestID string `json:"request_id,omitempty"`
Operation string `json:"operation,omitempty"`
Resource string `json:"resource,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
QueryParams map[string]string `json:"query_params,omitempty"`
}
ErrorContext provides contextual information about an error
type ErrorDetails ¶
type ErrorDetails struct {
// Error code or identifier
Code string `json:"code,omitempty"`
// Error category (network, protocol, auth, etc.)
Category string `json:"category,omitempty"`
// HTTP status code if applicable
HTTPStatus int `json:"http_status,omitempty"`
// Retry information
Retry ErrorRetryInfo `json:"retry,omitempty"`
// Context information
Context ErrorContext `json:"context,omitempty"`
// Related errors or causes
Causes []ErrorCause `json:"causes,omitempty"`
// Custom fields for specific error types
Custom map[string]string `json:"custom,omitempty"`
}
ErrorDetails represents structured error information
func (ErrorDetails) IsEmpty ¶
func (e ErrorDetails) IsEmpty() bool
IsEmpty returns true if the ErrorDetails struct has no set values
func (ErrorDetails) ToMap ¶
func (e ErrorDetails) ToMap() map[string]interface{}
ToMap converts ErrorDetails to map[string]interface{} for backward compatibility
type ErrorEventData ¶
type ErrorEventData struct {
// Message is the error message
Message string `json:"message"`
// Code is an error code for programmatic handling
Code string `json:"code,omitempty"`
// Severity indicates the error severity (fatal, error, warning, info)
Severity string `json:"severity,omitempty"`
// Category categorizes the error (network, protocol, validation, etc.)
Category string `json:"category,omitempty"`
// Retryable indicates if the operation can be retried
Retryable bool `json:"retryable"`
// Details contains additional error context
Details ErrorDetails `json:"details,omitempty"`
// StackTrace for debugging (should be omitted in production)
StackTrace string `json:"stack_trace,omitempty"`
// RequestID for request correlation
RequestID string `json:"request_id,omitempty"`
}
ErrorEventData represents error-related event data
func (ErrorEventData) ToMap ¶
func (e ErrorEventData) ToMap() map[string]interface{}
ToMap converts the error event data to a map for backward compatibility
func (ErrorEventData) Validate ¶
func (e ErrorEventData) Validate() error
Validate ensures the error event data is valid
type ErrorEventOption ¶
type ErrorEventOption func(*ErrorEventData)
ErrorEventOption is a functional option for configuring ErrorEventData
func WithErrorCategory ¶
func WithErrorCategory(category string) ErrorEventOption
WithErrorCategory sets the error category
func WithErrorCode ¶
func WithErrorCode(code string) ErrorEventOption
WithErrorCode sets the error code
func WithErrorDetails ¶
func WithErrorDetails(details ErrorDetails) ErrorEventOption
WithErrorDetails sets additional error details
func WithErrorSeverity ¶
func WithErrorSeverity(severity string) ErrorEventOption
WithErrorSeverity sets the error severity
func WithRequestID ¶
func WithRequestID(requestID string) ErrorEventOption
WithRequestID sets the request ID for correlation
func WithRetryable ¶
func WithRetryable(retryable bool) ErrorEventOption
WithRetryable sets whether the error is retryable
func WithStackTrace ¶
func WithStackTrace(stackTrace string) ErrorEventOption
WithStackTrace sets the stack trace (use sparingly, not in production)
type ErrorField ¶
ErrorField represents an error field with special handling
func SafeErr ¶
func SafeErr(err error) ErrorField
func SafeError ¶
func SafeError(key string, err error) ErrorField
func (ErrorField) ToField ¶
func (ef ErrorField) ToField() Field
ToField converts an ErrorField to a legacy Field
type ErrorRetryInfo ¶
type ErrorRetryInfo struct {
Retryable bool `json:"retryable"`
RetryAfter time.Duration `json:"retry_after,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
BackoffType string `json:"backoff_type,omitempty"`
}
ErrorRetryInfo provides retry guidance
type ErrorSeverity ¶
type ErrorSeverity int
ErrorSeverity represents the severity of a validation error
const ( SeverityInfoError ErrorSeverity = iota SeverityWarningError SeverityErrorError SeverityCritical )
func (ErrorSeverity) String ¶
func (s ErrorSeverity) String() string
String returns string representation of error severity
type ErrorSimulator ¶
type ErrorSimulator struct {
// contains filtered or unexported fields
}
ErrorSimulator provides utilities for simulating various error conditions
func NewErrorSimulator ¶
func NewErrorSimulator() *ErrorSimulator
NewErrorSimulator creates a new error simulator
func (*ErrorSimulator) SetError ¶
func (s *ErrorSimulator) SetError(operation string, err error)
SetError sets an error to be returned for a specific operation
func (*ErrorSimulator) SetErrorFrequency ¶
func (s *ErrorSimulator) SetErrorFrequency(operation string, frequency int)
SetErrorFrequency sets how often an error should occur (every N calls)
func (*ErrorSimulator) ShouldError ¶
func (s *ErrorSimulator) ShouldError(operation string) (error, bool)
ShouldError returns whether an error should be simulated for this call
type ErrorValue ¶
type ErrorValue interface {
// ErrorString returns a string representation of the value for error messages
ErrorString() string
}
ErrorValue defines the interface for allowed error context types
func ValidateErrorValue ¶
func ValidateErrorValue(value interface{}) ErrorValue
ValidateErrorValue validates an error value and returns a standardized ErrorValue
type EventAggregator ¶
type EventAggregator interface {
// ReceiveEvents returns a channel that receives events from all transports.
ReceiveEvents(ctx context.Context) (<-chan any, error)
}
EventAggregator aggregates events from multiple sources
type EventBus ¶
type EventBus interface {
// Subscribe subscribes to events of a specific type.
Subscribe(ctx context.Context, eventType string, handler EventHandler) error
// Unsubscribe removes a subscription.
Unsubscribe(ctx context.Context, eventType string, handler EventHandler) error
// Publish publishes an event to all subscribers.
Publish(ctx context.Context, eventType string, event any) error
// Close closes the event bus.
Close(ctx context.Context) error
}
EventBus provides event bus capabilities for decoupled communication.
type EventCondition ¶
type EventCondition struct {
// Type indicates the condition type (field_match, event_count, time_window, etc.)
Type string `json:"type"`
// Expression is the condition expression (varies by type)
Expression string `json:"expression"`
// Parameters contains condition-specific parameters
Parameters map[string]interface{} `json:"parameters,omitempty"`
// Operator for comparison conditions (eq, ne, gt, lt, gte, lte, in, not_in, etc.)
Operator string `json:"operator,omitempty"`
// ExpectedValue for comparison conditions
ExpectedValue interface{} `json:"expected_value,omitempty"`
// FieldPath for field-based conditions
FieldPath string `json:"field_path,omitempty"`
// TimeWindow for time-based conditions
TimeWindow time.Duration `json:"time_window,omitempty"`
// EventTypes for event-type-based conditions
EventTypes []string `json:"event_types,omitempty"`
// MinCount for count-based conditions
MinCount *int `json:"min_count,omitempty"`
// MaxCount for count-based conditions
MaxCount *int `json:"max_count,omitempty"`
}
EventCondition defines a condition for conditional events
type EventContext ¶
type EventContext struct {
// Timestamp when the context was captured
Timestamp time.Time `json:"timestamp"`
// Version of the context schema
Version string `json:"version"`
// Source system or component that generated the event
Source string `json:"source"`
// SourceVersion version of the source system
SourceVersion string `json:"source_version,omitempty"`
// Environment (development, staging, production, etc.)
Environment string `json:"environment"`
// Region or datacenter location
Region string `json:"region,omitempty"`
// Tenant or organization identifier
TenantID string `json:"tenant_id,omitempty"`
// ServiceName of the originating service
ServiceName string `json:"service_name,omitempty"`
// ServiceInstance identifier
ServiceInstance string `json:"service_instance,omitempty"`
// ProcessID of the generating process
ProcessID int `json:"process_id,omitempty"`
// ThreadID of the generating thread
ThreadID string `json:"thread_id,omitempty"`
}
EventContext represents comprehensive event context
type EventData ¶
type EventData interface {
// Validate ensures the event data is valid
Validate() error
// ToMap converts the event data to a map[string]interface{} for backward compatibility
ToMap() map[string]interface{}
}
EventData is the base interface that all typed event data must implement. This provides a common contract for event data validation and serialization.
type EventFilter ¶
type EventFilter interface {
// ShouldProcess returns true if the event should be processed.
ShouldProcess(event any) bool
// Priority returns the filter priority (higher values are processed first).
Priority() int
// Name returns the filter name for logging and debugging.
Name() string
}
EventFilter represents a filter for events based on type or other criteria.
type EventHandler ¶
EventHandler is a callback function for handling received events.
type EventHandlerProvider ¶
type EventHandlerProvider interface {
// SetEventHandler sets a callback function to handle received events.
SetEventHandler(handler EventHandler)
}
EventHandlerProvider allows setting event handlers
type EventHandlerSlicePool ¶
type EventHandlerSlicePool struct {
// contains filtered or unexported fields
}
EventHandlerSlicePool manages event handler slices
func NewEventHandlerSlicePool ¶
func NewEventHandlerSlicePool() *EventHandlerSlicePool
NewEventHandlerSlicePool creates a pool for event handler slices
func (*EventHandlerSlicePool) Get ¶
func (p *EventHandlerSlicePool) Get() []interface{}
Get returns an event handler slice
func (*EventHandlerSlicePool) Put ¶
func (p *EventHandlerSlicePool) Put(slice []interface{})
Put returns a slice to the pool
type EventRetryPolicy ¶
type EventRetryPolicy struct {
// MaxRetries maximum number of retry attempts
MaxRetries int `json:"max_retries"`
// InitialDelay before first retry
InitialDelay time.Duration `json:"initial_delay"`
// MaxDelay maximum delay between retries
MaxDelay time.Duration `json:"max_delay"`
// BackoffMultiplier for exponential backoff
BackoffMultiplier float64 `json:"backoff_multiplier"`
// Jitter adds randomness to delay
Jitter bool `json:"jitter"`
}
EventRetryPolicy defines retry behavior for conditional events
type EventRouter ¶
type EventRouter interface {
// SendEvent sends an event using the best available transport.
SendEvent(ctx context.Context, event any) error
// SendEventToTransport sends an event to a specific transport.
SendEventToTransport(ctx context.Context, transportName string, event any) error
}
EventRouter routes events to appropriate transports
type EventTypeRule ¶
type EventTypeRule struct {
// contains filtered or unexported fields
}
EventTypeRule validates event types
func (*EventTypeRule) IsEnabled ¶
func (r *EventTypeRule) IsEnabled() bool
func (*EventTypeRule) Name ¶
func (r *EventTypeRule) Name() string
func (*EventTypeRule) Priority ¶
func (r *EventTypeRule) Priority() int
func (*EventTypeRule) Validate ¶
func (r *EventTypeRule) Validate(ctx context.Context, event TransportEvent) error
type ExampleDoc ¶
type ExampleDoc struct {
Name string `json:"name"`
Description string `json:"description"`
Code string `json:"code"`
Output string `json:"output,omitempty"`
}
ExampleDoc documents a code example
type FastLogger ¶
type FastLogger struct {
// contains filtered or unexported fields
}
FastLogger provides zero-allocation logging for hot paths
func NewFastLogger ¶
func NewFastLogger(w io.Writer) *FastLogger
NewFastLogger creates a logger optimized for hot paths
func (*FastLogger) LogFast ¶
func (fl *FastLogger) LogFast(level LogLevel, message string)
LogFast logs a message with minimal allocations
type FastValidator ¶
type FastValidator struct {
// contains filtered or unexported fields
}
FastValidator implements a lightweight validator for high-throughput scenarios
func NewFastValidator ¶
func NewFastValidator(config *ValidationConfig) *FastValidator
NewFastValidator creates a new fast validator
func (*FastValidator) Validate ¶
func (fv *FastValidator) Validate(ctx context.Context, event TransportEvent) error
Validate performs fast validation
func (*FastValidator) ValidateIncoming ¶
func (fv *FastValidator) ValidateIncoming(ctx context.Context, event TransportEvent) error
ValidateIncoming validates incoming events
func (*FastValidator) ValidateOutgoing ¶
func (fv *FastValidator) ValidateOutgoing(ctx context.Context, event TransportEvent) error
ValidateOutgoing validates outgoing events
type Field ¶
type Field struct {
Key string
Value interface{}
}
Field represents a key-value pair for structured logging (legacy interface{} version for compatibility)
type FieldDoc ¶
type FieldDoc struct {
Name string `json:"name"`
Type string `json:"type"`
Description string `json:"description"`
Tags string `json:"tags,omitempty"`
IsDeprecated bool `json:"is_deprecated"`
}
FieldDoc documents a struct field
type FieldProvider ¶
type FieldProvider interface {
ToField() Field
}
FieldProvider interface for type-safe field creation
type FieldValidatorRule ¶
type FieldValidatorRule struct {
// contains filtered or unexported fields
}
FieldValidatorRule validates specific fields
func (*FieldValidatorRule) IsEnabled ¶
func (r *FieldValidatorRule) IsEnabled() bool
func (*FieldValidatorRule) Name ¶
func (r *FieldValidatorRule) Name() string
func (*FieldValidatorRule) Priority ¶
func (r *FieldValidatorRule) Priority() int
func (*FieldValidatorRule) Validate ¶
func (r *FieldValidatorRule) Validate(ctx context.Context, event TransportEvent) error
type FloatValue ¶
type FloatValue struct{ Value float64 }
FloatValue wraps a float value
func (FloatValue) ErrorString ¶
func (v FloatValue) ErrorString() string
type FunctionDoc ¶
type FunctionDoc struct {
Name string `json:"name"`
Description string `json:"description"`
Signature string `json:"signature"`
Parameters []ParamDoc `json:"parameters"`
Returns []ReturnDoc `json:"returns"`
Examples []ExampleDoc `json:"examples"`
IsDeprecated bool `json:"is_deprecated"`
DeprecationInfo *DeprecationDoc `json:"deprecation_info,omitempty"`
}
FunctionDoc documents a function
type GenericValue ¶
type GenericValue struct{ Value interface{} }
GenericValue wraps any value (for backward compatibility)
func (GenericValue) ErrorString ¶
func (v GenericValue) ErrorString() string
type GoroutineLeak ¶
type GoroutineLeak struct {
Component string
Delta int
Threshold int
CountBefore int
CountAfter int
DetectedAt time.Time
StackTrace string
LeakedRoutines []string
}
GoroutineLeak represents a detected goroutine leak
func (GoroutineLeak) String ¶
func (gl GoroutineLeak) String() string
String returns string representation of goroutine leak
type GoroutineLeakRule ¶
type GoroutineLeakRule struct {
// contains filtered or unexported fields
}
GoroutineLeakRule validates goroutine count
func (*GoroutineLeakRule) Description ¶
func (r *GoroutineLeakRule) Description() string
func (*GoroutineLeakRule) Name ¶
func (r *GoroutineLeakRule) Name() string
func (*GoroutineLeakRule) Validate ¶
func (r *GoroutineLeakRule) Validate(ctx context.Context, result *CleanupValidationResult) error
type GroupState ¶
type GroupState int32
GroupState represents the state of a cleanup group
const ( GroupStatePending GroupState = iota GroupStateRunning GroupStateComplete GroupStateFailed )
func (GroupState) String ¶
func (s GroupState) String() string
String returns string representation of group state
type GroupStatus ¶
type GroupStatus struct {
ID string
Name string
State GroupState
StartTime time.Time
EndTime time.Time
Duration time.Duration
Errors []error
}
GroupStatus represents the status of a cleanup group
type HTTPConfig ¶
type HTTPConfig struct {
*BaseConfig
// Method specifies the HTTP method (GET, POST, etc.)
Method string `json:"method"`
// UserAgent specifies the User-Agent header
UserAgent string `json:"user_agent"`
// MaxIdleConns specifies the maximum number of idle connections
MaxIdleConns int `json:"max_idle_conns"`
// MaxIdleConnsPerHost specifies the maximum number of idle connections per host
MaxIdleConnsPerHost int `json:"max_idle_conns_per_host"`
// MaxConnsPerHost specifies the maximum number of connections per host
MaxConnsPerHost int `json:"max_conns_per_host"`
// IdleConnTimeout specifies the timeout for idle connections
IdleConnTimeout time.Duration `json:"idle_conn_timeout"`
// ResponseHeaderTimeout specifies the timeout for response headers
ResponseHeaderTimeout time.Duration `json:"response_header_timeout"`
// ExpectContinueTimeout specifies the timeout for Expect: 100-continue
ExpectContinueTimeout time.Duration `json:"expect_continue_timeout"`
// DisableKeepAlives disables HTTP keep-alives
DisableKeepAlives bool `json:"disable_keep_alives"`
// DisableCompression disables HTTP compression
DisableCompression bool `json:"disable_compression"`
// Transport allows custom HTTP transport
Transport *http.Transport `json:"-"`
// Client allows custom HTTP client
Client *http.Client `json:"-"`
// EnableStreaming enables HTTP/2 streaming
EnableStreaming bool `json:"enable_streaming"`
// StreamingBufferSize sets the buffer size for streaming
StreamingBufferSize int `json:"streaming_buffer_size"`
}
HTTPConfig contains configuration specific to HTTP transport.
func NewHTTPConfig ¶
func NewHTTPConfig(endpoint string) *HTTPConfig
NewHTTPConfig creates a new HTTP configuration with defaults.
func (*HTTPConfig) Clone ¶
func (c *HTTPConfig) Clone() Config
Clone creates a deep copy of the HTTP configuration.
func (*HTTPConfig) Validate ¶
func (c *HTTPConfig) Validate() error
Validate validates the HTTP configuration.
type HTTPConnectionFactory ¶
type HTTPConnectionFactory struct {
// contains filtered or unexported fields
}
HTTPConnectionFactory creates HTTP connections
func NewHTTPConnectionFactory ¶
func NewHTTPConnectionFactory(target string) *HTTPConnectionFactory
NewHTTPConnectionFactory creates a new HTTP connection factory
func (*HTTPConnectionFactory) CloseConnection ¶
func (f *HTTPConnectionFactory) CloseConnection(conn net.Conn) error
CloseConnection closes an HTTP connection
func (*HTTPConnectionFactory) CreateConnection ¶
CreateConnection creates a new HTTP connection
func (*HTTPConnectionFactory) ValidateConnection ¶
func (f *HTTPConnectionFactory) ValidateConnection(conn net.Conn) bool
ValidateConnection validates an HTTP connection
type HealthCheckManager ¶
type HealthCheckManager struct {
// contains filtered or unexported fields
}
HealthCheckManager manages health checks for transports with proper resource management.
func NewHealthCheckManager ¶
func NewHealthCheckManager() *HealthCheckManager
NewHealthCheckManager creates a new health check manager with enhanced resource tracking.
func (*HealthCheckManager) AddTransport ¶
func (m *HealthCheckManager) AddTransport(name string, transport Transport)
AddTransport adds a transport to health checking.
func (*HealthCheckManager) Close ¶
func (m *HealthCheckManager) Close() error
Close closes the health check manager with proper resource cleanup and timeout protection.
func (*HealthCheckManager) RemoveTransport ¶
func (m *HealthCheckManager) RemoveTransport(name string)
RemoveTransport removes a transport from health checking with proper cleanup.
type HealthChecker ¶
type HealthChecker interface {
// CheckHealth performs a health check on the transport.
CheckHealth(ctx context.Context) error
// IsHealthy returns true if the transport is healthy.
IsHealthy() bool
// GetHealthStatus returns detailed health status information.
GetHealthStatus() HealthStatus
}
HealthChecker provides health check capabilities for transports.
type HealthStatus ¶
type HealthStatus struct {
Healthy bool `json:"healthy"`
Timestamp time.Time `json:"timestamp"`
Latency time.Duration `json:"latency"`
Error string `json:"error,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
HealthStatus represents the health status of a transport.
type Int64Field ¶
Int64Field represents an int64 field with special handling
func SafeInt64 ¶
func SafeInt64(key string, value int64) Int64Field
func (Int64Field) ToField ¶
func (i64f Int64Field) ToField() Field
ToField converts an Int64Field to a legacy Field
type IntValue ¶
type IntValue struct{ Value int }
IntValue wraps an integer value
func (IntValue) ErrorString ¶
type InterfaceDoc ¶
type InterfaceDoc struct {
Name string `json:"name"`
Description string `json:"description"`
Methods []MethodDoc `json:"methods"`
Examples []ExampleDoc `json:"examples"`
Embeds []string `json:"embeds"`
Implementations []string `json:"implementations"`
IsDeprecated bool `json:"is_deprecated"`
DeprecationInfo *DeprecationDoc `json:"deprecation_info,omitempty"`
}
InterfaceDoc documents an interface
type KeepAliveConfig ¶
type KeepAliveConfig struct {
// Enabled enables keep-alive
Enabled bool `json:"enabled"`
// Interval specifies the keep-alive interval
Interval time.Duration `json:"interval"`
// Timeout specifies the keep-alive timeout
Timeout time.Duration `json:"timeout"`
// MaxRetries specifies the maximum number of keep-alive retries
MaxRetries int `json:"max_retries"`
}
KeepAliveConfig contains keep-alive configuration.
func (*KeepAliveConfig) Clone ¶
func (c *KeepAliveConfig) Clone() *KeepAliveConfig
Clone creates a deep copy of the keep-alive configuration.
func (*KeepAliveConfig) Validate ¶
func (c *KeepAliveConfig) Validate() error
Validate validates the keep-alive configuration.
type LeakedResource ¶
type LeakedResource struct {
ID string
Type ResourceType
Description string
CreatedAt time.Time
StackTrace string
}
LeakedResource represents a resource that was not properly cleaned
type LegacyConfigurationError ¶
Legacy ConfigurationError for backward compatibility This maintains the original interface{} behavior
func NewConfigError ¶
func NewConfigError(field string, value interface{}, message string) *LegacyConfigurationError
NewConfigError creates a legacy configuration error (backward compatibility)
func NewLegacyConfigurationError ¶
func NewLegacyConfigurationError(field string, value interface{}, message string) *LegacyConfigurationError
NewLegacyConfigurationError creates a new legacy configuration error for backward compatibility
func (*LegacyConfigurationError) Error ¶
func (e *LegacyConfigurationError) Error() string
type LoadBalancer ¶
type LoadBalancer interface {
// SelectTransport selects a transport for sending an event.
SelectTransport(transports map[string]Transport, event any) (string, error)
// UpdateStats updates the load balancer with transport statistics.
UpdateStats(transportName string, stats TransportStats)
// Name returns the load balancer name.
Name() string
}
LoadBalancer represents a load balancing strategy for multiple transports.
type LoadBalancerSetter ¶
type LoadBalancerSetter interface {
// SetLoadBalancer sets the load balancing strategy.
SetLoadBalancer(balancer LoadBalancer)
}
LoadBalancerSetter allows setting load balancing strategy
type LocalizationProvider ¶
type LocalizationProvider interface {
// GetMessage returns a localized message for the given key and parameters
GetMessage(key string, params map[string]interface{}) string
// GetSuggestion returns a localized suggestion for the given key and parameters
GetSuggestion(key string, params map[string]interface{}) string
// SupportedLanguages returns a list of supported language codes
SupportedLanguages() []string
}
LocalizationProvider defines an interface for providing localized error messages
func GetLocalizationProvider ¶
func GetLocalizationProvider() LocalizationProvider
GetLocalizationProvider returns the current localization provider
type LogEntry ¶
type LogEntry struct {
Level LogLevel
Timestamp time.Time
Message string
Fields map[string]interface{}
Error error
}
LogEntry represents a structured log entry
type LogValue ¶
type LogValue interface {
~string | ~int | ~int8 | ~int16 | ~int32 | ~int64 |
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
~float32 | ~float64 | ~bool |
time.Time
}
LogValue defines the interface for type-safe log values This constraint ensures only safe, serializable types can be logged
type Logger ¶
type Logger interface {
// Legacy field-based methods for backward compatibility
Log(level LogLevel, message string, fields ...Field)
Debug(message string, fields ...Field)
Info(message string, fields ...Field)
Warn(message string, fields ...Field)
Error(message string, fields ...Field)
WithFields(fields ...Field) Logger
WithContext(ctx context.Context) Logger
// Type-safe logging methods
LogTyped(level LogLevel, message string, fields ...FieldProvider)
DebugTyped(message string, fields ...FieldProvider)
InfoTyped(message string, fields ...FieldProvider)
WarnTyped(message string, fields ...FieldProvider)
ErrorTyped(message string, fields ...FieldProvider)
WithTypedFields(fields ...FieldProvider) Logger
}
Logger defines the interface for structured logging within the transport layer
func NewLogger ¶
func NewLogger(config *LoggerConfig) Logger
NewLogger creates a new logger with the given configuration
func NewOptimizedLogger ¶
func NewOptimizedLogger(config *LoggerConfig) Logger
NewOptimizedLogger creates a new optimized logger
type LoggerConfig ¶
type LoggerConfig struct {
// Level is the minimum log level to output
Level LogLevel
// Format is the output format ("json" or "text")
Format string
// Output is the output destination (os.Stdout, os.Stderr, or file)
Output *os.File
// TimestampFormat is the format for timestamps
TimestampFormat string
// EnableCaller enables caller information in logs
EnableCaller bool
// EnableStacktrace enables stacktrace for error logs
EnableStacktrace bool
}
LoggerConfig defines configuration for the logger
func DefaultLoggerConfig ¶
func DefaultLoggerConfig() *LoggerConfig
DefaultLoggerConfig returns a default logger configuration
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager orchestrates transport operations including selection, failover, and load balancing
func NewManager ¶
func NewManager(cfg *ManagerConfig) *Manager
NewManager creates a new transport manager
func NewManagerWithLogger ¶
func NewManagerWithLogger(cfg *ManagerConfig, logger Logger) *Manager
NewManagerWithLogger creates a new transport manager with a custom logger
func (*Manager) AddMiddleware ¶
func (m *Manager) AddMiddleware(middleware ...Middleware)
AddMiddleware adds middleware to the transport stack
func (*Manager) GetActiveTransport ¶
GetActiveTransport returns the currently active transport
func (*Manager) GetBackpressureMetrics ¶
func (m *Manager) GetBackpressureMetrics() BackpressureMetrics
GetBackpressureMetrics returns the current backpressure metrics
func (*Manager) GetMetrics ¶
func (m *Manager) GetMetrics() ManagerMetrics
GetMetrics returns the manager metrics
func (*Manager) GetValidationConfig ¶
func (m *Manager) GetValidationConfig() *ValidationConfig
GetValidationConfig returns the current validation configuration
func (*Manager) IsValidationEnabled ¶
IsValidationEnabled returns whether validation is enabled
func (*Manager) Send ¶
func (m *Manager) Send(ctx context.Context, event TransportEvent) error
Send sends an event through the active transport
func (*Manager) SetTransport ¶
SetTransport sets the active transport
func (*Manager) SetValidationConfig ¶
func (m *Manager) SetValidationConfig(config *ValidationConfig)
SetValidationConfig sets the validation configuration
func (*Manager) SetValidationEnabled ¶
SetValidationEnabled enables or disables validation
type ManagerConfig ¶
type ManagerConfig struct {
Primary string
Fallback []string
BufferSize int
LogLevel string
EnableMetrics bool
Backpressure BackpressureConfig
Validation *ValidationConfig
}
ManagerConfig represents simplified transport configuration
type ManagerMetrics ¶
type ManagerMetrics struct {
TransportSwitches uint64
TotalConnections uint64
ActiveConnections uint64
FailedConnections uint64
TotalMessagesSent uint64
TotalMessagesReceived uint64
TotalBytesSent uint64
TotalBytesReceived uint64
AverageLatency time.Duration
LastTransportSwitch time.Time
TransportHealthScores map[string]float64
// contains filtered or unexported fields
}
ManagerMetrics contains metrics for the transport manager
type MapCleanupMetrics ¶
type MapCleanupMetrics struct {
TotalCleanups uint64
TransportsRemoved uint64
TransportsRetained uint64
LastCleanupDuration time.Duration
LastCleanupTime time.Time
CleanupErrors uint64
// contains filtered or unexported fields
}
MapCleanupMetrics tracks transport map cleanup operation statistics
type MapValidator ¶
type MapValidator[K comparable, V any] struct { // contains filtered or unexported fields }
MapValidator provides type-safe validation for maps with key and value rules
func CreateStringMapValidator ¶
func CreateStringMapValidator(name string) *MapValidator[string, string]
CreateStringMapValidator creates a map validator for string keys and values
func NewMapValidator ¶
func NewMapValidator[K comparable, V any](name string) *MapValidator[K, V]
NewMapValidator creates a new map validator
func (*MapValidator[K, V]) IsEnabled ¶
func (v *MapValidator[K, V]) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*MapValidator[K, V]) Name ¶
func (v *MapValidator[K, V]) Name() string
Name returns the validator name
func (*MapValidator[K, V]) Priority ¶
func (v *MapValidator[K, V]) Priority() int
Priority returns the validator priority
func (*MapValidator[K, V]) SetCondition ¶
func (v *MapValidator[K, V]) SetCondition(condition func(map[K]V) bool) *MapValidator[K, V]
SetCondition sets a condition that must be met for validation to apply
func (*MapValidator[K, V]) SetEnabled ¶
func (v *MapValidator[K, V]) SetEnabled(enabled bool) *MapValidator[K, V]
SetEnabled enables or disables the validator
func (*MapValidator[K, V]) SetKeyValidator ¶
func (v *MapValidator[K, V]) SetKeyValidator(validator TypedValidator[K]) *MapValidator[K, V]
SetKeyValidator sets the validator for map keys
func (*MapValidator[K, V]) SetPriority ¶
func (v *MapValidator[K, V]) SetPriority(priority int) *MapValidator[K, V]
SetPriority sets the validator priority
func (*MapValidator[K, V]) SetRequiredKeys ¶
func (v *MapValidator[K, V]) SetRequiredKeys(keys ...K) *MapValidator[K, V]
SetRequiredKeys sets keys that must be present in the map
func (*MapValidator[K, V]) SetSizeRange ¶
func (v *MapValidator[K, V]) SetSizeRange(min, max *int) *MapValidator[K, V]
SetSizeRange sets the allowed size range for the map
func (*MapValidator[K, V]) SetValueValidator ¶
func (v *MapValidator[K, V]) SetValueValidator(validator TypedValidator[V]) *MapValidator[K, V]
SetValueValidator sets the validator for map values
func (*MapValidator[K, V]) Validate ¶
func (v *MapValidator[K, V]) Validate(ctx context.Context, value map[K]V) ValidationResult
Validate validates a map value
type MemoryConfig ¶
type MemoryConfig struct {
BufferSize int
}
MemoryConfig is the configuration for memory transport
func (*MemoryConfig) Clone ¶
func (c *MemoryConfig) Clone() Config
Clone creates a deep copy of the configuration
func (*MemoryConfig) GetEndpoint ¶
func (c *MemoryConfig) GetEndpoint() string
GetEndpoint returns empty for memory transport
func (*MemoryConfig) GetHeaders ¶
func (c *MemoryConfig) GetHeaders() map[string]string
GetHeaders returns empty headers for memory transport
func (*MemoryConfig) GetTimeout ¶
func (c *MemoryConfig) GetTimeout() time.Duration
GetTimeout returns the connection timeout
func (*MemoryConfig) GetType ¶
func (c *MemoryConfig) GetType() string
GetType returns the transport type
func (*MemoryConfig) IsSecure ¶
func (c *MemoryConfig) IsSecure() bool
IsSecure returns true as memory transport is always secure
func (*MemoryConfig) Validate ¶
func (c *MemoryConfig) Validate() error
Validate validates the configuration
type MemoryLeak ¶
type MemoryLeak struct {
Component string
Delta int64
Threshold int64
AllocBefore uint64
AllocAfter uint64
DetectedAt time.Time
StackTrace string
}
MemoryLeak represents a detected memory leak
func (MemoryLeak) String ¶
func (ml MemoryLeak) String() string
String returns string representation of memory leak
type MemoryLeakRule ¶
type MemoryLeakRule struct {
// contains filtered or unexported fields
}
MemoryLeakRule validates memory usage
func (*MemoryLeakRule) Description ¶
func (r *MemoryLeakRule) Description() string
func (*MemoryLeakRule) Name ¶
func (r *MemoryLeakRule) Name() string
func (*MemoryLeakRule) Validate ¶
func (r *MemoryLeakRule) Validate(ctx context.Context, result *CleanupValidationResult) error
type MemoryManager ¶
type MemoryManager struct {
// contains filtered or unexported fields
}
MemoryManager handles memory pressure monitoring and adaptive behavior
func NewMemoryManager ¶
func NewMemoryManager(config *MemoryManagerConfig) *MemoryManager
NewMemoryManager creates a new memory manager
func (*MemoryManager) ForceGC ¶
func (mm *MemoryManager) ForceGC()
ForceGC forces a garbage collection if memory pressure is high
func (*MemoryManager) GetAdaptiveBufferSize ¶
func (mm *MemoryManager) GetAdaptiveBufferSize(key string, defaultSize int) int
GetAdaptiveBufferSize returns the adaptive buffer size for a given key
func (*MemoryManager) GetMemoryPressureLevel ¶
func (mm *MemoryManager) GetMemoryPressureLevel() MemoryPressureLevel
GetMemoryPressureLevel returns the current memory pressure level
func (*MemoryManager) GetMemoryStats ¶
func (mm *MemoryManager) GetMemoryStats() runtime.MemStats
GetMemoryStats returns current runtime memory statistics
func (*MemoryManager) GetMetrics ¶
func (mm *MemoryManager) GetMetrics() MemoryMetrics
GetMetrics returns current memory metrics
func (*MemoryManager) OnMemoryPressure ¶
func (mm *MemoryManager) OnMemoryPressure(callback func(MemoryPressureLevel))
OnMemoryPressure registers a callback for memory pressure events
type MemoryManagerConfig ¶
type MemoryManagerConfig struct {
// Memory thresholds as percentage of system memory
LowMemoryPercent float64 // Default: 70%
HighMemoryPercent float64 // Default: 85%
CriticalMemoryPercent float64 // Default: 95%
MonitorInterval time.Duration // Default: 5s
Logger *zap.Logger
}
MemoryManagerConfig configures the memory manager
func DefaultMemoryManagerConfig ¶
func DefaultMemoryManagerConfig() *MemoryManagerConfig
DefaultMemoryManagerConfig returns default configuration
type MemoryMetrics ¶
type MemoryMetrics struct {
TotalAllocated uint64
HeapInUse uint64
StackInUse uint64
NumGC uint32
LastGCTime time.Time
PressureEvents map[MemoryPressureLevel]uint64
BufferResizeEvents uint64
LastPressureChangeTime time.Time
GCPauseTotal time.Duration
// contains filtered or unexported fields
}
MemoryMetrics tracks memory usage metrics
type MemoryPressureLevel ¶
type MemoryPressureLevel int
MemoryPressureLevel represents the current memory pressure
const ( MemoryPressureNormal MemoryPressureLevel = iota MemoryPressureLow MemoryPressureHigh MemoryPressureCritical )
func (MemoryPressureLevel) String ¶
func (m MemoryPressureLevel) String() string
String returns the string representation of memory pressure level
type MemoryTransport ¶
type MemoryTransport struct {
// contains filtered or unexported fields
}
MemoryTransport is an in-memory transport implementation for testing
func NewMemoryTransport ¶
func NewMemoryTransport(bufferSize int) *MemoryTransport
NewMemoryTransport creates a new in-memory transport
func (*MemoryTransport) Channels ¶
func (t *MemoryTransport) Channels() (<-chan events.Event, <-chan error)
Channels returns both event and error channels together
func (*MemoryTransport) Close ¶
func (t *MemoryTransport) Close(ctx context.Context) error
Close closes the transport
func (*MemoryTransport) Config ¶
func (t *MemoryTransport) Config() Config
Config returns the transport configuration
func (*MemoryTransport) Connect ¶
func (t *MemoryTransport) Connect(ctx context.Context) error
Connect establishes the connection
func (*MemoryTransport) Context ¶
func (t *MemoryTransport) Context() context.Context
Context returns the transport context
func (*MemoryTransport) Errors ¶
func (t *MemoryTransport) Errors() <-chan error
Errors returns the channel for receiving errors
func (*MemoryTransport) GetOption ¶
func (t *MemoryTransport) GetOption(key string) (interface{}, error)
GetOption gets a transport option
func (*MemoryTransport) IsConnected ¶
func (t *MemoryTransport) IsConnected() bool
IsConnected returns whether the transport is connected
func (*MemoryTransport) Receive ¶
func (t *MemoryTransport) Receive() <-chan events.Event
Receive returns the channel for receiving events
func (*MemoryTransport) Send ¶
func (t *MemoryTransport) Send(ctx context.Context, event TransportEvent) error
Send sends an event through the transport
func (*MemoryTransport) SetOption ¶
func (t *MemoryTransport) SetOption(key string, value interface{}) error
SetOption sets a transport option
func (*MemoryTransport) Stats ¶
func (t *MemoryTransport) Stats() TransportStats
Stats returns transport statistics
func (*MemoryTransport) Type ¶
func (t *MemoryTransport) Type() string
Type returns the transport type
type MessageAttachment ¶
type MessageEventData ¶
type MessageEventData struct {
// Core message data
Content string `json:"content" validate:"required"`
Role string `json:"role" validate:"required,oneof=user assistant system"`
Model string `json:"model,omitempty"`
MessageID string `json:"message_id,omitempty"`
ThreadID string `json:"thread_id,omitempty"`
ParentID string `json:"parent_id,omitempty"`
// Message metadata
TokenUsage *TokenUsage `json:"token_usage,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
Attachments []MessageAttachment `json:"attachments,omitempty"`
// Processing info
ProcessingTime time.Duration `json:"processing_time,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
}
MessageEventData represents chat/conversation message events
func (*MessageEventData) ToMap ¶
func (m *MessageEventData) ToMap() map[string]interface{}
func (*MessageEventData) Validate ¶
func (m *MessageEventData) Validate() error
type MessageQueueConfig ¶
type MessageQueueConfig struct {
// Enabled enables message queuing
Enabled bool `json:"enabled"`
// MaxSize specifies the maximum queue size
MaxSize int `json:"max_size"`
// MaxMemory specifies the maximum memory usage for the queue
MaxMemory int64 `json:"max_memory"`
// PersistToDisk enables persisting messages to disk
PersistToDisk bool `json:"persist_to_disk"`
// PersistenceDir specifies the directory for persisted messages
PersistenceDir string `json:"persistence_dir,omitempty"`
// FlushInterval specifies the flush interval for disk persistence
FlushInterval time.Duration `json:"flush_interval"`
// DropPolicy specifies the policy for dropping messages when queue is full
DropPolicy string `json:"drop_policy"`
// Priority enables priority queuing
Priority bool `json:"priority"`
// Compression enables message compression in the queue
Compression bool `json:"compression"`
// Deduplication enables message deduplication
Deduplication bool `json:"deduplication"`
// TTL specifies the time-to-live for messages
TTL time.Duration `json:"ttl"`
}
MessageQueueConfig contains message queue configuration.
func (*MessageQueueConfig) Clone ¶
func (c *MessageQueueConfig) Clone() *MessageQueueConfig
Clone creates a deep copy of the message queue configuration.
func (*MessageQueueConfig) Validate ¶
func (c *MessageQueueConfig) Validate() error
Validate validates the message queue configuration.
type MessageSizeRule ¶
type MessageSizeRule struct {
// contains filtered or unexported fields
}
MessageSizeRule validates message size
func (*MessageSizeRule) IsEnabled ¶
func (r *MessageSizeRule) IsEnabled() bool
func (*MessageSizeRule) Name ¶
func (r *MessageSizeRule) Name() string
func (*MessageSizeRule) Priority ¶
func (r *MessageSizeRule) Priority() int
func (*MessageSizeRule) Validate ¶
func (r *MessageSizeRule) Validate(ctx context.Context, event TransportEvent) error
type MethodDoc ¶
type MethodDoc struct {
Name string `json:"name"`
Description string `json:"description"`
Signature string `json:"signature"`
Parameters []ParamDoc `json:"parameters"`
Returns []ReturnDoc `json:"returns"`
Examples []ExampleDoc `json:"examples"`
IsDeprecated bool `json:"is_deprecated"`
DeprecationInfo *DeprecationDoc `json:"deprecation_info,omitempty"`
}
MethodDoc documents a method
type Metrics ¶
type Metrics struct {
// ConnectionUptime is how long the connection has been established
ConnectionUptime time.Duration
// MessagesSent is the total number of messages sent
MessagesSent uint64
// MessagesReceived is the total number of messages received
MessagesReceived uint64
// BytesSent is the total number of bytes sent
BytesSent uint64
// BytesReceived is the total number of bytes received
BytesReceived uint64
// ErrorCount is the total number of errors encountered
ErrorCount uint64
// AverageLatency is the average message latency
AverageLatency time.Duration
// CurrentThroughput is the current throughput in messages per second
CurrentThroughput float64
// ReconnectCount is the number of reconnection attempts
ReconnectCount uint64
// LastError contains the last error encountered
LastError error
// LastErrorTime is when the last error occurred
LastErrorTime time.Time
}
Metrics contains performance metrics for a transport.
type MetricsCollector ¶
type MetricsCollector interface {
// RecordEvent records an event metric.
RecordEvent(eventType string, size int64, latency time.Duration)
// RecordError records an error metric.
RecordError(errorType string, err error)
// RecordConnection records a connection metric.
RecordConnection(connected bool, duration time.Duration)
// GetMetrics returns collected metrics.
GetMetrics() map[string]any
// Reset resets all collected metrics.
Reset()
}
MetricsCollector collects and reports transport metrics.
type MetricsConfig ¶
type MetricsConfig struct {
// Enabled enables metrics collection
Enabled bool `json:"enabled"`
// Namespace specifies the metrics namespace
Namespace string `json:"namespace"`
// Labels specifies additional labels for metrics
Labels map[string]string `json:"labels,omitempty"`
// CollectionInterval specifies the metrics collection interval
CollectionInterval time.Duration `json:"collection_interval"`
// ExportInterval specifies the metrics export interval
ExportInterval time.Duration `json:"export_interval"`
// Endpoint specifies the metrics endpoint
Endpoint string `json:"endpoint,omitempty"`
// EnableDetailedMetrics enables detailed metrics collection
EnableDetailedMetrics bool `json:"enable_detailed_metrics"`
// EnableHistograms enables histogram metrics
EnableHistograms bool `json:"enable_histograms"`
// HistogramBuckets specifies histogram bucket boundaries
HistogramBuckets []float64 `json:"histogram_buckets,omitempty"`
}
MetricsConfig contains metrics configuration.
func (*MetricsConfig) Clone ¶
func (c *MetricsConfig) Clone() *MetricsConfig
Clone creates a deep copy of the metrics configuration.
func (*MetricsConfig) Validate ¶
func (c *MetricsConfig) Validate() error
Validate validates the metrics configuration.
type MetricsEventData ¶
type MetricsEventData struct {
// MetricName is the name of the metric
MetricName string `json:"metric_name"`
// Value is the metric value
Value float64 `json:"value"`
// Unit indicates the unit of measurement
Unit string `json:"unit,omitempty"`
// Tags for metric categorization and filtering
Tags map[string]string `json:"tags,omitempty"`
// Labels for additional metric metadata (same as tags, different naming convention)
Labels map[string]string `json:"labels,omitempty"`
// SampleRate for sampled metrics
SampleRate float64 `json:"sample_rate,omitempty"`
// Interval indicates the measurement interval
Interval time.Duration `json:"interval,omitempty"`
}
MetricsEventData represents metrics-related event data
func (MetricsEventData) ToMap ¶
func (m MetricsEventData) ToMap() map[string]interface{}
ToMap converts the metrics event data to a map for backward compatibility
func (MetricsEventData) Validate ¶
func (m MetricsEventData) Validate() error
Validate ensures the metrics event data is valid
type MetricsEventOption ¶
type MetricsEventOption func(*MetricsEventData)
MetricsEventOption is a functional option for configuring MetricsEventData
func WithInterval ¶
func WithInterval(interval time.Duration) MetricsEventOption
WithInterval sets the measurement interval
func WithLabels ¶
func WithLabels(labels map[string]string) MetricsEventOption
WithLabels sets metric labels (alternative to tags)
func WithSampleRate ¶
func WithSampleRate(rate float64) MetricsEventOption
WithSampleRate sets the sample rate for sampled metrics
type MetricsManager ¶
type MetricsManager struct {
// contains filtered or unexported fields
}
MetricsManager manages metrics collection for transports.
func NewMetricsManager ¶
func NewMetricsManager() *MetricsManager
NewMetricsManager creates a new metrics manager.
func (*MetricsManager) AddTransport ¶
func (m *MetricsManager) AddTransport(name string, transport Transport)
AddTransport adds a transport to metrics collection.
func (*MetricsManager) Close ¶
func (m *MetricsManager) Close() error
Close closes the metrics manager.
func (*MetricsManager) RecordError ¶
func (m *MetricsManager) RecordError(transportName string, err error)
RecordError records an error metric.
func (*MetricsManager) RecordEvent ¶
func (m *MetricsManager) RecordEvent(transportName string, event any)
RecordEvent records an event metric.
func (*MetricsManager) RemoveTransport ¶
func (m *MetricsManager) RemoveTransport(name string)
RemoveTransport removes a transport from metrics collection.
type Middleware ¶
type Middleware interface {
// ProcessOutgoing processes outgoing events before they are sent.
ProcessOutgoing(ctx context.Context, event TransportEvent) (TransportEvent, error)
// ProcessIncoming processes incoming events before they are delivered.
ProcessIncoming(ctx context.Context, event events.Event) (events.Event, error)
// Name returns the middleware name for logging and debugging.
Name() string
// Wrap wraps a transport with this middleware.
Wrap(transport Transport) Transport
}
Middleware represents transport middleware for intercepting and modifying events.
func NewValidationMiddleware ¶
func NewValidationMiddleware(config ...*ValidationConfig) Middleware
NewValidationMiddleware creates a new validation middleware
type MiddlewareChain ¶
type MiddlewareChain interface {
// Add adds middleware to the chain.
Add(middleware Middleware)
// ProcessOutgoing processes an outgoing event through the middleware chain.
ProcessOutgoing(ctx context.Context, event TransportEvent) (TransportEvent, error)
// ProcessIncoming processes an incoming event through the middleware chain.
ProcessIncoming(ctx context.Context, event events.Event) (events.Event, error)
// Clear removes all middleware from the chain.
Clear()
}
MiddlewareChain represents a chain of middleware processors.
type MigrationConfig ¶
type MigrationConfig struct {
// SourceDir is the directory to scan for Go files
SourceDir string
// OutputDir is where migrated files should be written (if different from source)
OutputDir string
// DryRun when true will only analyze and report changes without writing
DryRun bool
// BackupOriginal when true will create .backup files
BackupOriginal bool
// TargetPackages specifies which packages to migrate (empty means all)
TargetPackages []string
// DeprecationDeadline specifies when deprecated methods will be removed
DeprecationDeadline time.Time
}
MigrationConfig defines configuration for the migration process
type MigrationMockEvent ¶
type MigrationMockEvent struct {
// contains filtered or unexported fields
}
MigrationMockEvent provides a mock event implementation for testing
func NewMigrationMockEvent ¶
func NewMigrationMockEvent(id, eventType string) *MigrationMockEvent
func (*MigrationMockEvent) Data ¶
func (me *MigrationMockEvent) Data() map[string]interface{}
func (*MigrationMockEvent) ID ¶
func (me *MigrationMockEvent) ID() string
func (*MigrationMockEvent) Timestamp ¶
func (me *MigrationMockEvent) Timestamp() time.Time
func (*MigrationMockEvent) Type ¶
func (me *MigrationMockEvent) Type() string
type MigrationMockTransport ¶
type MigrationMockTransport struct {
// contains filtered or unexported fields
}
MigrationMockTransport provides a simplified mock implementation for migration testing
func NewMigrationMockTransport ¶
func NewMigrationMockTransport() *MigrationMockTransport
NewMigrationMockTransport creates a new migration mock transport for testing
func (*MigrationMockTransport) Channels ¶
func (mt *MigrationMockTransport) Channels() (<-chan events.Event, <-chan error)
Channels implements Receiver
func (*MigrationMockTransport) Close ¶
func (mt *MigrationMockTransport) Close(ctx context.Context) error
Close implements Connector
func (*MigrationMockTransport) Config ¶
func (mt *MigrationMockTransport) Config() Config
Config implements ConfigProvider
func (*MigrationMockTransport) Connect ¶
func (mt *MigrationMockTransport) Connect(ctx context.Context) error
Connect implements Connector
func (*MigrationMockTransport) IsConnected ¶
func (mt *MigrationMockTransport) IsConnected() bool
IsConnected implements Connector
func (*MigrationMockTransport) Send ¶
func (mt *MigrationMockTransport) Send(ctx context.Context, event TransportEvent) error
Send implements Sender
func (*MigrationMockTransport) Stats ¶
func (mt *MigrationMockTransport) Stats() TransportStats
Stats implements StatsProvider
type MigrationReport ¶
type MigrationReport struct {
FilesProcessed int
FilesModified int
TransformationsApplied map[string]int // rule name -> count
Errors []error
Warnings []string
DeprecationWarnings []DeprecationWarning
}
MigrationReport contains the results of a migration operation
type MigrationRule ¶
type MigrationRule struct {
Name string
Description string
Pattern string // Go AST pattern to match
Replacement string // Replacement pattern
Priority int // Higher priority rules run first
}
MigrationRule defines a transformation rule
type MigrationTestSuite ¶
type MigrationTestSuite struct {
// contains filtered or unexported fields
}
MigrationTestSuite provides comprehensive testing utilities for transport migration
func NewMigrationTestSuite ¶
func NewMigrationTestSuite(t *testing.T) *MigrationTestSuite
NewMigrationTestSuite creates a new test suite for migration validation
func (*MigrationTestSuite) Cleanup ¶
func (mts *MigrationTestSuite) Cleanup()
Cleanup removes temporary test files
func (*MigrationTestSuite) TestBackwardCompatibility ¶
func (mts *MigrationTestSuite) TestBackwardCompatibility(oldCode, newCode string)
TestBackwardCompatibility ensures that migrated code maintains backward compatibility
func (*MigrationTestSuite) TestDeprecationDetection ¶
func (mts *MigrationTestSuite) TestDeprecationDetection(code string, expectedWarnings []string)
TestDeprecationDetection tests that deprecated patterns are correctly identified
func (*MigrationTestSuite) TestInterfaceComposition ¶
func (mts *MigrationTestSuite) TestInterfaceComposition()
TestInterfaceComposition validates that interface composition works correctly
func (*MigrationTestSuite) TestTransformationRule ¶
func (mts *MigrationTestSuite) TestTransformationRule(ruleName string, input, expected string)
TestTransformationRule tests a specific migration transformation rule
type MigrationValidator ¶
type MigrationValidator struct {
// contains filtered or unexported fields
}
MigrationValidator provides utilities to validate migration results
func NewMigrationValidator ¶
func NewMigrationValidator() *MigrationValidator
NewMigrationValidator creates a new migration validator
func (*MigrationValidator) ValidateDeprecationAnnotations ¶
func (mv *MigrationValidator) ValidateDeprecationAnnotations(code string) []string
ValidateDeprecationAnnotations checks that deprecation comments are properly formatted
func (*MigrationValidator) ValidateInterfaceImplementation ¶
func (mv *MigrationValidator) ValidateInterfaceImplementation(code string, typeName string, expectedInterfaces []string) error
ValidateInterfaceImplementation checks that a type properly implements expected interfaces
type MockEventHandler ¶
type MockEventHandler struct {
// contains filtered or unexported fields
}
MockEventHandler is a mock implementation of EventHandler
func NewMockEventHandler ¶
func NewMockEventHandler() *MockEventHandler
NewMockEventHandler creates a new mock event handler
func (*MockEventHandler) GetHandledEvents ¶
func (h *MockEventHandler) GetHandledEvents() []events.Event
GetHandledEvents returns all handled events
func (*MockEventHandler) SetBehavior ¶
SetBehavior sets custom behavior for handling events
type MockHealthChecker ¶
type MockHealthChecker struct {
// contains filtered or unexported fields
}
MockHealthChecker provides mock health checking functionality
func NewMockHealthChecker ¶
func NewMockHealthChecker() *MockHealthChecker
NewMockHealthChecker creates a new mock health checker
func (*MockHealthChecker) CheckHealth ¶
func (h *MockHealthChecker) CheckHealth(ctx context.Context) error
CheckHealth performs a health check
func (*MockHealthChecker) GetHealthStatus ¶
func (h *MockHealthChecker) GetHealthStatus() HealthStatus
GetHealthStatus returns detailed health status
func (*MockHealthChecker) IsHealthy ¶
func (h *MockHealthChecker) IsHealthy() bool
IsHealthy returns the health status
func (*MockHealthChecker) SetCheckError ¶
func (h *MockHealthChecker) SetCheckError(err error)
SetCheckError sets the error to return on health checks
func (*MockHealthChecker) SetHealthy ¶
func (h *MockHealthChecker) SetHealthy(healthy bool)
SetHealthy sets the health status
type MockManager ¶
type MockManager struct {
// contains filtered or unexported fields
}
MockManager is a mock implementation of a transport manager
func (*MockManager) IsRunning ¶
func (m *MockManager) IsRunning() bool
IsRunning returns true if the manager is running
func (*MockManager) Send ¶
func (m *MockManager) Send(ctx context.Context, event TransportEvent) error
Send sends an event
func (*MockManager) SetTransport ¶
func (m *MockManager) SetTransport(transport Transport)
SetTransport sets the transport
type MockTransport ¶
type MockTransport struct {
// contains filtered or unexported fields
}
MockTransport is a highly configurable mock implementation of the Transport interface
Example ¶
ExampleMockTransport shows how to use MockTransport in tests
// Create a mock transport
transport := NewMockTransport()
// Configure custom behavior
transport.SetConnectBehavior(func(ctx context.Context) error {
// Connect immediately for example demonstration
return nil
})
// Use in tests
ctx := context.Background()
if err := transport.Connect(ctx); err != nil {
fmt.Printf("Connect failed: %v\n", err)
return
}
// Send an event
event := NewTestEvent("example-1", "example.event")
if err := transport.Send(ctx, event); err != nil {
fmt.Printf("Send failed: %v\n", err)
}
// Check what was sent
sentEvents := transport.GetSentEvents()
fmt.Printf("Sent %d events\n", len(sentEvents))
Output: Sent 1 events
func NewMockTransport ¶
func NewMockTransport() *MockTransport
NewMockTransport creates a new mock transport with default behavior
func (*MockTransport) Channels ¶
func (m *MockTransport) Channels() (<-chan events.Event, <-chan error)
Channels implements Transport.Channels
func (*MockTransport) Close ¶
func (m *MockTransport) Close(ctx context.Context) error
Close implements Transport.Close
func (*MockTransport) Config ¶
func (m *MockTransport) Config() Config
Config implements Transport.Config
func (*MockTransport) Connect ¶
func (m *MockTransport) Connect(ctx context.Context) error
Connect implements Transport.Connect
func (*MockTransport) Errors ¶
func (m *MockTransport) Errors() <-chan error
Errors implements Transport.Errors
func (*MockTransport) GetCallCount ¶
func (m *MockTransport) GetCallCount(method string) int
GetCallCount returns the number of times a method was called
func (*MockTransport) GetSentEvents ¶
func (m *MockTransport) GetSentEvents() []TransportEvent
GetSentEvents returns all events that were sent
func (*MockTransport) IsConnected ¶
func (m *MockTransport) IsConnected() bool
IsConnected implements Transport.IsConnected
func (*MockTransport) Receive ¶
func (m *MockTransport) Receive() <-chan events.Event
Receive implements Transport.Receive
func (*MockTransport) Send ¶
func (m *MockTransport) Send(ctx context.Context, event TransportEvent) error
Send implements Transport.Send
func (*MockTransport) SetCloseBehavior ¶
func (m *MockTransport) SetCloseBehavior(fn func(ctx context.Context) error)
SetCloseBehavior sets custom behavior for Close calls
func (*MockTransport) SetConnectBehavior ¶
func (m *MockTransport) SetConnectBehavior(fn func(ctx context.Context) error)
SetConnectBehavior sets custom behavior for Connect calls
func (*MockTransport) SetConnectDelay ¶
func (m *MockTransport) SetConnectDelay(delay time.Duration)
SetConnectDelay sets a delay for Connect operations (for testing timeouts)
func (*MockTransport) SetSendBehavior ¶
func (m *MockTransport) SetSendBehavior(fn func(ctx context.Context, event TransportEvent) error)
SetSendBehavior sets custom behavior for Send calls
func (*MockTransport) SetSendDelay ¶
func (m *MockTransport) SetSendDelay(delay time.Duration)
SetSendDelay sets a delay for Send operations (for testing cancellation)
func (*MockTransport) SimulateError ¶
func (m *MockTransport) SimulateError(err error) error
SimulateError simulates an error
func (*MockTransport) SimulateEvent ¶
func (m *MockTransport) SimulateEvent(event events.Event) error
SimulateEvent simulates receiving an event
func (*MockTransport) Stats ¶
func (m *MockTransport) Stats() TransportStats
Stats implements Transport.Stats
func (*MockTransport) WasCalled ¶
func (m *MockTransport) WasCalled(method string) bool
WasCalled returns true if the method was called at least once
type NilValue ¶
type NilValue struct{}
NilValue represents a nil/missing value
func (NilValue) ErrorString ¶
type NoopLogger ¶
type NoopLogger struct{}
NoopLogger is a logger that does nothing
func (*NoopLogger) Debug ¶
func (n *NoopLogger) Debug(message string, fields ...Field)
Debug implements the Logger interface
func (*NoopLogger) DebugTyped ¶
func (n *NoopLogger) DebugTyped(message string, fields ...FieldProvider)
func (*NoopLogger) Error ¶
func (n *NoopLogger) Error(message string, fields ...Field)
Error implements the Logger interface
func (*NoopLogger) ErrorTyped ¶
func (n *NoopLogger) ErrorTyped(message string, fields ...FieldProvider)
func (*NoopLogger) Info ¶
func (n *NoopLogger) Info(message string, fields ...Field)
Info implements the Logger interface
func (*NoopLogger) InfoTyped ¶
func (n *NoopLogger) InfoTyped(message string, fields ...FieldProvider)
func (*NoopLogger) Log ¶
func (n *NoopLogger) Log(level LogLevel, message string, fields ...Field)
Log implements the Logger interface
func (*NoopLogger) LogTyped ¶
func (n *NoopLogger) LogTyped(level LogLevel, message string, fields ...FieldProvider)
Type-safe NoOp methods
func (*NoopLogger) Warn ¶
func (n *NoopLogger) Warn(message string, fields ...Field)
Warn implements the Logger interface
func (*NoopLogger) WarnTyped ¶
func (n *NoopLogger) WarnTyped(message string, fields ...FieldProvider)
func (*NoopLogger) WithContext ¶
func (n *NoopLogger) WithContext(ctx context.Context) Logger
WithContext implements the Logger interface
func (*NoopLogger) WithFields ¶
func (n *NoopLogger) WithFields(fields ...Field) Logger
WithFields implements the Logger interface
func (*NoopLogger) WithTypedFields ¶
func (n *NoopLogger) WithTypedFields(fields ...FieldProvider) Logger
type Operation ¶
type Operation struct {
Type string
Timestamp time.Time
Duration time.Duration
Args []interface{}
Result interface{}
Error error
}
Operation represents a recorded transport operation
type OptimizedLogger ¶
type OptimizedLogger struct {
// contains filtered or unexported fields
}
OptimizedLogger implements Logger with reduced allocations
func (*OptimizedLogger) Debug ¶
func (l *OptimizedLogger) Debug(message string, fields ...Field)
Debug implements the Logger interface
func (*OptimizedLogger) DebugTyped ¶
func (l *OptimizedLogger) DebugTyped(message string, fields ...FieldProvider)
func (*OptimizedLogger) Error ¶
func (l *OptimizedLogger) Error(message string, fields ...Field)
Error implements the Logger interface
func (*OptimizedLogger) ErrorTyped ¶
func (l *OptimizedLogger) ErrorTyped(message string, fields ...FieldProvider)
func (*OptimizedLogger) Info ¶
func (l *OptimizedLogger) Info(message string, fields ...Field)
Info implements the Logger interface
func (*OptimizedLogger) InfoTyped ¶
func (l *OptimizedLogger) InfoTyped(message string, fields ...FieldProvider)
func (*OptimizedLogger) Log ¶
func (l *OptimizedLogger) Log(level LogLevel, message string, fields ...Field)
Log implements the Logger interface with optimizations
func (*OptimizedLogger) LogTyped ¶
func (l *OptimizedLogger) LogTyped(level LogLevel, message string, fields ...FieldProvider)
Type-safe methods implementation
func (*OptimizedLogger) Warn ¶
func (l *OptimizedLogger) Warn(message string, fields ...Field)
Warn implements the Logger interface
func (*OptimizedLogger) WarnTyped ¶
func (l *OptimizedLogger) WarnTyped(message string, fields ...FieldProvider)
func (*OptimizedLogger) WithContext ¶
func (l *OptimizedLogger) WithContext(ctx context.Context) Logger
WithContext implements the Logger interface
func (*OptimizedLogger) WithFields ¶
func (l *OptimizedLogger) WithFields(fields ...Field) Logger
WithFields implements the Logger interface
func (*OptimizedLogger) WithTypedFields ¶
func (l *OptimizedLogger) WithTypedFields(fields ...FieldProvider) Logger
type OverflowPolicy ¶
type OverflowPolicy int
OverflowPolicy defines how to handle buffer overflow
const ( // OverflowDropOldest drops the oldest item when buffer is full OverflowDropOldest OverflowPolicy = iota // OverflowDropNewest drops the newest item when buffer is full OverflowDropNewest // OverflowBlock blocks until space is available OverflowBlock // OverflowResize dynamically resizes the buffer (up to a limit) OverflowResize )
type ParamDoc ¶
type ParamDoc struct {
Name string `json:"name"`
Type string `json:"type"`
Description string `json:"description"`
}
ParamDoc documents a parameter
type PatternValidator ¶
type PatternValidator struct {
// contains filtered or unexported fields
}
PatternValidator validates strings against complex regex patterns
func MustNewEmailPatternValidator ¶
func MustNewEmailPatternValidator() *PatternValidator
MustNewEmailPatternValidator creates a pattern validator for email addresses and panics on error. This function is provided for backward compatibility where panicking is desired. For production code, prefer using NewEmailPatternValidator() which returns an error.
func MustNewPhonePatternValidator ¶
func MustNewPhonePatternValidator() *PatternValidator
MustNewPhonePatternValidator creates a pattern validator for phone numbers and panics on error. This function is provided for backward compatibility where panicking is desired. For production code, prefer using NewPhonePatternValidator() which returns an error.
func MustNewURLPatternValidator ¶
func MustNewURLPatternValidator() *PatternValidator
MustNewURLPatternValidator creates a pattern validator for URLs and panics on error. This function is provided for backward compatibility where panicking is desired. For production code, prefer using NewURLPatternValidator() which returns an error.
func NewEmailPatternValidator ¶
func NewEmailPatternValidator() (*PatternValidator, error)
NewEmailPatternValidator creates a pattern validator for email addresses
func NewPatternValidator ¶
func NewPatternValidator(name string) *PatternValidator
NewPatternValidator creates a new pattern validator
func NewPhonePatternValidator ¶
func NewPhonePatternValidator() (*PatternValidator, error)
NewPhonePatternValidator creates a pattern validator for phone numbers
func NewURLPatternValidator ¶
func NewURLPatternValidator() (*PatternValidator, error)
NewURLPatternValidator creates a pattern validator for URLs
func (*PatternValidator) AddPattern ¶
func (v *PatternValidator) AddPattern(name, pattern string, required bool) error
AddPattern adds a named regex pattern
func (*PatternValidator) IsEnabled ¶
func (v *PatternValidator) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*PatternValidator) Name ¶
func (v *PatternValidator) Name() string
Name returns the validator name
func (*PatternValidator) Priority ¶
func (v *PatternValidator) Priority() int
Priority returns the validator priority
func (*PatternValidator) SetAnyMatch ¶
func (v *PatternValidator) SetAnyMatch(anyMatch bool) *PatternValidator
SetAnyMatch sets whether the value must match any pattern (true) or all patterns (false)
func (*PatternValidator) SetCaseInsensitive ¶
func (v *PatternValidator) SetCaseInsensitive(caseInsensitive bool) *PatternValidator
SetCaseInsensitive sets whether pattern matching should be case insensitive
func (*PatternValidator) SetCondition ¶
func (v *PatternValidator) SetCondition(condition func(string) bool) *PatternValidator
SetCondition sets a condition that must be met for validation to apply
func (*PatternValidator) SetEnabled ¶
func (v *PatternValidator) SetEnabled(enabled bool) *PatternValidator
SetEnabled enables or disables the validator
func (*PatternValidator) SetPriority ¶
func (v *PatternValidator) SetPriority(priority int) *PatternValidator
SetPriority sets the validator priority
func (*PatternValidator) Validate ¶
func (v *PatternValidator) Validate(ctx context.Context, value string) ValidationResult
Validate validates a string against the configured patterns
type PatternValidatorRule ¶
type PatternValidatorRule struct {
// contains filtered or unexported fields
}
PatternValidatorRule validates fields against regex patterns
func (*PatternValidatorRule) IsEnabled ¶
func (r *PatternValidatorRule) IsEnabled() bool
func (*PatternValidatorRule) Name ¶
func (r *PatternValidatorRule) Name() string
func (*PatternValidatorRule) Priority ¶
func (r *PatternValidatorRule) Priority() int
func (*PatternValidatorRule) Validate ¶
func (r *PatternValidatorRule) Validate(ctx context.Context, event TransportEvent) error
type PerformanceEventData ¶
type PerformanceEventData struct {
// Performance metrics
MetricName string `json:"metric_name" validate:"required"`
Value float64 `json:"value" validate:"required"`
Unit string `json:"unit" validate:"required"`
MetricType PerformanceMetricType `json:"metric_type" validate:"required"`
// Context information
Component string `json:"component,omitempty"`
Operation string `json:"operation,omitempty"`
RequestID string `json:"request_id,omitempty"`
// Performance context
Threshold float64 `json:"threshold,omitempty"`
Baseline float64 `json:"baseline,omitempty"`
Percentile float64 `json:"percentile,omitempty"`
SampleSize int `json:"sample_size,omitempty"`
// Timing and trends
Duration time.Duration `json:"duration,omitempty"`
Trend PerformanceTrend `json:"trend"`
AlertLevel AlertLevel `json:"alert_level"`
// Additional data
Tags map[string]string `json:"tags,omitempty"`
Dimensions map[string]float64 `json:"dimensions,omitempty"`
}
PerformanceEventData represents performance metrics and profiling events
func (*PerformanceEventData) ToMap ¶
func (p *PerformanceEventData) ToMap() map[string]interface{}
func (*PerformanceEventData) Validate ¶
func (p *PerformanceEventData) Validate() error
type PerformanceMetricType ¶
type PerformanceMetricType string
const ( MetricTypeCounter PerformanceMetricType = "counter" MetricTypeGauge PerformanceMetricType = "gauge" MetricTypeHistogram PerformanceMetricType = "histogram" MetricTypeTimer PerformanceMetricType = "timer" MetricTypeRate PerformanceMetricType = "rate" )
type PerformanceTrend ¶
type PerformanceTrend string
const ( TrendUnknown PerformanceTrend = "unknown" TrendImproving PerformanceTrend = "improving" TrendStable PerformanceTrend = "stable" TrendDegrading PerformanceTrend = "degrading" TrendVolatile PerformanceTrend = "volatile" )
type PhaseResult ¶
type PhaseResult struct {
Phase CleanupPhase
StartTime time.Time
EndTime time.Time
Duration time.Duration
Success bool
ResourcesCleaned int
Errors []error
}
PhaseResult contains the result of a cleanup phase
type PoolStats ¶
type PoolStats struct {
TotalAcquired int64
TotalReturned int64
TotalCreated int64
TotalClosed int64
TotalErrors int64
AcquireTimeouts int64
ValidationErrors int64
// Timing statistics
AvgAcquireTime time.Duration
AvgCreateTime time.Duration
// contains filtered or unexported fields
}
PoolStats tracks pool statistics
type PooledConnection ¶
type PooledConnection struct {
ID string
Conn net.Conn
CreatedAt time.Time
LastUsedAt time.Time
IsHealthy bool
InUse bool
UseCount int64
ErrorCount int64
// Connection-specific data
RemoteAddr string
LocalAddr string
Protocol string
// contains filtered or unexported fields
}
PooledConnection represents a connection in the pool
func NewPooledConnection ¶
func NewPooledConnection(id string, conn net.Conn, pool *ConnectionPool) *PooledConnection
NewPooledConnection creates a new pooled connection
func (*PooledConnection) Close ¶
func (pc *PooledConnection) Close() error
Close closes the connection with enhanced cleanup
func (*PooledConnection) GetStats ¶
func (pc *PooledConnection) GetStats() map[string]interface{}
GetStats returns connection statistics
func (*PooledConnection) IsExpired ¶
func (pc *PooledConnection) IsExpired(maxLifetime, maxIdleTime time.Duration) bool
IsExpired checks if the connection has expired
func (*PooledConnection) Return ¶
func (pc *PooledConnection) Return()
Return returns the connection to the pool with enhanced leak prevention
func (*PooledConnection) Validate ¶
func (pc *PooledConnection) Validate() bool
Validate checks if the connection is healthy
type PreAllocatedSlices ¶
type PreAllocatedSlices struct {
// contains filtered or unexported fields
}
PreAllocatedSlices provides pre-allocated slices for common use cases
func (*PreAllocatedSlices) GetEventTypeSlice ¶
func (p *PreAllocatedSlices) GetEventTypeSlice() []string
GetEventTypeSlice returns an event type slice
func (*PreAllocatedSlices) GetHandlerSlice ¶
func (p *PreAllocatedSlices) GetHandlerSlice(size int) []interface{}
GetHandlerSlice returns a handler slice with appropriate capacity
func (*PreAllocatedSlices) GetLogFieldSlice ¶
func (p *PreAllocatedSlices) GetLogFieldSlice() []Field
GetLogFieldSlice returns a log field slice
type RangeValidator ¶
type RangeValidator[T comparable] struct { // contains filtered or unexported fields }
RangeValidator validates that a value falls within a specified range
func NewFloat64RangeValidator ¶
func NewFloat64RangeValidator(name string) *RangeValidator[float64]
NewFloat64RangeValidator creates a range validator for float64 values
func NewIntRangeValidator ¶
func NewIntRangeValidator(name string) *RangeValidator[int]
NewIntRangeValidator creates a range validator for integers
func NewRangeValidator ¶
func NewRangeValidator[T comparable](name string, comparer func(a, b T) int) *RangeValidator[T]
NewRangeValidator creates a new range validator
func NewStringLengthRangeValidator ¶
func NewStringLengthRangeValidator(name string) *RangeValidator[string]
NewStringLengthRangeValidator creates a range validator for string lengths
func NewTimeRangeValidator ¶
func NewTimeRangeValidator(name string) *RangeValidator[time.Time]
NewTimeRangeValidator creates a range validator for time values
func (*RangeValidator[T]) IsEnabled ¶
func (v *RangeValidator[T]) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*RangeValidator[T]) Name ¶
func (v *RangeValidator[T]) Name() string
Name returns the validator name
func (*RangeValidator[T]) Priority ¶
func (v *RangeValidator[T]) Priority() int
Priority returns the validator priority
func (*RangeValidator[T]) SetCondition ¶
func (v *RangeValidator[T]) SetCondition(condition func(T) bool) *RangeValidator[T]
SetCondition sets a condition that must be met for validation to apply
func (*RangeValidator[T]) SetEnabled ¶
func (v *RangeValidator[T]) SetEnabled(enabled bool) *RangeValidator[T]
SetEnabled enables or disables the validator
func (*RangeValidator[T]) SetInclusive ¶
func (v *RangeValidator[T]) SetInclusive(inclusive bool) *RangeValidator[T]
SetInclusive sets whether the range bounds are inclusive
func (*RangeValidator[T]) SetPriority ¶
func (v *RangeValidator[T]) SetPriority(priority int) *RangeValidator[T]
SetPriority sets the validator priority
func (*RangeValidator[T]) SetRange ¶
func (v *RangeValidator[T]) SetRange(min, max *T) *RangeValidator[T]
SetRange sets the minimum and maximum values for the range
func (*RangeValidator[T]) Validate ¶
func (v *RangeValidator[T]) Validate(ctx context.Context, value T) ValidationResult
Validate validates that a value is within the specified range
type ReadWriter ¶
ReadWriter combines io.Reader and io.Writer for raw data transport.
type Receiver ¶
type Receiver interface {
// Channels returns event and error channels.
Channels() (<-chan events.Event, <-chan error)
}
Receiver handles receiving events
type ReconnectStrategy ¶
type ReconnectStrategy struct {
// MaxAttempts is the maximum number of reconnection attempts (0 for infinite)
MaxAttempts int
// InitialDelay is the initial delay between reconnection attempts
InitialDelay time.Duration
// MaxDelay is the maximum delay between reconnection attempts
MaxDelay time.Duration
// BackoffMultiplier is the multiplier for exponential backoff
BackoffMultiplier float64
// Jitter adds randomness to reconnection delays
Jitter bool
}
ReconnectStrategy defines how reconnection should be handled.
type RecordingTransport ¶
type RecordingTransport struct {
Transport
// contains filtered or unexported fields
}
RecordingTransport records all operations for detailed analysis
func NewRecordingTransport ¶
func NewRecordingTransport(wrapped Transport) *RecordingTransport
NewRecordingTransport creates a new recording transport
func (*RecordingTransport) Clear ¶
func (rt *RecordingTransport) Clear()
Clear clears recorded operations
func (*RecordingTransport) Close ¶
func (rt *RecordingTransport) Close(ctx context.Context) error
Close records the close operation
func (*RecordingTransport) Connect ¶
func (rt *RecordingTransport) Connect(ctx context.Context) error
Connect records the connect operation
func (*RecordingTransport) GetOperations ¶
func (rt *RecordingTransport) GetOperations() []Operation
GetOperations returns all recorded operations
func (*RecordingTransport) Send ¶
func (rt *RecordingTransport) Send(ctx context.Context, event TransportEvent) error
Send records the send operation
func (*RecordingTransport) StartRecording ¶
func (rt *RecordingTransport) StartRecording()
StartRecording starts recording operations
func (*RecordingTransport) StopRecording ¶
func (rt *RecordingTransport) StopRecording()
StopRecording stops recording operations
type RecursiveValidator ¶
type RecursiveValidator[T any] struct { // contains filtered or unexported fields }
RecursiveValidator handles recursive data structures with cycle detection
func NewRecursiveValidator ¶
func NewRecursiveValidator[T any](name string, validator TypedValidator[T]) *RecursiveValidator[T]
NewRecursiveValidator creates a new recursive validator
func (*RecursiveValidator[T]) IsEnabled ¶
func (v *RecursiveValidator[T]) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*RecursiveValidator[T]) Name ¶
func (v *RecursiveValidator[T]) Name() string
Name returns the validator name
func (*RecursiveValidator[T]) Priority ¶
func (v *RecursiveValidator[T]) Priority() int
Priority returns the validator priority
func (*RecursiveValidator[T]) SetCondition ¶
func (v *RecursiveValidator[T]) SetCondition(condition func(T) bool) *RecursiveValidator[T]
SetCondition sets a condition that must be met for validation to apply
func (*RecursiveValidator[T]) SetEnabled ¶
func (v *RecursiveValidator[T]) SetEnabled(enabled bool) *RecursiveValidator[T]
SetEnabled enables or disables the validator
func (*RecursiveValidator[T]) SetMaxDepth ¶
func (v *RecursiveValidator[T]) SetMaxDepth(maxDepth int) *RecursiveValidator[T]
SetMaxDepth sets the maximum recursion depth
func (*RecursiveValidator[T]) SetPriority ¶
func (v *RecursiveValidator[T]) SetPriority(priority int) *RecursiveValidator[T]
SetPriority sets the validator priority
func (*RecursiveValidator[T]) Validate ¶
func (v *RecursiveValidator[T]) Validate(ctx context.Context, value T) ValidationResult
Validate validates a recursive structure with cycle detection
type ReliabilityStats ¶
type ReliabilityStats struct {
TransportStats
// Reliability-specific metrics
EventsAcknowledged int64 `json:"events_acknowledged"`
EventsUnacknowledged int64 `json:"events_unacknowledged"`
EventsRetried int64 `json:"events_retried"`
EventsTimedOut int64 `json:"events_timed_out"`
AverageAckTime time.Duration `json:"average_ack_time"`
DuplicateEvents int64 `json:"duplicate_events"`
OutOfOrderEvents int64 `json:"out_of_order_events"`
RedeliveryRate float64 `json:"redelivery_rate"`
}
ReliabilityStats contains reliability-specific statistics.
type ReliabilityStatsProvider ¶
type ReliabilityStatsProvider interface {
// GetReliabilityStats returns reliability-specific statistics.
GetReliabilityStats() ReliabilityStats
}
ReliabilityStatsProvider provides reliability statistics
type ReliableSender ¶
type ReliableSender interface {
// SendEventWithAck sends an event and waits for acknowledgment.
SendEventWithAck(ctx context.Context, event TransportEvent, timeout time.Duration) error
}
ReliableSender handles reliable event delivery
type ReliableTransport ¶
type ReliableTransport interface {
Transport
ReliableSender
AckHandlerProvider
ReliabilityStatsProvider
}
ReliableTransport extends Transport with reliability features like acknowledgments, retries, and ordered delivery.
type RequestContext ¶
type RequestContext struct {
// RequestID identifies the request
RequestID string `json:"request_id"`
// Method HTTP method or operation type
Method string `json:"method,omitempty"`
// URL or endpoint
URL string `json:"url,omitempty"`
// Headers relevant headers
Headers map[string]string `json:"headers,omitempty"`
// QueryParams query parameters
QueryParams map[string]string `json:"query_params,omitempty"`
// UserAgent client user agent
UserAgent string `json:"user_agent,omitempty"`
// ClientIP client IP address
ClientIP string `json:"client_ip,omitempty"`
// ContentType request content type
ContentType string `json:"content_type,omitempty"`
// ContentLength request content length
ContentLength int64 `json:"content_length,omitempty"`
// Referrer request referrer
Referrer string `json:"referrer,omitempty"`
}
RequestContext represents request-specific context
type RequiredFieldsRule ¶
type RequiredFieldsRule struct {
// contains filtered or unexported fields
}
RequiredFieldsRule validates required fields
func (*RequiredFieldsRule) IsEnabled ¶
func (r *RequiredFieldsRule) IsEnabled() bool
func (*RequiredFieldsRule) Name ¶
func (r *RequiredFieldsRule) Name() string
func (*RequiredFieldsRule) Priority ¶
func (r *RequiredFieldsRule) Priority() int
func (*RequiredFieldsRule) Validate ¶
func (r *RequiredFieldsRule) Validate(ctx context.Context, event TransportEvent) error
type ResourceLeak ¶
type ResourceLeak struct {
Component string
ResourceType ResourceType
ResourceID string
LeakedAt time.Time
StackTrace string
}
ResourceLeak represents a detected resource leak
func (ResourceLeak) String ¶
func (rl ResourceLeak) String() string
String returns string representation of resource leak
type ResourceLeakRule ¶
type ResourceLeakRule struct{}
ResourceLeakRule validates resource cleanup
func (*ResourceLeakRule) Description ¶
func (r *ResourceLeakRule) Description() string
func (*ResourceLeakRule) Name ¶
func (r *ResourceLeakRule) Name() string
func (*ResourceLeakRule) Validate ¶
func (r *ResourceLeakRule) Validate(ctx context.Context, result *CleanupValidationResult) error
type ResourceType ¶
type ResourceType string
ResourceType represents different types of resources to track
const ( ResourceTypeGoroutine ResourceType = "goroutine" ResourceTypeConnection ResourceType = "connection" ResourceTypeChannel ResourceType = "channel" ResourceTypeTimer ResourceType = "timer" ResourceTypeFile ResourceType = "file" ResourceTypeMemory ResourceType = "memory" ResourceTypeHandler ResourceType = "handler" ResourceTypeSubscription ResourceType = "subscription" )
type ResourceUsage ¶
type RetryConfig ¶
type RetryConfig struct {
// Enabled enables retry logic
Enabled bool `json:"enabled"`
// MaxAttempts specifies the maximum number of retry attempts
MaxAttempts int `json:"max_attempts"`
// InitialDelay specifies the initial delay between retries
InitialDelay time.Duration `json:"initial_delay"`
// MaxDelay specifies the maximum delay between retries
MaxDelay time.Duration `json:"max_delay"`
// Multiplier specifies the backoff multiplier
Multiplier float64 `json:"multiplier"`
// Jitter enables random jitter in retry delays
Jitter bool `json:"jitter"`
// RetryableErrors specifies which errors should trigger a retry
RetryableErrors []string `json:"retryable_errors,omitempty"`
}
RetryConfig contains retry configuration.
func (*RetryConfig) Clone ¶
func (c *RetryConfig) Clone() *RetryConfig
Clone creates a deep copy of the retry configuration.
func (*RetryConfig) Validate ¶
func (c *RetryConfig) Validate() error
Validate validates the retry configuration.
type RetryPolicy ¶
type RetryPolicy interface {
// ShouldRetry returns true if the operation should be retried.
ShouldRetry(attempt int, err error) bool
// NextDelay returns the delay before the next retry attempt.
NextDelay(attempt int) time.Duration
// MaxAttempts returns the maximum number of retry attempts.
MaxAttempts() int
// Reset resets the retry policy state.
Reset()
}
RetryPolicy defines retry behavior for failed operations.
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer is a thread-safe circular buffer with configurable overflow policies
func NewRingBuffer ¶
func NewRingBuffer(config *RingBufferConfig) *RingBuffer
NewRingBuffer creates a new ring buffer
func (*RingBuffer) Capacity ¶
func (rb *RingBuffer) Capacity() int
Capacity returns the current capacity of the buffer
func (*RingBuffer) Drain ¶
func (rb *RingBuffer) Drain() []events.Event
Drain removes all events from the buffer and returns them
func (*RingBuffer) GetMetrics ¶
func (rb *RingBuffer) GetMetrics() RingBufferMetrics
GetMetrics returns current metrics
func (*RingBuffer) IsEmpty ¶
func (rb *RingBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty
func (*RingBuffer) IsFull ¶
func (rb *RingBuffer) IsFull() bool
IsFull returns true if the buffer is full
func (*RingBuffer) Pop ¶
func (rb *RingBuffer) Pop() (events.Event, error)
Pop removes and returns the oldest event from the buffer
func (*RingBuffer) PopWithContext ¶
PopWithContext removes and returns the oldest event with context
func (*RingBuffer) Push ¶
func (rb *RingBuffer) Push(event events.Event) error
Push adds an event to the buffer
func (*RingBuffer) PushWithContext ¶
PushWithContext adds an event to the buffer with context
func (*RingBuffer) Size ¶
func (rb *RingBuffer) Size() int
Size returns the current number of events in the buffer
type RingBufferConfig ¶
type RingBufferConfig struct {
Capacity int
OverflowPolicy OverflowPolicy
MaxCapacity int // For OverflowResize policy
ResizeFactor float64 // Multiplier for resize (default 1.5)
BlockTimeout time.Duration // Timeout for blocking operations
}
RingBufferConfig configures the ring buffer
func DefaultRingBufferConfig ¶
func DefaultRingBufferConfig() *RingBufferConfig
DefaultRingBufferConfig returns default configuration
type RingBufferMetrics ¶
type RingBufferMetrics struct {
CurrentSize int
Capacity int
TotalWrites uint64
TotalReads uint64
TotalDrops uint64
TotalOverflows uint64
ResizeCount uint64
LastWriteTime time.Time
LastReadTime time.Time
LastDropTime time.Time
LastResizeTime time.Time
AverageWriteTime time.Duration
AverageReadTime time.Duration
// contains filtered or unexported fields
}
RingBufferMetrics tracks ring buffer statistics
type RoundRobinLoadBalancer ¶
type RoundRobinLoadBalancer struct {
// contains filtered or unexported fields
}
RoundRobinLoadBalancer is a simple round-robin load balancer.
func NewRoundRobinLoadBalancer ¶
func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer
NewRoundRobinLoadBalancer creates a new round-robin load balancer.
func (*RoundRobinLoadBalancer) Name ¶
func (lb *RoundRobinLoadBalancer) Name() string
Name returns the load balancer name.
func (*RoundRobinLoadBalancer) SelectTransport ¶
func (lb *RoundRobinLoadBalancer) SelectTransport(transports map[string]Transport, event TransportEvent) (string, error)
SelectTransport selects a transport using round-robin algorithm.
func (*RoundRobinLoadBalancer) UpdateStats ¶
func (lb *RoundRobinLoadBalancer) UpdateStats(transportName string, stats TransportStats)
UpdateStats updates the load balancer with transport statistics.
type ScenarioTransport ¶
type ScenarioTransport struct {
*AdvancedMockTransport
// contains filtered or unexported fields
}
ScenarioTransport provides pre-configured transport behaviors for common test scenarios
func NewScenarioTransport ¶
func NewScenarioTransport(scenario string) *ScenarioTransport
NewScenarioTransport creates a new scenario transport
type Schema ¶
type Schema struct {
// Type specifies the expected data type
Type SchemaType `json:"type,omitempty"`
// Title provides a human-readable title for the schema
Title string `json:"title,omitempty"`
// Description provides additional documentation
Description string `json:"description,omitempty"`
// Required specifies which properties are required (for object types)
Required []string `json:"required,omitempty"`
// Properties defines the schema for object properties
Properties map[string]*Schema `json:"properties,omitempty"`
// AdditionalProperties controls whether additional properties are allowed
AdditionalProperties interface{} `json:"additionalProperties,omitempty"`
// Items defines the schema for array items
Items *Schema `json:"items,omitempty"`
// MinItems specifies the minimum number of items in an array
MinItems *int `json:"minItems,omitempty"`
// MaxItems specifies the maximum number of items in an array
MaxItems *int `json:"maxItems,omitempty"`
// UniqueItems requires all items in an array to be unique
UniqueItems bool `json:"uniqueItems,omitempty"`
// MinLength specifies the minimum length for strings
MinLength *int `json:"minLength,omitempty"`
// MaxLength specifies the maximum length for strings
MaxLength *int `json:"maxLength,omitempty"`
// Pattern specifies a regex pattern for string validation
Pattern string `json:"pattern,omitempty"`
// Format specifies a semantic format for string validation
Format string `json:"format,omitempty"`
// Minimum specifies the minimum value for numbers
Minimum *float64 `json:"minimum,omitempty"`
// Maximum specifies the maximum value for numbers
Maximum *float64 `json:"maximum,omitempty"`
// ExclusiveMinimum specifies whether the minimum is exclusive
ExclusiveMinimum bool `json:"exclusiveMinimum,omitempty"`
// ExclusiveMaximum specifies whether the maximum is exclusive
ExclusiveMaximum bool `json:"exclusiveMaximum,omitempty"`
// MultipleOf specifies that a number must be a multiple of this value
MultipleOf *float64 `json:"multipleOf,omitempty"`
// Enum specifies a list of valid values
Enum []interface{} `json:"enum,omitempty"`
// Const specifies a single valid value
Const interface{} `json:"const,omitempty"`
// AllOf requires the value to be valid against all schemas in the array
AllOf []*Schema `json:"allOf,omitempty"`
// AnyOf requires the value to be valid against any schema in the array
AnyOf []*Schema `json:"anyOf,omitempty"`
// OneOf requires the value to be valid against exactly one schema in the array
OneOf []*Schema `json:"oneOf,omitempty"`
// Not requires the value to NOT be valid against the schema
Not *Schema `json:"not,omitempty"`
// Default provides a default value
Default interface{} `json:"default,omitempty"`
// Examples provides example values
Examples []interface{} `json:"examples,omitempty"`
// Custom validation function
CustomValidator func(interface{}) error `json:"-"`
// Version specifies the schema version for migration support
Version string `json:"version,omitempty"`
// Deprecated marks the schema as deprecated
Deprecated bool `json:"deprecated,omitempty"`
// ID provides a unique identifier for the schema
ID string `json:"$id,omitempty"`
// Ref allows referencing other schemas
Ref string `json:"$ref,omitempty"`
// Definitions allows defining reusable schema components
Definitions map[string]*Schema `json:"definitions,omitempty"`
}
Schema represents a validation schema
func NewArraySchema ¶
NewArraySchema creates a new array schema
func NewSchema ¶
func NewSchema(schemaType SchemaType) *Schema
NewSchema creates a new schema with the specified type
func (*Schema) AddDefinition ¶
AddDefinition adds a reusable schema definition
func (*Schema) AddProperty ¶
AddProperty adds a property to an object schema
func (*Schema) SetDefault ¶
SetDefault sets a default value
func (*Schema) SetDescription ¶
SetDescription sets the schema description
func (*Schema) SetLengthRange ¶
SetLengthRange sets min/max length for strings or arrays
func (*Schema) SetNumberRange ¶
SetNumberRange sets min/max values for numbers
func (*Schema) SetPattern ¶
SetPattern sets a regex pattern for string validation
func (*Schema) SetRequired ¶
SetRequired marks a property as required
type SchemaComposer ¶
type SchemaComposer struct {
// contains filtered or unexported fields
}
SchemaComposer helps compose complex schemas from simpler ones
func NewSchemaComposer ¶
func NewSchemaComposer(baseSchema *Schema) *SchemaComposer
NewSchemaComposer creates a new schema composer
func (*SchemaComposer) AddMixin ¶
func (c *SchemaComposer) AddMixin(mixin *Schema) *SchemaComposer
AddMixin adds a schema mixin
func (*SchemaComposer) Compose ¶
func (c *SchemaComposer) Compose() *Schema
Compose creates the final composed schema
func (*SchemaComposer) SetOverrides ¶
func (c *SchemaComposer) SetOverrides(overrides *Schema) *SchemaComposer
SetOverrides sets schema overrides
type SchemaMigrator ¶
type SchemaMigrator struct {
// contains filtered or unexported fields
}
SchemaMigrator handles schema version migrations
func NewSchemaMigrator ¶
func NewSchemaMigrator() *SchemaMigrator
NewSchemaMigrator creates a new schema migrator
func (*SchemaMigrator) MigrateAndValidate ¶
func (m *SchemaMigrator) MigrateAndValidate(data interface{}, currentVersion, targetVersion string) ValidationResult
MigrateAndValidate migrates data to the target version and validates it
func (*SchemaMigrator) RegisterMigration ¶
func (m *SchemaMigrator) RegisterMigration(fromVersion, toVersion string, migrator func(interface{}) (interface{}, error))
RegisterMigration registers a migration function
func (*SchemaMigrator) RegisterSchema ¶
func (m *SchemaMigrator) RegisterSchema(version string, schema *VersionedSchema)
RegisterSchema registers a versioned schema
type SchemaType ¶
type SchemaType string
SchemaType represents the type of a schema property
const ( SchemaTypeString SchemaType = "string" SchemaTypeNumber SchemaType = "number" SchemaTypeInteger SchemaType = "integer" SchemaTypeBoolean SchemaType = "boolean" SchemaTypeArray SchemaType = "array" SchemaTypeObject SchemaType = "object" SchemaTypeNull SchemaType = "null" SchemaTypeAny SchemaType = "any" )
type SchemaValidationOptions ¶
type SchemaValidationOptions struct {
// ValidateFormats enables format validation for string types
ValidateFormats bool
// StrictTypes requires exact type matches
StrictTypes bool
// AllowCoercion enables type coercion (e.g., string "123" to number 123)
AllowCoercion bool
// CollectAllErrors continues validation even after finding errors
CollectAllErrors bool
// MaxErrorCount limits the number of errors to collect
MaxErrorCount int
// ValidateDefaults validates default values against their schemas
ValidateDefaults bool
// ReportDeprecated reports usage of deprecated schema elements
ReportDeprecated bool
}
SchemaValidationOptions configures schema validation behavior
type SchemaValidator ¶
type SchemaValidator struct {
// contains filtered or unexported fields
}
SchemaValidator validates data against JSON-like schemas
func NewSchemaValidator ¶
func NewSchemaValidator(name string, schema *Schema) *SchemaValidator
NewSchemaValidator creates a new schema validator
func (*SchemaValidator) AddCustomValidator ¶
func (v *SchemaValidator) AddCustomValidator(name string, validator func(interface{}) error) *SchemaValidator
AddCustomValidator adds a custom validation function
func (*SchemaValidator) AddFormatValidator ¶
func (v *SchemaValidator) AddFormatValidator(format string, validator func(string) bool) *SchemaValidator
AddFormatValidator adds a custom format validator
func (*SchemaValidator) IsEnabled ¶
func (v *SchemaValidator) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*SchemaValidator) Name ¶
func (v *SchemaValidator) Name() string
Name returns the validator name
func (*SchemaValidator) Priority ¶
func (v *SchemaValidator) Priority() int
Priority returns the validator priority
func (*SchemaValidator) RegisterSchema ¶
func (v *SchemaValidator) RegisterSchema(id string, schema *Schema) *SchemaValidator
RegisterSchema registers a schema in the registry for reference resolution
func (*SchemaValidator) SetEnabled ¶
func (v *SchemaValidator) SetEnabled(enabled bool) *SchemaValidator
SetEnabled enables or disables the validator
func (*SchemaValidator) SetOptions ¶
func (v *SchemaValidator) SetOptions(options SchemaValidationOptions) *SchemaValidator
SetOptions sets validation options
func (*SchemaValidator) SetPriority ¶
func (v *SchemaValidator) SetPriority(priority int) *SchemaValidator
SetPriority sets the validator priority
func (*SchemaValidator) SetSchema ¶
func (v *SchemaValidator) SetSchema(schema *Schema) *SchemaValidator
SetSchema updates the schema
func (*SchemaValidator) SetStrictMode ¶
func (v *SchemaValidator) SetStrictMode(strict bool) *SchemaValidator
SetStrictMode enables or disables strict validation mode
func (*SchemaValidator) Validate ¶
func (v *SchemaValidator) Validate(ctx context.Context, value interface{}) ValidationResult
Validate validates a value against the schema
type SecurityContext ¶
type SecurityContext struct {
// AuthenticationMethod used
AuthenticationMethod string `json:"authentication_method,omitempty"`
// TokenType (bearer, api_key, etc.)
TokenType string `json:"token_type,omitempty"`
// TokenHash hashed token for audit purposes
TokenHash string `json:"token_hash,omitempty"`
// ClientCertificateHash if using client certificates
ClientCertificateHash string `json:"client_certificate_hash,omitempty"`
// TLSVersion used for the connection
TLSVersion string `json:"tls_version,omitempty"`
// CipherSuite used
CipherSuite string `json:"cipher_suite,omitempty"`
// IsEncrypted indicates if the communication is encrypted
IsEncrypted bool `json:"is_encrypted"`
// SecurityHeaders relevant security headers
SecurityHeaders map[string]string `json:"security_headers,omitempty"`
}
SecurityContext represents security-related context
type SecurityEventData ¶
type SecurityEventData struct {
// Security event details
EventType SecurityEventType `json:"event_type" validate:"required"`
Severity SecuritySeverity `json:"severity" validate:"required"`
Actor string `json:"actor,omitempty"`
Target string `json:"target,omitempty"`
Resource string `json:"resource,omitempty"`
// Authentication/Authorization
UserID string `json:"user_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
Permissions []string `json:"permissions,omitempty"`
// Network context
SourceIP string `json:"source_ip,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
RequestID string `json:"request_id,omitempty"`
// Security details
ThreatLevel ThreatLevel `json:"threat_level"`
Blocked bool `json:"blocked"`
Automatic bool `json:"automatic"`
Context map[string]string `json:"context,omitempty"`
}
SecurityEventData represents security-related events
func (*SecurityEventData) ToMap ¶
func (s *SecurityEventData) ToMap() map[string]interface{}
func (*SecurityEventData) Validate ¶
func (s *SecurityEventData) Validate() error
type SecurityEventType ¶
type SecurityEventType string
const ( SecurityEventLogin SecurityEventType = "login" SecurityEventLogout SecurityEventType = "logout" SecurityEventAccess SecurityEventType = "access" SecurityEventPermission SecurityEventType = "permission" SecurityEventThreat SecurityEventType = "threat" SecurityEventViolation SecurityEventType = "violation" SecurityEventAudit SecurityEventType = "audit" )
type SecuritySeverity ¶
type SecuritySeverity string
const ( SecuritySeverityInfo SecuritySeverity = "info" SecuritySeverityWarning SecuritySeverity = "warning" SecuritySeverityError SecuritySeverity = "error" SecuritySeverityCritical SecuritySeverity = "critical" )
type Sender ¶
type Sender interface {
// Send sends an event to the remote endpoint.
Send(ctx context.Context, event TransportEvent) error
}
Sender handles sending events
type SequencedEvent ¶
type SequencedEvent[T EventData] struct { // SequenceID identifies the sequence this event belongs to SequenceID string `json:"sequence_id"` // SequenceNumber is the position in the sequence (1-based) SequenceNumber uint64 `json:"sequence_number"` // TotalInSequence is the total number of events in this sequence (if known) TotalInSequence *uint64 `json:"total_in_sequence,omitempty"` // Event is the wrapped event data Event TypedTransportEvent[T] `json:"event"` // PreviousSequenceNumber for gap detection PreviousSequenceNumber *uint64 `json:"previous_sequence_number,omitempty"` // NextExpectedSequenceNumber for ordering validation NextExpectedSequenceNumber *uint64 `json:"next_expected_sequence_number,omitempty"` // IsLast indicates if this is the last event in the sequence IsLast bool `json:"is_last"` // IsFirst indicates if this is the first event in the sequence IsFirst bool `json:"is_first"` // ChecksumPrevious for integrity checking with previous event ChecksumPrevious string `json:"checksum_previous,omitempty"` // ChecksumCurrent for integrity checking ChecksumCurrent string `json:"checksum_current"` // Dependencies lists sequence numbers this event depends on Dependencies []uint64 `json:"dependencies,omitempty"` // PartitionKey for partitioned sequences PartitionKey string `json:"partition_key,omitempty"` // OrderingKey for sub-sequence ordering OrderingKey string `json:"ordering_key,omitempty"` // Timeout for sequence completion Timeout time.Duration `json:"timeout,omitempty"` // CreatedAt when this sequenced event was created CreatedAt time.Time `json:"created_at"` }
SequencedEvent represents an event with sequence ordering
func (SequencedEvent[T]) ToMap ¶
func (s SequencedEvent[T]) ToMap() map[string]interface{}
ToMap converts the sequenced event data to a map for backward compatibility
func (SequencedEvent[T]) Validate ¶
func (s SequencedEvent[T]) Validate() error
Validate ensures the sequenced event data is valid
type Serializer ¶
type Serializer interface {
// Serialize converts an event to bytes for transport.
Serialize(event any) ([]byte, error)
// Deserialize converts bytes back to an event.
Deserialize(data []byte) (any, error)
// ContentType returns the content type for the serialized data.
ContentType() string
// SupportedTypes returns the types that this serializer can handle.
SupportedTypes() []string
}
Serializer handles serialization and deserialization of events for transport.
type SimpleManager ¶
type SimpleManager struct {
// contains filtered or unexported fields
}
SimpleManager provides basic transport management without import cycles
func NewSimpleManager ¶
func NewSimpleManager() *SimpleManager
NewSimpleManager creates a new simple transport manager
func NewSimpleManagerWithBackpressure ¶
func NewSimpleManagerWithBackpressure(backpressureConfig BackpressureConfig) *SimpleManager
NewSimpleManagerWithBackpressure creates a new simple transport manager with backpressure configuration
func NewSimpleManagerWithValidation ¶
func NewSimpleManagerWithValidation(backpressureConfig BackpressureConfig, validationConfig *ValidationConfig) *SimpleManager
NewSimpleManagerWithValidation creates a new simple transport manager with validation
func (*SimpleManager) Channels ¶
func (m *SimpleManager) Channels() (<-chan events.Event, <-chan error)
Channels returns both event and error channels together
func (*SimpleManager) Errors ¶
func (m *SimpleManager) Errors() <-chan error
Errors returns the error channel
func (*SimpleManager) GetBackpressureMetrics ¶
func (m *SimpleManager) GetBackpressureMetrics() BackpressureMetrics
GetBackpressureMetrics returns the current backpressure metrics
func (*SimpleManager) GetValidationConfig ¶
func (m *SimpleManager) GetValidationConfig() *ValidationConfig
GetValidationConfig returns the current validation configuration
func (*SimpleManager) GetValidationState ¶
func (m *SimpleManager) GetValidationState() (*ValidationConfig, bool)
GetValidationState returns both the validation config and enabled state atomically
func (*SimpleManager) IsRunning ¶
func (m *SimpleManager) IsRunning() bool
IsRunning returns true if the manager is currently running This method uses atomic operations for thread-safe access
func (*SimpleManager) IsValidationEnabled ¶
func (m *SimpleManager) IsValidationEnabled() bool
IsValidationEnabled returns whether validation is enabled
func (*SimpleManager) Receive ¶
func (m *SimpleManager) Receive() <-chan events.Event
Receive returns the event channel
func (*SimpleManager) Send ¶
func (m *SimpleManager) Send(ctx context.Context, event TransportEvent) error
Send sends an event
func (*SimpleManager) SetTransport ¶
func (m *SimpleManager) SetTransport(transport Transport)
SetTransport sets the active transport
func (*SimpleManager) SetValidationConfig ¶
func (m *SimpleManager) SetValidationConfig(config *ValidationConfig)
SetValidationConfig sets the validation configuration
func (*SimpleManager) SetValidationEnabled ¶
func (m *SimpleManager) SetValidationEnabled(enabled bool)
SetValidationEnabled enables or disables validation
type SimpleTransportEvent ¶
type SimpleTransportEvent struct {
EventID string
EventType string
EventTimestamp time.Time
EventData map[string]interface{}
}
SimpleTransportEvent is a basic implementation of TransportEvent
func (*SimpleTransportEvent) Data ¶
func (e *SimpleTransportEvent) Data() map[string]interface{}
Data returns the event data
func (*SimpleTransportEvent) ID ¶
func (e *SimpleTransportEvent) ID() string
ID returns the event ID
func (*SimpleTransportEvent) Timestamp ¶
func (e *SimpleTransportEvent) Timestamp() time.Time
Timestamp returns the event timestamp
func (*SimpleTransportEvent) Type ¶
func (e *SimpleTransportEvent) Type() string
Type returns the event type
type Slice ¶
type Slice struct {
// contains filtered or unexported fields
}
Slice is a thread-safe slice implementation
func (*Slice) RemoveFunc ¶
RemoveFunc removes the first item that matches the predicate
type SliceOps ¶
type SliceOps struct{}
Zero-allocation slice operations
func (SliceOps) AppendInt ¶
AppendInt appends an int to a slice without allocation if capacity allows
func (SliceOps) AppendString ¶
AppendString appends a string to a slice without allocation if capacity allows
type SlicePool ¶
type SlicePool[T any] struct { // contains filtered or unexported fields }
SlicePool manages pools of slices with different capacities
func NewSlicePool ¶
NewSlicePool creates a new slice pool for the given type
type SliceStats ¶
type SliceStats struct {
TotalAllocations int64
TotalDeallocations int64
PeakUsage int64
CurrentUsage int64
// contains filtered or unexported fields
}
SliceStats provides statistics about slice usage
func (*SliceStats) GetStats ¶
func (s *SliceStats) GetStats() (total, peak, current int64)
GetStats returns current statistics
func (*SliceStats) RecordAllocation ¶
func (s *SliceStats) RecordAllocation(size int)
RecordAllocation records a slice allocation
func (*SliceStats) RecordDeallocation ¶
func (s *SliceStats) RecordDeallocation(size int)
RecordDeallocation records a slice deallocation
type SliceValidator ¶
type SliceValidator[T any] struct { // contains filtered or unexported fields }
SliceValidator provides type-safe validation for slices with element rules
func CreateIntSliceValidator ¶
func CreateIntSliceValidator(name string, elementValidator TypedValidator[int]) *SliceValidator[int]
CreateIntSliceValidator creates a slice validator for integers
func NewSliceValidator ¶
func NewSliceValidator[T any](name string, elementValidator TypedValidator[T]) *SliceValidator[T]
NewSliceValidator creates a new slice validator
func (*SliceValidator[T]) IsEnabled ¶
func (v *SliceValidator[T]) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*SliceValidator[T]) Name ¶
func (v *SliceValidator[T]) Name() string
Name returns the validator name
func (*SliceValidator[T]) Priority ¶
func (v *SliceValidator[T]) Priority() int
Priority returns the validator priority
func (*SliceValidator[T]) SetCondition ¶
func (v *SliceValidator[T]) SetCondition(condition func([]T) bool) *SliceValidator[T]
SetCondition sets a condition that must be met for validation to apply
func (*SliceValidator[T]) SetEnabled ¶
func (v *SliceValidator[T]) SetEnabled(enabled bool) *SliceValidator[T]
SetEnabled enables or disables the validator
func (*SliceValidator[T]) SetLengthRange ¶
func (v *SliceValidator[T]) SetLengthRange(min, max *int) *SliceValidator[T]
SetLengthRange sets the allowed length range for the slice
func (*SliceValidator[T]) SetPriority ¶
func (v *SliceValidator[T]) SetPriority(priority int) *SliceValidator[T]
SetPriority sets the validator priority
func (*SliceValidator[T]) Validate ¶
func (v *SliceValidator[T]) Validate(ctx context.Context, value []T) ValidationResult
Validate validates a slice value
type StateChangeEventData ¶
type StateChangeEventData struct {
// State transition info
FromState string `json:"from_state" validate:"required"`
ToState string `json:"to_state" validate:"required"`
Reason string `json:"reason,omitempty"`
Trigger string `json:"trigger,omitempty"`
// Context information
EntityID string `json:"entity_id" validate:"required"`
EntityType string `json:"entity_type" validate:"required"`
Context map[string]string `json:"context,omitempty"`
// Transition metadata
Duration time.Duration `json:"duration,omitempty"`
Automatic bool `json:"automatic"`
Rollback bool `json:"rollback"`
Version string `json:"version,omitempty"`
}
StateChangeEventData represents state transition events
func (*StateChangeEventData) ToMap ¶
func (s *StateChangeEventData) ToMap() map[string]interface{}
func (*StateChangeEventData) Validate ¶
func (s *StateChangeEventData) Validate() error
type StatsProvider ¶
type StatsProvider interface {
// Stats returns transport statistics and metrics.
Stats() TransportStats
}
StatsProvider provides statistics access
type StreamController ¶
type StreamController interface {
// StartStreaming begins streaming events in both directions.
StartStreaming(ctx context.Context) (send chan<- TransportEvent, receive <-chan events.Event, errors <-chan error, err error)
}
StreamController controls streaming operations
type StreamEventData ¶
type StreamEventData struct {
// StreamID is the unique identifier for the stream
StreamID string `json:"stream_id"`
// Action indicates the stream action (create, close, reset, etc.)
Action string `json:"action"`
// Direction indicates data flow direction (inbound, outbound, bidirectional)
Direction string `json:"direction,omitempty"`
// Priority for stream prioritization
Priority int `json:"priority,omitempty"`
// WindowSize for flow control
WindowSize uint32 `json:"window_size,omitempty"`
// State indicates the current stream state
State string `json:"state,omitempty"`
// Reason provides additional context for the action
Reason string `json:"reason,omitempty"`
// Headers contains stream-specific headers
Headers map[string]string `json:"headers,omitempty"`
}
StreamEventData represents stream-related event data
func (StreamEventData) ToMap ¶
func (s StreamEventData) ToMap() map[string]interface{}
ToMap converts the stream event data to a map for backward compatibility
func (StreamEventData) Validate ¶
func (s StreamEventData) Validate() error
Validate ensures the stream event data is valid
type StreamEventOption ¶
type StreamEventOption func(*StreamEventData)
StreamEventOption is a functional option for configuring StreamEventData
func WithDirection ¶
func WithDirection(direction string) StreamEventOption
WithDirection sets the stream direction
func WithHeaders ¶
func WithHeaders(headers map[string]string) StreamEventOption
WithHeaders sets stream-specific headers
func WithPriority ¶
func WithPriority(priority int) StreamEventOption
WithPriority sets the stream priority
func WithReason ¶
func WithReason(reason string) StreamEventOption
WithReason sets the reason for the stream action
func WithWindowSize ¶
func WithWindowSize(size uint32) StreamEventOption
WithWindowSize sets the flow control window size
type StreamingStats ¶
type StreamingStats struct {
TransportStats
// Streaming-specific metrics
StreamsActive int `json:"streams_active"`
StreamsTotal int `json:"streams_total"`
BufferUtilization float64 `json:"buffer_utilization"`
BackpressureEvents int64 `json:"backpressure_events"`
DroppedEvents int64 `json:"dropped_events"`
AverageEventSize int64 `json:"average_event_size"`
ThroughputEventsPerSec float64 `json:"throughput_events_per_sec"`
ThroughputBytesPerSec float64 `json:"throughput_bytes_per_sec"`
}
StreamingStats contains streaming-specific statistics.
type StreamingStatsProvider ¶
type StreamingStatsProvider interface {
// GetStreamingStats returns streaming-specific statistics.
GetStreamingStats() StreamingStats
}
StreamingStatsProvider provides streaming-specific statistics
type StreamingTransport ¶
type StreamingTransport interface {
Transport
BatchSender
EventHandlerProvider
StreamController
StreamingStatsProvider
}
StreamingTransport extends Transport with streaming-specific capabilities for real-time bidirectional communication.
type StringValue ¶
type StringValue struct{ Value string }
StringValue wraps a string value
func (StringValue) ErrorString ¶
func (v StringValue) ErrorString() string
ErrorString implementations for common types
type StructValidator ¶
type StructValidator[T any] struct { // contains filtered or unexported fields }
StructValidator provides type-safe validation for structs with constraints
func CreateStringStructValidator ¶
func CreateStringStructValidator(name string) *StructValidator[map[string]string]
CreateStringStructValidator creates a struct validator for string fields
func NewStructValidator ¶
func NewStructValidator[T any](name string) *StructValidator[T]
NewStructValidator creates a new struct validator
func (*StructValidator[T]) AddFieldValidator ¶
func (v *StructValidator[T]) AddFieldValidator(field string, validator TypedValidator[any]) *StructValidator[T]
AddFieldValidator adds a validator for a specific field
func (*StructValidator[T]) IsEnabled ¶
func (v *StructValidator[T]) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*StructValidator[T]) Name ¶
func (v *StructValidator[T]) Name() string
Name returns the validator name
func (*StructValidator[T]) Priority ¶
func (v *StructValidator[T]) Priority() int
Priority returns the validator priority
func (*StructValidator[T]) SetCondition ¶
func (v *StructValidator[T]) SetCondition(condition func(T) bool) *StructValidator[T]
SetCondition sets a condition that must be met for validation to apply
func (*StructValidator[T]) SetEnabled ¶
func (v *StructValidator[T]) SetEnabled(enabled bool) *StructValidator[T]
SetEnabled enables or disables the validator
func (*StructValidator[T]) SetPriority ¶
func (v *StructValidator[T]) SetPriority(priority int) *StructValidator[T]
SetPriority sets the validator priority
func (*StructValidator[T]) SetRequired ¶
func (v *StructValidator[T]) SetRequired(field string, required bool) *StructValidator[T]
SetRequired marks a field as required
func (*StructValidator[T]) Validate ¶
func (v *StructValidator[T]) Validate(ctx context.Context, value T) ValidationResult
Validate validates a struct value
type SystemEventData ¶
type SystemEventData struct {
// System event details
EventType SystemEventType `json:"event_type" validate:"required"`
Component string `json:"component" validate:"required"`
Instance string `json:"instance,omitempty"`
Version string `json:"version,omitempty"`
// System state
PreviousState string `json:"previous_state,omitempty"`
CurrentState string `json:"current_state" validate:"required"`
TargetState string `json:"target_state,omitempty"`
// Resource information
Resources ResourceUsage `json:"resources,omitempty"`
Dependencies []string `json:"dependencies,omitempty"`
// Event context
Initiated bool `json:"initiated"`
Automated bool `json:"automated"`
Reason string `json:"reason,omitempty"`
Context map[string]string `json:"context,omitempty"`
}
SystemEventData represents system lifecycle events
func (*SystemEventData) ToMap ¶
func (s *SystemEventData) ToMap() map[string]interface{}
func (*SystemEventData) Validate ¶
func (s *SystemEventData) Validate() error
type SystemEventType ¶
type SystemEventType string
const ( SystemEventStartup SystemEventType = "startup" SystemEventShutdown SystemEventType = "shutdown" SystemEventRestart SystemEventType = "restart" SystemEventDeploy SystemEventType = "deploy" SystemEventScale SystemEventType = "scale" SystemEventMigration SystemEventType = "migration" SystemEventFailover SystemEventType = "failover" SystemEventRecovery SystemEventType = "recovery" )
type TaskMetrics ¶
type TaskMetrics struct {
Runs uint64
ItemsCleaned uint64
Errors uint64
LastRun time.Time
LastDuration time.Duration
AverageDuration time.Duration
}
TaskMetrics tracks per-task metrics
type TestConfig ¶
type TestConfig struct {
DefaultTimeout time.Duration
EventChannelSize int
ErrorChannelSize int
MaxMessageSize int
}
TestConfig provides common test configurations
func DefaultTestConfig ¶
func DefaultTestConfig() TestConfig
DefaultTestConfig returns the default test configuration
type TestEvent ¶
type TestEvent struct {
// contains filtered or unexported fields
}
TestEvent is a simple implementation of TransportEvent for testing
func NewTestEvent ¶
NewTestEvent creates a new test event
func NewTestEventWithData ¶
NewTestEventWithData creates a new test event with data
type TestFixture ¶
type TestFixture struct {
Transport *MockTransport
Manager *MockManager
Handler *MockEventHandler
Config TestConfig
Ctx context.Context
Cancel context.CancelFunc
}
TestFixture encapsulates common test setup
func NewTestFixture ¶
func NewTestFixture(t *testing.T) *TestFixture
NewTestFixture creates a new test fixture
func (*TestFixture) ConnectTransport ¶
func (f *TestFixture) ConnectTransport(t *testing.T)
ConnectTransport connects the transport with error handling
func (*TestFixture) SendEvent ¶
func (f *TestFixture) SendEvent(t *testing.T, event TransportEvent)
SendEvent sends an event through the transport
func (*TestFixture) StartManager ¶
func (f *TestFixture) StartManager(t *testing.T)
StartManager starts the manager with error handling
type TestManagerHelper ¶
type TestManagerHelper struct {
// contains filtered or unexported fields
}
TestManagerHelper provides isolated test manager instances with proper cleanup
func NewTestManagerHelper ¶
func NewTestManagerHelper(t *testing.T) *TestManagerHelper
NewTestManagerHelper creates a new test manager helper for isolated testing
func (*TestManagerHelper) Cleanup ¶
func (h *TestManagerHelper) Cleanup()
Cleanup performs comprehensive cleanup of all test resources
func (*TestManagerHelper) CreateAdvancedTransport ¶
func (h *TestManagerHelper) CreateAdvancedTransport() *AdvancedMockTransport
CreateAdvancedTransport creates a new advanced mock transport with proper tracking
func (*TestManagerHelper) CreateManager ¶
func (h *TestManagerHelper) CreateManager() *SimpleManager
CreateManager creates a new SimpleManager with proper tracking for cleanup
func (*TestManagerHelper) CreateManagerWithBackpressure ¶
func (h *TestManagerHelper) CreateManagerWithBackpressure(config BackpressureConfig) *SimpleManager
CreateManagerWithBackpressure creates a manager with backpressure config
func (*TestManagerHelper) CreateTransport ¶
func (h *TestManagerHelper) CreateTransport() Transport
CreateTransport creates a new test transport with proper tracking for cleanup
func (*TestManagerHelper) RunWithTimeout ¶
func (h *TestManagerHelper) RunWithTimeout(timeout time.Duration, testFunc func()) bool
RunWithTimeout runs a test function with a timeout to prevent hanging
func (*TestManagerHelper) WaitForCompletion ¶
func (h *TestManagerHelper) WaitForCompletion(timeout time.Duration) bool
WaitForCompletion waits for operations to complete with a timeout
type ThreatLevel ¶
type ThreatLevel string
const ( ThreatLevelNone ThreatLevel = "none" ThreatLevelLow ThreatLevel = "low" ThreatLevelMedium ThreatLevel = "medium" ThreatLevelHigh ThreatLevel = "high" ThreatLevelCritical ThreatLevel = "critical" )
type TimedEvent ¶
type TimedEvent[T EventData] struct { // TimerID identifies the timer associated with this event TimerID string `json:"timer_id"` // Event is the wrapped event Event TypedTransportEvent[T] `json:"event"` // ScheduledAt when the event should be processed ScheduledAt time.Time `json:"scheduled_at"` // CreatedAt when the timed event was created CreatedAt time.Time `json:"created_at"` // ProcessedAt when the event was actually processed (nil if not yet processed) ProcessedAt *time.Time `json:"processed_at,omitempty"` // Delay intended delay from creation to processing Delay time.Duration `json:"delay"` // ActualDelay actual delay experienced ActualDelay time.Duration `json:"actual_delay,omitempty"` // MaxDelay maximum allowed delay before event expires MaxDelay time.Duration `json:"max_delay,omitempty"` // IsExpired indicates if the event has expired IsExpired bool `json:"is_expired"` // ExpiresAt when the event expires ExpiresAt *time.Time `json:"expires_at,omitempty"` // IsRecurring indicates if this is a recurring event IsRecurring bool `json:"is_recurring"` // RecurrencePattern for recurring events (cron-like syntax) RecurrencePattern string `json:"recurrence_pattern,omitempty"` // NextScheduledAt when the next occurrence is scheduled (for recurring events) NextScheduledAt *time.Time `json:"next_scheduled_at,omitempty"` // MaxOccurrences limits recurring events (0 = unlimited) MaxOccurrences int `json:"max_occurrences,omitempty"` // OccurrenceCount tracks how many times this event has occurred OccurrenceCount int `json:"occurrence_count"` // TimeZone for schedule calculations TimeZone string `json:"time_zone,omitempty"` // Priority for timing queue ordering Priority int `json:"priority,omitempty"` // OnExpiry action to take when event expires (discard, log, alert, etc.) OnExpiry string `json:"on_expiry,omitempty"` }
TimedEvent represents an event with timing constraints
func (TimedEvent[T]) ToMap ¶
func (t TimedEvent[T]) ToMap() map[string]interface{}
ToMap converts the timed event data to a map for backward compatibility
func (TimedEvent[T]) Validate ¶
func (t TimedEvent[T]) Validate() error
Validate ensures the timed event data is valid
type TokenUsage ¶
type TrackedResource ¶
type TrackedResource struct {
ID string
Type ResourceType
Description string
CreatedAt time.Time
CleanedAt time.Time
Cleaned bool
CleanupFunc func() error
Dependencies []string // IDs of resources that must be cleaned first
StackTrace string // For debugging resource leaks
}
TrackedResource represents a tracked resource
type Transport ¶
type Transport interface {
// Core transport functionality
TransportConnection
Sender
Receiver
ConfigProvider
StatsProvider
}
Transport represents the core transport interface for bidirectional communication with agents and front-end applications in the AG-UI system. Composed of focused interfaces following Interface Segregation Principle
type TransportAggregatedStatsProvider ¶
type TransportAggregatedStatsProvider interface {
// GetStats returns aggregated statistics from all transports.
GetStats() map[string]TransportStats
}
TransportAggregatedStatsProvider provides aggregated transport statistics
type TransportConfiguration ¶
type TransportConfiguration = ConfigProvider
TransportConfiguration provides configuration access Deprecated: Use ConfigProvider instead for consistency
type TransportConnection ¶
type TransportConnection interface {
// Connect establishes a connection to the remote endpoint.
// Returns an error if the connection cannot be established.
Connect(ctx context.Context) error
// Close closes the transport and releases any associated resources.
// This method should be idempotent and safe to call multiple times.
Close(ctx context.Context) error
// IsConnected returns true if the transport is currently connected.
IsConnected() bool
}
TransportConnection handles connection operations
type TransportError ¶
type TransportError struct {
// Transport is the name of the transport that generated the error
Transport string
// Op is the operation that caused the error
Op string
// Err is the underlying error
Err error
// Temporary indicates if the error is temporary and may be retried
Temporary bool
// Retryable indicates if the operation can be retried
Retryable bool
// contains filtered or unexported fields
}
TransportError represents a transport-specific error with additional context
func NewTemporaryError ¶
func NewTemporaryError(transport, op string, err error) *TransportError
NewTemporaryError creates a new temporary TransportError
func NewTransportError ¶
func NewTransportError(transport, op string, err error) *TransportError
NewTransportError creates a new TransportError
func (*TransportError) Error ¶
func (e *TransportError) Error() string
Error implements the error interface
func (*TransportError) IsRetryable ¶
func (e *TransportError) IsRetryable() bool
IsRetryable returns whether the operation can be retried
func (*TransportError) IsTemporary ¶
func (e *TransportError) IsTemporary() bool
IsTemporary returns whether the error is temporary
func (*TransportError) SetRetryable ¶
func (e *TransportError) SetRetryable(retryable bool)
SetRetryable sets whether the operation can be retried (thread-safe)
func (*TransportError) SetTemporary ¶
func (e *TransportError) SetTemporary(temporary bool)
SetTemporary sets whether the error is temporary (thread-safe)
func (*TransportError) Unwrap ¶
func (e *TransportError) Unwrap() error
Unwrap returns the underlying error
type TransportEvent ¶
type TransportEvent interface {
// ID returns the unique identifier for this event
ID() string
// Type returns the event type
Type() string
// Timestamp returns when the event was created
Timestamp() time.Time
// Data returns the event data as a map for backward compatibility
Data() map[string]interface{}
}
TransportEvent represents a transport event interface. This interface is implemented by various event types for compatibility.
func AdaptToLegacyEvent ¶
func AdaptToLegacyEvent[T EventData](typedEvent TypedTransportEvent[T]) TransportEvent
AdaptToLegacyEvent converts a TypedTransportEvent to a TransportEvent This allows typed events to be used with legacy interfaces
func GenerateTestEvents ¶
func GenerateTestEvents(count int, prefix string) []TransportEvent
GenerateTestEvents generates a slice of test events
func GenerateTestEventsWithDelay ¶
func GenerateTestEventsWithDelay(count int, prefix string, delay time.Duration) []TransportEvent
GenerateTestEventsWithDelay generates test events with a delay between each
type TransportEventAdapter ¶
type TransportEventAdapter struct {
// contains filtered or unexported fields
}
TransportEventAdapter wraps a TypedTransportEvent to implement the legacy TransportEvent interface
func NewTransportEventAdapter ¶
func NewTransportEventAdapter[T EventData](typedEvent TypedTransportEvent[T]) *TransportEventAdapter
NewTransportEventAdapter creates an adapter from any TypedTransportEvent to TransportEvent
func (*TransportEventAdapter) Data ¶
func (a *TransportEventAdapter) Data() map[string]interface{}
Data returns the event data as a map
func (*TransportEventAdapter) GetTypedEvent ¶
func (a *TransportEventAdapter) GetTypedEvent() (interface{}, bool)
GetTypedEvent attempts to retrieve the original typed event Returns the typed event and true if successful, nil and false otherwise
func (*TransportEventAdapter) ID ¶
func (a *TransportEventAdapter) ID() string
ID returns the unique identifier for this event
func (*TransportEventAdapter) Timestamp ¶
func (a *TransportEventAdapter) Timestamp() time.Time
Timestamp returns when the event was created
func (*TransportEventAdapter) Type ¶
func (a *TransportEventAdapter) Type() string
Type returns the event type
type TransportEventHandler ¶
type TransportEventHandler interface {
// SendEvent sends an event to the remote endpoint.
// The event parameter is type-erased to support multiple event types.
SendEvent(ctx context.Context, event any) error
// ReceiveEvents returns a channel that receives events from the remote endpoint.
// The channel is closed when the transport is closed or an error occurs.
// Events are type-erased to support multiple event types.
ReceiveEvents(ctx context.Context) (<-chan any, error)
}
TransportEventHandler handles event sending and receiving
type TransportEventImpl ¶
type TransportEventImpl struct {
Type TransportEventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
Transport string `json:"transport"`
Data any `json:"data,omitempty"`
Error error `json:"error,omitempty"`
}
TransportEventImpl represents a transport-related event implementation.
func NewTransportErrorEventImpl ¶
func NewTransportErrorEventImpl(transport string, err error) *TransportEventImpl
NewTransportErrorEventImpl creates a new transport error event implementation.
func NewTransportEventImpl ¶
func NewTransportEventImpl(eventType TransportEventType, transport string, data any) *TransportEventImpl
NewTransportEventImpl creates a new transport event implementation.
type TransportEventReceiver ¶
type TransportEventReceiver interface {
// ReceiveEvents returns a channel that receives events from all transports.
ReceiveEvents(ctx context.Context) (<-chan any, error)
}
TransportEventReceiver handles event receiving from transports
type TransportEventSender ¶
type TransportEventSender interface {
// SendEvent sends an event using the best available transport.
SendEvent(ctx context.Context, event any) error
// SendEventToTransport sends an event to a specific transport.
SendEventToTransport(ctx context.Context, transportName string, event any) error
}
TransportEventSender handles event sending across transports
type TransportEventStruct ¶
type TransportEventStruct struct {
Type TransportEventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
Transport string `json:"transport"`
Data any `json:"data,omitempty"`
Error error `json:"error,omitempty"`
}
TransportEventStruct represents a transport-related event implementation.
func NewTransportErrorEvent ¶
func NewTransportErrorEvent(transport string, err error) *TransportEventStruct
NewTransportErrorEvent creates a new transport error event.
func NewTransportEvent ¶
func NewTransportEvent(eventType TransportEventType, transport string, data any) *TransportEventStruct
NewTransportEvent creates a new transport event.
type TransportEventType ¶
type TransportEventType string
TransportEventType represents different types of transport events.
const ( // EventTypeConnected indicates a successful connection. EventTypeConnected TransportEventType = "connected" // EventTypeDisconnected indicates a disconnection. EventTypeDisconnected TransportEventType = "disconnected" // EventTypeReconnecting indicates a reconnection attempt. EventTypeReconnecting TransportEventType = "reconnecting" // EventTypeError indicates an error occurred. EventTypeError TransportEventType = "error" // EventTypeEventSent indicates an event was sent. EventTypeEventSent TransportEventType = "event_sent" // EventTypeEventReceived indicates an event was received. EventTypeEventReceived TransportEventType = "event_received" // EventTypeStatsUpdated indicates transport statistics were updated. EventTypeStatsUpdated TransportEventType = "stats_updated" )
type TransportFactory ¶
type TransportFactory interface {
// Create creates a new transport instance with the given configuration.
Create(config Config) (Transport, error)
// CreateWithContext creates a new transport instance with context.
CreateWithContext(ctx context.Context, config Config) (Transport, error)
// SupportedTypes returns the transport types supported by this factory.
SupportedTypes() []string
// Name returns the factory name.
Name() string
// Version returns the factory version.
Version() string
}
TransportFactory creates transport instances based on configuration.
type TransportLifecycle ¶
type TransportLifecycle interface {
// Close closes all managed transports.
Close(ctx context.Context) error
}
TransportLifecycle manages transport lifecycle
type TransportLoadBalancerManager ¶
type TransportLoadBalancerManager interface {
// SetLoadBalancer sets the load balancing strategy.
SetLoadBalancer(balancer LoadBalancer)
}
TransportLoadBalancerManager manages load balancing configuration
type TransportManager ¶
type TransportManager interface {
TransportRegistryInterface
TransportEventSender
TransportEventReceiver
TransportLoadBalancerManager
TransportAggregatedStatsProvider
TransportLifecycle
}
TransportManager manages multiple transport instances and provides load balancing, failover, and connection pooling capabilities. Composed of focused interfaces following Interface Segregation Principle
type TransportManagerConfig ¶
type TransportManagerConfig struct {
// CleanupEnabled enables the periodic map cleanup mechanism
CleanupEnabled bool
// CleanupInterval specifies how often to run cleanup (default: 1 hour)
CleanupInterval time.Duration
// MaxMapSize is the threshold above which cleanup is triggered (default: 1000)
MaxMapSize int
// ActiveThreshold is the ratio below which cleanup occurs (default: 0.5)
// If activeTransports/totalTransports < ActiveThreshold, cleanup runs
ActiveThreshold float64
// CleanupMetricsEnabled enables detailed cleanup metrics
CleanupMetricsEnabled bool
}
TransportManagerConfig holds configuration for the transport manager cleanup mechanism
func DefaultTransportManagerConfig ¶
func DefaultTransportManagerConfig() *TransportManagerConfig
DefaultTransportManagerConfig returns default configuration for transport manager
type TransportMetrics ¶
type TransportMetrics struct {
// contains filtered or unexported fields
}
TransportMetrics tracks detailed transport metrics
func NewTransportMetrics ¶
func NewTransportMetrics() *TransportMetrics
NewTransportMetrics creates new transport metrics
func (*TransportMetrics) GetSummary ¶
func (m *TransportMetrics) GetSummary() map[string]interface{}
GetSummary returns a metrics summary
func (*TransportMetrics) RecordDroppedEvent ¶
func (m *TransportMetrics) RecordDroppedEvent()
RecordDroppedEvent records a dropped event
func (*TransportMetrics) RecordStateChange ¶
func (m *TransportMetrics) RecordStateChange(from, to ConnectionState)
RecordStateChange records a state transition
type TransportMigrator ¶
type TransportMigrator struct {
// contains filtered or unexported fields
}
TransportMigrator handles the migration of transport code
func NewTransportMigrator ¶
func NewTransportMigrator(config *MigrationConfig) *TransportMigrator
NewTransportMigrator creates a new migration tool instance
func (*TransportMigrator) AddRule ¶
func (tm *TransportMigrator) AddRule(rule MigrationRule)
AddRule adds a custom migration rule
func (*TransportMigrator) GenerateDeprecationAnnotations ¶
func (tm *TransportMigrator) GenerateDeprecationAnnotations() error
GenerateDeprecationAnnotations adds deprecation comments to the codebase
func (*TransportMigrator) Migrate ¶
func (tm *TransportMigrator) Migrate() (*MigrationReport, error)
Migrate performs the migration operation
type TransportMultiManager ¶
type TransportMultiManager interface {
TransportRegistryInterface
EventRouter
EventAggregator
LoadBalancerSetter
TransportAggregatedStatsProvider
// Close closes all managed transports.
Close(ctx context.Context) error
}
TransportMultiManager manages multiple transport instances This is an alternative composition that uses different interface names
type TransportRegistry ¶
type TransportRegistry interface {
// Register registers a transport factory for a specific type.
Register(transportType string, factory TransportFactory) error
// Unregister removes a transport factory for a specific type.
Unregister(transportType string) error
// Create creates a transport instance using the appropriate factory.
Create(config Config) (Transport, error)
// CreateWithContext creates a transport instance with context.
CreateWithContext(ctx context.Context, config Config) (Transport, error)
// GetFactory returns the factory for a specific transport type.
GetFactory(transportType string) (TransportFactory, error)
// GetRegisteredTypes returns all registered transport types.
GetRegisteredTypes() []string
// IsRegistered checks if a transport type is registered.
IsRegistered(transportType string) bool
// Clear removes all registered factories.
Clear()
}
TransportRegistry manages transport factories and provides transport creation services.
type TransportRegistryInterface ¶
type TransportRegistryInterface interface {
// AddTransport adds a transport to the manager.
AddTransport(name string, transport Transport) error
// RemoveTransport removes a transport from the manager.
RemoveTransport(name string) error
// GetTransport retrieves a transport by name.
GetTransport(name string) (Transport, error)
// GetActiveTransports returns all active transports.
GetActiveTransports() map[string]Transport
}
TransportRegistryInterface manages transport registration and retrieval
type TransportStatistics ¶
type TransportStatistics = StatsProvider
TransportStatistics provides statistics and metrics Deprecated: Use StatsProvider instead for consistency
type TransportStats ¶
type TransportStats struct {
// Connection statistics
ConnectedAt time.Time `json:"connected_at"`
ReconnectCount int `json:"reconnect_count"`
LastError error `json:"last_error,omitempty"`
Uptime time.Duration `json:"uptime"`
// Event statistics
EventsSent int64 `json:"events_sent"`
EventsReceived int64 `json:"events_received"`
BytesSent int64 `json:"bytes_sent"`
BytesReceived int64 `json:"bytes_received"`
AverageLatency time.Duration `json:"average_latency"`
ErrorCount int64 `json:"error_count"`
LastEventSentAt time.Time `json:"last_event_sent_at"`
LastEventRecvAt time.Time `json:"last_event_recv_at"`
}
TransportStats contains general transport statistics.
type TypeDoc ¶
type TypeDoc struct {
Name string `json:"name"`
Description string `json:"description"`
Kind string `json:"kind"` // struct, interface, alias, etc.
Fields []FieldDoc `json:"fields,omitempty"`
Methods []MethodDoc `json:"methods,omitempty"`
Examples []ExampleDoc `json:"examples"`
IsDeprecated bool `json:"is_deprecated"`
DeprecationInfo *DeprecationDoc `json:"deprecation_info,omitempty"`
}
TypeDoc documents a type
type TypedField ¶
TypedField represents a type-safe structured logging field
func SafeDuration ¶
func SafeFloat32 ¶
func SafeFloat32(key string, value float32) TypedField[float32]
func SafeFloat64 ¶
func SafeFloat64(key string, value float64) TypedField[float64]
func SafeString ¶
func SafeString(key, value string) TypedField[string]
New type-safe field constructors with SafeXxx naming
func SafeUint16 ¶
func SafeUint16(key string, value uint16) TypedField[uint16]
func SafeUint32 ¶
func SafeUint32(key string, value uint32) TypedField[uint32]
func SafeUint64 ¶
func SafeUint64(key string, value uint64) TypedField[uint64]
func TypedValue ¶
func TypedValue[T LogValue](key string, value T) TypedField[T]
Type-safe generic field constructor
func (TypedField[T]) ToField ¶
func (tf TypedField[T]) ToField() Field
ToField converts a TypedField to a legacy Field for backward compatibility
type TypedTransportEvent ¶
type TypedTransportEvent[T EventData] interface { // ID returns the unique identifier for this event ID() string // Type returns the event type Type() string // Timestamp returns when the event was created Timestamp() time.Time // TypedData returns the strongly-typed event data TypedData() T // Data returns the event data as a map for backward compatibility // Deprecated: Use TypedData() for type-safe access Data() map[string]interface{} }
TypedTransportEvent is a generic interface for type-safe transport events. It provides compile-time type safety for event data while maintaining backward compatibility with the existing TransportEvent interface.
func CreateConfigurationEvent ¶
func CreateConfigurationEvent(id string, data *ConfigurationEventData) TypedTransportEvent[*ConfigurationEventData]
CreateConfigurationEvent creates a new configuration change event
func CreateConnectionEvent ¶
func CreateConnectionEvent(id string, status string, options ...interface{}) TypedTransportEvent[ConnectionEventData]
CreateConnectionEvent creates a new connection event with the given options It supports both functional options and a builder function as the last parameter
func CreateConnectionEventWithBuilder ¶
func CreateConnectionEventWithBuilder(id string, status string, builder func(*ConnectionEventData)) TypedTransportEvent[ConnectionEventData]
Alternative syntax for CreateConnectionEvent that accepts a function to configure the data This provides flexibility for complex connection initialization
func CreateDataEvent ¶
func CreateDataEvent(id string, content []byte, options ...interface{}) TypedTransportEvent[DataEventData]
CreateDataEvent creates a new data event with the given content and options It supports both functional options and a builder function as the last parameter
func CreateDataEventWithBuilder ¶
func CreateDataEventWithBuilder(id string, content []byte, builder func(*DataEventData)) TypedTransportEvent[DataEventData]
Alternative syntax for CreateDataEvent that accepts a function to configure the data This provides flexibility for complex data initialization
func CreateErrorEvent ¶
func CreateErrorEvent(id string, message string, options ...interface{}) TypedTransportEvent[ErrorEventData]
CreateErrorEvent creates a new error event with the given message and options It supports both functional options and a builder function as the last parameter
func CreateMessageEvent ¶
func CreateMessageEvent(id string, data *MessageEventData) TypedTransportEvent[*MessageEventData]
CreateMessageEvent creates a new message event with the specified data
func CreateMetricsEvent ¶
func CreateMetricsEvent(id string, metricName string, value float64, options ...MetricsEventOption) TypedTransportEvent[MetricsEventData]
CreateMetricsEvent creates a new metrics event with the given name and value
func CreatePerformanceEvent ¶
func CreatePerformanceEvent(id string, data *PerformanceEventData) TypedTransportEvent[*PerformanceEventData]
CreatePerformanceEvent creates a new performance event
func CreateSecurityEvent ¶
func CreateSecurityEvent(id string, data *SecurityEventData) TypedTransportEvent[*SecurityEventData]
CreateSecurityEvent creates a new security event
func CreateStateChangeEvent ¶
func CreateStateChangeEvent(id string, data *StateChangeEventData) TypedTransportEvent[*StateChangeEventData]
CreateStateChangeEvent creates a new state change event
func CreateStreamEvent ¶
func CreateStreamEvent(id string, streamID string, action string, options ...StreamEventOption) TypedTransportEvent[StreamEventData]
CreateStreamEvent creates a new stream event with the given stream ID and action
func CreateSystemEvent ¶
func CreateSystemEvent(id string, data *SystemEventData) TypedTransportEvent[*SystemEventData]
CreateSystemEvent creates a new system event
func NewBatchEvent ¶
func NewBatchEvent[T EventData](id string, data BatchEvent[T]) TypedTransportEvent[BatchEvent[T]]
NewBatchEvent creates a new batch event
func NewConditionalEvent ¶
func NewConditionalEvent[T EventData](id string, data ConditionalEvent[T]) TypedTransportEvent[ConditionalEvent[T]]
NewConditionalEvent creates a new conditional event
func NewContextualEvent ¶
func NewContextualEvent[T EventData](id string, data ContextualEvent[T]) TypedTransportEvent[ContextualEvent[T]]
NewContextualEvent creates a new contextual event
func NewSequencedEvent ¶
func NewSequencedEvent[T EventData](id string, data SequencedEvent[T]) TypedTransportEvent[SequencedEvent[T]]
NewSequencedEvent creates a new sequenced event
func NewTimedEvent ¶
func NewTimedEvent[T EventData](id string, data TimedEvent[T]) TypedTransportEvent[TimedEvent[T]]
NewTimedEvent creates a new timed event
func NewTypedEvent ¶
func NewTypedEvent[T EventData](id, eventType string, data T) TypedTransportEvent[T]
NewTypedEvent creates a new typed transport event
func ToTypedEvent ¶
func ToTypedEvent[T EventData](event TransportEvent, constructor func(map[string]interface{}) (T, error)) (TypedTransportEvent[T], error)
ToTypedEvent attempts to convert a legacy TransportEvent to a TypedTransportEvent This is a convenience function for migration scenarios
func TryGetTypedEvent ¶
func TryGetTypedEvent[T EventData](event TransportEvent) TypedTransportEvent[T]
TryGetTypedEvent attempts to extract a typed event from a TransportEvent Returns nil if the event is not a typed event or doesn't match the expected type
type TypedValidator ¶
type TypedValidator[T any] interface { // Name returns the validator name Name() string // Validate validates a value and returns a validation result Validate(ctx context.Context, value T) ValidationResult // IsEnabled returns whether the validator is enabled IsEnabled() bool // Priority returns the validator priority Priority() int }
TypedValidator interface for type-safe validators
type URLValidator ¶
type URLValidator struct {
// contains filtered or unexported fields
}
URLValidator provides comprehensive URL validation with security checks
func NewSecureURLValidator ¶
func NewSecureURLValidator(name string) *URLValidator
NewSecureURLValidator creates a URL validator with security checks
func NewWebhookURLValidator ¶
func NewWebhookURLValidator(name string) *URLValidator
NewWebhookURLValidator creates a URL validator specifically for webhooks
func (*URLValidator) IsEnabled ¶
func (v *URLValidator) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*URLValidator) Priority ¶
func (v *URLValidator) Priority() int
Priority returns the validator priority
func (*URLValidator) SetEnabled ¶
func (v *URLValidator) SetEnabled(enabled bool) *URLValidator
SetEnabled enables or disables the validator
func (*URLValidator) SetOptions ¶
func (v *URLValidator) SetOptions(opts common.URLValidationOptions) *URLValidator
SetOptions sets custom validation options
func (*URLValidator) SetPriority ¶
func (v *URLValidator) SetPriority(priority int) *URLValidator
SetPriority sets the validator priority
func (*URLValidator) Validate ¶
func (v *URLValidator) Validate(ctx context.Context, value string) ValidationResult
Validate validates a URL string
type UnionValidator ¶
type UnionValidator[T any] struct { // contains filtered or unexported fields }
UnionValidator validates union types (one of several possible types)
func NewUnionValidator ¶
func NewUnionValidator[T any](name string) *UnionValidator[T]
NewUnionValidator creates a new union validator
func (*UnionValidator[T]) AddValidator ¶
func (v *UnionValidator[T]) AddValidator(validator TypedValidator[any]) *UnionValidator[T]
AddValidator adds a validator for one of the union types
func (*UnionValidator[T]) IsEnabled ¶
func (v *UnionValidator[T]) IsEnabled() bool
IsEnabled returns whether the validator is enabled
func (*UnionValidator[T]) Name ¶
func (v *UnionValidator[T]) Name() string
Name returns the validator name
func (*UnionValidator[T]) Priority ¶
func (v *UnionValidator[T]) Priority() int
Priority returns the validator priority
func (*UnionValidator[T]) SetCondition ¶
func (v *UnionValidator[T]) SetCondition(condition func(T) bool) *UnionValidator[T]
SetCondition sets a condition that must be met for validation to apply
func (*UnionValidator[T]) SetEnabled ¶
func (v *UnionValidator[T]) SetEnabled(enabled bool) *UnionValidator[T]
SetEnabled enables or disables the validator
func (*UnionValidator[T]) SetPriority ¶
func (v *UnionValidator[T]) SetPriority(priority int) *UnionValidator[T]
SetPriority sets the validator priority
func (*UnionValidator[T]) Validate ¶
func (v *UnionValidator[T]) Validate(ctx context.Context, value T) ValidationResult
Validate validates a union value (must match at least one validator)
type UnsafeSliceOps ¶
type UnsafeSliceOps struct{}
UnsafeSliceOps provides unsafe operations for maximum performance
func (UnsafeSliceOps) BytesToString ¶
func (UnsafeSliceOps) BytesToString(b []byte) string
BytesToString converts bytes to string without allocation
func (UnsafeSliceOps) StringToBytes ¶
func (UnsafeSliceOps) StringToBytes(s string) []byte
StringToBytes converts string to bytes without allocation
type UserContext ¶
type UserContext struct {
// UserID identifies the user
UserID string `json:"user_id"`
// UserType indicates the type of user (human, service, etc.)
UserType string `json:"user_type,omitempty"`
// SessionID identifies the user session
SessionID string `json:"session_id,omitempty"`
// Roles assigned to the user
Roles []string `json:"roles,omitempty"`
// Permissions granted to the user
Permissions []string `json:"permissions,omitempty"`
// Groups the user belongs to
Groups []string `json:"groups,omitempty"`
// ClientInfo about the user's client
ClientInfo map[string]string `json:"client_info,omitempty"`
// Preferences user preferences relevant to the event
Preferences map[string]interface{} `json:"preferences,omitempty"`
}
UserContext represents user-specific context
type ValidatedTransportEvent ¶
type ValidatedTransportEvent struct {
TransportEvent
ValidatedAt time.Time
Validator string
}
ValidatedTransportEvent wraps a TransportEvent with validation metadata
func NewValidatedTransportEvent ¶
func NewValidatedTransportEvent(event TransportEvent, validator string) *ValidatedTransportEvent
NewValidatedTransportEvent creates a new validated transport event
func (*ValidatedTransportEvent) Data ¶
func (e *ValidatedTransportEvent) Data() map[string]interface{}
type ValidationConfig ¶
type ValidationConfig struct {
// Enabled controls whether validation is enabled
Enabled bool
// MaxMessageSize is the maximum allowed message size in bytes
MaxMessageSize int64
// RequiredFields lists fields that must be present in event data
RequiredFields []string
// AllowedEventTypes lists allowed event types (empty = all allowed)
AllowedEventTypes []string
// DeniedEventTypes lists denied event types
DeniedEventTypes []string
// MaxDataDepth is the maximum nesting depth for event data
MaxDataDepth int
// MaxArraySize is the maximum size for arrays in event data
MaxArraySize int
// MaxStringLength is the maximum length for string values
MaxStringLength int
// AllowedDataTypes lists allowed data types for event data values
AllowedDataTypes []string
// CustomValidators are custom validation functions
CustomValidators []ValidationRule
// FieldValidators are field-specific validation rules
FieldValidators map[string][]ValidationRule
// PatternValidators are regex-based validation rules
PatternValidators map[string]*regexp.Regexp
// SkipValidationOnIncoming skips validation for incoming events
SkipValidationOnIncoming bool
// SkipValidationOnOutgoing skips validation for outgoing events
SkipValidationOnOutgoing bool
// FailFast stops validation on first error
FailFast bool
// CollectAllErrors collects all validation errors
CollectAllErrors bool
// ValidateTimestamps enables timestamp validation
ValidateTimestamps bool
// StrictMode enables strict validation mode
StrictMode bool
// MaxEventSize is the maximum size of an event in bytes
MaxEventSize int
}
ValidationConfig holds configuration for validation
func DefaultValidationConfig ¶
func DefaultValidationConfig() *ValidationConfig
DefaultValidationConfig returns a default validation configuration
type ValidationContextKey ¶
type ValidationContextKey string
ValidationContext keys for passing data through validation chains
const ( ValidationDepthKey ValidationContextKey = "validation_depth" ValidationPathKey ValidationContextKey = "validation_path" ValidationRootKey ValidationContextKey = "validation_root" )
type ValidationError ¶
type ValidationError struct {
// contains filtered or unexported fields
}
ValidationError represents a validation error
func NewValidationError ¶
func NewValidationError(message string, errors []error) *ValidationError
NewValidationError creates a new validation error
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
func (*ValidationError) Errors ¶
func (e *ValidationError) Errors() []error
func (*ValidationError) Unwrap ¶
func (e *ValidationError) Unwrap() error
type ValidationIssue ¶
type ValidationIssue struct {
// Message is the human-readable error message
Message string `json:"message"`
// Code is a machine-readable error code
Code string `json:"code,omitempty"`
// Severity indicates the severity level of this issue
Severity ValidationSeverity `json:"severity"`
// Field is the field path where the issue occurred
Field string `json:"field,omitempty"`
// Value is the actual value that caused the issue (for debugging)
Value interface{} `json:"value,omitempty"`
// ExpectedValue is what was expected (for comparison errors)
ExpectedValue interface{} `json:"expected_value,omitempty"`
// Suggestion provides a hint on how to fix the issue
Suggestion string `json:"suggestion,omitempty"`
// Context provides additional context about the validation
Context map[string]interface{} `json:"context,omitempty"`
// Timestamp when the issue was detected
Timestamp time.Time `json:"timestamp"`
// Validator is the name of the validator that generated this issue
Validator string `json:"validator,omitempty"`
// RuleID identifies the specific validation rule that failed
RuleID string `json:"rule_id,omitempty"`
// Category groups related validation issues
Category string `json:"category,omitempty"`
// Tags allow for flexible categorization and filtering
Tags []string `json:"tags,omitempty"`
// Details contains additional structured information about the issue
Details map[string]interface{} `json:"details,omitempty"`
}
ValidationIssue represents a single validation issue with rich context
func NewLocalizedValidationIssue ¶
func NewLocalizedValidationIssue(messageKey string, severity ValidationSeverity, params map[string]interface{}) *ValidationIssue
NewLocalizedValidationIssue creates a validation issue with localized messages
func NewValidationIssue ¶
func NewValidationIssue(message string, severity ValidationSeverity) *ValidationIssue
NewValidationIssue creates a new validation issue
func (*ValidationIssue) AddTag ¶
func (i *ValidationIssue) AddTag(tag string) *ValidationIssue
AddTag adds a tag
func (*ValidationIssue) Error ¶
func (i *ValidationIssue) Error() string
Error implements the error interface
func (*ValidationIssue) IsError ¶
func (i *ValidationIssue) IsError() bool
IsError returns true if this is an error-level or fatal-level issue
func (*ValidationIssue) IsFatal ¶
func (i *ValidationIssue) IsFatal() bool
IsFatal returns true if this is a fatal-level issue
func (*ValidationIssue) IsWarning ¶
func (i *ValidationIssue) IsWarning() bool
IsWarning returns true if this is a warning-level issue
func (*ValidationIssue) String ¶
func (i *ValidationIssue) String() string
String returns a detailed string representation
func (*ValidationIssue) WithCategory ¶
func (i *ValidationIssue) WithCategory(category string) *ValidationIssue
WithCategory sets the category
func (*ValidationIssue) WithCode ¶
func (i *ValidationIssue) WithCode(code string) *ValidationIssue
WithCode sets the error code
func (*ValidationIssue) WithContext ¶
func (i *ValidationIssue) WithContext(key string, value interface{}) *ValidationIssue
WithContext adds context information
func (*ValidationIssue) WithDetail ¶
func (i *ValidationIssue) WithDetail(key string, value interface{}) *ValidationIssue
WithDetail adds detail information
func (*ValidationIssue) WithExpectedValue ¶
func (i *ValidationIssue) WithExpectedValue(expected interface{}) *ValidationIssue
WithExpectedValue sets the expected value
func (*ValidationIssue) WithField ¶
func (i *ValidationIssue) WithField(field string) *ValidationIssue
WithField sets the field path
func (*ValidationIssue) WithRuleID ¶
func (i *ValidationIssue) WithRuleID(ruleID string) *ValidationIssue
WithRuleID sets the rule ID
func (*ValidationIssue) WithSuggestion ¶
func (i *ValidationIssue) WithSuggestion(suggestion string) *ValidationIssue
WithSuggestion sets a suggestion for fixing the issue
func (*ValidationIssue) WithTags ¶
func (i *ValidationIssue) WithTags(tags ...string) *ValidationIssue
WithTags sets the tags
func (*ValidationIssue) WithValidator ¶
func (i *ValidationIssue) WithValidator(validator string) *ValidationIssue
WithValidator sets the validator name
func (*ValidationIssue) WithValue ¶
func (i *ValidationIssue) WithValue(value interface{}) *ValidationIssue
WithValue sets the problematic value
type ValidationMetrics ¶
type ValidationMetrics struct {
TotalValidations uint64
SuccessfulValidations uint64
FailedValidations uint64
ValidationErrors uint64
IncomingValidations uint64
OutgoingValidations uint64
AverageValidationTime time.Duration
MaxValidationTime time.Duration
ValidationTimeTotal time.Duration
ValidationsByType map[string]uint64
ValidationsByRule map[string]uint64
LastValidationTime time.Time
LastValidationError error
LastValidationErrorTime time.Time
// contains filtered or unexported fields
}
ValidationMetrics tracks validation performance metrics
type ValidationMiddleware ¶
type ValidationMiddleware struct {
// contains filtered or unexported fields
}
ValidationMiddleware implements middleware for transport validation
func NewValidationMiddlewareWithLogger ¶
func NewValidationMiddlewareWithLogger(config *ValidationConfig, logger Logger) *ValidationMiddleware
NewValidationMiddlewareWithLogger creates a new validation middleware with a logger
func (*ValidationMiddleware) GetMetrics ¶
func (m *ValidationMiddleware) GetMetrics() ValidationMetrics
GetMetrics returns validation metrics
func (*ValidationMiddleware) IsEnabled ¶
func (m *ValidationMiddleware) IsEnabled() bool
IsEnabled returns whether validation is enabled
func (*ValidationMiddleware) Name ¶
func (m *ValidationMiddleware) Name() string
Name returns the middleware name
func (*ValidationMiddleware) ProcessIncoming ¶
func (m *ValidationMiddleware) ProcessIncoming(ctx context.Context, event events.Event) (events.Event, error)
ProcessIncoming processes incoming events before they are delivered
func (*ValidationMiddleware) ProcessOutgoing ¶
func (m *ValidationMiddleware) ProcessOutgoing(ctx context.Context, event TransportEvent) (TransportEvent, error)
ProcessOutgoing processes outgoing events before they are sent
func (*ValidationMiddleware) ResetMetrics ¶
func (m *ValidationMiddleware) ResetMetrics()
ResetMetrics resets all validation metrics
func (*ValidationMiddleware) SetEnabled ¶
func (m *ValidationMiddleware) SetEnabled(enabled bool)
SetEnabled enables or disables validation
func (*ValidationMiddleware) UpdateConfig ¶
func (m *ValidationMiddleware) UpdateConfig(config *ValidationConfig)
UpdateConfig updates the validation configuration
func (*ValidationMiddleware) Wrap ¶
func (m *ValidationMiddleware) Wrap(transport Transport) Transport
Wrap implements the Middleware interface
type ValidationPool ¶
type ValidationPool struct {
// contains filtered or unexported fields
}
ValidationPool manages a pool of validators for concurrent validation
func NewValidationPool ¶
func NewValidationPool(maxSize int, factory func() Validator) *ValidationPool
NewValidationPool creates a new validation pool
func (*ValidationPool) Get ¶
func (vp *ValidationPool) Get() Validator
Get gets a validator from the pool
func (*ValidationPool) Put ¶
func (vp *ValidationPool) Put(validator Validator)
Put returns a validator to the pool
type ValidationResult ¶
type ValidationResult struct {
// contains filtered or unexported fields
}
ValidationResult aggregates validation results with rich error reporting
func FromJSON ¶
func FromJSON(data []byte) (*ValidationResult, error)
FromJSON deserializes a validation result from JSON
func NewValidationResult ¶
func NewValidationResult(valid bool) ValidationResult
NewValidationResult creates a new validation result
func (*ValidationResult) AddError ¶
func (r *ValidationResult) AddError(err error)
AddError adds an error-level validation issue
func (*ValidationResult) AddFieldError ¶
func (r *ValidationResult) AddFieldError(field string, err error)
AddFieldError adds an error for a specific field
func (*ValidationResult) AddFieldInfo ¶
func (r *ValidationResult) AddFieldInfo(field, message string)
AddFieldInfo adds an info message for a specific field
func (*ValidationResult) AddFieldWarning ¶
func (r *ValidationResult) AddFieldWarning(field, message string)
AddFieldWarning adds a warning for a specific field
func (*ValidationResult) AddInfo ¶
func (r *ValidationResult) AddInfo(message string)
AddInfo adds an info-level validation issue
func (*ValidationResult) AddIssue ¶
func (r *ValidationResult) AddIssue(issue *ValidationIssue)
AddIssue adds a validation issue
func (*ValidationResult) AddWarning ¶
func (r *ValidationResult) AddWarning(message string)
AddWarning adds a warning-level validation issue
func (*ValidationResult) Error ¶
func (r *ValidationResult) Error() string
Error implements the error interface
func (*ValidationResult) Errors ¶
func (r *ValidationResult) Errors() []error
Errors returns all error-level and fatal-level issues
func (*ValidationResult) FieldErrors ¶
func (r *ValidationResult) FieldErrors() map[string][]error
AllFieldIssues returns all field-specific issues
func (*ValidationResult) FieldIssues ¶
func (r *ValidationResult) FieldIssues(field string) []*ValidationIssue
FieldIssues returns issues for a specific field
func (*ValidationResult) Filter ¶
func (r *ValidationResult) Filter(filter func(*ValidationIssue) bool) ValidationResult
Filter returns a new ValidationResult containing only issues that match the filter
func (*ValidationResult) FilterByCategory ¶
func (r *ValidationResult) FilterByCategory(category string) []*ValidationIssue
FilterByCategory returns issues of a specific category
func (*ValidationResult) FilterByField ¶
func (r *ValidationResult) FilterByField(field string) []*ValidationIssue
FilterByField returns issues for a specific field
func (*ValidationResult) FilterBySeverity ¶
func (r *ValidationResult) FilterBySeverity(severity ValidationSeverity) []*ValidationIssue
FilterBySeverity returns issues of a specific severity level
func (*ValidationResult) FilterByValidator ¶
func (r *ValidationResult) FilterByValidator(validator string) []*ValidationIssue
FilterByValidator returns issues from a specific validator
func (*ValidationResult) GetMetadata ¶
func (r *ValidationResult) GetMetadata(key string) (interface{}, bool)
GetMetadata retrieves metadata from the validation result
func (*ValidationResult) GetSummary ¶
func (r *ValidationResult) GetSummary() *ValidationSummary
GetSummary returns a summary of the validation results
func (*ValidationResult) IsValid ¶
func (r *ValidationResult) IsValid() bool
IsValid returns whether the validation passed
func (*ValidationResult) Issues ¶
func (r *ValidationResult) Issues() []*ValidationIssue
Issues returns all validation issues
func (*ValidationResult) Merge ¶
func (r *ValidationResult) Merge(other ValidationResult)
Merge combines another validation result into this one
func (*ValidationResult) SetValid ¶
func (r *ValidationResult) SetValid(valid bool)
SetValid sets the validation status
func (*ValidationResult) ToJSON ¶
func (r *ValidationResult) ToJSON() ([]byte, error)
ToJSON serializes the validation result to JSON
func (*ValidationResult) Warnings ¶
func (r *ValidationResult) Warnings() []*ValidationIssue
Warnings returns all warning-level issues
func (*ValidationResult) WithMetadata ¶
func (r *ValidationResult) WithMetadata(key string, value interface{}) *ValidationResult
WithMetadata adds metadata to the validation result
type ValidationResultAggregator ¶
type ValidationResultAggregator struct {
// contains filtered or unexported fields
}
ValidationResultAggregator aggregates multiple validation results
func NewValidationResultAggregator ¶
func NewValidationResultAggregator(options AggregatorOptions) *ValidationResultAggregator
NewValidationResultAggregator creates a new result aggregator
func (*ValidationResultAggregator) Add ¶
func (a *ValidationResultAggregator) Add(result ValidationResult) bool
Add adds a validation result to the aggregator
func (*ValidationResultAggregator) Aggregate ¶
func (a *ValidationResultAggregator) Aggregate() ValidationResult
Aggregate returns the final aggregated validation result
type ValidationRule ¶
type ValidationRule interface {
// Name returns the name of the validation rule
Name() string
// Validate validates the event against this rule
Validate(ctx context.Context, event TransportEvent) error
// IsEnabled returns whether this rule is enabled
IsEnabled() bool
// Priority returns the priority of this rule (higher = earlier execution)
Priority() int
}
ValidationRule defines a single validation rule
type ValidationSeverity ¶
type ValidationSeverity int
ValidationSeverity defines the severity level of validation errors
const ( // SeverityInfo represents informational messages SeverityInfo ValidationSeverity = iota // SeverityWarning represents warnings that don't prevent processing SeverityWarning // SeverityError represents errors that prevent processing SeverityError // SeverityFatal represents critical errors that require immediate attention SeverityFatal )
func ParseSeverity ¶
func ParseSeverity(s string) ValidationSeverity
ParseSeverity parses a string into a ValidationSeverity
func (ValidationSeverity) String ¶
func (s ValidationSeverity) String() string
String returns the string representation of the severity level
type ValidationSummary ¶
type ValidationSummary struct {
// TotalIssues is the total number of issues found
TotalIssues int `json:"total_issues"`
// ErrorCount is the number of error-level issues
ErrorCount int `json:"error_count"`
// WarningCount is the number of warning-level issues
WarningCount int `json:"warning_count"`
// InfoCount is the number of info-level issues
InfoCount int `json:"info_count"`
// FatalCount is the number of fatal-level issues
FatalCount int `json:"fatal_count"`
// FieldsWithIssues is the number of fields that have issues
FieldsWithIssues int `json:"fields_with_issues"`
// Categories lists all issue categories found
Categories []string `json:"categories"`
// Validators lists all validators that reported issues
Validators []string `json:"validators"`
// HasErrors indicates if there are any error-level or fatal-level issues
HasErrors bool `json:"has_errors"`
// HasWarnings indicates if there are any warning-level issues
HasWarnings bool `json:"has_warnings"`
}
ValidationSummary provides a high-level overview of validation results
type ValidationTransport ¶
type ValidationTransport struct {
Transport
// contains filtered or unexported fields
}
ValidationTransport provides a transport wrapper focused on validation
func NewValidationTransport ¶
func NewValidationTransport(transport Transport, config *ValidationConfig) *ValidationTransport
NewValidationTransport creates a new validation transport wrapper
func NewValidationTransportWithLogger ¶
func NewValidationTransportWithLogger(transport Transport, config *ValidationConfig, logger Logger) *ValidationTransport
NewValidationTransportWithLogger creates a new validation transport with logger
func (*ValidationTransport) Channels ¶
func (vt *ValidationTransport) Channels() (<-chan events.Event, <-chan error)
Channels returns validated events and errors
func (*ValidationTransport) GetValidationMetrics ¶
func (vt *ValidationTransport) GetValidationMetrics() ValidationMetrics
GetValidationMetrics returns validation metrics
func (*ValidationTransport) Send ¶
func (vt *ValidationTransport) Send(ctx context.Context, event TransportEvent) error
Send validates and sends an event
func (*ValidationTransport) UpdateValidationConfig ¶
func (vt *ValidationTransport) UpdateValidationConfig(config *ValidationConfig)
UpdateValidationConfig updates the validation configuration
type Validator ¶
type Validator interface {
// Validate validates a transport event
Validate(ctx context.Context, event TransportEvent) error
// ValidateIncoming validates an incoming event
ValidateIncoming(ctx context.Context, event TransportEvent) error
// ValidateOutgoing validates an outgoing event
ValidateOutgoing(ctx context.Context, event TransportEvent) error
}
Validator defines the interface for validating transport events and messages
type VersionedSchema ¶
type VersionedSchema struct {
Schema *Schema `json:"schema"`
Version string `json:"version"`
Previous string `json:"previous,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
VersionedSchema represents a schema with version information
type WebSocketConfig ¶
type WebSocketConfig struct {
*BaseConfig
// Subprotocols specifies the WebSocket subprotocols to negotiate
Subprotocols []string `json:"subprotocols,omitempty"`
// Origin specifies the Origin header value
Origin string `json:"origin,omitempty"`
// PingInterval specifies the interval between ping frames
PingInterval time.Duration `json:"ping_interval"`
// PongTimeout specifies the timeout for pong responses
PongTimeout time.Duration `json:"pong_timeout"`
// CloseTimeout specifies the timeout for close handshake
CloseTimeout time.Duration `json:"close_timeout"`
// HandshakeTimeout specifies the timeout for WebSocket handshake
HandshakeTimeout time.Duration `json:"handshake_timeout"`
// EnablePingPong enables ping-pong keep-alive mechanism
EnablePingPong bool `json:"enable_ping_pong"`
// EnableAutoReconnect enables automatic reconnection on connection loss
EnableAutoReconnect bool `json:"enable_auto_reconnect"`
// ReconnectInterval specifies the interval between reconnection attempts
ReconnectInterval time.Duration `json:"reconnect_interval"`
// MaxReconnectAttempts specifies the maximum number of reconnection attempts
MaxReconnectAttempts int `json:"max_reconnect_attempts"`
// MessageQueue configuration for buffering messages during reconnection
MessageQueue *MessageQueueConfig `json:"message_queue,omitempty"`
// Dialer specifies a custom dialer for WebSocket connections
Dialer *net.Dialer `json:"-"`
// NetDial allows custom network dialing
NetDial func(network, addr string) (net.Conn, error) `json:"-"`
// NetDialContext allows custom network dialing with context
NetDialContext func(ctx context.Context, network, addr string) (net.Conn, error) `json:"-"`
// CheckOrigin allows custom origin validation
CheckOrigin func(r *http.Request) bool `json:"-"`
// EnableBinaryFrames enables support for binary WebSocket frames
EnableBinaryFrames bool `json:"enable_binary_frames"`
// CompressionLevel sets the compression level for per-message deflate
CompressionLevel int `json:"compression_level"`
// CompressionThreshold sets the minimum message size for compression
CompressionThreshold int `json:"compression_threshold"`
}
WebSocketConfig contains configuration specific to WebSocket transport.
func NewWebSocketConfig ¶
func NewWebSocketConfig(endpoint string) *WebSocketConfig
NewWebSocketConfig creates a new WebSocket configuration with defaults.
func (*WebSocketConfig) Clone ¶
func (c *WebSocketConfig) Clone() Config
Clone creates a deep copy of the WebSocket configuration.
func (*WebSocketConfig) Validate ¶
func (c *WebSocketConfig) Validate() error
Validate validates the WebSocket configuration.
type WebSocketConnectionFactory ¶
type WebSocketConnectionFactory struct {
// contains filtered or unexported fields
}
WebSocketConnectionFactory creates WebSocket connections
func NewWebSocketConnectionFactory ¶
func NewWebSocketConnectionFactory(url string) *WebSocketConnectionFactory
NewWebSocketConnectionFactory creates a new WebSocket connection factory
func (*WebSocketConnectionFactory) CloseConnection ¶
func (f *WebSocketConnectionFactory) CloseConnection(conn net.Conn) error
CloseConnection closes a WebSocket connection
func (*WebSocketConnectionFactory) CreateConnection ¶
CreateConnection creates a new WebSocket connection
func (*WebSocketConnectionFactory) ValidateConnection ¶
func (f *WebSocketConnectionFactory) ValidateConnection(conn net.Conn) bool
ValidateConnection validates a WebSocket connection
type WeightedLoadBalancer ¶
type WeightedLoadBalancer struct {
// contains filtered or unexported fields
}
WeightedLoadBalancer is a weighted load balancer based on transport performance.
func NewWeightedLoadBalancer ¶
func NewWeightedLoadBalancer() *WeightedLoadBalancer
NewWeightedLoadBalancer creates a new weighted load balancer.
func (*WeightedLoadBalancer) Name ¶
func (lb *WeightedLoadBalancer) Name() string
Name returns the load balancer name.
func (*WeightedLoadBalancer) SelectTransport ¶
func (lb *WeightedLoadBalancer) SelectTransport(transports map[string]Transport, event TransportEvent) (string, error)
SelectTransport selects a transport using weighted algorithm.
func (*WeightedLoadBalancer) UpdateStats ¶
func (lb *WeightedLoadBalancer) UpdateStats(transportName string, stats TransportStats)
UpdateStats updates the load balancer with transport statistics.
Source Files
¶
- advanced_events.go
- backpressure.go
- business_validators.go
- cleanup_coordinator.go
- cleanup_manager.go
- cleanup_tracker.go
- cleanup_validator.go
- complex_validators.go
- composite_events.go
- config.go
- connection_capabilities.go
- connection_pool.go
- context_improvements.go
- deadlock_detector.go
- doc.go
- doc_generator.go
- errors.go
- event_creators.go
- event_types.go
- factory.go
- interfaces.go
- interfaces_auth.go
- interfaces_config.go
- interfaces_core.go
- interfaces_events.go
- interfaces_health.go
- interfaces_io.go
- interfaces_manager.go
- interfaces_metrics.go
- interfaces_middleware.go
- interfaces_resilience.go
- interfaces_serialization.go
- interfaces_state.go
- interfaces_stats.go
- logger.go
- logger_optimized.go
- manager_full.go
- manager_simple.go
- memory_manager.go
- memory_transport.go
- migration_test_helpers.go
- migration_tool.go
- ring_buffer.go
- schema_validators.go
- simple_transport_event.go
- slice_pool.go
- string_pool.go
- sync_slice.go
- test_isolation_helpers.go
- testing.go
- testing_advanced.go
- types.go
- validation.go
- validation_middleware.go
- validation_performance.go
- validation_results.go