mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12836 from chalin/chalin-contrib-lock-2021-04-06
Contrib lock example
This commit is contained in:
commit
e24e72c5c8
61
contrib/lock/README.md
Normal file
61
contrib/lock/README.md
Normal file
@ -0,0 +1,61 @@
|
||||
# What is this?
|
||||
This directory provides an executable example of the scenarios described in [the article by Martin Kleppmann][fencing].
|
||||
|
||||
Generally speaking, a lease-based lock service cannot provide mutual exclusion to processes. This is because such a lease mechanism depends on the physical clock of both the lock service and client processes. Many factors (e.g. stop-the-world GC pause of a language runtime) can cause false expiration of a granted lease as depicted in the below figure: ![unsafe lock][unsafe-lock]
|
||||
|
||||
As discussed in [notes on the usage of lock and lease][why], such a problem can be solved with a technique called version number validation or fencing tokens. With this technique a shared resource (storage in the figures) needs to validate requests from clients based on their tokens like this: ![fencing tokens][fencing-tokens]
|
||||
|
||||
This directory contains two programs: `client` and `storage`. With `etcd`, you can reproduce the expired lease problem of distributed locking and a simple example solution of the validation technique which can avoid incorrect access from a client with an expired lease.
|
||||
|
||||
`storage` works as a very simple key value in-memory store which is accessible through HTTP and a custom JSON protocol. `client` works as client processes which tries to write a key/value to `storage` with coordination of etcd locking.
|
||||
|
||||
## How to build
|
||||
|
||||
For building `client` and `storage`, just execute `go build` in each directory.
|
||||
|
||||
## How to try
|
||||
|
||||
At first you need to start an etcd cluster, which works as lock service in the figures. On top of the etcd source directory, execute commands like below:
|
||||
```
|
||||
$ ./build # build etcd
|
||||
$ goreman start
|
||||
```
|
||||
|
||||
Then run `storage` command in `storage` directory:
|
||||
```
|
||||
$ ./storage
|
||||
```
|
||||
|
||||
Now client processes ("Client 1" and "Client 2" in the figures) can be started. At first, execute below command for starting a client process which corresponds to "Client 1":
|
||||
```
|
||||
$ GODEBUG=gcstoptheworld=2 ./client 1
|
||||
```
|
||||
It will show an output like this:
|
||||
```
|
||||
client 1 starts
|
||||
creted etcd client
|
||||
acquired lock, version: 1029195466614598192
|
||||
took 6.771998255s for allocation, took 36.217205ms for GC
|
||||
emulated stop the world GC, make sure the /lock/* key disappeared and hit any key after executing client 2:
|
||||
```
|
||||
The process causes stop the world GC pause for making lease expiration intentionally and waits a keyboard input. Now another client process can be started like this:
|
||||
```
|
||||
$ ./client 2
|
||||
client 2 starts
|
||||
creted etcd client
|
||||
acquired lock, version: 4703569812595502727
|
||||
this is client 2, continuing
|
||||
```
|
||||
If things go well the second client process invoked as `./client 2` finishes soon. It successfully writes a key to `storage` process. After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below:
|
||||
```
|
||||
resuming client 1
|
||||
failed to write to storage: error: given version (4703569812595502721) differ from the existing version (4703569812595502727)
|
||||
```
|
||||
|
||||
### Notes on the parameters related to stop the world GC pause
|
||||
`client` program includes two constant values: `nrGarbageObjects` and `sessionTTL`. These parameters are configured for causing lease expiration with stop the world GC pause of go runtime. They heavily rely on resources of a machine for executing the example. If lease expiration doesn't happen on your machine, update these parameters and try again.
|
||||
|
||||
[fencing]: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
|
||||
[fencing-tokens]: https://martin.kleppmann.com/2016/02/fencing-tokens.png
|
||||
[unsafe-lock]: https://martin.kleppmann.com/2016/02/unsafe-lock.png
|
||||
[why]: https://etcd.io/docs/next/learning/why/#notes-on-the-usage-of-lock-and-lease
|
1
contrib/lock/client/.gitignore
vendored
Normal file
1
contrib/lock/client/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
client
|
206
contrib/lock/client/client.go
Normal file
206
contrib/lock/client/client.go
Normal file
@ -0,0 +1,206 @@
|
||||
// Copyright 2020 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// An example distributed locking with fencing in the case of etcd
|
||||
// Based on https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
|
||||
|
||||
// Important usage:
|
||||
// If you are invoking this program as client 1, you need to configure GODEBUG env var like below:
|
||||
// GODEBUG=gcstoptheworld=2 ./client 1
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type node struct {
|
||||
next *node
|
||||
}
|
||||
|
||||
const (
|
||||
// These const values might be need adjustment.
|
||||
nrGarbageObjects = 100 * 1000 * 1000
|
||||
sessionTTL = 1
|
||||
)
|
||||
|
||||
func stopTheWorld() {
|
||||
n := new(node)
|
||||
root := n
|
||||
allocStart := time.Now()
|
||||
for i := 0; i < nrGarbageObjects; i++ {
|
||||
n.next = new(node)
|
||||
n = n.next
|
||||
}
|
||||
func(n *node) {}(root) // dummy usage of root for removing a compiler error
|
||||
root = nil
|
||||
allocDur := time.Since(allocStart)
|
||||
|
||||
gcStart := time.Now()
|
||||
runtime.GC()
|
||||
gcDur := time.Since(gcStart)
|
||||
fmt.Printf("took %v for allocation, took %v for GC\n", allocDur, gcDur)
|
||||
}
|
||||
|
||||
type request struct {
|
||||
Op string `json:"op"`
|
||||
Key string `json:"key"`
|
||||
Val string `json:"val"`
|
||||
Version int64 `json:"version"`
|
||||
}
|
||||
|
||||
type response struct {
|
||||
Val string `json:"val"`
|
||||
Version int64 `json:"version"`
|
||||
Err string `json:"err"`
|
||||
}
|
||||
|
||||
func write(key string, value string, version int64) error {
|
||||
req := request{
|
||||
Op: "write",
|
||||
Key: key,
|
||||
Val: value,
|
||||
Version: version,
|
||||
}
|
||||
|
||||
reqBytes, err := json.Marshal(&req)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to marshal request: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
httpResp, err := http.Post("http://localhost:8080", "application/json", bytes.NewReader(reqBytes))
|
||||
if err != nil {
|
||||
fmt.Printf("failed to send a request to storage: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
respBytes, err := ioutil.ReadAll(httpResp.Body)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to read request body: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
resp := new(response)
|
||||
err = json.Unmarshal(respBytes, resp)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to unmarshal response json: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if resp.Err != "" {
|
||||
return fmt.Errorf("error: %s", resp.Err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func read(key string) (string, int64) {
|
||||
req := request{
|
||||
Op: "read",
|
||||
Key: key,
|
||||
}
|
||||
|
||||
reqBytes, err := json.Marshal(&req)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to marshal request: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
httpResp, err := http.Post("http://localhost:8080", "application/json", bytes.NewReader(reqBytes))
|
||||
if err != nil {
|
||||
fmt.Printf("failed to send a request to storage: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
respBytes, err := ioutil.ReadAll(httpResp.Body)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to read request body: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
resp := new(response)
|
||||
err = json.Unmarshal(respBytes, resp)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to unmarshal response json: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return resp.Val, resp.Version
|
||||
}
|
||||
|
||||
func main() {
|
||||
if len(os.Args) != 2 {
|
||||
fmt.Printf("usage: %s <1 or 2>\n", os.Args[0])
|
||||
return
|
||||
}
|
||||
|
||||
mode, err := strconv.Atoi(os.Args[1])
|
||||
if err != nil || mode != 1 && mode != 2 {
|
||||
fmt.Printf("mode should be 1 or 2 (given value is %s)\n", os.Args[1])
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("client %d starts\n", mode)
|
||||
|
||||
client, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{"http://127.0.0.1:2379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"},
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("failed to create an etcd client: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Printf("creted etcd client\n")
|
||||
|
||||
session, err := concurrency.NewSession(client, concurrency.WithTTL(sessionTTL))
|
||||
if err != nil {
|
||||
fmt.Printf("failed to create a session: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
locker := concurrency.NewLocker(session, "/lock")
|
||||
locker.Lock()
|
||||
defer locker.Unlock()
|
||||
version := session.Lease()
|
||||
fmt.Printf("acquired lock, version: %d\n", version)
|
||||
|
||||
if mode == 1 {
|
||||
stopTheWorld()
|
||||
fmt.Printf("emulated stop the world GC, make sure the /lock/* key disappeared and hit any key after executing client 2: ")
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
reader.ReadByte()
|
||||
fmt.Printf("resuming client 1\n")
|
||||
} else {
|
||||
fmt.Printf("this is client 2, continuing\n")
|
||||
}
|
||||
|
||||
err = write("key0", fmt.Sprintf("value from client %d", mode), int64(version))
|
||||
if err != nil {
|
||||
fmt.Printf("failed to write to storage: %s\n", err) // client 1 should show this message
|
||||
} else {
|
||||
fmt.Printf("successfully write a key to storage\n")
|
||||
}
|
||||
}
|
1
contrib/lock/storage/.gitignore
vendored
Normal file
1
contrib/lock/storage/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
storage
|
101
contrib/lock/storage/storage.go
Normal file
101
contrib/lock/storage/storage.go
Normal file
@ -0,0 +1,101 @@
|
||||
// Copyright 2020 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type value struct {
|
||||
val string
|
||||
version int64
|
||||
}
|
||||
|
||||
var data = make(map[string]*value)
|
||||
|
||||
type request struct {
|
||||
Op string `json:"op"`
|
||||
Key string `json:"key"`
|
||||
Val string `json:"val"`
|
||||
Version int64 `json:"version"`
|
||||
}
|
||||
|
||||
type response struct {
|
||||
Val string `json:"val"`
|
||||
Version int64 `json:"version"`
|
||||
Err string `json:"err"`
|
||||
}
|
||||
|
||||
func writeResponse(resp response, w http.ResponseWriter) {
|
||||
wBytes, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to marshal json: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
_, err = w.Write(wBytes)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to write a response: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func handler(w http.ResponseWriter, r *http.Request) {
|
||||
rBytes, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to read http request: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
var req request
|
||||
err = json.Unmarshal(rBytes, &req)
|
||||
if err != nil {
|
||||
fmt.Printf("failed to unmarshal json: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if strings.Compare(req.Op, "read") == 0 {
|
||||
if val, ok := data[req.Key]; ok {
|
||||
writeResponse(response{val.val, val.version, ""}, w)
|
||||
} else {
|
||||
writeResponse(response{"", -1, "key not found"}, w)
|
||||
}
|
||||
} else if strings.Compare(req.Op, "write") == 0 {
|
||||
if val, ok := data[req.Key]; ok {
|
||||
if req.Version != val.version {
|
||||
writeResponse(response{"", -1, fmt.Sprintf("given version (%d) is different from the existing version (%d)", req.Version, val.version)}, w)
|
||||
} else {
|
||||
data[req.Key].val = req.Val
|
||||
data[req.Key].version = req.Version
|
||||
writeResponse(response{req.Val, req.Version, ""}, w)
|
||||
}
|
||||
} else {
|
||||
data[req.Key] = &value{req.Val, req.Version}
|
||||
writeResponse(response{req.Val, req.Version, ""}, w)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("unknown op: %s\n", req.Op)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
http.HandleFunc("/", handler)
|
||||
http.ListenAndServe(":8080", nil)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user