Merge pull request #12099 from YoyinZyc/downgrade-httphandler
[Etcd downgrade] Add an HTTP handler to enable downgrade info communication between cluster members
This commit is contained in: bc3a77d298
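In short, this change has each member serve a peer-facing GET endpoint at /downgrade/enabled, backed by a linearizable read of the local downgrade state, and adds helpers (getDowngradeEnabledFromRemotePeers, getDowngradeEnabled) so members can query each other over their peer URLs. A new monitorDowngrade goroutine on the leader runs every DowngradeCheckTime (default 5s, settable with --experimental-downgrade-check-time) and cancels the downgrade once every member reports the target cluster version.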
@@ -56,6 +56,7 @@ const (
    DefaultGRPCKeepAliveMinTime = 5 * time.Second
    DefaultGRPCKeepAliveInterval = 2 * time.Hour
    DefaultGRPCKeepAliveTimeout = 20 * time.Second
+   DefaultDowngradeCheckTime = 5 * time.Second

    DefaultListenPeerURLs = "http://localhost:2380"
    DefaultListenClientURLs = "http://localhost:2379"

@@ -330,6 +331,8 @@ type Config struct {
    // UnsafeNoFsync disables all uses of fsync.
    // Setting this is unsafe and will cause data loss.
    UnsafeNoFsync bool `json:"unsafe-no-fsync"`
+
+   ExperimentalDowngradeCheckTime time.Duration `json:"experimental-downgrade-check-time"`
}

// configYAML holds the config suitable for yaml parsing

@@ -413,6 +416,8 @@ func NewConfig() *Config {
        LogOutputs: []string{DefaultLogOutput},
        LogLevel: logutil.DefaultLogLevel,
        EnableGRPCGateway: true,
+
+       ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
    }
    cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
    return cfg
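For embedded users, the new option can be set directly on embed.Config. The following is a minimal sketch, not etcd code: the import path is assumed for this branch's module layout, and the data directory is an arbitrary example value.

package main

import (
    "log"
    "time"

    "go.etcd.io/etcd/v3/embed" // assumed import path for this branch
)

func main() {
    cfg := embed.NewConfig()
    cfg.Dir = "default.etcd" // example data dir, not part of this diff
    // Check the cluster-wide downgrade status every 5s (DefaultDowngradeCheckTime).
    cfg.ExperimentalDowngradeCheckTime = 5 * time.Second

    e, err := embed.StartEtcd(cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer e.Close()
    <-e.Server.ReadyNotify()
}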
@@ -201,6 +201,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
        EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
        CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
        WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
+       DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
    }
    print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
    if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {

@@ -303,6 +304,7 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitialized
        zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
        zap.String("discovery-url", sc.DiscoveryURL),
        zap.String("discovery-proxy", sc.DiscoveryProxy),
+       zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
    )
}
@@ -252,6 +252,7 @@ func newConfig() *config {
    fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
    fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
    fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
+   fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")

    // unsafe
    fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
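Nothing etcd-specific happens in the flag registration above, but for readers unfamiliar with DurationVar, here is a stand-alone sketch of how such a flag parses. The flag set below is hypothetical and only mirrors the registration shown; it is not etcd's own flag handling.

package main

import (
    "flag"
    "fmt"
    "time"
)

func main() {
    fs := flag.NewFlagSet("example", flag.ExitOnError)
    var downgradeCheckTime time.Duration
    fs.DurationVar(&downgradeCheckTime, "experimental-downgrade-check-time", 5*time.Second,
        "Duration of time between two downgrade status checks.")
    // On a real server this would be os.Args[1:].
    fs.Parse([]string{"--experimental-downgrade-check-time=10s"})
    fmt.Println(downgradeCheckTime) // prints 10s
}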
@@ -38,7 +38,7 @@ const (

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
-   return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler())
+   return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}

func newPeerHandler(

@@ -47,6 +47,7 @@ func newPeerHandler(
    raftHandler http.Handler,
    leaseHandler http.Handler,
    hashKVHandler http.Handler,
+   downgradeEnabledHandler http.Handler,
) http.Handler {
    if lg == nil {
        lg = zap.NewNop()

@@ -64,6 +65,9 @@ func newPeerHandler(
        mux.Handle(leasehttp.LeasePrefix, leaseHandler)
        mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
    }
+   if downgradeEnabledHandler != nil {
+       mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
+   }
    if hashKVHandler != nil {
        mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
    }
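The routing added above is plain http.ServeMux dispatch. Below is a self-contained sketch of the same pattern; the handler body and the path constant are stand-ins for illustration, not etcd code.

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "net/http/httptest"
)

func main() {
    const downgradeEnabledPath = "/downgrade/enabled" // mirrors etcdserver.DowngradeEnabledPath

    mux := http.NewServeMux()
    mux.Handle(downgradeEnabledPath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/plain")
        fmt.Fprint(w, "false") // stand-in: a member with no downgrade in progress
    }))

    srv := httptest.NewServer(mux)
    defer srv.Close()

    resp, err := http.Get(srv.URL + downgradeEnabledPath)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    fmt.Printf("GET %s -> %s\n", downgradeEnabledPath, body)
}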
@@ -83,7 +83,7 @@ var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request)
// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
// handles raft-prefix requests well.
func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
-   ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
+   ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
    srv := httptest.NewServer(ph)
    defer srv.Close()

@@ -231,7 +231,7 @@ func TestServeMemberPromoteFails(t *testing.T) {

// TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly
func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
-   ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
+   ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
    srv := httptest.NewServer(ph)
    defer srv.Close()
@@ -21,6 +21,7 @@ import (
    "io/ioutil"
    "net/http"
    "sort"
+   "strconv"
    "strings"
    "time"

@@ -369,9 +370,103 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.RoundTripper)

+// getDowngradeEnabledFromRemotePeers will get the downgrade enabled status of the cluster.
+func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
+   members := cl.Members()
+
+   for _, m := range members {
+       if m.ID == local {
+           continue
+       }
+       enable, err := getDowngradeEnabled(lg, m, rt)
+       if err != nil {
+           lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
+       } else {
+           // Since the "/downgrade/enabled" endpoint serves linearized data,
+           // this function can return once it gets a non-error response from the endpoint.
+           return enable
+       }
+   }
+   return false
+}
+
+// getDowngradeEnabled returns the downgrade enabled status of the given member
+// via its peerURLs. Returns the last error if it fails to get it.
+func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
+   cc := &http.Client{
+       Transport: rt,
+   }
+   var (
+       err  error
+       resp *http.Response
+   )
+
+   for _, u := range m.PeerURLs {
+       addr := u + DowngradeEnabledPath
+       resp, err = cc.Get(addr)
+       if err != nil {
+           lg.Warn(
+               "failed to reach the peer URL",
+               zap.String("address", addr),
+               zap.String("remote-member-id", m.ID.String()),
+               zap.Error(err),
+           )
+           continue
+       }
+       var b []byte
+       b, err = ioutil.ReadAll(resp.Body)
+       resp.Body.Close()
+       if err != nil {
+           lg.Warn(
+               "failed to read body of response",
+               zap.String("address", addr),
+               zap.String("remote-member-id", m.ID.String()),
+               zap.Error(err),
+           )
+           continue
+       }
+       var enable bool
+       if enable, err = strconv.ParseBool(string(b)); err != nil {
+           lg.Warn(
+               "failed to convert response",
+               zap.String("address", addr),
+               zap.String("remote-member-id", m.ID.String()),
+               zap.Error(err),
+           )
+           continue
+       }
+       return enable, nil
+   }
+   return false, err
+}
+
+// isMatchedVersions returns true if all server versions are equal to the target version, otherwise returns false.
+// It can be used to decide whether the cluster has finished downgrading to the target version.
+func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
+   for mid, ver := range vers {
+       if ver == nil {
+           return false
+       }
+       v, err := semver.NewVersion(ver.Cluster)
+       if err != nil {
+           lg.Warn(
+               "failed to parse server version of remote member",
+               zap.String("remote-member-id", mid),
+               zap.String("remote-member-version", ver.Server),
+               zap.Error(err),
+           )
+           return false
+       }
+       if !targetVersion.Equal(*v) {
+           lg.Warn("remote server has mismatching etcd version",
+               zap.String("remote-member-id", mid),
+               zap.String("current-server-version", v.String()),
+               zap.String("target-version", targetVersion.String()),
+           )
+           return false
+       }
+   }
+   return true
+}
+
func convertToClusterVersion(v string) (*semver.Version, error) {
    ver, err := semver.NewVersion(v)
    if err != nil {
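A note on the comparison isMatchedVersions performs: go-semver treats a target of 3.4 (patch omitted, so zero) as equal to a reported cluster version of "3.4.0". A small sketch, assuming the github.com/coreos/go-semver/semver package used above:

package main

import (
    "fmt"

    "github.com/coreos/go-semver/semver"
)

func main() {
    target := &semver.Version{Major: 3, Minor: 4} // e.g. a downgrade target of 3.4
    cluster := semver.Must(semver.NewVersion("3.4.0"))
    fmt.Println(target.Equal(*cluster)) // true: this member has reached the target

    newer := semver.Must(semver.NewVersion("3.5.0"))
    fmt.Println(target.Equal(*newer)) // false: this member is still on the newer version
}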
@@ -215,3 +215,52 @@ func TestDecideAllowedVersionRange(t *testing.T) {
        })
    }
}
+
+func TestIsMatchedVersions(t *testing.T) {
+   tests := []struct {
+       name             string
+       targetVersion    *semver.Version
+       versionMap       map[string]*version.Versions
+       expectedFinished bool
+   }{
+       {
+           "When downgrade finished",
+           &semver.Version{Major: 3, Minor: 4},
+           map[string]*version.Versions{
+               "mem1": {Server: "3.4.1", Cluster: "3.4.0"},
+               "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
+               "mem3": {Server: "3.4.2", Cluster: "3.4.0"},
+           },
+           true,
+       },
+       {
+           "When cannot parse peer version",
+           &semver.Version{Major: 3, Minor: 4},
+           map[string]*version.Versions{
+               "mem1": {Server: "3.4.1", Cluster: "3.4"},
+               "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
+               "mem3": {Server: "3.4.2", Cluster: "3.4.0"},
+           },
+           false,
+       },
+       {
+           "When downgrade not finished",
+           &semver.Version{Major: 3, Minor: 4},
+           map[string]*version.Versions{
+               "mem1": {Server: "3.4.1", Cluster: "3.4.0"},
+               "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
+               "mem3": {Server: "3.5.2", Cluster: "3.5.0"},
+           },
+           false,
+       },
+   }
+
+   for _, tt := range tests {
+       t.Run(tt.name, func(t *testing.T) {
+           actual := isMatchedVersions(zap.NewNop(), tt.targetVersion, tt.versionMap)
+           if actual != tt.expectedFinished {
+               t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual)
+           }
+       })
+   }
+}
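The "When cannot parse peer version" case relies on go-semver rejecting two-component strings: "3.4" is not a valid three-part version, while "3.4.0" is. A minimal check, same assumed package as above:

package main

import (
    "fmt"

    "github.com/coreos/go-semver/semver"
)

func main() {
    if _, err := semver.NewVersion("3.4"); err != nil {
        fmt.Println("rejected:", err) // "3.4" has no patch component
    }
    if v, err := semver.NewVersion("3.4.0"); err == nil {
        fmt.Println("parsed:", v)
    }
}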
@@ -162,6 +162,8 @@ type ServerConfig struct {
    // UnsafeNoFsync disables all uses of fsync.
    // Setting this is unsafe and will cause data loss.
    UnsafeNoFsync bool `json:"unsafe-no-fsync"`
+
+   DowngradeCheckTime time.Duration
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
@@ -337,11 +337,6 @@ func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
    return nil, ErrCorrupt
}

-type ServerPeerV2 interface {
-   ServerPeer
-   HashKVHandler() http.Handler
-}
-
const PeerHashKVPath = "/members/hashkv"

type hashKVHandler struct {
@@ -25,6 +25,7 @@ import (
    "os"
    "path"
    "regexp"
+   "strconv"
    "sync"
    "sync/atomic"
    "time"

@@ -101,6 +102,8 @@ const (
    recommendedMaxRequestBytes = 10 * 1024 * 1024

    readyPercent = 0.9
+
+   DowngradeEnabledPath = "/downgrade/enabled"
)

var (

@@ -705,6 +708,7 @@ func (s *EtcdServer) Start() {
    s.GoAttach(s.monitorVersions)
    s.GoAttach(s.linearizableReadLoop)
    s.GoAttach(s.monitorKVHash)
+   s.GoAttach(s.monitorDowngrade)
}

// start prepares and starts server in a new goroutine. It is no longer safe to
@@ -814,6 +818,56 @@ func (s *EtcdServer) LeaseHandler() http.Handler {

func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

+type ServerPeerV2 interface {
+   ServerPeer
+   HashKVHandler() http.Handler
+   DowngradeEnabledHandler() http.Handler
+}
+
+func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() }
+
+type downgradeEnabledHandler struct {
+   lg      *zap.Logger
+   cluster api.Cluster
+   server  *EtcdServer
+}
+
+func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
+   return &downgradeEnabledHandler{
+       lg:      s.getLogger(),
+       cluster: s.cluster,
+       server:  s,
+   }
+}
+
+func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+   if r.Method != http.MethodGet {
+       w.Header().Set("Allow", http.MethodGet)
+       http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+       return
+   }
+
+   w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
+
+   if r.URL.Path != DowngradeEnabledPath {
+       http.Error(w, "bad path", http.StatusBadRequest)
+       return
+   }
+
+   ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout())
+   defer cancel()
+
+   // serve with linearized downgrade info
+   if err := h.server.linearizableReadNotify(ctx); err != nil {
+       http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
+           http.StatusInternalServerError)
+       return
+   }
+   enabled := h.server.DowngradeInfo().Enabled
+   w.Header().Set("Content-Type", "text/plain")
+   w.Write([]byte(strconv.FormatBool(enabled)))
+}
+
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
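The wire contract served above is deliberately simple: a GET whose text/plain body is a strconv-formatted boolean. A hedged client-side sketch follows; the peer URL is only a placeholder (it matches DefaultListenPeerURLs), and real callers go through getDowngradeEnabled with the configured peer round tripper rather than a bare http.Get.

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "strconv"
)

func main() {
    peerURL := "http://localhost:2380" // placeholder peer listen URL
    resp, err := http.Get(peerURL + "/downgrade/enabled")
    if err != nil {
        fmt.Println("peer unreachable:", err)
        return
    }
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("read failed:", err)
        return
    }
    enabled, err := strconv.ParseBool(string(body))
    if err != nil {
        fmt.Println("unexpected body:", string(body))
        return
    }
    fmt.Println("downgrade enabled:", enabled)
}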
@@ -2318,6 +2372,41 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
    }
}

+func (s *EtcdServer) monitorDowngrade() {
+   t := s.Cfg.DowngradeCheckTime
+   if t == 0 {
+       return
+   }
+   lg := s.getLogger()
+   for {
+       select {
+       case <-time.After(t):
+       case <-s.stopping:
+           return
+       }
+
+       if !s.isLeader() {
+           continue
+       }
+
+       d := s.cluster.DowngradeInfo()
+       if !d.Enabled {
+           continue
+       }
+
+       targetVersion := d.TargetVersion
+       v := semver.Must(semver.NewVersion(targetVersion))
+       if isMatchedVersions(s.getLogger(), v, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
+           lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
+           ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
+           if _, err := s.downgradeCancel(ctx); err != nil {
+               lg.Warn("failed to cancel downgrade", zap.Error(err))
+           }
+           cancel()
+       }
+   }
+}
+
func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
    switch err {
    case context.Canceled:
@@ -172,7 +172,7 @@ func TestIssue6361(t *testing.T) {
        }
    }()

-   dialTimeout := 7 * time.Second
+   dialTimeout := 10 * time.Second
    prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ","), "--dial-timeout", dialTimeout.String()}

    // write some keys