keyedexecutor

package module
v0.0.0-...-8c72b0a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2025 License: MIT Imports: 3 Imported by: 0

README

!!!! Not Production Ready

KeyedExecutor

KeyedExecutor is a Go library for concurrent task execution with key-based grouping. It ensures that tasks with the same key are executed sequentially, while tasks with different keys can run in parallel.

Features

  • Execute tasks concurrently with controlled parallelism
  • Group-based task scheduling using keys
  • Tasks with the same key are executed sequentially
  • Tasks with different keys can run in parallel
  • Simple, intuitive API

Installation

go get github.com/cch123/keyedexecutor

Usage

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cch123/keyedexecutor"
)

func main() {
	// Create a new executor with a maximum of 10 concurrent tasks
	exec := keyedexecutor.New[string](keyedexecutor.Config{
		WorkerCount: 4,
	})

	// Execute a task with key "user1"
	ctx := context.Background()
	ctx = context.WithValue(ctx, "user", "user1")
	exec.ExecuteWithContext("user1", ctx, func(ctx context.Context) {
		fmt.Println("Processing task for user1")
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Processing task for user1, read ctx value, User:", ctx.Value("user"))
	})

	// Tasks with the same key run sequentially
	exec.Execute("user1", func() {
		fmt.Println("Another task for user1, runs after the first one")
	})

	// Tasks with different keys can run in parallel
	err := <-exec.ExecuteWithError("user2", func() error {
		fmt.Println("Processing task for user2 (can run in parallel with user1 tasks)")
		return errors.New("example error")
	})
	fmt.Println("err returned", err)

	err = <-exec.ExecuteWithContextError("user3", ctx, func(ctx context.Context) error {
		fmt.Println("Processing task for user3 (can run in parallel with user1 tasks)")

		time.Sleep(200 * time.Millisecond)
		fmt.Println("Processing task for user3, read ctx value, User:", ctx.Value("user"))
		return errors.New("example error for user3")
	})

	fmt.Println("err returned for user3", err)

	fmt.Printf("executor stats ")
	workerCount, pendingTasks := exec.Stats()
	fmt.Println("worker count:", workerCount, "pending tasks:", pendingTasks)
	time.Sleep(time.Second)
}

API Documentation

Creating a New Executor
// Create a new executor with a specified maximum number of concurrent tasks
executor := keyedexecutor.New[int](keyedexecutor.Config{100})
Executing Tasks
// Execute a task with a specific key
executor.Execute(key string, task func(context.Context) error)

// Execute a task with a specific key and context
executor.ExecuteWithContext(ctx context.Context, key string, task func(context.Context) error)

Benchmark

~/keyedExecutor ❯❯❯ go test -bench=.
goos: darwin
goarch: arm64
pkg: github.com/cch123/keyedexecutor
cpu: Apple M2 Max
BenchmarkKeyedExecutor_SingleKey-12       	 2412008	       483.2 ns/op
BenchmarkKeyedExecutor_MultipleKeys-12    	 2222204	       515.9 ns/op
BenchmarkKeyedExecutor_WithContext-12     	 2425419	       494.1 ns/op
BenchmarkKeyedExecutor_WithError-12       	 2445438	       490.5 ns/op
PASS
ok  	github.com/cch123/keyedexecutor	7.683s

License

MIT License

Documentation

Overview

Package keyedexecutor provides a generic, key-based concurrent task execution framework. Tasks with the same key are executed sequentially, while tasks with different keys can execute concurrently. This version assigns a dedicated slot (queue + worker) per key, avoiding cross-key serialization caused by hashing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct{}

Config is kept for API compatibility. Currently there are no tunables. Future options could include idle timeouts, caps on active slots, etc.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

type ContextErrorTask

type ContextErrorTask struct {
	// contains filtered or unexported fields
}

ContextErrorTask executes a context-aware function returning an error, and reports the result through an error channel.

func (*ContextErrorTask) Execute

func (t *ContextErrorTask) Execute()

type ContextGenericResultTask

type ContextGenericResultTask[R any] struct {
	// contains filtered or unexported fields
}

ContextGenericResultTask executes a context-aware function returning a typed result and error, and delivers the result through a typed channel.

func (*ContextGenericResultTask[R]) Execute

func (t *ContextGenericResultTask[R]) Execute()

type ContextTask

type ContextTask struct {
	// contains filtered or unexported fields
}

ContextTask executes a function with context injection, useful for timeouts or cancellation.

func (*ContextTask) Execute

func (t *ContextTask) Execute()

type ErrorTask

type ErrorTask struct {
	// contains filtered or unexported fields
}

ErrorTask executes a function returning an error, and reports the result through an error channel.

func (*ErrorTask) Execute

func (t *ErrorTask) Execute()

type GenericResultTask

type GenericResultTask[R any] struct {
	// contains filtered or unexported fields
}

GenericResultTask executes a function returning a typed result and error, and delivers the result through a typed channel.

func (*GenericResultTask[R]) Execute

func (t *GenericResultTask[R]) Execute()

type KeyedExecutor

type KeyedExecutor[K comparable, R any] struct {
	// contains filtered or unexported fields
}

KeyedExecutor manages the concurrent execution of tasks partitioned by key. Tasks sharing the same key are executed serially in the order they are submitted. Internally, each key gets its own slot (queue + worker). The worker drains the queue and exits when the queue becomes empty. New tasks for that key will spawn a new worker on demand.

func New

func New[K comparable, R any](config ...Config) *KeyedExecutor[K, R]

New creates a new KeyedExecutor instance. Per-key workers are created on demand and exit when their queue becomes empty.

func (*KeyedExecutor[K, R]) Execute

func (e *KeyedExecutor[K, R]) Execute(key K, fn func())

Execute schedules a simple task (no context, no result) for the given key.

func (*KeyedExecutor[K, R]) ExecuteWithContext

func (e *KeyedExecutor[K, R]) ExecuteWithContext(key K, ctx context.Context, fn func(context.Context))

ExecuteWithContext schedules a context-aware task for the given key.

func (*KeyedExecutor[K, R]) ExecuteWithContextError

func (e *KeyedExecutor[K, R]) ExecuteWithContextError(key K, ctx context.Context, fn func(context.Context) error) <-chan error

ExecuteWithContextError schedules a context-aware task that returns an error. Returns a channel that receives the task's error result.

func (*KeyedExecutor[K, R]) ExecuteWithContextResult

func (e *KeyedExecutor[K, R]) ExecuteWithContextResult(key K, ctx context.Context, fn func(context.Context) (R, error)) <-chan Result[R]

ExecuteWithContextResult schedules a context-aware typed result task and returns a channel for the result.

func (*KeyedExecutor[K, R]) ExecuteWithError

func (e *KeyedExecutor[K, R]) ExecuteWithError(key K, fn func() error) <-chan error

ExecuteWithError schedules a task that returns an error for the given key. Returns a channel that receives the task's error result.

func (*KeyedExecutor[K, R]) ExecuteWithResult

func (e *KeyedExecutor[K, R]) ExecuteWithResult(key K, fn func() (R, error)) <-chan Result[R]

ExecuteWithResult schedules a typed result task and returns a channel for the result.

func (*KeyedExecutor[K, R]) Shutdown

func (e *KeyedExecutor[K, R]) Shutdown()

Shutdown waits for all tasks to complete and all per-key workers to finish. Pending tasks continue to run to completion. New tasks should not be enqueued after Shutdown.

func (*KeyedExecutor[K, R]) Stats

func (e *KeyedExecutor[K, R]) Stats() (slots int, pending int)

Stats returns the number of active slots (keys with pending or running tasks) and the total number of queued tasks across all slots.

type Result

type Result[R any] struct {
	Value R
	Err   error
}

Result wraps a generic return value with an error. It is used for communicating the outcome of async tasks with result.

type SimpleTask

type SimpleTask struct {
	// contains filtered or unexported fields
}

SimpleTask executes a no-argument function with no return value.

func (*SimpleTask) Execute

func (t *SimpleTask) Execute()

type Task

type Task interface {
	Execute()
}

Task represents a unit of executable logic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL