mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge 9d488f55296710347d0b2081e914c196b4908e3c into 8dc7dd7ea0e2a83cf316a7952a9c25bed90ee4c7
This commit is contained in:
commit
e3b55172ab
@ -246,10 +246,10 @@ http://www.g-loaded.eu/2005/11/10/be-your-own-ca/
|
||||
```
|
||||
|
||||
```sh
|
||||
./etcd -clientCert client.crt -clientKey client.key -i
|
||||
./etcd -clientCert client.crt -clientKey client.key -f
|
||||
```
|
||||
|
||||
`-i` is to ignore the previously created default configuration file.
|
||||
`-f` forces new node configuration if existing configuration is found (WARNING: data loss!)
|
||||
`-clientCert` and `-clientKey` are the key and cert for transport layer security between client and server
|
||||
|
||||
```sh
|
||||
@ -276,7 +276,7 @@ And also the response from the etcd server.
|
||||
We also can do authentication using CA cert. The clients will also need to provide their cert to the server. The server will check whether the cert is signed by the CA and decide whether to serve the request.
|
||||
|
||||
```sh
|
||||
./etcd -clientCert client.crt -clientKey client.key -clientCAFile clientCA.crt -i
|
||||
./etcd -clientCert client.crt -clientKey client.key -clientCAFile clientCA.crt -f
|
||||
```
|
||||
|
||||
```-clientCAFile``` is the path to the CA cert.
|
||||
|
186
etcd.go
186
etcd.go
@ -37,22 +37,10 @@ var machinesFile string
|
||||
|
||||
var cluster []string
|
||||
|
||||
var hostname string
|
||||
var clientPort int
|
||||
var raftPort int
|
||||
var webPort int
|
||||
|
||||
var serverCertFile string
|
||||
var serverKeyFile string
|
||||
var serverCAFile string
|
||||
|
||||
var clientCertFile string
|
||||
var clientKeyFile string
|
||||
var clientCAFile string
|
||||
|
||||
var argInfo Info
|
||||
var dirPath string
|
||||
|
||||
var ignore bool
|
||||
var force bool
|
||||
|
||||
var maxSize int
|
||||
|
||||
@ -71,22 +59,22 @@ func init() {
|
||||
flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
|
||||
flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
|
||||
|
||||
flag.StringVar(&hostname, "h", "0.0.0.0", "the hostname of the local machine")
|
||||
flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients")
|
||||
flag.IntVar(&raftPort, "s", 7001, "the port to communicate with servers")
|
||||
flag.IntVar(&webPort, "w", -1, "the port of web interface (-1 means do not start web interface)")
|
||||
flag.StringVar(&argInfo.Hostname, "h", "0.0.0.0", "the hostname of the local machine")
|
||||
flag.IntVar(&argInfo.ClientPort, "c", 4001, "the port to communicate with clients")
|
||||
flag.IntVar(&argInfo.RaftPort, "s", 7001, "the port to communicate with servers")
|
||||
flag.IntVar(&argInfo.WebPort, "w", -1, "the port of web interface (-1 means do not start web interface)")
|
||||
|
||||
flag.StringVar(&serverCAFile, "serverCAFile", "", "the path of the CAFile")
|
||||
flag.StringVar(&serverCertFile, "serverCert", "", "the cert file of the server")
|
||||
flag.StringVar(&serverKeyFile, "serverKey", "", "the key file of the server")
|
||||
flag.StringVar(&argInfo.ServerCAFile, "serverCAFile", "", "the path of the CAFile")
|
||||
flag.StringVar(&argInfo.ServerCertFile, "serverCert", "", "the cert file of the server")
|
||||
flag.StringVar(&argInfo.ServerKeyFile, "serverKey", "", "the key file of the server")
|
||||
|
||||
flag.StringVar(&clientCAFile, "clientCAFile", "", "the path of the client CAFile")
|
||||
flag.StringVar(&clientCertFile, "clientCert", "", "the cert file of the client")
|
||||
flag.StringVar(&clientKeyFile, "clientKey", "", "the key file of the client")
|
||||
flag.StringVar(&argInfo.ClientCAFile, "clientCAFile", "", "the path of the client CAFile")
|
||||
flag.StringVar(&argInfo.ClientCertFile, "clientCert", "", "the cert file of the client")
|
||||
flag.StringVar(&argInfo.ClientKeyFile, "clientKey", "", "the key file of the client")
|
||||
|
||||
flag.StringVar(&dirPath, "d", "/tmp/", "the directory to store log and snapshot")
|
||||
flag.StringVar(&dirPath, "d", ".", "the directory to store log and snapshot")
|
||||
|
||||
flag.BoolVar(&ignore, "i", false, "ignore the old configuration, create a new node")
|
||||
flag.BoolVar(&force, "f", false, "force new node configuration if existing is found (WARNING: data loss!)")
|
||||
|
||||
flag.BoolVar(&snapshot, "snapshot", false, "open or close snapshot")
|
||||
|
||||
@ -226,14 +214,14 @@ func main() {
|
||||
|
||||
startRaft(st)
|
||||
|
||||
if webPort != -1 {
|
||||
if argInfo.WebPort != -1 {
|
||||
// start web
|
||||
etcdStore.SetMessager(&storeMsg)
|
||||
go webHelper()
|
||||
go web.Start(raftServer, webPort)
|
||||
go web.Start(raftServer, argInfo.WebPort)
|
||||
}
|
||||
|
||||
startClientTransport(info.ClientPort, clientSt)
|
||||
startClientTransport(*info, clientSt)
|
||||
|
||||
}
|
||||
|
||||
@ -280,9 +268,9 @@ func startRaft(securityType int) {
|
||||
for {
|
||||
command := &JoinCommand{
|
||||
Name: raftServer.Name(),
|
||||
Hostname: hostname,
|
||||
RaftPort: raftPort,
|
||||
ClientPort: clientPort,
|
||||
Hostname: argInfo.Hostname,
|
||||
RaftPort: argInfo.RaftPort,
|
||||
ClientPort: argInfo.ClientPort,
|
||||
}
|
||||
_, err := raftServer.Do(command)
|
||||
if err == nil {
|
||||
@ -340,7 +328,7 @@ func startRaft(securityType int) {
|
||||
}
|
||||
|
||||
// start to response to raft requests
|
||||
go startRaftTransport(info.RaftPort, securityType)
|
||||
go startRaftTransport(*info, securityType)
|
||||
|
||||
}
|
||||
|
||||
@ -367,7 +355,7 @@ func createTransporter(st int) transporter {
|
||||
case HTTPSANDVERIFY:
|
||||
t.scheme = "https://"
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(serverCertFile, serverKeyFile)
|
||||
tlsCert, err := tls.LoadX509KeyPair(argInfo.ServerCertFile, argInfo.ServerKeyFile)
|
||||
|
||||
if err != nil {
|
||||
fatal(err)
|
||||
@ -394,7 +382,7 @@ func dialTimeout(network, addr string) (net.Conn, error) {
|
||||
}
|
||||
|
||||
// Start to listen and response raft command
|
||||
func startRaftTransport(port int, st int) {
|
||||
func startRaftTransport(info Info, st int) {
|
||||
|
||||
// internal commands
|
||||
http.HandleFunc("/join", JoinHttpHandler)
|
||||
@ -408,30 +396,30 @@ func startRaftTransport(port int, st int) {
|
||||
switch st {
|
||||
|
||||
case HTTP:
|
||||
fmt.Printf("raft server [%s] listen on http port %v\n", hostname, port)
|
||||
fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
|
||||
fmt.Printf("raft server [%s] listen on http port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.RaftPort), nil))
|
||||
|
||||
case HTTPS:
|
||||
fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port)
|
||||
fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
|
||||
fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", info.RaftPort), info.ServerCertFile, argInfo.ServerKeyFile, nil))
|
||||
|
||||
case HTTPSANDVERIFY:
|
||||
|
||||
server := &http.Server{
|
||||
TLSConfig: &tls.Config{
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
ClientCAs: createCertPool(serverCAFile),
|
||||
ClientCAs: createCertPool(info.ServerCAFile),
|
||||
},
|
||||
Addr: fmt.Sprintf(":%d", port),
|
||||
Addr: fmt.Sprintf(":%d", info.RaftPort),
|
||||
}
|
||||
fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port)
|
||||
fatal(server.ListenAndServeTLS(serverCertFile, serverKeyFile))
|
||||
fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(server.ListenAndServeTLS(info.ServerCertFile, argInfo.ServerKeyFile))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Start to listen and response client command
|
||||
func startClientTransport(port int, st int) {
|
||||
func startClientTransport(info Info, st int) {
|
||||
// external commands
|
||||
http.HandleFunc("/"+version+"/keys/", Multiplexer)
|
||||
http.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
|
||||
@ -444,24 +432,24 @@ func startClientTransport(port int, st int) {
|
||||
switch st {
|
||||
|
||||
case HTTP:
|
||||
fmt.Printf("etcd [%s] listen on http port %v\n", hostname, clientPort)
|
||||
fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
|
||||
fmt.Printf("etcd [%s] listen on http port %v\n", info.Hostname, info.ClientPort)
|
||||
fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.ClientPort), nil))
|
||||
|
||||
case HTTPS:
|
||||
fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort)
|
||||
http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil)
|
||||
fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort)
|
||||
http.ListenAndServeTLS(fmt.Sprintf(":%d", info.ClientPort), info.ClientCertFile, info.ClientKeyFile, nil)
|
||||
|
||||
case HTTPSANDVERIFY:
|
||||
|
||||
server := &http.Server{
|
||||
TLSConfig: &tls.Config{
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
ClientCAs: createCertPool(clientCAFile),
|
||||
ClientCAs: createCertPool(info.ClientCAFile),
|
||||
},
|
||||
Addr: fmt.Sprintf(":%d", port),
|
||||
Addr: fmt.Sprintf(":%d", info.ClientPort),
|
||||
}
|
||||
fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort)
|
||||
fatal(server.ListenAndServeTLS(clientCertFile, clientKeyFile))
|
||||
fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort)
|
||||
fatal(server.ListenAndServeTLS(info.ClientCertFile, info.ClientKeyFile))
|
||||
}
|
||||
}
|
||||
|
||||
@ -511,6 +499,30 @@ func securityType(source int) int {
|
||||
return -1
|
||||
}
|
||||
|
||||
func parseInfo(path string) *Info {
|
||||
file, err := os.Open(path)
|
||||
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
info := &Info{}
|
||||
defer file.Close()
|
||||
|
||||
content, err := ioutil.ReadAll(file)
|
||||
if err != nil {
|
||||
fatalf("Unable to read info: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(content, &info); err != nil {
|
||||
fatalf("Unable to parse info: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return info
|
||||
}
|
||||
|
||||
// Get the server info from previous conf file
|
||||
// or from the user
|
||||
func getInfo(path string) *Info {
|
||||
@ -519,8 +531,7 @@ func getInfo(path string) *Info {
|
||||
infoPath := fmt.Sprintf("%s/info", path)
|
||||
|
||||
// Delete the old configuration if exist
|
||||
if ignore {
|
||||
|
||||
if force {
|
||||
logPath := fmt.Sprintf("%s/log", path)
|
||||
confPath := fmt.Sprintf("%s/conf", path)
|
||||
snapshotPath := fmt.Sprintf("%s/snapshot", path)
|
||||
@ -528,54 +539,33 @@ func getInfo(path string) *Info {
|
||||
os.Remove(logPath)
|
||||
os.Remove(confPath)
|
||||
os.RemoveAll(snapshotPath)
|
||||
|
||||
}
|
||||
|
||||
if file, err := os.Open(infoPath); err == nil {
|
||||
info := &Info{}
|
||||
if content, err := ioutil.ReadAll(file); err != nil {
|
||||
fatalf("Unable to read info: %v", err)
|
||||
} else {
|
||||
if err = json.Unmarshal(content, &info); err != nil {
|
||||
fatalf("Unable to parse info: %v", err)
|
||||
}
|
||||
}
|
||||
file.Close()
|
||||
return info
|
||||
} else {
|
||||
// Otherwise ask user for info and write it to file.
|
||||
|
||||
hostname = strings.TrimSpace(hostname)
|
||||
|
||||
if hostname == "" {
|
||||
fatal("Please give the address of the local machine")
|
||||
}
|
||||
|
||||
fmt.Println("address ", hostname)
|
||||
info := &Info{
|
||||
Hostname: hostname,
|
||||
|
||||
RaftPort: raftPort,
|
||||
ClientPort: clientPort,
|
||||
WebPort: webPort,
|
||||
|
||||
ClientCAFile: clientCAFile,
|
||||
ClientCertFile: clientCertFile,
|
||||
ClientKeyFile: clientKeyFile,
|
||||
|
||||
ServerCAFile: serverCAFile,
|
||||
ServerKeyFile: serverKeyFile,
|
||||
ServerCertFile: serverCertFile,
|
||||
}
|
||||
|
||||
// Write to file.
|
||||
content, _ := json.Marshal(info)
|
||||
content = []byte(string(content) + "\n")
|
||||
if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
|
||||
fatalf("Unable to write info to file: %v", err)
|
||||
}
|
||||
info := parseInfo(infoPath)
|
||||
if info != nil {
|
||||
fmt.Printf("Found node configuration in '%s'. Ignoring flags.\n", infoPath)
|
||||
return info
|
||||
}
|
||||
|
||||
// Otherwise ask user for info and write it to file.
|
||||
argInfo.Hostname = strings.TrimSpace(argInfo.Hostname)
|
||||
|
||||
if argInfo.Hostname == "" {
|
||||
fatal("Please give the address of the local machine")
|
||||
}
|
||||
|
||||
info = &argInfo
|
||||
|
||||
// Write to file.
|
||||
content, _ := json.Marshal(info)
|
||||
content = []byte(string(content) + "\n")
|
||||
if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
|
||||
fatalf("Unable to write info to file: %v", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Wrote node configuration to '%s'.\n", infoPath)
|
||||
|
||||
return info
|
||||
}
|
||||
|
||||
// Create client auth certpool
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
func TestSingleNode(t *testing.T) {
|
||||
procAttr := new(os.ProcAttr)
|
||||
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
|
||||
args := []string{"etcd", "-i", "-d=/tmp/node1"}
|
||||
args := []string{"etcd", "-f", "-d=/tmp/node1"}
|
||||
|
||||
process, err := os.StartProcess("etcd", args, procAttr)
|
||||
if err != nil {
|
||||
@ -58,7 +58,7 @@ func TestSingleNodeRecovery(t *testing.T) {
|
||||
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
|
||||
args := []string{"etcd", "-d=/tmp/node1"}
|
||||
|
||||
process, err := os.StartProcess("etcd", append(args, "-i"), procAttr)
|
||||
process, err := os.StartProcess("etcd", append(args, "-f"), procAttr)
|
||||
if err != nil {
|
||||
t.Fatal("start process failed:" + err.Error())
|
||||
return
|
||||
|
@ -91,7 +91,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name())
|
||||
w.WriteHeader(http.StatusOK)
|
||||
client := hostname + ":" + strconv.Itoa(clientPort)
|
||||
client := argInfo.Hostname + ":" + strconv.Itoa(argInfo.ClientPort)
|
||||
w.Write([]byte(client))
|
||||
}
|
||||
|
||||
|
2
test.go
2
test.go
@ -70,7 +70,7 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process,
|
||||
|
||||
for i, _ := range etcds {
|
||||
var err error
|
||||
etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-i"), procAttr)
|
||||
etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-f"), procAttr)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -133,6 +133,11 @@ type Tokenizer struct {
|
||||
// subsequent Next calls would return an ErrorToken.
|
||||
// err is never reset. Once it becomes non-nil, it stays non-nil.
|
||||
err error
|
||||
// readErr is the error returned by the io.Reader r. It is separate from
|
||||
// err because it is valid for an io.Reader to return (n int, err1 error)
|
||||
// such that n > 0 && err1 != nil, and callers should always process the
|
||||
// n > 0 bytes before considering the error err1.
|
||||
readErr error
|
||||
// buf[raw.start:raw.end] holds the raw bytes of the current token.
|
||||
// buf[raw.end:] is buffered input that will yield future tokens.
|
||||
raw span
|
||||
@ -222,7 +227,12 @@ func (z *Tokenizer) Err() error {
|
||||
// Pre-condition: z.err == nil.
|
||||
func (z *Tokenizer) readByte() byte {
|
||||
if z.raw.end >= len(z.buf) {
|
||||
// Our buffer is exhausted and we have to read from z.r.
|
||||
// Our buffer is exhausted and we have to read from z.r. Check if the
|
||||
// previous read resulted in an error.
|
||||
if z.readErr != nil {
|
||||
z.err = z.readErr
|
||||
return 0
|
||||
}
|
||||
// We copy z.buf[z.raw.start:z.raw.end] to the beginning of z.buf. If the length
|
||||
// z.raw.end - z.raw.start is more than half the capacity of z.buf, then we
|
||||
// allocate a new buffer before the copy.
|
||||
@ -253,9 +263,10 @@ func (z *Tokenizer) readByte() byte {
|
||||
z.raw.start, z.raw.end, z.buf = 0, d, buf1[:d]
|
||||
// Now that we have copied the live bytes to the start of the buffer,
|
||||
// we read from z.r into the remainder.
|
||||
n, err := z.r.Read(buf1[d:cap(buf1)])
|
||||
if err != nil {
|
||||
z.err = err
|
||||
var n int
|
||||
n, z.readErr = readAtLeastOneByte(z.r, buf1[d:cap(buf1)])
|
||||
if n == 0 {
|
||||
z.err = z.readErr
|
||||
return 0
|
||||
}
|
||||
z.buf = buf1[:d+n]
|
||||
@ -265,6 +276,19 @@ func (z *Tokenizer) readByte() byte {
|
||||
return x
|
||||
}
|
||||
|
||||
// readAtLeastOneByte wraps an io.Reader so that reading cannot return (0, nil).
|
||||
// It returns io.ErrNoProgress if the underlying r.Read method returns (0, nil)
|
||||
// too many times in succession.
|
||||
func readAtLeastOneByte(r io.Reader, b []byte) (int, error) {
|
||||
for i := 0; i < 100; i++ {
|
||||
n, err := r.Read(b)
|
||||
if n != 0 || err != nil {
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
return 0, io.ErrNoProgress
|
||||
}
|
||||
|
||||
// skipWhiteSpace skips past any white space.
|
||||
func (z *Tokenizer) skipWhiteSpace() {
|
||||
if z.err != nil {
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -531,6 +532,85 @@ func TestConvertNewlines(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReaderEdgeCases(t *testing.T) {
|
||||
const s = "<p>An io.Reader can return (0, nil) or (n, io.EOF).</p>"
|
||||
testCases := []io.Reader{
|
||||
&zeroOneByteReader{s: s},
|
||||
&eofStringsReader{s: s},
|
||||
&stuckReader{},
|
||||
}
|
||||
for i, tc := range testCases {
|
||||
got := []TokenType{}
|
||||
z := NewTokenizer(tc)
|
||||
for {
|
||||
tt := z.Next()
|
||||
if tt == ErrorToken {
|
||||
break
|
||||
}
|
||||
got = append(got, tt)
|
||||
}
|
||||
if err := z.Err(); err != nil && err != io.EOF {
|
||||
if err != io.ErrNoProgress {
|
||||
t.Errorf("i=%d: %v", i, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
want := []TokenType{
|
||||
StartTagToken,
|
||||
TextToken,
|
||||
EndTagToken,
|
||||
}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("i=%d: got %v, want %v", i, got, want)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// zeroOneByteReader is like a strings.Reader that alternates between
|
||||
// returning 0 bytes and 1 byte at a time.
|
||||
type zeroOneByteReader struct {
|
||||
s string
|
||||
n int
|
||||
}
|
||||
|
||||
func (r *zeroOneByteReader) Read(p []byte) (int, error) {
|
||||
if len(p) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
if len(r.s) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
r.n++
|
||||
if r.n%2 != 0 {
|
||||
return 0, nil
|
||||
}
|
||||
p[0], r.s = r.s[0], r.s[1:]
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
// eofStringsReader is like a strings.Reader but can return an (n, err) where
|
||||
// n > 0 && err != nil.
|
||||
type eofStringsReader struct {
|
||||
s string
|
||||
}
|
||||
|
||||
func (r *eofStringsReader) Read(p []byte) (int, error) {
|
||||
n := copy(p, r.s)
|
||||
r.s = r.s[n:]
|
||||
if r.s != "" {
|
||||
return n, nil
|
||||
}
|
||||
return n, io.EOF
|
||||
}
|
||||
|
||||
// stuckReader is an io.Reader that always returns no data and no error.
|
||||
type stuckReader struct{}
|
||||
|
||||
func (*stuckReader) Read(p []byte) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
const (
|
||||
rawLevel = iota
|
||||
lowLevel
|
||||
|
@ -31,6 +31,7 @@ func Read(filename string) (map[string]string, error) {
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
defer in.Close()
|
||||
scanner := bufio.NewScanner(in)
|
||||
line := ""
|
||||
section := ""
|
||||
@ -60,7 +61,6 @@ func Read(filename string) (map[string]string, error) {
|
||||
res[section+key] = value
|
||||
line = ""
|
||||
}
|
||||
in.Close()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
|
@ -25,4 +25,6 @@ func main() {
|
||||
res, err := config.Read("example.conf")
|
||||
fmt.Println(err)
|
||||
fmt.Println(res)
|
||||
fmt.Println(res["test.a"])
|
||||
fmt.Println(res["dd"])
|
||||
}
|
||||
|
@ -50,46 +50,30 @@ func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*stor
|
||||
var resp *http.Response
|
||||
var err error
|
||||
|
||||
if sinceIndex == 0 {
|
||||
// Get request if no index is given
|
||||
resp, err = c.sendRequest("GET", path.Join("watch", key), "")
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
// Post
|
||||
v := url.Values{}
|
||||
v.Set("index", fmt.Sprintf("%v", sinceIndex))
|
||||
|
||||
if stop != nil {
|
||||
ch := make(chan respAndErr)
|
||||
|
||||
if stop != nil {
|
||||
go func() {
|
||||
resp, err = c.sendRequest("POST", path.Join("watch", key), v.Encode())
|
||||
go func() {
|
||||
resp, err = c.sendWatchRequest(key, sinceIndex)
|
||||
|
||||
ch <- respAndErr{resp, err}
|
||||
}()
|
||||
ch <- respAndErr{resp, err}
|
||||
}()
|
||||
|
||||
// select at stop or continue to receive
|
||||
select {
|
||||
// select at stop or continue to receive
|
||||
select {
|
||||
|
||||
case res := <-ch:
|
||||
resp, err = res.resp, res.err
|
||||
case res := <-ch:
|
||||
resp, err = res.resp, res.err
|
||||
|
||||
case <-stop:
|
||||
resp, err = nil, errors.New("User stoped watch")
|
||||
}
|
||||
} else {
|
||||
resp, err = c.sendRequest("POST", path.Join("watch", key), v.Encode())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
case <-stop:
|
||||
resp, err = nil, errors.New("User stoped watch")
|
||||
}
|
||||
} else {
|
||||
resp, err = c.sendWatchRequest(key, sinceIndex)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
@ -115,3 +99,16 @@ func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*stor
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (c *Client) sendWatchRequest(key string, sinceIndex uint64) (*http.Response, error) {
|
||||
if sinceIndex == 0 {
|
||||
resp, err := c.sendRequest("GET", path.Join("watch", key), "")
|
||||
return resp, err
|
||||
} else {
|
||||
v := url.Values{}
|
||||
v.Set("index", fmt.Sprintf("%v", sinceIndex))
|
||||
resp, err := c.sendRequest("POST", path.Join("watch", key), v.Encode())
|
||||
return resp, err
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,12 +14,12 @@ func main() {
|
||||
|
||||
ch := make(chan bool, 10)
|
||||
// set up a lock
|
||||
c := etcd.CreateClient()
|
||||
c := etcd.NewClient()
|
||||
c.Set("lock", "unlock", 0)
|
||||
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
go t(i, ch, etcd.CreateClient())
|
||||
go t(i, ch, etcd.NewClient())
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
|
@ -11,14 +11,14 @@ var count = 0
|
||||
func main() {
|
||||
ch := make(chan bool, 10)
|
||||
// set up a lock
|
||||
for i:=0; i < 1000; i++ {
|
||||
go t(i, ch, etcd.CreateClient())
|
||||
for i:=0; i < 100; i++ {
|
||||
go t(i, ch, etcd.NewClient())
|
||||
}
|
||||
start := time.Now()
|
||||
for i:=0; i< 1000; i++ {
|
||||
for i:=0; i< 100; i++ {
|
||||
<-ch
|
||||
}
|
||||
fmt.Println(time.Now().Sub(start), ": ", 1000 * 50, "commands")
|
||||
fmt.Println(time.Now().Sub(start), ": ", 100 * 50, "commands")
|
||||
}
|
||||
|
||||
func t(num int, ch chan bool, c *etcd.Client) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user