Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00)
etcdserver: address golangci var-naming issues
Signed-off-by: Ivan Valdes <ivan@vald.es>
Parent: 63e394d090
Commit: c613b78e6c
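For context, the var-naming rule that golangci-lint runs (via revive) expects Go initialisms such as ID, HTTP, JSON, TTL, and URL to be fully capitalized in identifiers, which is what drives the renames in the diff below (memberId becomes memberID, InstallHttpEndpoints becomes InstallHTTPEndpoints, and so on). A minimal sketch of the convention follows; the identifiers in it are illustrative, not copied from the diff:

package main

import "fmt"

// memberInfo mimics the kind of struct touched by this commit; field names
// follow the Go initialism convention that revive's var-naming rule enforces.
type memberInfo struct {
	memberID uint64 // "ID", not "Id"
	peerURL  string // "URL", not "Url"
}

// formatMemberKey builds a registry key in the same style as the discovery
// code below; the parameter is named memberID rather than memberId.
func formatMemberKey(clusterToken string, memberID uint64) string {
	return fmt.Sprintf("/_etcd/registry/%s/members/%d", clusterToken, memberID)
}

func main() {
	m := memberInfo{memberID: 101, peerURL: "http://127.0.0.1:2380"}
	fmt.Println(formatMemberKey("token1", m.memberID), m.peerURL)
}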
@@ -240,7 +240,7 @@ type CheckRegistry struct {
func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)}
reg.Register("serializable_read", readCheck(server, true /* serializable */))
-reg.InstallHttpEndpoints(lg, mux)
+reg.InstallHTTPEndpoints(lg, mux)
}

func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {

@@ -252,7 +252,7 @@ func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHea
reg.Register("serializable_read", readCheck(server, true))
// linearizable_read check would be replaced by read_index check in 3.6
reg.Register("linearizable_read", readCheck(server, false))
-reg.InstallHttpEndpoints(lg, mux)
+reg.InstallHTTPEndpoints(lg, mux)
}

func (reg *CheckRegistry) Register(name string, check HealthCheck) {
@@ -263,14 +263,23 @@ func (reg *CheckRegistry) RootPath() string {
return "/" + reg.checkType
}

+// InstallHttpEndpoints installs the http handlers for the health checks.
+// Deprecated: Please use (*CheckRegistry) InstallHTTPEndpoints instead.
+//
+//revive:disable:var-naming
func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) {
+//revive:enable:var-naming
+reg.InstallHTTPEndpoints(lg, mux)
+}
+
+func (reg *CheckRegistry) InstallHTTPEndpoints(lg *zap.Logger, mux *http.ServeMux) {
checkNames := make([]string, 0, len(reg.checks))
for k := range reg.checks {
checkNames = append(checkNames, k)
}

// installs the http handler for the root path.
-reg.installRootHttpEndpoint(lg, mux, checkNames...)
+reg.installRootHTTPEndpoint(lg, mux, checkNames...)
for _, checkName := range checkNames {
// installs the http handler for the individual check sub path.
subpath := path.Join(reg.RootPath(), checkName)
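The hunk above keeps the old exported name InstallHttpEndpoints as a thin deprecated wrapper that forwards to InstallHTTPEndpoints, so existing callers keep compiling, while the //revive:disable:var-naming / //revive:enable:var-naming comment pair is meant to silence the lint rule for just that declaration (the store JsonStats hunk further down uses the same trick). A minimal sketch of the pattern with hypothetical names, not taken from etcd:

package example

// DoHttpThing is kept only for backward compatibility.
//
// Deprecated: use DoHTTPThing instead.
//
//revive:disable:var-naming
func DoHttpThing() {
	//revive:enable:var-naming
	DoHTTPThing()
}

// DoHTTPThing is the correctly named replacement.
func DoHTTPThing() {}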
@@ -302,8 +311,8 @@ func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...str
return h
}

-// installRootHttpEndpoint installs the http handler for the root path.
-func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, checks ...string) {
+// installRootHTTPEndpoint installs the http handler for the root path.
+func (reg *CheckRegistry) installRootHTTPEndpoint(lg *zap.Logger, mux *http.ServeMux, checks ...string) {
hfunc := func(r *http.Request) HealthStatus {
// extracts the health check names to be excludeList from the query param
excluded := getQuerySet(r, "exclude")
@@ -160,12 +160,12 @@ func TestHealthHandler(t *testing.T) {
})
ts := httptest.NewServer(mux)
defer ts.Close()
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil)
})
}
}

-func TestHttpSubPath(t *testing.T) {
+func TestHTTPSubPath(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
tests := []healthTestCase{

@@ -198,7 +198,7 @@ func TestHttpSubPath(t *testing.T) {
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "", tt.expectStatusCode)
})
}

@@ -253,10 +253,10 @@ func TestDataCorruptionCheck(t *testing.T) {
ts := httptest.NewServer(mux)
defer ts.Close()
// OK before alarms are activated.
-checkHttpResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil)
+checkHTTPResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil)
// Activate the alarms.
s.alarms = tt.alarms
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
})
}
}

@@ -297,7 +297,7 @@ func TestSerializableReadCheck(t *testing.T) {
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "serializable_read", tt.expectStatusCode)
})
}

@@ -338,13 +338,13 @@ func TestLinearizableReadCheck(t *testing.T) {
HandleHealth(logger, mux, s)
ts := httptest.NewServer(mux)
defer ts.Close()
-checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
+checkHTTPResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult)
checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode)
})
}
}

-func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
+func checkHTTPResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) {
res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})

if err != nil {
@@ -49,18 +49,18 @@ type Member struct {
// NewMember creates a Member without an ID and generates one based on the
// cluster name, peer URLs, and time. This is used for bootstrapping/adding new member.
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
-memberId := computeMemberId(peerURLs, clusterName, now)
-return newMember(name, peerURLs, memberId, false)
+memberID := computeMemberID(peerURLs, clusterName, now)
+return newMember(name, peerURLs, memberID, false)
}

// NewMemberAsLearner creates a learner Member without an ID and generates one based on the
// cluster name, peer URLs, and time. This is used for adding new learner member.
func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
-memberId := computeMemberId(peerURLs, clusterName, now)
-return newMember(name, peerURLs, memberId, true)
+memberID := computeMemberID(peerURLs, clusterName, now)
+return newMember(name, peerURLs, memberID, true)
}

-func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
+func computeMemberID(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
peerURLstrs := peerURLs.StringSlice()
sort.Strings(peerURLstrs)
joinedPeerUrls := strings.Join(peerURLstrs, "")

@@ -75,14 +75,14 @@ func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) ty
return types.ID(binary.BigEndian.Uint64(hash[:8]))
}

-func newMember(name string, peerURLs types.URLs, memberId types.ID, isLearner bool) *Member {
+func newMember(name string, peerURLs types.URLs, memberID types.ID, isLearner bool) *Member {
m := &Member{
RaftAttributes: RaftAttributes{
PeerURLs: peerURLs.StringSlice(),
IsLearner: isLearner,
},
Attributes: Attributes{Name: name},
-ID: memberId,
+ID: memberID,
}
return m
}
@@ -170,7 +170,7 @@ func TestLoadNewestSnap(t *testing.T) {

cases := []struct {
name string
-availableWalSnaps []walpb.Snapshot
+availableWALSnaps []walpb.Snapshot
expected *raftpb.Snapshot
}{
{

@@ -179,17 +179,17 @@ func TestLoadNewestSnap(t *testing.T) {
},
{
name: "loadnewestavailable-newest",
-availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
+availableWALSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
expected: &newSnap,
},
{
name: "loadnewestavailable-newest-unsorted",
-availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
+availableWALSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
expected: &newSnap,
},
{
name: "loadnewestavailable-previous",
-availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
+availableWALSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
expected: testSnap,
},
}

@@ -197,8 +197,8 @@ func TestLoadNewestSnap(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
var err error
var g *raftpb.Snapshot
-if tc.availableWalSnaps != nil {
-g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
+if tc.availableWALSnaps != nil {
+g, err = ss.LoadNewestAvailable(tc.availableWALSnaps)
} else {
g, err = ss.Load()
}
@@ -139,7 +139,7 @@ func (e Error) Error() string {
return e.Message + " (" + e.Cause + ")"
}

-func (e Error) toJsonString() string {
+func (e Error) toJSONString() string {
b, _ := json.Marshal(e)
return string(b)
}

@@ -156,6 +156,6 @@ func (e Error) WriteTo(w http.ResponseWriter) error {
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(e.StatusCode())
-_, err := w.Write([]byte(e.toJsonString() + "\n"))
+_, err := w.Write([]byte(e.toJSONString() + "\n"))
return err
}

@@ -33,8 +33,8 @@ func TestErrorWriteTo(t *testing.T) {
}

gbody := strings.TrimSuffix(rr.Body.String(), "\n")
-if err.toJsonString() != gbody {
-t.Errorf("HTTP body %q, want %q", gbody, err.toJsonString())
+if err.toJSONString() != gbody {
+t.Errorf("HTTP body %q, want %q", gbody, err.toJSONString())
}

wheader := http.Header(map[string][]string{
@@ -21,7 +21,7 @@ import (
)

func TestHeapPushPop(t *testing.T) {
-h := newTtlKeyHeap()
+h := newTTLKeyHeap()

// add from older expire time to earlier expire time
// the path is equal to ttl from now

@@ -45,7 +45,7 @@ func TestHeapPushPop(t *testing.T) {
}

func TestHeapUpdate(t *testing.T) {
-h := newTtlKeyHeap()
+h := newTTLKeyHeap()

kvs := make([]*node, 10)
@@ -104,7 +104,7 @@ func (s *Stats) clone() *Stats {
}
}

-func (s *Stats) toJson() []byte {
+func (s *Stats) toJSON() []byte {
b, _ := json.Marshal(s)
return b
}
@@ -99,7 +99,7 @@ func newStore(namespaces ...string) *store {
}
s.Stats = newStats()
s.WatcherHub = newWatchHub(1000)
-s.ttlKeyHeap = newTtlKeyHeap()
+s.ttlKeyHeap = newTTLKeyHeap()
s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...)
return s
}

@@ -781,15 +781,17 @@ func (s *store) Recovery(state []byte) error {
return err
}

-s.ttlKeyHeap = newTtlKeyHeap()
+s.ttlKeyHeap = newTTLKeyHeap()

s.Root.recoverAndclean()
return nil
}

+//revive:disable:var-naming
func (s *store) JsonStats() []byte {
+//revive:enable:var-naming
s.Stats.Watchers = uint64(s.WatcherHub.count)
-return s.Stats.toJson()
+return s.Stats.toJSON()
}

func (s *store) HasTTLKeys() bool {
@@ -22,7 +22,7 @@ type ttlKeyHeap struct {
keyMap map[*node]int
}

-func newTtlKeyHeap() *ttlKeyHeap {
+func newTTLKeyHeap() *ttlKeyHeap {
h := &ttlKeyHeap{keyMap: make(map[*node]int)}
heap.Init(h)
return h
@@ -59,7 +59,7 @@ type DiscoveryConfig struct {
type memberInfo struct {
// peerRegKey is the key used by the member when registering in the
// discovery service.
-// Format: "/_etcd/registry/<ClusterToken>/members/<memberId>".
+// Format: "/_etcd/registry/<ClusterToken>/members/<memberID>".
peerRegKey string
// peerURLsMap format: "peerName=peerURLs", i.e., "member1=http://127.0.0.1:2380".
peerURLsMap string

@@ -88,9 +88,9 @@ func getMemberKeyPrefix(clusterToken string) string {
return path.Join(getClusterKeyPrefix(clusterToken), "members")
}

-// key format for each member: "/_etcd/registry/<ClusterToken>/members/<memberId>".
-func getMemberKey(cluster, memberId string) string {
-return path.Join(getMemberKeyPrefix(cluster), memberId)
+// key format for each member: "/_etcd/registry/<ClusterToken>/members/<memberID>".
+func getMemberKey(cluster, memberID string) string {
+return path.Join(getMemberKeyPrefix(cluster), memberID)
}

// GetCluster will connect to the discovery service at the given endpoints and
@@ -155,7 +155,7 @@ func JoinCluster(lg *zap.Logger, cfg *DiscoveryConfig, id types.ID, config strin
type discovery struct {
lg *zap.Logger
clusterToken string
-memberId types.ID
+memberID types.ID
c *clientv3.Client
retries uint
@@ -182,7 +182,7 @@ func newDiscovery(lg *zap.Logger, dcfg *DiscoveryConfig, id types.ID) (*discover
return &discovery{
lg: lg,
clusterToken: dcfg.Token,
-memberId: id,
+memberID: id,
c: c,
cfg: dcfg,
clock: clockwork.NewRealClock(),

@@ -317,10 +317,10 @@ func (d *discovery) checkCluster() (*clusterInfo, int, int64, error) {
d.retries = 0

// find self position
-memberSelfId := getMemberKey(d.clusterToken, d.memberId.String())
+memberSelfID := getMemberKey(d.clusterToken, d.memberID.String())
idx := 0
for _, m := range cls.members {
-if m.peerRegKey == memberSelfId {
+if m.peerRegKey == memberSelfID {
break
}
if idx >= clusterSize-1 {

@@ -341,7 +341,7 @@ func (d *discovery) registerSelfRetry(contents string) error {

func (d *discovery) registerSelf(contents string) error {
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeout)
-memberKey := getMemberKey(d.clusterToken, d.memberId.String())
+memberKey := getMemberKey(d.clusterToken, d.memberID.String())
_, err := d.c.Put(ctx, memberKey, contents)
cancel()
@@ -326,35 +326,35 @@ func TestCheckCluster(t *testing.T) {

cases := []struct {
name string
-memberId types.ID
+memberID types.ID
getSizeRetries int
getMembersRetries int
expectedError error
}{
{
name: "no retries",
-memberId: 101,
+memberID: 101,
getSizeRetries: 0,
getMembersRetries: 0,
expectedError: nil,
},
{
name: "2 retries for getClusterSize",
-memberId: 102,
+memberID: 102,
getSizeRetries: 2,
getMembersRetries: 0,
expectedError: nil,
},
{
name: "2 retries for getClusterMembers",
-memberId: 103,
+memberID: 103,
getSizeRetries: 0,
getMembersRetries: 2,
expectedError: nil,
},
{
name: "error due to cluster full",
-memberId: 104,
+memberID: 104,
getSizeRetries: 0,
getMembersRetries: 0,
expectedError: ErrFullCluster,

@@ -382,7 +382,7 @@ func TestCheckCluster(t *testing.T) {
},
cfg: &DiscoveryConfig{},
clusterToken: "fakeToken",
-memberId: tc.memberId,
+memberID: tc.memberID,
clock: clockwork.NewRealClock(),
}

@@ -442,7 +442,7 @@ func TestRegisterSelf(t *testing.T) {
cases := []struct {
name string
token string
-memberId types.ID
+memberID types.ID
expectedRegKey string
expectedRegValue string
retries int // when retries > 0, then return an error on Put request.

@@ -450,7 +450,7 @@ func TestRegisterSelf(t *testing.T) {
{
name: "no retry with token1",
token: "token1",
-memberId: 101,
+memberID: 101,
expectedRegKey: "/_etcd/registry/token1/members/" + types.ID(101).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 0,

@@ -458,7 +458,7 @@ func TestRegisterSelf(t *testing.T) {
{
name: "no retry with token2",
token: "token2",
-memberId: 102,
+memberID: 102,
expectedRegKey: "/_etcd/registry/token2/members/" + types.ID(102).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 0,

@@ -466,7 +466,7 @@ func TestRegisterSelf(t *testing.T) {
{
name: "2 retries",
token: "token3",
-memberId: 103,
+memberID: 103,
expectedRegKey: "/_etcd/registry/token3/members/" + types.ID(103).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 2,

@@ -487,7 +487,7 @@ func TestRegisterSelf(t *testing.T) {
d := &discovery{
lg: lg,
clusterToken: tc.token,
-memberId: tc.memberId,
+memberID: tc.memberID,
cfg: &DiscoveryConfig{},
c: &clientv3.Client{
KV: fkv,
@@ -121,7 +121,7 @@ const (
rangeEnd = "rangeEnd"
keyOutsideRange = "rangeEnd_outside"

-LeaseId = 1
+leaseID = 1
)

func mustCreateRolesAndEnableAuth(t *testing.T, authApplier *authApplierV3) {

@@ -460,7 +460,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {

_, err := authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
-ID: LeaseId,
+ID: leaseID,
})
require.NoError(t, err)

@@ -469,7 +469,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
Key: []byte(key),
Value: []byte("1"),
-Lease: LeaseId,
+Lease: leaseID,
})
require.NoError(t, err)

@@ -478,7 +478,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
Key: []byte(keyOutsideRange),
Value: []byte("1"),
-Lease: LeaseId,
+Lease: leaseID,
})
require.NoError(t, err)

@@ -487,7 +487,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
Key: []byte(key),
Value: []byte("1"),
-Lease: LeaseId,
+Lease: leaseID,
})
require.Equal(t, err, auth.ErrPermissionDenied)
}

@@ -684,20 +684,20 @@ func TestAuthApplierV3_LeaseRevoke(t *testing.T) {

_, err := authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
-ID: LeaseId,
+ID: leaseID,
})
require.NoError(t, err)

// The user should be able to revoke the lease
setAuthInfo(authApplier, userWriteOnly)
_, err = authApplier.LeaseRevoke(&pb.LeaseRevokeRequest{
-ID: LeaseId,
+ID: leaseID,
})
require.NoError(t, err)

_, err = authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
-ID: LeaseId,
+ID: leaseID,
})
require.NoError(t, err)

@@ -706,14 +706,14 @@ func TestAuthApplierV3_LeaseRevoke(t *testing.T) {
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
Key: []byte(keyOutsideRange),
Value: []byte("1"),
-Lease: LeaseId,
+Lease: leaseID,
})
require.NoError(t, err)

// The user should not be able to revoke the lease anymore
setAuthInfo(authApplier, userWriteOnly)
_, err = authApplier.LeaseRevoke(&pb.LeaseRevokeRequest{
-ID: LeaseId,
+ID: leaseID,
})
require.Equal(t, err, auth.ErrPermissionDenied)
}
@@ -62,14 +62,14 @@ func NewUberApplier(
warningApplyDuration time.Duration,
txnModeWriteWithSharedBuffer bool,
quotaBackendBytesCfg int64) UberApplier {
-applyV3base_ := newApplierV3(lg, be, kv, alarmStore, authStore, lessor, cluster, raftStatus, snapshotServer, consistentIndex, txnModeWriteWithSharedBuffer, quotaBackendBytesCfg)
+applyV3base := newApplierV3(lg, be, kv, alarmStore, authStore, lessor, cluster, raftStatus, snapshotServer, consistentIndex, txnModeWriteWithSharedBuffer, quotaBackendBytesCfg)

ua := &uberApplier{
lg: lg,
alarmStore: alarmStore,
warningApplyDuration: warningApplyDuration,
-applyV3: applyV3base_,
-applyV3base: applyV3base_,
+applyV3: applyV3base,
+applyV3base: applyV3base,
}
ua.restoreAlarms()
return ua
@@ -34,7 +34,7 @@ import (
"go.etcd.io/etcd/server/v3/storage/schema"
)

-const memberId = 111195
+const memberID = 111195

func defaultUberApplier(t *testing.T) UberApplier {
lg := zaptest.NewLogger(t)

@@ -44,7 +44,7 @@ func defaultUberApplier(t *testing.T) UberApplier {
})

cluster := membership.NewCluster(lg)
-cluster.AddMember(&membership.Member{ID: memberId}, true)
+cluster.AddMember(&membership.Member{ID: memberID}, true)
lessor := lease.NewLessor(lg, be, cluster, lease.LessorConfig{})
kv := mvcc.NewStore(lg, be, lessor, mvcc.StoreConfig{})
alarmStore, err := v3alarm.NewAlarmStore(lg, schema.NewAlarmBackend(lg, be))

@@ -125,7 +125,7 @@ func TestUberApplier_Alarm_Corrupt(t *testing.T) {
Header: &pb.RequestHeader{},
Alarm: &pb.AlarmRequest{
Action: pb.AlarmRequest_ACTIVATE,
-MemberID: memberId,
+MemberID: memberID,
Alarm: pb.AlarmType_CORRUPT,
},
})

@@ -224,7 +224,7 @@ func TestUberApplier_Alarm_Quota(t *testing.T) {
Header: &pb.RequestHeader{},
Alarm: &pb.AlarmRequest{
Action: pb.AlarmRequest_ACTIVATE,
-MemberID: memberId,
+MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})

@@ -247,7 +247,7 @@ func TestUberApplier_Alarm_Deactivate(t *testing.T) {
Header: &pb.RequestHeader{},
Alarm: &pb.AlarmRequest{
Action: pb.AlarmRequest_ACTIVATE,
-MemberID: memberId,
+MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})

@@ -262,7 +262,7 @@ func TestUberApplier_Alarm_Deactivate(t *testing.T) {
Header: &pb.RequestHeader{},
Alarm: &pb.AlarmRequest{
Action: pb.AlarmRequest_DEACTIVATE,
-MemberID: memberId,
+MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})
@@ -300,8 +300,8 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
}
// TODO: refactor member http handler code
// cannot import etcdhttp, so manually construct url
-requestUrl := url + "/members/promote/" + fmt.Sprintf("%d", id)
-req, err := http.NewRequest(http.MethodPost, requestUrl, nil)
+requestURL := url + "/members/promote/" + fmt.Sprintf("%d", id)
+req, err := http.NewRequest(http.MethodPost, requestURL, nil)
if err != nil {
return nil, err
}
@@ -300,8 +300,8 @@ func (cm *corruptionChecker) CompactHashCheck() {
// true: successfully checked hash on whole cluster or raised alarms, so no need to check next hash
// false: skipped some members, so need to check next hash
func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
-leaderId := cm.hasher.MemberId()
-hash2members := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
+leaderID := cm.hasher.MemberId()
+hash2members := map[uint32]types.IDSlice{leaderHash.Hash: {leaderID}}

peersChecked := 0
// group all peers by hash

@@ -319,7 +319,7 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
}
if skipped {
cm.lg.Warn("Skipped peer's hash", zap.Int("number-of-peers", len(peers)),
-zap.String("leader-id", leaderId.String()),
+zap.String("leader-id", leaderID.String()),
zap.String("peer-id", peer.id.String()),
zap.String("reason", reason))
continue

@@ -358,7 +358,7 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
// corrupted. In such situation, we intentionally set the memberID
// as 0, it means it affects the whole cluster.
cm.lg.Error("Detected compaction hash mismatch but cannot identify the corrupted members, so intentionally set the memberID as 0",
-zap.String("leader-id", leaderId.String()),
+zap.String("leader-id", leaderID.String()),
zap.Int64("leader-revision", leaderHash.Revision),
zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
zap.Uint32("leader-hash", leaderHash.Hash),

@@ -376,7 +376,7 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
}

cm.lg.Error("Detected compaction hash mismatch",
-zap.String("leader-id", leaderId.String()),
+zap.String("leader-id", leaderID.String()),
zap.Int64("leader-revision", leaderHash.Revision),
zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
zap.Uint32("leader-hash", leaderHash.Hash),

@@ -582,8 +582,8 @@ func HashByRev(ctx context.Context, cid types.ID, cc *http.Client, url string, r
if err != nil {
return nil, err
}
-requestUrl := url + PeerHashKVPath
-req, err := http.NewRequest(http.MethodGet, requestUrl, bytes.NewReader(hashReqBytes))
+requestURL := url + PeerHashKVPath
+req, err := http.NewRequest(http.MethodGet, requestURL, bytes.NewReader(hashReqBytes))
if err != nil {
return nil, err
}
@@ -501,8 +501,8 @@ func (f *fakeHasher) LinearizableReadNotify(ctx context.Context) error {
return f.linearizableReadNotify
}

-func (f *fakeHasher) TriggerCorruptAlarm(memberId types.ID) {
-f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberId))
+func (f *fakeHasher) TriggerCorruptAlarm(memberID types.ID) {
+f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberID))
f.alarmTriggered = true
}
@@ -240,7 +240,7 @@ type EtcdServer struct {
leaderChanged *notify.Notifier

errorc chan error
-memberId types.ID
+memberID types.ID
attributes membership.Attributes

cluster *membership.RaftCluster

@@ -324,7 +324,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
v2store: b.storage.st,
snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
-memberId: b.cluster.nodeID,
+memberID: b.cluster.nodeID,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.cluster.cl,
stats: sstats,

@@ -910,11 +910,11 @@ func (s *EtcdServer) ensureLeadership() bool {
return false
}

-newLeaderId := s.raftStatus().Lead
-if newLeaderId != uint64(s.MemberId()) {
+newLeaderID := s.raftStatus().Lead
+if newLeaderID != uint64(s.MemberId()) {
lg.Warn("Current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.MemberId())),
-zap.Uint64("new-lead", newLeaderId))
+zap.Uint64("new-lead", newLeaderID))
return false
}

@@ -1669,7 +1669,7 @@ func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
return s.firstCommitInTerm.Receive()
}

-func (s *EtcdServer) MemberId() types.ID { return s.memberId }
+func (s *EtcdServer) MemberId() types.ID { return s.memberID }

func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
@@ -423,7 +423,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
-memberId: 1,
+memberID: 1,
r: *r,
cluster: cl,
beHooks: serverstorage.NewBackendHooks(lg, nil),

@@ -471,7 +471,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
-memberId: 1,
+memberID: 1,
r: *realisticRaftNode(lg, 1, nil),
cluster: cl,
w: wait.New(),

@@ -565,7 +565,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
-memberId: 2,
+memberID: 2,
r: *r,
cluster: cl,
w: wait.New(),

@@ -933,7 +933,7 @@ func TestProcessIgnoreMismatchMessage(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
-memberId: 1,
+memberID: 1,
r: *r,
v2store: st,
cluster: cl,

@@ -1076,7 +1076,7 @@ func TestPublishV3(t *testing.T) {
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
-memberId: 1,
+memberID: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},

@@ -1147,7 +1147,7 @@ func TestPublishV3Retry(t *testing.T) {
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
-memberId: 1,
+memberID: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),

@@ -1197,7 +1197,7 @@ func TestUpdateVersionV3(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
-memberId: 1,
+memberID: 1,
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
@@ -790,7 +790,7 @@ func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

func (s *EtcdServer) linearizableReadLoop() {
for {
-requestId := s.reqIDGen.Next()
+requestID := s.reqIDGen.Next()
leaderChangedNotifier := s.leaderChanged.Receive()
select {
case <-leaderChangedNotifier:

@@ -810,7 +810,7 @@ func (s *EtcdServer) linearizableReadLoop() {
s.readNotifier = nextnr
s.readMu.Unlock()

-confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
+confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestID)
if isStopped(err) {
return
}

@@ -845,8 +845,8 @@ func isStopped(err error) bool {
return err == raft.ErrStopped || err == errors.ErrStopped
}

-func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
-err := s.sendReadIndex(requestId)
+func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestID uint64) (uint64, error) {
+err := s.sendReadIndex(requestID)
if err != nil {
return 0, err
}

@@ -862,19 +862,19 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
for {
select {
case rs := <-s.r.readStateC:
-requestIdBytes := uint64ToBigEndianBytes(requestId)
-gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
+requestIDBytes := uint64ToBigEndianBytes(requestID)
+gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIDBytes)
if !gotOwnResponse {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
-responseId := uint64(0)
+responseID := uint64(0)
if len(rs.RequestCtx) == 8 {
-responseId = binary.BigEndian.Uint64(rs.RequestCtx)
+responseID = binary.BigEndian.Uint64(rs.RequestCtx)
}
lg.Warn(
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
-zap.Uint64("sent-request-id", requestId),
-zap.Uint64("received-request-id", responseId),
+zap.Uint64("sent-request-id", requestID),
+zap.Uint64("received-request-id", responseID),
)
slowReadIndex.Inc()
continue

@@ -887,7 +887,7 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
case <-firstCommitInTermNotifier:
firstCommitInTermNotifier = s.firstCommitInTerm.Receive()
lg.Info("first commit in current term: resending ReadIndex request")
-err := s.sendReadIndex(requestId)
+err := s.sendReadIndex(requestID)
if err != nil {
return 0, err
}

@@ -896,10 +896,10 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
case <-retryTimer.C:
lg.Warn(
"waiting for ReadIndex response took too long, retrying",
-zap.Uint64("sent-request-id", requestId),
+zap.Uint64("sent-request-id", requestID),
zap.Duration("retry-timeout", readIndexRetryTime),
)
-err := s.sendReadIndex(requestId)
+err := s.sendReadIndex(requestID)
if err != nil {
return 0, err
}