From 70a9391378a707b43c434f592bf6a221695141f6 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 23 Mar 2016 16:39:15 -0700 Subject: [PATCH] *: enable v3 by default --- Documentation/configuration.md | 7 - e2e/etcd_test.go | 4 - e2e/etcdctlv3_test.go | 1 - etcdmain/config.go | 2 - etcdmain/etcd.go | 1 - etcdmain/help.go | 2 - etcdserver/config.go | 1 - etcdserver/raft.go | 10 +- etcdserver/server.go | 126 ++++++++---------- etcdserver/server_test.go | 100 +++----------- integration/cluster.go | 6 - integration/v3_grpc_test.go | 1 - rafthttp/peer.go | 8 +- rafthttp/transport.go | 3 +- .../functional-tester/etcd-tester/cluster.go | 4 - 15 files changed, 78 insertions(+), 198 deletions(-) diff --git a/Documentation/configuration.md b/Documentation/configuration.md index 4cf713630..99817aa67 100644 --- a/Documentation/configuration.md +++ b/Documentation/configuration.md @@ -250,13 +250,6 @@ Follow the instructions when using these flags. + default: false + env variable: ETCD_FORCE_NEW_CLUSTER -## Experimental Flags - -### --experimental-v3demo -+ Enable experimental [v3 demo API][rfc-v3]. -+ default: false -+ env variable: ETCD_EXPERIMENTAL_V3DEMO - ## Miscellaneous Flags ### --version diff --git a/e2e/etcd_test.go b/e2e/etcd_test.go index 6e47967a6..f3daf38bb 100644 --- a/e2e/etcd_test.go +++ b/e2e/etcd_test.go @@ -229,7 +229,6 @@ type etcdProcessClusterConfig struct { isPeerTLS bool isPeerAutoTLS bool initialToken string - isV3 bool } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -342,9 +341,6 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { "--initial-cluster-token", cfg.initialToken, "--data-dir", dataDirPath, } - if cfg.isV3 { - args = append(args, "--experimental-v3demo") - } args = append(args, cfg.tlsArgs()...) diff --git a/e2e/etcdctlv3_test.go b/e2e/etcdctlv3_test.go index bffdf9e3b..6024df7d9 100644 --- a/e2e/etcdctlv3_test.go +++ b/e2e/etcdctlv3_test.go @@ -141,7 +141,6 @@ func setupCtlV3Test(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) *e cfg = configStandalone(*cfg) } copied := *cfg - copied.isV3 = true epc, err := newEtcdProcessCluster(&copied) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) diff --git a/etcdmain/config.go b/etcdmain/config.go index 7823f1b63..77d2fce3d 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -122,7 +122,6 @@ type config struct { printVersion bool - v3demo bool autoCompactionRetention int enablePprof bool @@ -224,7 +223,6 @@ func NewConfig() *config { fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.") // demo flag - fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.") fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.") // backwards-compatibility with v0.4.6 diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 9602847bc..c9531d049 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -333,7 +333,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { PeerTLSInfo: cfg.peerTLSInfo, TickMs: cfg.TickMs, ElectionTicks: cfg.electionTicks(), - V3demo: cfg.v3demo, AutoCompactionRetention: cfg.autoCompactionRetention, StrictReconfigCheck: cfg.strictReconfigCheck, EnablePprof: cfg.enablePprof, diff --git a/etcdmain/help.go b/etcdmain/help.go index 5379c36f0..84f9ae7ee 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -135,8 +135,6 @@ given by the consensus protocol. experimental flags: - --experimental-v3demo 'false' - enable experimental v3 demo API. --experimental-auto-compaction-retention '0' auto compaction retention in hour. 0 means disable auto compaction. diff --git a/etcdserver/config.go b/etcdserver/config.go index 0a5384182..b897b03ab 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -50,7 +50,6 @@ type ServerConfig struct { ElectionTicks int BootstrapTimeout time.Duration - V3demo bool AutoCompactionRetention int StrictReconfigCheck bool diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 56f25b45f..803d5cb55 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -312,10 +312,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r Storage: s, MaxSizePerMsg: maxSizePerMsg, MaxInflightMsgs: maxInflightMsgs, - } - - if cfg.V3demo { - c.CheckQuorum = true + CheckQuorum: true, } n = raft.StartNode(c, peers) @@ -349,10 +346,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust Storage: s, MaxSizePerMsg: maxSizePerMsg, MaxInflightMsgs: maxInflightMsgs, - } - - if cfg.V3demo { - c.CheckQuorum = true + CheckQuorum: true, } n := raft.RestartNode(c) diff --git a/etcdserver/server.go b/etcdserver/server.go index 405b929b3..8e14170e4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -16,7 +16,6 @@ package etcdserver import ( "encoding/json" - "errors" "expvar" "fmt" "math/rand" @@ -221,10 +220,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { return nil, fmt.Errorf("cannot access data directory: %v", terr) } - if !cfg.V3demo && fileutil.Exist(path.Join(cfg.SnapDir(), databaseFilename)) { - return nil, errors.New("experimental-v3demo cannot be disabled once it is enabled") - } - // Run the migrations. dataVer, err := version.DetectDataDir(cfg.DataDir) if err != nil { @@ -370,15 +365,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } - if cfg.V3demo { - srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) - srv.lessor = lease.NewLessor(srv.be) - srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex) - srv.authStore = auth.NewAuthStore(srv.be) - if h := cfg.AutoCompactionRetention; h != 0 { - srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) - srv.compactor.Run() - } + srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) + srv.lessor = lease.NewLessor(srv.be) + srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex) + srv.authStore = auth.NewAuthStore(srv.be) + if h := cfg.AutoCompactionRetention; h != 0 { + srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) + srv.compactor.Run() } // TODO: move transport initialization near the definition of remote @@ -393,7 +386,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { ServerStats: sstats, LeaderStats: lstats, ErrorC: srv.errorc, - V3demo: cfg.V3demo, } if err := tr.Start(); err != nil { return nil, err @@ -588,44 +580,43 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { apply.snapshot.Metadata.Index, ep.appliedi) } - if s.cfg.V3demo { - snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index) - if err != nil { - plog.Panicf("get database snapshot file path error: %v", err) - } - - fn := path.Join(s.cfg.SnapDir(), databaseFilename) - if err := os.Rename(snapfn, fn); err != nil { - plog.Panicf("rename snapshot file error: %v", err) - } - - newbe := backend.NewDefaultBackend(fn) - if err := s.kv.Restore(newbe); err != nil { - plog.Panicf("restore KV error: %v", err) - } - - // Closing old backend might block until all the txns - // on the backend are finished. - // We do not want to wait on closing the old backend. - s.bemu.Lock() - oldbe := s.be - go func() { - if err := oldbe.Close(); err != nil { - plog.Panicf("close backend error: %v", err) - } - }() - - s.be = newbe - s.bemu.Unlock() - - if s.lessor != nil { - s.lessor.Recover(newbe, s.kv) - } - - if s.authStore != nil { - s.authStore.Recover(newbe) - } + snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index) + if err != nil { + plog.Panicf("get database snapshot file path error: %v", err) } + + fn := path.Join(s.cfg.SnapDir(), databaseFilename) + if err := os.Rename(snapfn, fn); err != nil { + plog.Panicf("rename snapshot file error: %v", err) + } + + newbe := backend.NewDefaultBackend(fn) + if err := s.kv.Restore(newbe); err != nil { + plog.Panicf("restore KV error: %v", err) + } + + // Closing old backend might block until all the txns + // on the backend are finished. + // We do not want to wait on closing the old backend. + s.bemu.Lock() + oldbe := s.be + go func() { + if err := oldbe.Close(); err != nil { + plog.Panicf("close backend error: %v", err) + } + }() + + s.be = newbe + s.bemu.Unlock() + + if s.lessor != nil { + s.lessor.Recover(newbe, s.kv) + } + + if s.authStore != nil { + s.authStore.Recover(newbe) + } + if err := s.store.Recovery(apply.snapshot.Data); err != nil { plog.Panicf("recovery store error: %v", err) } @@ -938,20 +929,17 @@ func (s *EtcdServer) send(ms []raftpb.Message) { ms[i].To = 0 } - if s.cfg.V3demo { - if ms[i].Type == raftpb.MsgSnap { - // There are two separate data store when v3 demo is enabled: the store for v2, - // and the KV for v3. - // The msgSnap only contains the most recent snapshot of store without KV. - // So we need to redirect the msgSnap to etcd server main loop for merging in the - // current store snapshot and KV snapshot. - select { - case s.msgSnapC <- ms[i]: - default: - // drop msgSnap if the inflight chan if full. - } - ms[i].To = 0 + if ms[i].Type == raftpb.MsgSnap { + // There are two separate data store: the store for v2, and the KV for v3. + // The msgSnap only contains the most recent snapshot of store without KV. + // So we need to redirect the msgSnap to etcd server main loop for merging in the + // current store snapshot and KV snapshot. + select { + case s.msgSnapC <- ms[i]: + default: + // drop msgSnap if the inflight chan if full. } + ms[i].To = 0 } if ms[i].Type == raftpb.MsgHeartbeat { ok, exceed := s.r.td.Observe(ms[i].To) @@ -1182,11 +1170,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { } plog.Panicf("unexpected create snapshot error %v", err) } - if s.cfg.V3demo { - // commit v3 storage because WAL file before snapshot index - // could be removed after SaveSnap. - s.getKV().Commit() - } + // commit v3 storage because WAL file before snapshot index + // could be removed after SaveSnap. + s.getKV().Commit() // SaveSnap saves the snapshot and releases the locked wal files // to the snapshot index. if err = s.r.storage.SaveSnap(snap); err != nil { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index bf72d0d54..2e17d3fad 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -822,6 +822,11 @@ func TestSyncTrigger(t *testing.T) { // snapshot should snapshot the store and cut the persistent func TestSnapshot(t *testing.T) { + be, tmpPath := backend.NewDefaultTmpBackend() + defer func() { + os.RemoveAll(tmpPath) + }() + s := raft.NewMemoryStorage() s.Append([]raftpb.Entry{{Index: 1}}) st := mockstore.NewRecorder() @@ -835,6 +840,9 @@ func TestSnapshot(t *testing.T) { }, store: st, } + srv.kv = dstorage.New(be, &lease.FakeLessor{}, &srv.consistIndex) + srv.be = be + srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}}) gaction, _ := st.Wait(2) if len(gaction) != 2 { @@ -857,6 +865,11 @@ func TestSnapshot(t *testing.T) { // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { + be, tmpPath := backend.NewDefaultTmpBackend() + defer func() { + os.RemoveAll(tmpPath) + }() + snapc := 10 st := mockstore.NewRecorder() p := mockstorage.NewStorageRecorderStream("") @@ -872,6 +885,9 @@ func TestTriggerSnap(t *testing.T) { store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.kv = dstorage.New(be, &lease.FakeLessor{}, &srv.consistIndex) + srv.be = be + srv.start() donec := make(chan struct{}) @@ -922,7 +938,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) s := &EtcdServer{ cfg: &ServerConfig{ - V3demo: true, DataDir: testdir, }, r: raftNode{ @@ -995,89 +1010,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { } } -// TestRecvSnapshot tests when it receives a snapshot from raft leader, -// it should trigger storage.SaveSnap and also store.Recover. -func TestRecvSnapshot(t *testing.T) { - n := newNopReadyNode() - st := mockstore.NewRecorder() - p := mockstorage.NewStorageRecorder("") - cl := newCluster("abc") - cl.SetStore(store.New()) - s := &EtcdServer{ - cfg: &ServerConfig{}, - r: raftNode{ - Node: n, - transport: rafthttp.NewNopTransporter(), - storage: p, - raftStorage: raft.NewMemoryStorage(), - }, - store: st, - cluster: cl, - } - - s.start() - n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}} - - // wait for actions happened on the storage - for len(p.Action()) == 0 { - time.Sleep(10 * time.Millisecond) - } - - s.Stop() - - wactions := []testutil.Action{{Name: "Recovery"}} - if g := st.Action(); !reflect.DeepEqual(g, wactions) { - t.Errorf("store action = %v, want %v", g, wactions) - } - wactions = []testutil.Action{{Name: "SaveSnap"}, {Name: "Save"}} - if g := p.Action(); !reflect.DeepEqual(g, wactions) { - t.Errorf("storage action = %v, want %v", g, wactions) - } -} - -// TestApplySnapshotAndCommittedEntries tests that server applies snapshot -// first and then committed entries. -func TestApplySnapshotAndCommittedEntries(t *testing.T) { - n := newNopReadyNode() - st := mockstore.NewRecorderStream() - cl := newCluster("abc") - cl.SetStore(store.New()) - storage := raft.NewMemoryStorage() - s := &EtcdServer{ - cfg: &ServerConfig{}, - r: raftNode{ - Node: n, - storage: mockstorage.NewStorageRecorder(""), - raftStorage: storage, - transport: rafthttp.NewNopTransporter(), - }, - store: st, - cluster: cl, - } - - s.start() - req := &pb.Request{Method: "QGET"} - n.readyc <- raft.Ready{ - Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}, - CommittedEntries: []raftpb.Entry{ - {Index: 2, Data: pbutil.MustMarshal(req)}, - }, - } - // make goroutines move forward to receive snapshot - actions, _ := st.Wait(2) - s.Stop() - - if len(actions) != 2 { - t.Fatalf("len(action) = %d, want 2", len(actions)) - } - if actions[0].Name != "Recovery" { - t.Errorf("actions[0] = %s, want %s", actions[0].Name, "Recovery") - } - if actions[1].Name != "Get" { - t.Errorf("actions[1] = %s, want %s", actions[1].Name, "Get") - } -} - // TestAddMember tests AddMember can propose and perform node addition. func TestAddMember(t *testing.T) { n := newNodeConfChangeCommitterRecorder() diff --git a/integration/cluster.go b/integration/cluster.go index 71d16f186..373679e3a 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -72,7 +72,6 @@ type ClusterConfig struct { PeerTLS *transport.TLSInfo ClientTLS *transport.TLSInfo DiscoveryURL string - UseV3 bool UseGRPC bool } @@ -199,7 +198,6 @@ func (c *cluster) mustNewMember(t *testing.T) *member { name := c.name(rand.Int()) m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS) m.DiscoveryURL = c.cfg.DiscoveryURL - m.V3demo = c.cfg.UseV3 if c.cfg.UseGRPC { if err := m.listenGRPC(); err != nil { t.Fatal(err) @@ -471,9 +469,6 @@ func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, client // listenGRPC starts a grpc server over a unix domain socket on the member func (m *member) listenGRPC() error { - if m.V3demo == false { - return fmt.Errorf("starting grpc server without v3 configured") - } // prefix with localhost so cert has right domain m.grpcAddr = "localhost:" + m.Name + ".sock" if err := os.RemoveAll(m.grpcAddr); err != nil { @@ -723,7 +718,6 @@ type ClusterV3 struct { // NewClusterV3 returns a launched cluster with a grpc client connection // for each cluster member. func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { - cfg.UseV3 = true cfg.UseGRPC = true clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)} for _, m := range clus.Members { diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index daf2af62e..3cc2cbf5c 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -629,7 +629,6 @@ func TestV3RangeRequest(t *testing.T) { } func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 { - cfg.UseV3 = true cfg.UseGRPC = true clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)} clus.Launch(t) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 145c53fef..87986e72b 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -92,9 +92,8 @@ type Peer interface { // It is only used when the stream has not been established. type peer struct { // id of the remote raft peer node - id types.ID - r Raft - v3demo bool + id types.ID + r Raft status *peerStatus @@ -118,13 +117,12 @@ type peer struct { stopc chan struct{} } -func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer { +func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { status := newPeerStatus(to) picker := newURLPicker(urls) p := &peer{ id: to, r: r, - v3demo: v3demo, status: status, picker: picker, msgAppV2Writer: startStreamWriter(to, status, fs, r), diff --git a/rafthttp/transport.go b/rafthttp/transport.go index b2654ef73..f9ee78bdf 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -111,7 +111,6 @@ type Transport struct { // When an error is received from ErrorC, user should stop raft state // machine and thus stop the Transport. ErrorC chan error - V3demo bool streamRt http.RoundTripper // roundTripper used by streams pipelineRt http.RoundTripper // roundTripper used by pipelines @@ -232,7 +231,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.LeaderStats.Follower(id.String()) - t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo) + t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC) addPeerToProber(t.prober, id.String(), us) } diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index 4338fe442..e663fbac2 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -112,10 +112,6 @@ func (c *cluster) Bootstrap() error { "--initial-cluster", clusterStr, "--initial-cluster-state", "new", } - if !c.v2Only { - flags = append(flags, - "--experimental-v3demo") - } if _, err := a.Start(flags...); err != nil { // cleanup