mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: fix learner metric incorrect issue
Signed-off-by: YaoC <chengyao09@hotmail.com>
This commit is contained in:
parent
a2eb17c809
commit
f7ab7adf29
@ -284,6 +284,10 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
|
|||||||
onSet(c.lg, c.version)
|
onSet(c.lg, c.version)
|
||||||
|
|
||||||
for _, m := range c.members {
|
for _, m := range c.members {
|
||||||
|
if c.localID == m.ID {
|
||||||
|
setIsLearnerMetric(m)
|
||||||
|
}
|
||||||
|
|
||||||
c.lg.Info(
|
c.lg.Info(
|
||||||
"recovered/added member from store",
|
"recovered/added member from store",
|
||||||
zap.String("cluster-id", c.cid.String()),
|
zap.String("cluster-id", c.cid.String()),
|
||||||
@ -393,6 +397,11 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
|
|||||||
if c.v2store != nil {
|
if c.v2store != nil {
|
||||||
mustSaveMemberToStore(c.lg, c.v2store, m)
|
mustSaveMemberToStore(c.lg, c.v2store, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if m.ID == c.localID {
|
||||||
|
setIsLearnerMetric(m)
|
||||||
|
}
|
||||||
|
|
||||||
if c.be != nil && shouldApplyV3 {
|
if c.be != nil && shouldApplyV3 {
|
||||||
c.be.MustSaveMemberToBackend(m)
|
c.be.MustSaveMemberToBackend(m)
|
||||||
|
|
||||||
@ -505,6 +514,10 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
|
|||||||
mustUpdateMemberInStore(c.lg, c.v2store, &m)
|
mustUpdateMemberInStore(c.lg, c.v2store, &m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if id == c.localID {
|
||||||
|
isLearner.Set(0)
|
||||||
|
}
|
||||||
|
|
||||||
if c.be != nil && shouldApplyV3 {
|
if c.be != nil && shouldApplyV3 {
|
||||||
c.members[id].RaftAttributes.IsLearner = false
|
c.members[id].RaftAttributes.IsLearner = false
|
||||||
c.updateMembershipMetric(id, true)
|
c.updateMembershipMetric(id, true)
|
||||||
|
@ -32,9 +32,24 @@ var (
|
|||||||
},
|
},
|
||||||
[]string{"Local", "Remote"},
|
[]string{"Local", "Remote"},
|
||||||
)
|
)
|
||||||
|
isLearner = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "is_learner",
|
||||||
|
Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.",
|
||||||
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func setIsLearnerMetric(m *Member) {
|
||||||
|
if m.IsLearner {
|
||||||
|
isLearner.Set(1)
|
||||||
|
} else {
|
||||||
|
isLearner.Set(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(ClusterVersionMetrics)
|
prometheus.MustRegister(ClusterVersionMetrics)
|
||||||
prometheus.MustRegister(knownPeers)
|
prometheus.MustRegister(knownPeers)
|
||||||
|
prometheus.MustRegister(isLearner)
|
||||||
}
|
}
|
||||||
|
@ -44,12 +44,6 @@ var (
|
|||||||
Name: "leader_changes_seen_total",
|
Name: "leader_changes_seen_total",
|
||||||
Help: "The number of leader changes seen.",
|
Help: "The number of leader changes seen.",
|
||||||
})
|
})
|
||||||
isLearner = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Namespace: "etcd",
|
|
||||||
Subsystem: "server",
|
|
||||||
Name: "is_learner",
|
|
||||||
Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.",
|
|
||||||
})
|
|
||||||
learnerPromoteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
|
learnerPromoteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
Namespace: "etcd",
|
Namespace: "etcd",
|
||||||
Subsystem: "server",
|
Subsystem: "server",
|
||||||
@ -171,7 +165,6 @@ func init() {
|
|||||||
prometheus.MustRegister(currentVersion)
|
prometheus.MustRegister(currentVersion)
|
||||||
prometheus.MustRegister(currentGoVersion)
|
prometheus.MustRegister(currentGoVersion)
|
||||||
prometheus.MustRegister(serverID)
|
prometheus.MustRegister(serverID)
|
||||||
prometheus.MustRegister(isLearner)
|
|
||||||
prometheus.MustRegister(learnerPromoteSucceed)
|
prometheus.MustRegister(learnerPromoteSucceed)
|
||||||
prometheus.MustRegister(learnerPromoteFailed)
|
prometheus.MustRegister(learnerPromoteFailed)
|
||||||
prometheus.MustRegister(fdUsed)
|
prometheus.MustRegister(fdUsed)
|
||||||
|
@ -2060,15 +2060,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the isLearner metric when this server id is equal to the id in raft member confChange
|
|
||||||
if confChangeContext.Member.ID == s.MemberId() {
|
|
||||||
if cc.Type == raftpb.ConfChangeAddLearnerNode {
|
|
||||||
isLearner.Set(1)
|
|
||||||
} else {
|
|
||||||
isLearner.Set(0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case raftpb.ConfChangeRemoveNode:
|
case raftpb.ConfChangeRemoveNode:
|
||||||
id := types.ID(cc.NodeID)
|
id := types.ID(cc.NodeID)
|
||||||
s.cluster.RemoveMember(id, shouldApplyV3)
|
s.cluster.RemoveMember(id, shouldApplyV3)
|
||||||
|
@ -15,11 +15,14 @@
|
|||||||
package e2e
|
package e2e
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/version"
|
"go.etcd.io/etcd/api/v3/version"
|
||||||
"go.etcd.io/etcd/pkg/v3/expect"
|
"go.etcd.io/etcd/pkg/v3/expect"
|
||||||
|
"go.etcd.io/etcd/tests/v3/framework/config"
|
||||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -37,6 +40,18 @@ func TestV3MetricsInsecure(t *testing.T) {
|
|||||||
testCtl(t, metricsTest)
|
testCtl(t, metricsTest)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestV3LearnerMetricRecover(t *testing.T) {
|
||||||
|
cfg := e2e.NewConfigTLS()
|
||||||
|
cfg.ServerConfig.SnapshotCount = 10
|
||||||
|
testCtl(t, learnerMetricRecoverTest, withCfg(*cfg))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestV3LearnerMetricApplyFromSnapshotTest(t *testing.T) {
|
||||||
|
cfg := e2e.NewConfigTLS()
|
||||||
|
cfg.ServerConfig.SnapshotCount = 10
|
||||||
|
testCtl(t, learnerMetricApplyFromSnapshotTest, withCfg(*cfg))
|
||||||
|
}
|
||||||
|
|
||||||
func metricsTest(cx ctlCtx) {
|
func metricsTest(cx ctlCtx) {
|
||||||
if err := ctlV3Put(cx, "k", "v", ""); err != nil {
|
if err := ctlV3Put(cx, "k", "v", ""); err != nil {
|
||||||
cx.t.Fatal(err)
|
cx.t.Fatal(err)
|
||||||
@ -69,3 +84,64 @@ func metricsTest(cx ctlCtx) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func learnerMetricRecoverTest(cx ctlCtx) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if _, err := cx.epc.StartNewProc(ctx, nil, cx.t, true /* addAsLearner */); err != nil {
|
||||||
|
cx.t.Fatal(err)
|
||||||
|
}
|
||||||
|
expectLearnerMetrics(cx)
|
||||||
|
|
||||||
|
triggerSnapshot(ctx, cx)
|
||||||
|
|
||||||
|
// Restart cluster
|
||||||
|
if err := cx.epc.Restart(ctx); err != nil {
|
||||||
|
cx.t.Fatal(err)
|
||||||
|
}
|
||||||
|
expectLearnerMetrics(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func learnerMetricApplyFromSnapshotTest(cx ctlCtx) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Add learner but do not start it
|
||||||
|
_, learnerCfg, err := cx.epc.AddMember(ctx, nil, cx.t, true /* addAsLearner */)
|
||||||
|
if err != nil {
|
||||||
|
cx.t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
triggerSnapshot(ctx, cx)
|
||||||
|
|
||||||
|
// Start the learner
|
||||||
|
if err = cx.epc.StartNewProcFromConfig(ctx, cx.t, learnerCfg); err != nil {
|
||||||
|
cx.t.Fatal(err)
|
||||||
|
}
|
||||||
|
expectLearnerMetrics(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func triggerSnapshot(ctx context.Context, cx ctlCtx) {
|
||||||
|
etcdctl := cx.epc.Procs[0].Etcdctl()
|
||||||
|
for i := 0; i < int(cx.epc.Cfg.ServerConfig.SnapshotCount); i++ {
|
||||||
|
if err := etcdctl.Put(ctx, "k", "v", config.PutOptions{}); err != nil {
|
||||||
|
cx.t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func expectLearnerMetrics(cx ctlCtx) {
|
||||||
|
expectLearnerMetric(cx, 0, "etcd_server_is_learner 0")
|
||||||
|
expectLearnerMetric(cx, 1, "etcd_server_is_learner 1")
|
||||||
|
}
|
||||||
|
|
||||||
|
func expectLearnerMetric(cx ctlCtx, procIdx int, expectMetric string) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
args := e2e.CURLPrefixArgsCluster(cx.epc.Cfg, cx.epc.Procs[procIdx], "GET", e2e.CURLReq{Endpoint: "/metrics"})
|
||||||
|
if err := e2e.SpawnWithExpectsContext(ctx, args, nil, expect.ExpectedResponse{Value: expectMetric}); err != nil {
|
||||||
|
cx.t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -780,7 +780,21 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr
|
|||||||
// Phase 1 - Inform cluster of new configuration
|
// Phase 1 - Inform cluster of new configuration
|
||||||
// Phase 2 - Start new member
|
// Phase 2 - Start new member
|
||||||
func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProcessClusterConfig, tb testing.TB, addAsLearner bool, opts ...config.ClientOption) (memberID uint64, err error) {
|
func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProcessClusterConfig, tb testing.TB, addAsLearner bool, opts ...config.ClientOption) (memberID uint64, err error) {
|
||||||
var serverCfg *EtcdServerProcessConfig
|
memberID, serverCfg, err := epc.AddMember(ctx, cfg, tb, addAsLearner, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then start process
|
||||||
|
if err = epc.StartNewProcFromConfig(ctx, tb, serverCfg); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return memberID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddMember adds a new member to the cluster without starting it.
|
||||||
|
func (epc *EtcdProcessCluster) AddMember(ctx context.Context, cfg *EtcdProcessClusterConfig, tb testing.TB, addAsLearner bool, opts ...config.ClientOption) (memberID uint64, serverCfg *EtcdServerProcessConfig, err error) {
|
||||||
if cfg != nil {
|
if cfg != nil {
|
||||||
serverCfg = cfg.EtcdServerProcessConfig(tb, epc.nextSeq)
|
serverCfg = cfg.EtcdServerProcessConfig(tb, epc.nextSeq)
|
||||||
} else {
|
} else {
|
||||||
@ -808,20 +822,24 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
|
|||||||
resp, err = memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.PeerURL.String()})
|
resp, err = memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.PeerURL.String()})
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("failed to add new member: %w", err)
|
return 0, nil, fmt.Errorf("failed to add new member: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then start process
|
return resp.Member.ID, serverCfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartNewProcFromConfig starts a new member process from the given config.
|
||||||
|
func (epc *EtcdProcessCluster) StartNewProcFromConfig(ctx context.Context, tb testing.TB, serverCfg *EtcdServerProcessConfig) error {
|
||||||
tb.Log("start new member")
|
tb.Log("start new member")
|
||||||
proc, err := NewEtcdProcess(tb, serverCfg)
|
proc, err := NewEtcdProcess(tb, serverCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
epc.Close()
|
epc.Close()
|
||||||
return 0, fmt.Errorf("cannot configure: %v", err)
|
return fmt.Errorf("cannot configure: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
epc.Procs = append(epc.Procs, proc)
|
epc.Procs = append(epc.Procs, proc)
|
||||||
|
|
||||||
return resp.Member.ID, proc.Start(ctx)
|
return proc.Start(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateProcOptions updates the options for a specific process. If no opt is set, then the config is identical
|
// UpdateProcOptions updates the options for a specific process. If no opt is set, then the config is identical
|
||||||
|
Loading…
x
Reference in New Issue
Block a user