Merge pull request #3614 from yichengq/snapshot-store

etcdserver: add snapshotStore and raftStorage
This commit is contained in:
Yicheng Qin 2015-10-01 19:35:34 -07:00
commit ccce61bda9
6 changed files with 192 additions and 18 deletions

View File

@ -103,7 +103,7 @@ type raftNode struct {
// utility
ticker <-chan time.Time
raftStorage *raft.MemoryStorage
raftStorage *raftStorage
storage Storage
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since
@ -238,7 +238,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) {
}
}
func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raftStorage, w *wal.WAL) {
var err error
member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
@ -263,7 +263,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
}
id = member.ID
plog.Infof("starting member %s in cluster %s", id, cl.ID())
s = raft.NewMemoryStorage()
s = newRaftStorage()
c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
@ -278,7 +278,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
return
}
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -288,7 +288,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
s := newRaftStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
@ -308,7 +308,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
return id, cl, n, s, w
}
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -340,7 +340,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
s := newRaftStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}

View File

@ -0,0 +1,47 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)
type raftStorage struct {
*raft.MemoryStorage
// snapStore is the place to request snapshot when v3demo is enabled.
// If snapStore is nil, it uses the snapshot saved in MemoryStorage.
snapStore *snapshotStore
}
func newRaftStorage() *raftStorage {
return &raftStorage{
MemoryStorage: raft.NewMemoryStorage(),
}
}
// Snapshot returns raft snapshot. If snapStore is nil, this method
// returns snapshot saved in MemoryStorage. If snapStore exists, this method
// returns snapshot from snapStore.
func (rs *raftStorage) Snapshot() (raftpb.Snapshot, error) {
if rs.snapStore == nil {
return rs.MemoryStorage.Snapshot()
}
snap, err := rs.snapStore.getSnap()
if err != nil {
return raftpb.Snapshot{}, err
}
return snap.raft(), nil
}

View File

@ -153,7 +153,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
r := raftNode{
Node: n,
storage: &storageRecorder{},
raftStorage: raft.NewMemoryStorage(),
raftStorage: newRaftStorage(),
transport: &nopTransporter{},
}
r.start(&EtcdServer{r: r})

View File

@ -179,7 +179,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
var w *wal.WAL
var n raft.Node
var s *raft.MemoryStorage
var s *raftStorage
var id types.ID
var cl *cluster
@ -343,6 +343,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := srv.kv.Restore(); err != nil {
plog.Fatalf("v3 storage restore error: %v", err)
}
s.snapStore = newSnapshotStore(cfg.StorageDir(), srv.kv)
}
// TODO: move transport initialization near the definition of remote

View File

@ -522,7 +522,7 @@ func TestDoProposal(t *testing.T) {
r: raftNode{
Node: newNodeCommitter(),
storage: &storageRecorder{},
raftStorage: raft.NewMemoryStorage(),
raftStorage: newRaftStorage(),
transport: &nopTransporter{},
},
store: st,
@ -661,7 +661,7 @@ func TestSyncTrigger(t *testing.T) {
cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: newRaftStorage(),
transport: &nopTransporter{},
storage: &storageRecorder{},
},
@ -700,7 +700,7 @@ func TestSyncTrigger(t *testing.T) {
// snapshot should snapshot the store and cut the persistent
func TestSnapshot(t *testing.T) {
s := raft.NewMemoryStorage()
s := newRaftStorage()
s.Append([]raftpb.Entry{{Index: 1}})
st := &storeRecorder{}
p := &storageRecorder{}
@ -743,7 +743,7 @@ func TestTriggerSnap(t *testing.T) {
snapCount: uint64(snapc),
r: raftNode{
Node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
raftStorage: newRaftStorage(),
storage: p,
transport: &nopTransporter{},
},
@ -783,7 +783,7 @@ func TestRecvSnapshot(t *testing.T) {
Node: n,
transport: &nopTransporter{},
storage: p,
raftStorage: raft.NewMemoryStorage(),
raftStorage: newRaftStorage(),
},
store: st,
cluster: cl,
@ -812,7 +812,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
st := &storeRecorder{}
cl := newCluster("abc")
cl.SetStore(store.New())
storage := raft.NewMemoryStorage()
storage := newRaftStorage()
s := &EtcdServer{
r: raftNode{
Node: n,
@ -860,7 +860,7 @@ func TestAddMember(t *testing.T) {
s := &EtcdServer{
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: newRaftStorage(),
storage: &storageRecorder{},
transport: &nopTransporter{},
},
@ -900,7 +900,7 @@ func TestRemoveMember(t *testing.T) {
s := &EtcdServer{
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: newRaftStorage(),
storage: &storageRecorder{},
transport: &nopTransporter{},
},
@ -939,7 +939,7 @@ func TestUpdateMember(t *testing.T) {
s := &EtcdServer{
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
raftStorage: newRaftStorage(),
storage: &storageRecorder{},
transport: &nopTransporter{},
},

View File

@ -0,0 +1,126 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
dstorage "github.com/coreos/etcd/storage"
)
type snapshot struct {
r raftpb.Snapshot
}
func (s *snapshot) raft() raftpb.Snapshot { return s.r }
func (s *snapshot) size() uint64 { return 0 }
func (s *snapshot) writeTo(w io.Writer) (n int64, err error) { return 0, nil }
func (s *snapshot) close() {}
type snapshotStore struct {
// dir to save snapshot data
dir string
kv dstorage.KV
// send empty to reqsnapc to notify the channel receiver to send back latest
// snapshot to snapc
reqsnapc chan struct{}
// a chan to receive the requested snapshot
// snapshotStore will receive from the chan immediately after it sends empty to reqsnapc
snapc chan raftpb.Snapshot
snap *snapshot
}
func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
return &snapshotStore{
dir: dir,
kv: kv,
reqsnapc: make(chan struct{}),
snapc: make(chan raftpb.Snapshot),
}
}
// getSnap returns a snapshot.
// If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned.
func (ss *snapshotStore) getSnap() (*snapshot, error) {
if ss.snap != nil {
return nil, raft.ErrSnapshotTemporarilyUnavailable
}
// ask to generate v2 snapshot
ss.reqsnapc <- struct{}{}
// TODO: generate v3 snapshot at here
raftsnap := <-ss.snapc
ss.snap = &snapshot{
r: raftsnap,
}
return ss.snap, nil
}
// saveSnap saves snapshot into disk.
//
// If snapshot has existed in disk, it keeps the original snapshot and returns error.
// The function guarantees that it always saves either complete snapshot or no snapshot,
// even if the call is aborted because program is hard killed.
func (ss *snapshotStore) saveSnap(s *snapshot) error {
f, err := ioutil.TempFile(ss.dir, "tmp")
if err != nil {
return err
}
_, err = s.writeTo(f)
f.Close()
if err != nil {
os.Remove(f.Name())
return err
}
fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", s.raft().Metadata.Index))
if fileutil.Exist(fn) {
os.Remove(f.Name())
return fmt.Errorf("snapshot to save has existed")
}
err = os.Rename(f.Name(), fn)
if err != nil {
os.Remove(f.Name())
return err
}
return nil
}
// getSnapFilePath returns the file path for the snapshot with given index.
// If the snapshot does not exist, it returns error.
func (ss *snapshotStore) getSnapFilePath(index uint64) (string, error) {
fns, err := fileutil.ReadDir(ss.dir)
if err != nil {
return "", err
}
wfn := fmt.Sprintf("%016x.db", index)
for _, fn := range fns {
if fn == wfn {
return path.Join(ss.dir, fn), nil
}
}
return "", fmt.Errorf("snapshot file doesn't exist")
}