wal: fix ReleaseLockTo

ReleaseLockTo should not release the lock on the WAL
segment that is right before the given index. When
restarting etcd, etcd needs to read from the WAL segment
that has a smaller index than the snapshot index.

The correct behavior is that ReleaseLockTo releases
the locks w is holding so that w only holds one lock
that has an index smaller than the given index.
This commit is contained in:
Xiang Li 2015-03-09 19:06:26 -07:00 committed by Yicheng Qin
parent 78e0149f41
commit e13b09e4d9
2 changed files with 67 additions and 17 deletions

View File

@ -320,27 +320,34 @@ func (w *WAL) sync() error {
return w.f.Sync()
}
// ReleaseLockTo releases the locks w is holding, which
// have index smaller or equal to the given index.
// ReleaseLockTo releases the locks, which has smaller index than the given index
// except the largest one among them.
// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
func (w *WAL) ReleaseLockTo(index uint64) error {
for _, l := range w.locks {
_, i, err := parseWalName(path.Base(l.Name()))
var smaller int
for i, l := range w.locks {
_, lockIndex, err := parseWalName(path.Base(l.Name()))
if err != nil {
return err
}
if i > index {
return nil
if lockIndex >= index {
smaller = i - 1
break
}
err = l.Unlock()
if err != nil {
return err
}
err = l.Destroy()
if err != nil {
return err
}
w.locks = w.locks[1:]
}
if smaller <= 0 {
return nil
}
for i := 0; i < smaller; i++ {
w.locks[i].Unlock()
w.locks[i].Destroy()
}
w.locks = w.locks[smaller:]
return nil
}

View File

@ -435,6 +435,7 @@ func TestOpenNotInUse(t *testing.T) {
unlockIndex := uint64(5)
w.ReleaseLockTo(unlockIndex)
// 1,2,3 are avaliable.
w2, err := OpenNotInUse(p, walpb.Snapshot{})
defer w2.Close()
if err != nil {
@ -444,8 +445,8 @@ func TestOpenNotInUse(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if g := ents[len(ents)-1].Index; g != unlockIndex {
t.Errorf("last index read = %d, want %d", g, unlockIndex)
if g := ents[len(ents)-1].Index; g != unlockIndex-2 {
t.Errorf("last index read = %d, want %d", g, unlockIndex-2)
}
}
@ -462,3 +463,45 @@ func TestSaveEmpty(t *testing.T) {
t.Errorf("buf.Bytes = %d, want 0", len(buf.Bytes()))
}
}
func TestReleaseLockTo(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
// create WAL
w, err := Create(p, nil)
defer w.Close()
if err != nil {
t.Fatal(err)
}
// make 10 seperate files
for i := 0; i < 10; i++ {
es := []raftpb.Entry{{Index: uint64(i)}}
if err = w.Save(raftpb.HardState{}, es); err != nil {
t.Fatal(err)
}
if err = w.Cut(); err != nil {
t.Fatal(err)
}
}
// release the lock to 5
unlockIndex := uint64(5)
w.ReleaseLockTo(unlockIndex)
// expected remaining are 4,5,6,7,8,9,10
if len(w.locks) != 7 {
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 7)
}
for i, l := range w.locks {
_, lockIndex, err := parseWalName(path.Base(l.Name()))
if err != nil {
t.Fatal(err)
}
if lockIndex != uint64(i+4) {
t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4))
}
}
}