From 1321c63f3b70368ff75b358d01c649172a72f368 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 14 Oct 2013 11:12:30 -0600 Subject: [PATCH] Extract Store into an interface. --- server/peer_server.go | 4 +-- server/registry.go | 4 +-- server/server.go | 6 ++-- server/v1/v1.go | 2 +- server/v2/v2.go | 2 +- store/create_command.go | 2 +- store/delete_command.go | 2 +- store/node.go | 38 +++++++++++--------- store/stats_test.go | 4 +-- store/store.go | 68 ++++++++++++++++++++++------------- store/store_test.go | 22 ++++++------ store/test_and_set_command.go | 2 +- store/update_command.go | 2 +- store/watcher_test.go | 2 +- 14 files changed, 92 insertions(+), 68 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index 942d9e062..97a7757a3 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -30,7 +30,7 @@ type PeerServer struct { followersStats *raftFollowersStats serverStats *raftServerStats registry *Registry - store *store.Store + store store.Store snapConf *snapshotConf MaxClusterSize int RetryTimes int @@ -49,7 +49,7 @@ type snapshotConf struct { writesThr uint64 } -func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer { +func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store) *PeerServer { s := &PeerServer{ name: name, url: url, diff --git a/server/registry.go b/server/registry.go index b9197436b..23ef9ddbb 100644 --- a/server/registry.go +++ b/server/registry.go @@ -18,7 +18,7 @@ const RegistryKey = "/_etcd/machines" // The Registry stores URL information for nodes. type Registry struct { sync.Mutex - store *store.Store + store store.Store nodes map[string]*node } @@ -30,7 +30,7 @@ type node struct { } // Creates a new Registry. -func NewRegistry(s *store.Store) *Registry { +func NewRegistry(s store.Store) *Registry { return &Registry{ store: s, nodes: make(map[string]*node), diff --git a/server/server.go b/server/server.go index 24064fa4e..4e21aa179 100644 --- a/server/server.go +++ b/server/server.go @@ -21,7 +21,7 @@ type Server struct { http.Server peerServer *PeerServer registry *Registry - store *store.Store + store store.Store name string url string tlsConf *TLSConfig @@ -30,7 +30,7 @@ type Server struct { } // Creates a new Server. -func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store *store.Store) *Server { +func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store store.Store) *Server { s := &Server{ Server: http.Server{ Handler: mux.NewRouter(), @@ -85,7 +85,7 @@ func (s *Server) PeerURL(name string) (string, bool) { } // Returns a reference to the Store. -func (s *Server) Store() *store.Store { +func (s *Server) Store() store.Store { return s.store } diff --git a/server/v1/v1.go b/server/v1/v1.go index f71ed0622..6b155b7cc 100644 --- a/server/v1/v1.go +++ b/server/v1/v1.go @@ -10,6 +10,6 @@ import ( type Server interface { CommitIndex() uint64 Term() uint64 - Store() *store.Store + Store() store.Store Dispatch(raft.Command, http.ResponseWriter, *http.Request) error } diff --git a/server/v2/v2.go b/server/v2/v2.go index e412e859b..019297331 100644 --- a/server/v2/v2.go +++ b/server/v2/v2.go @@ -13,6 +13,6 @@ type Server interface { CommitIndex() uint64 Term() uint64 PeerURL(string) (string, bool) - Store() *store.Store + Store() store.Store Dispatch(raft.Command, http.ResponseWriter, *http.Request) error } diff --git a/store/create_command.go b/store/create_command.go index 2ccddd103..f13b91790 100644 --- a/store/create_command.go +++ b/store/create_command.go @@ -26,7 +26,7 @@ func (c *CreateCommand) CommandName() string { // Create node func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*Store) + s, _ := server.StateMachine().(Store) e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term()) diff --git a/store/delete_command.go b/store/delete_command.go index 324410192..bc84dfc99 100644 --- a/store/delete_command.go +++ b/store/delete_command.go @@ -22,7 +22,7 @@ func (c *DeleteCommand) CommandName() string { // Delete the key func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*Store) + s, _ := server.StateMachine().(Store) e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) diff --git a/store/node.go b/store/node.go index 683e6038b..bbb971bd4 100644 --- a/store/node.go +++ b/store/node.go @@ -36,6 +36,9 @@ type Node struct { Value string // for key-value pair Children map[string]*Node // for directory + // A reference to the store this node is attached to. + store *store + // a ttl node will have an expire routine associated with it. // we need a channel to stop that routine when the expiration changes. stopExpire chan bool @@ -46,7 +49,7 @@ type Node struct { } // newKV creates a Key-Value pair -func newKV(nodePath string, value string, createIndex uint64, +func newKV(store *store, nodePath string, value string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { return &Node{ @@ -57,6 +60,7 @@ func newKV(nodePath string, value string, createIndex uint64, ModifiedTerm: createTerm, Parent: parent, ACL: ACL, + store: store, stopExpire: make(chan bool, 1), ExpireTime: expireTime, Value: value, @@ -64,7 +68,7 @@ func newKV(nodePath string, value string, createIndex uint64, } // newDir creates a directory -func newDir(nodePath string, createIndex uint64, createTerm uint64, +func newDir(store *store, nodePath string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { return &Node{ @@ -76,6 +80,7 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, stopExpire: make(chan bool, 1), ExpireTime: expireTime, Children: make(map[string]*Node), + store: store, } } @@ -262,17 +267,17 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) { // if the node is already expired, delete the node and return. // if the node is permanent (this shouldn't happen), return at once. // else wait for a period time, then remove the node. and notify the watchhub. -func (n *Node) Expire(s *Store) { +func (n *Node) Expire() { expired, duration := n.IsExpired() if expired { // has been expired // since the parent function of Expire() runs serially, // there is no need for lock here e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) - s.WatcherHub.notify(e) + n.store.WatcherHub.notify(e) n.Remove(true, nil) - s.Stats.Inc(ExpireCount) + n.store.Stats.Inc(ExpireCount) return } @@ -289,17 +294,17 @@ func (n *Node) Expire(s *Store) { // before expire get the lock, the expiration time // of the node may be updated. // we have to check again when get the lock - s.worldLock.Lock() - defer s.worldLock.Unlock() + n.store.worldLock.Lock() + defer n.store.worldLock.Unlock() expired, _ := n.IsExpired() if expired { e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) - s.WatcherHub.notify(e) + n.store.WatcherHub.notify(e) n.Remove(true, nil) - s.Stats.Inc(ExpireCount) + n.store.Stats.Inc(ExpireCount) } return @@ -355,7 +360,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { } } -func (n *Node) UpdateTTL(expireTime time.Time, s *Store) { +func (n *Node) UpdateTTL(expireTime time.Time) { if !n.IsPermanent() { // check if the node has been expired // if the node is not expired, we need to stop the go routine associated with @@ -369,7 +374,7 @@ func (n *Node) UpdateTTL(expireTime time.Time, s *Store) { if expireTime.Sub(Permanent) != 0 { n.ExpireTime = expireTime - n.Expire(s) + n.Expire() } } @@ -378,10 +383,10 @@ func (n *Node) UpdateTTL(expireTime time.Time, s *Store) { // If the node is a key-value pair, it will clone the pair. func (n *Node) Clone() *Node { if !n.IsDir() { - return newKV(n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) + return newKV(n.store, n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) } - clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) + clone := newDir(n.store, n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) for key, child := range n.Children { clone.Children[key] = child.Clone() @@ -397,15 +402,16 @@ func (n *Node) Clone() *Node { // call this function on its children. // We check the expire last since we need to recover the whole structure first and add all the // notifications into the event history. -func (n *Node) recoverAndclean(s *Store) { +func (n *Node) recoverAndclean() { if n.IsDir() { for _, child := range n.Children { child.Parent = n - child.recoverAndclean(s) + child.store = n.store + child.recoverAndclean() } } n.stopExpire = make(chan bool, 1) - n.Expire(s) + n.Expire() } diff --git a/store/stats_test.go b/store/stats_test.go index deeb317dc..44d1a8999 100644 --- a/store/stats_test.go +++ b/store/stats_test.go @@ -7,7 +7,7 @@ import ( ) func TestBasicStats(t *testing.T) { - s := New() + s := newStore() keys := GenKeys(rand.Intn(100), 5) var i uint64 @@ -140,7 +140,7 @@ func TestBasicStats(t *testing.T) { t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail) } - s = New() + s = newStore() SetSuccess = 0 SetFail = 0 diff --git a/store/store.go b/store/store.go index ade44acd9..60eb35da5 100644 --- a/store/store.go +++ b/store/store.go @@ -13,7 +13,21 @@ import ( etcdErr "github.com/coreos/etcd/error" ) -type Store struct { +type Store interface { + Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) + Create(nodePath string, value string, incrementalSuffix bool, force bool, + expireTime time.Time, index uint64, term uint64) (*Event, error) + Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) + TestAndSet(nodePath string, prevValue string, prevIndex uint64, + value string, expireTime time.Time, index uint64, term uint64) (*Event, error) + Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) + Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) + Save() ([]byte, error) + Recovery(state []byte) error + JsonStats() []byte +} + +type store struct { Root *Node WatcherHub *watcherHub Index uint64 @@ -22,9 +36,13 @@ type Store struct { worldLock sync.RWMutex // stop the world lock } -func New() *Store { - s := new(Store) - s.Root = newDir("/", UndefIndex, UndefTerm, nil, "", Permanent) +func New() Store { + return newStore() +} + +func newStore() *store { + s := new(store) + s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent) s.Stats = newStats() s.WatcherHub = newWatchHub(1000) @@ -34,7 +52,7 @@ func New() *Store { // Get function returns a get event. // If recursive is true, it will return all the content under the node path. // If sorted is true, it will sort the content by keys. -func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) { +func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) { s.worldLock.RLock() defer s.worldLock.RUnlock() @@ -89,7 +107,7 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl. // If the node has already existed, create will fail. // If any node on the path is a file, create will fail. -func (s *Store) Create(nodePath string, value string, incrementalSuffix bool, force bool, +func (s *store) Create(nodePath string, value string, incrementalSuffix bool, force bool, expireTime time.Time, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) @@ -101,7 +119,7 @@ func (s *Store) Create(nodePath string, value string, incrementalSuffix bool, fo // Update function updates the value/ttl of the node. // If the node is a file, the value and the ttl can be updated. // If the node is a directory, only the ttl can be updated. -func (s *Store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { +func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { s.worldLock.Lock() defer s.worldLock.Unlock() nodePath = path.Clean(path.Join("/", nodePath)) @@ -127,7 +145,7 @@ func (s *Store) Update(nodePath string, newValue string, expireTime time.Time, i } // update ttl - n.UpdateTTL(expireTime, s) + n.UpdateTTL(expireTime) e.Expiration, e.TTL = n.ExpirationAndTTL() @@ -138,7 +156,7 @@ func (s *Store) Update(nodePath string, newValue string, expireTime time.Time, i return e, nil } -func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, +func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) @@ -168,7 +186,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, // if test succeed, write the value n.Write(value, index, term) - n.UpdateTTL(expireTime, s) + n.UpdateTTL(expireTime) e.Value = value e.Expiration, e.TTL = n.ExpirationAndTTL() @@ -185,7 +203,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, // Delete function deletes the node at the given path. // If the node is a directory, recursive must be true to delete it. -func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) { +func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) { nodePath = path.Clean(path.Join("/", nodePath)) s.worldLock.Lock() @@ -224,7 +242,7 @@ func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint6 return e, nil } -func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) { +func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) { prefix = path.Clean(path.Join("/", prefix)) s.worldLock.RLock() @@ -252,7 +270,7 @@ func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index ui } // walk function walks all the nodePath and apply the walkFunc on each directory -func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) { +func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) { components := strings.Split(nodePath, "/") curr := s.Root @@ -273,7 +291,7 @@ func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string return curr, nil } -func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool, +func (s *store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { s.Index, s.Term = index, term @@ -316,12 +334,12 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix if len(value) != 0 { // create file e.Value = value - n = newKV(nodePath, value, index, term, d, "", expireTime) + n = newKV(s, nodePath, value, index, term, d, "", expireTime) } else { // create directory e.Dir = true - n = newDir(nodePath, index, term, d, "", expireTime) + n = newDir(s, nodePath, index, term, d, "", expireTime) } @@ -334,7 +352,7 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix // Node with TTL if expireTime.Sub(Permanent) != 0 { - n.Expire(s) + n.Expire() e.Expiration, e.TTL = n.ExpirationAndTTL() } @@ -344,7 +362,7 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix } // InternalGet function get the node of the given nodePath. -func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) { +func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) { nodePath = path.Clean(path.Join("/", nodePath)) // update file system known index and term @@ -379,7 +397,7 @@ func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, // If it is a directory, this function will return the pointer to that node. // If it does not exist, this function will create a new directory and return the pointer to that node. // If it is a file, this function will return error. -func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) { +func (s *store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) { node, ok := parent.Children[dirName] if ok { @@ -390,7 +408,7 @@ func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) { return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm) } - n := newDir(path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent) + n := newDir(s, path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent) parent.Children[dirName] = n @@ -401,10 +419,10 @@ func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) { // Save function will not be able to save the state of watchers. // Save function will not save the parent field of the node. Or there will // be cyclic dependencies issue for the json package. -func (s *Store) Save() ([]byte, error) { +func (s *store) Save() ([]byte, error) { s.worldLock.Lock() - clonedStore := New() + clonedStore := newStore() clonedStore.Index = s.Index clonedStore.Term = s.Term clonedStore.Root = s.Root.Clone() @@ -426,7 +444,7 @@ func (s *Store) Save() ([]byte, error) { // It needs to recovery the parent field of the nodes. // It needs to delete the expired nodes since the saved time and also // need to create monitor go routines. -func (s *Store) Recovery(state []byte) error { +func (s *store) Recovery(state []byte) error { s.worldLock.Lock() defer s.worldLock.Unlock() err := json.Unmarshal(state, s) @@ -435,11 +453,11 @@ func (s *Store) Recovery(state []byte) error { return err } - s.Root.recoverAndclean(s) + s.Root.recoverAndclean() return nil } -func (s *Store) JsonStats() []byte { +func (s *store) JsonStats() []byte { s.Stats.Watchers = uint64(s.WatcherHub.count) return s.Stats.toJson() } diff --git a/store/store_test.go b/store/store_test.go index 74add4424..958d99007 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -8,7 +8,7 @@ import ( ) func TestCreateAndGet(t *testing.T) { - s := New() + s := newStore() s.Create("/foobar", "bar", false, false, Permanent, 1, 1) @@ -66,7 +66,7 @@ func TestCreateAndGet(t *testing.T) { } func TestUpdateFile(t *testing.T) { - s := New() + s := newStore() _, err := s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1) @@ -161,7 +161,7 @@ func TestUpdateFile(t *testing.T) { } func TestListDirectory(t *testing.T) { - s := New() + s := newStore() // create dir /foo // set key-value /foo/foo=bar @@ -206,7 +206,7 @@ func TestListDirectory(t *testing.T) { } func TestRemove(t *testing.T) { - s := New() + s := newStore() s.Create("/foo", "bar", false, false, Permanent, 1, 1) _, err := s.Delete("/foo", false, 1, 1) @@ -245,7 +245,7 @@ func TestRemove(t *testing.T) { } func TestExpire(t *testing.T) { - s := New() + s := newStore() expire := time.Now().Add(time.Second) @@ -287,7 +287,7 @@ func TestExpire(t *testing.T) { } func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? - s := New() + s := newStore() s.Create("/foo", "bar", false, false, Permanent, 1, 1) // test on wrong previous value @@ -320,7 +320,7 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? } func TestWatch(t *testing.T) { - s := New() + s := newStore() // watch at a deeper path c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1) s.Create("/foo/foo/foo", "bar", false, false, Permanent, 1, 1) @@ -409,7 +409,7 @@ func TestWatch(t *testing.T) { } func TestSort(t *testing.T) { - s := New() + s := newStore() // simulating random creation keys := GenKeys(80, 4) @@ -447,7 +447,7 @@ func TestSort(t *testing.T) { } func TestSaveAndRecover(t *testing.T) { - s := New() + s := newStore() // simulating random creation keys := GenKeys(8, 4) @@ -469,7 +469,7 @@ func TestSaveAndRecover(t *testing.T) { s.Create("/foo/foo", "bar", false, false, expire, 1, 1) b, err := s.Save() - cloneFs := New() + cloneFs := newStore() time.Sleep(2 * time.Second) cloneFs.Recovery(b) @@ -521,7 +521,7 @@ func GenKeys(num int, depth int) []string { return keys } -func createAndGet(s *Store, path string, t *testing.T) { +func createAndGet(s *store, path string, t *testing.T) { _, err := s.Create(path, "bar", false, false, Permanent, 1, 1) if err != nil { diff --git a/store/test_and_set_command.go b/store/test_and_set_command.go index 3370fed15..811f713e5 100644 --- a/store/test_and_set_command.go +++ b/store/test_and_set_command.go @@ -27,7 +27,7 @@ func (c *TestAndSetCommand) CommandName() string { // Set the key-value pair if the current value of the key equals to the given prevValue func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*Store) + s, _ := server.StateMachine().(Store) e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) diff --git a/store/update_command.go b/store/update_command.go index 9ffd6c868..582fb42fe 100644 --- a/store/update_command.go +++ b/store/update_command.go @@ -25,7 +25,7 @@ func (c *UpdateCommand) CommandName() string { // Update node func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*Store) + s, _ := server.StateMachine().(Store) e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) diff --git a/store/watcher_test.go b/store/watcher_test.go index e437422ad..61da92f7c 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -5,7 +5,7 @@ import ( ) func TestWatcher(t *testing.T) { - s := New() + s := newStore() wh := s.WatcherHub c, err := wh.watch("/foo", true, 1) if err != nil {