clear watchers before recover from a snapshot

This commit is contained in:
Xiang Li 2013-08-03 23:29:05 -07:00
parent 8f551e3dc1
commit 66f4e0aa19
5 changed files with 33 additions and 10 deletions

View File

@ -300,8 +300,8 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
}
if body, err := command.Apply(raftServer); err != nil {
warnf("Unable to do watch command: %v", err)
w.WriteHeader(http.StatusInternalServerError)
w.Write(newJsonError(500, key))
} else {
w.WriteHeader(http.StatusOK)

View File

@ -101,6 +101,10 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
// wait for the notification for any changing
res := <-watcher.C
if res == nil {
return nil, fmt.Errorf("watcher is cleared")
}
return json.Marshal(res)
}

View File

@ -20,7 +20,7 @@ func init() {
errors[201] = "PrevValue is Required in POST form"
errors[202] = "The given TTL in POST form is not a number"
errors[203] = "The given index in POST form is not a number"
// raft related errors
errors[300] = "Raft Internal Error"
errors[301] = "During Leader Election"
@ -28,6 +28,9 @@ func init() {
// keyword
errors[400] = "The prefix of the given key is a keyword in etcd"
// etcd related errors
errors[500] = "watcher is cleared due to etcd recovery"
}
type jsonError struct {

View File

@ -29,7 +29,7 @@ type Store struct {
messager *chan string
// A map to keep the recent response to the clients
ResponseMap map[string]Response
ResponseMap map[string]*Response
// The max number of the recent responses we can record
ResponseMaxSize int
@ -109,7 +109,7 @@ func CreateStore(max int) *Store {
s.messager = nil
s.ResponseMap = make(map[string]Response)
s.ResponseMap = make(map[string]*Response)
s.ResponseStartIndex = 0
s.ResponseMaxSize = max
s.ResponseCurrSize = 0
@ -502,7 +502,7 @@ func (s *Store) addToResponseMap(index uint64, resp *Response) {
}
strIndex := strconv.FormatUint(index, 10)
s.ResponseMap[strIndex] = *resp
s.ResponseMap[strIndex] = resp
// unlimited
if s.ResponseMaxSize < 0 {
@ -532,6 +532,11 @@ func (s *Store) Save() ([]byte, error) {
// Recovery the state of the stroage system from a previous state
func (s *Store) Recovery(state []byte) error {
// we need to stop all the current watchers
// recovery will clear watcherHub
s.watcher.stopWatchers()
err := json.Unmarshal(state, s)
// The only thing need to change after the recovery is the

View File

@ -19,7 +19,7 @@ type WatcherHub struct {
// Currently watcher only contains a response channel
type Watcher struct {
C chan Response
C chan *Response
}
// Create a new watcherHub
@ -31,12 +31,12 @@ func createWatcherHub() *WatcherHub {
// Create a new watcher
func CreateWatcher() *Watcher {
return &Watcher{C: make(chan Response, 1)}
return &Watcher{C: make(chan *Response, 1)}
}
// Add a watcher to the watcherHub
func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
responseStartIndex uint64, currentIndex uint64, resMap *map[string]Response) error {
responseStartIndex uint64, currentIndex uint64, resMap *map[string]*Response) error {
prefix = path.Clean("/" + prefix)
@ -65,7 +65,7 @@ func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint
}
// Check if the response has what we are watching
func checkResponse(prefix string, index uint64, resMap *map[string]Response) bool {
func checkResponse(prefix string, index uint64, resMap *map[string]*Response) bool {
resp, ok := (*resMap)[strconv.FormatUint(index, 10)]
@ -104,7 +104,7 @@ func (w *WatcherHub) notify(resp Response) error {
newWatchers := make([]*Watcher, 0)
// notify all the watchers
for _, watcher := range watchers {
watcher.C <- resp
watcher.C <- &resp
}
if len(newWatchers) == 0 {
@ -120,3 +120,14 @@ func (w *WatcherHub) notify(resp Response) error {
return nil
}
// stopWatchers stops all the watchers
// This function is used when the etcd recovery from a snapshot at runtime
func (w *WatcherHub) stopWatchers() {
for _, subWatchers := range w.watchers{
for _, watcher := range subWatchers{
watcher.C <- nil
}
}
w.watchers = nil
}