storage

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2025 License: Apache-2.0 Imports: 39 Imported by: 0

README

File Version Compatibility Information

The current file version is 1.1.0, compatible with versions specified in version.yml.

Documentation

Overview

Package storage implements a time-series-based storage engine. It provides:

  • Partition data based on a time axis.
  • Sharding data based on a series id which represents a unique entity of stream/measure
  • Retrieving data based on index.Filter.
  • Cleaning expired data, or the data retention.

Index

Constants

View Source
const (
	// DefaultInitialRetryDelay is the initial delay before the first retry.
	DefaultInitialRetryDelay = 1 * time.Second
	// DefaultMaxRetries is the maximum number of retry attempts.
	DefaultMaxRetries = 3
	// DefaultBackoffMultiplier is the multiplier for exponential backoff.
	DefaultBackoffMultiplier = 2
	// FailedPartsDirName is the name of the directory for failed parts.
	FailedPartsDirName = "failed-parts"
)
View Source
const (

	// DirPerm is the permission of the directory.
	DirPerm = 0o700
	// SnapshotsDir is the directory for snapshots.
	SnapshotsDir = "snapshots"
	// RepairDir is the directory for repairs.
	RepairDir = "repairs"
	// DataDir is the directory for data.
	DataDir = "data"
	// FilePerm is the permission of the file.
	FilePerm = 0o600
)

Variables

View Source
var ErrSegmentClosed = errors.New("segment closed")

ErrSegmentClosed is returned when trying to access a closed segment.

View Source
var (
	// ErrUnknownShard indicates that the shard is not found.
	ErrUnknownShard = errors.New("unknown shard")
)

Functions

func DeleteOldSnapshots added in v0.9.0

func DeleteOldSnapshots(root string, maxAge time.Duration, lfs fs.FileSystem)

DeleteOldSnapshots deletes snapshots older than the specified maxAge duration. This function is used during forced cleanup to remove old snapshots regardless of count.

func DeleteStaleSnapshots added in v0.8.0

func DeleteStaleSnapshots(root string, maxNum int, lfs fs.FileSystem)

DeleteStaleSnapshots deletes the stale snapshots in the root directory.

func GetCompatibleVersions added in v0.9.0

func GetCompatibleVersions() []string

GetCompatibleVersions returns the list of compatible storage versions.

func GetCurrentVersion added in v0.9.0

func GetCurrentVersion() string

GetCurrentVersion returns the current storage version.

func VisitSegmentsInTimeRange added in v0.9.0

func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor SegmentVisitor, segmentInterval IntervalRule) ([]string, error)

VisitSegmentsInTimeRange traverses segments within the specified time range and calls the visitor methods for series index and shard directories. This function works directly with the filesystem without requiring a database instance. Returns a list of segment suffixes that were visited.

Types

type Cache added in v0.9.0

type Cache interface {
	Get(key EntryKey) Sizable
	Put(key EntryKey, value Sizable)
	Close()
	Requests() uint64
	Misses() uint64
	Entries() uint64
	Size() uint64
}

Cache encapsulates the cache operations.

func NewBypassCache added in v0.9.0

func NewBypassCache() Cache

NewBypassCache creates a no-op cache implementation.

func NewServiceCache added in v0.9.0

func NewServiceCache() Cache

NewServiceCache creates a cache for service with default configuration.

func NewServiceCacheWithConfig added in v0.9.0

func NewServiceCacheWithConfig(config CacheConfig) Cache

NewServiceCacheWithConfig creates a cache for service with custom configuration.

func NewShardCache added in v0.9.0

func NewShardCache(group string, segmentID segmentID, shardID common.ShardID) Cache

NewShardCache creates a new shard cache.

type CacheConfig added in v0.9.0

type CacheConfig struct {
	MaxCacheSize    run.Bytes
	CleanupInterval time.Duration
	IdleTimeout     time.Duration
}

CacheConfig holds configuration parameters for the cache.

func DefaultCacheConfig added in v0.9.0

func DefaultCacheConfig() CacheConfig

DefaultCacheConfig returns the default cache configuration.

type DiskMonitor added in v0.9.0

type DiskMonitor struct {
	// contains filtered or unexported fields
}

DiskMonitor monitors disk usage and orchestrates forced retention cleanup for a service when disk usage exceeds configured watermarks.

func NewDiskMonitor added in v0.9.0

func NewDiskMonitor(service RetentionService, config RetentionConfig, omr observability.MetricsRegistry) *DiskMonitor

NewDiskMonitor creates a new disk monitor for the given service.

func (*DiskMonitor) Start added in v0.9.0

func (dm *DiskMonitor) Start()

Start begins monitoring disk usage and starts the forced retention process.

func (*DiskMonitor) Stop added in v0.9.0

func (dm *DiskMonitor) Stop()

Stop stops the disk monitor gracefully.

type EntryKey added in v0.9.0

type EntryKey struct {
	// contains filtered or unexported fields
}

EntryKey is the key of an entry in the cache.

func NewEntryKey added in v0.9.0

func NewEntryKey(partID uint64, offset uint64) EntryKey

NewEntryKey creates an entry key with partID and offset.

type FailedPartsHandler added in v0.9.0

type FailedPartsHandler struct {
	// contains filtered or unexported fields
}

FailedPartsHandler handles retry logic and filesystem fallback for failed parts.

func NewFailedPartsHandler added in v0.9.0

func NewFailedPartsHandler(fileSystem fs.FileSystem, root string, l *logger.Logger, maxTotalSizeBytes uint64) *FailedPartsHandler

NewFailedPartsHandler creates a new handler for failed parts.

func (*FailedPartsHandler) CopyToFailedPartsDir added in v0.9.0

func (h *FailedPartsHandler) CopyToFailedPartsDir(partID uint64, sourcePath string, destSubDir string) error

CopyToFailedPartsDir copies a part to the failed-parts directory using hard links.

func (*FailedPartsHandler) RetryFailedParts added in v0.9.0

func (h *FailedPartsHandler) RetryFailedParts(
	ctx context.Context,
	failedParts []queue.FailedPart,
	partsInfo map[uint64][]*PartInfo,
	syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
) ([]uint64, error)

RetryFailedParts attempts to retry failed parts with exponential backoff. Returns the list of permanently failed part IDs after all retries are exhausted.

type FieldResult added in v0.7.0

type FieldResult map[string][]byte

FieldResult is the result of a field.

type FieldResultList added in v0.7.0

type FieldResultList []FieldResult

FieldResultList is a list of FieldResult.

type IndexDB

type IndexDB interface {
	Insert(docs index.Documents) error
	Update(docs index.Documents) error
	Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (SeriesData, [][]byte, error)
	SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error)
	EnableExternalSegments() (index.ExternalSegmentStreamer, error)
}

IndexDB is the interface of index database.

type IndexGranularity

type IndexGranularity int

IndexGranularity denotes the granularity of the local index.

const (
	IndexGranularityBlock IndexGranularity = iota
	IndexGranularitySeries
)

The options of the local index granularity.

type IndexSearchOpts added in v0.7.0

type IndexSearchOpts struct {
	Query       index.Query
	Order       *index.OrderBy
	TimeRange   *timestamp.TimeRange
	Projection  []index.FieldKey
	PreloadSize int
}

IndexSearchOpts is the options for searching index.

type IntervalRule

type IntervalRule struct {
	Unit IntervalUnit
	Num  int
}

IntervalRule defines a length of two points in time.

func MustToIntervalRule

func MustToIntervalRule(ir *commonv1.IntervalRule) (result IntervalRule)

MustToIntervalRule converts a commonv1.IntervalRule to IntervalRule.

func (IntervalRule) NextTime added in v0.9.0

func (ir IntervalRule) NextTime(current time.Time) time.Time

NextTime returns the next time point based on the current time and interval rule.

type IntervalUnit

type IntervalUnit int

IntervalUnit denotes the unit of a time point.

const (
	HOUR IntervalUnit = iota
	DAY
)

Available IntervalUnits. HOUR and DAY are adequate for the APM scenario.

func (IntervalUnit) Standard added in v0.9.0

func (iu IntervalUnit) Standard(t time.Time) time.Time

Standard returns a standardized time based on the interval unit.

func (IntervalUnit) String

func (iu IntervalUnit) String() string

type Metrics added in v0.7.0

type Metrics interface {
	// DeleteAll deletes all metrics.
	DeleteAll()
}

Metrics is the interface of metrics.

type PartInfo added in v0.9.0

type PartInfo struct {
	SourcePath string
	PartType   string
	PartID     uint64
}

PartInfo contains information needed to retry or copy a failed part.

type RetentionConfig added in v0.9.0

type RetentionConfig struct {
	// HighWatermark is the disk usage percentage that triggers forced cleanup (0-100)
	HighWatermark float64
	// LowWatermark is the disk usage percentage where cleanup stops (0-100)
	LowWatermark float64
	// CheckInterval is how often to check disk usage
	CheckInterval time.Duration
	// Cooldown is the sleep duration between segment deletions
	Cooldown time.Duration
	// ForceCleanupEnabled determines whether forced cleanup is enabled
	ForceCleanupEnabled bool
}

RetentionConfig holds the configuration for forced retention cleanup.

type RetentionService added in v0.9.0

type RetentionService interface {
	// GetDataPath returns the service's data directory path
	GetDataPath() string
	// GetSnapshotDir returns the service's snapshot directory path
	GetSnapshotDir() string
	// LoadAllGroups returns all groups managed by this service
	LoadAllGroups() []resourceSchema.Group
	// PeekOldestSegmentEndTimeInGroup returns the end time of the oldest segment in the specified group
	// Returns zero time and false if no segments exist or group not found
	PeekOldestSegmentEndTimeInGroup(group string) (time.Time, bool)
	// DeleteOldestSegmentInGroup deletes the oldest segment in the specified group
	// Returns true if a segment was deleted, false if no segments to delete
	DeleteOldestSegmentInGroup(group string) (bool, error)
	// CleanupOldSnapshots removes snapshots older than the specified duration
	CleanupOldSnapshots(maxAge time.Duration) error
	// GetServiceName returns the service name for metrics and logging
	GetServiceName() string
}

RetentionService defines the interface that services must implement to support forced retention cleanup.

type Segment added in v0.7.0

type Segment[T TSTable, O any] interface {
	DecRef()
	GetTimeRange() timestamp.TimeRange
	CreateTSTableIfNotExist(shardID common.ShardID) (T, error)
	Tables() ([]T, []Cache)
	Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error)
	IndexDB() IndexDB
}

Segment is a time range of data.

type SegmentVisitor added in v0.9.0

type SegmentVisitor interface {
	// VisitSeries visits the series index directory for a segment.
	VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error
	// VisitShard visits a shard directory within a segment.
	VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, shardPath string) error
}

SegmentVisitor defines the interface for visiting segment components.

type SeriesData added in v0.8.0

type SeriesData struct {
	SeriesList pbv1.SeriesList
	Fields     FieldResultList
	Timestamps []int64
	Versions   []int64
}

SeriesData is the result of a series.

type Sizable added in v0.9.0

type Sizable interface {
	Size() uint64
}

Sizable represents an object that can report its memory size.

type SupplyTSDB

type SupplyTSDB[T TSTable] func() T

SupplyTSDB allows getting a tsdb's runtime.

type TSDB

type TSDB[T TSTable, O any] interface {
	io.Closer
	CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
	SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error)
	Tick(ts int64)
	UpdateOptions(opts *commonv1.ResourceOpts)
	TakeFileSnapshot(dst string) error
	GetExpiredSegmentsTimeRange() *timestamp.TimeRange
	DeleteExpiredSegments(segmentSuffixes []string) int64
	// PeekOldestSegmentEndTime returns the end time of the oldest segment.
	// Returns the zero time and false if no segments exist or retention gate cannot be acquired.
	PeekOldestSegmentEndTime() (time.Time, bool)
	// DeleteOldestSegment deletes exactly one oldest segment if it exists and meets safety rules.
	// Returns true if a segment was deleted, false otherwise.
	DeleteOldestSegment() (bool, error)
}

TSDB allows listing and getting shard details.

func OpenTSDB

func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O], cache Cache, group string) (TSDB[T, O], error)

OpenTSDB returns a new tsdb runtime. This constructor will create a new database if it's absent, or load an existing one.

type TSDBOpts

type TSDBOpts[T TSTable, O any] struct {
	Option                         O
	TableMetrics                   Metrics
	TSTableCreator                 TSTableCreator[T, O]
	StorageMetricsFactory          *observability.Factory
	Location                       string
	SegmentInterval                IntervalRule
	TTL                            IntervalRule
	SeriesIndexFlushTimeoutSeconds int64
	SeriesIndexCacheMaxBytes       int
	ShardNum                       uint32
	DisableRetention               bool
	SegmentIdleTimeout             time.Duration
	MemoryLimit                    uint64
}

TSDBOpts wraps options to create a tsdb.

type TSTable

type TSTable interface {
	io.Closer
	Collect(Metrics)
	TakeFileSnapshot(dst string) error
}

TSTable is time series table.

type TSTableCreator

type TSTableCreator[T TSTable, O any] func(fileSystem fs.FileSystem, root string, position common.Position,
	l *logger.Logger, timeRange timestamp.TimeRange, option O, metrics any) (T, error)

TSTableCreator creates a TSTable.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL