Mirror of https://github.com/etcd-io/etcd.git, synced 2024-09-27 06:25:44 +00:00.
Merge pull request #17614 from ivanvc/address-etcdserver-var-naming-lint-rule
etcdserver: address var naming lint rule
Commit: 97029d768a
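For context, revive's var-naming lint rule expects initialisms in Go identifiers (ID, HTTP, JSON, TTL, URL) to be fully capitalized, which is what drives the renames in the diff below. A minimal, hypothetical sketch of the convention follows; the type and value are illustrative and not taken from the etcd code:

package main

import "fmt"

type member struct{ id uint64 }

// A method named MemberId() would be flagged by revive's var-naming rule;
// the initialism is written fully capitalized instead.
func (m member) MemberID() uint64 { return m.id }

func main() {
	m := member{id: 0x2b} // illustrative value only
	fmt.Println(m.MemberID())
}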
@@ -279,7 +279,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
e.cfg.logger.Info(
"now serving peer/client/metrics",
-zap.String("local-member-id", e.Server.MemberId().String()),
+zap.String("local-member-id", e.Server.MemberID().String()),
zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
@@ -70,7 +70,7 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *serverversion.DowngradeInfo {
}

func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions {
-return getMembersVersions(s.lg, s.cluster, s.MemberId(), s.peerRt, s.Cfg.ReqTimeout())
+return getMembersVersions(s.lg, s.cluster, s.MemberID(), s.peerRt, s.Cfg.ReqTimeout())
}

func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
@@ -240,7 +240,7 @@ type CheckRegistry struct {
func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", readCheck(server, true /* serializable */))
-reg.InstallHttpEndpoints(lg, mux)
+reg.InstallHTTPEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
@@ -252,7 +252,7 @@ func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHea
reg.Register("serializable_read", readCheck(server, true))
// linearizable_read check would be replaced by read_index check in 3.6
reg.Register("linearizable_read", readCheck(server, false))
-reg.InstallHttpEndpoints(lg, mux)
+reg.InstallHTTPEndpoints(lg, mux)
}

func (reg *CheckRegistry) Register(name string, check HealthCheck) {
@@ -263,14 +263,23 @@ func (reg *CheckRegistry) RootPath() string {
return "/" + reg.checkType
}

+// InstallHttpEndpoints installs the http handlers for the health checks.
+// Deprecated: Please use (*CheckRegistry) InstallHTTPEndpoints instead.
+//
+//revive:disable:var-naming
func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) {
+//revive:enable:var-naming
+reg.InstallHTTPEndpoints(lg, mux)
+}
+
+func (reg *CheckRegistry) InstallHTTPEndpoints(lg *zap.Logger, mux *http.ServeMux) {
checkNames := make([]string, 0, len(reg.checks))
for k := range reg.checks {
checkNames = append(checkNames, k)
}

// installs the http handler for the root path.
-reg.installRootHttpEndpoint(lg, mux, checkNames...)
+reg.installRootHTTPEndpoint(lg, mux, checkNames...)
for _, checkName := range checkNames {
// installs the http handler for the individual check sub path.
subpath := path.Join(reg.RootPath(), checkName)
@@ -302,8 +311,8 @@ func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...str
return h
}

-// installRootHttpEndpoint installs the http handler for the root path.
-func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, checks ...string) {
+// installRootHTTPEndpoint installs the http handler for the root path.
+func (reg *CheckRegistry) installRootHTTPEndpoint(lg *zap.Logger, mux *http.ServeMux, checks ...string) {
hfunc := func(r *http.Request) HealthStatus {
// extracts the health check names to be excludeList from the query param
excluded := getQuerySet(r, "exclude")
@@ -160,12 +160,12 @@ func TestHealthHandler(t *testing.T) {
})
ts := httptest.NewServer(mux)
defer ts.Close()
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil)
})
}
}

-func TestHttpSubPath(t *testing.T) {
+func TestHTTPSubPath(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{
@@ -198,7 +198,7 @@ func TestHttpSubPath(t *testing.T) {
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "", tt.expectStatusCode)
})
}
@@ -253,10 +253,10 @@ func TestDataCorruptionCheck(t *testing.T) {
ts := httptest.NewServer(mux)
defer ts.Close()
// OK before alarms are activated.
-checkHttpResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil)
+checkHTTPResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil)
// Activate the alarms.
s.alarms = tt.alarms
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
})
}
}
@@ -297,7 +297,7 @@ func TestSerializableReadCheck(t *testing.T) {
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "serializable_read", tt.expectStatusCode)
})
}
@@ -338,13 +338,13 @@ func TestLinearizableReadCheck(t *testing.T) {
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
})
}
}

-func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
+func checkHTTPResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})

if err != nil {
@@ -49,18 +49,18 @@ type Member struct {
// NewMember creates a Member without an ID and generates one based on the
// cluster name, peer URLs, and time. This is used for bootstrapping/adding new member.
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
-memberId := computeMemberId(peerURLs, clusterName, now)
-return newMember(name, peerURLs, memberId, false)
+memberID := computeMemberID(peerURLs, clusterName, now)
+return newMember(name, peerURLs, memberID, false)
}

// NewMemberAsLearner creates a learner Member without an ID and generates one based on the
// cluster name, peer URLs, and time. This is used for adding new learner member.
func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
-memberId := computeMemberId(peerURLs, clusterName, now)
-return newMember(name, peerURLs, memberId, true)
+memberID := computeMemberID(peerURLs, clusterName, now)
+return newMember(name, peerURLs, memberID, true)
}

-func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
+func computeMemberID(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
peerURLstrs := peerURLs.StringSlice()
sort.Strings(peerURLstrs)
joinedPeerUrls := strings.Join(peerURLstrs, "")
@@ -75,14 +75,14 @@ func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) ty
return types.ID(binary.BigEndian.Uint64(hash[:8]))
}

-func newMember(name string, peerURLs types.URLs, memberId types.ID, isLearner bool) *Member {
+func newMember(name string, peerURLs types.URLs, memberID types.ID, isLearner bool) *Member {
m := &Member{
RaftAttributes: RaftAttributes{
PeerURLs: peerURLs.StringSlice(),
IsLearner: isLearner,
},
Attributes: Attributes{Name: name},
-ID: memberId,
+ID: memberID,
}
return m
}
@@ -170,7 +170,7 @@ func TestLoadNewestSnap(t *testing.T) {

cases := []struct {
name string
-availableWalSnaps []walpb.Snapshot
+availableWALSnaps []walpb.Snapshot
expected *raftpb.Snapshot
}{
{
@@ -179,17 +179,17 @@ func TestLoadNewestSnap(t *testing.T) {
},
{
name: "loadnewestavailable-newest",
-availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
+availableWALSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
expected: &newSnap,
},
{
name: "loadnewestavailable-newest-unsorted",
-availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
+availableWALSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
expected: &newSnap,
},
{
name: "loadnewestavailable-previous",
-availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
+availableWALSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
expected: testSnap,
},
}
@@ -197,8 +197,8 @@ func TestLoadNewestSnap(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
var err error
var g *raftpb.Snapshot
-if tc.availableWalSnaps != nil {
-g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
+if tc.availableWALSnaps != nil {
+g, err = ss.LoadNewestAvailable(tc.availableWALSnaps)
} else {
g, err = ss.Load()
}
@@ -139,7 +139,7 @@ func (e Error) Error() string {
return e.Message + " (" + e.Cause + ")"
}

-func (e Error) toJsonString() string {
+func (e Error) toJSONString() string {
b, _ := json.Marshal(e)
return string(b)
}
@@ -156,6 +156,6 @@ func (e Error) WriteTo(w http.ResponseWriter) error {
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(e.StatusCode())
-_, err := w.Write([]byte(e.toJsonString() + "\n"))
+_, err := w.Write([]byte(e.toJSONString() + "\n"))
return err
}
@@ -33,8 +33,8 @@ func TestErrorWriteTo(t *testing.T) {
}

gbody := strings.TrimSuffix(rr.Body.String(), "\n")
-if err.toJsonString() != gbody {
-t.Errorf("HTTP body %q, want %q", gbody, err.toJsonString())
+if err.toJSONString() != gbody {
+t.Errorf("HTTP body %q, want %q", gbody, err.toJSONString())
}

wheader := http.Header(map[string][]string{
@@ -21,7 +21,7 @@ import (
)

func TestHeapPushPop(t *testing.T) {
-h := newTtlKeyHeap()
+h := newTTLKeyHeap()

// add from older expire time to earlier expire time
// the path is equal to ttl from now
@@ -45,7 +45,7 @@ func TestHeapPushPop(t *testing.T) {
}

func TestHeapUpdate(t *testing.T) {
-h := newTtlKeyHeap()
+h := newTTLKeyHeap()

kvs := make([]*node, 10)
@@ -104,7 +104,7 @@ func (s *Stats) clone() *Stats {
}
}

-func (s *Stats) toJson() []byte {
+func (s *Stats) toJSON() []byte {
b, _ := json.Marshal(s)
return b
}
@@ -99,7 +99,7 @@ func newStore(namespaces ...string) *store {
}
s.Stats = newStats()
s.WatcherHub = newWatchHub(1000)
-s.ttlKeyHeap = newTtlKeyHeap()
+s.ttlKeyHeap = newTTLKeyHeap()
s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...)
return s
}
@@ -781,15 +781,17 @@ func (s *store) Recovery(state []byte) error {
return err
}

-s.ttlKeyHeap = newTtlKeyHeap()
+s.ttlKeyHeap = newTTLKeyHeap()

s.Root.recoverAndclean()
return nil
}

+//revive:disable:var-naming
func (s *store) JsonStats() []byte {
+//revive:enable:var-naming
s.Stats.Watchers = uint64(s.WatcherHub.count)
-return s.Stats.toJson()
+return s.Stats.toJSON()
}

func (s *store) HasTTLKeys() bool {
@@ -22,7 +22,7 @@ type ttlKeyHeap struct {
keyMap map[*node]int
}

-func newTtlKeyHeap() *ttlKeyHeap {
+func newTTLKeyHeap() *ttlKeyHeap {
h := &ttlKeyHeap{keyMap: make(map[*node]int)}
heap.Init(h)
return h
@@ -59,7 +59,7 @@ type DiscoveryConfig struct {
type memberInfo struct {
// peerRegKey is the key used by the member when registering in the
// discovery service.
-// Format: "/_etcd/registry/<ClusterToken>/members/<memberId>".
+// Format: "/_etcd/registry/<ClusterToken>/members/<memberID>".
peerRegKey string
// peerURLsMap format: "peerName=peerURLs", i.e., "member1=http://127.0.0.1:2380".
peerURLsMap string
@@ -88,9 +88,9 @@ func getMemberKeyPrefix(clusterToken string) string {
return path.Join(getClusterKeyPrefix(clusterToken), "members")
}

-// key format for each member: "/_etcd/registry/<ClusterToken>/members/<memberId>".
-func getMemberKey(cluster, memberId string) string {
-return path.Join(getMemberKeyPrefix(cluster), memberId)
+// key format for each member: "/_etcd/registry/<ClusterToken>/members/<memberID>".
+func getMemberKey(cluster, memberID string) string {
+return path.Join(getMemberKeyPrefix(cluster), memberID)
}

// GetCluster will connect to the discovery service at the given endpoints and
@@ -155,7 +155,7 @@ func JoinCluster(lg *zap.Logger, cfg *DiscoveryConfig, id types.ID, config strin
type discovery struct {
lg *zap.Logger
clusterToken string
-memberId types.ID
+memberID types.ID
c *clientv3.Client
retries uint
@@ -182,7 +182,7 @@ func newDiscovery(lg *zap.Logger, dcfg *DiscoveryConfig, id types.ID) (*discover
return &discovery{
lg: lg,
clusterToken: dcfg.Token,
-memberId: id,
+memberID: id,
c: c,
cfg: dcfg,
clock: clockwork.NewRealClock(),
@@ -317,10 +317,10 @@ func (d *discovery) checkCluster() (*clusterInfo, int, int64, error) {
d.retries = 0

// find self position
-memberSelfId := getMemberKey(d.clusterToken, d.memberId.String())
+memberSelfID := getMemberKey(d.clusterToken, d.memberID.String())
idx := 0
for _, m := range cls.members {
-if m.peerRegKey == memberSelfId {
+if m.peerRegKey == memberSelfID {
break
}
if idx >= clusterSize-1 {
@@ -341,7 +341,7 @@ func (d *discovery) registerSelfRetry(contents string) error {

func (d *discovery) registerSelf(contents string) error {
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeout)
-memberKey := getMemberKey(d.clusterToken, d.memberId.String())
+memberKey := getMemberKey(d.clusterToken, d.memberID.String())
_, err := d.c.Put(ctx, memberKey, contents)
cancel()
@@ -326,35 +326,35 @@ func TestCheckCluster(t *testing.T) {

cases := []struct {
name string
-memberId types.ID
+memberID types.ID
getSizeRetries int
getMembersRetries int
expectedError error
}{
{
name: "no retries",
-memberId: 101,
+memberID: 101,
getSizeRetries: 0,
getMembersRetries: 0,
expectedError: nil,
},
{
name: "2 retries for getClusterSize",
-memberId: 102,
+memberID: 102,
getSizeRetries: 2,
getMembersRetries: 0,
expectedError: nil,
},
{
name: "2 retries for getClusterMembers",
-memberId: 103,
+memberID: 103,
getSizeRetries: 0,
getMembersRetries: 2,
expectedError: nil,
},
{
name: "error due to cluster full",
-memberId: 104,
+memberID: 104,
getSizeRetries: 0,
getMembersRetries: 0,
expectedError: ErrFullCluster,
@@ -382,7 +382,7 @@ func TestCheckCluster(t *testing.T) {
},
cfg: &DiscoveryConfig{},
clusterToken: "fakeToken",
-memberId: tc.memberId,
+memberID: tc.memberID,
clock: clockwork.NewRealClock(),
}

@@ -442,7 +442,7 @@ func TestRegisterSelf(t *testing.T) {
cases := []struct {
name string
token string
-memberId types.ID
+memberID types.ID
expectedRegKey string
expectedRegValue string
retries int // when retries > 0, then return an error on Put request.
@@ -450,7 +450,7 @@ func TestRegisterSelf(t *testing.T) {
{
name: "no retry with token1",
token: "token1",
-memberId: 101,
+memberID: 101,
expectedRegKey: "/_etcd/registry/token1/members/" + types.ID(101).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 0,
@@ -458,7 +458,7 @@ func TestRegisterSelf(t *testing.T) {
{
name: "no retry with token2",
token: "token2",
-memberId: 102,
+memberID: 102,
expectedRegKey: "/_etcd/registry/token2/members/" + types.ID(102).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 0,
@@ -466,7 +466,7 @@ func TestRegisterSelf(t *testing.T) {
{
name: "2 retries",
token: "token3",
-memberId: 103,
+memberID: 103,
expectedRegKey: "/_etcd/registry/token3/members/" + types.ID(103).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 2,
@@ -487,7 +487,7 @@ func TestRegisterSelf(t *testing.T) {
d := &discovery{
lg: lg,
clusterToken: tc.token,
-memberId: tc.memberId,
+memberID: tc.memberID,
cfg: &DiscoveryConfig{},
c: &clientv3.Client{
KV: fkv,
@@ -30,7 +30,7 @@ type header struct {
func newHeader(s *etcdserver.EtcdServer) header {
return header{
clusterID: int64(s.Cluster().ID()),
-memberID: int64(s.MemberId()),
+memberID: int64(s.MemberID()),
sg: s,
rev: func() int64 { return s.KV().Rev() },
}
@@ -49,7 +49,7 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return nil, rpctypes.ErrGRPCNotCapable
}

-if s.IsMemberExist(s.MemberId()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
+if s.IsMemberExist(s.MemberID()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
return nil, rpctypes.ErrGRPCNotSupportedForLearner
}

@@ -218,7 +218,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNotCapable
}

-if s.IsMemberExist(s.MemberId()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot
+if s.IsMemberExist(s.MemberID()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot
return rpctypes.ErrGRPCNotSupportedForLearner
}
@@ -255,7 +255,7 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
}

func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
-if ms.rg.MemberId() != ms.rg.Leader() {
+if ms.rg.MemberID() != ms.rg.Leader() {
return nil, rpctypes.ErrGRPCNotLeader
}
@@ -106,7 +106,7 @@ func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteR
}

func (cs *ClusterServer) header() *pb.ResponseHeader {
-return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.MemberId()), RaftTerm: cs.server.Term()}
+return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.MemberID()), RaftTerm: cs.server.Term()}
}

func membersToProtoMembers(membs []*membership.Member) []*pb.Member {
@@ -53,7 +53,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r any) error {
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &quotaKVServer{
NewKVServer(s),
-quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberId()},
+quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberID()},
}
}
@@ -86,7 +86,7 @@ func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequ
func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
return &quotaLeaseServer{
NewLeaseServer(s),
-quotaAlarmer{newBackendQuota(s, "lease"), s, s.MemberId()},
+quotaAlarmer{newBackendQuota(s, "lease"), s, s.MemberID()},
}
}

@@ -55,7 +55,7 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
lg: s.Cfg.Logger,

clusterID: int64(s.Cluster().ID()),
-memberID: int64(s.MemberId()),
+memberID: int64(s.MemberID()),

maxRequestBytes: int(s.Cfg.MaxRequestBytes + grpcOverheadBytes),
@@ -45,7 +45,7 @@ const (

// RaftStatusGetter represents etcd server and Raft progress.
type RaftStatusGetter interface {
-MemberId() types.ID
+MemberID() types.ID
Leader() types.ID
CommittedIndex() uint64
AppliedIndex() uint64
@@ -484,7 +484,7 @@ func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantRes
func (a *applierV3backend) newHeader() *pb.ResponseHeader {
return &pb.ResponseHeader{
ClusterId: uint64(a.cluster.ID()),
-MemberId: uint64(a.raftStatus.MemberId()),
+MemberId: uint64(a.raftStatus.MemberID()),
Revision: a.kv.Rev(),
RaftTerm: a.raftStatus.Term(),
}
@@ -50,7 +50,7 @@ func dummyApplyFunc(_ context.Context, _ *pb.InternalRaftRequest) *Result {

type fakeRaftStatusGetter struct{}

-func (*fakeRaftStatusGetter) MemberId() types.ID {
+func (*fakeRaftStatusGetter) MemberID() types.ID {
return 0
}
func (*fakeRaftStatusGetter) Leader() types.ID {
@@ -121,7 +121,7 @@ const (
rangeEnd = "rangeEnd"
keyOutsideRange = "rangeEnd_outside"

-LeaseId = 1
+leaseID = 1
)

func mustCreateRolesAndEnableAuth(t *testing.T, authApplier *authApplierV3) {
@@ -460,7 +460,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {

_, err := authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
-ID: LeaseId,
+ID: leaseID,
})
require.NoError(t, err)

@@ -469,7 +469,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
Key: []byte(key),
Value: []byte("1"),
-Lease: LeaseId,
+Lease: leaseID,
})
require.NoError(t, err)

@@ -478,7 +478,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
Key: []byte(keyOutsideRange),
Value: []byte("1"),
-Lease: LeaseId,
+Lease: leaseID,
})
require.NoError(t, err)

@@ -487,7 +487,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
Key: []byte(key),
Value: []byte("1"),
-Lease: LeaseId,
+Lease: leaseID,
})
require.Equal(t, err, auth.ErrPermissionDenied)
}
@@ -684,20 +684,20 @@ func TestAuthApplierV3_LeaseRevoke(t *testing.T) {

_, err := authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
-ID: LeaseId,
+ID: leaseID,
})
require.NoError(t, err)

// The user should be able to revoke the lease
setAuthInfo(authApplier, userWriteOnly)
_, err = authApplier.LeaseRevoke(&pb.LeaseRevokeRequest{
-ID: LeaseId,
+ID: leaseID,
})
require.NoError(t, err)

_, err = authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
-ID: LeaseId,
+ID: leaseID,
})
require.NoError(t, err)

@@ -706,14 +706,14 @@ func TestAuthApplierV3_LeaseRevoke(t *testing.T) {
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
Key: []byte(keyOutsideRange),
Value: []byte("1"),
-Lease: LeaseId,
+Lease: leaseID,
})
require.NoError(t, err)

// The user should not be able to revoke the lease anymore
setAuthInfo(authApplier, userWriteOnly)
_, err = authApplier.LeaseRevoke(&pb.LeaseRevokeRequest{
-ID: LeaseId,
+ID: leaseID,
})
require.Equal(t, err, auth.ErrPermissionDenied)
}
@@ -62,14 +62,14 @@ func NewUberApplier(
warningApplyDuration time.Duration,
txnModeWriteWithSharedBuffer bool,
quotaBackendBytesCfg int64) UberApplier {
-applyV3base_ := newApplierV3(lg, be, kv, alarmStore, authStore, lessor, cluster, raftStatus, snapshotServer, consistentIndex, txnModeWriteWithSharedBuffer, quotaBackendBytesCfg)
+applyV3base := newApplierV3(lg, be, kv, alarmStore, authStore, lessor, cluster, raftStatus, snapshotServer, consistentIndex, txnModeWriteWithSharedBuffer, quotaBackendBytesCfg)

ua := &uberApplier{
lg: lg,
alarmStore: alarmStore,
warningApplyDuration: warningApplyDuration,
-applyV3: applyV3base_,
-applyV3base: applyV3base_,
+applyV3: applyV3base,
+applyV3base: applyV3base,
}
ua.restoreAlarms()
return ua
@@ -34,7 +34,7 @@ import (
"go.etcd.io/etcd/server/v3/storage/schema"
)

-const memberId = 111195
+const memberID = 111195

func defaultUberApplier(t *testing.T) UberApplier {
lg := zaptest.NewLogger(t)
@@ -44,7 +44,7 @@ func defaultUberApplier(t *testing.T) UberApplier {
})

cluster := membership.NewCluster(lg)
-cluster.AddMember(&membership.Member{ID: memberId}, true)
+cluster.AddMember(&membership.Member{ID: memberID}, true)
lessor := lease.NewLessor(lg, be, cluster, lease.LessorConfig{})
kv := mvcc.NewStore(lg, be, lessor, mvcc.StoreConfig{})
alarmStore, err := v3alarm.NewAlarmStore(lg, schema.NewAlarmBackend(lg, be))
@@ -125,7 +125,7 @@ func TestUberApplier_Alarm_Corrupt(t *testing.T) {
Header: &pb.RequestHeader{},
Alarm: &pb.AlarmRequest{
Action: pb.AlarmRequest_ACTIVATE,
-MemberID: memberId,
+MemberID: memberID,
Alarm: pb.AlarmType_CORRUPT,
},
})
@@ -224,7 +224,7 @@ func TestUberApplier_Alarm_Quota(t *testing.T) {
Header: &pb.RequestHeader{},
Alarm: &pb.AlarmRequest{
Action: pb.AlarmRequest_ACTIVATE,
-MemberID: memberId,
+MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})
@@ -247,7 +247,7 @@ func TestUberApplier_Alarm_Deactivate(t *testing.T) {
Header: &pb.RequestHeader{},
Alarm: &pb.AlarmRequest{
Action: pb.AlarmRequest_ACTIVATE,
-MemberID: memberId,
+MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})
@@ -262,7 +262,7 @@ func TestUberApplier_Alarm_Deactivate(t *testing.T) {
Header: &pb.RequestHeader{},
Alarm: &pb.AlarmRequest{
Action: pb.AlarmRequest_DEACTIVATE,
-MemberID: memberId,
+MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})
@@ -300,8 +300,8 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
}
// TODO: refactor member http handler code
// cannot import etcdhttp, so manually construct url
-requestUrl := url + "/members/promote/" + fmt.Sprintf("%d", id)
-req, err := http.NewRequest(http.MethodPost, requestUrl, nil)
+requestURL := url + "/members/promote/" + fmt.Sprintf("%d", id)
+req, err := http.NewRequest(http.MethodPost, requestURL, nil)
if err != nil {
return nil, err
}
@@ -53,7 +53,7 @@ type corruptionChecker struct {
type Hasher interface {
mvcc.HashStorage
ReqTimeout() time.Duration
-MemberId() types.ID
+MemberID() types.ID
PeerHashByRev(int64) []*peerHashKVResp
LinearizableReadNotify(context.Context) error
TriggerCorruptAlarm(types.ID)
@@ -89,13 +89,13 @@ func (h hasherAdapter) TriggerCorruptAlarm(memberID types.ID) {
func (cm *corruptionChecker) InitialCheck() error {
cm.lg.Info(
"starting initial corruption check",
-zap.String("local-member-id", cm.hasher.MemberId().String()),
+zap.String("local-member-id", cm.hasher.MemberID().String()),
zap.Duration("timeout", cm.hasher.ReqTimeout()),
)

h, _, err := cm.hasher.HashByRev(0)
if err != nil {
-return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err)
+return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberID(), err)
}
peers := cm.hasher.PeerHashByRev(h.Revision)
mismatch := 0
@@ -103,7 +103,7 @@ func (cm *corruptionChecker) InitialCheck() error {
if p.resp != nil {
peerID := types.ID(p.resp.Header.MemberId)
fields := []zap.Field{
-zap.String("local-member-id", cm.hasher.MemberId().String()),
+zap.String("local-member-id", cm.hasher.MemberID().String()),
zap.Int64("local-member-revision", h.Revision),
zap.Int64("local-member-compact-revision", h.CompactRevision),
zap.Uint32("local-member-hash", h.Hash),
@@ -131,7 +131,7 @@ func (cm *corruptionChecker) InitialCheck() error {
case rpctypes.ErrFutureRev:
cm.lg.Warn(
"cannot fetch hash from slow remote peer",
-zap.String("local-member-id", cm.hasher.MemberId().String()),
+zap.String("local-member-id", cm.hasher.MemberID().String()),
zap.Int64("local-member-revision", h.Revision),
zap.Int64("local-member-compact-revision", h.CompactRevision),
zap.Uint32("local-member-hash", h.Hash),
@@ -142,7 +142,7 @@ func (cm *corruptionChecker) InitialCheck() error {
case rpctypes.ErrCompacted:
cm.lg.Warn(
"cannot fetch hash from remote peer; local member is behind",
-zap.String("local-member-id", cm.hasher.MemberId().String()),
+zap.String("local-member-id", cm.hasher.MemberID().String()),
zap.Int64("local-member-revision", h.Revision),
zap.Int64("local-member-compact-revision", h.CompactRevision),
zap.Uint32("local-member-hash", h.Hash),
@@ -153,7 +153,7 @@ func (cm *corruptionChecker) InitialCheck() error {
case rpctypes.ErrClusterIDMismatch:
cm.lg.Warn(
"cluster ID mismatch",
-zap.String("local-member-id", cm.hasher.MemberId().String()),
+zap.String("local-member-id", cm.hasher.MemberID().String()),
zap.Int64("local-member-revision", h.Revision),
zap.Int64("local-member-compact-revision", h.CompactRevision),
zap.Uint32("local-member-hash", h.Hash),
@@ -165,12 +165,12 @@ func (cm *corruptionChecker) InitialCheck() error {
}
}
if mismatch > 0 {
-return fmt.Errorf("%s found data inconsistency with peers", cm.hasher.MemberId())
+return fmt.Errorf("%s found data inconsistency with peers", cm.hasher.MemberID())
}

cm.lg.Info(
"initial corruption checking passed; no corruption",
-zap.String("local-member-id", cm.hasher.MemberId().String()),
+zap.String("local-member-id", cm.hasher.MemberID().String()),
)
return nil
}
@@ -213,7 +213,7 @@ func (cm *corruptionChecker) PeriodicCheck() error {
zap.Int64("compact-revision-2", h2.CompactRevision),
zap.Uint32("hash-2", h2.Hash),
)
-mismatch(cm.hasher.MemberId())
+mismatch(cm.hasher.MemberID())
}

checkedCount := 0
@@ -275,7 +275,7 @@ func (cm *corruptionChecker) PeriodicCheck() error {
// method still passes without raising alarm.
func (cm *corruptionChecker) CompactHashCheck() {
cm.lg.Info("starting compact hash check",
-zap.String("local-member-id", cm.hasher.MemberId().String()),
+zap.String("local-member-id", cm.hasher.MemberID().String()),
zap.Duration("timeout", cm.hasher.ReqTimeout()),
)
hashes := cm.uncheckedRevisions()
@@ -300,8 +300,8 @@ func (cm *corruptionChecker) CompactHashCheck() {
// true: successfully checked hash on whole cluster or raised alarms, so no need to check next hash
// false: skipped some members, so need to check next hash
func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
-leaderId := cm.hasher.MemberId()
-hash2members := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
+leaderID := cm.hasher.MemberID()
+hash2members := map[uint32]types.IDSlice{leaderHash.Hash: {leaderID}}

peersChecked := 0
// group all peers by hash
@@ -319,7 +319,7 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
}
if skipped {
cm.lg.Warn("Skipped peer's hash", zap.Int("number-of-peers", len(peers)),
-zap.String("leader-id", leaderId.String()),
+zap.String("leader-id", leaderID.String()),
zap.String("peer-id", peer.id.String()),
zap.String("reason", reason))
continue
@@ -358,7 +358,7 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
// corrupted. In such situation, we intentionally set the memberID
// as 0, it means it affects the whole cluster.
cm.lg.Error("Detected compaction hash mismatch but cannot identify the corrupted members, so intentionally set the memberID as 0",
-zap.String("leader-id", leaderId.String()),
+zap.String("leader-id", leaderID.String()),
zap.Int64("leader-revision", leaderHash.Revision),
zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
zap.Uint32("leader-hash", leaderHash.Hash),
@@ -376,7 +376,7 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
}

cm.lg.Error("Detected compaction hash mismatch",
-zap.String("leader-id", leaderId.String()),
+zap.String("leader-id", leaderID.String()),
zap.Int64("leader-revision", leaderHash.Revision),
zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
zap.Uint32("leader-hash", leaderHash.Hash),
@@ -459,7 +459,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp {
members := s.cluster.Members()
peers := make([]peerInfo, 0, len(members))
for _, m := range members {
-if m.ID == s.MemberId() {
+if m.ID == s.MemberID() {
continue
}
peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs})
@@ -493,7 +493,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp {
}
lg.Warn(
"failed hash kv request",
-zap.String("local-member-id", s.MemberId().String()),
+zap.String("local-member-id", s.MemberID().String()),
zap.Int64("requested-revision", rev),
zap.String("remote-peer-endpoint", ep),
zap.Error(lastErr),
@@ -582,8 +582,8 @@ func HashByRev(ctx context.Context, cid types.ID, cc *http.Client, url string, r
if err != nil {
return nil, err
}
-requestUrl := url + PeerHashKVPath
-req, err := http.NewRequest(http.MethodGet, requestUrl, bytes.NewReader(hashReqBytes))
+requestURL := url + PeerHashKVPath
+req, err := http.NewRequest(http.MethodGet, requestURL, bytes.NewReader(hashReqBytes))
if err != nil {
return nil, err
}
@@ -52,56 +52,56 @@ func TestInitialCheck(t *testing.T) {
hasher: fakeHasher{
hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Revision: 10}}},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(10)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(10)", "MemberID()"},
},
{
name: "Error getting hash",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{err: fmt.Errorf("error getting hash")}}},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "MemberID()"},
expectError: true,
},
{
name: "Peer with empty response",
hasher: fakeHasher{peerHashes: []*peerHashKVResp{{}}},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberID()"},
},
{
name: "Peer returned ErrFutureRev",
hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrFutureRev}}},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberID()", "MemberID()"},
},
{
name: "Peer returned ErrCompacted",
hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrCompacted}}},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberID()", "MemberID()"},
},
{
name: "Peer returned other error",
hasher: fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrCorrupt}}},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberID()"},
},
{
name: "Peer returned same hash",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 1}}}},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberID()", "MemberID()"},
},
{
name: "Peer returned different hash with same compaction rev",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberID()", "MemberID()"},
expectError: true,
},
{
name: "Peer returned different hash and compaction rev",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberID()", "MemberID()"},
},
{
name: "Cluster ID Mismatch does not fail CorruptionChecker.InitialCheck()",
hasher: fakeHasher{
peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIDMismatch}},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberID()", "MemberID()"},
},
}
for _, tc := range tcs {
@@ -166,7 +166,7 @@ func TestPeriodicCheck(t *testing.T) {
{
name: "Different local hash and same revisions",
hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1, Revision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 1, Revision: 1}, revision: 1}}},
-expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(1)"},
+expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberID()", "TriggerCorruptAlarm(1)"},
expectCorrupt: true,
},
{
@@ -259,7 +259,7 @@ func TestCompactHashCheck(t *testing.T) {
}{
{
name: "No hashes",
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()"},
},
{
name: "No peers, check new checked from largest to smallest",
@@ -267,7 +267,7 @@ func TestCompactHashCheck(t *testing.T) {
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}},
},
lastRevisionChecked: 2,
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"},
expectLastRevisionChecked: 2,
},
{
@@ -276,7 +276,7 @@ func TestCompactHashCheck(t *testing.T) {
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "PeerHashByRev(1)", "MemberID()"},
},
{
name: "Peer returned different compaction revision is skipped",
@@ -284,7 +284,7 @@ func TestCompactHashCheck(t *testing.T) {
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "PeerHashByRev(1)", "MemberID()"},
},
{
name: "Etcd can identify two corrupted members in 5 member cluster",
@@ -297,7 +297,7 @@ func TestCompactHashCheck(t *testing.T) {
{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 7}},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)", "TriggerCorruptAlarm(45)"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "TriggerCorruptAlarm(44)", "TriggerCorruptAlarm(45)"},
expectCorrupt: true,
},
{
@@ -309,7 +309,7 @@ func TestCompactHashCheck(t *testing.T) {
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "PeerHashByRev(1)", "MemberID()"},
expectCorrupt: false,
},
{
@@ -321,7 +321,7 @@ func TestCompactHashCheck(t *testing.T) {
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(43)"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "TriggerCorruptAlarm(43)"},
expectCorrupt: true,
},
{
@@ -335,7 +335,7 @@ func TestCompactHashCheck(t *testing.T) {
{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "TriggerCorruptAlarm(44)"},
expectCorrupt: true,
},
{
@@ -347,7 +347,7 @@ func TestCompactHashCheck(t *testing.T) {
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "TriggerCorruptAlarm(0)"},
expectCorrupt: true,
},
{
@@ -363,7 +363,7 @@ func TestCompactHashCheck(t *testing.T) {
{peerInfo: peerInfo{id: 47}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "TriggerCorruptAlarm(0)"},
expectCorrupt: true,
},
{
@@ -377,7 +377,7 @@ func TestCompactHashCheck(t *testing.T) {
{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "TriggerCorruptAlarm(44)"},
expectCorrupt: true,
},
{
@@ -389,7 +389,7 @@ func TestCompactHashCheck(t *testing.T) {
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(1)"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()", "TriggerCorruptAlarm(1)"},
expectCorrupt: true,
},
{
@@ -398,7 +398,7 @@ func TestCompactHashCheck(t *testing.T) {
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberID()"},
expectLastRevisionChecked: 2,
},
{
@@ -410,7 +410,7 @@ func TestCompactHashCheck(t *testing.T) {
{err: fmt.Errorf("failed getting hash")},
},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberID()"},
},
{
name: "Cluster ID Mismatch does not fail CorruptionChecker.CompactHashCheck()",
@@ -418,7 +418,7 @@ func TestCompactHashCheck(t *testing.T) {
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIDMismatch}},
},
-expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"},
+expectActions: []string{"MemberID()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberID()"},
},
}
for _, tc := range tcs {
@@ -486,8 +486,8 @@ func (f *fakeHasher) ReqTimeout() time.Duration {
return time.Second
}

-func (f *fakeHasher) MemberId() types.ID {
-f.actions = append(f.actions, "MemberId()")
+func (f *fakeHasher) MemberID() types.ID {
+f.actions = append(f.actions, "MemberID()")
return 1
}

@@ -501,8 +501,8 @@ func (f *fakeHasher) LinearizableReadNotify(ctx context.Context) error {
return f.linearizableReadNotify
}

-func (f *fakeHasher) TriggerCorruptAlarm(memberId types.ID) {
-f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberId))
+func (f *fakeHasher) TriggerCorruptAlarm(memberID types.ID) {
+f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberID))
f.alarmTriggered = true
}
@@ -240,7 +240,7 @@ type EtcdServer struct {
leaderChanged *notify.Notifier

errorc chan error
-memberId types.ID
+memberID types.ID
attributes membership.Attributes

cluster *membership.RaftCluster
@@ -324,7 +324,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
v2store: b.storage.st,
snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
-memberId: b.cluster.nodeID,
+memberID: b.cluster.nodeID,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.cluster.cl,
stats: sstats,
@@ -399,7 +399,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
if !srv.ensureLeadership() {
srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
-zap.Uint64("local-member-id", uint64(srv.MemberId())))
+zap.Uint64("local-member-id", uint64(srv.MemberID())))
return lease.ErrNotPrimary
}

@@ -469,7 +469,7 @@ func (s *EtcdServer) adjustTicks() {
ticks := s.Cfg.ElectionTicks - 1
lg.Info(
"started as single-node; fast-forwarding election ticks",
-zap.String("local-member-id", s.MemberId().String()),
+zap.String("local-member-id", s.MemberID().String()),
zap.Int("forward-ticks", ticks),
zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
zap.Int("election-ticks", s.Cfg.ElectionTicks),
@@ -508,7 +508,7 @@ func (s *EtcdServer) adjustTicks() {

lg.Info(
"initialized peer connections; fast-forwarding election ticks",
-zap.String("local-member-id", s.MemberId().String()),
+zap.String("local-member-id", s.MemberID().String()),
zap.Int("forward-ticks", ticks),
zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
zap.Int("election-ticks", s.Cfg.ElectionTicks),
@@ -575,7 +575,7 @@ func (s *EtcdServer) start() {
if s.ClusterVersion() != nil {
lg.Info(
"starting etcd server",
-zap.String("local-member-id", s.MemberId().String()),
+zap.String("local-member-id", s.MemberID().String()),
zap.String("local-server-version", version.Version),
zap.String("cluster-id", s.Cluster().ID().String()),
zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())),
@@ -584,7 +584,7 @@ func (s *EtcdServer) start() {
} else {
lg.Info(
"starting etcd server",
-zap.String("local-member-id", s.MemberId().String()),
+zap.String("local-member-id", s.MemberID().String()),
zap.String("local-server-version", version.Version),
zap.String("cluster-version", "to_be_decided"),
)
@@ -704,15 +704,15 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
lg.Warn(
"rejected Raft message from removed member",
-zap.String("local-member-id", s.MemberId().String()),
+zap.String("local-member-id", s.MemberID().String()),
zap.String("removed-member-id", types.ID(m.From).String()),
)
return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
}
-if s.MemberId() != types.ID(m.To) {
+if s.MemberID() != types.ID(m.To) {
lg.Warn(
"rejected Raft message to mismatch member",
-zap.String("local-member-id", s.MemberId().String()),
+zap.String("local-member-id", s.MemberID().String()),
zap.String("mismatch-member-id", types.ID(m.To).String()),
)
return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message to mismatch member")
@@ -862,7 +862,7 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
lg := s.Logger()
if !s.ensureLeadership() {
lg.Warn("Ignore the lease revoking request because current member isn't a leader",
-zap.Uint64("local-member-id", uint64(s.MemberId())))
+zap.Uint64("local-member-id", uint64(s.MemberID())))
return
}

@@ -910,11 +910,11 @@ func (s *EtcdServer) ensureLeadership() bool {
return false
}

-newLeaderId := s.raftStatus().Lead
-if newLeaderId != uint64(s.MemberId()) {
+newLeaderID := s.raftStatus().Lead
+if newLeaderID != uint64(s.MemberID()) {
lg.Warn("Current member isn't a leader",
-zap.Uint64("local-member-id", uint64(s.MemberId())),
-zap.Uint64("new-lead", newLeaderId))
+zap.Uint64("local-member-id", uint64(s.MemberID())),
+zap.Uint64("new-lead", newLeaderID))
return false
}

@@ -1099,7 +1099,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
lg.Info("adding peers from new cluster configuration")

for _, m := range s.cluster.Members() {
-if m.ID == s.MemberId() {
+if m.ID == s.MemberID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
@@ -1175,7 +1175,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
lg := s.Logger()
lg.Info(
"triggering snapshot",
-zap.String("local-member-id", s.MemberId().String()),
+zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
@ -1196,7 +1196,7 @@ func (s *EtcdServer) hasMultipleVotingMembers() bool {
|
||||
}
|
||||
|
||||
func (s *EtcdServer) isLeader() bool {
|
||||
return uint64(s.MemberId()) == s.Lead()
|
||||
return uint64(s.MemberID()) == s.Lead()
|
||||
}
|
||||
|
||||
// MoveLeader transfers the leader to the given transferee.
|
||||
@ -1211,7 +1211,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
|
||||
lg := s.Logger()
|
||||
lg.Info(
|
||||
"leadership transfer starting",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("current-leader-member-id", types.ID(lead).String()),
|
||||
zap.String("transferee-member-id", types.ID(transferee).String()),
|
||||
)
|
||||
@ -1228,7 +1228,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er
|
||||
// TODO: drain all requests, or drop all messages to the old leader
|
||||
lg.Info(
|
||||
"leadership transfer finished",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("old-leader-member-id", types.ID(lead).String()),
|
||||
zap.String("new-leader-member-id", types.ID(transferee).String()),
|
||||
zap.Duration("took", time.Since(now)),
|
||||
@ -1242,7 +1242,7 @@ func (s *EtcdServer) TryTransferLeadershipOnShutdown() error {
|
||||
if !s.isLeader() {
|
||||
lg.Info(
|
||||
"skipped leadership transfer; local server is not leader",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
|
||||
)
|
||||
return nil
|
||||
@ -1251,7 +1251,7 @@ func (s *EtcdServer) TryTransferLeadershipOnShutdown() error {
|
||||
if !s.hasMultipleVotingMembers() {
|
||||
lg.Info(
|
||||
"skipped leadership transfer for single voting member cluster",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
|
||||
)
|
||||
return nil
|
||||
@ -1288,7 +1288,7 @@ func (s *EtcdServer) HardStop() {
|
||||
func (s *EtcdServer) Stop() {
|
||||
lg := s.Logger()
|
||||
if err := s.TryTransferLeadershipOnShutdown(); err != nil {
|
||||
lg.Warn("leadership transfer failed", zap.String("local-member-id", s.MemberId().String()), zap.Error(err))
|
||||
lg.Warn("leadership transfer failed", zap.String("local-member-id", s.MemberID().String()), zap.Error(err))
|
||||
}
|
||||
s.HardStop()
|
||||
}
|
||||
@ -1376,17 +1376,17 @@ func (s *EtcdServer) mayAddMember(memb membership.Member) error {
|
||||
if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
|
||||
lg.Warn(
|
||||
"rejecting member add request; not enough healthy members",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
|
||||
zap.Error(errors.ErrNotEnoughStartedMembers),
|
||||
)
|
||||
return errors.ErrNotEnoughStartedMembers
|
||||
}
|
||||
|
||||
if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberId(), s.cluster.VotingMembers()) {
|
||||
if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberID(), s.cluster.VotingMembers()) {
|
||||
lg.Warn(
|
||||
"rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
|
||||
zap.Error(errors.ErrUnhealthy),
|
||||
)
|
||||
@ -1504,7 +1504,7 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error {
|
||||
if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
|
||||
lg.Warn(
|
||||
"rejecting member promote request; not enough healthy members",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("requested-member-remove-id", id.String()),
|
||||
zap.Error(errors.ErrNotEnoughStartedMembers),
|
||||
)
|
||||
@ -1579,7 +1579,7 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
|
||||
if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
|
||||
lg.Warn(
|
||||
"rejecting member remove request; not enough healthy members",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("requested-member-remove-id", id.String()),
|
||||
zap.Error(errors.ErrNotEnoughStartedMembers),
|
||||
)
|
||||
@ -1587,17 +1587,17 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
|
||||
}
|
||||
|
||||
// downed member is safe to remove since it's not part of the active quorum
|
||||
if t := s.r.transport.ActiveSince(id); id != s.MemberId() && t.IsZero() {
|
||||
if t := s.r.transport.ActiveSince(id); id != s.MemberID() && t.IsZero() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// protect quorum if some members are down
|
||||
m := s.cluster.VotingMembers()
|
||||
active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberId(), m)
|
||||
active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberID(), m)
|
||||
if (active - 1) < 1+((len(m)-1)/2) {
|
||||
lg.Warn(
|
||||
"rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("requested-member-remove", id.String()),
|
||||
zap.Int("active-peers", active),
|
||||
zap.Error(errors.ErrUnhealthy),
|
||||
@ -1669,7 +1669,15 @@ func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
|
||||
return s.firstCommitInTerm.Receive()
|
||||
}
|
||||
|
||||
func (s *EtcdServer) MemberId() types.ID { return s.memberId }
|
||||
// MemberId returns the ID of the local member.
|
||||
// Deprecated: Please use (*EtcdServer) MemberID instead.
|
||||
//
|
||||
//revive:disable:var-naming
|
||||
func (s *EtcdServer) MemberId() types.ID { return s.MemberID() }
|
||||
|
||||
//revive:enable:var-naming
|
||||
|
||||
func (s *EtcdServer) MemberID() types.ID { return s.memberID }
|
||||
|
||||
func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
|
||||
|
||||
@ -1714,7 +1722,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
|
||||
<-resp.raftAdvanceC
|
||||
lg.Info(
|
||||
"applied a configuration change through raft",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("raft-conf-change", cc.Type.String()),
|
||||
zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
|
||||
)
|
||||
@ -1736,7 +1744,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
|
||||
// or its server is stopped.
|
||||
func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
req := &membershippb.ClusterMemberAttrSetRequest{
|
||||
Member_ID: uint64(s.MemberId()),
|
||||
Member_ID: uint64(s.MemberID()),
|
||||
MemberAttributes: &membershippb.Attributes{
|
||||
Name: s.attributes.Name,
|
||||
ClientUrls: s.attributes.ClientURLs,
|
||||
@ -1748,7 +1756,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
case <-s.stopping:
|
||||
lg.Warn(
|
||||
"stopped publish because server is stopping",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
)
|
||||
@ -1765,7 +1773,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
close(s.readych)
|
||||
lg.Info(
|
||||
"published local member to cluster through raft",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.String("cluster-id", s.cluster.ID().String()),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
@ -1775,7 +1783,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
default:
|
||||
lg.Warn(
|
||||
"failed to publish local member to cluster through raft",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
zap.Error(err),
|
||||
@ -1789,7 +1797,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
||||
|
||||
lg := s.Logger()
|
||||
fields := []zap.Field{
|
||||
zap.String("from", s.MemberId().String()),
|
||||
zap.String("from", s.MemberID().String()),
|
||||
zap.String("to", types.ID(merged.To).String()),
|
||||
zap.Int64("bytes", merged.TotalSize),
|
||||
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
|
||||
@ -1965,7 +1973,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
|
||||
|
||||
s.GoAttach(func() {
|
||||
a := &pb.AlarmRequest{
|
||||
MemberID: uint64(s.MemberId()),
|
||||
MemberID: uint64(s.MemberID()),
|
||||
Action: pb.AlarmRequest_ACTIVATE,
|
||||
Alarm: pb.AlarmType_NOSPACE,
|
||||
}
|
||||
@ -2065,7 +2073,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
} else {
|
||||
s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
|
||||
|
||||
if confChangeContext.Member.ID != s.MemberId() {
|
||||
if confChangeContext.Member.ID != s.MemberID() {
|
||||
s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
|
||||
}
|
||||
}
|
||||
@ -2073,7 +2081,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
id := types.ID(cc.NodeID)
|
||||
s.cluster.RemoveMember(id, shouldApplyV3)
|
||||
if id == s.MemberId() {
|
||||
if id == s.MemberID() {
|
||||
return true, nil
|
||||
}
|
||||
s.r.transport.RemovePeer(id)
|
||||
@ -2091,7 +2099,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
)
|
||||
}
|
||||
s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
|
||||
if m.ID != s.MemberId() {
|
||||
if m.ID != s.MemberID() {
|
||||
s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
|
||||
}
|
||||
}
|
||||
@ -2226,7 +2234,7 @@ func (s *EtcdServer) monitorClusterVersions() {
|
||||
return
|
||||
}
|
||||
|
||||
if s.Leader() != s.MemberId() {
|
||||
if s.Leader() != s.MemberID() {
|
||||
continue
|
||||
}
|
||||
err := monitor.UpdateClusterVersionIfNeeded()
|
||||
@ -2263,7 +2271,7 @@ func (s *EtcdServer) monitorKVHash() {
|
||||
lg := s.Logger()
|
||||
lg.Info(
|
||||
"enabled corruption checking",
|
||||
zap.String("local-member-id", s.MemberId().String()),
|
||||
zap.String("local-member-id", s.MemberID().String()),
|
||||
zap.Duration("interval", t),
|
||||
)
|
||||
for {
|
||||
@ -2377,8 +2385,8 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
|
||||
switch lead {
|
||||
case types.ID(raft.None):
|
||||
// TODO: return error to specify it happens because the cluster does not have leader now
|
||||
case s.MemberId():
|
||||
if !isConnectedToQuorumSince(s.r.transport, start, s.MemberId(), s.cluster.Members()) {
|
||||
case s.MemberID():
|
||||
if !isConnectedToQuorumSince(s.r.transport, start, s.MemberID(), s.cluster.Members()) {
|
||||
return errors.ErrTimeoutDueToConnectionLost
|
||||
}
|
||||
default:
|
||||
|

@ -423,7 +423,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
memberId: 1,
memberID: 1,
r: *r,
cluster: cl,
beHooks: serverstorage.NewBackendHooks(lg, nil),
@ -471,7 +471,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
memberId: 1,
memberID: 1,
r: *realisticRaftNode(lg, 1, nil),
cluster: cl,
w: wait.New(),
@ -565,7 +565,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
memberId: 2,
memberID: 2,
r: *r,
cluster: cl,
w: wait.New(),
@ -933,7 +933,7 @@ func TestProcessIgnoreMismatchMessage(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
memberId: 1,
memberID: 1,
r: *r,
v2store: st,
cluster: cl,
@ -945,13 +945,13 @@ func TestProcessIgnoreMismatchMessage(t *testing.T) {
// Mock a mad switch dispatching messages to wrong node.
m := raftpb.Message{
Type: raftpb.MsgHeartbeat,
To: 2, // Wrong ID, s.MemberId() is 1.
To: 2, // Wrong ID, s.MemberID() is 1.
From: 3,
Term: 11,
Commit: 42, // Commit is larger than the last index 11.
}
if types.ID(m.To) == s.MemberId() {
t.Fatalf("m.To (%d) is expected to mismatch s.MemberId (%d)", m.To, s.MemberId())
if types.ID(m.To) == s.MemberID() {
t.Fatalf("m.To (%d) is expected to mismatch s.MemberID (%d)", m.To, s.MemberID())
}
err := s.Process(context.Background(), m)
if err == nil {
@ -1076,7 +1076,7 @@ func TestPublishV3(t *testing.T) {
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
memberId: 1,
memberID: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
@ -1147,7 +1147,7 @@ func TestPublishV3Retry(t *testing.T) {
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
memberId: 1,
memberID: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),
@ -1197,7 +1197,7 @@ func TestUpdateVersionV3(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
memberId: 1,
memberID: 1,
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},

@ -429,7 +429,7 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
func (s *EtcdServer) newHeader() *pb.ResponseHeader {
return &pb.ResponseHeader{
ClusterId: uint64(s.cluster.ID()),
MemberId: uint64(s.MemberId()),
MemberId: uint64(s.MemberID()),
Revision: s.KV().Rev(),
RaftTerm: s.Term(),
}
@ -790,7 +790,7 @@ func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

func (s *EtcdServer) linearizableReadLoop() {
for {
requestId := s.reqIDGen.Next()
requestID := s.reqIDGen.Next()
leaderChangedNotifier := s.leaderChanged.Receive()
select {
case <-leaderChangedNotifier:
@ -810,7 +810,7 @@ func (s *EtcdServer) linearizableReadLoop() {
s.readNotifier = nextnr
s.readMu.Unlock()

confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestID)
if isStopped(err) {
return
}
@ -845,8 +845,8 @@ func isStopped(err error) bool {
return err == raft.ErrStopped || err == errors.ErrStopped
}

func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
err := s.sendReadIndex(requestId)
func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestID uint64) (uint64, error) {
err := s.sendReadIndex(requestID)
if err != nil {
return 0, err
}
@ -862,19 +862,19 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
for {
select {
case rs := <-s.r.readStateC:
requestIdBytes := uint64ToBigEndianBytes(requestId)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
requestIDBytes := uint64ToBigEndianBytes(requestID)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIDBytes)
if !gotOwnResponse {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
responseId := uint64(0)
responseID := uint64(0)
if len(rs.RequestCtx) == 8 {
responseId = binary.BigEndian.Uint64(rs.RequestCtx)
responseID = binary.BigEndian.Uint64(rs.RequestCtx)
}
lg.Warn(
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
zap.Uint64("sent-request-id", requestId),
zap.Uint64("received-request-id", responseId),
zap.Uint64("sent-request-id", requestID),
zap.Uint64("received-request-id", responseID),
)
slowReadIndex.Inc()
continue
@ -887,7 +887,7 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
case <-firstCommitInTermNotifier:
firstCommitInTermNotifier = s.firstCommitInTerm.Receive()
lg.Info("first commit in current term: resending ReadIndex request")
err := s.sendReadIndex(requestId)
err := s.sendReadIndex(requestID)
if err != nil {
return 0, err
}
@ -896,10 +896,10 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
case <-retryTimer.C:
lg.Warn(
"waiting for ReadIndex response took too long, retrying",
zap.Uint64("sent-request-id", requestId),
zap.Uint64("sent-request-id", requestID),
zap.Duration("retry-timeout", readIndexRetryTime),
)
err := s.sendReadIndex(requestId)
err := s.sendReadIndex(requestID)
if err != nil {
return 0, err
}

@ -359,7 +359,7 @@ func (c *Cluster) RemoveMember(t testutil.TB, cc *clientv3.Client, id uint64) er
}
newMembers := make([]*Member, 0)
for _, m := range c.Members {
if uint64(m.Server.MemberId()) != id {
if uint64(m.Server.MemberID()) != id {
newMembers = append(newMembers, m)
} else {
m.Client.Close()
@ -370,7 +370,7 @@ func (c *Cluster) RemoveMember(t testutil.TB, cc *clientv3.Client, id uint64) er
// TODO: remove connection write timeout by selecting on http response closeNotifier
// blocking on https://github.com/golang/go/issues/9524
case <-time.After(time.Second + time.Duration(ElectionTicks)*framecfg.TickDuration + time.Second + rafthttp.ConnWriteTimeout):
t.Fatalf("failed to remove member %s in time", m.Server.MemberId())
t.Fatalf("failed to remove member %s in time", m.Server.MemberID())
}
}
}
@ -444,7 +444,7 @@ func (c *Cluster) waitMembersForLeader(ctx context.Context, t testing.TB, membs
possibleLead := make(map[uint64]bool)
var lead uint64
for _, m := range membs {
possibleLead[uint64(m.Server.MemberId())] = true
possibleLead[uint64(m.Server.MemberID())] = true
}
cc, err := c.ClusterClient(t)
if err != nil {
@ -478,7 +478,7 @@ func (c *Cluster) waitMembersForLeader(ctx context.Context, t testing.TB, membs
}

for i, m := range membs {
if uint64(m.Server.MemberId()) == lead {
if uint64(m.Server.MemberID()) == lead {
t.Logf("waitMembersForLeader found leader. Member: %v lead: %x", i, lead)
return i
}
@ -888,7 +888,7 @@ func (m *Member) ElectionTimeout() time.Duration {
return time.Duration(m.Server.Cfg.ElectionTicks*int(m.Server.Cfg.TickMs)) * time.Millisecond
}

func (m *Member) ID() types.ID { return m.Server.MemberId() }
func (m *Member) ID() types.ID { return m.Server.MemberID() }

// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *Member) (*clientv3.Client, error) {
@ -1358,18 +1358,18 @@ func (m *Member) Metric(metricName string, expectLabels ...string) (string, erro
// InjectPartition drops connections from m to others, vice versa.
func (m *Member) InjectPartition(t testutil.TB, others ...*Member) {
for _, other := range others {
m.Server.CutPeer(other.Server.MemberId())
other.Server.CutPeer(m.Server.MemberId())
t.Logf("network partition injected between: %v <-> %v", m.Server.MemberId(), other.Server.MemberId())
m.Server.CutPeer(other.Server.MemberID())
other.Server.CutPeer(m.Server.MemberID())
t.Logf("network partition injected between: %v <-> %v", m.Server.MemberID(), other.Server.MemberID())
}
}

// RecoverPartition recovers connections from m to others, vice versa.
func (m *Member) RecoverPartition(t testutil.TB, others ...*Member) {
for _, other := range others {
m.Server.MendPeer(other.Server.MemberId())
other.Server.MendPeer(m.Server.MemberId())
t.Logf("network partition between: %v <-> %v", m.Server.MemberId(), other.Server.MemberId())
m.Server.MendPeer(other.Server.MemberID())
other.Server.MendPeer(m.Server.MemberID())
t.Logf("network partition between: %v <-> %v", m.Server.MemberID(), other.Server.MemberID())
}
}

@ -108,7 +108,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {

// TODO: remove the last but one member
for i := 0; i < size-1; i++ {
id := c.Members[len(c.Members)-1].Server.MemberId()
id := c.Members[len(c.Members)-1].Server.MemberID()
// may hit second leader election on slow machines
if err := c.RemoveMember(t, c.Members[0].Client, uint64(id)); err != nil {
if strings.Contains(err.Error(), "no leader") {
@ -186,7 +186,7 @@ func TestAddMemberAfterClusterFullRotation(t *testing.T) {

// remove all the previous three members and add in three new members.
for i := 0; i < 3; i++ {
if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[1].Server.MemberId())); err != nil {
if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[1].Server.MemberID())); err != nil {
t.Fatal(err)
}
c.WaitMembersForLeader(t, c.Members)
@ -207,7 +207,7 @@ func TestIssue2681(t *testing.T) {
c := integration.NewCluster(t, &integration.ClusterConfig{Size: 5, DisableStrictReconfigCheck: true})
defer c.Terminate(t)

if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[4].Server.MemberId())); err != nil {
if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[4].Server.MemberID())); err != nil {
t.Fatal(err)
}
c.WaitMembersForLeader(t, c.Members)
@ -233,7 +233,7 @@ func testIssue2746(t *testing.T, members int) {
clusterMustProgress(t, c.Members)
}

if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[members-1].Server.MemberId())); err != nil {
if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[members-1].Server.MemberID())); err != nil {
t.Fatal(err)
}
c.WaitMembersForLeader(t, c.Members)
@ -258,7 +258,7 @@ func TestIssue2904(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestTimeout)
// the proposal is not committed because member 1 is stopped, but the
// proposal is appended to leader'Server raft log.
c.Members[0].Client.MemberRemove(ctx, uint64(c.Members[2].Server.MemberId()))
c.Members[0].Client.MemberRemove(ctx, uint64(c.Members[2].Server.MemberID()))
cancel()

// restart member, and expect it to send UpdateAttributes request.
@ -388,7 +388,7 @@ func TestRejectUnhealthyRemove(t *testing.T) {
leader := c.WaitLeader(t)

// reject remove active member since (3,2)-(1,0) => (2,2) lacks quorum
err := c.RemoveMember(t, c.Members[leader].Client, uint64(c.Members[2].Server.MemberId()))
err := c.RemoveMember(t, c.Members[leader].Client, uint64(c.Members[2].Server.MemberID()))
if err == nil {
t.Fatalf("should reject quorum breaking remove: %s", err)
}
@ -401,7 +401,7 @@ func TestRejectUnhealthyRemove(t *testing.T) {
time.Sleep(time.Duration(integration.ElectionTicks * int(config.TickDuration)))

// permit remove dead member since (3,2) - (0,1) => (3,1) has quorum
if err = c.RemoveMember(t, c.Members[2].Client, uint64(c.Members[0].Server.MemberId())); err != nil {
if err = c.RemoveMember(t, c.Members[2].Client, uint64(c.Members[0].Server.MemberID())); err != nil {
t.Fatalf("should accept removing down member: %s", err)
}

@ -412,7 +412,7 @@ func TestRejectUnhealthyRemove(t *testing.T) {
time.Sleep((3 * etcdserver.HealthInterval) / 2)

// accept remove member since (4,1)-(1,0) => (3,1) has quorum
if err = c.RemoveMember(t, c.Members[1].Client, uint64(c.Members[0].Server.MemberId())); err != nil {
if err = c.RemoveMember(t, c.Members[1].Client, uint64(c.Members[0].Server.MemberID())); err != nil {
t.Fatalf("expected to remove member, got error %v", err)
}
}
@ -436,7 +436,7 @@ func TestRestartRemoved(t *testing.T) {
firstMember.KeepDataDirTerminate = true

// 3. remove first member, shut down without deleting data
if err := c.RemoveMember(t, c.Members[1].Client, uint64(firstMember.Server.MemberId())); err != nil {
if err := c.RemoveMember(t, c.Members[1].Client, uint64(firstMember.Server.MemberID())); err != nil {
t.Fatalf("expected to remove member, got error %v", err)
}
c.WaitLeader(t)

@ -96,7 +96,7 @@ func testNetworkPartition5MembersLeaderInMajority(t *testing.T) error {

// leader must be hold in majority
leadIndex2 := clus.WaitMembersForLeader(t, majorityMembers)
leadID, leadID2 := clus.Members[leadIndex].Server.MemberId(), majorityMembers[leadIndex2].Server.MemberId()
leadID, leadID2 := clus.Members[leadIndex].Server.MemberID(), majorityMembers[leadIndex2].Server.MemberID()
if leadID != leadID2 {
return fmt.Errorf("unexpected leader change from %s, got %s", leadID, leadID2)
}

@ -38,7 +38,7 @@ func testMoveLeader(t *testing.T, auto bool) {
defer clus.Terminate(t)

oldLeadIdx := clus.WaitLeader(t)
oldLeadID := uint64(clus.Members[oldLeadIdx].Server.MemberId())
oldLeadID := uint64(clus.Members[oldLeadIdx].Server.MemberID())

// ensure followers go through leader transition while leadership transfer
idc := make(chan uint64)
@ -56,7 +56,7 @@ func testMoveLeader(t *testing.T, auto bool) {
}
}

target := uint64(clus.Members[(oldLeadIdx+1)%3].Server.MemberId())
target := uint64(clus.Members[(oldLeadIdx+1)%3].Server.MemberID())
if auto {
err := clus.Members[oldLeadIdx].Server.TryTransferLeadershipOnShutdown()
if err != nil {
@ -108,7 +108,7 @@ func TestMoveLeaderError(t *testing.T) {
oldLeadIdx := clus.WaitLeader(t)
followerIdx := (oldLeadIdx + 1) % 3

target := uint64(clus.Members[(oldLeadIdx+2)%3].Server.MemberId())
target := uint64(clus.Members[(oldLeadIdx+2)%3].Server.MemberID())

mvc := integration.ToGRPC(clus.Client(followerIdx)).Maintenance
_, err := mvc.MoveLeader(context.TODO(), &pb.MoveLeaderRequest{TargetID: target})

@ -82,7 +82,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {

clus.Members[0].InjectPartition(t, clus.Members[1:]...)
initialLead := clus.WaitMembersForLeader(t, clus.Members[1:]) + 1
t.Logf("elected lead: %v", clus.Members[initialLead].Server.MemberId())
t.Logf("elected lead: %v", clus.Members[initialLead].Server.MemberID())
t.Logf("sleeping for 2 seconds")
time.Sleep(2 * time.Second)
t.Logf("sleeping for 2 seconds DONE")