record

package
v0.0.0-...-c7e0420 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2024 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MessageOrderAsc  messageOrder = 0
	MessageOrderDesc messageOrder = 1
)

Variables

This section is empty.

Functions

func CastAnyToDynamicMessage

func CastAnyToDynamicMessage(a *anypb.Any) *dynamicpb.Message

func CastDynamicMessageToAny

func CastDynamicMessageToAny(m *dynamicpb.Message) *anypb.Any

func CastMessageToAny

func CastMessageToAny(m proto.Message) *anypb.Any

func CastProtoReflectMessageToAny

func CastProtoReflectMessageToAny(m protoreflect.Message) *anypb.Any

func GenerateMessageID

func GenerateMessageID() uint64

Types

type Channel

type Channel struct {
	SectionHeader *SectionHeader
	Channel       *pb.Channel
}

func NewChannelFromReader

func NewChannelFromReader(r io.Reader) (*Channel, error)

func (Channel) WriteTo

func (c Channel) WriteTo(w io.Writer) (int64, error)

type ChunkBody

type ChunkBody struct {
	SectionHeader *SectionHeader
	ChunkBody     *pb.ChunkBody
}

func NewChunkBodyFromReader

func NewChunkBodyFromReader(r io.Reader) (*ChunkBody, error)

func (ChunkBody) WriteTo

func (c ChunkBody) WriteTo(w io.Writer) (int64, error)

type ChunkHeader

type ChunkHeader struct {
	SectionHeader *SectionHeader
	ChunkHeader   *pb.ChunkHeader
}

func NewChunkHeaderFromReader

func NewChunkHeaderFromReader(r io.Reader) (*ChunkHeader, error)

func (ChunkHeader) WriteTo

func (c ChunkHeader) WriteTo(w io.Writer) (int64, error)

type CopyPair

type CopyPair struct {
	SrcTopicName string
	DstTopicName string
}

type CreateMessageIndexOption

type CreateMessageIndexOption func(*CreateMessageIndexOptions)

func CreateMessageIndexWithBegin

func CreateMessageIndexWithBegin(t time.Time) CreateMessageIndexOption

func CreateMessageIndexWithEnd

func CreateMessageIndexWithEnd(t time.Time) CreateMessageIndexOption

func CreateMessageIndexWithTopic

func CreateMessageIndexWithTopic(topics ...string) CreateMessageIndexOption

type CreateMessageIndexOptions

type CreateMessageIndexOptions struct {
	Topics []string
	Begin  *time.Time
	End    *time.Time
}
type Header struct {
	SectionHeader *SectionHeader
	Header        *pb.Header
}

func NewHeaderFromReader

func NewHeaderFromReader(r io.Reader) (*Header, error)

func (Header) WriteTo

func (h Header) WriteTo(w io.Writer) (int64, error)

type Index

type Index struct {
	SectionHeader *SectionHeader
	IndexSection  *pb.Index
}

func NewIndexFromReader

func NewIndexFromReader(r io.ReadSeeker, header *Header) (*Index, error)

func (Index) WriteTo

func (i Index) WriteTo(w io.Writer) (int64, error)

type ListMessagesOption

type ListMessagesOption func(*ListMessagesOptions)

func ListMessagesOrderBy

func ListMessagesOrderBy(order messageOrder) ListMessagesOption

func ListMessagesWithBegin

func ListMessagesWithBegin(b time.Time) ListMessagesOption

func ListMessagesWithEnd

func ListMessagesWithEnd(e time.Time) ListMessagesOption

func ListMessagesWithTopics

func ListMessagesWithTopics(topic ...string) ListMessagesOption

type ListMessagesOptions

type ListMessagesOptions struct {
	Topics       []string
	Begin        *time.Time
	End          *time.Time
	MessageOrder messageOrder
}

type ListTopicsOption

type ListTopicsOption func(*ListTopicsOptions)

func ListTopicsWithName

func ListTopicsWithName(names ...string) ListTopicsOption

type ListTopicsOptions

type ListTopicsOptions struct {
	Names []string
}

type Message

type Message struct {
	Time time.Time

	Data []byte
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(t time.Time, topic *Topic, m proto.Message) (*Message, error)

func NewMessageFromData

func NewMessageFromData(t time.Time, topic *Topic, data []byte) *Message

func (*Message) CloneMessage

func (m *Message) CloneMessage() *Message

func (*Message) DataLen

func (m *Message) DataLen() int

func (*Message) DeepCloneData

func (m *Message) DeepCloneData() []byte

func (*Message) DynamicMessage

func (m *Message) DynamicMessage() (*dynamicpb.Message, error)

func (*Message) ID

func (m *Message) ID() uint64

func (*Message) To

func (m *Message) To(v proto.Message) error

func (*Message) TopicName

func (m *Message) TopicName() string

type MessageIndex

type MessageIndex struct {
	// contains filtered or unexported fields
}

MessageIndex 对topic和message建立相关index,方便操作 当前使用btree索引,对于两个相同的数据插入时会进行upsert,判断相关的标准就是传入的less 从而使用当前实现的btree索引,最好less中使用的字段能够唯一标识这条数据

func MergeMessageIndex

func MergeMessageIndex(mis ...*MessageIndex) (*MessageIndex, error)

func NewMessageIndex

func NewMessageIndex(topics []*Topic, messages []*Message, opts ...Option) (*MessageIndex, error)

func NewMessageIndexFromPath

func NewMessageIndexFromPath(p string, opts ...Option) (*MessageIndex, error)

func NewMessageIndexFromRecord

func NewMessageIndexFromRecord(r *Record, opts ...Option) (*MessageIndex, error)

func (*MessageIndex) AddMessage

func (m *MessageIndex) AddMessage(message *Message) error

func (*MessageIndex) AddTopic

func (m *MessageIndex) AddTopic(topic *Topic)

func (*MessageIndex) Copy

func (m *MessageIndex) Copy(srcTopicName, dstTopicName string) error

Copy src topic 的所有 message 到 dst topic 中 当前实现只有 src topic 存在且 dst topic 不存在才会拷贝

func (*MessageIndex) CopyN

func (m *MessageIndex) CopyN(pairs ...CopyPair) error

CopyN 拷贝多个 topic

func (*MessageIndex) CreateMessageIndex

func (m *MessageIndex) CreateMessageIndex(opts ...CreateMessageIndexOption) (*MessageIndex, error)

func (*MessageIndex) GetTopicByName

func (m *MessageIndex) GetTopicByName(name string) (*Topic, bool)

func (*MessageIndex) GetTopicFPS

func (m *MessageIndex) GetTopicFPS(name string) int

func (*MessageIndex) ListMessages

func (m *MessageIndex) ListMessages(opts ...ListMessagesOption) []*Message

func (*MessageIndex) ListTopics

func (m *MessageIndex) ListTopics(opts ...ListTopicsOption) []*Topic

ListTopics 返回的topic list数组顺序是不稳定的

func (*MessageIndex) MaxMessage

func (m *MessageIndex) MaxMessage() *Message

func (*MessageIndex) MessageLen

func (m *MessageIndex) MessageLen() int

func (*MessageIndex) MessageLenByName

func (m *MessageIndex) MessageLenByName(name string) int64

func (*MessageIndex) MinMessage

func (m *MessageIndex) MinMessage() *Message

func (*MessageIndex) RepeatMessageIndex

func (m *MessageIndex) RepeatMessageIndex(t time.Time, duration time.Duration,
	topicNames []string) (*MessageIndex, error)

RepeatMessageIndex 寻找指定时刻的相关topic的消息(若未找到,则找该时刻前的第一个消息) 按照 step 向前重复,持续 duration

func (*MessageIndex) TopicLen

func (m *MessageIndex) TopicLen() int

type Option

type Option func(*Options)

func WithTopicNameMapFunc

func WithTopicNameMapFunc(fn TopicNameMapFunc) Option

type Options

type Options struct {
	TopicNameMapFunc TopicNameMapFunc
}

type ProtoDesc

type ProtoDesc struct {
	RegistryFiles *protoregistry.Files
}

func NewProtoDesc

func NewProtoDesc(message proto.Message) (*ProtoDesc, error)

func NewProtoDescFromBytes

func NewProtoDescFromBytes(data []byte) (*ProtoDesc, error)

func NewProtoDescFromFileDescriptor

func NewProtoDescFromFileDescriptor(fd protoreflect.FileDescriptor) (*ProtoDesc, error)

func NewProtoDescFromFiles

func NewProtoDescFromFiles(files *protoregistry.Files) *ProtoDesc

func (*ProtoDesc) Bytes

func (p *ProtoDesc) Bytes(topicType string) ([]byte, error)

type Record

type Record struct {
	Header       *Header
	Index        *Index
	Channels     []*Channel
	ChunkHeaders []*ChunkHeader
	ChunkBodies  []*ChunkBody
}

func DecodeFromFile

func DecodeFromFile(f *os.File) (*Record, error)

func DecodeFromPath

func DecodeFromPath(path string) (*Record, error)

func DecodeFromReader

func DecodeFromReader(f io.ReadSeeker) (*Record, error)

type SectionHeader

type SectionHeader struct {
	SectionType pb.SectionType

	Size int64
	// contains filtered or unexported fields
}

func NewSectionHeaderFromReader

func NewSectionHeaderFromReader(f io.Reader) (*SectionHeader, error)

func (SectionHeader) WriteTo

func (h SectionHeader) WriteTo(w io.Writer) (int64, error)

type Topic

type Topic struct {
	Name      string
	Type      string
	Files     *protoregistry.Files
	ProtoDesc []byte
}

func NewTopic

func NewTopic(name string, message proto.Message) (*Topic, error)

type TopicNameMapFunc

type TopicNameMapFunc func(name string) string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL