store

package
v0.1.1
Published: Dec 22, 2025 License: Unlicense Imports: 62 Imported by: 0

Documentation

Overview

Package store provides a pooled memory allocator for Arrow operations.

Index

Constants

const (
	MaxConnections = 16
	MaxLayers      = 16
	EfConstruction = 128
	ML             = 0.36 // 1 / ln(MaxConnections)
)
const (
	MerkleFanout = 16
	MerkleDepth  = 4
)
const (
	// DefaultMaxIdleConns is the default maximum number of idle connections across all hosts
	DefaultMaxIdleConns = 100
	// DefaultMaxIdleConnsPerHost is the default maximum number of idle connections per host
	DefaultMaxIdleConnsPerHost = 100
	// DefaultIdleConnTimeout is the default timeout for idle connections
	DefaultIdleConnTimeout = 90 * time.Second
)

Connection pool default settings for S3 backend

const DefaultArenaSize = 64 * 1024

DefaultArenaSize is the default capacity for pooled arenas (64KB)

Variables

var ErrCircuitOpen = errors.New("circuit breaker is open")

ErrCircuitOpen is returned when the circuit breaker rejects a request.

var (
	ErrNoHealthyReplicas = errors.New("no healthy replicas available")
)

Functions

func AdviseMemory

func AdviseMemory(ptr unsafe.Pointer, size uintptr, advice MemoryAdvice) error

AdviseMemory provides hints to the kernel about the memory usage pattern. This is a no-op on non-Unix systems.

func AdviseRecord

func AdviseRecord(rec arrow.RecordBatch, advice MemoryAdvice)

AdviseRecord provides memory hints for all buffers in an Arrow RecordBatch.

func BuildNamespacedPath

func BuildNamespacedPath(namespace, dataset string) string

BuildNamespacedPath combines namespace and dataset into a path.

func CachedRecordSize

func CachedRecordSize(rec arrow.RecordBatch) int64

CachedRecordSize returns the record size, using the global size cache.

func EnsureTimestampZeroCopy

func EnsureTimestampZeroCopy(mem memory.Allocator, rec arrow.RecordBatch) (arrow.RecordBatch, error)

EnsureTimestampZeroCopy ensures the record has a timestamp column, adding one if missing (zero-copy optimized)

func EstimateRecordSize

func EstimateRecordSize(rec arrow.RecordBatch) int64

EstimateRecordSize is a fast heuristic. For now, it aliases calculateRecordSize, which is reasonably fast (O(cols)).

func FastPathEqual

func FastPathEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)

FastPathEqual applies equality filter directly without Arrow Compute overhead. Returns a boolean mask array where true indicates matching values.

func FastPathGreater

func FastPathGreater(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)

FastPathGreater applies greater-than filter directly without Arrow Compute overhead.

func FastPathGreaterEqual

func FastPathGreaterEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)

FastPathGreaterEqual applies greater-than-or-equal filter directly without Arrow Compute overhead.

func FastPathLess

func FastPathLess(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)

FastPathLess applies less-than filter directly without Arrow Compute overhead.

func FastPathLessEqual

func FastPathLessEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)

FastPathLessEqual applies less-than-or-equal filter directly without Arrow Compute overhead.

func FastPathNotEqual

func FastPathNotEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)

FastPathNotEqual applies not-equal filter directly without Arrow Compute overhead. Returns a boolean mask array where true indicates non-matching values.

func GetCurrentCPU

func GetCurrentCPU() int

GetCurrentCPU returns the current CPU number or -1 if unavailable. This is Linux-specific, using the getcpu syscall.

func GetJSONEncoder

func GetJSONEncoder() (*bytes.Buffer, *json.Encoder)

GetJSONEncoder retrieves a pooled encoder and buffer.

func GetNumaNode

func GetNumaNode(ptr unsafe.Pointer) (int, error)

GetNumaNode returns the NUMA node (memory bank) where the page containing the given pointer resides. On non-Linux systems, returns -1 and nil error.

func GetTCPNoDelayConnectionsTotal

func GetTCPNoDelayConnectionsTotal() int64

GetTCPNoDelayConnectionsTotal returns the total number of connections with TCP_NODELAY set. This is useful for testing and monitoring.

func HammingDistanceBatch

func HammingDistanceBatch(query []uint64, vectors [][]uint64, distances []int)

HammingDistanceBatch computes Hamming distances from query to multiple vectors. Optimized for batch processing with better cache utilization.

func HammingDistancePOPCNT

func HammingDistancePOPCNT(a, b []uint64) int

HammingDistancePOPCNT computes Hamming distance using POPCNT instruction. Returns the number of differing bits between two binary vectors. Uses math/bits.OnesCount64 which compiles to POPCNT on supported CPUs.
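For illustration, a minimal sketch of the approach this comment describes (not the package's actual source): XOR each word pair and count set bits with math/bits.OnesCount64.

package main

import (
	"fmt"
	"math/bits"
)

// hammingDistance mirrors the documented approach: XOR word pairs,
// then popcount the result. Assumes len(a) == len(b).
func hammingDistance(a, b []uint64) int {
	d := 0
	for i := range a {
		d += bits.OnesCount64(a[i] ^ b[i]) // compiles to POPCNT where supported
	}
	return d
}

func main() {
	fmt.Println(hammingDistance([]uint64{0b1010}, []uint64{0b0110})) // 2
}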

func IncrementRecordAccess

func IncrementRecordAccess()

IncrementRecordAccess should be called when a record is accessed (for metrics)

func IsFastPathSupported

func IsFastPathSupported(dt arrow.DataType, op FilterOperator) bool

IsFastPathSupported returns true if the data type and operator can use the fast path that bypasses Arrow Compute overhead.
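A hedged usage sketch: gate on IsFastPathSupported before calling a fast-path filter. The Arrow and module import paths are placeholders, and the FilterOperator value is passed in by the caller since operator constants are not shown in this listing.

import (
	"context"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow" // Arrow import path assumed
	store "example.com/yourmodule/store"   // module import path assumed
)

// equalMask prefers the fast path for arr == value when supported.
func equalMask(ctx context.Context, arr arrow.Array, value string, op store.FilterOperator) (arrow.Array, error) {
	if store.IsFastPathSupported(arr.DataType(), op) {
		return store.FastPathEqual(ctx, arr, value) // boolean mask, true = match
	}
	return nil, fmt.Errorf("fast path unsupported for %s", arr.DataType())
}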

func IsNotFoundError

func IsNotFoundError(err error) bool

IsNotFoundError checks if an error is a NotFoundError

func LockMemory

func LockMemory(ptr unsafe.Pointer, size uintptr) error

LockMemory pins the memory region in RAM, preventing it from being swapped out.

func MatchesFilters

func MatchesFilters(rec arrow.RecordBatch, rowIdx int, filters []Filter) (bool, error)

MatchesFilters checks if a specific row satisfies the filters.

func MustToGRPCStatus

func MustToGRPCStatus(err error) error

MustToGRPCStatus is like ToGRPCStatus but panics if conversion fails. Useful for testing.

func NewConfigError

func NewConfigError(component, field, value, message string) error

NewConfigError creates a configuration error with timestamp.

func NewDimensionMismatchError

func NewDimensionMismatchError(dataset string, expected, actual int) error

NewDimensionMismatchError creates a dimension mismatch error.

func NewInternalError

func NewInternalError(operation string, cause error) error

NewInternalError creates an internal error.

func NewInvalidArgumentError

func NewInvalidArgumentError(field, message string) error

NewInvalidArgumentError creates an invalid argument error.

func NewNotFoundError

func NewNotFoundError(resource, name string) error

NewNotFoundError creates a not found error.

func NewPersistenceError

func NewPersistenceError(operation string, cause error) error

NewPersistenceError creates a persistence error.

func NewReplicationError

func NewReplicationError(op, peerAddr, dataset string, cause error) error

NewReplicationError creates a replication error with timestamp.

func NewResourceExhaustedError

func NewResourceExhaustedError(resource, message string) error

NewResourceExhaustedError creates a resource exhausted error.

func NewS3Error

func NewS3Error(op, bucket, key string, cause error) error

NewS3Error creates an S3 error with timestamp.

func NewSchemaMismatchError

func NewSchemaMismatchError(dataset, message string) error

NewSchemaMismatchError creates a schema mismatch error.

func NewShutdownError

func NewShutdownError(phase, component string, cause error) error

NewShutdownError creates a shutdown error with timestamp.

func NewUnavailableError

func NewUnavailableError(operation, reason string) error

NewUnavailableError creates an unavailable error.

func NewWALError

func NewWALError(op, path string, offset int64, cause error) error

NewWALError creates a WAL error with timestamp.

func PackBytesToFloat32s

func PackBytesToFloat32s(codes []uint8) []float32

PackBytesToFloat32s packs uint8 codes into float32 slice for HNSW storage. Packs 4 bytes into 1 float32.

func ParseNamespacedPath

func ParseNamespacedPath(path string) (namespace, dataset string)

ParseNamespacedPath parses a path into namespace and dataset components. Format: "namespace/dataset" or "dataset" (uses the "default" namespace). Examples:

  • "tenant1/mydata" -> ("tenant1", "mydata")
  • "mydata" -> ("default", "mydata")
  • "org/project/data" -> ("org", "project/data")

func PinThreadToCore

func PinThreadToCore(core int) error

PinThreadToCore pins the current goroutine's thread to a specific CPU core. This is most effective when combined with runtime.LockOSThread(). On non-Linux systems, this returns nil (no-op).
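As the doc suggests, pairing with runtime.LockOSThread is the typical pattern; a sketch (the core index is illustrative, and the call is a no-op off Linux):

import (
	"log"
	"runtime"

	store "example.com/yourmodule/store" // import path assumed
)

func runPinned(work func()) {
	runtime.LockOSThread() // keep this goroutine on one OS thread
	defer runtime.UnlockOSThread()
	if err := store.PinThreadToCore(2); err != nil { // core 2 is illustrative
		log.Printf("pin failed: %v", err)
	}
	work() // latency-sensitive work now runs on the pinned core
}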

func PinThreadToNode

func PinThreadToNode(node int) error

PinThreadToNode pins the current goroutine's thread to the set of CPUs associated with the given NUMA node. On non-Linux systems, this returns nil (no-op).

func PinToNUMANode

func PinToNUMANode(topo *NUMATopology, nodeID int) error

PinToNUMANode pins the current goroutine to CPUs on the specified NUMA node. This ensures that the goroutine runs on CPUs local to the NUMA node, reducing remote memory access latency.

func PutArena

func PutArena(arena *SearchArena)

PutArena returns a SearchArena to the global pool for reuse. The arena is automatically reset before being pooled.

func PutJSONEncoder

func PutJSONEncoder(buf *bytes.Buffer, enc *json.Encoder)

PutJSONEncoder returns the encoder and buffer to the pool.
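A usage sketch for the encoder pool (import path assumed; the encoder is assumed to write into the pooled buffer). The result is consumed before Put recycles the buffer:

import (
	"io"

	store "example.com/yourmodule/store" // import path assumed
)

func writeJSON(w io.Writer, v any) error {
	buf, enc := store.GetJSONEncoder() // enc is assumed to encode into buf
	defer store.PutJSONEncoder(buf, enc)
	if err := enc.Encode(v); err != nil {
		return err
	}
	_, err := w.Write(buf.Bytes()) // consume before the buffer is pooled
	return err
}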

func ResetGlobalSizeCache

func ResetGlobalSizeCache()

ResetGlobalSizeCache resets the global cache

func RetainRecordBatch

func RetainRecordBatch(rec arrow.RecordBatch) arrow.RecordBatch

RetainRecordBatch simply retains the record batch for zero-copy access. Caller must call Release() when done.

func SQ8CosineDistance

func SQ8CosineDistance(q1, q2 []uint8, enc *SQ8Encoder) float32

SQ8CosineDistance computes cosine distance between quantized vectors.

func SQ8DistanceFast

func SQ8DistanceFast(q1, q2 []uint8) uint32

SQ8DistanceFast computes squared L2 distance directly in quantized space. This is faster but returns integer distance (not actual Euclidean). Useful for ranking (ordering is preserved) but not exact distances.

func SQ8EuclideanDistance

func SQ8EuclideanDistance(q1, q2 []uint8, enc *SQ8Encoder) float32

SQ8EuclideanDistance computes Euclidean distance between quantized vectors. It decodes to float32 for accurate distance computation.

func ShouldUsePipeline

func ShouldUsePipeline(numBatches int) bool

func ToGRPCStatus

func ToGRPCStatus(err error) error

ToGRPCStatus converts a domain error to a gRPC status error with appropriate code. This provides specific Flight status codes for client-side debugging.

func UnlockMemory

func UnlockMemory(ptr unsafe.Pointer, size uintptr) error

UnlockMemory unpins a previously locked memory region.

func UnmarshalZeroCopy

func UnmarshalZeroCopy(payload *ZeroCopyPayload, mem memory.Allocator) (arrow.RecordBatch, error)

UnmarshalZeroCopy reconstructs a RecordBatch from a ZeroCopyPayload. This is currently a simplified implementation that uses the provided buffers to rebuild ArrayData objects.

func UnpackFloat32sToBytes

func UnpackFloat32sToBytes(packed []float32, length int) []uint8

UnpackFloat32sToBytes reconstructs uint8 codes from packed float32 slice.
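A round-trip sketch for the pack/unpack pair (the internal bit layout is not specified here, so only the pairing is shown; import path assumed):

import (
	"fmt"

	store "example.com/yourmodule/store" // import path assumed
)

func roundTrip() {
	codes := []uint8{1, 2, 3, 4, 5}
	packed := store.PackBytesToFloat32s(codes)                  // 4 codes per float32
	restored := store.UnpackFloat32sToBytes(packed, len(codes)) // length recovers the tail
	fmt.Println(restored)                                       // expected: [1 2 3 4 5]
}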

func ZeroCopyRecordBatch

func ZeroCopyRecordBatch(mem memory.Allocator, rec arrow.RecordBatch, deleted *Bitset) (arrow.RecordBatch, error)

ZeroCopyRecordBatch creates a zero-copy view of a record batch with tombstone filtering. Uses Arrow's compute kernels for efficient slicing without deep copies.

Types

type AdaptiveIndex

type AdaptiveIndex struct {
	// contains filtered or unexported fields
}

AdaptiveIndex automatically switches between BruteForce and HNSW based on size.

func NewAdaptiveIndex

func NewAdaptiveIndex(ds *Dataset, cfg AdaptiveIndexConfig) *AdaptiveIndex

NewAdaptiveIndex creates an adaptive index starting with BruteForce.

func (*AdaptiveIndex) AddByLocation

func (a *AdaptiveIndex) AddByLocation(batchIdx, rowIdx int) error

AddByLocation adds a vector and potentially triggers migration to HNSW.

func (*AdaptiveIndex) GetIndexType

func (a *AdaptiveIndex) GetIndexType() string

GetIndexType returns the current index type.

func (*AdaptiveIndex) GetMigrationCount

func (a *AdaptiveIndex) GetMigrationCount() int64

GetMigrationCount returns the number of times migration occurred.

func (*AdaptiveIndex) Len

func (a *AdaptiveIndex) Len() int

Len returns the number of indexed vectors.

func (*AdaptiveIndex) SearchVectors

func (a *AdaptiveIndex) SearchVectors(query []float32, k int, filters []Filter) ([]SearchResult, error)

SearchVectors delegates to the active index.

type AdaptiveIndexConfig

type AdaptiveIndexConfig struct {
	// Threshold is the number of vectors at which to switch from BruteForce to HNSW.
	Threshold int

	// Enabled controls whether adaptive indexing is active.
	Enabled bool
}

AdaptiveIndexConfig controls automatic switching between BruteForce and HNSW.

func DefaultAdaptiveIndexConfig

func DefaultAdaptiveIndexConfig() AdaptiveIndexConfig

DefaultAdaptiveIndexConfig returns sensible defaults for adaptive indexing.

func (AdaptiveIndexConfig) Validate

func (c AdaptiveIndexConfig) Validate() error

Validate checks that the configuration is valid.

type AdaptiveIntervalCalculator

type AdaptiveIntervalCalculator struct {
	// contains filtered or unexported fields
}

AdaptiveIntervalCalculator computes optimal flush intervals based on load

func NewAdaptiveIntervalCalculator

func NewAdaptiveIntervalCalculator(cfg AdaptiveWALConfig) *AdaptiveIntervalCalculator

NewAdaptiveIntervalCalculator creates a new calculator with the given config

func (*AdaptiveIntervalCalculator) CalculateInterval

func (c *AdaptiveIntervalCalculator) CalculateInterval(writeRate float64) time.Duration

CalculateInterval returns the optimal flush interval for the given write rate

type AdaptiveWALConfig

type AdaptiveWALConfig struct {
	MinInterval   time.Duration // Minimum flush interval under high load (e.g., 1ms)
	MaxInterval   time.Duration // Maximum flush interval under low load (e.g., 100ms)
	TargetLatency time.Duration // Target p99 latency for writes (e.g., 5ms)
	Enabled       bool          // Whether adaptive batching is enabled
}

AdaptiveWALConfig configures adaptive batching behavior
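A configuration sketch using the example values from the field comments (import path assumed; the write-rate unit passed to CalculateInterval is assumed to be writes per second):

import (
	"log"
	"time"

	store "example.com/yourmodule/store" // import path assumed
)

func newCalculator() *store.AdaptiveIntervalCalculator {
	cfg := store.AdaptiveWALConfig{
		MinInterval:   1 * time.Millisecond,   // floor under high load
		MaxInterval:   100 * time.Millisecond, // ceiling under low load
		TargetLatency: 5 * time.Millisecond,   // target p99 write latency
		Enabled:       true,
	}
	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}
	calc := store.NewAdaptiveIntervalCalculator(cfg)
	_ = calc.CalculateInterval(50000) // high write rate -> shorter interval
	return calc
}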

func NewAdaptiveWALConfig

func NewAdaptiveWALConfig() AdaptiveWALConfig

NewAdaptiveWALConfig returns sensible defaults for adaptive batching

func (*AdaptiveWALConfig) Validate

func (c *AdaptiveWALConfig) Validate() error

Validate checks the configuration for consistency

type AllocatorAwareCache

type AllocatorAwareCache struct {
	// contains filtered or unexported fields
}

AllocatorAwareCache provides cached access to Arrow RecordBatches with zero-copy buffer retrieval

func NewAllocatorAwareCache

func NewAllocatorAwareCache(alloc memory.Allocator, maxSize int) *AllocatorAwareCache

NewAllocatorAwareCache creates a new cache with zero-copy support

func (*AllocatorAwareCache) Get

Get retrieves a RecordBatch from the cache. Returns a retained reference; the caller must call Release() when done.

func (*AllocatorAwareCache) GetBufferDirect

func (c *AllocatorAwareCache) GetBufferDirect(key string, colIdx, bufIdx int) []byte

GetBufferDirect retrieves the underlying buffer bytes without copying. colIdx is the column index; bufIdx is the buffer index (0 = validity, 1 = data).

func (*AllocatorAwareCache) Put

func (c *AllocatorAwareCache) Put(key string, rec arrow.RecordBatch)

Put stores a RecordBatch in the cache with proper reference counting

type ArrayStructure

type ArrayStructure struct {
	Length    int
	NullCount int
	Offset    int
	Buffers   int              // Number of buffers used by this node specifically
	Children  []ArrayStructure // Recursive children
}

ArrayStructure captures the metadata needed to rebuild an ArrayData hierarchy.

type AsyncFsyncConfig

type AsyncFsyncConfig struct {
	DirtyThreshold   int64
	MaxPendingFsyncs int
	FsyncTimeout     time.Duration
	Enabled          bool
	FallbackToSync   bool
}

AsyncFsyncConfig configures asynchronous fsync behavior for WAL

func DefaultAsyncFsyncConfig

func DefaultAsyncFsyncConfig() AsyncFsyncConfig

DefaultAsyncFsyncConfig returns sensible defaults

func (AsyncFsyncConfig) IsAsyncEnabled

func (c AsyncFsyncConfig) IsAsyncEnabled() bool

IsAsyncEnabled returns whether async fsync is enabled

func (AsyncFsyncConfig) ShouldFsync

func (c AsyncFsyncConfig) ShouldFsync(dirtyBytes int64) bool

ShouldFsync returns true if dirty bytes exceed threshold

func (AsyncFsyncConfig) Validate

func (c AsyncFsyncConfig) Validate() error

Validate checks if the configuration is valid

type AsyncFsyncer

type AsyncFsyncer struct {
	// contains filtered or unexported fields
}

AsyncFsyncer manages asynchronous fsync operations

func NewAsyncFsyncer

func NewAsyncFsyncer(cfg AsyncFsyncConfig) *AsyncFsyncer

NewAsyncFsyncer creates a new async fsyncer

func (*AsyncFsyncer) AddDirtyBytes

func (a *AsyncFsyncer) AddDirtyBytes(n int64)

AddDirtyBytes adds to the dirty byte counter

func (*AsyncFsyncer) Config

func (a *AsyncFsyncer) Config() AsyncFsyncConfig

Config returns the configuration

func (*AsyncFsyncer) DirtyBytes

func (a *AsyncFsyncer) DirtyBytes() int64

DirtyBytes returns current dirty byte count

func (*AsyncFsyncer) ForceFsync

func (a *AsyncFsyncer) ForceFsync() error

ForceFsync performs a synchronous fsync (fallback)

func (*AsyncFsyncer) IsRunning

func (a *AsyncFsyncer) IsRunning() bool

IsRunning returns whether the fsyncer is active

func (*AsyncFsyncer) RequestFsyncIfNeeded

func (a *AsyncFsyncer) RequestFsyncIfNeeded() bool

RequestFsyncIfNeeded requests fsync if dirty threshold exceeded

func (*AsyncFsyncer) Start

func (a *AsyncFsyncer) Start(f *os.File) error

Start begins the async fsync goroutine

func (*AsyncFsyncer) Stats

func (a *AsyncFsyncer) Stats() AsyncFsyncerStats

Stats returns current statistics

func (*AsyncFsyncer) Stop

func (a *AsyncFsyncer) Stop() error

Stop gracefully shuts down the fsyncer

func (*AsyncFsyncer) WaitForPendingFsyncs

func (a *AsyncFsyncer) WaitForPendingFsyncs()

WaitForPendingFsyncs waits for all pending fsyncs to complete

type AsyncFsyncerStats

type AsyncFsyncerStats struct {
	TotalRequests   int64
	CompletedFsyncs int64
	FailedFsyncs    int64
	SyncFallbacks   int64
	QueueFullDrops  int64
}

AsyncFsyncerStats holds statistics for the async fsyncer

type AutoShardingConfig

type AutoShardingConfig struct {
	// Enabled determines if auto-sharding is active.
	Enabled bool
	// ShardThreshold is the number of vectors at which to trigger sharding.
	ShardThreshold int
	// ShardCount is the target number of shards to create (defaults to NumCPU).
	ShardCount int
}

AutoShardingConfig configures the auto-sharding behavior.

func DefaultAutoShardingConfig

func DefaultAutoShardingConfig() AutoShardingConfig

DefaultAutoShardingConfig returns a standard configuration.

type AutoShardingIndex

type AutoShardingIndex struct {
	// contains filtered or unexported fields
}

AutoShardingIndex wraps a VectorIndex and transparently upgrades it to a ShardedHNSW when the dataset grows beyond a threshold.

func NewAutoShardingIndex

func NewAutoShardingIndex(ds *Dataset, config AutoShardingConfig) *AutoShardingIndex

NewAutoShardingIndex creates a new auto-sharding index. Initially, it uses a standard HNSWIndex.

func (*AutoShardingIndex) AddByLocation

func (a *AutoShardingIndex) AddByLocation(batchIdx, rowIdx int) (uint32, error)

AddByLocation adds a vector to the index.

func (*AutoShardingIndex) AddByRecord

func (a *AutoShardingIndex) AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)

AddByRecord adds a vector from a record batch.

func (*AutoShardingIndex) Close

func (idx *AutoShardingIndex) Close() error

func (*AutoShardingIndex) GetDimension

func (a *AutoShardingIndex) GetDimension() uint32

GetDimension implements VectorIndex.

func (*AutoShardingIndex) GetLocation

func (idx *AutoShardingIndex) GetLocation(id VectorID) (Location, bool)

GetLocation retrieves the storage location for a given vector ID.

func (*AutoShardingIndex) Len

func (a *AutoShardingIndex) Len() int

Len implements VectorIndex.

func (*AutoShardingIndex) SearchVectors

func (a *AutoShardingIndex) SearchVectors(query []float32, k int, filters []Filter) []SearchResult

SearchVectors implements VectorIndex.

func (*AutoShardingIndex) SearchVectorsWithBitmap

func (a *AutoShardingIndex) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult

SearchVectorsWithBitmap implements VectorIndex.

func (*AutoShardingIndex) SetIndexedColumns

func (idx *AutoShardingIndex) SetIndexedColumns(cols []string)

SetIndexedColumns configures which columns are indexed for fast equality lookups

func (*AutoShardingIndex) Warmup

func (idx *AutoShardingIndex) Warmup() int

Warmup delegates to the current index.

type BM25Config

type BM25Config struct {
	K1 float64 // Term frequency saturation parameter (default: 1.2)
	B  float64 // Length normalization parameter (default: 0.75)
}

BM25Config holds BM25 algorithm parameters. K1 controls term frequency saturation (typically 1.2-2.0). B controls document length normalization (0 = no normalization, 1 = full normalization).

func DefaultBM25Config

func DefaultBM25Config() BM25Config

DefaultBM25Config returns standard BM25 parameters.

func (BM25Config) Validate

func (c BM25Config) Validate() error

Validate checks if the BM25 configuration is valid.

type BM25InvertedIndex

type BM25InvertedIndex struct {
	// contains filtered or unexported fields
}

BM25InvertedIndex is a sharded inverted index with proper BM25 scoring

func NewBM25InvertedIndex

func NewBM25InvertedIndex(config BM25Config) *BM25InvertedIndex

NewBM25InvertedIndex creates a new BM25 inverted index

func (*BM25InvertedIndex) Add

func (idx *BM25InvertedIndex) Add(id VectorID, text string)

Add indexes a document with the given text

func (*BM25InvertedIndex) Delete

func (idx *BM25InvertedIndex) Delete(id VectorID)

Delete removes a document from the index

func (*BM25InvertedIndex) DocCount

func (idx *BM25InvertedIndex) DocCount() int

DocCount returns the total number of documents in the index

func (*BM25InvertedIndex) GetDocLength

func (idx *BM25InvertedIndex) GetDocLength(id VectorID) int

GetDocLength returns the length (term count) of a document

func (*BM25InvertedIndex) GetTermDocFreq

func (idx *BM25InvertedIndex) GetTermDocFreq(term string) int

GetTermDocFreq returns the number of documents containing the term

func (*BM25InvertedIndex) SearchBM25

func (idx *BM25InvertedIndex) SearchBM25(query string, limit int) []SearchResult

SearchBM25 returns documents matching the query, scored by BM25
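A usage sketch (import path assumed; VectorID is treated as an integer ID, and SearchResult fields are not shown in this listing):

package main

import (
	"fmt"

	store "example.com/yourmodule/store" // import path assumed
)

func main() {
	idx := store.NewBM25InvertedIndex(store.DefaultBM25Config())
	idx.Add(1, "the quick brown fox jumps")
	idx.Add(2, "lazy dogs sleep all day")

	for _, r := range idx.SearchBM25("quick fox", 10) {
		fmt.Printf("%+v\n", r) // presumably ranked by BM25 score
	}
}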

type BM25Scorer

type BM25Scorer struct {
	// contains filtered or unexported fields
}

BM25Scorer computes BM25 relevance scores for documents. It maintains corpus statistics for IDF calculation and length normalization.

func NewBM25Scorer

func NewBM25Scorer(config BM25Config) *BM25Scorer

NewBM25Scorer creates a new BM25 scorer with the given configuration.

func (*BM25Scorer) AddDocument

func (s *BM25Scorer) AddDocument(docLength int)

AddDocument registers a document with the given length to the corpus.

func (*BM25Scorer) AvgDocLength

func (s *BM25Scorer) AvgDocLength() float64

AvgDocLength returns the average document length in the corpus.

func (*BM25Scorer) Config

func (s *BM25Scorer) Config() BM25Config

Config returns the BM25 configuration.

func (*BM25Scorer) IDF

func (s *BM25Scorer) IDF(docFreq int) float64

IDF computes the Inverse Document Frequency for a term. Uses the BM25 IDF formula: log((N - df + 0.5) / (df + 0.5) + 1) where N is total documents and df is document frequency.

func (*BM25Scorer) RemoveDocument

func (s *BM25Scorer) RemoveDocument(docLength int)

RemoveDocument removes a document with the given length from the corpus.

func (*BM25Scorer) Score

func (s *BM25Scorer) Score(tf, docLength, docFreq int) float64

Score computes the BM25 score for a term in a document. Parameters:

  • tf: term frequency in the document
  • docLength: length of the document (number of terms)
  • docFreq: number of documents containing the term

Returns the BM25 score component for this term.
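For reference, a sketch of the classic BM25 term score consistent with the IDF formula documented above; the package's exact computation may differ in detail.

import "math"

// bm25Term: idf(df, N) * tf*(K1+1) / (tf + K1*(1 - B + B*docLen/avgDocLen))
func bm25Term(tf, docLen, docFreq, totalDocs int, avgDocLen, k1, b float64) float64 {
	idf := math.Log((float64(totalDocs)-float64(docFreq)+0.5)/(float64(docFreq)+0.5) + 1)
	norm := 1 - b + b*float64(docLen)/avgDocLen
	return idf * float64(tf) * (k1 + 1) / (float64(tf) + k1*norm)
}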

func (*BM25Scorer) ScoreMultiTerm

func (s *BM25Scorer) ScoreMultiTerm(docLength int, terms []struct{ TF, DocFreq int }) float64

ScoreMultiTerm computes the total BM25 score for multiple terms. Optimization: Acquires read lock ONCE for all terms.

func (*BM25Scorer) TotalDocs

func (s *BM25Scorer) TotalDocs() int

TotalDocs returns the number of documents in the corpus.

type BackpressureConfig

type BackpressureConfig struct {
	SoftLimitBytes    uint64
	HardLimitBytes    uint64
	CheckInterval     time.Duration
	SoftPressureDelay time.Duration
}

BackpressureConfig configures memory backpressure behavior.

type BatchRemapInfo

type BatchRemapInfo struct {
	NewBatchIdx int
	NewRowIdxs  []int // Maps oldRowIdx to newRowIdx in NewBatchIdx, or -1 if dropped
}

BatchRemapInfo describes how a batch ID maps to a new one

type BinaryQuantizer

type BinaryQuantizer struct {
	Threshold []float32 // Per-dimension threshold (typically median)
}

BinaryQuantizer holds per-dimension thresholds for binary quantization.

func TrainBinaryQuantizer

func TrainBinaryQuantizer(vectors [][]float32) (*BinaryQuantizer, error)

TrainBinaryQuantizer learns per-dimension thresholds from sample vectors. Uses median as threshold for balanced bit distribution.

func (*BinaryQuantizer) AsymmetricBinaryDistance

func (bq *BinaryQuantizer) AsymmetricBinaryDistance(query []float32, quantized []uint64) float32

AsymmetricBinaryDistance computes distance from float32 query to binary vector. This provides better recall than pure binary comparison.

func (*BinaryQuantizer) Dims

func (bq *BinaryQuantizer) Dims() int

Dims returns the number of dimensions this quantizer handles.

func (*BinaryQuantizer) Quantize

func (bq *BinaryQuantizer) Quantize(vec []float32) []uint64

Quantize converts a float32 vector to bit-packed uint64 representation. Each dimension becomes 1 bit: value >= threshold -> 1, else -> 0. Bits are packed LSB-first: dimension 0 is bit 0 of first uint64.
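The documented layout (bit set when value >= threshold, packed LSB-first) can be sketched as:

// quantize packs one bit per dimension, LSB-first, as documented:
// dimension i lands in word i/64 at bit position i%64.
func quantize(vec, threshold []float32) []uint64 {
	out := make([]uint64, (len(vec)+63)/64)
	for i, v := range vec {
		if v >= threshold[i] {
			out[i/64] |= 1 << (uint(i) % 64)
		}
	}
	return out
}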

func (*BinaryQuantizer) QuantizeInto

func (bq *BinaryQuantizer) QuantizeInto(vec []float32, dst []uint64)

QuantizeInto encodes a vector into a pre-allocated destination slice. Zero-allocation hot path for bulk encoding.

type BitmapPool

type BitmapPool struct {
	// contains filtered or unexported fields
}

BitmapPool manages pooled bitmap buffers to reduce GC pressure

func NewBitmapPool

func NewBitmapPool(cfg BitmapPoolConfig) *BitmapPool

NewBitmapPool creates a new bitmap pool

func (*BitmapPool) Close

func (bp *BitmapPool) Close()

Close clears the pool

func (*BitmapPool) Get

func (bp *BitmapPool) Get(numBits int) *PooledBitmap

Get retrieves a bitmap buffer of at least numBits capacity

func (*BitmapPool) Put

func (bp *BitmapPool) Put(buf *PooledBitmap)

Put returns a bitmap buffer to the pool

func (*BitmapPool) Stats

func (bp *BitmapPool) Stats() BitmapPoolStats

Stats returns pool statistics

type BitmapPoolConfig

type BitmapPoolConfig struct {
	// SizeBuckets defines the pooled buffer sizes (in bits)
	SizeBuckets []int
	// MaxBufferSize is the largest buffer that will be pooled
	MaxBufferSize int
	// MaxPoolSize per bucket (0 = unlimited, managed by sync.Pool)
	MaxPoolSize int
	// MetricsEnabled enables Prometheus metrics
	MetricsEnabled bool
}

BitmapPoolConfig configures the bitmap buffer pool

func DefaultBitmapPoolConfig

func DefaultBitmapPoolConfig() BitmapPoolConfig

DefaultBitmapPoolConfig returns production defaults

func (*BitmapPoolConfig) Validate

func (c *BitmapPoolConfig) Validate() error

Validate checks configuration

type BitmapPoolStats

type BitmapPoolStats struct {
	Gets     uint64 // Total get operations
	Puts     uint64 // Total put operations
	Hits     uint64 // Buffers returned from pool
	Misses   uint64 // New allocations (pool empty)
	Discards uint64 // Buffers too large to pool
}

BitmapPoolStats contains pool statistics

type Bitset

type Bitset struct {
	// contains filtered or unexported fields
}

Bitset is a thread-safe wrapper around a Roaring Bitmap (Item 10)

func NewBitset

func NewBitset() *Bitset

func (*Bitset) Clear

func (b *Bitset) Clear(i int)

func (*Bitset) Clone

func (b *Bitset) Clone() *Bitset

Clone creates a thread-safe copy of the bitset.

func (*Bitset) Contains

func (b *Bitset) Contains(i int) bool

func (*Bitset) Count

func (b *Bitset) Count() uint64

Count returns the number of set bits.

func (*Bitset) Set

func (b *Bitset) Set(i int)

func (*Bitset) ToUint32Array

func (b *Bitset) ToUint32Array() []uint32

ToUint32Array returns the set bits as a slice of uint32.
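A minimal usage sketch (import path assumed):

package main

import (
	"fmt"

	store "example.com/yourmodule/store" // import path assumed
)

func main() {
	b := store.NewBitset()
	b.Set(3)
	b.Set(10)
	fmt.Println(b.Contains(3))     // true
	fmt.Println(b.Count())         // 2
	fmt.Println(b.ToUint32Array()) // [3 10]
}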

type BloomFilter

type BloomFilter struct {
	// contains filtered or unexported fields
}

BloomFilter is a space-efficient probabilistic data structure for testing set membership with no false negatives.

func NewBloomFilter

func NewBloomFilter(n int, p float64) *BloomFilter

NewBloomFilter creates a bloom filter optimized for n items with false positive rate p. Uses optimal size and hash count formulas:

m = -n*ln(p) / (ln(2)^2)
k = m/n * ln(2)

func (*BloomFilter) Add

func (bf *BloomFilter) Add(item string)

Add inserts an item into the bloom filter.

func (*BloomFilter) Contains

func (bf *BloomFilter) Contains(item string) bool

Contains checks if an item might be in the set. Returns true if possibly present (may be false positive). Returns false if definitely not present (never false negative).
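Plugging n = 1,000,000 and p = 0.01 into the sizing formulas above gives m ≈ 9.59 million bits (about 1.2 MB) and k ≈ 7 hash functions. A usage sketch (import path assumed):

package main

import (
	"fmt"

	store "example.com/yourmodule/store" // import path assumed
)

func main() {
	bf := store.NewBloomFilter(1_000_000, 0.01)
	bf.Add("user:alice")
	fmt.Println(bf.Contains("user:alice")) // always true (no false negatives)
	fmt.Println(bf.Contains("user:bob"))   // almost always false (~1% false positives)
}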

type BruteForceIndex

type BruteForceIndex struct {
	// contains filtered or unexported fields
}

BruteForceIndex implements VectorIndex using linear scan O(N) search.

func NewBruteForceIndex

func NewBruteForceIndex(ds *Dataset) *BruteForceIndex

NewBruteForceIndex creates a new brute force index for the given dataset.

func (*BruteForceIndex) AddByLocation

func (b *BruteForceIndex) AddByLocation(batchIdx, rowIdx int) error

AddByLocation adds a vector from the dataset using batch and row indices.

func (*BruteForceIndex) Len

func (b *BruteForceIndex) Len() int

Len returns the number of indexed vectors.

func (*BruteForceIndex) SearchVectors

func (b *BruteForceIndex) SearchVectors(query []float32, k int, filters []Filter) ([]SearchResult, error)

SearchVectors returns the k nearest neighbors using linear scan.

type BufferRetainerStats

type BufferRetainerStats struct {
	ActiveBuffers      int
	TotalBytesRetained int64
	TotalRetained      int64
	TotalReleased      int64
}

BufferRetainerStats holds statistics about retained buffers

type BytePool

type BytePool struct {
	// contains filtered or unexported fields
}

BytePool pools bytes.Buffer instances to reduce allocation pressure in hot paths (WAL, serialization).

func NewBytePool

func NewBytePool() *BytePool

NewBytePool creates a new buffer pool.

func (*BytePool) Get

func (p *BytePool) Get() *bytes.Buffer

Get retrieves a buffer from the pool. The buffer is guaranteed to be empty (Reset called).

func (*BytePool) Put

func (p *BytePool) Put(buf *bytes.Buffer)

Put returns a buffer to the pool after resetting it.
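A usage sketch; the result is copied before Put because the buffer is reused afterwards (import path assumed):

var framePool = store.NewBytePool()

func encodeFrame(payload []byte) []byte {
	buf := framePool.Get() // guaranteed empty (Reset already called)
	defer framePool.Put(buf)
	buf.Write(payload)
	return append([]byte(nil), buf.Bytes()...) // copy out: buf is recycled on Put
}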

type COWMetadataMap

type COWMetadataMap struct {
	// contains filtered or unexported fields
}

COWMetadataMap provides a Copy-On-Write map for dataset metadata. Reads are lock-free via atomic pointer; writes copy the entire map. This eliminates lock contention in ListFlights.

func NewCOWMetadataMap

func NewCOWMetadataMap() *COWMetadataMap

NewCOWMetadataMap creates a new Copy-On-Write metadata map.

func (*COWMetadataMap) Delete

func (c *COWMetadataMap) Delete(name string)

Delete removes metadata for a dataset (copy-on-write).

func (*COWMetadataMap) Get

func (c *COWMetadataMap) Get(name string) (DatasetMetadata, bool)

Get returns metadata for a dataset (lock-free read).

func (*COWMetadataMap) IncrementStats

func (c *COWMetadataMap) IncrementStats(name string, rows int64, batches int)

IncrementStats atomically increments row and batch counts.

func (*COWMetadataMap) Len

func (c *COWMetadataMap) Len() int

Len returns the number of datasets in the map.

func (*COWMetadataMap) Set

func (c *COWMetadataMap) Set(name string, meta DatasetMetadata)

Set adds or updates metadata for a dataset (copy-on-write).

func (*COWMetadataMap) Snapshot

func (c *COWMetadataMap) Snapshot() map[string]DatasetMetadata

Snapshot returns the current map for iteration. The returned map is immutable and safe to iterate without locks.
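A sketch of the lock-free read path (import path assumed; only fields shown in DatasetMetadata below are used):

m := store.NewCOWMetadataMap()
m.Set("embeddings", store.DatasetMetadata{Name: "embeddings", TotalRows: 1000, BatchCount: 4})

// Snapshot returns an immutable map: safe to range over while
// concurrent writers copy-on-write a new version.
for name, meta := range m.Snapshot() {
	fmt.Println(name, meta.TotalRows, meta.BatchCount)
}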

func (*COWMetadataMap) UpdateFromRecords

func (c *COWMetadataMap) UpdateFromRecords(name string, batches []arrow.RecordBatch)

UpdateFromRecords updates metadata from a slice of record batches.

type CheckpointConfig

type CheckpointConfig struct {
	Interval       time.Duration
	Timeout        time.Duration
	MinWALSize     int64
	QuorumRequired int
}

CheckpointConfig configures coordinated checkpointing behavior.

type CheckpointCoordinator

type CheckpointCoordinator struct {
	// contains filtered or unexported fields
}

CheckpointCoordinator manages distributed checkpoint synchronization.

func NewCheckpointCoordinator

func NewCheckpointCoordinator(cfg CheckpointConfig) *CheckpointCoordinator

NewCheckpointCoordinator creates a new checkpoint coordinator.

func (*CheckpointCoordinator) GetCheckpointCount

func (c *CheckpointCoordinator) GetCheckpointCount() uint64

GetCheckpointCount returns the total number of checkpoints completed.

func (*CheckpointCoordinator) GetEpoch

func (c *CheckpointCoordinator) GetEpoch() uint64

GetEpoch returns the current checkpoint epoch.

func (*CheckpointCoordinator) InitiateCheckpoint

func (c *CheckpointCoordinator) InitiateCheckpoint(ctx context.Context) error

InitiateCheckpoint starts a new checkpoint epoch.

func (*CheckpointCoordinator) RecoverFromEpoch

func (c *CheckpointCoordinator) RecoverFromEpoch(epoch uint64)

RecoverFromEpoch sets the epoch during recovery.

func (*CheckpointCoordinator) RegisterParticipant

func (c *CheckpointCoordinator) RegisterParticipant(id string)

RegisterParticipant adds a participant to the checkpoint barrier.

func (*CheckpointCoordinator) ShouldTruncateWAL

func (c *CheckpointCoordinator) ShouldTruncateWAL(epoch uint64) bool

ShouldTruncateWAL returns true if WAL can be truncated after checkpoint.

func (*CheckpointCoordinator) UnregisterParticipant

func (c *CheckpointCoordinator) UnregisterParticipant(id string)

UnregisterParticipant removes a participant from the checkpoint barrier.

func (*CheckpointCoordinator) WaitForBarrier

func (c *CheckpointCoordinator) WaitForBarrier(ctx context.Context, participantID string, epoch uint64) error

WaitForBarrier waits until quorum participants reach the barrier.
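A flow sketch under stated assumptions: the coordinator initiates an epoch, each participant calls WaitForBarrier, and the WAL is truncated once quorum is reached. The call ordering is inferred from the method docs above, not confirmed by the source; import path and config values are illustrative.

func checkpoint(ctx context.Context) error {
	coord := store.NewCheckpointCoordinator(store.CheckpointConfig{
		Interval:       time.Minute,
		Timeout:        10 * time.Second,
		MinWALSize:     64 << 20, // 64 MiB, illustrative
		QuorumRequired: 2,
	})
	coord.RegisterParticipant("node-a")
	coord.RegisterParticipant("node-b")

	if err := coord.InitiateCheckpoint(ctx); err != nil {
		return err
	}
	epoch := coord.GetEpoch()
	if err := coord.WaitForBarrier(ctx, "node-a", epoch); err != nil { // blocks until quorum
		return err
	}
	if coord.ShouldTruncateWAL(epoch) {
		// safe to drop WAL entries covered by this checkpoint
	}
	return nil
}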

type ChunkHandler

type ChunkHandler func(ctx context.Context, chunk *FlightDataChunk) error

ChunkHandler is the callback for processing a chunk.

type ChunkWorkerPool

type ChunkWorkerPool struct {
	// contains filtered or unexported fields
}

ChunkWorkerPool processes chunks from a queue using multiple workers.

func NewChunkWorkerPool

func NewChunkWorkerPool(config ChunkWorkerPoolConfig, queue *FlightDataQueue, handler ChunkHandler) *ChunkWorkerPool

NewChunkWorkerPool creates a new worker pool.

func (*ChunkWorkerPool) IsRunning

func (p *ChunkWorkerPool) IsRunning() bool

IsRunning returns whether pool is running.

func (*ChunkWorkerPool) Start

func (p *ChunkWorkerPool) Start()

Start launches worker goroutines.

func (*ChunkWorkerPool) Stats

Stats returns current statistics.

func (*ChunkWorkerPool) Stop

func (p *ChunkWorkerPool) Stop()

Stop signals workers to stop and waits for completion.

type ChunkWorkerPoolConfig

type ChunkWorkerPoolConfig struct {
	NumWorkers     int           // number of worker goroutines
	ProcessTimeout time.Duration // timeout for processing each chunk
}

ChunkWorkerPoolConfig configures the worker pool.

func DefaultChunkWorkerPoolConfig

func DefaultChunkWorkerPoolConfig() ChunkWorkerPoolConfig

DefaultChunkWorkerPoolConfig returns sensible defaults.

func (ChunkWorkerPoolConfig) Validate

func (c ChunkWorkerPoolConfig) Validate() error

Validate checks config validity.

type ChunkWorkerPoolStats

type ChunkWorkerPoolStats struct {
	Processed int64
	Errors    int64
}

ChunkWorkerPoolStats tracks worker pool operations.

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern

func NewCircuitBreaker

func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Allow

func (cb *CircuitBreaker) Allow() bool

Allow checks if a request should be allowed

func (*CircuitBreaker) Execute

func (cb *CircuitBreaker) Execute(fn func() (interface{}, error)) (interface{}, error)

Execute wraps a function with circuit breaker logic
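A usage sketch (import path assumed; it is an assumption here that Execute surfaces ErrCircuitOpen when the breaker rejects the call):

func callUpstream() (interface{}, error) {
	cb := store.NewCircuitBreaker(store.DefaultCircuitBreakerConfig())

	res, err := cb.Execute(func() (interface{}, error) {
		return fetchRemote() // hypothetical upstream call
	})
	if errors.Is(err, store.ErrCircuitOpen) {
		// breaker is open: fail fast instead of retrying the upstream
	}
	return res, err
}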

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() CircuitState

State returns current circuit state

func (*CircuitBreaker) Stats

func (cb *CircuitBreaker) Stats() CircuitBreakerStats

Stats returns current statistics

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	FailureThreshold int           // failures before opening
	SuccessThreshold int           // successes in half-open before closing
	Timeout          time.Duration // time in open state before half-open
}

CircuitBreakerConfig holds configuration for circuit breaker

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() CircuitBreakerConfig

DefaultCircuitBreakerConfig returns sensible defaults

type CircuitBreakerRegistry

type CircuitBreakerRegistry struct {
	// contains filtered or unexported fields
}

CircuitBreakerRegistry manages multiple circuit breakers by key

func NewCircuitBreakerRegistry

func NewCircuitBreakerRegistry(config CircuitBreakerConfig) *CircuitBreakerRegistry

NewCircuitBreakerRegistry creates a new registry

func (*CircuitBreakerRegistry) GetOrCreate

func (r *CircuitBreakerRegistry) GetOrCreate(key string) *CircuitBreaker

GetOrCreate returns an existing circuit breaker or creates a new one

func (*CircuitBreakerRegistry) Reset

func (r *CircuitBreakerRegistry) Reset(key string)

Reset resets a specific circuit breaker

func (*CircuitBreakerRegistry) ResetAll

func (r *CircuitBreakerRegistry) ResetAll()

ResetAll resets all circuit breakers

type CircuitBreakerStats

type CircuitBreakerStats struct {
	Successes    int64
	Failures     int64
	Rejections   int64
	State        CircuitState
	LastFailure  time.Time
	LastSuccess  time.Time
	StateChanges int64
}

CircuitBreakerStats holds operational statistics

type CircuitState

type CircuitState int32

Circuit breaker states

const (
	CircuitClosed   CircuitState = iota // Normal operation
	CircuitOpen                         // Failing, reject requests
	CircuitHalfOpen                     // Testing recovery
)

func (CircuitState) String

func (s CircuitState) String() string

type ClockComparison

type ClockComparison int

ClockComparison represents the result of comparing two vector clocks

const (
	ClockEqual      ClockComparison = iota // Clocks are identical
	ClockBefore                            // First clock happened before second
	ClockAfter                             // First clock happened after second
	ClockConcurrent                        // Clocks are concurrent (conflict)
)

type ColumnInvertedIndex

type ColumnInvertedIndex struct {
	// contains filtered or unexported fields
}

ColumnInvertedIndex provides O(1) equality lookups on indexed columns. Structure: dataset -> column -> value -> []RowPosition

func NewColumnInvertedIndex

func NewColumnInvertedIndex() *ColumnInvertedIndex

NewColumnInvertedIndex creates a new column-based inverted index

func (*ColumnInvertedIndex) BuildFilterMask

func (idx *ColumnInvertedIndex) BuildFilterMask(datasetName string, recordIdx int, columnName, value string, numRows int, mem memory.Allocator) *array.Boolean

BuildFilterMask creates a boolean mask for filtering using an indexed lookup. Returns nil if no index exists for the column.

func (*ColumnInvertedIndex) FilterRecordWithIndex

func (idx *ColumnInvertedIndex) FilterRecordWithIndex(ctx context.Context, datasetName string, recordIdx int, rec arrow.RecordBatch, filter Filter, mem memory.Allocator) (arrow.RecordBatch, error)

FilterRecordWithIndex applies an equality filter using the index for an O(1) lookup. Falls back to compute.Filter if no index exists.

func (*ColumnInvertedIndex) GetMatchingRowIndices

func (idx *ColumnInvertedIndex) GetMatchingRowIndices(datasetName string, recordIdx int, columnName, value string) []int

GetMatchingRowIndices returns row indices within a specific record

func (*ColumnInvertedIndex) HasIndex

func (idx *ColumnInvertedIndex) HasIndex(datasetName, columnName string) bool

HasIndex checks if an index exists for the given dataset and column

func (*ColumnInvertedIndex) IndexRecord

func (idx *ColumnInvertedIndex) IndexRecord(datasetName string, recordIdx int, rec arrow.RecordBatch, columnsToIndex []string)

IndexRecord indexes specified columns of a record batch

func (*ColumnInvertedIndex) Lookup

func (idx *ColumnInvertedIndex) Lookup(datasetName, columnName, value string) []RowPosition

Lookup returns all row positions matching the given value. Returns an empty slice if not found (O(1) lookup).

func (*ColumnInvertedIndex) RemoveDataset

func (idx *ColumnInvertedIndex) RemoveDataset(datasetName string)

RemoveDataset removes all indices for a dataset

func (*ColumnInvertedIndex) RemoveRecord

func (idx *ColumnInvertedIndex) RemoveRecord(datasetName string, recordIdx int)

RemoveRecord removes all index entries for a specific record

func (*ColumnInvertedIndex) Stats

type ColumnInvertedIndexStats

type ColumnInvertedIndexStats struct {
	Datasets       int
	TotalColumns   int
	TotalValues    int
	TotalPositions int
}

ColumnInvertedIndexStats holds statistics about the index, as returned by Stats.

type ColumnMetadata

type ColumnMetadata struct {
	Name      string
	Type      arrow.DataType
	AddedAt   uint64 // version when added
	DroppedAt uint64 // version when dropped (0 = not dropped)
}

ColumnMetadata tracks column lifecycle

type Community

type Community struct {
	ID      int
	Members []string
}

Community represents a detected graph community

type CompactionCandidate

type CompactionCandidate struct {
	StartIdx int
	EndIdx   int // Exclusive
	TotalRow int64
}

CompactionCandidate represents a range of batches to be merged

type CompactionConfig

type CompactionConfig struct {
	// TargetBatchSize is the target number of rows per compacted batch.
	TargetBatchSize int64
	// MinBatchesToCompact is the minimum number of batches before compaction triggers.
	MinBatchesToCompact int
	// CompactionInterval is how often the compaction worker checks for work.
	CompactionInterval time.Duration
	// Enabled controls whether background compaction runs.
	Enabled bool
}

CompactionConfig configures the background RecordBatch compaction worker.

func DefaultCompactionConfig

func DefaultCompactionConfig() CompactionConfig

DefaultCompactionConfig returns sensible defaults for compaction.

func (CompactionConfig) Validate

func (c CompactionConfig) Validate() error

Validate checks if the configuration is valid.

type CompactionStats

type CompactionStats struct {
	CompactionsRun int64
	BatchesMerged  int64
	RowsProcessed  int64
	LastRunTime    time.Time
}

CompactionStats tracks compaction worker statistics.

type CompactionWorker

type CompactionWorker struct {
	// contains filtered or unexported fields
}

CompactionWorker runs background compaction of RecordBatches.

func NewCompactionWorker

func NewCompactionWorker(store *VectorStore, cfg CompactionConfig) *CompactionWorker

NewCompactionWorker creates a new compaction worker with the given config.

func (*CompactionWorker) GetTriggerCount

func (w *CompactionWorker) GetTriggerCount() int64

GetTriggerCount returns the total number of auto-compaction triggers.

func (*CompactionWorker) IsRunning

func (w *CompactionWorker) IsRunning() bool

IsRunning returns true if the worker is currently running.

func (*CompactionWorker) Start

func (w *CompactionWorker) Start()

Start begins the background compaction goroutine.

func (*CompactionWorker) Stats

func (w *CompactionWorker) Stats() CompactionStats

Stats returns current compaction statistics.

func (*CompactionWorker) Stop

func (w *CompactionWorker) Stop()

Stop halts the background compaction goroutine.

func (*CompactionWorker) TriggerCompaction

func (w *CompactionWorker) TriggerCompaction(dataset string) error

TriggerCompaction triggers compaction for a specific dataset. This is non-blocking: if the channel is full, it returns immediately instead of waiting.

type CompareOp

type CompareOp int

CompareOp represents a comparison operation type

const (
	CompareOpLess CompareOp = iota
	CompareOpGreater
	CompareOpLessEqual
	CompareOpGreaterEqual
)

type CompressedBitmap

type CompressedBitmap struct {
	// contains filtered or unexported fields
}

CompressedBitmap provides a memory-efficient bitset using Roaring Bitmaps. It is optimized for high-cardinality integer sets (e.g., VectorID lists, tombstones).

func NewBitmap

func NewBitmap() *CompressedBitmap

NewBitmap creates a new empty compressed bitmap.

func (*CompressedBitmap) Add

func (b *CompressedBitmap) Add(id VectorID)

Add sets the bit for the given VectorID.

func (*CompressedBitmap) Cardinality

func (b *CompressedBitmap) Cardinality() uint64

Cardinality returns the number of set bits.

func (*CompressedBitmap) Contains

func (b *CompressedBitmap) Contains(id VectorID) bool

Contains checks if the bit for the given VectorID is set.

func (*CompressedBitmap) Difference

func (b *CompressedBitmap) Difference(other *CompressedBitmap) *CompressedBitmap

Difference returns a new bitmap containing elements in b but not in other.

func (*CompressedBitmap) Intersection

func (b *CompressedBitmap) Intersection(other *CompressedBitmap) *CompressedBitmap

Intersection returns a new bitmap containing the intersection of b and other.

func (*CompressedBitmap) Iterator

func (b *CompressedBitmap) Iterator() roaring.IntIterable

Iterator returns an iterator over the set bits.

func (*CompressedBitmap) Remove

func (b *CompressedBitmap) Remove(id VectorID)

Remove clears the bit for the given VectorID.

func (*CompressedBitmap) Union

Union returns a new bitmap containing the union of b and other.

type ConfigError

type ConfigError struct {
	Component string    // Component: "NUMA", "S3Backend", "Replication"
	Field     string    // Configuration field name
	Value     string    // Invalid value (as string)
	Message   string    // Validation error message
	Timestamp time.Time // When the error occurred
}

ConfigError provides rich context for configuration validation errors.

func (*ConfigError) Error

func (e *ConfigError) Error() string

type ConsistencyLevel

type ConsistencyLevel int

ConsistencyLevel defines the consistency requirements for read/write operations

const (
	ConsistencyOne    ConsistencyLevel = iota // At least one node must acknowledge
	ConsistencyQuorum                         // Majority of nodes must acknowledge
	ConsistencyAll                            // All nodes must acknowledge
)

func ParseConsistencyLevel

func ParseConsistencyLevel(s string) (ConsistencyLevel, error)

ParseConsistencyLevel parses a string into ConsistencyLevel

func (ConsistencyLevel) String

func (cl ConsistencyLevel) String() string

String returns the string representation of ConsistencyLevel

type DataServer

type DataServer struct {
	*VectorStore
}

DataServer handles data plane operations (DoGet, DoPut). It embeds VectorStore to inherit the base interface and overrides methods for error conversion.

func NewDataServer

func NewDataServer(store *VectorStore) *DataServer

func (*DataServer) DoAction

func (s *DataServer) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error

DoAction returns Unimplemented on DataServer (data plane only)

func (*DataServer) DoExchange

func (s *DataServer) DoExchange(stream flight.FlightService_DoExchangeServer) error

DoExchange delegates to VectorStore with error conversion

func (*DataServer) DoGet

DoGet retrieves a dataset, converting domain errors to gRPC status codes.

func (*DataServer) DoPut

DoPut stores a dataset, converting domain errors to gRPC status codes.

func (*DataServer) GetFlightInfo

func (s *DataServer) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)

GetFlightInfo returns Unimplemented on DataServer

func (*DataServer) GetSchema

GetSchema delegates to VectorStore with error conversion

func (*DataServer) ListFlights

ListFlights returns Unimplemented on DataServer

type Dataset

type Dataset struct {
	Records []arrow.RecordBatch

	Version int64
	Index   VectorIndex // Use common interface (Item 3)

	Name   string
	Schema *arrow.Schema
	Topo   *NUMATopology

	// Tombstones map BatchIdx -> Bitset of deleted RowIdxs
	Tombstones map[int]*Bitset

	// BatchNodes tracks which NUMA node each RecordBatch is allocated on
	BatchNodes []int

	// Memory tracking
	SizeBytes atomic.Int64

	// LWW State
	LWW *TimestampMap

	// Anti-Entropy
	Merkle *MerkleTree

	// Hybrid Search
	InvertedIndexes map[string]*InvertedIndex
	BM25Index       *BM25InvertedIndex
	// contains filtered or unexported fields
}

Dataset wraps records with metadata for eviction and tombstones

func NewDataset

func NewDataset(name string, schema *arrow.Schema) *Dataset

func (*Dataset) AddToIndex

func (d *Dataset) AddToIndex(batchIdx, rowIdx int) error

AddToIndex adds a vector to the index

func (*Dataset) EvictExpiredRecords

func (d *Dataset) EvictExpiredRecords() []arrow.RecordBatch

EvictExpiredRecords removes TTL-expired records from the dataset using tombstones. Returns the evicted record batches.

func (*Dataset) GetExistingSchema

func (ds *Dataset) GetExistingSchema() *arrow.Schema

GetExistingSchema returns the schema of the first record in the dataset. Uses dataMu RLock for read-only access to Records. Returns nil if dataset has no records.

func (*Dataset) GetRecord

func (d *Dataset) GetRecord(idx int) (arrow.RecordBatch, bool)

GetRecord returns the record batch at the given index in a thread-safe manner.

func (*Dataset) GetRecordMetadata

func (d *Dataset) GetRecordMetadata(rec arrow.RecordBatch) *RecordMetadata

GetRecordMetadata retrieves eviction metadata for a record

func (*Dataset) GetRecordsCount

func (ds *Dataset) GetRecordsCount() int

GetRecordsCount returns the number of records in the dataset. Uses dataMu RLock for read-only access.

func (*Dataset) GetVectorIndex

func (d *Dataset) GetVectorIndex() VectorIndex

GetVectorIndex returns the current index safely

func (*Dataset) GetVersion

func (ds *Dataset) GetVersion() int64

GetVersion returns the current schema version. Uses RLock for read-only access.

func (*Dataset) IndexLen

func (d *Dataset) IndexLen() int

IndexLen returns the number of vectors in the index.

func (*Dataset) InitRecordEviction

func (d *Dataset) InitRecordEviction()

InitRecordEviction initializes the per-record eviction manager for a dataset

func (*Dataset) IsSharded

func (d *Dataset) IsSharded() bool

IsSharded returns true if the dataset uses ShardedHNSW.

func (*Dataset) LastAccess

func (d *Dataset) LastAccess() time.Time

func (*Dataset) MigrateToShardedIndex

func (d *Dataset) MigrateToShardedIndex(cfg AutoShardingConfig) error

MigrateToShardedIndex migrates the current index to a sharded index

func (*Dataset) RegisterRecordWithTTL

func (d *Dataset) RegisterRecordWithTTL(rec arrow.RecordBatch, ttl time.Duration)

RegisterRecordWithTTL registers a record for per-record TTL eviction

func (*Dataset) SearchDataset

func (d *Dataset) SearchDataset(query []float32, k int) []SearchResult

SearchDataset delegates to the vector index if available

func (*Dataset) SetLastAccess

func (d *Dataset) SetLastAccess(t time.Time)

func (*Dataset) UpgradeSchemaVersion

func (ds *Dataset) UpgradeSchemaVersion()

UpgradeSchemaVersion increments the dataset version. Uses Lock for exclusive write access.

type DatasetMetadata

type DatasetMetadata struct {
	Name       string
	Schema     *arrow.Schema
	TotalRows  int64
	BatchCount int
}

DatasetMetadata holds lightweight metadata for ListFlights without requiring locks on the main Dataset struct.

type DeltaSync

type DeltaSync struct {
	FromVersion  uint64
	ToVersion    uint64
	NewLocations []Location
	StartIndex   int
}

DeltaSync represents incremental changes between versions.

type DirectBufferReader

type DirectBufferReader struct {
	// contains filtered or unexported fields
}

DirectBufferReader provides zero-copy access to Arrow buffers

func NewDirectBufferReader

func NewDirectBufferReader(alloc memory.Allocator) *DirectBufferReader

NewDirectBufferReader creates a new direct buffer reader

func (*DirectBufferReader) Allocator

func (r *DirectBufferReader) Allocator() memory.Allocator

Allocator returns the underlying allocator

func (*DirectBufferReader) CreateArrayDataFromBuffer

func (r *DirectBufferReader) CreateArrayDataFromBuffer(
	dt arrow.DataType,
	length int,
	dataBuf []byte,
	nullBitmap []byte,
) arrow.ArrayData

CreateArrayDataFromBuffer creates arrow.ArrayData directly from a byte buffer without copying the data. The buffer must remain valid for the lifetime of the returned ArrayData.

func (*DirectBufferReader) GetBufferReference

func (r *DirectBufferReader) GetBufferReference(data []byte) []byte

GetBufferReference returns a reference to the buffer without copying. The returned slice points to the same underlying array as the input.

type DiskANNConfig

type DiskANNConfig struct {
	MaxDegree    int
	BeamWidth    int
	BuildThreads int
}

DiskANNConfig holds DiskANN-specific configuration

type DiskANNIndex

type DiskANNIndex struct {
	// contains filtered or unexported fields
}

DiskANNIndex implements PluggableVectorIndex for DiskANN algorithm

func (*DiskANNIndex) Add

func (d *DiskANNIndex) Add(id uint64, vector []float32) error

func (*DiskANNIndex) AddBatch

func (d *DiskANNIndex) AddBatch(ids []uint64, vectors [][]float32) error

func (*DiskANNIndex) AddByLocation

func (d *DiskANNIndex) AddByLocation(batchIdx, rowIdx int) error

func (*DiskANNIndex) Build

func (d *DiskANNIndex) Build() error

func (*DiskANNIndex) Close

func (d *DiskANNIndex) Close() error

func (*DiskANNIndex) Dimension

func (d *DiskANNIndex) Dimension() int

func (*DiskANNIndex) Len

func (d *DiskANNIndex) Len() int

func (*DiskANNIndex) Load

func (d *DiskANNIndex) Load(path string) error

func (*DiskANNIndex) NeedsBuild

func (d *DiskANNIndex) NeedsBuild() bool

func (*DiskANNIndex) Save

func (d *DiskANNIndex) Save(path string) error

func (*DiskANNIndex) Search

func (d *DiskANNIndex) Search(query []float32, k int) ([]IndexSearchResult, error)

func (*DiskANNIndex) SearchBatch

func (d *DiskANNIndex) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)

func (*DiskANNIndex) SearchVectors

func (d *DiskANNIndex) SearchVectors(query []float32, k int) []SearchResult

func (*DiskANNIndex) Size

func (d *DiskANNIndex) Size() int

func (*DiskANNIndex) Type

func (d *DiskANNIndex) Type() IndexType

type DoGetPipeline

type DoGetPipeline struct {
	// contains filtered or unexported fields
}

DoGetPipeline implements pipelined processing for DoGet operations. Workers filter batches in parallel while the main thread writes, hiding compute latency behind network I/O.

func NewDoGetPipeline

func NewDoGetPipeline(workers, bufferSize int) *DoGetPipeline

NewDoGetPipeline creates a new pipeline with the specified number of workers. If workers is 0, defaults to runtime.NumCPU(). If bufferSize is 0, defaults to workers * 2.

func (*DoGetPipeline) NumWorkers

func (p *DoGetPipeline) NumWorkers() int

NumWorkers returns the number of worker goroutines

func (*DoGetPipeline) Process

func (p *DoGetPipeline) Process(
	ctx context.Context,
	batches []arrow.RecordBatch,
	filterFn FilterFunc,
) (results <-chan PipelineResult, errs <-chan error)

Process starts pipelined processing of batches. Returns a channel of ordered results and an error channel. Results are guaranteed to arrive in the same order as input batches.

func (*DoGetPipeline) ProcessRecords

func (p *DoGetPipeline) ProcessRecords(
	ctx context.Context,
	records []arrow.RecordBatch,
	tombstones map[int]*Bitset,
	filters []Filter,
	evaluator interface{},
) <-chan PipelineStage

ProcessRecords is the compatibility method for store.go

func (*DoGetPipeline) Stats

func (p *DoGetPipeline) Stats() PipelineStats

Stats returns current pipeline statistics

func (*DoGetPipeline) Stop

func (p *DoGetPipeline) Stop()

type DoGetPipelineConfig

type DoGetPipelineConfig struct {
	Workers    int // Number of parallel workers
	BufferSize int // Channel buffer size
	Threshold  int // Minimum batches to trigger pipeline (vs serial)
}

DoGetPipelineConfig holds configuration for DoGet pipeline processing

func DefaultDoGetPipelineConfig

func DefaultDoGetPipelineConfig() DoGetPipelineConfig

DefaultDoGetPipelineConfig returns sensible defaults

type DoGetPipelinePool

type DoGetPipelinePool struct {
	// contains filtered or unexported fields
}

DoGetPipelinePool manages a pool of reusable pipelines

func NewDoGetPipelinePool

func NewDoGetPipelinePool(workers, bufferSize int) *DoGetPipelinePool

NewDoGetPipelinePool creates a pool of pipelines

func (*DoGetPipelinePool) Get

func (pp *DoGetPipelinePool) Get() *DoGetPipeline

Get retrieves a pipeline from the pool

func (*DoGetPipelinePool) Put

func (pp *DoGetPipelinePool) Put(p *DoGetPipeline)

Put returns a pipeline to the pool

type Edge

type Edge struct {
	Subject   string  // Source entity (e.g., "user:alice")
	Predicate string  // Relationship type (e.g., "owns", "likes")
	Object    string  // Target entity (e.g., "doc:report1")
	Weight    float32 // Edge weight for scoring
}

Edge represents a knowledge graph edge (subject -> predicate -> object)

type ErrDimensionMismatch

type ErrDimensionMismatch struct {
	Expected int
	Actual   int
	Dataset  string
}

ErrDimensionMismatch indicates vector dimension incompatibility.

func (*ErrDimensionMismatch) Error

func (e *ErrDimensionMismatch) Error() string

type ErrInternal

type ErrInternal struct {
	Operation string
	Cause     error
}

ErrInternal indicates an unexpected internal error.

func (*ErrInternal) Error

func (e *ErrInternal) Error() string

func (*ErrInternal) Unwrap

func (e *ErrInternal) Unwrap() error

type ErrInvalidArgument

type ErrInvalidArgument struct {
	Field   string
	Message string
}

ErrInvalidArgument indicates invalid input from the client.

func (*ErrInvalidArgument) Error

func (e *ErrInvalidArgument) Error() string

type ErrNotFound

type ErrNotFound struct {
	Resource string
	Name     string
}

ErrNotFound indicates a requested resource does not exist.

func (*ErrNotFound) Error

func (e *ErrNotFound) Error() string

type ErrPersistence

type ErrPersistence struct {
	Operation string
	Cause     error
}

ErrPersistence indicates a storage/persistence failure.

func (*ErrPersistence) Error

func (e *ErrPersistence) Error() string

func (*ErrPersistence) Unwrap

func (e *ErrPersistence) Unwrap() error

type ErrResourceExhausted

type ErrResourceExhausted struct {
	Resource string
	Message  string
}

ErrResourceExhausted indicates system resource limits exceeded.

func (*ErrResourceExhausted) Error

func (e *ErrResourceExhausted) Error() string

type ErrSchemaMismatch

type ErrSchemaMismatch struct {
	Dataset string
	Message string
}

ErrSchemaMismatch indicates incompatible schema between operations.

func (*ErrSchemaMismatch) Error

func (e *ErrSchemaMismatch) Error() string

type ErrUnavailable

type ErrUnavailable struct {
	Operation string
	Reason    string
}

ErrUnavailable indicates temporary unavailability (e.g., during snapshots).

func (*ErrUnavailable) Error

func (e *ErrUnavailable) Error() string
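
These typed errors compose with the standard errors package. A sketch of classifying a store error at an API boundary (err is assumed in scope):

	var nf *ErrNotFound
	if errors.As(err, &nf) {
		log.Printf("missing %s %q", nf.Resource, nf.Name)
	}
	var pe *ErrPersistence
	if errors.As(err, &pe) {
		// Unwrap exposes the underlying storage error.
		log.Printf("storage failure in %s: %v", pe.Operation, errors.Unwrap(pe))
	}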

type FSBackend

type FSBackend struct {
	// contains filtered or unexported fields
}

FSBackend is a standard os.File backed implementation.

func NewFSBackend

func NewFSBackend(path string) (*FSBackend, error)

func (*FSBackend) Close

func (b *FSBackend) Close() error

func (*FSBackend) File

func (b *FSBackend) File() *os.File

func (*FSBackend) Name

func (b *FSBackend) Name() string

func (*FSBackend) Sync

func (b *FSBackend) Sync() error

func (*FSBackend) Write

func (b *FSBackend) Write(p []byte) (int, error)

type Filter

type Filter struct {
	Field    string `json:"field"`
	Operator string `json:"operator"`
	Value    string `json:"value"`
}

type FilterEvaluator

type FilterEvaluator struct {
	// contains filtered or unexported fields
}

FilterEvaluator pre-processes filters for a specific RecordBatch to enable fast scanning

func NewFilterEvaluator

func NewFilterEvaluator(rec arrow.RecordBatch, filters []Filter) (*FilterEvaluator, error)

NewFilterEvaluator creates a new evaluator, pre-binding filters to RecordBatch columns

func (*FilterEvaluator) Matches

func (e *FilterEvaluator) Matches(rowIdx int) bool

Matches returns true if the row satisfies all filters

func (*FilterEvaluator) MatchesAll

func (e *FilterEvaluator) MatchesAll(batchLen int) []int

MatchesAll evaluates all filters on the entire batch using SIMD and returns matching row indices. This is optimal for dense scans.

func (*FilterEvaluator) MatchesBatch

func (e *FilterEvaluator) MatchesBatch(rowIndices []int) []int

MatchesBatch evaluates filters for a slice of row indices and returns a subset of matching indices. This is the "SIMD-friendly" entry point for batch processing.
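
A sketch of a dense scan with a pre-bound evaluator; the column name and operator token are illustrative:

	ev, err := NewFilterEvaluator(rec, []Filter{
		{Field: "status", Operator: "=", Value: "active"},
	})
	if err != nil {
		log.Fatal(err)
	}
	rows := ev.MatchesAll(int(rec.NumRows())) // indices of matching rows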

type FilterFunc

type FilterFunc func(ctx context.Context, rec arrow.RecordBatch) (arrow.RecordBatch, error)

FilterFunc is the signature for batch filtering functions

type FilterOperator

type FilterOperator int

FilterOperator represents a vectorized filter comparison operator

const (
	FilterOpEqual FilterOperator = iota
	FilterOpNotEqual
	FilterOpGreater
	FilterOpLess
	FilterOpGreaterEqual
	FilterOpLessEqual
	FilterOpIn
	FilterOpNotIn
	FilterOpContains
)

func ParseFilterOperator

func ParseFilterOperator(op string) (FilterOperator, error)

ParseFilterOperator parses a string operator into FilterOperator

func (FilterOperator) ComputeFunc

func (op FilterOperator) ComputeFunc() string

ComputeFunc returns the Arrow compute function name for the operator

func (FilterOperator) String

func (op FilterOperator) String() string

String returns the string representation of the operator

type FlightClientPool

type FlightClientPool struct {
	// contains filtered or unexported fields
}

FlightClientPool manages pooled connections to multiple peer Flight servers.

func NewFlightClientPool

func NewFlightClientPool(cfg FlightClientPoolConfig) *FlightClientPool

NewFlightClientPool creates a new connection pool.

func (*FlightClientPool) AddHost

func (p *FlightClientPool) AddHost(host string) error

AddHost adds a peer host to the pool.

func (*FlightClientPool) Close

func (p *FlightClientPool) Close() error

Close closes all connections and shuts down the pool.

func (*FlightClientPool) DoGetFromPeer

func (p *FlightClientPool) DoGetFromPeer(ctx context.Context, host, dataset string, filters []Filter) ([]arrow.RecordBatch, error)

DoGetFromPeer retrieves Arrow RecordBatches from a peer via Flight DoGet.

func (*FlightClientPool) DoPutToPeer

func (p *FlightClientPool) DoPutToPeer(ctx context.Context, host, dataset string, records []arrow.RecordBatch) error

DoPutToPeer sends Arrow RecordBatches to a peer via Flight DoPut.

func (*FlightClientPool) Get

Get acquires a connection to the specified host. If no idle connection is available, creates a new one.

func (*FlightClientPool) Put

func (p *FlightClientPool) Put(conn *PooledFlightClient)

Put returns a connection to the pool.

func (*FlightClientPool) RemoveHost

func (p *FlightClientPool) RemoveHost(host string) error

RemoveHost removes a peer host from the pool and closes its connections.

func (*FlightClientPool) ReplicateToPeers

func (p *FlightClientPool) ReplicateToPeers(ctx context.Context, peers []string, dataset string, records []arrow.RecordBatch) map[string]error

ReplicateToPeers replicates data to multiple peers concurrently. Returns a map of host -> error for failed replications.
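
A sketch of fan-out replication, assuming peers []string, records []arrow.RecordBatch, and ctx are in scope; the dataset name is illustrative:

	pool := NewFlightClientPool(DefaultFlightClientPoolConfig())
	defer pool.Close()

	for _, h := range peers {
		if err := pool.AddHost(h); err != nil {
			log.Printf("add host %s: %v", h, err)
		}
	}
	failed := pool.ReplicateToPeers(ctx, peers, "embeddings", records)
	for host, err := range failed {
		log.Printf("replicate to %s: %v", host, err)
	}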

func (*FlightClientPool) Stats

func (p *FlightClientPool) Stats() FlightClientPoolStats

Stats returns current pool statistics.

type FlightClientPoolConfig

type FlightClientPoolConfig struct {
	// MaxConnsPerHost is the maximum connections per peer host (default: 10)
	MaxConnsPerHost int
	// MinConnsPerHost is the minimum idle connections to maintain (default: 1)
	MinConnsPerHost int
	// ConnTimeout is the connection establishment timeout (default: 10s)
	ConnTimeout time.Duration
	// IdleTimeout is how long idle connections are kept (default: 5m)
	IdleTimeout time.Duration
	// MaxConnLifetime is the maximum lifetime of any connection (default: 1h)
	MaxConnLifetime time.Duration
	// HealthCheckInterval is how often to check connection health (default: 30s)
	HealthCheckInterval time.Duration
	// EnableCompression enables gzip compression for Flight streams
	EnableCompression bool
}

FlightClientPoolConfig configures the Flight client connection pool.

func DefaultFlightClientPoolConfig

func DefaultFlightClientPoolConfig() FlightClientPoolConfig

DefaultFlightClientPoolConfig returns sensible defaults for Flight client pooling.

func (FlightClientPoolConfig) Validate

func (c FlightClientPoolConfig) Validate() error

Validate checks if the configuration is valid.

type FlightClientPoolStats

type FlightClientPoolStats struct {
	TotalHosts       int
	TotalConnections int
	ActiveConns      int
	IdleConns        int
	Gets             int64
	Puts             int64
	Hits             int64
	Misses           int64
	Timeouts         int64
	Errors           int64
}

FlightClientPoolStats provides statistics about the connection pool.

type FlightDataChunk

type FlightDataChunk struct {
	DatasetName string
	Record      arrow.RecordBatch
	ReceivedAt  time.Time
}

FlightDataChunk represents a received Arrow RecordBatch with metadata.

type FlightDataQueue

type FlightDataQueue struct {
	// contains filtered or unexported fields
}

FlightDataQueue is a buffered channel wrapper for decoupling receive from processing.

func NewFlightDataQueue

func NewFlightDataQueue(config FlightDataQueueConfig) *FlightDataQueue

NewFlightDataQueue creates a new queue.

func (*FlightDataQueue) Close

func (q *FlightDataQueue) Close()

Close closes the queue, preventing new enqueues.

func (*FlightDataQueue) Dequeue

func (q *FlightDataQueue) Dequeue(ctx context.Context) (*FlightDataChunk, bool)

Dequeue retrieves the next chunk, respecting context cancellation. Returns (nil, false) if queue is closed and empty or context canceled.

func (*FlightDataQueue) IsClosed

func (q *FlightDataQueue) IsClosed() bool

IsClosed returns whether queue is closed.

func (*FlightDataQueue) Len

func (q *FlightDataQueue) Len() int

Len returns current queue length.

func (*FlightDataQueue) Stats

func (q *FlightDataQueue) Stats() FlightDataQueueStats

Stats returns current statistics.

func (*FlightDataQueue) TryEnqueue

func (q *FlightDataQueue) TryEnqueue(chunk *FlightDataChunk) bool

TryEnqueue attempts to enqueue a chunk without blocking indefinitely. Returns false if queue is full or closed.
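
A sketch of the receive/process split, assuming the queue takes ownership of the record on a successful enqueue (ownership semantics are not spelled out on this page):

	q := NewFlightDataQueue(DefaultFlightDataQueueConfig())

	// Receiver: never block the network read; drop on sustained lag.
	if !q.TryEnqueue(&FlightDataChunk{DatasetName: name, Record: rec, ReceivedAt: time.Now()}) {
		rec.Release() // dropped; release our reference (assumed convention)
	}

	// Processor:
	for {
		chunk, ok := q.Dequeue(ctx)
		if !ok {
			break // closed and drained, or ctx canceled
		}
		handle(chunk) // hypothetical
	}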

type FlightDataQueueConfig

type FlightDataQueueConfig struct {
	QueueSize      int           // buffered channel capacity
	EnqueueTimeout time.Duration // timeout for TryEnqueue
	DrainTimeout   time.Duration // timeout for draining on close
}

FlightDataQueueConfig configures the flight data queue.

func DefaultFlightDataQueueConfig

func DefaultFlightDataQueueConfig() FlightDataQueueConfig

DefaultFlightDataQueueConfig returns sensible defaults.

func (FlightDataQueueConfig) Validate

func (c FlightDataQueueConfig) Validate() error

Validate checks config validity.

type FlightDataQueueStats

type FlightDataQueueStats struct {
	Enqueued int64
	Dequeued int64
	Dropped  int64
}

FlightDataQueueStats tracks queue operations.

type FlightReaderBufferRetainer

type FlightReaderBufferRetainer struct {
	// contains filtered or unexported fields
}

FlightReaderBufferRetainer manages buffer lifecycles for zero-copy operations

func NewFlightReaderBufferRetainer

func NewFlightReaderBufferRetainer(maxBuffers int) *FlightReaderBufferRetainer

NewFlightReaderBufferRetainer creates a new buffer retainer

func (*FlightReaderBufferRetainer) GetBuffer

func (r *FlightReaderBufferRetainer) GetBuffer(handle uint64) []byte

GetBuffer retrieves a retained buffer by handle

func (*FlightReaderBufferRetainer) MaxBuffers

func (r *FlightReaderBufferRetainer) MaxBuffers() int

MaxBuffers returns the maximum number of retained buffers

func (*FlightReaderBufferRetainer) ReleaseBuffer

func (r *FlightReaderBufferRetainer) ReleaseBuffer(handle uint64) bool

ReleaseBuffer releases a retained buffer

func (*FlightReaderBufferRetainer) RetainBuffer

func (r *FlightReaderBufferRetainer) RetainBuffer(buf []byte) uint64

RetainBuffer retains a buffer and returns a handle for later retrieval

func (*FlightReaderBufferRetainer) Stats

Stats returns current buffer retainer statistics

type FusionMode

type FusionMode int

FusionMode defines how to combine dense and sparse results

const (
	FusionModeRRF     FusionMode = iota // Reciprocal Rank Fusion
	FusionModeLinear                    // Linear weighted combination
	FusionModeCascade                   // Cascade: filters -> keyword -> vector
)

type GRPCConfig

type GRPCConfig struct {
	// Keepalive parameters
	KeepAliveTime                time.Duration // Time between keepalive pings (server sends to client)
	KeepAliveTimeout             time.Duration // Timeout for keepalive ping acknowledgement
	KeepAliveMinTime             time.Duration // Minimum time client should wait between pings
	KeepAlivePermitWithoutStream bool          // Allow keepalive pings when no active streams

	// Concurrent streams limit
	MaxConcurrentStreams uint32 // Maximum concurrent streams per connection

	// HTTP/2 flow control window sizes
	InitialWindowSize     int32 // Initial window size for a stream
	InitialConnWindowSize int32 // Initial window size for a connection

	// Message size limits
	MaxRecvMsgSize int // Maximum message size the server can receive
	MaxSendMsgSize int // Maximum message size the server can send

	// Compression settings - enables gzip compression for 50-70% bandwidth reduction
	CompressionEnabled bool // Enable gzip compression for streaming data
}

GRPCConfig holds all gRPC server/client tuning parameters for optimal throughput. Configuring these parameters can yield 5-10% improvement in sustained throughput.

func DefaultGRPCConfig

func DefaultGRPCConfig() GRPCConfig

DefaultGRPCConfig returns a GRPCConfig with sensible defaults optimized for throughput. These defaults are tuned for vector database workloads with large message payloads.

func (GRPCConfig) BuildClientOptions

func (c GRPCConfig) BuildClientOptions() []grpc.DialOption

BuildClientOptions returns a slice of grpc.DialOption configured from this GRPCConfig. These options optimize gRPC client performance to match server settings. When CompressionEnabled is true, all calls will use gzip compression for 50-70% bandwidth reduction on vector data.

func (GRPCConfig) BuildServerOptions

func (c GRPCConfig) BuildServerOptions() []grpc.ServerOption

BuildServerOptions returns a slice of grpc.ServerOption configured from this GRPCConfig. These options optimize gRPC server performance for sustained throughput. Note: Server-side compression is automatically available when the gzip encoding package is imported - no explicit server option needed. The server will respond with compressed data when clients request it via UseCompressor.
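
A sketch wiring the config into a server and a client; grpc.NewClient requires grpc-go >= 1.63 (use grpc.Dial on older releases), the address is illustrative, and plaintext credentials are added only for the example:

	cfg := DefaultGRPCConfig()
	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}
	srv := grpc.NewServer(cfg.BuildServerOptions()...)

	opts := append(cfg.BuildClientOptions(),
		grpc.WithTransportCredentials(insecure.NewCredentials())) // example only
	conn, err := grpc.NewClient("peer.internal:3000", opts...)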

func (GRPCConfig) ClientKeepaliveParams

func (c GRPCConfig) ClientKeepaliveParams() keepalive.ClientParameters

ClientKeepaliveParams returns keepalive.ClientParameters for client configuration.

func (GRPCConfig) ServerEnforcementPolicy

func (c GRPCConfig) ServerEnforcementPolicy() keepalive.EnforcementPolicy

ServerEnforcementPolicy returns keepalive.EnforcementPolicy for server configuration.

func (GRPCConfig) ServerKeepaliveParams

func (c GRPCConfig) ServerKeepaliveParams() keepalive.ServerParameters

ServerKeepaliveParams returns keepalive.ServerParameters for server configuration.

func (GRPCConfig) String

func (c GRPCConfig) String() string

String returns a human-readable representation of the config for logging.

func (GRPCConfig) Validate

func (c GRPCConfig) Validate() error

Validate checks if the configuration is valid.

type GlobalSearchCoordinator

type GlobalSearchCoordinator struct {
	// contains filtered or unexported fields
}

GlobalSearchCoordinator handles scatter-gather logic

func NewGlobalSearchCoordinator

func NewGlobalSearchCoordinator(logger *zap.Logger) *GlobalSearchCoordinator

func (*GlobalSearchCoordinator) GlobalSearch

func (c *GlobalSearchCoordinator) GlobalSearch(ctx context.Context, localResults []SearchResult, req VectorSearchRequest, peers []mesh.Member) ([]SearchResult, error)

GlobalSearch performs scatter-gather search across the cluster

type GraphStore

type GraphStore struct {
	// contains filtered or unexported fields
}

GraphStore manages knowledge graph edges for GraphRAG workflows

func NewGraphStore

func NewGraphStore() *GraphStore

NewGraphStore creates a new empty graph store

func (*GraphStore) AddEdge

func (gs *GraphStore) AddEdge(e Edge) error

AddEdge adds an edge to the graph store

func (*GraphStore) CommunityCount

func (gs *GraphStore) CommunityCount() int

CommunityCount returns the number of detected communities

func (*GraphStore) DetectCommunities

func (gs *GraphStore) DetectCommunities() []Community

DetectCommunities runs Louvain algorithm to find communities

func (*GraphStore) EdgeCount

func (gs *GraphStore) EdgeCount() int

EdgeCount returns the total number of edges

func (*GraphStore) FromArrowBatch

func (gs *GraphStore) FromArrowBatch(batch arrow.Record) error

FromArrowBatch loads edges from an Arrow RecordBatch

func (*GraphStore) GetCommunityForNode

func (gs *GraphStore) GetCommunityForNode(node string) int

GetCommunityForNode returns the community ID for a given node

func (*GraphStore) GetEdgesByObject

func (gs *GraphStore) GetEdgesByObject(object string) []Edge

GetEdgesByObject returns all edges with the given object (incoming edges)

func (*GraphStore) GetEdgesByPredicate

func (gs *GraphStore) GetEdgesByPredicate(predicate string) []Edge

GetEdgesByPredicate returns all edges with the given predicate

func (*GraphStore) GetEdgesBySubject

func (gs *GraphStore) GetEdgesBySubject(subject string) []Edge

GetEdgesBySubject returns all edges with the given subject (outgoing edges)

func (*GraphStore) PredicateVocabulary

func (gs *GraphStore) PredicateVocabulary() []string

PredicateVocabulary returns all unique predicate types

func (*GraphStore) ToArrowBatch

func (gs *GraphStore) ToArrowBatch(mem memory.Allocator) (arrow.Record, error)

ToArrowBatch converts all edges to an Arrow RecordBatch with Dictionary-encoded predicates

func (*GraphStore) Traverse

func (gs *GraphStore) Traverse(start string, maxHops int) []Path

Traverse performs BFS traversal from start node up to maxHops depth

func (*GraphStore) TraverseParallel

func (gs *GraphStore) TraverseParallel(starts []string, maxHops int) map[string][]Path

TraverseParallel performs concurrent traversal from multiple starting points
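
A minimal sketch of building and querying a graph (the Path and Community types are defined elsewhere in this package):

	gs := NewGraphStore()
	_ = gs.AddEdge(Edge{Subject: "user:alice", Predicate: "owns", Object: "doc:report1", Weight: 1.0})
	_ = gs.AddEdge(Edge{Subject: "doc:report1", Predicate: "cites", Object: "doc:report2", Weight: 0.8})

	paths := gs.Traverse("user:alice", 2) // BFS up to 2 hops
	comms := gs.DetectCommunities()       // Louvain communities
	_, _ = paths, comms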

type HNSWGraphSync

type HNSWGraphSync struct {
	// contains filtered or unexported fields
}

HNSWGraphSync manages HNSW index synchronization between peers.

func NewHNSWGraphSync

func NewHNSWGraphSync(index *HNSWIndex) *HNSWGraphSync

NewHNSWGraphSync creates a new sync manager for the given index.

func (*HNSWGraphSync) ApplyDelta

func (s *HNSWGraphSync) ApplyDelta(delta *DeltaSync) error

ApplyDelta applies incremental changes from a delta.

func (*HNSWGraphSync) ExportDelta

func (s *HNSWGraphSync) ExportDelta(fromVersion uint64) (*DeltaSync, error)

ExportDelta exports changes since the given version.

func (*HNSWGraphSync) ExportGraph

func (s *HNSWGraphSync) ExportGraph(w io.Writer) error

ExportGraph exports just the HNSW graph structure.

func (*HNSWGraphSync) ExportState

func (s *HNSWGraphSync) ExportState() ([]byte, error)

ExportState serializes the complete HNSW index state.

func (*HNSWGraphSync) GetExportCount

func (s *HNSWGraphSync) GetExportCount() uint64

GetExportCount returns the number of exports performed.

func (*HNSWGraphSync) GetImportCount

func (s *HNSWGraphSync) GetImportCount() uint64

GetImportCount returns the number of imports performed.

func (*HNSWGraphSync) GetVersion

func (s *HNSWGraphSync) GetVersion() uint64

GetVersion returns the current sync version.

func (*HNSWGraphSync) ImportGraph

func (s *HNSWGraphSync) ImportGraph(r io.Reader) error

ImportGraph imports just the HNSW graph structure.

func (*HNSWGraphSync) ImportState

func (s *HNSWGraphSync) ImportState(data []byte) error

ImportState deserializes and applies HNSW index state.

func (*HNSWGraphSync) IncrementVersion

func (s *HNSWGraphSync) IncrementVersion()

IncrementVersion increments the sync version.

type HNSWIndex

type HNSWIndex struct {
	Graph *hnsw.Graph[VectorID]

	Metric VectorMetric // Distance metric used by this index
	// contains filtered or unexported fields
}

HNSWIndex wraps the hnsw.Graph and manages the mapping from ID to Arrow data.

func NewHNSWIndex

func NewHNSWIndex(ds *Dataset) *HNSWIndex

NewHNSWIndex creates a new index for the given dataset using Euclidean distance.

func NewHNSWIndexWithCapacity

func NewHNSWIndexWithCapacity(ds *Dataset, capacity int) *HNSWIndex

NewHNSWIndexWithCapacity creates a new index with pre-allocated locations slice.

func NewHNSWIndexWithMetric

func NewHNSWIndexWithMetric(ds *Dataset, metric VectorMetric) *HNSWIndex

NewHNSWIndexWithMetric creates a new index for the given dataset with the specified metric.

func (*HNSWIndex) Add

func (h *HNSWIndex) Add(batchIdx, rowIdx int) (uint32, error)

Add inserts a new vector location into the index and adds it to the graph.

func (*HNSWIndex) AddBatchParallel

func (h *HNSWIndex) AddBatchParallel(locations []Location, workers int) error

AddBatchParallel adds multiple vectors in parallel using worker goroutines. It uses a three-phase approach:

 1. Pre-allocate locations atomically under a single lock
 2. Parallel vector retrieval using worker goroutines
 3. Sequential graph insertion (an hnsw library requirement)

This can reduce build time by up to 85% compared to sequential insertion by parallelizing the expensive vector fetch operations.
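
A sketch of a parallel index build over vectors already appended to the dataset's Arrow batches; idx construction is elided and the locations are illustrative:

	locs := []Location{
		{BatchIdx: 0, RowIdx: 0},
		{BatchIdx: 0, RowIdx: 1},
		{BatchIdx: 1, RowIdx: 0},
	}
	if err := idx.AddBatchParallel(locs, runtime.NumCPU()); err != nil {
		log.Fatal(err)
	}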

func (*HNSWIndex) AddByLocation

func (h *HNSWIndex) AddByLocation(batchIdx, rowIdx int) (uint32, error)

AddByLocation implements VectorIndex interface for HNSWIndex.

func (*HNSWIndex) AddByRecord

func (h *HNSWIndex) AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)

AddByRecord implements VectorIndex interface for HNSWIndex.

func (*HNSWIndex) AddSafe

func (h *HNSWIndex) AddSafe(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)

AddSafe adds a vector using a direct record batch reference. It COPIES the vector to ensure it remains stable even if the record batch is released.

func (*HNSWIndex) Close

func (h *HNSWIndex) Close() error

Close releases resources associated with the index.

func (*HNSWIndex) CloseGPU

func (h *HNSWIndex) CloseGPU() error

CloseGPU releases GPU resources

func (*HNSWIndex) GetDimension

func (h *HNSWIndex) GetDimension() uint32

GetDimension returns the vector dimension for this index

func (*HNSWIndex) GetDistanceFunc

func (h *HNSWIndex) GetDistanceFunc() func(a, b []float32) float32

GetDistanceFunc returns the distance function. If PQ is enabled, it returns the SDC-accelerated PQ distance.

func (*HNSWIndex) GetLocation

func (h *HNSWIndex) GetLocation(id VectorID) (Location, bool)

GetLocation returns the storage location for a given VectorID

func (*HNSWIndex) InitGPU

func (h *HNSWIndex) InitGPU(deviceID int, logger *zap.Logger) error

InitGPU attempts to initialize GPU acceleration for this index

func (*HNSWIndex) IsGPUEnabled

func (h *HNSWIndex) IsGPUEnabled() bool

IsGPUEnabled returns whether GPU acceleration is active

func (*HNSWIndex) Len

func (h *HNSWIndex) Len() int

Len returns the number of vectors in the index

func (*HNSWIndex) PutResults

func (h *HNSWIndex) PutResults(results []VectorID)

PutResults returns a search result slice to the pool for reuse. Callers should call this when done with SearchByID results.

func (*HNSWIndex) RegisterReader

func (h *HNSWIndex) RegisterReader()

RegisterReader increments the active reader count for zero-copy safety

func (*HNSWIndex) RemapLocations

func (h *HNSWIndex) RemapLocations(remapping map[int]BatchRemapInfo)

RemapLocations safely updates vector locations after compaction. It iterates over all locations and applies the remapping if the batch index matches.

func (*HNSWIndex) RerankBatch

func (h *HNSWIndex) RerankBatch(query []float32, candidateIDs []VectorID, k int) []RankedResult

RerankBatch computes exact distances for a set of candidate IDs and returns top-k.

func (*HNSWIndex) Search

func (h *HNSWIndex) Search(query []float32, k int) []VectorID

Search performs k-NN search using the provided query vector.

func (*HNSWIndex) SearchBatch

func (h *HNSWIndex) SearchBatch(queries [][]float32, k int) [][]RankedResult

SearchBatch is a convenience method that calls SearchBatchOptimized.

func (*HNSWIndex) SearchBatchOptimized

func (h *HNSWIndex) SearchBatchOptimized(queries [][]float32, k int) [][]RankedResult

SearchBatchOptimized performs k-NN search for multiple query vectors using optimized batch distance calculations.

This method is more efficient than calling SearchWithBatchDistance multiple times because it can amortize the overhead of vector collection and leverage cache locality when processing multiple queries.

func (*HNSWIndex) SearchBatchWithArena

func (h *HNSWIndex) SearchBatchWithArena(queries [][]float32, k int, arena *SearchArena) [][]RankedResult

SearchBatchWithArena performs batch search using an arena allocator.

func (*HNSWIndex) SearchByID

func (h *HNSWIndex) SearchByID(id VectorID, k int) []VectorID

func (*HNSWIndex) SearchByIDUnsafe

func (h *HNSWIndex) SearchByIDUnsafe(id VectorID, k int) []VectorID

SearchByIDUnsafe performs k-NN search using zero-copy vector access. This avoids the allocation and copy overhead of SearchByID by using getVectorUnsafe with epoch protection. The vector data is only valid during the search operation. Returns nil if id is invalid or k <= 0.

func (*HNSWIndex) SearchHybrid

func (h *HNSWIndex) SearchHybrid(query []float32, k int) []SearchResult

SearchHybrid performs GPU+CPU hybrid search. It uses the GPU for candidate generation, then refines with the CPU HNSW graph.

func (*HNSWIndex) SearchVectors

func (h *HNSWIndex) SearchVectors(query []float32, k int, filters []Filter) []SearchResult

SearchVectors performs k-NN search returning full results with scores (distances). Uses striped locks for location access to reduce contention in result processing.

func (*HNSWIndex) SearchVectorsWithBitmap

func (h *HNSWIndex) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult

SearchVectorsWithBitmap returns k nearest neighbors filtered by a bitset.

func (*HNSWIndex) SearchWithArena

func (h *HNSWIndex) SearchWithArena(query []float32, k int, arena *SearchArena) []VectorID

SearchWithArena performs k-NN search using the provided arena for allocations. If arena is nil or exhausted, falls back to heap allocation. The returned slice is allocated from the arena and should NOT be returned to resultPool. Call arena.Reset() after processing results to reclaim memory for next request.
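
A sketch of the arena reuse pattern on a hot request path; the SearchArena constructor name is an assumption (it is not shown on this page):

	arena := NewSearchArena(DefaultArenaSize) // hypothetical constructor
	for _, query := range queries {
		ids := idx.SearchWithArena(query, 10, arena)
		consume(ids)  // hypothetical; results live in the arena
		arena.Reset() // reclaim arena memory for the next query
	}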

func (*HNSWIndex) SearchWithBatchDistance

func (h *HNSWIndex) SearchWithBatchDistance(query []float32, k int) []RankedResult

SearchWithBatchDistance performs k-NN search using batch distance calculations. This is a two-stage retrieval:

 1. Coarse search using the HNSW graph to get initial candidates
 2. Batch distance calculation on all candidates for precise ranking

Using batch SIMD operations provides significant speedup over per-vector distance calculations by reducing function call overhead and maximizing CPU pipeline utilization.

func (*HNSWIndex) SetIndexedColumns

func (h *HNSWIndex) SetIndexedColumns(cols []string)

SetIndexedColumns satisfies VectorIndex interface

func (*HNSWIndex) SetPQEncoder

func (h *HNSWIndex) SetPQEncoder(encoder *PQEncoder)

SetPQEncoder enables product quantization with the provided encoder.

func (*HNSWIndex) SyncGPU

func (h *HNSWIndex) SyncGPU(ids []int64, vectors []float32) error

SyncGPU adds vectors to the GPU index. It should be called after adding vectors to the CPU index.

func (*HNSWIndex) TrainPQ

func (h *HNSWIndex) TrainPQ(dimensions, m, ksub, iterations int) error

TrainPQ trains a PQ encoder on the current dataset elements and enables it. This is a blocking operation.

func (*HNSWIndex) UnregisterReader

func (h *HNSWIndex) UnregisterReader()

UnregisterReader decrements the active reader count

func (*HNSWIndex) Warmup

func (h *HNSWIndex) Warmup() int

Warmup implements VectorIndex interface.

type HNSWIndexConfig

type HNSWIndexConfig struct {
	M              int
	EfConstruction int
	EfSearch       int
}

HNSWIndexConfig holds HNSW-specific configuration

type HNSWPluggableAdapter

type HNSWPluggableAdapter struct {
	// contains filtered or unexported fields
}

HNSWPluggableAdapter wraps HNSWIndex to implement PluggableVectorIndex

func (*HNSWPluggableAdapter) Add

func (h *HNSWPluggableAdapter) Add(id uint64, vector []float32) error

func (*HNSWPluggableAdapter) AddBatch

func (h *HNSWPluggableAdapter) AddBatch(ids []uint64, vectors [][]float32) error

func (*HNSWPluggableAdapter) AddByLocation

func (h *HNSWPluggableAdapter) AddByLocation(batchIdx, rowIdx int) error

func (*HNSWPluggableAdapter) Build

func (h *HNSWPluggableAdapter) Build() error

func (*HNSWPluggableAdapter) Close

func (h *HNSWPluggableAdapter) Close() error

func (*HNSWPluggableAdapter) Dimension

func (h *HNSWPluggableAdapter) Dimension() int

func (*HNSWPluggableAdapter) Len

func (h *HNSWPluggableAdapter) Len() int

func (*HNSWPluggableAdapter) Load

func (h *HNSWPluggableAdapter) Load(path string) error

func (*HNSWPluggableAdapter) NeedsBuild

func (h *HNSWPluggableAdapter) NeedsBuild() bool

func (*HNSWPluggableAdapter) Save

func (h *HNSWPluggableAdapter) Save(path string) error

func (*HNSWPluggableAdapter) Search

func (h *HNSWPluggableAdapter) Search(query []float32, k int) ([]IndexSearchResult, error)

func (*HNSWPluggableAdapter) SearchBatch

func (h *HNSWPluggableAdapter) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)

func (*HNSWPluggableAdapter) SearchVectors

func (h *HNSWPluggableAdapter) SearchVectors(query []float32, k int) []SearchResult

func (*HNSWPluggableAdapter) Size

func (h *HNSWPluggableAdapter) Size() int

func (*HNSWPluggableAdapter) Type

func (h *HNSWPluggableAdapter) Type() IndexType

type HybridPipelineConfig

type HybridPipelineConfig struct {
	Alpha          float32    // 0.0=pure keyword, 1.0=pure vector, 0.5=balanced
	RRFk           int        // RRF parameter k (typically 60)
	FusionMode     FusionMode // How to combine results
	UseColumnIndex bool       // Use column index for exact filters
}

HybridPipelineConfig configures the hybrid search pipeline

func DefaultHybridPipelineConfig

func DefaultHybridPipelineConfig() HybridPipelineConfig

DefaultHybridPipelineConfig returns sensible defaults

func (*HybridPipelineConfig) Validate

func (c *HybridPipelineConfig) Validate() error

type HybridQuery

type HybridQuery struct {
	Enabled   bool    // Whether hybrid search is enabled for this query
	TextQuery string  // The text query for BM25/keyword search
	Alpha     float64 // Weighting: 1.0 = pure vector, 0.0 = pure keyword
	K         int     // Number of results to return
}

HybridQuery represents a hybrid search query combining vector and BM25 search

func DefaultHybridQuery

func DefaultHybridQuery() HybridQuery

DefaultHybridQuery returns a HybridQuery with sensible defaults

func (*HybridQuery) IsPureKeyword

func (q *HybridQuery) IsPureKeyword() bool

IsPureKeyword returns true if this is a pure keyword/BM25 search (alpha=0.0)

func (*HybridQuery) IsPureVector

func (q *HybridQuery) IsPureVector() bool

IsPureVector returns true if this is a pure vector search (alpha=1.0)

func (*HybridQuery) Validate

func (q *HybridQuery) Validate() error

Validate checks if the HybridQuery is valid

type HybridSearchConfig

type HybridSearchConfig struct {
	// Alpha controls the weighting between vector and keyword search
	// Alpha = 1.0 means pure vector search (dense)
	// Alpha = 0.0 means pure keyword search (sparse/BM25)
	// Alpha = 0.5 means balanced hybrid
	Alpha float32

	// TextColumns specifies which columns should be indexed for BM25 search
	TextColumns []string

	// RRFk is the k parameter for Reciprocal Rank Fusion
	// Higher values give more weight to lower-ranked results
	// Typical values: 60 (default), range 1-1000
	RRFk int

	// BM25 contains the BM25 algorithm parameters
	BM25 BM25Config

	// Enabled determines if hybrid search is active
	Enabled bool
}

HybridSearchConfig configures hybrid search behavior combining BM25 and vector search

func DefaultHybridSearchConfig

func DefaultHybridSearchConfig() HybridSearchConfig

DefaultHybridSearchConfig returns a HybridSearchConfig with sensible defaults

func (*HybridSearchConfig) IsPureKeyword

func (c *HybridSearchConfig) IsPureKeyword() bool

IsPureKeyword returns true if Alpha is 0.0 (pure keyword search)

func (*HybridSearchConfig) IsPureVector

func (c *HybridSearchConfig) IsPureVector() bool

IsPureVector returns true if Alpha is 1.0 (pure vector search)

func (*HybridSearchConfig) Validate

func (c *HybridSearchConfig) Validate() error

Validate checks if the configuration is valid

type HybridSearchPipeline

type HybridSearchPipeline struct {
	// contains filtered or unexported fields
}

HybridSearchPipeline combines column index, BM25, and HNSW search

func NewHybridSearchPipeline

func NewHybridSearchPipeline(cfg HybridPipelineConfig) *HybridSearchPipeline

NewHybridSearchPipeline creates a new hybrid search pipeline

func (*HybridSearchPipeline) Search

Search performs hybrid search combining all configured indexes

func (*HybridSearchPipeline) SetBM25Index

func (p *HybridSearchPipeline) SetBM25Index(idx *BM25InvertedIndex)

SetBM25Index sets the BM25 inverted index for keyword search

func (*HybridSearchPipeline) SetColumnIndex

func (p *HybridSearchPipeline) SetColumnIndex(idx *ColumnInvertedIndex)

SetColumnIndex sets the column inverted index for exact filters

func (*HybridSearchPipeline) SetHNSWIndex

func (p *HybridSearchPipeline) SetHNSWIndex(idx *HNSWIndex)

SetHNSWIndex sets the HNSW index for vector search
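
A sketch of assembling the pipeline; the three indexes are assumed to be built elsewhere, and Search's exact signature is not shown on this page:

	p := NewHybridSearchPipeline(DefaultHybridPipelineConfig())
	p.SetColumnIndex(colIdx) // exact filters
	p.SetBM25Index(bm25Idx)  // keyword (sparse)
	p.SetHNSWIndex(hnswIdx)  // dense vectors
	// p.Search then fuses the three stages per the configured FusionMode.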

type HybridSearchQuery

type HybridSearchQuery struct {
	Vector        []float32 // Query vector for dense search
	KeywordQuery  string    // Text query for BM25 search
	K             int       // Number of results
	AlphaOverride *float32  // Override pipeline alpha if set
	ExactFilters  []Filter  // Exact match filters (for column index)
}

HybridSearchQuery extends search with vector, keyword, and filter options

func DefaultHybridSearchQuery

func DefaultHybridSearchQuery() HybridSearchQuery

func (*HybridSearchQuery) Validate

func (q *HybridSearchQuery) Validate() error

type IPCBufferPool

type IPCBufferPool struct {
	// contains filtered or unexported fields
}

IPCBufferPool manages reusable bytes.Buffer instances for IPC serialization. Uses sync.Pool internally for thread-safe, lock-free buffer management.

func NewIPCBufferPool

func NewIPCBufferPool(cfg RecordWriterPoolConfig) *IPCBufferPool

NewIPCBufferPool creates a new buffer pool with the given configuration.

func (*IPCBufferPool) Get

func (p *IPCBufferPool) Get() *bytes.Buffer

Get retrieves a buffer from the pool or allocates a new one. The returned buffer is reset and ready for use.

func (*IPCBufferPool) Put

func (p *IPCBufferPool) Put(buf *bytes.Buffer)

Put returns a buffer to the pool for reuse. Oversized buffers (> MaxBufferSize) are discarded to prevent memory bloat.
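
A sketch of the get/put discipline, assuming the zero-value RecordWriterPoolConfig is usable:

	pool := NewIPCBufferPool(RecordWriterPoolConfig{})
	buf := pool.Get()   // reset and ready
	defer pool.Put(buf) // oversized buffers are discarded, not pooled
	// serialize into buf here, e.g. via an ipc.Writer targeting buf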

func (*IPCBufferPool) Reset

func (p *IPCBufferPool) Reset()

Reset clears all statistics.

func (*IPCBufferPool) Stats

func (p *IPCBufferPool) Stats() IPCBufferPoolStats

Stats returns current pool statistics.

type IPCBufferPoolStats

type IPCBufferPoolStats struct {
	Gets      int64 // Total Get() calls
	Puts      int64 // Total Put() calls
	Hits      int64 // Buffer reused from pool
	Misses    int64 // New buffer allocated
	Discarded int64 // Oversized buffers discarded
}

IPCBufferPoolStats tracks pool usage metrics.

type IVFFlatConfig

type IVFFlatConfig struct {
	NClusters int
	NProbe    int
}

IVFFlatConfig holds IVF-Flat-specific configuration

type IVFFlatIndex

type IVFFlatIndex struct {
	// contains filtered or unexported fields
}

IVFFlatIndex implements PluggableVectorIndex for IVF-Flat algorithm

func (*IVFFlatIndex) Add

func (ivf *IVFFlatIndex) Add(id uint64, vector []float32) error

func (*IVFFlatIndex) AddBatch

func (ivf *IVFFlatIndex) AddBatch(ids []uint64, vectors [][]float32) error

func (*IVFFlatIndex) AddByLocation

func (ivf *IVFFlatIndex) AddByLocation(batchIdx, rowIdx int) error

func (*IVFFlatIndex) Build

func (ivf *IVFFlatIndex) Build() error

func (*IVFFlatIndex) Close

func (ivf *IVFFlatIndex) Close() error

func (*IVFFlatIndex) Dimension

func (ivf *IVFFlatIndex) Dimension() int

func (*IVFFlatIndex) Len

func (ivf *IVFFlatIndex) Len() int

func (*IVFFlatIndex) Load

func (ivf *IVFFlatIndex) Load(path string) error

func (*IVFFlatIndex) NeedsBuild

func (ivf *IVFFlatIndex) NeedsBuild() bool

func (*IVFFlatIndex) Save

func (ivf *IVFFlatIndex) Save(path string) error

func (*IVFFlatIndex) Search

func (ivf *IVFFlatIndex) Search(query []float32, k int) ([]IndexSearchResult, error)

func (*IVFFlatIndex) SearchBatch

func (ivf *IVFFlatIndex) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)

func (*IVFFlatIndex) SearchVectors

func (ivf *IVFFlatIndex) SearchVectors(query []float32, k int) []SearchResult

func (*IVFFlatIndex) Size

func (ivf *IVFFlatIndex) Size() int

func (*IVFFlatIndex) Type

func (ivf *IVFFlatIndex) Type() IndexType

type IndexConfig

type IndexConfig struct {
	Type      IndexType
	Dimension int

	// Type-specific configurations
	HNSWConfig    *HNSWIndexConfig
	IVFFlatConfig *IVFFlatConfig
	DiskANNConfig *DiskANNConfig
}

IndexConfig holds configuration for creating an index

type IndexConstructor

type IndexConstructor func(cfg IndexConfig) (PluggableVectorIndex, error)

IndexConstructor is a function that creates a PluggableVectorIndex

type IndexFactory

type IndexFactory struct {
	// contains filtered or unexported fields
}

IndexFactory creates indexes by type using a registry pattern

func NewIndexFactory

func NewIndexFactory() *IndexFactory

NewIndexFactory creates a new factory with default index types registered

func (*IndexFactory) Create

func (f *IndexFactory) Create(cfg IndexConfig) (PluggableVectorIndex, error)

Create creates an index of the specified type.

func (*IndexFactory) ListTypes

func (f *IndexFactory) ListTypes() []IndexType

ListTypes returns all registered index types

func (*IndexFactory) Register

func (f *IndexFactory) Register(t IndexType, ctor IndexConstructor)

Register adds a new index type to the factory

type IndexJob

type IndexJob struct {
	DatasetName string
	Record      arrow.RecordBatch
	BatchIdx    int
	RowIdx      int
	CreatedAt   time.Time
}

IndexJob represents a job for the indexing worker

type IndexJobQueue

type IndexJobQueue struct {
	// contains filtered or unexported fields
}

IndexJobQueue provides non-blocking job submission with overflow handling.

func NewIndexJobQueue

func NewIndexJobQueue(cfg IndexJobQueueConfig) *IndexJobQueue

NewIndexJobQueue creates a new non-blocking index job queue.

func (*IndexJobQueue) Jobs

func (q *IndexJobQueue) Jobs() <-chan IndexJob

Jobs returns the channel for consumers to read from.

func (*IndexJobQueue) Len

func (q *IndexJobQueue) Len() int

Len returns approximate queue depth (main + overflow).

func (*IndexJobQueue) Send

func (q *IndexJobQueue) Send(job IndexJob) bool

Send submits a job without blocking. Returns true if accepted, false if dropped.
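
A sketch of the non-blocking producer/consumer pattern; the indexing work itself is elided:

	q := NewIndexJobQueue(DefaultIndexJobQueueConfig())
	defer q.Stop()

	go func() { // consumer
		for job := range q.Jobs() {
			_ = job // add to the vector index here
		}
	}()

	ok := q.Send(IndexJob{DatasetName: "embeddings", Record: rec, BatchIdx: 0, RowIdx: 7, CreatedAt: time.Now()})
	if !ok {
		// dropped: main channel and overflow buffer were both full
	}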

func (*IndexJobQueue) SendBatch

func (q *IndexJobQueue) SendBatch(jobs []IndexJob) int

SendBatch submits multiple jobs efficiently.

func (*IndexJobQueue) Stats

func (q *IndexJobQueue) Stats() IndexJobQueueStats

Stats returns current queue statistics.

func (*IndexJobQueue) Stop

func (q *IndexJobQueue) Stop()

Stop gracefully stops the queue.

type IndexJobQueueConfig

type IndexJobQueueConfig struct {
	MainChannelSize    int           // Primary channel buffer size
	OverflowBufferSize int           // Secondary overflow buffer size
	DropOnOverflow     bool          // If true, drop jobs when both buffers full
	DrainInterval      time.Duration // How often to drain overflow to main channel
}

IndexJobQueueConfig configures the non-blocking index job queue.

func DefaultIndexJobQueueConfig

func DefaultIndexJobQueueConfig() IndexJobQueueConfig

DefaultIndexJobQueueConfig returns sensible defaults for production.

type IndexJobQueueStats

type IndexJobQueueStats struct {
	TotalSent     uint64 // Total jobs sent
	DirectSent    uint64 // Jobs sent directly to main channel
	OverflowCount uint64 // Jobs sent to overflow buffer
	DrainedCount  uint64 // Jobs drained from overflow to main
	DroppedCount  uint64 // Jobs dropped when both buffers full
}

IndexJobQueueStats tracks queue statistics.

type IndexSearchResult

type IndexSearchResult struct {
	ID       uint64
	Distance float32
}

IndexSearchResult represents a single search result from any index type

type IndexType

type IndexType string

IndexType represents the type of vector index algorithm

const (
	// IndexTypeHNSW is Hierarchical Navigable Small World graph
	IndexTypeHNSW IndexType = "hnsw"
	// IndexTypeIVFFlat is Inverted File with Flat quantization
	IndexTypeIVFFlat IndexType = "ivf_flat"
	// IndexTypeDiskANN is Microsoft DiskANN algorithm
	IndexTypeDiskANN IndexType = "diskann"
)

type InvertedIndex

type InvertedIndex struct {
	// contains filtered or unexported fields
}

InvertedIndex maps string terms to document IDs (row indices) using compressed bitmaps

func NewInvertedIndex

func NewInvertedIndex() *InvertedIndex

NewInvertedIndex creates a new empty inverted index

func (*InvertedIndex) Add

func (idx *InvertedIndex) Add(term string, docID uint32)

Add inserts a term-docID pair into the index

func (*InvertedIndex) AddBatch

func (idx *InvertedIndex) AddBatch(terms []string, docID uint32)

AddBatch adds multiple terms for a single document

func (*InvertedIndex) Delete

func (idx *InvertedIndex) Delete(term string, docID uint32)

Delete removes a term-docID pair from the index

func (*InvertedIndex) Get

func (idx *InvertedIndex) Get(term string) *roaring.Bitmap

Get returns the bitmap for a given term, or nil if not found. The returned bitmap is always a clone: roaring bitmaps are not safe for concurrent mutation, and a caller may read the result while a writer mutates the index, so returning the internal bitmap directly would be a data race.
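
A minimal usage sketch:

	idx := NewInvertedIndex()
	idx.Add("golang", 1)
	idx.AddBatch([]string{"arrow", "vector"}, 2)

	if bm := idx.Get("arrow"); bm != nil { // bm is a private clone
		fmt.Println(bm.Contains(2)) // true
	}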

func (*InvertedIndex) MemoryUsage

func (idx *InvertedIndex) MemoryUsage() uint64

MemoryUsage estimates memory usage (expensive)

func (*InvertedIndex) Stats

func (idx *InvertedIndex) Stats() (int, uint64)

Stats returns basic stats about the index

type LeastConnectionsStrategy

type LeastConnectionsStrategy struct {
	// contains filtered or unexported fields
}

LeastConnectionsStrategy picks the replica with fewest active connections

func NewLeastConnectionsStrategy

func NewLeastConnectionsStrategy() *LeastConnectionsStrategy

NewLeastConnectionsStrategy creates a new least connections strategy

func (*LeastConnectionsStrategy) DecrementConnections

func (s *LeastConnectionsStrategy) DecrementConnections(replica string)

DecrementConnections removes a connection for a replica

func (*LeastConnectionsStrategy) GetConnectionCount

func (s *LeastConnectionsStrategy) GetConnectionCount(replica string) int64

GetConnectionCount returns current connection count for a replica

func (*LeastConnectionsStrategy) IncrementConnections

func (s *LeastConnectionsStrategy) IncrementConnections(replica string)

IncrementConnections adds a connection for a replica

func (*LeastConnectionsStrategy) Select

func (s *LeastConnectionsStrategy) Select(replicas []string) string

Select picks the replica with lowest connection count

type LoadBalancerConfig

type LoadBalancerConfig struct {
	Strategy            StrategyType
	HealthCheckInterval time.Duration
}

LoadBalancerConfig holds configuration for the load balancer

type LoadBalancerStats

type LoadBalancerStats struct {
	TotalSelections  int64
	FailedSelections int64
	HealthyReplicas  int
	TotalReplicas    int
}

LoadBalancerStats holds statistics about load balancer operations

type LoadBalancerStrategy

type LoadBalancerStrategy interface {
	Select(replicas []string) string
}

LoadBalancerStrategy defines the interface for selection strategies

type Location

type Location struct {
	BatchIdx int
	RowIdx   int
}

Location points to the physical location of a vector in the Arrow records.

type LockFreeHNSW

type LockFreeHNSW struct {
	// contains filtered or unexported fields
}

LockFreeHNSW implements a high-throughput HNSW index avoiding global locks.

func NewLockFreeHNSW

func NewLockFreeHNSW() *LockFreeHNSW

func (*LockFreeHNSW) Add

func (h *LockFreeHNSW) Add(id VectorID, vec []float32)

func (*LockFreeHNSW) Search

func (h *LockFreeHNSW) Search(query []float32, k, ef int) []VectorID

type LockFreeNode

type LockFreeNode struct {
	ID    VectorID
	Vec   []float32
	Level int

	// Friends[level] is a list of neighbor IDs.
	Friends [][]VectorID
	// contains filtered or unexported fields
}

LockFreeNode represents a node in the HNSW graph with concurrent access support.

type MemoryAdvice

type MemoryAdvice int

MemoryAdvice identifies the type of access pattern for a memory region.

const (
	AdviceNormal MemoryAdvice = iota
	AdviceRandom
	AdviceSequential
	AdviceWillNeed
	AdviceDontNeed
	AdviceHugePage
)

type MemoryBackpressureController

type MemoryBackpressureController struct {
	// contains filtered or unexported fields
}

MemoryBackpressureController manages memory backpressure for the store.

func NewMemoryBackpressureController

func NewMemoryBackpressureController(cfg BackpressureConfig) *MemoryBackpressureController

NewMemoryBackpressureController creates a new backpressure controller.

func (*MemoryBackpressureController) Acquire

Acquire blocks until memory pressure allows proceeding.

func (*MemoryBackpressureController) CheckPressure

func (c *MemoryBackpressureController) CheckPressure() PressureLevel

CheckPressure evaluates current memory usage and returns pressure level.

func (*MemoryBackpressureController) GetAcquireCount

func (c *MemoryBackpressureController) GetAcquireCount() uint64

GetAcquireCount returns the total number of successful acquires.

func (*MemoryBackpressureController) GetHardLimit

func (c *MemoryBackpressureController) GetHardLimit() uint64

GetHardLimit returns the configured hard limit.

func (*MemoryBackpressureController) GetPressureLevel

func (c *MemoryBackpressureController) GetPressureLevel() PressureLevel

GetPressureLevel returns the current pressure level without re-checking.

func (*MemoryBackpressureController) GetRejectCount

func (c *MemoryBackpressureController) GetRejectCount() uint64

GetRejectCount returns the total number of rejected acquires.

func (*MemoryBackpressureController) GetSoftLimit

func (c *MemoryBackpressureController) GetSoftLimit() uint64

GetSoftLimit returns the configured soft limit.

func (*MemoryBackpressureController) Release

func (c *MemoryBackpressureController) Release()

Release signals completion of a memory-intensive operation.

func (*MemoryBackpressureController) SetPressureLevel

func (c *MemoryBackpressureController) SetPressureLevel(level PressureLevel)

SetPressureLevel manually sets the pressure level (for testing).

type MerkleNode

type MerkleNode struct {
	Hash     [32]byte
	Children [MerkleFanout]*MerkleNode
}

MerkleNode represents a node in the Merkle Tree

type MerkleTree

type MerkleTree struct {
	// contains filtered or unexported fields
}

MerkleTree manages consistency hashes for a Dataset

func NewMerkleTree

func NewMerkleTree() *MerkleTree

func (*MerkleTree) GetNode

func (t *MerkleTree) GetNode(path []int) ([32]byte, [][32]byte, bool)

func (*MerkleTree) RootHash

func (t *MerkleTree) RootHash() [32]byte

func (*MerkleTree) Update

func (t *MerkleTree) Update(id VectorID, ts int64)

Update updates the hash for a given VectorID with its latest timestamp

type MeshStatusCache

type MeshStatusCache struct {
	// contains filtered or unexported fields
}

MeshStatusCache caches the serialized mesh status to avoid repeated allocations.

func NewMeshStatusCache

func NewMeshStatusCache(ttl time.Duration) *MeshStatusCache

NewMeshStatusCache creates a new mesh status cache with the specified TTL.

func (*MeshStatusCache) Get

func (c *MeshStatusCache) Get(currentMemberCount int) []byte

Get returns the cached JSON if valid, otherwise returns nil.

func (*MeshStatusCache) Invalidate

func (c *MeshStatusCache) Invalidate()

Invalidate clears the cache.

func (*MeshStatusCache) Set

func (c *MeshStatusCache) Set(data []byte, memberCount int)

Set updates the cache with new JSON data.
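
A sketch of the get-or-rebuild pattern this cache supports; buildStatus is hypothetical:

	func statusJSON(c *MeshStatusCache, members int) []byte {
		if data := c.Get(members); data != nil {
			return data // fresh, and member count unchanged
		}
		data := buildStatus() // hypothetical: serialize current mesh status
		c.Set(data, members)
		return data
	}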

type MetaServer

type MetaServer struct {
	*VectorStore
	// contains filtered or unexported fields
}

MetaServer handles control plane operations (ListFlights, GetFlightInfo). It embeds VectorStore to inherit the base interface and overrides methods for error conversion.

func NewMetaServer

func NewMetaServer(store *VectorStore) *MetaServer

func (*MetaServer) DoAction

func (s *MetaServer) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error

DoAction handles management commands on MetaServer

func (*MetaServer) DoExchange

func (s *MetaServer) DoExchange(stream flight.FlightService_DoExchangeServer) error

DoExchange returns Unimplemented on MetaServer

func (*MetaServer) DoGet

func (s *MetaServer) DoGet(ticket *flight.Ticket, stream flight.FlightService_DoGetServer) error

DoGet returns Unimplemented on MetaServer

func (*MetaServer) DoPut

func (s *MetaServer) DoPut(stream flight.FlightService_DoPutServer) error

DoPut returns Unimplemented on MetaServer

func (*MetaServer) GetFlightInfo

func (s *MetaServer) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)

GetFlightInfo returns dataset metadata, converting domain errors to gRPC status.

func (*MetaServer) ListFlights

func (s *MetaServer) ListFlights(criteria *flight.Criteria, stream flight.FlightService_ListFlightsServer) error

ListFlights returns available datasets, converting domain errors to gRPC status.

type NUMAAllocator

type NUMAAllocator struct {
	// contains filtered or unexported fields
}

NUMAAllocator is a NUMA-aware memory allocator for Arrow buffers. On Linux with NUMA, it attempts to allocate memory on the specified node. On other platforms, it falls back to the default Go allocator.

func NewNUMAAllocator

func NewNUMAAllocator(topo *NUMATopology, nodeID int) *NUMAAllocator

NewNUMAAllocator creates a new NUMA-aware allocator for the specified node.

func (*NUMAAllocator) Allocate

func (a *NUMAAllocator) Allocate(size int) []byte

Allocate allocates memory, preferably on the NUMA node.

func (*NUMAAllocator) Free

func (a *NUMAAllocator) Free(b []byte)

Free frees memory (handled by Go GC).

func (*NUMAAllocator) Reallocate

func (a *NUMAAllocator) Reallocate(size int, b []byte) []byte

Reallocate reallocates memory.

type NUMATopology

type NUMATopology struct {
	NumNodes int     // Number of NUMA nodes
	CPUs     [][]int // CPUs[nodeID] = list of CPU IDs on that node
}

NUMATopology represents the NUMA topology of the system.

func DetectNUMATopology

func DetectNUMATopology() (*NUMATopology, error)

DetectNUMATopology detects the NUMA topology on Linux systems. Returns a single-node topology if NUMA is not available.
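
A sketch of pinning allocations to a NUMA node; node 0 and the buffer size are illustrative:

	topo, err := DetectNUMATopology() // single-node fallback off Linux
	if err != nil {
		log.Fatal(err)
	}
	alloc := NewNUMAAllocator(topo, 0) // prefer node 0
	buf := alloc.Allocate(1 << 20)
	defer alloc.Free(buf)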

func (*NUMATopology) GetNodeForCPU

func (t *NUMATopology) GetNodeForCPU(cpu int) int

GetNodeForCPU returns the node ID for a given CPU, or -1 if not found.

func (*NUMATopology) String

func (t *NUMATopology) String() string

String returns a human-readable representation of the topology.

type Namespace

type Namespace struct {
	Name      string
	CreatedAt time.Time
	Metadata  map[string]string
	// contains filtered or unexported fields
}

Namespace represents a tenant isolation unit containing related datasets. Namespaces provide multi-tenancy support with dataset isolation.

func NewNamespace

func NewNamespace(name string) *Namespace

NewNamespace creates a new namespace with the given name.

func (*Namespace) AddDataset

func (n *Namespace) AddDataset(name string)

AddDataset registers a dataset in this namespace.

func (*Namespace) DatasetCount

func (n *Namespace) DatasetCount() int

DatasetCount returns the number of datasets in this namespace.

func (*Namespace) HasDataset

func (n *Namespace) HasDataset(name string) bool

HasDataset checks if a dataset exists in this namespace.

func (*Namespace) RemoveDataset

func (n *Namespace) RemoveDataset(name string)

RemoveDataset removes a dataset from this namespace.

type NotFoundError

type NotFoundError struct {
	Name string
}

NotFoundError indicates a snapshot was not found

func (*NotFoundError) Error

func (e *NotFoundError) Error() string

type PQConfig

type PQConfig struct {
	M      int // Number of subvectors (typically 8, 16, or 32)
	Ksub   int // Number of centroids per subvector (typically 256 for uint8)
	Dim    int // Total vector dimensions
	SubDim int // Dimensions per subvector (Dim / M)
}

PQConfig holds the configuration for product quantization.

func DefaultPQConfig

func DefaultPQConfig(dim int) *PQConfig

DefaultPQConfig returns a default PQ configuration for given dimensions.

func (*PQConfig) Validate

func (c *PQConfig) Validate() error

Validate checks if the configuration is valid.

type PQEncoder

type PQEncoder struct {
	// contains filtered or unexported fields
}

PQEncoder handles encoding/decoding using product quantization.

func NewPQEncoder

func NewPQEncoder(cfg *PQConfig, codebook [][][]float32) (*PQEncoder, error)

NewPQEncoder creates encoder from pre-trained codebook.

func TrainPQEncoder

func TrainPQEncoder(cfg *PQConfig, vectors [][]float32, iterations int) (*PQEncoder, error)

TrainPQEncoder trains codebook via k-means on sample vectors.

func (*PQEncoder) ADCDistance

func (e *PQEncoder) ADCDistance(table [][]float32, codes []uint8) float32

ADCDistance computes Asymmetric Distance using precomputed table. This is O(M) per vector instead of O(D) - very fast for search.

func (*PQEncoder) CodeSize

func (e *PQEncoder) CodeSize() int

CodeSize returns bytes per encoded vector (M bytes for uint8 codes).

func (*PQEncoder) ComputeDistanceTable

func (e *PQEncoder) ComputeDistanceTable(query []float32) [][]float32

ComputeDistanceTable precomputes distances from query to all centroids. Returns table[m][k] = squared distance from query subvector m to centroid k.
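
A sketch of the full ADC flow, assuming trainVecs, vec, and query are in scope; 768 dimensions and 25 k-means iterations are illustrative:

	cfg := DefaultPQConfig(768)
	enc, err := TrainPQEncoder(cfg, trainVecs, 25)
	if err != nil {
		log.Fatal(err)
	}
	codes := enc.Encode(vec) // enc.CodeSize() bytes instead of 768*4 raw

	table := enc.ComputeDistanceTable(query)
	dist := enc.ADCDistance(table, codes) // O(M) per candidate
	_ = dist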

func (*PQEncoder) Decode

func (e *PQEncoder) Decode(codes []uint8) []float32

Decode reconstructs approximate vector from PQ codes.

func (*PQEncoder) Dims

func (e *PQEncoder) Dims() int

Dims returns total vector dimensions.

func (*PQEncoder) EnableSDC

func (e *PQEncoder) EnableSDC()

EnableSDC precomputes the centroid-to-centroid distance table for O(M) search.

func (*PQEncoder) Encode

func (e *PQEncoder) Encode(vec []float32) []uint8

Encode converts float32 vector to PQ codes.

func (*PQEncoder) EncodeInto

func (e *PQEncoder) EncodeInto(vec []float32, codes []uint8)

EncodeInto encodes vector into pre-allocated codes slice.

func (*PQEncoder) GetCodebook

func (e *PQEncoder) GetCodebook() [][][]float32

GetCodebook returns the learned codebook.

func (*PQEncoder) SDCDistance

func (e *PQEncoder) SDCDistance(codes1, codes2 []uint8) float32

SDCDistance computes Symmetric Distance between two PQ codes. Uses precomputed centroid-centroid distances if EnableSDC was called.

func (*PQEncoder) SDCDistancePacked

func (e *PQEncoder) SDCDistancePacked(a, b []float32) float32

SDCDistancePacked computes Symmetric Distance between two packed PQ codes. 'a' and 'b' are float32 slices containing packed uint8 codes (4 codes per float32).

type ParserPoolStats

type ParserPoolStats struct {
	Gets   uint64
	Puts   uint64
	Hits   uint64
	Misses uint64
}

ParserPoolStats tracks pool usage statistics

func GetParserPoolStats

func GetParserPoolStats() ParserPoolStats

GetParserPoolStats returns current pool statistics

type PartitionedRecords

type PartitionedRecords struct {
	// contains filtered or unexported fields
}

PartitionedRecords provides partitioned storage for Arrow RecordBatches. Each partition has its own lock, enabling concurrent access to different partitions.

func NewPartitionedRecords

func NewPartitionedRecords(cfg PartitionedRecordsConfig) *PartitionedRecords

NewPartitionedRecords creates new partitioned records with given config.

func NewPartitionedRecordsDefault

func NewPartitionedRecordsDefault() *PartitionedRecords

NewPartitionedRecordsDefault creates partitioned records with default config.

func (*PartitionedRecords) Append

func (pr *PartitionedRecords) Append(batch arrow.RecordBatch, routingKey uint64)

Append adds a batch to a partition based on the provided routing key. The batch is retained (ref count incremented).

func (*PartitionedRecords) AppendToPartition

func (pr *PartitionedRecords) AppendToPartition(batch arrow.RecordBatch, partIdx int)

AppendToPartition adds a batch directly to a specific partition.

func (*PartitionedRecords) AppendWithKey

func (pr *PartitionedRecords) AppendWithKey(batch arrow.RecordBatch, key uint64) int

AppendWithKey adds a batch using hash-based routing and returns the partition index.

func (*PartitionedRecords) Clear

func (pr *PartitionedRecords) Clear()

Clear removes all batches from all partitions.

func (*PartitionedRecords) ForEach

func (pr *PartitionedRecords) ForEach(fn func(batch arrow.RecordBatch, partition int) bool)

ForEach iterates over all batches, calling fn for each. If fn returns false, iteration stops early.

func (*PartitionedRecords) ForEachInPartition

func (pr *PartitionedRecords) ForEachInPartition(partIdx int, fn func(batch arrow.RecordBatch) bool)

ForEachInPartition iterates over batches in a specific partition.

func (*PartitionedRecords) GetAll

func (pr *PartitionedRecords) GetAll() []arrow.RecordBatch

GetAll returns all batches across all partitions. Caller should NOT release returned batches - they are still owned by PartitionedRecords.

func (*PartitionedRecords) GetPartition

func (pr *PartitionedRecords) GetPartition(partIdx int) []arrow.RecordBatch

GetPartition returns all batches from a specific partition.

func (*PartitionedRecords) NumPartitions

func (pr *PartitionedRecords) NumPartitions() int

NumPartitions returns the number of partitions.

func (*PartitionedRecords) PartitionBatches

func (pr *PartitionedRecords) PartitionBatches(partitionIdx int) int

PartitionBatches returns the number of batches in a specific partition.

func (*PartitionedRecords) ReplaceAll

func (pr *PartitionedRecords) ReplaceAll(partIdx int, batches []arrow.RecordBatch)

ReplaceAll atomically replaces all batches in a partition.

func (*PartitionedRecords) Stats

func (pr *PartitionedRecords) Stats() PartitionedRecordsStats

Stats returns current statistics.

func (*PartitionedRecords) TotalBatches

func (pr *PartitionedRecords) TotalBatches() int

TotalBatches returns the total number of batches across all partitions.
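
A usage sketch, assuming batch (arrow.RecordBatch) and a routingKey derived from the data are in scope:

pr := NewPartitionedRecordsDefault()
pr.Append(batch, routingKey) // retains the batch
pr.ForEach(func(b arrow.RecordBatch, partition int) bool {
	// process b; do not release it - batches remain owned by pr
	return true // return false to stop early
})
total := pr.TotalBatches()
_ = total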

type PartitionedRecordsConfig

type PartitionedRecordsConfig struct {
	NumPartitions int // Number of partitions (default: runtime.NumCPU())
}

PartitionedRecordsConfig configures partitioned records storage.

func DefaultPartitionedRecordsConfig

func DefaultPartitionedRecordsConfig() PartitionedRecordsConfig

DefaultPartitionedRecordsConfig returns sensible defaults.

func (PartitionedRecordsConfig) Validate

func (c PartitionedRecordsConfig) Validate() error

Validate checks configuration validity.

type PartitionedRecordsStats

type PartitionedRecordsStats struct {
	TotalBatches   int
	NumPartitions  int
	BatchesPerPart []int
}

PartitionedRecordsStats holds statistics for partitioned records.

type Path

type Path struct {
	Nodes  []string // Sequence of nodes visited
	Edges  []Edge   // Edges traversed
	Weight float32  // Cumulative path weight
}

Path represents a traversal path through the graph

type PeerReplicationResult

type PeerReplicationResult struct {
	PeerID   string
	Success  bool
	Error    error
	Attempts int
	Duration time.Duration
}

PeerReplicationResult holds the result of a single peer replication attempt

type PeerReplicator

type PeerReplicator struct {
	// contains filtered or unexported fields
}

PeerReplicator handles replication to peer nodes

func NewPeerReplicator

func NewPeerReplicator(cfg ReplicatorConfig) *PeerReplicator

NewPeerReplicator creates a new peer replicator

func (*PeerReplicator) AddPeer

func (r *PeerReplicator) AddPeer(id, address string) error

AddPeer adds a peer to the replicator

func (*PeerReplicator) GetCircuitBreaker

func (r *PeerReplicator) GetCircuitBreaker(peerID string) *CircuitBreaker

GetCircuitBreaker returns the circuit breaker for a peer

func (*PeerReplicator) GetPeers

func (r *PeerReplicator) GetPeers() []*ReplicatorPeerInfo

GetPeers returns all registered peers

func (*PeerReplicator) RemovePeer

func (r *PeerReplicator) RemovePeer(id string)

RemovePeer removes a peer from the replicator

func (*PeerReplicator) ReplicateAsync

func (r *PeerReplicator) ReplicateAsync(ctx context.Context, dataset string, record arrow.Record) bool

ReplicateAsync queues a record for async replication

func (*PeerReplicator) ReplicateRecord

func (r *PeerReplicator) ReplicateRecord(ctx context.Context, dataset string, record arrow.Record) []PeerReplicationResult

ReplicateRecord replicates a record to all peers synchronously

func (*PeerReplicator) ReplicateWithQuorum

func (r *PeerReplicator) ReplicateWithQuorum(ctx context.Context, dataset string, record arrow.Record) []PeerReplicationResult

ReplicateWithQuorum replicates to peers and waits for quorum

func (*PeerReplicator) Start

func (r *PeerReplicator) Start() error

Start starts the async replication worker

func (*PeerReplicator) Stats

func (r *PeerReplicator) Stats() ReplicatorStats

Stats returns current statistics

func (*PeerReplicator) Stop

func (r *PeerReplicator) Stop()

Stop stops the async replication worker
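
A lifecycle sketch; the peer ID, address, and dataset name below are hypothetical, and rec (arrow.Record) is assumed in scope:

r := NewPeerReplicator(DefaultReplicatorConfig())
if err := r.AddPeer("peer-1", "10.0.0.2:8815"); err != nil {
	// handle error
}
if err := r.Start(); err != nil {
	// handle error
}
defer r.Stop()

if !r.ReplicateAsync(ctx, "events", rec) {
	// queue full: fall back to synchronous replication
	results := r.ReplicateRecord(ctx, "events", rec)
	_ = results
}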

type PerPResultPool

type PerPResultPool struct {
	// contains filtered or unexported fields
}

PerPResultPool provides per-processor local pools for search result slices. It eliminates sync.Pool contention by sharding pools across processors.

func NewPerPResultPool

func NewPerPResultPool(config *PerPResultPoolConfig) *PerPResultPool

NewPerPResultPool creates a new per-processor result pool. If config is nil, default configuration is used.

func (*PerPResultPool) Get

func (p *PerPResultPool) Get(k int) []VectorID

Get retrieves a []VectorID slice of the specified length. For common k values, slices are pooled per-shard.

func (*PerPResultPool) NumShards

func (p *PerPResultPool) NumShards() int

NumShards returns the number of shards.

func (*PerPResultPool) PerShardStats

func (p *PerPResultPool) PerShardStats() []PerPShardStats

PerShardStats returns statistics for each shard.

func (*PerPResultPool) Put

func (p *PerPResultPool) Put(slice []VectorID)

Put returns a []VectorID slice to the appropriate pool shard.

func (*PerPResultPool) ResetStats

func (p *PerPResultPool) ResetStats()

ResetStats resets all statistics counters.

func (*PerPResultPool) Stats

func (p *PerPResultPool) Stats() PerPResultPoolStats

Stats returns aggregate statistics across all shards.
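
A minimal get/put cycle:

pool := NewPerPResultPool(nil) // nil config uses defaults
ids := pool.Get(10)            // []VectorID of length 10
// ... fill ids with results ...
pool.Put(ids) // returned to the appropriate shard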

type PerPResultPoolConfig

type PerPResultPoolConfig struct {
	NumShards   int  // Number of shards (default: GOMAXPROCS)
	EnableStats bool // Enable statistics tracking
}

PerPResultPoolConfig configures the per-processor result pool.

func DefaultPerPResultPoolConfig

func DefaultPerPResultPoolConfig() PerPResultPoolConfig

DefaultPerPResultPoolConfig returns sensible defaults.

func (PerPResultPoolConfig) Validate

func (c PerPResultPoolConfig) Validate() error

Validate checks configuration validity.

type PerPResultPoolStats

type PerPResultPoolStats struct {
	TotalGets uint64
	TotalPuts uint64
	Hits      uint64
	Misses    uint64
}

PerPResultPoolStats holds aggregate statistics.

type PerPShardStats

type PerPShardStats struct {
	ShardID int
	Gets    uint64
	Puts    uint64
	Hits    uint64
	Misses  uint64
}

PerPShardStats holds per-shard statistics.

type PipelineResult

type PipelineResult struct {
	Record arrow.RecordBatch
	Index  int
}

PipelineResult holds a filtered record with its original index for ordering

type PipelineStage

type PipelineStage struct {
	Record    arrow.RecordBatch
	BatchIdx  int
	Tombstone *Bitset // Legacy field
	Err       error
}

PipelineStage is the legacy result type expected by store.go

type PipelineStats

type PipelineStats struct {
	BatchesProcessed int64
	BatchesFiltered  int64
	ErrorCount       int64
}

PipelineStats tracks pipeline processing statistics

type PluggableVectorIndex

type PluggableVectorIndex interface {
	// Type returns the index type identifier
	Type() IndexType

	// Dimension returns the vector dimension
	Dimension() int

	// Size returns the number of vectors in the index
	Size() int

	// NeedsBuild returns true if index requires explicit Build() call
	NeedsBuild() bool

	// Add adds a single vector to the index
	Add(id uint64, vector []float32) error

	// AddBatch adds multiple vectors to the index
	AddBatch(ids []uint64, vectors [][]float32) error

	// Search finds k nearest neighbors for the query vector
	Search(query []float32, k int) ([]IndexSearchResult, error)

	// SearchBatch performs batch search for multiple queries
	SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)

	// Build builds the index (for algorithms requiring training)
	Build() error

	// Save persists the index to disk
	Save(path string) error

	// Load loads the index from disk
	Load(path string) error

	// Close releases resources
	Close() error

	// Legacy interface compatibility
	AddByLocation(batchIdx, rowIdx int) error
	SearchVectors(query []float32, k int) []SearchResult
	Len() int
}

PluggableVectorIndex is the abstract interface for all vector index implementations
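
A hypothetical helper showing the intended call order against any implementation; note that Build is only required when NeedsBuild reports true:

func upsertAndSearch(idx PluggableVectorIndex, ids []uint64, vecs [][]float32, query []float32, k int) ([]IndexSearchResult, error) {
	if err := idx.AddBatch(ids, vecs); err != nil {
		return nil, err
	}
	if idx.NeedsBuild() {
		if err := idx.Build(); err != nil { // e.g. indexes that require training
			return nil, err
		}
	}
	return idx.Search(query, k)
}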

type PooledAllocator

type PooledAllocator struct {
	// contains filtered or unexported fields
}

PooledAllocator implements memory.Allocator with buffer pooling to reduce GC pressure during high-throughput ingestion. It uses size-bucketed sync.Pools for efficient buffer reuse.

func NewPooledAllocator

func NewPooledAllocator() *PooledAllocator

NewPooledAllocator creates a new pooled allocator. The underlying allocator is used for allocations larger than 32MB.

func (*PooledAllocator) Allocate

func (p *PooledAllocator) Allocate(size int) []byte

Allocate returns a buffer of at least the requested size. Buffers may be larger than requested due to bucketing.

func (*PooledAllocator) Free

func (p *PooledAllocator) Free(b []byte)

Free returns a buffer to the appropriate pool.

func (*PooledAllocator) Reallocate

func (p *PooledAllocator) Reallocate(size int, b []byte) []byte

Reallocate resizes a buffer, potentially reusing pooled memory.

func (*PooledAllocator) Stats

func (p *PooledAllocator) Stats() PooledAllocatorStats

Stats returns current allocator statistics.
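
A minimal sketch; because PooledAllocator implements memory.Allocator, it can also be passed anywhere Arrow expects an allocator:

alloc := NewPooledAllocator()
buf := alloc.Allocate(64 * 1024) // may be larger than requested (bucketing)
// ... fill buf ...
alloc.Free(buf) // return to the size-bucketed pool for reuse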

type PooledAllocatorStats

type PooledAllocatorStats struct {
	AllocatedBytes int64
	ReusedBuffers  int64
}

PooledAllocatorStats holds allocator statistics.

type PooledBitmap

type PooledBitmap struct {
	Data     []byte
	Capacity int // in bits
	// contains filtered or unexported fields
}

PooledBitmap is a pooled bitmap buffer

func (*PooledBitmap) GetBit

func (pb *PooledBitmap) GetBit(index int) bool

GetBit gets a bit at the given index

func (*PooledBitmap) Release

func (pb *PooledBitmap) Release()

Release returns the bitmap to the pool

func (*PooledBitmap) Reset

func (pb *PooledBitmap) Reset()

Reset clears the bitmap

func (*PooledBitmap) SetBit

func (pb *PooledBitmap) SetBit(index int)

SetBit sets a bit at the given index

type PooledFlightClient

type PooledFlightClient struct {
	// contains filtered or unexported fields
}

PooledFlightClient wraps a Flight client with pool metadata.

func (*PooledFlightClient) Client

func (c *PooledFlightClient) Client() flight.Client

Client returns the underlying Flight client.

func (*PooledFlightClient) CreatedAt

func (c *PooledFlightClient) CreatedAt() time.Time

CreatedAt returns when the connection was established.

func (*PooledFlightClient) Host

func (c *PooledFlightClient) Host() string

Host returns the peer host address.

func (*PooledFlightClient) IsExpired

func (c *PooledFlightClient) IsExpired(maxLifetime time.Duration) bool

IsExpired checks if the connection exceeds max lifetime.

func (*PooledFlightClient) LastUsed

func (c *PooledFlightClient) LastUsed() time.Time

LastUsed returns when the connection was last used.

type PressureLevel

type PressureLevel int32

PressureLevel indicates the current memory pressure state.

const (
	PressureNone PressureLevel = iota
	PressureSoft
	PressureHard
)

type QuorumCalculator

type QuorumCalculator struct{}

QuorumCalculator handles quorum arithmetic

func NewQuorumCalculator

func NewQuorumCalculator() *QuorumCalculator

NewQuorumCalculator creates a new QuorumCalculator

func (*QuorumCalculator) IsSatisfied

func (qc *QuorumCalculator) IsSatisfied(acks, totalNodes int, level ConsistencyLevel) bool

IsSatisfied returns true if the number of acks meets the consistency requirement

func (*QuorumCalculator) RequiredNodes

func (qc *QuorumCalculator) RequiredNodes(totalNodes int, level ConsistencyLevel) int

RequiredNodes returns the number of nodes required for the given consistency level

type QuorumConfig

type QuorumConfig struct {
	DefaultReadLevel  ConsistencyLevel
	DefaultWriteLevel ConsistencyLevel
	Timeout           time.Duration
}

QuorumConfig holds configuration for quorum operations

type QuorumManager

type QuorumManager struct {
	// contains filtered or unexported fields
}

QuorumManager manages quorum-based read/write operations

func NewQuorumManager

func NewQuorumManager(cfg QuorumConfig) *QuorumManager

NewQuorumManager creates a new QuorumManager

func (*QuorumManager) ExecuteRead

func (qm *QuorumManager) ExecuteRead(
	ctx context.Context,
	peers []string,
	level ConsistencyLevel,
	readFn ReadFn,
) QuorumResult

ExecuteRead executes a read operation across peers with quorum semantics

func (*QuorumManager) ExecuteWrite

func (qm *QuorumManager) ExecuteWrite(
	ctx context.Context,
	peers []string,
	level ConsistencyLevel,
	writeFn WriteFn,
) QuorumResult

ExecuteWrite executes a write operation across peers with quorum semantics

func (*QuorumManager) GetDefaultReadLevel

func (qm *QuorumManager) GetDefaultReadLevel() ConsistencyLevel

GetDefaultReadLevel returns the default read consistency level

func (*QuorumManager) GetDefaultWriteLevel

func (qm *QuorumManager) GetDefaultWriteLevel() ConsistencyLevel

GetDefaultWriteLevel returns the default write consistency level

type QuorumResult

type QuorumResult struct {
	SuccessCount int
	FailureCount int
	QuorumMet    bool
	Err          error
	Responses    map[string][]byte // For read operations
}

QuorumResult holds the result of a quorum operation
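
A read-path sketch, assuming peers ([]string) is in scope; fetchFromPeer is a hypothetical transport call, and only Timeout is set in the config for brevity:

qm := NewQuorumManager(QuorumConfig{Timeout: 5 * time.Second})
res := qm.ExecuteRead(ctx, peers, qm.GetDefaultReadLevel(),
	func(ctx context.Context, peer string) ([]byte, error) {
		return fetchFromPeer(ctx, peer) // hypothetical transport call
	})
if !res.QuorumMet {
	// res.Err explains the failure; SuccessCount/FailureCount give detail
}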

type RankedResult

type RankedResult struct {
	ID       VectorID
	Distance float32
}

RankedResult represents a search result with its distance score. Used for reranking and batch search operations that return distances.

type ReadFn

type ReadFn func(ctx context.Context, peer string) ([]byte, error)

ReadFn is the function signature for read operations

type RecordEvictionManager

type RecordEvictionManager struct {
	// contains filtered or unexported fields
}

RecordEvictionManager tracks eviction metadata for individual records. It uses unsafe.Pointer keys for zero-copy pointer-identity lookups.

func NewRecordEvictionManager

func NewRecordEvictionManager() *RecordEvictionManager

NewRecordEvictionManager creates a new per-record eviction manager

func (*RecordEvictionManager) Count

func (m *RecordEvictionManager) Count() int

Count returns the number of tracked records

func (*RecordEvictionManager) EvictExpired

func (m *RecordEvictionManager) EvictExpired() []uintptr

EvictExpired removes all expired records and returns their pointers

func (*RecordEvictionManager) Get

Get retrieves metadata for a record by pointer identity

func (*RecordEvictionManager) GetEvictionCount

func (m *RecordEvictionManager) GetEvictionCount() int64

GetEvictionCount returns total number of records evicted

func (*RecordEvictionManager) Register

func (m *RecordEvictionManager) Register(rec arrow.RecordBatch, ttl time.Duration)

Register adds a record to the eviction manager with optional TTL

func (*RecordEvictionManager) SelectLFUVictims

func (m *RecordEvictionManager) SelectLFUVictims(count int) []uintptr

SelectLFUVictims returns pointers to the N least frequently accessed records

func (*RecordEvictionManager) SelectLRUVictims

func (m *RecordEvictionManager) SelectLRUVictims(count int) []uintptr

SelectLRUVictims returns pointers to the N least recently accessed records

func (*RecordEvictionManager) Unregister

func (m *RecordEvictionManager) Unregister(rec arrow.RecordBatch)

Unregister removes a record from the eviction manager

func (*RecordEvictionManager) UpdateRecordMetadataGauge

func (m *RecordEvictionManager) UpdateRecordMetadataGauge()

UpdateRecordMetadataGauge updates the gauge with current tracked record count
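
A TTL-based eviction sketch, assuming rec (arrow.RecordBatch) is in scope and the caller owns the actual record storage:

m := NewRecordEvictionManager()
m.Register(rec, 10*time.Minute) // TTL of 0 means no expiration

// periodically:
for _, ptr := range m.EvictExpired() {
	_ = ptr // look up and release the corresponding batch in your own storage
}

// under memory pressure, pick victims by recency or frequency:
victims := m.SelectLRUVictims(16)
_ = victims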

type RecordMetadata

type RecordMetadata struct {
	CreatedAt   int64         // Unix nanos - immutable after creation
	LastAccess  atomic.Int64  // Unix nanos - updated atomically on access
	AccessCount atomic.Int64  // Frequency counter - incremented atomically
	TTL         time.Duration // 0 = no expiration
}

RecordMetadata tracks per-record eviction information using atomics for zero-lock reads

func NewRecordMetadata

func NewRecordMetadata(ttl time.Duration) *RecordMetadata

NewRecordMetadata creates metadata for a record with optional TTL

func (*RecordMetadata) GetAccessCount

func (m *RecordMetadata) GetAccessCount() int64

GetAccessCount returns the total access count

func (*RecordMetadata) GetLastAccess

func (m *RecordMetadata) GetLastAccess() time.Time

GetLastAccess returns the last access time

func (*RecordMetadata) IsExpired

func (m *RecordMetadata) IsExpired() bool

IsExpired returns true if TTL has elapsed since creation

func (*RecordMetadata) RecordAccess

func (m *RecordMetadata) RecordAccess()

RecordAccess updates last access time and increments access count atomically

type RecordSizeCache

type RecordSizeCache struct {
	// contains filtered or unexported fields
}

RecordSizeCache caches the size of Arrow record batches to avoid repeated calculations

func NewRecordSizeCache

func NewRecordSizeCache() *RecordSizeCache

NewRecordSizeCache creates a new cache

func (*RecordSizeCache) Clear

func (c *RecordSizeCache) Clear()

Clear empties the cache

func (*RecordSizeCache) GetOrCompute

func (c *RecordSizeCache) GetOrCompute(rec arrow.RecordBatch) int64

GetOrCompute returns the cached size or computes it

func (*RecordSizeCache) Hits

func (c *RecordSizeCache) Hits() int64

Hits returns hit count

func (*RecordSizeCache) Invalidate

func (c *RecordSizeCache) Invalidate(rec arrow.RecordBatch)

Invalidate removes a record from cache

func (*RecordSizeCache) Len

func (c *RecordSizeCache) Len() int

Len returns cache size

func (*RecordSizeCache) Misses

func (c *RecordSizeCache) Misses() int64

Misses returns miss count

type RecordWriterPoolConfig

type RecordWriterPoolConfig struct {
	// InitialBufferSize is the starting capacity for pooled buffers (default: 64KB)
	InitialBufferSize int
	// MaxBufferSize is the maximum buffer size to keep in pool (default: 4MB)
	// Buffers larger than this are discarded to prevent memory bloat
	MaxBufferSize int
	// UseLZ4 enables LZ4 compression for IPC messages
	UseLZ4 bool
}

RecordWriterPoolConfig configures the IPC buffer pool.

func DefaultRecordWriterPoolConfig

func DefaultRecordWriterPoolConfig() RecordWriterPoolConfig

DefaultRecordWriterPoolConfig returns sensible defaults for high-throughput DoGet.

func (RecordWriterPoolConfig) Validate

func (c RecordWriterPoolConfig) Validate() error

Validate checks configuration validity.

type ReplicaInfo

type ReplicaInfo struct {
	ID      string
	Address string
	Healthy bool
}

ReplicaInfo holds information about a replica

type ReplicaLoadBalancer

type ReplicaLoadBalancer struct {
	// contains filtered or unexported fields
}

ReplicaLoadBalancer distributes read requests across replicas

func NewReplicaLoadBalancer

func NewReplicaLoadBalancer(config LoadBalancerConfig) *ReplicaLoadBalancer

NewReplicaLoadBalancer creates a new load balancer

func (*ReplicaLoadBalancer) AddReplica

func (lb *ReplicaLoadBalancer) AddReplica(id, address string)

AddReplica adds a replica to the load balancer

func (*ReplicaLoadBalancer) GetHealthyReplicas

func (lb *ReplicaLoadBalancer) GetHealthyReplicas() []string

GetHealthyReplicas returns all healthy replica IDs

func (*ReplicaLoadBalancer) GetReplicas

func (lb *ReplicaLoadBalancer) GetReplicas() []string

GetReplicas returns all replica IDs

func (*ReplicaLoadBalancer) GetStats

func (lb *ReplicaLoadBalancer) GetStats() LoadBalancerStats

GetStats returns load balancer statistics

func (*ReplicaLoadBalancer) MarkHealthy

func (lb *ReplicaLoadBalancer) MarkHealthy(id string)

MarkHealthy marks a replica as healthy

func (*ReplicaLoadBalancer) MarkUnhealthy

func (lb *ReplicaLoadBalancer) MarkUnhealthy(id string)

MarkUnhealthy marks a replica as unhealthy

func (*ReplicaLoadBalancer) RemoveReplica

func (lb *ReplicaLoadBalancer) RemoveReplica(id string)

RemoveReplica removes a replica from the load balancer

func (*ReplicaLoadBalancer) SelectReplica

func (lb *ReplicaLoadBalancer) SelectReplica(ctx context.Context) (string, error)

SelectReplica selects a healthy replica using the configured strategy
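
A sketch with hypothetical replica addresses; the zero-value LoadBalancerConfig is used only for brevity:

lb := NewReplicaLoadBalancer(LoadBalancerConfig{})
lb.AddReplica("r1", "10.0.0.2:8815")
lb.AddReplica("r2", "10.0.0.3:8815")

id, err := lb.SelectReplica(ctx)
if err != nil {
	// e.g. ErrNoHealthyReplicas when everything is marked unhealthy
}
_ = id
lb.MarkUnhealthy("r1") // e.g. after a failed request; MarkHealthy restores it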

type ReplicationError

type ReplicationError struct {
	Op        string    // Operation: "sync", "replicate", "connect"
	PeerAddr  string    // Peer address
	Dataset   string    // Dataset being replicated
	Cause     error     // Underlying error
	Timestamp time.Time // When the error occurred
}

ReplicationError provides rich context for replication operations.

func (*ReplicationError) Error

func (e *ReplicationError) Error() string

func (*ReplicationError) Unwrap

func (e *ReplicationError) Unwrap() error

type ReplicationTask

type ReplicationTask struct {
	Dataset string
	Record  arrow.Record //nolint:staticcheck
	Ctx     context.Context
}

ReplicationTask represents an async replication task

type ReplicatorConfig

type ReplicatorConfig struct {
	Timeout              time.Duration
	MaxRetries           int
	RetryBackoff         time.Duration
	AsyncReplication     bool
	AsyncBufferSize      int
	QuorumSize           int
	CircuitBreakerConfig CircuitBreakerConfig
}

ReplicatorConfig holds configuration for peer replication

func DefaultReplicatorConfig

func DefaultReplicatorConfig() ReplicatorConfig

DefaultReplicatorConfig returns sensible defaults

type ReplicatorPeerInfo

type ReplicatorPeerInfo struct {
	ID       string
	Address  string
	Healthy  bool
	LastSeen time.Time
}

ReplicatorPeerInfo holds information about a peer for replicator

type ReplicatorStats

type ReplicatorStats struct {
	SuccessTotal int64
	FailureTotal int64
	RetryTotal   int64
	QueuedTotal  int64
	CircuitOpens int64
}

ReplicatorStats holds operational statistics

type RequestSemaphore

type RequestSemaphore struct {
	// contains filtered or unexported fields
}

RequestSemaphore limits concurrent DoGet/DoPut operations using a weighted semaphore.

func NewRequestSemaphore

func NewRequestSemaphore(cfg RequestSemaphoreConfig) *RequestSemaphore

NewRequestSemaphore creates a new request semaphore with the given configuration.

func (*RequestSemaphore) Acquire

func (rs *RequestSemaphore) Acquire(ctx context.Context) error

Acquire blocks until a slot is available or timeout/context cancellation occurs.

func (*RequestSemaphore) Available

func (rs *RequestSemaphore) Available() int

Available returns the number of available slots.

func (*RequestSemaphore) MaxConcurrent

func (rs *RequestSemaphore) MaxConcurrent() int

MaxConcurrent returns the maximum concurrent requests allowed.

func (*RequestSemaphore) Release

func (rs *RequestSemaphore) Release()

Release releases a slot back to the semaphore.

func (*RequestSemaphore) Stats

func (rs *RequestSemaphore) Stats() SemaphoreStats

Stats returns current statistics about semaphore usage.

func (*RequestSemaphore) TryAcquire

func (rs *RequestSemaphore) TryAcquire() bool

TryAcquire attempts to acquire a slot without blocking. Returns true if successful, false if no slots available.

type RequestSemaphoreConfig

type RequestSemaphoreConfig struct {
	// MaxConcurrent is the maximum number of concurrent requests.
	// Default: runtime.GOMAXPROCS(0) (number of CPU cores)
	MaxConcurrent int

	// AcquireTimeout is how long to wait before timing out on acquire.
	// Default: 30 seconds
	AcquireTimeout time.Duration

	// Enabled controls whether the semaphore is active.
	// When disabled, Acquire always succeeds immediately.
	Enabled bool
}

RequestSemaphoreConfig configures the request semaphore for limiting concurrent DoGet/DoPut operations to prevent thread-thrashing.

func DefaultRequestSemaphoreConfig

func DefaultRequestSemaphoreConfig() RequestSemaphoreConfig

DefaultRequestSemaphoreConfig returns a configuration with sensible defaults.

func (RequestSemaphoreConfig) Validate

func (c RequestSemaphoreConfig) Validate() error

Validate checks if the configuration is valid.
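
A typical guard around a request handler; the limits shown are illustrative:

rs := NewRequestSemaphore(RequestSemaphoreConfig{
	MaxConcurrent:  8,
	AcquireTimeout: 30 * time.Second,
	Enabled:        true,
})
if err := rs.Acquire(ctx); err != nil {
	return err // reject: timed out or context cancelled
}
defer rs.Release()
// ... serve the DoGet/DoPut request ...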

type RoundRobinStrategy

type RoundRobinStrategy struct {
	// contains filtered or unexported fields
}

RoundRobinStrategy cycles through replicas in order

func NewRoundRobinStrategy

func NewRoundRobinStrategy() *RoundRobinStrategy

NewRoundRobinStrategy creates a new round robin strategy

func (*RoundRobinStrategy) Select

func (s *RoundRobinStrategy) Select(replicas []string) string

Select picks the next replica in rotation

type RowPosition

type RowPosition struct {
	RecordIdx int // Index of the record batch
	RowIdx    int // Index of the row within the record
}

RowPosition identifies a row within a dataset's records

type S3Backend

type S3Backend struct {
	// contains filtered or unexported fields
}

S3Backend implements SnapshotBackend for S3-compatible storage

func NewS3Backend

func NewS3Backend(cfg *S3BackendConfig) (*S3Backend, error)

NewS3Backend creates a new S3 backend from configuration

func (*S3Backend) DeleteSnapshot

func (b *S3Backend) DeleteSnapshot(ctx context.Context, name string) error

DeleteSnapshot removes a snapshot from S3

func (*S3Backend) GetHTTPClient

func (b *S3Backend) GetHTTPClient() *http.Client

GetHTTPClient returns the HTTP client used by this S3 backend

func (*S3Backend) GetHTTPTransport

func (b *S3Backend) GetHTTPTransport() *http.Transport

GetHTTPTransport returns the underlying HTTP transport used for connection pooling

func (*S3Backend) ListSnapshots

func (b *S3Backend) ListSnapshots(ctx context.Context) ([]string, error)

ListSnapshots returns all collection names that have snapshots

func (*S3Backend) ReadSnapshot

func (b *S3Backend) ReadSnapshot(ctx context.Context, name string) (io.ReadCloser, error)

ReadSnapshot downloads snapshot data from S3

func (*S3Backend) WriteSnapshot

func (b *S3Backend) WriteSnapshot(ctx context.Context, name string, data []byte) error

WriteSnapshot uploads snapshot data to S3

type S3BackendConfig

type S3BackendConfig struct {
	Endpoint        string // S3-compatible endpoint URL (e.g., "http://localhost:9000" for MinIO)
	Bucket          string // Bucket name
	Prefix          string // Optional key prefix for all snapshots
	AccessKeyID     string // AWS access key
	SecretAccessKey string // AWS secret key
	Region          string // AWS region (default: us-east-1)
	UsePathStyle    bool   // Use path-style addressing (required for MinIO)

	// Connection pool settings
	MaxIdleConns        int           // Maximum idle connections (0 = use default)
	MaxIdleConnsPerHost int           // Maximum idle connections per host (0 = use default)
	IdleConnTimeout     time.Duration // Idle connection timeout (0 = use default)
}

S3BackendConfig holds configuration for the S3 backend

func (*S3BackendConfig) Validate

func (c *S3BackendConfig) Validate() error

Validate checks the configuration for required fields
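
A MinIO-style configuration sketch; the bucket name and credential sourcing are assumptions:

cfg := &S3BackendConfig{
	Endpoint:        "http://localhost:9000", // MinIO
	Bucket:          "snapshots",
	AccessKeyID:     os.Getenv("AWS_ACCESS_KEY_ID"),
	SecretAccessKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
	UsePathStyle:    true, // required for MinIO
}
b, err := NewS3Backend(cfg)
if err != nil {
	// handle error
}
if err := b.WriteSnapshot(ctx, "my-collection", data); err != nil {
	// handle error
}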

type S3Error

type S3Error struct {
	Op        string    // Operation: "upload", "download", "list", "delete"
	Bucket    string    // S3 bucket name
	Key       string    // S3 object key
	Cause     error     // Underlying error
	Timestamp time.Time // When the error occurred
}

S3Error provides rich context for S3 backend operations.

func (*S3Error) Error

func (e *S3Error) Error() string

func (*S3Error) Unwrap

func (e *S3Error) Unwrap() error

type SQ8Config

type SQ8Config struct {
	Min     []float32 // Per-dimension minimum values
	Max     []float32 // Per-dimension maximum values
	Trained bool      // Whether min/max have been learned from data
}

SQ8Config holds the configuration for scalar quantization.

func DefaultSQ8Config

func DefaultSQ8Config() *SQ8Config

DefaultSQ8Config returns a default untrained configuration.

func (*SQ8Config) Validate

func (c *SQ8Config) Validate() error

Validate checks if the configuration is valid.

type SQ8Encoder

type SQ8Encoder struct {
	// contains filtered or unexported fields
}

SQ8Encoder handles encoding and decoding of vectors using scalar quantization.

func NewSQ8Encoder

func NewSQ8Encoder(cfg *SQ8Config) (*SQ8Encoder, error)

NewSQ8Encoder creates a new encoder from a trained configuration.

func TrainSQ8Encoder

func TrainSQ8Encoder(vectors [][]float32) (*SQ8Encoder, error)

TrainSQ8Encoder creates an encoder by learning min/max from sample vectors.

func (*SQ8Encoder) Decode

func (e *SQ8Encoder) Decode(quantized []uint8) []float32

Decode converts a uint8 vector back to float32.

func (*SQ8Encoder) DecodeInto

func (e *SQ8Encoder) DecodeInto(quantized []uint8, dst []float32)

DecodeInto decodes a quantized vector into a pre-allocated destination slice.

func (*SQ8Encoder) Dims

func (e *SQ8Encoder) Dims() int

Dims returns the number of dimensions this encoder handles.

func (*SQ8Encoder) Encode

func (e *SQ8Encoder) Encode(vec []float32) []uint8

Encode converts a float32 vector to uint8 with clamping.

func (*SQ8Encoder) EncodeInto

func (e *SQ8Encoder) EncodeInto(vec []float32, dst []uint8)

EncodeInto encodes a vector into a pre-allocated destination slice. This is the zero-allocation hot path for bulk encoding.

func (*SQ8Encoder) GetBounds

func (e *SQ8Encoder) GetBounds() (minVals, maxVals []float32)

GetBounds returns the learned min and max values.
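
A train/encode/decode sketch, assuming sampleVecs ([][]float32) and vec ([]float32) are in scope:

enc, err := TrainSQ8Encoder(sampleVecs) // learns per-dimension min/max
if err != nil {
	// handle error
}
codes := enc.Encode(vec) // 4x smaller than float32

dst := make([]uint8, enc.Dims())
enc.EncodeInto(vec, dst) // zero-allocation hot path

approx := enc.Decode(codes) // lossy reconstruction
_ = approx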

type SchemaCompatibility

type SchemaCompatibility int

SchemaCompatibility represents the result of schema comparison

const (
	// SchemaExactMatch indicates schemas are identical
	SchemaExactMatch SchemaCompatibility = iota
	// SchemaEvolution indicates new schema is a compatible superset
	SchemaEvolution
	// SchemaIncompatible indicates schemas are not compatible
	SchemaIncompatible
)

func CheckSchemaCompatibility

func CheckSchemaCompatibility(existing, incoming *arrow.Schema) SchemaCompatibility

CheckSchemaCompatibility compares existing and incoming schemas. Returns SchemaExactMatch if identical, SchemaEvolution if incoming is a compatible superset (prefix matches), or SchemaIncompatible otherwise.
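
A typical dispatch on the result, assuming existing and incoming (*arrow.Schema) are in scope:

switch CheckSchemaCompatibility(existing, incoming) {
case SchemaExactMatch:
	// append directly
case SchemaEvolution:
	// incoming adds trailing columns: evolve the dataset, then append
default: // SchemaIncompatible
	// reject the write
}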

type SchemaEvolutionManager

type SchemaEvolutionManager struct {
	// contains filtered or unexported fields
}

SchemaEvolutionManager handles dynamic schema changes without dataset locks

func NewSchemaEvolutionManager

func NewSchemaEvolutionManager(initialSchema *arrow.Schema, datasetName string) *SchemaEvolutionManager

NewSchemaEvolutionManager creates a new manager with initial schema

func (*SchemaEvolutionManager) AddColumn

func (m *SchemaEvolutionManager) AddColumn(name string, dtype arrow.DataType) error

AddColumn adds a new column without requiring dataset lock or rewrite

func (*SchemaEvolutionManager) DropColumn

func (m *SchemaEvolutionManager) DropColumn(name string) error

DropColumn marks a column as dropped without rewriting data

func (*SchemaEvolutionManager) GetColumnCount

func (m *SchemaEvolutionManager) GetColumnCount() int

GetColumnCount returns the number of active (non-dropped) columns

func (*SchemaEvolutionManager) GetCurrentSchema

func (m *SchemaEvolutionManager) GetCurrentSchema() *arrow.Schema

GetCurrentSchema returns the current schema (excluding dropped columns)

func (*SchemaEvolutionManager) GetCurrentVersion

func (m *SchemaEvolutionManager) GetCurrentVersion() uint64

GetCurrentVersion returns the current schema version

func (*SchemaEvolutionManager) GetSchemaAtVersion

func (m *SchemaEvolutionManager) GetSchemaAtVersion(version uint64) *arrow.Schema

GetSchemaAtVersion returns the schema at a specific version

func (*SchemaEvolutionManager) GetVersionCount

func (m *SchemaEvolutionManager) GetVersionCount() int

GetVersionCount returns the number of schema versions

func (*SchemaEvolutionManager) IsColumnAvailable

func (m *SchemaEvolutionManager) IsColumnAvailable(name string, version uint64) bool

IsColumnAvailable checks if a column is available at a specific version

func (*SchemaEvolutionManager) IsColumnDropped

func (m *SchemaEvolutionManager) IsColumnDropped(name string) bool

IsColumnDropped checks if a column has been dropped
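
A sketch of lock-free schema changes; the column names are hypothetical and schema (*arrow.Schema) is assumed in scope:

m := NewSchemaEvolutionManager(schema, "events")
if err := m.AddColumn("score", arrow.PrimitiveTypes.Float64); err != nil {
	// handle error
}
if err := m.DropColumn("legacy_id"); err != nil {
	// handle error
}
cur := m.GetCurrentSchema() // excludes dropped columns
old := m.GetSchemaAtVersion(m.GetCurrentVersion() - 1)
_, _ = cur, old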

type SchemaVersion

type SchemaVersion struct {
	Version   uint64
	Fields    []arrow.Field
	CreatedAt time.Time
}

SchemaVersion tracks a specific schema state

type SearchArena

type SearchArena struct {
	// contains filtered or unexported fields
}

SearchArena is a per-request arena allocator that provides O(1) allocations with zero GC pressure. It uses a simple bump allocator strategy where allocations advance an offset through a pre-allocated buffer. The arena should be Reset() after each search request to reuse memory.

func GetArena

func GetArena() *SearchArena

GetArena retrieves a SearchArena from the global pool. The arena is reset and ready for use. Caller must call PutArena when done to return it to the pool.

func NewSearchArena

func NewSearchArena(capacity int) *SearchArena

NewSearchArena creates a new arena with the specified capacity in bytes. The entire buffer is pre-allocated to avoid runtime allocations.

func (*SearchArena) Alloc

func (a *SearchArena) Alloc(size int) []byte

Alloc allocates size bytes from the arena and returns a slice. Returns nil if the allocation would exceed capacity. This is O(1) - just a pointer bump with no GC involvement.

func (*SearchArena) AllocFloat32Slice

func (a *SearchArena) AllocFloat32Slice(count int) []float32

AllocFloat32Slice allocates a slice of float32 values from the arena. Returns nil if the allocation would exceed capacity. This is useful for allocating distance/score arrays in search operations.

func (*SearchArena) AllocVectorIDSlice

func (a *SearchArena) AllocVectorIDSlice(count int) []VectorID

AllocVectorIDSlice allocates a slice of VectorID values from the arena. Returns nil if the allocation would exceed capacity. This is useful for allocating search result arrays without GC pressure.

func (*SearchArena) Cap

func (a *SearchArena) Cap() int

Cap returns the total capacity of the arena in bytes.

func (*SearchArena) Offset

func (a *SearchArena) Offset() int

Offset returns the current allocation offset (bytes used).

func (*SearchArena) Remaining

func (a *SearchArena) Remaining() int

Remaining returns the number of bytes still available for allocation.

func (*SearchArena) Reset

func (a *SearchArena) Reset()

Reset resets the arena for reuse without deallocating the underlying buffer. Call this after each search request to recycle memory.
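
A per-request usage sketch; PutArena is the pool counterpart referenced by GetArena, and k is assumed in scope:

a := GetArena()
defer PutArena(a)

ids := a.AllocVectorIDSlice(k)  // O(1) bump allocation
dists := a.AllocFloat32Slice(k) // no GC involvement
if ids == nil || dists == nil {
	// capacity exceeded: fall back to make()
}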

type SearchResult

type SearchResult struct {
	ID    VectorID
	Score float32
}

func CombineHybridResults

func CombineHybridResults(vectorResults, bm25Results []SearchResult, alpha float32, k int) []SearchResult

CombineHybridResults combines vector and BM25 results using alpha weighting. Vector scores are distances (lower is better); BM25 scores are relevance (higher is better).

func CombineHybridResultsRRF

func CombineHybridResultsRRF(vectorResults, bm25Results []SearchResult, rrfK, limit int) []SearchResult

CombineHybridResultsRRF combines results using Reciprocal Rank Fusion (legacy alignment)

func FuseCascade

func FuseCascade(exact map[VectorID]struct{}, keyword, vector []SearchResult, limit int) []SearchResult

FuseCascade implements cascade-style filtering: exact -> keyword -> vector

func FuseLinear

func FuseLinear(dense, sparse []SearchResult, alpha float32, limit int) []SearchResult

FuseLinear combines results using linear weighted combination

func FuseRRF

func FuseRRF(dense, sparse []SearchResult, k, limit int) []SearchResult

FuseRRF is an alias for ReciprocalRankFusion (legacy alignment)

func HybridSearch

func HybridSearch(ctx context.Context, s *VectorStore, name string, query []float32, k int, filters map[string]string) ([]SearchResult, error)

HybridSearch performs a filtered vector search using inverted indexes for pre-filtering.

func ReciprocalRankFusion

func ReciprocalRankFusion(dense, sparse []SearchResult, k, limit int) []SearchResult

ReciprocalRankFusion combines results from multiple search systems using their ranks. Formula: score = sum(1 / (k + rank)). Reference: https://dl.acm.org/doi/10.1145/1571941.1572114
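
A small worked sketch; k = 60 is the constant commonly used in the RRF literature, and the exact rank base (0 or 1) is an implementation detail:

dense := []SearchResult{{ID: 7, Score: 0.12}, {ID: 3, Score: 0.25}}  // ranked by distance
sparse := []SearchResult{{ID: 3, Score: 9.1}, {ID: 5, Score: 4.2}}   // ranked by BM25
fused := ReciprocalRankFusion(dense, sparse, 60, 10)
// ID 3 appears in both lists, accumulating 1/(60+rank) twice,
// so it tends to outrank results found by only one system.
_ = fused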

func SearchHybrid

func SearchHybrid(ctx context.Context, s *VectorStore, name string, query []float32, textQuery string, k int, alpha float32, rrfK int) ([]SearchResult, error)

SearchHybrid performs a hybrid search combining dense vector search and sparse keyword search.

type SemaphoreStats

type SemaphoreStats struct {
	TotalAcquires int64
	TotalReleases int64
	TotalTimeouts int64
	CurrentActive int64
	MaxConcurrent int
}

SemaphoreStats holds statistics about semaphore usage.

type ShardStat

type ShardStat struct {
	ShardID int
	Count   int
}

ShardStat holds statistics for a single shard.

type ShardStats

type ShardStats struct {
	ShardID       int
	JobsSent      int64
	QueueLength   int
	QueueCapacity int
}

ShardStats holds per-shard statistics

type ShardedDataset

type ShardedDataset struct {
	// contains filtered or unexported fields
}

ShardedDataset provides a dataset with sharded record storage. Enables concurrent access to different shards without lock contention.

func NewShardedDataset

func NewShardedDataset(name string, cfg ShardedDatasetConfig) *ShardedDataset

NewShardedDataset creates a new sharded dataset with given config.

func NewShardedDatasetDefault

func NewShardedDatasetDefault(name string) *ShardedDataset

NewShardedDatasetDefault creates a sharded dataset with default config.

func (*ShardedDataset) Append

func (sd *ShardedDataset) Append(batch arrow.RecordBatch, routingKey uint64)

Append adds a record batch using hash-based routing.

func (*ShardedDataset) AppendToShard

func (sd *ShardedDataset) AppendToShard(batch arrow.RecordBatch, shardIdx int)

AppendToShard adds a record batch directly to a specific shard.

func (*ShardedDataset) Clear

func (sd *ShardedDataset) Clear()

Clear removes all records from all shards.

func (*ShardedDataset) ForEach

func (sd *ShardedDataset) ForEach(fn func(batch arrow.RecordBatch, shard int) bool)

ForEach iterates over all record batches. If fn returns false, iteration stops early.

func (*ShardedDataset) ForEachInShard

func (sd *ShardedDataset) ForEachInShard(shardIdx int, fn func(batch arrow.RecordBatch) bool)

ForEachInShard iterates over records in a specific shard.

func (*ShardedDataset) GetAllRecords

func (sd *ShardedDataset) GetAllRecords() []arrow.RecordBatch

GetAllRecords returns all record batches across all shards. Order is not guaranteed to be consistent.

func (*ShardedDataset) GetShardRecords

func (sd *ShardedDataset) GetShardRecords(shardIdx int) []arrow.RecordBatch

GetShardRecords returns all records from a specific shard.

func (*ShardedDataset) IncrementVersion

func (sd *ShardedDataset) IncrementVersion() int64

IncrementVersion atomically increments and returns the new version.

func (*ShardedDataset) Index

func (sd *ShardedDataset) Index() *HNSWIndex

Index returns the HNSW index (may be nil).

func (*ShardedDataset) LastAccess

func (sd *ShardedDataset) LastAccess() time.Time

LastAccess returns the last access time.

func (*ShardedDataset) Name

func (sd *ShardedDataset) Name() string

Name returns the dataset name.

func (*ShardedDataset) NumShards

func (sd *ShardedDataset) NumShards() int

NumShards returns the number of shards.

func (*ShardedDataset) ReplaceShardRecords

func (sd *ShardedDataset) ReplaceShardRecords(shardIdx int, batches []arrow.RecordBatch)

ReplaceShardRecords atomically replaces all records in a shard.

func (*ShardedDataset) SetIndex

func (sd *ShardedDataset) SetIndex(idx *HNSWIndex)

SetIndex sets the HNSW index.

func (*ShardedDataset) SetLastAccess

func (sd *ShardedDataset) SetLastAccess(t time.Time)

SetLastAccess sets the last access time.

func (*ShardedDataset) ShardRecordCount

func (sd *ShardedDataset) ShardRecordCount(shardIdx int) int

ShardRecordCount returns the number of records in a specific shard.

func (*ShardedDataset) Stats

func (sd *ShardedDataset) Stats() ShardedDatasetStats

Stats returns current statistics.

func (*ShardedDataset) ToLegacyRecords

func (sd *ShardedDataset) ToLegacyRecords() []arrow.RecordBatch

ToLegacyRecords returns all records as a flat slice for backward compatibility.

func (*ShardedDataset) TotalRecords

func (sd *ShardedDataset) TotalRecords() int

TotalRecords returns total number of record batches across all shards.

func (*ShardedDataset) Version

func (sd *ShardedDataset) Version() int64

Version returns the current version.

type ShardedDatasetConfig

type ShardedDatasetConfig struct {
	NumShards int // Number of shards (default: runtime.NumCPU())
}

ShardedDatasetConfig configures a sharded dataset.

func DefaultShardedDatasetConfig

func DefaultShardedDatasetConfig() ShardedDatasetConfig

DefaultShardedDatasetConfig returns sensible defaults.

func (ShardedDatasetConfig) Validate

func (c ShardedDatasetConfig) Validate() error

Validate checks configuration validity.

type ShardedDatasetStats

type ShardedDatasetStats struct {
	TotalRecords    int
	NumShards       int
	RecordsPerShard []int
}

ShardedDatasetStats holds statistics for a sharded dataset.

type ShardedHNSW

type ShardedHNSW struct {
	// contains filtered or unexported fields
}

ShardedHNSW provides fine-grained locking via multiple independent HNSW shards.

func NewShardedHNSW

func NewShardedHNSW(config ShardedHNSWConfig, dataset *Dataset) *ShardedHNSW

NewShardedHNSW creates a new sharded HNSW index.

func (*ShardedHNSW) AddByLocation

func (s *ShardedHNSW) AddByLocation(batchIdx, rowIdx int) (uint32, error)

AddByLocation implements VectorIndex.

func (*ShardedHNSW) AddByRecord

func (s *ShardedHNSW) AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)

AddByRecord implements VectorIndex.

func (*ShardedHNSW) AddSafe

func (s *ShardedHNSW) AddSafe(rec arrow.RecordBatch, rowIdx, batchIdx int) (VectorID, error)

AddSafe is an alias for AddByRecord for consistency with HNSWIndex.

func (*ShardedHNSW) Close

func (s *ShardedHNSW) Close() error

Close implements VectorIndex.

func (*ShardedHNSW) GetDimension

func (s *ShardedHNSW) GetDimension() uint32

GetDimension implements VectorIndex.

func (*ShardedHNSW) GetLocation

func (s *ShardedHNSW) GetLocation(id VectorID) (Location, bool)

GetLocation returns the storage location for a given VectorID

func (*ShardedHNSW) GetShardForID

func (s *ShardedHNSW) GetShardForID(id VectorID) int

GetShardForID returns the shard index for a given VectorID.

func (*ShardedHNSW) Len

func (s *ShardedHNSW) Len() int

Len implements VectorIndex.

func (*ShardedHNSW) SearchByID

func (s *ShardedHNSW) SearchByID(id VectorID, k int) []VectorID

SearchByID searches for vectors similar to the vector at the given ID.

func (*ShardedHNSW) SearchVectors

func (s *ShardedHNSW) SearchVectors(query []float32, k int, filters []Filter) []SearchResult

SearchVectors implements VectorIndex.

func (*ShardedHNSW) SearchVectorsWithBitmap

func (s *ShardedHNSW) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult

SearchVectorsWithBitmap implements VectorIndex.

func (*ShardedHNSW) SetIndexedColumns

func (s *ShardedHNSW) SetIndexedColumns(cols []string)

SetIndexedColumns satisfies VectorIndex interface

func (*ShardedHNSW) ShardStats

func (s *ShardedHNSW) ShardStats() []ShardStat

ShardStats returns multi-index statistics for all shards.

func (*ShardedHNSW) Warmup

func (idx *ShardedHNSW) Warmup() int

Warmup warms up all shards.

type ShardedHNSWConfig

type ShardedHNSWConfig struct {
	NumShards      int          // Number of independent HNSW shards
	M              int          // HNSW M parameter
	EfConstruction int          // HNSW efConstruction parameter
	Metric         VectorMetric // Distance metric for this index
}

ShardedHNSWConfig configures the sharded HNSW index.

func DefaultShardedHNSWConfig

func DefaultShardedHNSWConfig() ShardedHNSWConfig

DefaultShardedHNSWConfig returns sensible defaults.

func (ShardedHNSWConfig) Validate

func (c ShardedHNSWConfig) Validate() error

type ShardedIndexChannel

type ShardedIndexChannel struct {
	// contains filtered or unexported fields
}

ShardedIndexChannel provides hash-based routing of IndexJobs to multiple channels, preventing noisy neighbor issues where one dataset's high write throughput blocks indexing for others.

func NewShardedIndexChannel

func NewShardedIndexChannel(numShards, bufferSize int) *ShardedIndexChannel

NewShardedIndexChannel creates a new sharded index channel with the specified number of shards and buffer size per shard.

func NewShardedIndexChannelDefault

func NewShardedIndexChannelDefault(bufferSize int) *ShardedIndexChannel

NewShardedIndexChannelDefault creates a sharded index channel with one shard per CPU core (recommended default).

func (*ShardedIndexChannel) Close

func (sic *ShardedIndexChannel) Close()

Close closes all shard channels. Safe to call multiple times.

func (*ShardedIndexChannel) GetShardChannel

func (sic *ShardedIndexChannel) GetShardChannel(shardID int) chan IndexJob

GetShardChannel returns the channel for a specific shard ID. Used by workers to consume jobs from their assigned shard.

func (*ShardedIndexChannel) GetShardForDataset

func (sic *ShardedIndexChannel) GetShardForDataset(datasetName string) int

GetShardForDataset returns the shard ID for a given dataset name. Uses FNV-1a hash for consistent, fast routing.

func (*ShardedIndexChannel) NumShards

func (sic *ShardedIndexChannel) NumShards() int

NumShards returns the number of shards.

func (*ShardedIndexChannel) Send

func (sic *ShardedIndexChannel) Send(job IndexJob) bool

Send routes an IndexJob to the appropriate shard based on dataset name. Blocks if the target shard's buffer is full. Returns false if the channel is closed.

func (*ShardedIndexChannel) Stats

func (sic *ShardedIndexChannel) Stats() []ShardStats

Stats returns statistics for all shards.

func (*ShardedIndexChannel) TrySend

func (sic *ShardedIndexChannel) TrySend(job IndexJob) bool

TrySend attempts to send an IndexJob without blocking. Returns true if sent successfully, false if buffer full or closed.
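
A producer/consumer sketch; IndexJob's fields are defined elsewhere in this package:

sic := NewShardedIndexChannelDefault(1024) // one shard per CPU core

for i := 0; i < sic.NumShards(); i++ {
	go func(shard int) {
		for job := range sic.GetShardChannel(shard) {
			_ = job // build index entries for this job
		}
	}(i)
}

var job IndexJob // populated by the ingest path
if !sic.TrySend(job) {
	// shard buffer full: apply backpressure or use the blocking Send
}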

type ShardedInvertedIndex

type ShardedInvertedIndex struct {
	// contains filtered or unexported fields
}

ShardedInvertedIndex is a concurrent-safe inverted index using sharding

func NewShardedInvertedIndex

func NewShardedInvertedIndex() *ShardedInvertedIndex

NewShardedInvertedIndex creates a new sharded inverted index

func (*ShardedInvertedIndex) Add

func (idx *ShardedInvertedIndex) Add(id VectorID, text string)

Add indexes a document with the given text

func (*ShardedInvertedIndex) Delete

func (idx *ShardedInvertedIndex) Delete(id VectorID)

Delete removes a document from the index

func (*ShardedInvertedIndex) Search

func (idx *ShardedInvertedIndex) Search(query string, limit int) []SearchResult

Search returns documents matching any of the query terms, scored by BM25-like TF

type ShardedMap

type ShardedMap struct {
	// contains filtered or unexported fields
}

ShardedMap provides concurrent access to datasets via sharding

func NewShardedMap

func NewShardedMap() *ShardedMap

NewShardedMap creates a new sharded map

func (*ShardedMap) Delete

func (sm *ShardedMap) Delete(name string)

Delete removes a dataset (write lock on single shard)

func (*ShardedMap) Get

func (sm *ShardedMap) Get(name string) (*Dataset, bool)

Get retrieves a dataset by name (read lock on single shard)

func (*ShardedMap) GetOrCreate

func (sm *ShardedMap) GetOrCreate(name string, create func() *Dataset) *Dataset

GetOrCreate atomically gets or creates a dataset

func (*ShardedMap) Keys

func (sm *ShardedMap) Keys() []string

Keys returns all dataset names

func (*ShardedMap) Len

func (sm *ShardedMap) Len() int

Len returns total count across all shards (acquires all read locks)

func (*ShardedMap) Range

func (sm *ShardedMap) Range(fn func(name string, ds *Dataset) bool)

Range iterates over all datasets (acquires read locks per shard). The callback should not hold references to the dataset after returning.

func (*ShardedMap) RangeWithLock

func (sm *ShardedMap) RangeWithLock(fn func(name string, ds *Dataset, deleteFn func()))

RangeWithLock iterates with write lock (for modifications)

func (*ShardedMap) Set

func (sm *ShardedMap) Set(name string, ds *Dataset)

Set stores a dataset (write lock on single shard)

func (*ShardedMap) WithLock

func (sm *ShardedMap) WithLock(name string, fn func(data map[string]*Dataset))

WithLock executes fn with write lock on the shard containing name

func (*ShardedMap) WithRLock

func (sm *ShardedMap) WithRLock(name string, fn func(data map[string]*Dataset))

WithRLock executes fn with read lock on the shard containing name
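
An atomic get-or-create sketch; newDataset is a hypothetical constructor:

sm := NewShardedMap()
ds := sm.GetOrCreate("events", func() *Dataset {
	return newDataset("events") // hypothetical constructor, called only on miss
})
_ = ds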

type ShardedRWMutex

type ShardedRWMutex struct {
	// contains filtered or unexported fields
}

ShardedRWMutex provides a sharded read-write mutex for reduced contention. Different keys may hash to different shards, allowing concurrent access.

func NewShardedRWMutex

func NewShardedRWMutex(cfg ShardedRWMutexConfig) *ShardedRWMutex

NewShardedRWMutex creates a new sharded mutex with the given configuration.

func NewShardedRWMutexDefault

func NewShardedRWMutexDefault() *ShardedRWMutex

NewShardedRWMutexDefault creates a sharded mutex with default configuration.

func (*ShardedRWMutex) Lock

func (sm *ShardedRWMutex) Lock(key uint64)

Lock acquires a write lock for the shard associated with the given key.

func (*ShardedRWMutex) LockAll

func (sm *ShardedRWMutex) LockAll()

LockAll acquires write locks on all shards (use sparingly).

func (*ShardedRWMutex) LockShard

func (sm *ShardedRWMutex) LockShard(shard int)

LockShard acquires a write lock on a specific shard directly.

func (*ShardedRWMutex) NumShards

func (sm *ShardedRWMutex) NumShards() int

NumShards returns the number of shards.

func (*ShardedRWMutex) RLock

func (sm *ShardedRWMutex) RLock(key uint64)

RLock acquires a read lock for the shard associated with the given key.

func (*ShardedRWMutex) RLockAll

func (sm *ShardedRWMutex) RLockAll()

RLockAll acquires read locks on all shards.

func (*ShardedRWMutex) RLockShard

func (sm *ShardedRWMutex) RLockShard(shard int)

RLockShard acquires a read lock on a specific shard directly.

func (*ShardedRWMutex) RUnlock

func (sm *ShardedRWMutex) RUnlock(key uint64)

RUnlock releases the read lock for the shard associated with the given key.

func (*ShardedRWMutex) RUnlockAll

func (sm *ShardedRWMutex) RUnlockAll()

RUnlockAll releases read locks on all shards.

func (*ShardedRWMutex) RUnlockShard

func (sm *ShardedRWMutex) RUnlockShard(shard int)

RUnlockShard releases a read lock on a specific shard directly.

func (*ShardedRWMutex) ShardFor

func (sm *ShardedRWMutex) ShardFor(key uint64) int

ShardFor returns the shard index for a given key using FNV-1a hash.

func (*ShardedRWMutex) Stats

func (sm *ShardedRWMutex) Stats() ShardedRWMutexStats

Stats returns current statistics.

func (*ShardedRWMutex) Unlock

func (sm *ShardedRWMutex) Unlock(key uint64)

Unlock releases the write lock for the shard associated with the given key.

func (*ShardedRWMutex) UnlockAll

func (sm *ShardedRWMutex) UnlockAll()

UnlockAll releases write locks on all shards.

func (*ShardedRWMutex) UnlockShard

func (sm *ShardedRWMutex) UnlockShard(shard int)

UnlockShard releases a write lock on a specific shard directly.
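
A key-scoped locking sketch; the key would typically be a hash of the entity name:

mu := NewShardedRWMutexDefault()
key := uint64(42) // e.g. FNV-1a hash of a dataset name

mu.RLock(key)
// ... read state guarded by this shard ...
mu.RUnlock(key)

mu.Lock(key)
// ... mutate ...
mu.Unlock(key)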

type ShardedRWMutexConfig

type ShardedRWMutexConfig struct {
	NumShards int // Number of shards (default: runtime.NumCPU())
}

ShardedRWMutexConfig configures the sharded mutex.

func DefaultShardedRWMutexConfig

func DefaultShardedRWMutexConfig() ShardedRWMutexConfig

DefaultShardedRWMutexConfig returns sensible defaults.

func (ShardedRWMutexConfig) Validate

func (c ShardedRWMutexConfig) Validate() error

Validate checks configuration validity.

type ShardedRWMutexStats

type ShardedRWMutexStats struct {
	TotalLocks  int64
	TotalRLocks int64
}

ShardedRWMutexStats holds statistics for the sharded mutex.

type ShutdownError

type ShutdownError struct {
	Phase     string    // Phase: "drain", "flush", "close", "truncate"
	Component string    // Component: "WAL", "index_queue", "connections"
	Cause     error     // Underlying error
	Timestamp time.Time // When the error occurred
}

ShutdownError provides rich context for graceful shutdown operations.

func (*ShutdownError) Error

func (e *ShutdownError) Error() string

func (*ShutdownError) Unwrap

func (e *ShutdownError) Unwrap() error

type SnapshotBackend

type SnapshotBackend interface {
	// WriteSnapshot writes snapshot data for a collection
	WriteSnapshot(ctx context.Context, name string, data []byte) error
	// ReadSnapshot returns a reader for snapshot data
	ReadSnapshot(ctx context.Context, name string) (io.ReadCloser, error)
	// ListSnapshots returns all collection names with snapshots
	ListSnapshots(ctx context.Context) ([]string, error)
	// DeleteSnapshot removes a snapshot
	DeleteSnapshot(ctx context.Context, name string) error
}

SnapshotBackend defines the interface for snapshot storage backends

type SplitBrainConfig

type SplitBrainConfig struct {
	HeartbeatInterval time.Duration
	HeartbeatTimeout  time.Duration
	MinQuorum         int
	FenceOnPartition  bool
}

SplitBrainConfig configures split-brain detection behavior.

type SplitBrainDetector

type SplitBrainDetector struct {
	// contains filtered or unexported fields
}

SplitBrainDetector monitors cluster health and detects network partitions.

func NewSplitBrainDetector

func NewSplitBrainDetector(cfg SplitBrainConfig) *SplitBrainDetector

NewSplitBrainDetector creates a new split-brain detector.

func (*SplitBrainDetector) CheckQuorum

func (d *SplitBrainDetector) CheckQuorum() bool

CheckQuorum evaluates if quorum is maintained and updates fencing state.

func (*SplitBrainDetector) GetHealthyPeerCount

func (d *SplitBrainDetector) GetHealthyPeerCount() int

GetHealthyPeerCount returns the number of healthy peers.

func (*SplitBrainDetector) GetHeartbeatCount

func (d *SplitBrainDetector) GetHeartbeatCount() uint64

GetHeartbeatCount returns the total heartbeat count.

func (*SplitBrainDetector) GetPartitionCount

func (d *SplitBrainDetector) GetPartitionCount() uint64

GetPartitionCount returns the number of detected partitions.

func (*SplitBrainDetector) GetPeers

func (d *SplitBrainDetector) GetPeers() []string

GetPeers returns all registered peer IDs.

func (*SplitBrainDetector) IsFenced

func (d *SplitBrainDetector) IsFenced() bool

IsFenced returns true if the node is fenced due to partition.

func (*SplitBrainDetector) IsPeerHealthy

func (d *SplitBrainDetector) IsPeerHealthy(id string) bool

IsPeerHealthy returns true if peer has sent heartbeat within timeout.

func (*SplitBrainDetector) RecordHeartbeat

func (d *SplitBrainDetector) RecordHeartbeat(id string)

RecordHeartbeat updates the last heartbeat time for a peer.

func (*SplitBrainDetector) RegisterPeer

func (d *SplitBrainDetector) RegisterPeer(id string)

RegisterPeer adds a peer to monitor.

func (*SplitBrainDetector) UnregisterPeer

func (d *SplitBrainDetector) UnregisterPeer(id string)

UnregisterPeer removes a peer from monitoring.
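
A monitoring sketch; the intervals and quorum size are illustrative:

d := NewSplitBrainDetector(SplitBrainConfig{
	HeartbeatInterval: time.Second,
	HeartbeatTimeout:  5 * time.Second,
	MinQuorum:         2,
	FenceOnPartition:  true,
})
d.RegisterPeer("peer-1")
d.RecordHeartbeat("peer-1") // call whenever a heartbeat arrives

if !d.CheckQuorum() && d.IsFenced() {
	// reject writes until the partition heals
}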

type StdWAL

type StdWAL struct {
	// contains filtered or unexported fields
}

StdWAL is a standard file-based WAL implementation (fallback).

func NewStdWAL

func NewStdWAL(dir string, v *VectorStore) *StdWAL

func (*StdWAL) Close

func (w *StdWAL) Close() error

func (*StdWAL) Sync

func (w *StdWAL) Sync() error

func (*StdWAL) Write

func (w *StdWAL) Write(name string, seq uint64, ts int64, record arrow.RecordBatch) error

type StorageConfig

type StorageConfig struct {
	DataPath         string
	SnapshotInterval time.Duration
	AsyncFsync       bool
	DoPutBatchSize   int
	UseIOUring       bool
}

StorageConfig holds configuration for persistence

type StrategyType

type StrategyType string

const (
	StrategyRoundRobin       StrategyType = "round_robin"
	StrategyLeastConnections StrategyType = "least_connections"
	StrategyWeighted         StrategyType = "weighted"
)

type SyncState

type SyncState struct {
	Version   uint64
	Dims      int
	Locations []Location
	GraphData []byte
}

SyncState represents the serializable state of an HNSW index.

type TCPNoDelayListener

type TCPNoDelayListener struct {
	*net.TCPListener
}

TCPNoDelayListener wraps a TCPListener and sets TCP_NODELAY on all accepted connections. This disables Nagle's algorithm for lower latency at the cost of potential bandwidth efficiency.

func NewTCPNoDelayListener

func NewTCPNoDelayListener(l *net.TCPListener) *TCPNoDelayListener

NewTCPNoDelayListener creates a new listener that sets TCP_NODELAY on accepted connections.

func (*TCPNoDelayListener) Accept

func (l *TCPNoDelayListener) Accept() (net.Conn, error)

Accept waits for and returns the next connection, with TCP_NODELAY enabled.
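
Wrapping a listener is a one-line change. In this sketch the port is illustrative, and srv stands for any server that accepts a net.Listener (a *grpc.Server, for example):

func serve(srv interface{ Serve(net.Listener) error }) error {
	ln, err := net.Listen("tcp", ":8815") // port is illustrative
	if err != nil {
		return err
	}
	tcpLn, ok := ln.(*net.TCPListener)
	if !ok {
		return fmt.Errorf("expected *net.TCPListener, got %T", ln)
	}
	// Every connection returned by Accept now has TCP_NODELAY set.
	return srv.Serve(NewTCPNoDelayListener(tcpLn))
}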

type TicketQuery

type TicketQuery struct {
	Name    string   `json:"name"`
	Limit   int64    `json:"limit"`
	Filters []Filter `json:"filters"`
}

func FastParseTicketQuery

func FastParseTicketQuery(ticket []byte) (TicketQuery, error)

FastParseTicketQuery parses ticket JSON using goccy/go-json, which is 2-3x faster than encoding/json for this use case. It is a drop-in replacement with identical semantics.

func ParseTicketQuerySafe

func ParseTicketQuerySafe(data []byte) (TicketQuery, error)

ParseTicketQuerySafe is a thread-safe wrapper that uses pooled parsers
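
For example, parsing a ticket that names a dataset and a row limit (the dataset name is a placeholder; the filters payload is left empty because the Filter fields are documented elsewhere in this package):

ticket := []byte(`{"name":"embeddings","limit":100,"filters":[]}`)
q, err := ParseTicketQuerySafe(ticket)
if err != nil {
	log.Fatal(err)
}
fmt.Println(q.Name, q.Limit) // embeddings 100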

type TimestampMap

type TimestampMap struct {
	// contains filtered or unexported fields
}

TimestampMap tracks the latest LWW timestamp for each VectorID.

func NewTimestampMap

func NewTimestampMap() *TimestampMap

func (*TimestampMap) Get

func (tm *TimestampMap) Get(id VectorID) int64

Get returns the timestamp for a given ID.

func (*TimestampMap) Len

func (tm *TimestampMap) Len() int

Len returns the total number of entries tracked.

func (*TimestampMap) Update

func (tm *TimestampMap) Update(id VectorID, ts int64) bool

Update updates the timestamp if the new one is greater (LWW). Returns true if the update was applied.
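
Last-writer-wins in action; only strictly newer timestamps are applied:

tm := NewTimestampMap()
fmt.Println(tm.Update(7, 100)) // true: first write for this ID
fmt.Println(tm.Update(7, 90))  // false: older timestamp is ignored
fmt.Println(tm.Update(7, 150)) // true: newer timestamp wins
fmt.Println(tm.Get(7))         // 150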

type VectorClock

type VectorClock struct {
	// contains filtered or unexported fields
}

VectorClock implements a vector clock for causal ordering in distributed systems

func NewVectorClock

func NewVectorClock(nodeID string) *VectorClock

NewVectorClock creates a new vector clock for the given node

func (*VectorClock) Compare

func (vc *VectorClock) Compare(other *VectorClock) ClockComparison

Compare compares this vector clock with another. Returns ClockEqual, ClockBefore, ClockAfter, or ClockConcurrent.

func (*VectorClock) Copy

func (vc *VectorClock) Copy() *VectorClock

Copy creates a deep copy of the vector clock

func (*VectorClock) Deserialize

func (vc *VectorClock) Deserialize(data []byte) error

Deserialize restores vector clock from bytes

func (*VectorClock) Get

func (vc *VectorClock) Get(nodeID string) uint64

Get returns the clock value for a specific node

func (*VectorClock) Increment

func (vc *VectorClock) Increment()

Increment increments this node's clock value

func (*VectorClock) Merge

func (vc *VectorClock) Merge(other *VectorClock)

Merge combines another vector clock into this one, taking max values

func (*VectorClock) NodeID

func (vc *VectorClock) NodeID() string

NodeID returns the node identifier for this clock

func (*VectorClock) Serialize

func (vc *VectorClock) Serialize() []byte

Serialize converts the vector clock to bytes for network transfer

func (*VectorClock) Set

func (vc *VectorClock) Set(nodeID string, value uint64)

Set sets the clock value for a specific node
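
A sketch of two nodes diverging and converging, assuming (per the docs above) that Increment bumps only the local entry and Merge takes element-wise maxima:

a := NewVectorClock("node-a")
b := NewVectorClock("node-b")

a.Increment() // a: {node-a: 1}
b.Merge(a)    // b observes a's history
b.Increment() // b: {node-a: 1, node-b: 1}

fmt.Println(a.Compare(b) == ClockBefore) // true: a happened-before b

a.Increment() // a: {node-a: 2}; a and b have now diverged
fmt.Println(a.Compare(b) == ClockConcurrent) // true: neither dominates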

type VectorID

type VectorID uint32

VectorID represents a unique identifier for a vector in the index. It maps to a specific location (Batch, Row) in the Arrow buffers.

type VectorIndex

type VectorIndex interface {
	// AddByLocation adds a vector from the dataset using batch and row indices.
	// Returns the assigned internal Vector ID.
	AddByLocation(batchIdx, rowIdx int) (uint32, error)

	// AddByRecord adds a vector directly from a record batch.
	// Returns the assigned internal Vector ID.
	AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)

	// SearchVectors returns the k nearest neighbors for the query vector with scores and optional filtering.
	SearchVectors(query []float32, k int, filters []Filter) []SearchResult

	// SearchVectorsWithBitmap returns k nearest neighbors filtered by a bitset.
	SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult

	// GetLocation retrieves the storage location for a given vector ID.
	// Returns the location and true if found, or zero location and false if not found.
	GetLocation(id VectorID) (Location, bool)

	// Len returns the number of vectors in the index.
	Len() int

	// GetDimension returns the vector dimension.
	GetDimension() uint32

	// Warmup pre-loads index data into memory.
	Warmup() int

	// SetIndexedColumns updates columns that should be indexed for fast equality lookups.
	SetIndexedColumns(cols []string)

	// Close releases index resources.
	Close() error
}

type VectorMetric

type VectorMetric int

VectorMetric defines the distance metric type.

const (
	MetricEuclidean VectorMetric = iota
	MetricCosine
	MetricDotProduct
)

type VectorRecord

type VectorRecord struct {
	ID     int32     `parquet:"id"`
	Vector []float32 `parquet:"vector"`
}

VectorRecord represents a single row for Parquet serialization

type VectorSearchRequest

type VectorSearchRequest struct {
	Dataset   string    `json:"dataset"`
	Vector    []float32 `json:"vector"`
	K         int       `json:"k"`
	Filters   []Filter  `json:"filters"`
	LocalOnly bool      `json:"local_only"`
	// Hybrid Search Fields
	TextQuery string  `json:"text_query"`
	Alpha     float32 `json:"alpha"` // 0.0=sparse, 1.0=dense, 0.5=hybrid
}

VectorSearchRequest defines the request format for VectorSearch action
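
A hybrid request that blends dense and sparse scores equally might be built as below. The dataset name and vector are placeholders; encoding/json is used here, though any compatible encoder works:

req := VectorSearchRequest{
	Dataset:   "docs",
	Vector:    []float32{0.1, 0.2, 0.3}, // query embedding (placeholder)
	K:         10,
	TextQuery: "arrow flight",
	Alpha:     0.5, // weight dense and sparse scores equally
}
payload, err := json.Marshal(req)
if err != nil {
	log.Fatal(err)
}
// payload is the body submitted with the VectorSearch action.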

type VectorSearchResponse

type VectorSearchResponse struct {
	IDs    []uint64  `json:"ids"`
	Scores []float32 `json:"scores"`
}

VectorSearchResponse defines the response format for VectorSearch action

type VectorStore

type VectorStore struct {
	flight.BaseFlightServer

	// Mesh integration
	Mesh *mesh.Gossip
	// contains filtered or unexported fields
}

VectorStore implements flight.FlightServer with minimal logic

func NewVectorStore

func NewVectorStore(mem memory.Allocator, logger *zap.Logger, maxMemory, maxWALSize int64, ttl time.Duration) *VectorStore

func NewVectorStoreWithCompaction

func NewVectorStoreWithCompaction(mem memory.Allocator, logger *zap.Logger, maxMemoryBytes int64, walMaxBytes int64, ttlDuration time.Duration, compactionCfg CompactionConfig) *VectorStore

NewVectorStoreWithCompaction returns a VectorStore with initialized compaction

func NewVectorStoreWithHybridConfig

func NewVectorStoreWithHybridConfig(mem memory.Allocator, logger *zap.Logger, cfg HybridSearchConfig) (*VectorStore, error)

NewVectorStoreWithHybridConfig creates a VectorStore with hybrid search enabled.

func NewVectorStoreWithPipeline

func NewVectorStoreWithPipeline(mem memory.Allocator, logger *zap.Logger, workers, bufferSize int) *VectorStore

NewVectorStoreWithPipeline creates a VectorStore with DoGet pipeline enabled

func NewVectorStoreWithPipelineThreshold

func NewVectorStoreWithPipelineThreshold(mem memory.Allocator, logger *zap.Logger, workers, bufferSize, threshold int) *VectorStore

NewVectorStoreWithPipelineThreshold creates a VectorStore with custom pipeline threshold
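
Bringing up a store with persistence might look like the sketch below. The sizes, TTL, and data path are illustrative; the allocator comes from Arrow's memory package and the logger from go.uber.org/zap:

func newStore() (*VectorStore, error) {
	mem := memory.NewGoAllocator()
	logger, err := zap.NewProduction()
	if err != nil {
		return nil, err
	}
	s := NewVectorStore(mem, logger,
		8<<30,        // maxMemory: 8 GiB (illustrative)
		512<<20,      // maxWALSize: 512 MiB (illustrative)
		24*time.Hour, // ttl
	)
	if err := s.InitPersistence(StorageConfig{
		DataPath:         "/var/lib/store",
		SnapshotInterval: 5 * time.Minute,
		AsyncFsync:       true,
		UseIOUring:       true, // non-Linux systems fall back (see NewWALBackend)
	}); err != nil {
		return nil, err
	}
	return s, nil
}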

func (*VectorStore) ApplyDelta

func (s *VectorStore) ApplyDelta(name string, rec arrow.RecordBatch, seq uint64, ts int64) error

func (*VectorStore) Close

func (s *VectorStore) Close() error

Close ensures the WAL is flushed and closed properly

func (*VectorStore) CompactDataset

func (vs *VectorStore) CompactDataset(name string) error

CompactDataset manually triggers compaction for a specific dataset.

func (*VectorStore) CreateNamespace

func (vs *VectorStore) CreateNamespace(name string) error

CreateNamespace creates a new namespace with the given name. Returns error if namespace already exists.

func (*VectorStore) DeleteNamespace

func (vs *VectorStore) DeleteNamespace(name string) error

DeleteNamespace removes a namespace. Returns error if namespace is "default" or doesn't exist.

func (*VectorStore) DoAction

func (s *VectorStore) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error

DoAction handles custom actions like deletion and status

func (*VectorStore) DoExchange

DoExchange implements bidirectional Arrow Flight streaming for mesh replication. This provides the foundation for peer-to-peer Arrow stream transfer.

Protocol:

- Client sends FlightData messages with FlightDescriptor indicating dataset
- Server receives, processes, and can send data back
- Enables future mesh replication between peers

func (*VectorStore) DoGet

DoGet - Minimal implementation

func (*VectorStore) DoPut

DoPut - Optimized implementation with batching

func (*VectorStore) GetAutoCompactionTriggerCount

func (s *VectorStore) GetAutoCompactionTriggerCount() int64

GetAutoCompactionTriggerCount returns trigger count from the worker

func (*VectorStore) GetAutoShardingConfig

func (s *VectorStore) GetAutoShardingConfig() AutoShardingConfig

GetAutoShardingConfig returns the current auto-sharding configuration

func (*VectorStore) GetBM25Index

func (s *VectorStore) GetBM25Index() *BM25InvertedIndex

GetBM25Index returns the BM25 inverted index for text search. Returns nil if hybrid search is not enabled.

func (*VectorStore) GetCompactionConfig

func (vs *VectorStore) GetCompactionConfig() CompactionConfig

GetCompactionConfig returns the current compaction configuration.

func (*VectorStore) GetCompactionStats

func (vs *VectorStore) GetCompactionStats() CompactionStats

GetCompactionStats returns compaction statistics.

func (*VectorStore) GetDataset

func (s *VectorStore) GetDataset(name string) (*Dataset, error)

GetDataset retrieves a dataset by name.

func (*VectorStore) GetDoGetPipelinePool

func (s *VectorStore) GetDoGetPipelinePool() *DoGetPipelinePool

GetDoGetPipelinePool returns the pipeline pool (nil if not configured)

func (*VectorStore) GetFlightInfo

func (s *VectorStore) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)

func (*VectorStore) GetHybridSearchConfig

func (s *VectorStore) GetHybridSearchConfig() HybridSearchConfig

GetHybridSearchConfig returns the hybrid search configuration.

func (*VectorStore) GetIndexedColumns

func (s *VectorStore) GetIndexedColumns() []string

GetIndexedColumns returns columns currently being indexed

func (*VectorStore) GetNamespace

func (vs *VectorStore) GetNamespace(name string) *Namespace

GetNamespace returns a namespace by name, or nil if not found.

func (*VectorStore) GetNamespaceDatasetCount

func (vs *VectorStore) GetNamespaceDatasetCount(name string) int

GetNamespaceDatasetCount returns the number of datasets in a namespace.

func (*VectorStore) GetPipelineStats

func (s *VectorStore) GetPipelineStats() PipelineStats

GetPipelineStats returns current pipeline statistics

func (*VectorStore) GetPipelineThreshold

func (s *VectorStore) GetPipelineThreshold() int

GetPipelineThreshold returns the batch count threshold for pipeline use

func (*VectorStore) GetSchema

func (*VectorStore) GetTotalNamespaceCount

func (vs *VectorStore) GetTotalNamespaceCount() int

GetTotalNamespaceCount returns the total number of namespaces.

func (*VectorStore) GetWALQueueDepth

func (s *VectorStore) GetWALQueueDepth() (int, int)

func (*VectorStore) HybridSearch

func (s *VectorStore) HybridSearch(ctx context.Context, name string, query []float32, k int, filters map[string]string) ([]SearchResult, error)

HybridSearch is a wrapper for the HybridSearch function

func (*VectorStore) IndexRecordColumns

func (s *VectorStore) IndexRecordColumns(datasetName string, rec arrow.RecordBatch, batchIdx int)

IndexRecordColumns indexes specific columns for fast equality lookups

func (*VectorStore) InitPersistence

func (s *VectorStore) InitPersistence(cfg StorageConfig) error

InitPersistence initializes the WAL and loads any existing data

func (*VectorStore) IsCompactionRunning

func (vs *VectorStore) IsCompactionRunning() bool

IsCompactionRunning returns true if the compaction worker is running.

func (*VectorStore) IterateDatasets

func (s *VectorStore) IterateDatasets(fn func(ds *Dataset))

IterateDatasets safely iterates over all datasets for background tasks.

func (*VectorStore) ListFlights

func (*VectorStore) ListNamespaces

func (vs *VectorStore) ListNamespaces() []string

ListNamespaces returns all namespace names.

func (*VectorStore) MapInternalToUserIDs

func (s *VectorStore) MapInternalToUserIDs(ds *Dataset, results []SearchResult) []SearchResult

MapInternalToUserIDs maps internal HNSW IDs to user-provided IDs

func (*VectorStore) MerkleRoot

func (s *VectorStore) MerkleRoot(name string) [32]byte

func (*VectorStore) NamespaceExists

func (vs *VectorStore) NamespaceExists(name string) bool

NamespaceExists checks if a namespace exists.

func (*VectorStore) SearchHybrid

func (s *VectorStore) SearchHybrid(ctx context.Context, name string, query []float32, textQuery string, k int, alpha float32, rrfK int) ([]SearchResult, error)

SearchHybrid is a wrapper for the SearchHybrid function (RRF version)

func (*VectorStore) SetAutoShardingConfig

func (s *VectorStore) SetAutoShardingConfig(cfg AutoShardingConfig)

SetAutoShardingConfig updates the auto-sharding configuration

func (*VectorStore) SetIndexedColumns

func (s *VectorStore) SetIndexedColumns(cols []string)

SetIndexedColumns updates columns that should be indexed for fast equality lookups

func (*VectorStore) SetMesh

func (s *VectorStore) SetMesh(m *mesh.Gossip)

func (*VectorStore) Shutdown

func (s *VectorStore) Shutdown(ctx context.Context) error

Shutdown performs a graceful shutdown of the VectorStore. It stops accepting new requests, drains the index queue, flushes the WAL, and closes all file handles. The context controls the shutdown timeout.
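
A bounded shutdown that surfaces which phase failed, using the ShutdownError type documented above (the 30-second budget is illustrative):

func stop(s *VectorStore) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := s.Shutdown(ctx); err != nil {
		var se *ShutdownError
		if errors.As(err, &se) {
			log.Printf("shutdown failed in %s/%s: %v", se.Component, se.Phase, se.Cause)
			return
		}
		log.Printf("shutdown failed: %v", err)
	}
}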

func (*VectorStore) Snapshot

func (s *VectorStore) Snapshot() error

func (*VectorStore) StartEvictionTicker

func (s *VectorStore) StartEvictionTicker(d time.Duration)

func (*VectorStore) StartMetricsTicker

func (s *VectorStore) StartMetricsTicker(d time.Duration)

func (*VectorStore) StartWALCheckTicker

func (s *VectorStore) StartWALCheckTicker(d time.Duration)

func (*VectorStore) StoreRecordBatch

func (s *VectorStore) StoreRecordBatch(ctx context.Context, name string, rec arrow.RecordBatch) error

StoreRecordBatch stores a batch of records in a dataset

func (*VectorStore) TruncateWAL

func (s *VectorStore) TruncateWAL() error

TruncateWAL truncates the WAL file after a successful snapshot. This should only be called after SaveSnapshot completes successfully.

func (*VectorStore) UpdateConfig

func (s *VectorStore) UpdateConfig(maxMemory, maxWALSize int64, snapshotInterval time.Duration)

func (*VectorStore) Warmup

func (s *VectorStore) Warmup() WarmupStats

Warmup iterates through all datasets and warms up their indexes

type VectorizedFilter

type VectorizedFilter struct {
	// contains filtered or unexported fields
}

VectorizedFilter provides optimized Arrow Compute based filtering

func NewVectorizedFilter

func NewVectorizedFilter(alloc memory.Allocator) *VectorizedFilter

NewVectorizedFilter creates a new vectorized filter

func (*VectorizedFilter) Apply

func (vf *VectorizedFilter) Apply(ctx context.Context, rec arrow.RecordBatch, filters []Filter) (arrow.RecordBatch, error)

Apply applies filters to a record batch using vectorized Arrow Compute operations
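
A minimal wrapper, assuming the usual Arrow ownership convention that the caller releases the returned batch; Filter construction is elided because its fields are documented elsewhere in this package:

func filterBatch(ctx context.Context, rec arrow.RecordBatch, filters []Filter) (arrow.RecordBatch, error) {
	vf := NewVectorizedFilter(memory.NewGoAllocator())
	out, err := vf.Apply(ctx, rec, filters)
	if err != nil {
		return nil, err
	}
	// Assumed convention: the caller owns out and must Release it when done.
	return out, nil
}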

type VersionedData

type VersionedData struct {
	Dataset string
	Data    []byte
	Clock   *VectorClock
}

VersionedData wraps data with a vector clock for causal consistency

func NewVersionedData

func NewVersionedData(dataset string, data []byte, clock *VectorClock) *VersionedData

NewVersionedData creates versioned data with a vector clock

func (*VersionedData) Conflicts

func (vd *VersionedData) Conflicts(other *VersionedData) bool

Conflicts returns true if this data is concurrent with the other

func (*VersionedData) Supersedes

func (vd *VersionedData) Supersedes(other *VersionedData) bool

Supersedes returns true if this data happened after the other

type WAL

type WAL interface {
	Write(name string, seq uint64, ts int64, record arrow.RecordBatch) error
	Close() error
	Sync() error
}

WAL defines the interface for Write-Ahead Logging.

func NewWAL

func NewWAL(dir string, v *VectorStore) WAL

type WALBackend

type WALBackend interface {
	// Write writes the data to the WAL.
	// Implementations should handle offset tracking if necessary (e.g. for io_uring pwrite).
	Write(p []byte) (n int, err error)

	// Sync explicitly flushes data to disk.
	Sync() error

	// Close closes the underlying resource.
	Close() error

	// Name returns the file name or identifier.
	Name() string

	// File returns the underlying *os.File if available.
	File() *os.File
}

WALBackend defines the interface for low-level WAL I/O. Implementations must be thread-safe or serialization must be handled by caller. WALBatcher currently handles serialization (single writer in flush loop).

func NewUringBackend

func NewUringBackend(path string) (WALBackend, error)

NewUringBackend returns an error on non-Linux systems.

func NewWALBackend

func NewWALBackend(path string, preferAsync bool) (WALBackend, error)

NewWALBackend creates a new WALBackend. It attempts to use io_uring if preferAsync is true and the platform supports it. Otherwise, it falls back to FSBackend.

type WALBatcher

type WALBatcher struct {
	// contains filtered or unexported fields
}

WALBatcher batches WAL writes for improved performance. Instead of fsyncing on every write, it groups writes and flushes them periodically.

func NewWALBatcher

func NewWALBatcher(dataPath string, config *WALBatcherConfig) *WALBatcher

NewWALBatcher creates a new batched WAL writer

func (*WALBatcher) AsyncFsyncStats

func (w *WALBatcher) AsyncFsyncStats() *AsyncFsyncerStats

AsyncFsyncStats returns stats from the async fsyncer, or nil if not enabled

func (*WALBatcher) FlushError

func (w *WALBatcher) FlushError() error

FlushError returns the last flush error if any

func (*WALBatcher) GetCurrentInterval

func (w *WALBatcher) GetCurrentInterval() time.Duration

GetCurrentInterval returns the current flush interval

func (*WALBatcher) IsAdaptiveEnabled

func (w *WALBatcher) IsAdaptiveEnabled() bool

IsAdaptiveEnabled returns whether adaptive batching is enabled

func (*WALBatcher) IsAsyncFsyncEnabled

func (w *WALBatcher) IsAsyncFsyncEnabled() bool

IsAsyncFsyncEnabled returns true if async fsync is configured and running

func (*WALBatcher) QueueCapacity

func (w *WALBatcher) QueueCapacity() int

QueueCapacity returns the maximum capacity of the queue

func (*WALBatcher) QueueDepth

func (w *WALBatcher) QueueDepth() int

QueueDepth returns the current number of pending entries

func (*WALBatcher) Start

func (w *WALBatcher) Start() error

Start initializes the WAL file and starts the background flush goroutine

func (*WALBatcher) Stop

func (w *WALBatcher) Stop() error

Stop gracefully shuts down the batcher, flushing pending writes

func (*WALBatcher) Write

func (w *WALBatcher) Write(rec arrow.RecordBatch, name string, seq uint64, ts int64) error

Write queues a record for batched WAL writing (non-blocking)
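
The lifecycle is Start, Write, Stop. In this sketch the path and tuning values are illustrative, the record is produced elsewhere, and the fixed sequence number stands in for whatever the store assigns:

func writeOne(rec arrow.RecordBatch) error {
	cfg := DefaultWALBatcherConfig()
	cfg.FlushInterval = 10 * time.Millisecond // group-commit window
	cfg.MaxBatchSize = 100                    // force a flush at 100 queued entries

	w := NewWALBatcher("/var/lib/store", &cfg)
	if err := w.Start(); err != nil {
		return err
	}
	defer w.Stop() // flushes anything still queued

	// Non-blocking enqueue; the background loop batches the fsync.
	return w.Write(rec, "embeddings", 1, time.Now().UnixNano())
}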

type WALBatcherConfig

type WALBatcherConfig struct {
	FlushInterval time.Duration     // Time between flushes (e.g., 10ms)
	MaxBatchSize  int               // Max entries before forced flush (e.g., 100)
	Adaptive      AdaptiveWALConfig // Adaptive batching configuration
	AsyncFsync    AsyncFsyncConfig  // Async fsync configuration
	UseIOUring    bool              // Use io_uring backend if available
}

WALBatcherConfig configures the batched WAL writer

func DefaultWALBatcherConfig

func DefaultWALBatcherConfig() WALBatcherConfig

DefaultWALBatcherConfig returns sensible defaults

type WALEntry

type WALEntry struct {
	Record    arrow.RecordBatch
	Name      string
	Seq       uint64
	Timestamp int64
}

WALEntry represents a single entry to be written to the WAL

type WALError

type WALError struct {
	Op        string    // Operation: "write", "read", "flush", "truncate"
	Path      string    // WAL file path
	Offset    int64     // Byte offset where error occurred
	Cause     error     // Underlying error
	Timestamp time.Time // When the error occurred
}

WALError provides rich context for Write-Ahead Log operations.

func (*WALError) Error

func (e *WALError) Error() string

func (*WALError) Unwrap

func (e *WALError) Unwrap() error

type WALIterator

type WALIterator struct {
	// contains filtered or unexported fields
}

WALIterator iterates over WAL entries, supporting seeking by sequence ID.

func NewWALIterator

func NewWALIterator(dir string, mem memory.Allocator) (*WALIterator, error)

func (*WALIterator) Close

func (it *WALIterator) Close() error

func (*WALIterator) Next

Next reads the next entry. Returns io.EOF if done.

func (*WALIterator) Seek

func (it *WALIterator) Seek(seq uint64) error

Seek fast-forwards the iterator to the first entry with Sequence > seq.

type WarmupStats

type WarmupStats struct {
	DatasetsWarmed   int
	DatasetsSkipped  int
	TotalNodesWarmed int
	Duration         time.Duration
}

WarmupStats holds statistics about the warmup operation

func (WarmupStats) String

func (w WarmupStats) String() string

type WeightedStrategy

type WeightedStrategy struct {
	// contains filtered or unexported fields
}

WeightedStrategy picks replicas based on assigned weights

func NewWeightedStrategy

func NewWeightedStrategy() *WeightedStrategy

NewWeightedStrategy creates a new weighted strategy

func (*WeightedStrategy) Select

func (s *WeightedStrategy) Select(replicas []string) string

Select picks a replica based on weighted probability

func (*WeightedStrategy) SetWeight

func (s *WeightedStrategy) SetWeight(replica string, weight int)

SetWeight sets the weight for a replica

type WriteFn

type WriteFn func(ctx context.Context, peer string) error

WriteFn is the function signature for write operations

type WriteRateTracker

type WriteRateTracker struct {
	// contains filtered or unexported fields
}

WriteRateTracker tracks write rate using exponential moving average

func NewWriteRateTracker

func NewWriteRateTracker(window time.Duration) *WriteRateTracker

NewWriteRateTracker creates a tracker with the given averaging window

func (*WriteRateTracker) GetRate

func (t *WriteRateTracker) GetRate() float64

GetRate returns the current estimated write rate (writes per second)

func (*WriteRateTracker) RecordWrite

func (t *WriteRateTracker) RecordWrite()

RecordWrite records a single write operation

type ZeroAllocTicketParser

type ZeroAllocTicketParser struct {
	// contains filtered or unexported fields
}

ZeroAllocTicketParser parses TicketQuery JSON with zero allocations for the common case (no escape sequences).

func NewZeroAllocTicketParser

func NewZeroAllocTicketParser() *ZeroAllocTicketParser

NewZeroAllocTicketParser creates a new reusable parser

func (*ZeroAllocTicketParser) Parse

func (p *ZeroAllocTicketParser) Parse(data []byte) (TicketQuery, error)

Parse parses the JSON data into a TicketQuery

type ZeroAllocVectorSearchParser

type ZeroAllocVectorSearchParser struct {
	// contains filtered or unexported fields
}

ZeroAllocVectorSearchParser parses VectorSearchRequest JSON with minimal allocations. The parser pre-allocates a vector slice and reuses it across parse calls. For the common case (no escape sequences in dataset name), this achieves zero allocations beyond the initial pre-allocation.

func NewZeroAllocVectorSearchParser

func NewZeroAllocVectorSearchParser(maxDims int) *ZeroAllocVectorSearchParser

NewZeroAllocVectorSearchParser creates a new reusable parser. maxDims specifies the maximum expected vector dimensions for pre-allocation.

func (*ZeroAllocVectorSearchParser) Parse

Parse parses the JSON data into a VectorSearchRequest. The returned VectorSearchRequest.Vector shares the parser's internal buffer, so the result is only valid until the next Parse call.
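
Because the vector aliases the parser's buffer, copy it out before the parser is reused. A sketch, where payload is raw request JSON produced elsewhere:

func parseAndKeep(p *ZeroAllocVectorSearchParser, payload []byte) ([]float32, error) {
	req, err := p.Parse(payload)
	if err != nil {
		return nil, err
	}
	// req.Vector is invalidated by the next Parse call, so clone it.
	kept := make([]float32, len(req.Vector))
	copy(kept, req.Vector)
	return kept, nil
}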

type ZeroCopyBufferConfig

type ZeroCopyBufferConfig struct {
	Enabled            bool
	MaxRetainedBuffers int
	MaxBufferSize      int64
}

ZeroCopyBufferConfig configures zero-copy buffer behavior

func DefaultZeroCopyBufferConfig

func DefaultZeroCopyBufferConfig() ZeroCopyBufferConfig

DefaultZeroCopyBufferConfig returns sensible defaults

func (ZeroCopyBufferConfig) Validate

func (c ZeroCopyBufferConfig) Validate() error

Validate checks configuration validity

type ZeroCopyPayload

type ZeroCopyPayload struct {
	DatasetName string
	Schema      []byte
	RowCount    int64
	Meta        []ArrayStructure // One per column
	Buffers     [][]byte
}

ZeroCopyPayload represents a serialized RecordBatch that references raw buffers. This is used for high-speed P2P replication.

func MarshalZeroCopy

func MarshalZeroCopy(name string, rec arrow.RecordBatch) (*ZeroCopyPayload, error)

MarshalZeroCopy creates a payload that references the underlying Arrow buffers. WARNING: The resulting payload is only valid as long as the RecordBatch is retained.
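
Honoring that warning means holding a reference for the payload's whole lifetime. A sketch, where sendToPeer is a hypothetical transport and the dataset name is a placeholder:

func replicate(rec arrow.RecordBatch) error {
	rec.Retain() // keep the Arrow buffers alive while the payload is in flight
	defer rec.Release()

	payload, err := MarshalZeroCopy("embeddings", rec)
	if err != nil {
		return err
	}
	return sendToPeer(payload) // hypothetical: ship to a replication peer
}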
