Merge branch '0.2' of https://github.com/coreos/etcd into migration-test

This commit is contained in:
Ben Johnson 2013-11-03 16:03:20 -07:00
commit 3d4e604607
5 changed files with 272 additions and 146 deletions

186
README.md
View File

@ -1,5 +1,5 @@
# etcd # etcd
README version 0.1.0 README version 0.2.0
[![Build Status](https://travis-ci.org/coreos/etcd.png)](https://travis-ci.org/coreos/etcd) [![Build Status](https://travis-ci.org/coreos/etcd.png)](https://travis-ci.org/coreos/etcd)
@ -62,61 +62,58 @@ The `-n node0` tells the rest of the cluster that this node is named node0.
Lets set the first key-value pair to the node. In this case the key is `/message` and the value is `Hello world`. Lets set the first key-value pair to the node. In this case the key is `/message` and the value is `Hello world`.
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/message -d value="Hello world" curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello world"
``` ```
```json ```json
{"action":"SET","key":"/message","value":"Hello world","newKey":true,"index":3} {"action":"set","key":"/message","value":"Hello world","index":3,"term":0}
``` ```
This response contains five fields. We will introduce three more fields as we try more commands. This response contains five fields. We will introduce three more fields as we try more commands.
1. The action of the request; we set the value via a POST request, thus the action is `SET`. 1. The action of the request; we set the value via a PUT request, thus the action is `set`.
2. The key of the request; we set `/message` to `Hello world!`, so the key field is `/message`. 2. The key of the request; we set `/message` to `Hello world!`, so the key field is `/message`.
Notice we use a file system like structure to represent the key-value pairs. So each key starts with `/`. Notice we use a file system like structure to represent the key-value pairs. So each key starts with `/`.
3. The current value of the key; we set the value to`Hello world`. 3. The current value of the key; we set the value to`Hello world`.
4. If we set a new key; `/message` did not exist before, so this is a new key. 4. Index is the unique internal log index of the set request. Requests that change the log index include `set`, `delete`, `update`, `create` and `compareAndSwap`. The `get` and `watch` commands do not change state in the store and so they do not change the index. You may notice that in this example the index is 3, although it is the first request you sent to the server. This is because there are internal commands that also change the state like adding and syncing servers.
5. Index is the unique internal log index of the set request. Requests that change the log index include `SET`, `DELETE` and `TESTANDSET`. The `GET`, `LIST` and `WATCH` commands do not change state in the store and so they do not change the index. You may notice that in this example the index is 3, although it is the first request you sent to the server. This is because there are internal commands that also change the state like adding and syncing servers.
### Get the value of a key ### Get the value of a key
Get the value that we just set in `/message` by issuing a GET: Get the value that we just set in `/message` by issuing a GET:
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/message curl -L http://127.0.0.1:4001/v2/keys/message
``` ```
```json ```json
{"action":"GET","key":"/message","value":"Hello world","index":3} {"action":"get","key":"/message","value":"Hello world","index":3,"term":0}
``` ```
### Change the value of a key ### Changing the value of a key
Change the value of `/message` from `Hello world` to `Hello etcd` with another POST to the key: Change the value of `/message` from `Hello world` to `Hello etcd` with another PUT request to the key:
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/message -d value="Hello etcd" curl -L http://127.0.0.1:4001/v1/keys/message -XPUT -d value="Hello etcd"
``` ```
```json ```json
{"action":"SET","key":"/message","prevValue":"Hello world","value":"Hello etcd","index":4} {"action":"set","key":"/message","prevValue":"Hello world","value":"Hello etcd","index":4,"term":0}
``` ```
Notice that the `prevValue` is set to `Hello world`. Notice that the `prevValue` is set to the previous value of the key - `Hello world`. It is useful when you want to atomically set a value to a key and get its old value.
### Deleting a key
### Delete a key
Remove the `/message` key with a DELETE: Remove the `/message` key with a DELETE:
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/message -X DELETE curl -L http://127.0.0.1:4001/v2/keys/message -XDELETE
``` ```
```json ```json
{"action":"DELETE","key":"/message","prevValue":"Hello etcd","index":5} {"action":"delete","key":"/message","prevValue":"Hello etcd","index":5,"term":0}
``` ```
### Using key TTL ### Using key TTL
@ -124,11 +121,11 @@ curl -L http://127.0.0.1:4001/v1/keys/message -X DELETE
Keys in etcd can be set to expire after a specified number of seconds. That is done by setting a TTL (time to live) on the key when you POST: Keys in etcd can be set to expire after a specified number of seconds. That is done by setting a TTL (time to live) on the key when you POST:
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo -d value=bar -d ttl=5 curl -L http://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar -d ttl=5
``` ```
```json ```json
{"action":"SET","key":"/foo","value":"bar","newKey":true,"expiration":"2013-07-11T20:31:12.156146039-07:00","ttl":4,"index":6} {"action":"set","key":"/foo","value":"bar","expiration":"2013-10-19T18:44:04.528757176-07:00","ttl":5,"index":6,"term":0}
``` ```
Note the last two new fields in response: Note the last two new fields in response:
@ -140,115 +137,180 @@ Note the last two new fields in response:
Now you can try to get the key by sending: Now you can try to get the key by sending:
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo curl -L http://127.0.0.1:4001/v2/keys/foo
``` ```
If the TTL has expired, the key will be deleted, and you will be returned a 100. If the TTL has expired, the key will be deleted, and you will be returned a 100.
```json ```json
{"errorCode":100,"message":"Key Not Found","cause":"/foo"} {"errorCode":100,"message":"Key Not Found","cause":"/foo","index":6,"term":0}
``` ```
### Watching a prefix ### Waiting for a change
We can watch a path prefix and get notifications if any key change under that prefix. We can watch for a change and get a notification at a given path or any keys underneath it.
In one terminal, we send a watch request: In one terminal, we send a get request with `wait=true` :
```sh ```sh
curl -L http://127.0.0.1:4001/v1/watch/foo curl -L http://127.0.0.1:4001/v2/keys/foo?wait=true
``` ```
Now, we are watching at the path prefix `/foo` and wait for any changes under this path. Now, we are waiting for any changes at path `/foo`.
In another terminal, we set a key `/foo/foo` to `barbar` to see what will happen: In another terminal, we set a key `/foo` with value `bar`:
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo/foo -d value=barbar curl -L http://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar
``` ```
The first terminal should get the notification and return with the same response as the set request. The first terminal should get the notification and return with the same response as the set request.
```json ```json
{"action":"SET","key":"/foo/foo","value":"barbar","newKey":true,"index":7} {"action":"set","key":"/foo","value":"bar","index":7,"term":0}
``` ```
However, the watch command can do more than this. Using the the index we can watch for commands that has happened in the past. This is useful for ensuring you don't miss events between watch commands. However, the watch command can do more than this. Using the the index we can watch for commands that has happened in the past. This is useful for ensuring you don't miss events between watch commands.
Let's try to watch for the set command of index 6 again: Let's try to watch for the set command of index 7 again:
```sh ```sh
curl -L http://127.0.0.1:4001/v1/watch/foo -d index=7 curl -L http://127.0.0.1:4001/v2/keys/foo?wait=true\&waitIndex=7
``` ```
The watch command returns immediately with the same response as previous. The watch command returns immediately with the same response as previous.
### Atomic Test and Set ### Atomic Compare and Swap
Etcd can be used as a centralized coordination service in a cluster and `TestAndSet` is the most basic operation to build distributed lock service. This command will set the value only if the client provided `prevValue` is equal the current key value. Etcd can be used as a centralized coordination service in a cluster and `CompareAndSwap` is the most basic operation to build distributed lock service.
This command will set the value to the key only if the client provided conditions are equal to the current conditions.
The current comparable conditions are:
1. `prevValue` previous value of the key:
2. `prevIndex` previous index of the key
3. `prevExist` previous existence of the key: if `prevExist` is true, it is a `update` request; if prevExist is `false`, it is a `create` request.
Here is a simple example. Let's create a key-value pair first: `foo=one`. Here is a simple example. Let's create a key-value pair first: `foo=one`.
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo -d value=one curl -L http://127.0.0.1:4001/v1/keys/foo -XPUT -d value=one
``` ```
Let's try an invalid `TestAndSet` command. Let's try an invalid `CompareAndSwap` command.
We can give another parameter prevValue to set command to make it a TestAndSet command. We can give another parameter prevValue to set command to make it a `CompareAndSwap` command.
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo -d prevValue=two -d value=three curl -L http://127.0.0.1:4001/v1/keys/foo?prevValue=two -XPUT -d value=three
``` ```
This will try to test if the previous of the key is two, it is change it to three. This will try to compare the previous value of the key and the previous value we provided. If they are equal, the value of the key will change to three.
```json ```json
{"errorCode":101,"message":"The given PrevValue is not equal to the value of the key","cause":"TestAndSet: one!=two"} {"errorCode":101,"message":"Test Failed","cause":"[two != one] [0 != 8]","index":9,"term":0}
``` ```
which means `testAndSet` failed. which means `compareAndSwap` failed.
Let us try a valid one. Let us try a valid one.
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo -d prevValue=one -d value=two curl -L http://127.0.0.1:4001/v2/keys/foo?prevValue=one -XPUT -d value=two
``` ```
The response should be The response should be
```json ```json
{"action":"SET","key":"/foo","prevValue":"one","value":"two","index":10} {"action":"compareAndSwap","key":"/foo","prevValue":"one","value":"two","index":10,"term":0}
``` ```
We successfully changed the value from “one” to “two”, since we give the correct previous value. We successfully changed the value from “one” to “two”, since we give the correct previous value.
### Listing a directory ### Listing a directory
Last we provide a simple List command to list all the keys under a prefix path.
Let us create some keys first. Let us create some keys first.
We already have `/foo/foo=barbar` We already have `/foo=two`
We create another one `/foo/foo_dir/foo=barbarbar` We create another one `/foo_dir/foo=bar`
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo/foo_dir/bar -d value=barbarbar curl -L http://127.0.0.1:4001/v2/keys/foo_dir/foo -XPUT -d value=bar
``` ```
Now list the keys under `/foo` ```json
{"action":"set","key":"/foo_dir/foo","value":"bar","index":11,"term":0}
```
Now list the keys under root `/`
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo/ curl -L http://127.0.0.1:4001/v2/keys/
``` ```
We should see the response as an array of items We should see the response as an array of items
```json ```json
[{"action":"GET","key":"/foo/foo","value":"barbar","index":10},{"action":"GET","key":"/foo/foo_dir","dir":true,"index":10}] {"action":"get","key":"/","dir":true,"kvs":[{"key":"/foo","value":"two"},{"key":"/foo_dir","dir":true}],"index":11,"term":0}
``` ```
which meas `foo=barbar` is a key-value pair under `/foo` and `foo_dir` is a directory. which meas `/foo=two` is a key-value pair under `/ and `/foo_dir` is a directory.
Also we can recursively get all the content under a directory by add `recursive=true`.
```sh
curl -L http://127.0.0.1:4001/v2/keys/?recursive=true
```
```json
{"action":"get","key":"/","dir":true,"kvs":[{"key":"/foo","value":"two"},{"key":"/foo_dir","dir":true,"kvs":[{"key":"/foo_dir/foo","value":"bar"}]}],"index":11,"term":0}
```
### Deleting a directory
Let try to delete the directory `/foo_dir`.
To delete a directory, we must add `recursive=true`.
```sh
curl -L http://127.0.0.1:4001/v2/keys/foo_dir?recursive=true -XDELETE
```
```json
{"action":"delete","key":"/foo_dir","dir":true,"index":12,"term":0}
```
### Creating a hidden node
We can create a hidden key-value pair or directory by add `_` prefix. The hidden item will not be list when using get for a directory.
```sh
curl -L http://127.0.0.1:4001/v2/keys/_message -XPUT -d value="Hello hidden world"
```
```json
{"action":"set","key":"/_message","value":"Hello hidden world","index":13,"term":0}
```
```sh
curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello world"
```
```json
{"action":"set","key":"/message","value":"Hello world","index":14,"term":0}
```
Let us try to get the root `/`
```sh
curl -L http://127.0.0.1:4001/v2/keys/
```
```json
{"action":"get","key":"/","dir":true,"kvs":[{"key":"/foo","value":"two"},{"key":"/message","value":"Hello world"}],"index":15,"term":0}
```
We can only get `/message`, but cannot get `/_message`.
## Advanced Usage ## Advanced Usage
@ -273,7 +335,7 @@ Next, lets configure etcd to use this keypair:
You can now test the configuration using https: You can now test the configuration using https:
```sh ```sh
curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar -v
``` ```
You should be able to see the handshake succeed. You should be able to see the handshake succeed.
@ -287,7 +349,7 @@ SSLv3, TLS handshake, Finished (20):
And also the response from the etcd server. And also the response from the etcd server.
```json ```json
{"action":"SET","key":"/foo","value":"bar","newKey":true,"index":3} {"action":"set","key":"/foo","value":"bar","index":3, "term: 0"}
``` ```
### Authentication with HTTPS client certificates ### Authentication with HTTPS client certificates
@ -303,7 +365,7 @@ We can also do authentication using CA certs. The clients will provide their cer
Try the same request to this server: Try the same request to this server:
```sh ```sh
curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar -v
``` ```
The request should be rejected by the server. The request should be rejected by the server.
@ -317,7 +379,7 @@ routines:SSL3_READ_BYTES:sslv3 alert bad certificate
We need to give the CA signed cert to the server. We need to give the CA signed cert to the server.
```sh ```sh
curl -L https://127.0.0.1:4001/v1/keys/foo -d value=bar -v --key myclient.key --cert myclient.crt -cacert clientCA.crt curl -L https://127.0.0.1:4001/v1/keys/foo -XPUT -d value=bar -v --key myclient.key --cert myclient.crt -cacert clientCA.crt
``` ```
You should able to see You should able to see
@ -331,7 +393,7 @@ TLS handshake, Finished (20)
And also the response from the server: And also the response from the server:
```json ```json
{"action":"SET","key":"/foo","value":"bar","newKey":true,"index":3} {"action":"set","key":"/foo","value":"bar","index":3,"term:0"}
``` ```
## Clustering ## Clustering
@ -377,7 +439,7 @@ curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines
``` ```
```json ```json
[{"action":"GET","key":"/_etcd/machines/node1","value":"raft=http://127.0.0.1:7001&etcd=http://127.0.0.1:4001","index":4},{"action":"GET","key":"/_etcd/machines/node2","value":"raft=http://127.0.0.1:7002&etcd=http://127.0.0.1:4002","index":4},{"action":"GET","key":"/_etcd/machines/node3","value":"raft=http://127.0.0.1:7003&etcd=http://127.0.0.1:4003","index":4}] [{"action":"get","key":"/_etcd/machines/node1","value":"raft=http://127.0.0.1:7001&etcd=http://127.0.0.1:4001&raftVersion=v0.1.1-311-g91cad59","index":4},{"action":"get","key":"/_etcd/machines/node2","value":"raft=http://127.0.0.1:7002&etcd=http://127.0.0.1:4002&raftVersion=v0.1.1-311-g91cad59","index":4},{"action":"get","key":"/_etcd/machines/node3","value":"raft=http://127.0.0.1:7003&etcd=http://127.0.0.1:4003&raftVersion=v0.1.1-311-g91cad59","index":4}]
``` ```
The key of the machine is based on the ```commit index``` when it was added. The value of the machine is ```hostname```, ```raft port``` and ```client port```. The key of the machine is based on the ```commit index``` when it was added. The value of the machine is ```hostname```, ```raft port``` and ```client port```.
@ -385,7 +447,7 @@ The key of the machine is based on the ```commit index``` when it was added. The
Also try to get the current leader in the cluster Also try to get the current leader in the cluster
``` ```
curl -L http://127.0.0.1:4001/v1/leader curl -L http://127.0.0.1:4001/v2/leader
``` ```
The first server we set up should be the leader, if it has not died during these commands. The first server we set up should be the leader, if it has not died during these commands.
@ -396,11 +458,11 @@ http://127.0.0.1:7001
Now we can do normal SET and GET operations on keys as we explored earlier. Now we can do normal SET and GET operations on keys as we explored earlier.
```sh ```sh
curl -L http://127.0.0.1:4001/v1/keys/foo -d value=bar curl -L http://127.0.0.1:4001/v2/keys/foo -XPUT -d value=bar
``` ```
```json ```json
{"action":"SET","key":"/foo","value":"bar","newKey":true,"index":5} {"action":"set","key":"/foo","value":"bar","index":5,"term:0"}
``` ```
### Killing Nodes in the Cluster ### Killing Nodes in the Cluster
@ -430,7 +492,7 @@ http://127.0.0.1:7003
You should be able to see this: You should be able to see this:
```json ```json
{"action":"GET","key":"/foo","value":"bar","index":5} {"action":"get","key":"/foo","value":"bar","index":5,"term:1"}
``` ```
It succeeded! It succeeded!

View File

@ -42,7 +42,12 @@ func main() {
log.Fatal("info:", err) log.Fatal("info:", err)
} }
if info.Name == "" { if info.Name == "" {
log.Fatal("ERROR: server name required. e.g. '-n=server_name'") host, err := os.Hostname()
if err != nil || host == "" {
log.Fatal("Machine name required and hostname not set. e.g. '-n=machine_name'")
}
log.Warnf("Using hostname %s as the machine name. You must ensure this name is unique among etcd machines.", host)
info.Name = host
} }
// Retrieve TLS configuration. // Retrieve TLS configuration.

View File

@ -23,72 +23,115 @@ func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request
// Response to vote request // Response to vote request
func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
rvreq := &raft.RequestVoteRequest{} rvreq := &raft.RequestVoteRequest{}
err := decodeJsonRequest(req, rvreq)
if err == nil { if _, err := rvreq.Decode(req.Body); err != nil {
log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName) http.Error(w, "", http.StatusBadRequest)
if resp := ps.raftServer.RequestVote(rvreq); resp != nil { log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.url, err)
w.WriteHeader(http.StatusOK) return
json.NewEncoder(w).Encode(resp) }
return
} log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName)
resp := ps.raftServer.RequestVote(rvreq)
if resp == nil {
log.Warn("[vote] Error: nil response")
http.Error(w, "", http.StatusInternalServerError)
return
}
if _, err := resp.Encode(w); err != nil {
log.Warn("[vote] Error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
} }
log.Warnf("[vote] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
} }
// Response to append entries request // Response to append entries request
func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.AppendEntriesRequest{} aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil { if _, err := aereq.Decode(req.Body); err != nil {
log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries)) http.Error(w, "", http.StatusBadRequest)
log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.url, err)
ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) return
}
if resp := ps.raftServer.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK) log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries))
json.NewEncoder(w).Encode(resp)
if !resp.Success { ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
log.Debugf("[Append Entry] Step back")
} resp := ps.raftServer.AppendEntries(aereq)
return
} if resp == nil {
log.Warn("[ae] Error: nil response")
http.Error(w, "", http.StatusInternalServerError)
return
}
if !resp.Success {
log.Debugf("[Append Entry] Step back")
}
if _, err := resp.Encode(w); err != nil {
log.Warn("[ae] Error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
} }
log.Warnf("[Append Entry] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
} }
// Response to recover from snapshot request // Response to recover from snapshot request
func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRequest{} ssreq := &raft.SnapshotRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil { if _, err := ssreq.Decode(req.Body); err != nil {
log.Debugf("[recv] POST %s/snapshot/ ", ps.url) http.Error(w, "", http.StatusBadRequest)
if resp := ps.raftServer.RequestSnapshot(aereq); resp != nil { log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.url, err)
w.WriteHeader(http.StatusOK) return
json.NewEncoder(w).Encode(resp) }
return
} log.Debugf("[recv] POST %s/snapshot", ps.url)
resp := ps.raftServer.RequestSnapshot(ssreq)
if resp == nil {
log.Warn("[ss] Error: nil response")
http.Error(w, "", http.StatusInternalServerError)
return
}
if _, err := resp.Encode(w); err != nil {
log.Warn("[ss] Error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
} }
log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
} }
// Response to recover from snapshot request // Response to recover from snapshot request
func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRecoveryRequest{} ssrreq := &raft.SnapshotRecoveryRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil { if _, err := ssrreq.Decode(req.Body); err != nil {
log.Debugf("[recv] POST %s/snapshotRecovery/ ", ps.url) http.Error(w, "", http.StatusBadRequest)
if resp := ps.raftServer.SnapshotRecoveryRequest(aereq); resp != nil { log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.url, err)
w.WriteHeader(http.StatusOK) return
json.NewEncoder(w).Encode(resp) }
return
} log.Debugf("[recv] POST %s/snapshotRecovery", ps.url)
resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq)
if resp == nil {
log.Warn("[ssr] Error: nil response")
http.Error(w, "", http.StatusInternalServerError)
return
}
if _, err := resp.Encode(w); err != nil {
log.Warn("[ssr] Error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
} }
log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
} }
// Get the port that listening for etcd connecting of the server // Get the port that listening for etcd connecting of the server

View File

@ -116,6 +116,7 @@ func (s *Server) installV2() {
s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET") s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET")
s.handleFunc("/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET") s.handleFunc("/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET")
s.handleFunc("/v2/stats/store", s.GetStoreStatsHandler).Methods("GET") s.handleFunc("/v2/stats/store", s.GetStoreStatsHandler).Methods("GET")
s.handleFunc("/v2/speedTest", s.SpeedTestHandler).Methods("GET")
} }
// Adds a v1 server handler to the router. // Adds a v1 server handler to the router.

View File

@ -3,7 +3,6 @@ package server
import ( import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"encoding/json"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -65,10 +64,12 @@ func dialWithTimeout(network, addr string) (net.Conn, error) {
// Sends AppendEntries RPCs to a peer when the server is the leader. // Sends AppendEntries RPCs to a peer when the server is the leader.
func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer var b bytes.Buffer
json.NewEncoder(&b).Encode(req) if _, err := req.Encode(&b); err != nil {
log.Warn("transporter.ae.encoding.error:", err)
return nil
}
size := b.Len() size := b.Len()
@ -97,6 +98,7 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe
if ok { if ok {
thisFollowerStats.Fail() thisFollowerStats.Fail()
} }
return nil
} else { } else {
if ok { if ok {
thisFollowerStats.Succ(end.Sub(start)) thisFollowerStats.Succ(end.Sub(start))
@ -108,21 +110,25 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe
t.CancelWhenTimeout(httpRequest) t.CancelWhenTimeout(httpRequest)
aersp = &raft.AppendEntriesResponse{} aeresp := &raft.AppendEntriesResponse{}
if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
return aersp log.Warn("transporter.ae.decoding.error:", err)
return nil
} }
return aeresp
} }
return aersp return nil
} }
// Sends RequestVote RPCs to a peer when the server is the candidate. // Sends RequestVote RPCs to a peer when the server is the candidate.
func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
if _, err := req.Encode(&b); err != nil {
log.Warn("transporter.vr.encoding.error:", err)
return nil
}
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Vote from %s to %s", server.Name(), u) log.Debugf("Send Vote from %s to %s", server.Name(), u)
@ -139,28 +145,31 @@ func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *
t.CancelWhenTimeout(httpRequest) t.CancelWhenTimeout(httpRequest)
rvrsp := &raft.RequestVoteResponse{} rvrsp := &raft.RequestVoteResponse{}
if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
return rvrsp log.Warn("transporter.vr.decoding.error:", err)
return nil
} }
return rvrsp
} }
return rvrsp return nil
} }
// Sends SnapshotRequest RPCs to a peer when the server is the candidate. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
var aersp *raft.SnapshotResponse
var b bytes.Buffer var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
if _, err := req.Encode(&b); err != nil {
log.Warn("transporter.ss.encoding.error:", err)
return nil
}
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
req.LastTerm, req.LastIndex)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
if err != nil { if err != nil {
log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
} }
if resp != nil { if resp != nil {
@ -168,42 +177,48 @@ func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, r
t.CancelWhenTimeout(httpRequest) t.CancelWhenTimeout(httpRequest)
aersp = &raft.SnapshotResponse{} ssrsp := &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
log.Warn("transporter.ss.decoding.error:", err)
return aersp return nil
} }
return ssrsp
} }
return nil
return aersp
} }
// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
var aersp *raft.SnapshotRecoveryResponse
var b bytes.Buffer var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
if _, err := req.Encode(&b); err != nil {
log.Warn("transporter.ss.encoding.error:", err)
return nil
}
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
req.LastTerm, req.LastIndex)
resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
if err != nil { if err != nil {
log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
} }
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
aersp = &raft.SnapshotRecoveryResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { t.CancelWhenTimeout(httpRequest)
return aersp
ssrrsp := &raft.SnapshotRecoveryResponse{}
if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
log.Warn("transporter.ssr.decoding.error:", err)
return nil
} }
return ssrrsp
} }
return nil
return aersp
} }
// Send server side POST request // Send server side POST request