From 1f8c7b33e7a0081bd2b3b2a3a713866cdb656186 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 20 Mar 2017 20:34:05 -0700 Subject: [PATCH] namespace: a wrapper for clientv3 to namespace requests --- clientv3/namespace/doc.go | 43 ++++++++ clientv3/namespace/kv.go | 189 ++++++++++++++++++++++++++++++++ clientv3/namespace/lease.go | 58 ++++++++++ clientv3/namespace/util.go | 42 +++++++ clientv3/namespace/util_test.go | 75 +++++++++++++ clientv3/namespace/watch.go | 84 ++++++++++++++ 6 files changed, 491 insertions(+) create mode 100644 clientv3/namespace/doc.go create mode 100644 clientv3/namespace/kv.go create mode 100644 clientv3/namespace/lease.go create mode 100644 clientv3/namespace/util.go create mode 100644 clientv3/namespace/util_test.go create mode 100644 clientv3/namespace/watch.go diff --git a/clientv3/namespace/doc.go b/clientv3/namespace/doc.go new file mode 100644 index 000000000..c3ce14b9d --- /dev/null +++ b/clientv3/namespace/doc.go @@ -0,0 +1,43 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package namespace is a clientv3 wrapper that translates all keys to begin +// with a given prefix. +// +// First, create a client: +// +// cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}}) +// if err != nil { +// // handle error! +// } +// +// Next, override the client interfaces: +// +// unprefixedKV := cli.KV +// cli.KV = namespace.NewKV(cli.KV, "my-prefix/") +// cli.Watcher = namespace.NewWatcher(cli.Watcher, "my-prefix/") +// cli.Leases = namespace.NewLease(cli.Lease, "my-prefix/") +// +// Now calls using 'cli' will namespace / prefix all keys with "my-prefix/": +// +// cli.Put(context.TODO(), "abc", "123") +// resp, _ := unprefixedKV.Get(context.TODO(), "my-prefix/abc") +// fmt.Printf("%s\n", resp.Kvs[0].Value) +// // Output: 123 +// unprefixedKV.Put(context.TODO(), "my-prefix/abc", "456") +// resp, _ = cli.Get("abc") +// fmt.Printf("%s\n", resp.Kvs[0].Value) +// // Output: 456 +// +package namespace diff --git a/clientv3/namespace/kv.go b/clientv3/namespace/kv.go new file mode 100644 index 000000000..f3e82d6b8 --- /dev/null +++ b/clientv3/namespace/kv.go @@ -0,0 +1,189 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package namespace + +import ( + "golang.org/x/net/context" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type kvPrefix struct { + clientv3.KV + pfx string +} + +// NewKV wraps a KV instance so that all requests +// are prefixed with a given string. +func NewKV(kv clientv3.KV, prefix string) clientv3.KV { + return &kvPrefix{kv, prefix} +} + +func (kv *kvPrefix) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + if len(key) == 0 { + return nil, rpctypes.ErrEmptyKey + } + op := kv.prefixOp(clientv3.OpPut(key, val, opts...)) + r, err := kv.KV.Do(ctx, op) + if err != nil { + return nil, err + } + put := r.Put() + kv.unprefixPutResponse(put) + return put, nil +} + +func (kv *kvPrefix) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + if len(key) == 0 { + return nil, rpctypes.ErrEmptyKey + } + r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpGet(key, opts...))) + if err != nil { + return nil, err + } + get := r.Get() + kv.unprefixGetResponse(get) + return get, nil +} + +func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + if len(key) == 0 { + return nil, rpctypes.ErrEmptyKey + } + r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpDelete(key, opts...))) + if err != nil { + return nil, err + } + del := r.Del() + kv.unprefixDeleteResponse(del) + return del, nil +} + +func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) { + if len(op.KeyBytes()) == 0 { + return clientv3.OpResponse{}, rpctypes.ErrEmptyKey + } + r, err := kv.KV.Do(ctx, kv.prefixOp(op)) + if err != nil { + return r, err + } + switch { + case r.Get() != nil: + kv.unprefixGetResponse(r.Get()) + case r.Put() != nil: + kv.unprefixPutResponse(r.Put()) + case r.Del() != nil: + kv.unprefixDeleteResponse(r.Del()) + } + return r, nil +} + +type txnPrefix struct { + clientv3.Txn + kv *kvPrefix +} + +func (kv *kvPrefix) Txn(ctx context.Context) clientv3.Txn { + return &txnPrefix{kv.KV.Txn(ctx), kv} +} + +func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn { + newCmps := make([]clientv3.Cmp, len(cs)) + for i := range cs { + newCmps[i] = cs[i] + pfxKey, _ := txn.kv.prefixInterval(cs[i].KeyBytes(), nil) + newCmps[i].WithKeyBytes(pfxKey) + } + txn.Txn = txn.Txn.If(newCmps...) + return txn +} + +func (txn *txnPrefix) Then(ops ...clientv3.Op) clientv3.Txn { + newOps := make([]clientv3.Op, len(ops)) + for i := range ops { + newOps[i] = txn.kv.prefixOp(ops[i]) + } + txn.Txn = txn.Txn.Then(newOps...) + return txn +} + +func (txn *txnPrefix) Else(ops ...clientv3.Op) clientv3.Txn { + newOps := make([]clientv3.Op, len(ops)) + for i := range ops { + newOps[i] = txn.kv.prefixOp(ops[i]) + } + txn.Txn = txn.Txn.Else(newOps...) + return txn +} + +func (txn *txnPrefix) Commit() (*clientv3.TxnResponse, error) { + resp, err := txn.Txn.Commit() + if err != nil { + return nil, err + } + txn.kv.unprefixTxnResponse(resp) + return resp, nil +} + +func (kv *kvPrefix) prefixOp(op clientv3.Op) clientv3.Op { + begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes()) + op.WithKeyBytes(begin) + op.WithRangeBytes(end) + return op +} + +func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) { + for i := range resp.Kvs { + resp.Kvs[i].Key = resp.Kvs[i].Key[len(kv.pfx):] + } +} + +func (kv *kvPrefix) unprefixPutResponse(resp *clientv3.PutResponse) { + if resp.PrevKv != nil { + resp.PrevKv.Key = resp.PrevKv.Key[len(kv.pfx):] + } +} + +func (kv *kvPrefix) unprefixDeleteResponse(resp *clientv3.DeleteResponse) { + for i := range resp.PrevKvs { + resp.PrevKvs[i].Key = resp.PrevKvs[i].Key[len(kv.pfx):] + } +} + +func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) { + for _, r := range resp.Responses { + switch tv := r.Response.(type) { + case *pb.ResponseOp_ResponseRange: + if tv.ResponseRange != nil { + kv.unprefixGetResponse((*clientv3.GetResponse)(tv.ResponseRange)) + } + case *pb.ResponseOp_ResponsePut: + if tv.ResponsePut != nil { + kv.unprefixPutResponse((*clientv3.PutResponse)(tv.ResponsePut)) + } + case *pb.ResponseOp_ResponseDeleteRange: + if tv.ResponseDeleteRange != nil { + kv.unprefixDeleteResponse((*clientv3.DeleteResponse)(tv.ResponseDeleteRange)) + } + default: + } + } +} + +func (p *kvPrefix) prefixInterval(key, end []byte) (pfxKey []byte, pfxEnd []byte) { + return prefixInterval(p.pfx, key, end) +} diff --git a/clientv3/namespace/lease.go b/clientv3/namespace/lease.go new file mode 100644 index 000000000..fc7c22869 --- /dev/null +++ b/clientv3/namespace/lease.go @@ -0,0 +1,58 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package namespace + +import ( + "bytes" + + "golang.org/x/net/context" + + "github.com/coreos/etcd/clientv3" +) + +type leasePrefix struct { + clientv3.Lease + pfx []byte +} + +// NewLease wraps a Lease interface to filter for only keys with a prefix +// and remove that prefix when fetching attached keys through TimeToLive. +func NewLease(l clientv3.Lease, prefix string) clientv3.Lease { + return &leasePrefix{l, []byte(prefix)} +} + +func (l *leasePrefix) TimeToLive(ctx context.Context, id clientv3.LeaseID, opts ...clientv3.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error) { + resp, err := l.Lease.TimeToLive(ctx, id, opts...) + if err != nil { + return nil, err + } + if len(resp.Keys) > 0 { + var outKeys [][]byte + for i := range resp.Keys { + if len(resp.Keys[i]) < len(l.pfx) { + // too short + continue + } + if !bytes.Equal(resp.Keys[i][:len(l.pfx)], l.pfx) { + // doesn't match prefix + continue + } + // strip prefix + outKeys = append(outKeys, resp.Keys[i][len(l.pfx):]) + } + resp.Keys = outKeys + } + return resp, nil +} diff --git a/clientv3/namespace/util.go b/clientv3/namespace/util.go new file mode 100644 index 000000000..ecf04046c --- /dev/null +++ b/clientv3/namespace/util.go @@ -0,0 +1,42 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package namespace + +func prefixInterval(pfx string, key, end []byte) (pfxKey []byte, pfxEnd []byte) { + pfxKey = make([]byte, len(pfx)+len(key)) + copy(pfxKey[copy(pfxKey, pfx):], key) + + if len(end) == 1 && end[0] == 0 { + // the edge of the keyspace + pfxEnd = make([]byte, len(pfx)) + copy(pfxEnd, pfx) + ok := false + for i := len(pfxEnd) - 1; i >= 0; i-- { + if pfxEnd[i]++; pfxEnd[i] != 0 { + ok = true + break + } + } + if !ok { + // 0xff..ff => 0x00 + pfxEnd = []byte{0} + } + } else if len(end) >= 1 { + pfxEnd = make([]byte, len(pfx)+len(end)) + copy(pfxEnd[copy(pfxEnd, pfx):], end) + } + + return pfxKey, pfxEnd +} diff --git a/clientv3/namespace/util_test.go b/clientv3/namespace/util_test.go new file mode 100644 index 000000000..9ba472b0a --- /dev/null +++ b/clientv3/namespace/util_test.go @@ -0,0 +1,75 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package namespace + +import ( + "bytes" + "testing" +) + +func TestPrefixInterval(t *testing.T) { + tests := []struct { + pfx string + key []byte + end []byte + + wKey []byte + wEnd []byte + }{ + // single key + { + pfx: "pfx/", + key: []byte("a"), + + wKey: []byte("pfx/a"), + }, + // range + { + pfx: "pfx/", + key: []byte("abc"), + end: []byte("def"), + + wKey: []byte("pfx/abc"), + wEnd: []byte("pfx/def"), + }, + // one-sided range + { + pfx: "pfx/", + key: []byte("abc"), + end: []byte{0}, + + wKey: []byte("pfx/abc"), + wEnd: []byte("pfx0"), + }, + // one-sided range, end of keyspace + { + pfx: "\xff\xff", + key: []byte("abc"), + end: []byte{0}, + + wKey: []byte("\xff\xffabc"), + wEnd: []byte{0}, + }, + } + for i, tt := range tests { + pfxKey, pfxEnd := prefixInterval(tt.pfx, tt.key, tt.end) + if !bytes.Equal(pfxKey, tt.wKey) { + t.Errorf("#%d: expected key=%q, got key=%q", i, tt.wKey, pfxKey) + } + if !bytes.Equal(pfxEnd, tt.wEnd) { + t.Errorf("#%d: expected end=%q, got end=%q", i, tt.wEnd, pfxEnd) + } + } +} diff --git a/clientv3/namespace/watch.go b/clientv3/namespace/watch.go new file mode 100644 index 000000000..5697f4496 --- /dev/null +++ b/clientv3/namespace/watch.go @@ -0,0 +1,84 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package namespace + +import ( + "sync" + + "golang.org/x/net/context" + + "github.com/coreos/etcd/clientv3" +) + +type watcherPrefix struct { + clientv3.Watcher + pfx string + + wg sync.WaitGroup + stopc chan struct{} + stopOnce sync.Once +} + +// NewWatcher wraps a Watcher instance so that all Watch requests +// are prefixed with a given string and all Watch responses have +// the prefix removed. +func NewWatcher(w clientv3.Watcher, prefix string) clientv3.Watcher { + return &watcherPrefix{Watcher: w, pfx: prefix, stopc: make(chan struct{})} +} + +func (w *watcherPrefix) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { + // since OpOption is opaque, determine range for prefixing through an OpGet + op := clientv3.OpGet("abc", opts...) + end := op.RangeBytes() + pfxBegin, pfxEnd := prefixInterval(w.pfx, []byte(key), end) + if pfxEnd != nil { + opts = append(opts, clientv3.WithRange(string(pfxEnd))) + } + + wch := w.Watcher.Watch(ctx, string(pfxBegin), opts...) + + // translate watch events from prefixed to unprefixed + pfxWch := make(chan clientv3.WatchResponse) + w.wg.Add(1) + go func() { + defer func() { + close(pfxWch) + w.wg.Done() + }() + for wr := range wch { + for i := range wr.Events { + wr.Events[i].Kv.Key = wr.Events[i].Kv.Key[len(w.pfx):] + if wr.Events[i].PrevKv != nil { + wr.Events[i].PrevKv.Key = wr.Events[i].Kv.Key + } + } + select { + case pfxWch <- wr: + case <-ctx.Done(): + return + case <-w.stopc: + return + } + } + }() + return pfxWch +} + +func (w *watcherPrefix) Close() error { + err := w.Watcher.Close() + w.stopOnce.Do(func() { close(w.stopc) }) + w.wg.Wait() + return err +}