etcdserver, embed, integration: don't use pointer for ServerConfig

ServerConfig is owned by etdcserver and unshared, so don't pass or store by
pointer. Also removes duplicated field 'snapCount'.
This commit is contained in:
Anthony Romano 2017-05-10 13:55:06 -07:00
parent 1f206c027a
commit dcf52bbfac
6 changed files with 27 additions and 40 deletions

View File

@ -119,7 +119,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
} }
} }
srvcfg := &etcdserver.ServerConfig{ srvcfg := etcdserver.ServerConfig{
Name: cfg.Name, Name: cfg.Name,
ClientURLs: cfg.ACUrls, ClientURLs: cfg.ACUrls,
PeerURLs: cfg.APUrls, PeerURLs: cfg.APUrls,

View File

@ -26,7 +26,7 @@ import (
"github.com/coreos/etcd/snap" "github.com/coreos/etcd/snap"
) )
func newBackend(cfg *ServerConfig) backend.Backend { func newBackend(cfg ServerConfig) backend.Backend {
bcfg := backend.DefaultBackendConfig() bcfg := backend.DefaultBackendConfig()
bcfg.Path = cfg.backendPath() bcfg.Path = cfg.backendPath()
if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes { if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes {
@ -37,7 +37,7 @@ func newBackend(cfg *ServerConfig) backend.Backend {
} }
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it. // openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { func openSnapshotBackend(cfg ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index) snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
if err != nil { if err != nil {
return nil, fmt.Errorf("database snapshot file path error: %v", err) return nil, fmt.Errorf("database snapshot file path error: %v", err)
@ -49,7 +49,7 @@ func openSnapshotBackend(cfg *ServerConfig, ss *snap.Snapshotter, snapshot raftp
} }
// openBackend returns a backend using the current etcd db. // openBackend returns a backend using the current etcd db.
func openBackend(cfg *ServerConfig) backend.Backend { func openBackend(cfg ServerConfig) backend.Backend {
fn := cfg.backendPath() fn := cfg.backendPath()
beOpened := make(chan backend.Backend) beOpened := make(chan backend.Backend)
go func() { go func() {
@ -69,7 +69,7 @@ func openBackend(cfg *ServerConfig) backend.Backend {
// before updating the backend db after persisting raft snapshot to disk, // before updating the backend db after persisting raft snapshot to disk,
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader. // case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg *ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) { func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex var cIndex consistentIndex
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex) kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
defer kv.Close() defer kv.Close()

View File

@ -378,7 +378,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) {
} }
} }
func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error var err error
member := cl.MemberByName(cfg.Name) member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal( metadata := pbutil.MustMarshal(
@ -419,7 +419,7 @@ func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (i
return return
} }
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot var walsnap walpb.Snapshot
if snapshot != nil { if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -453,7 +453,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membe
return id, cl, n, s, w return id, cl, n, s, w
} }
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot var walsnap walpb.Snapshot
if snapshot != nil { if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term

View File

@ -170,12 +170,10 @@ type EtcdServer struct {
// consistIndex used to hold the offset of current executing entry // consistIndex used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry. // It is initialized to 0 before executing any entry.
consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned. consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
Cfg *ServerConfig r raftNode // uses 64-bit atomics; keep 64-bit aligned.
readych chan struct{} readych chan struct{}
r raftNode Cfg ServerConfig
snapCount uint64
w wait.Wait w wait.Wait
@ -250,7 +248,7 @@ type EtcdServer struct {
// NewServer creates a new EtcdServer from the supplied configuration. The // NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer. // configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
st := store.New(StoreClusterPrefix, StoreKeysPrefix) st := store.New(StoreClusterPrefix, StoreKeysPrefix)
var ( var (
@ -408,7 +406,6 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
srv = &EtcdServer{ srv = &EtcdServer{
readych: make(chan struct{}), readych: make(chan struct{}),
Cfg: cfg, Cfg: cfg,
snapCount: cfg.SnapCount,
errorc: make(chan error, 1), errorc: make(chan error, 1),
store: st, store: st,
snapshotter: ss, snapshotter: ss,
@ -530,9 +527,9 @@ func (s *EtcdServer) Start() {
// modify a server's fields after it has been sent to Start. // modify a server's fields after it has been sent to Start.
// This function is just used for testing. // This function is just used for testing.
func (s *EtcdServer) start() { func (s *EtcdServer) start() {
if s.snapCount == 0 { if s.Cfg.SnapCount == 0 {
plog.Infof("set snapshot count to default %d", DefaultSnapCount) plog.Infof("set snapshot count to default %d", DefaultSnapCount)
s.snapCount = DefaultSnapCount s.Cfg.SnapCount = DefaultSnapCount
} }
s.w = wait.New() s.w = wait.New()
s.applyWait = wait.NewTimeList() s.applyWait = wait.NewTimeList()
@ -915,7 +912,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
} }
func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if ep.appliedi-ep.snapi <= s.snapCount { if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
return return
} }

View File

@ -178,7 +178,6 @@ func TestApplyRepeat(t *testing.T) {
}) })
s := &EtcdServer{ s := &EtcdServer{
r: *r, r: *r,
Cfg: &ServerConfig{},
store: st, store: st,
cluster: cl, cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -530,7 +529,6 @@ func TestApplyConfChangeError(t *testing.T) {
srv := &EtcdServer{ srv := &EtcdServer{
r: *newRaftNode(raftNodeConfig{Node: n}), r: *newRaftNode(raftNodeConfig{Node: n}),
cluster: cl, cluster: cl,
Cfg: &ServerConfig{},
} }
_, err := srv.applyConfChange(tt.cc, nil) _, err := srv.applyConfChange(tt.cc, nil)
if err != tt.werr { if err != tt.werr {
@ -685,7 +683,7 @@ func TestDoProposal(t *testing.T) {
transport: rafthttp.NewNopTransporter(), transport: rafthttp.NewNopTransporter(),
}) })
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
r: *r, r: *r,
store: st, store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -713,7 +711,7 @@ func TestDoProposal(t *testing.T) {
func TestDoProposalCancelled(t *testing.T) { func TestDoProposalCancelled(t *testing.T) {
wt := mockwait.NewRecorder() wt := mockwait.NewRecorder()
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: wt, w: wt,
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -735,7 +733,7 @@ func TestDoProposalCancelled(t *testing.T) {
func TestDoProposalTimeout(t *testing.T) { func TestDoProposalTimeout(t *testing.T) {
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: mockwait.NewNop(), w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -751,7 +749,7 @@ func TestDoProposalTimeout(t *testing.T) {
func TestDoProposalStopped(t *testing.T) { func TestDoProposalStopped(t *testing.T) {
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
w: mockwait.NewNop(), w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -855,7 +853,7 @@ func TestSyncTrigger(t *testing.T) {
}) })
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
r: *r, r: *r,
store: mockstore.NewNop(), store: mockstore.NewNop(),
SyncTicker: tk, SyncTicker: tk,
@ -913,7 +911,6 @@ func TestSnapshot(t *testing.T) {
storage: p, storage: p,
}) })
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{},
r: *r, r: *r,
store: st, store: st,
} }
@ -984,9 +981,7 @@ func TestSnapshotOrdering(t *testing.T) {
raftStorage: rs, raftStorage: rs,
}) })
s := &EtcdServer{ s := &EtcdServer{
Cfg: &ServerConfig{ Cfg: ServerConfig{DataDir: testdir},
DataDir: testdir,
},
r: *r, r: *r,
store: st, store: st,
snapshotter: snap.New(snapdir), snapshotter: snap.New(snapdir),
@ -1047,8 +1042,7 @@ func TestTriggerSnap(t *testing.T) {
transport: rafthttp.NewNopTransporter(), transport: rafthttp.NewNopTransporter(),
}) })
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1, SnapCount: uint64(snapc)},
snapCount: uint64(snapc),
r: *r, r: *r,
store: st, store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1112,9 +1106,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
raftStorage: rs, raftStorage: rs,
}) })
s := &EtcdServer{ s := &EtcdServer{
Cfg: &ServerConfig{ Cfg: ServerConfig{DataDir: testdir},
DataDir: testdir,
},
r: *r, r: *r,
store: st, store: st,
snapshotter: snap.New(testdir), snapshotter: snap.New(testdir),
@ -1199,7 +1191,6 @@ func TestAddMember(t *testing.T) {
}) })
s := &EtcdServer{ s := &EtcdServer{
r: *r, r: *r,
Cfg: &ServerConfig{},
store: st, store: st,
cluster: cl, cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1241,7 +1232,6 @@ func TestRemoveMember(t *testing.T) {
}) })
s := &EtcdServer{ s := &EtcdServer{
r: *r, r: *r,
Cfg: &ServerConfig{},
store: st, store: st,
cluster: cl, cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1316,7 +1306,7 @@ func TestPublish(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{ srv := &EtcdServer{
readych: make(chan struct{}), readych: make(chan struct{}),
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
id: 1, id: 1,
r: *newRaftNode(raftNodeConfig{Node: n}), r: *newRaftNode(raftNodeConfig{Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@ -1366,7 +1356,7 @@ func TestPublishStopped(t *testing.T) {
transport: rafthttp.NewNopTransporter(), transport: rafthttp.NewNopTransporter(),
}) })
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
r: *r, r: *r,
cluster: &membership.RaftCluster{}, cluster: &membership.RaftCluster{},
w: mockwait.NewNop(), w: mockwait.NewNop(),
@ -1388,7 +1378,7 @@ func TestPublishRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
n := newNodeRecorderStream() n := newNodeRecorderStream()
srv := &EtcdServer{ srv := &EtcdServer{
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: n}), r: *newRaftNode(raftNodeConfig{Node: n}),
w: mockwait.NewNop(), w: mockwait.NewNop(),
stopping: make(chan struct{}), stopping: make(chan struct{}),
@ -1429,7 +1419,7 @@ func TestUpdateVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{ srv := &EtcdServer{
id: 1, id: 1,
Cfg: &ServerConfig{TickMs: 1}, Cfg: ServerConfig{TickMs: 1},
r: *newRaftNode(raftNodeConfig{Node: n}), r: *newRaftNode(raftNodeConfig{Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{}, cluster: &membership.RaftCluster{},

View File

@ -640,7 +640,7 @@ func (m *member) Clone(t *testing.T) *member {
func (m *member) Launch() error { func (m *member) Launch() error {
plog.Printf("launching %s (%s)", m.Name, m.grpcAddr) plog.Printf("launching %s (%s)", m.Name, m.grpcAddr)
var err error var err error
if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
return fmt.Errorf("failed to initialize the etcd server: %v", err) return fmt.Errorf("failed to initialize the etcd server: %v", err)
} }
m.s.SyncTicker = time.NewTicker(500 * time.Millisecond) m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)