diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go new file mode 100644 index 000000000..ffc013125 --- /dev/null +++ b/clientv3/integration/txn_test.go @@ -0,0 +1,103 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/pkg/testutil" +) + +func TestTxnWriteFail(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + kv := clientv3.NewKV(clus.Client(0)) + clus.Members[0].Stop(t) + <-clus.Members[0].StopNotify() + + resp, err := kv.Txn().Then(clientv3.OpPut("foo", "bar", 0)).Commit() + if err == nil { + t.Fatalf("expected error, got response %v", resp) + } + + // reconnect so cluster terminate doesn't complain about double-close + clus.Members[0].Restart(t) + + // and ensure the put didn't take + gresp, gerr := kv.Get("foo", 0) + if gerr != nil { + t.Fatal(gerr) + } + if len(gresp.Kvs) != 0 { + t.Fatalf("expected no keys, got %v", gresp.Kvs) + } +} + +func TestTxnReadRetry(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + kv := clientv3.NewKV(clus.Client(0)) + clus.Members[0].Stop(t) + <-clus.Members[0].StopNotify() + + donec := make(chan struct{}) + go func() { + _, err := kv.Txn().Then(clientv3.OpGet("foo", 0)).Commit() + if err != nil { + t.Fatalf("expected response, got error %v", err) + } + donec <- struct{}{} + }() + // wait for txn to fail on disconnect + time.Sleep(100 * time.Millisecond) + + // restart node; client should resume + clus.Members[0].Restart(t) + select { + case <-donec: + case <-time.After(5 * time.Second): + t.Fatalf("waited too long") + } +} + +func TestTxnSuccess(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + kv := clientv3.NewKV(clus.Client(0)) + _, err := kv.Txn().Then(clientv3.OpPut("foo", "bar", 0)).Commit() + if err != nil { + t.Fatal(err) + } + + resp, err := kv.Get("foo", 0) + if err != nil { + t.Fatal(err) + } + if len(resp.Kvs) != 1 || string(resp.Kvs[0].Key) != "foo" { + t.Fatalf("unexpected Get response %v", resp) + } +} diff --git a/clientv3/txn.go b/clientv3/txn.go index 0e0804457..a09e2ec73 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -82,6 +82,8 @@ func (txn *txn) If(cs ...Cmp) Txn { panic("cannot call If after Else!") } + txn.cif = true + for _, cmp := range cs { txn.cmps = append(txn.cmps, (*pb.Compare)(&cmp)) } diff --git a/clientv3/txn_test.go b/clientv3/txn_test.go new file mode 100644 index 000000000..463a4f0a6 --- /dev/null +++ b/clientv3/txn_test.go @@ -0,0 +1,106 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "testing" + "time" + + "github.com/coreos/etcd/pkg/testutil" +) + +func TestTxnPanics(t *testing.T) { + defer testutil.AfterTest(t) + + kv := NewKV(&Client{}) + + errc := make(chan string) + df := func() { + if s := recover(); s != nil { + errc <- s.(string) + } + } + + k, tgt := CreatedRevision("foo") + cmp := Compare(k, tgt, "=", 0) + op := OpPut("foo", "bar", 0) + + tests := []struct { + f func() + + err string + }{ + { + f: func() { + defer df() + kv.Txn().If(cmp).If(cmp) + }, + + err: "cannot call If twice!", + }, + { + f: func() { + defer df() + kv.Txn().Then(op).If(cmp) + }, + + err: "cannot call If after Then!", + }, + { + f: func() { + defer df() + kv.Txn().Else(op).If(cmp) + }, + + err: "cannot call If after Else!", + }, + { + f: func() { + defer df() + kv.Txn().Then(op).Then(op) + }, + + err: "cannot call Then twice!", + }, + { + f: func() { + defer df() + kv.Txn().Else(op).Then(op) + }, + + err: "cannot call Then after Else!", + }, + { + f: func() { + defer df() + kv.Txn().Else(op).Else(op) + }, + + err: "cannot call Else twice!", + }, + } + + for i, tt := range tests { + go tt.f() + select { + case err := <-errc: + if err != tt.err { + t.Errorf("#%d: got %s, wanted %s", i, err, tt.err) + } + case <-time.After(time.Second): + t.Errorf("#%d: did not panic, wanted panic %s", i, tt.err) + } + } +} diff --git a/integration/cluster.go b/integration/cluster.go index f4b1b626c..6c884032e 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -630,6 +630,11 @@ func (m *member) Stop(t *testing.T) { m.hss = nil } +// StopNotify unblocks when a member stop completes +func (m *member) StopNotify() <-chan struct{} { + return m.s.StopNotify() +} + // Restart starts the member using the preserved data dir. func (m *member) Restart(t *testing.T) error { newPeerListeners := make([]net.Listener, 0)