diff --git a/config/cluster_config.go b/config/cluster_config.go index 58ae65e74..47df9e3be 100644 --- a/config/cluster_config.go +++ b/config/cluster_config.go @@ -23,3 +23,15 @@ func NewClusterConfig() *ClusterConfig { SyncInterval: DefaultSyncInterval, } } + +func (c *ClusterConfig) Sanitize() { + if c.ActiveSize < MinActiveSize { + c.ActiveSize = MinActiveSize + } + if c.RemoveDelay < MinRemoveDelay { + c.RemoveDelay = MinRemoveDelay + } + if c.SyncInterval < MinSyncInterval { + c.SyncInterval = MinSyncInterval + } +} diff --git a/etcd/etcd.go b/etcd/etcd.go index cd1e3cc57..fa1fbc6bc 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -27,6 +27,9 @@ const ( v2LeaderPrefix = "/v2/leader" v2StoreStatsPrefix = "/v2/stats/store" + v2configKVPrefix = "/_etcd/config" + v2adminConfigPrefix = "/v2/admin/config" + raftPrefix = "/raft" ) @@ -103,6 +106,7 @@ func New(c *config.Config, id int64) *Server { m.Handle(v2peersPrefix, handlerErr(s.serveMachines)) m.Handle(v2LeaderPrefix, handlerErr(s.serveLeader)) m.Handle(v2StoreStatsPrefix, handlerErr(s.serveStoreStats)) + m.Handle(v2adminConfigPrefix, handlerErr(s.serveAdminConfig)) s.Handler = m return s } @@ -115,6 +119,16 @@ func (s *Server) RaftHandler() http.Handler { return s.t } +func (s *Server) ClusterConfig() *config.ClusterConfig { + c := config.NewClusterConfig() + // This is used for backward compatibility because it doesn't + // set cluster config in older version. + if e, err := s.Get(v2configKVPrefix, false, false); err == nil { + json.Unmarshal([]byte(*e.Node.Value), c) + } + return c +} + func (s *Server) Run() { if len(s.config.Peers) == 0 { s.Bootstrap() diff --git a/etcd/v2_admin.go b/etcd/v2_admin.go new file mode 100644 index 000000000..d836bf3ac --- /dev/null +++ b/etcd/v2_admin.go @@ -0,0 +1,36 @@ +package etcd + +import ( + "encoding/json" + "net/http" + + "github.com/coreos/etcd/store" +) + +func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error { + switch r.Method { + case "GET": + case "PUT": + if !s.node.IsLeader() { + return s.redirect(w, r, s.node.Leader()) + } + c := s.ClusterConfig() + if err := json.NewDecoder(r.Body).Decode(c); err != nil { + return err + } + c.Sanitize() + b, err := json.Marshal(c) + if err != nil { + return err + } + if _, err := s.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil { + return err + } + default: + return allow(w, "GET", "PUT") + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(s.ClusterConfig()) + return nil +} diff --git a/etcd/v2_http_endpoint_test.go b/etcd/v2_http_endpoint_test.go index 663172d64..19836c08c 100644 --- a/etcd/v2_http_endpoint_test.go +++ b/etcd/v2_http_endpoint_test.go @@ -1,6 +1,7 @@ package etcd import ( + "bytes" "encoding/json" "io/ioutil" "net/http" @@ -8,7 +9,9 @@ import ( "sort" "strings" "testing" + "time" + "github.com/coreos/etcd/config" "github.com/coreos/etcd/store" ) @@ -114,3 +117,123 @@ func TestStoreStatsEndPoint(t *testing.T) { } afterTest(t) } + +func TestGetAdminConfigEndPoint(t *testing.T) { + es, hs := buildCluster(3, false) + waitCluster(t, es) + + for i := range hs { + r, err := http.Get(hs[i].URL + v2adminConfigPrefix) + if err != nil { + t.Errorf("%v", err) + continue + } + if g := r.StatusCode; g != 200 { + t.Errorf("#%d: status = %d, want %d", i, g, 200) + } + if g := r.Header.Get("Content-Type"); g != "application/json" { + t.Errorf("#%d: ContentType = %d, want application/json", i, g) + } + + conf := new(config.ClusterConfig) + err = json.NewDecoder(r.Body).Decode(conf) + r.Body.Close() + if err != nil { + t.Errorf("%v", err) + continue + } + w := config.NewClusterConfig() + if !reflect.DeepEqual(conf, w) { + t.Errorf("#%d: config = %+v, want %+v", i, conf, w) + } + } + + for i := range es { + es[len(es)-i-1].Stop() + } + for i := range hs { + hs[len(hs)-i-1].Close() + } + afterTest(t) +} + +func TestPutAdminConfigEndPoint(t *testing.T) { + tests := []struct { + c, wc string + }{ + { + `{"activeSize":1,"removeDelay":1,"syncInterval":1}`, + `{"activeSize":3,"removeDelay":2,"syncInterval":1}`, + }, + { + `{"activeSize":5,"removeDelay":20.5,"syncInterval":1.5}`, + `{"activeSize":5,"removeDelay":20.5,"syncInterval":1.5}`, + }, + { + `{"activeSize":5 , "removeDelay":20 , "syncInterval": 2 }`, + `{"activeSize":5,"removeDelay":20,"syncInterval":2}`, + }, + { + `{"activeSize":3, "removeDelay":60}`, + `{"activeSize":3,"removeDelay":60,"syncInterval":5}`, + }, + } + + for i, tt := range tests { + es, hs := buildCluster(3, false) + waitCluster(t, es) + + r, err := NewTestClient().Put(hs[0].URL+v2adminConfigPrefix, "application/json", bytes.NewBufferString(tt.c)) + if err != nil { + t.Fatalf("%v", err) + } + b, err := ioutil.ReadAll(r.Body) + r.Body.Close() + if err != nil { + t.Fatalf("%v", err) + } + if wbody := append([]byte(tt.wc), '\n'); !reflect.DeepEqual(b, wbody) { + t.Errorf("#%d: put result = %s, want %s", i, b, wbody) + } + + barrier(t, 0, es) + + for j := range es { + e, err := es[j].Get(v2configKVPrefix, false, false) + if err != nil { + t.Errorf("%v", err) + continue + } + if g := *e.Node.Value; g != tt.wc { + t.Errorf("#%d.%d: %s = %s, want %s", i, j, v2configKVPrefix, g, tt.wc) + } + } + + for j := range es { + es[len(es)-j-1].Stop() + } + for j := range hs { + hs[len(hs)-j-1].Close() + } + afterTest(t) + } +} + +// barrier ensures that all servers have made further progress on applied index +// compared to the base one. +func barrier(t *testing.T, base int, es []*Server) { + applied := es[base].node.Applied() + // time used for goroutine scheduling + time.Sleep(5 * time.Millisecond) + for i, e := range es { + for j := 0; ; j++ { + if e.node.Applied() >= applied { + break + } + time.Sleep(defaultHeartbeat * defaultTickDuration) + if j == 2 { + t.Fatalf("#%d: applied = %d, want >= %d", i, e.node.Applied(), applied) + } + } + } +}