diff --git a/contrib/lock/README.md b/contrib/lock/README.md index e1e408a0a..d33630e25 100644 --- a/contrib/lock/README.md +++ b/contrib/lock/README.md @@ -15,10 +15,10 @@ For building `client` and `storage`, just execute `go build` in each directory. ## How to try -At first you need to start an etcd cluster, which works as lock service in the figures. On top of the etcd source directory, execute commands like below: +At first, you need to start an etcd cluster, which works as lock service in the figures. On top of the etcd source directory, execute commands like below: ``` -$ ./build # build etcd -$ goreman start +$ make # build etcd +$ bin/etcd # start etcd ``` Then run `storage` command in `storage` directory: @@ -28,33 +28,45 @@ $ ./storage Now client processes ("Client 1" and "Client 2" in the figures) can be started. At first, execute below command for starting a client process which corresponds to "Client 1": ``` -$ GODEBUG=gcstoptheworld=2 ./client 1 +$ ./client 1 ``` It will show an output like this: ``` client 1 starts -creted etcd client -acquired lock, version: 1029195466614598192 -took 6.771998255s for allocation, took 36.217205ms for GC -emulated stop the world GC, make sure the /lock/* key disappeared and hit any key after executing client 2: +created etcd client and session +acquired lock, version: 694d82254d5fa305 +please manually revoke the lease using 'etcdctl lease revoke 694d82254d5fa305' or wait for it to expire, then start executing client 2 and hit any key... ``` -The process causes stop the world GC pause for making lease expiration intentionally and waits a keyboard input. Now another client process can be started like this: + +Verify the lease was created using: +``` +$ bin/etcdctl lease list +found 1 leases +694d82254d5fa305 +``` + +Then proceed to manually revoke the lease using: +``` +$ bin/etcdctl lease revoke 694d82254d5fa305 +lease 694d82254d5fa305 revoked +``` + +Now another client process can be started like this: ``` $ ./client 2 client 2 starts -creted etcd client -acquired lock, version: 4703569812595502727 +created etcd client and session +acquired lock, version: 694d82254e18770a this is client 2, continuing ``` -If things go well the second client process invoked as `./client 2` finishes soon. It successfully writes a key to `storage` process. After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below: +If things go well the second client process invoked as `./client 2` finishes soon. It successfully writes a key to `storage` process. + +After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below: ``` resuming client 1 -failed to write to storage: error: given version (4703569812595502721) differ from the existing version (4703569812595502727) +expected fail to write to storage with old lease version: error: given version (694d82254d5fa305) is different from the existing version (694d82254e18770a) ``` -### Notes on the parameters related to stop the world GC pause -`client` program includes two constant values: `nrGarbageObjects` and `sessionTTL`. These parameters are configured for causing lease expiration with stop the world GC pause of go runtime. They heavily rely on resources of a machine for executing the example. If lease expiration doesn't happen on your machine, update these parameters and try again. - [fencing]: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html [fencing-tokens]: https://martin.kleppmann.com/2016/02/fencing-tokens.png [unsafe-lock]: https://martin.kleppmann.com/2016/02/unsafe-lock.png diff --git a/contrib/lock/client/client.go b/contrib/lock/client/client.go index 5673f7d57..9f8425a37 100644 --- a/contrib/lock/client/client.go +++ b/contrib/lock/client/client.go @@ -15,21 +15,18 @@ // An example distributed locking with fencing in the case of etcd // Based on https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html -// Important usage: -// If you are invoking this program as client 1, you need to configure GODEBUG env var like below: -// GODEBUG=gcstoptheworld=2 ./client 1 - package main import ( "bufio" "bytes" + "context" "encoding/json" "fmt" "io" + "log" "net/http" "os" - "runtime" "strconv" "time" @@ -37,34 +34,6 @@ import ( "go.etcd.io/etcd/client/v3/concurrency" ) -type node struct { - next *node -} - -const ( - // These const values might be need adjustment. - nrGarbageObjects = 100 * 1000 * 1000 - sessionTTL = 1 -) - -func stopTheWorld() { - n := new(node) - root := n - allocStart := time.Now() - for i := 0; i < nrGarbageObjects; i++ { - n.next = new(node) - n = n.next - } - func(n *node) {}(root) // dummy usage of root for removing a compiler error - root = nil - allocDur := time.Since(allocStart) - - gcStart := time.Now() - runtime.GC() - gcDur := time.Since(gcStart) - fmt.Printf("took %v for allocation, took %v for GC\n", allocDur, gcDur) -} - type request struct { Op string `json:"op"` Key string `json:"key"` @@ -88,27 +57,23 @@ func write(key string, value string, version int64) error { reqBytes, err := json.Marshal(&req) if err != nil { - fmt.Printf("failed to marshal request: %s\n", err) - os.Exit(1) + log.Fatalf("failed to marshal request: %s", err) } httpResp, err := http.Post("http://localhost:8080", "application/json", bytes.NewReader(reqBytes)) if err != nil { - fmt.Printf("failed to send a request to storage: %s\n", err) - os.Exit(1) + log.Fatalf("failed to send a request to storage: %s", err) } respBytes, err := io.ReadAll(httpResp.Body) if err != nil { - fmt.Printf("failed to read request body: %s\n", err) - os.Exit(1) + log.Fatalf("failed to read request body: %s", err) } resp := new(response) err = json.Unmarshal(respBytes, resp) if err != nil { - fmt.Printf("failed to unmarshal response json: %s\n", err) - os.Exit(1) + log.Fatalf("failed to unmarshal response json: %s", err) } if resp.Err != "" { @@ -118,90 +83,63 @@ func write(key string, value string, version int64) error { return nil } -func read(key string) (string, int64) { - req := request{ - Op: "read", - Key: key, - } - - reqBytes, err := json.Marshal(&req) - if err != nil { - fmt.Printf("failed to marshal request: %s\n", err) - os.Exit(1) - } - - httpResp, err := http.Post("http://localhost:8080", "application/json", bytes.NewReader(reqBytes)) - if err != nil { - fmt.Printf("failed to send a request to storage: %s\n", err) - os.Exit(1) - } - - respBytes, err := io.ReadAll(httpResp.Body) - if err != nil { - fmt.Printf("failed to read request body: %s\n", err) - os.Exit(1) - } - - resp := new(response) - err = json.Unmarshal(respBytes, resp) - if err != nil { - fmt.Printf("failed to unmarshal response json: %s\n", err) - os.Exit(1) - } - - return resp.Val, resp.Version -} - func main() { if len(os.Args) != 2 { - fmt.Printf("usage: %s <1 or 2>\n", os.Args[0]) - return + log.Fatalf("usage: %s <1 or 2>", os.Args[0]) } mode, err := strconv.Atoi(os.Args[1]) if err != nil || mode != 1 && mode != 2 { - fmt.Printf("mode should be 1 or 2 (given value is %s)\n", os.Args[1]) - return + log.Fatalf("mode should be 1 or 2 (given value is %s)", os.Args[1]) } - fmt.Printf("client %d starts\n", mode) + log.Printf("client %d starts\n", mode) client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"http://127.0.0.1:2379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"}, }) if err != nil { - fmt.Printf("failed to create an etcd client: %s\n", err) - os.Exit(1) + log.Fatalf("failed to create an etcd client: %s", err) } - fmt.Printf("creted etcd client\n") - - session, err := concurrency.NewSession(client, concurrency.WithTTL(sessionTTL)) + // do a connection check first, otherwise it will hang infinitely on newSession + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = client.MemberList(ctx) if err != nil { - fmt.Printf("failed to create a session: %s\n", err) - os.Exit(1) + log.Fatalf("failed to reach etcd: %s", err) } + session, err := concurrency.NewSession(client, concurrency.WithTTL(1)) + if err != nil { + log.Fatalf("failed to create a session: %s", err) + } + + log.Printf("created etcd client and session") + locker := concurrency.NewLocker(session, "/lock") locker.Lock() defer locker.Unlock() version := session.Lease() - fmt.Printf("acquired lock, version: %d\n", version) + log.Printf("acquired lock, version: %x", version) if mode == 1 { - stopTheWorld() - fmt.Printf("emulated stop the world GC, make sure the /lock/* key disappeared and hit any key after executing client 2: ") + log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", version) reader := bufio.NewReader(os.Stdin) - reader.ReadByte() - fmt.Printf("resuming client 1\n") + _, _ = reader.ReadByte() + log.Printf("resuming client 1") } else { - fmt.Printf("this is client 2, continuing\n") + log.Printf("this is client 2, continuing\n") } - err = write("key0", fmt.Sprintf("value from client %d", mode), int64(version)) + err = write("key0", fmt.Sprintf("value from client %x", mode), int64(version)) if err != nil { - fmt.Printf("failed to write to storage: %s\n", err) // client 1 should show this message + if mode == 1 { + log.Printf("expected fail to write to storage with old lease version: %s\n", err) // client 1 should show this message + } else { + log.Fatalf("unexpected fail to write to storage: %s\n", err) + } } else { - fmt.Printf("successfully write a key to storage\n") + log.Printf("successfully write a key to storage using lease %x\n", int64(version)) } } diff --git a/contrib/lock/storage/storage.go b/contrib/lock/storage/storage.go index 917b86f42..e7d8c694c 100644 --- a/contrib/lock/storage/storage.go +++ b/contrib/lock/storage/storage.go @@ -79,7 +79,7 @@ func handler(w http.ResponseWriter, r *http.Request) { } else if strings.Compare(req.Op, "write") == 0 { if val, ok := data[req.Key]; ok { if req.Version != val.version { - writeResponse(response{"", -1, fmt.Sprintf("given version (%d) is different from the existing version (%d)", req.Version, val.version)}, w) + writeResponse(response{"", -1, fmt.Sprintf("given version (%x) is different from the existing version (%x)", req.Version, val.version)}, w) } else { data[req.Key].val = req.Val data[req.Key].version = req.Version