Documentation
¶
Index ¶
- Constants
- Variables
- func NewUUID() string
- type APIConfig
- type APIKeyYAML
- type AppConfig
- type AuthAPIKey
- type AuthConfig
- type AuthUser
- type BackoffType
- type BatchItem
- type Client
- func (c *Client) Close() error
- func (c *Client) Enqueue(ctx context.Context, jobType string, payload Payload, opts ...EnqueueOption) (*Job, error)
- func (c *Client) EnqueueAt(ctx context.Context, at time.Time, jobType string, payload Payload, ...) (*Job, error)
- func (c *Client) EnqueueBatch(ctx context.Context, items []BatchItem) ([]*Job, error)
- func (c *Client) EnqueueIn(ctx context.Context, delay time.Duration, jobType string, payload Payload, ...) (*Job, error)
- func (c *Client) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (c *Client) RedisClient() *redis.Client
- type CompleteCallbackFunc
- type Config
- type CronEntry
- type CronEntryYAML
- type CronExpr
- type DashboardConfig
- type DequeueStrategy
- type EnqueueOption
- func AllowFailure(allow bool) EnqueueOption
- func DependsOn(jobIDs ...string) EnqueueOption
- func EnqueueAtFront(atFront bool) EnqueueOption
- func EnqueuedBy(name string) EnqueueOption
- func JobID(id string) EnqueueOption
- func MaxRetry(n int) EnqueueOption
- func Meta(m Payload) EnqueueOption
- func Queue(name string) EnqueueOption
- func RetryIntervals(intervals ...int) EnqueueOption
- func Timeout(d time.Duration) EnqueueOption
- func Unique() EnqueueOption
- type FailureCallbackFunc
- type HandleOption
- type Handler
- type Job
- type MiddlewareFunc
- type MonitoringConfig
- type OverlapPolicy
- type Payload
- type PoolConfig
- type PoolYAML
- type QueueDef
- type RedisClient
- type RedisConfig
- type RedisOption
- type RedisYAML
- type RetryPolicy
- type RetryYAML
- type SchedulerConfig
- type Server
- func (s *Server) CancelJob(ctx context.Context, jobID string) error
- func (s *Server) ClearDLQ(ctx context.Context, queue string) (int64, error)
- func (s *Server) DeleteJob(ctx context.Context, jobID string) error
- func (s *Server) DisableCron(ctx context.Context, cronID string) error
- func (s *Server) EmptyQueue(ctx context.Context, queue string) (int64, error)
- func (s *Server) EnableCron(ctx context.Context, cronID string) error
- func (s *Server) Handle(jobType string, handler Handler, opts ...HandleOption) error
- func (s *Server) IsQueuePaused(ctx context.Context, queue string) (bool, error)
- func (s *Server) PauseQueue(ctx context.Context, queue string) error
- func (s *Server) Pool(cfg PoolConfig) error
- func (s *Server) ResumeQueue(ctx context.Context, queue string) error
- func (s *Server) RetryAllDLQ(ctx context.Context, queue string) (int64, error)
- func (s *Server) RetryJob(ctx context.Context, jobID string) error
- func (s *Server) Schedule(entry CronEntry) error
- func (s *Server) Start(ctx context.Context) error
- func (s *Server) Stop()
- func (s *Server) TriggerCron(ctx context.Context, cronID string) (string, error)
- func (s *Server) Use(mws ...MiddlewareFunc) error
- type ServerOption
- func WithAPI(enabled bool, addr string) ServerOption
- func WithAPIKeys(keys []AuthAPIKey) ServerOption
- func WithAuthEnabled(enabled bool) ServerOption
- func WithAuthUsers(users []AuthUser) ServerOption
- func WithDashboard(enabled bool) ServerOption
- func WithDashboardDir(dir string) ServerOption
- func WithDashboardPathPrefix(prefix string) ServerOption
- func WithDefaultTimezone(tz string) ServerOption
- func WithGlobalTimeout(d time.Duration) ServerOption
- func WithGracePeriod(d time.Duration) ServerOption
- func WithLogLevel(level string) ServerOption
- func WithLogger(l *slog.Logger) ServerOption
- func WithSchedulerEnabled(enabled bool) ServerOption
- func WithSchedulerPollInterval(d time.Duration) ServerOption
- func WithServerRedis(addr string) ServerOption
- func WithServerRedisClient(rdb *redis.Client) ServerOption
- func WithServerRedisOpts(opts ...RedisOption) ServerOption
- func WithShutdownTimeout(d time.Duration) ServerOption
- type SuccessCallbackFunc
- type UserYAML
Constants ¶
const ( StatusReady = "ready" StatusScheduled = "scheduled" StatusDeferred = "deferred" StatusProcessing = "processing" StatusCompleted = "completed" StatusFailed = "failed" StatusRetry = "retry" StatusDeadLetter = "dead_letter" StatusStopped = "stopped" StatusCanceled = "canceled" )
Job status constants.
Variables ¶
var ( // ErrJobNotFound is returned when a job cannot be found in Redis. ErrJobNotFound = errors.New("gqm: job not found") // ErrQueueEmpty is returned when attempting to dequeue from an empty queue. ErrQueueEmpty = errors.New("gqm: queue empty") // ErrServerStopped is returned when operations are attempted on a stopped server. ErrServerStopped = errors.New("gqm: server stopped") // ErrHandlerNotFound is returned when no handler is registered for a job type. ErrHandlerNotFound = errors.New("gqm: handler not found") // ErrDuplicateHandler is returned when a handler is registered twice for the same job type. ErrDuplicateHandler = errors.New("gqm: duplicate handler registration") // ErrInvalidJobType is returned when a job type is empty. ErrInvalidJobType = errors.New("gqm: invalid job type") // ErrMaxRetryExceeded is returned when a job has exceeded its maximum retry count. ErrMaxRetryExceeded = errors.New("gqm: max retry exceeded") // ErrDuplicatePool is returned when a pool with the same name is registered twice. ErrDuplicatePool = errors.New("gqm: duplicate pool name") // ErrJobTypeConflict is returned when a job type is assigned to multiple pools. ErrJobTypeConflict = errors.New("gqm: job type already assigned to another pool") // ErrDuplicateCronEntry is returned when a cron entry with the same ID is registered twice. ErrDuplicateCronEntry = errors.New("gqm: duplicate cron entry ID") // ErrCyclicDependency is returned when adding a dependency would create a cycle in the DAG. ErrCyclicDependency = errors.New("gqm: cyclic dependency detected") // ErrDuplicateJobID is returned when enqueueing with Unique() and a job with the same ID already exists. ErrDuplicateJobID = errors.New("gqm: duplicate job ID") // ErrInvalidQueueName is returned when a queue name contains invalid characters. ErrInvalidQueueName = errors.New("gqm: invalid queue name (only alphanumeric, hyphen, underscore, dot allowed; max 128 chars)") // ErrInvalidJobID is returned when a job ID contains invalid characters. ErrInvalidJobID = errors.New("gqm: invalid job ID (only alphanumeric, hyphen, underscore, dot allowed; max 256 chars)") // ErrSkipRetry can be returned (or wrapped) by a handler to skip all // remaining retries and move the job directly to the dead letter queue. ErrSkipRetry = errors.New("gqm: skip retry") // ErrBatchTooLarge is returned when EnqueueBatch receives more items // than the maximum allowed batch size. ErrBatchTooLarge = errors.New("gqm: batch size exceeds maximum (1000)") // ErrBatchDependsOn is returned when DependsOn is used in EnqueueBatch. ErrBatchDependsOn = errors.New("gqm: DependsOn is not supported in EnqueueBatch") // ErrBatchUnique is returned when Unique is used in EnqueueBatch. ErrBatchUnique = errors.New("gqm: Unique is not supported in EnqueueBatch") // ErrBatchEnqueueAtFront is returned when EnqueueAtFront is used in EnqueueBatch. ErrBatchEnqueueAtFront = errors.New("gqm: EnqueueAtFront is not supported in EnqueueBatch") // ErrJobDataTooLarge is returned when a job's serialized data exceeds maxJobDataSize. ErrJobDataTooLarge = errors.New("gqm: job data exceeds maximum size (1 MB)") )
Functions ¶
Types ¶
type APIConfig ¶
type APIConfig struct {
Enabled bool `yaml:"enabled"`
Addr string `yaml:"addr"` // default ":8080"
RateLimit int `yaml:"rate_limit"` // requests/second per IP; 0 = default (100), -1 = disabled
APIKeys []APIKeyYAML `yaml:"api_keys"`
}
APIConfig holds HTTP API settings from YAML.
type APIKeyYAML ¶
type APIKeyYAML struct {
Name string `yaml:"name"`
Key string `yaml:"key"` // prefix: gqm_ak_, minimum 32 chars
Role string `yaml:"role"` // "admin" or "viewer"; defaults to "admin"
}
APIKeyYAML holds an API key entry from YAML.
type AppConfig ¶
type AppConfig struct {
Timezone string `yaml:"timezone"`
LogLevel string `yaml:"log_level"`
ShutdownTimeout int `yaml:"shutdown_timeout"` // seconds
GlobalJobTimeout int `yaml:"global_job_timeout"` // seconds
GracePeriod int `yaml:"grace_period"` // seconds
}
AppConfig holds application-level settings from YAML.
type AuthAPIKey ¶
type AuthAPIKey struct {
Name string
Key string
Role string // "admin" or "viewer"; defaults to "admin" if empty
}
AuthAPIKey represents an API key for programmatic access.
type AuthConfig ¶
type AuthConfig struct {
Enabled bool `yaml:"enabled"`
SessionTTL int `yaml:"session_ttl"` // seconds, default 86400
Users []UserYAML `yaml:"users"`
}
AuthConfig holds authentication settings from YAML.
type AuthUser ¶
type AuthUser struct {
Username string
PasswordHash string // bcrypt hash
Role string // "admin" or "viewer"; defaults to "admin" if empty
}
AuthUser represents a user for the monitoring API.
type BackoffType ¶
type BackoffType string
BackoffType determines the retry delay calculation method.
const ( // BackoffFixed uses a constant interval between retries. BackoffFixed BackoffType = "fixed" // BackoffExponential uses base * 2^attempt, capped at BackoffMax. BackoffExponential BackoffType = "exponential" // BackoffCustom uses the Intervals slice directly. BackoffCustom BackoffType = "custom" )
type BatchItem ¶
type BatchItem struct {
JobType string
Payload Payload
Options []EnqueueOption
}
BatchItem represents a single job to be enqueued in a batch.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is used to enqueue jobs into the queue system.
func NewClient ¶
func NewClient(opts ...RedisOption) (*Client, error)
NewClient creates a new Client with the given Redis options.
func (*Client) Enqueue ¶
func (c *Client) Enqueue(ctx context.Context, jobType string, payload Payload, opts ...EnqueueOption) (*Job, error)
Enqueue creates a new job and adds it to the queue. If DependsOn is set, the job is created with status "deferred" and will be moved to the ready queue only after all dependencies are resolved. Returns the created job or an error.
func (*Client) EnqueueAt ¶
func (c *Client) EnqueueAt(ctx context.Context, at time.Time, jobType string, payload Payload, opts ...EnqueueOption) (*Job, error)
EnqueueAt creates a new job scheduled for execution at the given time. The job is placed in the scheduled sorted set and will be moved to the ready queue by the scheduler when the time arrives.
func (*Client) EnqueueBatch ¶
EnqueueBatch creates multiple jobs in a single Redis pipeline for efficiency. All jobs are validated upfront (including serialization); if any item fails validation, no jobs are enqueued and the error identifies the failing item.
The pipeline is NOT a Redis transaction — on network errors, some jobs may have been created while others were not. Callers should treat a pipeline error as "unknown state" and verify job existence if needed.
Maximum batch size is 1000 items. Returns ErrBatchTooLarge if exceeded.
Limitations compared to single Enqueue:
- Unique() is not supported (returns error if set on any item).
- DependsOn() is not supported (returns error if set on any item).
- EnqueueAtFront() is not supported (returns error if set on any item).
Use individual Enqueue calls if you need uniqueness, DAG dependencies, or front-of-queue insertion.
func (*Client) EnqueueIn ¶
func (c *Client) EnqueueIn(ctx context.Context, delay time.Duration, jobType string, payload Payload, opts ...EnqueueOption) (*Job, error)
EnqueueIn creates a new job scheduled for execution after the given delay.
func (*Client) RedisClient ¶
RedisClient returns the underlying RedisClient for testing/advanced use.
type CompleteCallbackFunc ¶
CompleteCallbackFunc is a callback invoked after every job execution regardless of outcome.
type Config ¶
type Config struct {
Redis RedisYAML `yaml:"redis"`
App AppConfig `yaml:"app"`
Queues []QueueDef `yaml:"queues"`
Pools []PoolYAML `yaml:"pools"`
Scheduler SchedulerConfig `yaml:"scheduler"`
Monitoring MonitoringConfig `yaml:"monitoring"`
}
Config represents the top-level YAML configuration file.
func LoadConfig ¶
LoadConfig parses YAML bytes and validates the resulting configuration.
func LoadConfigFile ¶
LoadConfigFile reads a YAML file and returns a validated Config.
type CronEntry ¶
type CronEntry struct {
// User-configurable fields.
ID string `json:"id"`
Name string `json:"name"`
CronExpr string `json:"cron_expr"`
Timezone string `json:"timezone,omitempty"`
JobType string `json:"job_type"`
Queue string `json:"queue,omitempty"`
Payload Payload `json:"payload,omitempty"`
Timeout int `json:"timeout,omitempty"`
MaxRetry int `json:"max_retry,omitempty"`
OverlapPolicy OverlapPolicy `json:"overlap_policy,omitempty"`
Enabled bool `json:"enabled"`
// System-managed state (populated by the scheduler, not by the user).
LastRun int64 `json:"last_run,omitempty"`
LastStatus string `json:"last_status,omitempty"`
NextRun int64 `json:"next_run,omitempty"`
CreatedAt int64 `json:"created_at,omitempty"`
UpdatedAt int64 `json:"updated_at,omitempty"`
// contains filtered or unexported fields
}
CronEntry defines a recurring job schedule.
type CronEntryYAML ¶
type CronEntryYAML struct {
ID string `yaml:"id"`
Name string `yaml:"name"`
CronExpr string `yaml:"cron_expr"`
Timezone string `yaml:"timezone"`
JobType string `yaml:"job_type"`
Queue string `yaml:"queue"`
Payload string `yaml:"payload"` // JSON string
Timeout int `yaml:"timeout"` // seconds
MaxRetry int `yaml:"max_retry"`
OverlapPolicy string `yaml:"overlap_policy"`
Enabled *bool `yaml:"enabled"` // nil = true
}
CronEntryYAML holds a cron entry from YAML.
type CronExpr ¶
type CronExpr struct {
// contains filtered or unexported fields
}
CronExpr represents a parsed cron expression.
Accepts 6 fields: second minute hour day_of_month month day_of_week or 5 fields: minute hour day_of_month month day_of_week (second defaults to 0).
Field syntax:
- * (any value)
- N (specific value)
- N-M (range)
- */S or N/S (step from min or N)
- N-M/S (range with step)
- N,M,O (comma-separated list)
Day matching: when both day_of_month and day_of_week are explicitly set (not *), the match uses OR semantics (standard cron behavior).
Day of week: 0 = Sunday, 1 = Monday, ..., 6 = Saturday, 7 = Sunday (alias).
func ParseCronExpr ¶
ParseCronExpr parses a cron expression string into a CronExpr. It accepts 6 fields (second minute hour dom month dow) or 5 fields (minute hour dom month dow, with second implicitly 0).
type DashboardConfig ¶
type DashboardConfig struct {
Enabled bool `yaml:"enabled"`
PathPrefix string `yaml:"path_prefix"` // default "/dashboard"
CustomDir string `yaml:"custom_dir"`
}
DashboardConfig holds dashboard settings from YAML.
type DequeueStrategy ¶
type DequeueStrategy string
DequeueStrategy determines how a pool selects jobs from multiple queues.
const ( // StrategyStrict always checks the highest-priority queue first. // Lower-priority queues may starve if higher ones are always full. StrategyStrict DequeueStrategy = "strict" // StrategyRoundRobin rotates through queues in order each cycle. StrategyRoundRobin DequeueStrategy = "round_robin" // StrategyWeighted selects queues probabilistically based on priority weight. StrategyWeighted DequeueStrategy = "weighted" )
type EnqueueOption ¶
type EnqueueOption func(*Job)
EnqueueOption configures job enqueue behavior.
func AllowFailure ¶
func AllowFailure(allow bool) EnqueueOption
AllowFailure controls whether this job should proceed even if a parent dependency fails. When false (default), parent failure cascades cancellation.
func DependsOn ¶
func DependsOn(jobIDs ...string) EnqueueOption
DependsOn specifies parent job IDs that must complete before this job runs. The job will be created with status "deferred" and moved to ready queue only after all dependencies are resolved.
func EnqueueAtFront ¶
func EnqueueAtFront(atFront bool) EnqueueOption
EnqueueAtFront causes the job to be pushed to the front of the queue (instead of the back) when its dependencies are resolved.
func EnqueuedBy ¶
func EnqueuedBy(name string) EnqueueOption
EnqueuedBy sets the enqueuer identifier.
func JobID ¶
func JobID(id string) EnqueueOption
JobID overrides the default UUID v7 job ID with a custom value. The caller is responsible for ensuring uniqueness — if a job with the same ID already exists in Redis, its data will be silently overwritten.
func MaxRetry ¶
func MaxRetry(n int) EnqueueOption
MaxRetry sets the maximum number of retries for the job. Negative values are clamped to 0 (no retry).
func RetryIntervals ¶
func RetryIntervals(intervals ...int) EnqueueOption
RetryIntervals sets the retry intervals in seconds.
func Timeout ¶
func Timeout(d time.Duration) EnqueueOption
Timeout sets the job-level timeout. Sub-second values are rounded up to 1s.
func Unique ¶
func Unique() EnqueueOption
Unique ensures no existing job with the same ID exists before enqueueing. If a duplicate is found, Enqueue returns ErrDuplicateJobID. Most useful with JobID() for custom IDs — default UUID v7 IDs are already unique.
type FailureCallbackFunc ¶
FailureCallbackFunc is a callback invoked after a job fails (handler error or timeout).
type HandleOption ¶
type HandleOption func(*handlerConfig)
HandleOption configures handler registration.
func IsFailure ¶
func IsFailure(fn func(error) bool) HandleOption
IsFailure sets a predicate that classifies handler errors.
When a handler returns an error and the predicate returns false, the job is retried WITHOUT incrementing the retry counter. This means transient/expected errors (rate limiting, temporary unavailability) do not count towards the retry limit, allowing the job to retry indefinitely for non-failure errors.
When the predicate returns true (or when no predicate is set), the error is treated as a real failure: the retry counter increments normally and the job eventually moves to the dead letter queue.
Non-failure retries always use the same retry delay (the first interval), since the retry counter is not incremented. Backoff progression only applies to failure retries.
If maxRetry is 0, non-failure errors are sent to the DLQ with a warning log, since retry is not configured.
If the predicate panics, the error is treated as a failure (safe default) and the panic is logged with a stack trace.
Note: ErrSkipRetry always bypasses retry regardless of IsFailure. The fn parameter must not be nil.
func OnComplete ¶
func OnComplete(fn CompleteCallbackFunc) HandleOption
OnComplete registers a callback that fires after every job execution, regardless of outcome. It is called after OnSuccess or OnFailure. The err parameter is nil on success. Panics are recovered and logged; they do not affect the job outcome.
Important: callbacks block the worker. Keep them fast. Callbacks fire before Redis state is updated. Only fires when a handler is found and invoked — internal failures (fetch/parse/no-handler) do not trigger callbacks. Do not mutate the job struct.
func OnFailure ¶
func OnFailure(fn FailureCallbackFunc) HandleOption
OnFailure registers a callback that fires after a handler returns an error (including timeout). It fires for ALL handler errors regardless of the IsFailure predicate classification. The callback runs synchronously in the worker goroutine before retry/DLQ processing. Panics are recovered and logged; they do not affect the job outcome.
Important: callbacks block the worker. Keep them fast. Callbacks fire before Redis state is updated. Do not mutate the job struct.
func OnSuccess ¶
func OnSuccess(fn SuccessCallbackFunc) HandleOption
OnSuccess registers a callback that fires after a job completes successfully (handler returned nil). The callback runs synchronously in the worker goroutine before the next job is dequeued. Panics are recovered and logged; they do not affect the job outcome.
Important: callbacks block the worker. Keep them fast (no heavy I/O, no unbounded waits). For async work, spawn a goroutine inside the callback. Callbacks fire before Redis state is updated. Do not mutate the job struct — it is shared with subsequent Redis operations. The ctx is the worker's shutdown context, NOT the handler's timeout context. Use your own timeout for I/O inside callbacks.
func Workers ¶
func Workers(n int) HandleOption
Workers sets the number of dedicated worker goroutines for this handler. This creates an implicit pool with a dedicated queue for this job type.
type Handler ¶
Handler is a function that processes a job. It must respect ctx for cancellation and timeout.
type Job ¶
type Job struct {
ID string `json:"id"`
Type string `json:"type"`
Queue string `json:"queue"`
Payload Payload `json:"payload"`
Status string `json:"status"`
Result json.RawMessage `json:"result,omitempty"`
Error string `json:"error,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetry int `json:"max_retry"`
RetryIntervals []int `json:"retry_intervals,omitempty"`
Timeout int `json:"timeout,omitempty"`
CreatedAt int64 `json:"created_at"`
ScheduledAt int64 `json:"scheduled_at,omitempty"`
StartedAt int64 `json:"started_at,omitempty"`
CompletedAt int64 `json:"completed_at,omitempty"`
WorkerID string `json:"worker_id,omitempty"`
LastHeartbeat int64 `json:"last_heartbeat,omitempty"`
ExecutionDuration int64 `json:"execution_duration,omitempty"`
EnqueuedBy string `json:"enqueued_by,omitempty"`
Meta Payload `json:"meta,omitempty"`
DependsOn []string `json:"depends_on,omitempty"`
AllowFailure bool `json:"allow_failure,omitempty"`
EnqueueAtFront bool `json:"enqueue_at_front,omitempty"`
// contains filtered or unexported fields
}
Job represents a unit of work in the queue.
func JobFromMap ¶
JobFromMap creates a Job from a Redis HGETALL result.
type MiddlewareFunc ¶
MiddlewareFunc is a function that wraps a Handler to add cross-cutting behavior such as logging, metrics, tracing, or error handling.
Middleware is registered via Server.Use() and applied in the order registered: Use(a, b) executes as a → b → handler.
Example:
func loggingMiddleware(next gqm.Handler) gqm.Handler {
return func(ctx context.Context, job *gqm.Job) error {
slog.Info("start job", "id", job.ID, "type", job.Type)
err := next(ctx, job)
slog.Info("end job", "id", job.ID, "type", job.Type, "error", err)
return err
}
}
type MonitoringConfig ¶
type MonitoringConfig struct {
Auth AuthConfig `yaml:"auth"`
API APIConfig `yaml:"api"`
Dashboard DashboardConfig `yaml:"dashboard"`
}
MonitoringConfig holds HTTP API, auth, and dashboard settings from YAML.
type OverlapPolicy ¶
type OverlapPolicy string
OverlapPolicy determines behavior when a cron schedule fires while the previous job from the same entry is still running.
const ( // OverlapSkip skips the new execution if the previous job is still running. OverlapSkip OverlapPolicy = "skip" // OverlapAllow enqueues a new job regardless of previous job status. OverlapAllow OverlapPolicy = "allow" // OverlapReplace cancels the previous job and enqueues a new one. OverlapReplace OverlapPolicy = "replace" )
type PoolConfig ¶
type PoolConfig struct {
Name string
JobTypes []string // Job types handled by this pool
Queues []string // Queues to listen on, ordered by priority
Concurrency int // Number of worker goroutines (default: runtime.NumCPU())
JobTimeout time.Duration // Pool-level job timeout (0 = fall back to global)
GracePeriod time.Duration // Grace period after context cancel (0 = use server default)
ShutdownTimeout time.Duration // Shutdown wait time (0 = use server default)
DequeueStrategy DequeueStrategy // How to select from multiple queues (default: strict)
RetryPolicy *RetryPolicy // Pool-level retry defaults (nil = use job-level)
}
PoolConfig is the public configuration for defining an explicit worker pool (Layer 3). Use Server.Pool() to register a pool with this configuration.
type PoolYAML ¶
type PoolYAML struct {
Name string `yaml:"name"`
JobTypes []string `yaml:"job_types"`
Queues []string `yaml:"queues"`
Concurrency int `yaml:"concurrency"`
JobTimeout int `yaml:"job_timeout"` // seconds
GracePeriod int `yaml:"grace_period"` // seconds
ShutdownTimeout int `yaml:"shutdown_timeout"` // seconds
DequeueStrategy string `yaml:"dequeue_strategy"`
Retry *RetryYAML `yaml:"retry"`
}
PoolYAML holds worker pool configuration from YAML.
type RedisClient ¶
type RedisClient struct {
// contains filtered or unexported fields
}
RedisClient wraps a go-redis client with GQM-specific helpers.
func NewRedisClient ¶
func NewRedisClient(opts ...RedisOption) (*RedisClient, error)
NewRedisClient creates a new RedisClient with the given options. If WithRedisClient was used to inject an existing *redis.Client, it is used directly and connection options (Addr, Password, DB, TLSConfig) are ignored.
func (*RedisClient) Close ¶
func (rc *RedisClient) Close() error
Close closes the underlying Redis connection. If the client was injected via WithRedisClient, Close is a no-op — the caller retains ownership and is responsible for closing it.
func (*RedisClient) Key ¶
func (rc *RedisClient) Key(parts ...string) string
Key returns a prefixed Redis key.
func (*RedisClient) Ping ¶
func (rc *RedisClient) Ping(ctx context.Context) error
Ping checks the Redis connection.
func (*RedisClient) Prefix ¶
func (rc *RedisClient) Prefix() string
Prefix returns the key prefix used by this client.
func (*RedisClient) Unwrap ¶
func (rc *RedisClient) Unwrap() *redis.Client
Unwrap returns the underlying go-redis client for advanced operations.
type RedisConfig ¶
type RedisConfig struct {
Addr string
Password string
DB int
Prefix string
TLSConfig *tls.Config
// contains filtered or unexported fields
}
RedisConfig holds Redis connection configuration.
type RedisOption ¶
type RedisOption func(*RedisConfig)
RedisOption configures a RedisConfig.
func WithPrefix ¶
func WithPrefix(prefix string) RedisOption
WithPrefix sets the key prefix for all GQM keys.
func WithRedisAddr ¶
func WithRedisAddr(addr string) RedisOption
WithRedisAddr sets the Redis server address.
func WithRedisClient ¶
func WithRedisClient(rdb *redis.Client) RedisOption
WithRedisClient injects a pre-configured *redis.Client, bypassing the built-in connection setup. This enables Redis Sentinel, Cluster, or any custom configuration supported by go-redis.
When used, connection options (WithRedisAddr, WithRedisPassword, WithRedisDB, WithRedisTLS) are ignored — only WithPrefix is still applied.
Ownership: the caller retains ownership of rdb. GQM will NOT close it — you must close it yourself after the Client/Server is done. This is safe for sharing a single *redis.Client across multiple GQM instances.
Example (Sentinel):
rdb := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: "mymaster",
SentinelAddrs: []string{"sentinel1:26379", "sentinel2:26379"},
})
defer rdb.Close()
client, _ := gqm.NewClient(gqm.WithRedisClient(rdb))
func WithRedisPassword ¶
func WithRedisPassword(password string) RedisOption
WithRedisPassword sets the Redis password.
func WithRedisTLS ¶
func WithRedisTLS(tc *tls.Config) RedisOption
WithRedisTLS enables TLS for the Redis connection. Pass nil for default TLS configuration (system CA pool), or provide a custom *tls.Config for client certificates, custom CA, or other TLS settings.
type RedisYAML ¶
type RedisYAML struct {
Addr string `yaml:"addr"`
Password string `yaml:"password"`
DB int `yaml:"db"`
Prefix string `yaml:"prefix"`
TLS bool `yaml:"tls"` // enable TLS with system CA pool
}
RedisYAML holds Redis connection settings from YAML.
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetry int
Intervals []int // Explicit intervals in seconds (for BackoffCustom)
Backoff BackoffType // fixed, exponential, custom
BackoffBase time.Duration // Base delay for fixed/exponential
BackoffMax time.Duration // Maximum delay cap for exponential
}
RetryPolicy configures retry behavior at the pool level. When set on a pool, it serves as the default for all jobs in that pool unless overridden at the job level.
type RetryYAML ¶
type RetryYAML struct {
MaxRetry int `yaml:"max_retry"`
Intervals []int `yaml:"intervals"`
Backoff string `yaml:"backoff"`
BackoffBase int `yaml:"backoff_base"` // seconds
BackoffMax int `yaml:"backoff_max"` // seconds
}
RetryYAML holds retry policy configuration from YAML.
type SchedulerConfig ¶
type SchedulerConfig struct {
Enabled *bool `yaml:"enabled"` // nil = true
PollInterval int `yaml:"poll_interval"` // seconds
CronEntries []CronEntryYAML `yaml:"cron_entries"`
}
SchedulerConfig holds scheduler settings from YAML.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server manages worker pools and processes jobs.
func NewServer ¶
func NewServer(opts ...ServerOption) (*Server, error)
NewServer creates a new Server with the given options.
func NewServerFromConfig ¶
func NewServerFromConfig(cfg *Config, opts ...ServerOption) (*Server, error)
NewServerFromConfig creates a Server from a Config, with optional code overrides. The config serves as the base configuration; ServerOption values always win.
func (*Server) CancelJob ¶
CancelJob cancels a job that is ready, scheduled, or deferred. Processing jobs cannot be canceled (handler is already running).
func (*Server) ClearDLQ ¶
ClearDLQ deletes jobs from the dead letter queue, up to maxBulkOps at a time. Job hashes and DAG keys are also removed. Returns the number of jobs deleted. Call repeatedly until 0 to drain fully.
func (*Server) DeleteJob ¶
DeleteJob removes a job completely from Redis. Processing jobs cannot be deleted (return error). Uses a Lua script for atomicity to prevent TOCTOU races where a job transitions to "processing" between the status check and deletion.
func (*Server) DisableCron ¶
DisableCron disables a cron entry.
func (*Server) EmptyQueue ¶
EmptyQueue removes pending (ready) jobs from a queue, up to maxBulkOps at a time. Processing, scheduled, and DLQ jobs are unaffected. Returns the number of jobs removed. Call repeatedly until 0 to drain fully.
func (*Server) EnableCron ¶
EnableCron enables a cron entry.
func (*Server) Handle ¶
func (s *Server) Handle(jobType string, handler Handler, opts ...HandleOption) error
Handle registers a handler for a job type.
func (*Server) IsQueuePaused ¶
IsQueuePaused returns whether the given queue is paused.
func (*Server) PauseQueue ¶
PauseQueue pauses a queue. Workers will stop dequeuing from the paused queue but in-flight jobs continue to completion. New jobs can still be enqueued.
func (*Server) Pool ¶
func (s *Server) Pool(cfg PoolConfig) error
Pool registers an explicit worker pool configuration (Layer 3). This allows grouping multiple job types into a single pool with shared concurrency, custom queues, dequeue strategy, and retry policy.
Must be called before Start(). Returns an error if the pool name is already registered or if the configuration is invalid.
func (*Server) ResumeQueue ¶
ResumeQueue resumes a paused queue. Workers will start dequeuing again.
func (*Server) RetryAllDLQ ¶
RetryAllDLQ retries jobs in the dead letter queue, up to maxBulkOps at a time. Returns the number of jobs retried. Call repeatedly until 0 to drain fully.
func (*Server) RetryJob ¶
RetryJob moves a dead-letter job back to the ready queue. Only jobs with status "dead_letter" can be retried.
func (*Server) Schedule ¶
Schedule registers a cron entry for recurring job scheduling. Must be called before Start(). The entry's cron expression is parsed and validated. Duplicate IDs are rejected with ErrDuplicateCronEntry.
func (*Server) Start ¶
Start begins processing jobs. It blocks until the server is stopped via signal (SIGTERM/SIGINT) or the context is cancelled. The server is single-use: after Stop or Start returns, create a new Server instance instead of calling Start again.
func (*Server) TriggerCron ¶
TriggerCron manually triggers a cron entry, enqueuing a job immediately. Returns the new job ID. Bypasses overlap checks and cron locks (explicit user intent).
func (*Server) Use ¶
func (s *Server) Use(mws ...MiddlewareFunc) error
Use registers middleware that wraps all handlers. Middleware is applied in the order registered: Use(a, b) executes as a → b → handler.
Must be called before Start(). Middleware is applied to handlers once at startup, so Use() and Handle() can be called in any order.
type ServerOption ¶
type ServerOption func(*serverConfig)
ServerOption configures a Server.
func WithAPI ¶
func WithAPI(enabled bool, addr string) ServerOption
WithAPI enables the HTTP monitoring API on the given address.
func WithAPIKeys ¶
func WithAPIKeys(keys []AuthAPIKey) ServerOption
WithAPIKeys sets the API keys for programmatic access.
func WithAuthEnabled ¶
func WithAuthEnabled(enabled bool) ServerOption
WithAuthEnabled enables authentication for the monitoring API.
func WithAuthUsers ¶
func WithAuthUsers(users []AuthUser) ServerOption
WithAuthUsers sets the users for the monitoring API.
func WithDashboard ¶
func WithDashboard(enabled bool) ServerOption
WithDashboard enables the web dashboard.
func WithDashboardDir ¶
func WithDashboardDir(dir string) ServerOption
WithDashboardDir sets a custom directory for serving dashboard assets.
func WithDashboardPathPrefix ¶
func WithDashboardPathPrefix(prefix string) ServerOption
WithDashboardPathPrefix sets the URL prefix for the dashboard.
func WithDefaultTimezone ¶
func WithDefaultTimezone(tz string) ServerOption
WithDefaultTimezone sets the fallback timezone (IANA name) for cron entries that don't specify their own timezone. Defaults to UTC.
func WithGlobalTimeout ¶
func WithGlobalTimeout(d time.Duration) ServerOption
WithGlobalTimeout sets the global default job timeout. Must be > 0; the global timeout cannot be disabled.
func WithGracePeriod ¶
func WithGracePeriod(d time.Duration) ServerOption
WithGracePeriod sets the default grace period after context cancellation.
func WithLogLevel ¶
func WithLogLevel(level string) ServerOption
WithLogLevel sets the log level for the auto-created logger. Only takes effect if no WithLogger() is provided. Valid values: "debug", "info", "warn", "error".
func WithLogger ¶
func WithLogger(l *slog.Logger) ServerOption
WithLogger sets a custom slog.Logger.
func WithSchedulerEnabled ¶
func WithSchedulerEnabled(enabled bool) ServerOption
WithSchedulerEnabled controls whether the scheduler goroutine is started. Defaults to true. Set to false for worker-only instances.
func WithSchedulerPollInterval ¶
func WithSchedulerPollInterval(d time.Duration) ServerOption
WithSchedulerPollInterval sets the poll interval for the scheduler engine. Defaults to 1s.
func WithServerRedis ¶
func WithServerRedis(addr string) ServerOption
WithServerRedis sets the Redis address for the server.
func WithServerRedisClient ¶
func WithServerRedisClient(rdb *redis.Client) ServerOption
WithServerRedisClient injects a pre-configured *redis.Client for the server. This enables Redis Sentinel or any custom go-redis setup. Connection options (WithServerRedis, etc.) are ignored when this is used; only the key prefix (WithPrefix via WithServerRedisOpts) still applies.
func WithServerRedisOpts ¶
func WithServerRedisOpts(opts ...RedisOption) ServerOption
WithServerRedisOpts sets Redis options for the server.
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) ServerOption
WithShutdownTimeout sets the maximum wait time during graceful shutdown.
type SuccessCallbackFunc ¶
SuccessCallbackFunc is a callback invoked after a job completes successfully.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
_examples
|
|
|
01-email-service
command
Example: Email Service
|
Example: Email Service |
|
02-image-pipeline
command
Example: Image Processing Pipeline
|
Example: Image Processing Pipeline |
|
03-scheduled-reports
command
Example: Scheduled Reports
|
Example: Scheduled Reports |
|
04-order-fulfillment
command
Example: Order Fulfillment
|
Example: Order Fulfillment |
|
05-multi-tenant
command
Example: Multi-tenant Worker Pools
|
Example: Multi-tenant Worker Pools |
|
06-config-driven
command
Example: Config-driven Setup
|
Example: Config-driven Setup |
|
07-webhook-delivery
command
Example: Idempotent Webhook Delivery
|
Example: Idempotent Webhook Delivery |
|
08-monitoring
command
Example: Monitoring Setup
|
Example: Monitoring Setup |
|
09-dev-server
command
Example: Dev Server (Programmatic)
|
Example: Dev Server (Programmatic) |
|
09-dev-server/config
command
Example: Dev Server (YAML Config Variant)
|
Example: Dev Server (YAML Config Variant) |
|
10-advanced-features
command
Example: Advanced Features (Phase 8)
|
Example: Advanced Features (Phase 8) |
|
11-custom-dashboard
command
Example: Custom Dashboard
|
Example: Custom Dashboard |
|
cmd
|
|
|
gqm
command
Binary gqm provides CLI utilities for the GQM queue manager.
|
Binary gqm provides CLI utilities for the GQM queue manager. |
|
Package monitor provides the HTTP monitoring API and dashboard for GQM.
|
Package monitor provides the HTTP monitoring API and dashboard for GQM. |
|
tui
module
|





