Extract Store into an interface.

This commit is contained in:
Ben Johnson 2013-10-14 11:12:30 -06:00
parent a6a32a592d
commit 1321c63f3b
14 changed files with 92 additions and 68 deletions

View File

@ -30,7 +30,7 @@ type PeerServer struct {
followersStats *raftFollowersStats followersStats *raftFollowersStats
serverStats *raftServerStats serverStats *raftServerStats
registry *Registry registry *Registry
store *store.Store store store.Store
snapConf *snapshotConf snapConf *snapshotConf
MaxClusterSize int MaxClusterSize int
RetryTimes int RetryTimes int
@ -49,7 +49,7 @@ type snapshotConf struct {
writesThr uint64 writesThr uint64
} }
func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer { func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store) *PeerServer {
s := &PeerServer{ s := &PeerServer{
name: name, name: name,
url: url, url: url,

View File

@ -18,7 +18,7 @@ const RegistryKey = "/_etcd/machines"
// The Registry stores URL information for nodes. // The Registry stores URL information for nodes.
type Registry struct { type Registry struct {
sync.Mutex sync.Mutex
store *store.Store store store.Store
nodes map[string]*node nodes map[string]*node
} }
@ -30,7 +30,7 @@ type node struct {
} }
// Creates a new Registry. // Creates a new Registry.
func NewRegistry(s *store.Store) *Registry { func NewRegistry(s store.Store) *Registry {
return &Registry{ return &Registry{
store: s, store: s,
nodes: make(map[string]*node), nodes: make(map[string]*node),

View File

@ -21,7 +21,7 @@ type Server struct {
http.Server http.Server
peerServer *PeerServer peerServer *PeerServer
registry *Registry registry *Registry
store *store.Store store store.Store
name string name string
url string url string
tlsConf *TLSConfig tlsConf *TLSConfig
@ -30,7 +30,7 @@ type Server struct {
} }
// Creates a new Server. // Creates a new Server.
func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store *store.Store) *Server { func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store store.Store) *Server {
s := &Server{ s := &Server{
Server: http.Server{ Server: http.Server{
Handler: mux.NewRouter(), Handler: mux.NewRouter(),
@ -85,7 +85,7 @@ func (s *Server) PeerURL(name string) (string, bool) {
} }
// Returns a reference to the Store. // Returns a reference to the Store.
func (s *Server) Store() *store.Store { func (s *Server) Store() store.Store {
return s.store return s.store
} }

View File

@ -10,6 +10,6 @@ import (
type Server interface { type Server interface {
CommitIndex() uint64 CommitIndex() uint64
Term() uint64 Term() uint64
Store() *store.Store Store() store.Store
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
} }

View File

@ -13,6 +13,6 @@ type Server interface {
CommitIndex() uint64 CommitIndex() uint64
Term() uint64 Term() uint64
PeerURL(string) (string, bool) PeerURL(string) (string, bool)
Store() *store.Store Store() store.Store
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
} }

View File

@ -26,7 +26,7 @@ func (c *CreateCommand) CommandName() string {
// Create node // Create node
func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*Store) s, _ := server.StateMachine().(Store)
e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term()) e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term())

View File

@ -22,7 +22,7 @@ func (c *DeleteCommand) CommandName() string {
// Delete the key // Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*Store) s, _ := server.StateMachine().(Store)
e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())

View File

@ -36,6 +36,9 @@ type Node struct {
Value string // for key-value pair Value string // for key-value pair
Children map[string]*Node // for directory Children map[string]*Node // for directory
// A reference to the store this node is attached to.
store *store
// a ttl node will have an expire routine associated with it. // a ttl node will have an expire routine associated with it.
// we need a channel to stop that routine when the expiration changes. // we need a channel to stop that routine when the expiration changes.
stopExpire chan bool stopExpire chan bool
@ -46,7 +49,7 @@ type Node struct {
} }
// newKV creates a Key-Value pair // newKV creates a Key-Value pair
func newKV(nodePath string, value string, createIndex uint64, func newKV(store *store, nodePath string, value string, createIndex uint64,
createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node { createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
return &Node{ return &Node{
@ -57,6 +60,7 @@ func newKV(nodePath string, value string, createIndex uint64,
ModifiedTerm: createTerm, ModifiedTerm: createTerm,
Parent: parent, Parent: parent,
ACL: ACL, ACL: ACL,
store: store,
stopExpire: make(chan bool, 1), stopExpire: make(chan bool, 1),
ExpireTime: expireTime, ExpireTime: expireTime,
Value: value, Value: value,
@ -64,7 +68,7 @@ func newKV(nodePath string, value string, createIndex uint64,
} }
// newDir creates a directory // newDir creates a directory
func newDir(nodePath string, createIndex uint64, createTerm uint64, func newDir(store *store, nodePath string, createIndex uint64, createTerm uint64,
parent *Node, ACL string, expireTime time.Time) *Node { parent *Node, ACL string, expireTime time.Time) *Node {
return &Node{ return &Node{
@ -76,6 +80,7 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64,
stopExpire: make(chan bool, 1), stopExpire: make(chan bool, 1),
ExpireTime: expireTime, ExpireTime: expireTime,
Children: make(map[string]*Node), Children: make(map[string]*Node),
store: store,
} }
} }
@ -262,17 +267,17 @@ func (n *Node) internalRemove(recursive bool, callback func(path string)) {
// if the node is already expired, delete the node and return. // if the node is already expired, delete the node and return.
// if the node is permanent (this shouldn't happen), return at once. // if the node is permanent (this shouldn't happen), return at once.
// else wait for a period time, then remove the node. and notify the watchhub. // else wait for a period time, then remove the node. and notify the watchhub.
func (n *Node) Expire(s *Store) { func (n *Node) Expire() {
expired, duration := n.IsExpired() expired, duration := n.IsExpired()
if expired { // has been expired if expired { // has been expired
// since the parent function of Expire() runs serially, // since the parent function of Expire() runs serially,
// there is no need for lock here // there is no need for lock here
e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
s.WatcherHub.notify(e) n.store.WatcherHub.notify(e)
n.Remove(true, nil) n.Remove(true, nil)
s.Stats.Inc(ExpireCount) n.store.Stats.Inc(ExpireCount)
return return
} }
@ -289,17 +294,17 @@ func (n *Node) Expire(s *Store) {
// before expire get the lock, the expiration time // before expire get the lock, the expiration time
// of the node may be updated. // of the node may be updated.
// we have to check again when get the lock // we have to check again when get the lock
s.worldLock.Lock() n.store.worldLock.Lock()
defer s.worldLock.Unlock() defer n.store.worldLock.Unlock()
expired, _ := n.IsExpired() expired, _ := n.IsExpired()
if expired { if expired {
e := newEvent(Expire, n.Path, UndefIndex, UndefTerm) e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
s.WatcherHub.notify(e) n.store.WatcherHub.notify(e)
n.Remove(true, nil) n.Remove(true, nil)
s.Stats.Inc(ExpireCount) n.store.Stats.Inc(ExpireCount)
} }
return return
@ -355,7 +360,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
} }
} }
func (n *Node) UpdateTTL(expireTime time.Time, s *Store) { func (n *Node) UpdateTTL(expireTime time.Time) {
if !n.IsPermanent() { if !n.IsPermanent() {
// check if the node has been expired // check if the node has been expired
// if the node is not expired, we need to stop the go routine associated with // if the node is not expired, we need to stop the go routine associated with
@ -369,7 +374,7 @@ func (n *Node) UpdateTTL(expireTime time.Time, s *Store) {
if expireTime.Sub(Permanent) != 0 { if expireTime.Sub(Permanent) != 0 {
n.ExpireTime = expireTime n.ExpireTime = expireTime
n.Expire(s) n.Expire()
} }
} }
@ -378,10 +383,10 @@ func (n *Node) UpdateTTL(expireTime time.Time, s *Store) {
// If the node is a key-value pair, it will clone the pair. // If the node is a key-value pair, it will clone the pair.
func (n *Node) Clone() *Node { func (n *Node) Clone() *Node {
if !n.IsDir() { if !n.IsDir() {
return newKV(n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) return newKV(n.store, n.Path, n.Value, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
} }
clone := newDir(n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime) clone := newDir(n.store, n.Path, n.CreateIndex, n.CreateTerm, n.Parent, n.ACL, n.ExpireTime)
for key, child := range n.Children { for key, child := range n.Children {
clone.Children[key] = child.Clone() clone.Children[key] = child.Clone()
@ -397,15 +402,16 @@ func (n *Node) Clone() *Node {
// call this function on its children. // call this function on its children.
// We check the expire last since we need to recover the whole structure first and add all the // We check the expire last since we need to recover the whole structure first and add all the
// notifications into the event history. // notifications into the event history.
func (n *Node) recoverAndclean(s *Store) { func (n *Node) recoverAndclean() {
if n.IsDir() { if n.IsDir() {
for _, child := range n.Children { for _, child := range n.Children {
child.Parent = n child.Parent = n
child.recoverAndclean(s) child.store = n.store
child.recoverAndclean()
} }
} }
n.stopExpire = make(chan bool, 1) n.stopExpire = make(chan bool, 1)
n.Expire(s) n.Expire()
} }

View File

@ -7,7 +7,7 @@ import (
) )
func TestBasicStats(t *testing.T) { func TestBasicStats(t *testing.T) {
s := New() s := newStore()
keys := GenKeys(rand.Intn(100), 5) keys := GenKeys(rand.Intn(100), 5)
var i uint64 var i uint64
@ -140,7 +140,7 @@ func TestBasicStats(t *testing.T) {
t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail) t.Fatalf("TestAndSetFail [%d] != Stats.TestAndSetFail [%d]", TestAndSetFail, s.Stats.TestAndSetFail)
} }
s = New() s = newStore()
SetSuccess = 0 SetSuccess = 0
SetFail = 0 SetFail = 0

View File

@ -13,7 +13,21 @@ import (
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
) )
type Store struct { type Store interface {
Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
Create(nodePath string, value string, incrementalSuffix bool, force bool,
expireTime time.Time, index uint64, term uint64) (*Event, error)
Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error)
TestAndSet(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error)
Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error)
Save() ([]byte, error)
Recovery(state []byte) error
JsonStats() []byte
}
type store struct {
Root *Node Root *Node
WatcherHub *watcherHub WatcherHub *watcherHub
Index uint64 Index uint64
@ -22,9 +36,13 @@ type Store struct {
worldLock sync.RWMutex // stop the world lock worldLock sync.RWMutex // stop the world lock
} }
func New() *Store { func New() Store {
s := new(Store) return newStore()
s.Root = newDir("/", UndefIndex, UndefTerm, nil, "", Permanent) }
func newStore() *store {
s := new(store)
s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent)
s.Stats = newStats() s.Stats = newStats()
s.WatcherHub = newWatchHub(1000) s.WatcherHub = newWatchHub(1000)
@ -34,7 +52,7 @@ func New() *Store {
// Get function returns a get event. // Get function returns a get event.
// If recursive is true, it will return all the content under the node path. // If recursive is true, it will return all the content under the node path.
// If sorted is true, it will sort the content by keys. // If sorted is true, it will sort the content by keys.
func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) { func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
s.worldLock.RLock() s.worldLock.RLock()
defer s.worldLock.RUnlock() defer s.worldLock.RUnlock()
@ -89,7 +107,7 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term
// Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl. // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
// If the node has already existed, create will fail. // If the node has already existed, create will fail.
// If any node on the path is a file, create will fail. // If any node on the path is a file, create will fail.
func (s *Store) Create(nodePath string, value string, incrementalSuffix bool, force bool, func (s *store) Create(nodePath string, value string, incrementalSuffix bool, force bool,
expireTime time.Time, index uint64, term uint64) (*Event, error) { expireTime time.Time, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
@ -101,7 +119,7 @@ func (s *Store) Create(nodePath string, value string, incrementalSuffix bool, fo
// Update function updates the value/ttl of the node. // Update function updates the value/ttl of the node.
// If the node is a file, the value and the ttl can be updated. // If the node is a file, the value and the ttl can be updated.
// If the node is a directory, only the ttl can be updated. // If the node is a directory, only the ttl can be updated.
func (s *Store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) { func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
s.worldLock.Lock() s.worldLock.Lock()
defer s.worldLock.Unlock() defer s.worldLock.Unlock()
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
@ -127,7 +145,7 @@ func (s *Store) Update(nodePath string, newValue string, expireTime time.Time, i
} }
// update ttl // update ttl
n.UpdateTTL(expireTime, s) n.UpdateTTL(expireTime)
e.Expiration, e.TTL = n.ExpirationAndTTL() e.Expiration, e.TTL = n.ExpirationAndTTL()
@ -138,7 +156,7 @@ func (s *Store) Update(nodePath string, newValue string, expireTime time.Time, i
return e, nil return e, nil
} }
func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64, func (s *store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
value string, expireTime time.Time, index uint64, term uint64) (*Event, error) { value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
@ -168,7 +186,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
// if test succeed, write the value // if test succeed, write the value
n.Write(value, index, term) n.Write(value, index, term)
n.UpdateTTL(expireTime, s) n.UpdateTTL(expireTime)
e.Value = value e.Value = value
e.Expiration, e.TTL = n.ExpirationAndTTL() e.Expiration, e.TTL = n.ExpirationAndTTL()
@ -185,7 +203,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
// Delete function deletes the node at the given path. // Delete function deletes the node at the given path.
// If the node is a directory, recursive must be true to delete it. // If the node is a directory, recursive must be true to delete it.
func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) { func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
s.worldLock.Lock() s.worldLock.Lock()
@ -224,7 +242,7 @@ func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint6
return e, nil return e, nil
} }
func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) { func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
prefix = path.Clean(path.Join("/", prefix)) prefix = path.Clean(path.Join("/", prefix))
s.worldLock.RLock() s.worldLock.RLock()
@ -252,7 +270,7 @@ func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index ui
} }
// walk function walks all the nodePath and apply the walkFunc on each directory // walk function walks all the nodePath and apply the walkFunc on each directory
func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) { func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) {
components := strings.Split(nodePath, "/") components := strings.Split(nodePath, "/")
curr := s.Root curr := s.Root
@ -273,7 +291,7 @@ func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string
return curr, nil return curr, nil
} }
func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool, func (s *store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool,
expireTime time.Time, index uint64, term uint64, action string) (*Event, error) { expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
s.Index, s.Term = index, term s.Index, s.Term = index, term
@ -316,12 +334,12 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
if len(value) != 0 { // create file if len(value) != 0 { // create file
e.Value = value e.Value = value
n = newKV(nodePath, value, index, term, d, "", expireTime) n = newKV(s, nodePath, value, index, term, d, "", expireTime)
} else { // create directory } else { // create directory
e.Dir = true e.Dir = true
n = newDir(nodePath, index, term, d, "", expireTime) n = newDir(s, nodePath, index, term, d, "", expireTime)
} }
@ -334,7 +352,7 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
// Node with TTL // Node with TTL
if expireTime.Sub(Permanent) != 0 { if expireTime.Sub(Permanent) != 0 {
n.Expire(s) n.Expire()
e.Expiration, e.TTL = n.ExpirationAndTTL() e.Expiration, e.TTL = n.ExpirationAndTTL()
} }
@ -344,7 +362,7 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
} }
// InternalGet function get the node of the given nodePath. // InternalGet function get the node of the given nodePath.
func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) { func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) {
nodePath = path.Clean(path.Join("/", nodePath)) nodePath = path.Clean(path.Join("/", nodePath))
// update file system known index and term // update file system known index and term
@ -379,7 +397,7 @@ func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node,
// If it is a directory, this function will return the pointer to that node. // If it is a directory, this function will return the pointer to that node.
// If it does not exist, this function will create a new directory and return the pointer to that node. // If it does not exist, this function will create a new directory and return the pointer to that node.
// If it is a file, this function will return error. // If it is a file, this function will return error.
func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) { func (s *store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
node, ok := parent.Children[dirName] node, ok := parent.Children[dirName]
if ok { if ok {
@ -390,7 +408,7 @@ func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm) return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm)
} }
n := newDir(path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent) n := newDir(s, path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent)
parent.Children[dirName] = n parent.Children[dirName] = n
@ -401,10 +419,10 @@ func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
// Save function will not be able to save the state of watchers. // Save function will not be able to save the state of watchers.
// Save function will not save the parent field of the node. Or there will // Save function will not save the parent field of the node. Or there will
// be cyclic dependencies issue for the json package. // be cyclic dependencies issue for the json package.
func (s *Store) Save() ([]byte, error) { func (s *store) Save() ([]byte, error) {
s.worldLock.Lock() s.worldLock.Lock()
clonedStore := New() clonedStore := newStore()
clonedStore.Index = s.Index clonedStore.Index = s.Index
clonedStore.Term = s.Term clonedStore.Term = s.Term
clonedStore.Root = s.Root.Clone() clonedStore.Root = s.Root.Clone()
@ -426,7 +444,7 @@ func (s *Store) Save() ([]byte, error) {
// It needs to recovery the parent field of the nodes. // It needs to recovery the parent field of the nodes.
// It needs to delete the expired nodes since the saved time and also // It needs to delete the expired nodes since the saved time and also
// need to create monitor go routines. // need to create monitor go routines.
func (s *Store) Recovery(state []byte) error { func (s *store) Recovery(state []byte) error {
s.worldLock.Lock() s.worldLock.Lock()
defer s.worldLock.Unlock() defer s.worldLock.Unlock()
err := json.Unmarshal(state, s) err := json.Unmarshal(state, s)
@ -435,11 +453,11 @@ func (s *Store) Recovery(state []byte) error {
return err return err
} }
s.Root.recoverAndclean(s) s.Root.recoverAndclean()
return nil return nil
} }
func (s *Store) JsonStats() []byte { func (s *store) JsonStats() []byte {
s.Stats.Watchers = uint64(s.WatcherHub.count) s.Stats.Watchers = uint64(s.WatcherHub.count)
return s.Stats.toJson() return s.Stats.toJson()
} }

View File

@ -8,7 +8,7 @@ import (
) )
func TestCreateAndGet(t *testing.T) { func TestCreateAndGet(t *testing.T) {
s := New() s := newStore()
s.Create("/foobar", "bar", false, false, Permanent, 1, 1) s.Create("/foobar", "bar", false, false, Permanent, 1, 1)
@ -66,7 +66,7 @@ func TestCreateAndGet(t *testing.T) {
} }
func TestUpdateFile(t *testing.T) { func TestUpdateFile(t *testing.T) {
s := New() s := newStore()
_, err := s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1) _, err := s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1)
@ -161,7 +161,7 @@ func TestUpdateFile(t *testing.T) {
} }
func TestListDirectory(t *testing.T) { func TestListDirectory(t *testing.T) {
s := New() s := newStore()
// create dir /foo // create dir /foo
// set key-value /foo/foo=bar // set key-value /foo/foo=bar
@ -206,7 +206,7 @@ func TestListDirectory(t *testing.T) {
} }
func TestRemove(t *testing.T) { func TestRemove(t *testing.T) {
s := New() s := newStore()
s.Create("/foo", "bar", false, false, Permanent, 1, 1) s.Create("/foo", "bar", false, false, Permanent, 1, 1)
_, err := s.Delete("/foo", false, 1, 1) _, err := s.Delete("/foo", false, 1, 1)
@ -245,7 +245,7 @@ func TestRemove(t *testing.T) {
} }
func TestExpire(t *testing.T) { func TestExpire(t *testing.T) {
s := New() s := newStore()
expire := time.Now().Add(time.Second) expire := time.Now().Add(time.Second)
@ -287,7 +287,7 @@ func TestExpire(t *testing.T) {
} }
func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
s := New() s := newStore()
s.Create("/foo", "bar", false, false, Permanent, 1, 1) s.Create("/foo", "bar", false, false, Permanent, 1, 1)
// test on wrong previous value // test on wrong previous value
@ -320,7 +320,7 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
} }
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
s := New() s := newStore()
// watch at a deeper path // watch at a deeper path
c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1) c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1)
s.Create("/foo/foo/foo", "bar", false, false, Permanent, 1, 1) s.Create("/foo/foo/foo", "bar", false, false, Permanent, 1, 1)
@ -409,7 +409,7 @@ func TestWatch(t *testing.T) {
} }
func TestSort(t *testing.T) { func TestSort(t *testing.T) {
s := New() s := newStore()
// simulating random creation // simulating random creation
keys := GenKeys(80, 4) keys := GenKeys(80, 4)
@ -447,7 +447,7 @@ func TestSort(t *testing.T) {
} }
func TestSaveAndRecover(t *testing.T) { func TestSaveAndRecover(t *testing.T) {
s := New() s := newStore()
// simulating random creation // simulating random creation
keys := GenKeys(8, 4) keys := GenKeys(8, 4)
@ -469,7 +469,7 @@ func TestSaveAndRecover(t *testing.T) {
s.Create("/foo/foo", "bar", false, false, expire, 1, 1) s.Create("/foo/foo", "bar", false, false, expire, 1, 1)
b, err := s.Save() b, err := s.Save()
cloneFs := New() cloneFs := newStore()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
cloneFs.Recovery(b) cloneFs.Recovery(b)
@ -521,7 +521,7 @@ func GenKeys(num int, depth int) []string {
return keys return keys
} }
func createAndGet(s *Store, path string, t *testing.T) { func createAndGet(s *store, path string, t *testing.T) {
_, err := s.Create(path, "bar", false, false, Permanent, 1, 1) _, err := s.Create(path, "bar", false, false, Permanent, 1, 1)
if err != nil { if err != nil {

View File

@ -27,7 +27,7 @@ func (c *TestAndSetCommand) CommandName() string {
// Set the key-value pair if the current value of the key equals to the given prevValue // Set the key-value pair if the current value of the key equals to the given prevValue
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*Store) s, _ := server.StateMachine().(Store)
e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) c.Value, c.ExpireTime, server.CommitIndex(), server.Term())

View File

@ -25,7 +25,7 @@ func (c *UpdateCommand) CommandName() string {
// Update node // Update node
func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*Store) s, _ := server.StateMachine().(Store)
e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())

View File

@ -5,7 +5,7 @@ import (
) )
func TestWatcher(t *testing.T) { func TestWatcher(t *testing.T) {
s := New() s := newStore()
wh := s.WatcherHub wh := s.WatcherHub
c, err := wh.watch("/foo", true, 1) c, err := wh.watch("/foo", true, 1)
if err != nil { if err != nil {