mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: --enable-v2 and --enable-v2v3 is decomissioned
This commit is contained in:
parent
e0a0fdc984
commit
ee5ef42c5c
@ -1 +0,0 @@
|
||||
../../tests/integration/client/examples/example_keys_test.go
|
@ -69,9 +69,6 @@ initial-cluster-state: 'new'
|
||||
# Reject reconfiguration requests that would cause quorum loss.
|
||||
strict-reconfig-check: false
|
||||
|
||||
# Accept etcd V2 client requests
|
||||
enable-v2: true
|
||||
|
||||
# Enable runtime profiling data via HTTP server
|
||||
enable-pprof: true
|
||||
|
||||
|
@ -86,9 +86,6 @@ const (
|
||||
// DefaultStrictReconfigCheck is the default value for "--strict-reconfig-check" flag.
|
||||
// It's enabled by default.
|
||||
DefaultStrictReconfigCheck = true
|
||||
// DefaultEnableV2 is the default value for "--enable-v2" flag.
|
||||
// v2 API is disabled by default.
|
||||
DefaultEnableV2 = false
|
||||
|
||||
// maxElectionMs specifies the maximum value of election timeout.
|
||||
// More details are listed in ../Documentation/tuning.md#time-parameters.
|
||||
@ -224,11 +221,6 @@ type Config struct {
|
||||
InitialClusterToken string `json:"initial-cluster-token"`
|
||||
StrictReconfigCheck bool `json:"strict-reconfig-check"`
|
||||
|
||||
// EnableV2 exposes the deprecated V2 API surface.
|
||||
// TODO: Delete in 3.6 (https://github.com/etcd-io/etcd/issues/12913)
|
||||
// Deprecated in 3.5.
|
||||
EnableV2 bool `json:"enable-v2"`
|
||||
|
||||
// AutoCompactionMode is either 'periodic' or 'revision'.
|
||||
AutoCompactionMode string `json:"auto-compaction-mode"`
|
||||
// AutoCompactionRetention is either duration string with time unit
|
||||
@ -311,10 +303,6 @@ type Config struct {
|
||||
|
||||
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
|
||||
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
|
||||
// ExperimentalEnableV2V3 configures URLs that expose deprecated V2 API working on V3 store.
|
||||
// Deprecated in v3.5.
|
||||
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
|
||||
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
|
||||
// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
|
||||
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
||||
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
|
||||
@ -488,7 +476,6 @@ func NewConfig() *Config {
|
||||
|
||||
StrictReconfigCheck: DefaultStrictReconfigCheck,
|
||||
Metrics: "basic",
|
||||
EnableV2: DefaultEnableV2,
|
||||
|
||||
CORS: map[string]struct{}{"*": {}},
|
||||
HostWhitelist: map[string]struct{}{"*": {}},
|
||||
|
@ -38,9 +38,6 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2http"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
|
||||
"go.etcd.io/etcd/server/v3/storage"
|
||||
"go.etcd.io/etcd/server/v3/verify"
|
||||
@ -695,25 +692,9 @@ func (e *Etcd) serveClients() (err error) {
|
||||
}
|
||||
|
||||
// Start a client server goroutine for each listen address
|
||||
var h http.Handler
|
||||
if e.Config().EnableV2 {
|
||||
if e.Config().V2DeprecationEffective().IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) {
|
||||
return fmt.Errorf("--enable-v2 and --v2-deprecation=%s are mutually exclusive", e.Config().V2DeprecationEffective())
|
||||
}
|
||||
e.cfg.logger.Warn("Flag `enable-v2` is deprecated and will get removed in etcd 3.6.")
|
||||
if len(e.Config().ExperimentalEnableV2V3) > 0 {
|
||||
e.cfg.logger.Warn("Flag `experimental-enable-v2v3` is deprecated and will get removed in etcd 3.6.")
|
||||
srv := v2v3.NewServer(e.cfg.logger, v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3)
|
||||
h = v2http.NewClientHandler(e.GetLogger(), srv, e.Server.Cfg.ReqTimeout())
|
||||
} else {
|
||||
h = v2http.NewClientHandler(e.GetLogger(), e.Server, e.Server.Cfg.ReqTimeout())
|
||||
}
|
||||
} else {
|
||||
mux := http.NewServeMux()
|
||||
etcdhttp.HandleBasic(e.cfg.logger, mux, e.Server)
|
||||
etcdhttp.HandleMetricsHealthForV3(e.cfg.logger, mux, e.Server)
|
||||
h = mux
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
etcdhttp.HandleBasic(e.cfg.logger, mux, e.Server)
|
||||
etcdhttp.HandleMetricsHealthForV3(e.cfg.logger, mux, e.Server)
|
||||
|
||||
gopts := []grpc.ServerOption{}
|
||||
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
|
||||
@ -733,7 +714,7 @@ func (e *Etcd) serveClients() (err error) {
|
||||
// start client servers in each goroutine
|
||||
for _, sctx := range e.sctxs {
|
||||
go func(s *serveCtx) {
|
||||
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
|
||||
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, gopts...))
|
||||
}(sctx)
|
||||
}
|
||||
return nil
|
||||
|
@ -200,8 +200,6 @@ func newConfig() *config {
|
||||
|
||||
fs.BoolVar(&cfg.ec.PreVote, "pre-vote", cfg.ec.PreVote, "Enable to run an additional Raft election phase.")
|
||||
|
||||
fs.BoolVar(&cfg.ec.EnableV2, "enable-v2", cfg.ec.EnableV2, "Accept etcd V2 client requests. Deprecated in v3.5. Will be decommission in v3.6.")
|
||||
fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state. Deprecated in 3.5. Will be decommissioned in 3.6.")
|
||||
fs.Var(cfg.cf.v2deprecation, "v2-deprecation", fmt.Sprintf("v2store deprecation stage: %q. ", cfg.cf.proxy.Valids()))
|
||||
|
||||
// proxy
|
||||
|
@ -123,8 +123,6 @@ Clustering:
|
||||
Auto compaction retention length. 0 means disable auto compaction.
|
||||
--auto-compaction-mode 'periodic'
|
||||
Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
|
||||
--enable-v2 '` + strconv.FormatBool(embed.DefaultEnableV2) + `'
|
||||
Accept etcd V2 client requests. Deprecated and to be decommissioned in v3.6.
|
||||
--v2-deprecation '` + string(cconfig.V2_DEPR_DEFAULT) + `'
|
||||
Phase of v2store deprecation. Allows to opt-in for higher compatibility mode.
|
||||
Supported values:
|
||||
@ -234,8 +232,6 @@ Experimental feature:
|
||||
Enable to check data corruption before serving any client/peer traffic.
|
||||
--experimental-corrupt-check-time '0s'
|
||||
Duration of time between cluster corruption check passes.
|
||||
--experimental-enable-v2v3 ''
|
||||
Serve v2 requests through the v3 backend under a given prefix. Deprecated and to be decommissioned in v3.6.
|
||||
--experimental-enable-lease-checkpoint 'false'
|
||||
ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
|
||||
--experimental-compaction-batch-limit 1000
|
||||
|
@ -1,31 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package v2v3
|
||||
|
||||
import (
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
)
|
||||
|
||||
func (s *v2v3Server) ID() types.ID {
|
||||
// TODO: use an actual member ID
|
||||
return types.ID(0xe7cd2f00d)
|
||||
}
|
||||
func (s *v2v3Server) ClientURLs() []string { panic("STUB") }
|
||||
func (s *v2v3Server) Members() []*membership.Member { panic("STUB") }
|
||||
func (s *v2v3Server) Member(id types.ID) *membership.Member { panic("STUB") }
|
||||
func (s *v2v3Server) Version() *semver.Version { panic("STUB") }
|
@ -1,16 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package v2v3 provides a ServerV2 implementation backed by clientv3.Client.
|
||||
package v2v3
|
@ -1,130 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package v2v3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type fakeStats struct{}
|
||||
|
||||
func (s *fakeStats) SelfStats() []byte { return nil }
|
||||
func (s *fakeStats) LeaderStats() []byte { return nil }
|
||||
func (s *fakeStats) StoreStats() []byte { return nil }
|
||||
|
||||
type v2v3Server struct {
|
||||
lg *zap.Logger
|
||||
c *clientv3.Client
|
||||
store *v2v3Store
|
||||
fakeStats
|
||||
}
|
||||
|
||||
func NewServer(lg *zap.Logger, c *clientv3.Client, pfx string) etcdserver.ServerPeer {
|
||||
return &v2v3Server{lg: lg, c: c, store: newStore(c, pfx)}
|
||||
}
|
||||
|
||||
func (s *v2v3Server) ClientCertAuthEnabled() bool { return false }
|
||||
|
||||
func (s *v2v3Server) LeaseHandler() http.Handler { panic("STUB: lease handler") }
|
||||
func (s *v2v3Server) RaftHandler() http.Handler { panic("STUB: raft handler") }
|
||||
|
||||
func (s *v2v3Server) Leader() types.ID {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||
defer cancel()
|
||||
resp, err := s.c.Status(ctx, s.c.Endpoints()[0])
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return types.ID(resp.Leader)
|
||||
}
|
||||
|
||||
func (s *v2v3Server) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
|
||||
// adding member as learner is not supported by V2 Server.
|
||||
resp, err := s.c.MemberAdd(ctx, memb.PeerURLs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v3MembersToMembership(resp.Members), nil
|
||||
}
|
||||
|
||||
func (s *v2v3Server) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
|
||||
resp, err := s.c.MemberRemove(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v3MembersToMembership(resp.Members), nil
|
||||
}
|
||||
|
||||
func (s *v2v3Server) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
|
||||
resp, err := s.c.MemberPromote(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v3MembersToMembership(resp.Members), nil
|
||||
}
|
||||
|
||||
func (s *v2v3Server) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) {
|
||||
resp, err := s.c.MemberUpdate(ctx, uint64(m.ID), m.PeerURLs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v3MembersToMembership(resp.Members), nil
|
||||
}
|
||||
|
||||
func v3MembersToMembership(v3membs []*pb.Member) []*membership.Member {
|
||||
membs := make([]*membership.Member, len(v3membs))
|
||||
for i, m := range v3membs {
|
||||
membs[i] = &membership.Member{
|
||||
ID: types.ID(m.ID),
|
||||
RaftAttributes: membership.RaftAttributes{
|
||||
PeerURLs: m.PeerURLs,
|
||||
IsLearner: m.IsLearner,
|
||||
},
|
||||
Attributes: membership.Attributes{
|
||||
Name: m.Name,
|
||||
ClientURLs: m.ClientURLs,
|
||||
},
|
||||
}
|
||||
}
|
||||
return membs
|
||||
}
|
||||
|
||||
func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() }
|
||||
func (s *v2v3Server) Cluster() api.Cluster { return s }
|
||||
func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil }
|
||||
func (s *v2v3Server) LeaderChangedNotify() <-chan struct{} { return nil }
|
||||
|
||||
func (s *v2v3Server) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) {
|
||||
applier := etcdserver.NewApplierV2(s.lg, s.store, nil)
|
||||
reqHandler := etcdserver.NewStoreRequestV2Handler(s.store, applier)
|
||||
req := (*etcdserver.RequestV2)(&r)
|
||||
resp, err := req.Handle(ctx, reqHandler)
|
||||
if resp.Err != nil {
|
||||
return resp, resp.Err
|
||||
}
|
||||
return resp, err
|
||||
}
|
@ -1,638 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package v2v3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
)
|
||||
|
||||
// store implements the Store interface for V2 using
|
||||
// a v3 client.
|
||||
type v2v3Store struct {
|
||||
c *clientv3.Client
|
||||
// pfx is the v3 prefix where keys should be stored.
|
||||
pfx string
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
const maxPathDepth = 63
|
||||
|
||||
var errUnsupported = fmt.Errorf("TTLs are unsupported")
|
||||
|
||||
func NewStore(c *clientv3.Client, pfx string) v2store.Store { return newStore(c, pfx) }
|
||||
|
||||
func newStore(c *clientv3.Client, pfx string) *v2v3Store { return &v2v3Store{c, pfx, c.Ctx()} }
|
||||
|
||||
func (s *v2v3Store) Index() uint64 { panic("STUB") }
|
||||
|
||||
func (s *v2v3Store) Get(nodePath string, recursive, sorted bool) (*v2store.Event, error) {
|
||||
key := s.mkPath(nodePath)
|
||||
resp, err := s.c.Txn(s.ctx).Then(
|
||||
clientv3.OpGet(key+"/"),
|
||||
clientv3.OpGet(key),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if kvs := resp.Responses[0].GetResponseRange().Kvs; len(kvs) != 0 || isRoot(nodePath) {
|
||||
nodes, err := s.getDir(nodePath, recursive, sorted, resp.Header.Revision)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cidx, midx := uint64(0), uint64(0)
|
||||
if len(kvs) > 0 {
|
||||
cidx, midx = mkV2Rev(kvs[0].CreateRevision), mkV2Rev(kvs[0].ModRevision)
|
||||
}
|
||||
return &v2store.Event{
|
||||
Action: v2store.Get,
|
||||
Node: &v2store.NodeExtern{
|
||||
Key: nodePath,
|
||||
Dir: true,
|
||||
Nodes: nodes,
|
||||
CreatedIndex: cidx,
|
||||
ModifiedIndex: midx,
|
||||
},
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
kvs := resp.Responses[1].GetResponseRange().Kvs
|
||||
if len(kvs) == 0 {
|
||||
return nil, v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
|
||||
return &v2store.Event{
|
||||
Action: v2store.Get,
|
||||
Node: s.mkV2Node(kvs[0]),
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) getDir(nodePath string, recursive, sorted bool, rev int64) ([]*v2store.NodeExtern, error) {
|
||||
rootNodes, err := s.getDirDepth(nodePath, 1, rev)
|
||||
if err != nil || !recursive {
|
||||
if sorted {
|
||||
sort.Sort(v2store.NodeExterns(rootNodes))
|
||||
}
|
||||
return rootNodes, err
|
||||
}
|
||||
nextNodes := rootNodes
|
||||
nodes := make(map[string]*v2store.NodeExtern)
|
||||
// Breadth walk the subdirectories
|
||||
for i := 2; len(nextNodes) > 0; i++ {
|
||||
for _, n := range nextNodes {
|
||||
nodes[n.Key] = n
|
||||
if parent := nodes[path.Dir(n.Key)]; parent != nil {
|
||||
parent.Nodes = append(parent.Nodes, n)
|
||||
}
|
||||
}
|
||||
if nextNodes, err = s.getDirDepth(nodePath, i, rev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if sorted {
|
||||
sort.Sort(v2store.NodeExterns(rootNodes))
|
||||
}
|
||||
return rootNodes, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) getDirDepth(nodePath string, depth int, rev int64) ([]*v2store.NodeExtern, error) {
|
||||
pd := s.mkPathDepth(nodePath, depth)
|
||||
resp, err := s.c.Get(s.ctx, pd, clientv3.WithPrefix(), clientv3.WithRev(rev))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes := make([]*v2store.NodeExtern, len(resp.Kvs))
|
||||
for i, kv := range resp.Kvs {
|
||||
nodes[i] = s.mkV2Node(kv)
|
||||
}
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) Set(
|
||||
nodePath string,
|
||||
dir bool,
|
||||
value string,
|
||||
expireOpts v2store.TTLOptionSet,
|
||||
) (*v2store.Event, error) {
|
||||
if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
|
||||
return nil, errUnsupported
|
||||
}
|
||||
|
||||
if isRoot(nodePath) {
|
||||
return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
|
||||
}
|
||||
|
||||
ecode := 0
|
||||
applyf := func(stm concurrency.STM) error {
|
||||
// build path if any directories in path do not exist
|
||||
dirs := []string{}
|
||||
for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
|
||||
pp := s.mkPath(p)
|
||||
if stm.Rev(pp) > 0 {
|
||||
ecode = v2error.EcodeNotDir
|
||||
return nil
|
||||
}
|
||||
if stm.Rev(pp+"/") == 0 {
|
||||
dirs = append(dirs, pp+"/")
|
||||
}
|
||||
}
|
||||
for _, d := range dirs {
|
||||
stm.Put(d, "")
|
||||
}
|
||||
|
||||
key := s.mkPath(nodePath)
|
||||
if dir {
|
||||
if stm.Rev(key) != 0 {
|
||||
// exists as non-dir
|
||||
ecode = v2error.EcodeNotDir
|
||||
return nil
|
||||
}
|
||||
key = key + "/"
|
||||
} else if stm.Rev(key+"/") != 0 {
|
||||
ecode = v2error.EcodeNotFile
|
||||
return nil
|
||||
}
|
||||
stm.Put(key, value, clientv3.WithPrevKV())
|
||||
stm.Put(s.mkActionKey(), v2store.Set)
|
||||
return nil
|
||||
}
|
||||
|
||||
resp, err := s.newSTM(applyf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ecode != 0 {
|
||||
return nil, v2error.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
|
||||
createRev := resp.Header.Revision
|
||||
var pn *v2store.NodeExtern
|
||||
if pkv := prevKeyFromPuts(resp); pkv != nil {
|
||||
pn = s.mkV2Node(pkv)
|
||||
createRev = pkv.CreateRevision
|
||||
}
|
||||
|
||||
vp := &value
|
||||
if dir {
|
||||
vp = nil
|
||||
}
|
||||
return &v2store.Event{
|
||||
Action: v2store.Set,
|
||||
Node: &v2store.NodeExtern{
|
||||
Key: nodePath,
|
||||
Value: vp,
|
||||
Dir: dir,
|
||||
ModifiedIndex: mkV2Rev(resp.Header.Revision),
|
||||
CreatedIndex: mkV2Rev(createRev),
|
||||
},
|
||||
PrevNode: pn,
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) Update(nodePath, newValue string, expireOpts v2store.TTLOptionSet) (*v2store.Event, error) {
|
||||
if isRoot(nodePath) {
|
||||
return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
|
||||
}
|
||||
|
||||
if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
|
||||
return nil, errUnsupported
|
||||
}
|
||||
|
||||
key := s.mkPath(nodePath)
|
||||
ecode := 0
|
||||
applyf := func(stm concurrency.STM) error {
|
||||
if rev := stm.Rev(key + "/"); rev != 0 {
|
||||
ecode = v2error.EcodeNotFile
|
||||
return nil
|
||||
}
|
||||
if rev := stm.Rev(key); rev == 0 {
|
||||
ecode = v2error.EcodeKeyNotFound
|
||||
return nil
|
||||
}
|
||||
stm.Put(key, newValue, clientv3.WithPrevKV())
|
||||
stm.Put(s.mkActionKey(), v2store.Update)
|
||||
return nil
|
||||
}
|
||||
|
||||
resp, err := s.newSTM(applyf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ecode != 0 {
|
||||
return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
|
||||
pkv := prevKeyFromPuts(resp)
|
||||
return &v2store.Event{
|
||||
Action: v2store.Update,
|
||||
Node: &v2store.NodeExtern{
|
||||
Key: nodePath,
|
||||
Value: &newValue,
|
||||
ModifiedIndex: mkV2Rev(resp.Header.Revision),
|
||||
CreatedIndex: mkV2Rev(pkv.CreateRevision),
|
||||
},
|
||||
PrevNode: s.mkV2Node(pkv),
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) Create(
|
||||
nodePath string,
|
||||
dir bool,
|
||||
value string,
|
||||
unique bool,
|
||||
expireOpts v2store.TTLOptionSet,
|
||||
) (*v2store.Event, error) {
|
||||
if isRoot(nodePath) {
|
||||
return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
|
||||
}
|
||||
if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
|
||||
return nil, errUnsupported
|
||||
}
|
||||
ecode := 0
|
||||
applyf := func(stm concurrency.STM) error {
|
||||
ecode = 0
|
||||
key := s.mkPath(nodePath)
|
||||
if unique {
|
||||
// append unique item under the node path
|
||||
for {
|
||||
key = nodePath + "/" + fmt.Sprintf("%020s", time.Now())
|
||||
key = path.Clean(path.Join("/", key))
|
||||
key = s.mkPath(key)
|
||||
if stm.Rev(key) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if stm.Rev(key) > 0 || stm.Rev(key+"/") > 0 {
|
||||
ecode = v2error.EcodeNodeExist
|
||||
return nil
|
||||
}
|
||||
// build path if any directories in path do not exist
|
||||
dirs := []string{}
|
||||
for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
|
||||
pp := s.mkPath(p)
|
||||
if stm.Rev(pp) > 0 {
|
||||
ecode = v2error.EcodeNotDir
|
||||
return nil
|
||||
}
|
||||
if stm.Rev(pp+"/") == 0 {
|
||||
dirs = append(dirs, pp+"/")
|
||||
}
|
||||
}
|
||||
for _, d := range dirs {
|
||||
stm.Put(d, "")
|
||||
}
|
||||
|
||||
if dir {
|
||||
// directories marked with extra slash in key name
|
||||
key += "/"
|
||||
}
|
||||
stm.Put(key, value)
|
||||
stm.Put(s.mkActionKey(), v2store.Create)
|
||||
return nil
|
||||
}
|
||||
|
||||
resp, err := s.newSTM(applyf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ecode != 0 {
|
||||
return nil, v2error.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
|
||||
var v *string
|
||||
if !dir {
|
||||
v = &value
|
||||
}
|
||||
|
||||
return &v2store.Event{
|
||||
Action: v2store.Create,
|
||||
Node: &v2store.NodeExtern{
|
||||
Key: nodePath,
|
||||
Value: v,
|
||||
Dir: dir,
|
||||
ModifiedIndex: mkV2Rev(resp.Header.Revision),
|
||||
CreatedIndex: mkV2Rev(resp.Header.Revision),
|
||||
},
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) CompareAndSwap(
|
||||
nodePath string,
|
||||
prevValue string,
|
||||
prevIndex uint64,
|
||||
value string,
|
||||
expireOpts v2store.TTLOptionSet,
|
||||
) (*v2store.Event, error) {
|
||||
if isRoot(nodePath) {
|
||||
return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
|
||||
}
|
||||
if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
|
||||
return nil, errUnsupported
|
||||
}
|
||||
|
||||
key := s.mkPath(nodePath)
|
||||
resp, err := s.c.Txn(s.ctx).If(
|
||||
s.mkCompare(nodePath, prevValue, prevIndex)...,
|
||||
).Then(
|
||||
clientv3.OpPut(key, value, clientv3.WithPrevKV()),
|
||||
clientv3.OpPut(s.mkActionKey(), v2store.CompareAndSwap),
|
||||
).Else(
|
||||
clientv3.OpGet(key),
|
||||
clientv3.OpGet(key+"/"),
|
||||
).Commit()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !resp.Succeeded {
|
||||
return nil, compareFail(nodePath, prevValue, prevIndex, resp)
|
||||
}
|
||||
|
||||
pkv := resp.Responses[0].GetResponsePut().PrevKv
|
||||
return &v2store.Event{
|
||||
Action: v2store.CompareAndSwap,
|
||||
Node: &v2store.NodeExtern{
|
||||
Key: nodePath,
|
||||
Value: &value,
|
||||
CreatedIndex: mkV2Rev(pkv.CreateRevision),
|
||||
ModifiedIndex: mkV2Rev(resp.Header.Revision),
|
||||
},
|
||||
PrevNode: s.mkV2Node(pkv),
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) Delete(nodePath string, dir, recursive bool) (*v2store.Event, error) {
|
||||
if isRoot(nodePath) {
|
||||
return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
|
||||
}
|
||||
if !dir && !recursive {
|
||||
return s.deleteNode(nodePath)
|
||||
}
|
||||
if !recursive {
|
||||
return s.deleteEmptyDir(nodePath)
|
||||
}
|
||||
|
||||
dels := make([]clientv3.Op, maxPathDepth+1)
|
||||
dels[0] = clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV())
|
||||
for i := 1; i < maxPathDepth; i++ {
|
||||
dels[i] = clientv3.OpDelete(s.mkPathDepth(nodePath, i), clientv3.WithPrefix())
|
||||
}
|
||||
dels[maxPathDepth] = clientv3.OpPut(s.mkActionKey(), v2store.Delete)
|
||||
|
||||
resp, err := s.c.Txn(s.ctx).If(
|
||||
clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), ">", 0),
|
||||
clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, maxPathDepth)+"/"), "=", 0),
|
||||
).Then(
|
||||
dels...,
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !resp.Succeeded {
|
||||
return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
dresp := resp.Responses[0].GetResponseDeleteRange()
|
||||
return &v2store.Event{
|
||||
Action: v2store.Delete,
|
||||
PrevNode: s.mkV2Node(dresp.PrevKvs[0]),
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) deleteEmptyDir(nodePath string) (*v2store.Event, error) {
|
||||
resp, err := s.c.Txn(s.ctx).If(
|
||||
clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, 1)), "=", 0).WithPrefix(),
|
||||
).Then(
|
||||
clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV()),
|
||||
clientv3.OpPut(s.mkActionKey(), v2store.Delete),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !resp.Succeeded {
|
||||
return nil, v2error.NewError(v2error.EcodeDirNotEmpty, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
dresp := resp.Responses[0].GetResponseDeleteRange()
|
||||
if len(dresp.PrevKvs) == 0 {
|
||||
return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
return &v2store.Event{
|
||||
Action: v2store.Delete,
|
||||
PrevNode: s.mkV2Node(dresp.PrevKvs[0]),
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) deleteNode(nodePath string) (*v2store.Event, error) {
|
||||
resp, err := s.c.Txn(s.ctx).If(
|
||||
clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), "=", 0),
|
||||
).Then(
|
||||
clientv3.OpDelete(s.mkPath(nodePath), clientv3.WithPrevKV()),
|
||||
clientv3.OpPut(s.mkActionKey(), v2store.Delete),
|
||||
).Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !resp.Succeeded {
|
||||
return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
pkvs := resp.Responses[0].GetResponseDeleteRange().PrevKvs
|
||||
if len(pkvs) == 0 {
|
||||
return nil, v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
pkv := pkvs[0]
|
||||
return &v2store.Event{
|
||||
Action: v2store.Delete,
|
||||
Node: &v2store.NodeExtern{
|
||||
Key: nodePath,
|
||||
CreatedIndex: mkV2Rev(pkv.CreateRevision),
|
||||
ModifiedIndex: mkV2Rev(resp.Header.Revision),
|
||||
},
|
||||
PrevNode: s.mkV2Node(pkv),
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) CompareAndDelete(nodePath, prevValue string, prevIndex uint64) (*v2store.Event, error) {
|
||||
if isRoot(nodePath) {
|
||||
return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
|
||||
}
|
||||
|
||||
key := s.mkPath(nodePath)
|
||||
resp, err := s.c.Txn(s.ctx).If(
|
||||
s.mkCompare(nodePath, prevValue, prevIndex)...,
|
||||
).Then(
|
||||
clientv3.OpDelete(key, clientv3.WithPrevKV()),
|
||||
clientv3.OpPut(s.mkActionKey(), v2store.CompareAndDelete),
|
||||
).Else(
|
||||
clientv3.OpGet(key),
|
||||
clientv3.OpGet(key+"/"),
|
||||
).Commit()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !resp.Succeeded {
|
||||
return nil, compareFail(nodePath, prevValue, prevIndex, resp)
|
||||
}
|
||||
|
||||
// len(pkvs) > 1 since txn only succeeds when key exists
|
||||
pkv := resp.Responses[0].GetResponseDeleteRange().PrevKvs[0]
|
||||
return &v2store.Event{
|
||||
Action: v2store.CompareAndDelete,
|
||||
Node: &v2store.NodeExtern{
|
||||
Key: nodePath,
|
||||
CreatedIndex: mkV2Rev(pkv.CreateRevision),
|
||||
ModifiedIndex: mkV2Rev(resp.Header.Revision),
|
||||
},
|
||||
PrevNode: s.mkV2Node(pkv),
|
||||
EtcdIndex: mkV2Rev(resp.Header.Revision),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func compareFail(nodePath, prevValue string, prevIndex uint64, resp *clientv3.TxnResponse) error {
|
||||
if dkvs := resp.Responses[1].GetResponseRange().Kvs; len(dkvs) > 0 {
|
||||
return v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
kvs := resp.Responses[0].GetResponseRange().Kvs
|
||||
if len(kvs) == 0 {
|
||||
return v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
kv := kvs[0]
|
||||
indexMatch := prevIndex == 0 || kv.ModRevision == int64(prevIndex)
|
||||
valueMatch := prevValue == "" || string(kv.Value) == prevValue
|
||||
var cause string
|
||||
switch {
|
||||
case indexMatch && !valueMatch:
|
||||
cause = fmt.Sprintf("[%v != %v]", prevValue, string(kv.Value))
|
||||
case valueMatch && !indexMatch:
|
||||
cause = fmt.Sprintf("[%v != %v]", prevIndex, kv.ModRevision)
|
||||
default:
|
||||
cause = fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, string(kv.Value), prevIndex, kv.ModRevision)
|
||||
}
|
||||
return v2error.NewError(v2error.EcodeTestFailed, cause, mkV2Rev(resp.Header.Revision))
|
||||
}
|
||||
|
||||
func (s *v2v3Store) mkCompare(nodePath, prevValue string, prevIndex uint64) []clientv3.Cmp {
|
||||
key := s.mkPath(nodePath)
|
||||
cmps := []clientv3.Cmp{clientv3.Compare(clientv3.Version(key), ">", 0)}
|
||||
if prevIndex != 0 {
|
||||
cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(key), "=", mkV3Rev(prevIndex)))
|
||||
}
|
||||
if prevValue != "" {
|
||||
cmps = append(cmps, clientv3.Compare(clientv3.Value(key), "=", prevValue))
|
||||
}
|
||||
return cmps
|
||||
}
|
||||
|
||||
func (s *v2v3Store) JsonStats() []byte { panic("STUB") }
|
||||
func (s *v2v3Store) DeleteExpiredKeys(cutoff time.Time) { panic("STUB") }
|
||||
|
||||
func (s *v2v3Store) Version() int { return 2 }
|
||||
|
||||
// TODO: move this out of the Store interface?
|
||||
|
||||
func (s *v2v3Store) Save() ([]byte, error) { panic("STUB") }
|
||||
func (s *v2v3Store) Recovery(state []byte) error { panic("STUB") }
|
||||
func (s *v2v3Store) Clone() v2store.Store { panic("STUB") }
|
||||
func (s *v2v3Store) SaveNoCopy() ([]byte, error) { panic("STUB") }
|
||||
func (s *v2v3Store) HasTTLKeys() bool { panic("STUB") }
|
||||
|
||||
func (s *v2v3Store) mkPath(nodePath string) string { return s.mkPathDepth(nodePath, 0) }
|
||||
|
||||
func (s *v2v3Store) mkNodePath(p string) string {
|
||||
return path.Clean(p[len(s.pfx)+len("/k/000/"):])
|
||||
}
|
||||
|
||||
// mkPathDepth makes a path to a key that encodes its directory depth
|
||||
// for fast directory listing. If a depth is provided, it is added
|
||||
// to the computed depth.
|
||||
func (s *v2v3Store) mkPathDepth(nodePath string, depth int) string {
|
||||
normalForm := path.Clean(path.Join("/", nodePath))
|
||||
n := strings.Count(normalForm, "/") + depth
|
||||
return fmt.Sprintf("%s/%03d/k/%s", s.pfx, n, normalForm)
|
||||
}
|
||||
|
||||
func (s *v2v3Store) mkActionKey() string { return s.pfx + "/act" }
|
||||
|
||||
func isRoot(s string) bool { return len(s) == 0 || s == "/" || s == "/0" || s == "/1" }
|
||||
|
||||
func mkV2Rev(v3Rev int64) uint64 {
|
||||
if v3Rev == 0 {
|
||||
return 0
|
||||
}
|
||||
return uint64(v3Rev - 1)
|
||||
}
|
||||
|
||||
func mkV3Rev(v2Rev uint64) int64 {
|
||||
if v2Rev == 0 {
|
||||
return 0
|
||||
}
|
||||
return int64(v2Rev + 1)
|
||||
}
|
||||
|
||||
// mkV2Node creates a V2 NodeExtern from a V3 KeyValue
|
||||
func (s *v2v3Store) mkV2Node(kv *mvccpb.KeyValue) *v2store.NodeExtern {
|
||||
if kv == nil {
|
||||
return nil
|
||||
}
|
||||
n := &v2store.NodeExtern{
|
||||
Key: s.mkNodePath(string(kv.Key)),
|
||||
Dir: kv.Key[len(kv.Key)-1] == '/',
|
||||
CreatedIndex: mkV2Rev(kv.CreateRevision),
|
||||
ModifiedIndex: mkV2Rev(kv.ModRevision),
|
||||
}
|
||||
if !n.Dir {
|
||||
v := string(kv.Value)
|
||||
n.Value = &v
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// prevKeyFromPuts gets the prev key that is being put; ignores
|
||||
// the put action response.
|
||||
func prevKeyFromPuts(resp *clientv3.TxnResponse) *mvccpb.KeyValue {
|
||||
for _, r := range resp.Responses {
|
||||
pkv := r.GetResponsePut().PrevKv
|
||||
if pkv != nil && pkv.CreateRevision > 0 {
|
||||
return pkv
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) newSTM(applyf func(concurrency.STM) error) (*clientv3.TxnResponse, error) {
|
||||
return concurrency.NewSTM(s.c, applyf, concurrency.WithIsolation(concurrency.Serializable))
|
||||
}
|
@ -1,142 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package v2v3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
)
|
||||
|
||||
func (s *v2v3Store) Watch(prefix string, recursive, stream bool, sinceIndex uint64) (v2store.Watcher, error) {
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
wch := s.c.Watch(
|
||||
ctx,
|
||||
// TODO: very pricey; use a single store-wide watch in future
|
||||
s.pfx,
|
||||
clientv3.WithPrefix(),
|
||||
clientv3.WithRev(int64(sinceIndex)),
|
||||
clientv3.WithCreatedNotify(),
|
||||
clientv3.WithPrevKV())
|
||||
resp, ok := <-wch
|
||||
if err := resp.Err(); err != nil || !ok {
|
||||
cancel()
|
||||
return nil, v2error.NewError(v2error.EcodeRaftInternal, prefix, 0)
|
||||
}
|
||||
|
||||
evc, donec := make(chan *v2store.Event), make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
close(evc)
|
||||
close(donec)
|
||||
}()
|
||||
for resp := range wch {
|
||||
for _, ev := range s.mkV2Events(resp) {
|
||||
k := ev.Node.Key
|
||||
if recursive {
|
||||
if !strings.HasPrefix(k, prefix) {
|
||||
continue
|
||||
}
|
||||
// accept events on hidden keys given in prefix
|
||||
k = strings.Replace(k, prefix, "/", 1)
|
||||
// ignore hidden keys deeper than prefix
|
||||
if strings.Contains(k, "/_") {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if !recursive && k != prefix {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case evc <- ev:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
if !stream {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return &v2v3Watcher{
|
||||
startRev: resp.Header.Revision,
|
||||
evc: evc,
|
||||
donec: donec,
|
||||
cancel: cancel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *v2v3Store) mkV2Events(wr clientv3.WatchResponse) (evs []*v2store.Event) {
|
||||
ak := s.mkActionKey()
|
||||
for _, rev := range mkRevs(wr) {
|
||||
var act, key *clientv3.Event
|
||||
for _, ev := range rev {
|
||||
if string(ev.Kv.Key) == ak {
|
||||
act = ev
|
||||
} else if key != nil && len(key.Kv.Key) < len(ev.Kv.Key) {
|
||||
// use longest key to ignore intermediate new
|
||||
// directories from Create.
|
||||
key = ev
|
||||
} else if key == nil {
|
||||
key = ev
|
||||
}
|
||||
}
|
||||
if act != nil && act.Kv != nil && key != nil {
|
||||
v2ev := &v2store.Event{
|
||||
Action: string(act.Kv.Value),
|
||||
Node: s.mkV2Node(key.Kv),
|
||||
PrevNode: s.mkV2Node(key.PrevKv),
|
||||
EtcdIndex: mkV2Rev(wr.Header.Revision),
|
||||
}
|
||||
evs = append(evs, v2ev)
|
||||
}
|
||||
}
|
||||
return evs
|
||||
}
|
||||
|
||||
func mkRevs(wr clientv3.WatchResponse) (revs [][]*clientv3.Event) {
|
||||
var curRev []*clientv3.Event
|
||||
for _, ev := range wr.Events {
|
||||
if curRev != nil && ev.Kv.ModRevision != curRev[0].Kv.ModRevision {
|
||||
revs = append(revs, curRev)
|
||||
curRev = nil
|
||||
}
|
||||
curRev = append(curRev, ev)
|
||||
}
|
||||
if curRev != nil {
|
||||
revs = append(revs, curRev)
|
||||
}
|
||||
return revs
|
||||
}
|
||||
|
||||
type v2v3Watcher struct {
|
||||
startRev int64
|
||||
evc chan *v2store.Event
|
||||
donec chan struct{}
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (w *v2v3Watcher) StartIndex() uint64 { return mkV2Rev(w.startRev) }
|
||||
|
||||
func (w *v2v3Watcher) Remove() {
|
||||
w.cancel()
|
||||
<-w.donec
|
||||
}
|
||||
|
||||
func (w *v2v3Watcher) EventChan() chan *v2store.Event { return w.evc }
|
@ -1718,8 +1718,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
// or its server is stopped.
|
||||
//
|
||||
// Use v2 store to encode member attributes, and apply through Raft
|
||||
// but does not go through v2 API endpoint, which means even with v2
|
||||
// client handler disabled (e.g. --enable-v2=false), cluster can still
|
||||
// but does not go through v2 API endpoint, which means cluster can still
|
||||
// process publish requests through rafthttp
|
||||
// TODO: Remove in 3.6 (start using publishV3)
|
||||
func (s *EtcdServer) publish(timeout time.Duration) {
|
||||
|
4
test.sh
4
test.sh
@ -100,7 +100,7 @@ function unit_pass {
|
||||
function integration_extra {
|
||||
if [ -z "${PKG}" ] ; then
|
||||
run_for_module "." go_test "./contrib/raftexample" "keep_going" : -timeout="${TIMEOUT:-5m}" "${RUN_ARG[@]}" "${COMMON_TEST_FLAGS[@]}" "$@" || return $?
|
||||
run_for_module "tests" go_test "./integration/v2store/..." "keep_going" : -tags v2v3 -timeout="${TIMEOUT:-5m}" "${RUN_ARG[@]}" "${COMMON_TEST_FLAGS[@]}" "$@" || return $?
|
||||
run_for_module "tests" go_test "./integration/v2store/..." "keep_going" : -timeout="${TIMEOUT:-5m}" "${RUN_ARG[@]}" "${COMMON_TEST_FLAGS[@]}" "$@" || return $?
|
||||
else
|
||||
log_warning "integration_extra ignored when PKG is specified"
|
||||
fi
|
||||
@ -317,7 +317,7 @@ function cov_pass {
|
||||
-timeout=30m "${gocov_build_flags[@]}" "$@" || failed="$failed integration"
|
||||
# integration-store-v2
|
||||
run_for_module "tests" go_test "./integration/v2store/..." "keep_going" "pkg_to_coverprofileflag store_v2" \
|
||||
-tags v2v3 -timeout=5m "${gocov_build_flags[@]}" "$@" || failed="$failed integration_v2v3"
|
||||
-timeout=5m "${gocov_build_flags[@]}" "$@" || failed="$failed integration_v2"
|
||||
# integration_cluster_proxy
|
||||
run_for_module "tests" go_test "./integration/..." "parallel" "pkg_to_coverprofileflag integration_cluster_proxy" \
|
||||
-tags cluster_proxy -timeout=5m "${gocov_build_flags[@]}" || failed="$failed integration_cluster_proxy"
|
||||
|
@ -1,535 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
)
|
||||
|
||||
func BeforeTestV2(t testing.TB) {
|
||||
e2e.BeforeTest(t)
|
||||
os.Setenv("ETCDCTL_API", "2")
|
||||
t.Cleanup(func() {
|
||||
os.Unsetenv("ETCDCTL_API")
|
||||
})
|
||||
}
|
||||
|
||||
func TestCtlV2Set(t *testing.T) { testCtlV2Set(t, e2e.NewConfigNoTLS(), false) }
|
||||
func TestCtlV2SetQuorum(t *testing.T) { testCtlV2Set(t, e2e.NewConfigNoTLS(), true) }
|
||||
func TestCtlV2SetClientTLS(t *testing.T) { testCtlV2Set(t, e2e.NewConfigClientTLS(), false) }
|
||||
func TestCtlV2SetPeerTLS(t *testing.T) { testCtlV2Set(t, e2e.NewConfigPeerTLS(), false) }
|
||||
func TestCtlV2SetTLS(t *testing.T) { testCtlV2Set(t, e2e.NewConfigTLS(), false) }
|
||||
func testCtlV2Set(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, quorum bool) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
cfg.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, cfg, quorum)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
key, value := "foo", "bar"
|
||||
|
||||
if err := etcdctlSet(epc, key, value); err != nil {
|
||||
t.Fatalf("failed set (%v)", err)
|
||||
}
|
||||
|
||||
if err := etcdctlGet(epc, key, value, quorum); err != nil {
|
||||
t.Fatalf("failed get (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2Mk(t *testing.T) { testCtlV2Mk(t, e2e.NewConfigNoTLS(), false) }
|
||||
func TestCtlV2MkQuorum(t *testing.T) { testCtlV2Mk(t, e2e.NewConfigNoTLS(), true) }
|
||||
func TestCtlV2MkTLS(t *testing.T) { testCtlV2Mk(t, e2e.NewConfigTLS(), false) }
|
||||
func testCtlV2Mk(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, quorum bool) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
cfg.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, cfg, quorum)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
key, value := "foo", "bar"
|
||||
|
||||
if err := etcdctlMk(epc, key, value, true); err != nil {
|
||||
t.Fatalf("failed mk (%v)", err)
|
||||
}
|
||||
if err := etcdctlMk(epc, key, value, false); err != nil {
|
||||
t.Fatalf("failed mk (%v)", err)
|
||||
}
|
||||
|
||||
if err := etcdctlGet(epc, key, value, quorum); err != nil {
|
||||
t.Fatalf("failed get (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2Rm(t *testing.T) { testCtlV2Rm(t, e2e.NewConfigNoTLS()) }
|
||||
func TestCtlV2RmTLS(t *testing.T) { testCtlV2Rm(t, e2e.NewConfigTLS()) }
|
||||
func testCtlV2Rm(t *testing.T, cfg *e2e.EtcdProcessClusterConfig) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
cfg.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, cfg, true)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
key, value := "foo", "bar"
|
||||
|
||||
if err := etcdctlSet(epc, key, value); err != nil {
|
||||
t.Fatalf("failed set (%v)", err)
|
||||
}
|
||||
|
||||
if err := etcdctlRm(epc, key, value, true); err != nil {
|
||||
t.Fatalf("failed rm (%v)", err)
|
||||
}
|
||||
if err := etcdctlRm(epc, key, value, false); err != nil {
|
||||
t.Fatalf("failed rm (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2Ls(t *testing.T) { testCtlV2Ls(t, e2e.NewConfigNoTLS(), false) }
|
||||
func TestCtlV2LsQuorum(t *testing.T) { testCtlV2Ls(t, e2e.NewConfigNoTLS(), true) }
|
||||
func TestCtlV2LsTLS(t *testing.T) { testCtlV2Ls(t, e2e.NewConfigTLS(), false) }
|
||||
func testCtlV2Ls(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, quorum bool) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
cfg.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, cfg, quorum)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
key, value := "foo", "bar"
|
||||
|
||||
if err := etcdctlSet(epc, key, value); err != nil {
|
||||
t.Fatalf("failed set (%v)", err)
|
||||
}
|
||||
|
||||
if err := etcdctlLs(epc, key, quorum); err != nil {
|
||||
t.Fatalf("failed ls (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2Watch(t *testing.T) { testCtlV2Watch(t, e2e.NewConfigNoTLS(), false) }
|
||||
func TestCtlV2WatchTLS(t *testing.T) { testCtlV2Watch(t, e2e.NewConfigTLS(), false) }
|
||||
|
||||
func testCtlV2Watch(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, noSync bool) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
cfg.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, cfg, true)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
key, value := "foo", "bar"
|
||||
errc := etcdctlWatch(epc, key, value, noSync)
|
||||
if err := etcdctlSet(epc, key, value); err != nil {
|
||||
t.Fatalf("failed set (%v)", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errc:
|
||||
if err != nil {
|
||||
t.Fatalf("failed watch (%v)", err)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("watch timed out")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2GetRoleUser(t *testing.T) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
copied := e2e.NewConfigNoTLS()
|
||||
copied.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, copied, false)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
if err := etcdctlRoleAdd(epc, "foo"); err != nil {
|
||||
t.Fatalf("failed to add role (%v)", err)
|
||||
}
|
||||
if err := etcdctlUserAdd(epc, "username", "password"); err != nil {
|
||||
t.Fatalf("failed to add user (%v)", err)
|
||||
}
|
||||
if err := etcdctlUserGrant(epc, "username", "foo"); err != nil {
|
||||
t.Fatalf("failed to grant role (%v)", err)
|
||||
}
|
||||
if err := etcdctlUserGet(epc, "username"); err != nil {
|
||||
t.Fatalf("failed to get user (%v)", err)
|
||||
}
|
||||
|
||||
// ensure double grant gives an error; was crashing in 2.3.1
|
||||
regrantArgs := etcdctlPrefixArgs(epc)
|
||||
regrantArgs = append(regrantArgs, "user", "grant", "--roles", "foo", "username")
|
||||
if err := e2e.SpawnWithExpect(regrantArgs, "duplicate"); err != nil {
|
||||
t.Fatalf("missing duplicate error on double grant role (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2UserListUsername(t *testing.T) { testCtlV2UserList(t, "username") }
|
||||
func TestCtlV2UserListRoot(t *testing.T) { testCtlV2UserList(t, "root") }
|
||||
func testCtlV2UserList(t *testing.T, username string) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
copied := e2e.NewConfigNoTLS()
|
||||
copied.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, copied, false)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
if err := etcdctlUserAdd(epc, username, "password"); err != nil {
|
||||
t.Fatalf("failed to add user (%v)", err)
|
||||
}
|
||||
if err := etcdctlUserList(epc, username); err != nil {
|
||||
t.Fatalf("failed to list users (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2RoleList(t *testing.T) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
copied := e2e.NewConfigNoTLS()
|
||||
copied.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, copied, false)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
if err := etcdctlRoleAdd(epc, "foo"); err != nil {
|
||||
t.Fatalf("failed to add role (%v)", err)
|
||||
}
|
||||
if err := etcdctlRoleList(epc, "foo"); err != nil {
|
||||
t.Fatalf("failed to list roles (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUtlCtlV2Backup(t *testing.T) {
|
||||
for snap := range []int{0, 1} {
|
||||
for _, v3 := range []bool{true, false} {
|
||||
for _, utl := range []bool{true, false} {
|
||||
t.Run(fmt.Sprintf("etcdutl:%v;snap:%v;v3:%v", utl, snap, v3),
|
||||
func(t *testing.T) {
|
||||
testUtlCtlV2Backup(t, snap, v3, utl)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testUtlCtlV2Backup(t *testing.T, snapCount int, v3 bool, utl bool) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
backupDir, err := os.MkdirTemp(t.TempDir(), "testbackup0.etcd")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
etcdCfg := e2e.NewConfigNoTLS()
|
||||
etcdCfg.SnapshotCount = snapCount
|
||||
etcdCfg.EnableV2 = true
|
||||
t.Log("Starting etcd-1")
|
||||
epc1 := setupEtcdctlTest(t, etcdCfg, false)
|
||||
|
||||
// v3 put before v2 set so snapshot happens after v3 operations to confirm
|
||||
// v3 data is preserved after snapshot.
|
||||
os.Setenv("ETCDCTL_API", "3")
|
||||
if err := ctlV3Put(ctlCtx{t: t, epc: epc1}, "v3key", "123", ""); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
os.Setenv("ETCDCTL_API", "2")
|
||||
|
||||
t.Log("Setting key in etcd-1")
|
||||
if err := etcdctlSet(epc1, "foo1", "bar1"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if v3 {
|
||||
t.Log("Stopping etcd-1")
|
||||
// v3 must lock the db to backup, so stop process
|
||||
if err := epc1.Stop(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
t.Log("Triggering etcd backup")
|
||||
if err := etcdctlBackup(t, epc1, epc1.Procs[0].Config().DataDirPath, backupDir, v3, utl); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("Closing etcd-1 backup")
|
||||
if err := epc1.Close(); err != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", err)
|
||||
}
|
||||
|
||||
t.Logf("Backup directory: %s", backupDir)
|
||||
|
||||
t.Log("Starting etcd-2 (post backup)")
|
||||
// restart from the backup directory
|
||||
cfg2 := e2e.NewConfigNoTLS()
|
||||
cfg2.DataDirPath = backupDir
|
||||
cfg2.KeepDataDir = true
|
||||
cfg2.ForceNewCluster = true
|
||||
cfg2.EnableV2 = true
|
||||
epc2 := setupEtcdctlTest(t, cfg2, false)
|
||||
// Make sure a failing test is not leaking resources (running server).
|
||||
defer epc2.Close()
|
||||
|
||||
t.Log("Getting examplar key")
|
||||
// check if backup went through correctly
|
||||
if err := etcdctlGet(epc2, "foo1", "bar1", false); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
os.Setenv("ETCDCTL_API", "3")
|
||||
ctx2 := ctlCtx{t: t, epc: epc2}
|
||||
if v3 {
|
||||
t.Log("Getting v3 examplar key")
|
||||
if err := ctlV3Get(ctx2, []string{"v3key"}, kv{"v3key", "123"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
if err := ctlV3Get(ctx2, []string{"v3key"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
os.Setenv("ETCDCTL_API", "2")
|
||||
|
||||
t.Log("Getting examplar key foo2")
|
||||
// check if it can serve client requests
|
||||
if err := etcdctlSet(epc2, "foo2", "bar2"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := etcdctlGet(epc2, "foo2", "bar2", false); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log("Closing etcd-2")
|
||||
if err := epc2.Close(); err != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2AuthWithCommonName(t *testing.T) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
copiedCfg := e2e.NewConfigClientTLS()
|
||||
copiedCfg.ClientCertAuthEnabled = true
|
||||
copiedCfg.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, copiedCfg, false)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
if err := etcdctlRoleAdd(epc, "testrole"); err != nil {
|
||||
t.Fatalf("failed to add role (%v)", err)
|
||||
}
|
||||
if err := etcdctlRoleGrant(epc, "testrole", "--rw", "--path=/foo"); err != nil {
|
||||
t.Fatalf("failed to grant role (%v)", err)
|
||||
}
|
||||
if err := etcdctlUserAdd(epc, "root", "123"); err != nil {
|
||||
t.Fatalf("failed to add user (%v)", err)
|
||||
}
|
||||
if err := etcdctlUserAdd(epc, "Autogenerated CA", "123"); err != nil {
|
||||
t.Fatalf("failed to add user (%v)", err)
|
||||
}
|
||||
if err := etcdctlUserGrant(epc, "Autogenerated CA", "testrole"); err != nil {
|
||||
t.Fatalf("failed to grant role (%v)", err)
|
||||
}
|
||||
if err := etcdctlAuthEnable(epc); err != nil {
|
||||
t.Fatalf("failed to enable auth (%v)", err)
|
||||
}
|
||||
if err := etcdctlSet(epc, "foo", "bar"); err != nil {
|
||||
t.Fatalf("failed to write (%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtlV2ClusterHealth(t *testing.T) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
copied := e2e.NewConfigNoTLS()
|
||||
copied.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, copied, true)
|
||||
defer cleanupEtcdProcessCluster(epc, t)
|
||||
|
||||
// all members available
|
||||
if err := etcdctlClusterHealth(epc, "cluster is healthy"); err != nil {
|
||||
t.Fatalf("cluster-health expected to be healthy (%v)", err)
|
||||
}
|
||||
|
||||
// missing members, has quorum
|
||||
epc.Procs[0].Stop()
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
err := etcdctlClusterHealth(epc, "cluster is degraded")
|
||||
if err == nil {
|
||||
break
|
||||
} else if i == 2 {
|
||||
t.Fatalf("cluster-health expected to be degraded (%v)", err)
|
||||
}
|
||||
// possibly no leader yet; retry
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
// no quorum
|
||||
epc.Procs[1].Stop()
|
||||
if err := etcdctlClusterHealth(epc, "cluster is unavailable"); err != nil {
|
||||
t.Fatalf("cluster-health expected to be unavailable (%v)", err)
|
||||
}
|
||||
|
||||
epc.Procs[0], epc.Procs[1] = nil, nil
|
||||
}
|
||||
|
||||
func etcdctlPrefixArgs(clus *e2e.EtcdProcessCluster) []string {
|
||||
endpoints := strings.Join(clus.EndpointsV2(), ",")
|
||||
cmdArgs := []string{e2e.CtlBinPath}
|
||||
|
||||
cmdArgs = append(cmdArgs, "--endpoints", endpoints)
|
||||
if clus.Cfg.ClientTLS == e2e.ClientTLS {
|
||||
cmdArgs = append(cmdArgs, "--ca-file", e2e.CaPath, "--cert-file", e2e.CertPath, "--key-file", e2e.PrivateKeyPath)
|
||||
}
|
||||
return cmdArgs
|
||||
}
|
||||
|
||||
func etcductlPrefixArgs(utl bool) []string {
|
||||
if utl {
|
||||
return []string{e2e.UtlBinPath}
|
||||
}
|
||||
return []string{e2e.CtlBinPath}
|
||||
}
|
||||
|
||||
func etcdctlClusterHealth(clus *e2e.EtcdProcessCluster, val string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "cluster-health")
|
||||
return e2e.SpawnWithExpect(cmdArgs, val)
|
||||
}
|
||||
|
||||
func etcdctlSet(clus *e2e.EtcdProcessCluster, key, value string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "set", key, value)
|
||||
return e2e.SpawnWithExpect(cmdArgs, value)
|
||||
}
|
||||
|
||||
func etcdctlMk(clus *e2e.EtcdProcessCluster, key, value string, first bool) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "mk", key, value)
|
||||
if first {
|
||||
return e2e.SpawnWithExpect(cmdArgs, value)
|
||||
}
|
||||
return e2e.SpawnWithExpect(cmdArgs, "Error: 105: Key already exists")
|
||||
}
|
||||
|
||||
func etcdctlGet(clus *e2e.EtcdProcessCluster, key, value string, quorum bool) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "get", key)
|
||||
if quorum {
|
||||
cmdArgs = append(cmdArgs, "--quorum")
|
||||
}
|
||||
return e2e.SpawnWithExpect(cmdArgs, value)
|
||||
}
|
||||
|
||||
func etcdctlRm(clus *e2e.EtcdProcessCluster, key, value string, first bool) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "rm", key)
|
||||
if first {
|
||||
return e2e.SpawnWithExpect(cmdArgs, "PrevNode.Value: "+value)
|
||||
}
|
||||
return e2e.SpawnWithExpect(cmdArgs, "Error: 100: Key not found")
|
||||
}
|
||||
|
||||
func etcdctlLs(clus *e2e.EtcdProcessCluster, key string, quorum bool) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "ls")
|
||||
if quorum {
|
||||
cmdArgs = append(cmdArgs, "--quorum")
|
||||
}
|
||||
return e2e.SpawnWithExpect(cmdArgs, key)
|
||||
}
|
||||
|
||||
func etcdctlWatch(clus *e2e.EtcdProcessCluster, key, value string, noSync bool) <-chan error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "watch", "--after-index=1", key)
|
||||
if noSync {
|
||||
cmdArgs = append(cmdArgs, "--no-sync")
|
||||
}
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
errc <- e2e.SpawnWithExpect(cmdArgs, value)
|
||||
}()
|
||||
return errc
|
||||
}
|
||||
|
||||
func etcdctlRoleAdd(clus *e2e.EtcdProcessCluster, role string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "role", "add", role)
|
||||
return e2e.SpawnWithExpect(cmdArgs, role)
|
||||
}
|
||||
|
||||
func etcdctlRoleGrant(clus *e2e.EtcdProcessCluster, role string, perms ...string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "role", "grant")
|
||||
cmdArgs = append(cmdArgs, perms...)
|
||||
cmdArgs = append(cmdArgs, role)
|
||||
return e2e.SpawnWithExpect(cmdArgs, role)
|
||||
}
|
||||
|
||||
func etcdctlRoleList(clus *e2e.EtcdProcessCluster, expectedRole string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "role", "list")
|
||||
return e2e.SpawnWithExpect(cmdArgs, expectedRole)
|
||||
}
|
||||
|
||||
func etcdctlUserAdd(clus *e2e.EtcdProcessCluster, user, pass string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "user", "add", user+":"+pass)
|
||||
return e2e.SpawnWithExpect(cmdArgs, "User "+user+" created")
|
||||
}
|
||||
|
||||
func etcdctlUserGrant(clus *e2e.EtcdProcessCluster, user, role string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "user", "grant", "--roles", role, user)
|
||||
return e2e.SpawnWithExpect(cmdArgs, "User "+user+" updated")
|
||||
}
|
||||
|
||||
func etcdctlUserGet(clus *e2e.EtcdProcessCluster, user string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "user", "get", user)
|
||||
return e2e.SpawnWithExpect(cmdArgs, "User: "+user)
|
||||
}
|
||||
|
||||
func etcdctlUserList(clus *e2e.EtcdProcessCluster, expectedUser string) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "user", "list")
|
||||
return e2e.SpawnWithExpect(cmdArgs, expectedUser)
|
||||
}
|
||||
|
||||
func etcdctlAuthEnable(clus *e2e.EtcdProcessCluster) error {
|
||||
cmdArgs := append(etcdctlPrefixArgs(clus), "auth", "enable")
|
||||
return e2e.SpawnWithExpect(cmdArgs, "Authentication Enabled")
|
||||
}
|
||||
|
||||
func etcdctlBackup(t testing.TB, clus *e2e.EtcdProcessCluster, dataDir, backupDir string, v3 bool, utl bool) error {
|
||||
cmdArgs := append(etcductlPrefixArgs(utl), "backup", "--data-dir", dataDir, "--backup-dir", backupDir)
|
||||
if v3 {
|
||||
cmdArgs = append(cmdArgs, "--with-v3")
|
||||
} else if utl {
|
||||
cmdArgs = append(cmdArgs, "--with-v3=false")
|
||||
}
|
||||
t.Logf("Running: %v", cmdArgs)
|
||||
proc, err := e2e.SpawnCmd(cmdArgs, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = proc.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return proc.ProcessError()
|
||||
}
|
||||
|
||||
func setupEtcdctlTest(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, quorum bool) *e2e.EtcdProcessCluster {
|
||||
if !quorum {
|
||||
cfg = e2e.ConfigStandalone(*cfg)
|
||||
}
|
||||
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("could not start etcd process cluster (%v)", err)
|
||||
}
|
||||
return epc
|
||||
}
|
||||
|
||||
func cleanupEtcdProcessCluster(epc *e2e.EtcdProcessCluster, t *testing.T) {
|
||||
if errC := epc.Close(); errC != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", errC)
|
||||
}
|
||||
}
|
@ -118,3 +118,14 @@ func testCtlV3MoveLeader(t *testing.T, cfg e2e.EtcdProcessClusterConfig) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func setupEtcdctlTest(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, quorum bool) *e2e.EtcdProcessCluster {
|
||||
if !quorum {
|
||||
cfg = e2e.ConfigStandalone(*cfg)
|
||||
}
|
||||
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("could not start etcd process cluster (%v)", err)
|
||||
}
|
||||
return epc
|
||||
}
|
||||
|
@ -1,115 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
)
|
||||
|
||||
func TestV2CurlNoTLS(t *testing.T) { testCurlPutGet(t, e2e.NewConfigNoTLS()) }
|
||||
func TestV2CurlAutoTLS(t *testing.T) { testCurlPutGet(t, e2e.NewConfigAutoTLS()) }
|
||||
func TestV2CurlAllTLS(t *testing.T) { testCurlPutGet(t, e2e.NewConfigTLS()) }
|
||||
func TestV2CurlPeerTLS(t *testing.T) { testCurlPutGet(t, e2e.NewConfigPeerTLS()) }
|
||||
func TestV2CurlClientTLS(t *testing.T) { testCurlPutGet(t, e2e.NewConfigClientTLS()) }
|
||||
func TestV2CurlClientBoth(t *testing.T) { testCurlPutGet(t, e2e.NewConfigClientBoth()) }
|
||||
func testCurlPutGet(t *testing.T, cfg *e2e.EtcdProcessClusterConfig) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
// test doesn't use quorum gets, so ensure there are no followers to avoid
|
||||
// stale reads that will break the test
|
||||
cfg = e2e.ConfigStandalone(*cfg)
|
||||
|
||||
cfg.EnableV2 = true
|
||||
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("could not start etcd process cluster (%v)", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := epc.Close(); err != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", err)
|
||||
}
|
||||
}()
|
||||
|
||||
var (
|
||||
expectPut = `{"action":"set","node":{"key":"/foo","value":"bar","`
|
||||
expectGet = `{"action":"get","node":{"key":"/foo","value":"bar","`
|
||||
)
|
||||
if err := e2e.CURLPut(epc, e2e.CURLReq{Endpoint: "/v2/keys/foo", Value: "bar", Expected: expectPut}); err != nil {
|
||||
t.Fatalf("failed put with curl (%v)", err)
|
||||
}
|
||||
if err := e2e.CURLGet(epc, e2e.CURLReq{Endpoint: "/v2/keys/foo", Expected: expectGet}); err != nil {
|
||||
t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
if cfg.ClientTLS == e2e.ClientTLSAndNonTLS {
|
||||
if err := e2e.CURLGet(epc, e2e.CURLReq{Endpoint: "/v2/keys/foo", Expected: expectGet, IsTLS: true}); err != nil {
|
||||
t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2CurlIssue5182(t *testing.T) {
|
||||
BeforeTestV2(t)
|
||||
|
||||
copied := e2e.NewConfigNoTLS()
|
||||
copied.EnableV2 = true
|
||||
epc := setupEtcdctlTest(t, copied, false)
|
||||
defer func() {
|
||||
if err := epc.Close(); err != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", err)
|
||||
}
|
||||
}()
|
||||
|
||||
expectPut := `{"action":"set","node":{"key":"/foo","value":"bar","`
|
||||
if err := e2e.CURLPut(epc, e2e.CURLReq{Endpoint: "/v2/keys/foo", Value: "bar", Expected: expectPut}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expectUserAdd := `{"user":"foo","roles":null}`
|
||||
if err := e2e.CURLPut(epc, e2e.CURLReq{Endpoint: "/v2/auth/users/foo", Value: `{"user":"foo", "password":"pass"}`, Expected: expectUserAdd}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expectRoleAdd := `{"role":"foo","permissions":{"kv":{"read":["/foo/*"],"write":null}}`
|
||||
if err := e2e.CURLPut(epc, e2e.CURLReq{Endpoint: "/v2/auth/roles/foo", Value: `{"role":"foo", "permissions": {"kv": {"read": ["/foo/*"]}}}`, Expected: expectRoleAdd}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expectUserUpdate := `{"user":"foo","roles":["foo"]}`
|
||||
if err := e2e.CURLPut(epc, e2e.CURLReq{Endpoint: "/v2/auth/users/foo", Value: `{"user": "foo", "grant": ["foo"]}`, Expected: expectUserUpdate}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := etcdctlUserAdd(epc, "root", "a"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := etcdctlAuthEnable(epc); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := e2e.CURLGet(epc, e2e.CURLReq{Endpoint: "/v2/keys/foo/", Username: "root", Password: "a", Expected: "bar"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := e2e.CURLGet(epc, e2e.CURLReq{Endpoint: "/v2/keys/foo/", Username: "foo", Password: "pass", Expected: "bar"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := e2e.CURLGet(epc, e2e.CURLReq{Endpoint: "/v2/keys/foo/", Username: "foo", Password: "", Expected: "bar"}); err != nil {
|
||||
if !strings.Contains(err.Error(), `The request requires user authentication`) {
|
||||
t.Fatalf("expected 'The request requires user authentication' error, got %v", err)
|
||||
}
|
||||
} else {
|
||||
t.Fatalf("expected 'The request requires user authentication' error")
|
||||
}
|
||||
}
|
@ -19,13 +19,14 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
)
|
||||
|
||||
func createV2store(t testing.TB, dataDirPath string) {
|
||||
func createV2store(t testing.TB, lastReleaseBinary string, dataDirPath string) {
|
||||
t.Log("Creating not-yet v2-deprecated etcd")
|
||||
|
||||
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{EnableV2: true, DataDirPath: dataDirPath, SnapshotCount: 5})
|
||||
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{ExecPath: lastReleaseBinary, EnableV2: true, DataDirPath: dataDirPath, SnapshotCount: 5})
|
||||
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@ -46,20 +47,13 @@ func createV2store(t testing.TB, dataDirPath string) {
|
||||
func assertVerifyCanStartV2deprecationNotYet(t testing.TB, dataDirPath string) {
|
||||
t.Log("verify: possible to start etcd with --v2-deprecation=not-yet mode")
|
||||
|
||||
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{EnableV2: true, DataDirPath: dataDirPath, V2deprecation: "not-yet", KeepDataDir: true})
|
||||
cfg := e2e.ConfigStandalone(e2e.EtcdProcessClusterConfig{DataDirPath: dataDirPath, V2deprecation: "not-yet", KeepDataDir: true})
|
||||
epc, err := e2e.NewEtcdProcessCluster(t, cfg)
|
||||
assert.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
assert.NoError(t, epc.Stop())
|
||||
}()
|
||||
|
||||
if err := e2e.CURLGet(epc, e2e.CURLReq{
|
||||
Endpoint: "/v2/keys/foo",
|
||||
Expected: `{"action":"get","node":{"key":"/foo","value":"bar9","modifiedIndex":13,"createdIndex":13}}`}); err != nil {
|
||||
t.Fatalf("failed get with curl (%v)", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func assertVerifyCannotStartV2deprecationWriteOnly(t testing.TB, dataDirPath string) {
|
||||
@ -75,8 +69,13 @@ func TestV2Deprecation(t *testing.T) {
|
||||
e2e.BeforeTest(t)
|
||||
dataDirPath := t.TempDir()
|
||||
|
||||
lastReleaseBinary := e2e.BinDir + "/etcd-last-release"
|
||||
if !fileutil.Exist(lastReleaseBinary) {
|
||||
t.Skipf("%q does not exist", lastReleaseBinary)
|
||||
}
|
||||
|
||||
t.Run("create-storev2-data", func(t *testing.T) {
|
||||
createV2store(t, dataDirPath)
|
||||
createV2store(t, lastReleaseBinary, dataDirPath)
|
||||
})
|
||||
|
||||
t.Run("--v2-deprecation=write-only fails", func(t *testing.T) {
|
||||
@ -88,12 +87,3 @@ func TestV2Deprecation(t *testing.T) {
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestV2DeprecationWriteOnlyNoV2Api(t *testing.T) {
|
||||
e2e.BeforeTest(t)
|
||||
proc, err := e2e.SpawnCmd([]string{e2e.BinDir + "/etcd", "--v2-deprecation=write-only", "--enable-v2"}, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = proc.Expect("--enable-v2 and --v2-deprecation=write-only are mutually exclusive")
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
@ -273,7 +273,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
|
||||
"--data-dir", dataDirPath,
|
||||
"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
|
||||
}
|
||||
args = AddV2Args(args)
|
||||
if cfg.ForceNewCluster {
|
||||
args = append(args, "--force-new-cluster")
|
||||
}
|
||||
|
@ -1,20 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !v2v3
|
||||
// +build !v2v3
|
||||
|
||||
package e2e
|
||||
|
||||
func AddV2Args(args []string) []string { return args }
|
@ -1,22 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build v2v3
|
||||
// +build v2v3
|
||||
|
||||
package e2e
|
||||
|
||||
func AddV2Args(args []string) []string {
|
||||
return append(args, "--experimental-enable-v2v3", "v2/")
|
||||
}
|
@ -1,111 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package client_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
|
||||
"go.etcd.io/etcd/client/v2"
|
||||
)
|
||||
|
||||
func mockKeysAPI_directory() {
|
||||
// TODO: Replace with proper mocking
|
||||
fmt.Println(`Key: "/myNodes/key1", Value: "value1"`)
|
||||
fmt.Println(`Key: "/myNodes/key2", Value: "value2"`)
|
||||
}
|
||||
|
||||
func ExampleKeysAPI_directory() {
|
||||
forUnitTestsRunInMockedContext(
|
||||
mockKeysAPI_directory,
|
||||
func() {
|
||||
c, err := client.New(client.Config{
|
||||
Endpoints: exampleEndpoints(),
|
||||
Transport: exampleTransport(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
kapi := client.NewKeysAPI(c)
|
||||
|
||||
// Setting '/myNodes' to create a directory that will hold some keys.
|
||||
o := client.SetOptions{Dir: true}
|
||||
resp, err := kapi.Set(context.Background(), "/myNodes", "", &o)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Add keys to /myNodes directory.
|
||||
resp, err = kapi.Set(context.Background(), "/myNodes/key1", "value1", nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
resp, err = kapi.Set(context.Background(), "/myNodes/key2", "value2", nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// fetch directory
|
||||
resp, err = kapi.Get(context.Background(), "/myNodes", nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// print directory keys
|
||||
sort.Sort(resp.Node.Nodes)
|
||||
for _, n := range resp.Node.Nodes {
|
||||
fmt.Printf("Key: %q, Value: %q\n", n.Key, n.Value)
|
||||
}
|
||||
})
|
||||
|
||||
// Output:
|
||||
// Key: "/myNodes/key1", Value: "value1"
|
||||
// Key: "/myNodes/key2", Value: "value2"
|
||||
}
|
||||
|
||||
func mockKeysAPI_setget() {
|
||||
fmt.Println(`"/foo" key has "bar" value`)
|
||||
}
|
||||
|
||||
func ExampleKeysAPI_setget() {
|
||||
forUnitTestsRunInMockedContext(
|
||||
mockKeysAPI_setget,
|
||||
func() {
|
||||
c, err := client.New(client.Config{
|
||||
Endpoints: exampleEndpoints(),
|
||||
Transport: exampleTransport(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
kapi := client.NewKeysAPI(c)
|
||||
|
||||
// Set key "/foo" to value "bar".
|
||||
resp, err := kapi.Set(context.Background(), "/foo", "bar", nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// Get key "/foo"
|
||||
resp, err = kapi.Get(context.Background(), "/foo", nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
fmt.Printf("%q key has %q value\n", resp.Node.Key, resp.Node.Value)
|
||||
})
|
||||
|
||||
// Output: "/foo" key has "bar" value
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package client_test
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
)
|
||||
|
||||
var lazyCluster = integration.NewLazyCluster()
|
||||
|
||||
func exampleEndpoints() []string { return lazyCluster.EndpointsV2() }
|
||||
func exampleTransport() *http.Transport { return lazyCluster.Transport() }
|
||||
|
||||
func forUnitTestsRunInMockedContext(mocking func(), example func()) {
|
||||
// For integration tests runs in the provided environment
|
||||
example()
|
||||
}
|
||||
|
||||
// TestMain sets up an etcd cluster if running the examples.
|
||||
func TestMain(m *testing.M) {
|
||||
v := m.Run()
|
||||
lazyCluster.Terminate()
|
||||
if v == 0 {
|
||||
testutil.MustCheckLeakedGoroutine()
|
||||
}
|
||||
os.Exit(v)
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -12,9 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !v2v3
|
||||
// +build !v2v3
|
||||
|
||||
package v2store_test
|
||||
|
||||
import (
|
||||
@ -26,24 +23,10 @@ import (
|
||||
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
|
||||
)
|
||||
|
||||
type v2TestStore struct {
|
||||
v2store.Store
|
||||
}
|
||||
|
||||
func (s *v2TestStore) Close() {}
|
||||
|
||||
func newTestStore(t *testing.T, ns ...string) StoreCloser {
|
||||
if len(ns) == 0 {
|
||||
t.Logf("new v2 store with no namespace")
|
||||
}
|
||||
return &v2TestStore{v2store.New(ns...)}
|
||||
}
|
||||
|
||||
// Ensure that the store can recover from a previously saved state.
|
||||
func TestStoreRecover(t *testing.T) {
|
||||
integration2.BeforeTest(t)
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 4
|
||||
s.Create("/foo", true, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
s.Create("/foo/x", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -52,7 +35,7 @@ func TestStoreRecover(t *testing.T) {
|
||||
b, err := s.Save()
|
||||
testutil.AssertNil(t, err)
|
||||
|
||||
s2 := newTestStore(t)
|
||||
s2 := v2store.New()
|
||||
s2.Recovery(b)
|
||||
|
||||
e, err := s.Get("/foo/x", false, false)
|
@ -1,44 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build v2v3
|
||||
// +build v2v3
|
||||
|
||||
package v2store_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
|
||||
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
|
||||
)
|
||||
|
||||
type v2v3TestStore struct {
|
||||
v2store.Store
|
||||
clus *integration2.ClusterV3
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (s *v2v3TestStore) Close() { s.clus.Terminate(s.t) }
|
||||
|
||||
func newTestStore(t *testing.T, ns ...string) StoreCloser {
|
||||
integration2.BeforeTest(t)
|
||||
clus := integration2.NewClusterV3(t, &integration2.ClusterConfig{Size: 1})
|
||||
return &v2v3TestStore{
|
||||
v2v3.NewStore(clus.Client(0), "/v2/"),
|
||||
clus,
|
||||
t,
|
||||
}
|
||||
}
|
@ -31,8 +31,7 @@ type StoreCloser interface {
|
||||
}
|
||||
|
||||
func TestNewStoreWithNamespaces(t *testing.T) {
|
||||
s := newTestStore(t, "/0", "/1")
|
||||
defer s.Close()
|
||||
s := v2store.New("/0", "/1")
|
||||
|
||||
_, err := s.Get("/0", false, false)
|
||||
testutil.AssertNil(t, err)
|
||||
@ -42,8 +41,7 @@ func TestNewStoreWithNamespaces(t *testing.T) {
|
||||
|
||||
// Ensure that the store can retrieve an existing value.
|
||||
func TestStoreGetValue(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
var eidx uint64 = 1
|
||||
@ -57,8 +55,7 @@ func TestStoreGetValue(t *testing.T) {
|
||||
|
||||
// Ensure that the store can retrieve a directory in sorted order.
|
||||
func TestStoreGetSorted(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
s.Create("/foo", true, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
s.Create("/foo/x", false, "0", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -93,8 +90,7 @@ func TestStoreGetSorted(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
// Set /foo=""
|
||||
var eidx uint64 = 1
|
||||
@ -177,8 +173,7 @@ func TestSet(t *testing.T) {
|
||||
|
||||
// Ensure that the store can create a new key if it doesn't already exist.
|
||||
func TestStoreCreateValue(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
// Create /foo=bar
|
||||
var eidx uint64 = 1
|
||||
@ -212,8 +207,7 @@ func TestStoreCreateValue(t *testing.T) {
|
||||
|
||||
// Ensure that the store can create a new directory if it doesn't already exist.
|
||||
func TestStoreCreateDirectory(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
var eidx uint64 = 1
|
||||
e, err := s.Create("/foo", true, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -226,8 +220,7 @@ func TestStoreCreateDirectory(t *testing.T) {
|
||||
|
||||
// Ensure that the store fails to create a key if it already exists.
|
||||
func TestStoreCreateFailsIfExists(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
// create /foo as dir
|
||||
s.Create("/foo", true, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -244,8 +237,7 @@ func TestStoreCreateFailsIfExists(t *testing.T) {
|
||||
|
||||
// Ensure that the store can update a key if it already exists.
|
||||
func TestStoreUpdateValue(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
// create /foo=bar
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -294,8 +286,7 @@ func TestStoreUpdateValue(t *testing.T) {
|
||||
|
||||
// Ensure that the store cannot update a directory.
|
||||
func TestStoreUpdateFailsIfDirectory(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
s.Create("/foo", true, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
e, _err := s.Update("/foo", "baz", v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -308,8 +299,7 @@ func TestStoreUpdateFailsIfDirectory(t *testing.T) {
|
||||
|
||||
// Ensure that the store can delete a value.
|
||||
func TestStoreDeleteValue(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
var eidx uint64 = 2
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -325,8 +315,7 @@ func TestStoreDeleteValue(t *testing.T) {
|
||||
|
||||
// Ensure that the store can delete a directory if recursive is specified.
|
||||
func TestStoreDeleteDirectory(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
// create directory /foo
|
||||
var eidx uint64 = 2
|
||||
@ -363,8 +352,7 @@ func TestStoreDeleteDirectory(t *testing.T) {
|
||||
// Ensure that the store cannot delete a directory if both of recursive
|
||||
// and dir are not specified.
|
||||
func TestStoreDeleteDirectoryFailsIfNonRecursiveAndDir(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
s.Create("/foo", true, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
e, _err := s.Delete("/foo", false, false)
|
||||
@ -375,8 +363,7 @@ func TestStoreDeleteDirectoryFailsIfNonRecursiveAndDir(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRootRdOnly(t *testing.T) {
|
||||
s := newTestStore(t, "/0")
|
||||
defer s.Close()
|
||||
s := v2store.New("/0")
|
||||
|
||||
for _, tt := range []string{"/", "/0"} {
|
||||
_, err := s.Set(tt, true, "", v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -397,8 +384,7 @@ func TestRootRdOnly(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStoreCompareAndDeletePrevValue(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
var eidx uint64 = 2
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -417,8 +403,7 @@ func TestStoreCompareAndDeletePrevValue(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -433,8 +418,7 @@ func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
var eidx uint64 = 2
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -451,8 +435,7 @@ func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -469,8 +452,7 @@ func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
|
||||
|
||||
// Ensure that the store cannot delete a directory.
|
||||
func TestStoreCompareAndDeleteDirectoryFail(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
s.Create("/foo", true, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
_, _err := s.CompareAndDelete("/foo", "", 0)
|
||||
@ -481,8 +463,7 @@ func TestStoreCompareAndDeleteDirectoryFail(t *testing.T) {
|
||||
|
||||
// Ensure that the store can conditionally update a key if it has a previous value.
|
||||
func TestStoreCompareAndSwapPrevValue(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
|
||||
var eidx uint64 = 2
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -504,8 +485,7 @@ func TestStoreCompareAndSwapPrevValue(t *testing.T) {
|
||||
|
||||
// Ensure that the store cannot conditionally update a key if it has the wrong previous value.
|
||||
func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -520,8 +500,7 @@ func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
|
||||
|
||||
// Ensure that the store can conditionally update a key if it has a previous index.
|
||||
func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 2
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
e, err := s.CompareAndSwap("/foo", "", 1, "baz", v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -543,8 +522,7 @@ func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
|
||||
|
||||
// Ensure that the store cannot conditionally update a key if it has the wrong previous index.
|
||||
func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
e, _err := s.CompareAndSwap("/foo", "", 100, "baz", v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -559,8 +537,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for key creation.
|
||||
func TestStoreWatchCreate(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 0
|
||||
w, _ := s.Watch("/foo", false, false, 0)
|
||||
c := w.EventChan()
|
||||
@ -580,8 +557,7 @@ func TestStoreWatchCreate(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for recursive key creation.
|
||||
func TestStoreWatchRecursiveCreate(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 0
|
||||
w, err := s.Watch("/foo", true, false, 0)
|
||||
testutil.AssertNil(t, err)
|
||||
@ -596,8 +572,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for key updates.
|
||||
func TestStoreWatchUpdate(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, _ := s.Watch("/foo", false, false, 0)
|
||||
@ -612,8 +587,7 @@ func TestStoreWatchUpdate(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for recursive key updates.
|
||||
func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo/bar", false, "baz", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, err := s.Watch("/foo", true, false, 0)
|
||||
@ -629,8 +603,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for key deletions.
|
||||
func TestStoreWatchDelete(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, _ := s.Watch("/foo", false, false, 0)
|
||||
@ -645,8 +618,7 @@ func TestStoreWatchDelete(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for recursive key deletions.
|
||||
func TestStoreWatchRecursiveDelete(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo/bar", false, "baz", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, err := s.Watch("/foo", true, false, 0)
|
||||
@ -662,8 +634,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for CAS updates.
|
||||
func TestStoreWatchCompareAndSwap(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, _ := s.Watch("/foo", false, false, 0)
|
||||
@ -678,8 +649,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for recursive CAS updates.
|
||||
func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
s.Create("/foo/bar", false, "baz", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
@ -694,8 +664,7 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch in streaming mode.
|
||||
func TestStoreWatchStream(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
w, _ := s.Watch("/foo", false, true, 0)
|
||||
// first modification
|
||||
@ -727,8 +696,7 @@ func TestStoreWatchStream(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for hidden keys as long as it's an exact path match.
|
||||
func TestStoreWatchCreateWithHiddenKey(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
s.Create("/_foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -745,8 +713,7 @@ func TestStoreWatchCreateWithHiddenKey(t *testing.T) {
|
||||
|
||||
// Ensure that the store doesn't see hidden key creates without an exact path match in recursive mode.
|
||||
func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Create("/foo/_bar", false, "baz", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
e := nbselect(w.EventChan())
|
||||
@ -768,8 +735,7 @@ func TestStoreWatchRecursiveCreateWithHiddenKey(t *testing.T) {
|
||||
|
||||
// Ensure that the store doesn't see hidden key updates.
|
||||
func TestStoreWatchUpdateWithHiddenKey(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
s.Create("/_foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
s.Update("/_foo", "baz", v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -782,8 +748,7 @@ func TestStoreWatchUpdateWithHiddenKey(t *testing.T) {
|
||||
|
||||
// Ensure that the store doesn't see hidden key updates without an exact path match in recursive mode.
|
||||
func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
s.Create("/foo/_bar", false, "baz", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Update("/foo/_bar", "baz", v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -793,8 +758,7 @@ func TestStoreWatchRecursiveUpdateWithHiddenKey(t *testing.T) {
|
||||
|
||||
// Ensure that the store can watch for key deletions.
|
||||
func TestStoreWatchDeleteWithHiddenKey(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 2
|
||||
s.Create("/_foo", false, "bar", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, _ := s.Watch("/_foo", false, false, 0)
|
||||
@ -809,8 +773,7 @@ func TestStoreWatchDeleteWithHiddenKey(t *testing.T) {
|
||||
|
||||
// Ensure that the store doesn't see hidden key deletes without an exact path match in recursive mode.
|
||||
func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
s.Create("/foo/_bar", false, "baz", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
w, _ := s.Watch("/foo", true, false, 0)
|
||||
s.Delete("/foo/_bar", false, false)
|
||||
@ -820,8 +783,7 @@ func TestStoreWatchRecursiveDeleteWithHiddenKey(t *testing.T) {
|
||||
|
||||
// Ensure that the store does see hidden key creates if watching deeper than a hidden key in recursive mode.
|
||||
func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
var eidx uint64 = 1
|
||||
w, _ := s.Watch("/_foo/bar", true, false, 0)
|
||||
s.Create("/_foo/bar/baz", false, "baz", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
|
||||
@ -841,8 +803,7 @@ func TestStoreWatchRecursiveCreateDeeperThanHiddenKey(t *testing.T) {
|
||||
// This test ensures that after closing the channel, the store can continue
|
||||
// to operate correctly.
|
||||
func TestStoreWatchSlowConsumer(t *testing.T) {
|
||||
s := newTestStore(t)
|
||||
defer s.Close()
|
||||
s := v2store.New()
|
||||
s.Watch("/foo", true, true, 0) // stream must be true
|
||||
// Fill watch channel with 100 events
|
||||
for i := 1; i <= 100; i++ {
|
||||
|
@ -1,165 +0,0 @@
|
||||
// Copyright 2019 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package v2store_test
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
|
||||
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
|
||||
)
|
||||
|
||||
// TODO: fix tests
|
||||
|
||||
func runWithCluster(t testing.TB, runner func(testing.TB, []string)) {
|
||||
integration2.BeforeTest(t)
|
||||
cfg := integration2.ClusterConfig{Size: 1}
|
||||
clus := integration2.NewClusterV3(t, &cfg)
|
||||
defer clus.Terminate(t)
|
||||
endpoints := []string{clus.Client(0).Endpoints()[0]}
|
||||
runner(t, endpoints)
|
||||
}
|
||||
|
||||
func TestCreateKV(t *testing.T) { runWithCluster(t, testCreateKV) }
|
||||
|
||||
func testCreateKV(t testing.TB, endpoints []string) {
|
||||
integration2.BeforeTest(t)
|
||||
testCases := []struct {
|
||||
key string
|
||||
value string
|
||||
nodes int
|
||||
unique bool
|
||||
wantErr bool
|
||||
wantKeyMatch bool
|
||||
}{
|
||||
{key: "/cdir/create", value: "1", nodes: 1, wantKeyMatch: true},
|
||||
{key: "/cdir/create", value: "4", wantErr: true},
|
||||
// TODO: unique doesn't create nodes, skip these tests for now
|
||||
//{key: "hello", value: "2", unique: true, wantKeyMatch: false},
|
||||
//{key: "hello", value: "3", unique: true, wantKeyMatch: false},
|
||||
}
|
||||
|
||||
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: endpoints})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cli.Close()
|
||||
v2 := v2v3.NewStore(cli, "")
|
||||
|
||||
for ti, tc := range testCases {
|
||||
ev, err := v2.Create(tc.key, false, tc.value, tc.unique, v2store.TTLOptionSet{})
|
||||
if tc.wantErr && err != nil {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
t.Skipf("%d: got err %v", ti, err)
|
||||
}
|
||||
|
||||
if tc.wantKeyMatch && tc.key != ev.Node.Key {
|
||||
t.Skipf("%d: %v != %v", ti, tc.key, ev.Node.Key)
|
||||
}
|
||||
if !tc.wantKeyMatch && !strings.HasPrefix(ev.Node.Key, tc.key) {
|
||||
t.Skipf("%d: %v is not prefix of %v", ti, tc.key, ev.Node.Key)
|
||||
}
|
||||
|
||||
evg, err := v2.Get(tc.key, false, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if evg.Node.CreatedIndex != ev.Node.CreatedIndex {
|
||||
t.Skipf("%d: %v != %v", ti, evg.Node.CreatedIndex, ev.Node.CreatedIndex)
|
||||
}
|
||||
|
||||
t.Logf("%d: %v %s %v\n", ti, ev.Node.Key, *ev.Node.Value, ev.Node.CreatedIndex)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetKV(t *testing.T) { runWithCluster(t, testSetKV) }
|
||||
|
||||
func testSetKV(t testing.TB, endpoints []string) {
|
||||
testCases := []struct {
|
||||
key string
|
||||
value string
|
||||
wantIndexMatch bool
|
||||
}{
|
||||
{key: "/sdir/set", value: "1", wantIndexMatch: true},
|
||||
{key: "/sdir/set", value: "4", wantIndexMatch: false},
|
||||
}
|
||||
|
||||
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: endpoints})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cli.Close()
|
||||
v2 := v2v3.NewStore(cli, "")
|
||||
|
||||
for ti, tc := range testCases {
|
||||
ev, err := v2.Set(tc.key, false, tc.value, v2store.TTLOptionSet{})
|
||||
if err != nil {
|
||||
t.Skipf("%d: got err %v", ti, err)
|
||||
}
|
||||
|
||||
if tc.value != *ev.Node.Value {
|
||||
t.Skipf("%d: %v != %v", ti, tc.value, *ev.Node.Value)
|
||||
}
|
||||
|
||||
if tc.wantIndexMatch && ev.Node.CreatedIndex != ev.Node.ModifiedIndex {
|
||||
t.Skipf("%d: index %v != %v", ti, ev.Node.CreatedIndex, ev.Node.ModifiedIndex)
|
||||
}
|
||||
|
||||
t.Logf("%d: %v %s %v\n", ti, ev.Node.Key, *ev.Node.Value, ev.Node.CreatedIndex)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateSetDir(t *testing.T) { runWithCluster(t, testCreateSetDir) }
|
||||
|
||||
func testCreateSetDir(t testing.TB, endpoints []string) {
|
||||
integration2.BeforeTest(t)
|
||||
testCases := []struct {
|
||||
dir string
|
||||
}{
|
||||
{dir: "/ddir/1/2/3"},
|
||||
{dir: "/ddir/1/2/3"},
|
||||
}
|
||||
|
||||
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: endpoints})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cli.Close()
|
||||
v2 := v2v3.NewStore(cli, "")
|
||||
|
||||
for ti, tc := range testCases {
|
||||
_, err := v2.Create(tc.dir, true, "", false, v2store.TTLOptionSet{})
|
||||
if err != nil {
|
||||
t.Skipf("%d: got err %v", ti, err)
|
||||
}
|
||||
_, err = v2.Create(tc.dir, true, "", false, v2store.TTLOptionSet{})
|
||||
if err == nil {
|
||||
t.Skipf("%d: expected err got nil", ti)
|
||||
}
|
||||
|
||||
ev, err := v2.Delete("ddir", true, true)
|
||||
if err != nil {
|
||||
t.Skipf("%d: got err %v", ti, err)
|
||||
}
|
||||
|
||||
t.Logf("%d: %v %s %v\n", ti, ev.EtcdIndex, ev.PrevNode.Key, ev.PrevNode.CreatedIndex)
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user