gqm v0.1.2
Published: Feb 24, 2026 License: MIT Imports: 26 Imported by: 0


GQM — Go Queue Manager


Redis-based task queue library for Go. Built from scratch with minimal dependencies, progressive disclosure API, and production-grade features including worker isolation, DAG dependencies, cron scheduling, and an embedded monitoring dashboard.

Features

  • Worker pool isolation — Dedicated goroutine pools per job type with independent concurrency, timeout, and retry policies
  • DAG job dependencies — Linear chains or full DAG (Directed Acyclic Graph) with cycle detection, cascade cancellation, and per-dependency failure tolerance
  • Cron scheduler — 6-field cron expressions (incl. seconds), overlap policies (skip/allow/replace), timezone support, distributed locking
  • Delayed jobs — Schedule jobs for future execution with EnqueueAt() / EnqueueIn()
  • Retry & dead letter queue — Configurable retry with fixed/exponential/custom backoff, automatic DLQ after max retries
  • Unique jobs — Idempotent enqueue via Unique() option (backed by atomic HSETNX)
  • Dequeue strategies — Strict priority, round-robin, or weighted (default) across multi-queue pools
  • Timeout hierarchy — Job-level → pool-level → global default (always enforced, never disabled)
  • Middleware — Global handler middleware chain via Server.Use() for logging, metrics, tracing
  • Error classification — IsFailure predicate separates transient errors (retry without counting) from real failures
  • Skip retry — ErrSkipRetry sentinel error bypasses all retries, sends job directly to DLQ
  • Job callbacks — OnSuccess, OnFailure, OnComplete per-handler callbacks with panic recovery
  • Bulk enqueue — EnqueueBatch() creates up to 1000 jobs in a single Redis pipeline
  • Panic recovery — Handler panics are caught per-goroutine; worker pools remain operational
  • Graceful shutdown — In-flight jobs complete before exit, with configurable grace period
  • YAML config — Full config-file-driven deployment with 20+ structural validation rules
  • Progressive disclosure — Zero-config to start, full control when needed (4 layers)
  • HTTP monitoring API — 32 REST endpoints for queue stats, job management, worker status, cron control
  • Web dashboard — Embedded vanilla HTML/CSS/JS dashboard with auth, RBAC (admin/viewer), CSRF protection
  • CLI tool — Config management, password hashing, API key generation, dashboard export
  • TUI monitor — Terminal UI with live queue/worker/cron monitoring (separate Go module)
  • Atomic operations — 12 Lua scripts for race-free Redis state transitions
  • Redis TLS — WithRedisTLS() option or redis.tls: true config for encrypted connections (pass custom *tls.Config or nil for system defaults)
  • API rate limiting — Per-IP token bucket on all API endpoints (default 100 req/s, configurable via monitoring.api.rate_limit, /health exempt)
  • Redis Sentinel support — Inject pre-configured *redis.Client via WithRedisClient() for Sentinel, Cluster, or custom setups
  • Minimal dependencies — Core library: 3 deps (go-redis, yaml.v3, x/crypto). CLI adds x/term for interactive input

Screenshots

Web Dashboard: Overview, DAG dependencies, Scheduler (cron), and Failed / DLQ views.
Terminal UI (TUI): Queues and Workers views.

Requirements

  • Go 1.22+
  • Redis 6.2+ (for BLMOVE)

Installation

# Core library
go get github.com/benedict-erwin/gqm

# TUI (optional, separate module)
go get github.com/benedict-erwin/gqm/tui

# CLI binary
go install github.com/benedict-erwin/gqm/cmd/gqm@latest

Quick Start

Layer 1 — Zero Config
// Producer: enqueue jobs
ctx := context.Background()
client, _ := gqm.NewClient(gqm.WithRedisAddr("localhost:6379"))
defer client.Close()

client.Enqueue(ctx, "email.send", gqm.Payload{
    "to":      "[email protected]",
    "subject": "Welcome",
})

// Consumer: process jobs (shared default pool)
server, _ := gqm.NewServer(gqm.WithServerRedis("localhost:6379"))
server.Handle("email.send", func(ctx context.Context, job *gqm.Job) error {
    var p EmailPayload
    job.Decode(&p)
    return sendEmail(ctx, p.To, p.Subject)
})
server.Start(context.Background())
Layer 2 — Per-Handler Concurrency
server, _ := gqm.NewServer(gqm.WithServerRedis("localhost:6379"))

// Each handler gets a dedicated pool with N workers
server.Handle("email.send", emailHandler, gqm.Workers(5))
server.Handle("payment.process", paymentHandler, gqm.Workers(3))

server.Start(context.Background())
Layer 3 — Explicit Pools
server, _ := gqm.NewServer(
    gqm.WithServerRedis("localhost:6379"),
    gqm.WithAPI(true, ":8080"),
    gqm.WithDashboard(true),
)

server.Pool(gqm.PoolConfig{
    Name:        "email-pool",
    JobTypes:    []string{"email.send", "email.bulk"},
    Queues:      []string{"critical", "email"},  // priority order
    Concurrency: 10,
    JobTimeout:  30 * time.Second,
    DequeueStrategy: gqm.StrategyWeighted,
    RetryPolicy: &gqm.RetryPolicy{
        MaxRetry:    5,
        Backoff:     gqm.BackoffExponential,
        BackoffBase: 10 * time.Second,
        BackoffMax:  10 * time.Minute,
    },
})

server.Handle("email.send", sendHandler)
server.Handle("email.bulk", bulkHandler)
server.Start(context.Background())
Layer 4 — Config File

Define everything in YAML — pools, queues, cron, auth, dashboard. See YAML Configuration below.

cfg, _ := gqm.LoadConfigFile("gqm.yaml")
server, _ := gqm.NewServerFromConfig(cfg)
server.Handle("email.send", emailHandler)
server.Start(context.Background())

YAML Configuration

Generate a template with gqm init, then customize:

# gqm.yaml
redis:
  addr: "localhost:6379"
  password: ""
  db: 0
  prefix: "gqm"

app:
  timezone: "Asia/Jakarta"
  log_level: "info"               # debug, info, warn, error
  shutdown_timeout: 30            # seconds
  global_job_timeout: 1800        # seconds (30 min default, cannot be disabled)
  grace_period: 10                # seconds

queues:
  - name: "critical"
    priority: 10
  - name: "default"
    priority: 1
  - name: "low"
    priority: 0

pools:
  - name: "fast"
    job_types: ["email.send", "notification.push"]
    queues: ["critical", "default"]
    concurrency: 10
    job_timeout: 60
    dequeue_strategy: "weighted"  # strict, round_robin, weighted
    retry:
      max_retry: 5
      backoff: "exponential"      # fixed, exponential, custom
      backoff_base: 10            # seconds
      backoff_max: 3600           # seconds
  - name: "background"
    job_types: ["*"]              # catch-all for unassigned job types
    queues: ["default", "low"]
    concurrency: 3

scheduler:
  enabled: true
  poll_interval: 5                # seconds — how often to check for due jobs
                                  # lower = faster promotion, higher Redis load
  cron_entries:
    - id: "cleanup-daily"
      name: "Daily cleanup"
      cron_expr: "0 0 2 * * *"   # 6-field: sec min hour dom month dow
      timezone: "UTC"
      job_type: "cleanup"
      queue: "default"
      overlap_policy: "skip"      # skip, allow, replace

monitoring:
  auth:
    enabled: true
    session_ttl: 86400
    users:
      - username: "admin"
        password_hash: ""         # gqm set-password admin
        role: "admin"             # admin or viewer
  api:
    enabled: true
    addr: ":8080"
    api_keys:
      - name: "grafana"
        key: ""                   # gqm add-api-key grafana
        role: "viewer"
  dashboard:
    enabled: true
    path_prefix: "/dashboard"
    # custom_dir: "./my-dashboard"  # override embedded dashboard

Code options always override config values:

cfg, _ := gqm.LoadConfigFile("gqm.yaml")
server, _ := gqm.NewServerFromConfig(cfg,
    gqm.WithGlobalTimeout(10 * time.Minute), // overrides app.global_job_timeout
    gqm.WithSchedulerEnabled(false),         // worker-only instance
)

Enqueue Options

client.Enqueue(ctx, "report.generate", payload,
    gqm.Queue("reports"),                      // target queue (default: "default")
    gqm.MaxRetry(5),                           // max retry attempts
    gqm.Timeout(2 * time.Minute),              // job-level timeout
    gqm.RetryIntervals(10, 30, 60, 300),       // custom backoff (seconds)
    gqm.JobID("report-2026-02"),               // custom job ID (default: UUID v7)
    gqm.Meta(map[string]string{"user": "42"}), // arbitrary metadata
    gqm.EnqueuedBy("api-gateway"),             // audit trail
    gqm.EnqueueAtFront(true),                  // push to front of queue
    gqm.Unique(),                              // idempotent (requires custom JobID)
    gqm.DependsOn(parentID),                   // DAG dependency
    gqm.AllowFailure(true),                    // run even if parent fails
)

Middleware

Register global middleware that wraps every handler. Middleware executes in registration order (onion model: a → b → handler → b → a).

srv.Use(func(next gqm.Handler) gqm.Handler {
    return func(ctx context.Context, job *gqm.Job) error {
        slog.Info("job start", "id", job.ID, "type", job.Type)
        start := time.Now()
        err := next(ctx, job)
        slog.Info("job done", "id", job.ID, "duration", time.Since(start), "error", err)
        return err
    }
})

Use() returns an error if called after Start() or with a nil middleware. Register all middleware before starting the server.

Error Classification

ErrSkipRetry

Wrap any error with ErrSkipRetry to bypass all retries and send the job directly to the dead letter queue:

server.Handle("payment.charge", func(ctx context.Context, job *gqm.Job) error {
    err := gateway.Charge(ctx, job.Payload["card_id"])
    if errors.Is(err, ErrInvalidCard) {
        return fmt.Errorf("invalid card: %w", gqm.ErrSkipRetry) // no retry, straight to DLQ
    }
    return err // normal retry on other errors
})
IsFailure Predicate

Classify handler errors as transient or real failures. When the predicate returns false, the error is treated as transient: the job is retried without incrementing the retry counter, so it does not count toward the retry limit:

server.Handle("api.call", apiHandler,
    gqm.Workers(3),
    gqm.IsFailure(func(err error) bool {
        // Rate limits and timeouts are transient — retry indefinitely
        if errors.Is(err, ErrRateLimit) || errors.Is(err, context.DeadlineExceeded) {
            return false
        }
        return true // everything else counts as a real failure
    }),
)

Job Callbacks

Per-handler callbacks fire after job execution. All callbacks include panic recovery.

server.Handle("order.process", orderHandler,
    gqm.Workers(5),

    gqm.OnSuccess(func(ctx context.Context, job *gqm.Job) {
        metrics.OrderProcessed.Inc()
    }),

    gqm.OnFailure(func(ctx context.Context, job *gqm.Job, err error) {
        alerting.Notify(fmt.Sprintf("order %s failed: %v", job.ID, err))
    }),

    gqm.OnComplete(func(ctx context.Context, job *gqm.Job, err error) {
        audit.Log("order.process", job.ID, err)
    }),
)

Callbacks run synchronously in the worker goroutine before the next job is dequeued. Keep them fast — for heavy work, spawn a goroutine inside the callback.

Bulk Enqueue

Create multiple jobs in a single Redis pipeline:

items := []gqm.BatchItem{
    {JobType: "email.send", Payload: gqm.Payload{"to": "[email protected]"}, Options: []gqm.EnqueueOption{gqm.MaxRetry(3)}},
    {JobType: "email.send", Payload: gqm.Payload{"to": "[email protected]"}, Options: []gqm.EnqueueOption{gqm.MaxRetry(3)}},
    {JobType: "email.send", Payload: gqm.Payload{"to": "[email protected]"}, Options: []gqm.EnqueueOption{gqm.MaxRetry(3)}},
}

jobs, err := client.EnqueueBatch(ctx, items)
// jobs[0].ID, jobs[1].ID, jobs[2].ID — all created in one pipeline

Limits: max 1000 items per batch. DependsOn, Unique, and EnqueueAtFront are not supported in batch mode.
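To enqueue more than 1,000 jobs, split the slice into batches of at most 1,000 first and pass each chunk to EnqueueBatch. A minimal sketch (chunkBatch is a hypothetical helper, not part of GQM):

```go
package main

import "fmt"

// chunkBatch splits items into consecutive slices of at most size elements.
func chunkBatch[T any](items []T, size int) [][]T {
	var chunks [][]T
	for len(items) > size {
		chunks = append(chunks, items[:size])
		items = items[size:]
	}
	if len(items) > 0 {
		chunks = append(chunks, items)
	}
	return chunks
}

func main() {
	items := make([]int, 2500) // pretend these are gqm.BatchItem values
	for i, chunk := range chunkBatch(items, 1000) {
		// each chunk would go through client.EnqueueBatch(ctx, chunk)
		fmt.Printf("chunk %d: %d items\n", i, len(chunk))
	}
	// chunk 0: 1000 items / chunk 1: 1000 items / chunk 2: 500 items
}
```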

Custom Redis Client (Sentinel / Cluster)

GQM connects to a standalone Redis by default. For Sentinel, Cluster, or any custom go-redis configuration, inject a pre-configured *redis.Client:

// Redis Sentinel
rdb := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "mymaster",
    SentinelAddrs: []string{"sentinel1:26379", "sentinel2:26379", "sentinel3:26379"},
    Password:      "secret",
})

client, _ := gqm.NewClient(gqm.WithRedisClient(rdb))
server, _ := gqm.NewServer(gqm.WithServerRedisClient(rdb))

When WithRedisClient is used, connection options (WithRedisAddr, WithRedisPassword, etc.) are ignored — only WithPrefix still applies.

Delayed & Scheduled Jobs

// Run at a specific time
client.EnqueueAt(ctx, time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC), "report.generate", payload)

// Run after a delay
client.EnqueueIn(ctx, 24 * time.Hour, "reminder.send", payload)

Jobs are held in a Redis sorted set (scored by timestamp) and promoted to the ready queue by the scheduler engine.
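The promotion step can be pictured with a plain map standing in for the sorted set. This is an illustrative sketch only, not GQM's internal code:

```go
package main

import (
	"fmt"
	"time"
)

// promoteDue moves every job whose run-at time has passed out of the
// scheduled set — the moral equivalent of a ZRANGEBYSCORE up to "now"
// followed by a push onto the ready queue.
func promoteDue(scheduled map[string]int64, now int64) []string {
	var ready []string
	for id, runAt := range scheduled {
		if runAt <= now {
			ready = append(ready, id)
			delete(scheduled, id)
		}
	}
	return ready
}

func main() {
	now := time.Now().Unix()
	scheduled := map[string]int64{
		"job-a": now - 1,    // already due
		"job-b": now + 3600, // due in an hour
	}
	fmt.Println(promoteDue(scheduled, now)) // [job-a]
	fmt.Println(len(scheduled))             // 1 — job-b stays scheduled
}
```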

Job Dependencies (DAG)

jobA, _ := client.Enqueue(ctx, "step.one", payloadA)
jobB, _ := client.Enqueue(ctx, "step.two", payloadB)

// jobC runs only after both A and B complete successfully
jobC, _ := client.Enqueue(ctx, "step.three", payloadC,
    gqm.DependsOn(jobA.ID, jobB.ID),
)

Failure behavior:

  • Default — If a parent job fails (exhausts retries → DLQ), all dependent children are cascade-canceled recursively. The entire downstream chain is canceled.
  • AllowFailure(true) — Opt-in per dependency. The child treats a failed parent as "resolved" and runs anyway once all dependencies are satisfied (completed or failed).
//   A (fail)
//   ├── B                    → canceled (default)
//   │   └── D                → canceled (cascade from B)
//   └── C [AllowFailure]     → still runs (tolerates A's failure)
//       └── E                → runs after C completes

jobB, _ := client.Enqueue(ctx, "step.b", p, gqm.DependsOn(jobA.ID))
jobC, _ := client.Enqueue(ctx, "step.c", p, gqm.DependsOn(jobA.ID), gqm.AllowFailure(true))
jobD, _ := client.Enqueue(ctx, "step.d", p, gqm.DependsOn(jobB.ID))
jobE, _ := client.Enqueue(ctx, "step.e", p, gqm.DependsOn(jobC.ID))

Cycle detection (DFS, depth limit 100) runs at enqueue time — circular dependencies are rejected before any job is queued.
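Conceptually the check is a depth-first walk over the dependency edges. A standalone sketch of the idea (illustrative only, not GQM's internal implementation):

```go
package main

import "fmt"

// hasCycle reports whether following dependency edges from start
// ever leads back to start. deps maps a job ID to its parent IDs.
func hasCycle(deps map[string][]string, start string) bool {
	const maxDepth = 100 // mirrors the depth limit stated above
	var visit func(id string, depth int) bool
	visit = func(id string, depth int) bool {
		if depth > maxDepth {
			return true // treat runaway chains as cyclic
		}
		for _, parent := range deps[id] {
			if parent == start || visit(parent, depth+1) {
				return true
			}
		}
		return false
	}
	return visit(start, 0)
}

func main() {
	// c depends on b, b on a, a on c — a cycle.
	cyclic := map[string][]string{"c": {"b"}, "b": {"a"}, "a": {"c"}}
	fmt.Println(hasCycle(cyclic, "c")) // true

	// b depends on a — a plain chain, no cycle.
	fmt.Println(hasCycle(map[string][]string{"b": {"a"}}, "b")) // false
}
```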

Cron Scheduling

Cron works by automatically enqueuing jobs on a schedule. You define what to run (job type) and when (cron expression) — the scheduler handles the rest.

Step 1: Register the handler — this is the code that runs when the cron fires:

// The handler is a regular job handler — same as any other job.
// The scheduler enqueues a job with this type on each cron tick.
server.Handle("cleanup", func(ctx context.Context, job *gqm.Job) error {
    deleted, err := db.DeleteExpiredSessions(ctx)
    if err != nil {
        return err // will retry based on retry policy
    }
    slog.Info("cleanup complete", "deleted", deleted)
    return nil
}, gqm.Workers(1))

Step 2: Define the schedule — either in code or YAML config:

// Option A: in code
server.Schedule(gqm.CronEntry{
    ID:            "cleanup-daily",
    Name:          "Daily Session Cleanup",
    CronExpr:      "0 0 2 * * *",  // 6-field: sec min hour dom month dow
    Timezone:      "Asia/Jakarta",
    JobType:       "cleanup",       // must match the handler registered above
    Queue:         "default",
    OverlapPolicy: gqm.OverlapSkip, // skip | allow | replace
})
# Option B: in gqm.yaml (same effect)
scheduler:
  cron_entries:
    - id: "cleanup-daily"
      name: "Daily Session Cleanup"
      cron_expr: "0 0 2 * * *"
      timezone: "Asia/Jakarta"
      job_type: "cleanup"
      queue: "default"
      overlap_policy: "skip"

How it works: The scheduler goroutine checks cron entries every poll_interval seconds. When an entry is due, it enqueues a new job with the specified job_type into the target queue. The job is then picked up by a worker pool that handles that job type — exactly like a manually enqueued job. Overlap policy controls what happens if the previous cron job is still running when the next tick fires.

Monitoring

Web Dashboard

Embedded vanilla HTML/CSS/JS dashboard — no build step, no npm. Served directly from the Go binary via embed.FS.

Enable programmatically:

server, _ := gqm.NewServer(
    gqm.WithServerRedis("localhost:6379"),
    gqm.WithAPI(true, ":8080"),
    gqm.WithDashboard(true),
    gqm.WithAuthEnabled(true),
    gqm.WithAuthUsers([]gqm.AuthUser{
        {Username: "admin", PasswordHash: "$2a$10$...", Role: "admin"},
        {Username: "viewer", PasswordHash: "$2a$10$...", Role: "viewer"},
    }),
    gqm.WithAPIKeys([]gqm.AuthAPIKey{
        {Name: "grafana", Key: "gqm_ak_...", Role: "viewer"},
    }),
)
// Dashboard: http://localhost:8080/dashboard/
// Health:    http://localhost:8080/health (no auth)

Or via YAML config:

monitoring:
  api:
    enabled: true
    addr: ":8080"
  dashboard:
    enabled: true
    # path_prefix: "/dashboard"     # default
    # custom_dir: "./my-dashboard"  # override embedded assets
  auth:
    enabled: true
    users:
      - username: admin
        password_hash: ""  # generate with: gqm hash-password
        role: admin

Dashboard pages:

Page Description
Overview Job stats with Chart.js graphs, stat cards per status
Servers Live server heartbeats, uptime, active jobs
Queues Queue sizes, pause/resume, empty queue, DLQ retry
Workers Per-pool worker status, active job tracking
Failed / DLQ Failed job browser, retry/delete individual or batch
Scheduler Cron entries, next/last run, trigger/enable/disable
DAG Dependency graph visualization with Cytoscape.js

Auth & security: Session cookies (bcrypt + HttpOnly/Secure/SameSite), API keys with constant-time comparison, RBAC (admin/viewer), CSRF protection, login rate limiting. Health check at GET /health requires no auth.

REST API

32 endpoints under /api/v1/. Authenticate via session cookie (dashboard) or X-API-Key header (programmatic). Write endpoints require X-GQM-CSRF: 1 header (API key exempt).

# List queues with API key
curl -H "X-API-Key: gqm_ak_xxx" http://localhost:8080/api/v1/queues

# Pause a queue (admin only, CSRF header required for session auth, exempt for API key)
curl -X POST -H "X-API-Key: gqm_ak_xxx" http://localhost:8080/api/v1/queues/email:send/pause

Read endpoints:

Method Endpoint Description
GET /api/v1/queues List all queues with per-status counts
GET /api/v1/queues/{name} Queue detail
GET /api/v1/queues/{name}/jobs?status=ready&page=1&limit=20 Paginated job list
GET /api/v1/jobs/{id} Single job detail
GET /api/v1/workers List pools with concurrency, queues, active jobs
GET /api/v1/stats Overview: total counts, worker count, uptime
GET /api/v1/cron List cron entries with next/last run
GET /api/v1/cron/{id}/history Cron execution history
GET /api/v1/servers Active server instances
GET /api/v1/dag/deferred List deferred jobs (waiting on dependencies)
GET /api/v1/dag/roots List DAG root jobs
GET /api/v1/dag/{id}/graph DAG graph (nodes + edges for visualization)

Admin endpoints (require admin role):

Method Endpoint Description
POST /api/v1/queues/{name}/pause Pause queue (workers stop dequeuing)
POST /api/v1/queues/{name}/resume Resume queue
DELETE /api/v1/queues/{name}/empty Delete all ready jobs
POST /api/v1/queues/{name}/dead-letter/retry-all Retry all DLQ jobs
DELETE /api/v1/queues/{name}/dead-letter/clear Clear DLQ
POST /api/v1/jobs/{id}/retry Retry single job
POST /api/v1/jobs/{id}/cancel Cancel job (cascades to DAG dependents)
DELETE /api/v1/jobs/{id} Delete job
POST /api/v1/jobs/batch/retry Batch retry (body: {"job_ids": [...]})
POST /api/v1/jobs/batch/delete Batch delete
POST /api/v1/cron/{id}/trigger Manual trigger cron entry
POST /api/v1/cron/{id}/enable Enable cron entry
POST /api/v1/cron/{id}/disable Disable cron entry

Auth endpoints:

Method Endpoint Description
POST /auth/login Form login → session cookie
POST /auth/logout Destroy session
GET /auth/me Current user info
GET /health Health check (no auth, no rate limit)

Customizing the dashboard:

You can replace the built-in dashboard with your own HTML/CSS/JS files. GQM's REST API remains fully available as your backend.

# Step 1: Export the built-in dashboard as a starting point
gqm dashboard export ./my-dashboard

# Step 2: Edit the files in ./my-dashboard/ (HTML, CSS, JS)

# Step 3: Point your server to the custom directory
server, _ := gqm.NewServer(
    gqm.WithServerRedis("localhost:6379"),
    gqm.WithAPI(true, ":8080"),
    gqm.WithDashboard(true),
    gqm.WithDashboardDir("./my-dashboard"),   // override embedded dashboard
    gqm.WithDashboardPathPrefix("/my-panel"),  // optional: change URL path (default: /dashboard)
)

Or via YAML config:

monitoring:
  dashboard:
    enabled: true
    custom_dir: "./my-dashboard"
    path_prefix: "/my-panel"

When custom_dir is set, GQM serves files entirely from that directory instead of the embedded assets. All API endpoints (/api/v1/*, /auth/*, /health) continue to work normally — only the dashboard static files are replaced. See _examples/11-custom-dashboard for a working example.

TUI

Terminal UI for quick monitoring without a browser. Connects to a running GQM server via the HTTP API.

# Requires a server with monitoring enabled (WithAPI or monitoring.enabled in YAML)
gqm tui --api-url http://localhost:8080 --api-key gqm_ak_xxx

# Or via environment variables
export GQM_API_URL=http://localhost:8080
export GQM_API_KEY=gqm_ak_xxx
gqm tui

4 tabs: Queues, Workers, Failed, Cron. Auto-refreshes every second.

Keyboard shortcuts:

Key Action
1-4 Switch tab directly
Tab / Shift+Tab Cycle tabs
j/k or Up/Down Navigate list
h/l or Left/Right Switch queue (Failed tab)
p Pause/resume queue (Queues tab)
r Retry failed job (Failed tab)
t Trigger cron entry (Cron tab)
e Enable/disable cron entry (Cron tab)
F5 Force refresh
q / Ctrl+C Quit

CLI

gqm init                    Generate template gqm.yaml
gqm set-password <user>     Set/update dashboard password
gqm add-api-key <name>      Add API key to config
gqm revoke-api-key <name>   Remove API key from config
gqm hash-password           Generate bcrypt hash
gqm generate-api-key        Generate random API key
gqm dashboard export <dir>  Export embedded dashboard for customization
gqm tui [--api-url <url>] [--api-key <key>]  Launch terminal monitor
gqm version                 Show version

Performance

Benchmarked on Linux arm64 (Docker), Redis 7, Go 1.26, 4 vCPU. All operations use Lua scripts for atomic Redis state transitions.

Throughput
Operation Latency Throughput
Single enqueue ~55 µs 18,100 jobs/sec
End-to-end (enqueue → process → complete) ~100 µs 10,000 jobs/sec
Batch enqueue (100 jobs) ~726 µs 137,700 jobs/sec
Batch enqueue (1000 jobs) ~7.3 ms 137,800 jobs/sec
Burst drain (30 workers) 19,700 jobs/sec
Large payload 10 KB 1,607 jobs/sec
Large payload 100 KB 346 jobs/sec
Stress Test Highlights
Scenario Result
Data integrity (10K jobs, 20 workers) Zero loss, zero duplicates
Sustained load (30s, 558K jobs) Zero loss, p50 latency 3.2s, drain 3.8s
Retry storm (2K jobs × 4 attempts) All 8K attempts processed correctly
High concurrency (60 workers, 3 pools) Stable, no goroutine or memory leaks
Backpressure (735K queue depth) System responsive, no degradation
Panic recovery (500 panics) All recovered, workers remain operational
Resource Efficiency
  • Minimal dependencies — core library: 3 deps; CLI adds 1 (see Dependencies)
  • 12 Lua scripts — all Redis state transitions are atomic
  • Zero goroutine leaks — verified across all stress test scenarios
  • Memory stable — no runaway growth under sustained load

Architecture

Producer App                  Redis                     Worker Binary
─────────────                 ─────                     ────────────
gqm.Client                                              gqm.Server
  .Enqueue() ──────────────►  Queues (Lists)  ◄───────  Pool "email" (5 workers)
  .EnqueueAt()                Jobs (Hashes)             Pool "payment" (3 workers)
  .EnqueueIn()                Scheduled (ZSet)          Scheduler (delayed + cron)
                              Cron (Hash)               Heartbeat (1/pool)
                              Sessions (Strings)        HTTP API + Dashboard

Dependencies

Core library (what you get with go get github.com/benedict-erwin/gqm):

Dependency Purpose
github.com/redis/go-redis/v9 Redis client
gopkg.in/yaml.v3 YAML config parsing
golang.org/x/crypto/bcrypt Password hashing (dashboard auth)

CLI binary (cmd/gqm/) adds:

Dependency Purpose
golang.org/x/term Interactive password input (gqm set-password)

TUI module (gqm/tui) is a separate Go module within the same repo — importing the core library does not pull TUI dependencies (bubbletea, lipgloss, etc.).

Everything else is stdlib or implemented from scratch (UUID v7, cron parser, HTTP router via Go 1.22+, logging via log/slog).

License

MIT

Built With

  • Go 1.22+ — core language
  • Redis 7 — backbone storage
  • Claude (Anthropic) — AI pair programming assistant for implementation & docs

Documentation


Constants

const (
	StatusReady      = "ready"
	StatusScheduled  = "scheduled"
	StatusDeferred   = "deferred"
	StatusProcessing = "processing"
	StatusCompleted  = "completed"
	StatusFailed     = "failed"
	StatusRetry      = "retry"
	StatusDeadLetter = "dead_letter"
	StatusStopped    = "stopped"
	StatusCanceled   = "canceled"
)

Job status constants.

Variables

var (
	// ErrJobNotFound is returned when a job cannot be found in Redis.
	ErrJobNotFound = errors.New("gqm: job not found")

	// ErrQueueEmpty is returned when attempting to dequeue from an empty queue.
	ErrQueueEmpty = errors.New("gqm: queue empty")

	// ErrServerStopped is returned when operations are attempted on a stopped server.
	ErrServerStopped = errors.New("gqm: server stopped")

	// ErrHandlerNotFound is returned when no handler is registered for a job type.
	ErrHandlerNotFound = errors.New("gqm: handler not found")

	// ErrDuplicateHandler is returned when a handler is registered twice for the same job type.
	ErrDuplicateHandler = errors.New("gqm: duplicate handler registration")

	// ErrInvalidJobType is returned when a job type is empty.
	ErrInvalidJobType = errors.New("gqm: invalid job type")

	// ErrMaxRetryExceeded is returned when a job has exceeded its maximum retry count.
	ErrMaxRetryExceeded = errors.New("gqm: max retry exceeded")

	// ErrDuplicatePool is returned when a pool with the same name is registered twice.
	ErrDuplicatePool = errors.New("gqm: duplicate pool name")

	// ErrJobTypeConflict is returned when a job type is assigned to multiple pools.
	ErrJobTypeConflict = errors.New("gqm: job type already assigned to another pool")

	// ErrDuplicateCronEntry is returned when a cron entry with the same ID is registered twice.
	ErrDuplicateCronEntry = errors.New("gqm: duplicate cron entry ID")

	// ErrCyclicDependency is returned when adding a dependency would create a cycle in the DAG.
	ErrCyclicDependency = errors.New("gqm: cyclic dependency detected")

	// ErrDuplicateJobID is returned when enqueueing with Unique() and a job with the same ID already exists.
	ErrDuplicateJobID = errors.New("gqm: duplicate job ID")

	// ErrInvalidQueueName is returned when a queue name contains invalid characters.
	ErrInvalidQueueName = errors.New("gqm: invalid queue name (only alphanumeric, hyphen, underscore, dot allowed; max 128 chars)")

	// ErrInvalidJobID is returned when a job ID contains invalid characters.
	ErrInvalidJobID = errors.New("gqm: invalid job ID (only alphanumeric, hyphen, underscore, dot allowed; max 256 chars)")

	// ErrSkipRetry can be returned (or wrapped) by a handler to skip all
	// remaining retries and move the job directly to the dead letter queue.
	ErrSkipRetry = errors.New("gqm: skip retry")

	// ErrBatchTooLarge is returned when EnqueueBatch receives more items
	// than the maximum allowed batch size.
	ErrBatchTooLarge = errors.New("gqm: batch size exceeds maximum (1000)")

	// ErrBatchDependsOn is returned when DependsOn is used in EnqueueBatch.
	ErrBatchDependsOn = errors.New("gqm: DependsOn is not supported in EnqueueBatch")

	// ErrBatchUnique is returned when Unique is used in EnqueueBatch.
	ErrBatchUnique = errors.New("gqm: Unique is not supported in EnqueueBatch")

	// ErrBatchEnqueueAtFront is returned when EnqueueAtFront is used in EnqueueBatch.
	ErrBatchEnqueueAtFront = errors.New("gqm: EnqueueAtFront is not supported in EnqueueBatch")

	// ErrJobDataTooLarge is returned when a job's serialized data exceeds maxJobDataSize.
	ErrJobDataTooLarge = errors.New("gqm: job data exceeds maximum size (1 MB)")
)

Functions

func NewUUID

func NewUUID() string

NewUUID generates a UUID v7 (RFC 9562) string. Format: xxxxxxxx-xxxx-7xxx-yxxx-xxxxxxxxxxxx 48-bit millisecond timestamp + 4-bit version (0111) + 12-bit random + 2-bit variant (10) + 62-bit random.

Types

type APIConfig

type APIConfig struct {
	Enabled   bool         `yaml:"enabled"`
	Addr      string       `yaml:"addr"`       // default ":8080"
	RateLimit int          `yaml:"rate_limit"` // requests/second per IP; 0 = default (100), -1 = disabled
	APIKeys   []APIKeyYAML `yaml:"api_keys"`
}

APIConfig holds HTTP API settings from YAML.

type APIKeyYAML

type APIKeyYAML struct {
	Name string `yaml:"name"`
	Key  string `yaml:"key"`  // prefix: gqm_ak_, minimum 32 chars
	Role string `yaml:"role"` // "admin" or "viewer"; defaults to "admin"
}

APIKeyYAML holds an API key entry from YAML.

type AppConfig

type AppConfig struct {
	Timezone         string `yaml:"timezone"`
	LogLevel         string `yaml:"log_level"`
	ShutdownTimeout  int    `yaml:"shutdown_timeout"`   // seconds
	GlobalJobTimeout int    `yaml:"global_job_timeout"` // seconds
	GracePeriod      int    `yaml:"grace_period"`       // seconds
}

AppConfig holds application-level settings from YAML.

type AuthAPIKey

type AuthAPIKey struct {
	Name string
	Key  string
	Role string // "admin" or "viewer"; defaults to "admin" if empty
}

AuthAPIKey represents an API key for programmatic access.

type AuthConfig

type AuthConfig struct {
	Enabled    bool       `yaml:"enabled"`
	SessionTTL int        `yaml:"session_ttl"` // seconds, default 86400
	Users      []UserYAML `yaml:"users"`
}

AuthConfig holds authentication settings from YAML.

type AuthUser

type AuthUser struct {
	Username     string
	PasswordHash string // bcrypt hash
	Role         string // "admin" or "viewer"; defaults to "admin" if empty
}

AuthUser represents a user for the monitoring API.

type BackoffType

type BackoffType string

BackoffType determines the retry delay calculation method.

const (
	// BackoffFixed uses a constant interval between retries.
	BackoffFixed BackoffType = "fixed"

	// BackoffExponential uses base * 2^attempt, capped at BackoffMax.
	BackoffExponential BackoffType = "exponential"

	// BackoffCustom uses the Intervals slice directly.
	BackoffCustom BackoffType = "custom"
)

type BatchItem

type BatchItem struct {
	JobType string
	Payload Payload
	Options []EnqueueOption
}

BatchItem represents a single job to be enqueued in a batch.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is used to enqueue jobs into the queue system.

func NewClient

func NewClient(opts ...RedisOption) (*Client, error)

NewClient creates a new Client with the given Redis options.

func (*Client) Close

func (c *Client) Close() error

Close closes the client's Redis connection.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, jobType string, payload Payload, opts ...EnqueueOption) (*Job, error)

Enqueue creates a new job and adds it to the queue. If DependsOn is set, the job is created with status "deferred" and will be moved to the ready queue only after all dependencies are resolved. Returns the created job or an error.

func (*Client) EnqueueAt

func (c *Client) EnqueueAt(ctx context.Context, at time.Time, jobType string, payload Payload, opts ...EnqueueOption) (*Job, error)

EnqueueAt creates a new job scheduled for execution at the given time. The job is placed in the scheduled sorted set and will be moved to the ready queue by the scheduler when the time arrives.

func (*Client) EnqueueBatch

func (c *Client) EnqueueBatch(ctx context.Context, items []BatchItem) ([]*Job, error)

EnqueueBatch creates multiple jobs in a single Redis pipeline for efficiency. All jobs are validated upfront (including serialization); if any item fails validation, no jobs are enqueued and the error identifies the failing item.

The pipeline is NOT a Redis transaction — on network errors, some jobs may have been created while others were not. Callers should treat a pipeline error as "unknown state" and verify job existence if needed.

Maximum batch size is 1000 items. Returns ErrBatchTooLarge if exceeded.

Limitations compared to single Enqueue:

  • Unique() is not supported (returns error if set on any item).
  • DependsOn() is not supported (returns error if set on any item).
  • EnqueueAtFront() is not supported (returns error if set on any item).

Use individual Enqueue calls if you need uniqueness, DAG dependencies, or front-of-queue insertion.
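A minimal batch sketch (fragment style, with `client` and `ctx` in scope; the job type, queue, and payload values are illustrative):

```go
items := []gqm.BatchItem{
	{JobType: "email:send", Payload: gqm.Payload{"to": "a@example.com"}},
	{JobType: "email:send", Payload: gqm.Payload{"to": "b@example.com"},
		Options: []gqm.EnqueueOption{gqm.Queue("bulk"), gqm.MaxRetry(3)}},
}
jobs, err := client.EnqueueBatch(ctx, items)
if err != nil {
	// A pipeline error leaves the batch in an unknown state;
	// verify job existence with GetJob if it matters.
	log.Printf("batch enqueue: %v", err)
	return
}
log.Printf("enqueued %d jobs", len(jobs))
```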

func (*Client) EnqueueIn

func (c *Client) EnqueueIn(ctx context.Context, delay time.Duration, jobType string, payload Payload, opts ...EnqueueOption) (*Job, error)

EnqueueIn creates a new job scheduled for execution after the given delay.
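EnqueueAt and EnqueueIn cover absolute and relative scheduling respectively; a sketch with illustrative job types and payloads:

```go
// Absolute: run at the next top of the hour.
at := time.Now().Truncate(time.Hour).Add(time.Hour)
job1, err := client.EnqueueAt(ctx, at, "report:hourly", gqm.Payload{"period": "1h"})

// Relative: run 5 minutes from now.
job2, err := client.EnqueueIn(ctx, 5*time.Minute, "email:reminder", gqm.Payload{"user_id": 42})
```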

func (*Client) GetJob

func (c *Client) GetJob(ctx context.Context, jobID string) (*Job, error)

GetJob retrieves a job by ID from Redis.

func (*Client) RedisClient

func (c *Client) RedisClient() *redis.Client

RedisClient returns the underlying *redis.Client for testing and advanced use.

type CompleteCallbackFunc

type CompleteCallbackFunc func(ctx context.Context, job *Job, err error)

CompleteCallbackFunc is a callback invoked after every job execution regardless of outcome.

type Config

type Config struct {
	Redis      RedisYAML        `yaml:"redis"`
	App        AppConfig        `yaml:"app"`
	Queues     []QueueDef       `yaml:"queues"`
	Pools      []PoolYAML       `yaml:"pools"`
	Scheduler  SchedulerConfig  `yaml:"scheduler"`
	Monitoring MonitoringConfig `yaml:"monitoring"`
}

Config represents the top-level YAML configuration file.

func LoadConfig

func LoadConfig(data []byte) (*Config, error)

LoadConfig parses YAML bytes and validates the resulting configuration.

func LoadConfigFile

func LoadConfigFile(path string) (*Config, error)

LoadConfigFile reads a YAML file and returns a validated Config.

type CronEntry

type CronEntry struct {
	// User-configurable fields.
	ID            string        `json:"id"`
	Name          string        `json:"name"`
	CronExpr      string        `json:"cron_expr"`
	Timezone      string        `json:"timezone,omitempty"`
	JobType       string        `json:"job_type"`
	Queue         string        `json:"queue,omitempty"`
	Payload       Payload       `json:"payload,omitempty"`
	Timeout       int           `json:"timeout,omitempty"`
	MaxRetry      int           `json:"max_retry,omitempty"`
	OverlapPolicy OverlapPolicy `json:"overlap_policy,omitempty"`
	Enabled       bool          `json:"enabled"`

	// System-managed state (populated by the scheduler, not by the user).
	LastRun    int64  `json:"last_run,omitempty"`
	LastStatus string `json:"last_status,omitempty"`
	NextRun    int64  `json:"next_run,omitempty"`
	CreatedAt  int64  `json:"created_at,omitempty"`
	UpdatedAt  int64  `json:"updated_at,omitempty"`
	// contains filtered or unexported fields
}

CronEntry defines a recurring job schedule.

type CronEntryYAML

type CronEntryYAML struct {
	ID            string `yaml:"id"`
	Name          string `yaml:"name"`
	CronExpr      string `yaml:"cron_expr"`
	Timezone      string `yaml:"timezone"`
	JobType       string `yaml:"job_type"`
	Queue         string `yaml:"queue"`
	Payload       string `yaml:"payload"` // JSON string
	Timeout       int    `yaml:"timeout"` // seconds
	MaxRetry      int    `yaml:"max_retry"`
	OverlapPolicy string `yaml:"overlap_policy"`
	Enabled       *bool  `yaml:"enabled"` // nil = true
}

CronEntryYAML holds a cron entry from YAML.

type CronExpr

type CronExpr struct {
	// contains filtered or unexported fields
}

CronExpr represents a parsed cron expression.

Accepts 6 fields (second minute hour day_of_month month day_of_week) or 5 fields (minute hour day_of_month month day_of_week), in which case second defaults to 0.

Field syntax:

  • * (any value)
  • N (specific value)
  • N-M (range)
  • */S or N/S (step from min or N)
  • N-M/S (range with step)
  • N,M,O (comma-separated list)

Day matching: when both day_of_month and day_of_week are explicitly set (not *), the match uses OR semantics (standard cron behavior).

Day of week: 0 = Sunday, 1 = Monday, ..., 6 = Saturday, 7 = Sunday (alias).
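The syntax above can be exercised via ParseCronExpr; the expression below (every weekday at 09:30:00) is an illustrative example:

```go
expr, err := gqm.ParseCronExpr("0 30 9 * * 1-5") // sec min hour dom month dow
if err != nil {
	log.Fatal(err)
}
next := expr.Next(time.Now())
if next.IsZero() {
	log.Fatal("no match within the 4-year search window")
}
log.Printf("next run: %s", next)
```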

func ParseCronExpr

func ParseCronExpr(expr string) (*CronExpr, error)

ParseCronExpr parses a cron expression string into a CronExpr. It accepts 6 fields (second minute hour dom month dow) or 5 fields (minute hour dom month dow, with second implicitly 0).

func (*CronExpr) Next

func (c *CronExpr) Next(from time.Time) time.Time

Next returns the next time after from that matches the cron expression. The search covers up to 4 years ahead. Returns the zero time if no match is found.

func (*CronExpr) String

func (c *CronExpr) String() string

String returns the original cron expression string.

type DashboardConfig

type DashboardConfig struct {
	Enabled    bool   `yaml:"enabled"`
	PathPrefix string `yaml:"path_prefix"` // default "/dashboard"
	CustomDir  string `yaml:"custom_dir"`
}

DashboardConfig holds dashboard settings from YAML.

type DequeueStrategy

type DequeueStrategy string

DequeueStrategy determines how a pool selects jobs from multiple queues.

const (
	// StrategyStrict always checks the highest-priority queue first.
	// Lower-priority queues may starve if higher ones are always full.
	StrategyStrict DequeueStrategy = "strict"

	// StrategyRoundRobin rotates through queues in order each cycle.
	StrategyRoundRobin DequeueStrategy = "round_robin"

	// StrategyWeighted selects queues probabilistically based on priority weight.
	StrategyWeighted DequeueStrategy = "weighted"
)

type EnqueueOption

type EnqueueOption func(*Job)

EnqueueOption configures job enqueue behavior.

func AllowFailure

func AllowFailure(allow bool) EnqueueOption

AllowFailure controls whether this job should proceed even if a parent dependency fails. When false (default), parent failure cascades cancellation.

func DependsOn

func DependsOn(jobIDs ...string) EnqueueOption

DependsOn specifies parent job IDs that must complete before this job runs. The job is created with status "deferred" and moved to the ready queue only after all dependencies are resolved.

func EnqueueAtFront

func EnqueueAtFront(atFront bool) EnqueueOption

EnqueueAtFront causes the job to be pushed to the front of the queue (instead of the back) when its dependencies are resolved.

func EnqueuedBy

func EnqueuedBy(name string) EnqueueOption

EnqueuedBy sets the enqueuer identifier.

func JobID

func JobID(id string) EnqueueOption

JobID overrides the default UUID v7 job ID with a custom value. The caller is responsible for ensuring uniqueness — if a job with the same ID already exists in Redis, its data will be silently overwritten.

func MaxRetry

func MaxRetry(n int) EnqueueOption

MaxRetry sets the maximum number of retries for the job. Negative values are clamped to 0 (no retry).

func Meta

func Meta(m Payload) EnqueueOption

Meta sets arbitrary metadata on the job.

func Queue

func Queue(name string) EnqueueOption

Queue sets the target queue for the job.

func RetryIntervals

func RetryIntervals(intervals ...int) EnqueueOption

RetryIntervals sets the retry intervals in seconds.

func Timeout

func Timeout(d time.Duration) EnqueueOption

Timeout sets the job-level timeout. Sub-second values are rounded up to 1s.

func Unique

func Unique() EnqueueOption

Unique ensures that no job with the same ID already exists before enqueueing. If a duplicate is found, Enqueue returns ErrDuplicateJobID. Most useful together with JobID() for custom IDs, since the default UUID v7 IDs are already unique.
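Combined with JobID(), Unique() gives idempotent enqueue keyed on an external identifier. A sketch, where the "webhook-" key scheme and the evt variable are assumptions:

```go
// Derive the job ID from a business key so repeated webhook deliveries
// collapse into a single job.
job, err := client.Enqueue(ctx, "webhook:deliver",
	gqm.Payload{"event_id": evt.ID},
	gqm.JobID("webhook-"+evt.ID),
	gqm.Unique(),
)
if errors.Is(err, gqm.ErrDuplicateJobID) {
	// Already enqueued; safe to ignore.
}
```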

type FailureCallbackFunc

type FailureCallbackFunc func(ctx context.Context, job *Job, err error)

FailureCallbackFunc is a callback invoked after a job fails (handler error or timeout).

type HandleOption

type HandleOption func(*handlerConfig)

HandleOption configures handler registration.

func IsFailure

func IsFailure(fn func(error) bool) HandleOption

IsFailure sets a predicate that classifies handler errors.

When a handler returns an error and the predicate returns false, the job is retried WITHOUT incrementing the retry counter. This means transient/expected errors (rate limiting, temporary unavailability) do not count towards the retry limit, allowing the job to retry indefinitely for non-failure errors.

When the predicate returns true (or when no predicate is set), the error is treated as a real failure: the retry counter increments normally and the job eventually moves to the dead letter queue.

Non-failure retries always use the same retry delay (the first interval), since the retry counter is not incremented. Backoff progression only applies to failure retries.

If maxRetry is 0, non-failure errors are sent to the DLQ with a warning log, since retry is not configured.

If the predicate panics, the error is treated as a failure (safe default) and the panic is logged with a stack trace.

Note: ErrSkipRetry always bypasses retry regardless of IsFailure. The fn parameter must not be nil.
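A sketch of a predicate that treats rate-limit errors as transient; errRateLimited and callHandler are hypothetical names:

```go
var errRateLimited = errors.New("rate limited")

srv.Handle("api:call", callHandler,
	gqm.IsFailure(func(err error) bool {
		// Rate limiting is transient: retry without consuming the retry budget.
		return !errors.Is(err, errRateLimited)
	}),
)
```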

func OnComplete

func OnComplete(fn CompleteCallbackFunc) HandleOption

OnComplete registers a callback that fires after every job execution, regardless of outcome. It is called after OnSuccess or OnFailure. The err parameter is nil on success. Panics are recovered and logged; they do not affect the job outcome.

Important: callbacks block the worker. Keep them fast. Callbacks fire before Redis state is updated. Only fires when a handler is found and invoked — internal failures (fetch/parse/no-handler) do not trigger callbacks. Do not mutate the job struct.

func OnFailure

func OnFailure(fn FailureCallbackFunc) HandleOption

OnFailure registers a callback that fires after a handler returns an error (including timeout). It fires for ALL handler errors regardless of the IsFailure predicate classification. The callback runs synchronously in the worker goroutine before retry/DLQ processing. Panics are recovered and logged; they do not affect the job outcome.

Important: callbacks block the worker. Keep them fast. Callbacks fire before Redis state is updated. Do not mutate the job struct.

func OnSuccess

func OnSuccess(fn SuccessCallbackFunc) HandleOption

OnSuccess registers a callback that fires after a job completes successfully (handler returned nil). The callback runs synchronously in the worker goroutine before the next job is dequeued. Panics are recovered and logged; they do not affect the job outcome.

Important: callbacks block the worker. Keep them fast (no heavy I/O, no unbounded waits). For async work, spawn a goroutine inside the callback. Callbacks fire before Redis state is updated. Do not mutate the job struct — it is shared with subsequent Redis operations. The ctx is the worker's shutdown context, NOT the handler's timeout context. Use your own timeout for I/O inside callbacks.
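The three callback options can be combined on one handler registration; resizeHandler and the metrics calls below are placeholders:

```go
srv.Handle("image:resize", resizeHandler,
	gqm.OnSuccess(func(ctx context.Context, job *gqm.Job) {
		metrics.Inc("resize_ok") // keep it fast: no heavy I/O here
	}),
	gqm.OnFailure(func(ctx context.Context, job *gqm.Job, err error) {
		metrics.Inc("resize_failed")
	}),
	gqm.OnComplete(func(ctx context.Context, job *gqm.Job, err error) {
		// err is nil on success; fires after OnSuccess or OnFailure.
		log.Printf("job %s done, err=%v", job.ID, err)
	}),
)
```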

func Workers

func Workers(n int) HandleOption

Workers sets the number of dedicated worker goroutines for this handler. This creates an implicit pool with a dedicated queue for this job type.

type Handler

type Handler func(ctx context.Context, job *Job) error

Handler is a function that processes a job. It must respect ctx for cancellation and timeout.

type Job

type Job struct {
	ID                string          `json:"id"`
	Type              string          `json:"type"`
	Queue             string          `json:"queue"`
	Payload           Payload         `json:"payload"`
	Status            string          `json:"status"`
	Result            json.RawMessage `json:"result,omitempty"`
	Error             string          `json:"error,omitempty"`
	RetryCount        int             `json:"retry_count"`
	MaxRetry          int             `json:"max_retry"`
	RetryIntervals    []int           `json:"retry_intervals,omitempty"`
	Timeout           int             `json:"timeout,omitempty"`
	CreatedAt         int64           `json:"created_at"`
	ScheduledAt       int64           `json:"scheduled_at,omitempty"`
	StartedAt         int64           `json:"started_at,omitempty"`
	CompletedAt       int64           `json:"completed_at,omitempty"`
	WorkerID          string          `json:"worker_id,omitempty"`
	LastHeartbeat     int64           `json:"last_heartbeat,omitempty"`
	ExecutionDuration int64           `json:"execution_duration,omitempty"`
	EnqueuedBy        string          `json:"enqueued_by,omitempty"`
	Meta              Payload         `json:"meta,omitempty"`
	DependsOn         []string        `json:"depends_on,omitempty"`
	AllowFailure      bool            `json:"allow_failure,omitempty"`
	EnqueueAtFront    bool            `json:"enqueue_at_front,omitempty"`
	// contains filtered or unexported fields
}

Job represents a unit of work in the queue.

func DecodeJob

func DecodeJob(data []byte) (*Job, error)

DecodeJob deserializes a Job from JSON bytes.

func JobFromMap

func JobFromMap(m map[string]string) (*Job, error)

JobFromMap creates a Job from a Redis HGETALL result.

func NewJob

func NewJob(jobType string, payload Payload) *Job

NewJob creates a new Job with a generated UUID v7 and the given type.

func (*Job) Decode

func (j *Job) Decode(target any) error

Decode unmarshals the job payload into the given target.

func (*Job) Encode

func (j *Job) Encode() ([]byte, error)

Encode serializes the job to JSON bytes.

func (*Job) ToMap

func (j *Job) ToMap() (map[string]any, error)

ToMap converts a Job to a map suitable for Redis HSET. Zero-value fields are omitted to reduce HSET data transfer. JobFromMap handles missing fields gracefully (parseInt returns 0, map lookup returns "").

type MiddlewareFunc

type MiddlewareFunc func(Handler) Handler

MiddlewareFunc is a function that wraps a Handler to add cross-cutting behavior such as logging, metrics, tracing, or error handling.

Middleware is registered via Server.Use() and applied in the order registered: Use(a, b) executes as a → b → handler.

Example:

func loggingMiddleware(next gqm.Handler) gqm.Handler {
    return func(ctx context.Context, job *gqm.Job) error {
        slog.Info("start job", "id", job.ID, "type", job.Type)
        err := next(ctx, job)
        slog.Info("end job", "id", job.ID, "type", job.Type, "error", err)
        return err
    }
}

type MonitoringConfig

type MonitoringConfig struct {
	Auth      AuthConfig      `yaml:"auth"`
	API       APIConfig       `yaml:"api"`
	Dashboard DashboardConfig `yaml:"dashboard"`
}

MonitoringConfig holds HTTP API, auth, and dashboard settings from YAML.

type OverlapPolicy

type OverlapPolicy string

OverlapPolicy determines behavior when a cron schedule fires while the previous job from the same entry is still running.

const (
	// OverlapSkip skips the new execution if the previous job is still running.
	OverlapSkip OverlapPolicy = "skip"
	// OverlapAllow enqueues a new job regardless of previous job status.
	OverlapAllow OverlapPolicy = "allow"
	// OverlapReplace cancels the previous job and enqueues a new one.
	OverlapReplace OverlapPolicy = "replace"
)

type Payload

type Payload map[string]any

Payload is a type alias for job payload data.

type PoolConfig

type PoolConfig struct {
	Name            string
	JobTypes        []string        // Job types handled by this pool
	Queues          []string        // Queues to listen on, ordered by priority
	Concurrency     int             // Number of worker goroutines (default: runtime.NumCPU())
	JobTimeout      time.Duration   // Pool-level job timeout (0 = fall back to global)
	GracePeriod     time.Duration   // Grace period after context cancel (0 = use server default)
	ShutdownTimeout time.Duration   // Shutdown wait time (0 = use server default)
	DequeueStrategy DequeueStrategy // How to select from multiple queues (default: strict)
	RetryPolicy     *RetryPolicy    // Pool-level retry defaults (nil = use job-level)
}

PoolConfig is the public configuration for defining an explicit worker pool (Layer 3). Use Server.Pool() to register a pool with this configuration.
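A sketch of an explicit pool grouping two job types; the names, queue priorities, and numbers are illustrative:

```go
err := srv.Pool(gqm.PoolConfig{
	Name:            "media",
	JobTypes:        []string{"image:resize", "video:transcode"},
	Queues:          []string{"media_high", "media_low"}, // ordered by priority
	Concurrency:     8,
	JobTimeout:      5 * time.Minute,
	DequeueStrategy: gqm.StrategyWeighted,
	RetryPolicy: &gqm.RetryPolicy{
		MaxRetry:    5,
		Backoff:     gqm.BackoffExponential,
		BackoffBase: 2 * time.Second,
		BackoffMax:  time.Minute,
	},
})
```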

type PoolYAML

type PoolYAML struct {
	Name            string     `yaml:"name"`
	JobTypes        []string   `yaml:"job_types"`
	Queues          []string   `yaml:"queues"`
	Concurrency     int        `yaml:"concurrency"`
	JobTimeout      int        `yaml:"job_timeout"`      // seconds
	GracePeriod     int        `yaml:"grace_period"`     // seconds
	ShutdownTimeout int        `yaml:"shutdown_timeout"` // seconds
	DequeueStrategy string     `yaml:"dequeue_strategy"`
	Retry           *RetryYAML `yaml:"retry"`
}

PoolYAML holds worker pool configuration from YAML.

type QueueDef

type QueueDef struct {
	Name     string `yaml:"name"`
	Priority int    `yaml:"priority"`
}

QueueDef declares a named queue with optional priority metadata.

type RedisClient

type RedisClient struct {
	// contains filtered or unexported fields
}

RedisClient wraps a go-redis client with GQM-specific helpers.

func NewRedisClient

func NewRedisClient(opts ...RedisOption) (*RedisClient, error)

NewRedisClient creates a new RedisClient with the given options. If WithRedisClient was used to inject an existing *redis.Client, it is used directly and connection options (Addr, Password, DB, TLSConfig) are ignored.

func (*RedisClient) Close

func (rc *RedisClient) Close() error

Close closes the underlying Redis connection. If the client was injected via WithRedisClient, Close is a no-op — the caller retains ownership and is responsible for closing it.

func (*RedisClient) Key

func (rc *RedisClient) Key(parts ...string) string

Key returns a prefixed Redis key.

func (*RedisClient) Ping

func (rc *RedisClient) Ping(ctx context.Context) error

Ping checks the Redis connection.

func (*RedisClient) Prefix

func (rc *RedisClient) Prefix() string

Prefix returns the key prefix used by this client.

func (*RedisClient) Unwrap

func (rc *RedisClient) Unwrap() *redis.Client

Unwrap returns the underlying go-redis client for advanced operations.

type RedisConfig

type RedisConfig struct {
	Addr      string
	Password  string
	DB        int
	Prefix    string
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

RedisConfig holds Redis connection configuration.

type RedisOption

type RedisOption func(*RedisConfig)

RedisOption configures a RedisConfig.

func WithPrefix

func WithPrefix(prefix string) RedisOption

WithPrefix sets the key prefix for all GQM keys.

func WithRedisAddr

func WithRedisAddr(addr string) RedisOption

WithRedisAddr sets the Redis server address.

func WithRedisClient

func WithRedisClient(rdb *redis.Client) RedisOption

WithRedisClient injects a pre-configured *redis.Client, bypassing the built-in connection setup. This enables Redis Sentinel, Cluster, or any custom configuration supported by go-redis.

When used, connection options (WithRedisAddr, WithRedisPassword, WithRedisDB, WithRedisTLS) are ignored — only WithPrefix is still applied.

Ownership: the caller retains ownership of rdb. GQM will NOT close it — you must close it yourself after the Client/Server is done. This is safe for sharing a single *redis.Client across multiple GQM instances.

Example (Sentinel):

rdb := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "mymaster",
    SentinelAddrs: []string{"sentinel1:26379", "sentinel2:26379"},
})
defer rdb.Close()
client, _ := gqm.NewClient(gqm.WithRedisClient(rdb))

func WithRedisDB

func WithRedisDB(db int) RedisOption

WithRedisDB sets the Redis database number.

func WithRedisPassword

func WithRedisPassword(password string) RedisOption

WithRedisPassword sets the Redis password.

func WithRedisTLS

func WithRedisTLS(tc *tls.Config) RedisOption

WithRedisTLS enables TLS for the Redis connection. Pass nil for default TLS configuration (system CA pool), or provide a custom *tls.Config for client certificates, custom CA, or other TLS settings.

type RedisYAML

type RedisYAML struct {
	Addr     string `yaml:"addr"`
	Password string `yaml:"password"`
	DB       int    `yaml:"db"`
	Prefix   string `yaml:"prefix"`
	TLS      bool   `yaml:"tls"` // enable TLS with system CA pool
}

RedisYAML holds Redis connection settings from YAML.

type RetryPolicy

type RetryPolicy struct {
	MaxRetry    int
	Intervals   []int         // Explicit intervals in seconds (for BackoffCustom)
	Backoff     BackoffType   // fixed, exponential, custom
	BackoffBase time.Duration // Base delay for fixed/exponential
	BackoffMax  time.Duration // Maximum delay cap for exponential
}

RetryPolicy configures retry behavior at the pool level. When set on a pool, it serves as the default for all jobs in that pool unless overridden at the job level.

type RetryYAML

type RetryYAML struct {
	MaxRetry    int    `yaml:"max_retry"`
	Intervals   []int  `yaml:"intervals"`
	Backoff     string `yaml:"backoff"`
	BackoffBase int    `yaml:"backoff_base"` // seconds
	BackoffMax  int    `yaml:"backoff_max"`  // seconds
}

RetryYAML holds retry policy configuration from YAML.

type SchedulerConfig

type SchedulerConfig struct {
	Enabled      *bool           `yaml:"enabled"`       // nil = true
	PollInterval int             `yaml:"poll_interval"` // seconds
	CronEntries  []CronEntryYAML `yaml:"cron_entries"`
}

SchedulerConfig holds scheduler settings from YAML.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server manages worker pools and processes jobs.

func NewServer

func NewServer(opts ...ServerOption) (*Server, error)

NewServer creates a new Server with the given options.

func NewServerFromConfig

func NewServerFromConfig(cfg *Config, opts ...ServerOption) (*Server, error)

NewServerFromConfig creates a Server from a Config, with optional code overrides. The config serves as the base configuration; ServerOption values always win.
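A sketch of config-driven startup with a code override winning over the file; the file path is illustrative:

```go
cfg, err := gqm.LoadConfigFile("gqm.yaml")
if err != nil {
	log.Fatal(err)
}
// ServerOption values always win over the file.
srv, err := gqm.NewServerFromConfig(cfg, gqm.WithLogLevel("debug"))
if err != nil {
	log.Fatal(err)
}
```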

func (*Server) CancelJob

func (s *Server) CancelJob(ctx context.Context, jobID string) error

CancelJob cancels a job that is ready, scheduled, or deferred. Processing jobs cannot be canceled (handler is already running).

func (*Server) ClearDLQ

func (s *Server) ClearDLQ(ctx context.Context, queue string) (int64, error)

ClearDLQ deletes jobs from the dead letter queue, up to maxBulkOps at a time. Job hashes and DAG keys are also removed. Returns the number of jobs deleted. Call repeatedly until 0 to drain fully.

func (*Server) DeleteJob

func (s *Server) DeleteJob(ctx context.Context, jobID string) error

DeleteJob removes a job completely from Redis. Processing jobs cannot be deleted (an error is returned). A Lua script performs the status check and deletion atomically, preventing TOCTOU races where a job transitions to "processing" between the check and the deletion.

func (*Server) DisableCron

func (s *Server) DisableCron(ctx context.Context, cronID string) error

DisableCron disables a cron entry.

func (*Server) EmptyQueue

func (s *Server) EmptyQueue(ctx context.Context, queue string) (int64, error)

EmptyQueue removes pending (ready) jobs from a queue, up to maxBulkOps at a time. Processing, scheduled, and DLQ jobs are unaffected. Returns the number of jobs removed. Call repeatedly until 0 to drain fully.

func (*Server) EnableCron

func (s *Server) EnableCron(ctx context.Context, cronID string) error

EnableCron enables a cron entry.

func (*Server) Handle

func (s *Server) Handle(jobType string, handler Handler, opts ...HandleOption) error

Handle registers a handler for a job type.

func (*Server) IsQueuePaused

func (s *Server) IsQueuePaused(ctx context.Context, queue string) (bool, error)

IsQueuePaused returns whether the given queue is paused.

func (*Server) PauseQueue

func (s *Server) PauseQueue(ctx context.Context, queue string) error

PauseQueue pauses a queue. Workers will stop dequeuing from the paused queue but in-flight jobs continue to completion. New jobs can still be enqueued.

func (*Server) Pool

func (s *Server) Pool(cfg PoolConfig) error

Pool registers an explicit worker pool configuration (Layer 3). This allows grouping multiple job types into a single pool with shared concurrency, custom queues, dequeue strategy, and retry policy.

Must be called before Start(). Returns an error if the pool name is already registered or if the configuration is invalid.

func (*Server) ResumeQueue

func (s *Server) ResumeQueue(ctx context.Context, queue string) error

ResumeQueue resumes a paused queue. Workers will start dequeuing again.

func (*Server) RetryAllDLQ

func (s *Server) RetryAllDLQ(ctx context.Context, queue string) (int64, error)

RetryAllDLQ retries jobs in the dead letter queue, up to maxBulkOps at a time. Returns the number of jobs retried. Call repeatedly until 0 to drain fully.

func (*Server) RetryJob

func (s *Server) RetryJob(ctx context.Context, jobID string) error

RetryJob moves a dead-letter job back to the ready queue. Only jobs with status "dead_letter" can be retried.

func (*Server) Schedule

func (s *Server) Schedule(entry CronEntry) error

Schedule registers a cron entry for recurring job scheduling. Must be called before Start(). The entry's cron expression is parsed and validated. Duplicate IDs are rejected with ErrDuplicateCronEntry.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start begins processing jobs. It blocks until the server is stopped via signal (SIGTERM/SIGINT) or the context is cancelled. The server is single-use: after Stop or Start returns, create a new Server instance instead of calling Start again.
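A minimal end-to-end sketch; the job type, payload shape, and sendEmail helper are illustrative:

```go
srv, err := gqm.NewServer(gqm.WithServerRedis("localhost:6379"))
if err != nil {
	log.Fatal(err)
}
srv.Handle("email:send", func(ctx context.Context, job *gqm.Job) error {
	var p struct {
		To string `json:"to"`
	}
	if err := job.Decode(&p); err != nil {
		return err
	}
	return sendEmail(ctx, p.To) // hypothetical helper; must respect ctx
})
// Blocks until SIGTERM/SIGINT or ctx cancellation; the server is single-use.
if err := srv.Start(context.Background()); err != nil {
	log.Fatal(err)
}
```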

func (*Server) Stop

func (s *Server) Stop()

Stop signals the server to stop.

func (*Server) TriggerCron

func (s *Server) TriggerCron(ctx context.Context, cronID string) (string, error)

TriggerCron manually triggers a cron entry, enqueuing a job immediately. Returns the new job ID. Bypasses overlap checks and cron locks (explicit user intent).

func (*Server) Use

func (s *Server) Use(mws ...MiddlewareFunc) error

Use registers middleware that wraps all handlers. Middleware is applied in the order registered: Use(a, b) executes as a → b → handler.

Must be called before Start(). Middleware is applied to handlers once at startup, so Use() and Handle() can be called in any order.

type ServerOption

type ServerOption func(*serverConfig)

ServerOption configures a Server.

func WithAPI

func WithAPI(enabled bool, addr string) ServerOption

WithAPI enables the HTTP monitoring API on the given address.

func WithAPIKeys

func WithAPIKeys(keys []AuthAPIKey) ServerOption

WithAPIKeys sets the API keys for programmatic access.

func WithAuthEnabled

func WithAuthEnabled(enabled bool) ServerOption

WithAuthEnabled enables authentication for the monitoring API.

func WithAuthUsers

func WithAuthUsers(users []AuthUser) ServerOption

WithAuthUsers sets the users for the monitoring API.

func WithDashboard

func WithDashboard(enabled bool) ServerOption

WithDashboard enables the web dashboard.

func WithDashboardDir

func WithDashboardDir(dir string) ServerOption

WithDashboardDir sets a custom directory for serving dashboard assets.

func WithDashboardPathPrefix

func WithDashboardPathPrefix(prefix string) ServerOption

WithDashboardPathPrefix sets the URL prefix for the dashboard.

func WithDefaultTimezone

func WithDefaultTimezone(tz string) ServerOption

WithDefaultTimezone sets the fallback timezone (IANA name) for cron entries that don't specify their own timezone. Defaults to UTC.

func WithGlobalTimeout

func WithGlobalTimeout(d time.Duration) ServerOption

WithGlobalTimeout sets the global default job timeout. Must be > 0; the global timeout cannot be disabled.

func WithGracePeriod

func WithGracePeriod(d time.Duration) ServerOption

WithGracePeriod sets the default grace period after context cancellation.

func WithLogLevel

func WithLogLevel(level string) ServerOption

WithLogLevel sets the log level for the auto-created logger. Only takes effect if no WithLogger() is provided. Valid values: "debug", "info", "warn", "error".

func WithLogger

func WithLogger(l *slog.Logger) ServerOption

WithLogger sets a custom slog.Logger.

func WithSchedulerEnabled

func WithSchedulerEnabled(enabled bool) ServerOption

WithSchedulerEnabled controls whether the scheduler goroutine is started. Defaults to true. Set to false for worker-only instances.

func WithSchedulerPollInterval

func WithSchedulerPollInterval(d time.Duration) ServerOption

WithSchedulerPollInterval sets the poll interval for the scheduler engine. Defaults to 1s.

func WithServerRedis

func WithServerRedis(addr string) ServerOption

WithServerRedis sets the Redis address for the server.

func WithServerRedisClient

func WithServerRedisClient(rdb *redis.Client) ServerOption

WithServerRedisClient injects a pre-configured *redis.Client for the server. This enables Redis Sentinel or any custom go-redis setup. Connection options (WithServerRedis, etc.) are ignored when this is used; only the key prefix (WithPrefix via WithServerRedisOpts) still applies.

func WithServerRedisOpts

func WithServerRedisOpts(opts ...RedisOption) ServerOption

WithServerRedisOpts sets Redis options for the server.

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) ServerOption

WithShutdownTimeout sets the maximum wait time during graceful shutdown.

type SuccessCallbackFunc

type SuccessCallbackFunc func(ctx context.Context, job *Job)

SuccessCallbackFunc is a callback invoked after a job completes successfully.

type UserYAML

type UserYAML struct {
	Username     string `yaml:"username"`
	PasswordHash string `yaml:"password_hash"` // bcrypt hash
	Role         string `yaml:"role"`          // "admin" or "viewer"; defaults to "admin"
}

UserYAML holds a user entry from YAML.

Directories

Path Synopsis
_examples
01-email-service command
Example: Email Service
02-image-pipeline command
Example: Image Processing Pipeline
03-scheduled-reports command
Example: Scheduled Reports
04-order-fulfillment command
Example: Order Fulfillment
05-multi-tenant command
Example: Multi-tenant Worker Pools
06-config-driven command
Example: Config-driven Setup
07-webhook-delivery command
Example: Idempotent Webhook Delivery
08-monitoring command
Example: Monitoring Setup
09-dev-server command
Example: Dev Server (Programmatic)
09-dev-server/config command
Example: Dev Server (YAML Config Variant)
10-advanced-features command
Example: Advanced Features (Phase 8)
11-custom-dashboard command
Example: Custom Dashboard
cmd
gqm command
Binary gqm provides CLI utilities for the GQM queue manager.
monitor
Package monitor provides the HTTP monitoring API and dashboard for GQM.
tui module
