mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: use new "snap" import paths
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
a0b4ecbd4c
commit
1e4f56114e
@ -19,11 +19,11 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -40,7 +40,7 @@ func newBackend(cfg ServerConfig) backend.Backend {
|
||||
}
|
||||
|
||||
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
|
||||
func openSnapshotBackend(cfg ServerConfig, ss *raftsnap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
||||
func openSnapshotBackend(cfg ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
||||
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
|
||||
@ -95,5 +95,5 @@ func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot ra
|
||||
return oldbe, nil
|
||||
}
|
||||
oldbe.Close()
|
||||
return openSnapshotBackend(cfg, raftsnap.New(cfg.Logger, cfg.SnapDir()), snapshot)
|
||||
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot)
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ import (
|
||||
"github.com/coreos/etcd/compactor"
|
||||
"github.com/coreos/etcd/discovery"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
|
||||
stats "github.com/coreos/etcd/etcdserver/api/v2stats"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2store"
|
||||
@ -53,7 +54,6 @@ import (
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/etcd/wal"
|
||||
|
||||
@ -219,7 +219,7 @@ type EtcdServer struct {
|
||||
cluster *membership.RaftCluster
|
||||
|
||||
v2store v2store.Store
|
||||
snapshotter *raftsnap.Snapshotter
|
||||
snapshotter *snap.Snapshotter
|
||||
|
||||
applyV2 ApplierV2
|
||||
|
||||
@ -312,7 +312,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
plog.Fatalf("create snapshot directory error: %v", err)
|
||||
}
|
||||
}
|
||||
ss := raftsnap.New(cfg.Logger, cfg.SnapDir())
|
||||
ss := snap.New(cfg.Logger, cfg.SnapDir())
|
||||
|
||||
bepath := cfg.backendPath()
|
||||
beExist := fileutil.Exist(bepath)
|
||||
@ -417,7 +417,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
}
|
||||
snapshot, err = ss.Load()
|
||||
if err != nil && err != raftsnap.ErrNoSnapshot {
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, err
|
||||
}
|
||||
if snapshot != nil {
|
||||
@ -1838,7 +1838,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) sendMergedSnap(merged raftsnap.Message) {
|
||||
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
||||
atomic.AddInt64(&s.inflightSnapshots, 1)
|
||||
|
||||
lg := s.getLogger()
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2store"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
@ -48,7 +49,6 @@ import (
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
)
|
||||
|
||||
// TestDoLocalAction tests requests which do not need to go through raft to be applied,
|
||||
@ -1036,7 +1036,7 @@ func TestSnapshotOrdering(t *testing.T) {
|
||||
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *r,
|
||||
v2store: st,
|
||||
snapshotter: raftsnap.New(zap.NewExample(), snapdir),
|
||||
snapshotter: snap.New(zap.NewExample(), snapdir),
|
||||
cluster: cl,
|
||||
SyncTicker: &time.Ticker{},
|
||||
}
|
||||
@ -1167,7 +1167,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
||||
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *r,
|
||||
v2store: st,
|
||||
snapshotter: raftsnap.New(zap.NewExample(), testdir),
|
||||
snapshotter: snap.New(zap.NewExample(), testdir),
|
||||
cluster: cl,
|
||||
SyncTicker: &time.Ticker{},
|
||||
}
|
||||
@ -1738,7 +1738,7 @@ func newNopTransporter() rafthttp.Transporter {
|
||||
func (s *nopTransporter) Start() error { return nil }
|
||||
func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporter) SendSnapshot(m raftsnap.Message) {}
|
||||
func (s *nopTransporter) SendSnapshot(m snap.Message) {}
|
||||
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
@ -1752,18 +1752,18 @@ func (s *nopTransporter) Resume() {}
|
||||
|
||||
type snapTransporter struct {
|
||||
nopTransporter
|
||||
snapDoneC chan raftsnap.Message
|
||||
snapDoneC chan snap.Message
|
||||
snapDir string
|
||||
}
|
||||
|
||||
func newSnapTransporter(snapDir string) (rafthttp.Transporter, <-chan raftsnap.Message) {
|
||||
ch := make(chan raftsnap.Message, 1)
|
||||
func newSnapTransporter(snapDir string) (rafthttp.Transporter, <-chan snap.Message) {
|
||||
ch := make(chan snap.Message, 1)
|
||||
tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
|
||||
return tr, ch
|
||||
}
|
||||
|
||||
func (s *snapTransporter) SendSnapshot(m raftsnap.Message) {
|
||||
ss := raftsnap.New(zap.NewExample(), s.snapDir)
|
||||
func (s *snapTransporter) SendSnapshot(m snap.Message) {
|
||||
ss := snap.New(zap.NewExample(), s.snapDir)
|
||||
ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
|
||||
m.CloseWithError(nil)
|
||||
s.snapDoneC <- m
|
||||
|
@ -17,9 +17,9 @@ package etcdserver
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"go.uber.org/zap"
|
||||
@ -28,7 +28,7 @@ import (
|
||||
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
|
||||
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
|
||||
// as ReadCloser.
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) raftsnap.Message {
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
|
||||
// get a snapshot of v2 store as []byte
|
||||
clone := s.v2store.Clone()
|
||||
d, err := clone.SaveNoCopy()
|
||||
@ -58,7 +58,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi
|
||||
}
|
||||
m.Snapshot = snapshot
|
||||
|
||||
return *raftsnap.NewMessage(m, rc, dbsnap.Size())
|
||||
return *snap.NewMessage(m, rc, dbsnap.Size())
|
||||
}
|
||||
|
||||
func newSnapshotReaderCloser(lg *zap.Logger, snapshot backend.Snapshot) io.ReadCloser {
|
||||
|
@ -17,11 +17,11 @@ package etcdserver
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
|
||||
@ -40,10 +40,10 @@ type Storage interface {
|
||||
|
||||
type storage struct {
|
||||
*wal.WAL
|
||||
*raftsnap.Snapshotter
|
||||
*snap.Snapshotter
|
||||
}
|
||||
|
||||
func NewStorage(w *wal.WAL, s *raftsnap.Snapshotter) Storage {
|
||||
func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
|
||||
return &storage{w, s}
|
||||
}
|
||||
|
||||
|
@ -21,11 +21,11 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/snap"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/raftsnap"
|
||||
)
|
||||
|
||||
func TestLongestConnected(t *testing.T) {
|
||||
@ -78,7 +78,7 @@ func newNopTransporterWithActiveTime(memberIDs []types.ID) rafthttp.Transporter
|
||||
func (s *nopTransporterWithActiveTime) Start() error { return nil }
|
||||
func (s *nopTransporterWithActiveTime) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporterWithActiveTime) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporterWithActiveTime) SendSnapshot(m raftsnap.Message) {}
|
||||
func (s *nopTransporterWithActiveTime) SendSnapshot(m snap.Message) {}
|
||||
func (s *nopTransporterWithActiveTime) AddRemote(id types.ID, us []string) {}
|
||||
func (s *nopTransporterWithActiveTime) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {}
|
||||
|
Loading…
x
Reference in New Issue
Block a user