nsqconsumer

package module
v1.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2026 License: MIT Imports: 17 Imported by: 0

README

Package nsqconsumer v1.6.0

Telemetry

Telemetry is enabled by default, and can be disabled with WithoutTelemetry set to true in initialization options.

It records the duration of each handling (in seconds). The metric uses the scalingo.nsq.topic, scalingo.nsq.channel, scalingo.nsq.message_type, and scalingo.nsq.status attributes (success or error).

Metrics:

  • scalingo.nsq_consumer.message.duration: handling duration in seconds

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewError

func NewError(err error, opts ErrorOpts) error

Types

type Consumer

type Consumer interface {
	Start(ctx context.Context) func()
}

func New

func New(opts ConsumerOpts) (Consumer, error)

type ConsumerOpts

type ConsumerOpts struct {
	NsqConfig      *nsq.Config
	NsqLookupdURLs []string
	Topic          string
	Channel        string
	MaxInFlight    int
	SkipLogSet     map[string]bool
	LogLevel       LogLevel
	// PostponeProducer is an NSQ producer user to send postponed messages
	PostponeProducer nsqproducer.Producer
	// How long can the consumer keep the message before the message is considered as 'Timed Out'
	MsgTimeout     time.Duration
	MessageHandler func(context.Context, *NsqMessageDeserialize) error
	DisableBackoff bool
	// WithoutTelemetry indicates whether OpenTelemetry instrumentation should be disabled
	WithoutTelemetry bool
}

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (Error) Error

func (nsqerr Error) Error() string

func (Error) NoRetry added in v1.3.2

func (nsqerr Error) NoRetry() bool

NoRetry returns true if the message should not be retried to be handled

func (Error) Unwrap added in v1.3.2

func (nsqerr Error) Unwrap() error

Unwrap returns the cause of the error to be compatible with errors.As/Is()

type ErrorOpts

type ErrorOpts struct {
	NoRetry bool
}

type LogLevel added in v1.3.0

type LogLevel int

LogLevel is a wrapper around nsq.LogLevel to ensure that the default log level is set to Warning and not Debug

const (
	// DefaultLogLevel is the default log level for NSQ when no log level is provided
	DefaultLogLevel LogLevel = iota
	LogLevelDebug
	LogLevelInfo
	LogLevelWarning
	LogLevelError
)

func ParseLogLevel added in v1.3.0

func ParseLogLevel(logLevel string) LogLevel

type NsqMessageDeserialize

type NsqMessageDeserialize struct {
	RequestID string          `json:"request_id"`
	Type      string          `json:"type"`
	At        int64           `json:"at"`
	Payload   json.RawMessage `json:"payload"`
	NsqMsg    *nsq.Message
}

func FromMessageSerialize

func FromMessageSerialize(msg *nsqproducer.NsqMessageSerialize) *NsqMessageDeserialize

FromMessageSerialize let you transform a Serialized message to a DeserializeMessage for a consumer Its use is mostly for testing as writing manual `json.RawMessage` is boring

func (*NsqMessageDeserialize) TouchUntilClosed

func (msg *NsqMessageDeserialize) TouchUntilClosed() chan<- struct{}

TouchUntilClosed returns a channel which has to be closed by the called Until the channel is closed, the NSQ message will be touched every 40 secs to ensure NSQ does not consider the message as failed because of time out.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL