mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
04ac8969a1
173
store/store.go
173
store/store.go
@ -112,20 +112,34 @@ func (s *store) Index() uint64 {
|
|||||||
// If recursive is true, it will return all the content under the node path.
|
// If recursive is true, it will return all the content under the node path.
|
||||||
// If sorted is true, it will sort the content by keys.
|
// If sorted is true, it will sort the content by keys.
|
||||||
func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
|
func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
|
||||||
s.worldLock.RLock()
|
var err *etcdErr.Error
|
||||||
defer s.worldLock.RUnlock()
|
|
||||||
|
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
s.worldLock.Lock()
|
||||||
|
defer s.worldLock.Unlock()
|
||||||
|
|
||||||
n, err := s.internalGet(nodePath)
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
s.Stats.Inc(GetSuccess)
|
||||||
|
if recursive {
|
||||||
|
reportReadSuccess(GetRecursive)
|
||||||
|
} else {
|
||||||
|
reportReadSuccess(Get)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
s.Stats.Inc(GetFail)
|
s.Stats.Inc(GetFail)
|
||||||
if recursive {
|
if recursive {
|
||||||
reportReadFailure(GetRecursive)
|
reportReadFailure(GetRecursive)
|
||||||
} else {
|
} else {
|
||||||
reportReadFailure(Get)
|
reportReadFailure(Get)
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
|
|
||||||
|
n, err := s.internalGet(nodePath)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -133,13 +147,6 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
|
|||||||
e.EtcdIndex = s.CurrentIndex
|
e.EtcdIndex = s.CurrentIndex
|
||||||
e.Node.loadInternalNode(n, recursive, sorted, s.clock)
|
e.Node.loadInternalNode(n, recursive, sorted, s.clock)
|
||||||
|
|
||||||
s.Stats.Inc(GetSuccess)
|
|
||||||
if recursive {
|
|
||||||
reportReadSuccess(GetRecursive)
|
|
||||||
} else {
|
|
||||||
reportReadSuccess(Get)
|
|
||||||
}
|
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,26 +154,36 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
|
|||||||
// If the node has already existed, create will fail.
|
// If the node has already existed, create will fail.
|
||||||
// If any node on the path is a file, create will fail.
|
// If any node on the path is a file, create will fail.
|
||||||
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
|
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
|
||||||
|
var err *etcdErr.Error
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
|
|
||||||
|
|
||||||
if err == nil {
|
defer func() {
|
||||||
e.EtcdIndex = s.CurrentIndex
|
if err == nil {
|
||||||
s.WatcherHub.notify(e)
|
s.Stats.Inc(CreateSuccess)
|
||||||
s.Stats.Inc(CreateSuccess)
|
reportWriteSuccess(Create)
|
||||||
reportWriteSuccess(Create)
|
return
|
||||||
} else {
|
}
|
||||||
|
|
||||||
s.Stats.Inc(CreateFail)
|
s.Stats.Inc(CreateFail)
|
||||||
reportWriteFailure(Create)
|
reportWriteFailure(Create)
|
||||||
|
}()
|
||||||
|
|
||||||
|
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return e, err
|
e.EtcdIndex = s.CurrentIndex
|
||||||
|
s.WatcherHub.notify(e)
|
||||||
|
|
||||||
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set creates or replace the node at nodePath.
|
// Set creates or replace the node at nodePath.
|
||||||
func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
|
func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
|
||||||
var err error
|
var err *etcdErr.Error
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
@ -175,10 +192,11 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
s.Stats.Inc(SetSuccess)
|
s.Stats.Inc(SetSuccess)
|
||||||
reportWriteSuccess(Set)
|
reportWriteSuccess(Set)
|
||||||
} else {
|
return
|
||||||
s.Stats.Inc(SetFail)
|
|
||||||
reportWriteFailure(Set)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.Stats.Inc(SetFail)
|
||||||
|
reportWriteFailure(Set)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get prevNode value
|
// Get prevNode value
|
||||||
@ -222,9 +240,22 @@ func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64)
|
|||||||
func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
|
func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
|
||||||
value string, expireTime time.Time) (*Event, error) {
|
value string, expireTime time.Time) (*Event, error) {
|
||||||
|
|
||||||
|
var err *etcdErr.Error
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
s.Stats.Inc(CompareAndSwapSuccess)
|
||||||
|
reportWriteSuccess(CompareAndSwap)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Stats.Inc(CompareAndSwapFail)
|
||||||
|
reportWriteFailure(CompareAndSwap)
|
||||||
|
}()
|
||||||
|
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
// we do not allow the user to change "/"
|
// we do not allow the user to change "/"
|
||||||
if s.readonlySet.Contains(nodePath) {
|
if s.readonlySet.Contains(nodePath) {
|
||||||
@ -232,26 +263,20 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
|||||||
}
|
}
|
||||||
|
|
||||||
n, err := s.internalGet(nodePath)
|
n, err := s.internalGet(nodePath)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.Stats.Inc(CompareAndSwapFail)
|
|
||||||
reportWriteFailure(CompareAndSwap)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.IsDir() { // can only compare and swap file
|
if n.IsDir() { // can only compare and swap file
|
||||||
s.Stats.Inc(CompareAndSwapFail)
|
err = etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
|
||||||
reportWriteFailure(CompareAndSwap)
|
return nil, err
|
||||||
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If both of the prevValue and prevIndex are given, we will test both of them.
|
// If both of the prevValue and prevIndex are given, we will test both of them.
|
||||||
// Command will be executed, only if both of the tests are successful.
|
// Command will be executed, only if both of the tests are successful.
|
||||||
if ok, which := n.Compare(prevValue, prevIndex); !ok {
|
if ok, which := n.Compare(prevValue, prevIndex); !ok {
|
||||||
cause := getCompareFailCause(n, which, prevValue, prevIndex)
|
cause := getCompareFailCause(n, which, prevValue, prevIndex)
|
||||||
s.Stats.Inc(CompareAndSwapFail)
|
err = etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
||||||
reportWriteFailure(CompareAndSwap)
|
return nil, err
|
||||||
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// update etcd index
|
// update etcd index
|
||||||
@ -272,8 +297,6 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
|||||||
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
|
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
|
||||||
|
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
s.Stats.Inc(CompareAndSwapSuccess)
|
|
||||||
reportWriteSuccess(CompareAndSwap)
|
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
@ -281,9 +304,22 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
|||||||
// Delete deletes the node at the given path.
|
// Delete deletes the node at the given path.
|
||||||
// If the node is a directory, recursive must be true to delete it.
|
// If the node is a directory, recursive must be true to delete it.
|
||||||
func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
||||||
|
var err *etcdErr.Error
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
s.Stats.Inc(DeleteSuccess)
|
||||||
|
reportWriteSuccess(Delete)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Stats.Inc(DeleteFail)
|
||||||
|
reportWriteFailure(Delete)
|
||||||
|
}()
|
||||||
|
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
// we do not allow the user to change "/"
|
// we do not allow the user to change "/"
|
||||||
if s.readonlySet.Contains(nodePath) {
|
if s.readonlySet.Contains(nodePath) {
|
||||||
@ -296,10 +332,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
n, err := s.internalGet(nodePath)
|
n, err := s.internalGet(nodePath)
|
||||||
|
|
||||||
if err != nil { // if the node does not exist, return error
|
if err != nil { // if the node does not exist, return error
|
||||||
s.Stats.Inc(DeleteFail)
|
|
||||||
reportWriteFailure(Delete)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -319,10 +352,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = n.Remove(dir, recursive, callback)
|
err = n.Remove(dir, recursive, callback)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.Stats.Inc(DeleteFail)
|
|
||||||
reportWriteFailure(Delete)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -331,29 +361,33 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|||||||
|
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
|
|
||||||
s.Stats.Inc(DeleteSuccess)
|
|
||||||
reportWriteSuccess(Delete)
|
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
|
func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
var err *etcdErr.Error
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
|
|
||||||
n, err := s.internalGet(nodePath)
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
s.Stats.Inc(CompareAndDeleteSuccess)
|
||||||
|
reportWriteSuccess(CompareAndDelete)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil { // if the node does not exist, return error
|
|
||||||
s.Stats.Inc(CompareAndDeleteFail)
|
s.Stats.Inc(CompareAndDeleteFail)
|
||||||
reportWriteFailure(CompareAndDelete)
|
reportWriteFailure(CompareAndDelete)
|
||||||
|
}()
|
||||||
|
|
||||||
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
|
|
||||||
|
n, err := s.internalGet(nodePath)
|
||||||
|
if err != nil { // if the node does not exist, return error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.IsDir() { // can only compare and delete file
|
if n.IsDir() { // can only compare and delete file
|
||||||
s.Stats.Inc(CompareAndSwapFail)
|
|
||||||
reportWriteFailure(CompareAndDelete)
|
|
||||||
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
|
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -361,8 +395,6 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
|||||||
// Command will be executed, only if both of the tests are successful.
|
// Command will be executed, only if both of the tests are successful.
|
||||||
if ok, which := n.Compare(prevValue, prevIndex); !ok {
|
if ok, which := n.Compare(prevValue, prevIndex); !ok {
|
||||||
cause := getCompareFailCause(n, which, prevValue, prevIndex)
|
cause := getCompareFailCause(n, which, prevValue, prevIndex)
|
||||||
s.Stats.Inc(CompareAndDeleteFail)
|
|
||||||
reportWriteFailure(CompareAndDelete)
|
|
||||||
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -384,8 +416,6 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
s.Stats.Inc(CompareAndDeleteSuccess)
|
|
||||||
reportWriteSuccess(CompareAndDelete)
|
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
@ -423,7 +453,6 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return curr, nil
|
return curr, nil
|
||||||
@ -433,9 +462,22 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string
|
|||||||
// If the node is a file, the value and the ttl can be updated.
|
// If the node is a file, the value and the ttl can be updated.
|
||||||
// If the node is a directory, only the ttl can be updated.
|
// If the node is a directory, only the ttl can be updated.
|
||||||
func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
|
func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
|
||||||
|
var err *etcdErr.Error
|
||||||
|
|
||||||
s.worldLock.Lock()
|
s.worldLock.Lock()
|
||||||
defer s.worldLock.Unlock()
|
defer s.worldLock.Unlock()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
s.Stats.Inc(UpdateSuccess)
|
||||||
|
reportWriteSuccess(Update)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Stats.Inc(UpdateFail)
|
||||||
|
reportWriteFailure(Update)
|
||||||
|
}()
|
||||||
|
|
||||||
nodePath = path.Clean(path.Join("/", nodePath))
|
nodePath = path.Clean(path.Join("/", nodePath))
|
||||||
// we do not allow the user to change "/"
|
// we do not allow the user to change "/"
|
||||||
if s.readonlySet.Contains(nodePath) {
|
if s.readonlySet.Contains(nodePath) {
|
||||||
@ -445,25 +487,19 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
|
|||||||
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
|
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
|
||||||
|
|
||||||
n, err := s.internalGet(nodePath)
|
n, err := s.internalGet(nodePath)
|
||||||
|
|
||||||
if err != nil { // if the node does not exist, return error
|
if err != nil { // if the node does not exist, return error
|
||||||
s.Stats.Inc(UpdateFail)
|
|
||||||
reportWriteFailure(Update)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if n.IsDir() && len(newValue) != 0 {
|
||||||
|
// if the node is a directory, we cannot update value to non-empty
|
||||||
|
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
|
||||||
|
}
|
||||||
|
|
||||||
e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
|
e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
|
||||||
e.EtcdIndex = nextIndex
|
e.EtcdIndex = nextIndex
|
||||||
e.PrevNode = n.Repr(false, false, s.clock)
|
e.PrevNode = n.Repr(false, false, s.clock)
|
||||||
eNode := e.Node
|
eNode := e.Node
|
||||||
|
|
||||||
if n.IsDir() && len(newValue) != 0 {
|
|
||||||
// if the node is a directory, we cannot update value to non-empty
|
|
||||||
s.Stats.Inc(UpdateFail)
|
|
||||||
reportWriteFailure(Update)
|
|
||||||
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
n.Write(newValue, nextIndex)
|
n.Write(newValue, nextIndex)
|
||||||
|
|
||||||
if n.IsDir() {
|
if n.IsDir() {
|
||||||
@ -481,16 +517,13 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
|
|||||||
|
|
||||||
s.WatcherHub.notify(e)
|
s.WatcherHub.notify(e)
|
||||||
|
|
||||||
s.Stats.Inc(UpdateSuccess)
|
|
||||||
reportWriteSuccess(Update)
|
|
||||||
|
|
||||||
s.CurrentIndex = nextIndex
|
s.CurrentIndex = nextIndex
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
|
func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
|
||||||
expireTime time.Time, action string) (*Event, error) {
|
expireTime time.Time, action string) (*Event, *etcdErr.Error) {
|
||||||
|
|
||||||
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
|
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user