From ac330bb7c921784ec3a3f1567a2523ae3c73397a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 31 Dec 2015 22:28:54 -0800 Subject: [PATCH] *: update watch related proto 1. Add watch/cancel request 2. Add necessary fields in response to return watch error 3. Add watch_id into watch response --- etcdctlv3/command/watch_command.go | 4 +- etcdserver/api/v3rpc/watch.go | 20 +- etcdserver/etcdserverpb/rpc.pb.go | 445 +++++++++++++++++++++++++++-- etcdserver/etcdserverpb/rpc.proto | 36 ++- tools/benchmark/cmd/watch.go | 2 +- 5 files changed, 477 insertions(+), 30 deletions(-) diff --git a/etcdctlv3/command/watch_command.go b/etcdctlv3/command/watch_command.go index ecc265933..31c663b4f 100644 --- a/etcdctlv3/command/watch_command.go +++ b/etcdctlv3/command/watch_command.go @@ -74,9 +74,9 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { var r *pb.WatchRequest switch segs[0] { case "watch": - r = &pb.WatchRequest{Key: []byte(segs[1])} + r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte(segs[1])}} case "watchprefix": - r = &pb.WatchRequest{Prefix: []byte(segs[1])} + r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte(segs[1])}} default: fmt.Fprintf(os.Stderr, "Invalid watch request format: use watch key or watchprefix prefix\n") continue diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index bee12e385..00eaecbac 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -48,14 +48,20 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { return err } - var prefix bool - toWatch := req.Key - if len(req.Key) == 0 { - toWatch = req.Prefix - prefix = true + switch { + case req.CreateRequest != nil: + creq := req.CreateRequest + var prefix bool + toWatch := creq.Key + if len(creq.Key) == 0 { + toWatch = creq.Prefix + prefix = true + } + watcher.Watch(toWatch, prefix, creq.StartRevision) + default: + // TODO: support cancellation + panic("not implemented") } - // TODO: support cancellation - watcher.Watch(toWatch, prefix, req.StartRevision) } } diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index a3a8f7b4b..da9848371 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -355,6 +355,29 @@ func (m *CompactionResponse) GetHeader() *ResponseHeader { } type WatchRequest struct { + CreateRequest *WatchCreateRequest `protobuf:"bytes,1,opt,name=create_request" json:"create_request,omitempty"` + CancelRequest *WatchCancelRequest `protobuf:"bytes,2,opt,name=cancel_request" json:"cancel_request,omitempty"` +} + +func (m *WatchRequest) Reset() { *m = WatchRequest{} } +func (m *WatchRequest) String() string { return proto.CompactTextString(m) } +func (*WatchRequest) ProtoMessage() {} + +func (m *WatchRequest) GetCreateRequest() *WatchCreateRequest { + if m != nil { + return m.CreateRequest + } + return nil +} + +func (m *WatchRequest) GetCancelRequest() *WatchCancelRequest { + if m != nil { + return m.CancelRequest + } + return nil +} + +type WatchCreateRequest struct { // the key to be watched Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` // the prefix to be watched. @@ -363,13 +386,39 @@ type WatchRequest struct { StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"` } -func (m *WatchRequest) Reset() { *m = WatchRequest{} } -func (m *WatchRequest) String() string { return proto.CompactTextString(m) } -func (*WatchRequest) ProtoMessage() {} +func (m *WatchCreateRequest) Reset() { *m = WatchCreateRequest{} } +func (m *WatchCreateRequest) String() string { return proto.CompactTextString(m) } +func (*WatchCreateRequest) ProtoMessage() {} + +type WatchCancelRequest struct { + WatchId int64 `protobuf:"varint,1,opt,name=watch_id,proto3" json:"watch_id,omitempty"` +} + +func (m *WatchCancelRequest) Reset() { *m = WatchCancelRequest{} } +func (m *WatchCancelRequest) String() string { return proto.CompactTextString(m) } +func (*WatchCancelRequest) ProtoMessage() {} type WatchResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` - Events []*storagepb.Event `protobuf:"bytes,2,rep,name=events" json:"events,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + // watch_id is the ID of the watching the response sent to. + WatchId int64 `protobuf:"varint,2,opt,name=watch_id,proto3" json:"watch_id,omitempty"` + // If the response is for a create watch request, created is set to true. + // Client should record the watch_id and prepare for receiving events for + // that watching from the same stream. + // All events sent to the created watching will attach with the same watch_id. + Created bool `protobuf:"varint,3,opt,name=created,proto3" json:"created,omitempty"` + // If the response is for a cancel watch request, cancel is set to true. + // No further events will be sent to the canceled watching. + Canceled bool `protobuf:"varint,4,opt,name=canceled,proto3" json:"canceled,omitempty"` + // If a watching tries to watch at a compacted index, compacted will be set to true. + // + // This happens when creating a watching at a compacted revision or the watching cannot + // catch up with the progress of the KV. + // + // Client should treat the watching as canceled and should not try to create any + // watching with same start_revision again. + Compacted bool `protobuf:"varint,5,opt,name=compacted,proto3" json:"compacted,omitempty"` + Events []*storagepb.Event `protobuf:"bytes,11,rep,name=events" json:"events,omitempty"` } func (m *WatchResponse) Reset() { *m = WatchResponse{} } @@ -685,7 +734,7 @@ func NewWatchClient(cc *grpc.ClientConn) WatchClient { } func (c *watchClient) Watch(ctx context.Context, opts ...grpc.CallOption) (Watch_WatchClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Watch_serviceDesc.Streams[0], c.cc, "/etcdserverpb.watch/Watch", opts...) + stream, err := grpc.NewClientStream(ctx, &_Watch_serviceDesc.Streams[0], c.cc, "/etcdserverpb.Watch/Watch", opts...) if err != nil { return nil, err } @@ -756,7 +805,7 @@ func (x *watchWatchServer) Recv() (*WatchRequest, error) { } var _Watch_serviceDesc = grpc.ServiceDesc{ - ServiceName: "etcdserverpb.watch", + ServiceName: "etcdserverpb.Watch", HandlerType: (*WatchServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ @@ -1513,6 +1562,44 @@ func (m *WatchRequest) Marshal() (data []byte, err error) { } func (m *WatchRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.CreateRequest != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.CreateRequest.Size())) + n12, err := m.CreateRequest.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n12 + } + if m.CancelRequest != nil { + data[i] = 0x12 + i++ + i = encodeVarintRpc(data, i, uint64(m.CancelRequest.Size())) + n13, err := m.CancelRequest.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n13 + } + return i, nil +} + +func (m *WatchCreateRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *WatchCreateRequest) MarshalTo(data []byte) (int, error) { var i int _ = i var l int @@ -1541,6 +1628,29 @@ func (m *WatchRequest) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *WatchCancelRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *WatchCancelRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.WatchId != 0 { + data[i] = 0x8 + i++ + i = encodeVarintRpc(data, i, uint64(m.WatchId)) + } + return i, nil +} + func (m *WatchResponse) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -1560,15 +1670,50 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n12, err := m.Header.MarshalTo(data[i:]) + n14, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n12 + i += n14 + } + if m.WatchId != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.WatchId)) + } + if m.Created { + data[i] = 0x18 + i++ + if m.Created { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } + if m.Canceled { + data[i] = 0x20 + i++ + if m.Canceled { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } + if m.Compacted { + data[i] = 0x28 + i++ + if m.Compacted { + data[i] = 1 + } else { + data[i] = 0 + } + i++ } if len(m.Events) > 0 { for _, msg := range m.Events { - data[i] = 0x12 + data[i] = 0x5a i++ i = encodeVarintRpc(data, i, uint64(msg.Size())) n, err := msg.MarshalTo(data[i:]) @@ -1623,11 +1768,11 @@ func (m *LeaseCreateResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n13, err := m.Header.MarshalTo(data[i:]) + n15, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n13 + i += n15 } if m.LeaseId != 0 { data[i] = 0x10 @@ -1690,11 +1835,11 @@ func (m *LeaseRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n14, err := m.Header.MarshalTo(data[i:]) + n16, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n14 + i += n16 } return i, nil } @@ -1741,11 +1886,11 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n15, err := m.Header.MarshalTo(data[i:]) + n17, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n15 + i += n17 } if m.LeaseId != 0 { data[i] = 0x10 @@ -2039,6 +2184,20 @@ func (m *CompactionResponse) Size() (n int) { } func (m *WatchRequest) Size() (n int) { + var l int + _ = l + if m.CreateRequest != nil { + l = m.CreateRequest.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.CancelRequest != nil { + l = m.CancelRequest.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} + +func (m *WatchCreateRequest) Size() (n int) { var l int _ = l if m.Key != nil { @@ -2059,6 +2218,15 @@ func (m *WatchRequest) Size() (n int) { return n } +func (m *WatchCancelRequest) Size() (n int) { + var l int + _ = l + if m.WatchId != 0 { + n += 1 + sovRpc(uint64(m.WatchId)) + } + return n +} + func (m *WatchResponse) Size() (n int) { var l int _ = l @@ -2066,6 +2234,18 @@ func (m *WatchResponse) Size() (n int) { l = m.Header.Size() n += 1 + l + sovRpc(uint64(l)) } + if m.WatchId != 0 { + n += 1 + sovRpc(uint64(m.WatchId)) + } + if m.Created { + n += 2 + } + if m.Canceled { + n += 2 + } + if m.Compacted { + n += 2 + } if len(m.Events) > 0 { for _, e := range m.Events { l = e.Size() @@ -3704,6 +3884,111 @@ func (m *CompactionResponse) Unmarshal(data []byte) error { return nil } func (m *WatchRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CreateRequest", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.CreateRequest == nil { + m.CreateRequest = &WatchCreateRequest{} + } + if err := m.CreateRequest.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CancelRequest", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.CancelRequest == nil { + m.CancelRequest = &WatchCancelRequest{} + } + if err := m.CancelRequest.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func (m *WatchCreateRequest) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 for iNdEx < l { @@ -3814,6 +4099,67 @@ func (m *WatchRequest) Unmarshal(data []byte) error { return nil } +func (m *WatchCancelRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field WatchId", wireType) + } + m.WatchId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.WatchId |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} func (m *WatchResponse) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 @@ -3864,6 +4210,73 @@ func (m *WatchResponse) Unmarshal(data []byte) error { } iNdEx = postIndex case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field WatchId", wireType) + } + m.WatchId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.WatchId |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Created", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Created = bool(v != 0) + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Canceled", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Canceled = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Compacted", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Compacted = bool(v != 0) + case 11: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType) } diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 31b456239..eb5f2f5d8 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -32,7 +32,7 @@ service KV { rpc Compact(CompactionRequest) returns (CompactionResponse) {} } -service watch { +service Watch { // Watch watches the events happening or happened. Both input and output // are stream. One watch rpc can watch for multiple keys or prefixs and // get a stream of events. The whole events history can be watched unless @@ -197,6 +197,13 @@ message CompactionResponse { } message WatchRequest { + oneof request_union { + WatchCreateRequest create_request = 1; + WatchCancelRequest cancel_request = 2; + } +} + +message WatchCreateRequest { // the key to be watched bytes key = 1; // the prefix to be watched. @@ -204,13 +211,34 @@ message WatchRequest { // start_revision is an optional revision (including) to watch from. No start_revision is "now". int64 start_revision = 3; // TODO: support Range watch? - // TODO: support notification every time interval or revision increase? - // TODO: support cancel watch if the server cannot reach with majority? +} + +message WatchCancelRequest { + int64 watch_id = 1; } message WatchResponse { ResponseHeader header = 1; - repeated storagepb.Event events = 2; + // watch_id is the ID of the watching the response sent to. + int64 watch_id = 2; + // If the response is for a create watch request, created is set to true. + // Client should record the watch_id and prepare for receiving events for + // that watching from the same stream. + // All events sent to the created watching will attach with the same watch_id. + bool created = 3; + // If the response is for a cancel watch request, cancel is set to true. + // No further events will be sent to the canceled watching. + bool canceled = 4; + // If a watching tries to watch at a compacted index, compacted will be set to true. + // + // This happens when creating a watching at a compacted revision or the watching cannot + // catch up with the progress of the KV. + // + // Client should treat the watching as canceled and should not try to create any + // watching with same start_revision again. + bool compacted = 5; + + repeated storagepb.Event events = 11; } message LeaseCreateRequest { diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go index b96bc70dd..81a9cd40a 100644 --- a/tools/benchmark/cmd/watch.go +++ b/tools/benchmark/cmd/watch.go @@ -109,7 +109,7 @@ func watchFunc(cmd *cobra.Command, args []string) { go func() { for i := 0; i < watchTotal; i++ { requests <- etcdserverpb.WatchRequest{ - Key: watched[i%(len(watched))], + CreateRequest: &etcdserverpb.WatchCreateRequest{Key: watched[i%(len(watched))]}, } } close(requests)