diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 908aeed29..f9c60fe1e 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -26,6 +26,7 @@ import ( "strings" "sync" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/pkg/flags" "github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/types" @@ -60,7 +61,8 @@ type Cluster struct { token string store store.Store - sync.Mutex // guards members and removed map + sync.Mutex // guards the fields below + version *semver.Version members map[types.ID]*Member // removed contains the ids of removed members in the cluster. // removed id cannot be reused. @@ -100,6 +102,7 @@ func NewClusterFromStore(token string, st store.Store) *Cluster { c := newCluster(token) c.store = st c.members, c.removed = membersFromStore(c.store) + c.version = clusterVersionFromStore(c.store) return c } @@ -232,6 +235,7 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st } func (c *Cluster) Recover() { c.members, c.removed = membersFromStore(c.store) + c.version = clusterVersionFromStore(c.store) } // ValidateConfigurationChange takes a proposed ConfChange and @@ -347,6 +351,26 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { c.members[id].RaftAttributes = raftAttr } +func (c *Cluster) Version() *semver.Version { + c.Lock() + defer c.Unlock() + if c.version == nil { + return nil + } + return semver.Must(semver.NewVersion(c.version.String())) +} + +func (c *Cluster) SetVersion(ver *semver.Version) { + c.Lock() + defer c.Unlock() + if c.version != nil { + log.Printf("etcdsever: updated the cluster version from %v to %v", c.version.String(), ver.String()) + } else { + log.Printf("etcdsever: set the initial cluster version to %v", ver.String()) + } + c.version = ver +} + // Validate ensures that there is no identical urls in the cluster peer list func (c *Cluster) Validate() error { urlMap := make(map[string]bool) @@ -392,6 +416,17 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) return members, removed } +func clusterVersionFromStore(st store.Store) *semver.Version { + e, err := st.Get(path.Join(StoreClusterPrefix, "version"), false, false) + if err != nil { + if isKeyNotFound(err) { + return nil + } + log.Panicf("etcdserver: unexpected error (%v) when getting cluster version from store", err) + } + return semver.Must(semver.NewVersion(*e.Node.Value)) +} + // ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs // with the existing cluster. If the validation succeeds, it assigns the IDs // from the existing cluster to the local cluster. diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 52df62419..529890819 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" @@ -79,33 +80,48 @@ func TestClusterFromStringBad(t *testing.T) { func TestClusterFromStore(t *testing.T) { tests := []struct { mems []*Member + ver *semver.Version }{ { []*Member{newTestMember(1, nil, "", nil)}, + semver.Must(semver.NewVersion("2.0.0")), }, { nil, + nil, }, { []*Member{ newTestMember(1, nil, "", nil), newTestMember(2, nil, "", nil), }, + semver.Must(semver.NewVersion("2.0.0")), }, } for i, tt := range tests { + st := store.New() hc := newTestCluster(nil) - hc.SetStore(store.New()) + hc.SetStore(st) for _, m := range tt.mems { hc.AddMember(m) } - c := NewClusterFromStore("abc", hc.store) + if tt.ver != nil { + _, err := st.Set(path.Join(StoreClusterPrefix, "version"), false, tt.ver.String(), store.Permanent) + if err != nil { + t.Fatal(err) + } + } + + c := NewClusterFromStore("abc", st) if c.token != "abc" { t.Errorf("#%d: token = %v, want %v", i, c.token, "abc") } if !reflect.DeepEqual(c.Members(), tt.mems) { t.Errorf("#%d: members = %v, want %v", i, c.Members(), tt.mems) } + if !reflect.DeepEqual(c.Version(), tt.ver) { + t.Errorf("#%d: ver = %v, want %v", i, c.Version(), tt.ver) + } } } diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index de3c6efb9..203bbd0f5 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -156,3 +156,34 @@ func decideClusterVersion(vers map[string]string) *semver.Version { } return cv } + +// getVersion returns the version of the given member via its +// peerURLs. Returns the last error if it fails to get the version. +func getVersion(m *Member, tr *http.Transport) (string, error) { + cc := &http.Client{ + Transport: tr, + Timeout: time.Second, + } + var ( + err error + resp *http.Response + ) + + for _, u := range m.PeerURLs { + resp, err = cc.Get(u + "/version") + if err != nil { + continue + } + b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + continue + } + var vers version.Versions + if err := json.Unmarshal(b, &vers); err != nil { + continue + } + return vers.Server, nil + } + return "", err +} diff --git a/etcdserver/member.go b/etcdserver/member.go index 36c3d61a1..aa125163f 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -19,17 +19,14 @@ import ( "encoding/binary" "encoding/json" "fmt" - "io/ioutil" "log" "math/rand" - "net/http" "path" "sort" "time" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/store" - "github.com/coreos/etcd/version" ) // RaftAttributes represents the raft related attributes of an etcd member. @@ -152,37 +149,6 @@ func nodeToMember(n *store.NodeExtern) (*Member, error) { return m, nil } -// getVersion returns the version of the given member via its -// peerURLs. Returns the last error if it fails to get the version. -func getVersion(m *Member, tr *http.Transport) (string, error) { - cc := &http.Client{ - Transport: tr, - Timeout: time.Second, - } - var ( - err error - resp *http.Response - ) - - for _, u := range m.PeerURLs { - resp, err = cc.Get(u + "/version") - if err != nil { - continue - } - b, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - continue - } - var vers version.Versions - if err := json.Unmarshal(b, &vers); err != nil { - continue - } - return vers.Server, nil - } - return "", err -} - // implement sort by ID interface type SortableMemberSlice []*Member diff --git a/etcdserver/server.go b/etcdserver/server.go index 8c7bc54f2..c3a31e8d0 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -23,7 +23,6 @@ import ( "net/http" "path" "regexp" - "sync" "sync/atomic" "time" @@ -62,7 +61,8 @@ const ( StoreKeysPrefix = "/1" purgeFileInterval = 30 * time.Second - monitorVersionInterval = 10 * time.Second + monitorVersionInterval = 5 * time.Second + versionUpdateTimeout = 1 * time.Second ) var ( @@ -127,11 +127,16 @@ type Server interface { // Cluster version is set to the min version that a etcd member is // compatible with when first bootstrap. // + // ClusterVersion is nil until the cluster is bootstrapped (has a quorum). + // // During a rolling upgrades, the ClusterVersion will be updated - // automatically after a sync. (10 second by default) + // automatically after a sync. (5 second by default) // // The API/raft component can utilize ClusterVersion to determine if // it can accept a client request or a raft RPC. + // NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and + // the leader is etcd 2.0. etcd 2.0 leader will not update clusterVersion since + // this feature is introduced post 2.0. ClusterVersion() *semver.Version } @@ -160,8 +165,9 @@ type EtcdServer struct { reqIDGen *idutil.Generator - verMu sync.Mutex - clusterVersion *semver.Version + // forceVersionC is used to force the version monitor loop + // to detect the cluster version immediately. + forceVersionC chan struct{} } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -280,14 +286,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { raftStorage: s, storage: NewStorage(w, ss), }, - id: id, - attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - Cluster: cfg.Cluster, - stats: sstats, - lstats: lstats, - SyncTicker: time.Tick(500 * time.Millisecond), - reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), - clusterVersion: semver.Must(semver.NewVersion(version.MinClusterVersion)), + id: id, + attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, + Cluster: cfg.Cluster, + stats: sstats, + lstats: lstats, + SyncTicker: time.Tick(500 * time.Millisecond), + reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), + forceVersionC: make(chan struct{}), } // TODO: move transport initialization near the definition of remote @@ -329,6 +335,11 @@ func (s *EtcdServer) start() { s.w = wait.New() s.done = make(chan struct{}) s.stop = make(chan struct{}) + if s.ClusterVersion() != nil { + log.Printf("etcdserver: starting server... [version: %v, cluster version: %v]", version.Version, s.ClusterVersion()) + } else { + log.Printf("etcdserver: starting server... [version: %v, cluster version: to_be_decided]", version.Version) + } // TODO: if this is an empty log, writes all peer infos // into the first entry go s.run() @@ -709,6 +720,10 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint // raft state machine may generate noop entry when leader confirmation. // skip it in advance to avoid some potential bug in the future if len(e.Data) == 0 { + select { + case s.forceVersionC <- struct{}{}: + default: + } break } var r pb.Request @@ -754,6 +769,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { case r.PrevIndex > 0 || r.PrevValue != "": return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)) default: + // TODO (yicheng): cluster should be the owner of cluster prefix store + // we should not modify cluster store here. if storeMemberAttributeRegexp.MatchString(r.Path) { id := mustParseMemberIDFromKey(path.Dir(r.Path)) var attr Attributes @@ -762,6 +779,9 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } s.Cluster.UpdateAttributes(id, attr) } + if r.Path == path.Join(StoreClusterPrefix, "version") { + s.Cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) + } return f(s.store.Set(r.Path, r.Dir, r.Val, expr)) } case "DELETE": @@ -883,10 +903,10 @@ func (s *EtcdServer) PauseSending() { s.r.pauseSending() } func (s *EtcdServer) ResumeSending() { s.r.resumeSending() } func (s *EtcdServer) ClusterVersion() *semver.Version { - s.verMu.Lock() - defer s.verMu.Unlock() - // deep copy - return semver.Must(semver.NewVersion(s.clusterVersion.String())) + if s.Cluster == nil { + return nil + } + return s.Cluster.Version() } // monitorVersions checks the member's version every monitorVersion interval. @@ -896,24 +916,66 @@ func (s *EtcdServer) ClusterVersion() *semver.Version { func (s *EtcdServer) monitorVersions() { for { select { + case <-s.forceVersionC: case <-time.After(monitorVersionInterval): - v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport)) - if v == nil { - continue - } - - s.verMu.Lock() - // clear patch version - v.Patch = 0 - if s.clusterVersion.LessThan(*v) { - log.Printf("etcdsever: updated the cluster version from %v to %v", s.clusterVersion, v.String()) - // TODO: persist the version upgrade via raft. Then etcdserver will be able to use the - // upgraded version without syncing with others after a restart. - s.clusterVersion = v - } - s.verMu.Unlock() case <-s.done: return } + + if s.Leader() != s.ID() { + continue + } + + v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport)) + if v != nil { + // only keep major.minor version for comparasion + v = &semver.Version{ + Major: v.Major, + Minor: v.Minor, + } + } + + // if the current version is nil: + // 1. use the decided version if possible + // 2. or use the min cluster version + if s.Cluster.Version() == nil { + if v != nil { + go s.updateClusterVersion(v.String()) + } else { + go s.updateClusterVersion(version.MinClusterVersion) + } + continue + } + + // update cluster version only if the decided version is greater than + // the current cluster version + if v != nil && s.Cluster.Version().LessThan(*v) { + go s.updateClusterVersion(v.String()) + } + } +} + +func (s *EtcdServer) updateClusterVersion(ver string) { + if s.Cluster.Version() == nil { + log.Printf("etcdsever: setting up the initial cluster version to %v", ver) + } else { + log.Printf("etcdsever: updating the cluster version from %v to %v", s.Cluster.Version(), ver) + } + req := pb.Request{ + Method: "PUT", + Path: path.Join(StoreClusterPrefix, "version"), + Val: ver, + } + ctx, cancel := context.WithTimeout(context.Background(), versionUpdateTimeout) + _, err := s.Do(ctx, req) + cancel() + switch err { + case nil: + return + case ErrStopped: + log.Printf("etcdserver: aborting update cluster version because server is stopped") + return + default: + log.Printf("etcdserver: error updating cluster version (%v)", err) } } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 8da0a36f0..a5ac90490 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1041,6 +1041,45 @@ func TestPublishRetry(t *testing.T) { } } +func TestUpdateVersion(t *testing.T) { + n := &nodeRecorder{} + ch := make(chan interface{}, 1) + // simulate that request has gone through consensus + ch <- Response{} + w := &waitWithResponse{ch: ch} + srv := &EtcdServer{ + id: 1, + r: raftNode{Node: n}, + attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, + Cluster: &Cluster{}, + w: w, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + } + srv.updateClusterVersion("2.0.0") + + action := n.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) + } + if action[0].Name != "Propose" { + t.Fatalf("action = %s, want Propose", action[0].Name) + } + data := action[0].Params[0].([]byte) + var r pb.Request + if err := r.Unmarshal(data); err != nil { + t.Fatalf("unmarshal request error: %v", err) + } + if r.Method != "PUT" { + t.Errorf("method = %s, want PUT", r.Method) + } + if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath { + t.Errorf("path = %s, want %s", r.Path, wpath) + } + if r.Val != "2.0.0" { + t.Errorf("val = %s, want %s", r.Val, "2.0.0") + } +} + func TestStopNotify(t *testing.T) { s := &EtcdServer{ stop: make(chan struct{}), diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 411762427..7c93da0bd 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -384,6 +384,7 @@ func (c *cluster) Launch(t *testing.T) { } // wait cluster to be stable to receive future client requests c.waitMembersMatch(t, c.HTTPMembers()) + c.waitVersion() } func (c *cluster) URL(i int) string { @@ -537,6 +538,17 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) { } } +func (c *cluster) waitVersion() { + for _, m := range c.Members { + for { + if m.s.ClusterVersion() != nil { + break + } + time.Sleep(tickDuration) + } + } +} + func (c *cluster) name(i int) string { return fmt.Sprint("node", i) } diff --git a/integration/v2_http_kv_test.go b/integration/v2_http_kv_test.go index 8b9525ed1..3bc6b313c 100644 --- a/integration/v2_http_kv_test.go +++ b/integration/v2_http_kv_test.go @@ -53,19 +53,19 @@ func TestV2Set(t *testing.T) { "/v2/keys/foo/bar", v, http.StatusCreated, - `{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":7,"createdIndex":7}}`, + `{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":8,"createdIndex":8}}`, }, { "/v2/keys/foodir?dir=true", url.Values{}, http.StatusCreated, - `{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":8,"createdIndex":8}}`, + `{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":9,"createdIndex":9}}`, }, { "/v2/keys/fooempty", url.Values(map[string][]string{"value": {""}}), http.StatusCreated, - `{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":9,"createdIndex":9}}`, + `{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":10,"createdIndex":10}}`, }, } @@ -214,12 +214,12 @@ func TestV2CAS(t *testing.T) { }, { "/v2/keys/cas/foo", - url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"7"}}), + url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"8"}}), http.StatusOK, map[string]interface{}{ "node": map[string]interface{}{ "value": "YYY", - "modifiedIndex": float64(8), + "modifiedIndex": float64(9), }, "action": "compareAndSwap", }, @@ -231,8 +231,8 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[10 != 8]", - "index": float64(8), + "cause": "[10 != 9]", + "index": float64(9), }, }, { @@ -281,7 +281,7 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[bad_value != ZZZ] [100 != 9]", + "cause": "[bad_value != ZZZ] [100 != 10]", }, }, { @@ -291,12 +291,12 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[100 != 9]", + "cause": "[100 != 10]", }, }, { "/v2/keys/cas/foo", - url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"9"}}), + url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"10"}}), http.StatusPreconditionFailed, map[string]interface{}{ "errorCode": float64(101), @@ -446,7 +446,7 @@ func TestV2CAD(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[100 != 7]", + "cause": "[100 != 8]", }, }, { @@ -458,12 +458,12 @@ func TestV2CAD(t *testing.T) { }, }, { - "/v2/keys/foo?prevIndex=7", + "/v2/keys/foo?prevIndex=8", http.StatusOK, map[string]interface{}{ "node": map[string]interface{}{ "key": "/foo", - "modifiedIndex": float64(9), + "modifiedIndex": float64(10), }, "action": "compareAndDelete", }, @@ -491,7 +491,7 @@ func TestV2CAD(t *testing.T) { map[string]interface{}{ "node": map[string]interface{}{ "key": "/foovalue", - "modifiedIndex": float64(10), + "modifiedIndex": float64(11), }, "action": "compareAndDelete", }, @@ -529,7 +529,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/foo/7", + "key": "/foo/8", "value": "XXX", }, "action": "create", @@ -541,7 +541,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/foo/8", + "key": "/foo/9", "value": "XXX", }, "action": "create", @@ -553,7 +553,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/bar/9", + "key": "/bar/10", "value": "XXX", }, "action": "create", @@ -615,8 +615,8 @@ func TestV2Get(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(7), - "modifiedIndex": float64(7), + "createdIndex": float64(8), + "modifiedIndex": float64(8), }, }, }, @@ -634,14 +634,14 @@ func TestV2Get(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(7), - "modifiedIndex": float64(7), + "createdIndex": float64(8), + "modifiedIndex": float64(8), "nodes": []interface{}{ map[string]interface{}{ "key": "/foo/bar/zar", "value": "XXX", - "createdIndex": float64(7), - "modifiedIndex": float64(7), + "createdIndex": float64(8), + "modifiedIndex": float64(8), }, }, }, @@ -709,8 +709,8 @@ func TestV2QuorumGet(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(7), - "modifiedIndex": float64(7), + "createdIndex": float64(8), + "modifiedIndex": float64(8), }, }, }, @@ -728,14 +728,14 @@ func TestV2QuorumGet(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(7), - "modifiedIndex": float64(7), + "createdIndex": float64(8), + "modifiedIndex": float64(8), "nodes": []interface{}{ map[string]interface{}{ "key": "/foo/bar/zar", "value": "XXX", - "createdIndex": float64(7), - "modifiedIndex": float64(7), + "createdIndex": float64(8), + "modifiedIndex": float64(8), }, }, }, @@ -781,7 +781,7 @@ func TestV2Watch(t *testing.T) { "node": map[string]interface{}{ "key": "/foo/bar", "value": "XXX", - "modifiedIndex": float64(7), + "modifiedIndex": float64(8), }, "action": "set", } @@ -802,7 +802,7 @@ func TestV2WatchWithIndex(t *testing.T) { var body map[string]interface{} c := make(chan bool, 1) go func() { - resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=8")) + resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=9")) body = tc.ReadBodyJSON(resp) c <- true }() @@ -839,7 +839,7 @@ func TestV2WatchWithIndex(t *testing.T) { "node": map[string]interface{}{ "key": "/foo/bar", "value": "XXX", - "modifiedIndex": float64(8), + "modifiedIndex": float64(9), }, "action": "set", }