*: add cluster version and cluster version detection.

Cluster version is the min major.minor of all members in
the etcd cluster. Cluster version is set to the min version
that a etcd member is compatible with when first bootstrapp.

During a rolling upgrades, the cluster version will be updated
automatically.

For example:

```
Cluster [a:1, b:1 ,c:1] -> clusterVersion 1

update a -> 2, b -> 2

after a detection

Cluster [a:2, b:2 ,c:1] -> clusterVersion 1, since c is still 1

update c -> 2

after a detection

Cluster [a:2, b:2 ,c:2] -> clusterVersion 2
```

The API/raft component can utilize clusterVersion to determine if
it can accept a client request or a raft RPC.

We choose polling rather than pushing since we want to use the same
logic for cluster version detection and (TODO) cluster version checking.

Before a member actually joins a etcd cluster, it should check the version
of the cluster. Push does not work since the other members cannot push
version info to it before it actually joins. Moreover, we do not want our
raft RPC system (which is doing the heartbeat pushing) to coordinate cluster version.
This commit is contained in:
Xiang Li 2015-04-29 10:56:34 -07:00
parent 33febb979c
commit 6699107f61
11 changed files with 675 additions and 9 deletions

4
Godeps/Godeps.json generated
View File

@ -24,6 +24,10 @@
"ImportPath": "github.com/coreos/pkg/capnslog",
"Rev": "9d5dd4632f9ece71bdf83d31253593a633e73df5"
},
{
"ImportPath": "github.com/coreos/go-semver/semver",
"Rev": "568e959cd89871e61434c1143528d9162da89ef2"
},
{
"ImportPath": "github.com/gogo/protobuf/proto",
"Rev": "bc946d07d1016848dfd2507f90f0859c9471681e"

View File

@ -0,0 +1,209 @@
package semver
import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
)
type Version struct {
Major int64
Minor int64
Patch int64
PreRelease PreRelease
Metadata string
}
type PreRelease string
func splitOff(input *string, delim string) (val string) {
parts := strings.SplitN(*input, delim, 2)
if len(parts) == 2 {
*input = parts[0]
val = parts[1]
}
return val
}
func NewVersion(version string) (*Version, error) {
v := Version{}
dotParts := strings.SplitN(version, ".", 3)
if len(dotParts) != 3 {
return nil, errors.New(fmt.Sprintf("%s is not in dotted-tri format", version))
}
v.Metadata = splitOff(&dotParts[2], "+")
v.PreRelease = PreRelease(splitOff(&dotParts[2], "-"))
parsed := make([]int64, 3, 3)
for i, v := range dotParts[:3] {
val, err := strconv.ParseInt(v, 10, 64)
parsed[i] = val
if err != nil {
return nil, err
}
}
v.Major = parsed[0]
v.Minor = parsed[1]
v.Patch = parsed[2]
return &v, nil
}
func Must(v *Version, err error) *Version {
if err != nil {
panic(err)
}
return v
}
func (v *Version) String() string {
var buffer bytes.Buffer
base := fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
buffer.WriteString(base)
if v.PreRelease != "" {
buffer.WriteString(fmt.Sprintf("-%s", v.PreRelease))
}
if v.Metadata != "" {
buffer.WriteString(fmt.Sprintf("+%s", v.Metadata))
}
return buffer.String()
}
func (v *Version) LessThan(versionB Version) bool {
versionA := *v
cmp := recursiveCompare(versionA.Slice(), versionB.Slice())
if cmp == 0 {
cmp = preReleaseCompare(versionA, versionB)
}
if cmp == -1 {
return true
}
return false
}
/* Slice converts the comparable parts of the semver into a slice of strings */
func (v *Version) Slice() []int64 {
return []int64{v.Major, v.Minor, v.Patch}
}
func (p *PreRelease) Slice() []string {
preRelease := string(*p)
return strings.Split(preRelease, ".")
}
func preReleaseCompare(versionA Version, versionB Version) int {
a := versionA.PreRelease
b := versionB.PreRelease
/* Handle the case where if two versions are otherwise equal it is the
* one without a PreRelease that is greater */
if len(a) == 0 && (len(b) > 0) {
return 1
} else if len(b) == 0 && (len(a) > 0) {
return -1
}
// If there is a prelease, check and compare each part.
return recursivePreReleaseCompare(a.Slice(), b.Slice())
}
func recursiveCompare(versionA []int64, versionB []int64) int {
if len(versionA) == 0 {
return 0
}
a := versionA[0]
b := versionB[0]
if a > b {
return 1
} else if a < b {
return -1
}
return recursiveCompare(versionA[1:], versionB[1:])
}
func recursivePreReleaseCompare(versionA []string, versionB []string) int {
// Handle slice length disparity.
if len(versionA) == 0 {
// Nothing to compare too, so we return 0
return 0
} else if len(versionB) == 0 {
// We're longer than versionB so return 1.
return 1
}
a := versionA[0]
b := versionB[0]
aInt := false; bInt := false
aI, err := strconv.Atoi(versionA[0])
if err == nil {
aInt = true
}
bI, err := strconv.Atoi(versionB[0])
if err == nil {
bInt = true
}
// Handle Integer Comparison
if aInt && bInt {
if aI > bI {
return 1
} else if aI < bI {
return -1
}
}
// Handle String Comparison
if a > b {
return 1
} else if a < b {
return -1
}
return recursivePreReleaseCompare(versionA[1:], versionB[1:])
}
// BumpMajor increments the Major field by 1 and resets all other fields to their default values
func (v *Version) BumpMajor() {
v.Major += 1
v.Minor = 0
v.Patch = 0
v.PreRelease = PreRelease("")
v.Metadata = ""
}
// BumpMinor increments the Minor field by 1 and resets all other fields to their default values
func (v *Version) BumpMinor() {
v.Minor += 1
v.Patch = 0
v.PreRelease = PreRelease("")
v.Metadata = ""
}
// BumpPatch increments the Patch field by 1 and resets all other fields to their default values
func (v *Version) BumpPatch() {
v.Patch += 1
v.PreRelease = PreRelease("")
v.Metadata = ""
}

View File

@ -0,0 +1,223 @@
package semver
import (
"errors"
"math/rand"
"reflect"
"testing"
"time"
)
type fixture struct {
greaterVersion string
lesserVersion string
}
var fixtures = []fixture{
fixture{"0.0.0", "0.0.0-foo"},
fixture{"0.0.1", "0.0.0"},
fixture{"1.0.0", "0.9.9"},
fixture{"0.10.0", "0.9.0"},
fixture{"0.99.0", "0.10.0"},
fixture{"2.0.0", "1.2.3"},
fixture{"0.0.0", "0.0.0-foo"},
fixture{"0.0.1", "0.0.0"},
fixture{"1.0.0", "0.9.9"},
fixture{"0.10.0", "0.9.0"},
fixture{"0.99.0", "0.10.0"},
fixture{"2.0.0", "1.2.3"},
fixture{"0.0.0", "0.0.0-foo"},
fixture{"0.0.1", "0.0.0"},
fixture{"1.0.0", "0.9.9"},
fixture{"0.10.0", "0.9.0"},
fixture{"0.99.0", "0.10.0"},
fixture{"2.0.0", "1.2.3"},
fixture{"1.2.3", "1.2.3-asdf"},
fixture{"1.2.3", "1.2.3-4"},
fixture{"1.2.3", "1.2.3-4-foo"},
fixture{"1.2.3-5-foo", "1.2.3-5"},
fixture{"1.2.3-5", "1.2.3-4"},
fixture{"1.2.3-5-foo", "1.2.3-5-Foo"},
fixture{"3.0.0", "2.7.2+asdf"},
fixture{"3.0.0+foobar", "2.7.2"},
fixture{"1.2.3-a.10", "1.2.3-a.5"},
fixture{"1.2.3-a.b", "1.2.3-a.5"},
fixture{"1.2.3-a.b", "1.2.3-a"},
fixture{"1.2.3-a.b.c.10.d.5", "1.2.3-a.b.c.5.d.100"},
fixture{"1.0.0", "1.0.0-rc.1"},
fixture{"1.0.0-rc.2", "1.0.0-rc.1"},
fixture{"1.0.0-rc.1", "1.0.0-beta.11"},
fixture{"1.0.0-beta.11", "1.0.0-beta.2"},
fixture{"1.0.0-beta.2", "1.0.0-beta"},
fixture{"1.0.0-beta", "1.0.0-alpha.beta"},
fixture{"1.0.0-alpha.beta", "1.0.0-alpha.1"},
fixture{"1.0.0-alpha.1", "1.0.0-alpha"},
}
func TestCompare(t *testing.T) {
for _, v := range fixtures {
gt, err := NewVersion(v.greaterVersion)
if err != nil {
t.Error(err)
}
lt, err := NewVersion(v.lesserVersion)
if err != nil {
t.Error(err)
}
if gt.LessThan(*lt) == true {
t.Errorf("%s should not be less than %s", gt, lt)
}
}
}
func testString(t *testing.T, orig string, version *Version) {
if orig != version.String() {
t.Errorf("%s != %s", orig, version)
}
}
func TestString(t *testing.T) {
for _, v := range fixtures {
gt, err := NewVersion(v.greaterVersion)
if err != nil {
t.Error(err)
}
testString(t, v.greaterVersion, gt)
lt, err := NewVersion(v.lesserVersion)
if err != nil {
t.Error(err)
}
testString(t, v.lesserVersion, lt)
}
}
func shuffleStringSlice(src []string) []string {
dest := make([]string, len(src))
rand.Seed(time.Now().Unix())
perm := rand.Perm(len(src))
for i, v := range perm {
dest[v] = src[i]
}
return dest
}
func TestSort(t *testing.T) {
sortedVersions := []string{"1.0.0", "1.0.2", "1.2.0", "3.1.1"}
unsortedVersions := shuffleStringSlice(sortedVersions)
semvers := []*Version{}
for _, v := range unsortedVersions {
sv, err := NewVersion(v)
if err != nil {
t.Fatal(err)
}
semvers = append(semvers, sv)
}
Sort(semvers)
for idx, sv := range semvers {
if sv.String() != sortedVersions[idx] {
t.Fatalf("incorrect sort at index %v", idx)
}
}
}
func TestBumpMajor(t *testing.T) {
version, _ := NewVersion("1.0.0")
version.BumpMajor()
if version.Major != 2 {
t.Fatalf("bumping major on 1.0.0 resulted in %v", version)
}
version, _ = NewVersion("1.5.2")
version.BumpMajor()
if version.Minor != 0 && version.Patch != 0 {
t.Fatalf("bumping major on 1.5.2 resulted in %v", version)
}
version, _ = NewVersion("1.0.0+build.1-alpha.1")
version.BumpMajor()
if version.PreRelease != "" && version.PreRelease != "" {
t.Fatalf("bumping major on 1.0.0+build.1-alpha.1 resulted in %v", version)
}
}
func TestBumpMinor(t *testing.T) {
version, _ := NewVersion("1.0.0")
version.BumpMinor()
if version.Major != 1 {
t.Fatalf("bumping minor on 1.0.0 resulted in %v", version)
}
if version.Minor != 1 {
t.Fatalf("bumping major on 1.0.0 resulted in %v", version)
}
version, _ = NewVersion("1.0.0+build.1-alpha.1")
version.BumpMinor()
if version.PreRelease != "" && version.PreRelease != "" {
t.Fatalf("bumping major on 1.0.0+build.1-alpha.1 resulted in %v", version)
}
}
func TestBumpPatch(t *testing.T) {
version, _ := NewVersion("1.0.0")
version.BumpPatch()
if version.Major != 1 {
t.Fatalf("bumping minor on 1.0.0 resulted in %v", version)
}
if version.Minor != 0 {
t.Fatalf("bumping major on 1.0.0 resulted in %v", version)
}
if version.Patch != 1 {
t.Fatalf("bumping major on 1.0.0 resulted in %v", version)
}
version, _ = NewVersion("1.0.0+build.1-alpha.1")
version.BumpPatch()
if version.PreRelease != "" && version.PreRelease != "" {
t.Fatalf("bumping major on 1.0.0+build.1-alpha.1 resulted in %v", version)
}
}
func TestMust(t *testing.T) {
tests := []struct {
versionStr string
version *Version
recov interface{}
}{
{
versionStr: "1.0.0",
version: &Version{Major: 1},
},
{
versionStr: "version number",
recov: errors.New("version number is not in dotted-tri format"),
},
}
for _, tt := range tests {
func() {
defer func() {
recov := recover()
if !reflect.DeepEqual(tt.recov, recov) {
t.Fatalf("incorrect panic for %q: want %v, got %v", tt.versionStr, tt.recov, recov)
}
}()
version := Must(NewVersion(tt.versionStr))
if !reflect.DeepEqual(tt.version, version) {
t.Fatalf("incorrect version for %q: want %+v, got %+v", tt.versionStr, tt.version, version)
}
}()
}
}

View File

@ -0,0 +1,24 @@
package semver
import (
"sort"
)
type Versions []*Version
func (s Versions) Len() int {
return len(s)
}
func (s Versions) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s Versions) Less(i, j int) bool {
return s[i].LessThan(*s[j])
}
// Sort sorts the given slice of Version
func Sort(versions []*Version) {
sort.Sort(Versions(versions))
}

View File

@ -23,7 +23,9 @@ import (
"sort"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/version"
)
// isMemberBootstrapped tries to check if the given member has been bootstrapped
@ -106,3 +108,51 @@ func getRemotePeerURLs(cl ClusterInfo, local string) []string {
sort.Strings(us)
return us
}
// getVersions returns the versions of the members in the given cluster.
// The key of the returned map is the member's ID. The value of the returned map
// is the semver version string. If it fails to get the version of a member, the key
// will be an empty string.
func getVersions(cl ClusterInfo, tr *http.Transport) map[string]string {
members := cl.Members()
vers := make(map[string]string)
for _, m := range members {
ver, err := getVersion(m, tr)
if err != nil {
log.Printf("etcdserver: cannot get the version of member %s (%v)", m.ID, err)
vers[m.ID.String()] = ""
} else {
vers[m.ID.String()] = ver
}
}
return vers
}
// decideClusterVersion decides the cluster version based on the versions map.
// The returned version is the min version in the map, or nil if the min
// version in unknown.
func decideClusterVersion(vers map[string]string) *semver.Version {
var cv *semver.Version
lv := semver.Must(semver.NewVersion(version.Version))
for mid, ver := range vers {
if len(ver) == 0 {
return nil
}
v, err := semver.NewVersion(ver)
if err != nil {
log.Printf("etcdserver: cannot understand the version of member %s (%v)", mid, err)
return nil
}
if lv.LessThan(*v) {
log.Printf("etcdserver: the etcd version %s is not up-to-date", lv.String())
log.Printf("etcdserver: member %s has a higher version %s", mid, ver)
}
if cv == nil {
cv = v
} else if v.LessThan(*cv) {
cv = v
}
}
return cv
}

View File

@ -0,0 +1,58 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"reflect"
"testing"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
)
func TestDecideClusterVersion(t *testing.T) {
tests := []struct {
vers map[string]string
wdver *semver.Version
}{
{
map[string]string{"a": "2.0.0"},
semver.Must(semver.NewVersion("2.0.0")),
},
// unknow
{
map[string]string{"a": ""},
nil,
},
{
map[string]string{"a": "2.0.0", "b": "2.1.0", "c": "2.1.0"},
semver.Must(semver.NewVersion("2.0.0")),
},
{
map[string]string{"a": "2.1.0", "b": "2.1.0", "c": "2.1.0"},
semver.Must(semver.NewVersion("2.1.0")),
},
{
map[string]string{"a": "", "b": "2.1.0", "c": "2.1.0"},
nil,
},
}
for i, tt := range tests {
dver := decideClusterVersion(tt.vers)
if !reflect.DeepEqual(dver, tt.wdver) {
t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver)
}
}
}

View File

@ -28,6 +28,7 @@ import (
"testing"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
etcdErr "github.com/coreos/etcd/error"
@ -116,6 +117,8 @@ func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) er
return nil
}
func (s *serverRecorder) ClusterVersion() *semver.Version { return nil }
type action struct {
name string
params []interface{}
@ -149,6 +152,7 @@ func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error
func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil }
func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil }
func (rs *resServer) ClusterVersion() *semver.Version { return nil }
func boolp(b bool) *bool { return &b }

View File

@ -21,6 +21,7 @@ import (
"sort"
"testing"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
@ -74,6 +75,8 @@ func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) erro
return fs.err
}
func (fs *errServer) ClusterVersion() *semver.Version { return nil }
func TestWriteError(t *testing.T) {
// nil error should not panic
rec := httptest.NewRecorder()

View File

@ -19,14 +19,17 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"path"
"sort"
"time"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/version"
)
// RaftAttributes represents the raft related attributes of an etcd member.
@ -149,6 +152,37 @@ func nodeToMember(n *store.NodeExtern) (*Member, error) {
return m, nil
}
// getVersion returns the version of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, tr *http.Transport) (string, error) {
cc := &http.Client{
Transport: tr,
Timeout: time.Second,
}
var (
err error
resp *http.Response
)
for _, u := range m.PeerURLs {
resp, err = cc.Get(u + "/version")
if err != nil {
continue
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
continue
}
var vers version.Versions
if err := json.Unmarshal(b, &vers); err != nil {
continue
}
return vers.Server, nil
}
return "", err
}
// implement sort by ID interface
type SortableMemberSlice []*Member

View File

@ -23,9 +23,11 @@ import (
"net/http"
"path"
"regexp"
"sync"
"sync/atomic"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
@ -59,7 +61,8 @@ const (
StoreAdminPrefix = "/0"
StoreKeysPrefix = "/1"
purgeFileInterval = 30 * time.Second
purgeFileInterval = 30 * time.Second
monitorVersionInterval = 10 * time.Second
)
var (
@ -119,6 +122,17 @@ type Server interface {
// UpdateMember attempts to update a existing member in the cluster. It will
// return ErrIDNotFound if the member ID does not exist.
UpdateMember(ctx context.Context, updateMemb Member) error
// ClusterVersion is the cluster-wide minimum major.minor version.
// Cluster version is set to the min version that a etcd member is
// compatible with when first bootstrap.
//
// During a rolling upgrades, the ClusterVersion will be updated
// automatically after a sync. (10 second by default)
//
// The API/raft component can utilize ClusterVersion to determine if
// it can accept a client request or a raft RPC.
ClusterVersion() *semver.Version
}
// EtcdServer is the production implementation of the Server interface
@ -145,6 +159,9 @@ type EtcdServer struct {
SyncTicker <-chan time.Time
reqIDGen *idutil.Generator
verMu sync.Mutex
clusterVersion *semver.Version
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -263,13 +280,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
raftStorage: s,
storage: NewStorage(w, ss),
},
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
clusterVersion: semver.Must(semver.NewVersion(version.MinClusterVersion)),
}
// TODO: move transport initialization near the definition of remote
@ -297,6 +315,7 @@ func (s *EtcdServer) Start() {
go s.publish(defaultPublishRetryInterval)
go s.purgeFile()
go monitorFileDescriptor(s.done)
go s.monitorVersions()
}
// start prepares and starts server in a new goroutine. It is no longer safe to
@ -862,3 +881,39 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
func (s *EtcdServer) ClusterVersion() *semver.Version {
s.verMu.Lock()
defer s.verMu.Unlock()
// deep copy
return semver.Must(semver.NewVersion(s.clusterVersion.String()))
}
// monitorVersions checks the member's version every monitorVersion interval.
// It updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
// local version.
func (s *EtcdServer) monitorVersions() {
for {
select {
case <-time.After(monitorVersionInterval):
v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport))
if v == nil {
continue
}
s.verMu.Lock()
// clear patch version
v.Patch = 0
if s.clusterVersion.LessThan(*v) {
log.Printf("etcdsever: updated the cluster version from %v to %v", s.clusterVersion, v.String())
// TODO: persist the version upgrade via raft. Then etcdserver will be able to use the
// upgraded version without syncing with others after a restart.
s.clusterVersion = v
}
s.verMu.Unlock()
case <-s.done:
return
}
}
}

View File

@ -25,7 +25,9 @@ import (
)
var (
Version = "2.1.0-alpha.0+git"
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.0.0"
Version = "2.1.0-alpha.0+git"
)
// WalVersion is an enum for versions of etcd logs.