Documentation
¶
Overview ¶
Package fqueue is a generated protocol buffer package.
It is generated from these files:
broker_service.proto
It has these top-level messages:
MsgBatch GetReq GetResp SubReq SubResp PullReq CreateTopicReq AssignTopicReq TopicPartition TopicPartitionOffset TopicPartitionLeader Resp
Index ¶
- Constants
- Variables
- func ByteToInt(bytes []byte) uint
- func ByteToUint32(bytes []byte) (num uint32)
- func ByteToUint64(bytes []byte) (num uint64)
- func CalcCrc32(bytes []byte) uint32
- func GeneratorKey(topic string, partition uint32) string
- func HomePath() string
- func MurmurHash2(data []byte) (h uint32)
- func NewBrokerServerServer(broker *Broker, appendChan chan *MsgBatch) *brokerServiceServer
- func RegisterBrokerServiceServer(s *grpc.Server, srv BrokerServiceServer)
- func Uint32ToByte(num uint32, bytes []byte)
- func Uint64ToByte(num uint64, bytes []byte)
- func UintToByte(num uint, bytes []byte)
- type AssignTopicReq
- func (*AssignTopicReq) Descriptor() ([]byte, []int)
- func (m *AssignTopicReq) GetLeaderPartitions() []uint32
- func (m *AssignTopicReq) GetPartitions() []uint32
- func (m *AssignTopicReq) GetTopic() string
- func (*AssignTopicReq) ProtoMessage()
- func (m *AssignTopicReq) Reset()
- func (m *AssignTopicReq) String() string
- type Broker
- type BrokerConfig
- type BrokerMember
- type BrokerServiceClient
- type BrokerServiceServer
- type BrokerService_AppendClient
- type BrokerService_AppendServer
- type BrokerService_PullClient
- type BrokerService_PullServer
- type BrokerService_PushClient
- type BrokerService_PushServer
- type CreateTopicReq
- func (*CreateTopicReq) Descriptor() ([]byte, []int)
- func (m *CreateTopicReq) GetPartitionCount() uint32
- func (m *CreateTopicReq) GetReplicaCount() uint32
- func (m *CreateTopicReq) GetTopic() string
- func (*CreateTopicReq) ProtoMessage()
- func (m *CreateTopicReq) Reset()
- func (m *CreateTopicReq) String() string
- type EtcdBroker
- type EtcdConsumer
- type EtcdConsumerGroup
- type EtcdTopic
- type FileIndex
- type FilePartition
- func (p *FilePartition) Close() (err error)
- func (p *FilePartition) OffsetLag(offset uint64) uint64
- func (p *FilePartition) ReadMsg(offset uint64) (msg []byte, err error)
- func (p *FilePartition) ReadMultiMsg(offset uint64, size uint32) (msgs [][]byte, err error)
- func (p *FilePartition) String() string
- func (p *FilePartition) WriteMsg(msg *Msg) (err error)
- func (p *FilePartition) WriteMultiMsg(msgs []*Msg) (err error)
- type FileTopic
- func (ft *FileTopic) Close() (err error)
- func (ft *FileTopic) OffsetLag(offset uint64, partition uint32) (lag uint64)
- func (ft *FileTopic) Read(offset uint64, partition uint32) ([]byte, error)
- func (ft *FileTopic) ReadMulti(offset uint64, partition, count uint32) ([][]byte, int)
- func (ft *FileTopic) Write(msg *Msg, partition uint32) (err error)
- func (ft *FileTopic) WriteMulti(msgs []*Msg, partition uint32) error
- func (ft *FileTopic) WriteMultiBytes(bytes [][]byte, partition uint32) error
- type GetReq
- type GetResp
- type Index
- type Msg
- type MsgBatch
- type MsgIndex
- type Partition
- type PartitionConfig
- type PartitionInfo
- type PullReq
- type Resp
- type RespStatus
- type SubReq
- type SubResp
- type Topic
- type TopicConfig
- type TopicPartition
- type TopicPartitionLeader
- func (*TopicPartitionLeader) Descriptor() ([]byte, []int)
- func (m *TopicPartitionLeader) GetPartitionLeader() map[uint32]string
- func (m *TopicPartitionLeader) GetTopic() string
- func (*TopicPartitionLeader) ProtoMessage()
- func (m *TopicPartitionLeader) Reset()
- func (m *TopicPartitionLeader) String() string
- type TopicPartitionOffset
- func (*TopicPartitionOffset) Descriptor() ([]byte, []int)
- func (m *TopicPartitionOffset) GetPartitionOffset() map[uint32]uint64
- func (m *TopicPartitionOffset) GetTopic() string
- func (*TopicPartitionOffset) ProtoMessage()
- func (m *TopicPartitionOffset) Reset()
- func (m *TopicPartitionOffset) String() string
Constants ¶
const ( MAX_SEND_COUNT = 1000 DEFAULT_CHAN_COUNT = 10 VERSION = 0x01 DefaultTimeout = 6 * time.Second )
const ( BROKER_FORMATER = "/brokers/ids/%s" TOPIC_PATTERN_PREFIX = "/brokers/topics/" LEASE_TTL = 1 )
const ( MSG_HEADER_LENGTH = 22 INDEX_LENGTH = 20 )
const (
IndexFilePostfix = "index"
MsgFilePostfix = "msg"
DefaultFlushTime = 500 // 500ms
)
const ( M = 0x5bd1e995 R = 24 )
Mixing constants; generated offline.
const (
DEFAULT_BATCH_COUNT = 3
)
Variables ¶
var LOST_MSG_ERR = errors.New("lost some message, can't append")
var RespStatus_name = map[int32]string{
0: "OK",
1: "ERROR",
}
var RespStatus_value = map[string]int32{
"OK": 0,
"ERROR": 1,
}
Functions ¶
func ByteToUint32 ¶
func ByteToUint64 ¶
func GeneratorKey ¶
func MurmurHash2 ¶
The original MurmurHash2 32-bit algorithm by Austin Appleby.
func NewBrokerServerServer ¶
func RegisterBrokerServiceServer ¶
func RegisterBrokerServiceServer(s *grpc.Server, srv BrokerServiceServer)
func Uint32ToByte ¶
func Uint64ToByte ¶
func UintToByte ¶
Types ¶
type AssignTopicReq ¶
type AssignTopicReq struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
// 需要新建的分区 eg: [1, 3, 4]
Partitions []uint32 `protobuf:"varint,2,rep,packed,name=partitions" json:"partitions,omitempty"`
// 由接收此消息的broker做leader的partition, eg: [1, 4]
LeaderPartitions []uint32 `protobuf:"varint,3,rep,packed,name=leaderPartitions" json:"leaderPartitions,omitempty"`
}
func (*AssignTopicReq) Descriptor ¶
func (*AssignTopicReq) Descriptor() ([]byte, []int)
func (*AssignTopicReq) GetLeaderPartitions ¶
func (m *AssignTopicReq) GetLeaderPartitions() []uint32
func (*AssignTopicReq) GetPartitions ¶
func (m *AssignTopicReq) GetPartitions() []uint32
func (*AssignTopicReq) GetTopic ¶
func (m *AssignTopicReq) GetTopic() string
func (*AssignTopicReq) ProtoMessage ¶
func (*AssignTopicReq) ProtoMessage()
func (*AssignTopicReq) Reset ¶
func (m *AssignTopicReq) Reset()
func (*AssignTopicReq) String ¶
func (m *AssignTopicReq) String() string
type Broker ¶
type Broker struct {
Name string
// rpc地址
ListenerAddress string
// etcd地址
EtcdEndPoints []string
// 存放数据的地址
DataPath string
// rpc
RpcServer *grpc.Server
//
Topics map[string]*FileTopic
// append chan, 收到别的broker发来的append
AppendMsgChan chan *MsgBatch
// error chan, 发生错误
ErrorChan chan error
// contains filtered or unexported fields
}
broker需要和etcd进行结合
func NewBrokerAndStart ¶
func NewBrokerAndStart(config *BrokerConfig) (broker *Broker, err error)
func (*Broker) AddBrokerMember ¶
func (broker *Broker) AddBrokerMember(config *BrokerConfig) (err error)
1. 在brokerClients上添加此broker; 2. 找到此broker上存在的partition; 3. 找到此broker上作为leader的partition
func (*Broker) RemoveBroker ¶
func (broker *Broker) RemoveBroker(config *BrokerConfig) (err error)
移除broker
func (*Broker) UpdatePartitionBrokers ¶
func (broker *Broker) UpdatePartitionBrokers(partitions TopicPartition, newBroker []string, add bool)
更新partition关联的broker, 即包含此partition的所有broker: 1. 当broker添加或者lost的时候; 2. 当新建partition的时候。add=true增加broker, false减少broker。
type BrokerConfig ¶
type BrokerMember ¶
type BrokerMember struct {
Name string
ListenerAddress string
Client BrokerServiceClient
}
type BrokerServiceClient ¶
type BrokerServiceClient interface {
// broker
// append from other broker
Append(ctx context.Context, opts ...grpc.CallOption) (BrokerService_AppendClient, error)
// get msg
Get(ctx context.Context, in *GetReq, opts ...grpc.CallOption) (*GetResp, error)
// producer
Push(ctx context.Context, opts ...grpc.CallOption) (BrokerService_PushClient, error)
// 创建分区
CreateTopic(ctx context.Context, in *CreateTopicReq, opts ...grpc.CallOption) (*Resp, error)
// 分配分区
AssignTopic(ctx context.Context, in *AssignTopicReq, opts ...grpc.CallOption) (*Resp, error)
// consumer
// subscribe topics
Subscribe(ctx context.Context, in *SubReq, opts ...grpc.CallOption) (*SubResp, error)
// Consumer pull msg
Pull(ctx context.Context, in *PullReq, opts ...grpc.CallOption) (BrokerService_PullClient, error)
}
func NewBrokerServiceClient ¶
func NewBrokerServiceClient(cc *grpc.ClientConn) BrokerServiceClient
type BrokerServiceServer ¶
type BrokerServiceServer interface {
// broker
// append from other broker
Append(BrokerService_AppendServer) error
// get msg
Get(context.Context, *GetReq) (*GetResp, error)
// producer
Push(BrokerService_PushServer) error
// 创建分区
CreateTopic(context.Context, *CreateTopicReq) (*Resp, error)
// 分配分区
AssignTopic(context.Context, *AssignTopicReq) (*Resp, error)
// consumer
// subscribe topics
Subscribe(context.Context, *SubReq) (*SubResp, error)
// Consumer pull msg
Pull(*PullReq, BrokerService_PullServer) error
}
type BrokerService_PullClient ¶
type BrokerService_PullClient interface {
Recv() (*MsgBatch, error)
grpc.ClientStream
}
type BrokerService_PullServer ¶
type BrokerService_PullServer interface {
Send(*MsgBatch) error
grpc.ServerStream
}
type CreateTopicReq ¶
type CreateTopicReq struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
PartitionCount uint32 `protobuf:"varint,2,opt,name=partitionCount" json:"partitionCount,omitempty"`
ReplicaCount uint32 `protobuf:"varint,3,opt,name=replicaCount" json:"replicaCount,omitempty"`
}
func (*CreateTopicReq) Descriptor ¶
func (*CreateTopicReq) Descriptor() ([]byte, []int)
func (*CreateTopicReq) GetPartitionCount ¶
func (m *CreateTopicReq) GetPartitionCount() uint32
func (*CreateTopicReq) GetReplicaCount ¶
func (m *CreateTopicReq) GetReplicaCount() uint32
func (*CreateTopicReq) GetTopic ¶
func (m *CreateTopicReq) GetTopic() string
func (*CreateTopicReq) ProtoMessage ¶
func (*CreateTopicReq) ProtoMessage()
func (*CreateTopicReq) Reset ¶
func (m *CreateTopicReq) Reset()
func (*CreateTopicReq) String ¶
func (m *CreateTopicReq) String() string
type EtcdBroker ¶
type EtcdConsumer ¶
type EtcdConsumer struct {
Version uint32 `json:"version"`
Subscription map[string][]uint32 `json:"subscription"`
}
consumer订阅的topic和partition
type EtcdConsumerGroup ¶
consumer group订阅的topics
type FileIndex ¶
type FileIndex struct {
// contains filtered or unexported fields
}
保存 8 byte offset, 8 byte position, 4 byte len, 共 20 byte
type FilePartition ¶
type FilePartition struct {
Id uint32
Index *FileIndex
// 最新消息的offset
LatestMsgOffset uint64
// 刷盘
FlushTime time.Duration
// contains filtered or unexported fields
}
分区: 需要写入消息、获取消息; 不支持切换文件
func NewFilePartition ¶
func NewFilePartition(config *PartitionConfig) (fp *FilePartition, err error)
TODO 增加file lock,防止冲突
func (*FilePartition) Close ¶
func (p *FilePartition) Close() (err error)
func (*FilePartition) OffsetLag ¶
func (p *FilePartition) OffsetLag(offset uint64) uint64
func (*FilePartition) ReadMsg ¶
func (p *FilePartition) ReadMsg(offset uint64) (msg []byte, err error)
读取指定offset的数据
func (*FilePartition) ReadMultiMsg ¶
func (p *FilePartition) ReadMultiMsg(offset uint64, size uint32) (msgs [][]byte, err error)
当数量不够时,读取多少返回多少
func (*FilePartition) String ¶
func (p *FilePartition) String() string
func (*FilePartition) WriteMsg ¶
func (p *FilePartition) WriteMsg(msg *Msg) (err error)
写入新数据; 保留的offset和position均为下一条消息的起点
func (*FilePartition) WriteMultiMsg ¶
func (p *FilePartition) WriteMultiMsg(msgs []*Msg) (err error)
type FileTopic ¶
type FileTopic struct {
Name string
PartitionIds []uint32
PartitionNumber uint32
Partitions map[uint32]*FilePartition
// 存放,可能没用
//MsgChan []chan *Msg
// 一次读取的数量
BatchCount uint32
}
func NewFileTopic ¶
func NewFileTopic(config *TopicConfig) (ft *FileTopic, err error)
func (*FileTopic) WriteMulti ¶
批量写入同一分区,保证同时写入成功或者同时失败
type GetReq ¶
type GetReq struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
Partition uint32 `protobuf:"varint,2,opt,name=partition" json:"partition,omitempty"`
StartOffset uint64 `protobuf:"varint,3,opt,name=startOffset" json:"startOffset,omitempty"`
}
获取缺失的消息
func (*GetReq) Descriptor ¶
func (*GetReq) GetPartition ¶
func (*GetReq) GetStartOffset ¶
func (*GetReq) ProtoMessage ¶
func (*GetReq) ProtoMessage()
type GetResp ¶
type GetResp struct {
Resp *Resp `protobuf:"bytes,1,opt,name=resp" json:"resp,omitempty"`
Msgs *MsgBatch `protobuf:"bytes,2,opt,name=msgs" json:"msgs,omitempty"`
// 如果跟上, 则不用继续发get
Enough bool `protobuf:"varint,3,opt,name=enough" json:"enough,omitempty"`
}
get请求的resp
func (*GetResp) Descriptor ¶
func (*GetResp) ProtoMessage ¶
func (*GetResp) ProtoMessage()
type Msg ¶
type Msg struct {
Offset uint64 // 8 byte
Size uint32 // 4 byte
Crc uint32 // 4 byte
Type byte // 1 byte
Version byte // 1 byte
KeyLen uint32 // 4 byte // fixed_header_length = 22
Key []byte // KeyLen byte
Value []byte // Size = fixed_header_length + keyLen + ValueLen
Source []byte
}
包含消息
func NewMessage ¶
func NewMessageFromSource ¶
type MsgBatch ¶
type MsgBatch struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
Partition uint32 `protobuf:"varint,2,opt,name=partition" json:"partition,omitempty"`
StartOffset uint64 `protobuf:"varint,3,opt,name=startOffset" json:"startOffset,omitempty"`
Msgs [][]byte `protobuf:"bytes,4,rep,name=msgs,proto3" json:"msgs,omitempty"`
}
一次发送单个分区的多个消息
func (*MsgBatch) Descriptor ¶
func (*MsgBatch) GetPartition ¶
func (*MsgBatch) GetStartOffset ¶
func (*MsgBatch) ProtoMessage ¶
func (*MsgBatch) ProtoMessage()
type Partition ¶
type Partition interface {
// 写入新数据
WriteMsg(*Msg) error
WriteMultiMsg([]*Msg) error
// 读取指定offset的数据
ReadMsg(uint64) ([]byte, error)
ReadMultiMsg(uint64, uint32) ([][]byte, error)
OffsetLag(uint64) uint64
}
为了
type PartitionConfig ¶
type PartitionInfo ¶
type PullReq ¶
type PullReq struct {
Count uint32 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"`
// 超时时间, ms, <=0代表立即返回
Timeout int64 `protobuf:"varint,2,opt,name=timeout" json:"timeout,omitempty"`
// 这些信息由客户端存储
TpSet *TopicPartitionOffset `protobuf:"bytes,3,opt,name=tpSet" json:"tpSet,omitempty"`
}
func (*PullReq) Descriptor ¶
func (*PullReq) GetTimeout ¶
func (*PullReq) GetTpSet ¶
func (m *PullReq) GetTpSet() *TopicPartitionOffset
func (*PullReq) ProtoMessage ¶
func (*PullReq) ProtoMessage()
type Resp ¶
type Resp struct {
Status RespStatus `protobuf:"varint,1,opt,name=status,enum=fqueue.RespStatus" json:"status,omitempty"`
Comment string `protobuf:"bytes,2,opt,name=comment" json:"comment,omitempty"`
}
func (*Resp) Descriptor ¶
func (*Resp) GetComment ¶
func (*Resp) GetStatus ¶
func (m *Resp) GetStatus() RespStatus
func (*Resp) ProtoMessage ¶
func (*Resp) ProtoMessage()
type RespStatus ¶
type RespStatus int32
响应的状态
const ( RespStatus_OK RespStatus = 0 RespStatus_ERROR RespStatus = 1 )
func (RespStatus) EnumDescriptor ¶
func (RespStatus) EnumDescriptor() ([]byte, []int)
func (RespStatus) String ¶
func (x RespStatus) String() string
type SubReq ¶
type SubReq struct {
Topics []*TopicPartition `protobuf:"bytes,1,rep,name=topics" json:"topics,omitempty"`
}
订阅, 默认从最新位置开始读取
func (*SubReq) Descriptor ¶
func (*SubReq) GetTopics ¶
func (m *SubReq) GetTopics() []*TopicPartition
func (*SubReq) ProtoMessage ¶
func (*SubReq) ProtoMessage()
type SubResp ¶
type SubResp struct {
Resp *Resp `protobuf:"bytes,1,opt,name=resp" json:"resp,omitempty"`
// 分配给它的topic, partition, 以及最新的offset
TopicPartitionOffset []*TopicPartitionOffset `protobuf:"bytes,2,rep,name=topicPartitionOffset" json:"topicPartitionOffset,omitempty"`
}
func (*SubResp) Descriptor ¶
func (*SubResp) GetTopicPartitionOffset ¶
func (m *SubResp) GetTopicPartitionOffset() []*TopicPartitionOffset
func (*SubResp) ProtoMessage ¶
func (*SubResp) ProtoMessage()
type Topic ¶
type Topic interface {
// 写到指定的分区
Write(msg *Msg, partition uint32) error
// 批量写入同一分区,保证同时写入成功或者同时失败
WriteMulti(msgs []*Msg, partition uint32) error
// 读取单条消息
Read(offset uint64, partition uint32) ([]byte, error)
// 批量读取
ReadMulti(offset uint64, partition, count uint32) ([][]byte, int)
OffsetLag(offset uint64, partition uint32) uint64
Close() error
}
负责发送和接收
type TopicConfig ¶
type TopicPartition ¶
type TopicPartition struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
Partition []uint32 `protobuf:"varint,2,rep,packed,name=partition" json:"partition,omitempty"`
}
func (*TopicPartition) Descriptor ¶
func (*TopicPartition) Descriptor() ([]byte, []int)
func (*TopicPartition) GetPartition ¶
func (m *TopicPartition) GetPartition() []uint32
func (*TopicPartition) GetTopic ¶
func (m *TopicPartition) GetTopic() string
func (*TopicPartition) ProtoMessage ¶
func (*TopicPartition) ProtoMessage()
func (*TopicPartition) Reset ¶
func (m *TopicPartition) Reset()
func (*TopicPartition) String ¶
func (m *TopicPartition) String() string
type TopicPartitionLeader ¶
type TopicPartitionLeader struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
PartitionLeader map[uint32]string `` /* 151-byte string literal not displayed */
}
func (*TopicPartitionLeader) Descriptor ¶
func (*TopicPartitionLeader) Descriptor() ([]byte, []int)
func (*TopicPartitionLeader) GetPartitionLeader ¶
func (m *TopicPartitionLeader) GetPartitionLeader() map[uint32]string
func (*TopicPartitionLeader) GetTopic ¶
func (m *TopicPartitionLeader) GetTopic() string
func (*TopicPartitionLeader) ProtoMessage ¶
func (*TopicPartitionLeader) ProtoMessage()
func (*TopicPartitionLeader) Reset ¶
func (m *TopicPartitionLeader) Reset()
func (*TopicPartitionLeader) String ¶
func (m *TopicPartitionLeader) String() string
type TopicPartitionOffset ¶
type TopicPartitionOffset struct {
Topic string `protobuf:"bytes,1,opt,name=topic" json:"topic,omitempty"`
// partition, offset
PartitionOffset map[uint32]uint64 `` /* 152-byte string literal not displayed */
}
func (*TopicPartitionOffset) Descriptor ¶
func (*TopicPartitionOffset) Descriptor() ([]byte, []int)
func (*TopicPartitionOffset) GetPartitionOffset ¶
func (m *TopicPartitionOffset) GetPartitionOffset() map[uint32]uint64
func (*TopicPartitionOffset) GetTopic ¶
func (m *TopicPartitionOffset) GetTopic() string
func (*TopicPartitionOffset) ProtoMessage ¶
func (*TopicPartitionOffset) ProtoMessage()
func (*TopicPartitionOffset) Reset ¶
func (m *TopicPartitionOffset) Reset()
func (*TopicPartitionOffset) String ¶
func (m *TopicPartitionOffset) String() string