etcdserver: support sorted range requests in v3 api

Fixes #4166
This commit is contained in:
Anthony Romano 2016-01-08 16:03:05 -08:00
parent 374b14e471
commit 82eeffbd58
5 changed files with 251 additions and 5 deletions

View File

@ -27,6 +27,7 @@ const (
ExitError
ExitBadConnection
ExitInvalidInput // for txn, watch command
ExitBadFeature // provided a valid flag with an unsupported value
ExitBadArgs = 128
)

View File

@ -16,6 +16,7 @@ package command
import (
"fmt"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@ -23,13 +24,23 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
var (
rangeLimit int
rangeSortOrder string
rangeSortTarget string
)
// NewRangeCommand returns the cobra command for "range".
func NewRangeCommand() *cobra.Command {
return &cobra.Command{
cmd := &cobra.Command{
Use: "range",
Short: "Range gets the keys in the range from the store.",
Run: rangeCommandFunc,
}
cmd.Flags().StringVar(&rangeSortOrder, "order", "", "order of results; ASCEND or DESCEND")
cmd.Flags().StringVar(&rangeSortTarget, "sort-by", "", "sort target; CREATE, KEY, MODIFY, VALUE, or VERSION")
cmd.Flags().IntVar(&rangeLimit, "limit", 0, "maximum number of results")
return cmd
}
// rangeCommandFunc executes the "range" command.
@ -48,13 +59,51 @@ func rangeCommandFunc(cmd *cobra.Command, args []string) {
if err != nil {
ExitWithError(ExitError, err)
}
sortByOrder := pb.RangeRequest_NONE
sortOrder := strings.ToUpper(rangeSortOrder)
switch {
case sortOrder == "ASCEND":
sortByOrder = pb.RangeRequest_ASCEND
case sortOrder == "DESCEND":
sortByOrder = pb.RangeRequest_DESCEND
case sortOrder == "":
sortByOrder = pb.RangeRequest_NONE
default:
ExitWithError(ExitBadFeature, fmt.Errorf("bad sort order %v", rangeSortOrder))
}
sortByTarget := pb.RangeRequest_KEY
sortTarget := strings.ToUpper(rangeSortTarget)
switch {
case sortTarget == "CREATE":
sortByTarget = pb.RangeRequest_CREATE
case sortTarget == "KEY":
sortByTarget = pb.RangeRequest_KEY
case sortTarget == "MODIFY":
sortByTarget = pb.RangeRequest_MOD
case sortTarget == "VALUE":
sortByTarget = pb.RangeRequest_VALUE
case sortTarget == "VERSION":
sortByTarget = pb.RangeRequest_VERSION
case sortTarget == "":
sortByTarget = pb.RangeRequest_KEY
default:
ExitWithError(ExitBadFeature, fmt.Errorf("bad sort target %v", rangeSortTarget))
}
conn, err := grpc.Dial(endpoint)
if err != nil {
ExitWithError(ExitBadConnection, err)
}
kv := pb.NewKVClient(conn)
req := &pb.RangeRequest{Key: key, RangeEnd: rangeEnd}
req := &pb.RangeRequest{
Key: key,
RangeEnd: rangeEnd,
SortOrder: sortByOrder,
SortTarget: sortByTarget,
Limit: int64(rangeLimit),
}
resp, err := kv.Range(context.Background(), req)
for _, kv := range resp.Kvs {
fmt.Printf("%s %s\n", string(kv.Key), string(kv.Value))

View File

@ -20,6 +20,58 @@ import fmt "fmt"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
type RangeRequest_SortOrder int32
const (
RangeRequest_NONE RangeRequest_SortOrder = 0
RangeRequest_ASCEND RangeRequest_SortOrder = 1
RangeRequest_DESCEND RangeRequest_SortOrder = 2
)
var RangeRequest_SortOrder_name = map[int32]string{
0: "NONE",
1: "ASCEND",
2: "DESCEND",
}
var RangeRequest_SortOrder_value = map[string]int32{
"NONE": 0,
"ASCEND": 1,
"DESCEND": 2,
}
func (x RangeRequest_SortOrder) String() string {
return proto.EnumName(RangeRequest_SortOrder_name, int32(x))
}
type RangeRequest_SortTarget int32
const (
RangeRequest_KEY RangeRequest_SortTarget = 0
RangeRequest_VERSION RangeRequest_SortTarget = 1
RangeRequest_CREATE RangeRequest_SortTarget = 2
RangeRequest_MOD RangeRequest_SortTarget = 3
RangeRequest_VALUE RangeRequest_SortTarget = 4
)
var RangeRequest_SortTarget_name = map[int32]string{
0: "KEY",
1: "VERSION",
2: "CREATE",
3: "MOD",
4: "VALUE",
}
var RangeRequest_SortTarget_value = map[string]int32{
"KEY": 0,
"VERSION": 1,
"CREATE": 2,
"MOD": 3,
"VALUE": 4,
}
func (x RangeRequest_SortTarget) String() string {
return proto.EnumName(RangeRequest_SortTarget_name, int32(x))
}
type Compare_CompareResult int32
const (
@ -94,6 +146,10 @@ type RangeRequest struct {
// if the revision has been compacted, ErrCompaction will be returned in
// response.
Revision int64 `protobuf:"varint,4,opt,name=revision,proto3" json:"revision,omitempty"`
// sort_order is the requested order for returned the results
SortOrder RangeRequest_SortOrder `protobuf:"varint,5,opt,name=sort_order,proto3,enum=etcdserverpb.RangeRequest_SortOrder" json:"sort_order,omitempty"`
// sort_target is the kv field to use for sorting
SortTarget RangeRequest_SortTarget `protobuf:"varint,6,opt,name=sort_target,proto3,enum=etcdserverpb.RangeRequest_SortTarget" json:"sort_target,omitempty"`
}
func (m *RangeRequest) Reset() { *m = RangeRequest{} }
@ -516,6 +572,8 @@ func (m *LeaseKeepAliveResponse) GetHeader() *ResponseHeader {
}
func init() {
proto.RegisterEnum("etcdserverpb.RangeRequest_SortOrder", RangeRequest_SortOrder_name, RangeRequest_SortOrder_value)
proto.RegisterEnum("etcdserverpb.RangeRequest_SortTarget", RangeRequest_SortTarget_name, RangeRequest_SortTarget_value)
proto.RegisterEnum("etcdserverpb.Compare_CompareResult", Compare_CompareResult_name, Compare_CompareResult_value)
proto.RegisterEnum("etcdserverpb.Compare_CompareTarget", Compare_CompareTarget_name, Compare_CompareTarget_value)
}
@ -1059,6 +1117,16 @@ func (m *RangeRequest) MarshalTo(data []byte) (int, error) {
i++
i = encodeVarintRpc(data, i, uint64(m.Revision))
}
if m.SortOrder != 0 {
data[i] = 0x28
i++
i = encodeVarintRpc(data, i, uint64(m.SortOrder))
}
if m.SortTarget != 0 {
data[i] = 0x30
i++
i = encodeVarintRpc(data, i, uint64(m.SortTarget))
}
return i, nil
}
@ -1971,6 +2039,12 @@ func (m *RangeRequest) Size() (n int) {
if m.Revision != 0 {
n += 1 + sovRpc(uint64(m.Revision))
}
if m.SortOrder != 0 {
n += 1 + sovRpc(uint64(m.SortOrder))
}
if m.SortTarget != 0 {
n += 1 + sovRpc(uint64(m.SortTarget))
}
return n
}
@ -2551,6 +2625,38 @@ func (m *RangeRequest) Unmarshal(data []byte) error {
break
}
}
case 5:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field SortOrder", wireType)
}
m.SortOrder = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.SortOrder |= (RangeRequest_SortOrder(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field SortTarget", wireType)
}
m.SortTarget = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.SortTarget |= (RangeRequest_SortTarget(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
var sizeOfWire int
for {

View File

@ -67,6 +67,19 @@ message ResponseHeader {
}
message RangeRequest {
enum SortOrder {
NONE = 0; // default, no sorting
ASCEND = 1; // lowest target value first
DESCEND = 2; // highest target value first
}
enum SortTarget {
KEY = 0;
VERSION = 1;
CREATE = 2;
MOD = 3;
VALUE = 4;
}
// if the range_end is not given, the request returns the key.
bytes key = 1;
// if the range_end is given, it gets the keys in range [key, range_end).
@ -78,6 +91,12 @@ message RangeRequest {
// if the revision has been compacted, ErrCompaction will be returned in
// response.
int64 revision = 4;
// sort_order is the requested order for returned the results
SortOrder sort_order = 5;
// sort_target is the kv field to use for sorting
SortTarget sort_target = 6;
}
message RangeResponse {

View File

@ -17,6 +17,7 @@ package etcdserver
import (
"bytes"
"fmt"
"sort"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@ -191,6 +192,45 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e
return resp, nil
}
type kvSort struct{ kvs []storagepb.KeyValue }
func (s *kvSort) Swap(i, j int) {
t := s.kvs[i]
s.kvs[i] = s.kvs[j]
s.kvs[j] = t
}
func (s *kvSort) Len() int { return len(s.kvs) }
type kvSortByKey struct{ *kvSort }
func (s *kvSortByKey) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
}
type kvSortByVersion struct{ *kvSort }
func (s *kvSortByVersion) Less(i, j int) bool {
return (s.kvs[i].Version - s.kvs[j].Version) < 0
}
type kvSortByCreate struct{ *kvSort }
func (s *kvSortByCreate) Less(i, j int) bool {
return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
}
type kvSortByMod struct{ *kvSort }
func (s *kvSortByMod) Less(i, j int) bool {
return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
}
type kvSortByValue struct{ *kvSort }
func (s *kvSortByValue) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
}
func applyRange(txnID int64, kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) {
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
@ -201,18 +241,49 @@ func applyRange(txnID int64, kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeRespo
err error
)
limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE {
// fetch everything; sort and truncate afterwards
limit = 0
}
if txnID != noTxn {
kvs, rev, err = kv.TxnRange(txnID, r.Key, r.RangeEnd, r.Limit, 0)
kvs, rev, err = kv.TxnRange(txnID, r.Key, r.RangeEnd, limit, 0)
if err != nil {
return nil, err
}
} else {
kvs, rev, err = kv.Range(r.Key, r.RangeEnd, r.Limit, 0)
kvs, rev, err = kv.Range(r.Key, r.RangeEnd, limit, 0)
if err != nil {
return nil, err
}
}
if r.SortOrder != pb.RangeRequest_NONE {
var sorter sort.Interface
switch {
case r.SortTarget == pb.RangeRequest_KEY:
sorter = &kvSortByKey{&kvSort{kvs}}
case r.SortTarget == pb.RangeRequest_VERSION:
sorter = &kvSortByVersion{&kvSort{kvs}}
case r.SortTarget == pb.RangeRequest_CREATE:
sorter = &kvSortByCreate{&kvSort{kvs}}
case r.SortTarget == pb.RangeRequest_MOD:
sorter = &kvSortByMod{&kvSort{kvs}}
case r.SortTarget == pb.RangeRequest_VALUE:
sorter = &kvSortByValue{&kvSort{kvs}}
}
switch {
case r.SortOrder == pb.RangeRequest_ASCEND:
sort.Sort(sorter)
case r.SortOrder == pb.RangeRequest_DESCEND:
sort.Sort(sort.Reverse(sorter))
}
if r.Limit > 0 && len(kvs) > int(r.Limit) {
kvs = kvs[:r.Limit]
}
}
resp.Header.Revision = rev
for i := range kvs {
resp.Kvs = append(resp.Kvs, &kvs[i])