From dd45d04b2d7cf07c27f6322dfb794efae84978ea Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 6 Oct 2020 14:19:02 +0200 Subject: [PATCH] clientv3/concurrency: Expose examples close to the source-code. --- clientv3/concurrency/example_election_test.go | 1 + clientv3/concurrency/example_mutex_test.go | 1 + clientv3/concurrency/example_stm_test.go | 1 + clientv3/concurrency/main_test.go | 35 ++++ .../clientv3/concurrency/election_test.go | 2 +- .../concurrency/example_election_test.go | 107 +++++----- .../concurrency/example_mutex_test.go | 183 ++++++++++-------- .../clientv3/concurrency/example_stm_test.go | 130 +++++++------ .../clientv3/concurrency/main_test.go | 26 ++- .../clientv3/concurrency/mutex_test.go | 2 +- 10 files changed, 281 insertions(+), 207 deletions(-) create mode 120000 clientv3/concurrency/example_election_test.go create mode 120000 clientv3/concurrency/example_mutex_test.go create mode 120000 clientv3/concurrency/example_stm_test.go create mode 100644 clientv3/concurrency/main_test.go diff --git a/clientv3/concurrency/example_election_test.go b/clientv3/concurrency/example_election_test.go new file mode 120000 index 000000000..6ae1dffe2 --- /dev/null +++ b/clientv3/concurrency/example_election_test.go @@ -0,0 +1 @@ +../../tests/integration/clientv3/concurrency/example_election_test.go \ No newline at end of file diff --git a/clientv3/concurrency/example_mutex_test.go b/clientv3/concurrency/example_mutex_test.go new file mode 120000 index 000000000..2968b42d4 --- /dev/null +++ b/clientv3/concurrency/example_mutex_test.go @@ -0,0 +1 @@ +../../tests/integration/clientv3/concurrency/example_mutex_test.go \ No newline at end of file diff --git a/clientv3/concurrency/example_stm_test.go b/clientv3/concurrency/example_stm_test.go new file mode 120000 index 000000000..b1f22c258 --- /dev/null +++ b/clientv3/concurrency/example_stm_test.go @@ -0,0 +1 @@ +../../tests/integration/clientv3/concurrency/example_stm_test.go \ No newline at end of file diff --git a/clientv3/concurrency/main_test.go b/clientv3/concurrency/main_test.go new file mode 100644 index 000000000..099fa904b --- /dev/null +++ b/clientv3/concurrency/main_test.go @@ -0,0 +1,35 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency_test + +import ( + "testing" + + "go.etcd.io/etcd/v3/pkg/testutil" +) + +func exampleEndpoints() []string { return nil } + +func forUnitTestsRunInMockedContext(mocking func(), example func()) { + mocking() + // TODO: Call 'example' when mocking() provides realistic mocking of transport. + + // The real testing logic of examples gets executed + // as part of ./tests/integration/clientv3/integration/... +} + +func TestMain(m *testing.M) { + testutil.MustTestMainWithLeakDetection(m) +} diff --git a/tests/integration/clientv3/concurrency/election_test.go b/tests/integration/clientv3/concurrency/election_test.go index a6e032419..ab0cb6709 100644 --- a/tests/integration/clientv3/concurrency/election_test.go +++ b/tests/integration/clientv3/concurrency/election_test.go @@ -28,7 +28,7 @@ import ( func TestResumeElection(t *testing.T) { const prefix = "/resume-election/" - cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) + cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()}) if err != nil { log.Fatal(err) } diff --git a/tests/integration/clientv3/concurrency/example_election_test.go b/tests/integration/clientv3/concurrency/example_election_test.go index e0a1c3e6e..1d6864cfb 100644 --- a/tests/integration/clientv3/concurrency/example_election_test.go +++ b/tests/integration/clientv3/concurrency/example_election_test.go @@ -25,64 +25,73 @@ import ( "go.etcd.io/etcd/v3/clientv3/concurrency" ) +func mockElection_Campaign() { + fmt.Println("completed first election with e2") + fmt.Println("completed second election with e1") +} + func ExampleElection_Campaign() { - cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) - if err != nil { - log.Fatal(err) - } - defer cli.Close() + forUnitTestsRunInMockedContext( + mockElection_Campaign, + func() { + cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()}) + if err != nil { + log.Fatal(err) + } + defer cli.Close() - // create two separate sessions for election competition - s1, err := concurrency.NewSession(cli) - if err != nil { - log.Fatal(err) - } - defer s1.Close() - e1 := concurrency.NewElection(s1, "/my-election/") + // create two separate sessions for election competition + s1, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s1.Close() + e1 := concurrency.NewElection(s1, "/my-election/") - s2, err := concurrency.NewSession(cli) - if err != nil { - log.Fatal(err) - } - defer s2.Close() - e2 := concurrency.NewElection(s2, "/my-election/") + s2, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s2.Close() + e2 := concurrency.NewElection(s2, "/my-election/") - // create competing candidates, with e1 initially losing to e2 - var wg sync.WaitGroup - wg.Add(2) - electc := make(chan *concurrency.Election, 2) - go func() { - defer wg.Done() - // delay candidacy so e2 wins first - time.Sleep(3 * time.Second) - if err := e1.Campaign(context.Background(), "e1"); err != nil { - log.Fatal(err) - } - electc <- e1 - }() - go func() { - defer wg.Done() - if err := e2.Campaign(context.Background(), "e2"); err != nil { - log.Fatal(err) - } - electc <- e2 - }() + // create competing candidates, with e1 initially losing to e2 + var wg sync.WaitGroup + wg.Add(2) + electc := make(chan *concurrency.Election, 2) + go func() { + defer wg.Done() + // delay candidacy so e2 wins first + time.Sleep(3 * time.Second) + if err := e1.Campaign(context.Background(), "e1"); err != nil { + log.Fatal(err) + } + electc <- e1 + }() + go func() { + defer wg.Done() + if err := e2.Campaign(context.Background(), "e2"); err != nil { + log.Fatal(err) + } + electc <- e2 + }() - cctx, cancel := context.WithCancel(context.TODO()) - defer cancel() + cctx, cancel := context.WithCancel(context.TODO()) + defer cancel() - e := <-electc - fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value)) + e := <-electc + fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value)) - // resign so next candidate can be elected - if err := e.Resign(context.TODO()); err != nil { - log.Fatal(err) - } + // resign so next candidate can be elected + if err := e.Resign(context.TODO()); err != nil { + log.Fatal(err) + } - e = <-electc - fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value)) + e = <-electc + fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value)) - wg.Wait() + wg.Wait() + }) // Output: // completed first election with e2 diff --git a/tests/integration/clientv3/concurrency/example_mutex_test.go b/tests/integration/clientv3/concurrency/example_mutex_test.go index a4cf19f2c..357806663 100644 --- a/tests/integration/clientv3/concurrency/example_mutex_test.go +++ b/tests/integration/clientv3/concurrency/example_mutex_test.go @@ -23,49 +23,60 @@ import ( "go.etcd.io/etcd/v3/clientv3/concurrency" ) -func ExampleMutex_TryLock() { - cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) - if err != nil { - log.Fatal(err) - } - defer cli.Close() - - // create two separate sessions for lock competition - s1, err := concurrency.NewSession(cli) - if err != nil { - log.Fatal(err) - } - defer s1.Close() - m1 := concurrency.NewMutex(s1, "/my-lock") - - s2, err := concurrency.NewSession(cli) - if err != nil { - log.Fatal(err) - } - defer s2.Close() - m2 := concurrency.NewMutex(s2, "/my-lock") - - // acquire lock for s1 - if err = m1.Lock(context.TODO()); err != nil { - log.Fatal(err) - } +func mockMutex_TryLock() { fmt.Println("acquired lock for s1") - - if err = m2.TryLock(context.TODO()); err == nil { - log.Fatal("should not acquire lock") - } - if err == concurrency.ErrLocked { - fmt.Println("cannot acquire lock for s2, as already locked in another session") - } - - if err = m1.Unlock(context.TODO()); err != nil { - log.Fatal(err) - } + fmt.Println("cannot acquire lock for s2, as already locked in another session") fmt.Println("released lock for s1") - if err = m2.TryLock(context.TODO()); err != nil { - log.Fatal(err) - } fmt.Println("acquired lock for s2") +} + +func ExampleMutex_TryLock() { + forUnitTestsRunInMockedContext( + mockMutex_TryLock, + func() { + cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()}) + if err != nil { + log.Fatal(err) + } + defer cli.Close() + + // create two separate sessions for lock competition + s1, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s1.Close() + m1 := concurrency.NewMutex(s1, "/my-lock") + + s2, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s2.Close() + m2 := concurrency.NewMutex(s2, "/my-lock") + + // acquire lock for s1 + if err = m1.Lock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("acquired lock for s1") + + if err = m2.TryLock(context.TODO()); err == nil { + log.Fatal("should not acquire lock") + } + if err == concurrency.ErrLocked { + fmt.Println("cannot acquire lock for s2, as already locked in another session") + } + + if err = m1.Unlock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("released lock for s1") + if err = m2.TryLock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("acquired lock for s2") + }) // Output: // acquired lock for s1 @@ -74,50 +85,60 @@ func ExampleMutex_TryLock() { // acquired lock for s2 } -func ExampleMutex_Lock() { - cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) - if err != nil { - log.Fatal(err) - } - defer cli.Close() - - // create two separate sessions for lock competition - s1, err := concurrency.NewSession(cli) - if err != nil { - log.Fatal(err) - } - defer s1.Close() - m1 := concurrency.NewMutex(s1, "/my-lock/") - - s2, err := concurrency.NewSession(cli) - if err != nil { - log.Fatal(err) - } - defer s2.Close() - m2 := concurrency.NewMutex(s2, "/my-lock/") - - // acquire lock for s1 - if err := m1.Lock(context.TODO()); err != nil { - log.Fatal(err) - } +func mockMutex_Lock() { fmt.Println("acquired lock for s1") - - m2Locked := make(chan struct{}) - go func() { - defer close(m2Locked) - // wait until s1 is locks /my-lock/ - if err := m2.Lock(context.TODO()); err != nil { - log.Fatal(err) - } - }() - - if err := m1.Unlock(context.TODO()); err != nil { - log.Fatal(err) - } fmt.Println("released lock for s1") - - <-m2Locked fmt.Println("acquired lock for s2") +} + +func ExampleMutex_Lock() { + forUnitTestsRunInMockedContext( + mockMutex_Lock, + func() { + cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()}) + if err != nil { + log.Fatal(err) + } + defer cli.Close() + + // create two separate sessions for lock competition + s1, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s1.Close() + m1 := concurrency.NewMutex(s1, "/my-lock/") + + s2, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s2.Close() + m2 := concurrency.NewMutex(s2, "/my-lock/") + + // acquire lock for s1 + if err := m1.Lock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("acquired lock for s1") + + m2Locked := make(chan struct{}) + go func() { + defer close(m2Locked) + // wait until s1 is locks /my-lock/ + if err := m2.Lock(context.TODO()); err != nil { + log.Fatal(err) + } + }() + + if err := m1.Unlock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("released lock for s1") + + <-m2Locked + fmt.Println("acquired lock for s2") + }) // Output: // acquired lock for s1 diff --git a/tests/integration/clientv3/concurrency/example_stm_test.go b/tests/integration/clientv3/concurrency/example_stm_test.go index bdcfab4a1..a66fbabe2 100644 --- a/tests/integration/clientv3/concurrency/example_stm_test.go +++ b/tests/integration/clientv3/concurrency/example_stm_test.go @@ -25,73 +25,81 @@ import ( "go.etcd.io/etcd/v3/clientv3/concurrency" ) +func mockSTM_apply() { + fmt.Println("account sum is 500") +} + // ExampleSTM_apply shows how to use STM with a transactional // transfer between balances. func ExampleSTM_apply() { - cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) - if err != nil { - log.Fatal(err) - } - defer cli.Close() - - // set up "accounts" - totalAccounts := 5 - for i := 0; i < totalAccounts; i++ { - k := fmt.Sprintf("accts/%d", i) - if _, err = cli.Put(context.TODO(), k, "100"); err != nil { - log.Fatal(err) - } - } - - exchange := func(stm concurrency.STM) error { - from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts) - if from == to { - // nothing to do - return nil - } - // read values - fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to) - fromV, toV := stm.Get(fromK), stm.Get(toK) - fromInt, toInt := 0, 0 - fmt.Sscanf(fromV, "%d", &fromInt) - fmt.Sscanf(toV, "%d", &toInt) - - // transfer amount - xfer := fromInt / 2 - fromInt, toInt = fromInt-xfer, toInt+xfer - - // write back - stm.Put(fromK, fmt.Sprintf("%d", fromInt)) - stm.Put(toK, fmt.Sprintf("%d", toInt)) - return nil - } - - // concurrently exchange values between accounts - var wg sync.WaitGroup - wg.Add(10) - for i := 0; i < 10; i++ { - go func() { - defer wg.Done() - if _, serr := concurrency.NewSTM(cli, exchange); serr != nil { - log.Fatal(serr) + forUnitTestsRunInMockedContext( + mockSTM_apply, + func() { + cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()}) + if err != nil { + log.Fatal(err) } - }() - } - wg.Wait() + defer cli.Close() - // confirm account sum matches sum from beginning. - sum := 0 - accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix()) - if err != nil { - log.Fatal(err) - } - for _, kv := range accts.Kvs { - v := 0 - fmt.Sscanf(string(kv.Value), "%d", &v) - sum += v - } + // set up "accounts" + totalAccounts := 5 + for i := 0; i < totalAccounts; i++ { + k := fmt.Sprintf("accts/%d", i) + if _, err = cli.Put(context.TODO(), k, "100"); err != nil { + log.Fatal(err) + } + } - fmt.Println("account sum is", sum) + exchange := func(stm concurrency.STM) error { + from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts) + if from == to { + // nothing to do + return nil + } + // read values + fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to) + fromV, toV := stm.Get(fromK), stm.Get(toK) + fromInt, toInt := 0, 0 + fmt.Sscanf(fromV, "%d", &fromInt) + fmt.Sscanf(toV, "%d", &toInt) + + // transfer amount + xfer := fromInt / 2 + fromInt, toInt = fromInt-xfer, toInt+xfer + + // write back + stm.Put(fromK, fmt.Sprintf("%d", fromInt)) + stm.Put(toK, fmt.Sprintf("%d", toInt)) + return nil + } + + // concurrently exchange values between accounts + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + if _, serr := concurrency.NewSTM(cli, exchange); serr != nil { + log.Fatal(serr) + } + }() + } + wg.Wait() + + // confirm account sum matches sum from beginning. + sum := 0 + accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix()) + if err != nil { + log.Fatal(err) + } + for _, kv := range accts.Kvs { + v := 0 + fmt.Sscanf(string(kv.Value), "%d", &v) + sum += v + } + + fmt.Println("account sum is", sum) + }) // Output: // account sum is 500 } diff --git a/tests/integration/clientv3/concurrency/main_test.go b/tests/integration/clientv3/concurrency/main_test.go index c3d5bfc93..f714c0f37 100644 --- a/tests/integration/clientv3/concurrency/main_test.go +++ b/tests/integration/clientv3/concurrency/main_test.go @@ -15,30 +15,28 @@ package concurrency_test import ( - "fmt" "os" "testing" - "time" "go.etcd.io/etcd/tests/v3/integration" "go.etcd.io/etcd/v3/pkg/testutil" ) -var endpoints []string +var lazyCluster = integration.NewLazyCluster() -// TestMain sets up an etcd cluster for running the examples. +func exampleEndpoints() []string { return lazyCluster.EndpointsV3() } + +func forUnitTestsRunInMockedContext(mocking func(), example func()) { + // For integration tests runs in the provided environment + example() +} + +// TestMain sets up an etcd cluster if running the examples. func TestMain(m *testing.M) { - cfg := integration.ClusterConfig{Size: 1} - clus := integration.NewClusterV3(nil, &cfg) - endpoints = []string{clus.Client(0).Endpoints()[0]} v := m.Run() - clus.Terminate(nil) - if err := testutil.CheckAfterTest(time.Second); err != nil { - fmt.Fprintf(os.Stderr, "%v", err) - os.Exit(1) - } - if v == 0 && testutil.CheckLeakedGoroutine() { - os.Exit(1) + lazyCluster.Terminate() + if v == 0 { + testutil.MustCheckLeakedGoroutine() } os.Exit(v) } diff --git a/tests/integration/clientv3/concurrency/mutex_test.go b/tests/integration/clientv3/concurrency/mutex_test.go index b9f7e4160..e6b3c3466 100644 --- a/tests/integration/clientv3/concurrency/mutex_test.go +++ b/tests/integration/clientv3/concurrency/mutex_test.go @@ -23,7 +23,7 @@ import ( ) func TestMutexLockSessionExpired(t *testing.T) { - cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) + cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()}) if err != nil { t.Fatal(err) }