etcdserver: support update cluster version through raft

1. Persist the cluster version change through raft. When the member is restarted, it can recover
the previous known decided cluster version.

2. When there is a new leader, it is forced to do a version checking immediately. This helps to
update the first cluster version fast.
This commit is contained in:
Xiang Li 2015-05-08 11:10:12 -07:00
parent 0a6f481ca5
commit e866314b94
8 changed files with 262 additions and 101 deletions

View File

@ -26,6 +26,7 @@ import (
"strings"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/flags"
"github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/types"
@ -60,7 +61,8 @@ type Cluster struct {
token string
store store.Store
sync.Mutex // guards members and removed map
sync.Mutex // guards the fields below
version *semver.Version
members map[types.ID]*Member
// removed contains the ids of removed members in the cluster.
// removed id cannot be reused.
@ -100,6 +102,7 @@ func NewClusterFromStore(token string, st store.Store) *Cluster {
c := newCluster(token)
c.store = st
c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
return c
}
@ -232,6 +235,7 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st }
func (c *Cluster) Recover() {
c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
}
// ValidateConfigurationChange takes a proposed ConfChange and
@ -347,6 +351,26 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.members[id].RaftAttributes = raftAttr
}
func (c *Cluster) Version() *semver.Version {
c.Lock()
defer c.Unlock()
if c.version == nil {
return nil
}
return semver.Must(semver.NewVersion(c.version.String()))
}
func (c *Cluster) SetVersion(ver *semver.Version) {
c.Lock()
defer c.Unlock()
if c.version != nil {
log.Printf("etcdsever: updated the cluster version from %v to %v", c.version.String(), ver.String())
} else {
log.Printf("etcdsever: set the initial cluster version to %v", ver.String())
}
c.version = ver
}
// Validate ensures that there is no identical urls in the cluster peer list
func (c *Cluster) Validate() error {
urlMap := make(map[string]bool)
@ -392,6 +416,17 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool)
return members, removed
}
func clusterVersionFromStore(st store.Store) *semver.Version {
e, err := st.Get(path.Join(StoreClusterPrefix, "version"), false, false)
if err != nil {
if isKeyNotFound(err) {
return nil
}
log.Panicf("etcdserver: unexpected error (%v) when getting cluster version from store", err)
}
return semver.Must(semver.NewVersion(*e.Node.Value))
}
// ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
// with the existing cluster. If the validation succeeds, it assigns the IDs
// from the existing cluster to the local cluster.

View File

@ -21,6 +21,7 @@ import (
"reflect"
"testing"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
@ -79,33 +80,48 @@ func TestClusterFromStringBad(t *testing.T) {
func TestClusterFromStore(t *testing.T) {
tests := []struct {
mems []*Member
ver *semver.Version
}{
{
[]*Member{newTestMember(1, nil, "", nil)},
semver.Must(semver.NewVersion("2.0.0")),
},
{
nil,
nil,
},
{
[]*Member{
newTestMember(1, nil, "", nil),
newTestMember(2, nil, "", nil),
},
semver.Must(semver.NewVersion("2.0.0")),
},
}
for i, tt := range tests {
st := store.New()
hc := newTestCluster(nil)
hc.SetStore(store.New())
hc.SetStore(st)
for _, m := range tt.mems {
hc.AddMember(m)
}
c := NewClusterFromStore("abc", hc.store)
if tt.ver != nil {
_, err := st.Set(path.Join(StoreClusterPrefix, "version"), false, tt.ver.String(), store.Permanent)
if err != nil {
t.Fatal(err)
}
}
c := NewClusterFromStore("abc", st)
if c.token != "abc" {
t.Errorf("#%d: token = %v, want %v", i, c.token, "abc")
}
if !reflect.DeepEqual(c.Members(), tt.mems) {
t.Errorf("#%d: members = %v, want %v", i, c.Members(), tt.mems)
}
if !reflect.DeepEqual(c.Version(), tt.ver) {
t.Errorf("#%d: ver = %v, want %v", i, c.Version(), tt.ver)
}
}
}

View File

@ -156,3 +156,34 @@ func decideClusterVersion(vers map[string]string) *semver.Version {
}
return cv
}
// getVersion returns the version of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, tr *http.Transport) (string, error) {
cc := &http.Client{
Transport: tr,
Timeout: time.Second,
}
var (
err error
resp *http.Response
)
for _, u := range m.PeerURLs {
resp, err = cc.Get(u + "/version")
if err != nil {
continue
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
continue
}
var vers version.Versions
if err := json.Unmarshal(b, &vers); err != nil {
continue
}
return vers.Server, nil
}
return "", err
}

View File

@ -19,17 +19,14 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"path"
"sort"
"time"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/version"
)
// RaftAttributes represents the raft related attributes of an etcd member.
@ -152,37 +149,6 @@ func nodeToMember(n *store.NodeExtern) (*Member, error) {
return m, nil
}
// getVersion returns the version of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, tr *http.Transport) (string, error) {
cc := &http.Client{
Transport: tr,
Timeout: time.Second,
}
var (
err error
resp *http.Response
)
for _, u := range m.PeerURLs {
resp, err = cc.Get(u + "/version")
if err != nil {
continue
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
continue
}
var vers version.Versions
if err := json.Unmarshal(b, &vers); err != nil {
continue
}
return vers.Server, nil
}
return "", err
}
// implement sort by ID interface
type SortableMemberSlice []*Member

View File

@ -23,7 +23,6 @@ import (
"net/http"
"path"
"regexp"
"sync"
"sync/atomic"
"time"
@ -62,7 +61,8 @@ const (
StoreKeysPrefix = "/1"
purgeFileInterval = 30 * time.Second
monitorVersionInterval = 10 * time.Second
monitorVersionInterval = 5 * time.Second
versionUpdateTimeout = 1 * time.Second
)
var (
@ -127,11 +127,16 @@ type Server interface {
// Cluster version is set to the min version that a etcd member is
// compatible with when first bootstrap.
//
// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
//
// During a rolling upgrades, the ClusterVersion will be updated
// automatically after a sync. (10 second by default)
// automatically after a sync. (5 second by default)
//
// The API/raft component can utilize ClusterVersion to determine if
// it can accept a client request or a raft RPC.
// NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
// the leader is etcd 2.0. etcd 2.0 leader will not update clusterVersion since
// this feature is introduced post 2.0.
ClusterVersion() *semver.Version
}
@ -160,8 +165,9 @@ type EtcdServer struct {
reqIDGen *idutil.Generator
verMu sync.Mutex
clusterVersion *semver.Version
// forceVersionC is used to force the version monitor loop
// to detect the cluster version immediately.
forceVersionC chan struct{}
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -280,14 +286,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
raftStorage: s,
storage: NewStorage(w, ss),
},
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
clusterVersion: semver.Must(semver.NewVersion(version.MinClusterVersion)),
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
forceVersionC: make(chan struct{}),
}
// TODO: move transport initialization near the definition of remote
@ -329,6 +335,11 @@ func (s *EtcdServer) start() {
s.w = wait.New()
s.done = make(chan struct{})
s.stop = make(chan struct{})
if s.ClusterVersion() != nil {
log.Printf("etcdserver: starting server... [version: %v, cluster version: %v]", version.Version, s.ClusterVersion())
} else {
log.Printf("etcdserver: starting server... [version: %v, cluster version: to_be_decided]", version.Version)
}
// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
@ -709,6 +720,10 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
// raft state machine may generate noop entry when leader confirmation.
// skip it in advance to avoid some potential bug in the future
if len(e.Data) == 0 {
select {
case s.forceVersionC <- struct{}{}:
default:
}
break
}
var r pb.Request
@ -754,6 +769,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
case r.PrevIndex > 0 || r.PrevValue != "":
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
default:
// TODO (yicheng): cluster should be the owner of cluster prefix store
// we should not modify cluster store here.
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := mustParseMemberIDFromKey(path.Dir(r.Path))
var attr Attributes
@ -762,6 +779,9 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
}
s.Cluster.UpdateAttributes(id, attr)
}
if r.Path == path.Join(StoreClusterPrefix, "version") {
s.Cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
}
return f(s.store.Set(r.Path, r.Dir, r.Val, expr))
}
case "DELETE":
@ -883,10 +903,10 @@ func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
func (s *EtcdServer) ClusterVersion() *semver.Version {
s.verMu.Lock()
defer s.verMu.Unlock()
// deep copy
return semver.Must(semver.NewVersion(s.clusterVersion.String()))
if s.Cluster == nil {
return nil
}
return s.Cluster.Version()
}
// monitorVersions checks the member's version every monitorVersion interval.
@ -896,24 +916,66 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
func (s *EtcdServer) monitorVersions() {
for {
select {
case <-s.forceVersionC:
case <-time.After(monitorVersionInterval):
v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport))
if v == nil {
continue
}
s.verMu.Lock()
// clear patch version
v.Patch = 0
if s.clusterVersion.LessThan(*v) {
log.Printf("etcdsever: updated the cluster version from %v to %v", s.clusterVersion, v.String())
// TODO: persist the version upgrade via raft. Then etcdserver will be able to use the
// upgraded version without syncing with others after a restart.
s.clusterVersion = v
}
s.verMu.Unlock()
case <-s.done:
return
}
if s.Leader() != s.ID() {
continue
}
v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport))
if v != nil {
// only keep major.minor version for comparasion
v = &semver.Version{
Major: v.Major,
Minor: v.Minor,
}
}
// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if s.Cluster.Version() == nil {
if v != nil {
go s.updateClusterVersion(v.String())
} else {
go s.updateClusterVersion(version.MinClusterVersion)
}
continue
}
// update cluster version only if the decided version is greater than
// the current cluster version
if v != nil && s.Cluster.Version().LessThan(*v) {
go s.updateClusterVersion(v.String())
}
}
}
func (s *EtcdServer) updateClusterVersion(ver string) {
if s.Cluster.Version() == nil {
log.Printf("etcdsever: setting up the initial cluster version to %v", ver)
} else {
log.Printf("etcdsever: updating the cluster version from %v to %v", s.Cluster.Version(), ver)
}
req := pb.Request{
Method: "PUT",
Path: path.Join(StoreClusterPrefix, "version"),
Val: ver,
}
ctx, cancel := context.WithTimeout(context.Background(), versionUpdateTimeout)
_, err := s.Do(ctx, req)
cancel()
switch err {
case nil:
return
case ErrStopped:
log.Printf("etcdserver: aborting update cluster version because server is stopped")
return
default:
log.Printf("etcdserver: error updating cluster version (%v)", err)
}
}

View File

@ -1041,6 +1041,45 @@ func TestPublishRetry(t *testing.T) {
}
}
func TestUpdateVersion(t *testing.T) {
n := &nodeRecorder{}
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
w := &waitWithResponse{ch: ch}
srv := &EtcdServer{
id: 1,
r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
Cluster: &Cluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.updateClusterVersion("2.0.0")
action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].Name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var r pb.Request
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
if r.Method != "PUT" {
t.Errorf("method = %s, want PUT", r.Method)
}
if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
t.Errorf("path = %s, want %s", r.Path, wpath)
}
if r.Val != "2.0.0" {
t.Errorf("val = %s, want %s", r.Val, "2.0.0")
}
}
func TestStopNotify(t *testing.T) {
s := &EtcdServer{
stop: make(chan struct{}),

View File

@ -384,6 +384,7 @@ func (c *cluster) Launch(t *testing.T) {
}
// wait cluster to be stable to receive future client requests
c.waitMembersMatch(t, c.HTTPMembers())
c.waitVersion()
}
func (c *cluster) URL(i int) string {
@ -537,6 +538,17 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) {
}
}
func (c *cluster) waitVersion() {
for _, m := range c.Members {
for {
if m.s.ClusterVersion() != nil {
break
}
time.Sleep(tickDuration)
}
}
}
func (c *cluster) name(i int) string {
return fmt.Sprint("node", i)
}

View File

@ -53,19 +53,19 @@ func TestV2Set(t *testing.T) {
"/v2/keys/foo/bar",
v,
http.StatusCreated,
`{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":7,"createdIndex":7}}`,
`{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":8,"createdIndex":8}}`,
},
{
"/v2/keys/foodir?dir=true",
url.Values{},
http.StatusCreated,
`{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":8,"createdIndex":8}}`,
`{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":9,"createdIndex":9}}`,
},
{
"/v2/keys/fooempty",
url.Values(map[string][]string{"value": {""}}),
http.StatusCreated,
`{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":9,"createdIndex":9}}`,
`{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":10,"createdIndex":10}}`,
},
}
@ -214,12 +214,12 @@ func TestV2CAS(t *testing.T) {
},
{
"/v2/keys/cas/foo",
url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"7"}}),
url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"8"}}),
http.StatusOK,
map[string]interface{}{
"node": map[string]interface{}{
"value": "YYY",
"modifiedIndex": float64(8),
"modifiedIndex": float64(9),
},
"action": "compareAndSwap",
},
@ -231,8 +231,8 @@ func TestV2CAS(t *testing.T) {
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[10 != 8]",
"index": float64(8),
"cause": "[10 != 9]",
"index": float64(9),
},
},
{
@ -281,7 +281,7 @@ func TestV2CAS(t *testing.T) {
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[bad_value != ZZZ] [100 != 9]",
"cause": "[bad_value != ZZZ] [100 != 10]",
},
},
{
@ -291,12 +291,12 @@ func TestV2CAS(t *testing.T) {
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[100 != 9]",
"cause": "[100 != 10]",
},
},
{
"/v2/keys/cas/foo",
url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"9"}}),
url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"10"}}),
http.StatusPreconditionFailed,
map[string]interface{}{
"errorCode": float64(101),
@ -446,7 +446,7 @@ func TestV2CAD(t *testing.T) {
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[100 != 7]",
"cause": "[100 != 8]",
},
},
{
@ -458,12 +458,12 @@ func TestV2CAD(t *testing.T) {
},
},
{
"/v2/keys/foo?prevIndex=7",
"/v2/keys/foo?prevIndex=8",
http.StatusOK,
map[string]interface{}{
"node": map[string]interface{}{
"key": "/foo",
"modifiedIndex": float64(9),
"modifiedIndex": float64(10),
},
"action": "compareAndDelete",
},
@ -491,7 +491,7 @@ func TestV2CAD(t *testing.T) {
map[string]interface{}{
"node": map[string]interface{}{
"key": "/foovalue",
"modifiedIndex": float64(10),
"modifiedIndex": float64(11),
},
"action": "compareAndDelete",
},
@ -529,7 +529,7 @@ func TestV2Unique(t *testing.T) {
http.StatusCreated,
map[string]interface{}{
"node": map[string]interface{}{
"key": "/foo/7",
"key": "/foo/8",
"value": "XXX",
},
"action": "create",
@ -541,7 +541,7 @@ func TestV2Unique(t *testing.T) {
http.StatusCreated,
map[string]interface{}{
"node": map[string]interface{}{
"key": "/foo/8",
"key": "/foo/9",
"value": "XXX",
},
"action": "create",
@ -553,7 +553,7 @@ func TestV2Unique(t *testing.T) {
http.StatusCreated,
map[string]interface{}{
"node": map[string]interface{}{
"key": "/bar/9",
"key": "/bar/10",
"value": "XXX",
},
"action": "create",
@ -615,8 +615,8 @@ func TestV2Get(t *testing.T) {
map[string]interface{}{
"key": "/foo/bar",
"dir": true,
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
},
},
},
@ -634,14 +634,14 @@ func TestV2Get(t *testing.T) {
map[string]interface{}{
"key": "/foo/bar",
"dir": true,
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
"nodes": []interface{}{
map[string]interface{}{
"key": "/foo/bar/zar",
"value": "XXX",
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
},
},
},
@ -709,8 +709,8 @@ func TestV2QuorumGet(t *testing.T) {
map[string]interface{}{
"key": "/foo/bar",
"dir": true,
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
},
},
},
@ -728,14 +728,14 @@ func TestV2QuorumGet(t *testing.T) {
map[string]interface{}{
"key": "/foo/bar",
"dir": true,
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
"nodes": []interface{}{
map[string]interface{}{
"key": "/foo/bar/zar",
"value": "XXX",
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
},
},
},
@ -781,7 +781,7 @@ func TestV2Watch(t *testing.T) {
"node": map[string]interface{}{
"key": "/foo/bar",
"value": "XXX",
"modifiedIndex": float64(7),
"modifiedIndex": float64(8),
},
"action": "set",
}
@ -802,7 +802,7 @@ func TestV2WatchWithIndex(t *testing.T) {
var body map[string]interface{}
c := make(chan bool, 1)
go func() {
resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=8"))
resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=9"))
body = tc.ReadBodyJSON(resp)
c <- true
}()
@ -839,7 +839,7 @@ func TestV2WatchWithIndex(t *testing.T) {
"node": map[string]interface{}{
"key": "/foo/bar",
"value": "XXX",
"modifiedIndex": float64(8),
"modifiedIndex": float64(9),
},
"action": "set",
}