clientv3/concurrency: Expose examples close to the source-code.

This commit is contained in:
Piotr Tabor 2020-10-06 14:19:02 +02:00
parent c5ccebf792
commit dd45d04b2d
10 changed files with 281 additions and 207 deletions

View File

@ -0,0 +1 @@
../../tests/integration/clientv3/concurrency/example_election_test.go

View File

@ -0,0 +1 @@
../../tests/integration/clientv3/concurrency/example_mutex_test.go

View File

@ -0,0 +1 @@
../../tests/integration/clientv3/concurrency/example_stm_test.go

View File

@ -0,0 +1,35 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package concurrency_test
import (
"testing"
"go.etcd.io/etcd/v3/pkg/testutil"
)
func exampleEndpoints() []string { return nil }
func forUnitTestsRunInMockedContext(mocking func(), example func()) {
mocking()
// TODO: Call 'example' when mocking() provides realistic mocking of transport.
// The real testing logic of examples gets executed
// as part of ./tests/integration/clientv3/integration/...
}
func TestMain(m *testing.M) {
testutil.MustTestMainWithLeakDetection(m)
}

View File

@ -28,7 +28,7 @@ import (
func TestResumeElection(t *testing.T) { func TestResumeElection(t *testing.T) {
const prefix = "/resume-election/" const prefix = "/resume-election/"
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -25,64 +25,73 @@ import (
"go.etcd.io/etcd/v3/clientv3/concurrency" "go.etcd.io/etcd/v3/clientv3/concurrency"
) )
func mockElection_Campaign() {
fmt.Println("completed first election with e2")
fmt.Println("completed second election with e1")
}
func ExampleElection_Campaign() { func ExampleElection_Campaign() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) forUnitTestsRunInMockedContext(
if err != nil { mockElection_Campaign,
log.Fatal(err) func() {
} cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
defer cli.Close() if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for election competition // create two separate sessions for election competition
s1, err := concurrency.NewSession(cli) s1, err := concurrency.NewSession(cli)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer s1.Close() defer s1.Close()
e1 := concurrency.NewElection(s1, "/my-election/") e1 := concurrency.NewElection(s1, "/my-election/")
s2, err := concurrency.NewSession(cli) s2, err := concurrency.NewSession(cli)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer s2.Close() defer s2.Close()
e2 := concurrency.NewElection(s2, "/my-election/") e2 := concurrency.NewElection(s2, "/my-election/")
// create competing candidates, with e1 initially losing to e2 // create competing candidates, with e1 initially losing to e2
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
electc := make(chan *concurrency.Election, 2) electc := make(chan *concurrency.Election, 2)
go func() { go func() {
defer wg.Done() defer wg.Done()
// delay candidacy so e2 wins first // delay candidacy so e2 wins first
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
if err := e1.Campaign(context.Background(), "e1"); err != nil { if err := e1.Campaign(context.Background(), "e1"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
electc <- e1 electc <- e1
}() }()
go func() { go func() {
defer wg.Done() defer wg.Done()
if err := e2.Campaign(context.Background(), "e2"); err != nil { if err := e2.Campaign(context.Background(), "e2"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
electc <- e2 electc <- e2
}() }()
cctx, cancel := context.WithCancel(context.TODO()) cctx, cancel := context.WithCancel(context.TODO())
defer cancel() defer cancel()
e := <-electc e := <-electc
fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value)) fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))
// resign so next candidate can be elected // resign so next candidate can be elected
if err := e.Resign(context.TODO()); err != nil { if err := e.Resign(context.TODO()); err != nil {
log.Fatal(err) log.Fatal(err)
} }
e = <-electc e = <-electc
fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value)) fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))
wg.Wait() wg.Wait()
})
// Output: // Output:
// completed first election with e2 // completed first election with e2

View File

@ -23,49 +23,60 @@ import (
"go.etcd.io/etcd/v3/clientv3/concurrency" "go.etcd.io/etcd/v3/clientv3/concurrency"
) )
func ExampleMutex_TryLock() { func mockMutex_TryLock() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock")
// acquire lock for s1
if err = m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1") fmt.Println("acquired lock for s1")
fmt.Println("cannot acquire lock for s2, as already locked in another session")
if err = m2.TryLock(context.TODO()); err == nil {
log.Fatal("should not acquire lock")
}
if err == concurrency.ErrLocked {
fmt.Println("cannot acquire lock for s2, as already locked in another session")
}
if err = m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1") fmt.Println("released lock for s1")
if err = m2.TryLock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s2") fmt.Println("acquired lock for s2")
}
func ExampleMutex_TryLock() {
forUnitTestsRunInMockedContext(
mockMutex_TryLock,
func() {
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock")
// acquire lock for s1
if err = m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
if err = m2.TryLock(context.TODO()); err == nil {
log.Fatal("should not acquire lock")
}
if err == concurrency.ErrLocked {
fmt.Println("cannot acquire lock for s2, as already locked in another session")
}
if err = m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
if err = m2.TryLock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s2")
})
// Output: // Output:
// acquired lock for s1 // acquired lock for s1
@ -74,50 +85,60 @@ func ExampleMutex_TryLock() {
// acquired lock for s2 // acquired lock for s2
} }
func ExampleMutex_Lock() { func mockMutex_Lock() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")
// acquire lock for s1
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1") fmt.Println("acquired lock for s1")
m2Locked := make(chan struct{})
go func() {
defer close(m2Locked)
// wait until s1 is locks /my-lock/
if err := m2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
}()
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1") fmt.Println("released lock for s1")
<-m2Locked
fmt.Println("acquired lock for s2") fmt.Println("acquired lock for s2")
}
func ExampleMutex_Lock() {
forUnitTestsRunInMockedContext(
mockMutex_Lock,
func() {
cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")
// acquire lock for s1
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
m2Locked := make(chan struct{})
go func() {
defer close(m2Locked)
// wait until s1 is locks /my-lock/
if err := m2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
}()
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
<-m2Locked
fmt.Println("acquired lock for s2")
})
// Output: // Output:
// acquired lock for s1 // acquired lock for s1

View File

@ -25,73 +25,81 @@ import (
"go.etcd.io/etcd/v3/clientv3/concurrency" "go.etcd.io/etcd/v3/clientv3/concurrency"
) )
func mockSTM_apply() {
fmt.Println("account sum is 500")
}
// ExampleSTM_apply shows how to use STM with a transactional // ExampleSTM_apply shows how to use STM with a transactional
// transfer between balances. // transfer between balances.
func ExampleSTM_apply() { func ExampleSTM_apply() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) forUnitTestsRunInMockedContext(
if err != nil { mockSTM_apply,
log.Fatal(err) func() {
} cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
defer cli.Close() if err != nil {
log.Fatal(err)
// set up "accounts"
totalAccounts := 5
for i := 0; i < totalAccounts; i++ {
k := fmt.Sprintf("accts/%d", i)
if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
log.Fatal(err)
}
}
exchange := func(stm concurrency.STM) error {
from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
if from == to {
// nothing to do
return nil
}
// read values
fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
fromV, toV := stm.Get(fromK), stm.Get(toK)
fromInt, toInt := 0, 0
fmt.Sscanf(fromV, "%d", &fromInt)
fmt.Sscanf(toV, "%d", &toInt)
// transfer amount
xfer := fromInt / 2
fromInt, toInt = fromInt-xfer, toInt+xfer
// write back
stm.Put(fromK, fmt.Sprintf("%d", fromInt))
stm.Put(toK, fmt.Sprintf("%d", toInt))
return nil
}
// concurrently exchange values between accounts
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
log.Fatal(serr)
} }
}() defer cli.Close()
}
wg.Wait()
// confirm account sum matches sum from beginning. // set up "accounts"
sum := 0 totalAccounts := 5
accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix()) for i := 0; i < totalAccounts; i++ {
if err != nil { k := fmt.Sprintf("accts/%d", i)
log.Fatal(err) if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
} log.Fatal(err)
for _, kv := range accts.Kvs { }
v := 0 }
fmt.Sscanf(string(kv.Value), "%d", &v)
sum += v
}
fmt.Println("account sum is", sum) exchange := func(stm concurrency.STM) error {
from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
if from == to {
// nothing to do
return nil
}
// read values
fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
fromV, toV := stm.Get(fromK), stm.Get(toK)
fromInt, toInt := 0, 0
fmt.Sscanf(fromV, "%d", &fromInt)
fmt.Sscanf(toV, "%d", &toInt)
// transfer amount
xfer := fromInt / 2
fromInt, toInt = fromInt-xfer, toInt+xfer
// write back
stm.Put(fromK, fmt.Sprintf("%d", fromInt))
stm.Put(toK, fmt.Sprintf("%d", toInt))
return nil
}
// concurrently exchange values between accounts
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
log.Fatal(serr)
}
}()
}
wg.Wait()
// confirm account sum matches sum from beginning.
sum := 0
accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
for _, kv := range accts.Kvs {
v := 0
fmt.Sscanf(string(kv.Value), "%d", &v)
sum += v
}
fmt.Println("account sum is", sum)
})
// Output: // Output:
// account sum is 500 // account sum is 500
} }

View File

@ -15,30 +15,28 @@
package concurrency_test package concurrency_test
import ( import (
"fmt"
"os" "os"
"testing" "testing"
"time"
"go.etcd.io/etcd/tests/v3/integration" "go.etcd.io/etcd/tests/v3/integration"
"go.etcd.io/etcd/v3/pkg/testutil" "go.etcd.io/etcd/v3/pkg/testutil"
) )
var endpoints []string var lazyCluster = integration.NewLazyCluster()
// TestMain sets up an etcd cluster for running the examples. func exampleEndpoints() []string { return lazyCluster.EndpointsV3() }
func forUnitTestsRunInMockedContext(mocking func(), example func()) {
// For integration tests runs in the provided environment
example()
}
// TestMain sets up an etcd cluster if running the examples.
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
cfg := integration.ClusterConfig{Size: 1}
clus := integration.NewClusterV3(nil, &cfg)
endpoints = []string{clus.Client(0).Endpoints()[0]}
v := m.Run() v := m.Run()
clus.Terminate(nil) lazyCluster.Terminate()
if err := testutil.CheckAfterTest(time.Second); err != nil { if v == 0 {
fmt.Fprintf(os.Stderr, "%v", err) testutil.MustCheckLeakedGoroutine()
os.Exit(1)
}
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
} }
os.Exit(v) os.Exit(v)
} }

View File

@ -23,7 +23,7 @@ import (
) )
func TestMutexLockSessionExpired(t *testing.T) { func TestMutexLockSessionExpired(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) cli, err := clientv3.New(clientv3.Config{Endpoints: exampleEndpoints()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }