diff --git a/etcdserver/raft.go b/etcdserver/raft.go index cfe69c3f6..5095e10c0 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -103,7 +103,7 @@ type raftNode struct { // utility ticker <-chan time.Time - raftStorage *raft.MemoryStorage + raftStorage *raftStorage storage Storage // transport specifies the transport to send and receive msgs to members. // Sending messages MUST NOT block. It is okay to drop messages, since @@ -238,7 +238,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) { } } -func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { +func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raftStorage, w *wal.WAL) { var err error member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( @@ -263,7 +263,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r } id = member.ID plog.Infof("starting member %s in cluster %s", id, cl.ID()) - s = raft.NewMemoryStorage() + s = newRaftStorage() c := &raft.Config{ ID: uint64(id), ElectionTick: cfg.ElectionTicks, @@ -278,7 +278,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r return } -func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -288,7 +288,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := newCluster("") cl.SetID(cid) - s := raft.NewMemoryStorage() + s := newRaftStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } @@ -308,7 +308,7 @@ func 
restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust return id, cl, n, s, w } -func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -340,7 +340,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := newCluster("") cl.SetID(cid) - s := raft.NewMemoryStorage() + s := newRaftStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } diff --git a/etcdserver/raft_storage.go b/etcdserver/raft_storage.go new file mode 100644 index 000000000..bbc3010be --- /dev/null +++ b/etcdserver/raft_storage.go @@ -0,0 +1,47 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" +) + +type raftStorage struct { + *raft.MemoryStorage + // snapStore is the place to request snapshot when v3demo is enabled. + // If snapStore is nil, it uses the snapshot saved in MemoryStorage. 
+ snapStore *snapshotStore +} + +func newRaftStorage() *raftStorage { + return &raftStorage{ + MemoryStorage: raft.NewMemoryStorage(), + } +} + +// Snapshot returns raft snapshot. If snapStore is nil, this method +// returns snapshot saved in MemoryStorage. If snapStore exists, this method +// returns snapshot from snapStore. +func (rs *raftStorage) Snapshot() (raftpb.Snapshot, error) { + if rs.snapStore == nil { + return rs.MemoryStorage.Snapshot() + } + snap, err := rs.snapStore.getSnap() + if err != nil { + return raftpb.Snapshot{}, err + } + return snap.raft(), nil +} diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 7cce36870..336e85e88 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -153,7 +153,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { r := raftNode{ Node: n, storage: &storageRecorder{}, - raftStorage: raft.NewMemoryStorage(), + raftStorage: newRaftStorage(), transport: &nopTransporter{}, } r.start(&EtcdServer{r: r}) diff --git a/etcdserver/server.go b/etcdserver/server.go index dfc31d9f3..e22c11358 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -179,7 +179,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { st := store.New(StoreClusterPrefix, StoreKeysPrefix) var w *wal.WAL var n raft.Node - var s *raft.MemoryStorage + var s *raftStorage var id types.ID var cl *cluster @@ -343,6 +343,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := srv.kv.Restore(); err != nil { plog.Fatalf("v3 storage restore error: %v", err) } + s.snapStore = newSnapshotStore(cfg.StorageDir(), srv.kv) } // TODO: move transport initialization near the definition of remote diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index fe2d22775..1c041b436 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -522,7 +522,7 @@ func TestDoProposal(t *testing.T) { r: raftNode{ Node: newNodeCommitter(), storage: &storageRecorder{}, - raftStorage: 
raft.NewMemoryStorage(), + raftStorage: newRaftStorage(), transport: &nopTransporter{}, }, store: st, @@ -661,7 +661,7 @@ func TestSyncTrigger(t *testing.T) { cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: newRaftStorage(), transport: &nopTransporter{}, storage: &storageRecorder{}, }, @@ -700,7 +700,7 @@ func TestSyncTrigger(t *testing.T) { // snapshot should snapshot the store and cut the persistent func TestSnapshot(t *testing.T) { - s := raft.NewMemoryStorage() + s := newRaftStorage() s.Append([]raftpb.Entry{{Index: 1}}) st := &storeRecorder{} p := &storageRecorder{} @@ -743,7 +743,7 @@ func TestTriggerSnap(t *testing.T) { snapCount: uint64(snapc), r: raftNode{ Node: newNodeCommitter(), - raftStorage: raft.NewMemoryStorage(), + raftStorage: newRaftStorage(), storage: p, transport: &nopTransporter{}, }, @@ -783,7 +783,7 @@ func TestRecvSnapshot(t *testing.T) { Node: n, transport: &nopTransporter{}, storage: p, - raftStorage: raft.NewMemoryStorage(), + raftStorage: newRaftStorage(), }, store: st, cluster: cl, @@ -812,7 +812,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { st := &storeRecorder{} cl := newCluster("abc") cl.SetStore(store.New()) - storage := raft.NewMemoryStorage() + storage := newRaftStorage() s := &EtcdServer{ r: raftNode{ Node: n, @@ -860,7 +860,7 @@ func TestAddMember(t *testing.T) { s := &EtcdServer{ r: raftNode{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: newRaftStorage(), storage: &storageRecorder{}, transport: &nopTransporter{}, }, @@ -900,7 +900,7 @@ func TestRemoveMember(t *testing.T) { s := &EtcdServer{ r: raftNode{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: newRaftStorage(), storage: &storageRecorder{}, transport: &nopTransporter{}, }, @@ -939,7 +939,7 @@ func TestUpdateMember(t *testing.T) { s := &EtcdServer{ r: raftNode{ Node: n, - raftStorage: raft.NewMemoryStorage(), + raftStorage: newRaftStorage(), storage: 
&storageRecorder{}, transport: &nopTransporter{}, }, diff --git a/etcdserver/snapshot_store.go b/etcdserver/snapshot_store.go new file mode 100644 index 000000000..62eb5304f --- /dev/null +++ b/etcdserver/snapshot_store.go @@ -0,0 +1,126 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path" + + "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + dstorage "github.com/coreos/etcd/storage" +) + +type snapshot struct { + r raftpb.Snapshot +} + +func (s *snapshot) raft() raftpb.Snapshot { return s.r } + +func (s *snapshot) size() uint64 { return 0 } + +func (s *snapshot) writeTo(w io.Writer) (n int64, err error) { return 0, nil } + +func (s *snapshot) close() {} + +type snapshotStore struct { + // dir to save snapshot data + dir string + kv dstorage.KV + + // send empty to reqsnapc to notify the channel receiver to send back latest + // snapshot to snapc + reqsnapc chan struct{} + // a chan to receive the requested snapshot + // snapshotStore will receive from the chan immediately after it sends empty to reqsnapc + snapc chan raftpb.Snapshot + + snap *snapshot +} + +func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore { + return &snapshotStore{ + dir: dir, + kv: kv, + reqsnapc: make(chan struct{}), + snapc: make(chan raftpb.Snapshot), + } +} + +// getSnap returns a snapshot. 
+// If there is no available snapshot, ErrSnapshotTemporarilyUnavailable will be returned. +func (ss *snapshotStore) getSnap() (*snapshot, error) { + if ss.snap != nil { + return nil, raft.ErrSnapshotTemporarilyUnavailable + } + + // ask to generate v2 snapshot + ss.reqsnapc <- struct{}{} + // TODO: generate v3 snapshot here + raftsnap := <-ss.snapc + ss.snap = &snapshot{ + r: raftsnap, + } + return ss.snap, nil +} + +// saveSnap saves the snapshot to disk. +// +// If the snapshot already exists on disk, it keeps the original snapshot and returns an error. +// The function guarantees that it always saves either a complete snapshot or no snapshot, +// even if the call is aborted because the program is hard killed. +func (ss *snapshotStore) saveSnap(s *snapshot) error { + f, err := ioutil.TempFile(ss.dir, "tmp") + if err != nil { + return err + } + _, err = s.writeTo(f) + f.Close() + if err != nil { + os.Remove(f.Name()) + return err + } + fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", s.raft().Metadata.Index)) + if fileutil.Exist(fn) { + os.Remove(f.Name()) + return fmt.Errorf("snapshot to save has existed") + } + err = os.Rename(f.Name(), fn) + if err != nil { + os.Remove(f.Name()) + return err + } + return nil +} + +// getSnapFilePath returns the file path for the snapshot with the given index. +// If the snapshot does not exist, it returns an error. +func (ss *snapshotStore) getSnapFilePath(index uint64) (string, error) { + fns, err := fileutil.ReadDir(ss.dir) + if err != nil { + return "", err + } + wfn := fmt.Sprintf("%016x.db", index) + for _, fn := range fns { + if fn == wfn { + return path.Join(ss.dir, fn), nil + } + } + return "", fmt.Errorf("snapshot file doesn't exist") +}