From 2ba69de2815e6d37ec8ee30b26320eeadcecff54 Mon Sep 17 00:00:00 2001 From: Patrice Chalin Date: Tue, 6 Apr 2021 15:21:01 -0400 Subject: [PATCH] Contrib lock example --- contrib/lock/README.md | 61 ++++++++++ contrib/lock/client/.gitignore | 1 + contrib/lock/client/client.go | 206 ++++++++++++++++++++++++++++++++ contrib/lock/storage/.gitignore | 1 + contrib/lock/storage/storage.go | 101 ++++++++++++++++ 5 files changed, 370 insertions(+) create mode 100644 contrib/lock/README.md create mode 100644 contrib/lock/client/.gitignore create mode 100644 contrib/lock/client/client.go create mode 100644 contrib/lock/storage/.gitignore create mode 100644 contrib/lock/storage/storage.go diff --git a/contrib/lock/README.md b/contrib/lock/README.md new file mode 100644 index 000000000..e1e408a0a --- /dev/null +++ b/contrib/lock/README.md @@ -0,0 +1,61 @@ +# What is this? +This directory provides an executable example of the scenarios described in [the article by Martin Kleppmann][fencing]. + +Generally speaking, a lease-based lock service cannot provide mutual exclusion to processes. This is because such a lease mechanism depends on the physical clock of both the lock service and client processes. Many factors (e.g. stop-the-world GC pause of a language runtime) can cause false expiration of a granted lease as depicted in the below figure: ![unsafe lock][unsafe-lock] + +As discussed in [notes on the usage of lock and lease][why], such a problem can be solved with a technique called version number validation or fencing tokens. With this technique a shared resource (storage in the figures) needs to validate requests from clients based on their tokens like this: ![fencing tokens][fencing-tokens] + +This directory contains two programs: `client` and `storage`. With `etcd`, you can reproduce the expired lease problem of distributed locking and a simple example solution of the validation technique which can avoid incorrect access from a client with an expired lease. + +`storage` works as a very simple key value in-memory store which is accessible through HTTP and a custom JSON protocol. `client` works as client processes which tries to write a key/value to `storage` with coordination of etcd locking. + +## How to build + +For building `client` and `storage`, just execute `go build` in each directory. + +## How to try + +At first you need to start an etcd cluster, which works as lock service in the figures. On top of the etcd source directory, execute commands like below: +``` +$ ./build # build etcd +$ goreman start +``` + +Then run `storage` command in `storage` directory: +``` +$ ./storage +``` + +Now client processes ("Client 1" and "Client 2" in the figures) can be started. At first, execute below command for starting a client process which corresponds to "Client 1": +``` +$ GODEBUG=gcstoptheworld=2 ./client 1 +``` +It will show an output like this: +``` +client 1 starts +creted etcd client +acquired lock, version: 1029195466614598192 +took 6.771998255s for allocation, took 36.217205ms for GC +emulated stop the world GC, make sure the /lock/* key disappeared and hit any key after executing client 2: +``` +The process causes stop the world GC pause for making lease expiration intentionally and waits a keyboard input. Now another client process can be started like this: +``` +$ ./client 2 +client 2 starts +creted etcd client +acquired lock, version: 4703569812595502727 +this is client 2, continuing +``` +If things go well the second client process invoked as `./client 2` finishes soon. It successfully writes a key to `storage` process. After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below: +``` +resuming client 1 +failed to write to storage: error: given version (4703569812595502721) differ from the existing version (4703569812595502727) +``` + +### Notes on the parameters related to stop the world GC pause +`client` program includes two constant values: `nrGarbageObjects` and `sessionTTL`. These parameters are configured for causing lease expiration with stop the world GC pause of go runtime. They heavily rely on resources of a machine for executing the example. If lease expiration doesn't happen on your machine, update these parameters and try again. + +[fencing]: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html +[fencing-tokens]: https://martin.kleppmann.com/2016/02/fencing-tokens.png +[unsafe-lock]: https://martin.kleppmann.com/2016/02/unsafe-lock.png +[why]: https://etcd.io/docs/next/learning/why/#notes-on-the-usage-of-lock-and-lease diff --git a/contrib/lock/client/.gitignore b/contrib/lock/client/.gitignore new file mode 100644 index 000000000..2a11f8b95 --- /dev/null +++ b/contrib/lock/client/.gitignore @@ -0,0 +1 @@ +client \ No newline at end of file diff --git a/contrib/lock/client/client.go b/contrib/lock/client/client.go new file mode 100644 index 000000000..b5c42f0cb --- /dev/null +++ b/contrib/lock/client/client.go @@ -0,0 +1,206 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// An example distributed locking with fencing in the case of etcd +// Based on https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html + +// Important usage: +// If you are invoking this program as client 1, you need to configure GODEBUG env var like below: +// GODEBUG=gcstoptheworld=2 ./client 1 + +package main + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" + "io/ioutil" + "net/http" + "os" + "runtime" + "strconv" + "time" +) + +type node struct { + next *node +} + +const ( + // These const values might be need adjustment. + nrGarbageObjects = 100 * 1000 * 1000 + sessionTTL = 1 +) + +func stopTheWorld() { + n := new(node) + root := n + allocStart := time.Now() + for i := 0; i < nrGarbageObjects; i++ { + n.next = new(node) + n = n.next + } + func(n *node) {}(root) // dummy usage of root for removing a compiler error + root = nil + allocDur := time.Since(allocStart) + + gcStart := time.Now() + runtime.GC() + gcDur := time.Since(gcStart) + fmt.Printf("took %v for allocation, took %v for GC\n", allocDur, gcDur) +} + +type request struct { + Op string `json:"op"` + Key string `json:"key"` + Val string `json:"val"` + Version int64 `json:"version"` +} + +type response struct { + Val string `json:"val"` + Version int64 `json:"version"` + Err string `json:"err"` +} + +func write(key string, value string, version int64) error { + req := request{ + Op: "write", + Key: key, + Val: value, + Version: version, + } + + reqBytes, err := json.Marshal(&req) + if err != nil { + fmt.Printf("failed to marshal request: %s\n", err) + os.Exit(1) + } + + httpResp, err := http.Post("http://localhost:8080", "application/json", bytes.NewReader(reqBytes)) + if err != nil { + fmt.Printf("failed to send a request to storage: %s\n", err) + os.Exit(1) + } + + respBytes, err := ioutil.ReadAll(httpResp.Body) + if err != nil { + fmt.Printf("failed to read request body: %s\n", err) + os.Exit(1) + } + + resp := new(response) + err = json.Unmarshal(respBytes, resp) + if err != nil { + fmt.Printf("failed to unmarshal response json: %s\n", err) + os.Exit(1) + } + + if resp.Err != "" { + return fmt.Errorf("error: %s", resp.Err) + } + + return nil +} + +func read(key string) (string, int64) { + req := request{ + Op: "read", + Key: key, + } + + reqBytes, err := json.Marshal(&req) + if err != nil { + fmt.Printf("failed to marshal request: %s\n", err) + os.Exit(1) + } + + httpResp, err := http.Post("http://localhost:8080", "application/json", bytes.NewReader(reqBytes)) + if err != nil { + fmt.Printf("failed to send a request to storage: %s\n", err) + os.Exit(1) + } + + respBytes, err := ioutil.ReadAll(httpResp.Body) + if err != nil { + fmt.Printf("failed to read request body: %s\n", err) + os.Exit(1) + } + + resp := new(response) + err = json.Unmarshal(respBytes, resp) + if err != nil { + fmt.Printf("failed to unmarshal response json: %s\n", err) + os.Exit(1) + } + + return resp.Val, resp.Version +} + +func main() { + if len(os.Args) != 2 { + fmt.Printf("usage: %s <1 or 2>\n", os.Args[0]) + return + } + + mode, err := strconv.Atoi(os.Args[1]) + if err != nil || mode != 1 && mode != 2 { + fmt.Printf("mode should be 1 or 2 (given value is %s)\n", os.Args[1]) + return + } + + fmt.Printf("client %d starts\n", mode) + + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"http://127.0.0.1:2379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"}, + }) + if err != nil { + fmt.Printf("failed to create an etcd client: %s\n", err) + os.Exit(1) + } + + fmt.Printf("creted etcd client\n") + + session, err := concurrency.NewSession(client, concurrency.WithTTL(sessionTTL)) + if err != nil { + fmt.Printf("failed to create a session: %s\n", err) + os.Exit(1) + } + + locker := concurrency.NewLocker(session, "/lock") + locker.Lock() + defer locker.Unlock() + version := session.Lease() + fmt.Printf("acquired lock, version: %d\n", version) + + if mode == 1 { + stopTheWorld() + fmt.Printf("emulated stop the world GC, make sure the /lock/* key disappeared and hit any key after executing client 2: ") + reader := bufio.NewReader(os.Stdin) + reader.ReadByte() + fmt.Printf("resuming client 1\n") + } else { + fmt.Printf("this is client 2, continuing\n") + } + + err = write("key0", fmt.Sprintf("value from client %d", mode), int64(version)) + if err != nil { + fmt.Printf("failed to write to storage: %s\n", err) // client 1 should show this message + } else { + fmt.Printf("successfully write a key to storage\n") + } +} diff --git a/contrib/lock/storage/.gitignore b/contrib/lock/storage/.gitignore new file mode 100644 index 000000000..5d252d7c9 --- /dev/null +++ b/contrib/lock/storage/.gitignore @@ -0,0 +1 @@ +storage \ No newline at end of file diff --git a/contrib/lock/storage/storage.go b/contrib/lock/storage/storage.go new file mode 100644 index 000000000..66ed6a7f2 --- /dev/null +++ b/contrib/lock/storage/storage.go @@ -0,0 +1,101 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "strings" +) + +type value struct { + val string + version int64 +} + +var data = make(map[string]*value) + +type request struct { + Op string `json:"op"` + Key string `json:"key"` + Val string `json:"val"` + Version int64 `json:"version"` +} + +type response struct { + Val string `json:"val"` + Version int64 `json:"version"` + Err string `json:"err"` +} + +func writeResponse(resp response, w http.ResponseWriter) { + wBytes, err := json.Marshal(resp) + if err != nil { + fmt.Printf("failed to marshal json: %s\n", err) + os.Exit(1) + } + _, err = w.Write(wBytes) + if err != nil { + fmt.Printf("failed to write a response: %s\n", err) + os.Exit(1) + } +} + +func handler(w http.ResponseWriter, r *http.Request) { + rBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + fmt.Printf("failed to read http request: %s\n", err) + os.Exit(1) + } + + var req request + err = json.Unmarshal(rBytes, &req) + if err != nil { + fmt.Printf("failed to unmarshal json: %s\n", err) + os.Exit(1) + } + + if strings.Compare(req.Op, "read") == 0 { + if val, ok := data[req.Key]; ok { + writeResponse(response{val.val, val.version, ""}, w) + } else { + writeResponse(response{"", -1, "key not found"}, w) + } + } else if strings.Compare(req.Op, "write") == 0 { + if val, ok := data[req.Key]; ok { + if req.Version != val.version { + writeResponse(response{"", -1, fmt.Sprintf("given version (%d) is different from the existing version (%d)", req.Version, val.version)}, w) + } else { + data[req.Key].val = req.Val + data[req.Key].version = req.Version + writeResponse(response{req.Val, req.Version, ""}, w) + } + } else { + data[req.Key] = &value{req.Val, req.Version} + writeResponse(response{req.Val, req.Version, ""}, w) + } + } else { + fmt.Printf("unknown op: %s\n", req.Op) + return + } +} + +func main() { + http.HandleFunc("/", handler) + http.ListenAndServe(":8080", nil) +}