Merge pull request #5781 from gyuho/compact_client

*: compact with physical in client side
This commit is contained in:
Gyu-Ho Lee
2016-06-27 14:46:54 -07:00
committed by GitHub
7 changed files with 116 additions and 11 deletions

53
clientv3/compact_op.go Normal file
View File

@@ -0,0 +1,53 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// CompactOp represents a compact operation.
type CompactOp struct {
revision int64
physical bool
}
// CompactOption configures compact operation.
type CompactOption func(*CompactOp)
func (op *CompactOp) applyCompactOpts(opts []CompactOption) {
for _, opt := range opts {
opt(op)
}
}
// OpCompact wraps slice CompactOption to create a CompactOp.
func OpCompact(rev int64, opts ...CompactOption) CompactOp {
ret := CompactOp{revision: rev}
ret.applyCompactOpts(opts)
return ret
}
func (op CompactOp) toRequest() *pb.CompactionRequest {
return &pb.CompactionRequest{Revision: op.revision, Physical: op.physical}
}
// WithCompactPhysical makes compact RPC call wait until
// the compaction is physically applied to the local database
// such that compacted entries are totally removed from the
// backend database.
func WithCompactPhysical() CompactOption {
return func(op *CompactOp) { op.physical = true }
}

View File

@@ -0,0 +1,30 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"reflect"
"testing"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
)
func TestCompactOp(t *testing.T) {
req1 := OpCompact(100, WithCompactPhysical()).toRequest()
req2 := &etcdserverpb.CompactionRequest{Revision: 100, Physical: true}
if !reflect.DeepEqual(req1, req2) {
t.Fatalf("expected %+v, got %+v", req2, req1)
}
}

View File

@@ -47,7 +47,7 @@ type KV interface {
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64) error
Compact(ctx context.Context, rev int64, opts ...CompactOption) error
// Do applies a single Op on KV without a transaction.
// Do is useful when declaring operations to be issued at a later time
@@ -98,8 +98,8 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
return r.del, toErr(ctx, err)
}
func (kv *kv) Compact(ctx context.Context, rev int64) error {
if _, err := kv.remote.Compact(ctx, &pb.CompactionRequest{Revision: rev}); err != nil {
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) error {
if _, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest()); err != nil {
return toErr(ctx, err)
}
return nil

View File

@@ -20,10 +20,12 @@ import (
"testing"
)
func TestCtlV3Compact(t *testing.T) { testCtl(t, compactTest) }
func TestCtlV3Compact(t *testing.T) { testCtl(t, compactTest) }
func TestCtlV3CompactPhysical(t *testing.T) { testCtl(t, compactTest, withCompactPhysical()) }
func compactTest(cx ctlCtx) {
if err := ctlV3Compact(cx, 2); err != nil {
compactPhysical := cx.compactPhysical
if err := ctlV3Compact(cx, 2, compactPhysical); err != nil {
if !strings.Contains(err.Error(), "required revision is a future revision") {
cx.t.Fatal(err)
}
@@ -42,7 +44,7 @@ func compactTest(cx ctlCtx) {
cx.t.Errorf("compactTest: ctlV3Get error (%v)", err)
}
if err := ctlV3Compact(cx, 4); err != nil {
if err := ctlV3Compact(cx, 4, compactPhysical); err != nil {
cx.t.Fatal(err)
}
@@ -54,7 +56,7 @@ func compactTest(cx ctlCtx) {
cx.t.Fatalf("expected '...has been compacted' error, got <nil>")
}
if err := ctlV3Compact(cx, 2); err != nil {
if err := ctlV3Compact(cx, 2, compactPhysical); err != nil {
if !strings.Contains(err.Error(), "required revision has been compacted") {
cx.t.Fatal(err)
}
@@ -63,8 +65,11 @@ func compactTest(cx ctlCtx) {
}
}
func ctlV3Compact(cx ctlCtx, rev int64) error {
func ctlV3Compact(cx ctlCtx, rev int64, physical bool) error {
rs := strconv.FormatInt(rev, 10)
cmdArgs := append(cx.PrefixArgs(), "compact", rs)
if physical {
cmdArgs = append(cmdArgs, "--physical")
}
return spawnWithExpect(cmdArgs, "compacted revision "+rs)
}

View File

@@ -26,7 +26,7 @@ func defragTest(cx ctlCtx) {
}
}
if err := ctlV3Compact(cx, 4); err != nil {
if err := ctlV3Compact(cx, 4, cx.compactPhysical); err != nil {
cx.t.Fatal(err)
}

View File

@@ -51,6 +51,9 @@ type ctlCtx struct {
user string
pass string
// for compaction
compactPhysical bool
}
type ctlOption func(*ctlCtx)
@@ -81,6 +84,10 @@ func withQuota(b int64) ctlOption {
return func(cx *ctlCtx) { cx.quotaBackendBytes = b }
}
func withCompactPhysical() ctlOption {
return func(cx *ctlCtx) { cx.compactPhysical = true }
}
func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
defer testutil.AfterTest(t)

View File

@@ -18,16 +18,21 @@ import (
"fmt"
"strconv"
"github.com/coreos/etcd/clientv3"
"github.com/spf13/cobra"
)
var compactPhysical bool
// NewCompactionCommand returns the cobra command for "compaction".
func NewCompactionCommand() *cobra.Command {
return &cobra.Command{
cmd := &cobra.Command{
Use: "compaction <revision>",
Short: "Compaction compacts the event history in etcd.",
Run: compactionCommandFunc,
}
cmd.Flags().BoolVar(&compactPhysical, "physical", false, "'true' to wait for compaction to physically remove all old revisions.")
return cmd
}
// compactionCommandFunc executes the "compaction" command.
@@ -41,9 +46,14 @@ func compactionCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitError, err)
}
var opts []clientv3.CompactOption
if compactPhysical {
opts = append(opts, clientv3.WithCompactPhysical())
}
c := mustClientFromCmd(cmd)
ctx, cancel := commandCtx(cmd)
cerr := c.Compact(ctx, rev)
cerr := c.Compact(ctx, rev, opts...)
cancel()
if cerr != nil {
ExitWithError(ExitError, cerr)