mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge 1ad93d8e4e4eaa28b8e2cdd222667118d6ba8e23 into 594427d28cf1547a02fb08db6e65ce72b02a8af6
This commit is contained in:
commit
35ff391c23
@ -275,6 +275,10 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
|
||||
onSet(c.lg, c.version)
|
||||
|
||||
for _, m := range c.members {
|
||||
if c.localID == m.ID {
|
||||
setIsLearnerMetric(m)
|
||||
}
|
||||
|
||||
c.lg.Info(
|
||||
"recovered/added member from store",
|
||||
zap.String("cluster-id", c.cid.String()),
|
||||
@ -397,6 +401,11 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if m.ID == c.localID {
|
||||
setIsLearnerMetric(m)
|
||||
}
|
||||
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
|
||||
if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) {
|
||||
@ -528,6 +537,11 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
|
||||
if c.v2store != nil {
|
||||
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
|
||||
}
|
||||
|
||||
if id == c.localID {
|
||||
isLearner.Set(0)
|
||||
}
|
||||
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
|
||||
}
|
||||
|
||||
@ -24,8 +24,23 @@ var (
|
||||
Help: "Which version is running. 1 for 'cluster_version' label with current cluster version",
|
||||
},
|
||||
[]string{"cluster_version"})
|
||||
isLearner = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "is_learner",
|
||||
Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.",
|
||||
})
|
||||
)
|
||||
|
||||
func setIsLearnerMetric(m *Member) {
|
||||
if m.IsLearner {
|
||||
isLearner.Set(1)
|
||||
} else {
|
||||
isLearner.Set(0)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(ClusterVersionMetrics)
|
||||
prometheus.MustRegister(isLearner)
|
||||
}
|
||||
|
||||
@ -44,12 +44,6 @@ var (
|
||||
Name: "leader_changes_seen_total",
|
||||
Help: "The number of leader changes seen.",
|
||||
})
|
||||
isLearner = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "is_learner",
|
||||
Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.",
|
||||
})
|
||||
learnerPromoteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
@ -195,7 +189,6 @@ func init() {
|
||||
prometheus.MustRegister(currentVersion)
|
||||
prometheus.MustRegister(currentGoVersion)
|
||||
prometheus.MustRegister(serverID)
|
||||
prometheus.MustRegister(isLearner)
|
||||
prometheus.MustRegister(learnerPromoteSucceed)
|
||||
prometheus.MustRegister(learnerPromoteFailed)
|
||||
prometheus.MustRegister(fdUsed)
|
||||
|
||||
@ -2413,15 +2413,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
}
|
||||
}
|
||||
|
||||
// update the isLearner metric when this server id is equal to the id in raft member confChange
|
||||
if confChangeContext.Member.ID == s.id {
|
||||
if cc.Type == raftpb.ConfChangeAddLearnerNode {
|
||||
isLearner.Set(1)
|
||||
} else {
|
||||
isLearner.Set(0)
|
||||
}
|
||||
}
|
||||
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
id := types.ID(cc.NodeID)
|
||||
s.cluster.RemoveMember(id, shouldApplyV3)
|
||||
|
||||
@ -15,10 +15,14 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/version"
|
||||
"go.etcd.io/etcd/pkg/v3/expect"
|
||||
"go.etcd.io/etcd/tests/v3/framework/config"
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
)
|
||||
|
||||
@ -36,6 +40,18 @@ func TestV3MetricsInsecure(t *testing.T) {
|
||||
testCtl(t, metricsTest)
|
||||
}
|
||||
|
||||
func TestV3LearnerMetricRecover(t *testing.T) {
|
||||
cfg := e2e.NewConfigTLS()
|
||||
cfg.SnapshotCount = 10
|
||||
testCtl(t, learnerMetricRecoverTest, withCfg(*cfg))
|
||||
}
|
||||
|
||||
func TestV3LearnerMetricApplyFromSnapshotTest(t *testing.T) {
|
||||
cfg := e2e.NewConfigTLS()
|
||||
cfg.SnapshotCount = 10
|
||||
testCtl(t, learnerMetricApplyFromSnapshotTest, withCfg(*cfg))
|
||||
}
|
||||
|
||||
func metricsTest(cx ctlCtx) {
|
||||
if err := ctlV3Put(cx, "k", "v", ""); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
@ -68,3 +84,64 @@ func metricsTest(cx ctlCtx) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func learnerMetricRecoverTest(cx ctlCtx) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
if _, err := cx.epc.StartNewProc(ctx, nil, cx.t, true /* addAsLearner */); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
expectLearnerMetrics(cx)
|
||||
|
||||
triggerSnapshot(ctx, cx)
|
||||
|
||||
// Restart cluster
|
||||
if err := cx.epc.Restart(ctx); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
expectLearnerMetrics(cx)
|
||||
}
|
||||
|
||||
func learnerMetricApplyFromSnapshotTest(cx ctlCtx) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Add learner but do not start it
|
||||
_, learnerCfg, err := cx.epc.AddMember(ctx, nil, cx.t, true /* addAsLearner */)
|
||||
if err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
|
||||
triggerSnapshot(ctx, cx)
|
||||
|
||||
// Start the learner
|
||||
if err = cx.epc.StartNewProcFromConfig(ctx, cx.t, learnerCfg); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
expectLearnerMetrics(cx)
|
||||
}
|
||||
|
||||
func triggerSnapshot(ctx context.Context, cx ctlCtx) {
|
||||
etcdctl := cx.epc.Procs[0].Etcdctl()
|
||||
for i := 0; i < int(cx.epc.Cfg.SnapshotCount); i++ {
|
||||
if err := etcdctl.Put(ctx, "k", "v", config.PutOptions{}); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func expectLearnerMetrics(cx ctlCtx) {
|
||||
expectLearnerMetric(cx, 0, "etcd_server_is_learner 0")
|
||||
expectLearnerMetric(cx, 1, "etcd_server_is_learner 1")
|
||||
}
|
||||
|
||||
func expectLearnerMetric(cx ctlCtx, procIdx int, expectMetric string) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
args := e2e.CURLPrefixArgsCluster(cx.epc.Cfg, cx.epc.Procs[procIdx], "GET", e2e.CURLReq{Endpoint: "/metrics"})
|
||||
if err := e2e.SpawnWithExpectsContext(ctx, args, nil, expect.ExpectedResponse{Value: expectMetric}); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user