diff --git a/clientv3/concurrency/example_election_test.go b/clientv3/concurrency/example_election_test.go new file mode 100644 index 000000000..5cce9f490 --- /dev/null +++ b/clientv3/concurrency/example_election_test.go @@ -0,0 +1,90 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency_test + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" +) + +func ExampleElection_Campaign() { + cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) + if err != nil { + log.Fatal(err) + } + defer cli.Close() + + // create two separate sessions for election competition + s1, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s1.Close() + e1 := concurrency.NewElection(s1, "/my-election/") + + s2, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s2.Close() + e2 := concurrency.NewElection(s2, "/my-election/") + + // create competing candidates, with e1 initially losing to e2 + var wg sync.WaitGroup + wg.Add(2) + electc := make(chan *concurrency.Election, 2) + go func() { + defer wg.Done() + // delay candidacy so e2 wins first + time.Sleep(3 * time.Second) + if err := e1.Campaign(context.Background(), "e1"); err != nil { + log.Fatal(err) + } + electc <- e1 + }() + go func() { + defer wg.Done() + if err := e2.Campaign(context.Background(), "e2"); err != nil { + log.Fatal(err) + } + electc <- e2 + }() + + cctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + e := <-electc + fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value)) + + // resign so next candidate can be elected + if err := e.Resign(context.TODO()); err != nil { + log.Fatal(err) + } + + e = <-electc + fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value)) + + wg.Wait() + + // Output: + // completed first election with e2 + // completed second election with e1 +} diff --git a/clientv3/concurrency/example_mutex_test.go b/clientv3/concurrency/example_mutex_test.go new file mode 100644 index 000000000..c9a878806 --- /dev/null +++ b/clientv3/concurrency/example_mutex_test.go @@ -0,0 +1,75 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency_test + +import ( + "context" + "fmt" + "log" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" +) + +func ExampleMutex_Lock() { + cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) + if err != nil { + log.Fatal(err) + } + defer cli.Close() + + // create two separate sessions for lock competition + s1, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s1.Close() + m1 := concurrency.NewMutex(s1, "/my-lock/") + + s2, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s2.Close() + m2 := concurrency.NewMutex(s2, "/my-lock/") + + // acquire lock for s1 + if err := m1.Lock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("acquired lock for s1") + + m2Locked := make(chan struct{}) + go func() { + defer close(m2Locked) + // wait until s1 is locks /my-lock/ + if err := m2.Lock(context.TODO()); err != nil { + log.Fatal(err) + } + }() + + if err := m1.Unlock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("released lock for s1") + + <-m2Locked + fmt.Println("acquired lock for s2") + + // Output: + // acquired lock for s1 + // released lock for s1 + // acquired lock for s2 +} diff --git a/clientv3/concurrency/example_stm_test.go b/clientv3/concurrency/example_stm_test.go new file mode 100644 index 000000000..d49862c7d --- /dev/null +++ b/clientv3/concurrency/example_stm_test.go @@ -0,0 +1,97 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency_test + +import ( + "context" + "fmt" + "log" + "math/rand" + "sync" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" +) + +// ExampleSTM_apply shows how to use STM with a transactional +// transfer between balances. +func ExampleSTM_apply() { + cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) + if err != nil { + log.Fatal(err) + } + defer cli.Close() + + // set up "accounts" + totalAccounts := 5 + for i := 0; i < totalAccounts; i++ { + k := fmt.Sprintf("accts/%d", i) + if _, err = cli.Put(context.TODO(), k, "100"); err != nil { + log.Fatal(err) + } + } + + exchange := func(stm concurrency.STM) error { + from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts) + if from == to { + // nothing to do + return nil + } + // read values + fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to) + fromV, toV := stm.Get(fromK), stm.Get(toK) + fromInt, toInt := 0, 0 + fmt.Sscanf(fromV, "%d", &fromInt) + fmt.Sscanf(toV, "%d", &toInt) + + // transfer amount + xfer := fromInt / 2 + fromInt, toInt = fromInt-xfer, toInt-xfer + + // writeback + stm.Put(fromK, fmt.Sprintf("%d", fromInt)) + stm.Put(toK, fmt.Sprintf("%d", toInt)) + return nil + } + + // concurrently exchange values between accounts + var wg sync.WaitGroup + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + if _, serr := concurrency.NewSTM(cli, exchange); serr != nil { + log.Fatal(serr) + } + }() + } + wg.Wait() + + // confirm account sum matches sum from beginning. + sum := 0 + accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix()) + if err != nil { + log.Fatal(err) + } + for _, kv := range accts.Kvs { + v := 0 + fmt.Sscanf(string(kv.Value), "%d", &v) + sum += v + } + + fmt.Println("account sum is", sum) + // Output: + // account sum is 500 +} diff --git a/clientv3/concurrency/main_test.go b/clientv3/concurrency/main_test.go new file mode 100644 index 000000000..797fe9bd1 --- /dev/null +++ b/clientv3/concurrency/main_test.go @@ -0,0 +1,44 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency_test + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/pkg/testutil" +) + +var endpoints []string + +// TestMain sets up an etcd cluster for running the examples. +func TestMain(m *testing.M) { + cfg := integration.ClusterConfig{Size: 1} + clus := integration.NewClusterV3(nil, &cfg) + endpoints = []string{clus.Client(0).Endpoints()[0]} + v := m.Run() + clus.Terminate(nil) + if err := testutil.CheckAfterTest(time.Second); err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + os.Exit(1) + } + if v == 0 && testutil.CheckLeakedGoroutine() { + os.Exit(1) + } + os.Exit(v) +}