simultaneously

package
v0.0.0-...-c514b9c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2026 License: MIT Imports: 12 Imported by: 0

README

simultaneously

Go Reference

A Go library for safe, controlled parallel execution with automatic panic recovery, context cancellation, and error handling.

Table of Contents

Purpose

The simultaneously package provides utilities for running functions concurrently with controlled parallelism. It solves common challenges in concurrent programming:

  • Controlled Concurrency: Limit how many goroutines run at once to prevent resource exhaustion
  • Panic Recovery: Automatically recover from panics and convert them to errors
  • Context Cancellation: Stop all work when one function fails or context is canceled
  • Error Aggregation: Collect and return errors from parallel operations
  • Type-Safe Transformations: Generic functions for transforming slices, maps, and sets in parallel

Core Concepts

Executor

The Executor interface manages concurrent execution with configurable concurrency limits:

type Executor interface {
    // Execute a function asynchronously with context support
    GoContext(ctx context.Context, fn func(context.Context) error, done func(error))

    // Execute a function asynchronously (convenience wrapper)
    Go(fn func(context.Context) error, done func(error))

    // Shut down the executor and wait for completion
    Close() error
}

Key Features:

  • Semaphore-based concurrency control
  • Thread-safe execution management
  • Graceful shutdown with completion tracking
  • Reusable across multiple batches of work
Concurrency Control

The maxConcurrent parameter controls how many operations run simultaneously:

  • maxConcurrent > 0: Limit to N concurrent operations
  • maxConcurrent <= 0: Unlimited concurrency (bounded only by goroutine scheduler)
  • Automatically capped at number of items to process
Panic Recovery

All functions automatically recover from panics and convert them to errors:

err := simultaneously.Do(2,
    func(ctx context.Context) error {
        panic("something went wrong") // Recovered and returned as error
    },
)
// err contains: "recovered from panic: something went wrong" + stack trace
Context Cancellation

When any function returns an error, the shared context is canceled to stop remaining work:

err := simultaneously.DoCtx(ctx, 3,
    func(ctx context.Context) error {
        return errors.New("failed") // Triggers cancellation
    },
    func(ctx context.Context) error {
        // This should check ctx.Done() and exit early
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // Do work...
        }
    },
)

Installation

go get github.com/amp-labs/amp-common/simultaneously

Quick Start

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/amp-labs/amp-common/simultaneously"
)

func main() {
    // Run 3 tasks with max 2 concurrent
    err := simultaneously.Do(2,
        func(ctx context.Context) error {
            fmt.Println("Task 1")
            time.Sleep(100 * time.Millisecond)
            return nil
        },
        func(ctx context.Context) error {
            fmt.Println("Task 2")
            time.Sleep(100 * time.Millisecond)
            return nil
        },
        func(ctx context.Context) error {
            fmt.Println("Task 3")
            time.Sleep(100 * time.Millisecond)
            return nil
        },
    )

    if err != nil {
        fmt.Printf("Error: %v\n", err)
    }
}

Basic Usage

Running Multiple Functions

Use Do or DoCtx to run multiple functions in parallel:

// Without context (uses context.Background)
err := simultaneously.Do(maxConcurrent,
    func(ctx context.Context) error {
        // Task 1
        return nil
    },
    func(ctx context.Context) error {
        // Task 2
        return nil
    },
)

// With context for cancellation/timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = simultaneously.DoCtx(ctx, maxConcurrent,
    func(ctx context.Context) error {
        // Task 1
        return nil
    },
    func(ctx context.Context) error {
        // Task 2
        return nil
    },
)
Transforming Slices

Transform slice elements in parallel while preserving order:

numbers := []int{1, 2, 3, 4, 5}

// Double each number in parallel (max 2 concurrent)
doubled, err := simultaneously.MapSlice(2, numbers,
    func(ctx context.Context, n int) (int, error) {
        return n * 2, nil
    },
)
// doubled = [2, 4, 6, 8, 10]
Transforming Maps

Transform map entries in parallel:

input := map[string]int{
    "a": 1,
    "b": 2,
    "c": 3,
}

// Convert to map[int]string in parallel
output, err := simultaneously.MapGoMap(2, input,
    func(ctx context.Context, k string, v int) (int, string, error) {
        return v, strings.ToUpper(k), nil
    },
)
// output = map[int]string{1: "A", 2: "B", 3: "C"}

Advanced Usage

Reusing Executors

For multiple batches of work, reuse an executor to avoid creation overhead:

// Create executor once
exec := simultaneously.NewDefaultExecutor(3) // max 3 concurrent
defer exec.Close()

// Process multiple batches
batch1 := []int{1, 2, 3, 4, 5}
batch2 := []int{6, 7, 8, 9, 10}

result1, err := simultaneously.MapSliceWithExecutor(exec, batch1,
    func(ctx context.Context, n int) (int, error) {
        return n * 2, nil
    },
)
if err != nil {
    return err
}

result2, err := simultaneously.MapSliceWithExecutor(exec, batch2,
    func(ctx context.Context, n int) (int, error) {
        return n * 2, nil
    },
)
if err != nil {
    return err
}
Custom Executor Implementation

Implement the Executor interface for custom behavior:

type CustomExecutor struct {
    // Your custom fields
}

func (e *CustomExecutor) GoContext(ctx context.Context, fn func(context.Context) error, done func(error)) {
    // Your custom execution logic
}

func (e *CustomExecutor) Go(fn func(context.Context) error, done func(error)) {
    e.GoContext(context.Background(), fn, done)
}

func (e *CustomExecutor) Close() error {
    // Your custom cleanup logic
    return nil
}

// Use your custom executor
exec := &CustomExecutor{}
err := simultaneously.DoWithExecutor(exec, tasks...)
Flat Mapping

Expand each input into multiple outputs (flattening):

// FlatMapSlice - expand strings into characters
words := []string{"hello", "world"}
chars, err := simultaneously.FlatMapSlice(2, words,
    func(ctx context.Context, word string) ([]rune, error) {
        return []rune(word), nil
    },
)
// chars = ['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']

// FlatMapGoMap - expand each entry into multiple entries
input := map[string]int{"a": 2, "b": 3}
output, err := simultaneously.FlatMapGoMap(2, input,
    func(ctx context.Context, k string, v int) (map[string]int, error) {
        result := make(map[string]int)
        for i := 0; i < v; i++ {
            result[fmt.Sprintf("%s%d", k, i)] = i
        }
        return result, nil
    },
)
// output = map[string]int{"a0": 0, "a1": 1, "b0": 0, "b1": 1, "b2": 2}

Data Transformations

The package provides parallel transformation functions for various data structures:

Slices
Function Description Order Preserved
MapSlice Transform each element
FlatMapSlice Transform and flatten
Go Maps (standard map[K]V)
Function Description Order Preserved
MapGoMap Transform key-value pairs
FlatMapGoMap Transform and flatten
amp-common Maps
Function Description Order Preserved
MapMap Transform amp-common Map
FlatMapMap Transform and flatten
MapOrderedMap Transform with order
FlatMapOrderedMap Transform and flatten with order
amp-common Sets
Function Description Order Preserved
MapSet Transform set elements
FlatMapSet Transform and flatten
MapOrderedSet Transform with order
FlatMapOrderedSet Transform and flatten with order

Note: All functions have:

  • Base version (uses context.Background())
  • Ctx version (accepts custom context)
  • WithExecutor version (uses custom executor)

Best Practices

1. Always Check Context Cancellation

Long-running functions should periodically check if the context is canceled:

err := simultaneously.DoCtx(ctx, 2,
    func(ctx context.Context) error {
        for i := 0; i < 1000; i++ {
            // Check cancellation periodically
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }

            // Do work
            processItem(i)
        }
        return nil
    },
)
2. Use Appropriate Concurrency Limits

Choose maxConcurrent based on your workload:

// CPU-bound work: limit to number of CPUs
cpuBound := runtime.NumCPU()
err := simultaneously.Do(cpuBound, tasks...)

// I/O-bound work: higher concurrency is ok
ioBound := 50
err := simultaneously.Do(ioBound, tasks...)

// Unlimited: use 0 or negative value (use with caution)
err := simultaneously.Do(0, tasks...)
3. Reuse Executors for Multiple Batches

When processing multiple batches, reuse the executor:

exec := simultaneously.NewDefaultExecutor(10)
defer exec.Close()

for _, batch := range batches {
    results, err := simultaneously.MapSliceWithExecutor(exec, batch, transform)
    if err != nil {
        return err
    }
    // Process results...
}
4. Set Timeouts for Operations

Use context timeouts to prevent hanging:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err := simultaneously.DoCtx(ctx, 5, tasks...)
if errors.Is(err, context.DeadlineExceeded) {
    // Handle timeout
}
5. Handle Panics Gracefully

The package recovers panics automatically and converts them to errors with stack traces:

err := simultaneously.Do(2,
    func(ctx context.Context) error {
        // Risky operation - panics are recovered automatically
        // and converted to errors with stack traces
        return riskyOperation(ctx)
    },
)

if err != nil {
    // Panic has already been recovered and logged with stack trace
    // Just handle the error normally
    log.Printf("Operation failed: %v", err)
}

You don't need to recover panics yourself - the library handles this for you.

Error Handling

Error Propagation

The first error encountered stops all remaining work:

err := simultaneously.Do(3,
    func(ctx context.Context) error {
        time.Sleep(100 * time.Millisecond)
        return errors.New("task 1 failed") // This error is returned
    },
    func(ctx context.Context) error {
        time.Sleep(200 * time.Millisecond)
        return errors.New("task 2 failed") // May not execute
    },
)
// err = "task 1 failed" (first error wins)
Multiple Errors

When multiple errors occur simultaneously, they are combined:

err := simultaneously.Do(3,
    func(ctx context.Context) error {
        return errors.New("error 1")
    },
    func(ctx context.Context) error {
        return errors.New("error 2")
    },
)
// err contains both errors joined with errors.Join
Panic Recovery

Panics are converted to errors with stack traces:

err := simultaneously.Do(1,
    func(ctx context.Context) error {
        panic("unexpected panic")
    },
)
// err contains:
// - "recovered from panic: unexpected panic"
// - Full stack trace
// - File and line number where panic occurred
Context Errors

Context cancellation and deadline errors are propagated:

ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately

err := simultaneously.DoCtx(ctx, 2, tasks...)
// err = context.Canceled

ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel2()

err = simultaneously.DoCtx(ctx2, 2, longRunningTasks...)
// err = context.DeadlineExceeded

API Reference

Core Functions
Do / DoCtx

Run multiple functions in parallel with controlled concurrency:

func Do(maxConcurrent int, funcs ...func(ctx context.Context) error) error
func DoCtx(ctx context.Context, maxConcurrent int, funcs ...func(ctx context.Context) error) error
DoWithExecutor / DoCtxWithExecutor

Run functions using a custom executor:

func DoWithExecutor(exec Executor, funcs ...func(ctx context.Context) error) error
func DoCtxWithExecutor(ctx context.Context, exec Executor, funcs ...func(ctx context.Context) error) error
Executor
NewDefaultExecutor

Create a new executor with concurrency limit:

func NewDefaultExecutor(maxConcurrent int) Executor
Slice Transformations

Transform slices in parallel while preserving order:

// Map: one-to-one transformation
func MapSlice[In, Out any](maxConcurrent int, values []In,
    transform func(ctx context.Context, value In) (Out, error)) ([]Out, error)

func MapSliceCtx[In, Out any](ctx context.Context, maxConcurrent int, values []In,
    transform func(ctx context.Context, value In) (Out, error)) ([]Out, error)

func MapSliceWithExecutor[In, Out any](exec Executor, values []In,
    transform func(ctx context.Context, value In) (Out, error)) ([]Out, error)

// FlatMap: one-to-many transformation with flattening
func FlatMapSlice[In, Out any](maxConcurrent int, values []In,
    transform func(ctx context.Context, value In) ([]Out, error)) ([]Out, error)

func FlatMapSliceCtx[In, Out any](ctx context.Context, maxConcurrent int, values []In,
    transform func(ctx context.Context, value In) ([]Out, error)) ([]Out, error)

func FlatMapSliceWithExecutor[In, Out any](exec Executor, values []In,
    transform func(ctx context.Context, value In) ([]Out, error)) ([]Out, error)
Go Map Transformations

Transform standard Go maps in parallel:

// Map: transform key-value pairs
func MapGoMap[InK comparable, InV, OutK comparable, OutV any](
    maxConcurrent int, input map[InK]InV,
    transform func(ctx context.Context, key InK, val InV) (OutK, OutV, error),
) (map[OutK]OutV, error)

func MapGoMapCtx[InK comparable, InV, OutK comparable, OutV any](
    ctx context.Context, maxConcurrent int, input map[InK]InV,
    transform func(ctx context.Context, key InK, val InV) (OutK, OutV, error),
) (map[OutK]OutV, error)

func MapGoMapWithExecutor[InK comparable, InV, OutK comparable, OutV any](
    exec Executor, input map[InK]InV,
    transform func(ctx context.Context, key InK, val InV) (OutK, OutV, error),
) (map[OutK]OutV, error)

// FlatMap: expand entries into multiple entries
func FlatMapGoMap[InK comparable, InV, OutK comparable, OutV any](
    maxConcurrent int, input map[InK]InV,
    transform func(ctx context.Context, key InK, val InV) (map[OutK]OutV, error),
) (map[OutK]OutV, error)

func FlatMapGoMapCtx[InK comparable, InV, OutK comparable, OutV any](
    ctx context.Context, maxConcurrent int, input map[InK]InV,
    transform func(ctx context.Context, key InK, val InV) (map[OutK]OutV, error),
) (map[OutK]OutV, error)

func FlatMapGoMapWithExecutor[InK comparable, InV, OutK comparable, OutV any](
    exec Executor, input map[InK]InV,
    transform func(ctx context.Context, key InK, val InV) (map[OutK]OutV, error),
) (map[OutK]OutV, error)
amp-common Data Structures

The package also provides parallel transformations for amp-common Map, OrderedMap, Set, and OrderedSet types. Each follows the same pattern with base, Ctx, and WithExecutor variants.

See the full API documentation for details.

Examples

Example 1: Parallel HTTP Requests
urls := []string{
    "https://api.example.com/users/1",
    "https://api.example.com/users/2",
    "https://api.example.com/users/3",
}

responses, err := simultaneously.MapSlice(5, urls,
    func(ctx context.Context, url string) (*http.Response, error) {
        req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
            return nil, err
        }
        return http.DefaultClient.Do(req)
    },
)
Example 2: Parallel File Processing
files := []string{"file1.txt", "file2.txt", "file3.txt"}

err := simultaneously.Do(2,
    func(ctx context.Context) error {
        return processFile(ctx, "file1.txt")
    },
    func(ctx context.Context) error {
        return processFile(ctx, "file2.txt")
    },
    func(ctx context.Context) error {
        return processFile(ctx, "file3.txt")
    },
)
Example 3: Batch Processing with Retry
exec := simultaneously.NewDefaultExecutor(10)
defer exec.Close()

for attempt := 0; attempt < 3; attempt++ {
    err := simultaneously.DoCtxWithExecutor(ctx, exec, tasks...)
    if err == nil {
        break // Success
    }

    if attempt < 2 {
        time.Sleep(time.Second * time.Duration(attempt+1))
    }
}
Example 4: Processing with Progress Tracking
var processed atomic.Int32
total := len(items)

results, err := simultaneously.MapSlice(10, items,
    func(ctx context.Context, item Item) (Result, error) {
        result, err := process(item)
        if err != nil {
            return Result{}, err
        }

        count := processed.Add(1)
        if count%100 == 0 {
            log.Printf("Processed %d/%d items", count, total)
        }

        return result, nil
    },
)

Thread Safety

All functions in this package are thread-safe:

  • Output collections (maps, sets) use mutexes for concurrent writes
  • Slice outputs preserve order using indexed writes with mutex protection
  • Executors use atomic operations and channels for synchronization
  • Context cancellation is handled with sync.Once to ensure single execution

Performance Considerations

  1. Concurrency Overhead: Each concurrent operation has overhead. For very fast operations, sequential execution may be faster.

  2. Memory Usage: Higher concurrency means more goroutines and memory. Monitor memory usage and adjust maxConcurrent accordingly.

  3. CPU vs I/O Bound:

    • CPU-bound: Set maxConcurrent to runtime.NumCPU()
    • I/O-bound: Higher concurrency (10-100) is usually fine
  4. Executor Reuse: Creating executors has overhead. Reuse them when processing multiple batches.

License

This package is part of amp-common and follows the same license.

Documentation

Overview

Package simultaneously provides utilities for running functions concurrently with controlled parallelism. It handles context cancellation, panic recovery, and error aggregation automatically.

Index

Constants

This section is empty.

Variables

View Source
var ErrExecutorClosed = errors.New("executor is closed")

ErrExecutorClosed is returned when attempting to execute functions on a closed executor.

Functions

func Do

func Do(maxConcurrent int, f ...func(ctx context.Context) error) error

Do runs the given functions in parallel and returns the first error encountered. See SimultaneouslyCtx for more information.

func DoCtx

func DoCtx(ctx context.Context, maxConcurrent int, callback ...func(ctx context.Context) error) error

DoCtx runs the given functions in parallel and returns the first error encountered. If no error is encountered, it returns nil. In the event that an error happens, all other functions are canceled (via their context) to hopefully save on CPU cycles. It's up to the individual functions to check their context and return early if they are canceled.

The maxConcurrent parameter is used to limit the number of functions that run at the same time. If maxConcurrent is less than 1, all functions will run at the same time.

Panics that occur within the callback functions are automatically recovered and converted to errors. This prevents a single panicking function from crashing the entire process.

func DoCtxWithExecutor

func DoCtxWithExecutor(ctx context.Context, exec Executor, callback ...func(ctx context.Context) error) error

DoCtxWithExecutor runs the given functions in parallel using a custom executor. This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused. All other behavior matches DoCtx including context cancellation, panic recovery, and error handling.

func DoWithExecutor

func DoWithExecutor(exec Executor, callback ...func(ctx context.Context) error) error

DoWithExecutor runs the given functions in parallel using a custom executor. See DoCtxWithExecutor for more information.

func FlatMapGoMap

func FlatMapGoMap[InKey comparable, InVal any, OutKey comparable, OutVal any](
	maxConcurrent int,
	input map[InKey]InVal,
	transform func(ctx context.Context, key InKey, val InVal) (map[OutKey]OutVal, error),
) (map[OutKey]OutVal, error)

FlatMapGoMap transforms a standard Go map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).

Unlike MapGoMap which produces one output entry per input entry, FlatMapGoMap allows each transform to return an entire map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapGoMapCtx.

Returns nil if the input map is nil. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).

Example:

// Expand each entry into multiple entries
input := map[string]int{"a": 2, "b": 3}
output, err := FlatMapGoMap(2, input, func(ctx context.Context, k string, v int) (map[string]int, error) {
    // Create v entries for each input entry
    result := make(map[string]int)
    for i := 0; i < v; i++ {
        result[fmt.Sprintf("%s%d", k, i)] = i
    }
    return result, nil
})
// output: map[string]int{"a0": 0, "a1": 1, "b0": 0, "b1": 1, "b2": 2}

func FlatMapGoMapCtx

func FlatMapGoMapCtx[InKey comparable, InVal any, OutKey comparable, OutVal any](
	ctx context.Context,
	maxConcurrent int,
	input map[InKey]InVal,
	transform func(ctx context.Context, key InKey, val InVal) (map[OutKey]OutVal, error),
) (result map[OutKey]OutVal, err error)

FlatMapGoMapCtx transforms a standard Go map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).

This is the context-aware version of FlatMapGoMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

Unlike MapGoMapCtx which produces one output entry per input entry, FlatMapGoMapCtx allows each transform to return an entire map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := FlatMapGoMapCtx(ctx, 2, input, func(ctx context.Context, k string, v int) (map[string]int, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    result := make(map[string]int)
    for i := 0; i < v; i++ {
        result[fmt.Sprintf("%s%d", k, i)] = i
    }
    return result, nil
})

func FlatMapGoMapCtxWithExecutor

func FlatMapGoMapCtxWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any](
	ctx context.Context,
	exec Executor,
	input map[InKey]InVal,
	transform func(ctx context.Context, key InKey, val InVal) (map[OutKey]OutVal, error),
) (map[OutKey]OutVal, error)

FlatMapGoMapCtxWithExecutor transforms a standard Go map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

Unlike MapGoMapCtxWithExecutor which produces one output entry per input entry, FlatMapGoMapCtxWithExecutor allows each transform to return an entire map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := FlatMapGoMapCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, k string, v int) (map[string]int, error) {
        result := make(map[string]int)
        for i := 0; i < v; i++ {
            result[fmt.Sprintf("%s%d", k, i)] = i
        }
        return result, nil
    })

func FlatMapGoMapWithExecutor

func FlatMapGoMapWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any](
	exec Executor,
	input map[InKey]InVal,
	transform func(ctx context.Context, key InKey, val InVal) (map[OutKey]OutVal, error),
) (map[OutKey]OutVal, error)

FlatMapGoMapWithExecutor transforms a standard Go map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor. See FlatMapGoMapCtxWithExecutor for more information.

func FlatMapMap

func FlatMapMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	maxConcurrent int,
	input maps.Map[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (maps.Map[OutKey, OutVal], error),
) (maps.Map[OutKey, OutVal], error)

FlatMapMap transforms an amp-common Map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).

Unlike MapMap which produces one output entry per input entry, FlatMapMap allows each transform to return an entire Map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.

This function is similar to FlatMapGoMap but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapMapCtx.

Returns nil if the input map is nil. The output map uses the same hash function as the input. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).

Example:

// Expand each entry into multiple entries
input := maps.NewHashMap[MyKey, int](hashing.Sha256)
output, err := FlatMapMap(2, input, func(ctx context.Context, k MyKey, v int) (maps.Map[MyKey, int], error) {
    result := maps.NewHashMap[MyKey, int](hashing.Sha256)
    for i := 0; i < v; i++ {
        result.Add(MyKey{ID: fmt.Sprintf("%s-%d", k.ID, i)}, i)
    }
    return result, nil
})

func FlatMapMapCtx

func FlatMapMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	ctx context.Context,
	maxConcurrent int,
	input maps.Map[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (maps.Map[OutKey, OutVal], error),
) (result maps.Map[OutKey, OutVal], err error)

FlatMapMapCtx transforms an amp-common Map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).

This is the context-aware version of FlatMapMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

Unlike MapMapCtx which produces one output entry per input entry, FlatMapMapCtx allows each transform to return an entire Map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.

This function is similar to FlatMapGoMapCtx but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. The output map uses the same hash function as the input. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).

Thread-safety: The output map is built with a mutex to handle concurrent additions from all the flattened results, ensuring thread-safe construction even when transforms execute in parallel.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := FlatMapMapCtx(ctx, 2, input, func(ctx context.Context, k MyKey, v int) (maps.Map[MyKey, int], error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    result := maps.NewHashMap[MyKey, int](hashing.Sha256)
    for i := 0; i < v; i++ {
        result.Add(MyKey{ID: fmt.Sprintf("%s-%d", k.ID, i)}, i)
    }
    return result, nil
})

func FlatMapMapCtxWithExecutor

func FlatMapMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	ctx context.Context,
	exec Executor,
	input maps.Map[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (maps.Map[OutKey, OutVal], error),
) (maps.Map[OutKey, OutVal], error)

FlatMapMapCtxWithExecutor transforms an amp-common Map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

Unlike MapMapCtxWithExecutor which produces one output entry per input entry, FlatMapMapCtxWithExecutor allows each transform to return an entire Map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.

This function is similar to FlatMapGoMapCtxWithExecutor but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. The output map uses the same hash function as the input. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).

Thread-safety: The output map is built with a mutex to handle concurrent additions from all the flattened results, ensuring thread-safe construction even when transforms execute in parallel.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := FlatMapMapCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, k MyKey, v int) (maps.Map[MyKey, int], error) {
        result := maps.NewHashMap[MyKey, int](hashing.Sha256)
        for i := 0; i < v; i++ {
            result.Add(MyKey{ID: fmt.Sprintf("%s-%d", k.ID, i)}, i)
        }
        return result, nil
    })

func FlatMapMapWithExecutor

func FlatMapMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	exec Executor,
	input maps.Map[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (maps.Map[OutKey, OutVal], error),
) (maps.Map[OutKey, OutVal], error)

FlatMapMapWithExecutor transforms an amp-common Map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor. See FlatMapMapCtxWithExecutor for more information.

func FlatMapOrderedMap

func FlatMapOrderedMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	maxConcurrent int,
	input maps.OrderedMap[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (maps.OrderedMap[OutKey, OutVal], error),
) (maps.OrderedMap[OutKey, OutVal], error)

FlatMapOrderedMap transforms an OrderedMap by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).

Unlike FlatMapMap, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output map's insertion order, even though transforms execute in parallel.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapOrderedMapCtx.

Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].

Example:

input := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
input.Add(maps.Key[string]{Key: "a"}, 2)
input.Add(maps.Key[string]{Key: "b"}, 3)
output, err := FlatMapOrderedMap(2, input,
    func(ctx context.Context, k maps.Key[string], v int) (maps.OrderedMap[maps.Key[string], int], error) {
        result := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
        for i := 0; i < v; i++ {
            key := maps.Key[string]{Key: fmt.Sprintf("%s%d", k.Key, i)}
            result.Add(key, i)
        }
        return result, nil
    })
// output: a0->0, a1->1, b0->0, b1->1, b2->2 (in this order)

func FlatMapOrderedMapCtx

func FlatMapOrderedMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	ctx context.Context,
	maxConcurrent int,
	input maps.OrderedMap[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (maps.OrderedMap[OutKey, OutVal], error),
) (result maps.OrderedMap[OutKey, OutVal], err error)

FlatMapOrderedMapCtx transforms an OrderedMap by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).

This is the context-aware version of FlatMapOrderedMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

Unlike FlatMapMapCtx, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output map's insertion order, even though transforms execute in parallel.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].

Thread-safety: Results are collected in parallel with a mutex, then flattened and added to the output map in the original insertion order to preserve ordering semantics.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := FlatMapOrderedMapCtx(ctx, 2, input,
    func(ctx context.Context, k maps.Key[string], v int) (maps.OrderedMap[maps.Key[string], int], error) {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }
        result := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
        for i := 0; i < v; i++ {
            result.Add(maps.Key[string]{Key: fmt.Sprintf("%s%d", k.Key, i)}, i)
        }
        return result, nil
    })

func FlatMapOrderedMapCtxWithExecutor

func FlatMapOrderedMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	ctx context.Context,
	exec Executor,
	input maps.OrderedMap[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (maps.OrderedMap[OutKey, OutVal], error),
) (maps.OrderedMap[OutKey, OutVal], error)

FlatMapOrderedMapCtxWithExecutor transforms an OrderedMap by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

Unlike FlatMapMapCtxWithExecutor, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output map's insertion order, even though transforms execute in parallel.

Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].

Thread-safety: Results are collected in parallel with a mutex, then flattened and added to the output map in the original insertion order to preserve ordering semantics.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := FlatMapOrderedMapCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, k maps.Key[string], v int) (maps.OrderedMap[maps.Key[string], int], error) {
        result := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
        for i := 0; i < v; i++ {
            result.Add(maps.Key[string]{Key: fmt.Sprintf("%s%d", k.Key, i)}, i)
        }
        return result, nil
    })

func FlatMapOrderedMapWithExecutor

func FlatMapOrderedMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	exec Executor,
	input maps.OrderedMap[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (maps.OrderedMap[OutKey, OutVal], error),
) (maps.OrderedMap[OutKey, OutVal], error)

FlatMapOrderedMapWithExecutor transforms an OrderedMap by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor. See FlatMapOrderedMapCtxWithExecutor for more information.

func FlatMapOrderedSet

func FlatMapOrderedSet[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	maxConcurrent int,
	input set.OrderedSet[InElem],
	transform func(ctx context.Context, elem InElem) (set.OrderedSet[OutElem], error),
) (set.OrderedSet[OutElem], error)

FlatMapOrderedSet transforms an OrderedSet by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening).

Unlike FlatMapSet, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output set's insertion order, even though transforms execute in parallel.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each element in the input set. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapOrderedSetCtx.

Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].

Example:

input := set.NewOrderedSet[hashing.HashableString](hashing.Sha256)
input.Add(hashing.HashableString("ab"))
input.Add(hashing.HashableString("cd"))
output, err := FlatMapOrderedSet(2, input,
    func(ctx context.Context, s hashing.HashableString) (set.OrderedSet[hashing.HashableString], error) {
        result := set.NewOrderedSet[hashing.HashableString](hashing.Sha256)
        for _, ch := range string(s) {
            result.Add(hashing.HashableString(string(ch)))
        }
        return result, nil
    })
// output: "a", "b", "c", "d" (in this order)

func FlatMapOrderedSetCtx

func FlatMapOrderedSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	ctx context.Context,
	maxConcurrent int,
	input set.OrderedSet[InElem],
	transform func(ctx context.Context, elem InElem) (set.OrderedSet[OutElem], error),
) (result set.OrderedSet[OutElem], err error)

FlatMapOrderedSetCtx transforms an OrderedSet by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening).

This is the context-aware version of FlatMapOrderedSet. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

Unlike FlatMapSetCtx, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output set's insertion order, even though transforms execute in parallel.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].

Thread-safety: Results are collected in parallel using FlatMapSlice, then flattened and added to the output set in the original insertion order to preserve ordering semantics.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := FlatMapOrderedSetCtx(ctx, 2, input,
    func(ctx context.Context, s hashing.HashableString) (set.OrderedSet[hashing.HashableString], error) {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }
        result := set.NewOrderedSet[hashing.HashableString](hashing.Sha256)
        for _, ch := range string(s) {
            result.Add(hashing.HashableString(string(ch)))
        }
        return result, nil
    })

func FlatMapOrderedSetCtxWithExecutor

func FlatMapOrderedSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	ctx context.Context,
	exec Executor,
	input set.OrderedSet[InElem],
	transform func(ctx context.Context, elem InElem) (set.OrderedSet[OutElem], error),
) (set.OrderedSet[OutElem], error)

FlatMapOrderedSetCtxWithExecutor transforms an OrderedSet by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening), using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

Unlike FlatMapSetCtxWithExecutor, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output set's insertion order, even though transforms execute in parallel.

Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].

Thread-safety: Results are collected in parallel using FlatMapSliceCtxWithExecutor, then flattened and added to the output set in the original insertion order to preserve ordering semantics.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := FlatMapOrderedSetCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, s hashing.HashableString) (set.OrderedSet[hashing.HashableString], error) {
        result := set.NewOrderedSet[hashing.HashableString](hashing.Sha256)
        for _, ch := range string(s) {
            result.Add(hashing.HashableString(string(ch)))
        }
        return result, nil
    })

func FlatMapOrderedSetWithExecutor

func FlatMapOrderedSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	exec Executor,
	input set.OrderedSet[InElem],
	transform func(ctx context.Context, elem InElem) (set.OrderedSet[OutElem], error),
) (set.OrderedSet[OutElem], error)

FlatMapOrderedSetWithExecutor transforms an OrderedSet by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening), using a custom executor. See FlatMapOrderedSetCtxWithExecutor for more information.

func FlatMapSet

func FlatMapSet[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	maxConcurrent int,
	input set.Set[InElem],
	transform func(ctx context.Context, elem InElem) (set.Set[OutElem], error),
) (set.Set[OutElem], error)

FlatMapSet transforms a Set by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening).

Unlike MapSet which produces one output element per input element, FlatMapSet allows each transform to return an entire Set of results, which are then merged into the final output set. This is useful when one input element should expand into multiple output elements.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each element in the input set. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapSetCtx.

Returns nil if the input set is nil. The output set uses the same hash function as the input. If multiple transforms produce the same output element, duplicates are automatically handled by the set semantics.

Example:

// Expand each string into its individual characters
input := set.NewSet[hashing.HashableString](hashing.Sha256)
input.Add(hashing.HashableString("hi"))
output, err := FlatMapSet(2, input,
    func(ctx context.Context, s hashing.HashableString) (set.Set[hashing.HashableString], error) {
        result := set.NewSet[hashing.HashableString](hashing.Sha256)
        for _, ch := range string(s) {
            result.Add(hashing.HashableString(string(ch)))
        }
        return result, nil
    })
// output contains: "h", "i"

func FlatMapSetCtx

func FlatMapSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	ctx context.Context,
	maxConcurrent int,
	input set.Set[InElem],
	transform func(ctx context.Context, elem InElem) (set.Set[OutElem], error),
) (result set.Set[OutElem], err error)

FlatMapSetCtx transforms a Set by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening).

This is the context-aware version of FlatMapSet. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

Unlike MapSetCtx which produces one output element per input element, FlatMapSetCtx allows each transform to return an entire Set of results, which are then merged into the final output set. This is useful when one input element should expand into multiple output elements.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input set is nil. The output set uses the same hash function as the input. If multiple transforms produce the same output element, duplicates are automatically handled by the set semantics.

Thread-safety: The output set is built with a mutex to handle concurrent additions from all the flattened results, ensuring thread-safe construction even when transforms execute in parallel.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := FlatMapSetCtx(ctx, 2, input,
    func(ctx context.Context, s hashing.HashableString) (set.Set[hashing.HashableString], error) {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }
        result := set.NewSet[hashing.HashableString](hashing.Sha256)
        for _, ch := range string(s) {
            result.Add(hashing.HashableString(string(ch)))
        }
        return result, nil
    })

func FlatMapSetCtxWithExecutor

func FlatMapSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	ctx context.Context,
	exec Executor,
	input set.Set[InElem],
	transform func(ctx context.Context, elem InElem) (set.Set[OutElem], error),
) (set.Set[OutElem], error)

FlatMapSetCtxWithExecutor transforms a Set by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening), using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

Unlike MapSetCtxWithExecutor which produces one output element per input element, FlatMapSetCtxWithExecutor allows each transform to return an entire Set of results, which are then merged into the final output set.

The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input set is nil. The output set uses the same hash function as the input. If multiple transforms produce the same output element, duplicates are automatically handled by the set semantics.

Thread-safety: The output set is built with a mutex to handle concurrent additions from all the flattened results, ensuring thread-safe construction even when transforms execute in parallel.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := FlatMapSetCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, s hashing.HashableString) (set.Set[hashing.HashableString], error) {
        result := set.NewSet[hashing.HashableString](hashing.Sha256)
        for _, ch := range string(s) {
            result.Add(hashing.HashableString(string(ch)))
        }
        return result, nil
    })

func FlatMapSetWithExecutor

func FlatMapSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	exec Executor,
	input set.Set[InElem],
	transform func(ctx context.Context, elem InElem) (set.Set[OutElem], error),
) (set.Set[OutElem], error)

FlatMapSetWithExecutor transforms a Set by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening), using a custom executor. See FlatMapSetCtxWithExecutor for more information.

func FlatMapSlice

func FlatMapSlice[Input, Output any](
	maxConcurrent int,
	values []Input,
	transform func(ctx context.Context, value Input) ([]Output, error),
) ([]Output, error)

FlatMapSlice transforms a slice of values in parallel where each input produces zero or more outputs, then flattens the results into a single slice. See FlatMapSliceCtx for more information.

func FlatMapSliceCtx

func FlatMapSliceCtx[Input, Output any](
	ctx context.Context,
	maxConcurrent int,
	values []Input,
	transform func(ctx context.Context, value Input) ([]Output, error),
) (result []Output, err error)

FlatMapSliceCtx transforms a slice of values in parallel where each input produces zero or more outputs, then flattens the results into a single slice. This is useful when each input element needs to be expanded into multiple output elements.

The maxConcurrent parameter limits the number of concurrent transformations. If maxConcurrent is less than 1, all transformations will run at the same time.

If any transformation returns an error, all remaining transformations are canceled (via their context) and the first error is returned. The output slice will be nil.

Panics that occur within the transformation function are automatically recovered and converted to errors. Order is preserved: results from values[i] appear before results from values[i+1] in the flattened output.

Example:

words := []string{"hello", "world"}
chars, err := FlatMapSliceCtx(ctx, 2, words, func(ctx context.Context, word string) ([]rune, error) {
    return []rune(word), nil
})
// chars = ['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']

func FlatMapSliceCtxWithExecutor

func FlatMapSliceCtxWithExecutor[Input, Output any](
	ctx context.Context,
	exec Executor,
	values []Input,
	transform func(ctx context.Context, value Input) ([]Output, error),
) ([]Output, error)

FlatMapSliceCtxWithExecutor transforms a slice of values in parallel where each input produces zero or more outputs, then flattens the results into a single slice, using a custom executor for concurrency control.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

If any transformation returns an error, all remaining transformations are canceled (via their context) and the first error is returned. The output slice will be nil.

Panics that occur within the transformation function are automatically recovered and converted to errors. Order is preserved: results from values[i] appear before results from values[i+1] in the flattened output.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

words := []string{"hello", "world"}
chars, err := FlatMapSliceCtxWithExecutor(ctx, exec, words, func(ctx context.Context, word string) ([]rune, error) {
    return []rune(word), nil
})
// chars = ['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']

func FlatMapSliceWithExecutor

func FlatMapSliceWithExecutor[Input, Output any](
	exec Executor,
	values []Input,
	transform func(ctx context.Context, value Input) ([]Output, error),
) ([]Output, error)

FlatMapSliceWithExecutor transforms a slice of values in parallel where each input produces zero or more outputs, then flattens the results into a single slice, using a custom executor. See FlatMapSliceCtxWithExecutor for more information.

func MapGoMap

func MapGoMap[InKey comparable, InVal any, OutKey comparable, OutVal any](
	maxConcurrent int,
	input map[InKey]InVal,
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (map[OutKey]OutVal, error)

MapGoMap transforms a standard Go map by applying a transform function to each key-value pair in parallel, producing a new map with potentially different key and value types.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapGoMapCtx.

Returns nil if the input map is nil. The output map may have fewer entries if the transform produces duplicate keys (later entries overwrite earlier ones).

Example:

// Convert map[string]int to map[int]string in parallel
input := map[string]int{"a": 1, "b": 2, "c": 3}
output, err := MapGoMap(2, input, func(ctx context.Context, k string, v int) (int, string, error) {
    return v, strings.ToUpper(k), nil
})
// output: map[int]string{1: "A", 2: "B", 3: "C"}

func MapGoMapCtx

func MapGoMapCtx[InKey comparable, InVal any, OutKey comparable, OutVal any](
	ctx context.Context,
	maxConcurrent int,
	input map[InKey]InVal,
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (result map[OutKey]OutVal, err error)

MapGoMapCtx transforms a standard Go map by applying a transform function to each key-value pair in parallel, producing a new map with potentially different key and value types.

This is the context-aware version of MapGoMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. The output map may have fewer entries if the transform produces duplicate keys (later entries overwrite earlier ones).

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := MapGoMapCtx(ctx, 2, input, func(ctx context.Context, k string, v int) (int, string, error) {
    // Check context cancellation
    select {
    case <-ctx.Done():
        return 0, "", ctx.Err()
    default:
    }
    return v, strings.ToUpper(k), nil
})

func MapGoMapCtxWithExecutor

func MapGoMapCtxWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any](
	ctx context.Context,
	exec Executor,
	input map[InKey]InVal,
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (map[OutKey]OutVal, error)

MapGoMapCtxWithExecutor transforms a standard Go map by applying a transform function to each key-value pair in parallel, producing a new map with potentially different key and value types, using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. The output map may have fewer entries if the transform produces duplicate keys (later entries overwrite earlier ones).

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := MapGoMapCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, k string, v int) (int, string, error) {
        return v, strings.ToUpper(k), nil
    })

func MapGoMapWithExecutor

func MapGoMapWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any](
	exec Executor,
	input map[InKey]InVal,
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (map[OutKey]OutVal, error)

MapGoMapWithExecutor transforms a standard Go map by applying a transform function to each key-value pair in parallel, producing a new map with potentially different key and value types, using a custom executor. See MapGoMapCtxWithExecutor for more information.

func MapMap

func MapMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	maxConcurrent int,
	input maps.Map[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (maps.Map[OutKey, OutVal], error)

MapMap transforms an amp-common Map by applying a transform function to each key-value pair in parallel, producing a new Map with potentially different key and value types.

This function is similar to MapGoMap but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapMapCtx.

Returns nil if the input map is nil. The output map uses the same hash function as the input. The output map may have fewer entries if the transform produces duplicate keys.

Example:

// Transform map entries while preserving map type
input := maps.NewHashMap[MyKey, int](hashing.Sha256)
output, err := MapMap(2, input, func(ctx context.Context, k MyKey, v int) (MyKey, string, error) {
    return k, strconv.Itoa(v), nil
})

func MapMapCtx

func MapMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	ctx context.Context,
	maxConcurrent int,
	input maps.Map[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (result maps.Map[OutKey, OutVal], err error)

MapMapCtx transforms an amp-common Map by applying a transform function to each key-value pair in parallel, producing a new Map with potentially different key and value types.

This is the context-aware version of MapMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

This function is similar to MapGoMapCtx but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. The output map uses the same hash function as the input. The output map may have fewer entries if the transform produces duplicate keys.

Thread-safety: The output map is built with a mutex to handle concurrent additions, ensuring thread-safe construction even when transforms execute in parallel.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := MapMapCtx(ctx, 2, input, func(ctx context.Context, k MyKey, v int) (MyKey, string, error) {
    select {
    case <-ctx.Done():
        return MyKey{}, "", ctx.Err()
    default:
    }
    return k, strconv.Itoa(v), nil
})

func MapMapCtxWithExecutor

func MapMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	ctx context.Context,
	exec Executor,
	input maps.Map[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (maps.Map[OutKey, OutVal], error)

MapMapCtxWithExecutor transforms an amp-common Map by applying a transform function to each key-value pair in parallel, producing a new Map with potentially different key and value types, using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

This function is similar to MapGoMapCtxWithExecutor but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. The output map uses the same hash function as the input. The output map may have fewer entries if the transform produces duplicate keys.

Thread-safety: The output map is built with a mutex to handle concurrent additions, ensuring thread-safe construction even when transforms execute in parallel.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := MapMapCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, k MyKey, v int) (MyKey, string, error) {
        return k, strconv.Itoa(v), nil
    })

func MapMapWithExecutor

func MapMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	exec Executor,
	input maps.Map[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (maps.Map[OutKey, OutVal], error)

MapMapWithExecutor transforms an amp-common Map by applying a transform function to each key-value pair in parallel, producing a new Map with potentially different key and value types, using a custom executor. See MapMapCtxWithExecutor for more information.

func MapOrderedMap

func MapOrderedMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	maxConcurrent int,
	input maps.OrderedMap[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (maps.OrderedMap[OutKey, OutVal], error)

MapOrderedMap transforms an OrderedMap by applying a transform function to each key-value pair in parallel, producing a new OrderedMap with potentially different key and value types.

Unlike MapMap, this function preserves the insertion order of entries. The output map will have entries in the same order as the input map, even though transforms execute in parallel.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapOrderedMapCtx.

Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.

Example:

input := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
input.Add(maps.Key[string]{Key: "first"}, 1)
input.Add(maps.Key[string]{Key: "second"}, 2)
output, err := MapOrderedMap(2, input,
    func(ctx context.Context, k maps.Key[string], v int) (maps.Key[string], string, error) {
        return k, strconv.Itoa(v), nil
    })
// output has entries in order: "first" -> "1", "second" -> "2"

func MapOrderedMapCtx

func MapOrderedMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	ctx context.Context,
	maxConcurrent int,
	input maps.OrderedMap[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (result maps.OrderedMap[OutKey, OutVal], err error)

MapOrderedMapCtx transforms an OrderedMap by applying a transform function to each key-value pair in parallel, producing a new OrderedMap with potentially different key and value types.

This is the context-aware version of MapOrderedMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

Unlike MapMapCtx, this function preserves the insertion order of entries. The output map will have entries in the same order as the input map, even though transforms execute in parallel.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.

Thread-safety: Results are collected in parallel with a mutex, then added to the output map in the original insertion order to preserve ordering semantics.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := MapOrderedMapCtx(ctx, 2, input,
    func(ctx context.Context, k maps.Key[string], v int) (maps.Key[string], string, error) {
        select {
        case <-ctx.Done():
            return maps.Key[string]{}, "", ctx.Err()
        default:
        }
        return k, strconv.Itoa(v), nil
    })

func MapOrderedMapCtxWithExecutor

func MapOrderedMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	ctx context.Context,
	exec Executor,
	input maps.OrderedMap[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (maps.OrderedMap[OutKey, OutVal], error)

MapOrderedMapCtxWithExecutor transforms an OrderedMap by applying a transform function to each key-value pair in parallel, producing a new OrderedMap with potentially different key and value types, using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

Unlike MapMapCtxWithExecutor, this function preserves the insertion order of entries. The output map will have entries in the same order as the input map, even though transforms execute in parallel.

Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.

Thread-safety: Results are collected in parallel with a mutex, then added to the output map in the original insertion order to preserve ordering semantics.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := MapOrderedMapCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, k maps.Key[string], v int) (maps.Key[string], string, error) {
        return k, strconv.Itoa(v), nil
    })

func MapOrderedMapWithExecutor

func MapOrderedMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](
	exec Executor,
	input maps.OrderedMap[InKey, InVal],
	transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error),
) (maps.OrderedMap[OutKey, OutVal], error)

MapOrderedMapWithExecutor transforms an OrderedMap by applying a transform function to each key-value pair in parallel, producing a new OrderedMap with potentially different key and value types, using a custom executor. See MapOrderedMapCtxWithExecutor for more information.

func MapOrderedSet

func MapOrderedSet[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	maxConcurrent int,
	input set.OrderedSet[InElem],
	transform func(ctx context.Context, elem InElem) (OutElem, error),
) (set.OrderedSet[OutElem], error)

MapOrderedSet transforms an OrderedSet by applying a transform function to each element in parallel, producing a new OrderedSet with potentially different element types.

Unlike MapSet, this function preserves the insertion order of elements. The output set will have elements in the same order as the input set, even though transforms execute in parallel.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each element in the input set. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapOrderedSetCtx.

Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.

Example:

input := set.NewOrderedSet[hashing.HashableInt](hashing.Sha256)
input.Add(hashing.HashableInt(1))
input.Add(hashing.HashableInt(2))
input.Add(hashing.HashableInt(3))
output, err := MapOrderedSet(2, input,
    func(ctx context.Context, v hashing.HashableInt) (hashing.HashableString, error) {
        return hashing.HashableString(strconv.Itoa(int(v))), nil
    })
// output has elements in order: "1", "2", "3"

func MapOrderedSetCtx

func MapOrderedSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	ctx context.Context,
	maxConcurrent int,
	input set.OrderedSet[InElem],
	transform func(ctx context.Context, elem InElem) (OutElem, error),
) (set.OrderedSet[OutElem], error)

MapOrderedSetCtx transforms an OrderedSet by applying a transform function to each element in parallel, producing a new OrderedSet with potentially different element types.

This is the context-aware version of MapOrderedSet. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

Unlike MapSetCtx, this function preserves the insertion order of elements. The output set will have elements in the same order as the input set, even though transforms execute in parallel.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.

Thread-safety: Results are collected in parallel using MapSlice, then added to the output set in the original insertion order to preserve ordering semantics.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := MapOrderedSetCtx(ctx, 2, input,
    func(ctx context.Context, v hashing.HashableInt) (hashing.HashableString, error) {
        select {
        case <-ctx.Done():
            return "", ctx.Err()
        default:
        }
        return hashing.HashableString(strconv.Itoa(int(v))), nil
    })

func MapOrderedSetCtxWithExecutor

func MapOrderedSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	ctx context.Context,
	exec Executor,
	input set.OrderedSet[InElem],
	transform func(ctx context.Context, elem InElem) (OutElem, error),
) (set.OrderedSet[OutElem], error)

MapOrderedSetCtxWithExecutor transforms an OrderedSet by applying a transform function to each element in parallel, producing a new OrderedSet with potentially different element types, using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

Unlike MapSetCtxWithExecutor, this function preserves the insertion order of elements. The output set will have elements in the same order as the input set, even though transforms execute in parallel.

Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.

Thread-safety: Results are collected in parallel using MapSliceCtxWithExecutor, then added to the output set in the original insertion order to preserve ordering semantics.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := MapOrderedSetCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, v hashing.HashableInt) (hashing.HashableString, error) {
        return hashing.HashableString(strconv.Itoa(int(v))), nil
    })

func MapOrderedSetWithExecutor

func MapOrderedSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	exec Executor,
	input set.OrderedSet[InElem],
	transform func(ctx context.Context, elem InElem) (OutElem, error),
) (set.OrderedSet[OutElem], error)

MapOrderedSetWithExecutor transforms an OrderedSet by applying a transform function to each element in parallel, producing a new OrderedSet with potentially different element types, using a custom executor. See MapOrderedSetCtxWithExecutor for more information.

func MapSet

func MapSet[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	maxConcurrent int,
	input set.Set[InElem],
	transform func(ctx context.Context, elem InElem) (OutElem, error),
) (set.Set[OutElem], error)

MapSet transforms a Set by applying a transform function to each element in parallel, producing a new Set with potentially different element types.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each element in the input set. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapSetCtx.

Returns nil if the input set is nil. The output set uses the same hash function as the input. The output set may have fewer elements if the transform produces duplicate elements.

Example:

// Transform string set to int set by converting to lengths in parallel
input := set.NewSet[hashing.HashableString](hashing.Sha256)
input.Add(hashing.HashableString("hello"))
input.Add(hashing.HashableString("world"))
output, err := MapSet(2, input, func(ctx context.Context, s hashing.HashableString) (hashing.HashableInt, error) {
    return hashing.HashableInt(len(s)), nil
})
// output contains: HashableInt(5) for both "hello" and "world"

func MapSetCtx

func MapSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	ctx context.Context,
	maxConcurrent int,
	input set.Set[InElem],
	transform func(ctx context.Context, elem InElem) (OutElem, error),
) (result set.Set[OutElem], err error)

MapSetCtx transforms a Set by applying a transform function to each element in parallel, producing a new Set with potentially different element types.

This is the context-aware version of MapSet. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.

The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).

The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input set is nil. The output set uses the same hash function as the input. The output set may have fewer elements if the transform produces duplicate elements.

Thread-safety: The output set is built with a mutex to handle concurrent additions, ensuring thread-safe construction even when transforms execute in parallel.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

output, err := MapSetCtx(ctx, 2, input,
    func(ctx context.Context, s hashing.HashableString) (hashing.HashableInt, error) {
        select {
        case <-ctx.Done():
            return 0, ctx.Err()
        default:
        }
        return hashing.HashableInt(len(s)), nil
    })

func MapSetCtxWithExecutor

func MapSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	ctx context.Context,
	exec Executor,
	input set.Set[InElem],
	transform func(ctx context.Context, elem InElem) (OutElem, error),
) (set.Set[OutElem], error)

MapSetCtxWithExecutor transforms a Set by applying a transform function to each element in parallel, producing a new Set with potentially different element types, using a custom executor.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.

Returns nil if the input set is nil. The output set uses the same hash function as the input. The output set may have fewer elements if the transform produces duplicate elements.

Thread-safety: The output set is built with a mutex to handle concurrent additions, ensuring thread-safe construction even when transforms execute in parallel.

Example:

exec := NewDefaultExecutor(2)
defer exec.Close()

output, err := MapSetCtxWithExecutor(ctx, exec, input,
    func(ctx context.Context, s hashing.HashableString) (hashing.HashableInt, error) {
        return hashing.HashableInt(len(s)), nil
    })

func MapSetWithExecutor

func MapSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](
	exec Executor,
	input set.Set[InElem],
	transform func(ctx context.Context, elem InElem) (OutElem, error),
) (set.Set[OutElem], error)

MapSetWithExecutor transforms a Set by applying a transform function to each element in parallel, producing a new Set with potentially different element types, using a custom executor. See MapSetCtxWithExecutor for more information.

func MapSlice

func MapSlice[Input, Output any](
	maxConcurrent int,
	values []Input,
	transform func(ctx context.Context, value Input) (Output, error),
) ([]Output, error)

MapSlice transforms a slice of values in parallel by applying a function to each element. See MapSliceCtx for more information.

func MapSliceCtx

func MapSliceCtx[Input, Output any](
	ctx context.Context,
	maxConcurrent int,
	values []Input,
	transform func(ctx context.Context, value Input) (Output, error),
) (result []Output, err error)

MapSliceCtx transforms a slice of values in parallel by applying a function to each element. It returns a new slice containing the transformed values in the same order as the input.

The maxConcurrent parameter limits the number of concurrent transformations. If maxConcurrent is less than 1, all transformations will run at the same time.

If any transformation returns an error, all remaining transformations are canceled (via their context) and the first error is returned. The output slice will be nil.

Panics that occur within the transformation function are automatically recovered and converted to errors. Order is preserved: outputs[i] corresponds to values[i].

Example:

numbers := []int{1, 2, 3, 4, 5}
doubled, err := MapSliceCtx(ctx, 2, numbers, func(ctx context.Context, n int) (int, error) {
    return n * 2, nil
})
// doubled = [2, 4, 6, 8, 10]

func MapSliceCtxWithExecutor

func MapSliceCtxWithExecutor[Input, Output any](
	ctx context.Context,
	exec Executor,
	values []Input,
	transform func(ctx context.Context, value Input) (Output, error),
) ([]Output, error)

MapSliceCtxWithExecutor transforms a slice of values in parallel by applying a function to each element, using a custom executor for concurrency control instead of creating a new one.

This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.

If any transformation returns an error, all remaining transformations are canceled (via their context) and the first error is returned. The output slice will be nil.

Panics that occur within the transformation function are automatically recovered and converted to errors. Order is preserved: outputs[i] corresponds to values[i].

Example:

exec := NewDefaultExecutor(2) // Create reusable executor with max concurrency of 2
defer exec.Close()

numbers := []int{1, 2, 3, 4, 5}
doubled, err := MapSliceCtxWithExecutor(ctx, exec, numbers, func(ctx context.Context, n int) (int, error) {
    return n * 2, nil
})
// doubled = [2, 4, 6, 8, 10]

func MapSliceWithExecutor

func MapSliceWithExecutor[Input, Output any](
	exec Executor,
	values []Input,
	transform func(ctx context.Context, value Input) (Output, error),
) ([]Output, error)

MapSliceWithExecutor transforms a slice of values in parallel by applying a function to each element, using a custom executor for concurrency control. See MapSliceCtxWithExecutor for more information.

Types

type Collectable

type Collectable[T any] = collectable.Collectable[T]

Collectable is a type alias for collectable.Collectable, providing a shorter name for use in this package. It represents types that can be both hashed and compared for equality, which is required for use as map keys in amp-common maps.

type Executor

type Executor interface {
	// GoContext executes fn asynchronously using the provided context, calling done with the result.
	// If the executor is closed or the context is canceled, done is called with the appropriate error.
	GoContext(ctx context.Context, fn func(context.Context) error, done func(error))

	// Go executes fn asynchronously using a background context, calling done with the result.
	// This is a convenience wrapper around GoContext that uses context.Background().
	Go(fn func(context.Context) error, done func(error))

	// Close shuts down the executor, preventing new executions and waiting for in-flight operations.
	// Returns ErrExecutorClosed if the executor is already closed.
	Close() error
}

Executor manages concurrent execution of functions with a configurable concurrency limit. It provides methods to execute functions asynchronously while respecting resource constraints.

func NewDefaultExecutor

func NewDefaultExecutor(maxConcurrent int) Executor

NewDefaultExecutor creates a new executor with the specified concurrency limit.

The executor manages parallel execution of functions while respecting the maxConcurrent limit. It uses a semaphore-based approach to control how many functions can run simultaneously.

This creates an executor with a fixed concurrency limit that does not adapt to the size of input data. Use this when you want consistent concurrency across multiple batches with varying sizes.

For single-use transformations, prefer the base functions (MapSlice, Do, etc.) which create optimally-sized internal executors automatically that adapt to your data size.

Parameters:

  • maxConcurrent: Maximum number of functions that can execute concurrently. If less than 1, defaults to 1 (sequential execution).

Returns:

  • An Executor that can be used with DoWithExecutor, MapSliceWithExecutor, and other *WithExecutor variant functions.

The executor must be closed when no longer needed to release resources:

exec := NewDefaultExecutor(5)
defer exec.Close()

Example usage:

// Create executor with max 3 concurrent operations
exec := NewDefaultExecutor(3)
defer exec.Close()

// Use with DoWithExecutor to reuse across multiple batches
batch1 := []func(context.Context) error{...}
batch2 := []func(context.Context) error{...}

if err := DoWithExecutor(exec, batch1...); err != nil {
    return err
}
if err := DoWithExecutor(exec, batch2...); err != nil {
    return err
}

Executor reuse is beneficial when processing multiple batches of work as it avoids the overhead of creating and destroying executors repeatedly.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL