clean up tree.go and watcher.go

This commit is contained in:
Xiang Li 2013-07-09 13:14:12 -07:00
parent 2589a76d66
commit 1e9307e88a
5 changed files with 224 additions and 181 deletions

View File

@ -33,7 +33,7 @@ func (c *SetCommand) CommandName() string {
// Set the value of key to value
func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
return etcdStore.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
}
// TestAndSet command
@ -51,7 +51,7 @@ func (c *TestAndSetCommand) CommandName() string {
// Set the value of key to value
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
return etcdStore.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
}
// Get command
@ -66,7 +66,7 @@ func (c *GetCommand) CommandName() string {
// Set the value of key to value
func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
res := store.Get(c.Key)
res := etcdStore.Get(c.Key)
return json.Marshal(res)
}
@ -82,7 +82,7 @@ func (c *ListCommand) CommandName() string {
// Set the value of key to value
func (c *ListCommand) Apply(server *raft.Server) (interface{}, error) {
return store.List(c.Prefix)
return etcdStore.List(c.Prefix)
}
// Delete command
@ -97,7 +97,7 @@ func (c *DeleteCommand) CommandName() string {
// Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Delete(c.Key, server.CommitIndex())
return etcdStore.Delete(c.Key, server.CommitIndex())
}
// Watch command
@ -112,13 +112,13 @@ func (c *WatchCommand) CommandName() string {
}
func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
ch := make(chan store.Response, 1)
watcher := store.CreateWatcher()
// add to the watchers list
store.AddWatcher(c.Key, ch, c.SinceIndex)
etcdStore.AddWatcher(c.Key, watcher, c.SinceIndex)
// wait for the notification for any changing
res := <-ch
res := <-watcher.C
return json.Marshal(res)
}

11
etcd.go
View File

@ -113,9 +113,7 @@ type Info struct {
var server *raft.Server
var serverTransHandler transHandler
var logger *log.Logger
var storeMsg chan string
var etcdStore *store.Store
//------------------------------------------------------------------------------
//
@ -129,7 +127,6 @@ var storeMsg chan string
func main() {
var err error
logger = log.New(os.Stdout, "", log.LstdFlags)
flag.Parse()
// Setup commands.
@ -162,10 +159,10 @@ func main() {
serverTransHandler = createTranHandler(st)
// Setup new raft server.
s := store.CreateStore(maxSize)
etcdStore = store.CreateStore(maxSize)
// create raft server
server, err = raft.NewServer(name, dirPath, serverTransHandler, s, nil)
server, err = raft.NewServer(name, dirPath, serverTransHandler, etcdStore, nil)
if err != nil {
fatal("%v", err)
@ -226,7 +223,7 @@ func main() {
if webPort != -1 {
// start web
s.SetMessager(&storeMsg)
etcdStore.SetMessager(&storeMsg)
go webHelper()
go web.Start(server, webPort)
}

View File

@ -38,6 +38,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
if resp := server.AppendEntries(aereq); resp != nil {
@ -121,7 +122,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
duration, err := strconv.Atoi(strDuration)
if err != nil {
warn("raftd: Bad duration: %v", err)
warn("Bad duration: %v", err)
(*w).WriteHeader(http.StatusInternalServerError)
return
}
@ -150,7 +151,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
duration, err := strconv.Atoi(strDuration)
if err != nil {
warn("raftd: Bad duration: %v", err)
warn("Bad duration: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
@ -266,7 +267,7 @@ func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
command.Prefix = prefix
if body, err := command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err)
warn("Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
@ -309,7 +310,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
}
if body, err := command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err)
warn("Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {

View File

@ -4,186 +4,205 @@ import (
"encoding/json"
"fmt"
"path"
"time"
"strconv"
"time"
)
// global store
var s *Store
// CONSTANTS
const (
ERROR = -1 + iota
SET
DELETE
GET
)
var PERMANENT = time.Unix(0, 0)
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// The main struct of the Key-Value store
type Store struct {
// // use the build-in hash map as the key-value store structure
// Nodes map[string]Node `json:"nodes"`
// use treeMap as the key-value stroe structure
// key-value store structure
Tree *tree
// the string channel to send messages to the outside world
// now we use it to send changes to the hub of the web service
// WatcherHub is where we register all the clients
// who issue a watch request
watcher *WatcherHub
// The string channel to send messages to the outside world
// Now we use it to send changes to the hub of the web service
messager *chan string
//
// A map to keep the recent response to the clients
ResponseMap map[string]Response
//
// The max number of the recent responses we can record
ResponseMaxSize int
// The current number of the recent responses we have recorded
ResponseCurrSize uint
// at some point, we may need to compact the Response
// The index of the first recent responses we have
ResponseStartIndex uint64
// current Index
// Current index of the raft machine
Index uint64
}
// A Node represents a Value in the Key-Value pair in the store
// It has its value, expire time and a channel used to update the
// expire time (since we do countdown in a go routine, we need to
// communicate with it via channel)
type Node struct {
// The string value of the node
Value string `json:"value"`
// if the node is a permanent one the ExprieTime will be Unix(0,0)
// If the node is a permanent one the ExprieTime will be Unix(0,0)
// Otherwise after the expireTime, the node will be deleted
ExpireTime time.Time `json:"expireTime"`
// a channel to update the expireTime of the node
// A channel to update the expireTime of the node
update chan time.Time `json:"-"`
}
// The response from the store to the user who issue a command
type Response struct {
Action int `json:"action"`
Key string `json:"key"`
Action string `json:"action"`
Key string `json:"key"`
PrevValue string `json:"prevValue"`
Value string `json:"value"`
Value string `json:"value"`
// if the key existed before the action, this field should be true
// if the key did not exist before the action, this field should be false
// If the key existed before the action, this field should be true
// If the key did not exist before the action, this field should be false
Exist bool `json:"exist"`
Expiration time.Time `json:"expiration"`
// countdown until expiration in seconds
// Time to live in second
TTL int64 `json:"ttl"`
// The command index of the raft machine when the command is executed
Index uint64 `json:"index"`
}
// A listNode represent the simplest Key-Value pair with its type
// It is only used when do list opeartion
// We want to have a file system like store, thus we distingush "file"
// and "directory"
type ListNode struct {
Key string
Value string
Type string
Key string
Value string
Type string
}
// make a new stroe
var PERMANENT = time.Unix(0, 0)
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
// Create a new stroe
// Arguement max is the max number of response we want to record
func CreateStore(max int) *Store {
s = new(Store)
s := new(Store)
s.messager = nil
s.ResponseMap = make(map[string]Response)
s.ResponseStartIndex = 0
s.ResponseMaxSize = max
s.ResponseCurrSize = 0
s.Tree = &tree{
&treeNode{
Node {
s.Tree = &tree{
&treeNode{
Node{
"/",
time.Unix(0,0),
time.Unix(0, 0),
nil,
},
true,
true,
make(map[string]*treeNode),
},
}
}
s.watcher = createWatcherHub()
return s
}
// return a pointer to the store
func GetStore() *Store {
return s
}
// set the messager of the store
// Set the messager of the store
func (s *Store) SetMessager(messager *chan string) {
s.messager = messager
}
// set the key to value, return the old value if the key exists
func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
// Set the key to value with expiration time
func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
//update index
//Update index
s.Index = index
key = "/" + key
key = path.Clean(key)
key = path.Clean("/" + key)
var isExpire bool = false
isExpire := !expireTime.Equal(PERMANENT)
isExpire = !expireTime.Equal(PERMANENT)
// when the slow follower receive the set command
// When the slow follower receive the set command
// the key may be expired, we should not add the node
// also if the node exist, we need to delete the node
if isExpire && expireTime.Sub(time.Now()) < 0 {
return Delete(key, index)
return s.Delete(key, index)
}
var TTL int64
// update ttl
// Update ttl
if isExpire {
TTL = int64(expireTime.Sub(time.Now()) / time.Second)
} else {
// For permanent value, we set ttl to -1
TTL = -1
}
// get the node
// Get the node
node, ok := s.Tree.get(key)
if ok {
// if node is not permanent before
// update its expireTime
// Update when node exists
// Node is not permanent
if !node.ExpireTime.Equal(PERMANENT) {
// If node is not permanent
// Update its expireTime
node.update <- expireTime
} else {
// if we want the permanent node to have expire time
// we need to create a chan and create a go routine
// If we want the permanent node to have expire time
// We need to create create a go routine with a channel
if isExpire {
node.update = make(chan time.Time)
go expire(key, node.update, expireTime)
go s.monitorExpiration(key, node.update, expireTime)
}
}
// update the information of the node
// Update the information of the node
s.Tree.set(key, Node{value, expireTime, node.update})
resp := Response{SET, key, node.Value, value, true, expireTime, TTL, index}
resp := Response{"SET", key, node.Value, value, true, expireTime, TTL, index}
s.watcher.notify(resp)
msg, err := json.Marshal(resp)
notify(resp)
// send to the messager
// Send to the messager
if s.messager != nil && err == nil {
*s.messager <- string(msg)
}
updateMap(index, &resp)
s.addToResponseMap(index, &resp)
return msg, err
// add new node
// Add new node
} else {
update := make(chan time.Time)
@ -191,32 +210,31 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
s.Tree.set(key, Node{value, expireTime, update})
if isExpire {
go expire(key, update, expireTime)
go s.monitorExpiration(key, update, expireTime)
}
resp := Response{SET, key, "", value, false, expireTime, TTL, index}
resp := Response{"SET", key, "", value, false, expireTime, TTL, index}
msg, err := json.Marshal(resp)
// nofity the watcher
notify(resp)
// Nofity the watcher
s.watcher.notify(resp)
// notify the web interface
// Send to the messager
if s.messager != nil && err == nil {
*s.messager <- string(msg)
}
updateMap(index, &resp)
s.addToResponseMap(index, &resp)
return msg, err
}
}
// get the value of the key
func Get(key string) Response {
key = "/" + key
key = path.Clean(key)
// Get the value of the key
func (s *Store) Get(key string) Response {
key = path.Clean("/" + key)
node, ok := s.Tree.get(key)
@ -226,22 +244,24 @@ func Get(key string) Response {
isExpire = !node.ExpireTime.Equal(PERMANENT)
// update ttl
// Update ttl
if isExpire {
TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
} else {
TTL = -1
}
return Response{GET, key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index}
} else {
return Response{"GET", key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index}
return Response{GET, key, "", "", false, time.Unix(0, 0), 0, s.Index}
} else {
// we do not found the key
return Response{"GET", key, "", "", false, time.Unix(0, 0), 0, s.Index}
}
}
// // List all the item in the prefix
func List(prefix string) ([]byte, error) {
// List all the item in the prefix
func (s *Store) List(prefix string) ([]byte, error) {
nodes, keys, dirs, ok := s.Tree.list(prefix)
var ln []ListNode
@ -256,15 +276,14 @@ func List(prefix string) ([]byte, error) {
return json.Marshal(ln)
}
// delete the key
func Delete(key string, index uint64) ([]byte, error) {
//update index
key = "/" + key
// Delete the key
func (s *Store) Delete(key string, index uint64) ([]byte, error) {
key = path.Clean("/" + key)
//Update index
s.Index = index
key = path.Clean(key)
node, ok := s.Tree.get(key)
if ok {
@ -275,17 +294,17 @@ func Delete(key string, index uint64) ([]byte, error) {
} else {
// kill the expire go routine
// Kill the expire go routine
node.update <- PERMANENT
s.Tree.delete(key)
}
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, index}
resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, index}
msg, err := json.Marshal(resp)
notify(resp)
s.watcher.notify(resp)
// notify the messager
if s.messager != nil && err == nil {
@ -293,50 +312,70 @@ func Delete(key string, index uint64) ([]byte, error) {
*s.messager <- string(msg)
}
updateMap(index, &resp)
s.addToResponseMap(index, &resp)
return msg, err
} else {
resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index}
resp := Response{"DELETE", key, "", "", false, time.Unix(0, 0), 0, index}
updateMap(index, &resp)
s.addToResponseMap(index, &resp)
return json.Marshal(resp)
}
}
// set the value of the key to the value if the given prevValue is equal to the value of the key
func TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
resp := Get(key)
// Set the value of the key to the value if the given prevValue is equal to the value of the key
func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
resp := s.Get(key)
if resp.PrevValue == prevValue {
return Set(key, value, expireTime, index)
// If test success, do set
return s.Set(key, value, expireTime, index)
} else {
// If fails, return the result of get which contains the current
// status of the key-value pair
return json.Marshal(resp)
}
}
// should be used as a go routine to delete the key when it expires
func expire(key string, update chan time.Time, expireTime time.Time) {
// Add a channel to the watchHub.
// The watchHub will send response to the channel when any key under the prefix
// changes [since the sinceIndex if given]
func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, &s.ResponseMap)
}
// This function should be created as a go routine to delete the key-value pair
// when it reaches expiration time
func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now())
for {
select {
// timeout delete the node
// Timeout delete the node
case <-time.After(duration):
node, ok := s.Tree.get(key)
if !ok {
return
} else {
s.Tree.delete(key)
resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}
resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, s.Index}
msg, err := json.Marshal(resp)
notify(resp)
s.watcher.notify(resp)
// notify the messager
if s.messager != nil && err == nil {
@ -349,21 +388,25 @@ func expire(key string, update chan time.Time, expireTime time.Time) {
}
case updateTime := <-update:
//update duration
// if the node become a permanent one, the go routine is
// Update duration
// If the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return
}
// update duration
// Update duration
duration = updateTime.Sub(time.Now())
}
}
}
func updateMap(index uint64, resp *Response) {
// When we receive a command that will change the state of the key-value store
// We will add the result of it to the ResponseMap for the use of watch command
// Also we may remove the oldest response when we add new one
func (s *Store) addToResponseMap(index uint64, resp *Response) {
// zero case
if s.ResponseMaxSize == 0 {
return
}
@ -372,11 +415,13 @@ func updateMap(index uint64, resp *Response) {
s.ResponseMap[strIndex] = *resp
// unlimited
if s.ResponseMaxSize < 0{
if s.ResponseMaxSize < 0 {
s.ResponseCurrSize++
return
}
// if we reach the max point, we need to delete the most latest
// response and update the startIndex
if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
@ -385,8 +430,7 @@ func updateMap(index uint64, resp *Response) {
}
}
// save the current state of the storage system
// Save the current state of the storage system
func (s *Store) Save() ([]byte, error) {
b, err := json.Marshal(s)
if err != nil {
@ -396,31 +440,35 @@ func (s *Store) Save() ([]byte, error) {
return b, nil
}
// recovery the state of the stroage system from a previous state
// Recovery the state of the stroage system from a previous state
func (s *Store) Recovery(state []byte) error {
err := json.Unmarshal(state, s)
// clean the expired nodes
clean()
// The only thing need to change after the recovery is the
// node with expiration time, we need to delete all the node
// that have been expired and setup go routines to monitor the
// other ones
s.checkExpiration()
return err
}
// clean all expired keys
func clean() {
s.Tree.traverse(cleanNode, false)
// Clean the expired nodes
// Set up go routines to mon
func (s *Store) checkExpiration() {
s.Tree.traverse(s.checkNode, false)
}
func cleanNode(key string, node *Node) {
// Check each node
func (s *Store) checkNode(key string, node *Node) {
if node.ExpireTime.Equal(PERMANENT) {
return
} else {
if node.ExpireTime.Sub(time.Now()) >= time.Second {
node.update = make(chan time.Time)
go expire(key, node.update, node.ExpireTime)
go s.monitorExpiration(key, node.update, node.ExpireTime)
} else {
// we should delete this node

View File

@ -5,43 +5,44 @@ import (
"strconv"
"strings"
)
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// WatcherHub is where the client register its watcher
type WatcherHub struct {
watchers map[string][]Watcher
watchers map[string][]*Watcher
}
// Currently watcher only contains a response channel
type Watcher struct {
c chan Response
C chan Response
}
// global watcher
var w *WatcherHub
// init the global watcher
func init() {
w = createWatcherHub()
}
// create a new watcher
// Create a new watcherHub
func createWatcherHub() *WatcherHub {
w := new(WatcherHub)
w.watchers = make(map[string][]Watcher)
w.watchers = make(map[string][]*Watcher)
return w
}
func GetWatcherHub() *WatcherHub {
return w
// Create a new watcher
func CreateWatcher() *Watcher {
return &Watcher{C: make(chan Response, 1)}
}
// register a function with channel and prefix to the watcher
func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error {
// Add a watcher to the watcherHub
func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
responseStartIndex uint64, currentIndex uint64, resMap *map[string]Response) error {
prefix = "/" + path.Clean(prefix)
prefix = path.Clean("/" + prefix)
if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex {
for i := sinceIndex; i <= s.Index; i++ {
if check(prefix, i) {
c <- s.ResponseMap[strconv.FormatUint(i, 10)]
if sinceIndex != 0 && sinceIndex >= responseStartIndex {
for i := sinceIndex; i <= currentIndex; i++ {
if checkResponse(prefix, i, resMap) {
watcher.C <- (*resMap)[strconv.FormatUint(i, 10)]
return nil
}
}
@ -51,25 +52,21 @@ func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error {
if !ok {
w.watchers[prefix] = make([]Watcher, 0)
watcher := Watcher{c}
w.watchers[prefix] = make([]*Watcher, 0)
w.watchers[prefix] = append(w.watchers[prefix], watcher)
} else {
watcher := Watcher{c}
w.watchers[prefix] = append(w.watchers[prefix], watcher)
}
return nil
}
// check if the response has what we are watching
func check(prefix string, index uint64) bool {
// Check if the response has what we are watching
func checkResponse(prefix string, index uint64, resMap *map[string]Response) bool {
resp, ok := s.ResponseMap[strconv.FormatUint(index, 10)]
resp, ok := (*resMap)[strconv.FormatUint(index, 10)]
if !ok {
// not storage system command
@ -89,8 +86,8 @@ func check(prefix string, index uint64) bool {
}
// notify the watcher a action happened
func notify(resp Response) error {
// Notify the watcher a action happened
func (w *WatcherHub) notify(resp Response) error {
resp.Key = path.Clean(resp.Key)
segments := strings.Split(resp.Key, "/")
@ -104,10 +101,10 @@ func notify(resp Response) error {
if ok {
newWatchers := make([]Watcher, 0)
newWatchers := make([]*Watcher, 0)
// notify all the watchers
for _, watcher := range watchers {
watcher.c <- resp
watcher.C <- resp
}
if len(newWatchers) == 0 {