Merge pull request #7195 from gyuho/fix-stm-restart

concurrency: fix stm restart on concurrent key deletion
This commit is contained in:
Gyu-Ho Lee 2017-01-20 09:49:16 -08:00 committed by GitHub
commit 3902d5ab0a
2 changed files with 49 additions and 3 deletions

View File

@ -249,11 +249,10 @@ func (s *stmReadCommitted) commit() *v3.TxnResponse {
}
func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
rev := r.Header.Revision + 1
if len(r.Kvs) != 0 {
rev = r.Kvs[0].ModRevision + 1
return v3.Compare(v3.ModRevision(k), "=", r.Kvs[0].ModRevision)
}
return v3.Compare(v3.ModRevision(k), "<", rev)
return v3.Compare(v3.ModRevision(k), "=", 0)
}
func respToValue(resp *v3.GetResponse) string {

View File

@ -197,3 +197,50 @@ func TestSTMSerialize(t *testing.T) {
}
}
}
// TestSTMApplyOnConcurrentDeletion ensures that concurrent key deletion
// fails the first GET revision comparison within STM; trigger retry.
func TestSTMApplyOnConcurrentDeletion(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
etcdc := clus.RandClient()
if _, err := etcdc.Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
}
donec, readyc := make(chan struct{}), make(chan struct{})
go func() {
<-readyc
if _, err := etcdc.Delete(context.TODO(), "foo"); err != nil {
t.Fatal(err)
}
close(donec)
}()
try := 0
applyf := func(stm concurrency.STM) error {
try++
stm.Get("foo")
if try == 1 {
// trigger delete to make GET rev comparison outdated
close(readyc)
<-donec
}
stm.Put("foo2", "bar2")
return nil
}
if _, err := concurrency.NewSTMRepeatable(context.TODO(), etcdc, applyf); err != nil {
t.Fatalf("error on stm txn (%v)", err)
}
if try != 2 {
t.Fatalf("STM apply expected to run twice, got %d", try)
}
resp, err := etcdc.Get(context.TODO(), "foo2")
if err != nil {
t.Fatalf("error fetching key (%v)", err)
}
if string(resp.Kvs[0].Value) != "bar2" {
t.Fatalf("bad value. got %+v, expected 'bar2' value", resp)
}
}