diff --git a/etcd/participant.go b/etcd/participant.go index 7e5222b3a..ad0bf2bb4 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -22,7 +22,6 @@ import ( "log" "math/rand" "net/http" - "os" "path" "time" @@ -112,17 +111,14 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats) p.peerHub.setServerStats(p.serverStats) - walPath := path.Join(p.cfg.DataDir, "wal") - w, err := wal.Open(walPath) - if err != nil { - if !os.IsNotExist(err) { - return nil, err - } - + walPath := p.cfg.DataDir + var w *wal.WAL + var err error + if !wal.Exist(walPath) { p.id = genId() p.pubAddr = c.Addr p.raftPubAddr = c.Peer.Addr - if w, err = wal.New(walPath); err != nil { + if w, err = wal.Create(walPath); err != nil { return nil, err } p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection) @@ -132,7 +128,7 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura } log.Printf("id=%x participant.new path=%s\n", p.id, walPath) } else { - n, err := w.LoadNode() + n, err := wal.Read(walPath, 0) if err != nil { return nil, err } @@ -140,6 +136,9 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection) p.apply(p.node.Next()) log.Printf("id=%x participant.load path=%s state=\"%+v\" len(ents)=%d", p.id, walPath, n.State, len(n.Ents)) + if w, err = wal.Open(walPath); err != nil { + return nil, err + } } p.w = w diff --git a/wal/wal.go b/wal/wal.go index 752a4e48d..796b46a35 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -24,14 +24,21 @@ import ( "io" "log" "os" + "path" + "sort" "github.com/coreos/etcd/raft" ) +const ( + infoType int64 = iota + 1 + entryType + stateType +) + var ( - infoType = int64(1) - entryType = int64(2) - stateType = int64(3) + ErrIdMismatch = fmt.Errorf("unmatch id") + ErrNotFound = fmt.Errorf("wal file is not found") ) type WAL struct { @@ -44,29 +51,69 @@ func newWAL(f *os.File) *WAL { return &WAL{f, bufio.NewWriter(f), new(bytes.Buffer)} } -func New(path string) (*WAL, error) { - log.Printf("path=%s wal.new", path) - f, err := os.Open(path) - if err == nil { - f.Close() +func Exist(dirpath string) bool { + names, err := readDir(dirpath) + if err != nil { + return false + } + return len(names) != 0 +} + +func Create(dirpath string) (*WAL, error) { + log.Printf("path=%s wal.create", dirpath) + if Exist(dirpath) { return nil, os.ErrExist } - f, err = os.Create(path) + p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0)) + f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return nil, err } return newWAL(f), nil } -func Open(path string) (*WAL, error) { - log.Printf("path=%s wal.open", path) - f, err := os.OpenFile(path, os.O_RDWR, 0) +func Open(dirpath string) (*WAL, error) { + log.Printf("path=%s wal.append", dirpath) + names, err := readDir(dirpath) + if err != nil { + return nil, err + } + names = checkWalNames(names) + if len(names) == 0 { + return nil, ErrNotFound + } + + name := names[len(names)-1] + p := path.Join(dirpath, name) + f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND, 0) if err != nil { return nil, err } return newWAL(f), nil } +// index should be the index of last log entry currently. +// Cut closes current file written and creates a new one to append. +func (w *WAL) Cut(index int64) error { + log.Printf("path=%s wal.cut index=%d", w.f.Name(), index) + fpath := w.f.Name() + seq, _, err := parseWalName(path.Base(fpath)) + if err != nil { + panic("parse correct name error") + } + fpath = path.Join(path.Dir(fpath), fmt.Sprintf("%016x-%016x.wal", seq+1, index)) + f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) + if err != nil { + return err + } + + w.Sync() + w.f.Close() + w.f = f + w.bw = bufio.NewWriter(f) + return nil +} + func (w *WAL) Sync() error { if err := w.bw.Flush(); err != nil { return err @@ -129,52 +176,141 @@ type Node struct { Id int64 Ents []raft.Entry State raft.State + + // index of the first entry + index int64 } -func (w *WAL) LoadNode() (*Node, error) { - log.Printf("path=%s wal.loadNode", w.f.Name()) - if err := w.checkAtHead(); err != nil { - return nil, err +func newNode(index int64) *Node { + return &Node{Ents: make([]raft.Entry, 0), index: index + 1} +} + +func (n *Node) load(path string) error { + f, err := os.Open(path) + if err != nil { + return err } - br := bufio.NewReader(w.f) + defer f.Close() + br := bufio.NewReader(f) rec := &Record{} - err := readRecord(br, rec) + err = readRecord(br, rec) if err != nil { - return nil, err + return err } if rec.Type != infoType { - return nil, fmt.Errorf("the first block of wal is not infoType but %d", rec.Type) + return fmt.Errorf("the first block of wal is not infoType but %d", rec.Type) } i, err := loadInfo(rec.Data) if err != nil { - return nil, err + return err } + if n.Id != 0 && n.Id != i.Id { + return ErrIdMismatch + } + n.Id = i.Id - ents := make([]raft.Entry, 0) - var state raft.State for err = readRecord(br, rec); err == nil; err = readRecord(br, rec) { switch rec.Type { case entryType: e, err := loadEntry(rec.Data) if err != nil { - return nil, err + return err + } + if e.Index >= n.index { + n.Ents = append(n.Ents[:e.Index-n.index], e) } - ents = append(ents[:e.Index-1], e) case stateType: s, err := loadState(rec.Data) if err != nil { - return nil, err + return err } - state = s + n.State = s default: - return nil, fmt.Errorf("unexpected block type %d", rec.Type) + return fmt.Errorf("unexpected block type %d", rec.Type) } } if err != io.EOF { + return err + } + return nil +} + +func (n *Node) startFrom(index int64) error { + diff := int(index - n.index) + if diff > len(n.Ents) { + return ErrNotFound + } + n.Ents = n.Ents[diff:] + return nil +} + +// Read loads all entries after index (index is not included). +func Read(dirpath string, index int64) (*Node, error) { + log.Printf("path=%s wal.load index=%d", dirpath, index) + names, err := readDir(dirpath) + if err != nil { return nil, err } - return &Node{i.Id, ents, state}, nil + names = checkWalNames(names) + if len(names) == 0 { + return nil, ErrNotFound + } + + sort.Sort(sort.StringSlice(names)) + nameIndex, ok := searchIndex(names, index) + if !ok || !isValidSeq(names[nameIndex:]) { + return nil, ErrNotFound + } + + _, initIndex, err := parseWalName(names[nameIndex]) + if err != nil { + panic("parse correct name error") + } + n := newNode(initIndex) + for _, name := range names[nameIndex:] { + if err := n.load(path.Join(dirpath, name)); err != nil { + return nil, err + } + } + if err := n.startFrom(index + 1); err != nil { + return nil, ErrNotFound + } + return n, nil +} + +// The input names should be sorted. +// serachIndex returns the array index of the last name that has +// a smaller raft index section than the given raft index. +func searchIndex(names []string, index int64) (int, bool) { + for i := len(names) - 1; i >= 0; i-- { + name := names[i] + _, curIndex, err := parseWalName(name) + if err != nil { + panic("parse correct name error") + } + if index >= curIndex { + return i, true + } + } + return -1, false +} + +// names should have been sorted based on sequence number. +// isValidSeq checks whether seq increases continuously. +func isValidSeq(names []string) bool { + var lastSeq int64 + for _, name := range names { + curSeq, _, err := parseWalName(name) + if err != nil { + panic("parse correct name error") + } + if lastSeq != 0 && lastSeq != curSeq-1 { + return false + } + lastSeq = curSeq + } + return true } func loadInfo(d []byte) (raft.Info, error) { @@ -201,6 +337,41 @@ func loadState(d []byte) (raft.State, error) { return s, err } +// readDir returns the filenames in wal directory. +func readDir(dirpath string) ([]string, error) { + dir, err := os.Open(dirpath) + if err != nil { + return nil, err + } + defer dir.Close() + names, err := dir.Readdirnames(-1) + if err != nil { + return nil, err + } + return names, nil +} + +func checkWalNames(names []string) []string { + wnames := make([]string, 0) + for _, name := range names { + if _, _, err := parseWalName(name); err != nil { + log.Printf("parse %s: %v", name, err) + continue + } + wnames = append(wnames, name) + } + return wnames +} + +func parseWalName(str string) (seq, index int64, err error) { + var num int + num, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index) + if num != 2 && err == nil { + err = fmt.Errorf("bad wal name: %s", str) + } + return +} + func writeInt64(w io.Writer, n int64) error { return binary.Write(w, binary.LittleEndian, n) } diff --git a/wal/wal_test.go b/wal/wal_test.go index 7e9a38a10..3e76a660b 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -17,6 +17,7 @@ limitations under the License. package wal import ( + "fmt" "io/ioutil" "os" "path" @@ -35,36 +36,123 @@ var ( entryData = []byte("\b\x01\x10\x01\x18\x01\x22\x01\x01") entryRecord = append([]byte("\x0f\x00\x00\x00\x00\x00\x00\x00\b\x02\x10\x00\x1a\t"), entryData...) + + firstWalName = "0000000000000000-0000000000000000.wal" ) func TestNew(t *testing.T) { - f, err := ioutil.TempFile(os.TempDir(), "waltest") + p, err := ioutil.TempDir(os.TempDir(), "waltest") if err != nil { t.Fatal(err) } - p := f.Name() - _, err = New(p) - if err == nil || err != os.ErrExist { - t.Errorf("err = %v, want %v", err, os.ErrExist) - } - err = os.Remove(p) + defer os.RemoveAll(p) + + w, err := Create(p) if err != nil { - t.Fatal(err) + t.Fatalf("err = %v, want nil", err) } - w, err := New(p) - if err != nil { - t.Errorf("err = %v, want nil", err) + if g := path.Base(w.f.Name()); g != firstWalName { + t.Errorf("name = %+v, want %+v", g, firstWalName) } w.Close() - err = os.Remove(p) +} + +func TestNewForInitedDir(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") if err != nil { t.Fatal(err) } + defer os.RemoveAll(p) + + os.Create(path.Join(p, firstWalName)) + if _, err = Create(p); err == nil || err != os.ErrExist { + t.Errorf("err = %v, want %v", err, os.ErrExist) + } +} + +func TestAppend(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + os.Create(path.Join(p, firstWalName)) + w, err := Open(p) + if err != nil { + t.Fatalf("err = %v, want nil", err) + } + if g := path.Base(w.f.Name()); g != firstWalName { + t.Errorf("name = %+v, want %+v", g, firstWalName) + } + w.Close() + + wname := fmt.Sprintf("%016x-%016x.wal", 2, 10) + os.Create(path.Join(p, wname)) + w, err = Open(p) + if err != nil { + t.Fatalf("err = %v, want nil", err) + } + if g := path.Base(w.f.Name()); g != wname { + t.Errorf("name = %+v, want %+v", g, wname) + } + w.Close() +} + +func TestAppendForUninitedDir(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + if _, err = Open(p); err != ErrNotFound { + t.Errorf("err = %v, want %v", err, ErrNotFound) + } +} + +func TestCut(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + w, err := Create(p) + if err != nil { + t.Fatal(err) + } + defer w.Close() + + if err := w.Cut(0); err != nil { + t.Fatal(err) + } + wname := fmt.Sprintf("%016x-%016x.wal", 1, 0) + if g := path.Base(w.f.Name()); g != wname { + t.Errorf("name = %s, want %s", g, wname) + } + + e := &raft.Entry{Type: 1, Index: 1, Term: 1, Data: []byte{1}} + if err := w.SaveEntry(e); err != nil { + t.Fatal(err) + } + if err := w.Cut(1); err != nil { + t.Fatal(err) + } + wname = fmt.Sprintf("%016x-%016x.wal", 2, 1) + if g := path.Base(w.f.Name()); g != wname { + t.Errorf("name = %s, want %s", g, wname) + } } func TestSaveEntry(t *testing.T) { - p := path.Join(os.TempDir(), "waltest") - w, err := New(p) + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + w, err := Create(p) if err != nil { t.Fatal(err) } @@ -75,23 +163,23 @@ func TestSaveEntry(t *testing.T) { } w.Close() - b, err := ioutil.ReadFile(p) + b, err := ioutil.ReadFile(path.Join(p, firstWalName)) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(b, entryRecord) { t.Errorf("ent = %q, want %q", b, entryRecord) } - - err = os.Remove(p) - if err != nil { - t.Fatal(err) - } } func TestSaveInfo(t *testing.T) { - p := path.Join(os.TempDir(), "waltest") - w, err := New(p) + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + w, err := Create(p) if err != nil { t.Fatal(err) } @@ -116,23 +204,23 @@ func TestSaveInfo(t *testing.T) { } w.Close() - b, err := ioutil.ReadFile(p) + b, err := ioutil.ReadFile(path.Join(p, firstWalName)) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(b, infoRecord) { t.Errorf("ent = %q, want %q", b, infoRecord) } - - err = os.Remove(p) - if err != nil { - t.Fatal(err) - } } func TestSaveState(t *testing.T) { - p := path.Join(os.TempDir(), "waltest") - w, err := New(p) + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + w, err := Create(p) if err != nil { t.Fatal(err) } @@ -143,18 +231,13 @@ func TestSaveState(t *testing.T) { } w.Close() - b, err := ioutil.ReadFile(p) + b, err := ioutil.ReadFile(path.Join(p, firstWalName)) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(b, stateRecord) { t.Errorf("ent = %q, want %q", b, stateRecord) } - - err = os.Remove(p) - if err != nil { - t.Fatal(err) - } } func TestLoadInfo(t *testing.T) { @@ -189,9 +272,14 @@ func TestLoadState(t *testing.T) { } } -func TestLoadNode(t *testing.T) { - p := path.Join(os.TempDir(), "waltest") - w, err := New(p) +func TestNodeLoad(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + w, err := Create(p) if err != nil { t.Fatal(err) } @@ -213,12 +301,8 @@ func TestLoadNode(t *testing.T) { } w.Close() - w, err = Open(p) - if err != nil { - t.Fatal(err) - } - n, err := w.LoadNode() - if err != nil { + n := newNode(0) + if err := n.load(path.Join(p, firstWalName)); err != nil { t.Fatal(err) } if n.Id != i.Id { @@ -232,9 +316,130 @@ func TestLoadNode(t *testing.T) { if !reflect.DeepEqual(n.State, s) { t.Errorf("state = %+v, want %+v", n.State, s) } +} - err = os.Remove(p) +func TestSearchIndex(t *testing.T) { + tests := []struct { + names []string + index int64 + widx int + wok bool + }{ + { + []string{ + "0000000000000000-0000000000000000.wal", + "0000000000000001-0000000000001000.wal", + "0000000000000002-0000000000002000.wal", + }, + 0x1000, 1, true, + }, + { + []string{ + "0000000000000001-0000000000004000.wal", + "0000000000000002-0000000000003000.wal", + "0000000000000003-0000000000005000.wal", + }, + 0x4000, 1, true, + }, + { + []string{ + "0000000000000001-0000000000002000.wal", + "0000000000000002-0000000000003000.wal", + "0000000000000003-0000000000005000.wal", + }, + 0x1000, -1, false, + }, + } + for i, tt := range tests { + idx, ok := searchIndex(tt.names, tt.index) + if idx != tt.widx { + t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx) + } + if ok != tt.wok { + t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok) + } + } +} + +func TestScanWalName(t *testing.T) { + tests := []struct { + str string + wseq, windex int64 + wok bool + }{ + {"0000000000000000-0000000000000000.wal", 0, 0, true}, + {"0000000000000000.wal", 0, 0, false}, + {"0000000000000000-0000000000000000.snap", 0, 0, false}, + } + for i, tt := range tests { + s, index, err := parseWalName(tt.str) + if g := err == nil; g != tt.wok { + t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok) + } + if s != tt.wseq { + t.Errorf("#%d: seq = %d, want %d", i, s, tt.wseq) + } + if index != tt.windex { + t.Errorf("#%d: index = %d, want %d", i, index, tt.windex) + } + } +} + +func TestRead(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") if err != nil { t.Fatal(err) } + defer os.RemoveAll(p) + + w, err := Create(p) + if err != nil { + t.Fatal(err) + } + info := &raft.Info{Id: int64(0xBEEF)} + if err = w.SaveInfo(info); err != nil { + t.Fatal(err) + } + if err = w.Cut(0); err != nil { + t.Fatal(err) + } + for i := 1; i < 10; i++ { + e := raft.Entry{Index: int64(i)} + if err = w.SaveEntry(&e); err != nil { + t.Fatal(err) + } + if err = w.Cut(e.Index); err != nil { + t.Fatal(err) + } + if err = w.SaveInfo(info); err != nil { + t.Fatal(err) + } + } + w.Close() + + if err := os.Remove(path.Join(p, "0000000000000004-0000000000000003.wal")); err != nil { + t.Fatal(err) + } + + for i := 0; i < 15; i++ { + n, err := Read(p, int64(i)) + if i <= 3 || i >= 10 { + if err != ErrNotFound { + t.Errorf("#%d: err = %v, want %v", i, err, ErrNotFound) + } + continue + } + if err != nil { + t.Errorf("#%d: err = %v, want nil", i, err) + continue + } + if n.Id != info.Id { + t.Errorf("#%d: id = %d, want %d", n.Id, info.Id) + } + for j, e := range n.Ents { + if e.Index != int64(j+i+1) { + t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1) + } + } + } }