diff --git a/Makefile b/Makefile index ee0c3fd9b..c6352c776 100644 --- a/Makefile +++ b/Makefile @@ -157,7 +157,7 @@ test-smoke: test-full: $(info log-file: test-$(TEST_SUFFIX).log) - PASSES="fmt build unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log + PASSES="fmt build release unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log docker-test: $(info GO_VERSION: $(GO_VERSION)) diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index cd9428201..3b30d813a 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -171,6 +171,12 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad continue } + if raftReq.ClusterMemberAttrSet != nil { + log.Println("ignoring cluster_member_attr_set") + remove() + continue + } + if v3 || raftReq.Header == nil { continue } diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index d16f5bf6a..f80f89ca3 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -221,17 +221,17 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { case r.AuthRoleList != nil: op = "AuthRoleList" ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList) - case r.ClusterVersionSet != nil: + case r.ClusterVersionSet != nil: // Implemented in 3.5.x op = "ClusterVersionSet" a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet) case r.ClusterMemberAttrSet != nil: - op = "ClusterMemberAttrSet" + op = "ClusterMemberAttrSet" // Implemented in 3.5.x a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet) case r.DowngradeInfoSet != nil: - op = "DowngradeInfoSet" + op = "DowngradeInfoSet" // Implemented in 3.5.x a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet) default: - panic("not implemented") + a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r)) } return ar } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e7e2e109f..7aee4d0b9 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -721,6 +721,8 @@ func (s *EtcdServer) adjustTicks() { func (s *EtcdServer) Start() { s.start() s.GoAttach(func() { s.adjustTicks() }) + // TODO: Switch to publishV3 in 3.6. + // Support for cluster_member_set_attr was added in 3.5. s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) s.GoAttach(s.purgeFile) s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) }) @@ -1855,7 +1857,6 @@ func (s *EtcdServer) sync(timeout time.Duration) { // with the static clientURLs of the server. // The function keeps attempting to register until it succeeds, // or its server is stopped. -// TODO: replace publish() in 3.6 func (s *EtcdServer) publishV3(timeout time.Duration) { req := &membershippb.ClusterMemberAttrSetRequest{ Member_ID: uint64(s.id), @@ -1916,7 +1917,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { // but does not go through v2 API endpoint, which means even with v2 // client handler disabled (e.g. --enable-v2=false), cluster can still // process publish requests through rafthttp -// TODO: Deprecate v2 store in 3.6 +// TODO: Remove in 3.6 (start using publishV3) func (s *EtcdServer) publish(timeout time.Duration) { lg := s.Logger() b, err := json.Marshal(s.attributes) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index d5a747113..38d6ef2cf 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -27,7 +27,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/types" @@ -36,6 +38,7 @@ import ( "go.etcd.io/etcd/pkg/v3/wait" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" @@ -49,6 +52,7 @@ import ( "go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) // TestDoLocalAction tests requests which do not need to go through raft to be applied, @@ -98,7 +102,7 @@ func TestDoLocalAction(t *testing.T) { v2store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), } - resp, err := srv.Do(context.TODO(), tt.req) + resp, err := srv.Do(context.Background(), tt.req) if err != tt.werr { t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr) @@ -832,7 +836,7 @@ func TestDoProposalStopped(t *testing.T) { // TestSync tests sync 1. is nonblocking 2. proposes SYNC request. func TestSync(t *testing.T) { n := newNodeRecorder() - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), @@ -877,7 +881,7 @@ func TestSync(t *testing.T) { // after timeout func TestSyncTimeout(t *testing.T) { n := newProposalBlockerRecorder() - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), @@ -1313,7 +1317,7 @@ func TestAddMember(t *testing.T) { } s.start() m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} - _, err := s.AddMember(context.TODO(), m) + _, err := s.AddMember(context.Background(), m) gaction := n.Action() s.Stop() @@ -1357,7 +1361,7 @@ func TestRemoveMember(t *testing.T) { consistIndex: cindex.NewFakeConsistentIndex(0), } s.start() - _, err := s.RemoveMember(context.TODO(), 1234) + _, err := s.RemoveMember(context.Background(), 1234) gaction := n.Action() s.Stop() @@ -1402,7 +1406,7 @@ func TestUpdateMember(t *testing.T) { } s.start() wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} - _, err := s.UpdateMember(context.TODO(), wm) + _, err := s.UpdateMember(context.Background(), wm) gaction := n.Action() s.Stop() @@ -1426,7 +1430,7 @@ func TestPublish(t *testing.T) { // simulate that request has gone through consensus ch <- Response{} w := wait.NewWithResponse(ch) - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), @@ -1475,7 +1479,7 @@ func TestPublish(t *testing.T) { // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: newNodeNop(), @@ -1503,7 +1507,7 @@ func TestPublishStopped(t *testing.T) { // TestPublishRetry tests that publish will keep retry until success. func TestPublishRetry(t *testing.T) { - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) n := newNodeRecorderStream() srv := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -1540,6 +1544,127 @@ func TestPublishRetry(t *testing.T) { <-ch } +func TestPublishV3(t *testing.T) { + n := newNodeRecorder() + ch := make(chan interface{}, 1) + // simulate that request has gone through consensus + ch <- &applyResult{} + w := wait.NewWithResponse(ch) + ctx, cancel := context.WithCancel(context.Background()) + lg := zaptest.NewLogger(t) + be, _ := backend.NewDefaultTmpBackend(t) + srv := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: lg, + readych: make(chan struct{}), + Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000}, + id: 1, + r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}), + attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, + cluster: &membership.RaftCluster{}, + w: w, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + authStore: auth.NewAuthStore(lg, be, nil, nil, 0), + be: be, + ctx: ctx, + cancel: cancel, + } + srv.publishV3(time.Hour) + + action := n.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) + } + if action[0].Name != "Propose" { + t.Fatalf("action = %s, want Propose", action[0].Name) + } + data := action[0].Params[0].([]byte) + var r pb.InternalRaftRequest + if err := r.Unmarshal(data); err != nil { + t.Fatalf("unmarshal request error: %v", err) + } + assert.Equal(t, &membershippb.ClusterMemberAttrSetRequest{Member_ID: 0x1, MemberAttributes: &membershippb.Attributes{ + Name: "node1", ClientUrls: []string{"http://a", "http://b"}}}, r.ClusterMemberAttrSet) +} + +// TestPublishStopped tests that publish will be stopped if server is stopped. +func TestPublishV3Stopped(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + r := newRaftNode(raftNodeConfig{ + lg: zap.NewExample(), + Node: newNodeNop(), + transport: newNopTransporter(), + }) + srv := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + r: *r, + cluster: &membership.RaftCluster{}, + w: mockwait.NewNop(), + done: make(chan struct{}), + stopping: make(chan struct{}), + stop: make(chan struct{}), + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + + ctx: ctx, + cancel: cancel, + } + close(srv.stopping) + srv.publishV3(time.Hour) +} + +// TestPublishRetry tests that publish will keep retry until success. +func TestPublishV3Retry(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + n := newNodeRecorderStream() + + lg := zaptest.NewLogger(t) + be, _ := backend.NewDefaultTmpBackend(t) + srv := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: lg, + readych: make(chan struct{}), + Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000}, + id: 1, + r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}), + w: mockwait.NewNop(), + stopping: make(chan struct{}), + attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, + cluster: &membership.RaftCluster{}, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + authStore: auth.NewAuthStore(lg, be, nil, nil, 0), + be: be, + ctx: ctx, + cancel: cancel, + } + + // expect multiple proposals from retrying + ch := make(chan struct{}) + go func() { + defer close(ch) + if action, err := n.Wait(2); err != nil { + t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err) + } + close(srv.stopping) + // drain remaining actions, if any, so publish can terminate + for { + select { + case <-ch: + return + default: + n.Action() + } + } + }() + srv.publishV3(10 * time.Nanosecond) + ch <- struct{}{} + <-ch +} + func TestStopNotify(t *testing.T) { s := &EtcdServer{ lgMu: new(sync.RWMutex), diff --git a/server/go.mod b/server/go.mod index 2af585cec..be81c5a71 100644 --- a/server/go.mod +++ b/server/go.mod @@ -23,6 +23,7 @@ require ( github.com/sirupsen/logrus v1.7.0 // indirect github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.1.1 + github.com/stretchr/testify v1.5.1 github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 go.etcd.io/bbolt v1.3.5 diff --git a/test.sh b/test.sh index 715b335d2..2726255ca 100755 --- a/test.sh +++ b/test.sh @@ -613,7 +613,7 @@ function dep_pass { function release_pass { rm -f ./bin/etcd-last-release # to grab latest patch release; bump this up for every minor release - UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.3.*" | head -1) + UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.4.*" | head -1) if [ -n "$MANUAL_VER" ]; then # in case, we need to test against different version UPGRADE_VER=$MANUAL_VER diff --git a/tests/e2e/main_test.go b/tests/e2e/main_test.go index 791c9d58e..75adb1e8e 100644 --- a/tests/e2e/main_test.go +++ b/tests/e2e/main_test.go @@ -6,7 +6,9 @@ package e2e import ( "flag" + "log" "os" + "path/filepath" "runtime" "testing" @@ -36,8 +38,18 @@ func TestMain(m *testing.M) { os.Setenv("ETCD_UNSUPPORTED_ARCH", runtime.GOARCH) os.Unsetenv("ETCDCTL_API") - flag.StringVar(&binDir, "bin-dir", "../../bin", "The directory for store etcd and etcdctl binaries.") - flag.StringVar(&certDir, "cert-dir", "../fixtures", "The directory for store certificate files.") + binDirDef, err := filepath.Abs("../../bin") + if err != nil { + log.Fatal(err) + } + + certDirDef, err := filepath.Abs("../fixtures") + if err != nil { + log.Fatal(err) + } + + flag.StringVar(&binDir, "bin-dir", binDirDef, "The directory for store etcd and etcdctl binaries.") + flag.StringVar(&certDir, "cert-dir", certDirDef, "The directory for store certificate files.") flag.Parse() binPath = binDir + "/etcd"