Documentation ¶
Overview ¶
Package keyedexecutor provides a generic, key-based concurrent task execution framework. Tasks with the same key are executed sequentially, while tasks with different keys can execute concurrently. This version assigns a dedicated slot (queue + worker) per key, avoiding cross-key serialization caused by hashing.
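The cross-key serialization mentioned above can be illustrated with a small sketch. It does not use this package. The helpers `shardOf`, `sharedShards`, and `dedicatedSlots` are hypothetical names introduced only for illustration, and the FNV hash stands in for whatever hash a sharded design might use. The sketch compares a fixed number of hash shards, where distinct keys can land on the same worker, with one slot per key.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardOf maps a key onto one of n shared workers, as a hash-sharded
// executor might. This is the kind of design the overview says is avoided.
func shardOf(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on an FNV hash never returns an error
	return int(h.Sum32() % uint32(n))
}

// sharedShards counts the shards that end up serving more than one key.
// Keys on a shared shard would be serialized behind each other.
func sharedShards(keys []string, n int) int {
	perShard := make(map[int]int)
	for _, k := range keys {
		perShard[shardOf(k, n)]++
	}
	shared := 0
	for _, count := range perShard {
		if count > 1 {
			shared++
		}
	}
	return shared
}

// dedicatedSlots gives every distinct key its own slot index, so no two
// keys ever share a queue.
func dedicatedSlots(keys []string) map[string]int {
	slots := make(map[string]int)
	for _, k := range keys {
		if _, ok := slots[k]; !ok {
			slots[k] = len(slots)
		}
	}
	return slots
}

func main() {
	keys := []string{"user:1", "user:2", "user:3"}
	// With 2 shards and 3 keys, at least one shard must serve two keys.
	fmt.Println("shared shards:", sharedShards(keys, 2))
	fmt.Println("dedicated slots:", len(dedicatedSlots(keys)))
}
```

With more keys than shards a collision is unavoidable, whereas the per-key layout grows with the number of active keys.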
Index ¶
- type Config
- type ContextErrorTask
- type ContextGenericResultTask
- type ContextTask
- type ErrorTask
- type GenericResultTask
- type KeyedExecutor
- func New[K comparable, R any](config ...Config) *KeyedExecutor[K, R]
- func (e *KeyedExecutor[K, R]) Execute(key K, fn func())
- func (e *KeyedExecutor[K, R]) ExecuteWithContext(key K, ctx context.Context, fn func(context.Context))
- func (e *KeyedExecutor[K, R]) ExecuteWithContextError(key K, ctx context.Context, fn func(context.Context) error) <-chan error
- func (e *KeyedExecutor[K, R]) ExecuteWithContextResult(key K, ctx context.Context, fn func(context.Context) (R, error)) <-chan Result[R]
- func (e *KeyedExecutor[K, R]) ExecuteWithError(key K, fn func() error) <-chan error
- func (e *KeyedExecutor[K, R]) ExecuteWithResult(key K, fn func() (R, error)) <-chan Result[R]
- func (e *KeyedExecutor[K, R]) Shutdown()
- func (e *KeyedExecutor[K, R]) Stats() (slots int, pending int)
- type Result
- type SimpleTask
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct{}
Config is kept for API compatibility. Currently there are no tunables. Future options could include idle timeouts, caps on active slots, etc.
type ContextErrorTask ¶
type ContextErrorTask struct {
// contains filtered or unexported fields
}
ContextErrorTask executes a context-aware function returning an error, and reports the result through an error channel.
func (*ContextErrorTask) Execute ¶
func (t *ContextErrorTask) Execute()
type ContextGenericResultTask ¶
type ContextGenericResultTask[R any] struct {
// contains filtered or unexported fields
}
ContextGenericResultTask executes a context-aware function returning a typed result and error, and delivers the result through a typed channel.
func (*ContextGenericResultTask[R]) Execute ¶
func (t *ContextGenericResultTask[R]) Execute()
type ContextTask ¶
type ContextTask struct {
// contains filtered or unexported fields
}
ContextTask executes a function with context injection, useful for timeouts or cancellation.
func (*ContextTask) Execute ¶
func (t *ContextTask) Execute()
type ErrorTask ¶
type ErrorTask struct {
// contains filtered or unexported fields
}
ErrorTask executes a function returning an error, and reports the result through an error channel.
type GenericResultTask ¶
type GenericResultTask[R any] struct {
// contains filtered or unexported fields
}
GenericResultTask executes a function returning a typed result and error, and delivers the result through a typed channel.
func (*GenericResultTask[R]) Execute ¶
func (t *GenericResultTask[R]) Execute()
type KeyedExecutor ¶
type KeyedExecutor[K comparable, R any] struct {
// contains filtered or unexported fields
}
KeyedExecutor manages the concurrent execution of tasks partitioned by key. Tasks sharing the same key are executed serially in the order they are submitted. Internally, each key gets its own slot (queue + worker). The worker drains the queue and exits when the queue becomes empty. New tasks for that key will spawn a new worker on demand.
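The slot lifecycle described above (queue per key, worker drains the queue, exits when empty, respawns on demand) might look roughly like the following. This is a minimal sketch under assumptions, not the package's actual implementation: `miniExecutor`, `slot`, `submit`, `drain`, and `wait` are hypothetical names, and a single mutex guarding a map is only one of several ways to build it.

```go
package main

import (
	"fmt"
	"sync"
)

// slot holds the pending tasks for one key. In this sketch a slot exists
// only while its worker goroutine is alive.
type slot struct {
	queue []func()
}

// miniExecutor is a hypothetical, stripped-down stand-in for KeyedExecutor.
type miniExecutor[K comparable] struct {
	mu    sync.Mutex
	slots map[K]*slot
	wg    sync.WaitGroup
}

func newMini[K comparable]() *miniExecutor[K] {
	return &miniExecutor[K]{slots: make(map[K]*slot)}
}

// submit appends fn to the key's queue and starts a worker if none is running.
func (e *miniExecutor[K]) submit(key K, fn func()) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if s, ok := e.slots[key]; ok {
		s.queue = append(s.queue, fn)
		return
	}
	s := &slot{queue: []func(){fn}}
	e.slots[key] = s
	e.wg.Add(1)
	go e.drain(key, s)
}

// drain runs queued tasks in FIFO order and removes the slot once it is empty.
func (e *miniExecutor[K]) drain(key K, s *slot) {
	defer e.wg.Done()
	for {
		e.mu.Lock()
		if len(s.queue) == 0 {
			delete(e.slots, key) // the next submit for this key spawns a new worker
			e.mu.Unlock()
			return
		}
		fn := s.queue[0]
		s.queue = s.queue[1:]
		e.mu.Unlock()
		fn() // run outside the lock so other keys are not blocked
	}
}

// wait blocks until every worker has exited.
func (e *miniExecutor[K]) wait() { e.wg.Wait() }

// runOrdered submits five tasks under one key and returns the order they ran in.
func runOrdered() []int {
	e := newMini[string]()
	var order []int
	for i := 0; i < 5; i++ {
		i := i // per-iteration copy for Go versions before 1.22
		e.submit("a", func() { order = append(order, i) })
	}
	e.wait()
	return order
}

func main() {
	fmt.Println(runOrdered()) // prints [0 1 2 3 4]
}
```

Because only one worker at a time ever runs tasks for a given key, the tasks in `runOrdered` can append to a shared slice without extra locking.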
func New ¶
func New[K comparable, R any](config ...Config) *KeyedExecutor[K, R]
New creates a new KeyedExecutor instance. Per-key workers are created on demand and exit when their queue becomes empty.
func (*KeyedExecutor[K, R]) Execute ¶
func (e *KeyedExecutor[K, R]) Execute(key K, fn func())
Execute schedules a simple task (no context, no result) for the given key.
func (*KeyedExecutor[K, R]) ExecuteWithContext ¶
func (e *KeyedExecutor[K, R]) ExecuteWithContext(key K, ctx context.Context, fn func(context.Context))
ExecuteWithContext schedules a context-aware task for the given key.
func (*KeyedExecutor[K, R]) ExecuteWithContextError ¶
func (e *KeyedExecutor[K, R]) ExecuteWithContextError(key K, ctx context.Context, fn func(context.Context) error) <-chan error
ExecuteWithContextError schedules a context-aware task that returns an error. Returns a channel that receives the task's error result.
func (*KeyedExecutor[K, R]) ExecuteWithContextResult ¶
func (e *KeyedExecutor[K, R]) ExecuteWithContextResult(key K, ctx context.Context, fn func(context.Context) (R, error)) <-chan Result[R]
ExecuteWithContextResult schedules a context-aware typed result task and returns a channel for the result.
func (*KeyedExecutor[K, R]) ExecuteWithError ¶
func (e *KeyedExecutor[K, R]) ExecuteWithError(key K, fn func() error) <-chan error
ExecuteWithError schedules a task that returns an error for the given key. Returns a channel that receives the task's error result.
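The error-channel pattern can be sketched without the package itself. The function `runWithError` below is a hypothetical stand-in, and the one-element buffer is an assumption: the documentation does not say whether the returned channel is buffered or closed after delivery.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is an illustrative sentinel error.
var errBoom = errors.New("boom")

// runWithError is a hypothetical stand-in for an ExecuteWithError-style call:
// it runs fn asynchronously and delivers its error on the returned channel.
func runWithError(fn func() error) <-chan error {
	// Buffered (an assumption): the send succeeds even if the caller
	// never reads, so the goroutine cannot leak on an ignored result.
	ch := make(chan error, 1)
	go func() {
		ch <- fn()
	}()
	return ch
}

// failing and succeeding wrap the two outcomes a caller has to handle.
func failing() error    { return <-runWithError(func() error { return errBoom }) }
func succeeding() error { return <-runWithError(func() error { return nil }) }

func main() {
	fmt.Println(failing())           // prints boom
	fmt.Println(succeeding() == nil) // prints true
}
```

A caller typically receives from the channel once; a nil value means the task finished without error.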
func (*KeyedExecutor[K, R]) ExecuteWithResult ¶
func (e *KeyedExecutor[K, R]) ExecuteWithResult(key K, fn func() (R, error)) <-chan Result[R]
ExecuteWithResult schedules a typed result task and returns a channel for the result.
func (*KeyedExecutor[K, R]) Shutdown ¶
func (e *KeyedExecutor[K, R]) Shutdown()
Shutdown waits for all tasks to complete and all per-key workers to finish. Pending tasks continue to run to completion. New tasks should not be enqueued after Shutdown.
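The "wait for pending work, then return" behavior can be approximated with a `sync.WaitGroup`. The sketch below is an assumption about the general technique, not the package's code; `drainAll` and `completedAfterDrain` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// drainAll starts one worker per key, each running its tasks in order,
// and returns only after every worker has finished.
func drainAll(queues map[string][]func()) {
	var wg sync.WaitGroup
	for _, tasks := range queues {
		wg.Add(1)
		go func(tasks []func()) {
			defer wg.Done()
			for _, t := range tasks {
				t()
			}
		}(tasks)
	}
	wg.Wait() // pending tasks run to completion before this returns
}

// completedAfterDrain queues perKey tasks for each of three keys and
// reports how many had run by the time drainAll returned.
func completedAfterDrain(perKey int) int64 {
	var done atomic.Int64
	queues := make(map[string][]func())
	for _, k := range []string{"a", "b", "c"} {
		for i := 0; i < perKey; i++ {
			queues[k] = append(queues[k], func() { done.Add(1) })
		}
	}
	drainAll(queues)
	return done.Load()
}

func main() {
	fmt.Println(completedAfterDrain(4)) // 3 keys x 4 tasks each: prints 12
}
```

Nothing in this pattern rejects late submissions, which is consistent with the advice not to enqueue new tasks once shutdown has begun.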
func (*KeyedExecutor[K, R]) Stats ¶
func (e *KeyedExecutor[K, R]) Stats() (slots int, pending int)
Stats returns the number of active slots (keys with pending or running tasks) and the total number of queued tasks across all slots.
type Result ¶
Result wraps a generic return value together with an error. It communicates the outcome of asynchronous tasks that produce a result.
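Since the declaration of Result is not shown here, the following sketch uses a local, hypothetical `result[R]` type whose field names `Value` and `Err` are assumptions. It shows how a typed value and an error can travel together over one channel, including the case where the context is already cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// result is a hypothetical analogue of Result[R]; the field names are assumptions.
type result[R any] struct {
	Value R
	Err   error
}

// runWithContextResult is a hypothetical stand-in for an
// ExecuteWithContextResult-style call: it runs fn with ctx and delivers
// the value and error as a single message.
func runWithContextResult[R any](ctx context.Context, fn func(context.Context) (R, error)) <-chan result[R] {
	ch := make(chan result[R], 1) // buffered so the sender never blocks
	go func() {
		v, err := fn(ctx)
		ch <- result[R]{Value: v, Err: err}
	}()
	return ch
}

// answer returns 42 unless the context is already done.
func answer(ctx context.Context) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	return 42, nil
}

// liveResult runs answer with a context that is never cancelled.
func liveResult() result[int] {
	return <-runWithContextResult(context.Background(), answer)
}

// cancelledResult runs answer with a context cancelled before submission.
func cancelledResult() result[int] {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return <-runWithContextResult(ctx, answer)
}

func main() {
	r := liveResult()
	fmt.Println(r.Value, r.Err) // prints 42 <nil>
	r = cancelledResult()
	fmt.Println(r.Value, r.Err) // prints 0 context canceled
}
```

Bundling the value and the error in one struct lets the caller do a single receive instead of coordinating two channels.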
type SimpleTask ¶
type SimpleTask struct {
// contains filtered or unexported fields
}
SimpleTask executes a no-argument function with no return value.
func (*SimpleTask) Execute ¶
func (t *SimpleTask) Execute()