groupmq

package module
v0.1.0-alpha
Published: Dec 19, 2025 License: MIT

README

Go-GroupMQ

English | 简体中文

A fast, reliable Redis-backed per-group FIFO queue for Go with guaranteed job ordering and parallel processing across groups.

Ported from GroupMQ (TypeScript/Node.js) - a BullMQ-inspired queue library optimized for per-group sequential processing.

Features

  • Per-group FIFO ordering - Jobs within the same group process in strict order, perfect for user workflows, data pipelines, and sequential operations
  • Parallel processing across groups - Process multiple groups simultaneously while maintaining order within each group
  • High performance - Optimized with goroutines and atomic Lua scripts for Redis
  • Built-in ordering strategies - Handle out-of-order job arrivals with OrderingNone, OrderingScheduler, or OrderingInMemory methods
  • Automatic recovery - Stalled job detection and connection error handling
  • Production ready - Atomic operations, graceful shutdown, and comprehensive logging
  • Zero polling - Efficient blocking operations prevent wasteful Redis calls
  • CLI monitor tool - Real-time queue monitoring and debugging

Installation

go get github.com/bootcs-dev/go-groupmq

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	groupmq "github.com/bootcs-dev/go-groupmq"
	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()

	// Connect to Redis
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	defer rdb.Close()

	// Create queue
	queue := groupmq.NewQueue(rdb, groupmq.QueueOptions{
		Namespace:          "orders",      // Will be prefixed with 'groupmq:'
		JobTimeoutMs:       30000,         // How long before job times out
		DefaultMaxAttempts: 3,             // Default max retry attempts
		Logger:             groupmq.NewDefaultLogger(true, "orders"),
	})

	// Add a job
	job, err := queue.Add(ctx, groupmq.AddOptions{
		GroupId: "user:42",
		Data: map[string]interface{}{
			"type":   "charge",
			"amount": 999,
		},
		OrderMs:     0,  // Use current timestamp, or provide event timestamp
		MaxAttempts: 5,
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Added job: %s\n", job.ID)

	// Create worker
	worker := groupmq.NewWorker(queue, func(ctx context.Context, job *groupmq.ReservedJob) (interface{}, error) {
		fmt.Printf("Processing: %+v\n", job.Data)
		// Do work here...
		return "done", nil
	})

	// Run worker (blocks)
	worker.Run(ctx)
}

CLI Monitor Tool

Go-GroupMQ includes a powerful CLI tool for monitoring and debugging your queues in real time:

Installation

# Install globally
go install github.com/bootcs-dev/go-groupmq/cmd/monitor@latest

# Or run directly
go run github.com/bootcs-dev/go-groupmq/cmd/monitor@latest -n myqueue

Usage

Monitor mode - Real-time queue statistics:

monitor -n orders
monitor -n orders -r redis://localhost:6379 -i 2000

List completed jobs:

monitor -n orders --list-completed --limit 10

List failed jobs:

monitor -n orders --list-failed --limit 20

CLI Options:

  • -n, --namespace - Queue namespace (required)
  • -r, --redis-url - Redis URL (default: redis://127.0.0.1:6379)
  • -i, --interval - Update interval in ms (default: 1000)
  • --list-completed - List recent completed jobs
  • --list-failed - List recent failed jobs
  • --limit - Number of jobs to list (default: 20)
  • -h, --help - Show help

Queue Options

type QueueOptions struct {
	Namespace                  string              // Unique queue name (required)
	Logger                     LoggerInterface     // Logger instance (optional)
	JobTimeoutMs               int                 // Job processing timeout (default: 30000ms)
	DefaultMaxAttempts         int                 // Default max retry attempts (default: 3)
	ReserveScanLimit           int                 // Groups to scan when reserving (default: 20)
	KeepCompleted              int                 // Number of completed jobs to retain (default: 0)
	KeepFailed                 int                 // Number of failed jobs to retain (default: 0)
	SchedulerLockTtlMs         int                 // Scheduler lock TTL (default: 1500ms)
	OrderingMethod             OrderingMethod      // Ordering strategy (default: OrderingNone)
	OrderingWindowMs           int                 // Time window for ordering (required for non-none methods)
	OrderingMaxWaitMultiplier  int                 // Max grace period multiplier for in-memory (default: 3)
	OrderingGracePeriodDecay   float64             // Grace period decay factor for in-memory (default: 1.0)
	OrderingMaxBatchSize       int                 // Max jobs to collect in batch for in-memory (default: 10)
}

type OrderingMethod string

const (
	OrderingNone      OrderingMethod = "none"       // No ordering guarantees (fastest)
	OrderingScheduler OrderingMethod = "scheduler"  // Redis buffering for large windows (≥1000ms)
	OrderingInMemory  OrderingMethod = "in-memory"  // Worker collection for small windows (50-500ms)
)
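
For example, a minimal sketch of enabling the in-memory strategy, assuming the QueueOptions fields shown in the struct above (values are illustrative):

// Buffer each group briefly so events arriving slightly out of order
// within a ~200ms window are still processed by OrderMs.
queue := groupmq.NewQueue(rdb, groupmq.QueueOptions{
	Namespace:        "events",
	OrderingMethod:   groupmq.OrderingInMemory,
	OrderingWindowMs: 200,
})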

Worker Configuration

// Create worker with handler function and options
worker := groupmq.NewWorker(queue, handlerFunc,
	groupmq.WithConcurrency(8),           // Process up to 8 jobs in parallel (default: 1)
	groupmq.WithHeartbeatMs(5000),        // Heartbeat interval (default: max(1000, jobTimeoutMs/3))
	groupmq.WithMaxStalledCount(1),       // Fail after N stalls (default: 1)
	groupmq.WithGracePeriod(0),           // Grace period before considering stalled (default: 0)
	groupmq.WithCleanupIntervalMs(60000), // Cleanup frequency (default: 60000ms)
)

// Run worker
go worker.Run(ctx)

// Stop worker gracefully
worker.Stop()

Handler function signature (ProcessorFunc):

type ProcessorFunc func(ctx context.Context, job *ReservedJob) (interface{}, error)
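
A handler typically decodes the raw payload into its own type via ReservedJob.UnmarshalData. A minimal sketch (the ChargeTask type is illustrative):

type ChargeTask struct {
	Type   string `json:"type"`
	Amount int    `json:"amount"`
}

worker := groupmq.NewWorker(queue, func(ctx context.Context, job *groupmq.ReservedJob) (interface{}, error) {
	var task ChargeTask
	// Decode the JSON payload stored with the job.
	if err := job.UnmarshalData(&task); err != nil {
		return nil, err
	}
	// Returning an error marks the attempt as failed and triggers a retry
	// (up to MaxAttempts); the returned value is stored as the job result.
	return map[string]interface{}{"charged": task.Amount}, nil
})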

Adding Jobs

Basic job

jobID, err := queue.Add(ctx, groupmq.AddOptions{
	GroupId: "user:123",
	Data: map[string]interface{}{
		"action": "send-email",
		"email":  "user@example.com",
	},
})

Job with custom options

jobID, err := queue.Add(ctx, groupmq.AddOptions{
	GroupId:     "user:123",
	Data:        myData,
	OrderMs:     event.CreatedAtMs,  // Timestamp for ordering
	MaxAttempts: 5,                  // Override default max attempts
	JobId:       "custom-id",        // Custom job ID (optional)
	Delay:       3600000,            // Delay in ms (1 hour)
})

Delayed job (run at specific time)

runAt := time.Now().Add(24 * time.Hour)

jobID, err := queue.Add(ctx, groupmq.AddOptions{
	GroupId: "user:123",
	Data:    myData,
	RunAt:   runAt.UnixMilli(),
})

Repeating jobs (cron/interval)

Every N milliseconds:

jobID, err := queue.Add(ctx, groupmq.AddOptions{
	GroupId: "reports",
	Data:    map[string]interface{}{"type": "daily-summary"},
	Repeat: &groupmq.RepeatOptions{
		Every: 5000, // Run every 5 seconds
	},
})

Cron pattern:

jobID, err := queue.Add(ctx, groupmq.AddOptions{
	GroupId: "emails",
	Data:    map[string]interface{}{"type": "weekly-digest"},
	Repeat: &groupmq.RepeatOptions{
		Cron: "0 9 * * 1-5", // 09:00 Mon-Fri
	},
})

Remove repeating job:

removed, err := queue.RemoveRepeatingJob(ctx, "reports", &groupmq.RepeatOptions{Every: 5000})

Worker Concurrency

Workers support configurable concurrency to process multiple jobs in parallel from different groups:

worker := groupmq.NewWorker(queue, handlerFunc,
	groupmq.WithConcurrency(8), // Process up to 8 jobs simultaneously
)

// Jobs from different groups run in parallel
// Jobs from the same group still run sequentially
go worker.Run(ctx)

Benefits:

  • Higher throughput for multi-group workloads
  • Efficient resource utilization with goroutines
  • Still maintains per-group FIFO ordering

Considerations:

  • Each job runs in its own goroutine
  • Set concurrency based on job duration and system resources
  • Monitor the Redis connection pool (go-redis defaults to 10 connections per CPU per client); a pool-tuning sketch follows this list
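
If a single client serves many concurrent handlers, it may help to enlarge the pool or give blocking reserves their own connection. A sketch using go-redis's PoolSize option together with the WithBlockingClient worker option (the value 32 is only an example):

rdb := redis.NewClient(&redis.Options{
	Addr:     "localhost:6379",
	PoolSize: 32, // more simultaneous commands for high worker concurrency
})

// Dedicated connection for blocking reserve calls so they do not tie up the main pool.
blocking := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

worker := groupmq.NewWorker(queue, handlerFunc,
	groupmq.WithConcurrency(8),
	groupmq.WithBlockingClient(blocking),
)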

Logging

Both Queue and Worker support optional logging:

// Enable default logger
queue := groupmq.NewQueue(rdb, groupmq.QueueOptions{
	Namespace: "orders",
	Logger:    groupmq.NewDefaultLogger(true, "orders"),
})

Custom logger:

type CustomLogger struct{}

func (l *CustomLogger) Debug(msg string) { /* ... */ }
func (l *CustomLogger) Info(msg string)  { /* ... */ }
func (l *CustomLogger) Warn(msg string)  { /* ... */ }
func (l *CustomLogger) Error(msg string) { /* ... */ }

queue := groupmq.NewQueue(rdb, groupmq.QueueOptions{
	Namespace: "orders",
	Logger:    &CustomLogger{},
})

Queue Methods

Job counts and status

// Get all counts at once
counts, err := queue.GetJobCounts(ctx)
// counts.Active, counts.Waiting, counts.Delayed,
// counts.Completed, counts.Failed, ...

// Individual counts
activeCount, err := queue.GetActiveCount(ctx)
waitingCount, err := queue.GetWaitingCount(ctx)
delayedCount, err := queue.GetDelayedCount(ctx)
completedCount, err := queue.GetCompletedCount(ctx)
failedCount, err := queue.GetFailedCount(ctx)

Get jobs by status

// Get job IDs
activeJobIDs, err := queue.GetActiveJobs(ctx)
waitingJobIDs, err := queue.GetWaitingJobs(ctx)
delayedJobIDs, err := queue.GetDelayedJobs(ctx)

// Get Job instances
completedJobs, err := queue.GetCompletedJobs(ctx, 20) // returns []*Job
failedJobs, err := queue.GetFailedJobs(ctx, 20)

Group information

// Get all unique groups
groups, err := queue.GetUniqueGroups(ctx) // []string{"user:123", "order:456"}
groupCount, err := queue.GetUniqueGroupsCount(ctx)

// Get jobs in a specific group
jobCount, err := queue.GetGroupJobCount(ctx, "user:123")

Job manipulation

// Get specific job
job, err := queue.GetJob(ctx, jobID)

// Remove job
err = queue.Remove(ctx, jobID)

// Retry failed job
_, err = queue.Retry(ctx, jobID, 0) // 0 = no extra backoff delay

// Promote delayed job to waiting
err = queue.Promote(ctx, jobID)

// Change delay
_, err = queue.ChangeDelay(ctx, jobID, newDelayMs)

// Update job data
err = queue.UpdateData(ctx, jobID, newData)

Scheduler and maintenance

// Manual scheduler run
err = queue.RunSchedulerOnce(ctx)

// Promote delayed jobs
count, err := queue.PromoteDelayedJobs(ctx)

// Recover stuck groups
count, err := queue.RecoverDelayedGroups(ctx)

// Wait for queue to be empty
isEmpty, err := queue.WaitForEmpty(ctx, timeoutMs)

// Cleanup
err = queue.Close(ctx)

Job Instance Methods

job, err := queue.GetJob(ctx, jobID)

// Manipulate the job
err = job.Remove(ctx)
err = job.Retry(ctx)
err = job.Promote(ctx)
_, err = job.ChangeDelay(ctx, newDelayMs)
err = job.UpdateData(ctx, newData)

// Get job state
state := job.GetState() // "active" | "waiting" | "delayed" | "completed" | "failed"

// Serialize job
jsonData, err := json.Marshal(job)

Graceful Shutdown

// Stop worker gracefully - waits for current jobs to finish
worker.Stop()

// Wait for queue to be empty
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
isEmpty, err := queue.WaitForEmpty(ctx, 30000)

// Recover groups that might be stuck
recoveredCount, err := queue.RecoverDelayedGroups(ctx)

Architecture

Redis Data Structures

Go-GroupMQ uses these Redis keys (all prefixed with groupmq:{namespace}:); a quick inspection sketch follows the list:

  • :g:{groupId} - Sorted set of job IDs in a group, ordered by score
  • :ready - Sorted set of group IDs with available jobs
  • :job:{jobId} - Hash containing job data
  • :lock:{groupId} - String with job ID that owns the group lock (with TTL)
  • :processing - Sorted set of active job IDs
  • :processing:{jobId} - Hash with processing metadata
  • :delayed - Sorted set of delayed jobs
  • :completed - Sorted set of completed job IDs
  • :failed - Sorted set of failed job IDs
  • :repeats - Hash of repeating job definitions
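
For ad-hoc debugging, these keys can be read with any Redis client. A rough sketch with go-redis, assuming a queue created with Namespace "orders" and a jobID from an earlier Add call (the exact key strings, e.g. "groupmq:orders:ready", are inferred from the prefix described above):

// Groups that currently have runnable jobs, with their scores.
groups, err := rdb.ZRangeWithScores(ctx, "groupmq:orders:ready", 0, -1).Result()
if err != nil {
	log.Fatal(err)
}
for _, g := range groups {
	fmt.Printf("group %v is ready (score %.0f)\n", g.Member, g.Score)
}

// Raw hash of a single job.
fields, err := rdb.HGetAll(ctx, "groupmq:orders:job:"+jobID).Result()
if err != nil {
	log.Fatal(err)
}
fmt.Println(fields)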

Job Lifecycle

  1. Waiting - Job is in :g:{groupId} and group is in :ready
  2. Delayed - Job is in :delayed (scheduled for future)
  3. Active - Job is in :processing and group is locked
  4. Completed - Job is in :completed (retention)
  5. Failed - Job exceeded maxAttempts, moved to :failed

Atomic Operations

All critical operations use Lua scripts for atomicity:

  • enqueue.lua - Adds job to group queue and ready set
  • reserve.lua - Finds ready group, pops head job, locks group
  • reserve-batch.lua - Reserves one job from multiple groups
  • complete.lua - Marks job complete, unlocks group
  • complete-and-reserve-next-with-metadata.lua - Atomic completion + reservation
  • retry.lua - Increments attempts, re-enqueues with backoff
  • remove.lua - Removes job from all data structures

Ordering and Scoring

Jobs are ordered using a composite score; a worked example follows the lists below:

score = (orderMs - baseEpoch) * 1000 + seq

  • orderMs - User-provided timestamp for event ordering
  • baseEpoch - Fixed epoch (1704067200000) to keep scores manageable
  • seq - Auto-incrementing sequence for tiebreaking

This ensures:

  • Jobs with earlier orderMs process first
  • Jobs with same orderMs process in submission order
  • Stable, sortable scores
  • Daily sequence reset prevents overflow
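
As a worked example of the formula (the numbers are arbitrary):

baseEpoch := groupmq.BaseEpoch          // fixed reference epoch
orderMs := baseEpoch + 1_500            // event occurred 1.5 s after the epoch
seq := int64(7)                         // auto-incremented tiebreaker

score := (orderMs-baseEpoch)*1000 + seq // 1_500*1000 + 7 = 1_500_007
fmt.Println(score)

A job with the same orderMs but a later seq sorts just after it, which is the submission-order tiebreak described above.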

Performance

Go-GroupMQ is designed for high performance:

Optimizations:

  • Goroutines - Lightweight concurrency for parallel job processing
  • Batch Operations - ReserveBatch reduces Redis round-trips
  • Blocking Operations - Efficient Redis blocking prevents polling
  • Lua Scripts - All critical paths are atomic
  • Atomic Completion - Complete + reserve next in single operation
  • Zero-copy - Minimal data marshaling with Redis hashes

Benchmarks (MacBook M2, Redis 7):

See benchmark/README.md for detailed benchmark results comparing Go-GroupMQ with TypeScript GroupMQ.

Inspiration and Attribution

Go-GroupMQ is a Go port of GroupMQ, which is inspired by BullMQ. We've adapted the TypeScript/Node.js implementation to Go while maintaining the core concepts:

  • Per-group FIFO ordering
  • Parallel processing across groups
  • Atomic Lua script operations
  • Flexible ordering strategies

Key differences from TypeScript GroupMQ:

  • Native Go concurrency with goroutines
  • Strongly typed API with Go structs
  • CLI tool compiled to standalone binary
  • Similar performance with lower memory footprint

Testing

Requires a local Redis at 127.0.0.1:6379.

# Run all tests
go test -v ./...

# Run with coverage
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

# Run benchmarks
cd benchmark
go run main.go -type=cpu -workers=8 -jobs=10000

Optional Docker Redis:

docker run --rm -p 6379:6379 redis:7

Contributing

Contributions are welcome! Please:

  1. Run tests before and after changes
  2. Add tests for new features
  3. Update documentation
  4. Run benchmarks for performance-sensitive changes

License

MIT License - see LICENSE file for details.

Documentation

Overview

Package groupmq provides a Redis-backed per-group FIFO queue implementation. It is a Go port of the TypeScript GroupMQ library.

Constants

const (
	BackoffTypeExponential = "exponential"
	BackoffTypeLinear      = "linear"
	BackoffTypeFixed       = "fixed"
)

Backoff type constants

Variables

var BaseEpoch = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()

BaseEpoch is the reference timestamp for score calculations (2020-01-01 UTC)

Functions

func GetWorkersStatusWithAllJobs

func GetWorkersStatusWithAllJobs(workers []*Worker) []struct {
	Index        int
	IsProcessing bool
	Jobs         []WorkerJobInfo
}

GetWorkersStatusWithAllJobs returns detailed status with all jobs per worker. This is an enhanced version that exposes Go's concurrent job processing capability.

func IsValidStatus

func IsValidStatus(s string) bool

IsValidStatus checks if a status string is a valid Status constant

func WaitForAllWorkersIdle

func WaitForAllWorkersIdle(ctx context.Context, workers []*Worker, timeoutMs int64) bool

WaitForAllWorkersIdle waits for all workers to become idle. Returns true if all workers became idle before the timeout, false otherwise.

func WaitForQueueToEmpty

func WaitForQueueToEmpty(ctx context.Context, queue *Queue, timeoutMs int64) (bool, error)

WaitForQueueToEmpty waits for a queue to become empty. This is a convenience wrapper around Queue.WaitForEmpty and aligns with TypeScript's waitForQueueToEmpty() helper.

func WaitForWorkerIdle

func WaitForWorkerIdle(ctx context.Context, worker *Worker, timeoutMs int64) bool

WaitForWorkerIdle waits for a worker to become idle (no jobs processing). Returns true if the worker became idle before the timeout, false otherwise.
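
Together these helpers support a simple drain sequence at shutdown, sketched below (queue and workers are assumed to already exist):

// Wait for the backlog to drain, then for in-flight jobs to finish.
empty, err := groupmq.WaitForQueueToEmpty(ctx, queue, 30_000)
if err != nil {
	log.Printf("wait for empty: %v", err)
}
idle := groupmq.WaitForAllWorkersIdle(ctx, workers, 10_000)

for _, w := range workers {
	w.Stop()
	w.Wait() // block until the worker has fully stopped
}
log.Printf("drained: %v", empty && idle)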

Types

type AddOptions

type AddOptions struct {
	// GroupId for FIFO ordering - jobs in same group are processed sequentially
	GroupId string

	// Job data payload (will be JSON serialized)
	Data any

	// Custom job name (default: "groupmq")
	Name string

	// Timestamp in milliseconds for ordering within the group
	// Jobs with lower orderMs are processed first (default: current time)
	OrderMs int64

	// Delay in milliseconds before the job becomes visible
	Delay int64

	// Absolute timestamp when the job should run (overrides Delay)
	RunAt int64

	// Custom job ID for idempotence - if provided and job exists, returns existing job
	JobId string

	// Maximum retry attempts for this job (overrides queue default)
	MaxAttempts int

	// Repeat options for recurring jobs
	Repeat *RepeatOptions
}

AddOptions configures how a job is added to the queue

type BackoffOptions

type BackoffOptions struct {
	// Type of backoff: "exponential", "linear", "fixed"
	Type string

	// Initial delay in milliseconds (default: 1000)
	Delay int64

	// Maximum delay in milliseconds (default: 30000)
	MaxDelay int64

	// Multiplier for exponential backoff (default: 2)
	Factor float64

	// Whether to add random jitter to delays (default: true)
	Jitter bool
}

BackoffOptions configures retry backoff behavior

func DefaultBackoffOptions

func DefaultBackoffOptions() BackoffOptions

DefaultBackoffOptions returns the default backoff options
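
A sketch of wiring a custom retry backoff into a worker via the WithBackoff option (the concrete numbers are illustrative):

worker := groupmq.NewWorker(queue, handlerFunc,
	groupmq.WithMaxAttempts(5),
	groupmq.WithBackoff(groupmq.BackoffOptions{
		Type:     groupmq.BackoffTypeExponential, // 1s, 2s, 4s, ... between attempts
		Delay:    1000,
		MaxDelay: 30000,
		Factor:   2,
		Jitter:   true, // spread retries to avoid synchronized bursts
	}),
)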

type CompletedJobInfo

type CompletedJobInfo struct {
	ID          string
	GroupId     string
	Data        json.RawMessage
	Returnvalue json.RawMessage
	ProcessedOn int64
	FinishedOn  int64
	Attempts    int
	MaxAttempts int
}

CompletedJobInfo represents a completed job with metadata

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

DefaultLogger is a simple logger implementation with optional enable/disable. This aligns with TypeScript groupmq's Logger class.

func NewDefaultLogger

func NewDefaultLogger(enabled bool, name string) *DefaultLogger

NewDefaultLogger creates a new default logger. enabled: whether logging is enabled; name: logger name (e.g., "Queue", "Worker").

func (*DefaultLogger) Debug

func (l *DefaultLogger) Debug(msg string)

Debug logs a debug message

func (*DefaultLogger) Error

func (l *DefaultLogger) Error(msg string)

Error logs an error message with an emoji prefix. Error logs are always printed regardless of the enabled flag.

func (*DefaultLogger) Info

func (l *DefaultLogger) Info(msg string)

Info logs an info message

func (*DefaultLogger) Warn

func (l *DefaultLogger) Warn(msg string)

Warn logs a warning message with emoji prefix

type EnqueueResult

type EnqueueResult struct {
	JobId     string
	Status    string // "added", "duplicate", "delayed", "staged"
	Duplicate bool
}

EnqueueResult represents the result of adding a job

type ErrorInfo

type ErrorInfo struct {
	Message string `json:"message"`
	Name    string `json:"name"`
	Stack   string `json:"stack"`
}

ErrorInfo represents error information for failed jobs

type FailedJobInfo

type FailedJobInfo struct {
	ID           string
	GroupId      string
	Data         json.RawMessage
	FailedReason string
	Stacktrace   string
	ProcessedOn  int64
	FinishedOn   int64
	Attempts     int
	MaxAttempts  int
}

FailedJobInfo represents a failed job with metadata

type Job

type Job struct {

	// Core fields
	ID          string          `json:"id"`
	Name        string          `json:"name"`
	GroupId     string          `json:"groupId"`
	Data        json.RawMessage `json:"data"`
	Attempts    int             `json:"attemptsMade"`
	MaxAttempts int             `json:"maxAttempts"`

	// Timing fields
	Timestamp   int64 `json:"timestamp"`   // Creation timestamp in ms
	OrderMs     int64 `json:"orderMs"`     // Ordering timestamp in ms
	DelayUntil  int64 `json:"delayUntil"`  // Delay until timestamp in ms
	ProcessedOn int64 `json:"processedOn"` // Processing start timestamp in ms
	FinishedOn  int64 `json:"finishedOn"`  // Completion timestamp in ms

	// Status fields
	Status       Status `json:"status"`
	FailedReason string `json:"failedReason"`
	Stacktrace   string `json:"stacktrace"`
	Returnvalue  any    `json:"returnvalue"`
	// contains filtered or unexported fields
}

Job represents a job in the queue with full metadata

func FromRawHash

func FromRawHash(queue *Queue, id string, raw map[string]string, knownStatus Status) *Job

FromRawHash creates a Job from raw Redis hash data with an optional known status. This avoids extra Redis lookups when the status is already known. Aligns with TypeScript's Job.fromRawHash().

func FromReserved

func FromReserved(queue *Queue, reserved *ReservedJob, meta *JobResultMeta) *Job

FromReserved creates a Job from a ReservedJob

func FromStore

func FromStore(ctx context.Context, queue *Queue, id string) (*Job, error)

FromStore loads a Job from Redis storage by ID. This performs additional Redis lookups to determine the current status. Aligns with TypeScript's Job.fromStore().

func (*Job) ChangeDelay

func (j *Job) ChangeDelay(ctx context.Context, newDelayMs int64) (bool, error)

ChangeDelay changes the delay of a delayed job

func (*Job) GetData

func (j *Job) GetData(v interface{}) error

GetData unmarshals the job data into the provided interface

func (*Job) GetState

func (j *Job) GetState() Status

GetState returns the current status of the job

func (*Job) Promote

func (j *Job) Promote(ctx context.Context) error

Promote moves a delayed job to the waiting state (processes immediately)

func (*Job) Remove

func (j *Job) Remove(ctx context.Context) error

Remove removes the job from the queue

func (*Job) Retry

func (j *Job) Retry(ctx context.Context) error

Retry retries a failed job

func (*Job) ToJSON

func (j *Job) ToJSON() map[string]interface{}

ToJSON returns the job as a JSON-serializable map

func (*Job) UnmarshalData

func (j *Job) UnmarshalData(v interface{}) error

UnmarshalData is an alias for GetData

func (*Job) Update

func (j *Job) Update(ctx context.Context, data interface{}) error

Update is an alias for UpdateData (TypeScript compatibility)

func (*Job) UpdateData

func (j *Job) UpdateData(ctx context.Context, data interface{}) error

UpdateData updates the job's data payload

type JobCounts

type JobCounts struct {
	Active          int64 `json:"active"`
	Waiting         int64 `json:"waiting"`
	Delayed         int64 `json:"delayed"`
	Completed       int64 `json:"completed"`
	Failed          int64 `json:"failed"`
	Paused          int64 `json:"paused"`
	WaitingChildren int64 `json:"waiting-children"`
	Prioritized     int64 `json:"prioritized"`
}

JobCounts represents counts of jobs by status

type JobEventHandler

type JobEventHandler interface {
	OnCompleted(job *Job, result any)
	OnFailed(job *Job, err error)
	OnStalled(job *Job)
	OnProgress(job *Job, progress int)
}

JobEventHandler handles job lifecycle events

type JobMeta

type JobMeta struct {
	ProcessedOn int64
	FinishedOn  int64
	Attempts    int
	MaxAttempts int
}

JobMeta contains job metadata for completion/failure recording

type JobProgress

type JobProgress struct {
	Job              *ReservedJob
	ProcessingTimeMs int64
}

JobProgress represents a job being processed

type JobResult

type JobResult struct {
	JobId       string
	GroupId     string
	Status      Status
	Returnvalue any
	FinishedOn  int64
	Error       error
}

JobResult represents the result of a completed job

type JobResultMeta

type JobResultMeta struct {
	ProcessedOn  int64
	FinishedOn   int64
	FailedReason string
	Stacktrace   string
	Returnvalue  any
	Status       Status
	DelayMs      int64
}

JobResultMeta contains optional metadata for job results

type Logger

type Logger interface {
	Debug(msg string)
	Info(msg string)
	Warn(msg string)
	Error(msg string)
}

Logger is a generic logger interface that works with different logger implementations. This aligns with TypeScript groupmq's LoggerInterface.

type Metrics

type Metrics struct {
	WaitingCount    int64
	ActiveCount     int64
	DelayedCount    int64
	CompletedCount  int64
	FailedCount     int64
	GroupCount      int64
	ProcessingCount int64
}

Metrics provides queue statistics

type NoOpLogger

type NoOpLogger struct{}

NoOpLogger is a logger that does nothing. Useful for testing or when logging is explicitly disabled.

func NewNoOpLogger

func NewNoOpLogger() *NoOpLogger

NewNoOpLogger creates a new no-op logger

func (*NoOpLogger) Debug

func (l *NoOpLogger) Debug(msg string)

Debug does nothing

func (*NoOpLogger) Error

func (l *NoOpLogger) Error(msg string)

Error does nothing

func (*NoOpLogger) Info

func (l *NoOpLogger) Info(msg string)

Info does nothing

func (*NoOpLogger) Warn

func (l *NoOpLogger) Warn(msg string)

Warn does nothing

type ProcessorFunc

type ProcessorFunc func(ctx context.Context, job *ReservedJob) (interface{}, error)

ProcessorFunc is the function signature for job handlers

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a GroupMQ queue with per-group FIFO ordering

func NewQueue

func NewQueue(client redis.Cmdable, opts QueueOptions) *Queue

NewQueue creates a new Queue instance

func (*Queue) AcquireSchedulerLock

func (q *Queue) AcquireSchedulerLock(ctx context.Context, ttlMs int64) (bool, error)

AcquireSchedulerLock acquires the scheduler lock

func (*Queue) Add

func (q *Queue) Add(ctx context.Context, opts AddOptions) (*Job, error)

Add adds a new job to the queue

func (*Queue) AddRepeatingJob

func (q *Queue) AddRepeatingJob(ctx context.Context, opts AddOptions) (*Job, error)

AddRepeatingJob adds a repeating job (cron or interval-based)

func (*Queue) ChangeDelay

func (q *Queue) ChangeDelay(ctx context.Context, jobId string, newDelayMs int64) (bool, error)

ChangeDelay changes the delay of a specific job

func (*Queue) CheckStalled

func (q *Queue) CheckStalled(ctx context.Context, gracePeriodMs, maxStalledCount int64) ([]StalledJobResult, error)

CheckStalled checks for stalled jobs and recovers them

func (*Queue) Clean

func (q *Queue) Clean(ctx context.Context, graceTimeMs int64, limit int, status Status) (int64, error)

Clean removes old completed or failed jobs. graceTimeMs: remove jobs with finishedOn <= now - graceTimeMs; limit: max number of jobs to clean in one call; status: "completed", "failed", or "delayed".
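
For example, pruning completed jobs that finished more than a day ago (a sketch; limit caps how much one call removes):

removed, err := queue.Clean(ctx,
	(24 * time.Hour).Milliseconds(), // graceTimeMs: only jobs finished over a day ago
	1000,                            // limit: at most 1000 jobs per call
	groupmq.StatusCompleted,
)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("cleaned %d completed jobs\n", removed)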

func (*Queue) Cleanup

func (q *Queue) Cleanup(ctx context.Context) (int64, error)

Cleanup cleans up expired jobs and stale data with distributed lock

func (*Queue) CleanupPoisonedGroup

func (q *Queue) CleanupPoisonedGroup(ctx context.Context, groupId string) (string, error)

CleanupPoisonedGroup removes problematic groups from ready queue

func (*Queue) Close

func (q *Queue) Close(ctx context.Context) error

Close closes the queue and flushes any pending batches

func (*Queue) Complete

func (q *Queue) Complete(ctx context.Context, jobId, groupId string) error

Complete marks a job as completed

func (*Queue) CompleteAndReserveNext

func (q *Queue) CompleteAndReserveNext(ctx context.Context, completedJobId, groupId string, result interface{}, meta JobMeta) (*ReservedJob, error)

CompleteAndReserveNext atomically completes a job and reserves the next one from the same group

func (*Queue) CompleteWithMetadata

func (q *Queue) CompleteWithMetadata(ctx context.Context, job *ReservedJob, result interface{}, meta JobMeta) error

CompleteWithMetadata completes a job and records metadata atomically

func (*Queue) DeadLetter

func (q *Queue) DeadLetter(ctx context.Context, jobId, groupId string) error

DeadLetter moves a job to the dead letter queue

func (*Queue) GetActiveCount

func (q *Queue) GetActiveCount(ctx context.Context) (int64, error)

GetActiveCount returns the number of jobs currently being processed

func (*Queue) GetActiveJobs

func (q *Queue) GetActiveJobs(ctx context.Context) ([]string, error)

GetActiveJobs returns list of active job IDs

func (*Queue) GetCompleted

func (q *Queue) GetCompleted(ctx context.Context, limit int) ([]CompletedJobInfo, error)

GetCompleted returns completed jobs with details

func (*Queue) GetCompletedCount

func (q *Queue) GetCompletedCount(ctx context.Context) (int64, error)

GetCompletedCount returns the number of completed jobs

func (*Queue) GetCompletedJobs

func (q *Queue) GetCompletedJobs(ctx context.Context, limit int) ([]*Job, error)

GetCompletedJobs returns completed jobs as Job entities

func (*Queue) GetDelayedCount

func (q *Queue) GetDelayedCount(ctx context.Context) (int64, error)

GetDelayedCount returns the number of delayed jobs

func (*Queue) GetDelayedJobs

func (q *Queue) GetDelayedJobs(ctx context.Context) ([]string, error)

GetDelayedJobs returns list of delayed job IDs

func (*Queue) GetFailed

func (q *Queue) GetFailed(ctx context.Context, limit int) ([]FailedJobInfo, error)

GetFailed returns failed jobs with details

func (*Queue) GetFailedCount

func (q *Queue) GetFailedCount(ctx context.Context) (int64, error)

GetFailedCount returns the number of failed jobs

func (*Queue) GetFailedJobs

func (q *Queue) GetFailedJobs(ctx context.Context, limit int) ([]*Job, error)

GetFailedJobs returns failed jobs as Job entities

func (*Queue) GetGroupJobCount

func (q *Queue) GetGroupJobCount(ctx context.Context, groupId string) (int64, error)

GetGroupJobCount returns the number of jobs in a specific group

func (*Queue) GetJob

func (q *Queue) GetJob(ctx context.Context, jobId string) (*Job, error)

GetJob retrieves a job by ID

func (*Queue) GetJobCounts

func (q *Queue) GetJobCounts(ctx context.Context) (*JobCounts, error)

GetJobCounts returns counts structured like BullBoard expects

func (*Queue) GetJobsByStatus

func (q *Queue) GetJobsByStatus(ctx context.Context, statuses []Status, start, end int) ([]*Job, error)

GetJobsByStatus fetches jobs by statuses (like BullMQ's Queue.getJobs API)
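
For example, listing the ten most recent failed and completed jobs for a dashboard view (a sketch):

jobs, err := queue.GetJobsByStatus(ctx,
	[]groupmq.Status{groupmq.StatusFailed, groupmq.StatusCompleted},
	0, 9, // start and end indices, BullMQ-style paging
)
if err != nil {
	log.Fatal(err)
}
for _, j := range jobs {
	fmt.Println(j.ID, j.Status, j.FailedReason)
}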

func (*Queue) GetUniqueGroups

func (q *Queue) GetUniqueGroups(ctx context.Context) ([]string, error)

GetUniqueGroups returns list of unique group IDs that have jobs

func (*Queue) GetUniqueGroupsCount

func (q *Queue) GetUniqueGroupsCount(ctx context.Context) (int64, error)

GetUniqueGroupsCount returns the number of unique groups with jobs

func (*Queue) GetWaitingCount

func (q *Queue) GetWaitingCount(ctx context.Context) (int64, error)

GetWaitingCount returns the number of jobs waiting to be processed

func (*Queue) GetWaitingJobs

func (q *Queue) GetWaitingJobs(ctx context.Context) ([]string, error)

GetWaitingJobs returns list of waiting job IDs

func (*Queue) Heartbeat

func (q *Queue) Heartbeat(ctx context.Context, jobId, groupId string, extendMs int64) error

Heartbeat extends the deadline of a processing job

func (*Queue) IsEmpty

func (q *Queue) IsEmpty(ctx context.Context) (bool, error)

IsEmpty checks if the queue has no jobs

func (*Queue) IsJobProcessing

func (q *Queue) IsJobProcessing(ctx context.Context, jobId string) (bool, error)

IsJobProcessing checks if a job is currently in processing state

func (*Queue) IsPaused

func (q *Queue) IsPaused(ctx context.Context) (bool, error)

IsPaused checks if the queue is paused

func (*Queue) JobTimeoutMs

func (q *Queue) JobTimeoutMs() int64

JobTimeoutMs returns the job timeout in milliseconds

func (*Queue) MaxAttempts

func (q *Queue) MaxAttempts() int

MaxAttempts returns the default max attempts

func (*Queue) Name

func (q *Queue) Name() string

Name returns the queue name (raw namespace)

func (*Queue) Namespace

func (q *Queue) Namespace() string

Namespace returns the full namespace prefix

func (*Queue) Pause

func (q *Queue) Pause(ctx context.Context) error

Pause pauses the queue

func (*Queue) ProcessRepeatingJobsBounded

func (q *Queue) ProcessRepeatingJobsBounded(ctx context.Context, limit int, now int64) (int64, error)

ProcessRepeatingJobsBounded processes up to limit repeating jobs

func (*Queue) Promote

func (q *Queue) Promote(ctx context.Context, jobId string) error

Promote promotes a delayed job to be ready immediately

func (*Queue) PromoteDelayedJobs

func (q *Queue) PromoteDelayedJobs(ctx context.Context) (int64, error)

PromoteDelayedJobs promotes delayed jobs that are ready

func (*Queue) PromoteDelayedJobsBounded

func (q *Queue) PromoteDelayedJobsBounded(ctx context.Context, limit int, now int64) (int64, error)

PromoteDelayedJobsBounded promotes up to limit delayed jobs

func (*Queue) PromoteStaged

func (q *Queue) PromoteStaged(ctx context.Context) (int64, error)

PromoteStaged promotes staged jobs (for orderingDelayMs). limit: maximum number of jobs to promote in one batch (default: 100, matching TypeScript).

func (*Queue) PromoteStagedWithLimit

func (q *Queue) PromoteStagedWithLimit(ctx context.Context, limit int) (int64, error)

PromoteStagedWithLimit promotes staged jobs with custom limit

func (*Queue) RawNamespace

func (q *Queue) RawNamespace() string

RawNamespace returns the raw namespace without prefix

func (*Queue) RecordAttemptFailure

func (q *Queue) RecordAttemptFailure(ctx context.Context, job *ReservedJob, errInfo ErrorInfo, meta JobMeta) error

RecordAttemptFailure records a failure attempt (non-final)

func (*Queue) RecordCompleted

func (q *Queue) RecordCompleted(ctx context.Context, job *ReservedJob, result interface{}, meta JobMeta) error

RecordCompleted records a successful completion for retention

func (*Queue) RecordFinalFailure

func (q *Queue) RecordFinalFailure(ctx context.Context, job *ReservedJob, errInfo ErrorInfo, meta JobMeta) error

RecordFinalFailure records a final failure (dead-lettered)

func (*Queue) Redis

func (q *Queue) Redis() redis.Cmdable

Redis returns the Redis client

func (*Queue) ReleaseJob

func (q *Queue) ReleaseJob(ctx context.Context, jobId string) error

ReleaseJob releases a job that was reserved but not processed (e.g., worker stopped). This removes the job from processing and the active list, and puts it back in the group queue.

func (*Queue) Remove

func (q *Queue) Remove(ctx context.Context, jobId string) error

Remove removes a job from the queue

func (*Queue) RemoveRepeatingJob

func (q *Queue) RemoveRepeatingJob(ctx context.Context, groupId string, repeat *RepeatOptions) (bool, error)

RemoveRepeatingJob removes a repeating job

func (*Queue) Reserve

func (q *Queue) Reserve(ctx context.Context) (*ReservedJob, error)

Reserve reserves the next available job for processing
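
Reserve is the low-level primitive the Worker loop is built on. A hand-rolled processing loop might look like the sketch below (whether a nil job is returned when nothing is ready is an assumption here; Worker handles all of this for you):

for {
	job, err := queue.Reserve(ctx)
	if err != nil {
		log.Printf("reserve: %v", err)
		continue
	}
	if job == nil {
		time.Sleep(100 * time.Millisecond) // nothing ready; back off briefly
		continue
	}
	// ... process job.Data ...
	if err := queue.Complete(ctx, job.ID, job.GroupId); err != nil {
		log.Printf("complete: %v", err)
	}
}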

func (*Queue) ReserveAtomic

func (q *Queue) ReserveAtomic(ctx context.Context, groupId string) (*ReservedJob, error)

ReserveAtomic reserves a job from a specific group atomically

func (*Queue) ReserveBatch

func (q *Queue) ReserveBatch(ctx context.Context, maxBatch int) ([]*ReservedJob, error)

ReserveBatch reserves multiple jobs at once

func (*Queue) ReserveBlocking

func (q *Queue) ReserveBlocking(ctx context.Context, timeoutSec float64) (*ReservedJob, error)

ReserveBlocking blocks until a job is available or the timeout elapses. This is the high-performance blocking reserve, similar to the TypeScript version.

func (*Queue) ReserveBlockingWithOptions

func (q *Queue) ReserveBlockingWithOptions(ctx context.Context, timeoutSec float64, blockUntil *int64, blockingClient redis.Cmdable) (*ReservedJob, error)

ReserveBlockingWithOptions blocks with an adaptive timeout and an optional separate client. blockUntil: optional timestamp for adaptive timeout calculation (can be nil); blockingClient: optional separate Redis client for blocking operations (can be nil).

func (*Queue) Resume

func (q *Queue) Resume(ctx context.Context) error

Resume resumes a paused queue

func (*Queue) Retry

func (q *Queue) Retry(ctx context.Context, jobId string, backoffMs int64) (int, error)

Retry re-queues a job for retry with optional backoff

func (*Queue) RunSchedulerOnce

func (q *Queue) RunSchedulerOnce(ctx context.Context) error

RunSchedulerOnce runs the scheduler once

func (*Queue) StartPromoter

func (q *Queue) StartPromoter(ctx context.Context) error

StartPromoter starts the promoter service with the default interval (100ms). This matches TypeScript's startPromoter() with a 100ms polling interval.

func (*Queue) StartPromoterWithInterval

func (q *Queue) StartPromoterWithInterval(ctx context.Context, intervalMs int64) error

StartPromoterWithInterval starts the promoter service with a custom interval. This is the full-featured version, matching TypeScript's internal implementation with distributed locking.

func (*Queue) StopPromoter

func (q *Queue) StopPromoter()

StopPromoter stops the promoter service

func (*Queue) UpdateData

func (q *Queue) UpdateData(ctx context.Context, jobId string, data interface{}) error

UpdateData updates a job's data payload

func (*Queue) WaitForEmpty

func (q *Queue) WaitForEmpty(ctx context.Context, timeoutMs int64) (bool, error)

WaitForEmpty waits for the queue to become empty

type QueueEventHandler

type QueueEventHandler interface {
	OnError(err error)
	OnReady()
	OnPaused()
	OnResumed()
	OnDrained()
}

QueueEventHandler handles queue lifecycle events

type QueueOption

type QueueOption func(*QueueOptions)

QueueOption is a functional option for queue configuration

func WithAutoBatch

func WithAutoBatch(enabled bool) QueueOption

WithAutoBatch enables auto-batching

func WithBatchMaxWaitMs

func WithBatchMaxWaitMs(ms int64) QueueOption

WithBatchMaxWaitMs sets the maximum wait time before flushing a batch

func WithBatchSize

func WithBatchSize(size int) QueueOption

WithBatchSize sets the maximum jobs per batch

func WithJobTimeoutMs

func WithJobTimeoutMs(ms int64) QueueOption

WithJobTimeoutMs sets the job timeout

func WithKeepCompleted

func WithKeepCompleted(count int) QueueOption

WithKeepCompleted sets the number of completed jobs to keep

func WithKeepFailed

func WithKeepFailed(count int) QueueOption

WithKeepFailed sets the number of failed jobs to keep

func WithOrderingDelayMs

func WithOrderingDelayMs(ms int64) QueueOption

WithOrderingDelayMs sets the ordering delay

func WithSchedulerLockTtlMs

func WithSchedulerLockTtlMs(ms int64) QueueOption

WithSchedulerLockTtlMs sets the scheduler lock TTL

type QueueOptions

type QueueOptions struct {
	// Namespace prefix for all Redis keys (default: "gmq")
	Namespace string

	// Job visibility timeout in milliseconds - how long before a reserved job
	// is considered stalled and can be picked up by another worker (default: 30000)
	JobTimeoutMs int64

	// Delay in milliseconds before newly added jobs become visible (default: 0)
	// Useful for ensuring FIFO ordering when multiple producers add jobs concurrently
	OrderingDelayMs int64

	// Enable auto-batching for high throughput (default: false)
	AutoBatch bool

	// Maximum jobs per batch when AutoBatch is enabled (default: 10)
	AutoBatchMaxJobs int

	// Maximum wait time in milliseconds before flushing a batch (default: 10)
	AutoBatchMaxWaitMs int64

	// Default maximum retry attempts for jobs (default: 3)
	DefaultMaxAttempts int

	// Whether to keep completed jobs for inspection (default: false)
	KeepCompleted bool

	// How many completed jobs to keep (default: 1000)
	KeepCompletedCount int

	// How long to keep completed jobs in seconds (default: 86400 = 1 day)
	KeepCompletedAge int64

	// Whether to keep failed jobs for inspection (default: true)
	KeepFailed bool

	// How many failed jobs to keep (default: 1000)
	KeepFailedCount int

	// How long to keep failed jobs in seconds (default: 604800 = 7 days)
	KeepFailedAge int64

	// Scheduler lock TTL in milliseconds (default: 1500)
	// Controls how long the scheduler lock is held, preventing concurrent scheduler runs
	SchedulerLockTtlMs int64

	// Logger for queue operations and debugging
	// Can be nil (no logging), true (default logger), or a custom Logger instance
	Logger interface{} // nil, bool, or Logger
}

QueueOptions configures a Queue instance

func DefaultQueueOptions

func DefaultQueueOptions() QueueOptions

DefaultQueueOptions returns the default queue options

type RepeatOptions

type RepeatOptions struct {
	// Cron expression for scheduling (e.g., "0 * * * *" for every hour)
	Cron string

	// Repeat every N milliseconds
	Every int64

	// Maximum number of repetitions
	Limit int

	// End date for repetition
	EndDate time.Time
}

RepeatOptions configures job repetition

type ReservedJob

type ReservedJob struct {
	ID          string
	GroupId     string
	Data        []byte // Raw JSON data
	Attempts    int
	MaxAttempts int
	Seq         int64
	Timestamp   int64 // Job creation timestamp in ms
	OrderMs     int64 // Ordering timestamp in ms
	Score       int64 // Calculated score for ordering
	Deadline    int64 // Processing deadline timestamp in ms
}

ReservedJob represents a job that has been reserved for processing

func (*ReservedJob) UnmarshalData

func (r *ReservedJob) UnmarshalData(v interface{}) error

UnmarshalData unmarshals the job data into the provided interface

type ScriptLoader

type ScriptLoader struct {
	// contains filtered or unexported fields
}

ScriptLoader manages loading and caching of Lua scripts

func NewScriptLoader

func NewScriptLoader(client redis.Cmdable) *ScriptLoader

NewScriptLoader creates a new script loader

func (*ScriptLoader) EvalScript

func (l *ScriptLoader) EvalScript(ctx context.Context, name ScriptName, keys []string, args ...interface{}) *redis.Cmd

EvalScript evaluates a Lua script with the given arguments

func (*ScriptLoader) EvalScriptResult

func (l *ScriptLoader) EvalScriptResult(ctx context.Context, name ScriptName, keys []string, args ...interface{}) (interface{}, error)

EvalScriptResult evaluates a Lua script and returns the result

func (*ScriptLoader) LoadScript

func (l *ScriptLoader) LoadScript(ctx context.Context, name ScriptName) (string, error)

LoadScript loads a Lua script and returns its SHA

func (*ScriptLoader) PreloadScripts

func (l *ScriptLoader) PreloadScripts(ctx context.Context) error

PreloadScripts preloads all scripts into Redis cache

type ScriptName

type ScriptName string

ScriptName represents the name of a Lua script

const (
	ScriptEnqueue                        ScriptName = "enqueue"
	ScriptEnqueueBatch                   ScriptName = "enqueue-batch"
	ScriptReserve                        ScriptName = "reserve"
	ScriptReserveBatch                   ScriptName = "reserve-batch"
	ScriptReserveAtomic                  ScriptName = "reserve-atomic"
	ScriptComplete                       ScriptName = "complete"
	ScriptCompleteAndReserveNextWithMeta ScriptName = "complete-and-reserve-next-with-metadata"
	ScriptCompleteWithMetadata           ScriptName = "complete-with-metadata"
	ScriptRetry                          ScriptName = "retry"
	ScriptReleaseJob                     ScriptName = "release-job"
	ScriptHeartbeat                      ScriptName = "heartbeat"
	ScriptCleanup                        ScriptName = "cleanup"
	ScriptPromoteDelayedJobs             ScriptName = "promote-delayed-jobs"
	ScriptPromoteDelayedOne              ScriptName = "promote-delayed-one"
	ScriptPromoteStaged                  ScriptName = "promote-staged"
	ScriptChangeDelay                    ScriptName = "change-delay"
	ScriptGetActiveCount                 ScriptName = "get-active-count"
	ScriptGetWaitingCount                ScriptName = "get-waiting-count"
	ScriptGetDelayedCount                ScriptName = "get-delayed-count"
	ScriptGetCompletedCount              ScriptName = "get-completed-count"
	ScriptGetFailedCount                 ScriptName = "get-failed-count"
	ScriptGetActiveJobs                  ScriptName = "get-active-jobs"
	ScriptGetWaitingJobs                 ScriptName = "get-waiting-jobs"
	ScriptGetDelayedJobs                 ScriptName = "get-delayed-jobs"
	ScriptGetUniqueGroups                ScriptName = "get-unique-groups"
	ScriptGetUniqueGroupsCount           ScriptName = "get-unique-groups-count"
	ScriptCleanupPoisonedGroup           ScriptName = "cleanup-poisoned-group"
	ScriptRemove                         ScriptName = "remove"
	ScriptCleanStatus                    ScriptName = "clean-status"
	ScriptIsEmpty                        ScriptName = "is-empty"
	ScriptDeadLetter                     ScriptName = "dead-letter"
	ScriptRecordJobResult                ScriptName = "record-job-result"
	ScriptCheckStalled                   ScriptName = "check-stalled"
)

type StalledJobResult

type StalledJobResult struct {
	JobId   string
	GroupId string
	Action  string // "recovered" or "failed"
}

StalledJobResult represents the result of stalled job detection

type Status

type Status string

Status represents the state of a job in the queue system. This aligns with TypeScript groupmq's status.ts for consistency.

const (
	// StatusLatest represents the most recent jobs (BullBoard compatibility)
	StatusLatest Status = "latest"

	// StatusActive represents jobs currently being processed
	StatusActive Status = "active"

	// StatusWaiting represents jobs waiting to be processed
	StatusWaiting Status = "waiting"

	// StatusWaitingChildren represents jobs waiting for child jobs to complete
	StatusWaitingChildren Status = "waiting-children"

	// StatusPrioritized represents high-priority waiting jobs
	StatusPrioritized Status = "prioritized"

	// StatusCompleted represents successfully completed jobs
	StatusCompleted Status = "completed"

	// StatusFailed represents jobs that failed after all retry attempts
	StatusFailed Status = "failed"

	// StatusDelayed represents jobs scheduled to run in the future
	StatusDelayed Status = "delayed"

	// StatusPaused represents jobs in a paused queue
	StatusPaused Status = "paused"

	// StatusProcessing is an alias for StatusActive (internal use)
	StatusProcessing Status = "processing"

	// StatusUnknown represents jobs in an unknown or invalid state
	StatusUnknown Status = "unknown"
)

Job status constants - matches TypeScript STATUS enum

func AllStatuses

func AllStatuses() []Status

AllStatuses returns all valid job statuses. Useful for iteration and validation.

func (Status) String

func (s Status) String() string

String returns the string representation of the status

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes jobs from a Queue

func NewWorker

func NewWorker(queue *Queue, handler ProcessorFunc, opts ...WorkerOption) *Worker

NewWorker creates a new Worker instance

func (*Worker) Close

func (w *Worker) Close(ctx context.Context) error

Close gracefully closes the worker with a timeout. This is a unified API similar to TypeScript's close() method.

func (*Worker) GetCurrentJob

func (w *Worker) GetCurrentJob() *ReservedJob

GetCurrentJob returns the current job being processed by the worker (for single concurrency)

func (*Worker) GetCurrentJobs

func (w *Worker) GetCurrentJobs() []JobProgress

GetCurrentJobs returns all currently processing jobs

func (*Worker) GetWorkerMetrics

func (w *Worker) GetWorkerMetrics() WorkerMetrics

GetWorkerMetrics returns comprehensive worker performance metrics

func (*Worker) IsClosed

func (w *Worker) IsClosed() bool

IsClosed returns true if the worker is closed

func (*Worker) IsProcessing

func (w *Worker) IsProcessing() bool

IsProcessing returns true if the worker is currently processing jobs

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

Run starts the worker and blocks until stopped

func (*Worker) Stop

func (w *Worker) Stop()

Stop gracefully stops the worker

func (*Worker) TotalProcessed

func (w *Worker) TotalProcessed() int64

TotalProcessed returns the total number of jobs processed

func (*Worker) Wait

func (w *Worker) Wait()

Wait blocks until the worker is fully stopped

type WorkerEventHandler

type WorkerEventHandler interface {
	OnReady()
	OnClosed()
	OnError(err error)
	OnStalled(jobId string, groupId string)
}

WorkerEventHandler handles worker lifecycle events

type WorkerJobInfo

type WorkerJobInfo struct {
	JobID            string `json:"jobId"`
	GroupID          string `json:"groupId"`
	ProcessingTimeMs int64  `json:"processingTimeMs"`
}

WorkerJobInfo contains information about a job being processed by a worker

type WorkerMetrics

type WorkerMetrics struct {
	Name                     string
	TotalJobsProcessed       int64
	LastJobPickupTime        time.Time
	TimeSinceLastJobMs       int64
	IsProcessing             bool
	JobsInProgressCount      int
	ConsecutiveEmptyReserves int
	LastActivityTime         time.Time
	JobsInProgress           []JobProgress
}

WorkerMetrics contains worker performance metrics

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption configures a Worker

func WithBackoff

func WithBackoff(opts BackoffOptions) WorkerOption

WithBackoff sets the backoff strategy

func WithBlockingClient

func WithBlockingClient(client redis.Cmdable) WorkerOption

WithBlockingClient sets a dedicated Redis client for blocking operations

func WithCleanupIntervalMs

func WithCleanupIntervalMs(ms int64) WorkerOption

WithCleanupIntervalMs sets the cleanup interval

func WithCompletedHandler

func WithCompletedHandler(fn func(job *Job, result interface{})) WorkerOption

WithCompletedHandler sets the completion handler

func WithConcurrency

func WithConcurrency(n int) WorkerOption

WithConcurrency sets the number of concurrent job processors

func WithEnableCleanup

func WithEnableCleanup(enabled bool) WorkerOption

WithEnableCleanup enables or disables automatic cleanup

func WithErrorHandler

func WithErrorHandler(fn func(err error, job *ReservedJob)) WorkerOption

WithErrorHandler sets the error handler

func WithFailedHandler

func WithFailedHandler(fn func(job *Job, err error)) WorkerOption

WithFailedHandler sets the failure handler

func WithGracePeriod

func WithGracePeriod(ms int64) WorkerOption

WithGracePeriod sets the grace period before considering a job stalled

func WithHeartbeatMs

func WithHeartbeatMs(ms int64) WorkerOption

WithHeartbeatMs sets the heartbeat interval

func WithLogger

func WithLogger(logger Logger) WorkerOption

WithLogger sets the logger

func WithMaxAttempts

func WithMaxAttempts(n int) WorkerOption

WithMaxAttempts sets the maximum retry attempts

func WithMaxStalledCount

func WithMaxStalledCount(n int64) WorkerOption

WithMaxStalledCount sets the max stalled count before failing

func WithName

func WithName(name string) WorkerOption

WithName sets the worker name

func WithSchedulerIntervalMs

func WithSchedulerIntervalMs(ms int64) WorkerOption

WithSchedulerIntervalMs sets the scheduler interval in milliseconds

func WithStalledHandler

func WithStalledHandler(fn func(jobId, groupId string)) WorkerOption

WithStalledHandler sets the stalled job handler

func WithStalledInterval

func WithStalledInterval(ms int64) WorkerOption

WithStalledInterval sets the stalled check interval
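
Several options are typically combined when constructing a worker; a sketch (handler names are placeholders):

worker := groupmq.NewWorker(queue, handlerFunc,
	groupmq.WithName("orders-worker"),
	groupmq.WithConcurrency(8),
	groupmq.WithHeartbeatMs(5000),
	groupmq.WithCompletedHandler(func(job *groupmq.Job, result interface{}) {
		log.Printf("completed %s", job.ID)
	}),
	groupmq.WithFailedHandler(func(job *groupmq.Job, err error) {
		log.Printf("failed %s: %v", job.ID, err)
	}),
)
go worker.Run(ctx)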

type WorkerOptions

type WorkerOptions struct {
	// Number of concurrent job processors (default: 1)
	Concurrency int

	// Heartbeat interval in milliseconds to extend job visibility (default: 5000)
	HeartbeatMs int64

	// Backoff strategy for retries
	Backoff BackoffOptions

	// Interval in milliseconds for checking stalled jobs (default: 30000)
	StalledInterval int64

	// Maximum times a job can be stalled before failing (default: 1)
	MaxStalledCount int

	// Grace period in milliseconds before considering a job stalled (default: 0)
	StalledGracePeriod int64

	// Blocking timeout in seconds for BRPOP-style waiting (default: 5)
	BlockingTimeoutSec int

	// Whether to auto-run the worker on creation (default: true)
	AutoRun bool

	// Batch size for reserving jobs (default: 1, max: concurrency)
	BatchSize int
}

WorkerOptions configures a Worker instance

func DefaultWorkerOptions

func DefaultWorkerOptions() WorkerOptions

DefaultWorkerOptions returns the default worker options

type WorkerStatusInfo

type WorkerStatusInfo struct {
	Index        int            `json:"index"`
	IsProcessing bool           `json:"isProcessing"`
	CurrentJob   *WorkerJobInfo `json:"currentJob,omitempty"`
}

WorkerStatusInfo contains status information for a single worker

type WorkersStatusSummary

type WorkersStatusSummary struct {
	Total      int                `json:"total"`
	Processing int                `json:"processing"`
	Idle       int                `json:"idle"`
	Workers    []WorkerStatusInfo `json:"workers"`
}

WorkersStatusSummary contains aggregated status for multiple workers

func GetWorkersStatus

func GetWorkersStatus(workers []*Worker) WorkersStatusSummary

GetWorkersStatus returns status information for all workers. This aligns with TypeScript's getWorkersStatus() helper.

Directories

  • cmd/monitor - monitor command
