Documentation
¶
Index ¶
- Constants
- func CastAnyToDynamicMessage(a *anypb.Any) *dynamicpb.Message
- func CastDynamicMessageToAny(m *dynamicpb.Message) *anypb.Any
- func CastMessageToAny(m proto.Message) *anypb.Any
- func CastProtoReflectMessageToAny(m protoreflect.Message) *anypb.Any
- func GenerateMessageID() uint64
- type Channel
- type ChunkBody
- type ChunkHeader
- type CopyPair
- type CreateMessageIndexOption
- type CreateMessageIndexOptions
- type Header
- type Index
- type ListMessagesOption
- type ListMessagesOptions
- type ListTopicsOption
- type ListTopicsOptions
- type Message
- type MessageIndex
- func MergeMessageIndex(mis ...*MessageIndex) (*MessageIndex, error)
- func NewMessageIndex(topics []*Topic, messages []*Message, opts ...Option) (*MessageIndex, error)
- func NewMessageIndexFromPath(p string, opts ...Option) (*MessageIndex, error)
- func NewMessageIndexFromRecord(r *Record, opts ...Option) (*MessageIndex, error)
- func (m *MessageIndex) AddMessage(message *Message) error
- func (m *MessageIndex) AddTopic(topic *Topic)
- func (m *MessageIndex) Copy(srcTopicName, dstTopicName string) error
- func (m *MessageIndex) CopyN(pairs ...CopyPair) error
- func (m *MessageIndex) CreateMessageIndex(opts ...CreateMessageIndexOption) (*MessageIndex, error)
- func (m *MessageIndex) GetTopicByName(name string) (*Topic, bool)
- func (m *MessageIndex) GetTopicFPS(name string) int
- func (m *MessageIndex) ListMessages(opts ...ListMessagesOption) []*Message
- func (m *MessageIndex) ListTopics(opts ...ListTopicsOption) []*Topic
- func (m *MessageIndex) MaxMessage() *Message
- func (m *MessageIndex) MessageLen() int
- func (m *MessageIndex) MessageLenByName(name string) int64
- func (m *MessageIndex) MinMessage() *Message
- func (m *MessageIndex) RepeatMessageIndex(t time.Time, duration time.Duration, topicNames []string) (*MessageIndex, error)
- func (m *MessageIndex) TopicLen() int
- type Option
- type Options
- type ProtoDesc
- type Record
- type SectionHeader
- type Topic
- type TopicNameMapFunc
Constants ¶
View Source
const ( MessageOrderAsc messageOrder = 0 MessageOrderDesc messageOrder = 1 )
Variables ¶
This section is empty.
Functions ¶
func CastProtoReflectMessageToAny ¶
func CastProtoReflectMessageToAny(m protoreflect.Message) *anypb.Any
func GenerateMessageID ¶
func GenerateMessageID() uint64
Types ¶
type Channel ¶
type Channel struct {
SectionHeader *SectionHeader
Channel *pb.Channel
}
type ChunkBody ¶
type ChunkBody struct {
SectionHeader *SectionHeader
ChunkBody *pb.ChunkBody
}
type ChunkHeader ¶
type ChunkHeader struct {
SectionHeader *SectionHeader
ChunkHeader *pb.ChunkHeader
}
func NewChunkHeaderFromReader ¶
func NewChunkHeaderFromReader(r io.Reader) (*ChunkHeader, error)
type CreateMessageIndexOption ¶
type CreateMessageIndexOption func(*CreateMessageIndexOptions)
func CreateMessageIndexWithBegin ¶
func CreateMessageIndexWithBegin(t time.Time) CreateMessageIndexOption
func CreateMessageIndexWithEnd ¶
func CreateMessageIndexWithEnd(t time.Time) CreateMessageIndexOption
func CreateMessageIndexWithTopic ¶
func CreateMessageIndexWithTopic(topics ...string) CreateMessageIndexOption
type Header ¶
type Header struct {
SectionHeader *SectionHeader
Header *pb.Header
}
type Index ¶
type Index struct {
SectionHeader *SectionHeader
IndexSection *pb.Index
}
func NewIndexFromReader ¶
func NewIndexFromReader(r io.ReadSeeker, header *Header) (*Index, error)
type ListMessagesOption ¶
type ListMessagesOption func(*ListMessagesOptions)
func ListMessagesOrderBy ¶
func ListMessagesOrderBy(order messageOrder) ListMessagesOption
func ListMessagesWithBegin ¶
func ListMessagesWithBegin(b time.Time) ListMessagesOption
func ListMessagesWithEnd ¶
func ListMessagesWithEnd(e time.Time) ListMessagesOption
func ListMessagesWithTopics ¶
func ListMessagesWithTopics(topic ...string) ListMessagesOption
type ListMessagesOptions ¶
type ListTopicsOption ¶
type ListTopicsOption func(*ListTopicsOptions)
func ListTopicsWithName ¶
func ListTopicsWithName(names ...string) ListTopicsOption
type ListTopicsOptions ¶
type ListTopicsOptions struct {
Names []string
}
type MessageIndex ¶
type MessageIndex struct {
// contains filtered or unexported fields
}
MessageIndex 对 topic 和 message 建立相关 index,方便操作。当前使用 btree 索引:对于两个相同的数据,插入时会进行 upsert,判断相同的标准就是传入的 less。因此使用当前实现的 btree 索引时,最好保证 less 中使用的字段能够唯一标识这条数据。
func MergeMessageIndex ¶
func MergeMessageIndex(mis ...*MessageIndex) (*MessageIndex, error)
func NewMessageIndex ¶
func NewMessageIndex(topics []*Topic, messages []*Message, opts ...Option) (*MessageIndex, error)
func NewMessageIndexFromPath ¶
func NewMessageIndexFromPath(p string, opts ...Option) (*MessageIndex, error)
func NewMessageIndexFromRecord ¶
func NewMessageIndexFromRecord(r *Record, opts ...Option) (*MessageIndex, error)
func (*MessageIndex) AddMessage ¶
func (m *MessageIndex) AddMessage(message *Message) error
func (*MessageIndex) AddTopic ¶
func (m *MessageIndex) AddTopic(topic *Topic)
func (*MessageIndex) Copy ¶
func (m *MessageIndex) Copy(srcTopicName, dstTopicName string) error
Copy 将 src topic 的所有 message 拷贝到 dst topic 中。当前实现只有在 src topic 存在且 dst topic 不存在时才会拷贝。
func (*MessageIndex) CreateMessageIndex ¶
func (m *MessageIndex) CreateMessageIndex(opts ...CreateMessageIndexOption) (*MessageIndex, error)
func (*MessageIndex) GetTopicByName ¶
func (m *MessageIndex) GetTopicByName(name string) (*Topic, bool)
func (*MessageIndex) GetTopicFPS ¶
func (m *MessageIndex) GetTopicFPS(name string) int
func (*MessageIndex) ListMessages ¶
func (m *MessageIndex) ListMessages(opts ...ListMessagesOption) []*Message
func (*MessageIndex) ListTopics ¶
func (m *MessageIndex) ListTopics(opts ...ListTopicsOption) []*Topic
ListTopics 返回的 topic 列表顺序是不稳定的。
func (*MessageIndex) MaxMessage ¶
func (m *MessageIndex) MaxMessage() *Message
func (*MessageIndex) MessageLen ¶
func (m *MessageIndex) MessageLen() int
func (*MessageIndex) MessageLenByName ¶
func (m *MessageIndex) MessageLenByName(name string) int64
func (*MessageIndex) MinMessage ¶
func (m *MessageIndex) MinMessage() *Message
func (*MessageIndex) RepeatMessageIndex ¶
func (m *MessageIndex) RepeatMessageIndex(t time.Time, duration time.Duration, topicNames []string) (*MessageIndex, error)
RepeatMessageIndex 寻找指定时刻的相关 topic 的消息(若未找到,则找该时刻前的第一个消息),并按照 step 向前重复,持续 duration。
func (*MessageIndex) TopicLen ¶
func (m *MessageIndex) TopicLen() int
type Option ¶
type Option func(*Options)
func WithTopicNameMapFunc ¶
func WithTopicNameMapFunc(fn TopicNameMapFunc) Option
type Options ¶
type Options struct {
TopicNameMapFunc TopicNameMapFunc
}
type ProtoDesc ¶
type ProtoDesc struct {
RegistryFiles *protoregistry.Files
}
func NewProtoDescFromBytes ¶
func NewProtoDescFromFileDescriptor ¶
func NewProtoDescFromFileDescriptor(fd protoreflect.FileDescriptor) (*ProtoDesc, error)
func NewProtoDescFromFiles ¶
func NewProtoDescFromFiles(files *protoregistry.Files) *ProtoDesc
type Record ¶
type Record struct {
Header *Header
Index *Index
Channels []*Channel
ChunkHeaders []*ChunkHeader
ChunkBodies []*ChunkBody
}
func DecodeFromPath ¶
func DecodeFromReader ¶
func DecodeFromReader(f io.ReadSeeker) (*Record, error)
type SectionHeader ¶
type SectionHeader struct {
SectionType pb.SectionType
Size int64
// contains filtered or unexported fields
}
func NewSectionHeaderFromReader ¶
func NewSectionHeaderFromReader(f io.Reader) (*SectionHeader, error)
type Topic ¶
type Topic struct {
Name string
Type string
Files *protoregistry.Files
ProtoDesc []byte
}
type TopicNameMapFunc ¶
Click to show internal directories.
Click to hide internal directories.