filereader

package
v1.11.0

Warning: This package is not in the latest version of its module.
Published: Feb 17, 2026 License: AGPL-3.0 Imports: 43 Imported by: 0

Documentation

Overview

Package filereader provides generic file reading for structured data formats with composable readers, merge-sort capabilities, and pluggable data transformation.

Overview

The filereader package provides streaming row-by-row access to various telemetry file formats. All readers implement a common interface and can be composed together for complex data processing patterns like merge-sort operations across multiple files. Data transformation is handled by separate translator components for maximum flexibility.

Core Interfaces

All file format readers return raw data without transformation:

type Row map[string]any

type Batch struct {
    Rows []Row
}

type Reader interface {
    Next(ctx context.Context) (*Batch, error)  // Returns next batch or io.EOF when exhausted
    Close() error
    TotalRowsReturned() int64                  // Total rows successfully returned so far
    GetSchema() *ReaderSchema                  // Schema covering all columns Next() may return
}

type RowTranslator interface {
    TranslateRow(ctx context.Context, row *Row) error  // Transforms row in-place
}

Use translators for data processing and TranslatingReader for composition.

Format Readers

All format readers return raw, untransformed data from files:

  • ParquetRawReader: Generic Parquet files using parquet-go/parquet-go (requires io.ReaderAt)
  • JSONLinesReader: Streams JSON objects line-by-line from any io.ReadCloser
  • CSVReader: Streams CSV rows from any io.ReadCloser
  • IngestProtoLogsReader: Raw OTEL log records from protobuf
  • IngestProtoTracesReader: Raw OTEL span data from protobuf

Example usage:

// For compressed JSON, pass in a gzip reader:
gzReader, err := gzip.NewReader(file)
if err != nil {
    return err
}

reader, err := NewJSONLinesReader(gzReader, 1000)
if err != nil {
    return err
}
defer reader.Close()

for {
    batch, err := reader.Next(ctx)
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        return err
    }
    if batch != nil {
        // process raw row data in batch.Rows
        for _, row := range batch.Rows {
            // process each row
        }
    }
}

Data Translation

Use translators to transform raw data:

// Create a simple translator that adds tags
translator := NewTagsTranslator(map[string]string{
    "source": "myapp",
    "env": "prod",
})

// Wrap any reader with translation
translatingReader := NewTranslatingReader(rawReader, translator)

// Chain multiple translators
chain := NewChainTranslator(
    NewTagsTranslator(someTags),
    customTranslator,  // Implement your own RowTranslator
)
reader := NewTranslatingReader(rawReader, chain)

Built-in translators:

  • NoopTranslator: Pass-through (no transformation)
  • TagsTranslator: Adds static tags to rows
  • ChainTranslator: Applies multiple translators in sequence

Implement custom translators by satisfying the RowTranslator interface.

Composite Readers

Sorting Readers

Choose the appropriate sorting reader based on dataset size and memory constraints:

MemorySortingReader - For smaller datasets (high memory usage, no disk I/O):

reader, err := NewMemorySortingReader(rawReader, &LogSortKeyProvider{}, 1000)

DiskSortingReader - For larger datasets (moderate memory usage, 2x disk I/O):

reader, err := NewDiskSortingReader(rawReader, &LogSortKeyProvider{}, 1000)

MergesortReader - For merging multiple already-sorted sources (low memory, streaming):

keyProvider := NewTimeOrderedSortKeyProvider("timestamp")
reader, err := NewMergesortReader(ctx, []Reader{r1, r2, r3}, keyProvider, 1000)

SequentialReader - Sequential processing (no sorting):

reader := NewSequentialReader([]Reader{r1, r2, r3})

Usage Patterns

Time-ordered merge sort across multiple files:

// Constructor error handling elided for brevity.
r1, _ := NewParquetRawReader(file1, size1, 1000)
r2, _ := NewParquetRawReader(file2, size2, 1000)
r3, _ := NewJSONLinesReader(file3, 1000)
readers := []Reader{r1, r2, r3}

keyProvider := NewTimeOrderedSortKeyProvider("chq_timestamp")
ordered, err := NewMergesortReader(ctx, readers, keyProvider, 1000)
if err != nil {
    return err
}
defer ordered.Close()

for {
    batch, err := ordered.Next(ctx)
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        return err
    }
    if batch != nil {
        // rows arrive in timestamp order across all files
        for _, row := range batch.Rows {
            // process each row
        }
    }
}

Composable reader trees:

// Process multiple file groups in timestamp order,
// then combine groups sequentially (errors elided for brevity)
keyProvider := NewTimeOrderedSortKeyProvider("timestamp")
group1, _ := NewMergesortReader(ctx, readers1, keyProvider, 1000)
group2, _ := NewMergesortReader(ctx, readers2, keyProvider, 1000)
final := NewSequentialReader([]Reader{group1, group2})

Memory Management & Batch Ownership

The filereader package implements efficient memory management through batch ownership:

**Batch Ownership**: Readers own the returned Batch and its Row maps. Callers must NOT retain references to batches beyond the next Next() call.

**Memory Safety**: Use pipeline.CopyBatch() if you need to retain batch data:

for {
    batch, err := reader.Next(ctx)
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        return err
    }
    if batch != nil {
        // Use data immediately or copy if retention needed
        safeBatch := pipeline.CopyBatch(batch)  // For retention
        // Process batch.Rows directly for immediate use
    }
}

**Data Safety**: Readers maintain clean batch states and handle EOF correctly. Batches must not be accessed after the next Next() call.

**Error Handling**: Next() returns nil batch on errors. Check error before accessing batch.

Resource Management

  • All readers must be closed via Close()
  • Parquet readers use random access (io.ReaderAt) - no buffering
  • Streaming readers (JSON, Proto) process incrementally
  • Composite readers automatically close child readers

Package filereader provides a generic interface for reading rows from various file formats. Callers construct readers directly and compose them as needed for their specific use cases.

Index

Constants

This section is empty.

Variables

View Source
var ErrRowNormalization = errors.New("row normalization failed")

ErrRowNormalization is a sentinel error indicating row normalization failed. Use errors.Is(err, ErrRowNormalization) to check for this error. To extract details, use errors.As with a *RowNormalizationError target.

Functions

This section is empty.

Types

type Batch added in v1.3.0

type Batch = pipeline.Batch

Batch represents a collection of rows with clear ownership semantics. The batch is owned by the reader that returns it.

type CSVLogTranslator added in v1.4.2

type CSVLogTranslator struct {
	// contains filtered or unexported fields
}

CSVLogTranslator handles translation for CSV log files

func NewCSVLogTranslator added in v1.4.2

func NewCSVLogTranslator(opts ReaderOptions) *CSVLogTranslator

NewCSVLogTranslator creates a new CSV log translator

func (*CSVLogTranslator) TranslateRow added in v1.4.2

func (t *CSVLogTranslator) TranslateRow(ctx context.Context, row *pipeline.Row) error

TranslateRow handles CSV-specific field translation for logs

type CSVReader added in v1.4.2

type CSVReader struct {
	// contains filtered or unexported fields
}

CSVReader reads rows from a CSV stream using pipeline semantics.

func NewCSVReader added in v1.4.2

func NewCSVReader(reader io.ReadCloser, batchSize int) (*CSVReader, error)

NewCSVReader creates a new CSVReader for the given io.ReadCloser. The reader takes ownership of the closer and will close it when Close is called. If the reader is seekable (implements io.Seeker), the file will be scanned once to infer column types before being reset for actual reading.

func (*CSVReader) Close added in v1.4.2

func (r *CSVReader) Close() error

Close closes the reader and the underlying io.ReadCloser.

func (*CSVReader) GetSchema added in v1.5.0

func (r *CSVReader) GetSchema() *ReaderSchema

GetSchema returns the inferred schema from scanning the file. Returns empty schema if headers haven't been read or schema inference failed.

func (*CSVReader) Next added in v1.4.2

func (r *CSVReader) Next(ctx context.Context) (*Batch, error)

func (*CSVReader) TotalRowsReturned added in v1.4.2

func (r *CSVReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type ChainTranslator

type ChainTranslator struct {
	// contains filtered or unexported fields
}

ChainTranslator applies multiple translators in sequence.

func NewChainTranslator

func NewChainTranslator(translators ...RowTranslator) *ChainTranslator

NewChainTranslator creates a translator that applies multiple translators in order.

func (*ChainTranslator) TranslateRow

func (ct *ChainTranslator) TranslateRow(ctx context.Context, row *pipeline.Row) error

TranslateRow applies all translators in sequence to the row.

type ColumnSchema added in v1.5.0

type ColumnSchema struct {
	Name       wkk.RowKey
	DataType   DataType
	HasNonNull bool
}

ColumnSchema describes a single column in the schema.

type DataType added in v1.5.0

type DataType int

DataType represents the type of data in a column.

const (
	DataTypeUnknown DataType = iota // Unknown/uninitialized type - should not be used
	DataTypeString
	DataTypeInt64
	DataTypeFloat64
	DataTypeBool
	DataTypeBytes
	DataTypeAny // For complex types (list, struct, map) that are passed through as-is
)

func InferTypeFromString added in v1.5.0

func InferTypeFromString(s string) (DataType, any)

InferTypeFromString attempts to parse a string and determine its type. Returns the inferred DataType and the parsed value. This is useful for CSV readers where all values start as strings.
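The documented behavior (parse a string, return the inferred DataType plus the parsed value) can be sketched with strconv. The parse precedence below is an assumption for illustration; the package's actual rules may differ.

```go
package main

import (
	"fmt"
	"strconv"
)

// DataType mirrors the package's enum, locally, for illustration.
type DataType int

const (
	DataTypeString DataType = iota + 1
	DataTypeInt64
	DataTypeFloat64
	DataTypeBool
)

// inferTypeFromString is a hypothetical sketch: try progressively looser
// parses (int, then float, then bool) and fall back to string.
func inferTypeFromString(s string) (DataType, any) {
	if i, err := strconv.ParseInt(s, 10, 64); err == nil {
		return DataTypeInt64, i
	}
	if f, err := strconv.ParseFloat(s, 64); err == nil {
		return DataTypeFloat64, f
	}
	if b, err := strconv.ParseBool(s); err == nil {
		return DataTypeBool, b
	}
	return DataTypeString, s
}

func main() {
	dt, v := inferTypeFromString("42")
	fmt.Println(dt == DataTypeInt64, v) // true 42
	dt, v = inferTypeFromString("3.14")
	fmt.Println(dt == DataTypeFloat64, v) // true 3.14
	dt, _ = inferTypeFromString("hello")
	fmt.Println(dt == DataTypeString) // true
}
```

Note that integer parsing must run before float parsing, since every integer string also parses as a float.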

func InferTypeFromValue added in v1.5.0

func InferTypeFromValue(value any) DataType

InferTypeFromValue determines the DataType from a Go value. This handles values from JSON unmarshaling or CSV parsing.

func (DataType) String added in v1.5.0

func (dt DataType) String() string

type DiskSortingReader added in v1.3.0

type DiskSortingReader struct {
	// contains filtered or unexported fields
}

DiskSortingReader sorts rows from an underlying reader using a temporary file on disk.

Memory Impact: LOW-MODERATE - only the extracted sort keys and file offsets are held in memory, making it much more memory-efficient than MemorySortingReader for large datasets.

Disk I/O: 2x data size - each row is written once to a temp binary file, then read once during output.

Stability: records are only guaranteed to be sorted by the sort function; if the sort function is not stable, the result will not be stable.

func NewDiskSortingReader added in v1.3.0

func NewDiskSortingReader(reader Reader, keyProvider SortKeyProvider, batchSize int) (*DiskSortingReader, error)

NewDiskSortingReader creates a reader that uses disk-based sorting with custom binary encoding.

Use this for large datasets that may not fit in memory. The temp file is automatically cleaned up when the reader is closed. Custom binary encoding provides efficient storage and serialization for the temporary data with no reflection overhead.

The keyProvider creates sort keys from rows to minimize memory usage during sorting.

func (*DiskSortingReader) Close added in v1.3.0

func (r *DiskSortingReader) Close() error

Close closes the reader and cleans up temp file.

func (*DiskSortingReader) GetOTELMetrics added in v1.3.0

func (r *DiskSortingReader) GetOTELMetrics() (any, error)

GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.

func (*DiskSortingReader) GetSchema added in v1.5.0

func (r *DiskSortingReader) GetSchema() *ReaderSchema

GetSchema delegates to the wrapped reader.

func (*DiskSortingReader) Next added in v1.3.0

func (r *DiskSortingReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of sorted rows by reading from the temp file in index order.

func (*DiskSortingReader) TotalRowsReturned added in v1.3.0

func (r *DiskSortingReader) TotalRowsReturned() int64

TotalRowsReturned returns the number of rows that have been returned via Next().

type IngestLogParquetReader added in v1.5.0

type IngestLogParquetReader struct {
	// contains filtered or unexported fields
}

IngestLogParquetReader reads parquet files using Apache Arrow for log ingestion. This reader performs two-pass reading:

 1. First pass: scan the file to extract the schema with flattening.
 2. Second pass: read rows using the extracted schema with flattening applied.

It handles NULL-type columns gracefully and flattens nested structures (maps, structs, lists).

func NewIngestLogParquetReader added in v1.5.0

func NewIngestLogParquetReader(ctx context.Context, reader parquet.ReaderAtSeeker, batchSize int) (*IngestLogParquetReader, error)

NewIngestLogParquetReader creates an IngestLogParquetReader for the given parquet.ReaderAtSeeker. It performs a first pass to extract the schema with flattening, then prepares for reading rows.

func (*IngestLogParquetReader) Close added in v1.5.0

func (r *IngestLogParquetReader) Close() error

Close releases resources associated with the reader.

func (*IngestLogParquetReader) GetSchema added in v1.5.0

func (r *IngestLogParquetReader) GetSchema() *ReaderSchema

GetSchema returns a copy of the schema extracted from the Arrow metadata with flattening. Returns a copy to prevent external mutation of the internal schema.

func (*IngestLogParquetReader) Next added in v1.5.0

Next returns the next batch of rows from the parquet file with flattening applied.

func (*IngestLogParquetReader) TotalRowsReturned added in v1.5.0

func (r *IngestLogParquetReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows successfully returned.

type IngestProtoLogsReader added in v1.3.0

type IngestProtoLogsReader struct {
	// contains filtered or unexported fields
}

IngestProtoLogsReader reads rows from OpenTelemetry protobuf logs format.

Implements OTELLogsProvider interface.

func NewIngestProtoLogsReader added in v1.3.0

func NewIngestProtoLogsReader(reader io.Reader, opts ReaderOptions) (*IngestProtoLogsReader, error)

NewIngestProtoLogsReader creates a new IngestProtoLogsReader for the given io.Reader.

func (*IngestProtoLogsReader) Close added in v1.3.0

func (r *IngestProtoLogsReader) Close() error

Close closes the reader and releases resources.

func (*IngestProtoLogsReader) GetSchema added in v1.5.0

func (r *IngestProtoLogsReader) GetSchema() *ReaderSchema

GetSchema returns the schema extracted from the OTEL logs.

func (*IngestProtoLogsReader) Next added in v1.3.0

func (r *IngestProtoLogsReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of rows from the OTEL logs.

func (*IngestProtoLogsReader) TotalRowsReturned added in v1.3.0

func (r *IngestProtoLogsReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type IngestProtoTracesReader added in v1.4.5

type IngestProtoTracesReader struct {
	// contains filtered or unexported fields
}

IngestProtoTracesReader reads rows from OpenTelemetry protobuf traces format. Returns raw OTEL trace data without signal-specific transformations.

func NewIngestProtoTracesReader added in v1.4.0

func NewIngestProtoTracesReader(reader io.Reader, opts ReaderOptions) (*IngestProtoTracesReader, error)

NewIngestProtoTracesReader creates a new IngestProtoTracesReader for ingestion with exemplar processing.

func NewProtoTracesReader

func NewProtoTracesReader(reader io.Reader, batchSize int) (*IngestProtoTracesReader, error)

NewProtoTracesReader creates a new IngestProtoTracesReader for the given io.Reader. The caller is responsible for closing the underlying reader.

func (*IngestProtoTracesReader) Close added in v1.4.5

func (r *IngestProtoTracesReader) Close() error

Close closes the reader and releases resources.

func (*IngestProtoTracesReader) GetSchema added in v1.5.0

func (r *IngestProtoTracesReader) GetSchema() *ReaderSchema

GetSchema returns the schema extracted from the OTEL traces.

func (*IngestProtoTracesReader) Next added in v1.4.5

Next returns the next batch of rows from the OTEL traces.

func (*IngestProtoTracesReader) TotalRowsReturned added in v1.4.5

func (r *IngestProtoTracesReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type JSONLinesReader

type JSONLinesReader struct {
	// contains filtered or unexported fields
}

JSONLinesReader reads rows from a JSON lines stream using pipeline semantics.

func NewJSONLinesReader

func NewJSONLinesReader(reader io.ReadCloser, batchSize int) (*JSONLinesReader, error)

NewJSONLinesReader creates a new JSONLinesReader for the given io.ReadCloser. The reader takes ownership of the closer and will close it when Close is called. If the reader is seekable (implements io.Seeker), the file will be scanned once to infer schema before being reset for actual reading.

func (*JSONLinesReader) Close

func (r *JSONLinesReader) Close() error

Close closes the reader and the underlying io.ReadCloser.

func (*JSONLinesReader) GetSchema added in v1.5.0

func (r *JSONLinesReader) GetSchema() *ReaderSchema

GetSchema returns the inferred schema from scanning the file. Returns empty schema if file was not seekable or schema inference failed.

func (*JSONLinesReader) Next added in v1.3.0

func (r *JSONLinesReader) Next(ctx context.Context) (*Batch, error)

func (*JSONLinesReader) TotalRowsReturned added in v1.3.0

func (r *JSONLinesReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type LogSortKey added in v1.6.2

type LogSortKey struct {
	ServiceIdentifier string
	TimestampNs       int64
	ServiceOk         bool
	TsOk              bool
}

LogSortKey represents the sort key for logs: [service_identifier, timestamp_ns], where service_identifier is resource_customer_domain if set, otherwise resource_service_name. The timestamp is in nanoseconds for ordering precision.

func (*LogSortKey) Compare added in v1.6.2

func (k *LogSortKey) Compare(other SortKey) int

Compare implements SortKey interface for LogSortKey

func (*LogSortKey) Release added in v1.6.2

func (k *LogSortKey) Release()

Release returns the LogSortKey to the pool for reuse

type LogSortKeyProvider added in v1.6.2

type LogSortKeyProvider struct {
	StreamField string // Optional: specific field to use for stream identification
}

LogSortKeyProvider creates LogSortKey instances from rows. If StreamField is set, that field is used for the service identifier. Otherwise, falls back to resource_customer_domain then resource_service_name.

func NewLogSortKeyProvider added in v1.7.0

func NewLogSortKeyProvider(streamField string) *LogSortKeyProvider

func (*LogSortKeyProvider) MakeKey added in v1.6.2

func (p *LogSortKeyProvider) MakeKey(row pipeline.Row) SortKey

MakeKey implements SortKeyProvider interface for logs

type MemorySortingReader added in v1.3.0

type MemorySortingReader struct {
	// contains filtered or unexported fields
}

MemorySortingReader reads all rows from an underlying reader, then sorts them using a custom sort function and returns them in order. This is useful when you need sorted output with flexible sorting criteria.

Memory Impact: HIGH - all rows are loaded into memory at once.

Disk I/O: none (pure in-memory operation).

Stability: records are only guaranteed to be sorted by the sort function; if the sort function is not stable, the result will not be stable.

func NewMemorySortingReader added in v1.3.0

func NewMemorySortingReader(reader Reader, keyProvider SortKeyProvider, batchSize int) (*MemorySortingReader, error)

NewMemorySortingReader creates a reader that buffers all rows, sorts them using the provided key provider, then returns them in order.

Use this for smaller datasets that fit comfortably in memory. For large datasets, consider DiskSortingReader to avoid OOM issues.

func (*MemorySortingReader) Close added in v1.3.0

func (r *MemorySortingReader) Close() error

Close closes the reader and underlying reader.

func (*MemorySortingReader) GetOTELMetrics added in v1.3.0

func (r *MemorySortingReader) GetOTELMetrics() (any, error)

GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.

func (*MemorySortingReader) GetSchema added in v1.5.0

func (r *MemorySortingReader) GetSchema() *ReaderSchema

GetSchema delegates to the wrapped reader.

func (*MemorySortingReader) Next added in v1.3.0

func (r *MemorySortingReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of sorted rows from the buffer.

func (*MemorySortingReader) TotalRowsReturned added in v1.3.0

func (r *MemorySortingReader) TotalRowsReturned() int64

TotalRowsReturned returns the number of rows that have been returned via Next().

type MergesortReader added in v1.3.0

type MergesortReader struct {
	// contains filtered or unexported fields
}

MergesortReader implements merge-sort style reading across multiple pre-sorted readers. It assumes each individual reader returns rows in sorted order according to the provided SortKeyProvider.

func NewMergesortReader added in v1.3.0

func NewMergesortReader(ctx context.Context, readers []Reader, keyProvider SortKeyProvider, batchSize int) (*MergesortReader, error)

NewMergesortReader creates a new MergesortReader that merges rows from multiple readers in sorted order using the new algorithm with active reader management.

func (*MergesortReader) ActiveReaderCount added in v1.3.0

func (or *MergesortReader) ActiveReaderCount() int

ActiveReaderCount returns the number of readers that still have data available.

func (*MergesortReader) Close added in v1.3.0

func (or *MergesortReader) Close() error

Close closes all underlying readers and releases resources.

func (*MergesortReader) GetSchema added in v1.5.0

func (or *MergesortReader) GetSchema() *ReaderSchema

GetSchema returns a copy of the merged schema from all child readers. Returns a copy to prevent external mutation of the internal schema.

func (*MergesortReader) Next added in v1.3.0

func (or *MergesortReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of rows in sorted order across all readers.

func (*MergesortReader) TotalRowsReturned added in v1.3.0

func (or *MergesortReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows returned by this reader.

type NoopTranslator

type NoopTranslator struct{}

NoopTranslator returns rows unchanged. It is the most efficient possible RowTranslator implementation and a useful reference when writing your own.

func NewNoopTranslator

func NewNoopTranslator() *NoopTranslator

NewNoopTranslator creates a translator that passes rows through unchanged.

func (*NoopTranslator) TranslateRow

func (nt *NoopTranslator) TranslateRow(ctx context.Context, row *pipeline.Row) error

TranslateRow does nothing for maximum performance.

type ParquetRawReader added in v1.3.0

type ParquetRawReader struct {
	// contains filtered or unexported fields
}

ParquetRawReader reads rows from a generic Parquet stream. This reader provides raw parquet data without any opinionated transformations. Use wrapper readers like CookedMetricTranslatingReader for domain-specific logic.

func NewParquetRawReader added in v1.3.0

func NewParquetRawReader(reader io.ReaderAt, size int64, batchSize int) (*ParquetRawReader, error)

NewParquetRawReader creates a new ParquetRawReader for the given io.ReaderAt.

func (*ParquetRawReader) Close added in v1.3.0

func (r *ParquetRawReader) Close() error

Close closes the reader and releases resources.

func (*ParquetRawReader) GetSchema added in v1.5.0

func (r *ParquetRawReader) GetSchema() *ReaderSchema

GetSchema returns the schema extracted from the parquet metadata.

func (*ParquetRawReader) Next added in v1.3.0

func (r *ParquetRawReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of rows from the parquet file.

func (*ParquetRawReader) TotalRowsReturned added in v1.3.0

func (r *ParquetRawReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type PrefixedRowKeyCache added in v1.5.0

type PrefixedRowKeyCache struct {
	// contains filtered or unexported fields
}

PrefixedRowKeyCache caches prefixed RowKeys to avoid repeated string building and interning. Each unique attribute name is transformed once and reused.

Thread-safety: This cache is NOT thread-safe. It's designed for single-threaded use within a reader that processes one file at a time.

func NewPrefixedRowKeyCache added in v1.5.0

func NewPrefixedRowKeyCache(prefix string) *PrefixedRowKeyCache

NewPrefixedRowKeyCache creates a new RowKey cache with the given prefix. The prefix will be prepended to all attribute names (e.g., "resource", "scope", "attr").

func (*PrefixedRowKeyCache) Clear added in v1.5.0

func (c *PrefixedRowKeyCache) Clear()

Clear removes all cached entries. Useful for resetting between files.

func (*PrefixedRowKeyCache) Get added in v1.5.0

func (c *PrefixedRowKeyCache) Get(name string) wkk.RowKey

Get returns the cached RowKey for the given attribute name, or computes and caches it if not already present. The returned RowKey has the prefix applied and dots replaced with underscores (e.g., "service.name" with prefix "resource" → "resource_service_name").

func (*PrefixedRowKeyCache) Len added in v1.5.0

func (c *PrefixedRowKeyCache) Len() int

Len returns the number of cached RowKeys.

type ProtoBinLogTranslator added in v1.3.0

type ProtoBinLogTranslator struct {
	// contains filtered or unexported fields
}

ProtoBinLogTranslator handles translation for protobuf binary log files

func NewProtoBinLogTranslator added in v1.3.0

func NewProtoBinLogTranslator(opts ReaderOptions) *ProtoBinLogTranslator

NewProtoBinLogTranslator creates a new protobuf log translator

func (*ProtoBinLogTranslator) TranslateRow added in v1.3.0

func (t *ProtoBinLogTranslator) TranslateRow(ctx context.Context, row *pipeline.Row) error

TranslateRow handles protobuf-specific field translation

type Reader

type Reader interface {
	// Next returns the next batch of rows, or io.EOF when exhausted.
	// The returned batch is owned by the reader and must not be retained
	// beyond the next Next() call. Use pipeline.CopyBatch() if you need to retain.
	// The context can be used for cancellation, deadlines, and updating metrics.
	Next(ctx context.Context) (*Batch, error)

	// Close releases any resources held by the reader.
	Close() error

	// TotalRowsReturned returns the total number of rows that have been successfully
	// returned via Next() calls from this reader so far.
	TotalRowsReturned() int64

	// GetSchema returns the ReaderSchema extracted from the content.  Must not return nil, and
	// must include all columns that may be returned by Next() with the types in the schema.
	GetSchema() *ReaderSchema
}

Reader is the core interface for reading rows from any file format using pipeline semantics. This eliminates memory ownership issues by establishing clear ownership: batches are owned by the reader and must not be retained beyond the next Next() call.

func ReaderForFile added in v1.3.0

func ReaderForFile(filename string, signalType SignalType, orgId string) (Reader, error)

ReaderForFile creates a Reader for the given file based on its extension and signal type. This is a convenience function that uses default options.

func ReaderForFileWithOptions added in v1.3.0

func ReaderForFileWithOptions(filename string, opts ReaderOptions) (Reader, error)

ReaderForFileWithOptions creates a Reader for the given file with the provided options. Supported file formats:

  • .parquet: Creates a ParquetRawReader (works for all signal types)
  • .json.gz: Creates a JSONLinesReader with gzip decompression (works for all signal types)
  • .json: Creates a JSONLinesReader (works for all signal types)
  • .csv: Creates a CSVReader (works for all signal types)
  • .csv.gz: Creates a CSVReader with gzip decompression (works for all signal types)
  • .binpb: Creates a signal-specific proto reader (NewIngestProtoLogsReader or NewProtoTracesReader)
  • .binpb.gz: Creates a signal-specific proto reader with gzip decompression
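Dispatching on these extensions requires checking compound suffixes first, since ".json.gz" also ends in ".json" once the ".gz" is mishandled. The sketch below is an illustration of that ordering concern, not the package's actual code; the kind labels are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// kindForFilename illustrates suffix dispatch over the supported formats.
// Compound extensions (".json.gz", ".csv.gz", ".binpb.gz") are matched
// before their uncompressed counterparts so they are not misclassified.
func kindForFilename(name string) string {
	switch {
	case strings.HasSuffix(name, ".parquet"):
		return "parquet"
	case strings.HasSuffix(name, ".json.gz"):
		return "jsonl+gzip"
	case strings.HasSuffix(name, ".json"):
		return "jsonl"
	case strings.HasSuffix(name, ".csv.gz"):
		return "csv+gzip"
	case strings.HasSuffix(name, ".csv"):
		return "csv"
	case strings.HasSuffix(name, ".binpb.gz"):
		return "proto+gzip"
	case strings.HasSuffix(name, ".binpb"):
		return "proto"
	default:
		return "unsupported"
	}
}

func main() {
	fmt.Println(kindForFilename("logs.json.gz")) // jsonl+gzip
	fmt.Println(kindForFilename("spans.binpb"))  // proto
}
```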

type ReaderOptions added in v1.3.0

type ReaderOptions struct {
	SignalType SignalType
	BatchSize  int // Batch size for readers (default: 1000)
	// Translation options for protobuf logs and metrics
	OrgID    string
	Bucket   string
	ObjectID string
	// Aggregation options for metrics
	EnableAggregation   bool  // Enable streaming aggregation
	AggregationPeriodMs int64 // Aggregation period in milliseconds (e.g., 10000 for 10s)
}

ReaderOptions provides options for creating readers.

type ReaderSchema added in v1.5.0

type ReaderSchema struct {
	// contains filtered or unexported fields
}

ReaderSchema represents the complete schema for a reader.

func ExtractCompleteParquetSchema added in v1.5.0

func ExtractCompleteParquetSchema(ctx context.Context, pf *file.Reader, fr *pqarrow.FileReader) (*ReaderSchema, error)

ExtractCompleteParquetSchema performs a full two-pass scan of a parquet file to discover all columns, including dynamic map keys and nested structures.

This function:

 1. Reads parquet metadata to get column types (physical types from the parquet schema).
 2. Reads the Arrow schema to get structure (for nested types).
 3. Scans ALL rows in the file to discover all map keys and track actual data types.
 4. Returns a complete ReaderSchema with all flattened column paths and proper types.

Type handling:

  • Simple types (int32, int64, string, etc.): uses the parquet physical type
  • Maps: discovers keys by scanning data and tracks value types
  • Nested structs: recursively flattens all paths
  • Arrays/lists: treated as-is (not flattened)

func NewReaderSchema added in v1.5.0

func NewReaderSchema() *ReaderSchema

NewReaderSchema creates a new empty schema.

func (*ReaderSchema) AddColumn added in v1.5.0

func (s *ReaderSchema) AddColumn(newName, originalName wkk.RowKey, dataType DataType, hasNonNull bool)

AddColumn adds or updates a column in the schema with name mapping. The column is stored under the new (normalized) name. The mapping tracks the relationship between new and original names. For identity mappings (where new == original), pass the same name for both parameters.

func (*ReaderSchema) ColumnCount added in v1.9.0

func (s *ReaderSchema) ColumnCount() int

ColumnCount returns the number of columns in the schema without allocating.

func (*ReaderSchema) Columns added in v1.5.0

func (s *ReaderSchema) Columns() []*ColumnSchema

Columns returns all column schemas.

func (*ReaderSchema) Copy added in v1.5.0

func (s *ReaderSchema) Copy() *ReaderSchema

Copy returns a deep copy of the schema. This is important because callers may mutate the returned schema.

func (*ReaderSchema) GetAllOriginalNames added in v1.5.0

func (s *ReaderSchema) GetAllOriginalNames() map[wkk.RowKey]wkk.RowKey

GetAllOriginalNames returns a map of all new names to original names.

func (*ReaderSchema) GetColumnType added in v1.5.0

func (s *ReaderSchema) GetColumnType(name string) DataType

GetColumnType returns the data type for a column name.

func (*ReaderSchema) GetOriginalName added in v1.5.0

func (s *ReaderSchema) GetOriginalName(newName wkk.RowKey) wkk.RowKey

GetOriginalName returns the original name for a column, or the same name if no mapping exists.

func (*ReaderSchema) HasColumn added in v1.5.0

func (s *ReaderSchema) HasColumn(name string) bool

HasColumn returns true if the schema has the specified column.

type RowIndex added in v1.3.0

type RowIndex struct {
	SortKey    SortKey // Extracted sort key for sorting
	FileOffset int64
	ByteLength int32
}

RowIndex represents a lightweight pointer to a binary-encoded row in the temp file. It stores only the extracted sort key plus file location info.

type RowNormalizationError added in v1.5.0

type RowNormalizationError struct {
	// Column is the name of the column that caused the error
	Column string
	// Reason describes what went wrong
	Reason string
	// Err is the underlying error if any
	Err error
}

RowNormalizationError represents an error that occurred while normalizing a single row. These errors are row-specific and indicate data quality issues rather than systemic failures.

func (*RowNormalizationError) Error added in v1.5.0

func (e *RowNormalizationError) Error() string

func (*RowNormalizationError) Is added in v1.5.0

func (e *RowNormalizationError) Is(target error) bool

func (*RowNormalizationError) Unwrap added in v1.5.0

func (e *RowNormalizationError) Unwrap() error

type RowTranslator

type RowTranslator interface {
	// TranslateRow transforms a row in-place by modifying the provided row pointer.
	TranslateRow(ctx context.Context, row *pipeline.Row) error
}

RowTranslator transforms rows from one format to another.

type SchemaBuilder added in v1.5.0

type SchemaBuilder struct {
	// contains filtered or unexported fields
}

SchemaBuilder helps build schemas by scanning data and tracking type promotions.

func NewSchemaBuilder added in v1.5.0

func NewSchemaBuilder() *SchemaBuilder

NewSchemaBuilder creates a new schema builder.

func (*SchemaBuilder) AddStringValue added in v1.5.0

func (sb *SchemaBuilder) AddStringValue(columnName string, stringValue string)

AddStringValue parses a string value, infers its type, and adds it to the schema.

func (*SchemaBuilder) AddValue added in v1.5.0

func (sb *SchemaBuilder) AddValue(columnName string, value any)

AddValue adds a value to the schema, inferring its type and promoting if needed.

func (*SchemaBuilder) Build added in v1.5.0

func (sb *SchemaBuilder) Build() *ReaderSchema

Build returns the built schema.

type SequentialReader added in v1.3.0

type SequentialReader struct {
	// contains filtered or unexported fields
}

SequentialReader reads from multiple readers sequentially in the order provided. It reads all rows from the first reader, then all rows from the second reader, etc. This is useful when you want to concatenate multiple files without any ordering requirements.

func NewSequentialReader added in v1.3.0

func NewSequentialReader(readers []Reader, batchSize int) (*SequentialReader, error)

NewSequentialReader creates a new SequentialReader that reads from the provided readers sequentially. Readers will be closed when the SequentialReader is closed.

func (*SequentialReader) Close added in v1.3.0

func (sr *SequentialReader) Close() error

Close closes all underlying readers and releases resources.

func (*SequentialReader) CurrentReaderIndex added in v1.3.0

func (sr *SequentialReader) CurrentReaderIndex() int

CurrentReaderIndex returns the index of the reader currently being read from. Returns -1 if all readers are exhausted or the reader is closed.

func (*SequentialReader) GetSchema added in v1.5.0

func (sr *SequentialReader) GetSchema() *ReaderSchema

GetSchema merges schemas from all child readers using the same type promotion rules as MergesortReader.

func (*SequentialReader) Next added in v1.3.0

func (sr *SequentialReader) Next(ctx context.Context) (*Batch, error)

func (*SequentialReader) RemainingReaderCount added in v1.3.0

func (sr *SequentialReader) RemainingReaderCount() int

RemainingReaderCount returns the number of readers that haven't been fully processed yet.

func (*SequentialReader) TotalReaderCount added in v1.3.0

func (sr *SequentialReader) TotalReaderCount() int

TotalReaderCount returns the total number of readers in this SequentialReader.

func (*SequentialReader) TotalRowsReturned added in v1.3.0

func (sr *SequentialReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next() from all readers.

type SignalType added in v1.3.0

type SignalType int

SignalType represents the type of telemetry signal being processed.

const (
	// SignalTypeLogs represents log data
	SignalTypeLogs SignalType = iota
	// SignalTypeTraces represents trace data
	SignalTypeTraces
)

func (SignalType) String added in v1.3.0

func (s SignalType) String() string

String returns the string representation of the signal type.

type SortKey added in v1.3.0

type SortKey interface {
	// Compare returns:
	// -1 if this key should come before other
	//  0 if this key equals other
	//  1 if this key should come after other
	Compare(other SortKey) int

	// Release returns the key to its pool for reuse
	Release()
}

SortKey represents a key that can be compared for sorting.

type SortKeyProvider added in v1.3.0

type SortKeyProvider interface {
	// MakeKey creates a sort key from a row
	MakeKey(row pipeline.Row) SortKey
}

SortKeyProvider creates sort keys from rows.

type TagsTranslator

type TagsTranslator struct {
	// contains filtered or unexported fields
}

TagsTranslator adds static tags to every row.

func NewTagsTranslator

func NewTagsTranslator(tags map[string]string) *TagsTranslator

NewTagsTranslator creates a translator that adds the given tags to each row.

func (*TagsTranslator) TranslateRow

func (tt *TagsTranslator) TranslateRow(ctx context.Context, row *pipeline.Row) error

TranslateRow adds tags to the row in-place.

type TimeOrderedSortKey added in v1.3.0

type TimeOrderedSortKey struct {
	// contains filtered or unexported fields
}

TimeOrderedSortKey represents a sort key for timestamp-based ordering.

func (*TimeOrderedSortKey) Compare added in v1.3.0

func (k *TimeOrderedSortKey) Compare(other SortKey) int

func (*TimeOrderedSortKey) Release added in v1.3.0

func (k *TimeOrderedSortKey) Release()

type TimeOrderedSortKeyProvider added in v1.3.0

type TimeOrderedSortKeyProvider struct {
	// contains filtered or unexported fields
}

TimeOrderedSortKeyProvider creates sort keys based on a single timestamp field.

func NewTimeOrderedSortKeyProvider added in v1.3.0

func NewTimeOrderedSortKeyProvider(fieldName string) *TimeOrderedSortKeyProvider

NewTimeOrderedSortKeyProvider creates a provider that sorts by a timestamp field.

func (*TimeOrderedSortKeyProvider) MakeKey added in v1.3.0

func (p *TimeOrderedSortKeyProvider) MakeKey(row pipeline.Row) SortKey

MakeKey creates a sort key from a row.

type TranslatingReader

type TranslatingReader struct {
	// contains filtered or unexported fields
}

TranslatingReader wraps another Reader and applies row transformations. This enables composition where any Reader can be enhanced with signal-specific translation logic without coupling file parsing to data transformation.

func NewTranslatingReader

func NewTranslatingReader(reader Reader, translator RowTranslator, batchSize int) (*TranslatingReader, error)

NewTranslatingReader creates a new TranslatingReader that applies the given translator to each row returned by the underlying reader.

The TranslatingReader takes ownership of the underlying reader and will close it when Close() is called.

func (*TranslatingReader) Close

func (tr *TranslatingReader) Close() error

Close closes the underlying reader and releases resources.

func (*TranslatingReader) GetSchema added in v1.5.0

func (tr *TranslatingReader) GetSchema() *ReaderSchema

GetSchema delegates to the wrapped reader.

func (*TranslatingReader) Next added in v1.3.0

func (tr *TranslatingReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of translated rows from the underlying reader.

func (*TranslatingReader) TotalRowsReturned added in v1.3.0

func (tr *TranslatingReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next() after translation by this reader.
