goworker2

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2025 License: MIT Imports: 0 Imported by: 0

README

goworker2

Build GoDoc

goworker2 is a Go-based background job processing library with pluggable components. It provides a clean, modular architecture supporting multiple queue backends, serializers, and statistics providers.

Originally inspired by Resque-compatible job processing, goworker2 has evolved into a flexible framework that can work with Redis, RabbitMQ, and custom backends.

Note: This is a complete rewrite and modernization of the original goworker library by Benjamin Manns, designed as a new project rather than a backwards-compatible upgrade. We're grateful for the inspiration and foundation provided by the original work.

Features

  • Multiple Queue Backends: Redis, RabbitMQ, or bring your own
  • Pluggable Serializers: JSON, Resque, Sneakers/ActiveJob, or custom formats
  • Statistics Providers: Resque-compatible, NoOp, or custom monitoring
  • Pre-configured Engines: Ready-to-use setups for common scenarios
  • Graceful Shutdown: Proper signal handling and worker cleanup
  • Concurrent Processing: Configurable worker pools with job distribution
  • Health Monitoring: Built-in health checks and statistics

Quick Start

Using Pre-configured Engines

The easiest way to get started is with pre-configured engines:

Redis with Resque Compatibility
package main

import (
	"context"
	"log"
	"time"
	
	"github.com/BranchIntl/goworker2/engines"
)

func emailJob(queue string, args ...interface{}) error {
	// Process email job
	return nil
}

func main() {
	options := engines.DefaultResqueOptions()
	options.Queues = []string{"email", "default"}
	options.PollInterval = 3 * time.Second
	
	engine := engines.NewResqueEngine(options)
	engine.Register("EmailJob", emailJob)
	
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}
RabbitMQ with ActiveJob Compatibility
package main

import (
	"context"
	"log"
	
	"github.com/BranchIntl/goworker2/engines"
)

func imageProcessor(queue string, args ...interface{}) error {
	// Process image
	return nil
}

func main() {
	options := engines.DefaultSneakersOptions()
	options.Queues = []string{"images", "default"}
	
	engine := engines.NewSneakersEngine(options)
	engine.Register("ImageProcessor", imageProcessor)
	
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}
Custom Configuration

For more control, you can configure components manually:

package main

import (
	"context"
	"log"
	"time"
	
	"github.com/BranchIntl/goworker2/brokers/redis"
	"github.com/BranchIntl/goworker2/core"
	"github.com/BranchIntl/goworker2/registry"
	"github.com/BranchIntl/goworker2/serializers/resque"
	"github.com/BranchIntl/goworker2/statistics/resque"
)

func main() {
	// Configure broker with queues
	brokerOpts := redis.DefaultOptions()
	brokerOpts.Queues = []string{"critical", "default"}
	brokerOpts.PollInterval = 5 * time.Second
	
	// Create components
	serializer := resque.NewSerializer()
	broker := redis.NewBroker(brokerOpts, serializer)
	stats := resque.NewStatistics(resque.DefaultOptions())
	registry := registry.NewRegistry()
	
	// Create engine with custom options
	engine := core.NewEngine(
		broker,
		stats,
		registry,
		core.WithConcurrency(10),
		core.WithShutdownTimeout(30*time.Second),
		core.WithJobBufferSize(200),
	)
	
	// Register workers
	registry.Register("MyJob", func(queue string, args ...interface{}) error {
		// Handle job
		return nil
	})
	
	// Start processing
	if err := engine.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Installation

go get github.com/BranchIntl/goworker2

Acknowledgments

This project was inspired by and builds upon the concepts from the original goworker library by Benjamin Manns. While this is a complete rewrite with different architecture and capabilities, we acknowledge and appreciate the foundational work that made this project possible.

Architecture

goworker2 uses a modular architecture with dependency injection:

┌─────────────────┐
│     Engine      │  ← Orchestrates components
├─────────────────┤
│   Broker        │  ← Queue backend with job consumption
│   Statistics    │  ← Metrics and monitoring
│   Registry      │  ← Worker function registry
│   Serializer    │  ← Job serialization format
│   WorkerPool    │  ← Manages concurrent workers
└─────────────────┘
Components
  • Broker: Handles queue operations and job consumption (enqueue, ack/nack, polling/pushing)
  • Statistics: Records metrics and worker information
  • Registry: Maps job classes to worker functions
  • Serializer: Converts jobs to/from bytes
  • Engine: Orchestrates all components and handles lifecycle
Pre-configured Engines
  • ResqueEngine: Redis + Resque serializer + Resque statistics (Ruby Resque compatibility)
  • SneakersEngine: RabbitMQ + ActiveJob serializer + NoOp statistics (Rails ActiveJob compatibility)

See engines/ directory for detailed engine documentation.

Configuration

Engine Options
engine := core.NewEngine(
	broker, stats, registry, serializer,
	core.WithConcurrency(25),                    // Number of workers
	core.WithShutdownTimeout(30*time.Second),    // Graceful shutdown timeout
	core.WithJobBufferSize(100),                 // Job channel buffer
)
Broker Options
Redis
options := redis.DefaultOptions()
options.URI = "redis://localhost:6379/"
options.Namespace = "jobs:"
options.Queues = []string{"high", "low"}        // Queues to consume from
options.PollInterval = 5 * time.Second          // Polling frequency
options.MaxConnections = 10
RabbitMQ
options := rabbitmq.DefaultOptions()
options.URI = "amqp://guest:guest@localhost:5672/"
options.Exchange = "jobs"
options.Queues = []string{"high", "low"}        // Queues to consume from
options.PrefetchCount = 1

Logging

goworker2 uses Go's standard log/slog library for structured logging. By default, it uses the default slog logger. To customize logging, configure your logger before creating the engine using slog.SetDefault(logger). For example: slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))).

Worker Functions

Worker functions must match this signature:

func(queue string, args ...interface{}) error
Type Assertions

Use Go type assertions to handle job arguments:

func processUser(queue string, args ...interface{}) error {
	if len(args) != 2 {
		return fmt.Errorf("expected 2 arguments, got %d", len(args))
	}
	
	userID, ok := args[0].(float64)  // JSON numbers are float64
	if !ok {
		return fmt.Errorf("invalid user ID type")
	}
	
	action, ok := args[1].(string)
	if !ok {
		return fmt.Errorf("invalid action type")
	}
	
	// Process user
	return processUserAction(int(userID), action)
}

Signal Handling

goworker handles these signals automatically:

  • SIGINT/SIGTERM: Graceful shutdown
  • Custom signals: Can be handled in advanced examples
// Automatic signal handling
engine.Run(ctx)  // Blocks until SIGINT/SIGTERM

// Manual control
engine.Start(ctx)
// ... custom signal handling ...
engine.Stop()

Testing

For testing, use mocks or lightweight alternatives like miniredis for Redis, or run actual brokers in Docker containers for integration tests.

Examples

Complete working examples are available in the examples/ directory covering both pre-configured engines and manual component setup.

Monitoring and Health

Health Checks
health := engine.Health()
fmt.Printf("Healthy: %v\n", health.Healthy)
fmt.Printf("Active Workers: %d\n", health.ActiveWorkers)
for queue, count := range health.QueuedJobs {
	fmt.Printf("Queue %s: %d jobs\n", queue, count)
}
Statistics
stats, err := engine.GetStats().GetGlobalStats(ctx)
if err == nil {
	fmt.Printf("Total Processed: %d\n", stats.TotalProcessed)
	fmt.Printf("Total Failed: %d\n", stats.TotalFailed)
}

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -am 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Overview

Package goworker2 provides a Go-based background job processing library with pluggable components and modular architecture.

Originally inspired by Resque-compatible job processing, goworker2 has evolved into a flexible framework supporting multiple queue backends (Redis, RabbitMQ), serializers (JSON, Resque, Sneakers/ActiveJob), and statistics providers.

Architecture

goworker uses dependency injection with these core components:

  • Broker: Handles queue operations and job consumption (Redis, RabbitMQ)
  • Statistics: Records metrics and monitoring data
  • Registry: Maps job classes to worker functions
  • Serializer: Converts jobs to/from bytes
  • Engine: Orchestrates all components and handles lifecycle

Quick Start with Pre-configured Engines

For Resque compatibility with Redis:

import "github.com/BranchIntl/goworker2/engines"

func emailJob(queue string, args ...interface{}) error {
	// Process email job
	return nil
}

func main() {
	options := engines.DefaultResqueOptions()
	options.Queues = []string{"email", "default"}
	options.PollInterval = 3 * time.Second

	engine := engines.NewResqueEngine(options)
	engine.Register("EmailJob", emailJob)
	engine.Run(context.Background())
}

For ActiveJob compatibility with RabbitMQ:

import "github.com/BranchIntl/goworker2/engines"

func imageProcessor(queue string, args ...interface{}) error {
	// Process image
	return nil
}

func main() {
	options := engines.DefaultSneakersOptions()
	options.Queues = []string{"images", "default"}

	engine := engines.NewSneakersEngine(options)
	engine.Register("ImageProcessor", imageProcessor)
	engine.Run(context.Background())
}

Custom Configuration

For complete control over components:

import (
	"context"
	"github.com/BranchIntl/goworker2/brokers/redis"
	"github.com/BranchIntl/goworker2/core"
	"github.com/BranchIntl/goworker2/registry"
	"github.com/BranchIntl/goworker2/serializers/resque"
	"github.com/BranchIntl/goworker2/statistics/resque"
)

func main() {
	// Configure broker with queues
	brokerOpts := redis.DefaultOptions()
	brokerOpts.Queues = []string{"critical", "default"}
	brokerOpts.PollInterval = 5 * time.Second

	// Create components
	serializer := resque.NewSerializer()
	broker := redis.NewBroker(brokerOpts, serializer)
	stats := resque.NewStatistics(resque.DefaultOptions())
	reg := registry.NewRegistry()

	// Create engine
	engine := core.NewEngine(
		broker,    // implements core.Broker
		stats,     // implements core.Statistics
		reg,       // implements core.Registry
		core.WithConcurrency(10),
		core.WithShutdownTimeout(30*time.Second),
	)

	// Register workers
	reg.Register("EmailJob", sendEmail)

	// Start processing
	engine.Run(context.Background())
}

Worker Functions

Worker functions must match this signature:

func(queue string, args ...interface{}) error

Use type assertions to handle arguments:

func processUser(queue string, args ...interface{}) error {
	userID, ok := args[0].(float64)  // JSON numbers are float64
	if !ok {
		return fmt.Errorf("invalid user ID")
	}
	// Process user...
	return nil
}

Signal Handling

The engine.Run() method automatically handles SIGINT and SIGTERM for graceful shutdown. For manual control:

engine.Start(ctx)
// Custom signal handling...
engine.Stop()

Testing

For testing, use mocks or lightweight alternatives like miniredis for Redis, or run actual brokers in Docker containers for integration tests.

Available Engines

ResqueEngine: Redis + Resque serializer + Resque statistics - Compatible with Ruby Resque - Uses Redis for queuing and statistics - Configure queues and poll interval via ResqueOptions

SneakersEngine: RabbitMQ + Sneakers serializer + NoOp statistics - Compatible with Rails ActiveJob/Sneakers - Uses RabbitMQ for queuing - Configure queues via SneakersOptions

Health Monitoring

health := engine.Health()
if health.Healthy {
	fmt.Printf("Active workers: %d\n", health.ActiveWorkers)
	for queue, count := range health.QueuedJobs {
		fmt.Printf("Queue %s: %d jobs\n", queue, count)
	}
}

Directories

Path Synopsis
brokers
Package engines provides pre-configured engine setups for common background job processing scenarios.
Package engines provides pre-configured engine setups for common background job processing scenarios.
Package errors provides error types and utilities for the goworker library.
Package errors provides error types and utilities for the goworker library.
examples
rabbitmq-basic command
redis-advanced command
redis-basic command
internal
Package job defines the core job interface and related types for the goworker library.
Package job defines the core job interface and related types for the goworker library.
serializers
statistics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL