diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index e177a6302..d153cbb65 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -275,6 +275,10 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { onSet(c.lg, c.version) for _, m := range c.members { + if c.localID == m.ID { + setIsLearnerMetric(m) + } + c.lg.Info( "recovered/added member from store", zap.String("cluster-id", c.cid.String()), @@ -397,6 +401,11 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { } } } + + if m.ID == c.localID { + setIsLearnerMetric(m) + } + if c.be != nil && shouldApplyV3 { beErr = unsafeSaveMemberToBackend(c.lg, c.be, m) if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) { @@ -528,6 +537,11 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) { if c.v2store != nil { mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } + + if id == c.localID { + isLearner.Set(0) + } + if c.be != nil && shouldApplyV3 { unsafeSaveMemberToBackend(c.lg, c.be, c.members[id]) } diff --git a/server/etcdserver/api/membership/metrics.go b/server/etcdserver/api/membership/metrics.go index b3212bc80..ec3693dac 100644 --- a/server/etcdserver/api/membership/metrics.go +++ b/server/etcdserver/api/membership/metrics.go @@ -24,8 +24,23 @@ var ( Help: "Which version is running. 1 for 'cluster_version' label with current cluster version", }, []string{"cluster_version"}) + isLearner = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "is_learner", + Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.", + }) ) +func setIsLearnerMetric(m *Member) { + if m.IsLearner { + isLearner.Set(1) + } else { + isLearner.Set(0) + } +} + func init() { prometheus.MustRegister(ClusterVersionMetrics) + prometheus.MustRegister(isLearner) } diff --git a/server/etcdserver/metrics.go b/server/etcdserver/metrics.go index 06263a9cd..3ee7aaee9 100644 --- a/server/etcdserver/metrics.go +++ b/server/etcdserver/metrics.go @@ -44,12 +44,6 @@ var ( Name: "leader_changes_seen_total", Help: "The number of leader changes seen.", }) - isLearner = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "etcd", - Subsystem: "server", - Name: "is_learner", - Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.", - }) learnerPromoteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "etcd", Subsystem: "server", @@ -195,7 +189,6 @@ func init() { prometheus.MustRegister(currentVersion) prometheus.MustRegister(currentGoVersion) prometheus.MustRegister(serverID) - prometheus.MustRegister(isLearner) prometheus.MustRegister(learnerPromoteSucceed) prometheus.MustRegister(learnerPromoteFailed) prometheus.MustRegister(fdUsed) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2655d47d6..025fbace3 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2413,15 +2413,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con } } - // update the isLearner metric when this server id is equal to the id in raft member confChange - if confChangeContext.Member.ID == s.id { - if cc.Type == raftpb.ConfChangeAddLearnerNode { - isLearner.Set(1) - } else { - isLearner.Set(0) - } - } - case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) s.cluster.RemoveMember(id, shouldApplyV3) diff --git a/tests/e2e/metrics_test.go b/tests/e2e/metrics_test.go index e0628fe88..fdb50858f 100644 --- a/tests/e2e/metrics_test.go +++ b/tests/e2e/metrics_test.go @@ -15,10 +15,14 @@ package e2e import ( + "context" "fmt" "testing" + "time" "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/pkg/v3/expect" + "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -36,6 +40,18 @@ func TestV3MetricsInsecure(t *testing.T) { testCtl(t, metricsTest) } +func TestV3LearnerMetricRecover(t *testing.T) { + cfg := e2e.NewConfigTLS() + cfg.SnapshotCount = 10 + testCtl(t, learnerMetricRecoverTest, withCfg(*cfg)) +} + +func TestV3LearnerMetricApplyFromSnapshotTest(t *testing.T) { + cfg := e2e.NewConfigTLS() + cfg.SnapshotCount = 10 + testCtl(t, learnerMetricApplyFromSnapshotTest, withCfg(*cfg)) +} + func metricsTest(cx ctlCtx) { if err := ctlV3Put(cx, "k", "v", ""); err != nil { cx.t.Fatal(err) @@ -68,3 +84,64 @@ func metricsTest(cx ctlCtx) { } } } + +func learnerMetricRecoverTest(cx ctlCtx) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := cx.epc.StartNewProc(ctx, nil, cx.t, true /* addAsLearner */); err != nil { + cx.t.Fatal(err) + } + expectLearnerMetrics(cx) + + triggerSnapshot(ctx, cx) + + // Restart cluster + if err := cx.epc.Restart(ctx); err != nil { + cx.t.Fatal(err) + } + expectLearnerMetrics(cx) +} + +func learnerMetricApplyFromSnapshotTest(cx ctlCtx) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Add learner but do not start it + _, learnerCfg, err := cx.epc.AddMember(ctx, nil, cx.t, true /* addAsLearner */) + if err != nil { + cx.t.Fatal(err) + } + + triggerSnapshot(ctx, cx) + + // Start the learner + if err = cx.epc.StartNewProcFromConfig(ctx, cx.t, learnerCfg); err != nil { + cx.t.Fatal(err) + } + expectLearnerMetrics(cx) +} + +func triggerSnapshot(ctx context.Context, cx ctlCtx) { + etcdctl := cx.epc.Procs[0].Etcdctl() + for i := 0; i < int(cx.epc.Cfg.SnapshotCount); i++ { + if err := etcdctl.Put(ctx, "k", "v", config.PutOptions{}); err != nil { + cx.t.Fatal(err) + } + } +} + +func expectLearnerMetrics(cx ctlCtx) { + expectLearnerMetric(cx, 0, "etcd_server_is_learner 0") + expectLearnerMetric(cx, 1, "etcd_server_is_learner 1") +} + +func expectLearnerMetric(cx ctlCtx, procIdx int, expectMetric string) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + args := e2e.CURLPrefixArgsCluster(cx.epc.Cfg, cx.epc.Procs[procIdx], "GET", e2e.CURLReq{Endpoint: "/metrics"}) + if err := e2e.SpawnWithExpectsContext(ctx, args, nil, expect.ExpectedResponse{Value: expectMetric}); err != nil { + cx.t.Fatal(err) + } +}