support testandset

This commit is contained in:
Xiang Li 2013-09-05 15:38:22 -04:00
parent 4c286fde23
commit 227d79e2bf
5 changed files with 109 additions and 33 deletions

View File

@ -14,7 +14,7 @@ func init() {
// command related errors
errors[100] = "Key Not Found"
errors[101] = "The given PrevValue is not equal to the value of the key"
errors[101] = "Test Failed"
errors[102] = "Not A File"
errors[103] = "Reached the max number of machines in the cluster"
errors[104] = "Not A Directory"

View File

@ -7,11 +7,10 @@ import (
)
const (
Get = "get"
Set = "set"
Delete = "delete"
TestAndSet = "testAndSet"
TestIAndSet = "testiAndSet"
Get = "get"
Set = "set"
Delete = "delete"
TestAndSet = "testAndSet"
)
type Event struct {

View File

@ -1,6 +1,7 @@
package fileSystem
import (
"fmt"
"path"
"strings"
"time"
@ -84,7 +85,7 @@ func (fs *FileSystem) Set(keyPath string, value string, expireTime time.Time, in
if f != nil { // update previous file if exist
e.PrevValue = f.Value
f.Write(e.Value)
f.Write(e.Value, index, term)
// if the previous ExpireTime is not Permanent and expireTime is given
// we stop the previous expire routine
@ -115,12 +116,43 @@ func (fs *FileSystem) Set(keyPath string, value string, expireTime time.Time, in
return e, nil
}
func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uint64, index uint64, term uint64) {
func (fs *FileSystem) TestAndSet(keyPath string, prevValue string, prevIndex uint64, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
f, err := fs.InternalGet(keyPath, index, term)
}
if err != nil {
func (fs *FileSystem) TestIndexAndSet() {
etcdError, _ := err.(etcdErr.Error)
if etcdError.ErrorCode == 100 { // file does not exist
if prevValue == "" && prevIndex == 0 { // test against if prevValue is empty
fs.Set(keyPath, value, expireTime, index, term)
e := newEvent(TestAndSet, keyPath, index, term)
e.Value = value
return e, nil
}
return nil, err
}
return nil, err
}
if f.IsDir() { // can only test and set file
return nil, etcdErr.NewError(102, keyPath)
}
if f.Value == prevValue || f.ModifiedIndex == prevIndex {
// if test succeed, write the value
e := newEvent(TestAndSet, keyPath, index, term)
e.PrevValue = f.Value
e.Value = value
f.Write(value, index, term)
return e, nil
}
cause := fmt.Sprintf("[%v/%v] [%v/%v]", prevValue, f.Value, prevIndex, f.ModifiedIndex)
return nil, etcdErr.NewError(101, cause)
}
func (fs *FileSystem) Delete(keyPath string, recurisive bool, index uint64, term uint64) (*Event, error) {
@ -176,13 +208,18 @@ func (fs *FileSystem) InternalGet(keyPath string, index uint64, term uint64) (*N
// update file system known index and term
fs.Index, fs.Term = index, term
walkFunc := func(parent *Node, dirName string) (*Node, error) {
child, ok := parent.Children[dirName]
walkFunc := func(parent *Node, name string) (*Node, error) {
if !parent.IsDir() {
return nil, etcdErr.NewError(104, parent.Path)
}
child, ok := parent.Children[name]
if ok {
return child, nil
}
return nil, etcdErr.NewError(100, "get")
return nil, etcdErr.NewError(100, path.Join(parent.Path, name))
}
f, err := fs.walk(keyPath, walkFunc)

View File

@ -157,6 +157,40 @@ func TestExpire(t *testing.T) {
}
func TestTestAndSet(t *testing.T) {
fs := New()
fs.Set("/foo", "bar", Permanent, 1, 1)
// test on wrong previous value
_, err := fs.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1)
if err == nil {
t.Fatal("test and set should fail barbar != bar")
}
// test on value
e, err := fs.TestAndSet("/foo", "bar", 0, "car", Permanent, 3, 1)
if err != nil {
t.Fatal("test and set should succeed bar == bar")
}
if e.PrevValue != "bar" || e.Value != "car" {
t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "bar", e.Value, "car")
}
// test on index
e, err = fs.TestAndSet("/foo", "", 3, "bar", Permanent, 4, 1)
if err != nil {
t.Fatal("test and set should succeed index 3 == 3")
}
if e.PrevValue != "car" || e.Value != "bar" {
t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar")
}
}
func setAndGet(fs *FileSystem, path string, t *testing.T) {
_, err := fs.Set(path, "bar", Permanent, 1, 1)

View File

@ -19,29 +19,33 @@ const (
)
type Node struct {
Path string
CreateIndex uint64
CreateTerm uint64
Parent *Node
ExpireTime time.Time
ACL string
Value string // for key-value pair
Children map[string]*Node // for directory
status int
mu sync.Mutex
stopExpire chan bool // stop expire routine channel
Path string
CreateIndex uint64
CreateTerm uint64
ModifiedIndex uint64
ModifiedTerm uint64
Parent *Node
ExpireTime time.Time
ACL string
Value string // for key-value pair
Children map[string]*Node // for directory
status int
mu sync.Mutex
stopExpire chan bool // stop expire routine channel
}
func newFile(keyPath string, value string, createIndex uint64, createTerm uint64, parent *Node, ACL string, expireTime time.Time) *Node {
return &Node{
Path: keyPath,
CreateIndex: createIndex,
CreateTerm: createTerm,
Parent: parent,
ACL: ACL,
stopExpire: make(chan bool, 1),
ExpireTime: expireTime,
Value: value,
Path: keyPath,
CreateIndex: createIndex,
CreateTerm: createTerm,
ModifiedIndex: createIndex,
ModifiedTerm: createTerm,
Parent: parent,
ACL: ACL,
stopExpire: make(chan bool, 1),
ExpireTime: expireTime,
Value: value,
}
}
@ -113,12 +117,14 @@ func (n *Node) Read() (string, error) {
// Set function set the value of the node to the given value.
// If the receiver node is a directory, a "Not A File" error will be returned.
func (n *Node) Write(value string) error {
func (n *Node) Write(value string, index uint64, term uint64) error {
if n.IsDir() {
return etcdErr.NewError(102, "")
}
n.Value = value
n.ModifiedIndex = index
n.ModifiedTerm = term
return nil
}