Documentation
¶
Overview ¶
Package goworker2 provides a Go-based background job processing library with pluggable components and modular architecture.
Originally inspired by Resque-compatible job processing, goworker2 has evolved into a flexible framework supporting multiple queue backends (Redis, RabbitMQ), serializers (JSON, Resque, Sneakers/ActiveJob), and statistics providers.
Architecture ¶
goworker uses dependency injection with these core components:
- Broker: Handles queue operations and job consumption (Redis, RabbitMQ)
- Statistics: Records metrics and monitoring data
- Registry: Maps job classes to worker functions
- Serializer: Converts jobs to/from bytes
- Engine: Orchestrates all components and handles lifecycle
Quick Start with Pre-configured Engines ¶
For Resque compatibility with Redis:
import "github.com/BranchIntl/goworker2/engines"
func emailJob(queue string, args ...interface{}) error {
// Process email job
return nil
}
func main() {
options := engines.DefaultResqueOptions()
options.Queues = []string{"email", "default"}
options.PollInterval = 3 * time.Second
engine := engines.NewResqueEngine(options)
engine.Register("EmailJob", emailJob)
engine.Run(context.Background())
}
For ActiveJob compatibility with RabbitMQ:
import "github.com/BranchIntl/goworker2/engines"
func imageProcessor(queue string, args ...interface{}) error {
// Process image
return nil
}
func main() {
options := engines.DefaultSneakersOptions()
options.Queues = []string{"images", "default"}
engine := engines.NewSneakersEngine(options)
engine.Register("ImageProcessor", imageProcessor)
engine.Run(context.Background())
}
Custom Configuration ¶
For complete control over components:
import (
"context"
"github.com/BranchIntl/goworker2/brokers/redis"
"github.com/BranchIntl/goworker2/core"
"github.com/BranchIntl/goworker2/registry"
"github.com/BranchIntl/goworker2/serializers/resque"
"github.com/BranchIntl/goworker2/statistics/resque"
)
func main() {
// Configure broker with queues
brokerOpts := redis.DefaultOptions()
brokerOpts.Queues = []string{"critical", "default"}
brokerOpts.PollInterval = 5 * time.Second
// Create components
serializer := resque.NewSerializer()
broker := redis.NewBroker(brokerOpts, serializer)
stats := resque.NewStatistics(resque.DefaultOptions())
reg := registry.NewRegistry()
// Create engine
engine := core.NewEngine(
broker, // implements core.Broker
stats, // implements core.Statistics
reg, // implements core.Registry
core.WithConcurrency(10),
core.WithShutdownTimeout(30*time.Second),
)
// Register workers
reg.Register("EmailJob", sendEmail)
// Start processing
engine.Run(context.Background())
}
Worker Functions ¶
Worker functions must match this signature:
func(queue string, args ...interface{}) error
Use type assertions to handle arguments:
func processUser(queue string, args ...interface{}) error {
userID, ok := args[0].(float64) // JSON numbers are float64
if !ok {
return fmt.Errorf("invalid user ID")
}
// Process user...
return nil
}
Signal Handling ¶
The engine.Run() method automatically handles SIGINT and SIGTERM for graceful shutdown. For manual control:
engine.Start(ctx) // Custom signal handling... engine.Stop()
Testing ¶
For testing, use mocks or lightweight alternatives like miniredis for Redis, or run actual brokers in Docker containers for integration tests.
Available Engines ¶
ResqueEngine: Redis + Resque serializer + Resque statistics - Compatible with Ruby Resque - Uses Redis for queuing and statistics - Configure queues and poll interval via ResqueOptions
SneakersEngine: RabbitMQ + Sneakers serializer + NoOp statistics - Compatible with Rails ActiveJob/Sneakers - Uses RabbitMQ for queuing - Configure queues via SneakersOptions
Health Monitoring ¶
health := engine.Health()
if health.Healthy {
fmt.Printf("Active workers: %d\n", health.ActiveWorkers)
for queue, count := range health.QueuedJobs {
fmt.Printf("Queue %s: %d jobs\n", queue, count)
}
}
Directories
¶
| Path | Synopsis |
|---|---|
|
brokers
|
|
|
Package engines provides pre-configured engine setups for common background job processing scenarios.
|
Package engines provides pre-configured engine setups for common background job processing scenarios. |
|
Package errors provides error types and utilities for the goworker library.
|
Package errors provides error types and utilities for the goworker library. |
|
examples
|
|
|
rabbitmq-advanced
command
|
|
|
rabbitmq-basic
command
|
|
|
redis-advanced
command
|
|
|
redis-basic
command
|
|
|
internal
|
|
|
Package job defines the core job interface and related types for the goworker library.
|
Package job defines the core job interface and related types for the goworker library. |
|
serializers
|
|
|
statistics
|
|