Documentation
Overview ¶
Package store provides an Apache Arrow-backed vector store with pooled memory allocation, HNSW/IVF/DiskANN vector indexes, BM25 full-text search, WAL persistence, and peer replication.
Index ¶
- Constants
- Variables
- func AdviseMemory(ptr unsafe.Pointer, size uintptr, advice MemoryAdvice) error
- func AdviseRecord(rec arrow.RecordBatch, advice MemoryAdvice)
- func BuildNamespacedPath(namespace, dataset string) string
- func CachedRecordSize(rec arrow.RecordBatch) int64
- func EnsureTimestampZeroCopy(mem memory.Allocator, rec arrow.RecordBatch) (arrow.RecordBatch, error)
- func EstimateRecordSize(rec arrow.RecordBatch) int64
- func FastPathEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
- func FastPathGreater(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
- func FastPathGreaterEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
- func FastPathLess(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
- func FastPathLessEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
- func FastPathNotEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
- func GetCurrentCPU() int
- func GetJSONEncoder() (*bytes.Buffer, *json.Encoder)
- func GetNumaNode(ptr unsafe.Pointer) (int, error)
- func GetTCPNoDelayConnectionsTotal() int64
- func HammingDistanceBatch(query []uint64, vectors [][]uint64, distances []int)
- func HammingDistancePOPCNT(a, b []uint64) int
- func IncrementRecordAccess()
- func IsFastPathSupported(dt arrow.DataType, op FilterOperator) bool
- func IsNotFoundError(err error) bool
- func LockMemory(ptr unsafe.Pointer, size uintptr) error
- func MatchesFilters(rec arrow.RecordBatch, rowIdx int, filters []Filter) (bool, error)
- func MustToGRPCStatus(err error) error
- func NewConfigError(component, field, value, message string) error
- func NewDimensionMismatchError(dataset string, expected, actual int) error
- func NewInternalError(operation string, cause error) error
- func NewInvalidArgumentError(field, message string) error
- func NewNotFoundError(resource, name string) error
- func NewPersistenceError(operation string, cause error) error
- func NewReplicationError(op, peerAddr, dataset string, cause error) error
- func NewResourceExhaustedError(resource, message string) error
- func NewS3Error(op, bucket, key string, cause error) error
- func NewSchemaMismatchError(dataset, message string) error
- func NewShutdownError(phase, component string, cause error) error
- func NewUnavailableError(operation, reason string) error
- func NewWALError(op, path string, offset int64, cause error) error
- func PackBytesToFloat32s(codes []uint8) []float32
- func ParseNamespacedPath(path string) (namespace, dataset string)
- func PinThreadToCore(core int) error
- func PinThreadToNode(node int) error
- func PinToNUMANode(topo *NUMATopology, nodeID int) error
- func PutArena(arena *SearchArena)
- func PutJSONEncoder(buf *bytes.Buffer, enc *json.Encoder)
- func ResetGlobalSizeCache()
- func RetainRecordBatch(rec arrow.RecordBatch) arrow.RecordBatch
- func SQ8CosineDistance(q1, q2 []uint8, enc *SQ8Encoder) float32
- func SQ8DistanceFast(q1, q2 []uint8) uint32
- func SQ8EuclideanDistance(q1, q2 []uint8, enc *SQ8Encoder) float32
- func ShouldUsePipeline(numBatches int) bool
- func ToGRPCStatus(err error) error
- func UnlockMemory(ptr unsafe.Pointer, size uintptr) error
- func UnmarshalZeroCopy(payload *ZeroCopyPayload, mem memory.Allocator) (arrow.RecordBatch, error)
- func UnpackFloat32sToBytes(packed []float32, length int) []uint8
- func ZeroCopyRecordBatch(mem memory.Allocator, rec arrow.RecordBatch, deleted *Bitset) (arrow.RecordBatch, error)
- type AdaptiveIndex
- type AdaptiveIndexConfig
- type AdaptiveIntervalCalculator
- type AdaptiveWALConfig
- type AllocatorAwareCache
- type ArrayStructure
- type AsyncFsyncConfig
- type AsyncFsyncer
- func (a *AsyncFsyncer) AddDirtyBytes(n int64)
- func (a *AsyncFsyncer) Config() AsyncFsyncConfig
- func (a *AsyncFsyncer) DirtyBytes() int64
- func (a *AsyncFsyncer) ForceFsync() error
- func (a *AsyncFsyncer) IsRunning() bool
- func (a *AsyncFsyncer) RequestFsyncIfNeeded() bool
- func (a *AsyncFsyncer) Start(f *os.File) error
- func (a *AsyncFsyncer) Stats() AsyncFsyncerStats
- func (a *AsyncFsyncer) Stop() error
- func (a *AsyncFsyncer) WaitForPendingFsyncs()
- type AsyncFsyncerStats
- type AutoShardingConfig
- type AutoShardingIndex
- func (a *AutoShardingIndex) AddByLocation(batchIdx, rowIdx int) (uint32, error)
- func (a *AutoShardingIndex) AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)
- func (idx *AutoShardingIndex) Close() error
- func (a *AutoShardingIndex) GetDimension() uint32
- func (idx *AutoShardingIndex) GetLocation(id VectorID) (Location, bool)
- func (a *AutoShardingIndex) Len() int
- func (a *AutoShardingIndex) SearchVectors(query []float32, k int, filters []Filter) []SearchResult
- func (a *AutoShardingIndex) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult
- func (idx *AutoShardingIndex) SetIndexedColumns(cols []string)
- func (idx *AutoShardingIndex) Warmup() int
- type BM25Config
- type BM25InvertedIndex
- func (idx *BM25InvertedIndex) Add(id VectorID, text string)
- func (idx *BM25InvertedIndex) Delete(id VectorID)
- func (idx *BM25InvertedIndex) DocCount() int
- func (idx *BM25InvertedIndex) GetDocLength(id VectorID) int
- func (idx *BM25InvertedIndex) GetTermDocFreq(term string) int
- func (idx *BM25InvertedIndex) SearchBM25(query string, limit int) []SearchResult
- type BM25Scorer
- func (s *BM25Scorer) AddDocument(docLength int)
- func (s *BM25Scorer) AvgDocLength() float64
- func (s *BM25Scorer) Config() BM25Config
- func (s *BM25Scorer) IDF(docFreq int) float64
- func (s *BM25Scorer) RemoveDocument(docLength int)
- func (s *BM25Scorer) Score(tf, docLength, docFreq int) float64
- func (s *BM25Scorer) ScoreMultiTerm(docLength int, terms []struct{ ... }) float64
- func (s *BM25Scorer) TotalDocs() int
- type BackpressureConfig
- type BatchRemapInfo
- type BinaryQuantizer
- type BitmapPool
- type BitmapPoolConfig
- type BitmapPoolStats
- type Bitset
- type BloomFilter
- type BruteForceIndex
- type BufferRetainerStats
- type BytePool
- type COWMetadataMap
- func (c *COWMetadataMap) Delete(name string)
- func (c *COWMetadataMap) Get(name string) (DatasetMetadata, bool)
- func (c *COWMetadataMap) IncrementStats(name string, rows int64, batches int)
- func (c *COWMetadataMap) Len() int
- func (c *COWMetadataMap) Set(name string, meta DatasetMetadata)
- func (c *COWMetadataMap) Snapshot() map[string]DatasetMetadata
- func (c *COWMetadataMap) UpdateFromRecords(name string, batches []arrow.RecordBatch)
- type CheckpointConfig
- type CheckpointCoordinator
- func (c *CheckpointCoordinator) GetCheckpointCount() uint64
- func (c *CheckpointCoordinator) GetEpoch() uint64
- func (c *CheckpointCoordinator) InitiateCheckpoint(ctx context.Context) error
- func (c *CheckpointCoordinator) RecoverFromEpoch(epoch uint64)
- func (c *CheckpointCoordinator) RegisterParticipant(id string)
- func (c *CheckpointCoordinator) ShouldTruncateWAL(epoch uint64) bool
- func (c *CheckpointCoordinator) UnregisterParticipant(id string)
- func (c *CheckpointCoordinator) WaitForBarrier(ctx context.Context, participantID string, epoch uint64) error
- type ChunkHandler
- type ChunkWorkerPool
- type ChunkWorkerPoolConfig
- type ChunkWorkerPoolStats
- type CircuitBreaker
- func (cb *CircuitBreaker) Allow() bool
- func (cb *CircuitBreaker) Execute(fn func() (interface{}, error)) (interface{}, error)
- func (cb *CircuitBreaker) RecordFailure()
- func (cb *CircuitBreaker) RecordSuccess()
- func (cb *CircuitBreaker) Reset()
- func (cb *CircuitBreaker) State() CircuitState
- func (cb *CircuitBreaker) Stats() CircuitBreakerStats
- type CircuitBreakerConfig
- type CircuitBreakerRegistry
- type CircuitBreakerStats
- type CircuitState
- type ClockComparison
- type ColumnInvertedIndex
- func (idx *ColumnInvertedIndex) BuildFilterMask(datasetName string, recordIdx int, columnName, value string, numRows int, ...) *array.Boolean
- func (idx *ColumnInvertedIndex) FilterRecordWithIndex(ctx context.Context, datasetName string, recordIdx int, rec arrow.RecordBatch, ...) (arrow.RecordBatch, error)
- func (idx *ColumnInvertedIndex) GetMatchingRowIndices(datasetName string, recordIdx int, columnName, value string) []int
- func (idx *ColumnInvertedIndex) HasIndex(datasetName, columnName string) bool
- func (idx *ColumnInvertedIndex) IndexRecord(datasetName string, recordIdx int, rec arrow.RecordBatch, ...)
- func (idx *ColumnInvertedIndex) Lookup(datasetName, columnName, value string) []RowPosition
- func (idx *ColumnInvertedIndex) RemoveDataset(datasetName string)
- func (idx *ColumnInvertedIndex) RemoveRecord(datasetName string, recordIdx int)
- func (idx *ColumnInvertedIndex) Stats() ColumnInvertedIndexStats
- type ColumnInvertedIndexStats
- type ColumnMetadata
- type Community
- type CompactionCandidate
- type CompactionConfig
- type CompactionStats
- type CompactionWorker
- type CompareOp
- type CompressedBitmap
- func (b *CompressedBitmap) Add(id VectorID)
- func (b *CompressedBitmap) Cardinality() uint64
- func (b *CompressedBitmap) Contains(id VectorID) bool
- func (b *CompressedBitmap) Difference(other *CompressedBitmap) *CompressedBitmap
- func (b *CompressedBitmap) Intersection(other *CompressedBitmap) *CompressedBitmap
- func (b *CompressedBitmap) Iterator() roaring.IntIterable
- func (b *CompressedBitmap) Remove(id VectorID)
- func (b *CompressedBitmap) Union(other *CompressedBitmap) *CompressedBitmap
- type ConfigError
- type ConsistencyLevel
- type DataServer
- func (s *DataServer) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error
- func (s *DataServer) DoExchange(stream flight.FlightService_DoExchangeServer) error
- func (s *DataServer) DoGet(tkt *flight.Ticket, stream flight.FlightService_DoGetServer) error
- func (s *DataServer) DoPut(stream flight.FlightService_DoPutServer) error
- func (s *DataServer) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)
- func (s *DataServer) GetSchema(ctx context.Context, desc *flight.FlightDescriptor) (*flight.SchemaResult, error)
- func (s *DataServer) ListFlights(c *flight.Criteria, stream flight.FlightService_ListFlightsServer) error
- type Dataset
- func (d *Dataset) AddToIndex(batchIdx, rowIdx int) error
- func (d *Dataset) EvictExpiredRecords() []arrow.RecordBatch
- func (ds *Dataset) GetExistingSchema() *arrow.Schema
- func (d *Dataset) GetRecord(idx int) (arrow.RecordBatch, bool)
- func (d *Dataset) GetRecordMetadata(rec arrow.RecordBatch) *RecordMetadata
- func (ds *Dataset) GetRecordsCount() int
- func (d *Dataset) GetVectorIndex() VectorIndex
- func (ds *Dataset) GetVersion() int64
- func (d *Dataset) IndexLen() int
- func (d *Dataset) InitRecordEviction()
- func (d *Dataset) IsSharded() bool
- func (d *Dataset) LastAccess() time.Time
- func (d *Dataset) MigrateToShardedIndex(cfg AutoShardingConfig) error
- func (d *Dataset) RegisterRecordWithTTL(rec arrow.RecordBatch, ttl time.Duration)
- func (d *Dataset) SearchDataset(query []float32, k int) []SearchResult
- func (d *Dataset) SetLastAccess(t time.Time)
- func (ds *Dataset) UpgradeSchemaVersion()
- type DatasetMetadata
- type DeltaSync
- type DirectBufferReader
- type DiskANNConfig
- type DiskANNIndex
- func (d *DiskANNIndex) Add(id uint64, vector []float32) error
- func (d *DiskANNIndex) AddBatch(ids []uint64, vectors [][]float32) error
- func (d *DiskANNIndex) AddByLocation(batchIdx, rowIdx int) error
- func (d *DiskANNIndex) Build() error
- func (d *DiskANNIndex) Close() error
- func (d *DiskANNIndex) Dimension() int
- func (d *DiskANNIndex) Len() int
- func (d *DiskANNIndex) Load(path string) error
- func (d *DiskANNIndex) NeedsBuild() bool
- func (d *DiskANNIndex) Save(path string) error
- func (d *DiskANNIndex) Search(query []float32, k int) ([]IndexSearchResult, error)
- func (d *DiskANNIndex) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)
- func (d *DiskANNIndex) SearchVectors(query []float32, k int) []SearchResult
- func (d *DiskANNIndex) Size() int
- func (d *DiskANNIndex) Type() IndexType
- type DoGetPipeline
- func (p *DoGetPipeline) NumWorkers() int
- func (p *DoGetPipeline) Process(ctx context.Context, batches []arrow.RecordBatch, filterFn FilterFunc) (results <-chan PipelineResult, errs <-chan error)
- func (p *DoGetPipeline) ProcessRecords(ctx context.Context, records []arrow.RecordBatch, tombstones map[int]*Bitset, ...) <-chan PipelineStage
- func (p *DoGetPipeline) Stats() PipelineStats
- func (p *DoGetPipeline) Stop()
- type DoGetPipelineConfig
- type DoGetPipelinePool
- type Edge
- type ErrDimensionMismatch
- type ErrInternal
- type ErrInvalidArgument
- type ErrNotFound
- type ErrPersistence
- type ErrResourceExhausted
- type ErrSchemaMismatch
- type ErrUnavailable
- type FSBackend
- type Filter
- type FilterEvaluator
- type FilterFunc
- type FilterOperator
- type FlightClientPool
- func (p *FlightClientPool) AddHost(host string) error
- func (p *FlightClientPool) Close() error
- func (p *FlightClientPool) DoGetFromPeer(ctx context.Context, host, dataset string, filters []Filter) ([]arrow.RecordBatch, error)
- func (p *FlightClientPool) DoPutToPeer(ctx context.Context, host, dataset string, records []arrow.RecordBatch) error
- func (p *FlightClientPool) Get(ctx context.Context, host string) (*PooledFlightClient, error)
- func (p *FlightClientPool) Put(conn *PooledFlightClient)
- func (p *FlightClientPool) RemoveHost(host string) error
- func (p *FlightClientPool) ReplicateToPeers(ctx context.Context, peers []string, dataset string, ...) map[string]error
- func (p *FlightClientPool) Stats() FlightClientPoolStats
- type FlightClientPoolConfig
- type FlightClientPoolStats
- type FlightDataChunk
- type FlightDataQueue
- func (q *FlightDataQueue) Close()
- func (q *FlightDataQueue) Dequeue(ctx context.Context) (*FlightDataChunk, bool)
- func (q *FlightDataQueue) IsClosed() bool
- func (q *FlightDataQueue) Len() int
- func (q *FlightDataQueue) Stats() FlightDataQueueStats
- func (q *FlightDataQueue) TryEnqueue(chunk *FlightDataChunk) bool
- type FlightDataQueueConfig
- type FlightDataQueueStats
- type FlightReaderBufferRetainer
- func (r *FlightReaderBufferRetainer) GetBuffer(handle uint64) []byte
- func (r *FlightReaderBufferRetainer) MaxBuffers() int
- func (r *FlightReaderBufferRetainer) ReleaseBuffer(handle uint64) bool
- func (r *FlightReaderBufferRetainer) RetainBuffer(buf []byte) uint64
- func (r *FlightReaderBufferRetainer) Stats() BufferRetainerStats
- type FusionMode
- type GRPCConfig
- func (c GRPCConfig) BuildClientOptions() []grpc.DialOption
- func (c GRPCConfig) BuildServerOptions() []grpc.ServerOption
- func (c GRPCConfig) ClientKeepaliveParams() keepalive.ClientParameters
- func (c GRPCConfig) ServerEnforcementPolicy() keepalive.EnforcementPolicy
- func (c GRPCConfig) ServerKeepaliveParams() keepalive.ServerParameters
- func (c GRPCConfig) String() string
- func (c GRPCConfig) Validate() error
- type GlobalSearchCoordinator
- type GraphStore
- func (gs *GraphStore) AddEdge(e Edge) error
- func (gs *GraphStore) CommunityCount() int
- func (gs *GraphStore) DetectCommunities() []Community
- func (gs *GraphStore) EdgeCount() int
- func (gs *GraphStore) FromArrowBatch(batch arrow.Record) error
- func (gs *GraphStore) GetCommunityForNode(node string) int
- func (gs *GraphStore) GetEdgesByObject(object string) []Edge
- func (gs *GraphStore) GetEdgesByPredicate(predicate string) []Edge
- func (gs *GraphStore) GetEdgesBySubject(subject string) []Edge
- func (gs *GraphStore) PredicateVocabulary() []string
- func (gs *GraphStore) ToArrowBatch(mem memory.Allocator) (arrow.Record, error)
- func (gs *GraphStore) Traverse(start string, maxHops int) []Path
- func (gs *GraphStore) TraverseParallel(starts []string, maxHops int) map[string][]Path
- type HNSWGraphSync
- func (s *HNSWGraphSync) ApplyDelta(delta *DeltaSync) error
- func (s *HNSWGraphSync) ExportDelta(fromVersion uint64) (*DeltaSync, error)
- func (s *HNSWGraphSync) ExportGraph(w io.Writer) error
- func (s *HNSWGraphSync) ExportState() ([]byte, error)
- func (s *HNSWGraphSync) GetExportCount() uint64
- func (s *HNSWGraphSync) GetImportCount() uint64
- func (s *HNSWGraphSync) GetVersion() uint64
- func (s *HNSWGraphSync) ImportGraph(r io.Reader) error
- func (s *HNSWGraphSync) ImportState(data []byte) error
- func (s *HNSWGraphSync) IncrementVersion()
- type HNSWIndex
- func (h *HNSWIndex) Add(batchIdx, rowIdx int) (uint32, error)
- func (h *HNSWIndex) AddBatchParallel(locations []Location, workers int) error
- func (h *HNSWIndex) AddByLocation(batchIdx, rowIdx int) (uint32, error)
- func (h *HNSWIndex) AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)
- func (h *HNSWIndex) AddSafe(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)
- func (h *HNSWIndex) Close() error
- func (h *HNSWIndex) CloseGPU() error
- func (h *HNSWIndex) GetDimension() uint32
- func (h *HNSWIndex) GetDistanceFunc() func(a, b []float32) float32
- func (h *HNSWIndex) GetLocation(id VectorID) (Location, bool)
- func (h *HNSWIndex) InitGPU(deviceID int, logger *zap.Logger) error
- func (h *HNSWIndex) IsGPUEnabled() bool
- func (h *HNSWIndex) Len() int
- func (h *HNSWIndex) PutResults(results []VectorID)
- func (h *HNSWIndex) RegisterReader()
- func (h *HNSWIndex) RemapLocations(remapping map[int]BatchRemapInfo)
- func (h *HNSWIndex) RerankBatch(query []float32, candidateIDs []VectorID, k int) []RankedResult
- func (h *HNSWIndex) Search(query []float32, k int) []VectorID
- func (h *HNSWIndex) SearchBatch(queries [][]float32, k int) [][]RankedResult
- func (h *HNSWIndex) SearchBatchOptimized(queries [][]float32, k int) [][]RankedResult
- func (h *HNSWIndex) SearchBatchWithArena(queries [][]float32, k int, arena *SearchArena) [][]RankedResult
- func (h *HNSWIndex) SearchByID(id VectorID, k int) []VectorID
- func (h *HNSWIndex) SearchByIDUnsafe(id VectorID, k int) []VectorID
- func (h *HNSWIndex) SearchHybrid(query []float32, k int) []SearchResult
- func (h *HNSWIndex) SearchVectors(query []float32, k int, filters []Filter) []SearchResult
- func (h *HNSWIndex) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult
- func (h *HNSWIndex) SearchWithArena(query []float32, k int, arena *SearchArena) []VectorID
- func (h *HNSWIndex) SearchWithBatchDistance(query []float32, k int) []RankedResult
- func (h *HNSWIndex) SetIndexedColumns(cols []string)
- func (h *HNSWIndex) SetPQEncoder(encoder *PQEncoder)
- func (h *HNSWIndex) SyncGPU(ids []int64, vectors []float32) error
- func (h *HNSWIndex) TrainPQ(dimensions, m, ksub, iterations int) error
- func (h *HNSWIndex) UnregisterReader()
- func (h *HNSWIndex) Warmup() int
- type HNSWIndexConfig
- type HNSWPluggableAdapter
- func (h *HNSWPluggableAdapter) Add(id uint64, vector []float32) error
- func (h *HNSWPluggableAdapter) AddBatch(ids []uint64, vectors [][]float32) error
- func (h *HNSWPluggableAdapter) AddByLocation(batchIdx, rowIdx int) error
- func (h *HNSWPluggableAdapter) Build() error
- func (h *HNSWPluggableAdapter) Close() error
- func (h *HNSWPluggableAdapter) Dimension() int
- func (h *HNSWPluggableAdapter) Len() int
- func (h *HNSWPluggableAdapter) Load(path string) error
- func (h *HNSWPluggableAdapter) NeedsBuild() bool
- func (h *HNSWPluggableAdapter) Save(path string) error
- func (h *HNSWPluggableAdapter) Search(query []float32, k int) ([]IndexSearchResult, error)
- func (h *HNSWPluggableAdapter) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)
- func (h *HNSWPluggableAdapter) SearchVectors(query []float32, k int) []SearchResult
- func (h *HNSWPluggableAdapter) Size() int
- func (h *HNSWPluggableAdapter) Type() IndexType
- type HybridPipelineConfig
- type HybridQuery
- type HybridSearchConfig
- type HybridSearchPipeline
- type HybridSearchQuery
- type IPCBufferPool
- type IPCBufferPoolStats
- type IVFFlatConfig
- type IVFFlatIndex
- func (ivf *IVFFlatIndex) Add(id uint64, vector []float32) error
- func (ivf *IVFFlatIndex) AddBatch(ids []uint64, vectors [][]float32) error
- func (ivf *IVFFlatIndex) AddByLocation(batchIdx, rowIdx int) error
- func (ivf *IVFFlatIndex) Build() error
- func (ivf *IVFFlatIndex) Close() error
- func (ivf *IVFFlatIndex) Dimension() int
- func (ivf *IVFFlatIndex) Len() int
- func (ivf *IVFFlatIndex) Load(path string) error
- func (ivf *IVFFlatIndex) NeedsBuild() bool
- func (ivf *IVFFlatIndex) Save(path string) error
- func (ivf *IVFFlatIndex) Search(query []float32, k int) ([]IndexSearchResult, error)
- func (ivf *IVFFlatIndex) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)
- func (ivf *IVFFlatIndex) SearchVectors(query []float32, k int) []SearchResult
- func (ivf *IVFFlatIndex) Size() int
- func (ivf *IVFFlatIndex) Type() IndexType
- type IndexConfig
- type IndexConstructor
- type IndexFactory
- type IndexJob
- type IndexJobQueue
- type IndexJobQueueConfig
- type IndexJobQueueStats
- type IndexSearchResult
- type IndexType
- type InvertedIndex
- func (idx *InvertedIndex) Add(term string, docID uint32)
- func (idx *InvertedIndex) AddBatch(terms []string, docID uint32)
- func (idx *InvertedIndex) Delete(term string, docID uint32)
- func (idx *InvertedIndex) Get(term string) *roaring.Bitmap
- func (idx *InvertedIndex) MemoryUsage() uint64
- func (idx *InvertedIndex) Stats() (int, uint64)
- type LeastConnectionsStrategy
- type LoadBalancerConfig
- type LoadBalancerStats
- type LoadBalancerStrategy
- type Location
- type LockFreeHNSW
- type LockFreeNode
- type MemoryAdvice
- type MemoryBackpressureController
- func (c *MemoryBackpressureController) Acquire(ctx context.Context) error
- func (c *MemoryBackpressureController) CheckPressure() PressureLevel
- func (c *MemoryBackpressureController) GetAcquireCount() uint64
- func (c *MemoryBackpressureController) GetHardLimit() uint64
- func (c *MemoryBackpressureController) GetPressureLevel() PressureLevel
- func (c *MemoryBackpressureController) GetRejectCount() uint64
- func (c *MemoryBackpressureController) GetSoftLimit() uint64
- func (c *MemoryBackpressureController) Release()
- func (c *MemoryBackpressureController) SetPressureLevel(level PressureLevel)
- type MerkleNode
- type MerkleTree
- type MeshStatusCache
- type MetaServer
- func (s *MetaServer) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error
- func (s *MetaServer) DoExchange(stream flight.FlightService_DoExchangeServer) error
- func (s *MetaServer) DoGet(tkt *flight.Ticket, stream flight.FlightService_DoGetServer) error
- func (s *MetaServer) DoPut(stream flight.FlightService_DoPutServer) error
- func (s *MetaServer) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)
- func (s *MetaServer) ListFlights(c *flight.Criteria, stream flight.FlightService_ListFlightsServer) error
- type NUMAAllocator
- type NUMATopology
- type Namespace
- type NotFoundError
- type PQConfig
- type PQEncoder
- func (e *PQEncoder) ADCDistance(table [][]float32, codes []uint8) float32
- func (e *PQEncoder) CodeSize() int
- func (e *PQEncoder) ComputeDistanceTable(query []float32) [][]float32
- func (e *PQEncoder) Decode(codes []uint8) []float32
- func (e *PQEncoder) Dims() int
- func (e *PQEncoder) EnableSDC()
- func (e *PQEncoder) Encode(vec []float32) []uint8
- func (e *PQEncoder) EncodeInto(vec []float32, codes []uint8)
- func (e *PQEncoder) GetCodebook() [][][]float32
- func (e *PQEncoder) SDCDistance(codes1, codes2 []uint8) float32
- func (e *PQEncoder) SDCDistancePacked(a, b []float32) float32
- type ParserPoolStats
- type PartitionedRecords
- func (pr *PartitionedRecords) Append(batch arrow.RecordBatch, routingKey uint64)
- func (pr *PartitionedRecords) AppendToPartition(batch arrow.RecordBatch, partIdx int)
- func (pr *PartitionedRecords) AppendWithKey(batch arrow.RecordBatch, key uint64) int
- func (pr *PartitionedRecords) Clear()
- func (pr *PartitionedRecords) ForEach(fn func(batch arrow.RecordBatch, partition int) bool)
- func (pr *PartitionedRecords) ForEachInPartition(partIdx int, fn func(batch arrow.RecordBatch) bool)
- func (pr *PartitionedRecords) GetAll() []arrow.RecordBatch
- func (pr *PartitionedRecords) GetPartition(partIdx int) []arrow.RecordBatch
- func (pr *PartitionedRecords) NumPartitions() int
- func (pr *PartitionedRecords) PartitionBatches(partitionIdx int) int
- func (pr *PartitionedRecords) ReplaceAll(partIdx int, batches []arrow.RecordBatch)
- func (pr *PartitionedRecords) Stats() PartitionedRecordsStats
- func (pr *PartitionedRecords) TotalBatches() int
- type PartitionedRecordsConfig
- type PartitionedRecordsStats
- type Path
- type PeerReplicationResult
- type PeerReplicator
- func (r *PeerReplicator) AddPeer(id, address string) error
- func (r *PeerReplicator) GetCircuitBreaker(peerID string) *CircuitBreaker
- func (r *PeerReplicator) GetPeers() []*ReplicatorPeerInfo
- func (r *PeerReplicator) RemovePeer(id string)
- func (r *PeerReplicator) ReplicateAsync(ctx context.Context, dataset string, record arrow.Record) bool
- func (r *PeerReplicator) ReplicateRecord(ctx context.Context, dataset string, record arrow.Record) []PeerReplicationResult
- func (r *PeerReplicator) ReplicateWithQuorum(ctx context.Context, dataset string, record arrow.Record) []PeerReplicationResult
- func (r *PeerReplicator) Start() error
- func (r *PeerReplicator) Stats() ReplicatorStats
- func (r *PeerReplicator) Stop()
- type PerPResultPool
- type PerPResultPoolConfig
- type PerPResultPoolStats
- type PerPShardStats
- type PipelineResult
- type PipelineStage
- type PipelineStats
- type PluggableVectorIndex
- type PooledAllocator
- type PooledAllocatorStats
- type PooledBitmap
- type PooledFlightClient
- type PressureLevel
- type QuorumCalculator
- type QuorumConfig
- type QuorumManager
- func (qm *QuorumManager) ExecuteRead(ctx context.Context, peers []string, level ConsistencyLevel, readFn ReadFn) QuorumResult
- func (qm *QuorumManager) ExecuteWrite(ctx context.Context, peers []string, level ConsistencyLevel, writeFn WriteFn) QuorumResult
- func (qm *QuorumManager) GetDefaultReadLevel() ConsistencyLevel
- func (qm *QuorumManager) GetDefaultWriteLevel() ConsistencyLevel
- type QuorumResult
- type RankedResult
- type ReadFn
- type RecordEvictionManager
- func (m *RecordEvictionManager) Count() int
- func (m *RecordEvictionManager) EvictExpired() []uintptr
- func (m *RecordEvictionManager) Get(rec arrow.RecordBatch) *RecordMetadata
- func (m *RecordEvictionManager) GetEvictionCount() int64
- func (m *RecordEvictionManager) Register(rec arrow.RecordBatch, ttl time.Duration)
- func (m *RecordEvictionManager) SelectLFUVictims(count int) []uintptr
- func (m *RecordEvictionManager) SelectLRUVictims(count int) []uintptr
- func (m *RecordEvictionManager) Unregister(rec arrow.RecordBatch)
- func (m *RecordEvictionManager) UpdateRecordMetadataGauge()
- type RecordMetadata
- type RecordSizeCache
- type RecordWriterPoolConfig
- type ReplicaInfo
- type ReplicaLoadBalancer
- func (lb *ReplicaLoadBalancer) AddReplica(id, address string)
- func (lb *ReplicaLoadBalancer) GetHealthyReplicas() []string
- func (lb *ReplicaLoadBalancer) GetReplicas() []string
- func (lb *ReplicaLoadBalancer) GetStats() LoadBalancerStats
- func (lb *ReplicaLoadBalancer) MarkHealthy(id string)
- func (lb *ReplicaLoadBalancer) MarkUnhealthy(id string)
- func (lb *ReplicaLoadBalancer) RemoveReplica(id string)
- func (lb *ReplicaLoadBalancer) SelectReplica(ctx context.Context) (string, error)
- type ReplicationError
- type ReplicationTask
- type ReplicatorConfig
- type ReplicatorPeerInfo
- type ReplicatorStats
- type RequestSemaphore
- type RequestSemaphoreConfig
- type RoundRobinStrategy
- type RowPosition
- type S3Backend
- func (b *S3Backend) DeleteSnapshot(ctx context.Context, name string) error
- func (b *S3Backend) GetHTTPClient() *http.Client
- func (b *S3Backend) GetHTTPTransport() *http.Transport
- func (b *S3Backend) ListSnapshots(ctx context.Context) ([]string, error)
- func (b *S3Backend) ReadSnapshot(ctx context.Context, name string) (io.ReadCloser, error)
- func (b *S3Backend) WriteSnapshot(ctx context.Context, name string, data []byte) error
- type S3BackendConfig
- type S3Error
- type SQ8Config
- type SQ8Encoder
- func (e *SQ8Encoder) Decode(quantized []uint8) []float32
- func (e *SQ8Encoder) DecodeInto(quantized []uint8, dst []float32)
- func (e *SQ8Encoder) Dims() int
- func (e *SQ8Encoder) Encode(vec []float32) []uint8
- func (e *SQ8Encoder) EncodeInto(vec []float32, dst []uint8)
- func (e *SQ8Encoder) GetBounds() (minVals, maxVals []float32)
- type SchemaCompatibility
- type SchemaEvolutionManager
- func (m *SchemaEvolutionManager) AddColumn(name string, dtype arrow.DataType) error
- func (m *SchemaEvolutionManager) DropColumn(name string) error
- func (m *SchemaEvolutionManager) GetColumnCount() int
- func (m *SchemaEvolutionManager) GetCurrentSchema() *arrow.Schema
- func (m *SchemaEvolutionManager) GetCurrentVersion() uint64
- func (m *SchemaEvolutionManager) GetSchemaAtVersion(version uint64) *arrow.Schema
- func (m *SchemaEvolutionManager) GetVersionCount() int
- func (m *SchemaEvolutionManager) IsColumnAvailable(name string, version uint64) bool
- func (m *SchemaEvolutionManager) IsColumnDropped(name string) bool
- type SchemaVersion
- type SearchArena
- func (a *SearchArena) Alloc(size int) []byte
- func (a *SearchArena) AllocFloat32Slice(count int) []float32
- func (a *SearchArena) AllocVectorIDSlice(count int) []VectorID
- func (a *SearchArena) Cap() int
- func (a *SearchArena) Offset() int
- func (a *SearchArena) Remaining() int
- func (a *SearchArena) Reset()
- type SearchResult
- func CombineHybridResults(vectorResults, bm25Results []SearchResult, alpha float32, k int) []SearchResult
- func CombineHybridResultsRRF(vectorResults, bm25Results []SearchResult, rrfK, limit int) []SearchResult
- func FuseCascade(exact map[VectorID]struct{}, keyword, vector []SearchResult, limit int) []SearchResult
- func FuseLinear(dense, sparse []SearchResult, alpha float32, limit int) []SearchResult
- func FuseRRF(dense, sparse []SearchResult, k, limit int) []SearchResult
- func HybridSearch(ctx context.Context, s *VectorStore, name string, query []float32, k int, ...) ([]SearchResult, error)
- func ReciprocalRankFusion(dense, sparse []SearchResult, k, limit int) []SearchResult
- func SearchHybrid(ctx context.Context, s *VectorStore, name string, query []float32, ...) ([]SearchResult, error)
- type SemaphoreStats
- type ShardStat
- type ShardStats
- type ShardedDataset
- func (sd *ShardedDataset) Append(batch arrow.RecordBatch, routingKey uint64)
- func (sd *ShardedDataset) AppendToShard(batch arrow.RecordBatch, shardIdx int)
- func (sd *ShardedDataset) Clear()
- func (sd *ShardedDataset) ForEach(fn func(batch arrow.RecordBatch, shard int) bool)
- func (sd *ShardedDataset) ForEachInShard(shardIdx int, fn func(batch arrow.RecordBatch) bool)
- func (sd *ShardedDataset) GetAllRecords() []arrow.RecordBatch
- func (sd *ShardedDataset) GetShardRecords(shardIdx int) []arrow.RecordBatch
- func (sd *ShardedDataset) IncrementVersion() int64
- func (sd *ShardedDataset) Index() *HNSWIndex
- func (sd *ShardedDataset) LastAccess() time.Time
- func (sd *ShardedDataset) Name() string
- func (sd *ShardedDataset) NumShards() int
- func (sd *ShardedDataset) ReplaceShardRecords(shardIdx int, batches []arrow.RecordBatch)
- func (sd *ShardedDataset) SetIndex(idx *HNSWIndex)
- func (sd *ShardedDataset) SetLastAccess(t time.Time)
- func (sd *ShardedDataset) ShardRecordCount(shardIdx int) int
- func (sd *ShardedDataset) Stats() ShardedDatasetStats
- func (sd *ShardedDataset) ToLegacyRecords() []arrow.RecordBatch
- func (sd *ShardedDataset) TotalRecords() int
- func (sd *ShardedDataset) Version() int64
- type ShardedDatasetConfig
- type ShardedDatasetStats
- type ShardedHNSW
- func (s *ShardedHNSW) AddByLocation(batchIdx, rowIdx int) (uint32, error)
- func (s *ShardedHNSW) AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)
- func (s *ShardedHNSW) AddSafe(rec arrow.RecordBatch, rowIdx, batchIdx int) (VectorID, error)
- func (s *ShardedHNSW) Close() error
- func (s *ShardedHNSW) GetDimension() uint32
- func (s *ShardedHNSW) GetLocation(id VectorID) (Location, bool)
- func (s *ShardedHNSW) GetShardForID(id VectorID) int
- func (s *ShardedHNSW) Len() int
- func (s *ShardedHNSW) SearchByID(id VectorID, k int) []VectorID
- func (s *ShardedHNSW) SearchVectors(query []float32, k int, filters []Filter) []SearchResult
- func (s *ShardedHNSW) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult
- func (s *ShardedHNSW) SetIndexedColumns(cols []string)
- func (s *ShardedHNSW) ShardStats() []ShardStat
- func (idx *ShardedHNSW) Warmup() int
- type ShardedHNSWConfig
- type ShardedIndexChannel
- func (sic *ShardedIndexChannel) Close()
- func (sic *ShardedIndexChannel) GetShardChannel(shardID int) chan IndexJob
- func (sic *ShardedIndexChannel) GetShardForDataset(datasetName string) int
- func (sic *ShardedIndexChannel) NumShards() int
- func (sic *ShardedIndexChannel) Send(job IndexJob) bool
- func (sic *ShardedIndexChannel) Stats() []ShardStats
- func (sic *ShardedIndexChannel) TrySend(job IndexJob) bool
- type ShardedInvertedIndex
- type ShardedMap
- func (sm *ShardedMap) Delete(name string)
- func (sm *ShardedMap) Get(name string) (*Dataset, bool)
- func (sm *ShardedMap) GetOrCreate(name string, create func() *Dataset) *Dataset
- func (sm *ShardedMap) Keys() []string
- func (sm *ShardedMap) Len() int
- func (sm *ShardedMap) Range(fn func(name string, ds *Dataset) bool)
- func (sm *ShardedMap) RangeWithLock(fn func(name string, ds *Dataset, deleteFn func()))
- func (sm *ShardedMap) Set(name string, ds *Dataset)
- func (sm *ShardedMap) WithLock(name string, fn func(data map[string]*Dataset))
- func (sm *ShardedMap) WithRLock(name string, fn func(data map[string]*Dataset))
- type ShardedRWMutex
- func (sm *ShardedRWMutex) Lock(key uint64)
- func (sm *ShardedRWMutex) LockAll()
- func (sm *ShardedRWMutex) LockShard(shard int)
- func (sm *ShardedRWMutex) NumShards() int
- func (sm *ShardedRWMutex) RLock(key uint64)
- func (sm *ShardedRWMutex) RLockAll()
- func (sm *ShardedRWMutex) RLockShard(shard int)
- func (sm *ShardedRWMutex) RUnlock(key uint64)
- func (sm *ShardedRWMutex) RUnlockAll()
- func (sm *ShardedRWMutex) RUnlockShard(shard int)
- func (sm *ShardedRWMutex) ShardFor(key uint64) int
- func (sm *ShardedRWMutex) Stats() ShardedRWMutexStats
- func (sm *ShardedRWMutex) Unlock(key uint64)
- func (sm *ShardedRWMutex) UnlockAll()
- func (sm *ShardedRWMutex) UnlockShard(shard int)
- type ShardedRWMutexConfig
- type ShardedRWMutexStats
- type ShutdownError
- type SnapshotBackend
- type SplitBrainConfig
- type SplitBrainDetector
- func (d *SplitBrainDetector) CheckQuorum() bool
- func (d *SplitBrainDetector) GetHealthyPeerCount() int
- func (d *SplitBrainDetector) GetHeartbeatCount() uint64
- func (d *SplitBrainDetector) GetPartitionCount() uint64
- func (d *SplitBrainDetector) GetPeers() []string
- func (d *SplitBrainDetector) IsFenced() bool
- func (d *SplitBrainDetector) IsPeerHealthy(id string) bool
- func (d *SplitBrainDetector) RecordHeartbeat(id string)
- func (d *SplitBrainDetector) RegisterPeer(id string)
- func (d *SplitBrainDetector) UnregisterPeer(id string)
- type StdWAL
- type StorageConfig
- type StrategyType
- type SyncState
- type TCPNoDelayListener
- type TicketQuery
- type TimestampMap
- type VectorClock
- func (vc *VectorClock) Compare(other *VectorClock) ClockComparison
- func (vc *VectorClock) Copy() *VectorClock
- func (vc *VectorClock) Deserialize(data []byte) error
- func (vc *VectorClock) Get(nodeID string) uint64
- func (vc *VectorClock) Increment()
- func (vc *VectorClock) Merge(other *VectorClock)
- func (vc *VectorClock) NodeID() string
- func (vc *VectorClock) Serialize() []byte
- func (vc *VectorClock) Set(nodeID string, value uint64)
- type VectorID
- type VectorIndex
- type VectorMetric
- type VectorRecord
- type VectorSearchRequest
- type VectorSearchResponse
- type VectorStore
- func NewVectorStore(mem memory.Allocator, logger *zap.Logger, maxMemory, maxWALSize int64, ...) *VectorStore
- func NewVectorStoreWithCompaction(mem memory.Allocator, logger *zap.Logger, maxMemoryBytes int64, ...) *VectorStore
- func NewVectorStoreWithHybridConfig(mem memory.Allocator, logger *zap.Logger, cfg HybridSearchConfig) (*VectorStore, error)
- func NewVectorStoreWithPipeline(mem memory.Allocator, logger *zap.Logger, workers, bufferSize int) *VectorStore
- func NewVectorStoreWithPipelineThreshold(mem memory.Allocator, logger *zap.Logger, workers, bufferSize, threshold int) *VectorStore
- func (s *VectorStore) ApplyDelta(name string, rec arrow.RecordBatch, seq uint64, ts int64) error
- func (s *VectorStore) Close() error
- func (vs *VectorStore) CompactDataset(name string) error
- func (vs *VectorStore) CreateNamespace(name string) error
- func (vs *VectorStore) DeleteNamespace(name string) error
- func (s *VectorStore) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error
- func (s *VectorStore) DoExchange(stream flight.FlightService_DoExchangeServer) error
- func (s *VectorStore) DoGet(tkt *flight.Ticket, stream flight.FlightService_DoGetServer) error
- func (s *VectorStore) DoPut(stream flight.FlightService_DoPutServer) error
- func (s *VectorStore) GetAutoCompactionTriggerCount() int64
- func (s *VectorStore) GetAutoShardingConfig() AutoShardingConfig
- func (s *VectorStore) GetBM25Index() *BM25InvertedIndex
- func (vs *VectorStore) GetCompactionConfig() CompactionConfig
- func (vs *VectorStore) GetCompactionStats() CompactionStats
- func (s *VectorStore) GetDataset(name string) (*Dataset, error)
- func (s *VectorStore) GetDoGetPipelinePool() *DoGetPipelinePool
- func (s *VectorStore) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)
- func (s *VectorStore) GetHybridSearchConfig() HybridSearchConfig
- func (s *VectorStore) GetIndexedColumns() []string
- func (vs *VectorStore) GetNamespace(name string) *Namespace
- func (vs *VectorStore) GetNamespaceDatasetCount(name string) int
- func (s *VectorStore) GetPipelineStats() PipelineStats
- func (s *VectorStore) GetPipelineThreshold() int
- func (s *VectorStore) GetSchema(ctx context.Context, desc *flight.FlightDescriptor) (*flight.SchemaResult, error)
- func (vs *VectorStore) GetTotalNamespaceCount() int
- func (s *VectorStore) GetWALQueueDepth() (int, int)
- func (s *VectorStore) HybridSearch(ctx context.Context, name string, query []float32, k int, ...) ([]SearchResult, error)
- func (s *VectorStore) IndexRecordColumns(datasetName string, rec arrow.RecordBatch, batchIdx int)
- func (s *VectorStore) InitPersistence(cfg StorageConfig) error
- func (vs *VectorStore) IsCompactionRunning() bool
- func (s *VectorStore) IterateDatasets(fn func(ds *Dataset))
- func (s *VectorStore) ListFlights(c *flight.Criteria, stream flight.FlightService_ListFlightsServer) error
- func (vs *VectorStore) ListNamespaces() []string
- func (s *VectorStore) MapInternalToUserIDs(ds *Dataset, results []SearchResult) []SearchResult
- func (s *VectorStore) MerkleRoot(name string) [32]byte
- func (vs *VectorStore) NamespaceExists(name string) bool
- func (s *VectorStore) SearchHybrid(ctx context.Context, name string, query []float32, textQuery string, k int, ...) ([]SearchResult, error)
- func (s *VectorStore) SetAutoShardingConfig(cfg AutoShardingConfig)
- func (s *VectorStore) SetIndexedColumns(cols []string)
- func (s *VectorStore) SetMesh(m *mesh.Gossip)
- func (s *VectorStore) Shutdown(ctx context.Context) error
- func (s *VectorStore) Snapshot() error
- func (s *VectorStore) StartEvictionTicker(d time.Duration)
- func (s *VectorStore) StartMetricsTicker(d time.Duration)
- func (s *VectorStore) StartWALCheckTicker(d time.Duration)
- func (s *VectorStore) StoreRecordBatch(ctx context.Context, name string, rec arrow.RecordBatch) error
- func (s *VectorStore) TruncateWAL() error
- func (s *VectorStore) UpdateConfig(maxMemory, maxWALSize int64, snapshotInterval time.Duration)
- func (s *VectorStore) Warmup() WarmupStats
- type VectorizedFilter
- type VersionedData
- type WAL
- type WALBackend
- type WALBatcher
- func (w *WALBatcher) AsyncFsyncStats() *AsyncFsyncerStats
- func (w *WALBatcher) FlushError() error
- func (w *WALBatcher) GetCurrentInterval() time.Duration
- func (w *WALBatcher) IsAdaptiveEnabled() bool
- func (w *WALBatcher) IsAsyncFsyncEnabled() bool
- func (w *WALBatcher) QueueCapacity() int
- func (w *WALBatcher) QueueDepth() int
- func (w *WALBatcher) Start() error
- func (w *WALBatcher) Stop() error
- func (w *WALBatcher) Write(rec arrow.RecordBatch, name string, seq uint64, ts int64) error
- type WALBatcherConfig
- type WALEntry
- type WALError
- type WALIterator
- type WarmupStats
- type WeightedStrategy
- type WriteFn
- type WriteRateTracker
- type ZeroAllocTicketParser
- type ZeroAllocVectorSearchParser
- type ZeroCopyBufferConfig
- type ZeroCopyPayload
Constants ¶
const (
	MaxConnections = 16
	MaxLayers      = 16
	EfConstruction = 128
	ML             = 0.36 // 1 / ln(MaxConnections)
)
const (
	MerkleFanout = 16
	MerkleDepth  = 4
)
const (
	// DefaultMaxIdleConns is the default maximum number of idle connections across all hosts
	DefaultMaxIdleConns = 100
	// DefaultMaxIdleConnsPerHost is the default maximum number of idle connections per host
	DefaultMaxIdleConnsPerHost = 100
	// DefaultIdleConnTimeout is the default timeout for idle connections
	DefaultIdleConnTimeout = 90 * time.Second
)
Connection pool default settings for the S3 backend.
const DefaultArenaSize = 64 * 1024
DefaultArenaSize is the default capacity for pooled arenas (64 KB).
Variables ¶
var ErrCircuitOpen = errors.New("circuit breaker is open")
ErrCircuitOpen is returned when the circuit breaker rejects a request.
var (
ErrNoHealthyReplicas = errors.New("no healthy replicas available")
)
Functions ¶
func AdviseMemory ¶
func AdviseMemory(ptr unsafe.Pointer, size uintptr, advice MemoryAdvice) error
AdviseMemory provides hints to the kernel about the memory usage pattern. This is a no-op on non-Unix systems.
func AdviseRecord ¶
func AdviseRecord(rec arrow.RecordBatch, advice MemoryAdvice)
AdviseRecord provides memory hints for all buffers in an Arrow RecordBatch.
func BuildNamespacedPath ¶
func BuildNamespacedPath(namespace, dataset string) string
BuildNamespacedPath combines namespace and dataset into a path.
func CachedRecordSize ¶
func CachedRecordSize(rec arrow.RecordBatch) int64
CachedRecordSize returns the record's size using the global size cache.
func EnsureTimestampZeroCopy ¶
func EnsureTimestampZeroCopy(mem memory.Allocator, rec arrow.RecordBatch) (arrow.RecordBatch, error)
EnsureTimestampZeroCopy ensures the record has a timestamp column, adding one if missing (zero-copy optimized)
func EstimateRecordSize ¶
func EstimateRecordSize(rec arrow.RecordBatch) int64
EstimateRecordSize provides a fast, heuristic estimate of a record's size. For now it aliases calculateRecordSize, which is reasonably fast (O(cols)).
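A minimal, hypothetical example of sizing a freshly built record. The arrow-go import paths (v18 here) and the store import path are assumptions, not part of this package's documentation:

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func ExampleEstimateRecordSize() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
	}, nil)
	b := array.NewRecordBuilder(mem, schema)
	defer b.Release()
	b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
	rec := b.NewRecord()
	defer rec.Release()
	fmt.Println(store.EstimateRecordSize(rec)) // approximate in-memory size in bytes
}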
func FastPathEqual ¶
func FastPathEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
FastPathEqual applies an equality filter directly, without Arrow Compute overhead. Returns a boolean mask array where true indicates matching values.
func FastPathGreater ¶
func FastPathGreater(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
FastPathGreater applies a greater-than filter directly, without Arrow Compute overhead.
func FastPathGreaterEqual ¶
func FastPathGreaterEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
FastPathGreaterEqual applies a greater-than-or-equal filter directly, without Arrow Compute overhead.
func FastPathLess ¶
func FastPathLess(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
FastPathLess applies a less-than filter directly, without Arrow Compute overhead.
func FastPathLessEqual ¶
func FastPathLessEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
FastPathLessEqual applies a less-than-or-equal filter directly, without Arrow Compute overhead.
func FastPathNotEqual ¶
func FastPathNotEqual(ctx context.Context, arr arrow.Array, value string) (arrow.Array, error)
FastPathNotEqual applies a not-equal filter directly, without Arrow Compute overhead. Returns a boolean mask array where true indicates non-matching values.
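A hedged usage sketch for the fast-path filters: apply an equality filter to a column and release the resulting mask. ctx and rec are assumed to already exist, and error handling is abbreviated:

mask, err := store.FastPathEqual(ctx, rec.Column(0), "golang")
if err != nil {
	return err
}
defer mask.Release()
matched := mask.(*array.Boolean) // per the docs, a boolean mask array
for i := 0; i < matched.Len(); i++ {
	if matched.Value(i) {
		// row i matched the filter
	}
}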
func GetCurrentCPU ¶
func GetCurrentCPU() int
GetCurrentCPU returns the current CPU number, or -1 if unavailable. This is Linux-specific and uses the getcpu syscall.
func GetJSONEncoder ¶
GetJSONEncoder retrieves a pooled encoder and buffer.
func GetNumaNode ¶
GetNumaNode returns the NUMA node (memory bank) where the page containing the given pointer resides. On non-Linux systems, returns -1 and nil error.
func GetTCPNoDelayConnectionsTotal ¶
func GetTCPNoDelayConnectionsTotal() int64
GetTCPNoDelayConnectionsTotal returns the total number of connections with TCP_NODELAY set. This is useful for testing and monitoring.
func HammingDistanceBatch ¶
func HammingDistanceBatch(query []uint64, vectors [][]uint64, distances []int)
HammingDistanceBatch computes Hamming distances from query to multiple vectors. Optimized for batch processing with better cache utilization.
func HammingDistancePOPCNT ¶
func HammingDistancePOPCNT(a, b []uint64) int
HammingDistancePOPCNT computes the Hamming distance using the POPCNT instruction. Returns the number of differing bits between two binary vectors. Uses math/bits.OnesCount64, which compiles to POPCNT on supported CPUs.
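Since the documentation notes that math/bits.OnesCount64 compiles to POPCNT, the per-pair computation is equivalent to this scalar sketch (a re-derivation, assuming equal-length inputs):

import "math/bits"

// hammingSketch counts differing bits across two binary vectors.
func hammingSketch(a, b []uint64) int {
	d := 0
	for i := range a {
		d += bits.OnesCount64(a[i] ^ b[i]) // XOR isolates differing bits; popcount tallies them
	}
	return d
}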
func IncrementRecordAccess ¶
func IncrementRecordAccess()
IncrementRecordAccess should be called when a record is accessed (for metrics)
func IsFastPathSupported ¶
func IsFastPathSupported(dt arrow.DataType, op FilterOperator) bool
IsFastPathSupported returns true if the data type and operator can use the fast path that bypasses Arrow Compute overhead.
func IsNotFoundError ¶
func IsNotFoundError(err error) bool
IsNotFoundError reports whether an error is a NotFoundError.
func LockMemory ¶
LockMemory pins the memory region in RAM, preventing it from being swapped out.
func MatchesFilters ¶
MatchesFilters checks if a specific row satisfies the filters.
func MustToGRPCStatus ¶
MustToGRPCStatus is like ToGRPCStatus but panics if conversion fails. Useful for testing.
func NewConfigError ¶
NewConfigError creates a configuration error with timestamp.
func NewDimensionMismatchError ¶
NewDimensionMismatchError creates a dimension mismatch error.
func NewInternalError ¶
NewInternalError creates an internal error.
func NewInvalidArgumentError ¶
NewInvalidArgumentError creates an invalid argument error.
func NewNotFoundError ¶
NewNotFoundError creates a not found error.
func NewPersistenceError ¶
NewPersistenceError creates a persistence error.
func NewReplicationError ¶
NewReplicationError creates a replication error with timestamp.
func NewResourceExhaustedError ¶
NewResourceExhaustedError creates a resource exhausted error.
func NewS3Error ¶
NewS3Error creates an S3 error with timestamp.
func NewSchemaMismatchError ¶
NewSchemaMismatchError creates a schema mismatch error.
func NewShutdownError ¶
NewShutdownError creates a shutdown error with timestamp.
func NewUnavailableError ¶
NewUnavailableError creates an unavailable error.
func NewWALError ¶
NewWALError creates a WAL error with timestamp.
func PackBytesToFloat32s ¶
PackBytesToFloat32s packs uint8 codes into a float32 slice for HNSW storage, four bytes per float32.
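A sketch of one plausible packing scheme matching the "4 bytes into 1 float32" description; the byte order within each 32-bit word is an assumption, not necessarily what PackBytesToFloat32s does:

import "math"

func packSketch(codes []uint8) []float32 {
	out := make([]float32, (len(codes)+3)/4)
	for w := range out {
		var word uint32
		for j := 0; j < 4 && w*4+j < len(codes); j++ {
			word |= uint32(codes[w*4+j]) << (8 * j) // byte j of this word
		}
		out[w] = math.Float32frombits(word) // bit reinterpretation, no numeric conversion
	}
	return out
}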
func ParseNamespacedPath ¶
func ParseNamespacedPath(path string) (namespace, dataset string)
ParseNamespacedPath parses a path into namespace and dataset components. Format: "namespace/dataset", or "dataset" (which uses the "default" namespace). Examples:
- "tenant1/mydata" -> ("tenant1", "mydata")
- "mydata" -> ("default", "mydata")
- "org/project/data" -> ("org", "project/data")
func PinThreadToCore ¶
PinThreadToCore pins the current goroutine's thread to a specific CPU core. This is most effective when combined with runtime.LockOSThread(). On non-Linux systems, this returns nil (no-op).
func PinThreadToNode ¶
PinThreadToNode pins the current goroutine's thread to the set of CPUs associated with the given NUMA node. On non-Linux systems, this returns nil (no-op).
func PinToNUMANode ¶
func PinToNUMANode(topo *NUMATopology, nodeID int) error
PinToNUMANode pins the current goroutine to CPUs on the specified NUMA node. This ensures that the goroutine runs on CPUs local to the NUMA node, reducing remote memory access latency.
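As the documentation suggests, pinning is most effective when the goroutine is first locked to its OS thread. A hedged sketch of the intended call pattern (core number 3 is an arbitrary placeholder):

runtime.LockOSThread()
defer runtime.UnlockOSThread()
if err := store.PinThreadToCore(3); err != nil {
	log.Printf("pinning failed: %v", err) // returns nil (no-op) on non-Linux systems
}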
func PutArena ¶
func PutArena(arena *SearchArena)
PutArena returns a SearchArena to the global pool for reuse. The arena is automatically reset before being pooled.
func PutJSONEncoder ¶
PutJSONEncoder returns the encoder and buffer to the pool.
func ResetGlobalSizeCache ¶
func ResetGlobalSizeCache()
ResetGlobalSizeCache resets the global size cache.
func RetainRecordBatch ¶
func RetainRecordBatch(rec arrow.RecordBatch) arrow.RecordBatch
RetainRecordBatch retains the record batch for zero-copy access. The caller must call Release() when done.
func SQ8CosineDistance ¶
func SQ8CosineDistance(q1, q2 []uint8, enc *SQ8Encoder) float32
SQ8CosineDistance computes cosine distance between quantized vectors.
func SQ8DistanceFast ¶
func SQ8DistanceFast(q1, q2 []uint8) uint32
SQ8DistanceFast computes squared L2 distance directly in quantized space. This is faster but returns an integer distance rather than the true Euclidean distance; useful for ranking (ordering is preserved) but not for exact distances.
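A scalar sketch of the squared-L2-in-quantized-space computation described above; the real implementation may be vectorized, and equal-length inputs are assumed:

func sq8DistanceSketch(q1, q2 []uint8) uint32 {
	var sum uint32
	for i := range q1 {
		d := int32(q1[i]) - int32(q2[i])
		sum += uint32(d * d) // d*d <= 255*255, so no overflow per term
	}
	return sum // monotonic in the true squared L2, so ranking is preserved
}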
func SQ8EuclideanDistance ¶
func SQ8EuclideanDistance(q1, q2 []uint8, enc *SQ8Encoder) float32
SQ8EuclideanDistance computes Euclidean distance between quantized vectors. It decodes to float32 for accurate distance computation.
func ShouldUsePipeline ¶
func ShouldUsePipeline(numBatches int) bool
func ToGRPCStatus ¶
func ToGRPCStatus(err error) error
ToGRPCStatus converts a domain error to a gRPC status error with an appropriate code. This provides specific Flight status codes for client-side debugging.
func UnlockMemory ¶
UnlockMemory unpins a previously locked memory region.
func UnmarshalZeroCopy ¶
func UnmarshalZeroCopy(payload *ZeroCopyPayload, mem memory.Allocator) (arrow.RecordBatch, error)
UnmarshalZeroCopy reconstructs a RecordBatch from a ZeroCopyPayload. This is currently a simplified implementation that uses the provided buffers to rebuild ArrayData objects.
func UnpackFloat32sToBytes ¶
UnpackFloat32sToBytes reconstructs uint8 codes from packed float32 slice.
func ZeroCopyRecordBatch ¶
func ZeroCopyRecordBatch(mem memory.Allocator, rec arrow.RecordBatch, deleted *Bitset) (arrow.RecordBatch, error)
ZeroCopyRecordBatch creates a zero-copy view of a record batch with tombstone filtering. Uses Arrow's compute kernels for efficient slicing without deep copies.
Types ¶
type AdaptiveIndex ¶
type AdaptiveIndex struct {
// contains filtered or unexported fields
}
AdaptiveIndex automatically switches between BruteForce and HNSW based on size.
func NewAdaptiveIndex ¶
func NewAdaptiveIndex(ds *Dataset, cfg AdaptiveIndexConfig) *AdaptiveIndex
NewAdaptiveIndex creates an adaptive index starting with BruteForce.
func (*AdaptiveIndex) AddByLocation ¶
func (a *AdaptiveIndex) AddByLocation(batchIdx, rowIdx int) error
AddByLocation adds a vector and potentially triggers migration to HNSW.
func (*AdaptiveIndex) GetIndexType ¶
func (a *AdaptiveIndex) GetIndexType() string
GetIndexType returns the current index type.
func (*AdaptiveIndex) GetMigrationCount ¶
func (a *AdaptiveIndex) GetMigrationCount() int64
GetMigrationCount returns the number of times migration occurred.
func (*AdaptiveIndex) Len ¶
func (a *AdaptiveIndex) Len() int
Len returns the number of indexed vectors.
func (*AdaptiveIndex) SearchVectors ¶
func (a *AdaptiveIndex) SearchVectors(query []float32, k int, filters []Filter) ([]SearchResult, error)
SearchVectors delegates to the active index.
type AdaptiveIndexConfig ¶
type AdaptiveIndexConfig struct {
// Threshold is the number of vectors at which to switch from BruteForce to HNSW.
Threshold int
// Enabled controls whether adaptive indexing is active.
Enabled bool
}
AdaptiveIndexConfig controls automatic switching between BruteForce and HNSW.
func DefaultAdaptiveIndexConfig ¶
func DefaultAdaptiveIndexConfig() AdaptiveIndexConfig
DefaultAdaptiveIndexConfig returns sensible defaults for adaptive indexing.
func (AdaptiveIndexConfig) Validate ¶
func (c AdaptiveIndexConfig) Validate() error
Validate checks that the configuration is valid.
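A hedged usage sketch combining the constructors and methods above; ds (*Dataset) and query ([]float32) are assumed to already exist:

cfg := store.DefaultAdaptiveIndexConfig()
if err := cfg.Validate(); err != nil {
	log.Fatal(err)
}
idx := store.NewAdaptiveIndex(ds, cfg) // starts with BruteForce
if err := idx.AddByLocation(0, 42); err != nil { // may migrate to HNSW past cfg.Threshold
	log.Fatal(err)
}
results, err := idx.SearchVectors(query, 10, nil) // k=10, no filters
if err != nil {
	log.Fatal(err)
}
_ = results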
type AdaptiveIntervalCalculator ¶
type AdaptiveIntervalCalculator struct {
// contains filtered or unexported fields
}
AdaptiveIntervalCalculator computes optimal flush intervals based on load
func NewAdaptiveIntervalCalculator ¶
func NewAdaptiveIntervalCalculator(cfg AdaptiveWALConfig) *AdaptiveIntervalCalculator
NewAdaptiveIntervalCalculator creates a new calculator with the given config
func (*AdaptiveIntervalCalculator) CalculateInterval ¶
func (c *AdaptiveIntervalCalculator) CalculateInterval(writeRate float64) time.Duration
CalculateInterval returns the optimal flush interval for the given write rate
type AdaptiveWALConfig ¶
type AdaptiveWALConfig struct {
MinInterval time.Duration // Minimum flush interval under high load (e.g., 1ms)
MaxInterval time.Duration // Maximum flush interval under low load (e.g., 100ms)
TargetLatency time.Duration // Target p99 latency for writes (e.g., 5ms)
Enabled bool // Whether adaptive batching is enabled
}
AdaptiveWALConfig configures adaptive batching behavior
func NewAdaptiveWALConfig ¶
func NewAdaptiveWALConfig() AdaptiveWALConfig
NewAdaptiveWALConfig returns sensible defaults for adaptive batching
func (*AdaptiveWALConfig) Validate ¶
func (c *AdaptiveWALConfig) Validate() error
Validate checks the configuration for consistency
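Putting the two adaptive-WAL pieces together; the values mirror the examples in the field comments, and the unit of the write rate passed to CalculateInterval (writes/sec) is an assumption:

cfg := store.AdaptiveWALConfig{
	MinInterval:   time.Millisecond,       // flush fast under high load
	MaxInterval:   100 * time.Millisecond, // flush lazily under low load
	TargetLatency: 5 * time.Millisecond,   // target p99 write latency
	Enabled:       true,
}
if err := cfg.Validate(); err != nil {
	log.Fatal(err)
}
calc := store.NewAdaptiveIntervalCalculator(cfg)
interval := calc.CalculateInterval(5000) // hypothetical current write rate
_ = interval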
type AllocatorAwareCache ¶
type AllocatorAwareCache struct {
// contains filtered or unexported fields
}
AllocatorAwareCache provides cached access to Arrow RecordBatches with zero-copy buffer retrieval
func NewAllocatorAwareCache ¶
func NewAllocatorAwareCache(alloc memory.Allocator, maxSize int) *AllocatorAwareCache
NewAllocatorAwareCache creates a new cache with zero-copy support
func (*AllocatorAwareCache) Get ¶
func (c *AllocatorAwareCache) Get(key string) (arrow.RecordBatch, bool)
Get retrieves a RecordBatch from the cache. It returns a retained reference; the caller must call Release().
func (*AllocatorAwareCache) GetBufferDirect ¶
func (c *AllocatorAwareCache) GetBufferDirect(key string, colIdx, bufIdx int) []byte
GetBufferDirect retrieves the underlying buffer bytes without copying. colIdx is the column index; bufIdx is the buffer index (0 = validity, 1 = data).
func (*AllocatorAwareCache) Put ¶
func (c *AllocatorAwareCache) Put(key string, rec arrow.RecordBatch)
Put stores a RecordBatch in the cache with proper reference counting
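A usage sketch of the cache's reference-counting contract; the key and capacity are placeholders, and rec is assumed to be an existing arrow.RecordBatch:

cache := store.NewAllocatorAwareCache(memory.NewGoAllocator(), 128)
cache.Put("users:batch-0", rec) // cache takes its own reference
if got, ok := cache.Get("users:batch-0"); ok {
	defer got.Release() // Get hands back a retained reference
	_ = got.NumRows()
}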
type ArrayStructure ¶
type ArrayStructure struct {
Length int
NullCount int
Offset int
Buffers int // Number of buffers used by this node specifically
Children []ArrayStructure // Recursive children
}
ArrayStructure captures the metadata needed to rebuild an ArrayData hierarchy.
type AsyncFsyncConfig ¶
type AsyncFsyncConfig struct {
DirtyThreshold int64
MaxPendingFsyncs int
FsyncTimeout time.Duration
Enabled bool
FallbackToSync bool
}
AsyncFsyncConfig configures asynchronous fsync behavior for WAL
func DefaultAsyncFsyncConfig ¶
func DefaultAsyncFsyncConfig() AsyncFsyncConfig
DefaultAsyncFsyncConfig returns sensible defaults
func (AsyncFsyncConfig) IsAsyncEnabled ¶
func (c AsyncFsyncConfig) IsAsyncEnabled() bool
IsAsyncEnabled returns whether async fsync is enabled
func (AsyncFsyncConfig) ShouldFsync ¶
func (c AsyncFsyncConfig) ShouldFsync(dirtyBytes int64) bool
ShouldFsync returns true if dirty bytes exceed the threshold.
func (AsyncFsyncConfig) Validate ¶
func (c AsyncFsyncConfig) Validate() error
Validate checks if the configuration is valid
type AsyncFsyncer ¶
type AsyncFsyncer struct {
// contains filtered or unexported fields
}
AsyncFsyncer manages asynchronous fsync operations
func NewAsyncFsyncer ¶
func NewAsyncFsyncer(cfg AsyncFsyncConfig) *AsyncFsyncer
NewAsyncFsyncer creates a new async fsyncer
func (*AsyncFsyncer) AddDirtyBytes ¶
func (a *AsyncFsyncer) AddDirtyBytes(n int64)
AddDirtyBytes adds to the dirty byte counter
func (*AsyncFsyncer) Config ¶
func (a *AsyncFsyncer) Config() AsyncFsyncConfig
Config returns the configuration
func (*AsyncFsyncer) DirtyBytes ¶
func (a *AsyncFsyncer) DirtyBytes() int64
DirtyBytes returns current dirty byte count
func (*AsyncFsyncer) ForceFsync ¶
func (a *AsyncFsyncer) ForceFsync() error
ForceFsync performs a synchronous fsync (fallback)
func (*AsyncFsyncer) IsRunning ¶
func (a *AsyncFsyncer) IsRunning() bool
IsRunning returns whether the fsyncer is active
func (*AsyncFsyncer) RequestFsyncIfNeeded ¶
func (a *AsyncFsyncer) RequestFsyncIfNeeded() bool
RequestFsyncIfNeeded requests an fsync if the dirty-byte threshold has been exceeded.
func (*AsyncFsyncer) Start ¶
func (a *AsyncFsyncer) Start(f *os.File) error
Start begins the async fsync goroutine
func (*AsyncFsyncer) Stats ¶
func (a *AsyncFsyncer) Stats() AsyncFsyncerStats
Stats returns current statistics
func (*AsyncFsyncer) Stop ¶
func (a *AsyncFsyncer) Stop() error
Stop gracefully shuts down the fsyncer
func (*AsyncFsyncer) WaitForPendingFsyncs ¶
func (a *AsyncFsyncer) WaitForPendingFsyncs()
WaitForPendingFsyncs waits for all pending fsyncs to complete
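A lifecycle sketch tying the methods above together; the file name and byte counts are placeholders, and error handling is abbreviated:

fs := store.NewAsyncFsyncer(store.DefaultAsyncFsyncConfig())
f, err := os.OpenFile("wal.log", os.O_RDWR|os.O_CREATE, 0o644)
if err != nil {
	log.Fatal(err)
}
if err := fs.Start(f); err != nil {
	log.Fatal(err)
}
fs.AddDirtyBytes(1 << 20) // account for 1 MiB of unsynced writes
fs.RequestFsyncIfNeeded() // schedules an fsync once over the dirty threshold
fs.WaitForPendingFsyncs() // drain before shutdown
_ = fs.Stop()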
type AsyncFsyncerStats ¶
type AsyncFsyncerStats struct {
TotalRequests int64
CompletedFsyncs int64
FailedFsyncs int64
SyncFallbacks int64
QueueFullDrops int64
}
AsyncFsyncerStats holds statistics for the async fsyncer
type AutoShardingConfig ¶
type AutoShardingConfig struct {
// Enabled determines if auto-sharding is active.
Enabled bool
// ShardThreshold is the number of vectors at which to trigger sharding.
ShardThreshold int
// ShardCount is the target number of shards to create (defaults to NumCPU).
ShardCount int
}
AutoShardingConfig configures the auto-sharding behavior.
func DefaultAutoShardingConfig ¶
func DefaultAutoShardingConfig() AutoShardingConfig
DefaultAutoShardingConfig returns a standard configuration.
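A configuration sketch; ds is an existing *store.Dataset and the threshold value is illustrative:

    cfg := store.DefaultAutoShardingConfig()
    cfg.Enabled = true
    cfg.ShardThreshold = 1_000_000    // upgrade to ShardedHNSW past 1M vectors
    cfg.ShardCount = runtime.NumCPU() // matches the documented default
    idx := store.NewAutoShardingIndex(ds, cfg)
    _ = idx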
type AutoShardingIndex ¶
type AutoShardingIndex struct {
// contains filtered or unexported fields
}
AutoShardingIndex wraps a VectorIndex and transparently upgrades it to a ShardedHNSW when the dataset grows beyond a threshold.
func NewAutoShardingIndex ¶
func NewAutoShardingIndex(ds *Dataset, config AutoShardingConfig) *AutoShardingIndex
NewAutoShardingIndex creates a new auto-sharding index. Initially, it uses a standard HNSWIndex.
func (*AutoShardingIndex) AddByLocation ¶
func (a *AutoShardingIndex) AddByLocation(batchIdx, rowIdx int) (uint32, error)
AddByLocation adds a vector to the index.
func (*AutoShardingIndex) AddByRecord ¶
func (a *AutoShardingIndex) AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)
AddByRecord adds a vector from a record batch.
func (*AutoShardingIndex) Close ¶
func (idx *AutoShardingIndex) Close() error
func (*AutoShardingIndex) GetDimension ¶
func (a *AutoShardingIndex) GetDimension() uint32
GetDimension implements VectorIndex.
func (*AutoShardingIndex) GetLocation ¶
func (idx *AutoShardingIndex) GetLocation(id VectorID) (Location, bool)
GetLocation retrieves the storage location for a given vector ID.
func (*AutoShardingIndex) SearchVectors ¶
func (a *AutoShardingIndex) SearchVectors(query []float32, k int, filters []Filter) []SearchResult
SearchVectors implements VectorIndex.
func (*AutoShardingIndex) SearchVectorsWithBitmap ¶
func (a *AutoShardingIndex) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult
SearchVectorsWithBitmap implements VectorIndex.
func (*AutoShardingIndex) SetIndexedColumns ¶
func (idx *AutoShardingIndex) SetIndexedColumns(cols []string)
SetIndexedColumns configures which columns are indexed for fast equality lookups
func (*AutoShardingIndex) Warmup ¶
func (idx *AutoShardingIndex) Warmup() int
Warmup delegates to the current index.
type BM25Config ¶
type BM25Config struct {
K1 float64 // Term frequency saturation parameter (default: 1.2)
B float64 // Length normalization parameter (default: 0.75)
}
BM25Config holds BM25 algorithm parameters. K1 controls term frequency saturation (typically 1.2-2.0). B controls document length normalization (0 = no normalization, 1 = full normalization).
func DefaultBM25Config ¶
func DefaultBM25Config() BM25Config
DefaultBM25Config returns standard BM25 parameters.
func (BM25Config) Validate ¶
func (c BM25Config) Validate() error
Validate checks if the BM25 configuration is valid.
type BM25InvertedIndex ¶
type BM25InvertedIndex struct {
// contains filtered or unexported fields
}
BM25InvertedIndex is a sharded inverted index with proper BM25 scoring
func NewBM25InvertedIndex ¶
func NewBM25InvertedIndex(config BM25Config) *BM25InvertedIndex
NewBM25InvertedIndex creates a new BM25 inverted index
func (*BM25InvertedIndex) Add ¶
func (idx *BM25InvertedIndex) Add(id VectorID, text string)
Add indexes a document with the given text
func (*BM25InvertedIndex) Delete ¶
func (idx *BM25InvertedIndex) Delete(id VectorID)
Delete removes a document from the index
func (*BM25InvertedIndex) DocCount ¶
func (idx *BM25InvertedIndex) DocCount() int
DocCount returns the total number of documents in the index
func (*BM25InvertedIndex) GetDocLength ¶
func (idx *BM25InvertedIndex) GetDocLength(id VectorID) int
GetDocLength returns the length (term count) of a document
func (*BM25InvertedIndex) GetTermDocFreq ¶
func (idx *BM25InvertedIndex) GetTermDocFreq(term string) int
GetTermDocFreq returns the number of documents containing the term
func (*BM25InvertedIndex) SearchBM25 ¶
func (idx *BM25InvertedIndex) SearchBM25(query string, limit int) []SearchResult
SearchBM25 returns documents matching the query, scored by BM25
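A minimal indexing and search sketch, assuming VectorID is an integer ID type (as its use with roaring bitmaps elsewhere in this package suggests); the documents are illustrative:

    idx := store.NewBM25InvertedIndex(store.DefaultBM25Config())
    idx.Add(store.VectorID(1), "arrow flight vector store")
    idx.Add(store.VectorID(2), "bm25 ranking for keyword search")
    hits := idx.SearchBM25("vector search", 10) // top-10 by BM25 score
    fmt.Println(len(hits), idx.DocCount())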
type BM25Scorer ¶
type BM25Scorer struct {
// contains filtered or unexported fields
}
BM25Scorer computes BM25 relevance scores for documents. It maintains corpus statistics for IDF calculation and length normalization.
func NewBM25Scorer ¶
func NewBM25Scorer(config BM25Config) *BM25Scorer
NewBM25Scorer creates a new BM25 scorer with the given configuration.
func (*BM25Scorer) AddDocument ¶
func (s *BM25Scorer) AddDocument(docLength int)
AddDocument registers a document with the given length to the corpus.
func (*BM25Scorer) AvgDocLength ¶
func (s *BM25Scorer) AvgDocLength() float64
AvgDocLength returns the average document length in the corpus.
func (*BM25Scorer) Config ¶
func (s *BM25Scorer) Config() BM25Config
Config returns the BM25 configuration.
func (*BM25Scorer) IDF ¶
func (s *BM25Scorer) IDF(docFreq int) float64
IDF computes the Inverse Document Frequency for a term. Uses the BM25 IDF formula: log((N - df + 0.5) / (df + 0.5) + 1) where N is total documents and df is document frequency.
func (*BM25Scorer) RemoveDocument ¶
func (s *BM25Scorer) RemoveDocument(docLength int)
RemoveDocument removes a document with the given length from the corpus.
func (*BM25Scorer) Score ¶
func (s *BM25Scorer) Score(tf, docLength, docFreq int) float64
Score computes the BM25 score for a term in a document. Parameters:
- tf: term frequency in the document
- docLength: length of the document (number of terms)
- docFreq: number of documents containing the term
Returns the BM25 score component for this term.
func (*BM25Scorer) ScoreMultiTerm ¶
func (s *BM25Scorer) ScoreMultiTerm(docLength int, terms []struct{ TF, DocFreq int }) float64
ScoreMultiTerm computes the total BM25 score for multiple terms. Optimization: the read lock is acquired once for all terms.
func (*BM25Scorer) TotalDocs ¶
func (s *BM25Scorer) TotalDocs() int
TotalDocs returns the number of documents in the corpus.
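A worked sketch of the scorer. With N=2 documents and df=1, the documented IDF formula gives log((2-1+0.5)/(1+0.5)+1) = log(2) ≈ 0.693; the document lengths are illustrative:

    scorer := store.NewBM25Scorer(store.DefaultBM25Config())
    scorer.AddDocument(120) // register corpus documents by length
    scorer.AddDocument(80)
    idf := scorer.IDF(1)         // ≈ 0.693 for df=1, N=2
    s := scorer.Score(3, 120, 1) // tf=3 in a 120-term doc, term in 1 of 2 docs
    fmt.Println(idf, s)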
type BackpressureConfig ¶
type BackpressureConfig struct {
SoftLimitBytes uint64
HardLimitBytes uint64
CheckInterval time.Duration
SoftPressureDelay time.Duration
}
BackpressureConfig configures memory backpressure behavior.
type BatchRemapInfo ¶
type BatchRemapInfo struct {
NewBatchIdx int
NewRowIdxs []int // Maps oldRowIdx to newRowIdx in NewBatchIdx, or -1 if dropped
}
BatchRemapInfo describes how a batch ID maps to a new one
type BinaryQuantizer ¶
type BinaryQuantizer struct {
Threshold []float32 // Per-dimension threshold (typically median)
}
BinaryQuantizer holds per-dimension thresholds for binary quantization.
func TrainBinaryQuantizer ¶
func TrainBinaryQuantizer(vectors [][]float32) (*BinaryQuantizer, error)
TrainBinaryQuantizer learns per-dimension thresholds from sample vectors. Uses median as threshold for balanced bit distribution.
func (*BinaryQuantizer) AsymmetricBinaryDistance ¶
func (bq *BinaryQuantizer) AsymmetricBinaryDistance(query []float32, quantized []uint64) float32
AsymmetricBinaryDistance computes distance from float32 query to binary vector. This provides better recall than pure binary comparison.
func (*BinaryQuantizer) Dims ¶
func (bq *BinaryQuantizer) Dims() int
Dims returns the number of dimensions this quantizer handles.
func (*BinaryQuantizer) Quantize ¶
func (bq *BinaryQuantizer) Quantize(vec []float32) []uint64
Quantize converts a float32 vector to bit-packed uint64 representation. Each dimension becomes 1 bit: value >= threshold -> 1, else -> 0. Bits are packed LSB-first: dimension 0 is bit 0 of first uint64.
func (*BinaryQuantizer) QuantizeInto ¶
func (bq *BinaryQuantizer) QuantizeInto(vec []float32, dst []uint64)
QuantizeInto encodes a vector into a pre-allocated destination slice. Zero-allocation hot path for bulk encoding.
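A quantization sketch with illustrative 4-dimensional vectors; HammingDistancePOPCNT (listed in the index) compares two bit-packed codes:

    samples := [][]float32{
        {0.1, 0.9, 0.4, 0.7},
        {0.8, 0.2, 0.6, 0.3},
        {0.5, 0.4, 0.9, 0.1},
    }
    bq, err := store.TrainBinaryQuantizer(samples) // per-dimension median thresholds
    if err != nil {
        log.Fatal(err)
    }
    a := bq.Quantize([]float32{0.9, 0.1, 0.7, 0.2})
    b := bq.Quantize([]float32{0.0, 1.0, 0.3, 0.8})
    fmt.Println(store.HammingDistancePOPCNT(a, b)) // coarse distance in differing bits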
type BitmapPool ¶
type BitmapPool struct {
// contains filtered or unexported fields
}
BitmapPool manages pooled bitmap buffers to reduce GC pressure
func NewBitmapPool ¶
func NewBitmapPool(cfg BitmapPoolConfig) *BitmapPool
NewBitmapPool creates a new bitmap pool
func (*BitmapPool) Get ¶
func (bp *BitmapPool) Get(numBits int) *PooledBitmap
Get retrieves a bitmap buffer of at least numBits capacity
func (*BitmapPool) Put ¶
func (bp *BitmapPool) Put(buf *PooledBitmap)
Put returns a bitmap buffer to the pool
func (*BitmapPool) Stats ¶
func (bp *BitmapPool) Stats() BitmapPoolStats
Stats returns pool statistics
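A get/put sketch; the bit count is illustrative:

    pool := store.NewBitmapPool(store.DefaultBitmapPoolConfig())
    bm := pool.Get(1 << 20) // buffer with capacity for at least 2^20 bits
    // ... use bm ...
    pool.Put(bm) // return to the pool to reduce GC pressure
    stats := pool.Stats()
    fmt.Println(stats.Hits, stats.Misses)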
type BitmapPoolConfig ¶
type BitmapPoolConfig struct {
// SizeBuckets defines the pooled buffer sizes (in bits)
SizeBuckets []int
// MaxBufferSize is the largest buffer that will be pooled
MaxBufferSize int
// MaxPoolSize per bucket (0 = unlimited, managed by sync.Pool)
MaxPoolSize int
// MetricsEnabled enables Prometheus metrics
MetricsEnabled bool
}
BitmapPoolConfig configures the bitmap buffer pool
func DefaultBitmapPoolConfig ¶
func DefaultBitmapPoolConfig() BitmapPoolConfig
DefaultBitmapPoolConfig returns production defaults
func (*BitmapPoolConfig) Validate ¶
func (c *BitmapPoolConfig) Validate() error
Validate checks configuration
type BitmapPoolStats ¶
type BitmapPoolStats struct {
Gets uint64 // Total get operations
Puts uint64 // Total put operations
Hits uint64 // Buffers returned from pool
Misses uint64 // New allocations (pool empty)
Discards uint64 // Buffers too large to pool
}
BitmapPoolStats contains pool statistics
type Bitset ¶
type Bitset struct {
// contains filtered or unexported fields
}
Bitset is a thread-safe wrapper around a Roaring Bitmap (Item 10)
func (*Bitset) ToUint32Array ¶
ToUint32Array returns the set bits as a slice of uint32.
type BloomFilter ¶
type BloomFilter struct {
// contains filtered or unexported fields
}
BloomFilter is a space-efficient probabilistic data structure for testing set membership with no false negatives.
func NewBloomFilter ¶
func NewBloomFilter(n int, p float64) *BloomFilter
NewBloomFilter creates a bloom filter optimized for n items with false positive rate p. Uses optimal size and hash count formulas:
m = -n * ln(p) / (ln(2))^2
k = (m / n) * ln(2)
func (*BloomFilter) Add ¶
func (bf *BloomFilter) Add(item string)
Add inserts an item into the bloom filter.
func (*BloomFilter) Contains ¶
func (bf *BloomFilter) Contains(item string) bool
Contains checks if an item might be in the set. Returns true if possibly present (may be false positive). Returns false if definitely not present (never false negative).
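A membership sketch. For n = 10,000 and p = 0.01 the formulas above give roughly m ≈ 95,851 bits and k ≈ 7 hash functions:

    bf := store.NewBloomFilter(10_000, 0.01)
    bf.Add("dataset:users")
    fmt.Println(bf.Contains("dataset:users"))  // true
    fmt.Println(bf.Contains("dataset:orders")) // false (barring a ~1% false positive)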
type BruteForceIndex ¶
type BruteForceIndex struct {
// contains filtered or unexported fields
}
BruteForceIndex implements VectorIndex using linear scan O(N) search.
func NewBruteForceIndex ¶
func NewBruteForceIndex(ds *Dataset) *BruteForceIndex
NewBruteForceIndex creates a new brute force index for the given dataset.
func (*BruteForceIndex) AddByLocation ¶
func (b *BruteForceIndex) AddByLocation(batchIdx, rowIdx int) error
AddByLocation adds a vector from the dataset using batch and row indices.
func (*BruteForceIndex) Len ¶
func (b *BruteForceIndex) Len() int
Len returns the number of indexed vectors.
func (*BruteForceIndex) SearchVectors ¶
func (b *BruteForceIndex) SearchVectors(query []float32, k int, filters []Filter) ([]SearchResult, error)
SearchVectors returns the k nearest neighbors using linear scan.
type BufferRetainerStats ¶
type BufferRetainerStats struct {
ActiveBuffers int
TotalBytesRetained int64
TotalRetained int64
TotalReleased int64
}
BufferRetainerStats holds statistics about retained buffers
type BytePool ¶
type BytePool struct {
// contains filtered or unexported fields
}
BytePool pools bytes.Buffer instances to reduce allocation pressure in hot paths (WAL, serialization).
type COWMetadataMap ¶
type COWMetadataMap struct {
// contains filtered or unexported fields
}
COWMetadataMap provides a Copy-On-Write map for dataset metadata. Reads are lock-free via atomic pointer; writes copy the entire map. This eliminates lock contention in ListFlights.
func NewCOWMetadataMap ¶
func NewCOWMetadataMap() *COWMetadataMap
NewCOWMetadataMap creates a new Copy-On-Write metadata map.
func (*COWMetadataMap) Delete ¶
func (c *COWMetadataMap) Delete(name string)
Delete removes metadata for a dataset (copy-on-write).
func (*COWMetadataMap) Get ¶
func (c *COWMetadataMap) Get(name string) (DatasetMetadata, bool)
Get returns metadata for a dataset (lock-free read).
func (*COWMetadataMap) IncrementStats ¶
func (c *COWMetadataMap) IncrementStats(name string, rows int64, batches int)
IncrementStats atomically increments row and batch counts.
func (*COWMetadataMap) Len ¶
func (c *COWMetadataMap) Len() int
Len returns the number of datasets in the map.
func (*COWMetadataMap) Set ¶
func (c *COWMetadataMap) Set(name string, meta DatasetMetadata)
Set adds or updates metadata for a dataset (copy-on-write).
func (*COWMetadataMap) Snapshot ¶
func (c *COWMetadataMap) Snapshot() map[string]DatasetMetadata
Snapshot returns the current map for iteration. The returned map is immutable and safe to iterate without locks.
func (*COWMetadataMap) UpdateFromRecords ¶
func (c *COWMetadataMap) UpdateFromRecords(name string, batches []arrow.RecordBatch)
UpdateFromRecords updates metadata from a slice of record batches.
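A read-mostly usage sketch; DatasetMetadata's fields are not listed at this point in the index, so the zero value is used:

    m := store.NewCOWMetadataMap()
    m.Set("users", store.DatasetMetadata{})
    m.IncrementStats("users", 1024, 1) // +1024 rows, +1 batch
    for name := range m.Snapshot() {   // immutable snapshot, no locks needed
        fmt.Println(name)
    }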
type CheckpointConfig ¶
type CheckpointConfig struct {
Interval time.Duration
Timeout time.Duration
MinWALSize int64
QuorumRequired int
}
CheckpointConfig configures coordinated checkpointing behavior.
type CheckpointCoordinator ¶
type CheckpointCoordinator struct {
// contains filtered or unexported fields
}
CheckpointCoordinator manages distributed checkpoint synchronization.
func NewCheckpointCoordinator ¶
func NewCheckpointCoordinator(cfg CheckpointConfig) *CheckpointCoordinator
NewCheckpointCoordinator creates a new checkpoint coordinator.
func (*CheckpointCoordinator) GetCheckpointCount ¶
func (c *CheckpointCoordinator) GetCheckpointCount() uint64
GetCheckpointCount returns the total number of checkpoints completed.
func (*CheckpointCoordinator) GetEpoch ¶
func (c *CheckpointCoordinator) GetEpoch() uint64
GetEpoch returns the current checkpoint epoch.
func (*CheckpointCoordinator) InitiateCheckpoint ¶
func (c *CheckpointCoordinator) InitiateCheckpoint(ctx context.Context) error
InitiateCheckpoint starts a new checkpoint epoch.
func (*CheckpointCoordinator) RecoverFromEpoch ¶
func (c *CheckpointCoordinator) RecoverFromEpoch(epoch uint64)
RecoverFromEpoch sets the epoch during recovery.
func (*CheckpointCoordinator) RegisterParticipant ¶
func (c *CheckpointCoordinator) RegisterParticipant(id string)
RegisterParticipant adds a participant to the checkpoint barrier.
func (*CheckpointCoordinator) ShouldTruncateWAL ¶
func (c *CheckpointCoordinator) ShouldTruncateWAL(epoch uint64) bool
ShouldTruncateWAL returns true if WAL can be truncated after checkpoint.
func (*CheckpointCoordinator) UnregisterParticipant ¶
func (c *CheckpointCoordinator) UnregisterParticipant(id string)
UnregisterParticipant removes a participant from the checkpoint barrier.
func (*CheckpointCoordinator) WaitForBarrier ¶
func (c *CheckpointCoordinator) WaitForBarrier(ctx context.Context, participantID string, epoch uint64) error
WaitForBarrier waits until quorum participants reach the barrier.
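A coordination sketch under assumed semantics: participants register, a checkpoint is initiated, and each participant then waits at the barrier for the current epoch. ctx is assumed to exist and the field values are illustrative:

    cc := store.NewCheckpointCoordinator(store.CheckpointConfig{
        Interval:       time.Minute,
        Timeout:        30 * time.Second,
        MinWALSize:     1 << 20,
        QuorumRequired: 2,
    })
    cc.RegisterParticipant("node-a")
    cc.RegisterParticipant("node-b")
    if err := cc.InitiateCheckpoint(ctx); err != nil {
        log.Fatal(err)
    }
    _ = cc.WaitForBarrier(ctx, "node-a", cc.GetEpoch())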
type ChunkHandler ¶
type ChunkHandler func(ctx context.Context, chunk *FlightDataChunk) error
ChunkHandler is the callback for processing a chunk.
type ChunkWorkerPool ¶
type ChunkWorkerPool struct {
// contains filtered or unexported fields
}
ChunkWorkerPool processes chunks from a queue using multiple workers.
func NewChunkWorkerPool ¶
func NewChunkWorkerPool(config ChunkWorkerPoolConfig, queue *FlightDataQueue, handler ChunkHandler) *ChunkWorkerPool
NewChunkWorkerPool creates a new worker pool.
func (*ChunkWorkerPool) IsRunning ¶
func (p *ChunkWorkerPool) IsRunning() bool
IsRunning returns whether pool is running.
func (*ChunkWorkerPool) Stats ¶
func (p *ChunkWorkerPool) Stats() ChunkWorkerPoolStats
Stats returns current statistics.
func (*ChunkWorkerPool) Stop ¶
func (p *ChunkWorkerPool) Stop()
Stop signals workers to stop and waits for completion.
type ChunkWorkerPoolConfig ¶
type ChunkWorkerPoolConfig struct {
NumWorkers int // number of worker goroutines
ProcessTimeout time.Duration // timeout for processing each chunk
}
ChunkWorkerPoolConfig configures the worker pool.
func DefaultChunkWorkerPoolConfig ¶
func DefaultChunkWorkerPoolConfig() ChunkWorkerPoolConfig
DefaultChunkWorkerPoolConfig returns sensible defaults.
func (ChunkWorkerPoolConfig) Validate ¶
func (c ChunkWorkerPoolConfig) Validate() error
Validate checks config validity.
type ChunkWorkerPoolStats ¶
ChunkWorkerPoolStats tracks worker pool operations.
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern
func NewCircuitBreaker ¶
func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) Allow ¶
func (cb *CircuitBreaker) Allow() bool
Allow checks if a request should be allowed
func (*CircuitBreaker) Execute ¶
func (cb *CircuitBreaker) Execute(fn func() (interface{}, error)) (interface{}, error)
Execute wraps a function with circuit breaker logic
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset()
Reset resets the circuit breaker to closed state
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State() CircuitState
State returns current circuit state
func (*CircuitBreaker) Stats ¶
func (cb *CircuitBreaker) Stats() CircuitBreakerStats
Stats returns current statistics
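A wrapping sketch; fetchFromPeer is a hypothetical fallible call:

    cb := store.NewCircuitBreaker(store.DefaultCircuitBreakerConfig())
    v, err := cb.Execute(func() (interface{}, error) {
        return fetchFromPeer() // hypothetical remote operation
    })
    if err != nil {
        // either the call failed, or an open breaker rejected it fast
    }
    _ = v
    fmt.Println(cb.State()) // CircuitClosed, CircuitOpen, or CircuitHalfOpen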
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
FailureThreshold int // failures before opening
SuccessThreshold int // successes in half-open before closing
Timeout time.Duration // time in open state before half-open
}
CircuitBreakerConfig holds configuration for circuit breaker
func DefaultCircuitBreakerConfig ¶
func DefaultCircuitBreakerConfig() CircuitBreakerConfig
DefaultCircuitBreakerConfig returns sensible defaults
type CircuitBreakerRegistry ¶
type CircuitBreakerRegistry struct {
// contains filtered or unexported fields
}
CircuitBreakerRegistry manages multiple circuit breakers by key
func NewCircuitBreakerRegistry ¶
func NewCircuitBreakerRegistry(config CircuitBreakerConfig) *CircuitBreakerRegistry
NewCircuitBreakerRegistry creates a new registry
func (*CircuitBreakerRegistry) GetOrCreate ¶
func (r *CircuitBreakerRegistry) GetOrCreate(key string) *CircuitBreaker
GetOrCreate returns an existing circuit breaker or creates a new one
func (*CircuitBreakerRegistry) Reset ¶
func (r *CircuitBreakerRegistry) Reset(key string)
Reset resets a specific circuit breaker
func (*CircuitBreakerRegistry) ResetAll ¶
func (r *CircuitBreakerRegistry) ResetAll()
ResetAll resets all circuit breakers
type CircuitBreakerStats ¶
type CircuitBreakerStats struct {
Successes int64
Failures int64
Rejections int64
State CircuitState
LastFailure time.Time
LastSuccess time.Time
StateChanges int64
}
CircuitBreakerStats holds operational statistics
type CircuitState ¶
type CircuitState int32
Circuit breaker states
const (
    CircuitClosed   CircuitState = iota // Normal operation
    CircuitOpen                         // Failing, reject requests
    CircuitHalfOpen                     // Testing recovery
)
func (CircuitState) String ¶
func (s CircuitState) String() string
type ClockComparison ¶
type ClockComparison int
ClockComparison represents the result of comparing two vector clocks
const (
    ClockEqual      ClockComparison = iota // Clocks are identical
    ClockBefore                            // First clock happened before second
    ClockAfter                             // First clock happened after second
    ClockConcurrent                        // Clocks are concurrent (conflict)
)
type ColumnInvertedIndex ¶
type ColumnInvertedIndex struct {
// contains filtered or unexported fields
}
ColumnInvertedIndex provides O(1) equality lookups on indexed columns. Structure: dataset -> column -> value -> []RowPosition.
func NewColumnInvertedIndex ¶
func NewColumnInvertedIndex() *ColumnInvertedIndex
NewColumnInvertedIndex creates a new column-based inverted index
func (*ColumnInvertedIndex) BuildFilterMask ¶
func (idx *ColumnInvertedIndex) BuildFilterMask(datasetName string, recordIdx int, columnName, value string, numRows int, mem memory.Allocator) *array.Boolean
BuildFilterMask creates a boolean mask for filtering using an indexed lookup. Returns nil if no index exists for the column.
func (*ColumnInvertedIndex) FilterRecordWithIndex ¶
func (idx *ColumnInvertedIndex) FilterRecordWithIndex(ctx context.Context, datasetName string, recordIdx int, rec arrow.RecordBatch, filter Filter, mem memory.Allocator) (arrow.RecordBatch, error)
FilterRecordWithIndex applies an equality filter using the index for O(1) lookup. Falls back to compute.Filter if no index exists.
func (*ColumnInvertedIndex) GetMatchingRowIndices ¶
func (idx *ColumnInvertedIndex) GetMatchingRowIndices(datasetName string, recordIdx int, columnName, value string) []int
GetMatchingRowIndices returns row indices within a specific record
func (*ColumnInvertedIndex) HasIndex ¶
func (idx *ColumnInvertedIndex) HasIndex(datasetName, columnName string) bool
HasIndex checks if an index exists for the given dataset and column
func (*ColumnInvertedIndex) IndexRecord ¶
func (idx *ColumnInvertedIndex) IndexRecord(datasetName string, recordIdx int, rec arrow.RecordBatch, columnsToIndex []string)
IndexRecord indexes specified columns of a record batch
func (*ColumnInvertedIndex) Lookup ¶
func (idx *ColumnInvertedIndex) Lookup(datasetName, columnName, value string) []RowPosition
Lookup returns all row positions matching the given value. Returns an empty slice if not found (O(1) lookup).
func (*ColumnInvertedIndex) RemoveDataset ¶
func (idx *ColumnInvertedIndex) RemoveDataset(datasetName string)
RemoveDataset removes all indices for a dataset
func (*ColumnInvertedIndex) RemoveRecord ¶
func (idx *ColumnInvertedIndex) RemoveRecord(datasetName string, recordIdx int)
RemoveRecord removes all index entries for a specific record
func (*ColumnInvertedIndex) Stats ¶
func (idx *ColumnInvertedIndex) Stats() ColumnInvertedIndexStats
Stats returns statistics about the index.
type ColumnInvertedIndexStats ¶
type ColumnInvertedIndexStats struct {
Datasets int
TotalColumns int
TotalValues int
TotalPositions int
}
ColumnInvertedIndexStats contains statistics about the index.
type ColumnMetadata ¶
type ColumnMetadata struct {
Name string
Type arrow.DataType
AddedAt uint64 // version when added
DroppedAt uint64 // version when dropped (0 = not dropped)
}
ColumnMetadata tracks column lifecycle
type CompactionCandidate ¶
CompactionCandidate represents a range of batches to be merged
type CompactionConfig ¶
type CompactionConfig struct {
// TargetBatchSize is the target number of rows per compacted batch.
TargetBatchSize int64
// MinBatchesToCompact is the minimum number of batches before compaction triggers.
MinBatchesToCompact int
// CompactionInterval is how often the compaction worker checks for work.
CompactionInterval time.Duration
// Enabled controls whether background compaction runs.
Enabled bool
}
CompactionConfig configures the background RecordBatch compaction worker.
func DefaultCompactionConfig ¶
func DefaultCompactionConfig() CompactionConfig
DefaultCompactionConfig returns sensible defaults for compaction.
func (CompactionConfig) Validate ¶
func (c CompactionConfig) Validate() error
Validate checks if the configuration is valid.
type CompactionStats ¶
type CompactionStats struct {
CompactionsRun int64
BatchesMerged int64
RowsProcessed int64
LastRunTime time.Time
}
CompactionStats tracks compaction worker statistics.
type CompactionWorker ¶
type CompactionWorker struct {
// contains filtered or unexported fields
}
CompactionWorker runs background compaction of RecordBatches.
func NewCompactionWorker ¶
func NewCompactionWorker(store *VectorStore, cfg CompactionConfig) *CompactionWorker
NewCompactionWorker creates a new compaction worker with the given config.
func (*CompactionWorker) GetTriggerCount ¶
func (w *CompactionWorker) GetTriggerCount() int64
GetTriggerCount returns the total number of auto-compaction triggers.
func (*CompactionWorker) IsRunning ¶
func (w *CompactionWorker) IsRunning() bool
IsRunning returns true if the worker is currently running.
func (*CompactionWorker) Start ¶
func (w *CompactionWorker) Start()
Start begins the background compaction goroutine.
func (*CompactionWorker) Stats ¶
func (w *CompactionWorker) Stats() CompactionStats
Stats returns current compaction statistics.
func (*CompactionWorker) Stop ¶
func (w *CompactionWorker) Stop()
Stop halts the background compaction goroutine.
func (*CompactionWorker) TriggerCompaction ¶
func (w *CompactionWorker) TriggerCompaction(dataset string) error
TriggerCompaction triggers compaction for a specific dataset. The call is non-blocking: if the trigger channel is full, it returns immediately.
type CompressedBitmap ¶
type CompressedBitmap struct {
// contains filtered or unexported fields
}
CompressedBitmap provides a memory-efficient bitset using Roaring Bitmaps. It is optimized for high-cardinality integer sets (e.g., VectorID lists, tombstones).
func NewBitmap ¶
func NewBitmap() *CompressedBitmap
NewBitmap creates a new empty compressed bitmap.
func (*CompressedBitmap) Add ¶
func (b *CompressedBitmap) Add(id VectorID)
Add sets the bit for the given VectorID.
func (*CompressedBitmap) Cardinality ¶
func (b *CompressedBitmap) Cardinality() uint64
Cardinality returns the number of set bits.
func (*CompressedBitmap) Contains ¶
func (b *CompressedBitmap) Contains(id VectorID) bool
Contains checks if the bit for the given VectorID is set.
func (*CompressedBitmap) Difference ¶
func (b *CompressedBitmap) Difference(other *CompressedBitmap) *CompressedBitmap
Difference returns a new bitmap containing elements in b but not in other.
func (*CompressedBitmap) Intersection ¶
func (b *CompressedBitmap) Intersection(other *CompressedBitmap) *CompressedBitmap
Intersection returns a new bitmap containing the intersection of b and other.
func (*CompressedBitmap) Iterator ¶
func (b *CompressedBitmap) Iterator() roaring.IntIterable
Iterator returns an iterator over the set bits.
func (*CompressedBitmap) Remove ¶
func (b *CompressedBitmap) Remove(id VectorID)
Remove clears the bit for the given VectorID.
func (*CompressedBitmap) Union ¶
func (b *CompressedBitmap) Union(other *CompressedBitmap) *CompressedBitmap
Union returns a new bitmap containing the union of b and other.
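A tombstone-style sketch using the set operations, assuming VectorID is an integer ID type:

    live := store.NewBitmap()
    live.Add(store.VectorID(1))
    live.Add(store.VectorID(2))
    dead := store.NewBitmap()
    dead.Add(store.VectorID(2))
    visible := live.Difference(dead)   // in live but not tombstoned
    fmt.Println(visible.Cardinality()) // 1
    fmt.Println(visible.Contains(store.VectorID(1))) // true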
type ConfigError ¶
type ConfigError struct {
Component string // Component: "NUMA", "S3Backend", "Replication"
Field string // Configuration field name
Value string // Invalid value (as string)
Message string // Validation error message
Timestamp time.Time // When the error occurred
}
ConfigError provides rich context for configuration validation errors.
func (*ConfigError) Error ¶
func (e *ConfigError) Error() string
type ConsistencyLevel ¶
type ConsistencyLevel int
ConsistencyLevel defines the consistency requirements for read/write operations
const (
    ConsistencyOne    ConsistencyLevel = iota // At least one node must acknowledge
    ConsistencyQuorum                         // Majority of nodes must acknowledge
    ConsistencyAll                            // All nodes must acknowledge
)
func ParseConsistencyLevel ¶
func ParseConsistencyLevel(s string) (ConsistencyLevel, error)
ParseConsistencyLevel parses a string into ConsistencyLevel
func (ConsistencyLevel) String ¶
func (cl ConsistencyLevel) String() string
String returns the string representation of ConsistencyLevel
type DataServer ¶
type DataServer struct {
*VectorStore
}
DataServer handles data plane operations (DoGet, DoPut). It embeds VectorStore to inherit the base interface and overrides methods for error conversion.
func NewDataServer ¶
func NewDataServer(store *VectorStore) *DataServer
func (*DataServer) DoAction ¶
func (s *DataServer) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error
DoAction returns Unimplemented on DataServer (data plane only)
func (*DataServer) DoExchange ¶
func (s *DataServer) DoExchange(stream flight.FlightService_DoExchangeServer) error
DoExchange delegates to VectorStore with error conversion
func (*DataServer) DoGet ¶
func (s *DataServer) DoGet(tkt *flight.Ticket, stream flight.FlightService_DoGetServer) error
DoGet retrieves a dataset, converting domain errors to gRPC status codes.
func (*DataServer) DoPut ¶
func (s *DataServer) DoPut(stream flight.FlightService_DoPutServer) error
DoPut stores a dataset, converting domain errors to gRPC status codes.
func (*DataServer) GetFlightInfo ¶
func (s *DataServer) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)
GetFlightInfo returns Unimplemented on DataServer
func (*DataServer) GetSchema ¶
func (s *DataServer) GetSchema(ctx context.Context, desc *flight.FlightDescriptor) (*flight.SchemaResult, error)
GetSchema delegates to VectorStore with error conversion
func (*DataServer) ListFlights ¶
func (s *DataServer) ListFlights(c *flight.Criteria, stream flight.FlightService_ListFlightsServer) error
ListFlights returns Unimplemented on DataServer
type Dataset ¶
type Dataset struct {
Records []arrow.RecordBatch
Version int64
Index VectorIndex // Use common interface (Item 3)
Name string
Schema *arrow.Schema
Topo *NUMATopology
// Tombstones map BatchIdx -> Bitset of deleted RowIdxs
Tombstones map[int]*Bitset
// BatchNodes tracks which NUMA node each RecordBatch is allocated on
BatchNodes []int
// Memory tracking
SizeBytes atomic.Int64
// LWW State
LWW *TimestampMap
// Anti-Entropy
Merkle *MerkleTree
// Hybrid Search
InvertedIndexes map[string]*InvertedIndex
BM25Index *BM25InvertedIndex
// contains filtered or unexported fields
}
Dataset wraps records with metadata for eviction and tombstones
func (*Dataset) AddToIndex ¶
AddToIndex adds a vector to the index
func (*Dataset) EvictExpiredRecords ¶
func (d *Dataset) EvictExpiredRecords() []arrow.RecordBatch
EvictExpiredRecords removes TTL-expired records from the dataset using tombstones and returns the evicted records.
func (*Dataset) GetExistingSchema ¶
GetExistingSchema returns the schema of the first record in the dataset. Uses dataMu RLock for read-only access to Records. Returns nil if dataset has no records.
func (*Dataset) GetRecord ¶
func (d *Dataset) GetRecord(idx int) (arrow.RecordBatch, bool)
GetRecord returns the record batch at the given index in a thread-safe manner.
func (*Dataset) GetRecordMetadata ¶
func (d *Dataset) GetRecordMetadata(rec arrow.RecordBatch) *RecordMetadata
GetRecordMetadata retrieves eviction metadata for a record
func (*Dataset) GetRecordsCount ¶
GetRecordsCount returns the number of records in the dataset. Uses dataMu RLock for read-only access.
func (*Dataset) GetVectorIndex ¶
func (d *Dataset) GetVectorIndex() VectorIndex
GetVectorIndex returns the current index safely
func (*Dataset) GetVersion ¶
GetVersion returns the current schema version. Uses RLock for read-only access.
func (*Dataset) InitRecordEviction ¶
func (d *Dataset) InitRecordEviction()
InitRecordEviction initializes the per-record eviction manager for a dataset
func (*Dataset) LastAccess ¶
func (*Dataset) MigrateToShardedIndex ¶
func (d *Dataset) MigrateToShardedIndex(cfg AutoShardingConfig) error
MigrateToShardedIndex migrates the current index to a sharded index
func (*Dataset) RegisterRecordWithTTL ¶
func (d *Dataset) RegisterRecordWithTTL(rec arrow.RecordBatch, ttl time.Duration)
RegisterRecordWithTTL registers a record for per-record TTL eviction
func (*Dataset) SearchDataset ¶
func (d *Dataset) SearchDataset(query []float32, k int) []SearchResult
SearchDataset delegates to the vector index if available
func (*Dataset) SetLastAccess ¶
func (*Dataset) UpgradeSchemaVersion ¶
func (ds *Dataset) UpgradeSchemaVersion()
UpgradeSchemaVersion increments the dataset version. Uses Lock for exclusive write access.
type DatasetMetadata ¶
DatasetMetadata holds lightweight metadata for ListFlights without requiring locks on the main Dataset struct.
type DeltaSync ¶
type DeltaSync struct {
FromVersion uint64
ToVersion uint64
NewLocations []Location
StartIndex int
}
DeltaSync represents incremental changes between versions.
type DirectBufferReader ¶
type DirectBufferReader struct {
// contains filtered or unexported fields
}
DirectBufferReader provides zero-copy access to Arrow buffers
func NewDirectBufferReader ¶
func NewDirectBufferReader(alloc memory.Allocator) *DirectBufferReader
NewDirectBufferReader creates a new direct buffer reader
func (*DirectBufferReader) Allocator ¶
func (r *DirectBufferReader) Allocator() memory.Allocator
Allocator returns the underlying allocator
func (*DirectBufferReader) CreateArrayDataFromBuffer ¶
func (r *DirectBufferReader) CreateArrayDataFromBuffer(dt arrow.DataType, length int, dataBuf []byte, nullBitmap []byte) arrow.ArrayData
CreateArrayDataFromBuffer creates arrow.ArrayData directly from a byte buffer without copying the data. The buffer must remain valid for the lifetime of the returned ArrayData.
func (*DirectBufferReader) GetBufferReference ¶
func (r *DirectBufferReader) GetBufferReference(data []byte) []byte
GetBufferReference returns a reference to the buffer without copying The returned slice points to the same underlying array as the input
type DiskANNConfig ¶
DiskANNConfig holds DiskANN-specific configuration
type DiskANNIndex ¶
type DiskANNIndex struct {
// contains filtered or unexported fields
}
DiskANNIndex implements PluggableVectorIndex for DiskANN algorithm
func (*DiskANNIndex) AddBatch ¶
func (d *DiskANNIndex) AddBatch(ids []uint64, vectors [][]float32) error
func (*DiskANNIndex) AddByLocation ¶
func (d *DiskANNIndex) AddByLocation(batchIdx, rowIdx int) error
func (*DiskANNIndex) Build ¶
func (d *DiskANNIndex) Build() error
func (*DiskANNIndex) Close ¶
func (d *DiskANNIndex) Close() error
func (*DiskANNIndex) Dimension ¶
func (d *DiskANNIndex) Dimension() int
func (*DiskANNIndex) Len ¶
func (d *DiskANNIndex) Len() int
func (*DiskANNIndex) Load ¶
func (d *DiskANNIndex) Load(path string) error
func (*DiskANNIndex) NeedsBuild ¶
func (d *DiskANNIndex) NeedsBuild() bool
func (*DiskANNIndex) Save ¶
func (d *DiskANNIndex) Save(path string) error
func (*DiskANNIndex) Search ¶
func (d *DiskANNIndex) Search(query []float32, k int) ([]IndexSearchResult, error)
func (*DiskANNIndex) SearchBatch ¶
func (d *DiskANNIndex) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)
func (*DiskANNIndex) SearchVectors ¶
func (d *DiskANNIndex) SearchVectors(query []float32, k int) []SearchResult
func (*DiskANNIndex) Size ¶
func (d *DiskANNIndex) Size() int
func (*DiskANNIndex) Type ¶
func (d *DiskANNIndex) Type() IndexType
type DoGetPipeline ¶
type DoGetPipeline struct {
// contains filtered or unexported fields
}
DoGetPipeline implements pipelined processing for DoGet operations. Workers filter batches in parallel while the main thread writes, hiding compute latency behind network I/O.
func NewDoGetPipeline ¶
func NewDoGetPipeline(workers, bufferSize int) *DoGetPipeline
NewDoGetPipeline creates a new pipeline with the specified number of workers. If workers is 0, defaults to runtime.NumCPU(). If bufferSize is 0, defaults to workers * 2.
func (*DoGetPipeline) NumWorkers ¶
func (p *DoGetPipeline) NumWorkers() int
NumWorkers returns the number of worker goroutines
func (*DoGetPipeline) Process ¶
func (p *DoGetPipeline) Process(ctx context.Context, batches []arrow.RecordBatch, filterFn FilterFunc) (results <-chan PipelineResult, errs <-chan error)
Process starts pipelined processing of batches. Returns a channel of ordered results and an error channel. Results are guaranteed to arrive in the same order as input batches.
func (*DoGetPipeline) ProcessRecords ¶
func (p *DoGetPipeline) ProcessRecords(ctx context.Context, records []arrow.RecordBatch, tombstones map[int]*Bitset, filters []Filter, evaluator interface{}) <-chan PipelineStage
ProcessRecords is the compatibility method for store.go
func (*DoGetPipeline) Stats ¶
func (p *DoGetPipeline) Stats() PipelineStats
Stats returns current pipeline statistics
func (*DoGetPipeline) Stop ¶
func (p *DoGetPipeline) Stop()
type DoGetPipelineConfig ¶
type DoGetPipelineConfig struct {
Workers int // Number of parallel workers
BufferSize int // Channel buffer size
Threshold int // Minimum batches to trigger pipeline (vs serial)
}
DoGetPipelineConfig holds configuration for DoGet pipeline processing
func DefaultDoGetPipelineConfig ¶
func DefaultDoGetPipelineConfig() DoGetPipelineConfig
DefaultDoGetPipelineConfig returns sensible defaults
type DoGetPipelinePool ¶
type DoGetPipelinePool struct {
// contains filtered or unexported fields
}
DoGetPipelinePool manages a pool of reusable pipelines
func NewDoGetPipelinePool ¶
func NewDoGetPipelinePool(workers, bufferSize int) *DoGetPipelinePool
NewDoGetPipelinePool creates a pool of pipelines
func (*DoGetPipelinePool) Get ¶
func (pp *DoGetPipelinePool) Get() *DoGetPipeline
Get retrieves a pipeline from the pool
func (*DoGetPipelinePool) Put ¶
func (pp *DoGetPipelinePool) Put(p *DoGetPipeline)
Put returns a pipeline to the pool
type Edge ¶
type Edge struct {
Subject string // Source entity (e.g., "user:alice")
Predicate string // Relationship type (e.g., "owns", "likes")
Object string // Target entity (e.g., "doc:report1")
Weight float32 // Edge weight for scoring
}
Edge represents a knowledge graph edge (subject -> predicate -> object)
type ErrDimensionMismatch ¶
ErrDimensionMismatch indicates vector dimension incompatibility.
func (*ErrDimensionMismatch) Error ¶
func (e *ErrDimensionMismatch) Error() string
type ErrInternal ¶
ErrInternal indicates an unexpected internal error.
func (*ErrInternal) Error ¶
func (e *ErrInternal) Error() string
func (*ErrInternal) Unwrap ¶
func (e *ErrInternal) Unwrap() error
type ErrInvalidArgument ¶
ErrInvalidArgument indicates invalid input from the client.
func (*ErrInvalidArgument) Error ¶
func (e *ErrInvalidArgument) Error() string
type ErrNotFound ¶
ErrNotFound indicates a requested resource does not exist.
func (*ErrNotFound) Error ¶
func (e *ErrNotFound) Error() string
type ErrPersistence ¶
ErrPersistence indicates a storage/persistence failure.
func (*ErrPersistence) Error ¶
func (e *ErrPersistence) Error() string
func (*ErrPersistence) Unwrap ¶
func (e *ErrPersistence) Unwrap() error
type ErrResourceExhausted ¶
ErrResourceExhausted indicates system resource limits exceeded.
func (*ErrResourceExhausted) Error ¶
func (e *ErrResourceExhausted) Error() string
type ErrSchemaMismatch ¶
ErrSchemaMismatch indicates incompatible schema between operations.
func (*ErrSchemaMismatch) Error ¶
func (e *ErrSchemaMismatch) Error() string
type ErrUnavailable ¶
type ErrUnavailable struct {
}
ErrUnavailable indicates temporary unavailability (e.g., during snapshots).
func (*ErrUnavailable) Error ¶
func (e *ErrUnavailable) Error() string
type FSBackend ¶
type FSBackend struct {
// contains filtered or unexported fields
}
FSBackend is a standard os.File backed implementation.
func NewFSBackend ¶
type FilterEvaluator ¶
type FilterEvaluator struct {
// contains filtered or unexported fields
}
FilterEvaluator pre-processes filters for a specific RecordBatch to enable fast scanning
func NewFilterEvaluator ¶
func NewFilterEvaluator(rec arrow.RecordBatch, filters []Filter) (*FilterEvaluator, error)
NewFilterEvaluator creates a new evaluator, pre-binding filters to RecordBatch columns
func (*FilterEvaluator) Matches ¶
func (e *FilterEvaluator) Matches(rowIdx int) bool
Matches returns true if the row satisfies all filters
func (*FilterEvaluator) MatchesAll ¶
func (e *FilterEvaluator) MatchesAll(batchLen int) []int
MatchesAll evaluates all filters on the entire batch using SIMD and returns matching row indices. This is optimal for dense scans.
func (*FilterEvaluator) MatchesBatch ¶
func (e *FilterEvaluator) MatchesBatch(rowIndices []int) []int
MatchesBatch evaluates filters for a slice of row indices and returns a subset of matching indices. This is the "SIMD-friendly" entry point for batch processing.
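A dense-scan sketch; rec and filters are taken as parameters since the Filter type's fields are not listed at this point in the index, and matchingRows is a hypothetical helper name:

    // matchingRows pre-binds the filters to the batch, then evaluates them
    // over every row in one pass.
    func matchingRows(rec arrow.RecordBatch, filters []store.Filter) ([]int, error) {
        ev, err := store.NewFilterEvaluator(rec, filters)
        if err != nil {
            return nil, err
        }
        return ev.MatchesAll(int(rec.NumRows())), nil
    }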
type FilterFunc ¶
type FilterFunc func(ctx context.Context, rec arrow.RecordBatch) (arrow.RecordBatch, error)
FilterFunc is the signature for batch filtering functions
type FilterOperator ¶
type FilterOperator int
FilterOperator represents a vectorized filter comparison operator
const (
    FilterOpEqual FilterOperator = iota
    FilterOpNotEqual
    FilterOpGreater
    FilterOpLess
    FilterOpGreaterEqual
    FilterOpLessEqual
    FilterOpIn
    FilterOpNotIn
    FilterOpContains
)
func ParseFilterOperator ¶
func ParseFilterOperator(op string) (FilterOperator, error)
ParseFilterOperator parses a string operator into FilterOperator
func (FilterOperator) ComputeFunc ¶
func (op FilterOperator) ComputeFunc() string
ComputeFunc returns the Arrow compute function name for the operator
func (FilterOperator) String ¶
func (op FilterOperator) String() string
String returns the string representation of the operator
type FlightClientPool ¶
type FlightClientPool struct {
// contains filtered or unexported fields
}
FlightClientPool manages pooled connections to multiple peer Flight servers.
func NewFlightClientPool ¶
func NewFlightClientPool(cfg FlightClientPoolConfig) *FlightClientPool
NewFlightClientPool creates a new connection pool.
func (*FlightClientPool) AddHost ¶
func (p *FlightClientPool) AddHost(host string) error
AddHost adds a peer host to the pool.
func (*FlightClientPool) Close ¶
func (p *FlightClientPool) Close() error
Close closes all connections and shuts down the pool.
func (*FlightClientPool) DoGetFromPeer ¶
func (p *FlightClientPool) DoGetFromPeer(ctx context.Context, host, dataset string, filters []Filter) ([]arrow.RecordBatch, error)
DoGetFromPeer retrieves Arrow RecordBatches from a peer via Flight DoGet.
func (*FlightClientPool) DoPutToPeer ¶
func (p *FlightClientPool) DoPutToPeer(ctx context.Context, host, dataset string, records []arrow.RecordBatch) error
DoPutToPeer sends Arrow RecordBatches to a peer via Flight DoPut.
func (*FlightClientPool) Get ¶
func (p *FlightClientPool) Get(ctx context.Context, host string) (*PooledFlightClient, error)
Get acquires a connection to the specified host. If no idle connection is available, creates a new one.
func (*FlightClientPool) Put ¶
func (p *FlightClientPool) Put(conn *PooledFlightClient)
Put returns a connection to the pool.
func (*FlightClientPool) RemoveHost ¶
func (p *FlightClientPool) RemoveHost(host string) error
RemoveHost removes a peer host from the pool and closes its connections.
func (*FlightClientPool) ReplicateToPeers ¶
func (p *FlightClientPool) ReplicateToPeers(ctx context.Context, peers []string, dataset string, records []arrow.RecordBatch) map[string]error
ReplicateToPeers replicates data to multiple peers concurrently. Returns a map of host -> error for failed replications.
func (*FlightClientPool) Stats ¶
func (p *FlightClientPool) Stats() FlightClientPoolStats
Stats returns current pool statistics.
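A replication sketch; the hosts, port, and dataset name are illustrative, and ctx and batches are assumed to exist:

    pool := store.NewFlightClientPool(store.DefaultFlightClientPoolConfig())
    defer pool.Close()
    _ = pool.AddHost("peer-a:8815")
    _ = pool.AddHost("peer-b:8815")
    errs := pool.ReplicateToPeers(ctx, []string{"peer-a:8815", "peer-b:8815"}, "users", batches)
    for host, err := range errs { // only failed replications are reported
        log.Printf("replicate to %s: %v", host, err)
    }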
type FlightClientPoolConfig ¶
type FlightClientPoolConfig struct {
// MaxConnsPerHost is the maximum connections per peer host (default: 10)
MaxConnsPerHost int
// MinConnsPerHost is the minimum idle connections to maintain (default: 1)
MinConnsPerHost int
// ConnTimeout is the connection establishment timeout (default: 10s)
ConnTimeout time.Duration
// IdleTimeout is how long idle connections are kept (default: 5m)
IdleTimeout time.Duration
// MaxConnLifetime is the maximum lifetime of any connection (default: 1h)
MaxConnLifetime time.Duration
// HealthCheckInterval is how often to check connection health (default: 30s)
HealthCheckInterval time.Duration
// EnableCompression enables gzip compression for Flight streams
EnableCompression bool
}
FlightClientPoolConfig configures the Flight client connection pool.
func DefaultFlightClientPoolConfig ¶
func DefaultFlightClientPoolConfig() FlightClientPoolConfig
DefaultFlightClientPoolConfig returns sensible defaults for Flight client pooling.
func (FlightClientPoolConfig) Validate ¶
func (c FlightClientPoolConfig) Validate() error
Validate checks if the configuration is valid.
type FlightClientPoolStats ¶
type FlightClientPoolStats struct {
TotalHosts int
TotalConnections int
ActiveConns int
IdleConns int
Gets int64
Puts int64
Hits int64
Misses int64
Timeouts int64
Errors int64
}
FlightClientPoolStats provides statistics about the connection pool.
type FlightDataChunk ¶
type FlightDataChunk struct {
DatasetName string
Record arrow.RecordBatch
ReceivedAt time.Time
}
FlightDataChunk represents a received Arrow RecordBatch with metadata.
type FlightDataQueue ¶
type FlightDataQueue struct {
// contains filtered or unexported fields
}
FlightDataQueue is a buffered channel wrapper for decoupling receive from processing.
func NewFlightDataQueue ¶
func NewFlightDataQueue(config FlightDataQueueConfig) *FlightDataQueue
NewFlightDataQueue creates a new queue.
func (*FlightDataQueue) Close ¶
func (q *FlightDataQueue) Close()
Close closes the queue, preventing new enqueues.
func (*FlightDataQueue) Dequeue ¶
func (q *FlightDataQueue) Dequeue(ctx context.Context) (*FlightDataChunk, bool)
Dequeue retrieves the next chunk, respecting context cancellation. Returns (nil, false) if the queue is closed and empty, or if the context is canceled.
func (*FlightDataQueue) IsClosed ¶
func (q *FlightDataQueue) IsClosed() bool
IsClosed returns whether queue is closed.
func (*FlightDataQueue) Stats ¶
func (q *FlightDataQueue) Stats() FlightDataQueueStats
Stats returns current statistics.
func (*FlightDataQueue) TryEnqueue ¶
func (q *FlightDataQueue) TryEnqueue(chunk *FlightDataChunk) bool
TryEnqueue attempts to enqueue a chunk without blocking indefinitely. Returns false if queue is full or closed.
type FlightDataQueueConfig ¶
type FlightDataQueueConfig struct {
QueueSize int // buffered channel capacity
EnqueueTimeout time.Duration // timeout for TryEnqueue
DrainTimeout time.Duration // timeout for draining on close
}
FlightDataQueueConfig configures the flight data queue.
func DefaultFlightDataQueueConfig ¶
func DefaultFlightDataQueueConfig() FlightDataQueueConfig
DefaultFlightDataQueueConfig returns sensible defaults.
func (FlightDataQueueConfig) Validate ¶
func (c FlightDataQueueConfig) Validate() error
Validate checks config validity.
type FlightDataQueueStats ¶
FlightDataQueueStats tracks queue operations.
type FlightReaderBufferRetainer ¶
type FlightReaderBufferRetainer struct {
// contains filtered or unexported fields
}
FlightReaderBufferRetainer manages buffer lifecycles for zero-copy operations
func NewFlightReaderBufferRetainer ¶
func NewFlightReaderBufferRetainer(maxBuffers int) *FlightReaderBufferRetainer
NewFlightReaderBufferRetainer creates a new buffer retainer
func (*FlightReaderBufferRetainer) GetBuffer ¶
func (r *FlightReaderBufferRetainer) GetBuffer(handle uint64) []byte
GetBuffer retrieves a retained buffer by handle
func (*FlightReaderBufferRetainer) MaxBuffers ¶
func (r *FlightReaderBufferRetainer) MaxBuffers() int
MaxBuffers returns the maximum number of retained buffers
func (*FlightReaderBufferRetainer) ReleaseBuffer ¶
func (r *FlightReaderBufferRetainer) ReleaseBuffer(handle uint64) bool
ReleaseBuffer releases a retained buffer
func (*FlightReaderBufferRetainer) RetainBuffer ¶
func (r *FlightReaderBufferRetainer) RetainBuffer(buf []byte) uint64
RetainBuffer retains a buffer and returns a handle for later retrieval
func (*FlightReaderBufferRetainer) Stats ¶
func (r *FlightReaderBufferRetainer) Stats() BufferRetainerStats
Stats returns current buffer retainer statistics
type FusionMode ¶
type FusionMode int
FusionMode defines how to combine dense and sparse results
const (
    FusionModeRRF     FusionMode = iota // Reciprocal Rank Fusion
    FusionModeLinear                    // Linear weighted combination
    FusionModeCascade                   // Cascade: filters -> keyword -> vector
)
type GRPCConfig ¶
type GRPCConfig struct {
// Keepalive parameters
KeepAliveTime time.Duration // Time between keepalive pings (server sends to client)
KeepAliveTimeout time.Duration // Timeout for keepalive ping acknowledgement
KeepAliveMinTime time.Duration // Minimum time client should wait between pings
KeepAlivePermitWithoutStream bool // Allow keepalive pings when no active streams
// Concurrent streams limit
MaxConcurrentStreams uint32 // Maximum concurrent streams per connection
// HTTP/2 flow control window sizes
InitialWindowSize int32 // Initial window size for a stream
InitialConnWindowSize int32 // Initial window size for a connection
// Message size limits
MaxRecvMsgSize int // Maximum message size the server can receive
MaxSendMsgSize int // Maximum message size the server can send
// Compression settings - enables gzip compression for 50-70% bandwidth reduction
CompressionEnabled bool // Enable gzip compression for streaming data
}
GRPCConfig holds all gRPC server/client tuning parameters for optimal throughput. Configuring these parameters can yield 5-10% improvement in sustained throughput.
func DefaultGRPCConfig ¶
func DefaultGRPCConfig() GRPCConfig
DefaultGRPCConfig returns a GRPCConfig with sensible defaults optimized for throughput. These defaults are tuned for vector database workloads with large message payloads.
func (GRPCConfig) BuildClientOptions ¶
func (c GRPCConfig) BuildClientOptions() []grpc.DialOption
BuildClientOptions returns a slice of grpc.DialOption configured from this GRPCConfig. These options optimize gRPC client performance to match server settings. When CompressionEnabled is true, all calls will use gzip compression for 50-70% bandwidth reduction on vector data.
func (GRPCConfig) BuildServerOptions ¶
func (c GRPCConfig) BuildServerOptions() []grpc.ServerOption
BuildServerOptions returns a slice of grpc.ServerOption configured from this GRPCConfig. These options optimize gRPC server performance for sustained throughput. Note: Server-side compression is automatically available when the gzip encoding package is imported - no explicit server option needed. The server will respond with compressed data when clients request it via UseCompressor.
func (GRPCConfig) ClientKeepaliveParams ¶
func (c GRPCConfig) ClientKeepaliveParams() keepalive.ClientParameters
ClientKeepaliveParams returns keepalive.ClientParameters for client configuration.
func (GRPCConfig) ServerEnforcementPolicy ¶
func (c GRPCConfig) ServerEnforcementPolicy() keepalive.EnforcementPolicy
ServerEnforcementPolicy returns keepalive.EnforcementPolicy for server configuration.
func (GRPCConfig) ServerKeepaliveParams ¶
func (c GRPCConfig) ServerKeepaliveParams() keepalive.ServerParameters
ServerKeepaliveParams returns keepalive.ServerParameters for server configuration.
func (GRPCConfig) String ¶
func (c GRPCConfig) String() string
String returns a human-readable representation of the config for logging.
func (GRPCConfig) Validate ¶
func (c GRPCConfig) Validate() error
Validate checks if the configuration is valid.
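A server/client wiring sketch; the dial target is illustrative, and grpc.NewServer/grpc.Dial are the standard grpc-go entry points:

    cfg := store.DefaultGRPCConfig()
    cfg.CompressionEnabled = true // gzip on streaming payloads
    if err := cfg.Validate(); err != nil {
        log.Fatal(err)
    }
    srv := grpc.NewServer(cfg.BuildServerOptions()...)
    conn, err := grpc.Dial("peer:8815", cfg.BuildClientOptions()...)
    if err != nil {
        log.Fatal(err)
    }
    _, _ = srv, conn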
type GlobalSearchCoordinator ¶
type GlobalSearchCoordinator struct {
// contains filtered or unexported fields
}
GlobalSearchCoordinator handles scatter-gather logic
func NewGlobalSearchCoordinator ¶
func NewGlobalSearchCoordinator(logger *zap.Logger) *GlobalSearchCoordinator
func (*GlobalSearchCoordinator) GlobalSearch ¶
func (c *GlobalSearchCoordinator) GlobalSearch(ctx context.Context, localResults []SearchResult, req VectorSearchRequest, peers []mesh.Member) ([]SearchResult, error)
GlobalSearch performs scatter-gather search across the cluster
type GraphStore ¶
type GraphStore struct {
// contains filtered or unexported fields
}
GraphStore manages knowledge graph edges for GraphRAG workflows
func (*GraphStore) AddEdge ¶
func (gs *GraphStore) AddEdge(e Edge) error
AddEdge adds an edge to the graph store
func (*GraphStore) CommunityCount ¶
func (gs *GraphStore) CommunityCount() int
CommunityCount returns the number of detected communities
func (*GraphStore) DetectCommunities ¶
func (gs *GraphStore) DetectCommunities() []Community
DetectCommunities runs Louvain algorithm to find communities
func (*GraphStore) EdgeCount ¶
func (gs *GraphStore) EdgeCount() int
EdgeCount returns the total number of edges
func (*GraphStore) FromArrowBatch ¶
func (gs *GraphStore) FromArrowBatch(batch arrow.Record) error
FromArrowBatch loads edges from an Arrow RecordBatch
func (*GraphStore) GetCommunityForNode ¶
func (gs *GraphStore) GetCommunityForNode(node string) int
GetCommunityForNode returns the community ID for a given node
func (*GraphStore) GetEdgesByObject ¶
func (gs *GraphStore) GetEdgesByObject(object string) []Edge
GetEdgesByObject returns all edges with the given object (incoming edges)
func (*GraphStore) GetEdgesByPredicate ¶
func (gs *GraphStore) GetEdgesByPredicate(predicate string) []Edge
GetEdgesByPredicate returns all edges with the given predicate
func (*GraphStore) GetEdgesBySubject ¶
func (gs *GraphStore) GetEdgesBySubject(subject string) []Edge
GetEdgesBySubject returns all edges with the given subject (outgoing edges)
func (*GraphStore) PredicateVocabulary ¶
func (gs *GraphStore) PredicateVocabulary() []string
PredicateVocabulary returns all unique predicate types
func (*GraphStore) ToArrowBatch ¶
ToArrowBatch converts all edges to an Arrow RecordBatch with Dictionary-encoded predicates
func (*GraphStore) Traverse ¶
func (gs *GraphStore) Traverse(start string, maxHops int) []Path
Traverse performs BFS traversal from start node up to maxHops depth
func (*GraphStore) TraverseParallel ¶
func (gs *GraphStore) TraverseParallel(starts []string, maxHops int) map[string][]Path
TraverseParallel performs concurrent traversal from multiple starting points
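A GraphRAG-style sketch; gs is an existing *store.GraphStore (its constructor is not shown at this point in the index) and the entities are illustrative:

    _ = gs.AddEdge(store.Edge{Subject: "user:alice", Predicate: "owns", Object: "doc:report1", Weight: 1.0})
    _ = gs.AddEdge(store.Edge{Subject: "doc:report1", Predicate: "cites", Object: "doc:spec2", Weight: 0.5})
    paths := gs.Traverse("user:alice", 2) // BFS up to 2 hops
    fmt.Println(len(paths), gs.EdgeCount())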
type HNSWGraphSync ¶
type HNSWGraphSync struct {
// contains filtered or unexported fields
}
HNSWGraphSync manages HNSW index synchronization between peers.
func NewHNSWGraphSync ¶
func NewHNSWGraphSync(index *HNSWIndex) *HNSWGraphSync
NewHNSWGraphSync creates a new sync manager for the given index.
func (*HNSWGraphSync) ApplyDelta ¶
func (s *HNSWGraphSync) ApplyDelta(delta *DeltaSync) error
ApplyDelta applies incremental changes from a delta.
func (*HNSWGraphSync) ExportDelta ¶
func (s *HNSWGraphSync) ExportDelta(fromVersion uint64) (*DeltaSync, error)
ExportDelta exports changes since the given version.
func (*HNSWGraphSync) ExportGraph ¶
func (s *HNSWGraphSync) ExportGraph(w io.Writer) error
ExportGraph exports just the HNSW graph structure.
func (*HNSWGraphSync) ExportState ¶
func (s *HNSWGraphSync) ExportState() ([]byte, error)
ExportState serializes the complete HNSW index state.
func (*HNSWGraphSync) GetExportCount ¶
func (s *HNSWGraphSync) GetExportCount() uint64
GetExportCount returns the number of exports performed.
func (*HNSWGraphSync) GetImportCount ¶
func (s *HNSWGraphSync) GetImportCount() uint64
GetImportCount returns the number of imports performed.
func (*HNSWGraphSync) GetVersion ¶
func (s *HNSWGraphSync) GetVersion() uint64
GetVersion returns the current sync version.
func (*HNSWGraphSync) ImportGraph ¶
func (s *HNSWGraphSync) ImportGraph(r io.Reader) error
ImportGraph imports just the HNSW graph structure.
func (*HNSWGraphSync) ImportState ¶
func (s *HNSWGraphSync) ImportState(data []byte) error
ImportState deserializes and applies HNSW index state.
func (*HNSWGraphSync) IncrementVersion ¶
func (s *HNSWGraphSync) IncrementVersion()
IncrementVersion increments the sync version.
type HNSWIndex ¶
type HNSWIndex struct {
Graph *hnsw.Graph[VectorID]
Metric VectorMetric // Distance metric used by this index
// contains filtered or unexported fields
}
HNSWIndex wraps the hnsw.Graph and manages the mapping from ID to Arrow data.
func NewHNSWIndex ¶
NewHNSWIndex creates a new index for the given dataset using Euclidean distance.
func NewHNSWIndexWithCapacity ¶
NewHNSWIndexWithCapacity creates a new index with pre-allocated locations slice.
func NewHNSWIndexWithMetric ¶
func NewHNSWIndexWithMetric(ds *Dataset, metric VectorMetric) *HNSWIndex
NewHNSWIndexWithMetric creates a new index for the given dataset with the specified metric.
func (*HNSWIndex) AddBatchParallel ¶
AddBatchParallel adds multiple vectors in parallel using worker goroutines. It uses a three-phase approach:
1. Pre-allocate locations atomically under a single lock
2. Parallel vector retrieval using worker goroutines
3. Sequential graph insertion (hnsw library requirement)
This can reduce build time by up to 85% compared to sequential insertion by parallelizing the expensive vector fetch operations.
func (*HNSWIndex) AddByLocation ¶
AddByLocation implements VectorIndex interface for HNSWIndex.
func (*HNSWIndex) AddByRecord ¶
AddByRecord implements VectorIndex interface for HNSWIndex.
func (*HNSWIndex) AddSafe ¶
AddSafe adds a vector using a direct record batch reference. It COPIES the vector to ensure it remains stable even if the record batch is released.
func (*HNSWIndex) GetDimension ¶
GetDimension returns the vector dimension for this index
func (*HNSWIndex) GetDistanceFunc ¶
GetDistanceFunc returns the distance function. If PQ is enabled, it returns the SDC-accelerated PQ distance.
func (*HNSWIndex) GetLocation ¶
GetLocation returns the storage location for a given VectorID
func (*HNSWIndex) IsGPUEnabled ¶
IsGPUEnabled returns whether GPU acceleration is active
func (*HNSWIndex) PutResults ¶
PutResults returns a search result slice to the pool for reuse. Callers should call this when done with SearchByID results.
func (*HNSWIndex) RegisterReader ¶
func (h *HNSWIndex) RegisterReader()
RegisterReader increments the active reader count for zero-copy safety
func (*HNSWIndex) RemapLocations ¶
func (h *HNSWIndex) RemapLocations(remapping map[int]BatchRemapInfo)
RemapLocations safely updates vector locations after compaction. It iterates over all locations and applies the remapping if the batch index matches.
func (*HNSWIndex) RerankBatch ¶
func (h *HNSWIndex) RerankBatch(query []float32, candidateIDs []VectorID, k int) []RankedResult
RerankBatch computes exact distances for a set of candidate IDs and returns top-k.
func (*HNSWIndex) SearchBatch ¶
func (h *HNSWIndex) SearchBatch(queries [][]float32, k int) [][]RankedResult
SearchBatch is a convenience method that calls SearchBatchOptimized.
func (*HNSWIndex) SearchBatchOptimized ¶
func (h *HNSWIndex) SearchBatchOptimized(queries [][]float32, k int) [][]RankedResult
SearchBatchOptimized performs k-NN search for multiple query vectors using optimized batch distance calculations.
This method is more efficient than calling SearchWithBatchDistance multiple times because it can amortize the overhead of vector collection and leverage cache locality when processing multiple queries.
func (*HNSWIndex) SearchBatchWithArena ¶
func (h *HNSWIndex) SearchBatchWithArena(queries [][]float32, k int, arena *SearchArena) [][]RankedResult
SearchBatchWithArena performs batch search using an arena allocator.
func (*HNSWIndex) SearchByIDUnsafe ¶
SearchByIDUnsafe performs k-NN search using zero-copy vector access. This avoids the allocation and copy overhead of SearchByID by using getVectorUnsafe with epoch protection. The vector data is only valid during the search operation. Returns nil if id is invalid or k <= 0.
func (*HNSWIndex) SearchHybrid ¶
func (h *HNSWIndex) SearchHybrid(query []float32, k int) []SearchResult
SearchHybrid performs GPU+CPU hybrid search. It uses the GPU for candidate generation, then refines the results with the CPU HNSW graph.
func (*HNSWIndex) SearchVectors ¶
func (h *HNSWIndex) SearchVectors(query []float32, k int, filters []Filter) []SearchResult
SearchVectors performs k-NN search returning full results with scores (distances). Uses striped locks for location access to reduce contention in result processing.
func (*HNSWIndex) SearchVectorsWithBitmap ¶
func (h *HNSWIndex) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult
SearchVectorsWithBitmap returns k nearest neighbors filtered by a bitset.
func (*HNSWIndex) SearchWithArena ¶
func (h *HNSWIndex) SearchWithArena(query []float32, k int, arena *SearchArena) []VectorID
SearchWithArena performs k-NN search using the provided arena for allocations. If arena is nil or exhausted, falls back to heap allocation. The returned slice is allocated from the arena and should NOT be returned to resultPool. Call arena.Reset() after processing results to reclaim memory for next request.
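A sketch of the arena search pattern described above. NewSearchArena is an assumed constructor (the package exposes PutArena, but the acquisition path is not shown in this entry); the copy-then-Reset discipline follows the contract in the comment:

func searchMany(h *HNSWIndex, queries [][]float32, k int) [][]VectorID {
    arena := NewSearchArena(1 << 20) // hypothetical constructor and sizing
    out := make([][]VectorID, 0, len(queries))
    for _, q := range queries {
        ids := h.SearchWithArena(q, k, arena)
        // Copy before Reset: arena-backed slices are invalidated by Reset.
        out = append(out, append([]VectorID(nil), ids...))
        arena.Reset() // reclaim arena memory for the next query
    }
    return out
}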
func (*HNSWIndex) SearchWithBatchDistance ¶
func (h *HNSWIndex) SearchWithBatchDistance(query []float32, k int) []RankedResult
SearchWithBatchDistance performs k-NN search using batch distance calculations. This is a two-stage retrieval:
1. Coarse search using the HNSW graph to get initial candidates
2. Batch distance calculation on all candidates for precise ranking
Using batch SIMD operations provides significant speedup over per-vector distance calculations by reducing function call overhead and maximizing CPU pipeline utilization.
func (*HNSWIndex) SetIndexedColumns ¶
SetIndexedColumns satisfies VectorIndex interface
func (*HNSWIndex) SetPQEncoder ¶
SetPQEncoder enables product quantization with the provided encoder.
func (*HNSWIndex) SyncGPU ¶
SyncGPU adds vectors to the GPU index. It should be called after adding vectors to the CPU index.
func (*HNSWIndex) TrainPQ ¶
TrainPQ trains a PQ encoder on the current dataset elements and enables it. This is a blocking operation.
func (*HNSWIndex) UnregisterReader ¶
func (h *HNSWIndex) UnregisterReader()
UnregisterReader decrements the active reader count
type HNSWIndexConfig ¶
HNSWIndexConfig holds HNSW-specific configuration
type HNSWPluggableAdapter ¶
type HNSWPluggableAdapter struct {
// contains filtered or unexported fields
}
HNSWPluggableAdapter wraps HNSWIndex to implement PluggableVectorIndex
func (*HNSWPluggableAdapter) Add ¶
func (h *HNSWPluggableAdapter) Add(id uint64, vector []float32) error
func (*HNSWPluggableAdapter) AddBatch ¶
func (h *HNSWPluggableAdapter) AddBatch(ids []uint64, vectors [][]float32) error
func (*HNSWPluggableAdapter) AddByLocation ¶
func (h *HNSWPluggableAdapter) AddByLocation(batchIdx, rowIdx int) error
func (*HNSWPluggableAdapter) Build ¶
func (h *HNSWPluggableAdapter) Build() error
func (*HNSWPluggableAdapter) Close ¶
func (h *HNSWPluggableAdapter) Close() error
func (*HNSWPluggableAdapter) Dimension ¶
func (h *HNSWPluggableAdapter) Dimension() int
func (*HNSWPluggableAdapter) Len ¶
func (h *HNSWPluggableAdapter) Len() int
func (*HNSWPluggableAdapter) Load ¶
func (h *HNSWPluggableAdapter) Load(path string) error
func (*HNSWPluggableAdapter) NeedsBuild ¶
func (h *HNSWPluggableAdapter) NeedsBuild() bool
func (*HNSWPluggableAdapter) Save ¶
func (h *HNSWPluggableAdapter) Save(path string) error
func (*HNSWPluggableAdapter) Search ¶
func (h *HNSWPluggableAdapter) Search(query []float32, k int) ([]IndexSearchResult, error)
func (*HNSWPluggableAdapter) SearchBatch ¶
func (h *HNSWPluggableAdapter) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)
func (*HNSWPluggableAdapter) SearchVectors ¶
func (h *HNSWPluggableAdapter) SearchVectors(query []float32, k int) []SearchResult
func (*HNSWPluggableAdapter) Size ¶
func (h *HNSWPluggableAdapter) Size() int
func (*HNSWPluggableAdapter) Type ¶
func (h *HNSWPluggableAdapter) Type() IndexType
type HybridPipelineConfig ¶
type HybridPipelineConfig struct {
Alpha float32 // 0.0=pure keyword, 1.0=pure vector, 0.5=balanced
RRFk int // RRF parameter k (typically 60)
FusionMode FusionMode // How to combine results
UseColumnIndex bool // Use column index for exact filters
}
HybridPipelineConfig configures the hybrid search pipeline
func DefaultHybridPipelineConfig ¶
func DefaultHybridPipelineConfig() HybridPipelineConfig
DefaultHybridPipelineConfig returns sensible defaults
func (*HybridPipelineConfig) Validate ¶
func (c *HybridPipelineConfig) Validate() error
type HybridQuery ¶
type HybridQuery struct {
Enabled bool // Whether hybrid search is enabled for this query
TextQuery string // The text query for BM25/keyword search
Alpha float64 // Weighting: 1.0 = pure vector, 0.0 = pure keyword
K int // Number of results to return
}
HybridQuery represents a hybrid search query combining vector and BM25 search
func DefaultHybridQuery ¶
func DefaultHybridQuery() HybridQuery
DefaultHybridQuery returns a HybridQuery with sensible defaults
func (*HybridQuery) IsPureKeyword ¶
func (q *HybridQuery) IsPureKeyword() bool
IsPureKeyword returns true if this is a pure keyword/BM25 search (alpha=0.0)
func (*HybridQuery) IsPureVector ¶
func (q *HybridQuery) IsPureVector() bool
IsPureVector returns true if this is a pure vector search (alpha=1.0)
func (*HybridQuery) Validate ¶
func (q *HybridQuery) Validate() error
Validate checks if the HybridQuery is valid
type HybridSearchConfig ¶
type HybridSearchConfig struct {
// Alpha controls the weighting between vector and keyword search
// Alpha = 1.0 means pure vector search (dense)
// Alpha = 0.0 means pure keyword search (sparse/BM25)
// Alpha = 0.5 means balanced hybrid
Alpha float32
// TextColumns specifies which columns should be indexed for BM25 search
TextColumns []string
// RRFk is the k parameter for Reciprocal Rank Fusion
// Higher values give more weight to lower-ranked results
// Typical values: 60 (default), range 1-1000
RRFk int
// BM25 contains the BM25 algorithm parameters
BM25 BM25Config
// Enabled determines if hybrid search is active
Enabled bool
}
HybridSearchConfig configures hybrid search behavior combining BM25 and vector search
func DefaultHybridSearchConfig ¶
func DefaultHybridSearchConfig() HybridSearchConfig
DefaultHybridSearchConfig returns a HybridSearchConfig with sensible defaults
func (*HybridSearchConfig) IsPureKeyword ¶
func (c *HybridSearchConfig) IsPureKeyword() bool
IsPureKeyword returns true if Alpha is 0.0 (pure keyword search)
func (*HybridSearchConfig) IsPureVector ¶
func (c *HybridSearchConfig) IsPureVector() bool
IsPureVector returns true if Alpha is 1.0 (pure vector search)
func (*HybridSearchConfig) Validate ¶
func (c *HybridSearchConfig) Validate() error
Validate checks if the configuration is valid
type HybridSearchPipeline ¶
type HybridSearchPipeline struct {
// contains filtered or unexported fields
}
HybridSearchPipeline combines column index, BM25, and HNSW search
func NewHybridSearchPipeline ¶
func NewHybridSearchPipeline(cfg HybridPipelineConfig) *HybridSearchPipeline
NewHybridSearchPipeline creates a new hybrid search pipeline
func (*HybridSearchPipeline) Search ¶
func (p *HybridSearchPipeline) Search(query *HybridSearchQuery) ([]SearchResult, error)
Search performs hybrid search combining all configured indexes
func (*HybridSearchPipeline) SetBM25Index ¶
func (p *HybridSearchPipeline) SetBM25Index(idx *BM25InvertedIndex)
SetBM25Index sets the BM25 inverted index for keyword search
func (*HybridSearchPipeline) SetColumnIndex ¶
func (p *HybridSearchPipeline) SetColumnIndex(idx *ColumnInvertedIndex)
SetColumnIndex sets the column inverted index for exact filters
func (*HybridSearchPipeline) SetHNSWIndex ¶
func (p *HybridSearchPipeline) SetHNSWIndex(idx *HNSWIndex)
SetHNSWIndex sets the HNSW index for vector search
type HybridSearchQuery ¶
type HybridSearchQuery struct {
Vector []float32 // Query vector for dense search
KeywordQuery string // Text query for BM25 search
K int // Number of results
AlphaOverride *float32 // Override pipeline alpha if set
ExactFilters []Filter // Exact match filters (for column index)
}
HybridSearchQuery extends search with vector, keyword, and filter options
func DefaultHybridSearchQuery ¶
func DefaultHybridSearchQuery() HybridSearchQuery
func (*HybridSearchQuery) Validate ¶
func (q *HybridSearchQuery) Validate() error
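A sketch of wiring the pipeline end to end using only the setters and query types documented above; the keyword string and k value are placeholders:

func hybridSearch(hnsw *HNSWIndex, bm25 *BM25InvertedIndex, vec []float32) ([]SearchResult, error) {
    p := NewHybridSearchPipeline(DefaultHybridPipelineConfig())
    p.SetHNSWIndex(hnsw) // dense vector side
    p.SetBM25Index(bm25) // sparse keyword side

    q := DefaultHybridSearchQuery()
    q.Vector = vec
    q.KeywordQuery = "arrow record batch"
    q.K = 10
    if err := q.Validate(); err != nil {
        return nil, err
    }
    return p.Search(&q)
}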
type IPCBufferPool ¶
type IPCBufferPool struct {
// contains filtered or unexported fields
}
IPCBufferPool manages reusable bytes.Buffer instances for IPC serialization. Uses sync.Pool internally for thread-safe, lock-free buffer management.
func NewIPCBufferPool ¶
func NewIPCBufferPool(cfg RecordWriterPoolConfig) *IPCBufferPool
NewIPCBufferPool creates a new buffer pool with the given configuration.
func (*IPCBufferPool) Get ¶
func (p *IPCBufferPool) Get() *bytes.Buffer
Get retrieves a buffer from the pool or allocates a new one. The returned buffer is reset and ready for use.
func (*IPCBufferPool) Put ¶
func (p *IPCBufferPool) Put(buf *bytes.Buffer)
Put returns a buffer to the pool for reuse. Oversized buffers (> MaxBufferSize) are discarded to prevent memory bloat.
func (*IPCBufferPool) Stats ¶
func (p *IPCBufferPool) Stats() IPCBufferPoolStats
Stats returns current pool statistics.
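A sketch of the get/put discipline; the serialize callback is a placeholder for any IPC write into the pooled buffer:

import "bytes"

func writeIPC(pool *IPCBufferPool, serialize func(*bytes.Buffer) error) error {
    buf := pool.Get()   // reset and ready for use
    defer pool.Put(buf) // oversized buffers are discarded, not leaked
    return serialize(buf)
}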
type IPCBufferPoolStats ¶
type IPCBufferPoolStats struct {
Gets int64 // Total Get() calls
Puts int64 // Total Put() calls
Hits int64 // Buffer reused from pool
Misses int64 // New buffer allocated
Discarded int64 // Oversized buffers discarded
}
IPCBufferPoolStats tracks pool usage metrics.
type IVFFlatConfig ¶
IVFFlatConfig holds IVF-Flat-specific configuration
type IVFFlatIndex ¶
type IVFFlatIndex struct {
// contains filtered or unexported fields
}
IVFFlatIndex implements PluggableVectorIndex for IVF-Flat algorithm
func (*IVFFlatIndex) AddBatch ¶
func (ivf *IVFFlatIndex) AddBatch(ids []uint64, vectors [][]float32) error
func (*IVFFlatIndex) AddByLocation ¶
func (ivf *IVFFlatIndex) AddByLocation(batchIdx, rowIdx int) error
func (*IVFFlatIndex) Build ¶
func (ivf *IVFFlatIndex) Build() error
func (*IVFFlatIndex) Close ¶
func (ivf *IVFFlatIndex) Close() error
func (*IVFFlatIndex) Dimension ¶
func (ivf *IVFFlatIndex) Dimension() int
func (*IVFFlatIndex) Len ¶
func (ivf *IVFFlatIndex) Len() int
func (*IVFFlatIndex) Load ¶
func (ivf *IVFFlatIndex) Load(path string) error
func (*IVFFlatIndex) NeedsBuild ¶
func (ivf *IVFFlatIndex) NeedsBuild() bool
func (*IVFFlatIndex) Save ¶
func (ivf *IVFFlatIndex) Save(path string) error
func (*IVFFlatIndex) Search ¶
func (ivf *IVFFlatIndex) Search(query []float32, k int) ([]IndexSearchResult, error)
func (*IVFFlatIndex) SearchBatch ¶
func (ivf *IVFFlatIndex) SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)
func (*IVFFlatIndex) SearchVectors ¶
func (ivf *IVFFlatIndex) SearchVectors(query []float32, k int) []SearchResult
func (*IVFFlatIndex) Size ¶
func (ivf *IVFFlatIndex) Size() int
func (*IVFFlatIndex) Type ¶
func (ivf *IVFFlatIndex) Type() IndexType
type IndexConfig ¶
type IndexConfig struct {
Type IndexType
Dimension int
// Type-specific configurations
HNSWConfig *HNSWIndexConfig
IVFFlatConfig *IVFFlatConfig
DiskANNConfig *DiskANNConfig
}
IndexConfig holds configuration for creating an index
type IndexConstructor ¶
type IndexConstructor func(cfg IndexConfig) (PluggableVectorIndex, error)
IndexConstructor is a function that creates a PluggableVectorIndex
type IndexFactory ¶
type IndexFactory struct {
// contains filtered or unexported fields
}
IndexFactory creates indexes by type using a registry pattern
func NewIndexFactory ¶
func NewIndexFactory() *IndexFactory
NewIndexFactory creates a new factory with default index types registered
func (*IndexFactory) Create ¶
func (f *IndexFactory) Create(cfg IndexConfig) (PluggableVectorIndex, error)
Create creates an index of the specified type
func (*IndexFactory) ListTypes ¶
func (f *IndexFactory) ListTypes() []IndexType
ListTypes returns all registered index types
func (*IndexFactory) Register ¶
func (f *IndexFactory) Register(t IndexType, ctor IndexConstructor)
Register adds a new index type to the factory
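A sketch that stays within the documented factory API; rather than assuming a named IndexType constant, it creates whichever type the factory registered first:

import "errors"

func buildFirstRegistered(dim int) (PluggableVectorIndex, error) {
    f := NewIndexFactory() // default index types are pre-registered
    types := f.ListTypes()
    if len(types) == 0 {
        return nil, errors.New("no index types registered")
    }
    return f.Create(IndexConfig{Type: types[0], Dimension: dim})
}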
type IndexJob ¶
type IndexJob struct {
DatasetName string
Record arrow.RecordBatch
BatchIdx int
RowIdx int
CreatedAt time.Time
}
IndexJob represents a job for the indexing worker
type IndexJobQueue ¶
type IndexJobQueue struct {
// contains filtered or unexported fields
}
IndexJobQueue provides non-blocking job submission with overflow handling.
func NewIndexJobQueue ¶
func NewIndexJobQueue(cfg IndexJobQueueConfig) *IndexJobQueue
NewIndexJobQueue creates a new non-blocking index job queue.
func (*IndexJobQueue) Jobs ¶
func (q *IndexJobQueue) Jobs() <-chan IndexJob
Jobs returns the channel for consumers to read from.
func (*IndexJobQueue) Len ¶
func (q *IndexJobQueue) Len() int
Len returns approximate queue depth (main + overflow).
func (*IndexJobQueue) Send ¶
func (q *IndexJobQueue) Send(job IndexJob) bool
Send submits a job without blocking. Returns true if accepted, false if dropped.
func (*IndexJobQueue) SendBatch ¶
func (q *IndexJobQueue) SendBatch(jobs []IndexJob) int
SendBatch submits multiple jobs efficiently.
func (*IndexJobQueue) Stats ¶
func (q *IndexJobQueue) Stats() IndexJobQueueStats
Stats returns current queue statistics.
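A sketch of the producer/consumer split: Send never blocks on the producer side, and a separate consumer drains the Jobs channel:

import "log"

func enqueue(q *IndexJobQueue, job IndexJob) {
    if !q.Send(job) {
        // false means the job was dropped: both the main channel
        // and the overflow buffer were full.
        log.Printf("index job dropped (queue depth %d)", q.Len())
    }
}

func consume(q *IndexJobQueue, handle func(IndexJob)) {
    for job := range q.Jobs() {
        handle(job)
    }
}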
type IndexJobQueueConfig ¶
type IndexJobQueueConfig struct {
MainChannelSize int // Primary channel buffer size
OverflowBufferSize int // Secondary overflow buffer size
DropOnOverflow bool // If true, drop jobs when both buffers full
DrainInterval time.Duration // How often to drain overflow to main channel
}
IndexJobQueueConfig configures the non-blocking index job queue.
func DefaultIndexJobQueueConfig ¶
func DefaultIndexJobQueueConfig() IndexJobQueueConfig
DefaultIndexJobQueueConfig returns sensible defaults for production.
type IndexJobQueueStats ¶
type IndexJobQueueStats struct {
TotalSent uint64 // Total jobs sent
DirectSent uint64 // Jobs sent directly to main channel
OverflowCount uint64 // Jobs sent to overflow buffer
DrainedCount uint64 // Jobs drained from overflow to main
DroppedCount uint64 // Jobs dropped when both buffers full
}
IndexJobQueueStats tracks queue statistics.
type IndexSearchResult ¶
IndexSearchResult represents a single search result from any index type
type InvertedIndex ¶
type InvertedIndex struct {
// contains filtered or unexported fields
}
InvertedIndex maps string terms to document IDs (row indices) using compressed bitmaps
func NewInvertedIndex ¶
func NewInvertedIndex() *InvertedIndex
NewInvertedIndex creates a new empty inverted index
func (*InvertedIndex) Add ¶
func (idx *InvertedIndex) Add(term string, docID uint32)
Add inserts a term-docID pair into the index
func (*InvertedIndex) AddBatch ¶
func (idx *InvertedIndex) AddBatch(terms []string, docID uint32)
AddBatch adds multiple terms for a single document
func (*InvertedIndex) Delete ¶
func (idx *InvertedIndex) Delete(term string, docID uint32)
Delete removes a term-docID pair from the index
func (*InvertedIndex) Get ¶
func (idx *InvertedIndex) Get(term string) *roaring.Bitmap
Get returns a clone of the bitmap for the given term, or nil if the term is not found. A clone is returned because roaring bitmaps are not safe for concurrent mutation: the caller may iterate the result while a writer mutates the index, so handing out the internal bitmap would race.
func (*InvertedIndex) MemoryUsage ¶
func (idx *InvertedIndex) MemoryUsage() uint64
MemoryUsage estimates memory usage (expensive)
func (*InvertedIndex) Stats ¶
func (idx *InvertedIndex) Stats() (int, uint64)
Stats returns basic stats about the index
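A sketch of a point lookup; per the Get contract above, the returned bitmap is a clone, so it is safe to materialize while writers keep mutating the index:

func docsMatching(idx *InvertedIndex, term string) []uint32 {
    bm := idx.Get(term)
    if bm == nil {
        return nil // term not present
    }
    return bm.ToArray() // roaring bitmap -> sorted doc IDs
}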
type LeastConnectionsStrategy ¶
type LeastConnectionsStrategy struct {
// contains filtered or unexported fields
}
LeastConnectionsStrategy picks the replica with fewest active connections
func NewLeastConnectionsStrategy ¶
func NewLeastConnectionsStrategy() *LeastConnectionsStrategy
NewLeastConnectionsStrategy creates a new least connections strategy
func (*LeastConnectionsStrategy) DecrementConnections ¶
func (s *LeastConnectionsStrategy) DecrementConnections(replica string)
DecrementConnections removes a connection for a replica
func (*LeastConnectionsStrategy) GetConnectionCount ¶
func (s *LeastConnectionsStrategy) GetConnectionCount(replica string) int64
GetConnectionCount returns current connection count for a replica
func (*LeastConnectionsStrategy) IncrementConnections ¶
func (s *LeastConnectionsStrategy) IncrementConnections(replica string)
IncrementConnections adds a connection for a replica
func (*LeastConnectionsStrategy) Select ¶
func (s *LeastConnectionsStrategy) Select(replicas []string) string
Select picks the replica with lowest connection count
type LoadBalancerConfig ¶
type LoadBalancerConfig struct {
Strategy StrategyType
HealthCheckInterval time.Duration
}
LoadBalancerConfig holds configuration for the load balancer
type LoadBalancerStats ¶
type LoadBalancerStats struct {
TotalSelections int64
FailedSelections int64
HealthyReplicas int
TotalReplicas int
}
LoadBalancerStats holds statistics about load balancer operations
type LoadBalancerStrategy ¶
LoadBalancerStrategy defines the interface for selection strategies
type LockFreeHNSW ¶
type LockFreeHNSW struct {
// contains filtered or unexported fields
}
LockFreeHNSW implements a high-throughput HNSW index avoiding global locks.
func NewLockFreeHNSW ¶
func NewLockFreeHNSW() *LockFreeHNSW
func (*LockFreeHNSW) Add ¶
func (h *LockFreeHNSW) Add(id VectorID, vec []float32)
type LockFreeNode ¶
type LockFreeNode struct {
ID VectorID
Vec []float32
Level int
// Friends[level] is a list of neighbor IDs.
Friends [][]VectorID
// contains filtered or unexported fields
}
LockFreeNode represents a node in the HNSW graph with concurrent access support.
type MemoryAdvice ¶
type MemoryAdvice int
MemoryAdvice identifies the type of access pattern for a memory region.
const (
    AdviceNormal MemoryAdvice = iota
    AdviceRandom
    AdviceSequential
    AdviceWillNeed
    AdviceDontNeed
    AdviceHugePage
)
type MemoryBackpressureController ¶
type MemoryBackpressureController struct {
// contains filtered or unexported fields
}
MemoryBackpressureController manages memory backpressure for the store.
func NewMemoryBackpressureController ¶
func NewMemoryBackpressureController(cfg BackpressureConfig) *MemoryBackpressureController
NewMemoryBackpressureController creates a new backpressure controller.
func (*MemoryBackpressureController) Acquire ¶
func (c *MemoryBackpressureController) Acquire(ctx context.Context) error
Acquire blocks until memory pressure allows proceeding.
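A sketch of the acquire/release discipline around a memory-intensive operation; doWork is a placeholder:

import "context"

func ingest(ctx context.Context, c *MemoryBackpressureController, doWork func() error) error {
    // Blocks until pressure allows; ctx bounds the wait. An error
    // means the acquire was rejected or the context ended.
    if err := c.Acquire(ctx); err != nil {
        return err
    }
    defer c.Release()
    return doWork()
}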
func (*MemoryBackpressureController) CheckPressure ¶
func (c *MemoryBackpressureController) CheckPressure() PressureLevel
CheckPressure evaluates current memory usage and returns pressure level.
func (*MemoryBackpressureController) GetAcquireCount ¶
func (c *MemoryBackpressureController) GetAcquireCount() uint64
GetAcquireCount returns the total number of successful acquires.
func (*MemoryBackpressureController) GetHardLimit ¶
func (c *MemoryBackpressureController) GetHardLimit() uint64
GetHardLimit returns the configured hard limit.
func (*MemoryBackpressureController) GetPressureLevel ¶
func (c *MemoryBackpressureController) GetPressureLevel() PressureLevel
GetPressureLevel returns the current pressure level without re-checking.
func (*MemoryBackpressureController) GetRejectCount ¶
func (c *MemoryBackpressureController) GetRejectCount() uint64
GetRejectCount returns the total number of rejected acquires.
func (*MemoryBackpressureController) GetSoftLimit ¶
func (c *MemoryBackpressureController) GetSoftLimit() uint64
GetSoftLimit returns the configured soft limit.
func (*MemoryBackpressureController) Release ¶
func (c *MemoryBackpressureController) Release()
Release signals completion of a memory-intensive operation.
func (*MemoryBackpressureController) SetPressureLevel ¶
func (c *MemoryBackpressureController) SetPressureLevel(level PressureLevel)
SetPressureLevel manually sets the pressure level (for testing).
type MerkleNode ¶
type MerkleNode struct {
Hash [32]byte
Children [MerkleFanout]*MerkleNode
}
MerkleNode represents a node in the Merkle Tree
type MerkleTree ¶
type MerkleTree struct {
// contains filtered or unexported fields
}
MerkleTree manages consistency hashes for a Dataset
func NewMerkleTree ¶
func NewMerkleTree() *MerkleTree
func (*MerkleTree) RootHash ¶
func (t *MerkleTree) RootHash() [32]byte
func (*MerkleTree) Update ¶
func (t *MerkleTree) Update(id VectorID, ts int64)
Update updates the hash for a given VectorID with its latest timestamp
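A sketch of an anti-entropy check: since RootHash returns a comparable [32]byte, two replicas agree when their roots match:

func inSync(local, remote *MerkleTree) bool {
    // Equal roots mean both trees folded in the same
    // (VectorID, timestamp) updates.
    return local.RootHash() == remote.RootHash()
}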
type MeshStatusCache ¶
type MeshStatusCache struct {
// contains filtered or unexported fields
}
MeshStatusCache caches the serialized mesh status to avoid repeated allocations.
func NewMeshStatusCache ¶
func NewMeshStatusCache(ttl time.Duration) *MeshStatusCache
NewMeshStatusCache creates a new mesh status cache with the specified TTL.
func (*MeshStatusCache) Get ¶
func (c *MeshStatusCache) Get(currentMemberCount int) []byte
Get returns the cached JSON if valid, otherwise returns nil.
func (*MeshStatusCache) Invalidate ¶
func (c *MeshStatusCache) Invalidate()
Invalidate clears the cache.
func (*MeshStatusCache) Set ¶
func (c *MeshStatusCache) Set(data []byte, memberCount int)
Set updates the cache with new JSON data.
type MetaServer ¶
type MetaServer struct {
*VectorStore
// contains filtered or unexported fields
}
MetaServer handles control plane operations (ListFlights, GetFlightInfo). It embeds VectorStore to inherit the base interface and overrides methods for error conversion.
func NewMetaServer ¶
func NewMetaServer(store *VectorStore) *MetaServer
func (*MetaServer) DoAction ¶
func (s *MetaServer) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error
DoAction handles management commands on MetaServer
func (*MetaServer) DoExchange ¶
func (s *MetaServer) DoExchange(stream flight.FlightService_DoExchangeServer) error
DoExchange returns Unimplemented on MetaServer
func (*MetaServer) DoGet ¶
func (s *MetaServer) DoGet(tkt *flight.Ticket, stream flight.FlightService_DoGetServer) error
DoGet returns Unimplemented on MetaServer
func (*MetaServer) DoPut ¶
func (s *MetaServer) DoPut(stream flight.FlightService_DoPutServer) error
DoPut returns Unimplemented on MetaServer
func (*MetaServer) GetFlightInfo ¶
func (s *MetaServer) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)
GetFlightInfo returns dataset metadata, converting domain errors to gRPC status.
func (*MetaServer) ListFlights ¶
func (s *MetaServer) ListFlights(c *flight.Criteria, stream flight.FlightService_ListFlightsServer) error
ListFlights returns available datasets, converting domain errors to gRPC status.
type NUMAAllocator ¶
type NUMAAllocator struct {
// contains filtered or unexported fields
}
NUMAAllocator is a NUMA-aware memory allocator for Arrow buffers. On Linux with NUMA, it attempts to allocate memory on the specified node. On other platforms, it falls back to the default Go allocator.
func NewNUMAAllocator ¶
func NewNUMAAllocator(topo *NUMATopology, nodeID int) *NUMAAllocator
NewNUMAAllocator creates a new NUMA-aware allocator for the specified node.
func (*NUMAAllocator) Allocate ¶
func (a *NUMAAllocator) Allocate(size int) []byte
Allocate allocates memory, preferably on the NUMA node.
func (*NUMAAllocator) Free ¶
func (a *NUMAAllocator) Free(b []byte)
Free frees memory (handled by Go GC).
func (*NUMAAllocator) Reallocate ¶
func (a *NUMAAllocator) Reallocate(size int, b []byte) []byte
Reallocate reallocates memory.
type NUMATopology ¶
type NUMATopology struct {
NumNodes int // Number of NUMA nodes
CPUs [][]int // CPUs[nodeID] = list of CPU IDs on that node
}
NUMATopology represents the NUMA topology of the system.
func DetectNUMATopology ¶
func DetectNUMATopology() (*NUMATopology, error)
DetectNUMATopology detects the NUMA topology on Linux systems. Returns a single-node topology if NUMA is not available.
func (*NUMATopology) GetNodeForCPU ¶
func (t *NUMATopology) GetNodeForCPU(cpu int) int
GetNodeForCPU returns the node ID for a given CPU, or -1 if not found.
func (*NUMATopology) String ¶
func (t *NUMATopology) String() string
String returns a human-readable representation of the topology.
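One plausible way to combine the topology helpers with the allocator so buffers land on the caller's local node, falling back to node 0 if the CPU is not found:

func nodeLocalAlloc(size int) ([]byte, error) {
    topo, err := DetectNUMATopology() // single-node topology if NUMA is absent
    if err != nil {
        return nil, err
    }
    node := topo.GetNodeForCPU(GetCurrentCPU())
    if node < 0 {
        node = 0
    }
    return NewNUMAAllocator(topo, node).Allocate(size), nil
}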
type Namespace ¶
type Namespace struct {
Name string
CreatedAt time.Time
Metadata map[string]string
// contains filtered or unexported fields
}
Namespace represents a tenant isolation unit containing related datasets. Namespaces provide multi-tenancy support with dataset isolation.
func NewNamespace ¶
NewNamespace creates a new namespace with the given name.
func (*Namespace) AddDataset ¶
AddDataset registers a dataset in this namespace.
func (*Namespace) DatasetCount ¶
DatasetCount returns the number of datasets in this namespace.
func (*Namespace) HasDataset ¶
HasDataset checks if a dataset exists in this namespace.
func (*Namespace) RemoveDataset ¶
RemoveDataset removes a dataset from this namespace.
type NotFoundError ¶
type NotFoundError struct {
Name string
}
NotFoundError indicates a snapshot was not found
func (*NotFoundError) Error ¶
func (e *NotFoundError) Error() string
type PQConfig ¶
type PQConfig struct {
M int // Number of subvectors (typically 8, 16, or 32)
Ksub int // Number of centroids per subvector (typically 256 for uint8)
Dim int // Total vector dimensions
SubDim int // Dimensions per subvector (Dim / M)
}
PQConfig holds the configuration for product quantization.
func DefaultPQConfig ¶
DefaultPQConfig returns a default PQ configuration for given dimensions.
type PQEncoder ¶
type PQEncoder struct {
// contains filtered or unexported fields
}
PQEncoder handles encoding/decoding using product quantization.
func NewPQEncoder ¶
NewPQEncoder creates encoder from pre-trained codebook.
func TrainPQEncoder ¶
TrainPQEncoder trains codebook via k-means on sample vectors.
func (*PQEncoder) ADCDistance ¶
ADCDistance computes Asymmetric Distance using a precomputed table. This is O(M) per vector instead of O(D), making it very fast for search.
func (*PQEncoder) ComputeDistanceTable ¶
ComputeDistanceTable precomputes distances from query to all centroids. Returns table[m][k] = squared distance from query subvector m to centroid k.
func (*PQEncoder) EnableSDC ¶
func (e *PQEncoder) EnableSDC()
EnableSDC precomputes the centroid-to-centroid distance table for O(M) search.
func (*PQEncoder) EncodeInto ¶
EncodeInto encodes vector into pre-allocated codes slice.
func (*PQEncoder) GetCodebook ¶
GetCodebook returns the learned codebook.
func (*PQEncoder) SDCDistance ¶
SDCDistance computes Symmetric Distance between two PQ codes. Uses precomputed centroid-centroid distances if EnableSDC was called.
func (*PQEncoder) SDCDistancePacked ¶
SDCDistancePacked computes Symmetric Distance between two packed PQ codes. 'a' and 'b' are float32 slices containing packed uint8 codes (4 codes per float32).
type ParserPoolStats ¶
ParserPoolStats tracks pool usage statistics
func GetParserPoolStats ¶
func GetParserPoolStats() ParserPoolStats
GetParserPoolStats returns current pool statistics
type PartitionedRecords ¶
type PartitionedRecords struct {
// contains filtered or unexported fields
}
PartitionedRecords provides partitioned storage for Arrow RecordBatches. Each partition has its own lock, enabling concurrent access to different partitions.
func NewPartitionedRecords ¶
func NewPartitionedRecords(cfg PartitionedRecordsConfig) *PartitionedRecords
NewPartitionedRecords creates new partitioned records with given config.
func NewPartitionedRecordsDefault ¶
func NewPartitionedRecordsDefault() *PartitionedRecords
NewPartitionedRecordsDefault creates partitioned records with default config.
func (*PartitionedRecords) Append ¶
func (pr *PartitionedRecords) Append(batch arrow.RecordBatch, routingKey uint64)
Append adds a batch to a partition based on the provided routing key. The batch is retained (ref count incremented).
func (*PartitionedRecords) AppendToPartition ¶
func (pr *PartitionedRecords) AppendToPartition(batch arrow.RecordBatch, partIdx int)
AppendToPartition adds a batch directly to a specific partition.
func (*PartitionedRecords) AppendWithKey ¶
func (pr *PartitionedRecords) AppendWithKey(batch arrow.RecordBatch, key uint64) int
AppendWithKey adds a batch using hash-based routing, returns partition index.
func (*PartitionedRecords) Clear ¶
func (pr *PartitionedRecords) Clear()
Clear removes all batches from all partitions.
func (*PartitionedRecords) ForEach ¶
func (pr *PartitionedRecords) ForEach(fn func(batch arrow.RecordBatch, partition int) bool)
ForEach iterates over all batches, calling fn for each. If fn returns false, iteration stops early.
func (*PartitionedRecords) ForEachInPartition ¶
func (pr *PartitionedRecords) ForEachInPartition(partIdx int, fn func(batch arrow.RecordBatch) bool)
ForEachInPartition iterates over batches in a specific partition.
func (*PartitionedRecords) GetAll ¶
func (pr *PartitionedRecords) GetAll() []arrow.RecordBatch
GetAll returns all batches across all partitions. The caller should NOT release the returned batches; they are still owned by PartitionedRecords.
func (*PartitionedRecords) GetPartition ¶
func (pr *PartitionedRecords) GetPartition(partIdx int) []arrow.RecordBatch
GetPartition returns all batches from a specific partition.
func (*PartitionedRecords) NumPartitions ¶
func (pr *PartitionedRecords) NumPartitions() int
NumPartitions returns the number of partitions.
func (*PartitionedRecords) PartitionBatches ¶
func (pr *PartitionedRecords) PartitionBatches(partitionIdx int) int
PartitionBatches returns the number of batches in a specific partition.
func (*PartitionedRecords) ReplaceAll ¶
func (pr *PartitionedRecords) ReplaceAll(partIdx int, batches []arrow.RecordBatch)
ReplaceAll atomically replaces all batches in a partition.
func (*PartitionedRecords) Stats ¶
func (pr *PartitionedRecords) Stats() PartitionedRecordsStats
Stats returns current statistics.
func (*PartitionedRecords) TotalBatches ¶
func (pr *PartitionedRecords) TotalBatches() int
TotalBatches returns the total number of batches across all partitions.
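A sketch of a whole-store scan using ForEach; returning true from the callback continues iteration:

func totalRows(pr *PartitionedRecords) int64 {
    var rows int64
    pr.ForEach(func(batch arrow.RecordBatch, _ int) bool {
        rows += batch.NumRows()
        return true
    })
    return rows
}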
type PartitionedRecordsConfig ¶
type PartitionedRecordsConfig struct {
NumPartitions int // Number of partitions (default: runtime.NumCPU())
}
PartitionedRecordsConfig configures partitioned records storage.
func DefaultPartitionedRecordsConfig ¶
func DefaultPartitionedRecordsConfig() PartitionedRecordsConfig
DefaultPartitionedRecordsConfig returns sensible defaults.
func (PartitionedRecordsConfig) Validate ¶
func (c PartitionedRecordsConfig) Validate() error
Validate checks configuration validity.
type PartitionedRecordsStats ¶
PartitionedRecordsStats holds statistics for partitioned records.
type Path ¶
type Path struct {
Nodes []string // Sequence of nodes visited
Edges []Edge // Edges traversed
Weight float32 // Cumulative path weight
}
Path represents a traversal path through the graph
type PeerReplicationResult ¶
type PeerReplicationResult struct {
PeerID string
Success bool
Error error
Attempts int
Duration time.Duration
}
PeerReplicationResult holds the result of a single peer replication attempt
type PeerReplicator ¶
type PeerReplicator struct {
// contains filtered or unexported fields
}
PeerReplicator handles replication to peer nodes
func NewPeerReplicator ¶
func NewPeerReplicator(cfg ReplicatorConfig) *PeerReplicator
NewPeerReplicator creates a new peer replicator
func (*PeerReplicator) AddPeer ¶
func (r *PeerReplicator) AddPeer(id, address string) error
AddPeer adds a peer to the replicator
func (*PeerReplicator) GetCircuitBreaker ¶
func (r *PeerReplicator) GetCircuitBreaker(peerID string) *CircuitBreaker
GetCircuitBreaker returns the circuit breaker for a peer
func (*PeerReplicator) GetPeers ¶
func (r *PeerReplicator) GetPeers() []*ReplicatorPeerInfo
GetPeers returns all registered peers
func (*PeerReplicator) RemovePeer ¶
func (r *PeerReplicator) RemovePeer(id string)
RemovePeer removes a peer from the replicator
func (*PeerReplicator) ReplicateAsync ¶
func (r *PeerReplicator) ReplicateAsync(ctx context.Context, dataset string, record arrow.Record) bool
ReplicateAsync queues a record for async replication
func (*PeerReplicator) ReplicateRecord ¶
func (r *PeerReplicator) ReplicateRecord(ctx context.Context, dataset string, record arrow.Record) []PeerReplicationResult
ReplicateRecord replicates a record to all peers synchronously
func (*PeerReplicator) ReplicateWithQuorum ¶
func (r *PeerReplicator) ReplicateWithQuorum(ctx context.Context, dataset string, record arrow.Record) []PeerReplicationResult
ReplicateWithQuorum replicates to peers and waits for quorum
func (*PeerReplicator) Start ¶
func (r *PeerReplicator) Start() error
Start starts the async replication worker
func (*PeerReplicator) Stats ¶
func (r *PeerReplicator) Stats() ReplicatorStats
Stats returns current statistics
func (*PeerReplicator) Stop ¶
func (r *PeerReplicator) Stop()
Stop stops the async replication worker
type PerPResultPool ¶
type PerPResultPool struct {
// contains filtered or unexported fields
}
PerPResultPool provides per-processor local pools for search result slices. It eliminates sync.Pool contention by sharding pools across processors.
func NewPerPResultPool ¶
func NewPerPResultPool(config *PerPResultPoolConfig) *PerPResultPool
NewPerPResultPool creates a new per-processor result pool. If config is nil, default configuration is used.
func (*PerPResultPool) Get ¶
func (p *PerPResultPool) Get(k int) []VectorID
Get retrieves a []VectorID slice of the specified length. For common k values, slices are pooled per-shard.
func (*PerPResultPool) NumShards ¶
func (p *PerPResultPool) NumShards() int
NumShards returns the number of shards.
func (*PerPResultPool) PerShardStats ¶
func (p *PerPResultPool) PerShardStats() []PerPShardStats
PerShardStats returns statistics for each shard.
func (*PerPResultPool) Put ¶
func (p *PerPResultPool) Put(slice []VectorID)
Put returns a []VectorID slice to the appropriate pool shard.
func (*PerPResultPool) ResetStats ¶
func (p *PerPResultPool) ResetStats()
ResetStats resets all statistics counters.
func (*PerPResultPool) Stats ¶
func (p *PerPResultPool) Stats() PerPResultPoolStats
Stats returns aggregate statistics across all shards.
type PerPResultPoolConfig ¶
type PerPResultPoolConfig struct {
NumShards int // Number of shards (default: GOMAXPROCS)
EnableStats bool // Enable statistics tracking
}
PerPResultPoolConfig configures the per-processor result pool.
func DefaultPerPResultPoolConfig ¶
func DefaultPerPResultPoolConfig() PerPResultPoolConfig
DefaultPerPResultPoolConfig returns sensible defaults.
func (PerPResultPoolConfig) Validate ¶
func (c PerPResultPoolConfig) Validate() error
Validate checks configuration validity.
type PerPResultPoolStats ¶
PerPResultPoolStats holds aggregate statistics.
type PerPShardStats ¶
PerPShardStats holds per-shard statistics.
type PipelineResult ¶
type PipelineResult struct {
Record arrow.RecordBatch
Index int
}
PipelineResult holds a filtered record with its original index for ordering
type PipelineStage ¶
type PipelineStage struct {
Record arrow.RecordBatch
BatchIdx int
Tombstone *Bitset // Legacy field
Err error
}
PipelineStage is the legacy result type expected by store.go
type PipelineStats ¶
PipelineStats tracks pipeline processing statistics
type PluggableVectorIndex ¶
type PluggableVectorIndex interface {
// Type returns the index type identifier
Type() IndexType
// Dimension returns the vector dimension
Dimension() int
// Size returns the number of vectors in the index
Size() int
// NeedsBuild returns true if index requires explicit Build() call
NeedsBuild() bool
// Add adds a single vector to the index
Add(id uint64, vector []float32) error
// AddBatch adds multiple vectors to the index
AddBatch(ids []uint64, vectors [][]float32) error
// Search finds k nearest neighbors for the query vector
Search(query []float32, k int) ([]IndexSearchResult, error)
// SearchBatch performs batch search for multiple queries
SearchBatch(queries [][]float32, k int) ([][]IndexSearchResult, error)
// Build builds the index (for algorithms requiring training)
Build() error
// Save persists the index to disk
Save(path string) error
// Load loads the index from disk
Load(path string) error
// Close releases resources
Close() error
// Legacy interface compatibility
AddByLocation(batchIdx, rowIdx int) error
SearchVectors(query []float32, k int) []SearchResult
Len() int
}
PluggableVectorIndex is the abstract interface for all vector index implementations
type PooledAllocator ¶
type PooledAllocator struct {
// contains filtered or unexported fields
}
PooledAllocator implements memory.Allocator with buffer pooling to reduce GC pressure during high-throughput ingestion. It uses size-bucketed sync.Pools for efficient buffer reuse.
func NewPooledAllocator ¶
func NewPooledAllocator() *PooledAllocator
NewPooledAllocator creates a new pooled allocator. The underlying allocator is used for allocations larger than 32MB.
func (*PooledAllocator) Allocate ¶
func (p *PooledAllocator) Allocate(size int) []byte
Allocate returns a buffer of at least the requested size. Buffers may be larger than requested due to bucketing.
func (*PooledAllocator) Free ¶
func (p *PooledAllocator) Free(b []byte)
Free returns a buffer to the appropriate pool.
func (*PooledAllocator) Reallocate ¶
func (p *PooledAllocator) Reallocate(size int, b []byte) []byte
Reallocate resizes a buffer, potentially reusing pooled memory.
func (*PooledAllocator) Stats ¶
func (p *PooledAllocator) Stats() PooledAllocatorStats
Stats returns current allocator statistics.
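A sketch of the allocate/free cycle; note that Allocate may hand back a buffer larger than requested because of size bucketing:

func withPooledBuffer(n int, use func([]byte)) {
    mem := NewPooledAllocator()
    buf := mem.Allocate(n)
    use(buf[:n])  // trim to the requested size
    mem.Free(buf) // return the buffer to its size bucket
}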
type PooledAllocatorStats ¶
PooledAllocatorStats holds allocator statistics.
type PooledBitmap ¶
type PooledBitmap struct {
Data []byte
Capacity int // in bits
// contains filtered or unexported fields
}
PooledBitmap is a pooled bitmap buffer
func (*PooledBitmap) GetBit ¶
func (pb *PooledBitmap) GetBit(index int) bool
GetBit gets a bit at the given index
func (*PooledBitmap) Release ¶
func (pb *PooledBitmap) Release()
Release returns the bitmap to the pool
func (*PooledBitmap) SetBit ¶
func (pb *PooledBitmap) SetBit(index int)
SetBit sets a bit at the given index
type PooledFlightClient ¶
type PooledFlightClient struct {
// contains filtered or unexported fields
}
PooledFlightClient wraps a Flight client with pool metadata.
func (*PooledFlightClient) Client ¶
func (c *PooledFlightClient) Client() flight.Client
Client returns the underlying Flight client.
func (*PooledFlightClient) CreatedAt ¶
func (c *PooledFlightClient) CreatedAt() time.Time
CreatedAt returns when the connection was established.
func (*PooledFlightClient) Host ¶
func (c *PooledFlightClient) Host() string
Host returns the peer host address.
func (*PooledFlightClient) IsExpired ¶
func (c *PooledFlightClient) IsExpired(maxLifetime time.Duration) bool
IsExpired checks if the connection exceeds max lifetime.
func (*PooledFlightClient) LastUsed ¶
func (c *PooledFlightClient) LastUsed() time.Time
LastUsed returns when the connection was last used.
type PressureLevel ¶
type PressureLevel int32
PressureLevel indicates the current memory pressure state.
const (
    PressureNone PressureLevel = iota
    PressureSoft
    PressureHard
)
type QuorumCalculator ¶
type QuorumCalculator struct{}
QuorumCalculator handles quorum arithmetic
func NewQuorumCalculator ¶
func NewQuorumCalculator() *QuorumCalculator
NewQuorumCalculator creates a new QuorumCalculator
func (*QuorumCalculator) IsSatisfied ¶
func (qc *QuorumCalculator) IsSatisfied(acks, totalNodes int, level ConsistencyLevel) bool
IsSatisfied returns true if the number of acks meets the consistency requirement
func (*QuorumCalculator) RequiredNodes ¶
func (qc *QuorumCalculator) RequiredNodes(totalNodes int, level ConsistencyLevel) int
RequiredNodes returns the number of nodes required for the given consistency level
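A sketch of a write path deciding whether enough acknowledgements arrived; the concrete ConsistencyLevel value is left to the caller:

func canCommit(acks, totalNodes int, level ConsistencyLevel) bool {
    qc := NewQuorumCalculator()
    // e.g. a majority-style level over 5 nodes typically
    // requires 3 acks to satisfy the quorum.
    return qc.IsSatisfied(acks, totalNodes, level)
}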
type QuorumConfig ¶
type QuorumConfig struct {
DefaultReadLevel ConsistencyLevel
DefaultWriteLevel ConsistencyLevel
Timeout time.Duration
}
QuorumConfig holds configuration for quorum operations
type QuorumManager ¶
type QuorumManager struct {
// contains filtered or unexported fields
}
QuorumManager manages quorum-based read/write operations
func NewQuorumManager ¶
func NewQuorumManager(cfg QuorumConfig) *QuorumManager
NewQuorumManager creates a new QuorumManager
func (*QuorumManager) ExecuteRead ¶
func (qm *QuorumManager) ExecuteRead(ctx context.Context, peers []string, level ConsistencyLevel, readFn ReadFn) QuorumResult
ExecuteRead executes a read operation across peers with quorum semantics
func (*QuorumManager) ExecuteWrite ¶
func (qm *QuorumManager) ExecuteWrite(ctx context.Context, peers []string, level ConsistencyLevel, writeFn WriteFn) QuorumResult
ExecuteWrite executes a write operation across peers with quorum semantics
func (*QuorumManager) GetDefaultReadLevel ¶
func (qm *QuorumManager) GetDefaultReadLevel() ConsistencyLevel
GetDefaultReadLevel returns the default read consistency level
func (*QuorumManager) GetDefaultWriteLevel ¶
func (qm *QuorumManager) GetDefaultWriteLevel() ConsistencyLevel
GetDefaultWriteLevel returns the default write consistency level
type QuorumResult ¶
type QuorumResult struct {
SuccessCount int
FailureCount int
QuorumMet bool
Err error
Responses map[string][]byte // For read operations
}
QuorumResult holds the result of a quorum operation
type RankedResult ¶
RankedResult represents a search result with its distance score. Used for reranking and batch search operations that return distances.
type RecordEvictionManager ¶
type RecordEvictionManager struct {
// contains filtered or unexported fields
}
RecordEvictionManager tracks eviction metadata for individual records. It uses unsafe.Pointer keys for zero-copy pointer-identity lookups.
func NewRecordEvictionManager ¶
func NewRecordEvictionManager() *RecordEvictionManager
NewRecordEvictionManager creates a new per-record eviction manager
func (*RecordEvictionManager) Count ¶
func (m *RecordEvictionManager) Count() int
Count returns the number of tracked records
func (*RecordEvictionManager) EvictExpired ¶
func (m *RecordEvictionManager) EvictExpired() []uintptr
EvictExpired removes all expired records and returns their pointers
func (*RecordEvictionManager) Get ¶
func (m *RecordEvictionManager) Get(rec arrow.RecordBatch) *RecordMetadata
Get retrieves metadata for a record by pointer identity
func (*RecordEvictionManager) GetEvictionCount ¶
func (m *RecordEvictionManager) GetEvictionCount() int64
GetEvictionCount returns total number of records evicted
func (*RecordEvictionManager) Register ¶
func (m *RecordEvictionManager) Register(rec arrow.RecordBatch, ttl time.Duration)
Register adds a record to the eviction manager with optional TTL
func (*RecordEvictionManager) SelectLFUVictims ¶
func (m *RecordEvictionManager) SelectLFUVictims(count int) []uintptr
SelectLFUVictims returns pointers to the N least frequently accessed records
func (*RecordEvictionManager) SelectLRUVictims ¶
func (m *RecordEvictionManager) SelectLRUVictims(count int) []uintptr
SelectLRUVictims returns pointers to the N least recently accessed records
func (*RecordEvictionManager) Unregister ¶
func (m *RecordEvictionManager) Unregister(rec arrow.RecordBatch)
Unregister removes a record from the eviction manager
func (*RecordEvictionManager) UpdateRecordMetadataGauge ¶
func (m *RecordEvictionManager) UpdateRecordMetadataGauge()
UpdateRecordMetadataGauge updates the gauge with current tracked record count
type RecordMetadata ¶
type RecordMetadata struct {
CreatedAt int64 // Unix nanos - immutable after creation
LastAccess atomic.Int64 // Unix nanos - updated atomically on access
AccessCount atomic.Int64 // Frequency counter - incremented atomically
TTL time.Duration // 0 = no expiration
}
RecordMetadata tracks per-record eviction information using atomics for zero-lock reads
func NewRecordMetadata ¶
func NewRecordMetadata(ttl time.Duration) *RecordMetadata
NewRecordMetadata creates metadata for a record with optional TTL
func (*RecordMetadata) GetAccessCount ¶
func (m *RecordMetadata) GetAccessCount() int64
GetAccessCount returns the total access count
func (*RecordMetadata) GetLastAccess ¶
func (m *RecordMetadata) GetLastAccess() time.Time
GetLastAccess returns the last access time
func (*RecordMetadata) IsExpired ¶
func (m *RecordMetadata) IsExpired() bool
IsExpired returns true if TTL has elapsed since creation
func (*RecordMetadata) RecordAccess ¶
func (m *RecordMetadata) RecordAccess()
RecordAccess updates last access time and increments access count atomically
type RecordSizeCache ¶
type RecordSizeCache struct {
// contains filtered or unexported fields
}
RecordSizeCache caches the size of Arrow record batches to avoid repeated calculations
func NewRecordSizeCache ¶
func NewRecordSizeCache() *RecordSizeCache
NewRecordSizeCache creates a new cache
func (*RecordSizeCache) GetOrCompute ¶
func (c *RecordSizeCache) GetOrCompute(rec arrow.RecordBatch) int64
GetOrCompute returns the cached size or computes it
func (*RecordSizeCache) Invalidate ¶
func (c *RecordSizeCache) Invalidate(rec arrow.RecordBatch)
Invalidate removes a record from cache
type RecordWriterPoolConfig ¶
type RecordWriterPoolConfig struct {
// InitialBufferSize is the starting capacity for pooled buffers (default: 64KB)
InitialBufferSize int
// MaxBufferSize is the maximum buffer size to keep in pool (default: 4MB)
// Buffers larger than this are discarded to prevent memory bloat
MaxBufferSize int
// UseLZ4 enables LZ4 compression for IPC messages
UseLZ4 bool
}
RecordWriterPoolConfig configures the IPC buffer pool.
func DefaultRecordWriterPoolConfig ¶
func DefaultRecordWriterPoolConfig() RecordWriterPoolConfig
DefaultRecordWriterPoolConfig returns sensible defaults for high-throughput DoGet.
func (RecordWriterPoolConfig) Validate ¶
func (c RecordWriterPoolConfig) Validate() error
Validate checks configuration validity.
type ReplicaInfo ¶
ReplicaInfo holds information about a replica
type ReplicaLoadBalancer ¶
type ReplicaLoadBalancer struct {
// contains filtered or unexported fields
}
ReplicaLoadBalancer distributes read requests across replicas
func NewReplicaLoadBalancer ¶
func NewReplicaLoadBalancer(config LoadBalancerConfig) *ReplicaLoadBalancer
NewReplicaLoadBalancer creates a new load balancer
func (*ReplicaLoadBalancer) AddReplica ¶
func (lb *ReplicaLoadBalancer) AddReplica(id, address string)
AddReplica adds a replica to the load balancer
func (*ReplicaLoadBalancer) GetHealthyReplicas ¶
func (lb *ReplicaLoadBalancer) GetHealthyReplicas() []string
GetHealthyReplicas returns all healthy replica IDs
func (*ReplicaLoadBalancer) GetReplicas ¶
func (lb *ReplicaLoadBalancer) GetReplicas() []string
GetReplicas returns all replica IDs
func (*ReplicaLoadBalancer) GetStats ¶
func (lb *ReplicaLoadBalancer) GetStats() LoadBalancerStats
GetStats returns load balancer statistics
func (*ReplicaLoadBalancer) MarkHealthy ¶
func (lb *ReplicaLoadBalancer) MarkHealthy(id string)
MarkHealthy marks a replica as healthy
func (*ReplicaLoadBalancer) MarkUnhealthy ¶
func (lb *ReplicaLoadBalancer) MarkUnhealthy(id string)
MarkUnhealthy marks a replica as unhealthy
func (*ReplicaLoadBalancer) RemoveReplica ¶
func (lb *ReplicaLoadBalancer) RemoveReplica(id string)
RemoveReplica removes a replica from the load balancer
func (*ReplicaLoadBalancer) SelectReplica ¶
func (lb *ReplicaLoadBalancer) SelectReplica(ctx context.Context) (string, error)
SelectReplica selects a healthy replica using the configured strategy
type ReplicationError ¶
type ReplicationError struct {
Op string // Operation: "sync", "replicate", "connect"
PeerAddr string // Peer address
Dataset string // Dataset being replicated
Cause error // Underlying error
Timestamp time.Time // When the error occurred
}
ReplicationError provides rich context for replication operations.
func (*ReplicationError) Error ¶
func (e *ReplicationError) Error() string
func (*ReplicationError) Unwrap ¶
func (e *ReplicationError) Unwrap() error
type ReplicationTask ¶
type ReplicationTask struct {
Dataset string
Record arrow.Record //nolint:staticcheck
Ctx context.Context
}
ReplicationTask represents an async replication task
type ReplicatorConfig ¶
type ReplicatorConfig struct {
Timeout time.Duration
MaxRetries int
RetryBackoff time.Duration
AsyncReplication bool
AsyncBufferSize int
QuorumSize int
CircuitBreakerConfig CircuitBreakerConfig
}
ReplicatorConfig holds configuration for peer replication
func DefaultReplicatorConfig ¶
func DefaultReplicatorConfig() ReplicatorConfig
DefaultReplicatorConfig returns sensible defaults
type ReplicatorPeerInfo ¶
ReplicatorPeerInfo holds information about a peer for replicator
type ReplicatorStats ¶
type ReplicatorStats struct {
SuccessTotal int64
FailureTotal int64
RetryTotal int64
QueuedTotal int64
CircuitOpens int64
}
ReplicatorStats holds operational statistics
type RequestSemaphore ¶
type RequestSemaphore struct {
// contains filtered or unexported fields
}
RequestSemaphore limits concurrent DoGet/DoPut operations using a weighted semaphore.
func NewRequestSemaphore ¶
func NewRequestSemaphore(cfg RequestSemaphoreConfig) *RequestSemaphore
NewRequestSemaphore creates a new request semaphore with the given configuration.
func (*RequestSemaphore) Acquire ¶
func (rs *RequestSemaphore) Acquire(ctx context.Context) error
Acquire blocks until a slot is available or timeout/context cancellation occurs.
func (*RequestSemaphore) Available ¶
func (rs *RequestSemaphore) Available() int
Available returns the number of available slots.
func (*RequestSemaphore) MaxConcurrent ¶
func (rs *RequestSemaphore) MaxConcurrent() int
MaxConcurrent returns the maximum concurrent requests allowed.
func (*RequestSemaphore) Release ¶
func (rs *RequestSemaphore) Release()
Release releases a slot back to the semaphore.
func (*RequestSemaphore) Stats ¶
func (rs *RequestSemaphore) Stats() SemaphoreStats
Stats returns current statistics about semaphore usage.
func (*RequestSemaphore) TryAcquire ¶
func (rs *RequestSemaphore) TryAcquire() bool
TryAcquire attempts to acquire a slot without blocking. Returns true if successful, false if no slots available.
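A sketch of guarding a streaming handler with the semaphore; serve is a placeholder for the actual DoGet/DoPut body:

import "context"

func guarded(ctx context.Context, rs *RequestSemaphore, serve func() error) error {
    if err := rs.Acquire(ctx); err != nil {
        return err // timed out or ctx canceled while waiting for a slot
    }
    defer rs.Release()
    return serve()
}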
type RequestSemaphoreConfig ¶
type RequestSemaphoreConfig struct {
// MaxConcurrent is the maximum number of concurrent requests.
// Default: runtime.GOMAXPROCS(0) (number of CPU cores)
MaxConcurrent int
// AcquireTimeout is how long to wait before timing out on acquire.
// Default: 30 seconds
AcquireTimeout time.Duration
// Enabled controls whether the semaphore is active.
// When disabled, Acquire always succeeds immediately.
Enabled bool
}
RequestSemaphoreConfig configures the request semaphore for limiting concurrent DoGet/DoPut operations to prevent thread-thrashing.
func DefaultRequestSemaphoreConfig ¶
func DefaultRequestSemaphoreConfig() RequestSemaphoreConfig
DefaultRequestSemaphoreConfig returns a configuration with sensible defaults.
func (RequestSemaphoreConfig) Validate ¶
func (c RequestSemaphoreConfig) Validate() error
Validate checks if the configuration is valid.
type RoundRobinStrategy ¶
type RoundRobinStrategy struct {
// contains filtered or unexported fields
}
RoundRobinStrategy cycles through replicas in order
func NewRoundRobinStrategy ¶
func NewRoundRobinStrategy() *RoundRobinStrategy
NewRoundRobinStrategy creates a new round robin strategy
func (*RoundRobinStrategy) Select ¶
func (s *RoundRobinStrategy) Select(replicas []string) string
Select picks the next replica in rotation
type RowPosition ¶
type RowPosition struct {
RecordIdx int // Index of the record batch
RowIdx int // Index of the row within the record
}
RowPosition identifies a row within a dataset's records
type S3Backend ¶
type S3Backend struct {
// contains filtered or unexported fields
}
S3Backend implements SnapshotBackend for S3-compatible storage
func NewS3Backend ¶
func NewS3Backend(cfg *S3BackendConfig) (*S3Backend, error)
NewS3Backend creates a new S3 backend from configuration
func (*S3Backend) DeleteSnapshot ¶
DeleteSnapshot removes a snapshot from S3
func (*S3Backend) GetHTTPClient ¶
GetHTTPClient returns the HTTP client used by this S3 backend
func (*S3Backend) GetHTTPTransport ¶
GetHTTPTransport returns the underlying HTTP transport used for connection pooling
func (*S3Backend) ListSnapshots ¶
ListSnapshots returns all collection names that have snapshots
func (*S3Backend) ReadSnapshot ¶
ReadSnapshot downloads snapshot data from S3
type S3BackendConfig ¶
type S3BackendConfig struct {
Endpoint string // S3-compatible endpoint URL (e.g., "http://localhost:9000" for MinIO)
Bucket string // Bucket name
Prefix string // Optional key prefix for all snapshots
AccessKeyID string // AWS access key
SecretAccessKey string // AWS secret key
Region string // AWS region (default: us-east-1)
UsePathStyle bool // Use path-style addressing (required for MinIO)
// Connection pool settings
MaxIdleConns int // Maximum idle connections (0 = use default)
MaxIdleConnsPerHost int // Maximum idle connections per host (0 = use default)
IdleConnTimeout time.Duration // Idle connection timeout (0 = use default)
}
S3BackendConfig holds configuration for the S3 backend
func (*S3BackendConfig) Validate ¶
func (c *S3BackendConfig) Validate() error
Validate checks the configuration for required fields
type S3Error ¶
type S3Error struct {
Op string // Operation: "upload", "download", "list", "delete"
Bucket string // S3 bucket name
Key string // S3 object key
Cause error // Underlying error
Timestamp time.Time // When the error occurred
}
S3Error provides rich context for S3 backend operations.
type SQ8Config ¶
type SQ8Config struct {
Min []float32 // Per-dimension minimum values
Max []float32 // Per-dimension maximum values
Trained bool // Whether min/max have been learned from data
}
SQ8Config holds the configuration for scalar quantization.
func DefaultSQ8Config ¶
func DefaultSQ8Config() *SQ8Config
DefaultSQ8Config returns a default untrained configuration.
type SQ8Encoder ¶
type SQ8Encoder struct {
// contains filtered or unexported fields
}
SQ8Encoder handles encoding and decoding of vectors using scalar quantization.
func NewSQ8Encoder ¶
func NewSQ8Encoder(cfg *SQ8Config) (*SQ8Encoder, error)
NewSQ8Encoder creates a new encoder from a trained configuration.
func TrainSQ8Encoder ¶
func TrainSQ8Encoder(vectors [][]float32) (*SQ8Encoder, error)
TrainSQ8Encoder creates an encoder by learning min/max from sample vectors.
func (*SQ8Encoder) Decode ¶
func (e *SQ8Encoder) Decode(quantized []uint8) []float32
Decode converts a uint8 vector back to float32.
func (*SQ8Encoder) DecodeInto ¶
func (e *SQ8Encoder) DecodeInto(quantized []uint8, dst []float32)
DecodeInto decodes a quantized vector into a pre-allocated destination slice.
func (*SQ8Encoder) Dims ¶
func (e *SQ8Encoder) Dims() int
Dims returns the number of dimensions this encoder handles.
func (*SQ8Encoder) Encode ¶
func (e *SQ8Encoder) Encode(vec []float32) []uint8
Encode converts a float32 vector to uint8 with clamping.
func (*SQ8Encoder) EncodeInto ¶
func (e *SQ8Encoder) EncodeInto(vec []float32, dst []uint8)
EncodeInto encodes a vector into a pre-allocated destination slice. This is the zero-allocation hot path for bulk encoding.
func (*SQ8Encoder) GetBounds ¶
func (e *SQ8Encoder) GetBounds() (minVals, maxVals []float32)
GetBounds returns the learned min and max values.
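A sketch of the train/encode/decode round trip, inferred from the signatures above; the sample vectors are made up:
func exampleSQ8() {
    samples := [][]float32{
        {0.1, 0.9, -0.3},
        {0.4, 0.2, 0.8},
        {-0.2, 0.5, 0.1},
    }
    enc, err := TrainSQ8Encoder(samples) // learns per-dimension min/max
    if err != nil {
        panic(err)
    }
    code := enc.Encode([]float32{0.3, 0.4, 0.0}) // quantized with clamping
    approx := enc.Decode(code)                   // lossy reconstruction within learned bounds
    _ = approx

    // Hot path: reuse a destination buffer to avoid allocations.
    dst := make([]uint8, enc.Dims())
    enc.EncodeInto([]float32{0.3, 0.4, 0.0}, dst)
}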
type SchemaCompatibility ¶
type SchemaCompatibility int
SchemaCompatibility represents the result of schema comparison
const (
// SchemaExactMatch indicates schemas are identical
SchemaExactMatch SchemaCompatibility = iota
// SchemaEvolution indicates new schema is a compatible superset
SchemaEvolution
// SchemaIncompatible indicates schemas are not compatible
SchemaIncompatible
)
func CheckSchemaCompatibility ¶
func CheckSchemaCompatibility(existing, incoming *arrow.Schema) SchemaCompatibility
CheckSchemaCompatibility compares existing and incoming schemas. Returns SchemaExactMatch if identical, SchemaEvolution if incoming is a compatible superset (prefix matches), or SchemaIncompatible otherwise.
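A sketch of a compatible-superset check, assuming the Apache Arrow Go module for schema construction (the exact arrow import path depends on the version this package builds against):
func exampleSchemaCheck() SchemaCompatibility {
    existing := arrow.NewSchema([]arrow.Field{
        {Name: "id", Type: arrow.PrimitiveTypes.Uint64},
    }, nil)
    incoming := arrow.NewSchema([]arrow.Field{
        {Name: "id", Type: arrow.PrimitiveTypes.Uint64},
        {Name: "score", Type: arrow.PrimitiveTypes.Float32},
    }, nil)
    // incoming appends a column after a matching prefix, so this should be
    // SchemaEvolution rather than SchemaIncompatible.
    return CheckSchemaCompatibility(existing, incoming)
}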
type SchemaEvolutionManager ¶
type SchemaEvolutionManager struct {
// contains filtered or unexported fields
}
SchemaEvolutionManager handles dynamic schema changes without dataset locks
func NewSchemaEvolutionManager ¶
func NewSchemaEvolutionManager(initialSchema *arrow.Schema, datasetName string) *SchemaEvolutionManager
NewSchemaEvolutionManager creates a new manager with initial schema
func (*SchemaEvolutionManager) AddColumn ¶
func (m *SchemaEvolutionManager) AddColumn(name string, dtype arrow.DataType) error
AddColumn adds a new column without requiring dataset lock or rewrite
func (*SchemaEvolutionManager) DropColumn ¶
func (m *SchemaEvolutionManager) DropColumn(name string) error
DropColumn marks a column as dropped without rewriting data
func (*SchemaEvolutionManager) GetColumnCount ¶
func (m *SchemaEvolutionManager) GetColumnCount() int
GetColumnCount returns the number of active (non-dropped) columns
func (*SchemaEvolutionManager) GetCurrentSchema ¶
func (m *SchemaEvolutionManager) GetCurrentSchema() *arrow.Schema
GetCurrentSchema returns the current schema (excluding dropped columns)
func (*SchemaEvolutionManager) GetCurrentVersion ¶
func (m *SchemaEvolutionManager) GetCurrentVersion() uint64
GetCurrentVersion returns the current schema version
func (*SchemaEvolutionManager) GetSchemaAtVersion ¶
func (m *SchemaEvolutionManager) GetSchemaAtVersion(version uint64) *arrow.Schema
GetSchemaAtVersion returns the schema at a specific version
func (*SchemaEvolutionManager) GetVersionCount ¶
func (m *SchemaEvolutionManager) GetVersionCount() int
GetVersionCount returns the number of schema versions
func (*SchemaEvolutionManager) IsColumnAvailable ¶
func (m *SchemaEvolutionManager) IsColumnAvailable(name string, version uint64) bool
IsColumnAvailable checks if a column is available at a specific version
func (*SchemaEvolutionManager) IsColumnDropped ¶
func (m *SchemaEvolutionManager) IsColumnDropped(name string) bool
IsColumnDropped checks if a column has been dropped
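A sketch of lock-free schema evolution using the methods above; field names are illustrative and the arrow import caveat above applies:
func exampleSchemaEvolution() {
    initial := arrow.NewSchema([]arrow.Field{
        {Name: "id", Type: arrow.PrimitiveTypes.Uint64},
    }, nil)
    m := NewSchemaEvolutionManager(initial, "events")
    _ = m.AddColumn("score", arrow.PrimitiveTypes.Float32) // no dataset lock, no rewrite
    _ = m.DropColumn("score")                              // marked dropped; data untouched
    cur := m.GetCurrentSchema()                            // excludes dropped columns
    old := m.GetSchemaAtVersion(m.GetCurrentVersion() - 1) // historical view
    _, _ = cur, old
}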
type SchemaVersion ¶
SchemaVersion tracks a specific schema state
type SearchArena ¶
type SearchArena struct {
// contains filtered or unexported fields
}
SearchArena is a per-request arena allocator that provides O(1) allocations with zero GC pressure. It uses a simple bump allocator strategy where allocations advance an offset through a pre-allocated buffer. The arena should be Reset() after each search request to reuse memory.
func GetArena ¶
func GetArena() *SearchArena
GetArena retrieves a SearchArena from the global pool. The arena is reset and ready for use. Caller must call PutArena when done to return it to the pool.
func NewSearchArena ¶
func NewSearchArena(capacity int) *SearchArena
NewSearchArena creates a new arena with the specified capacity in bytes. The entire buffer is pre-allocated to avoid runtime allocations.
func (*SearchArena) Alloc ¶
func (a *SearchArena) Alloc(size int) []byte
Alloc allocates size bytes from the arena and returns a slice. Returns nil if the allocation would exceed capacity. This is O(1) - just a pointer bump with no GC involvement.
func (*SearchArena) AllocFloat32Slice ¶
func (a *SearchArena) AllocFloat32Slice(count int) []float32
AllocFloat32Slice allocates a slice of float32 values from the arena. Returns nil if the allocation would exceed capacity. This is useful for allocating distance/score arrays in search operations.
func (*SearchArena) AllocVectorIDSlice ¶
func (a *SearchArena) AllocVectorIDSlice(count int) []VectorID
AllocVectorIDSlice allocates a slice of VectorID values from the arena. Returns nil if the allocation would exceed capacity. This is useful for allocating search result arrays without GC pressure.
func (*SearchArena) Cap ¶
func (a *SearchArena) Cap() int
Cap returns the total capacity of the arena in bytes.
func (*SearchArena) Offset ¶
func (a *SearchArena) Offset() int
Offset returns the current allocation offset (bytes used).
func (*SearchArena) Remaining ¶
func (a *SearchArena) Remaining() int
Remaining returns the number of bytes still available for allocation.
func (*SearchArena) Reset ¶
func (a *SearchArena) Reset()
Reset resets the arena for reuse without deallocating the underlying buffer. Call this after each search request to recycle memory.
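A sketch of the pooled arena lifecycle described above; the fallback branch is one plausible way to handle oversized requests:
func handleSearch(k int) {
    arena := GetArena()
    defer PutArena(arena) // return to the pool when the request finishes

    distances := arena.AllocFloat32Slice(k) // O(1) bump allocation
    ids := arena.AllocVectorIDSlice(k)
    if distances == nil || ids == nil {
        // Arena capacity exceeded; fall back to the regular allocator.
        distances = make([]float32, k)
        ids = make([]VectorID, k)
    }
    _, _ = distances, ids // filled in during the search
}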
type SearchResult ¶
func CombineHybridResults ¶
func CombineHybridResults(vectorResults, bm25Results []SearchResult, alpha float32, k int) []SearchResult
CombineHybridResults combines vector and BM25 results using alpha weighting. Vector scores are distances (lower is better); BM25 scores are relevance (higher is better).
func CombineHybridResultsRRF ¶
func CombineHybridResultsRRF(vectorResults, bm25Results []SearchResult, rrfK, limit int) []SearchResult
CombineHybridResultsRRF combines results using Reciprocal Rank Fusion (legacy alignment)
func FuseCascade ¶
func FuseCascade(exact map[VectorID]struct{}, keyword, vector []SearchResult, limit int) []SearchResult
FuseCascade implements cascade-style filtering: exact -> keyword -> vector
func FuseLinear ¶
func FuseLinear(dense, sparse []SearchResult, alpha float32, limit int) []SearchResult
FuseLinear combines results using linear weighted combination
func FuseRRF ¶
func FuseRRF(dense, sparse []SearchResult, k, limit int) []SearchResult
FuseRRF is an alias for ReciprocalRankFusion (legacy alignment)
func HybridSearch ¶
func HybridSearch(ctx context.Context, s *VectorStore, name string, query []float32, k int, filters map[string]string) ([]SearchResult, error)
HybridSearch performs a filtered vector search using inverted indexes for pre-filtering.
func ReciprocalRankFusion ¶
func ReciprocalRankFusion(dense, sparse []SearchResult, k, limit int) []SearchResult
ReciprocalRankFusion combines results from multiple search systems using their ranks. Formula: score = sum(1 / (k + rank)). Reference: https://dl.acm.org/doi/10.1145/1571941.1572114
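A worked instance of the formula, with k = 60 as in the referenced paper:
// Document ranked 1st by the dense system and 3rd by the sparse system:
score := 1.0/(60+1) + 1.0/(60+3) // ≈ 0.0164 + 0.0159 ≈ 0.0323
_ = score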
func SearchHybrid ¶
func SearchHybrid(ctx context.Context, s *VectorStore, name string, query []float32, textQuery string, k int, alpha float32, rrfK int) ([]SearchResult, error)
SearchHybrid performs a hybrid search combining dense vector search and sparse keyword search.
type SemaphoreStats ¶
type SemaphoreStats struct {
TotalAcquires int64
TotalReleases int64
TotalTimeouts int64
CurrentActive int64
MaxConcurrent int
}
SemaphoreStats holds statistics about semaphore usage.
type ShardStats ¶
ShardStats holds per-shard statistics
type ShardedDataset ¶
type ShardedDataset struct {
// contains filtered or unexported fields
}
ShardedDataset provides a dataset with sharded record storage. Enables concurrent access to different shards without lock contention.
func NewShardedDataset ¶
func NewShardedDataset(name string, cfg ShardedDatasetConfig) *ShardedDataset
NewShardedDataset creates a new sharded dataset with given config.
func NewShardedDatasetDefault ¶
func NewShardedDatasetDefault(name string) *ShardedDataset
NewShardedDatasetDefault creates a sharded dataset with default config.
func (*ShardedDataset) Append ¶
func (sd *ShardedDataset) Append(batch arrow.RecordBatch, routingKey uint64)
Append adds a record batch using hash-based routing.
func (*ShardedDataset) AppendToShard ¶
func (sd *ShardedDataset) AppendToShard(batch arrow.RecordBatch, shardIdx int)
AppendToShard adds a record batch directly to a specific shard.
func (*ShardedDataset) Clear ¶
func (sd *ShardedDataset) Clear()
Clear removes all records from all shards.
func (*ShardedDataset) ForEach ¶
func (sd *ShardedDataset) ForEach(fn func(batch arrow.RecordBatch, shard int) bool)
ForEach iterates over all record batches. If fn returns false, iteration stops early.
func (*ShardedDataset) ForEachInShard ¶
func (sd *ShardedDataset) ForEachInShard(shardIdx int, fn func(batch arrow.RecordBatch) bool)
ForEachInShard iterates over records in a specific shard.
func (*ShardedDataset) GetAllRecords ¶
func (sd *ShardedDataset) GetAllRecords() []arrow.RecordBatch
GetAllRecords returns all record batches across all shards. Order is not guaranteed to be consistent.
func (*ShardedDataset) GetShardRecords ¶
func (sd *ShardedDataset) GetShardRecords(shardIdx int) []arrow.RecordBatch
GetShardRecords returns all records from a specific shard.
func (*ShardedDataset) IncrementVersion ¶
func (sd *ShardedDataset) IncrementVersion() int64
IncrementVersion atomically increments and returns the new version.
func (*ShardedDataset) Index ¶
func (sd *ShardedDataset) Index() *HNSWIndex
Index returns the HNSW index (may be nil).
func (*ShardedDataset) LastAccess ¶
func (sd *ShardedDataset) LastAccess() time.Time
LastAccess returns the last access time.
func (*ShardedDataset) Name ¶
func (sd *ShardedDataset) Name() string
Name returns the dataset name.
func (*ShardedDataset) NumShards ¶
func (sd *ShardedDataset) NumShards() int
NumShards returns the number of shards.
func (*ShardedDataset) ReplaceShardRecords ¶
func (sd *ShardedDataset) ReplaceShardRecords(shardIdx int, batches []arrow.RecordBatch)
ReplaceShardRecords atomically replaces all records in a shard.
func (*ShardedDataset) SetIndex ¶
func (sd *ShardedDataset) SetIndex(idx *HNSWIndex)
SetIndex sets the HNSW index.
func (*ShardedDataset) SetLastAccess ¶
func (sd *ShardedDataset) SetLastAccess(t time.Time)
SetLastAccess sets the last access time.
func (*ShardedDataset) ShardRecordCount ¶
func (sd *ShardedDataset) ShardRecordCount(shardIdx int) int
ShardRecordCount returns the number of records in a specific shard.
func (*ShardedDataset) Stats ¶
func (sd *ShardedDataset) Stats() ShardedDatasetStats
Stats returns current statistics.
func (*ShardedDataset) ToLegacyRecords ¶
func (sd *ShardedDataset) ToLegacyRecords() []arrow.RecordBatch
ToLegacyRecords returns all records as a flat slice for backward compatibility.
func (*ShardedDataset) TotalRecords ¶
func (sd *ShardedDataset) TotalRecords() int
TotalRecords returns total number of record batches across all shards.
func (*ShardedDataset) Version ¶
func (sd *ShardedDataset) Version() int64
Version returns the current version.
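A sketch of hash-routed appends and iteration; the record batch is assumed to come from ingestion elsewhere:
func exampleShardedDataset(rec arrow.RecordBatch) {
    sd := NewShardedDatasetDefault("events") // one shard per CPU by default
    sd.Append(rec, 42)                       // routing key is hashed to pick a shard

    rows := int64(0)
    sd.ForEach(func(batch arrow.RecordBatch, shard int) bool {
        rows += batch.NumRows()
        return true // return false to stop early
    })
    _ = rows
}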
type ShardedDatasetConfig ¶
type ShardedDatasetConfig struct {
NumShards int // Number of shards (default: runtime.NumCPU())
}
ShardedDatasetConfig configures a sharded dataset.
func DefaultShardedDatasetConfig ¶
func DefaultShardedDatasetConfig() ShardedDatasetConfig
DefaultShardedDatasetConfig returns sensible defaults.
func (ShardedDatasetConfig) Validate ¶
func (c ShardedDatasetConfig) Validate() error
Validate checks configuration validity.
type ShardedDatasetStats ¶
ShardedDatasetStats holds statistics for a sharded dataset.
type ShardedHNSW ¶
type ShardedHNSW struct {
// contains filtered or unexported fields
}
ShardedHNSW provides fine-grained locking via multiple independent HNSW shards.
func NewShardedHNSW ¶
func NewShardedHNSW(config ShardedHNSWConfig, dataset *Dataset) *ShardedHNSW
NewShardedHNSW creates a new sharded HNSW index.
func (*ShardedHNSW) AddByLocation ¶
func (s *ShardedHNSW) AddByLocation(batchIdx, rowIdx int) (uint32, error)
AddByLocation implements VectorIndex.
func (*ShardedHNSW) AddByRecord ¶
func (s *ShardedHNSW) AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)
AddByRecord implements VectorIndex.
func (*ShardedHNSW) AddSafe ¶
func (s *ShardedHNSW) AddSafe(rec arrow.RecordBatch, rowIdx, batchIdx int) (VectorID, error)
AddSafe is an alias for AddByRecord for consistency with HNSWIndex.
func (*ShardedHNSW) GetDimension ¶
func (s *ShardedHNSW) GetDimension() uint32
GetDimension implements VectorIndex.
func (*ShardedHNSW) GetLocation ¶
func (s *ShardedHNSW) GetLocation(id VectorID) (Location, bool)
GetLocation returns the storage location for a given VectorID
func (*ShardedHNSW) GetShardForID ¶
func (s *ShardedHNSW) GetShardForID(id VectorID) int
GetShardForID returns the shard index for a given VectorID.
func (*ShardedHNSW) SearchByID ¶
func (s *ShardedHNSW) SearchByID(id VectorID, k int) []VectorID
SearchByID searches for vectors similar to the vector at the given ID.
func (*ShardedHNSW) SearchVectors ¶
func (s *ShardedHNSW) SearchVectors(query []float32, k int, filters []Filter) []SearchResult
SearchVectors implements VectorIndex.
func (*ShardedHNSW) SearchVectorsWithBitmap ¶
func (s *ShardedHNSW) SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult
SearchVectorsWithBitmap implements VectorIndex.
func (*ShardedHNSW) SetIndexedColumns ¶
func (s *ShardedHNSW) SetIndexedColumns(cols []string)
SetIndexedColumns satisfies VectorIndex interface
func (*ShardedHNSW) ShardStats ¶
func (s *ShardedHNSW) ShardStats() []ShardStat
ShardStats returns multi-index statistics for all shards.
type ShardedHNSWConfig ¶
type ShardedHNSWConfig struct {
NumShards int // Number of independent HNSW shards
M int // HNSW M parameter
EfConstruction int // HNSW efConstruction parameter
Metric VectorMetric // Distance metric for this index
}
ShardedHNSWConfig configures the sharded HNSW index.
func DefaultShardedHNSWConfig ¶
func DefaultShardedHNSWConfig() ShardedHNSWConfig
DefaultShardedHNSWConfig returns sensible defaults.
func (ShardedHNSWConfig) Validate ¶
func (c ShardedHNSWConfig) Validate() error
type ShardedIndexChannel ¶
type ShardedIndexChannel struct {
// contains filtered or unexported fields
}
ShardedIndexChannel provides hash-based routing of IndexJobs to multiple channels, preventing noisy neighbor issues where one dataset's high write throughput blocks indexing for others.
func NewShardedIndexChannel ¶
func NewShardedIndexChannel(numShards, bufferSize int) *ShardedIndexChannel
NewShardedIndexChannel creates a new sharded index channel with the specified number of shards and buffer size per shard.
func NewShardedIndexChannelDefault ¶
func NewShardedIndexChannelDefault(bufferSize int) *ShardedIndexChannel
NewShardedIndexChannelDefault creates a sharded index channel with one shard per CPU core (recommended default).
func (*ShardedIndexChannel) Close ¶
func (sic *ShardedIndexChannel) Close()
Close closes all shard channels. Safe to call multiple times.
func (*ShardedIndexChannel) GetShardChannel ¶
func (sic *ShardedIndexChannel) GetShardChannel(shardID int) chan IndexJob
GetShardChannel returns the channel for a specific shard ID. Used by workers to consume jobs from their assigned shard.
func (*ShardedIndexChannel) GetShardForDataset ¶
func (sic *ShardedIndexChannel) GetShardForDataset(datasetName string) int
GetShardForDataset returns the shard ID for a given dataset name. Uses FNV-1a hash for consistent, fast routing.
func (*ShardedIndexChannel) NumShards ¶
func (sic *ShardedIndexChannel) NumShards() int
NumShards returns the number of shards.
func (*ShardedIndexChannel) Send ¶
func (sic *ShardedIndexChannel) Send(job IndexJob) bool
Send routes an IndexJob to the appropriate shard based on dataset name. Blocks if the target shard's buffer is full. Returns false if the channel is closed.
func (*ShardedIndexChannel) Stats ¶
func (sic *ShardedIndexChannel) Stats() []ShardStats
Stats returns statistics for all shards.
func (*ShardedIndexChannel) TrySend ¶
func (sic *ShardedIndexChannel) TrySend(job IndexJob) bool
TrySend attempts to send an IndexJob without blocking. Returns true if sent successfully, false if buffer full or closed.
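A sketch of the worker-per-shard pattern these methods enable; IndexJob's fields are not documented here, so the literal is left empty:
func exampleShardedIndexing() {
    sic := NewShardedIndexChannelDefault(1024) // one shard per CPU core
    defer sic.Close()

    for i := 0; i < sic.NumShards(); i++ {
        go func(shard int) {
            for job := range sic.GetShardChannel(shard) {
                _ = job // apply the index update for this shard's datasets
            }
        }(i)
    }

    if ok := sic.TrySend(IndexJob{ /* fields elided */ }); !ok {
        // Buffer full or closed; Send would block instead of failing fast.
    }
}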
type ShardedInvertedIndex ¶
type ShardedInvertedIndex struct {
// contains filtered or unexported fields
}
ShardedInvertedIndex is a concurrent-safe inverted index using sharding
func NewShardedInvertedIndex ¶
func NewShardedInvertedIndex() *ShardedInvertedIndex
NewShardedInvertedIndex creates a new sharded inverted index
func (*ShardedInvertedIndex) Add ¶
func (idx *ShardedInvertedIndex) Add(id VectorID, text string)
Add indexes a document with the given text
func (*ShardedInvertedIndex) Delete ¶
func (idx *ShardedInvertedIndex) Delete(id VectorID)
Delete removes a document from the index
func (*ShardedInvertedIndex) Search ¶
func (idx *ShardedInvertedIndex) Search(query string, limit int) []SearchResult
Search returns documents matching any of the query terms, scored by BM25-like TF
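A sketch of indexing and keyword search; the document texts are made up:
func exampleInvertedIndex() {
    idx := NewShardedInvertedIndex()
    idx.Add(VectorID(1), "arrow flight vector store")
    idx.Add(VectorID(2), "write ahead log batching")

    hits := idx.Search("vector log", 10) // matches either term, TF-scored
    _ = hits

    idx.Delete(VectorID(2))
}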
type ShardedMap ¶
type ShardedMap struct {
// contains filtered or unexported fields
}
ShardedMap provides concurrent access to datasets via sharding
func (*ShardedMap) Delete ¶
func (sm *ShardedMap) Delete(name string)
Delete removes a dataset (write lock on single shard)
func (*ShardedMap) Get ¶
func (sm *ShardedMap) Get(name string) (*Dataset, bool)
Get retrieves a dataset by name (read lock on single shard)
func (*ShardedMap) GetOrCreate ¶
func (sm *ShardedMap) GetOrCreate(name string, create func() *Dataset) *Dataset
GetOrCreate atomically gets or creates a dataset
func (*ShardedMap) Len ¶
func (sm *ShardedMap) Len() int
Len returns total count across all shards (acquires all read locks)
func (*ShardedMap) Range ¶
func (sm *ShardedMap) Range(fn func(name string, ds *Dataset) bool)
Range iterates over all datasets (acquires read locks per shard). The callback should not hold references to the dataset after returning.
func (*ShardedMap) RangeWithLock ¶
func (sm *ShardedMap) RangeWithLock(fn func(name string, ds *Dataset, deleteFn func()))
RangeWithLock iterates with write lock (for modifications)
func (*ShardedMap) Set ¶
func (sm *ShardedMap) Set(name string, ds *Dataset)
Set stores a dataset (write lock on single shard)
type ShardedRWMutex ¶
type ShardedRWMutex struct {
// contains filtered or unexported fields
}
ShardedRWMutex provides a sharded read-write mutex for reduced contention. Different keys may hash to different shards, allowing concurrent access.
func NewShardedRWMutex ¶
func NewShardedRWMutex(cfg ShardedRWMutexConfig) *ShardedRWMutex
NewShardedRWMutex creates a new sharded mutex with the given configuration.
func NewShardedRWMutexDefault ¶
func NewShardedRWMutexDefault() *ShardedRWMutex
NewShardedRWMutexDefault creates a sharded mutex with default configuration.
func (*ShardedRWMutex) Lock ¶
func (sm *ShardedRWMutex) Lock(key uint64)
Lock acquires a write lock for the shard associated with the given key.
func (*ShardedRWMutex) LockAll ¶
func (sm *ShardedRWMutex) LockAll()
LockAll acquires write locks on all shards (use sparingly).
func (*ShardedRWMutex) LockShard ¶
func (sm *ShardedRWMutex) LockShard(shard int)
LockShard acquires a write lock on a specific shard directly.
func (*ShardedRWMutex) NumShards ¶
func (sm *ShardedRWMutex) NumShards() int
NumShards returns the number of shards.
func (*ShardedRWMutex) RLock ¶
func (sm *ShardedRWMutex) RLock(key uint64)
RLock acquires a read lock for the shard associated with the given key.
func (*ShardedRWMutex) RLockAll ¶
func (sm *ShardedRWMutex) RLockAll()
RLockAll acquires read locks on all shards.
func (*ShardedRWMutex) RLockShard ¶
func (sm *ShardedRWMutex) RLockShard(shard int)
RLockShard acquires a read lock on a specific shard directly.
func (*ShardedRWMutex) RUnlock ¶
func (sm *ShardedRWMutex) RUnlock(key uint64)
RUnlock releases the read lock for the shard associated with the given key.
func (*ShardedRWMutex) RUnlockAll ¶
func (sm *ShardedRWMutex) RUnlockAll()
RUnlockAll releases read locks on all shards.
func (*ShardedRWMutex) RUnlockShard ¶
func (sm *ShardedRWMutex) RUnlockShard(shard int)
RUnlockShard releases a read lock on a specific shard directly.
func (*ShardedRWMutex) ShardFor ¶
func (sm *ShardedRWMutex) ShardFor(key uint64) int
ShardFor returns the shard index for a given key using FNV-1a hash.
func (*ShardedRWMutex) Stats ¶
func (sm *ShardedRWMutex) Stats() ShardedRWMutexStats
Stats returns current statistics.
func (*ShardedRWMutex) Unlock ¶
func (sm *ShardedRWMutex) Unlock(key uint64)
Unlock releases the write lock for the shard associated with the given key.
func (*ShardedRWMutex) UnlockAll ¶
func (sm *ShardedRWMutex) UnlockAll()
UnlockAll releases write locks on all shards.
func (*ShardedRWMutex) UnlockShard ¶
func (sm *ShardedRWMutex) UnlockShard(shard int)
UnlockShard releases a write lock on a specific shard directly.
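A sketch of key-based locking versus whole-structure locking:
func exampleShardedMutex(key uint64) {
    mu := NewShardedRWMutexDefault()

    mu.RLock(key) // shared lock on this key's shard only
    // ... read state guarded by key ...
    mu.RUnlock(key)

    mu.LockAll() // structural changes need every shard (use sparingly)
    // ... modify state across all keys ...
    mu.UnlockAll()
}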
type ShardedRWMutexConfig ¶
type ShardedRWMutexConfig struct {
NumShards int // Number of shards (default: runtime.NumCPU())
}
ShardedRWMutexConfig configures the sharded mutex.
func DefaultShardedRWMutexConfig ¶
func DefaultShardedRWMutexConfig() ShardedRWMutexConfig
DefaultShardedRWMutexConfig returns sensible defaults.
func (ShardedRWMutexConfig) Validate ¶
func (c ShardedRWMutexConfig) Validate() error
Validate checks configuration validity.
type ShardedRWMutexStats ¶
ShardedRWMutexStats holds statistics for the sharded mutex.
type ShutdownError ¶
type ShutdownError struct {
Phase string // Phase: "drain", "flush", "close", "truncate"
Component string // Component: "WAL", "index_queue", "connections"
Cause error // Underlying error
Timestamp time.Time // When the error occurred
}
ShutdownError provides rich context for graceful shutdown operations.
func (*ShutdownError) Error ¶
func (e *ShutdownError) Error() string
func (*ShutdownError) Unwrap ¶
func (e *ShutdownError) Unwrap() error
type SnapshotBackend ¶
type SnapshotBackend interface {
// WriteSnapshot writes snapshot data for a collection
WriteSnapshot(ctx context.Context, name string, data []byte) error
// ReadSnapshot returns a reader for snapshot data
ReadSnapshot(ctx context.Context, name string) (io.ReadCloser, error)
// ListSnapshots returns all collection names with snapshots
ListSnapshots(ctx context.Context) ([]string, error)
// DeleteSnapshot removes a snapshot
DeleteSnapshot(ctx context.Context, name string) error
}
SnapshotBackend defines the interface for snapshot storage backends
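A sketch of a minimal in-memory SnapshotBackend implementation (e.g. for tests); it is not part of the package and assumes only the interface above (imports: bytes, context, io, sync):
type memBackend struct {
    mu   sync.Mutex
    data map[string][]byte
}

func (m *memBackend) WriteSnapshot(ctx context.Context, name string, data []byte) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    if m.data == nil {
        m.data = map[string][]byte{}
    }
    m.data[name] = append([]byte(nil), data...) // defensive copy
    return nil
}

func (m *memBackend) ReadSnapshot(ctx context.Context, name string) (io.ReadCloser, error) {
    m.mu.Lock()
    defer m.mu.Unlock()
    return io.NopCloser(bytes.NewReader(m.data[name])), nil
}

func (m *memBackend) ListSnapshots(ctx context.Context) ([]string, error) {
    m.mu.Lock()
    defer m.mu.Unlock()
    names := make([]string, 0, len(m.data))
    for n := range m.data {
        names = append(names, n)
    }
    return names, nil
}

func (m *memBackend) DeleteSnapshot(ctx context.Context, name string) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    delete(m.data, name)
    return nil
}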
type SplitBrainConfig ¶
type SplitBrainConfig struct {
HeartbeatInterval time.Duration
HeartbeatTimeout time.Duration
MinQuorum int
FenceOnPartition bool
}
SplitBrainConfig configures split-brain detection behavior.
type SplitBrainDetector ¶
type SplitBrainDetector struct {
// contains filtered or unexported fields
}
SplitBrainDetector monitors cluster health and detects network partitions.
func NewSplitBrainDetector ¶
func NewSplitBrainDetector(cfg SplitBrainConfig) *SplitBrainDetector
NewSplitBrainDetector creates a new split-brain detector.
func (*SplitBrainDetector) CheckQuorum ¶
func (d *SplitBrainDetector) CheckQuorum() bool
CheckQuorum evaluates if quorum is maintained and updates fencing state.
func (*SplitBrainDetector) GetHealthyPeerCount ¶
func (d *SplitBrainDetector) GetHealthyPeerCount() int
GetHealthyPeerCount returns the number of healthy peers.
func (*SplitBrainDetector) GetHeartbeatCount ¶
func (d *SplitBrainDetector) GetHeartbeatCount() uint64
GetHeartbeatCount returns the total heartbeat count.
func (*SplitBrainDetector) GetPartitionCount ¶
func (d *SplitBrainDetector) GetPartitionCount() uint64
GetPartitionCount returns the number of detected partitions.
func (*SplitBrainDetector) GetPeers ¶
func (d *SplitBrainDetector) GetPeers() []string
GetPeers returns all registered peer IDs.
func (*SplitBrainDetector) IsFenced ¶
func (d *SplitBrainDetector) IsFenced() bool
IsFenced returns true if the node is fenced due to partition.
func (*SplitBrainDetector) IsPeerHealthy ¶
func (d *SplitBrainDetector) IsPeerHealthy(id string) bool
IsPeerHealthy returns true if peer has sent heartbeat within timeout.
func (*SplitBrainDetector) RecordHeartbeat ¶
func (d *SplitBrainDetector) RecordHeartbeat(id string)
RecordHeartbeat updates the last heartbeat time for a peer.
func (*SplitBrainDetector) RegisterPeer ¶
func (d *SplitBrainDetector) RegisterPeer(id string)
RegisterPeer adds a peer to monitor.
func (*SplitBrainDetector) UnregisterPeer ¶
func (d *SplitBrainDetector) UnregisterPeer(id string)
UnregisterPeer removes a peer from monitoring.
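A sketch of a three-node setup; reading MinQuorum as "self plus peers" is an assumption, not documented behavior (imports: time):
func exampleSplitBrain() {
    d := NewSplitBrainDetector(SplitBrainConfig{
        HeartbeatInterval: time.Second,
        HeartbeatTimeout:  5 * time.Second,
        MinQuorum:         2,
        FenceOnPartition:  true,
    })
    d.RegisterPeer("node-b")
    d.RegisterPeer("node-c")

    d.RecordHeartbeat("node-b") // normally driven by the gossip layer

    if !d.CheckQuorum() && d.IsFenced() {
        // Partitioned minority: refuse writes until the partition heals.
    }
}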
type StdWAL ¶
type StdWAL struct {
// contains filtered or unexported fields
}
StdWAL is a standard file-based WAL implementation (fallback).
func NewStdWAL ¶
func NewStdWAL(dir string, v *VectorStore) *StdWAL
type StorageConfig ¶
type StorageConfig struct {
DataPath string
SnapshotInterval time.Duration
AsyncFsync bool
DoPutBatchSize int
UseIOUring bool
}
StorageConfig holds configuration for persistence
type StrategyType ¶
type StrategyType string
const (
StrategyRoundRobin StrategyType = "round_robin"
StrategyLeastConnections StrategyType = "least_connections"
StrategyWeighted StrategyType = "weighted"
)
type TCPNoDelayListener ¶
type TCPNoDelayListener struct {
*net.TCPListener
}
TCPNoDelayListener wraps a TCPListener and sets TCP_NODELAY on all accepted connections. This disables Nagle's algorithm for lower latency at the cost of potential bandwidth efficiency.
func NewTCPNoDelayListener ¶
func NewTCPNoDelayListener(l *net.TCPListener) *TCPNoDelayListener
NewTCPNoDelayListener creates a new listener that sets TCP_NODELAY on accepted connections.
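A sketch of wrapping a plain TCP listener before serving Flight/gRPC traffic on it (imports: net):
func listenLowLatency(addr string) (net.Listener, error) {
    l, err := net.Listen("tcp", addr)
    if err != nil {
        return nil, err
    }
    tcpL, ok := l.(*net.TCPListener)
    if !ok {
        return l, nil // not a TCP listener; leave unwrapped
    }
    return NewTCPNoDelayListener(tcpL), nil
}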
type TicketQuery ¶
type TicketQuery struct {
Name string `json:"name"`
Limit int64 `json:"limit"`
Filters []Filter `json:"filters"`
}
func FastParseTicketQuery ¶
func FastParseTicketQuery(ticket []byte) (TicketQuery, error)
FastParseTicketQuery parses ticket JSON using goccy/go-json, which is 2-3x faster than encoding/json for this use case. It's a drop-in replacement with identical semantics.
func ParseTicketQuerySafe ¶
func ParseTicketQuerySafe(data []byte) (TicketQuery, error)
ParseTicketQuerySafe is a thread-safe wrapper that uses pooled parsers
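A sketch of parsing a DoGet ticket; the JSON field names follow the struct tags above:
func exampleTicket() {
    raw := []byte(`{"name":"events","limit":100,"filters":[]}`)
    q, err := FastParseTicketQuery(raw)
    if err != nil {
        return
    }
    _ = q.Name // "events"
}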
type TimestampMap ¶
type TimestampMap struct {
// contains filtered or unexported fields
}
TimestampMap tracks the latest LWW timestamp for each VectorID.
func NewTimestampMap ¶
func NewTimestampMap() *TimestampMap
func (*TimestampMap) Get ¶
func (tm *TimestampMap) Get(id VectorID) int64
Get returns the timestamp for a given ID.
func (*TimestampMap) Len ¶
func (tm *TimestampMap) Len() int
Len returns the total number of entries tracked.
type VectorClock ¶
type VectorClock struct {
// contains filtered or unexported fields
}
VectorClock implements a vector clock for causal ordering in distributed systems
func NewVectorClock ¶
func NewVectorClock(nodeID string) *VectorClock
NewVectorClock creates a new vector clock for the given node
func (*VectorClock) Compare ¶
func (vc *VectorClock) Compare(other *VectorClock) ClockComparison
Compare compares this vector clock with another. Returns ClockEqual, ClockBefore, ClockAfter, or ClockConcurrent.
func (*VectorClock) Copy ¶
func (vc *VectorClock) Copy() *VectorClock
Copy creates a deep copy of the vector clock
func (*VectorClock) Deserialize ¶
func (vc *VectorClock) Deserialize(data []byte) error
Deserialize restores vector clock from bytes
func (*VectorClock) Get ¶
func (vc *VectorClock) Get(nodeID string) uint64
Get returns the clock value for a specific node
func (*VectorClock) Increment ¶
func (vc *VectorClock) Increment()
Increment increments this node's clock value
func (*VectorClock) Merge ¶
func (vc *VectorClock) Merge(other *VectorClock)
Merge combines another vector clock into this one, taking max values
func (*VectorClock) NodeID ¶
func (vc *VectorClock) NodeID() string
NodeID returns the node identifier for this clock
func (*VectorClock) Serialize ¶
func (vc *VectorClock) Serialize() []byte
Serialize converts the vector clock to bytes for network transfer
func (*VectorClock) Set ¶
func (vc *VectorClock) Set(nodeID string, value uint64)
Set sets the clock value for a specific node
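A sketch of conflict detection between two nodes, using the constants named in Compare's documentation:
func exampleVectorClock() {
    a := NewVectorClock("node-a")
    b := NewVectorClock("node-b")

    a.Increment() // node-a writes
    b.Increment() // node-b writes without having seen node-a's update

    switch a.Compare(b) {
    case ClockConcurrent:
        // Conflicting writes: resolve (e.g. merge or last-writer-wins).
    case ClockBefore, ClockAfter, ClockEqual:
        // Causally ordered: the later clock supersedes.
    }

    a.Merge(b) // a now dominates both histories
}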
type VectorID ¶
type VectorID uint32
VectorID represents a unique identifier for a vector in the index. It maps to a specific location (Batch, Row) in the Arrow buffers.
type VectorIndex ¶
type VectorIndex interface {
// AddByLocation adds a vector from the dataset using batch and row indices.
// Returns the assigned internal Vector ID.
AddByLocation(batchIdx, rowIdx int) (uint32, error)
// AddByRecord adds a vector directly from a record batch.
// Returns the assigned internal Vector ID.
AddByRecord(rec arrow.RecordBatch, rowIdx, batchIdx int) (uint32, error)
// SearchVectors returns the k nearest neighbors for the query vector with scores and optional filtering.
SearchVectors(query []float32, k int, filters []Filter) []SearchResult
// SearchVectorsWithBitmap returns k nearest neighbors filtered by a bitset.
SearchVectorsWithBitmap(query []float32, k int, filter *Bitset) []SearchResult
// GetLocation retrieves the storage location for a given vector ID.
// Returns the location and true if found, or zero location and false if not found.
GetLocation(id VectorID) (Location, bool)
// Len returns the number of vectors in the index.
Len() int
// GetDimension returns the vector dimension.
GetDimension() uint32
// Warmup pre-loads index data into memory
Warmup() int
// SetIndexedColumns updates columns that should be indexed for fast equality lookups
SetIndexedColumns(cols []string)
// Close releases index resources.
Close() error
}
type VectorMetric ¶
type VectorMetric int
VectorMetric defines the distance metric type.
const (
MetricEuclidean VectorMetric = iota
MetricCosine
MetricDotProduct
)
type VectorRecord ¶
VectorRecord represents a single row for Parquet serialization
type VectorSearchRequest ¶
type VectorSearchRequest struct {
Dataset string `json:"dataset"`
Vector []float32 `json:"vector"`
K int `json:"k"`
Filters []Filter `json:"filters"`
LocalOnly bool `json:"local_only"`
// Hybrid Search Fields
TextQuery string `json:"text_query"`
Alpha float32 `json:"alpha"` // 0.0=sparse, 1.0=dense, 0.5=hybrid
}
VectorSearchRequest defines the request format for VectorSearch action
type VectorSearchResponse ¶
VectorSearchResponse defines the response format for VectorSearch action
type VectorStore ¶
type VectorStore struct {
flight.BaseFlightServer
// Mesh integration
Mesh *mesh.Gossip
// contains filtered or unexported fields
}
VectorStore implements flight.FlightServer with minimal logic
func NewVectorStore ¶
func NewVectorStoreWithCompaction ¶
func NewVectorStoreWithCompaction(mem memory.Allocator, logger *zap.Logger, maxMemoryBytes int64, walMaxBytes int64, ttlDuration time.Duration, compactionCfg CompactionConfig) *VectorStore
NewVectorStoreWithCompaction returns a VectorStore with initialized compaction
func NewVectorStoreWithHybridConfig ¶
func NewVectorStoreWithHybridConfig(mem memory.Allocator, logger *zap.Logger, cfg HybridSearchConfig) (*VectorStore, error)
NewVectorStoreWithHybridConfig creates a VectorStore with hybrid search enabled.
func NewVectorStoreWithPipeline ¶
func NewVectorStoreWithPipeline(mem memory.Allocator, logger *zap.Logger, workers, bufferSize int) *VectorStore
NewVectorStoreWithPipeline creates a VectorStore with DoGet pipeline enabled
func NewVectorStoreWithPipelineThreshold ¶
func NewVectorStoreWithPipelineThreshold(mem memory.Allocator, logger *zap.Logger, workers, bufferSize, threshold int) *VectorStore
NewVectorStoreWithPipelineThreshold creates a VectorStore with custom pipeline threshold
func (*VectorStore) ApplyDelta ¶
func (s *VectorStore) ApplyDelta(name string, rec arrow.RecordBatch, seq uint64, ts int64) error
func (*VectorStore) Close ¶
func (s *VectorStore) Close() error
Close ensures the WAL is flushed and closed properly
func (*VectorStore) CompactDataset ¶
func (vs *VectorStore) CompactDataset(name string) error
CompactDataset manually triggers compaction for a specific dataset.
func (*VectorStore) CreateNamespace ¶
func (vs *VectorStore) CreateNamespace(name string) error
CreateNamespace creates a new namespace with the given name. Returns error if namespace already exists.
func (*VectorStore) DeleteNamespace ¶
func (vs *VectorStore) DeleteNamespace(name string) error
DeleteNamespace removes a namespace. Returns error if namespace is "default" or doesn't exist.
func (*VectorStore) DoAction ¶
func (s *VectorStore) DoAction(action *flight.Action, stream flight.FlightService_DoActionServer) error
DoAction handles custom actions like deletion and status
func (*VectorStore) DoExchange ¶
func (s *VectorStore) DoExchange(stream flight.FlightService_DoExchangeServer) error
DoExchange implements bidirectional Arrow Flight streaming for mesh replication. This provides the foundation for peer-to-peer Arrow stream transfer.
Protocol:
- Client sends FlightData messages with FlightDescriptor indicating dataset
- Server receives, processes, and can send data back
- Enables future mesh replication between peers
func (*VectorStore) DoGet ¶
func (s *VectorStore) DoGet(tkt *flight.Ticket, stream flight.FlightService_DoGetServer) error
DoGet - Minimal implementation
func (*VectorStore) DoPut ¶
func (s *VectorStore) DoPut(stream flight.FlightService_DoPutServer) error
DoPut - Optimized implementation with batching
func (*VectorStore) GetAutoCompactionTriggerCount ¶
func (s *VectorStore) GetAutoCompactionTriggerCount() int64
GetAutoCompactionTriggerCount returns trigger count from the worker
func (*VectorStore) GetAutoShardingConfig ¶
func (s *VectorStore) GetAutoShardingConfig() AutoShardingConfig
GetAutoShardingConfig returns the current auto-sharding configuration
func (*VectorStore) GetBM25Index ¶
func (s *VectorStore) GetBM25Index() *BM25InvertedIndex
GetBM25Index returns the BM25 inverted index for text search. Returns nil if hybrid search is not enabled.
func (*VectorStore) GetCompactionConfig ¶
func (vs *VectorStore) GetCompactionConfig() CompactionConfig
GetCompactionConfig returns the current compaction configuration.
func (*VectorStore) GetCompactionStats ¶
func (vs *VectorStore) GetCompactionStats() CompactionStats
GetCompactionStats returns compaction statistics.
func (*VectorStore) GetDataset ¶
func (s *VectorStore) GetDataset(name string) (*Dataset, error)
GetDataset retrieves a dataset by name.
func (*VectorStore) GetDoGetPipelinePool ¶
func (s *VectorStore) GetDoGetPipelinePool() *DoGetPipelinePool
GetDoGetPipelinePool returns the pipeline pool (nil if not configured)
func (*VectorStore) GetFlightInfo ¶
func (s *VectorStore) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)
func (*VectorStore) GetHybridSearchConfig ¶
func (s *VectorStore) GetHybridSearchConfig() HybridSearchConfig
GetHybridSearchConfig returns the hybrid search configuration.
func (*VectorStore) GetIndexedColumns ¶
func (s *VectorStore) GetIndexedColumns() []string
GetIndexedColumns returns columns currently being indexed
func (*VectorStore) GetNamespace ¶
func (vs *VectorStore) GetNamespace(name string) *Namespace
GetNamespace returns a namespace by name, or nil if not found.
func (*VectorStore) GetNamespaceDatasetCount ¶
func (vs *VectorStore) GetNamespaceDatasetCount(name string) int
GetNamespaceDatasetCount returns the number of datasets in a namespace.
func (*VectorStore) GetPipelineStats ¶
func (s *VectorStore) GetPipelineStats() PipelineStats
GetPipelineStats returns current pipeline statistics
func (*VectorStore) GetPipelineThreshold ¶
func (s *VectorStore) GetPipelineThreshold() int
GetPipelineThreshold returns the batch count threshold for pipeline use
func (*VectorStore) GetSchema ¶
func (s *VectorStore) GetSchema(ctx context.Context, desc *flight.FlightDescriptor) (*flight.SchemaResult, error)
func (*VectorStore) GetTotalNamespaceCount ¶
func (vs *VectorStore) GetTotalNamespaceCount() int
GetTotalNamespaceCount returns the total number of namespaces.
func (*VectorStore) GetWALQueueDepth ¶
func (s *VectorStore) GetWALQueueDepth() (int, int)
func (*VectorStore) HybridSearch ¶
func (s *VectorStore) HybridSearch(ctx context.Context, name string, query []float32, k int, filters map[string]string) ([]SearchResult, error)
HybridSearch is a wrapper for the HybridSearch function
func (*VectorStore) IndexRecordColumns ¶
func (s *VectorStore) IndexRecordColumns(datasetName string, rec arrow.RecordBatch, batchIdx int)
IndexRecordColumns indexes specific columns for fast equality lookups
func (*VectorStore) InitPersistence ¶
func (s *VectorStore) InitPersistence(cfg StorageConfig) error
InitPersistence initializes the WAL and loads any existing data
func (*VectorStore) IsCompactionRunning ¶
func (vs *VectorStore) IsCompactionRunning() bool
IsCompactionRunning returns true if the compaction worker is running.
func (*VectorStore) IterateDatasets ¶
func (s *VectorStore) IterateDatasets(fn func(ds *Dataset))
IterateDatasets safely iterates over all datasets for background tasks.
func (*VectorStore) ListFlights ¶
func (s *VectorStore) ListFlights(c *flight.Criteria, stream flight.FlightService_ListFlightsServer) error
func (*VectorStore) ListNamespaces ¶
func (vs *VectorStore) ListNamespaces() []string
ListNamespaces returns all namespace names.
func (*VectorStore) MapInternalToUserIDs ¶
func (s *VectorStore) MapInternalToUserIDs(ds *Dataset, results []SearchResult) []SearchResult
MapInternalToUserIDs maps internal HNSW IDs to user-provided IDs
func (*VectorStore) MerkleRoot ¶
func (s *VectorStore) MerkleRoot(name string) [32]byte
func (*VectorStore) NamespaceExists ¶
func (vs *VectorStore) NamespaceExists(name string) bool
NamespaceExists checks if a namespace exists.
func (*VectorStore) SearchHybrid ¶
func (s *VectorStore) SearchHybrid(ctx context.Context, name string, query []float32, textQuery string, k int, alpha float32, rrfK int) ([]SearchResult, error)
SearchHybrid is a wrapper for the SearchHybrid function (RRF version)
func (*VectorStore) SetAutoShardingConfig ¶
func (s *VectorStore) SetAutoShardingConfig(cfg AutoShardingConfig)
SetAutoShardingConfig updates the auto-sharding configuration
func (*VectorStore) SetIndexedColumns ¶
func (s *VectorStore) SetIndexedColumns(cols []string)
SetIndexedColumns updates columns that should be indexed for fast equality lookups
func (*VectorStore) SetMesh ¶
func (s *VectorStore) SetMesh(m *mesh.Gossip)
func (*VectorStore) Shutdown ¶
func (s *VectorStore) Shutdown(ctx context.Context) error
Shutdown performs a graceful shutdown of the VectorStore. It stops accepting new requests, drains the index queue, flushes the WAL, and closes all file handles. The context controls the shutdown timeout.
func (*VectorStore) Snapshot ¶
func (s *VectorStore) Snapshot() error
func (*VectorStore) StartEvictionTicker ¶
func (s *VectorStore) StartEvictionTicker(d time.Duration)
func (*VectorStore) StartMetricsTicker ¶
func (s *VectorStore) StartMetricsTicker(d time.Duration)
func (*VectorStore) StartWALCheckTicker ¶
func (s *VectorStore) StartWALCheckTicker(d time.Duration)
func (*VectorStore) StoreRecordBatch ¶
func (s *VectorStore) StoreRecordBatch(ctx context.Context, name string, rec arrow.RecordBatch) error
StoreRecordBatch stores a batch of records in a dataset
func (*VectorStore) TruncateWAL ¶
func (s *VectorStore) TruncateWAL() error
TruncateWAL truncates the WAL file after a successful snapshot. This should only be called after SaveSnapshot completes successfully.
func (*VectorStore) UpdateConfig ¶
func (s *VectorStore) UpdateConfig(maxMemory, maxWALSize int64, snapshotInterval time.Duration)
func (*VectorStore) Warmup ¶
func (s *VectorStore) Warmup() WarmupStats
Warmup iterates through all datasets and warms up their indexes
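A lifecycle sketch tying several of the methods above together; the limits, paths, zero TTL, and the zero-value CompactionConfig are placeholders whose defaults are assumed (imports: context, time, go.uber.org/zap, and the arrow memory package):
func runStore() error {
    s := NewVectorStoreWithCompaction(
        memory.NewGoAllocator(), zap.NewNop(),
        8<<30,              // max memory bytes
        1<<30,              // max WAL bytes
        0,                  // TTL; zero assumed to disable eviction
        CompactionConfig{}, // zero value; real deployments should set fields
    )
    if err := s.InitPersistence(StorageConfig{
        DataPath:         "/var/lib/store",
        SnapshotInterval: 10 * time.Minute,
        AsyncFsync:       true,
    }); err != nil {
        return err
    }
    // ... serve Flight traffic ...
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    return s.Shutdown(ctx) // drains queues, flushes the WAL, closes files
}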
type VectorizedFilter ¶
type VectorizedFilter struct {
// contains filtered or unexported fields
}
VectorizedFilter provides optimized Arrow Compute based filtering
func NewVectorizedFilter ¶
func NewVectorizedFilter(alloc memory.Allocator) *VectorizedFilter
NewVectorizedFilter creates a new vectorized filter
func (*VectorizedFilter) Apply ¶
func (vf *VectorizedFilter) Apply(ctx context.Context, rec arrow.RecordBatch, filters []Filter) (arrow.RecordBatch, error)
Apply applies filters to a record batch using vectorized Arrow Compute operations
type VersionedData ¶
type VersionedData struct {
Dataset string
Data []byte
Clock *VectorClock
}
VersionedData wraps data with a vector clock for causal consistency
func NewVersionedData ¶
func NewVersionedData(dataset string, data []byte, clock *VectorClock) *VersionedData
NewVersionedData creates versioned data with a vector clock
func (*VersionedData) Conflicts ¶
func (vd *VersionedData) Conflicts(other *VersionedData) bool
Conflicts returns true if this data is concurrent with the other
func (*VersionedData) Supersedes ¶
func (vd *VersionedData) Supersedes(other *VersionedData) bool
Supersedes returns true if this data happened after the other
type WAL ¶
type WAL interface {
Write(name string, seq uint64, ts int64, record arrow.RecordBatch) error
Close() error
Sync() error
}
WAL defines the interface for Write-Ahead Logging.
func NewWAL ¶
func NewWAL(dir string, v *VectorStore) WAL
type WALBackend ¶
type WALBackend interface {
// Write writes the data to the WAL.
// Implementations should handle offset tracking if necessary (e.g. for io_uring pwrite).
Write(p []byte) (n int, err error)
// Sync explicitly flushes data to disk.
Sync() error
// Close closes the underlying resource.
Close() error
// Name returns the file name or identifier.
Name() string
// File returns the underlying *os.File if available.
File() *os.File
}
WALBackend defines the interface for low-level WAL I/O. Implementations must be thread-safe, or serialization must be handled by the caller. WALBatcher currently handles serialization (single writer in flush loop).
func NewUringBackend ¶
func NewUringBackend(path string) (WALBackend, error)
NewUringBackend creates an io_uring-backed WALBackend on Linux; it returns an error on non-Linux systems.
func NewWALBackend ¶
func NewWALBackend(path string, preferAsync bool) (WALBackend, error)
NewWALBackend creates a new WALBackend. It attempts to use io_uring if preferAsync is true and the platform supports it. Otherwise, it falls back to FSBackend.
type WALBatcher ¶
type WALBatcher struct {
// contains filtered or unexported fields
}
WALBatcher batches WAL writes for improved performance. Instead of fsyncing on every write, it groups writes and flushes them periodically.
func NewWALBatcher ¶
func NewWALBatcher(dataPath string, config *WALBatcherConfig) *WALBatcher
NewWALBatcher creates a new batched WAL writer
func (*WALBatcher) AsyncFsyncStats ¶
func (w *WALBatcher) AsyncFsyncStats() *AsyncFsyncerStats
AsyncFsyncStats returns stats from the async fsyncer, or nil if not enabled
func (*WALBatcher) FlushError ¶
func (w *WALBatcher) FlushError() error
FlushError returns the last flush error if any
func (*WALBatcher) GetCurrentInterval ¶
func (w *WALBatcher) GetCurrentInterval() time.Duration
GetCurrentInterval returns the current flush interval
func (*WALBatcher) IsAdaptiveEnabled ¶
func (w *WALBatcher) IsAdaptiveEnabled() bool
IsAdaptiveEnabled returns whether adaptive batching is enabled
func (*WALBatcher) IsAsyncFsyncEnabled ¶
func (w *WALBatcher) IsAsyncFsyncEnabled() bool
IsAsyncFsyncEnabled returns true if async fsync is configured and running
func (*WALBatcher) QueueCapacity ¶
func (w *WALBatcher) QueueCapacity() int
QueueCapacity returns the maximum capacity of the queue
func (*WALBatcher) QueueDepth ¶
func (w *WALBatcher) QueueDepth() int
QueueDepth returns the current number of pending entries
func (*WALBatcher) Start ¶
func (w *WALBatcher) Start() error
Start initializes the WAL file and starts the background flush goroutine
func (*WALBatcher) Stop ¶
func (w *WALBatcher) Stop() error
Stop gracefully shuts down the batcher, flushing pending writes
func (*WALBatcher) Write ¶
func (w *WALBatcher) Write(rec arrow.RecordBatch, name string, seq uint64, ts int64) error
Write queues a record for batched WAL writing (non-blocking)
type WALBatcherConfig ¶
type WALBatcherConfig struct {
FlushInterval time.Duration // Time between flushes (e.g., 10ms)
MaxBatchSize int // Max entries before forced flush (e.g., 100)
Adaptive AdaptiveWALConfig // Adaptive batching configuration
AsyncFsync AsyncFsyncConfig // Async fsync configuration
UseIOUring bool // Use io_uring backend if available
}
WALBatcherConfig configures the batched WAL writer
func DefaultWALBatcherConfig ¶
func DefaultWALBatcherConfig() WALBatcherConfig
DefaultWALBatcherConfig returns sensible defaults
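A sketch of the batched write path with default settings; the record batch, dataset name, sequence number, and WAL directory are placeholders (imports: time, plus the arrow package):
func exampleWALBatcher(rec arrow.RecordBatch) error {
    cfg := DefaultWALBatcherConfig()
    w := NewWALBatcher("/var/lib/store/wal", &cfg)
    if err := w.Start(); err != nil {
        return err
    }
    defer w.Stop() // flushes pending writes

    if err := w.Write(rec, "events", 1, time.Now().UnixNano()); err != nil {
        return err
    }
    return w.FlushError() // surfaces asynchronous flush failures
}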
type WALEntry ¶
type WALEntry struct {
Record arrow.RecordBatch
Name string
Seq uint64
Timestamp int64
}
WALEntry represents a single entry to be written to the WAL
type WALError ¶
type WALError struct {
Op string // Operation: "write", "read", "flush", "truncate"
Path string // WAL file path
Offset int64 // Byte offset where error occurred
Cause error // Underlying error
Timestamp time.Time // When the error occurred
}
WALError provides rich context for Write-Ahead Log operations.
type WALIterator ¶
type WALIterator struct {
// contains filtered or unexported fields
}
WALIterator iterates over WAL entries, supporting seeking by Sequence ID.
func NewWALIterator ¶
func NewWALIterator(dir string, mem memory.Allocator) (*WALIterator, error)
func (*WALIterator) Close ¶
func (it *WALIterator) Close() error
func (*WALIterator) Next ¶
func (it *WALIterator) Next() (uint64, int64, string, arrow.RecordBatch, error)
Next reads the next entry. Returns io.EOF if done.
func (*WALIterator) Seek ¶
func (it *WALIterator) Seek(seq uint64) error
Seek fast-forwards the iterator to the first entry with Sequence > seq.
type WarmupStats ¶
type WarmupStats struct {
DatasetsWarmed int
DatasetsSkipped int
TotalNodesWarmed int
Duration time.Duration
}
WarmupStats holds statistics about the warmup operation
func (WarmupStats) String ¶
func (w WarmupStats) String() string
type WeightedStrategy ¶
type WeightedStrategy struct {
// contains filtered or unexported fields
}
WeightedStrategy picks replicas based on assigned weights
func NewWeightedStrategy ¶
func NewWeightedStrategy() *WeightedStrategy
NewWeightedStrategy creates a new weighted strategy
func (*WeightedStrategy) Select ¶
func (s *WeightedStrategy) Select(replicas []string) string
Select picks a replica based on weighted probability
func (*WeightedStrategy) SetWeight ¶
func (s *WeightedStrategy) SetWeight(replica string, weight int)
SetWeight sets the weight for a replica
type WriteRateTracker ¶
type WriteRateTracker struct {
// contains filtered or unexported fields
}
WriteRateTracker tracks write rate using exponential moving average
func NewWriteRateTracker ¶
func NewWriteRateTracker(window time.Duration) *WriteRateTracker
NewWriteRateTracker creates a tracker with the given averaging window
func (*WriteRateTracker) GetRate ¶
func (t *WriteRateTracker) GetRate() float64
GetRate returns the current estimated write rate (writes per second)
func (*WriteRateTracker) RecordWrite ¶
func (t *WriteRateTracker) RecordWrite()
RecordWrite records a single write operation
type ZeroAllocTicketParser ¶
type ZeroAllocTicketParser struct {
// contains filtered or unexported fields
}
ZeroAllocTicketParser parses TicketQuery JSON with zero allocations for the common case (no escape sequences).
func NewZeroAllocTicketParser ¶
func NewZeroAllocTicketParser() *ZeroAllocTicketParser
NewZeroAllocTicketParser creates a new reusable parser
func (*ZeroAllocTicketParser) Parse ¶
func (p *ZeroAllocTicketParser) Parse(data []byte) (TicketQuery, error)
Parse parses the JSON data into a TicketQuery
type ZeroAllocVectorSearchParser ¶
type ZeroAllocVectorSearchParser struct {
// contains filtered or unexported fields
}
ZeroAllocVectorSearchParser parses VectorSearchRequest JSON with minimal allocations. The parser pre-allocates a vector slice and reuses it across parse calls. For the common case (no escape sequences in dataset name), this achieves zero allocations beyond the initial pre-allocation.
func NewZeroAllocVectorSearchParser ¶
func NewZeroAllocVectorSearchParser(maxDims int) *ZeroAllocVectorSearchParser
NewZeroAllocVectorSearchParser creates a new reusable parser. maxDims specifies the maximum expected vector dimensions for pre-allocation.
func (*ZeroAllocVectorSearchParser) Parse ¶
func (p *ZeroAllocVectorSearchParser) Parse(data []byte) (VectorSearchRequest, error)
Parse parses the JSON data into a VectorSearchRequest. The returned VectorSearchRequest.Vector shares the parser's internal buffer, so the result is only valid until the next Parse call.
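A sketch of reusing the parser across requests, copying the vector before the next Parse call invalidates it (as the documentation above requires):
func exampleVectorSearchParser(payloads [][]byte) {
    p := NewZeroAllocVectorSearchParser(1536) // expected max dimensions
    for _, data := range payloads {
        req, err := p.Parse(data)
        if err != nil {
            continue
        }
        owned := append([]float32(nil), req.Vector...) // copy: req.Vector aliases the parser's buffer
        _ = owned
    }
}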
type ZeroCopyBufferConfig ¶
ZeroCopyBufferConfig configures zero-copy buffer behavior
func DefaultZeroCopyBufferConfig ¶
func DefaultZeroCopyBufferConfig() ZeroCopyBufferConfig
DefaultZeroCopyBufferConfig returns sensible defaults
func (ZeroCopyBufferConfig) Validate ¶
func (c ZeroCopyBufferConfig) Validate() error
Validate checks configuration validity
type ZeroCopyPayload ¶
type ZeroCopyPayload struct {
DatasetName string
Schema []byte
RowCount int64
Meta []ArrayStructure // One per column
Buffers [][]byte
}
ZeroCopyPayload represents a serialized RecordBatch that references raw buffers. This is used for high-speed P2P replication (Item 7).
func MarshalZeroCopy ¶
func MarshalZeroCopy(name string, rec arrow.RecordBatch) (*ZeroCopyPayload, error)
MarshalZeroCopy creates a payload that references the underlying Arrow buffers. WARNING: The resulting payload is only valid as long as the RecordBatch is retained.
Source Files
¶
- adaptive_index.go
- adaptive_wal.go
- arrow_allocator.go
- async_fsync.go
- batched_wal.go
- binary_quantization.go
- bitmap.go
- bitmap_pool.go
- bloom_filter.go
- bm25_inverted_index.go
- bm25_scorer.go
- buffer_pool.go
- checkpoint_coordinator.go
- circuit_breaker.go
- column_inverted_index.go
- compaction.go
- compaction_store.go
- cow_metadata.go
- dataset.go
- do_exchange.go
- doget_pipeline.go
- doget_pipeline_integration.go
- errors.go
- fast_json.go
- fast_path_filter.go
- fast_path_ordered.go
- filter_evaluator.go
- flight_client_pool.go
- flight_data_queue.go
- global_search.go
- graph_store.go
- grpc_options.go
- hnsw.go
- hnsw_autoshard.go
- hnsw_batch.go
- hnsw_gpu.go
- hnsw_graph_sync.go
- hnsw_lockfree.go
- hybrid_config.go
- hybrid_pipeline.go
- hybrid_query.go
- hybrid_search.go
- index.go
- index_queue.go
- inverted_index.go
- inverted_index_sharded.go
- load_balancer.go
- lww.go
- memory.go
- memory_backpressure.go
- memory_linux.go
- merkle.go
- mesh_cache.go
- namespace.go
- numa_allocator.go
- numa_allocator_linux.go
- numa_linux.go
- numa_pin_linux.go
- parallel_filter.go
- parquet_adapter.go
- partitioned_records.go
- peer_replicator.go
- perp_result_pool.go
- persistence.go
- pluggable_index.go
- pluggable_index_adapters.go
- pooled_allocator.go
- product_quantization.go
- quorum.go
- record_eviction.go
- record_size_cache.go
- record_writer_pool.go
- replication.go
- request_semaphore.go
- result_pool.go
- rrf.go
- s3_backend.go
- scalar_quantization.go
- schema_evolution.go
- schema_lock.go
- search_arena.go
- servers.go
- shard.go
- sharded_dataset.go
- sharded_hnsw.go
- sharded_indexing.go
- sharded_mutex.go
- shutdown.go
- split_brain_detector.go
- store.go
- store_gpu_stub.go
- store_helpers.go
- store_hybrid.go
- tcp_nodelay.go
- vector_clock.go
- vector_search_action.go
- vectorized_filter.go
- wal_backend.go
- wal_backend_stub.go
- wal_buffer_pool.go
- wal_interface.go
- wal_iterator.go
- zero_alloc_parser.go
- zero_alloc_vector_search.go
- zero_copy.go