mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
snap, etcdserver: tighten up snapshot path handling
Computing the snapshot file path is error prone; snapshot recovery was constructing file paths missing a path separator so the snapshot would never be loaded. Instead, refactor the backend path handling to use helper functions where possible.
This commit is contained in:
parent
066062a5e0
commit
f6cd4d4f5b
81
etcdserver/backend.go
Normal file
81
etcdserver/backend.go
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
// Copyright 2017 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package etcdserver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/lease"
|
||||||
|
"github.com/coreos/etcd/mvcc"
|
||||||
|
"github.com/coreos/etcd/mvcc/backend"
|
||||||
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
|
"github.com/coreos/etcd/snap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newBackend(cfg *ServerConfig) backend.Backend {
|
||||||
|
bcfg := backend.DefaultBackendConfig()
|
||||||
|
bcfg.Path = cfg.backendPath()
|
||||||
|
if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes {
|
||||||
|
// permit 10% excess over quota for disarm
|
||||||
|
bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
|
||||||
|
}
|
||||||
|
return backend.New(bcfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
|
||||||
|
func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
||||||
|
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("database snapshot file path error: %v", err)
|
||||||
|
}
|
||||||
|
if err := os.Rename(snapPath, cfg.backendPath()); err != nil {
|
||||||
|
return nil, fmt.Errorf("rename snapshot file error: %v", err)
|
||||||
|
}
|
||||||
|
return openBackend(cfg), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// openBackend returns a backend using the current etcd db.
|
||||||
|
func openBackend(cfg *ServerConfig) backend.Backend {
|
||||||
|
fn := cfg.backendPath()
|
||||||
|
beOpened := make(chan backend.Backend)
|
||||||
|
go func() {
|
||||||
|
beOpened <- newBackend(cfg)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case be := <-beOpened:
|
||||||
|
return be
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
plog.Warningf("another etcd process is using %q and holds the file lock.", fn)
|
||||||
|
plog.Warningf("waiting for it to exit before starting...")
|
||||||
|
}
|
||||||
|
return <-beOpened
|
||||||
|
}
|
||||||
|
|
||||||
|
// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes
|
||||||
|
// before updating the backend db after persisting raft snapshot to disk,
|
||||||
|
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
|
||||||
|
// case, replace the db with the snapshot db sent by the leader.
|
||||||
|
func recoverSnapshotBackend(cfg *ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
||||||
|
var cIndex consistentIndex
|
||||||
|
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
|
||||||
|
defer kv.Close()
|
||||||
|
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
|
||||||
|
return oldbe, nil
|
||||||
|
}
|
||||||
|
oldbe.Close()
|
||||||
|
return openSnapshotBackend(cfg, snap.New(cfg.SnapDir()), snapshot)
|
||||||
|
}
|
@ -200,3 +200,5 @@ func (c *ServerConfig) bootstrapTimeout() time.Duration {
|
|||||||
}
|
}
|
||||||
return time.Second
|
return time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ServerConfig) backendPath() string { return filepath.Join(c.SnapDir(), "db") }
|
||||||
|
@ -23,7 +23,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -76,7 +75,6 @@ const (
|
|||||||
// (since it will timeout).
|
// (since it will timeout).
|
||||||
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
|
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
|
||||||
|
|
||||||
databaseFilename = "db"
|
|
||||||
// max number of in-flight snapshot messages etcdserver allows to have
|
// max number of in-flight snapshot messages etcdserver allows to have
|
||||||
// This number is more than enough for most clusters with 5 machines.
|
// This number is more than enough for most clusters with 5 machines.
|
||||||
maxInFlightMsgSnap = 16
|
maxInFlightMsgSnap = 16
|
||||||
@ -200,7 +198,8 @@ type EtcdServer struct {
|
|||||||
|
|
||||||
cluster *membership.RaftCluster
|
cluster *membership.RaftCluster
|
||||||
|
|
||||||
store store.Store
|
store store.Store
|
||||||
|
snapshotter *snap.Snapshotter
|
||||||
|
|
||||||
applyV2 ApplierV2
|
applyV2 ApplierV2
|
||||||
|
|
||||||
@ -271,10 +270,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
}
|
}
|
||||||
ss := snap.New(cfg.SnapDir())
|
ss := snap.New(cfg.SnapDir())
|
||||||
|
|
||||||
bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
|
bepath := cfg.backendPath()
|
||||||
beExist := fileutil.Exist(bepath)
|
beExist := fileutil.Exist(bepath)
|
||||||
|
be := openBackend(cfg)
|
||||||
be := openBackend(bepath, cfg.QuotaBackendBytes)
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -372,9 +370,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
plog.Panicf("recovered store from snapshot error: %v", err)
|
plog.Panicf("recovered store from snapshot error: %v", err)
|
||||||
}
|
}
|
||||||
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
|
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
|
||||||
|
if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
|
||||||
be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir())
|
|
||||||
if err != nil {
|
|
||||||
plog.Panicf("recovering backend from snapshot error: %v", err)
|
plog.Panicf("recovering backend from snapshot error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -408,11 +404,12 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
|
|
||||||
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
|
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
|
||||||
srv = &EtcdServer{
|
srv = &EtcdServer{
|
||||||
readych: make(chan struct{}),
|
readych: make(chan struct{}),
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
snapCount: cfg.SnapCount,
|
snapCount: cfg.SnapCount,
|
||||||
errorc: make(chan error, 1),
|
errorc: make(chan error, 1),
|
||||||
store: st,
|
store: st,
|
||||||
|
snapshotter: ss,
|
||||||
r: *newRaftNode(
|
r: *newRaftNode(
|
||||||
raftNodeConfig{
|
raftNodeConfig{
|
||||||
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
|
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
|
||||||
@ -795,21 +792,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
|||||||
apply.snapshot.Metadata.Index, ep.appliedi)
|
apply.snapshot.Metadata.Index, ep.appliedi)
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for raftNode to persist snashot onto the disk
|
// wait for raftNode to persist snapshot onto the disk
|
||||||
<-apply.notifyc
|
<-apply.notifyc
|
||||||
|
|
||||||
snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
|
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Panicf("get database snapshot file path error: %v", err)
|
plog.Panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn := filepath.Join(s.Cfg.SnapDir(), databaseFilename)
|
|
||||||
if err := os.Rename(snapfn, fn); err != nil {
|
|
||||||
plog.Panicf("rename snapshot file error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
newbe := newBackend(fn, s.Cfg.QuotaBackendBytes)
|
|
||||||
|
|
||||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||||
if s.lessor != nil {
|
if s.lessor != nil {
|
||||||
@ -1662,13 +1652,3 @@ func (s *EtcdServer) goAttach(f func()) {
|
|||||||
f()
|
f()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBackend(path string, quotaBytes int64) backend.Backend {
|
|
||||||
bcfg := backend.DefaultBackendConfig()
|
|
||||||
bcfg.Path = path
|
|
||||||
if quotaBytes > 0 && quotaBytes != DefaultQuotaBytes {
|
|
||||||
// permit 10% excess over quota for disarm
|
|
||||||
bcfg.MmapSize = uint64(quotaBytes + quotaBytes/10)
|
|
||||||
}
|
|
||||||
return backend.New(bcfg)
|
|
||||||
}
|
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -29,6 +30,7 @@ import (
|
|||||||
"github.com/coreos/etcd/lease"
|
"github.com/coreos/etcd/lease"
|
||||||
"github.com/coreos/etcd/mvcc"
|
"github.com/coreos/etcd/mvcc"
|
||||||
"github.com/coreos/etcd/mvcc/backend"
|
"github.com/coreos/etcd/mvcc/backend"
|
||||||
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/coreos/etcd/pkg/idutil"
|
"github.com/coreos/etcd/pkg/idutil"
|
||||||
"github.com/coreos/etcd/pkg/mock/mockstorage"
|
"github.com/coreos/etcd/pkg/mock/mockstorage"
|
||||||
"github.com/coreos/etcd/pkg/mock/mockstore"
|
"github.com/coreos/etcd/pkg/mock/mockstore"
|
||||||
@ -40,6 +42,7 @@ import (
|
|||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/rafthttp"
|
"github.com/coreos/etcd/rafthttp"
|
||||||
|
"github.com/coreos/etcd/snap"
|
||||||
"github.com/coreos/etcd/store"
|
"github.com/coreos/etcd/store"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
@ -964,13 +967,15 @@ func TestSnapshotOrdering(t *testing.T) {
|
|||||||
t.Fatalf("couldn't open tempdir (%v)", err)
|
t.Fatalf("couldn't open tempdir (%v)", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(testdir)
|
defer os.RemoveAll(testdir)
|
||||||
if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
|
|
||||||
|
snapdir := filepath.Join(testdir, "member", "snap")
|
||||||
|
if err := os.MkdirAll(snapdir, 0755); err != nil {
|
||||||
t.Fatalf("couldn't make snap dir (%v)", err)
|
t.Fatalf("couldn't make snap dir (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rs := raft.NewMemoryStorage()
|
rs := raft.NewMemoryStorage()
|
||||||
p := mockstorage.NewStorageRecorderStream(testdir)
|
p := mockstorage.NewStorageRecorderStream(testdir)
|
||||||
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
|
tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
|
||||||
r := newRaftNode(raftNodeConfig{
|
r := newRaftNode(raftNodeConfig{
|
||||||
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
|
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
|
||||||
Node: n,
|
Node: n,
|
||||||
@ -982,10 +987,11 @@ func TestSnapshotOrdering(t *testing.T) {
|
|||||||
Cfg: &ServerConfig{
|
Cfg: &ServerConfig{
|
||||||
DataDir: testdir,
|
DataDir: testdir,
|
||||||
},
|
},
|
||||||
r: *r,
|
r: *r,
|
||||||
store: st,
|
store: st,
|
||||||
cluster: cl,
|
snapshotter: snap.New(snapdir),
|
||||||
SyncTicker: &time.Ticker{},
|
cluster: cl,
|
||||||
|
SyncTicker: &time.Ticker{},
|
||||||
}
|
}
|
||||||
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
|
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
|
||||||
|
|
||||||
@ -997,40 +1003,30 @@ func TestSnapshotOrdering(t *testing.T) {
|
|||||||
s.start()
|
s.start()
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
actionc := p.Chan()
|
|
||||||
n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
|
n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
|
||||||
if ac := <-actionc; ac.Name != "Save" {
|
go func() {
|
||||||
// MsgSnap triggers raftNode to call Save()
|
// get the snapshot sent by the transport
|
||||||
t.Fatalf("expect save() is called, but got %v", ac.Name)
|
snapMsg := <-snapDoneC
|
||||||
|
// Snapshot first triggers raftnode to persists the snapshot onto disk
|
||||||
|
// before renaming db snapshot file to db
|
||||||
|
snapMsg.Snapshot.Metadata.Index = 1
|
||||||
|
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if ac := <-p.Chan(); ac.Name != "Save" {
|
||||||
|
t.Fatalf("expected Save, got %+v", ac)
|
||||||
}
|
}
|
||||||
|
if ac := <-p.Chan(); ac.Name != "Save" {
|
||||||
// get the snapshot sent by the transport
|
t.Fatalf("expected Save, got %+v", ac)
|
||||||
snapMsg := <-snapDoneC
|
}
|
||||||
|
// confirm snapshot file still present before calling SaveSnap
|
||||||
// Snapshot first triggers raftnode to persists the snapshot onto disk
|
snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
|
||||||
// before renaming db snapshot file to db
|
if !fileutil.Exist(snapPath) {
|
||||||
snapMsg.Snapshot.Metadata.Index = 1
|
t.Fatalf("expected file %q, got missing", snapPath)
|
||||||
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
|
}
|
||||||
var seenSaveSnap bool
|
// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
|
||||||
timer := time.After(5 * time.Second)
|
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
|
||||||
for {
|
t.Fatalf("expected SaveSnap, got %+v", ac)
|
||||||
select {
|
|
||||||
case ac := <-actionc:
|
|
||||||
switch ac.Name {
|
|
||||||
// DBFilePath() is called immediately before snapshot renaming.
|
|
||||||
case "DBFilePath":
|
|
||||||
if !seenSaveSnap {
|
|
||||||
t.Fatalf("DBFilePath called before SaveSnap")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case "SaveSnap":
|
|
||||||
seenSaveSnap = true
|
|
||||||
default:
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
case <-timer:
|
|
||||||
t.Fatalf("timeout waiting on actions")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1119,10 +1115,11 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
|||||||
Cfg: &ServerConfig{
|
Cfg: &ServerConfig{
|
||||||
DataDir: testdir,
|
DataDir: testdir,
|
||||||
},
|
},
|
||||||
r: *r,
|
r: *r,
|
||||||
store: st,
|
store: st,
|
||||||
cluster: cl,
|
snapshotter: snap.New(testdir),
|
||||||
SyncTicker: &time.Ticker{},
|
cluster: cl,
|
||||||
|
SyncTicker: &time.Ticker{},
|
||||||
}
|
}
|
||||||
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
|
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
|
||||||
|
|
||||||
|
@ -32,9 +32,6 @@ type Storage interface {
|
|||||||
Save(st raftpb.HardState, ents []raftpb.Entry) error
|
Save(st raftpb.HardState, ents []raftpb.Entry) error
|
||||||
// SaveSnap function saves snapshot to the underlying stable storage.
|
// SaveSnap function saves snapshot to the underlying stable storage.
|
||||||
SaveSnap(snap raftpb.Snapshot) error
|
SaveSnap(snap raftpb.Snapshot) error
|
||||||
// DBFilePath returns the file path of database snapshot saved with given
|
|
||||||
// id.
|
|
||||||
DBFilePath(id uint64) (string, error)
|
|
||||||
// Close closes the Storage and performs finalization.
|
// Close closes the Storage and performs finalization.
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
@ -15,18 +15,11 @@
|
|||||||
package etcdserver
|
package etcdserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver/membership"
|
"github.com/coreos/etcd/etcdserver/membership"
|
||||||
"github.com/coreos/etcd/lease"
|
|
||||||
"github.com/coreos/etcd/mvcc"
|
|
||||||
"github.com/coreos/etcd/mvcc/backend"
|
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
|
||||||
"github.com/coreos/etcd/rafthttp"
|
"github.com/coreos/etcd/rafthttp"
|
||||||
"github.com/coreos/etcd/snap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// isConnectedToQuorumSince checks whether the local member is connected to the
|
// isConnectedToQuorumSince checks whether the local member is connected to the
|
||||||
@ -102,55 +95,3 @@ func (nc *notifier) notify(err error) {
|
|||||||
nc.err = err
|
nc.err = err
|
||||||
close(nc.c)
|
close(nc.c)
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkAndRecoverDB attempts to recover db in the scenario when
|
|
||||||
// etcd server crashes before updating its in-state db
|
|
||||||
// and after persisting snapshot to disk from syncing with leader,
|
|
||||||
// snapshot can be newer than db where
|
|
||||||
// (snapshot.Metadata.Index > db.consistentIndex ).
|
|
||||||
//
|
|
||||||
// when that happen:
|
|
||||||
// 1. find xxx.snap.db that matches snap index.
|
|
||||||
// 2. rename xxx.snap.db to db.
|
|
||||||
// 3. open the new db as the backend.
|
|
||||||
func checkAndRecoverDB(snapshot *raftpb.Snapshot, oldbe backend.Backend, quotaBackendBytes int64, snapdir string) (be backend.Backend, err error) {
|
|
||||||
var cIndex consistentIndex
|
|
||||||
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
|
|
||||||
defer kv.Close()
|
|
||||||
kvindex := kv.ConsistentIndex()
|
|
||||||
if snapshot.Metadata.Index <= kvindex {
|
|
||||||
return oldbe, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
id := snapshot.Metadata.Index
|
|
||||||
snapfn, err := snap.DBFilePathFromID(snapdir, id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("finding %v error: %v", snapdir+fmt.Sprintf("%016x.snap.db", id), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
bepath := snapdir + databaseFilename
|
|
||||||
if err := os.Rename(snapfn, bepath); err != nil {
|
|
||||||
return nil, fmt.Errorf("rename snapshot file error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
oldbe.Close()
|
|
||||||
be = openBackend(bepath, quotaBackendBytes)
|
|
||||||
return be, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func openBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) {
|
|
||||||
beOpened := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
be = newBackend(bepath, quotaBackendBytes)
|
|
||||||
beOpened <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-beOpened:
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
|
|
||||||
plog.Warningf("waiting for it to exit before starting...")
|
|
||||||
<-beOpened
|
|
||||||
}
|
|
||||||
return be
|
|
||||||
}
|
|
||||||
|
@ -15,8 +15,6 @@
|
|||||||
package mockstorage
|
package mockstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
@ -47,13 +45,4 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
|
|
||||||
p.Record(testutil.Action{Name: "DBFilePath"})
|
|
||||||
path := p.dbPath
|
|
||||||
if path != "" {
|
|
||||||
path = path + "/"
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%s%016x.snap.db", path, id), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *storageRecorder) Close() error { return nil }
|
func (p *storageRecorder) Close() error { return nil }
|
||||||
|
20
snap/db.go
20
snap/db.go
@ -44,7 +44,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
|
|||||||
os.Remove(f.Name())
|
os.Remove(f.Name())
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
fn := filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
|
fn := s.dbFilePath(id)
|
||||||
if fileutil.Exist(fn) {
|
if fileutil.Exist(fn) {
|
||||||
os.Remove(f.Name())
|
os.Remove(f.Name())
|
||||||
return n, nil
|
return n, nil
|
||||||
@ -63,19 +63,15 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
|
|||||||
// DBFilePath returns the file path for the snapshot of the database with
|
// DBFilePath returns the file path for the snapshot of the database with
|
||||||
// given id. If the snapshot does not exist, it returns error.
|
// given id. If the snapshot does not exist, it returns error.
|
||||||
func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
|
func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
|
||||||
return DBFilePathFromID(s.dir, id)
|
if _, err := fileutil.ReadDir(s.dir); err != nil {
|
||||||
}
|
|
||||||
|
|
||||||
func DBFilePathFromID(dbPath string, id uint64) (string, error) {
|
|
||||||
fns, err := fileutil.ReadDir(dbPath)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
wfn := fmt.Sprintf("%016x.snap.db", id)
|
if fn := s.dbFilePath(id); fileutil.Exist(fn) {
|
||||||
for _, fn := range fns {
|
return fn, nil
|
||||||
if fn == wfn {
|
|
||||||
return filepath.Join(dbPath, fn), nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return "", ErrNoDBSnapshot
|
return "", ErrNoDBSnapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Snapshotter) dbFilePath(id uint64) string {
|
||||||
|
return filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user