Merge pull request #1099 from unihorn/128

wal: OpenFromIndex fails if it cannot find previous index
This commit is contained in:
Yicheng Qin 2014-09-17 16:00:51 -07:00
commit df70f653a4
2 changed files with 45 additions and 11 deletions

View File

@ -42,10 +42,11 @@ const (
)
var (
ErrIDMismatch = errors.New("wal: unmatch id")
ErrNotFound = errors.New("wal: file is not found")
ErrCRCMismatch = errors.New("wal: crc mismatch")
crcTable = crc32.MakeTable(crc32.Castagnoli)
ErrIDMismatch = errors.New("wal: unmatch id")
ErrFileNotFound = errors.New("wal: file not found")
ErrIndexNotFound = errors.New("wal: index not found in file")
ErrCRCMismatch = errors.New("wal: crc mismatch")
crcTable = crc32.MakeTable(crc32.Castagnoli)
)
// WAL is a logical repersentation of the stable storage.
@ -94,7 +95,8 @@ func Create(dirpath string) (*WAL, error) {
}
// OpenAtIndex opens the WAL at the given index.
// The index MUST have been previously committed to the WAL.
// The index SHOULD have been previously committed to the WAL, or the following
// ReadAll will fail.
// The returned WAL is ready to read and the first record will be the given
// index. The WAL cannot be appended to before reading out all of its
// previous records.
@ -106,14 +108,14 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
}
names = checkWalNames(names)
if len(names) == 0 {
return nil, ErrNotFound
return nil, ErrFileNotFound
}
sort.Sort(sort.StringSlice(names))
nameIndex, ok := searchIndex(names, index)
if !ok || !isValidSeq(names[nameIndex:]) {
return nil, ErrNotFound
return nil, ErrFileNotFound
}
// open the wal files for reading
@ -153,6 +155,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
}
// ReadAll reads out all records of the current WAL.
// If it cannot read out the expected entry, it will return ErrIndexNotFound.
// After ReadAll, the WAL will be ready for appending new records.
func (w *WAL) ReadAll() (id int64, state raftpb.HardState, ents []raftpb.Entry, err error) {
rec := &walpb.Record{}
@ -193,6 +196,10 @@ func (w *WAL) ReadAll() (id int64, state raftpb.HardState, ents []raftpb.Entry,
state.Reset()
return 0, state, nil, err
}
if w.enti < w.ri {
state.Reset()
return 0, state, nil, ErrIndexNotFound
}
// close decoder, disable reading
w.decoder.close()

View File

@ -111,8 +111,8 @@ func TestOpenAtIndex(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(emptydir)
if _, err = OpenAtIndex(emptydir, 0); err != ErrNotFound {
t.Errorf("err = %v, want %v", err, ErrNotFound)
if _, err = OpenAtIndex(emptydir, 0); err != ErrFileNotFound {
t.Errorf("err = %v, want %v", err, ErrFileNotFound)
}
}
@ -315,8 +315,8 @@ func TestRecoverAfterCut(t *testing.T) {
w, err := OpenAtIndex(p, int64(i))
if err != nil {
if i <= 4 {
if err != ErrNotFound {
t.Errorf("#%d: err = %v, want %v", i, err, ErrNotFound)
if err != ErrFileNotFound {
t.Errorf("#%d: err = %v, want %v", i, err, ErrFileNotFound)
}
} else {
t.Errorf("#%d: err = %v, want nil", i, err)
@ -339,6 +339,33 @@ func TestRecoverAfterCut(t *testing.T) {
}
}
func TestOpenAtUncommittedIndex(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
w, err := Create(p)
if err != nil {
t.Fatal(err)
}
if err := w.SaveEntry(&raftpb.Entry{Index: 0}); err != nil {
t.Fatal(err)
}
w.Close()
w, err = OpenAtIndex(p, 1)
if err != nil {
t.Fatal(err)
}
// commit up to index 0, try to read index 1
if _, _, _, err := w.ReadAll(); err != ErrIndexNotFound {
t.Errorf("err = %v, want %v", err, ErrIndexNotFound)
}
w.Close()
}
func TestSaveEmpty(t *testing.T) {
var buf bytes.Buffer
var est raftpb.HardState