*: enable v3 by default

Xiang Li 2016-03-23 16:39:15 -07:00
parent 333ac5789a
commit 70a9391378
15 changed files with 78 additions and 198 deletions

View File

@@ -250,13 +250,6 @@ Follow the instructions when using these flags.
+ default: false
+ env variable: ETCD_FORCE_NEW_CLUSTER
## Experimental Flags
### --experimental-v3demo
+ Enable experimental [v3 demo API][rfc-v3].
+ default: false
+ env variable: ETCD_EXPERIMENTAL_V3DEMO
## Miscellaneous Flags
### --version

View File

@@ -229,7 +229,6 @@ type etcdProcessClusterConfig struct {
isPeerTLS bool
isPeerAutoTLS bool
initialToken string
isV3 bool
}
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
@@ -342,9 +341,6 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
"--initial-cluster-token", cfg.initialToken,
"--data-dir", dataDirPath,
}
if cfg.isV3 {
args = append(args, "--experimental-v3demo")
}
args = append(args, cfg.tlsArgs()...)
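With the isV3 knob gone from the e2e harness, every process is launched with the same base flag list. A minimal, self-contained sketch of that construction; the function name and flag values here are illustrative stand-ins, not the test helper's API:

```go
package main

import "fmt"

// processArgs mirrors the shape of the argument construction above after the
// change: no isV3 branch, just the base flags plus any TLS flags.
func processArgs(name, dataDir, token string, tlsArgs []string) []string {
	args := []string{
		"--name", name,
		"--initial-cluster-token", token,
		"--data-dir", dataDir,
	}
	return append(args, tlsArgs...)
}

func main() {
	fmt.Println(processArgs("infra1", "/tmp/etcd-e2e", "new-cluster", nil))
}
```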

View File

@@ -141,7 +141,6 @@ func setupCtlV3Test(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) *e
cfg = configStandalone(*cfg)
}
copied := *cfg
copied.isV3 = true
epc, err := newEtcdProcessCluster(&copied)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)

View File

@@ -122,7 +122,6 @@ type config struct {
printVersion bool
v3demo bool
autoCompactionRetention int
enablePprof bool
@@ -224,7 +223,6 @@ func NewConfig() *config {
fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
// demo flag
fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.")
fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.")
// backwards-compatibility with v0.4.6
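For reference, a small sketch of how the surviving experimental flag is registered with the standard flag package, assuming illustrative field names; the real etcdmain config struct is much larger:

```go
package main

import (
	"flag"
	"fmt"
)

// config keeps only the fields relevant to this sketch.
type config struct {
	printVersion            bool
	autoCompactionRetention int
}

func newConfig(args []string) (*config, error) {
	cfg := &config{}
	fs := flag.NewFlagSet("etcd", flag.ContinueOnError)
	fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
	// experimental-v3demo is gone; auto compaction is the remaining experimental flag here.
	fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0,
		"Auto compaction retention in hour. 0 means disable auto compaction.")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := newConfig([]string{"-experimental-auto-compaction-retention", "1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.autoCompactionRetention)
}
```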

View File

@@ -333,7 +333,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
PeerTLSInfo: cfg.peerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.electionTicks(),
V3demo: cfg.v3demo,
AutoCompactionRetention: cfg.autoCompactionRetention,
StrictReconfigCheck: cfg.strictReconfigCheck,
EnablePprof: cfg.enablePprof,

View File

@@ -135,8 +135,6 @@ given by the consensus protocol.
experimental flags:
--experimental-v3demo 'false'
enable experimental v3 demo API.
--experimental-auto-compaction-retention '0'
auto compaction retention in hour. 0 means disable auto compaction.

View File

@@ -50,7 +50,6 @@ type ServerConfig struct {
ElectionTicks int
BootstrapTimeout time.Duration
V3demo bool
AutoCompactionRetention int
StrictReconfigCheck bool

View File

@@ -312,10 +312,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
}
if cfg.V3demo {
c.CheckQuorum = true
CheckQuorum: true,
}
n = raft.StartNode(c, peers)
@@ -349,10 +346,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
}
if cfg.V3demo {
c.CheckQuorum = true
CheckQuorum: true,
}
n := raft.RestartNode(c)
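Both startNode and restartNode now set CheckQuorum directly in the config literal. A sketch of the resulting construction using the raft package from the coreos/etcd tree this commit lives in; the tick and size values are illustrative, not etcd's defaults:

```go
package main

import "github.com/coreos/etcd/raft"

// newRaftNode shows the post-change shape: CheckQuorum lives in the
// raft.Config literal instead of behind a cfg.V3demo branch.
func newRaftNode(id uint64, peers []raft.Peer) raft.Node {
	s := raft.NewMemoryStorage()
	c := &raft.Config{
		ID:              id,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
		CheckQuorum:     true,
	}
	return raft.StartNode(c, peers)
}

func main() {
	n := newRaftNode(1, []raft.Peer{{ID: 1}})
	defer n.Stop()
}
```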

View File

@@ -16,7 +16,6 @@ package etcdserver
import (
"encoding/json"
"errors"
"expvar"
"fmt"
"math/rand"
@@ -221,10 +220,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
return nil, fmt.Errorf("cannot access data directory: %v", terr)
}
if !cfg.V3demo && fileutil.Exist(path.Join(cfg.SnapDir(), databaseFilename)) {
return nil, errors.New("experimental-v3demo cannot be disabled once it is enabled")
}
// Run the migrations.
dataVer, err := version.DetectDataDir(cfg.DataDir)
if err != nil {
@@ -370,15 +365,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
}
if cfg.V3demo {
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
srv.lessor = lease.NewLessor(srv.be)
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
srv.authStore = auth.NewAuthStore(srv.be)
if h := cfg.AutoCompactionRetention; h != 0 {
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
srv.compactor.Run()
}
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
srv.lessor = lease.NewLessor(srv.be)
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
srv.authStore = auth.NewAuthStore(srv.be)
if h := cfg.AutoCompactionRetention; h != 0 {
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
srv.compactor.Run()
}
// TODO: move transport initialization near the definition of remote
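The hunk above drops the `if cfg.V3demo` guard, so the backend, lessor, v3 KV, and auth store are wired up for every server, and the periodic compactor starts whenever a retention is configured. A rough, hypothetical sketch of that order using stand-in types rather than the etcd packages:

```go
package main

import "fmt"

// Stand-ins; the real server uses backend.Backend, lease.Lessor,
// dstorage.KV, auth.AuthStore, and compactor.Periodic.
type backendT struct{ path string }
type lessorT struct{ be *backendT }
type kvT struct {
	be *backendT
	le *lessorT
}
type periodicCompactor struct {
	hours int
	kv    *kvT
}

func (c *periodicCompactor) Run() { fmt.Printf("compact every %dh\n", c.hours) }

func newServerStorage(snapDir string, retentionHours int) (*kvT, *periodicCompactor) {
	// Always open the backend and build the v3 stack on top of it.
	be := &backendT{path: snapDir + "/db"}
	le := &lessorT{be: be}
	kv := &kvT{be: be, le: le}

	// The compactor stays optional: it only runs when a retention is set.
	var comp *periodicCompactor
	if h := retentionHours; h != 0 {
		comp = &periodicCompactor{hours: h, kv: kv}
		comp.Run()
	}
	return kv, comp
}

func main() {
	newServerStorage("member/snap", 1)
}
```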
@@ -393,7 +386,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
ServerStats: sstats,
LeaderStats: lstats,
ErrorC: srv.errorc,
V3demo: cfg.V3demo,
}
if err := tr.Start(); err != nil {
return nil, err
@@ -588,44 +580,43 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
apply.snapshot.Metadata.Index, ep.appliedi)
}
if s.cfg.V3demo {
snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
if err != nil {
plog.Panicf("get database snapshot file path error: %v", err)
}
fn := path.Join(s.cfg.SnapDir(), databaseFilename)
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
newbe := backend.NewDefaultBackend(fn)
if err := s.kv.Restore(newbe); err != nil {
plog.Panicf("restore KV error: %v", err)
}
// Closing old backend might block until all the txns
// on the backend are finished.
// We do not want to wait on closing the old backend.
s.bemu.Lock()
oldbe := s.be
go func() {
if err := oldbe.Close(); err != nil {
plog.Panicf("close backend error: %v", err)
}
}()
s.be = newbe
s.bemu.Unlock()
if s.lessor != nil {
s.lessor.Recover(newbe, s.kv)
}
if s.authStore != nil {
s.authStore.Recover(newbe)
}
snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
if err != nil {
plog.Panicf("get database snapshot file path error: %v", err)
}
fn := path.Join(s.cfg.SnapDir(), databaseFilename)
if err := os.Rename(snapfn, fn); err != nil {
plog.Panicf("rename snapshot file error: %v", err)
}
newbe := backend.NewDefaultBackend(fn)
if err := s.kv.Restore(newbe); err != nil {
plog.Panicf("restore KV error: %v", err)
}
// Closing old backend might block until all the txns
// on the backend are finished.
// We do not want to wait on closing the old backend.
s.bemu.Lock()
oldbe := s.be
go func() {
if err := oldbe.Close(); err != nil {
plog.Panicf("close backend error: %v", err)
}
}()
s.be = newbe
s.bemu.Unlock()
if s.lessor != nil {
s.lessor.Recover(newbe, s.kv)
}
if s.authStore != nil {
s.authStore.Recover(newbe)
}
if err := s.store.Recovery(apply.snapshot.Data); err != nil {
plog.Panicf("recovery store error: %v", err)
}
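The backend swap above follows a pattern worth calling out: install the new backend under the mutex, then close the old one in a goroutine, since Close may block until in-flight transactions finish. A stand-in sketch; Backend here is a placeholder interface, not etcd's:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Backend is a placeholder for the storage backend interface.
type Backend interface{ Close() error }

type nopBackend struct{ name string }

func (b *nopBackend) Close() error {
	fmt.Println("closed", b.name)
	return nil
}

type server struct {
	bemu sync.Mutex
	be   Backend
}

// swapBackend installs newbe and closes the previous backend asynchronously,
// so the snapshot-apply path never waits on a potentially slow Close.
func (s *server) swapBackend(newbe Backend) {
	s.bemu.Lock()
	oldbe := s.be
	s.be = newbe
	s.bemu.Unlock()

	go func() {
		if err := oldbe.Close(); err != nil {
			panic(err)
		}
	}()
}

func main() {
	s := &server{be: &nopBackend{name: "old"}}
	s.swapBackend(&nopBackend{name: "new"})
	time.Sleep(10 * time.Millisecond) // let the async Close run in this toy example
}
```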
@@ -938,20 +929,17 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
ms[i].To = 0
}
if s.cfg.V3demo {
if ms[i].Type == raftpb.MsgSnap {
// There are two separate data store when v3 demo is enabled: the store for v2,
// and the KV for v3.
// The msgSnap only contains the most recent snapshot of store without KV.
// So we need to redirect the msgSnap to etcd server main loop for merging in the
// current store snapshot and KV snapshot.
select {
case s.msgSnapC <- ms[i]:
default:
// drop msgSnap if the inflight chan if full.
}
ms[i].To = 0
if ms[i].Type == raftpb.MsgSnap {
// There are two separate data stores: the store for v2, and the KV for v3.
// The msgSnap only contains the most recent snapshot of store without KV.
// So we need to redirect the msgSnap to etcd server main loop for merging in the
// current store snapshot and KV snapshot.
select {
case s.msgSnapC <- ms[i]:
default:
// drop msgSnap if the inflight chan is full.
}
ms[i].To = 0
}
if ms[i].Type == raftpb.MsgHeartbeat {
ok, exceed := s.r.td.Observe(ms[i].To)
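The MsgSnap redirect above relies on a non-blocking channel send: the snapshot message is queued for the server's main loop if there is room, and silently dropped otherwise so the raft send path never stalls. A minimal stand-in sketch of that select pattern:

```go
package main

import "fmt"

type message struct{ typ string }

// redirectSnap hands a snapshot message to the main loop without blocking:
// if the in-flight channel is already full, the message is dropped.
func redirectSnap(msgSnapC chan message, m message) bool {
	select {
	case msgSnapC <- m:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan message, 1)
	fmt.Println(redirectSnap(c, message{typ: "MsgSnap"})) // true: queued
	fmt.Println(redirectSnap(c, message{typ: "MsgSnap"})) // false: dropped, channel full
}
```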
@@ -1182,11 +1170,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
plog.Panicf("unexpected create snapshot error %v", err)
}
if s.cfg.V3demo {
// commit v3 storage because WAL file before snapshot index
// could be removed after SaveSnap.
s.getKV().Commit()
}
// commit v3 storage because WAL file before snapshot index
// could be removed after SaveSnap.
s.getKV().Commit()
// SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index.
if err = s.r.storage.SaveSnap(snap); err != nil {
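The ordering matters here: the v3 storage is committed before SaveSnap, because SaveSnap releases WAL files up to the snapshot index and they may then be purged. A stand-in sketch of that sequencing, with placeholder interfaces rather than etcd's:

```go
package main

import (
	"fmt"
	"log"
)

type kv interface{ Commit() }
type raftStorage interface{ SaveSnap(index uint64) error }

type fakeKV struct{}

func (fakeKV) Commit() { fmt.Println("v3 storage committed") }

type fakeStorage struct{}

func (fakeStorage) SaveSnap(index uint64) error {
	fmt.Println("snapshot saved at index", index)
	return nil
}

// snapshotAt commits the v3 store first, then saves the raft snapshot,
// mirroring the order enforced in the hunk above.
func snapshotAt(k kv, st raftStorage, snapi uint64) {
	// commit v3 storage because WAL files before the snapshot index
	// could be removed after SaveSnap.
	k.Commit()
	if err := st.SaveSnap(snapi); err != nil {
		log.Panicf("save snapshot error: %v", err)
	}
}

func main() {
	snapshotAt(fakeKV{}, fakeStorage{}, 42)
}
```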

View File

@@ -822,6 +822,11 @@ func TestSyncTrigger(t *testing.T) {
// snapshot should snapshot the store and cut the persistent
func TestSnapshot(t *testing.T) {
be, tmpPath := backend.NewDefaultTmpBackend()
defer func() {
os.RemoveAll(tmpPath)
}()
s := raft.NewMemoryStorage()
s.Append([]raftpb.Entry{{Index: 1}})
st := mockstore.NewRecorder()
@@ -835,6 +840,9 @@ func TestSnapshot(t *testing.T) {
},
store: st,
}
srv.kv = dstorage.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.be = be
srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
gaction, _ := st.Wait(2)
if len(gaction) != 2 {
@@ -857,6 +865,11 @@ func TestSnapshot(t *testing.T) {
// Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) {
be, tmpPath := backend.NewDefaultTmpBackend()
defer func() {
os.RemoveAll(tmpPath)
}()
snapc := 10
st := mockstore.NewRecorder()
p := mockstorage.NewStorageRecorderStream("")
@@ -872,6 +885,9 @@ func TestTriggerSnap(t *testing.T) {
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.kv = dstorage.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.be = be
srv.start()
donec := make(chan struct{})
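These test hunks all share the same fixture shape: create a throwaway backend file, defer its removal, and wire the v3 KV into the server under test. A generic sketch of that fixture pattern with stand-in helpers, not the etcd test utilities:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// newTmpBackendPath returns a path for a throwaway backend file and a cleanup
// function, mirroring the create-then-defer-remove pattern in the tests above.
func newTmpBackendPath() (string, func(), error) {
	dir, err := ioutil.TempDir("", "etcdserver-test")
	if err != nil {
		return "", nil, err
	}
	return filepath.Join(dir, "db"), func() { os.RemoveAll(dir) }, nil
}

func main() {
	path, cleanup, err := newTmpBackendPath()
	if err != nil {
		panic(err)
	}
	defer cleanup()
	fmt.Println("backend file:", path)
}
```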
@@ -922,7 +938,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
s := &EtcdServer{
cfg: &ServerConfig{
V3demo: true,
DataDir: testdir,
},
r: raftNode{
@@ -995,89 +1010,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
}
}
// TestRecvSnapshot tests when it receives a snapshot from raft leader,
// it should trigger storage.SaveSnap and also store.Recover.
func TestRecvSnapshot(t *testing.T) {
n := newNopReadyNode()
st := mockstore.NewRecorder()
p := mockstorage.NewStorageRecorder("")
cl := newCluster("abc")
cl.SetStore(store.New())
s := &EtcdServer{
cfg: &ServerConfig{},
r: raftNode{
Node: n,
transport: rafthttp.NewNopTransporter(),
storage: p,
raftStorage: raft.NewMemoryStorage(),
},
store: st,
cluster: cl,
}
s.start()
n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
// wait for actions happened on the storage
for len(p.Action()) == 0 {
time.Sleep(10 * time.Millisecond)
}
s.Stop()
wactions := []testutil.Action{{Name: "Recovery"}}
if g := st.Action(); !reflect.DeepEqual(g, wactions) {
t.Errorf("store action = %v, want %v", g, wactions)
}
wactions = []testutil.Action{{Name: "SaveSnap"}, {Name: "Save"}}
if g := p.Action(); !reflect.DeepEqual(g, wactions) {
t.Errorf("storage action = %v, want %v", g, wactions)
}
}
// TestApplySnapshotAndCommittedEntries tests that server applies snapshot
// first and then committed entries.
func TestApplySnapshotAndCommittedEntries(t *testing.T) {
n := newNopReadyNode()
st := mockstore.NewRecorderStream()
cl := newCluster("abc")
cl.SetStore(store.New())
storage := raft.NewMemoryStorage()
s := &EtcdServer{
cfg: &ServerConfig{},
r: raftNode{
Node: n,
storage: mockstorage.NewStorageRecorder(""),
raftStorage: storage,
transport: rafthttp.NewNopTransporter(),
},
store: st,
cluster: cl,
}
s.start()
req := &pb.Request{Method: "QGET"}
n.readyc <- raft.Ready{
Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}},
CommittedEntries: []raftpb.Entry{
{Index: 2, Data: pbutil.MustMarshal(req)},
},
}
// make goroutines move forward to receive snapshot
actions, _ := st.Wait(2)
s.Stop()
if len(actions) != 2 {
t.Fatalf("len(action) = %d, want 2", len(actions))
}
if actions[0].Name != "Recovery" {
t.Errorf("actions[0] = %s, want %s", actions[0].Name, "Recovery")
}
if actions[1].Name != "Get" {
t.Errorf("actions[1] = %s, want %s", actions[1].Name, "Get")
}
}
// TestAddMember tests AddMember can propose and perform node addition.
func TestAddMember(t *testing.T) {
n := newNodeConfChangeCommitterRecorder()

View File

@@ -72,7 +72,6 @@ type ClusterConfig struct {
PeerTLS *transport.TLSInfo
ClientTLS *transport.TLSInfo
DiscoveryURL string
UseV3 bool
UseGRPC bool
}
@@ -199,7 +198,6 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
name := c.name(rand.Int())
m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS)
m.DiscoveryURL = c.cfg.DiscoveryURL
m.V3demo = c.cfg.UseV3
if c.cfg.UseGRPC {
if err := m.listenGRPC(); err != nil {
t.Fatal(err)
@@ -471,9 +469,6 @@ func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, client
// listenGRPC starts a grpc server over a unix domain socket on the member
func (m *member) listenGRPC() error {
if m.V3demo == false {
return fmt.Errorf("starting grpc server without v3 configured")
}
// prefix with localhost so cert has right domain
m.grpcAddr = "localhost:" + m.Name + ".sock"
if err := os.RemoveAll(m.grpcAddr); err != nil {
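Since listenGRPC now runs for every member (the V3demo guard is gone), each member gets a unix-domain socket whose name is prefixed with "localhost:" so it matches the test certificates' domain. A self-contained sketch of that listener setup, with an illustrative helper name:

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// listenMemberSocket mimics the listenGRPC setup above: remove any stale
// socket file, then listen on a unix socket named "localhost:<name>.sock".
func listenMemberSocket(name string) (net.Listener, error) {
	grpcAddr := "localhost:" + name + ".sock"
	if err := os.RemoveAll(grpcAddr); err != nil {
		return nil, err
	}
	l, err := net.Listen("unix", grpcAddr)
	if err != nil {
		return nil, fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err)
	}
	return l, nil
}

func main() {
	l, err := listenMemberSocket("member0")
	if err != nil {
		panic(err)
	}
	defer l.Close()
	fmt.Println("grpc socket:", l.Addr())
}
```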
@@ -723,7 +718,6 @@ type ClusterV3 struct {
// NewClusterV3 returns a launched cluster with a grpc client connection
// for each cluster member.
func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
cfg.UseV3 = true
cfg.UseGRPC = true
clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
for _, m := range clus.Members {

View File

@@ -629,7 +629,6 @@ func TestV3RangeRequest(t *testing.T) {
}
func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
cfg.UseV3 = true
cfg.UseGRPC = true
clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
clus.Launch(t)

View File

@@ -92,9 +92,8 @@ type Peer interface {
// It is only used when the stream has not been established.
type peer struct {
// id of the remote raft peer node
id types.ID
r Raft
v3demo bool
id types.ID
r Raft
status *peerStatus
@@ -118,13 +117,12 @@ type peer struct {
stopc chan struct{}
}
func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
status := newPeerStatus(to)
picker := newURLPicker(urls)
p := &peer{
id: to,
r: r,
v3demo: v3demo,
status: status,
picker: picker,
msgAppV2Writer: startStreamWriter(to, status, fs, r),

View File

@@ -111,7 +111,6 @@ type Transport struct {
// When an error is received from ErrorC, user should stop raft state
// machine and thus stop the Transport.
ErrorC chan error
V3demo bool
streamRt http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines
@@ -232,7 +231,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo)
t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC)
addPeerToProber(t.prober, id.String(), us)
}

View File

@@ -112,10 +112,6 @@ func (c *cluster) Bootstrap() error {
"--initial-cluster", clusterStr,
"--initial-cluster-state", "new",
}
if !c.v2Only {
flags = append(flags,
"--experimental-v3demo")
}
if _, err := a.Start(flags...); err != nil {
// cleanup
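Finally, the functional tester boots every agent with the same flag list; there is no v2-only branch appending --experimental-v3demo anymore. A small stand-in sketch of the flag construction, with an illustrative placeholder for the cluster string:

```go
package main

import "fmt"

// bootstrapFlags builds the per-member flag list used by the tester after
// this change; clusterStr is a placeholder for the real value.
func bootstrapFlags(clusterStr string) []string {
	return []string{
		"--initial-cluster", clusterStr,
		"--initial-cluster-state", "new",
	}
}

func main() {
	fmt.Println(bootstrapFlags("etcd0=http://127.0.0.1:2380"))
}
```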