Documentation
¶
Overview ¶
Package storage implements a time-series-based storage engine. It provides:
- Partition data based on a time axis.
- Sharding data based on a series id which represents a unique entity of stream/measure
- Retrieving data based on index.Filter.
- Cleaning expired data, or the data retention.
Index ¶
- Constants
- Variables
- func DeleteOldSnapshots(root string, maxAge time.Duration, lfs fs.FileSystem)
- func DeleteStaleSnapshots(root string, maxNum int, lfs fs.FileSystem)
- func GetCompatibleVersions() []string
- func GetCurrentVersion() string
- func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor SegmentVisitor, ...) ([]string, error)
- type Cache
- type CacheConfig
- type DiskMonitor
- type EntryKey
- type FailedPartsHandler
- type FieldResult
- type FieldResultList
- type IndexDB
- type IndexGranularity
- type IndexSearchOpts
- type IntervalRule
- type IntervalUnit
- type Metrics
- type PartInfo
- type RetentionConfig
- type RetentionService
- type Segment
- type SegmentVisitor
- type SeriesData
- type Sizable
- type SupplyTSDB
- type TSDB
- type TSDBOpts
- type TSTable
- type TSTableCreator
Constants ¶
const ( // DefaultInitialRetryDelay is the initial delay before the first retry. DefaultInitialRetryDelay = 1 * time.Second // DefaultMaxRetries is the maximum number of retry attempts. DefaultMaxRetries = 3 // DefaultBackoffMultiplier is the multiplier for exponential backoff. DefaultBackoffMultiplier = 2 // FailedPartsDirName is the name of the directory for failed parts. FailedPartsDirName = "failed-parts" )
const ( // DirPerm is the permission of the directory. DirPerm = 0o700 // SnapshotsDir is the directory for snapshots. SnapshotsDir = "snapshots" // RepairDir is the directory for repairs. RepairDir = "repairs" // DataDir is the directory for data. DataDir = "data" // FilePerm is the permission of the file. FilePerm = 0o600 )
Variables ¶
var ErrSegmentClosed = errors.New("segment closed")
ErrSegmentClosed is returned when trying to access a closed segment.
var ( // ErrUnknownShard indicates that the shard is not found. ErrUnknownShard = errors.New("unknown shard") )
Functions ¶
func DeleteOldSnapshots ¶ added in v0.9.0
func DeleteOldSnapshots(root string, maxAge time.Duration, lfs fs.FileSystem)
DeleteOldSnapshots deletes snapshots older than the specified maxAge duration. This function is used during forced cleanup to remove old snapshots regardless of count.
func DeleteStaleSnapshots ¶ added in v0.8.0
func DeleteStaleSnapshots(root string, maxNum int, lfs fs.FileSystem)
DeleteStaleSnapshots deletes the stale snapshots in the root directory.
func GetCompatibleVersions ¶ added in v0.9.0
func GetCompatibleVersions() []string
GetCompatibleVersions returns the list of compatible storage versions.
func GetCurrentVersion ¶ added in v0.9.0
func GetCurrentVersion() string
GetCurrentVersion returns the current storage version.
func VisitSegmentsInTimeRange ¶ added in v0.9.0
func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor SegmentVisitor, segmentInterval IntervalRule) ([]string, error)
VisitSegmentsInTimeRange traverses segments within the specified time range and calls the visitor methods for series index and shard directories. This function works directly with the filesystem without requiring a database instance. Returns a list of segment suffixes that were visited.
Types ¶
type Cache ¶ added in v0.9.0
type Cache interface {
Get(key EntryKey) Sizable
Put(key EntryKey, value Sizable)
Close()
Requests() uint64
Misses() uint64
Entries() uint64
Size() uint64
}
Cache encapsulates the cache operations.
func NewBypassCache ¶ added in v0.9.0
func NewBypassCache() Cache
NewBypassCache creates a no-op cache implementation.
func NewServiceCache ¶ added in v0.9.0
func NewServiceCache() Cache
NewServiceCache creates a cache for service with default configuration.
func NewServiceCacheWithConfig ¶ added in v0.9.0
func NewServiceCacheWithConfig(config CacheConfig) Cache
NewServiceCacheWithConfig creates a cache for service with custom configuration.
type CacheConfig ¶ added in v0.9.0
type CacheConfig struct {
MaxCacheSize run.Bytes
CleanupInterval time.Duration
IdleTimeout time.Duration
}
CacheConfig holds configuration parameters for the cache.
func DefaultCacheConfig ¶ added in v0.9.0
func DefaultCacheConfig() CacheConfig
DefaultCacheConfig returns the default cache configuration.
type DiskMonitor ¶ added in v0.9.0
type DiskMonitor struct {
// contains filtered or unexported fields
}
DiskMonitor monitors disk usage and orchestrates forced retention cleanup for a service when disk usage exceeds configured watermarks.
func NewDiskMonitor ¶ added in v0.9.0
func NewDiskMonitor(service RetentionService, config RetentionConfig, omr observability.MetricsRegistry) *DiskMonitor
NewDiskMonitor creates a new disk monitor for the given service.
func (*DiskMonitor) Start ¶ added in v0.9.0
func (dm *DiskMonitor) Start()
Start begins monitoring disk usage and starts the forced retention process.
func (*DiskMonitor) Stop ¶ added in v0.9.0
func (dm *DiskMonitor) Stop()
Stop stops the disk monitor gracefully.
type EntryKey ¶ added in v0.9.0
type EntryKey struct {
// contains filtered or unexported fields
}
EntryKey is the key of an entry in the cache.
func NewEntryKey ¶ added in v0.9.0
NewEntryKey creates an entry key with partID and offset.
type FailedPartsHandler ¶ added in v0.9.0
type FailedPartsHandler struct {
// contains filtered or unexported fields
}
FailedPartsHandler handles retry logic and filesystem fallback for failed parts.
func NewFailedPartsHandler ¶ added in v0.9.0
func NewFailedPartsHandler(fileSystem fs.FileSystem, root string, l *logger.Logger, maxTotalSizeBytes uint64) *FailedPartsHandler
NewFailedPartsHandler creates a new handler for failed parts.
func (*FailedPartsHandler) CopyToFailedPartsDir ¶ added in v0.9.0
func (h *FailedPartsHandler) CopyToFailedPartsDir(partID uint64, sourcePath string, destSubDir string) error
CopyToFailedPartsDir copies a part to the failed-parts directory using hard links.
func (*FailedPartsHandler) RetryFailedParts ¶ added in v0.9.0
func (h *FailedPartsHandler) RetryFailedParts( ctx context.Context, failedParts []queue.FailedPart, partsInfo map[uint64][]*PartInfo, syncFunc func(partIDs []uint64) ([]queue.FailedPart, error), ) ([]uint64, error)
RetryFailedParts attempts to retry failed parts with exponential backoff. Returns the list of permanently failed part IDs after all retries are exhausted.
type FieldResult ¶ added in v0.7.0
FieldResult is the result of a field.
type FieldResultList ¶ added in v0.7.0
type FieldResultList []FieldResult
FieldResultList is a list of FieldResult.
type IndexDB ¶
type IndexDB interface {
Insert(docs index.Documents) error
Update(docs index.Documents) error
Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (SeriesData, [][]byte, error)
SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error)
EnableExternalSegments() (index.ExternalSegmentStreamer, error)
}
IndexDB is the interface of index database.
type IndexGranularity ¶
type IndexGranularity int
IndexGranularity denotes the granularity of the local index.
const ( IndexGranularityBlock IndexGranularity = iota IndexGranularitySeries )
The options of the local index granularity.
type IndexSearchOpts ¶ added in v0.7.0
type IndexSearchOpts struct {
Query index.Query
Order *index.OrderBy
TimeRange *timestamp.TimeRange
Projection []index.FieldKey
PreloadSize int
}
IndexSearchOpts is the options for searching index.
type IntervalRule ¶
type IntervalRule struct {
Unit IntervalUnit
Num int
}
IntervalRule defines a length of two points in time.
func MustToIntervalRule ¶
func MustToIntervalRule(ir *commonv1.IntervalRule) (result IntervalRule)
MustToIntervalRule converts a commonv1.IntervalRule to IntervalRule.
type IntervalUnit ¶
type IntervalUnit int
IntervalUnit denotes the unit of a time point.
const ( HOUR IntervalUnit = iota DAY )
Available IntervalUnits. HOUR and DAY are adequate for the APM scenario.
func (IntervalUnit) Standard ¶ added in v0.9.0
func (iu IntervalUnit) Standard(t time.Time) time.Time
Standard returns a standardized time based on the interval unit.
func (IntervalUnit) String ¶
func (iu IntervalUnit) String() string
type Metrics ¶ added in v0.7.0
type Metrics interface {
// DeleteAll deletes all metrics.
DeleteAll()
}
Metrics is the interface of metrics.
type PartInfo ¶ added in v0.9.0
PartInfo contains information needed to retry or copy a failed part.
type RetentionConfig ¶ added in v0.9.0
type RetentionConfig struct {
// HighWatermark is the disk usage percentage that triggers forced cleanup (0-100)
HighWatermark float64
// LowWatermark is the disk usage percentage where cleanup stops (0-100)
LowWatermark float64
// CheckInterval is how often to check disk usage
CheckInterval time.Duration
// Cooldown is the sleep duration between segment deletions
Cooldown time.Duration
// ForceCleanupEnabled determines whether forced cleanup is enabled
ForceCleanupEnabled bool
}
RetentionConfig holds the configuration for forced retention cleanup.
type RetentionService ¶ added in v0.9.0
type RetentionService interface {
// GetDataPath returns the service's data directory path
GetDataPath() string
// GetSnapshotDir returns the service's snapshot directory path
GetSnapshotDir() string
// LoadAllGroups returns all groups managed by this service
LoadAllGroups() []resourceSchema.Group
// PeekOldestSegmentEndTimeInGroup returns the end time of the oldest segment in the specified group
// Returns zero time and false if no segments exist or group not found
PeekOldestSegmentEndTimeInGroup(group string) (time.Time, bool)
// DeleteOldestSegmentInGroup deletes the oldest segment in the specified group
// Returns true if a segment was deleted, false if no segments to delete
DeleteOldestSegmentInGroup(group string) (bool, error)
// CleanupOldSnapshots removes snapshots older than the specified duration
CleanupOldSnapshots(maxAge time.Duration) error
// GetServiceName returns the service name for metrics and logging
GetServiceName() string
}
RetentionService defines the interface that services must implement to support forced retention cleanup.
type Segment ¶ added in v0.7.0
type Segment[T TSTable, O any] interface { DecRef() GetTimeRange() timestamp.TimeRange CreateTSTableIfNotExist(shardID common.ShardID) (T, error) Tables() ([]T, []Cache) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) IndexDB() IndexDB }
Segment is a time range of data.
type SegmentVisitor ¶ added in v0.9.0
type SegmentVisitor interface {
// VisitSeries visits the series index directory for a segment.
VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error
// VisitShard visits a shard directory within a segment.
VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, shardPath string) error
}
SegmentVisitor defines the interface for visiting segment components.
type SeriesData ¶ added in v0.8.0
type SeriesData struct {
SeriesList pbv1.SeriesList
Fields FieldResultList
Timestamps []int64
Versions []int64
}
SeriesData is the result of a series.
type Sizable ¶ added in v0.9.0
type Sizable interface {
Size() uint64
}
Sizable represents an object that can report its memory size.
type TSDB ¶
type TSDB[T TSTable, O any] interface { io.Closer CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error) SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error) Tick(ts int64) UpdateOptions(opts *commonv1.ResourceOpts) TakeFileSnapshot(dst string) error GetExpiredSegmentsTimeRange() *timestamp.TimeRange DeleteExpiredSegments(segmentSuffixes []string) int64 // PeekOldestSegmentEndTime returns the end time of the oldest segment. // Returns the zero time and false if no segments exist or retention gate cannot be acquired. PeekOldestSegmentEndTime() (time.Time, bool) // DeleteOldestSegment deletes exactly one oldest segment if it exists and meets safety rules. // Returns true if a segment was deleted, false otherwise. DeleteOldestSegment() (bool, error) }
TSDB allows listing and getting shard details.
type TSDBOpts ¶
type TSDBOpts[T TSTable, O any] struct { Option O TableMetrics Metrics TSTableCreator TSTableCreator[T, O] StorageMetricsFactory *observability.Factory Location string SegmentInterval IntervalRule TTL IntervalRule SeriesIndexFlushTimeoutSeconds int64 SeriesIndexCacheMaxBytes int ShardNum uint32 DisableRetention bool SegmentIdleTimeout time.Duration MemoryLimit uint64 }
TSDBOpts wraps options to create a tsdb.