diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index a1b2919dd..b061a6295 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -35,10 +35,11 @@ const ( var errClosed = errors.New("etcdhttp: client closed connection") // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. -func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler { +func NewClientHandler(server *etcdserver.EtcdServer, peers Peers, timeout time.Duration) http.Handler { sh := &serverHandler{ server: server, peers: peers, + timer: server, timeout: timeout, } if sh.timeout == 0 { @@ -69,6 +70,7 @@ func NewPeerHandler(server etcdserver.Server) http.Handler { type serverHandler struct { timeout time.Duration server etcdserver.Server + timer etcdserver.RaftTimer peers Peers } @@ -94,14 +96,14 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) { switch { case resp.Event != nil: - if err := writeEvent(w, resp.Event); err != nil { + if err := writeEvent(w, resp.Event, h.timer); err != nil { // Should never be reached - log.Println("error writing event: %v", err) + log.Printf("error writing event: %v", err) } case resp.Watcher != nil: ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout) defer cancel() - handleWatch(ctx, w, resp.Watcher, rr.Stream) + handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer) default: writeError(w, errors.New("received response with no Event/Watcher!")) } @@ -325,12 +327,14 @@ func writeError(w http.ResponseWriter, err error) { // writeEvent serializes a single Event and writes the resulting // JSON to the given ResponseWriter, along with the appropriate // headers -func writeEvent(w http.ResponseWriter, ev *store.Event) error { +func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error { if ev == nil { return errors.New("cannot write empty Event!") } w.Header().Set("Content-Type", "application/json") - w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) + w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) + w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) + w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) if ev.IsCreated() { w.WriteHeader(http.StatusCreated) @@ -339,7 +343,7 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error { return json.NewEncoder(w).Encode(ev) } -func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) { +func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) { defer wa.Remove() ech := wa.EventChan() var nch <-chan bool @@ -348,6 +352,8 @@ func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, s } w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) + w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) w.WriteHeader(http.StatusOK) // Ensure headers are flushed early, in case of long polling diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 818750b98..8667567cc 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -499,10 +499,15 @@ func TestWriteError(t *testing.T) { } } +type dummyRaftTimer struct{} + +func (drt dummyRaftTimer) Index() int64 { return int64(100) } +func (drt dummyRaftTimer) Term() int64 { return int64(5) } + func TestWriteEvent(t *testing.T) { // nil event should not panic rw := httptest.NewRecorder() - writeEvent(rw, nil) + writeEvent(rw, nil, dummyRaftTimer{}) h := rw.Header() if len(h) > 0 { t.Fatalf("unexpected non-empty headers: %#v", h) @@ -545,10 +550,16 @@ func TestWriteEvent(t *testing.T) { for i, tt := range tests { rw := httptest.NewRecorder() - writeEvent(rw, tt.ev) + writeEvent(rw, tt.ev, dummyRaftTimer{}) if gct := rw.Header().Get("Content-Type"); gct != "application/json" { t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct) } + if gri := rw.Header().Get("X-Raft-Index"); gri != "100" { + t.Errorf("case %d: bad X-Raft-Index header: got %s, want %s", i, gri, "100") + } + if grt := rw.Header().Get("X-Raft-Term"); grt != "5" { + t.Errorf("case %d: bad X-Raft-Term header: got %s, want %s", i, grt, "5") + } if gei := rw.Header().Get("X-Etcd-Index"); gei != tt.idx { t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %s", i, gei, tt.idx) } @@ -970,6 +981,7 @@ func TestServeKeysEvent(t *testing.T) { timeout: time.Hour, server: server, peers: nil, + timer: &dummyRaftTimer{}, } rw := httptest.NewRecorder() @@ -1008,6 +1020,7 @@ func TestServeKeysWatch(t *testing.T) { timeout: time.Hour, server: server, peers: nil, + timer: &dummyRaftTimer{}, } go func() { ec <- &store.Event{ @@ -1047,10 +1060,12 @@ func TestHandleWatch(t *testing.T) { Node: &store.NodeExtern{}, } - handleWatch(context.Background(), rw, wa, false) + handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{}) wcode := http.StatusOK wct := "application/json" + wri := "100" + wrt := "5" wbody := mustMarshalEvent( t, &store.Event{ @@ -1066,6 +1081,12 @@ func TestHandleWatch(t *testing.T) { if ct := h.Get("Content-Type"); ct != wct { t.Errorf("Content-Type=%q, want %q", ct, wct) } + if ri := h.Get("X-Raft-Index"); ri != wri { + t.Errorf("X-Raft-Index=%q, want %q", ri, wri) + } + if rt := h.Get("X-Raft-Term"); rt != wrt { + t.Errorf("X-Raft-Term=%q, want %q", rt, wrt) + } g := rw.Body.String() if g != wbody { t.Errorf("got body=%#v, want %#v", g, wbody) @@ -1079,7 +1100,7 @@ func TestHandleWatchNoEvent(t *testing.T) { } close(wa.echan) - handleWatch(context.Background(), rw, wa, false) + handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{}) wcode := http.StatusOK wct := "application/json" @@ -1115,7 +1136,7 @@ func TestHandleWatchCloseNotified(t *testing.T) { rw.cn <- true wa := &dummyWatcher{} - handleWatch(context.Background(), rw, wa, false) + handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{}) wcode := http.StatusOK wct := "application/json" @@ -1141,7 +1162,7 @@ func TestHandleWatchTimeout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - handleWatch(ctx, rw, wa, false) + handleWatch(ctx, rw, wa, false, dummyRaftTimer{}) wcode := http.StatusOK wct := "application/json" @@ -1184,7 +1205,7 @@ func TestHandleWatchStreaming(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) go func() { - handleWatch(ctx, rw, wa, true) + handleWatch(ctx, rw, wa, true, dummyRaftTimer{}) close(done) }() diff --git a/etcdserver/server.go b/etcdserver/server.go index 5e6a5a4ca..a20d05030 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -4,6 +4,7 @@ import ( "errors" "log" "math/rand" + "sync/atomic" "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -66,6 +67,11 @@ type Server interface { Process(ctx context.Context, m raftpb.Message) error } +type RaftTimer interface { + Index() int64 + Term() int64 +} + // EtcdServer is the production implementation of the Server interface type EtcdServer struct { w wait.Wait @@ -86,6 +92,10 @@ type EtcdServer struct { SyncTicker <-chan time.Time SnapCount int64 // number of entries to trigger a snapshot + + // Cache of the latest raft index and raft term the server has seen + raftIndex int64 + raftTerm int64 } // Start prepares and starts server in a new goroutine. It is no longer safe to @@ -138,6 +148,8 @@ func (s *EtcdServer) run() { default: panic("unexpected entry type") } + atomic.StoreInt64(&s.raftIndex, e.Index) + atomic.StoreInt64(&s.raftTerm, e.Term) appliedi = e.Index } @@ -249,6 +261,15 @@ func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error { return s.configure(ctx, cc) } +// Implement the RaftTimer interface +func (s *EtcdServer) Index() int64 { + return atomic.LoadInt64(&s.raftIndex) +} + +func (s *EtcdServer) Term() int64 { + return atomic.LoadInt64(&s.raftTerm) +} + // configure sends configuration change through consensus then performs it. // It will block until the change is performed or there is an error. func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {