/* Copyright 2013 CoreOS Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package store import ( "encoding/json" "fmt" "path" "strconv" "sync" "time" etcdErr "github.com/coreos/etcd/error" ) //------------------------------------------------------------------------------ // // Typedefs // //------------------------------------------------------------------------------ // The main struct of the Key-Value store type Store struct { // key-value store structure Tree *tree // This mutex protects everything except add watcher member. // Add watch member does not depend on the current state of the store. // And watch will return when other protected function is called and reach // the watching condition. // It is needed so that clone() can atomically replicate the Store // and do the log snapshot in a go routine. mutex sync.RWMutex // WatcherHub is where we register all the clients // who issue a watch request watcher *WatcherHub // The string channel to send messages to the outside world // Now we use it to send changes to the hub of the web service messager chan<- string // A map to keep the recent response to the clients ResponseMap map[string]*Response // The max number of the recent responses we can record ResponseMaxSize int // The current number of the recent responses we have recorded ResponseCurrSize uint // The index of the first recent responses we have ResponseStartIndex uint64 // Current index of the raft machine Index uint64 // Basic statistics information of etcd storage BasicStats EtcdStats } // A Node represents a Value in the Key-Value pair in the store // It has its value, expire time and a channel used to update the // expire time (since we do countdown in a go routine, we need to // communicate with it via channel) type Node struct { // The string value of the node Value string `json:"value"` // If the node is a permanent one the ExprieTime will be Unix(0,0) // Otherwise after the expireTime, the node will be deleted ExpireTime time.Time `json:"expireTime"` // A channel to update the expireTime of the node update chan time.Time `json:"-"` } // The response from the store to the user who issue a command type Response struct { Action string `json:"action"` Key string `json:"key"` Dir bool `json:"dir,omitempty"` PrevValue string `json:"prevValue,omitempty"` Value string `json:"value,omitempty"` // If the key did not exist before the action, // this field should be set to true NewKey bool `json:"newKey,omitempty"` Expiration *time.Time `json:"expiration,omitempty"` // Time to live in second TTL int64 `json:"ttl,omitempty"` // The command index of the raft machine when the command is executed Index uint64 `json:"index"` } // A listNode represent the simplest Key-Value pair with its type // It is only used when do list opeartion // We want to have a file system like store, thus we distingush "file" // and "directory" type ListNode struct { Key string Value string Type string } var PERMANENT = time.Unix(0, 0) //------------------------------------------------------------------------------ // // Methods // //------------------------------------------------------------------------------ // Create a new stroe // Arguement max is the max number of response we want to record func CreateStore(max int) *Store { s := new(Store) s.messager = nil s.ResponseMap = make(map[string]*Response) s.ResponseStartIndex = 0 s.ResponseMaxSize = max s.ResponseCurrSize = 0 s.Tree = &tree{ &treeNode{ Node{ "/", time.Unix(0, 0), nil, }, true, make(map[string]*treeNode), }, } s.watcher = newWatcherHub() return s } // Set the messager of the store func (s *Store) SetMessager(messager chan<- string) { s.messager = messager } func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { s.mutex.Lock() defer s.mutex.Unlock() return s.internalSet(key, value, expireTime, index) } // Set the key to value with expiration time func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) { //Update index s.Index = index //Update stats s.BasicStats.Sets++ key = path.Clean("/" + key) isExpire := !expireTime.Equal(PERMANENT) // base response resp := Response{ Action: "SET", Key: key, Value: value, Index: index, } // When the slow follower receive the set command // the key may be expired, we should not add the node // also if the node exist, we need to delete the node if isExpire && expireTime.Sub(time.Now()) < 0 { return s.internalDelete(key, index) } var TTL int64 // Update ttl if isExpire { TTL = int64(expireTime.Sub(time.Now()) / time.Second) resp.Expiration = &expireTime resp.TTL = TTL } // Get the node node, ok := s.Tree.get(key) if ok { // Update when node exists // Node is not permanent if !node.ExpireTime.Equal(PERMANENT) { // If node is not permanent // Update its expireTime node.update <- expireTime } else { // If we want the permanent node to have expire time // We need to create a go routine with a channel if isExpire { node.update = make(chan time.Time) go s.monitorExpiration(key, node.update, expireTime) } } // Update the information of the node s.Tree.set(key, Node{value, expireTime, node.update}) resp.PrevValue = node.Value s.watcher.notify(resp) msg, err := json.Marshal(resp) // Send to the messager if s.messager != nil && err == nil { s.messager <- string(msg) } s.addToResponseMap(index, &resp) return msg, err // Add new node } else { update := make(chan time.Time) ok := s.Tree.set(key, Node{value, expireTime, update}) if !ok { return nil, etcdErr.NewError(102, "set: "+key) } if isExpire { go s.monitorExpiration(key, update, expireTime) } resp.NewKey = true msg, err := json.Marshal(resp) // Nofity the watcher s.watcher.notify(resp) // Send to the messager if s.messager != nil && err == nil { s.messager <- string(msg) } s.addToResponseMap(index, &resp) return msg, err } } // Get the value of the key and return the raw response func (s *Store) internalGet(key string) *Response { key = path.Clean("/" + key) node, ok := s.Tree.get(key) if ok { var TTL int64 var isExpire bool = false isExpire = !node.ExpireTime.Equal(PERMANENT) resp := &Response{ Action: "GET", Key: key, Value: node.Value, Index: s.Index, } // Update ttl if isExpire { TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second) resp.Expiration = &node.ExpireTime resp.TTL = TTL } return resp } else { // we do not found the key return nil } } // Get all the items under key // If key is a file return the file // If key is a directory reuturn an array of files func (s *Store) Get(key string) ([]byte, error) { s.mutex.RLock() defer s.mutex.RUnlock() resps, err := s.RawGet(key) if err != nil { return nil, err } key = path.Clean("/" + key) // If the number of resps == 1 and the response key // is the key we query, a signal key-value should // be returned if len(resps) == 1 && resps[0].Key == key { return json.Marshal(resps[0]) } return json.Marshal(resps) } func (s *Store) rawGetNode(key string, node *Node) ([]*Response, error) { resps := make([]*Response, 1) isExpire := !node.ExpireTime.Equal(PERMANENT) resps[0] = &Response{ Action: "GET", Index: s.Index, Key: key, Value: node.Value, } // Update ttl if isExpire { TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second) resps[0].Expiration = &node.ExpireTime resps[0].TTL = TTL } return resps, nil } func (s *Store) rawGetNodeList(key string, keys []string, nodes []*Node) ([]*Response, error) { resps := make([]*Response, len(nodes)) // TODO: check if nodes and keys are the same length for i := 0; i < len(nodes); i++ { var TTL int64 var isExpire bool = false isExpire = !nodes[i].ExpireTime.Equal(PERMANENT) resps[i] = &Response{ Action: "GET", Index: s.Index, Key: path.Join(key, keys[i]), } if len(nodes[i].Value) != 0 { resps[i].Value = nodes[i].Value } else { resps[i].Dir = true } // Update ttl if isExpire { TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second) resps[i].Expiration = &nodes[i].ExpireTime resps[i].TTL = TTL } } return resps, nil } func (s *Store) RawGet(key string) ([]*Response, error) { // Update stats s.BasicStats.Gets++ key = path.Clean("/" + key) nodes, keys, ok := s.Tree.list(key) if !ok { return nil, etcdErr.NewError(100, "get: "+key) } switch node := nodes.(type) { case *Node: return s.rawGetNode(key, node) case []*Node: return s.rawGetNodeList(key, keys, node) default: panic("invalid cast ") } } func (s *Store) Delete(key string, index uint64) ([]byte, error) { s.mutex.Lock() defer s.mutex.Unlock() return s.internalDelete(key, index) } // Delete the key func (s *Store) internalDelete(key string, index uint64) ([]byte, error) { // Update stats s.BasicStats.Deletes++ key = path.Clean("/" + key) // Update index s.Index = index node, ok := s.Tree.get(key) if !ok { return nil, etcdErr.NewError(100, "delete: "+key) } resp := Response{ Action: "DELETE", Key: key, PrevValue: node.Value, Index: index, } if node.ExpireTime.Equal(PERMANENT) { s.Tree.delete(key) } else { resp.Expiration = &node.ExpireTime // Kill the expire go routine node.update <- PERMANENT s.Tree.delete(key) } msg, err := json.Marshal(resp) s.watcher.notify(resp) // notify the messager if s.messager != nil && err == nil { s.messager <- string(msg) } s.addToResponseMap(index, &resp) return msg, err } // Set the value of the key to the value if the given prevValue is equal to the value of the key func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { s.mutex.Lock() defer s.mutex.Unlock() // Update stats s.BasicStats.TestAndSets++ resp := s.internalGet(key) if resp == nil { if prevValue != "" { errmsg := fmt.Sprintf("TestAndSet: key not found and previousValue is not empty %s:%s ", key, prevValue) return nil, etcdErr.NewError(100, errmsg) } return s.internalSet(key, value, expireTime, index) } if resp.Value == prevValue { // If test succeed, do set return s.internalSet(key, value, expireTime, index) } else { // If fails, return err return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s", resp.Value, prevValue)) } } // Add a channel to the watchHub. // The watchHub will send response to the channel when any key under the prefix // changes [since the sinceIndex if given] func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error { return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap) } // This function should be created as a go routine to delete the key-value pair // when it reaches expiration time func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) { duration := expireTime.Sub(time.Now()) for { select { // Timeout delete the node case <-time.After(duration): node, ok := s.Tree.get(key) if !ok { return } else { s.mutex.Lock() s.Tree.delete(key) resp := Response{ Action: "DELETE", Key: key, PrevValue: node.Value, Expiration: &node.ExpireTime, Index: s.Index, } s.mutex.Unlock() msg, err := json.Marshal(resp) s.watcher.notify(resp) // notify the messager if s.messager != nil && err == nil { s.messager <- string(msg) } return } case updateTime := <-update: // Update duration // If the node become a permanent one, the go routine is // not needed if updateTime.Equal(PERMANENT) { return } // Update duration duration = updateTime.Sub(time.Now()) } } } // When we receive a command that will change the state of the key-value store // We will add the result of it to the ResponseMap for the use of watch command // Also we may remove the oldest response when we add new one func (s *Store) addToResponseMap(index uint64, resp *Response) { // zero case if s.ResponseMaxSize == 0 { return } strIndex := strconv.FormatUint(index, 10) s.ResponseMap[strIndex] = resp // unlimited if s.ResponseMaxSize < 0 { s.ResponseCurrSize++ return } // if we reach the max point, we need to delete the most latest // response and update the startIndex if s.ResponseCurrSize == uint(s.ResponseMaxSize) { s.ResponseStartIndex++ delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10)) } else { s.ResponseCurrSize++ } } func (s *Store) clone() *Store { newStore := &Store{ ResponseMaxSize: s.ResponseMaxSize, ResponseCurrSize: s.ResponseCurrSize, ResponseStartIndex: s.ResponseStartIndex, Index: s.Index, BasicStats: s.BasicStats, } newStore.Tree = s.Tree.clone() newStore.ResponseMap = make(map[string]*Response) for index, response := range s.ResponseMap { newStore.ResponseMap[index] = response } return newStore } // Save the current state of the storage system func (s *Store) Save() ([]byte, error) { // first we clone the store // json is very slow, we cannot hold the lock for such a long time s.mutex.Lock() cloneStore := s.clone() s.mutex.Unlock() b, err := json.Marshal(cloneStore) if err != nil { fmt.Println(err) return nil, err } return b, nil } // Recovery the state of the stroage system from a previous state func (s *Store) Recovery(state []byte) error { s.mutex.Lock() defer s.mutex.Unlock() // we need to stop all the current watchers // recovery will clear watcherHub s.watcher.stopWatchers() err := json.Unmarshal(state, s) // The only thing need to change after the recovery is the // node with expiration time, we need to delete all the node // that have been expired and setup go routines to monitor the // other ones s.checkExpiration() return err } // Clean the expired nodes // Set up go routines to mon func (s *Store) checkExpiration() { s.Tree.traverse(s.checkNode, false) } // Check each node func (s *Store) checkNode(key string, node *Node) { if node.ExpireTime.Equal(PERMANENT) { return } else { if node.ExpireTime.Sub(time.Now()) >= time.Second { node.update = make(chan time.Time) go s.monitorExpiration(key, node.update, node.ExpireTime) } else { // we should delete this node s.Tree.delete(key) } } }