From 845cb6121336b18305b094d92154fd35bea703d7 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 18 May 2015 14:35:10 -0700 Subject: [PATCH] storage: add kv and event proto --- scripts/genproto.sh | 2 +- storage/kv.go | 39 +++- storage/storagepb/kv.pb.go | 456 +++++++++++++++++++++++++++++++++++++ storage/storagepb/kv.proto | 35 +++ 4 files changed, 526 insertions(+), 6 deletions(-) create mode 100644 storage/storagepb/kv.pb.go create mode 100644 storage/storagepb/kv.proto diff --git a/scripts/genproto.sh b/scripts/genproto.sh index e4bc91ff9..2c7db3345 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -5,7 +5,7 @@ # PREFIX="github.com/coreos/etcd/Godeps/_workspace/src" -DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./migrate/etcd4pb" +DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./migrate/etcd4pb ./storage/storagepb" SHA="bc946d07d1016848dfd2507f90f0859c9471681e" diff --git a/storage/kv.go b/storage/kv.go index b4a2b6e12..581c28132 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -2,9 +2,11 @@ package storage import ( "encoding/binary" + "log" "time" "github.com/coreos/etcd/storage/backend" + "github.com/coreos/etcd/storage/storagepb" ) var ( @@ -17,14 +19,16 @@ type store struct { b backend.Backend kvindex index - now uint64 // current index of the store + now uint64 // current index of the store + marshalBuf []byte // buffer for marshal protobuf } func newStore(path string) *store { s := &store{ - b: backend.New(path, batchInterval, batchLimit), - kvindex: newTreeIndex(), - now: 0, + b: backend.New(path, batchInterval, batchLimit), + kvindex: newTreeIndex(), + now: 0, + marshalBuf: make([]byte, 1024*1024), } tx := s.b.BatchTx() @@ -47,7 +51,32 @@ func (s *store) Put(key, value []byte) { tx.Lock() defer tx.Unlock() s.now = now - tx.UnsafePut(keyBucketName, ibytes, value) + + event := storagepb.Event{ + Type: storagepb.PUT, + Kv: storagepb.KeyValue{ + Key: key, + Value: value, + }, + } + + var ( + d []byte + err error + n int + ) + + if event.Size() < len(s.marshalBuf) { + n, err = event.MarshalTo(s.marshalBuf) + d = s.marshalBuf[:n] + } else { + d, err = event.Marshal() + } + if err != nil { + log.Fatalf("storage: cannot marshal event: %v", err) + } + + tx.UnsafePut(keyBucketName, ibytes, d) } func (s *store) Get(key []byte) []byte { diff --git a/storage/storagepb/kv.pb.go b/storage/storagepb/kv.pb.go new file mode 100644 index 000000000..1f7566edf --- /dev/null +++ b/storage/storagepb/kv.pb.go @@ -0,0 +1,456 @@ +// Code generated by protoc-gen-gogo. +// source: kv.proto +// DO NOT EDIT! + +/* + Package storagepb is a generated protocol buffer package. + + It is generated from these files: + kv.proto + + It has these top-level messages: + KeyValue + Event +*/ +package storagepb + +import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" +import math "math" + +// discarding unused import gogoproto "github.com/gogo/protobuf/gogoproto/gogo.pb" + +import io "io" +import fmt "fmt" +import github_com_gogo_protobuf_proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +type Event_EventType int32 + +const ( + PUT Event_EventType = 0 + DELETE Event_EventType = 1 + EXPIRE Event_EventType = 2 +) + +var Event_EventType_name = map[int32]string{ + 0: "PUT", + 1: "DELETE", + 2: "EXPIRE", +} +var Event_EventType_value = map[string]int32{ + "PUT": 0, + "DELETE": 1, + "EXPIRE": 2, +} + +func (x Event_EventType) Enum() *Event_EventType { + p := new(Event_EventType) + *p = x + return p +} +func (x Event_EventType) String() string { + return proto.EnumName(Event_EventType_name, int32(x)) +} +func (x *Event_EventType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Event_EventType_value, data, "Event_EventType") + if err != nil { + return err + } + *x = Event_EventType(value) + return nil +} + +type KeyValue struct { + Key []byte `protobuf:"bytes,1,opt,name=key" json:"key"` + // mod_index is the last modified index of the key. + CreateIndex int64 `protobuf:"varint,2,opt,name=create_index" json:"create_index"` + ModIndex int64 `protobuf:"varint,3,opt,name=mod_index" json:"mod_index"` + // version is the version of the key. A deletion resets + // the version to zero and any modification of the key + // increases its version. + Version int64 `protobuf:"varint,4,opt,name=version" json:"version"` + Value []byte `protobuf:"bytes,5,opt,name=value" json:"value"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *KeyValue) Reset() { *m = KeyValue{} } +func (m *KeyValue) String() string { return proto.CompactTextString(m) } +func (*KeyValue) ProtoMessage() {} + +type Event struct { + Type Event_EventType `protobuf:"varint,1,opt,name=type,enum=storagepb.Event_EventType" json:"type"` + // a put event contains the current key-value + // a delete/expire event contains the previous + // key-value + Kv KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Event) Reset() { *m = Event{} } +func (m *Event) String() string { return proto.CompactTextString(m) } +func (*Event) ProtoMessage() {} + +func init() { + proto.RegisterEnum("storagepb.Event_EventType", Event_EventType_name, Event_EventType_value) +} +func (m *KeyValue) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = append([]byte{}, data[index:postIndex]...) + index = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CreateIndex", wireType) + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.CreateIndex |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ModIndex", wireType) + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.ModIndex |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Version |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = append([]byte{}, data[index:postIndex]...) + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := github_com_gogo_protobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} +func (m *Event) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (Event_EventType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Kv", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Kv.Unmarshal(data[index:postIndex]); err != nil { + return err + } + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := github_com_gogo_protobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} +func (m *KeyValue) Size() (n int) { + var l int + _ = l + if m.Key != nil { + l = len(m.Key) + n += 1 + l + sovKv(uint64(l)) + } + n += 1 + sovKv(uint64(m.CreateIndex)) + n += 1 + sovKv(uint64(m.ModIndex)) + n += 1 + sovKv(uint64(m.Version)) + if m.Value != nil { + l = len(m.Value) + n += 1 + l + sovKv(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Event) Size() (n int) { + var l int + _ = l + n += 1 + sovKv(uint64(m.Type)) + l = m.Kv.Size() + n += 1 + l + sovKv(uint64(l)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovKv(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozKv(x uint64) (n int) { + return sovKv(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *KeyValue) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *KeyValue) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.Key != nil { + data[i] = 0xa + i++ + i = encodeVarintKv(data, i, uint64(len(m.Key))) + i += copy(data[i:], m.Key) + } + data[i] = 0x10 + i++ + i = encodeVarintKv(data, i, uint64(m.CreateIndex)) + data[i] = 0x18 + i++ + i = encodeVarintKv(data, i, uint64(m.ModIndex)) + data[i] = 0x20 + i++ + i = encodeVarintKv(data, i, uint64(m.Version)) + if m.Value != nil { + data[i] = 0x2a + i++ + i = encodeVarintKv(data, i, uint64(len(m.Value))) + i += copy(data[i:], m.Value) + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *Event) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Event) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintKv(data, i, uint64(m.Type)) + data[i] = 0x12 + i++ + i = encodeVarintKv(data, i, uint64(m.Kv.Size())) + n1, err := m.Kv.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n1 + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeFixed64Kv(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Kv(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintKv(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} diff --git a/storage/storagepb/kv.proto b/storage/storagepb/kv.proto new file mode 100644 index 000000000..041d650ce --- /dev/null +++ b/storage/storagepb/kv.proto @@ -0,0 +1,35 @@ +package storagepb; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; +option (gogoproto.goproto_enum_prefix_all) = false; + +message KeyValue { + optional bytes key = 1 [(gogoproto.nullable) = false]; + // mod_index is the last modified index of the key. + optional int64 create_index = 2 [(gogoproto.nullable) = false]; + optional int64 mod_index = 3 [(gogoproto.nullable) = false]; + // version is the version of the key. A deletion resets + // the version to zero and any modification of the key + // increases its version. + optional int64 version = 4 [(gogoproto.nullable) = false]; + optional bytes value = 5 [(gogoproto.nullable) = false]; +} + +message Event { + enum EventType { + PUT = 0; + DELETE = 1; + EXPIRE = 2; + } + optional EventType type = 1 [(gogoproto.nullable) = false]; + // a put event contains the current key-value + // a delete/expire event contains the previous + // key-value + optional KeyValue kv = 2 [(gogoproto.nullable) = false]; +} +