use v2 api to update cluster version

This commit is contained in:
Chao Chen 2021-05-17 10:57:29 -07:00
parent 932d42b027
commit 783c5ad2d2
3 changed files with 101 additions and 7 deletions

View File

@ -21,7 +21,9 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
@ -92,9 +94,12 @@ func (a *applierV2store) Put(r *RequestV2) Response {
// return an empty response since there is no consumer. // return an empty response since there is no consumer.
return Response{} return Response{}
} }
// remove v2 version set to avoid the conflict between v2 and v3. // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() { if r.Path == membership.StoreClusterVersionKey() {
// return an empty response since there is no consumer. if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, membership.ApplyBoth)
}
return Response{} return Response{}
} }
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))

View File

@ -2428,6 +2428,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
// It updates the cluster version if all members agrees on a higher one. // It updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the // It prints out log if there is a member with a higher version than the
// local version. // local version.
// TODO switch to updateClusterVersionV3 in 3.6
func (s *EtcdServer) monitorVersions() { func (s *EtcdServer) monitorVersions() {
for { for {
select { select {
@ -2458,27 +2459,67 @@ func (s *EtcdServer) monitorVersions() {
if v != nil { if v != nil {
verStr = v.String() verStr = v.String()
} }
s.GoAttach(func() { s.updateClusterVersion(verStr) }) s.GoAttach(func() { s.updateClusterVersionV2(verStr) })
continue continue
} }
if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) { if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
s.GoAttach(func() { s.updateClusterVersion(v.String()) }) s.GoAttach(func() { s.updateClusterVersionV2(v.String()) })
} }
} }
} }
func (s *EtcdServer) updateClusterVersion(ver string) { func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg := s.Logger() lg := s.Logger()
if s.cluster.Version() == nil { if s.cluster.Version() == nil {
lg.Info( lg.Info(
"setting up initial cluster version", "setting up initial cluster version using v2 API",
zap.String("cluster-version", version.Cluster(ver)), zap.String("cluster-version", version.Cluster(ver)),
) )
} else { } else {
lg.Info( lg.Info(
"updating cluster version", "updating cluster version using v2 API",
zap.String("from", version.Cluster(s.cluster.Version().String())),
zap.String("to", version.Cluster(ver)),
)
}
req := pb.Request{
Method: "PUT",
Path: membership.StoreClusterVersionKey(),
Val: ver,
}
ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
_, err := s.Do(ctx, req)
cancel()
switch err {
case nil:
lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
return
case ErrStopped:
lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
return
default:
lg.Warn("failed to update cluster version", zap.Error(err))
}
}
func (s *EtcdServer) updateClusterVersionV3(ver string) {
lg := s.Logger()
if s.cluster.Version() == nil {
lg.Info(
"setting up initial cluster version using v3 API",
zap.String("cluster-version", version.Cluster(ver)),
)
} else {
lg.Info(
"updating cluster version using v3 API",
zap.String("from", version.Cluster(s.cluster.Version().String())), zap.String("from", version.Cluster(s.cluster.Version().String())),
zap.String("to", version.Cluster(ver)), zap.String("to", version.Cluster(ver)),
) )

View File

@ -22,6 +22,7 @@ import (
"math" "math"
"net/http" "net/http"
"os" "os"
"path"
"path/filepath" "path/filepath"
"reflect" "reflect"
"sync" "sync"
@ -1716,6 +1717,53 @@ func TestPublishV3Retry(t *testing.T) {
<-ch <-ch
} }
func TestUpdateVersion(t *testing.T) {
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 1,
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
ctx: ctx,
cancel: cancel,
}
srv.updateClusterVersionV2("2.0.0")
action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].Name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var r pb.Request
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
if r.Method != "PUT" {
t.Errorf("method = %s, want PUT", r.Method)
}
if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
t.Errorf("path = %s, want %s", r.Path, wpath)
}
if r.Val != "2.0.0" {
t.Errorf("val = %s, want %s", r.Val, "2.0.0")
}
}
func TestStopNotify(t *testing.T) { func TestStopNotify(t *testing.T) {
s := &EtcdServer{ s := &EtcdServer{
lgMu: new(sync.RWMutex), lgMu: new(sync.RWMutex),