mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #9748 from gyuho/rename
*: clean up logging, move internal "snap" package to "etcdserver"
This commit is contained in:
commit
346589edd1
8
.words
8
.words
@ -8,7 +8,7 @@ MiB
|
||||
ResourceExhausted
|
||||
RPC
|
||||
RPCs
|
||||
TODO
|
||||
|
||||
WithRequireLeader
|
||||
args
|
||||
backoff
|
||||
@ -61,3 +61,9 @@ unbuffered
|
||||
nils
|
||||
reconnection
|
||||
mutators
|
||||
ConsistentIndexGetter
|
||||
OutputWALDir
|
||||
WAL
|
||||
consistentIndex
|
||||
todo
|
||||
saveWALAndSnap
|
||||
|
@ -35,7 +35,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
|
||||
- Make [Lease `Lookup` non-blocking with concurrent `Grant`/`Revoke`](https://github.com/coreos/etcd/pull/9229).
|
||||
- Make etcd server return `raft.ErrProposalDropped` on internal Raft proposal drop in [v3 applier](https://github.com/coreos/etcd/pull/9549) and [v2 applier](https://github.com/coreos/etcd/pull/9558).
|
||||
- e.g. a node is removed from cluster, or [`raftpb.MsgProp` arrives at current leader while there is an ongoing leadership transfer](https://github.com/coreos/etcd/issues/8975).
|
||||
- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for easier snapshot workflow (see [`godoc.org/github.com/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more).
|
||||
- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for easier snapshot workflow (see [`godoc.org/github.com/etcd/clientv3/snapshot`](https://godoc.org/github.com/coreos/etcd/clientv3/snapshot) for more).
|
||||
- Improve [functional tester](https://github.com/coreos/etcd/tree/master/functional) coverage: [proxy layer to run network fault tests in CI](https://github.com/coreos/etcd/pull/9081), [TLS is enabled both for server and client](https://github.com/coreos/etcd/pull/9534), [liveness mode](https://github.com/coreos/etcd/issues/9230), [shuffle test sequence](https://github.com/coreos/etcd/issues/9381), [membership reconfiguration failure cases](https://github.com/coreos/etcd/pull/9564), [disastrous quorum loss and snapshot recover from a seed member](https://github.com/coreos/etcd/pull/9565), [embedded etcd](https://github.com/coreos/etcd/pull/9572).
|
||||
- Improve [index compaction blocking](https://github.com/coreos/etcd/pull/9511) by using a copy on write clone to avoid holding the lock for the traversal of the entire index.
|
||||
|
||||
@ -87,10 +87,10 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
|
||||
- Previously, `Create(dirpath string, metadata []byte) (*WAL, error)`, now `Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error)`.
|
||||
- Remove [`pkg/cors` package](https://github.com/coreos/etcd/pull/9490).
|
||||
- Change [`--experimental-enable-v2v3`](TODO) flag to `--enable-v2v3`; v2 storage emulation is now stable.
|
||||
- Move internal package `"github.com/coreos/etcd/snap"` to [`"github.com/coreos/etcd/raftsnap"`](https://github.com/coreos/etcd/pull/9211).
|
||||
- Move internal package `"github.com/coreos/etcd/etcdserver/auth"` to `"github.com/coreos/etcd/etcdserver/api/v2auth"`.
|
||||
- Move internal package `"github.com/coreos/etcd/etcdserver/stats"` to `"github.com/coreos/etcd/etcdserver/api/v2stats"`.
|
||||
- Move internal package `"github.com/coreos/etcd/error"` to `"github.com/coreos/etcd/etcdserver/api/v2error"`.
|
||||
- Move internal package `"github.com/coreos/etcd/snap"` to `"github.com/coreos/etcd/etcdserver/api/snap"`.
|
||||
- Move internal package `"github.com/coreos/etcd/store"` to `"github.com/coreos/etcd/etcdserver/api/v2store"`.
|
||||
|
||||
### Dependency
|
||||
@ -208,7 +208,7 @@ See [security doc](https://github.com/coreos/etcd/blob/master/Documentation/op-g
|
||||
|
||||
### API
|
||||
|
||||
- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for snapshot restore/save operations (see [`godoc.org/github.com/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more).
|
||||
- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for snapshot restore/save operations (see [`godoc.org/github.com/etcd/clientv3/snapshot`](https://godoc.org/github.com/coreos/etcd/clientv3/snapshot) for more).
|
||||
- Add [`watch_id` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9065) to allow user-provided watch ID to `mvcc`.
|
||||
- Corresponding `watch_id` is returned via `etcdserverpb.WatchResponse`, if any.
|
||||
- Add [`fragment` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9291) to request etcd server to [split watch events](https://github.com/coreos/etcd/issues/9294) when the total size of events exceeds `--max-request-bytes` flag value plus gRPC-overhead 512 bytes.
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2store"
|
||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
@ -39,7 +40,6 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
|
||||
@ -477,7 +477,7 @@ func (s *v3Manager) saveWALAndSnap() error {
|
||||
},
|
||||
},
|
||||
}
|
||||
sn := raftsnap.New(s.lg, s.snapDir)
|
||||
sn := snap.New(s.lg, s.snapDir)
|
||||
if err := sn.SaveSnap(raftSnap); err != nil {
|
||||
return err
|
||||
}
|
@ -21,7 +21,7 @@ import (
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
)
|
||||
|
||||
// a key-value store backed by raft
|
||||
@ -29,7 +29,7 @@ type kvstore struct {
|
||||
proposeC chan<- string // channel for proposing updates
|
||||
mu sync.RWMutex
|
||||
kvStore map[string]string // current committed key-value pairs
|
||||
snapshotter *raftsnap.Snapshotter
|
||||
snapshotter *snap.Snapshotter
|
||||
}
|
||||
|
||||
type kv struct {
|
||||
@ -37,7 +37,7 @@ type kv struct {
|
||||
Val string
|
||||
}
|
||||
|
||||
func newKVStore(snapshotter *raftsnap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
|
||||
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
|
||||
s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
|
||||
// replay log into key-value map
|
||||
s.readCommits(commitC, errorC)
|
||||
@ -67,7 +67,7 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
|
||||
// done replaying log; new data incoming
|
||||
// OR signaled to load snapshot
|
||||
snapshot, err := s.snapshotter.Load()
|
||||
if err == raftsnap.ErrNoSnapshot {
|
||||
if err == snap.ErrNoSnapshot {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
|
@ -24,13 +24,13 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
stats "github.com/coreos/etcd/etcdserver/api/v2stats"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
|
||||
@ -61,8 +61,8 @@ type raftNode struct {
|
||||
raftStorage *raft.MemoryStorage
|
||||
wal *wal.WAL
|
||||
|
||||
snapshotter *raftsnap.Snapshotter
|
||||
snapshotterReady chan *raftsnap.Snapshotter // signals when snapshotter is ready
|
||||
snapshotter *snap.Snapshotter
|
||||
snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
|
||||
|
||||
snapCount uint64
|
||||
transport *rafthttp.Transport
|
||||
@ -79,7 +79,7 @@ var defaultSnapshotCount uint64 = 10000
|
||||
// commit channel, followed by a nil message (to indicate the channel is
|
||||
// current), then new log entries. To shutdown, close proposeC and read errorC.
|
||||
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
|
||||
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *raftsnap.Snapshotter) {
|
||||
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {
|
||||
|
||||
commitC := make(chan *string)
|
||||
errorC := make(chan error)
|
||||
@ -100,7 +100,7 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
|
||||
httpstopc: make(chan struct{}),
|
||||
httpdonec: make(chan struct{}),
|
||||
|
||||
snapshotterReady: make(chan *raftsnap.Snapshotter, 1),
|
||||
snapshotterReady: make(chan *snap.Snapshotter, 1),
|
||||
// rest of structure populated after WAL replay
|
||||
}
|
||||
go rc.startRaft()
|
||||
@ -190,7 +190,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
|
||||
|
||||
func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
|
||||
snapshot, err := rc.snapshotter.Load()
|
||||
if err != nil && err != raftsnap.ErrNoSnapshot {
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
log.Fatalf("raftexample: error loading snapshot (%v)", err)
|
||||
}
|
||||
return snapshot
|
||||
@ -263,7 +263,7 @@ func (rc *raftNode) startRaft() {
|
||||
log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
|
||||
}
|
||||
}
|
||||
rc.snapshotter = raftsnap.New(zap.NewExample(), rc.snapdir)
|
||||
rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
|
||||
rc.snapshotterReady <- rc.snapshotter
|
||||
|
||||
oldwal := wal.Exist(rc.waldir)
|
||||
|
@ -23,13 +23,13 @@ import (
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/idutil"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
|
||||
@ -103,14 +103,14 @@ func handleBackup(c *cli.Context) error {
|
||||
}
|
||||
|
||||
func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
|
||||
ss := raftsnap.New(zap.NewExample(), srcSnap)
|
||||
ss := snap.New(zap.NewExample(), srcSnap)
|
||||
snapshot, err := ss.Load()
|
||||
if err != nil && err != raftsnap.ErrNoSnapshot {
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
newss := raftsnap.New(zap.NewExample(), destSnap)
|
||||
newss := snap.New(zap.NewExample(), destSnap)
|
||||
if err = newss.SaveSnap(*snapshot); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2error"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2store"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -37,7 +38,6 @@ import (
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
|
||||
@ -136,9 +136,9 @@ func rebuildStoreV2() (v2store.Store, uint64) {
|
||||
}
|
||||
snapdir := filepath.Join(migrateDatadir, "member", "snap")
|
||||
|
||||
ss := raftsnap.New(zap.NewExample(), snapdir)
|
||||
ss := snap.New(zap.NewExample(), snapdir)
|
||||
snapshot, err := ss.Load()
|
||||
if err != nil && err != raftsnap.ErrNoSnapshot {
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
|
@ -20,8 +20,8 @@ import (
|
||||
"strings"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3/snapshot"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/snapshot"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
)
|
||||
|
@ -18,9 +18,9 @@ import (
|
||||
"fmt"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3/snapshot"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
spb "github.com/coreos/etcd/mvcc/mvccpb"
|
||||
"github.com/coreos/etcd/snapshot"
|
||||
)
|
||||
|
||||
type fieldsPrinter struct{ printer }
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/coreos/etcd/snapshot"
|
||||
"github.com/coreos/etcd/clientv3/snapshot"
|
||||
)
|
||||
|
||||
type jsonPrinter struct{ printer }
|
||||
|
@ -20,9 +20,9 @@ import (
|
||||
"strings"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3/snapshot"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/snapshot"
|
||||
)
|
||||
|
||||
type simplePrinter struct {
|
||||
|
@ -18,7 +18,7 @@ import (
|
||||
"os"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/snapshot"
|
||||
"github.com/coreos/etcd/clientv3/snapshot"
|
||||
|
||||
"github.com/olekukonko/tablewriter"
|
||||
)
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/snapshot"
|
||||
"github.com/coreos/etcd/clientv3/snapshot"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package raftsnap
|
||||
package snap
|
||||
|
||||
import (
|
||||
"errors"
|
@ -12,6 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package raftsnap handles Raft nodes' states with snapshots.
|
||||
// Package snap handles Raft nodes' states with snapshots.
|
||||
// The snapshot logic is internal to etcd server and raft package.
|
||||
package raftsnap
|
||||
package snap
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package raftsnap
|
||||
package snap
|
||||
|
||||
import (
|
||||
"io"
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package raftsnap
|
||||
package snap
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package raftsnap
|
||||
package snap
|
||||
|
||||
import (
|
||||
"errors"
|
||||
@ -25,19 +25,17 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap/snappb"
|
||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap/snappb"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
snapSuffix = ".snap"
|
||||
)
|
||||
const snapSuffix = ".snap"
|
||||
|
||||
var (
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "snap")
|
||||
@ -82,20 +80,29 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
|
||||
d, err := snap.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
}
|
||||
|
||||
err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666)
|
||||
if err == nil {
|
||||
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
} else {
|
||||
err1 := os.Remove(filepath.Join(s.dir, fname))
|
||||
if err1 != nil {
|
||||
plog.Errorf("failed to remove broken snapshot file %s", filepath.Join(s.dir, fname))
|
||||
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
|
||||
spath := filepath.Join(s.dir, fname)
|
||||
err = pioutil.WriteAndSyncFile(spath, d, 0666)
|
||||
if err != nil {
|
||||
if s.lg != nil {
|
||||
s.lg.Warn("failed to write a snap file", zap.String("path", spath), zap.Error(err))
|
||||
}
|
||||
rerr := os.Remove(spath)
|
||||
if rerr != nil {
|
||||
if s.lg != nil {
|
||||
s.lg.Warn("failed to remove a broken snap file", zap.String("path", spath), zap.Error(err))
|
||||
} else {
|
||||
plog.Errorf("failed to remove broken snapshot file %s", spath)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
|
||||
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||
@ -119,7 +126,21 @@ func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
|
||||
fpath := filepath.Join(dir, name)
|
||||
snap, err := Read(lg, fpath)
|
||||
if err != nil {
|
||||
renameBroken(fpath)
|
||||
brokenPath := fpath + ".broken"
|
||||
if lg != nil {
|
||||
lg.Warn("failed to read a snap file", zap.String("path", fpath), zap.Error(err))
|
||||
}
|
||||
if rerr := os.Rename(fpath, brokenPath); rerr != nil {
|
||||
if lg != nil {
|
||||
lg.Warn("failed to rename a broken snap file", zap.String("path", fpath), zap.String("broken-path", brokenPath), zap.Error(rerr))
|
||||
} else {
|
||||
plog.Warningf("cannot rename broken snapshot file %v to %v: %v", fpath, brokenPath, rerr)
|
||||
}
|
||||
} else {
|
||||
if lg != nil {
|
||||
lg.Warn("renamed to a broken snap file", zap.String("path", fpath), zap.String("broken-path", brokenPath))
|
||||
}
|
||||
}
|
||||
}
|
||||
return snap, err
|
||||
}
|
||||
@ -129,7 +150,7 @@ func Read(lg *zap.Logger, snapname string) (*raftpb.Snapshot, error) {
|
||||
b, err := ioutil.ReadFile(snapname)
|
||||
if err != nil {
|
||||
if lg != nil {
|
||||
lg.Warn("failed to read snapshot file", zap.String("path", snapname), zap.Error(err))
|
||||
lg.Warn("failed to read a snap file", zap.String("path", snapname), zap.Error(err))
|
||||
} else {
|
||||
plog.Errorf("cannot read file %v: %v", snapname, err)
|
||||
}
|
||||
@ -167,7 +188,7 @@ func Read(lg *zap.Logger, snapname string) (*raftpb.Snapshot, error) {
|
||||
crc := crc32.Update(0, crcTable, serializedSnap.Data)
|
||||
if crc != serializedSnap.Crc {
|
||||
if lg != nil {
|
||||
lg.Warn("found corrupted snapshot file",
|
||||
lg.Warn("snap file is corrupt",
|
||||
zap.String("path", snapname),
|
||||
zap.Uint32("prev-crc", serializedSnap.Crc),
|
||||
zap.Uint32("new-crc", crc),
|
||||
@ -220,7 +241,7 @@ func checkSuffix(lg *zap.Logger, names []string) []string {
|
||||
// a vaild file. If not throw out a warning.
|
||||
if _, ok := validFiles[names[i]]; !ok {
|
||||
if lg != nil {
|
||||
lg.Warn("found unexpected non-snapshot file; skipping", zap.String("path", names[i]))
|
||||
lg.Warn("found unexpected non-snap file; skipping", zap.String("path", names[i]))
|
||||
} else {
|
||||
plog.Warningf("skipped unexpected non snapshot file %v", names[i])
|
||||
}
|
||||
@ -229,10 +250,3 @@ func checkSuffix(lg *zap.Logger, names []string) []string {
|
||||
}
|
||||
return snaps
|
||||
}
|
||||
|
||||
func renameBroken(path string) {
|
||||
brokenPath := path + ".broken"
|
||||
if err := os.Rename(path, brokenPath); err != nil {
|
||||
plog.Warningf("cannot rename broken snapshot file %v to %v: %v", path, brokenPath, err)
|
||||
}
|
||||
}
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package raftsnap
|
||||
package snap
|
||||
|
||||
import (
|
||||
"fmt"
|
@ -19,11 +19,11 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -40,7 +40,7 @@ func newBackend(cfg ServerConfig) backend.Backend {
|
||||
}
|
||||
|
||||
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
|
||||
func openSnapshotBackend(cfg ServerConfig, ss *raftsnap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
||||
func openSnapshotBackend(cfg ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
||||
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
|
||||
@ -95,5 +95,5 @@ func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot ra
|
||||
return oldbe, nil
|
||||
}
|
||||
oldbe.Close()
|
||||
return openSnapshotBackend(cfg, raftsnap.New(cfg.Logger, cfg.SnapDir()), snapshot)
|
||||
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot)
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"github.com/coreos/etcd/compactor"
|
||||
"github.com/coreos/etcd/discovery"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
|
||||
stats "github.com/coreos/etcd/etcdserver/api/v2stats"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2store"
|
||||
@ -53,7 +54,6 @@ import (
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/etcd/wal"
|
||||
|
||||
@ -219,7 +219,7 @@ type EtcdServer struct {
|
||||
cluster *membership.RaftCluster
|
||||
|
||||
v2store v2store.Store
|
||||
snapshotter *raftsnap.Snapshotter
|
||||
snapshotter *snap.Snapshotter
|
||||
|
||||
applyV2 ApplierV2
|
||||
|
||||
@ -312,7 +312,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
plog.Fatalf("create snapshot directory error: %v", err)
|
||||
}
|
||||
}
|
||||
ss := raftsnap.New(cfg.Logger, cfg.SnapDir())
|
||||
ss := snap.New(cfg.Logger, cfg.SnapDir())
|
||||
|
||||
bepath := cfg.backendPath()
|
||||
beExist := fileutil.Exist(bepath)
|
||||
@ -417,7 +417,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
}
|
||||
snapshot, err = ss.Load()
|
||||
if err != nil && err != raftsnap.ErrNoSnapshot {
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, err
|
||||
}
|
||||
if snapshot != nil {
|
||||
@ -1838,7 +1838,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) sendMergedSnap(merged raftsnap.Message) {
|
||||
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
||||
atomic.AddInt64(&s.inflightSnapshots, 1)
|
||||
|
||||
lg := s.getLogger()
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2store"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
@ -48,7 +49,6 @@ import (
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
)
|
||||
|
||||
// TestDoLocalAction tests requests which do not need to go through raft to be applied,
|
||||
@ -1036,7 +1036,7 @@ func TestSnapshotOrdering(t *testing.T) {
|
||||
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *r,
|
||||
v2store: st,
|
||||
snapshotter: raftsnap.New(zap.NewExample(), snapdir),
|
||||
snapshotter: snap.New(zap.NewExample(), snapdir),
|
||||
cluster: cl,
|
||||
SyncTicker: &time.Ticker{},
|
||||
}
|
||||
@ -1167,7 +1167,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
||||
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *r,
|
||||
v2store: st,
|
||||
snapshotter: raftsnap.New(zap.NewExample(), testdir),
|
||||
snapshotter: snap.New(zap.NewExample(), testdir),
|
||||
cluster: cl,
|
||||
SyncTicker: &time.Ticker{},
|
||||
}
|
||||
@ -1738,7 +1738,7 @@ func newNopTransporter() rafthttp.Transporter {
|
||||
func (s *nopTransporter) Start() error { return nil }
|
||||
func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporter) SendSnapshot(m raftsnap.Message) {}
|
||||
func (s *nopTransporter) SendSnapshot(m snap.Message) {}
|
||||
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
@ -1752,18 +1752,18 @@ func (s *nopTransporter) Resume() {}
|
||||
|
||||
type snapTransporter struct {
|
||||
nopTransporter
|
||||
snapDoneC chan raftsnap.Message
|
||||
snapDoneC chan snap.Message
|
||||
snapDir string
|
||||
}
|
||||
|
||||
func newSnapTransporter(snapDir string) (rafthttp.Transporter, <-chan raftsnap.Message) {
|
||||
ch := make(chan raftsnap.Message, 1)
|
||||
func newSnapTransporter(snapDir string) (rafthttp.Transporter, <-chan snap.Message) {
|
||||
ch := make(chan snap.Message, 1)
|
||||
tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
|
||||
return tr, ch
|
||||
}
|
||||
|
||||
func (s *snapTransporter) SendSnapshot(m raftsnap.Message) {
|
||||
ss := raftsnap.New(zap.NewExample(), s.snapDir)
|
||||
func (s *snapTransporter) SendSnapshot(m snap.Message) {
|
||||
ss := snap.New(zap.NewExample(), s.snapDir)
|
||||
ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
|
||||
m.CloseWithError(nil)
|
||||
s.snapDoneC <- m
|
||||
|
@ -17,9 +17,9 @@ package etcdserver
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"go.uber.org/zap"
|
||||
@ -28,7 +28,7 @@ import (
|
||||
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
|
||||
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
|
||||
// as ReadCloser.
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) raftsnap.Message {
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
|
||||
// get a snapshot of v2 store as []byte
|
||||
clone := s.v2store.Clone()
|
||||
d, err := clone.SaveNoCopy()
|
||||
@ -58,7 +58,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi
|
||||
}
|
||||
m.Snapshot = snapshot
|
||||
|
||||
return *raftsnap.NewMessage(m, rc, dbsnap.Size())
|
||||
return *snap.NewMessage(m, rc, dbsnap.Size())
|
||||
}
|
||||
|
||||
func newSnapshotReaderCloser(lg *zap.Logger, snapshot backend.Snapshot) io.ReadCloser {
|
||||
|
@ -17,11 +17,11 @@ package etcdserver
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
|
||||
@ -40,10 +40,10 @@ type Storage interface {
|
||||
|
||||
type storage struct {
|
||||
*wal.WAL
|
||||
*raftsnap.Snapshotter
|
||||
*snap.Snapshotter
|
||||
}
|
||||
|
||||
func NewStorage(w *wal.WAL, s *raftsnap.Snapshotter) Storage {
|
||||
func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
|
||||
return &storage{w, s}
|
||||
}
|
||||
|
||||
|
@ -21,11 +21,11 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
)
|
||||
|
||||
func TestLongestConnected(t *testing.T) {
|
||||
@ -78,7 +78,7 @@ func newNopTransporterWithActiveTime(memberIDs []types.ID) rafthttp.Transporter
|
||||
func (s *nopTransporterWithActiveTime) Start() error { return nil }
|
||||
func (s *nopTransporterWithActiveTime) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporterWithActiveTime) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporterWithActiveTime) SendSnapshot(m raftsnap.Message) {}
|
||||
func (s *nopTransporterWithActiveTime) SendSnapshot(m snap.Message) {}
|
||||
func (s *nopTransporterWithActiveTime) AddRemote(id types.ID, us []string) {}
|
||||
func (s *nopTransporterWithActiveTime) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {}
|
||||
|
@ -23,9 +23,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3/snapshot"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/snapshot"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
"go.uber.org/zap"
|
||||
|
@ -23,10 +23,10 @@ import (
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/version"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
@ -168,13 +168,13 @@ type snapshotHandler struct {
|
||||
lg *zap.Logger
|
||||
tr Transporter
|
||||
r Raft
|
||||
snapshotter *raftsnap.Snapshotter
|
||||
snapshotter *snap.Snapshotter
|
||||
|
||||
localID types.ID
|
||||
cid types.ID
|
||||
}
|
||||
|
||||
func newSnapshotHandler(t *Transport, r Raft, snapshotter *raftsnap.Snapshotter, cid types.ID) http.Handler {
|
||||
func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
|
||||
return &snapshotHandler{
|
||||
lg: t.Logger,
|
||||
tr: t,
|
||||
|
@ -26,10 +26,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/version"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@ -358,7 +358,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
|
||||
|
||||
type fakePeer struct {
|
||||
msgs []raftpb.Message
|
||||
snapMsgs []raftsnap.Message
|
||||
snapMsgs []snap.Message
|
||||
peerURLs types.URLs
|
||||
connc chan *outgoingConn
|
||||
paused bool
|
||||
@ -379,7 +379,7 @@ func (pr *fakePeer) send(m raftpb.Message) {
|
||||
pr.msgs = append(pr.msgs, m)
|
||||
}
|
||||
|
||||
func (pr *fakePeer) sendSnap(m raftsnap.Message) {
|
||||
func (pr *fakePeer) sendSnap(m snap.Message) {
|
||||
if pr.paused {
|
||||
return
|
||||
}
|
||||
|
@ -19,11 +19,11 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
stats "github.com/coreos/etcd/etcdserver/api/v2stats"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
@ -64,7 +64,7 @@ type Peer interface {
|
||||
|
||||
// sendSnap sends the merged snapshot message to the remote peer. Its behavior
|
||||
// is similar to send.
|
||||
sendSnap(m raftsnap.Message)
|
||||
sendSnap(m snap.Message)
|
||||
|
||||
// update updates the urls of remote peer.
|
||||
update(urls types.URLs)
|
||||
@ -280,7 +280,7 @@ func (p *peer) send(m raftpb.Message) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *peer) sendSnap(m raftsnap.Message) {
|
||||
func (p *peer) sendSnap(m snap.Message) {
|
||||
go p.snapSender.send(m)
|
||||
}
|
||||
|
||||
|
@ -22,11 +22,11 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/pkg/httputil"
|
||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
"go.uber.org/zap"
|
||||
@ -66,7 +66,7 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
|
||||
|
||||
func (s *snapshotSender) stop() { close(s.stopc) }
|
||||
|
||||
func (s *snapshotSender) send(merged raftsnap.Message) {
|
||||
func (s *snapshotSender) send(merged snap.Message) {
|
||||
m := merged.Message
|
||||
|
||||
body := createSnapBody(s.tr.Logger, merged)
|
||||
@ -177,7 +177,7 @@ func (s *snapshotSender) post(req *http.Request) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
func createSnapBody(lg *zap.Logger, merged raftsnap.Message) io.ReadCloser {
|
||||
func createSnapBody(lg *zap.Logger, merged snap.Message) io.ReadCloser {
|
||||
buf := new(bytes.Buffer)
|
||||
enc := &messageEncoder{w: buf}
|
||||
// encode raft message
|
||||
|
@ -25,9 +25,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -84,7 +84,7 @@ func TestSnapshotSend(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sent, files := testSnapshotSend(t, raftsnap.NewMessage(tt.m, tt.rc, tt.size))
|
||||
sent, files := testSnapshotSend(t, snap.NewMessage(tt.m, tt.rc, tt.size))
|
||||
if tt.wsent != sent {
|
||||
t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent)
|
||||
}
|
||||
@ -94,7 +94,7 @@ func TestSnapshotSend(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo) {
|
||||
func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) {
|
||||
d, err := ioutil.TempDir(os.TempDir(), "snapdir")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -104,7 +104,7 @@ func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo)
|
||||
r := &fakeRaft{}
|
||||
tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r}
|
||||
ch := make(chan struct{}, 1)
|
||||
h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(zap.NewExample(), d), types.ID(1)), ch}
|
||||
h := &syncHandler{newSnapshotHandler(tr, r, snap.New(zap.NewExample(), d), types.ID(1)), ch}
|
||||
srv := httptest.NewServer(h)
|
||||
defer srv.Close()
|
||||
|
||||
|
@ -20,13 +20,13 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
stats "github.com/coreos/etcd/etcdserver/api/v2stats"
|
||||
"github.com/coreos/etcd/pkg/logutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/xiang90/probing"
|
||||
@ -61,7 +61,7 @@ type Transporter interface {
|
||||
Send(m []raftpb.Message)
|
||||
// SendSnapshot sends out the given snapshot message to a remote peer.
|
||||
// The behavior of SendSnapshot is similar to Send.
|
||||
SendSnapshot(m raftsnap.Message)
|
||||
SendSnapshot(m snap.Message)
|
||||
// AddRemote adds a remote with given peer urls into the transport.
|
||||
// A remote helps newly joined member to catch up the progress of cluster,
|
||||
// and will not be used after that.
|
||||
@ -112,7 +112,7 @@ type Transport struct {
|
||||
URLs types.URLs // local peer URLs
|
||||
ClusterID types.ID // raft cluster ID for request validation
|
||||
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
|
||||
Snapshotter *raftsnap.Snapshotter
|
||||
Snapshotter *snap.Snapshotter
|
||||
ServerStats *stats.ServerStats // used to record general transportation statistics
|
||||
// used to record transportation statistics with followers when
|
||||
// performing as leader in raft protocol
|
||||
@ -412,7 +412,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
func (t *Transport) SendSnapshot(m raftsnap.Message) {
|
||||
func (t *Transport) SendSnapshot(m snap.Message) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
p := t.peers[types.ID(m.To)]
|
||||
|
@ -16,7 +16,7 @@ if [[ $(protoc --version | cut -f2 -d' ') != "3.5.1" ]]; then
|
||||
fi
|
||||
|
||||
# directories containing protos to be built
|
||||
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./raftsnap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb ./etcdserver/api/v3election/v3electionpb"
|
||||
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./etcdserver/api/snap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb ./etcdserver/api/v3election/v3electionpb"
|
||||
|
||||
# exact version of packages to build
|
||||
GOGO_PROTO_SHA="41168f6614b7bb144818ec8967b8c702705df564"
|
||||
|
@ -25,9 +25,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3/snapshot"
|
||||
"github.com/coreos/etcd/pkg/expect"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/snapshot"
|
||||
)
|
||||
|
||||
func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) }
|
||||
|
@ -22,11 +22,11 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
|
||||
@ -65,10 +65,10 @@ func main() {
|
||||
walsnap.Index = *index
|
||||
} else {
|
||||
if *snapfile == "" {
|
||||
ss := raftsnap.New(zap.NewExample(), snapDir(dataDir))
|
||||
ss := snap.New(zap.NewExample(), snapDir(dataDir))
|
||||
snapshot, err = ss.Load()
|
||||
} else {
|
||||
snapshot, err = raftsnap.Read(zap.NewExample(), filepath.Join(snapDir(dataDir), *snapfile))
|
||||
snapshot, err = snap.Read(zap.NewExample(), filepath.Join(snapDir(dataDir), *snapfile))
|
||||
}
|
||||
|
||||
switch err {
|
||||
@ -77,7 +77,7 @@ func main() {
|
||||
nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes)
|
||||
fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n",
|
||||
walsnap.Term, walsnap.Index, nodes)
|
||||
case raftsnap.ErrNoSnapshot:
|
||||
case snap.ErrNoSnapshot:
|
||||
fmt.Printf("Snapshot:\nempty\n")
|
||||
default:
|
||||
log.Fatalf("Failed loading snapshot: %v", err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user