tests: Cover periodic check in tests

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2022-07-25 13:03:11 +02:00
parent 4a75e3d52d
commit a56ec0be4b
5 changed files with 232 additions and 2 deletions

View File

@ -178,6 +178,7 @@ type etcdProcessClusterConfig struct {
logLevel string
MaxConcurrentStreams uint32 // default is math.MaxUint32
CorruptCheckTime time.Duration
}
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
@ -326,6 +327,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams))
}
if cfg.CorruptCheckTime != 0 {
args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime))
}
etcdCfgs[i] = &etcdServerProcessConfig{
lg: lg,
execPath: cfg.execPath,

View File

@ -20,13 +20,13 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/datadir"
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
)
// TODO: test with embedded etcd in integration package
func TestEtcdCorruptHash(t *testing.T) {
// oldenv := os.Getenv("EXPECT_DEBUG")
// defer os.Setenv("EXPECT_DEBUG", oldenv)
@ -94,3 +94,40 @@ func corruptTest(cx ctlCtx) {
// restarting corrupted member should fail
waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
}
func TestPeriodicCheckDetectsCorruption(t *testing.T) {
checkTime := time.Second
BeforeTest(t)
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 3,
keepDataDir: true,
CorruptCheckTime: time.Second,
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})
cc := NewEtcdctl(epc.EndpointsV3())
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
}
epc.procs[0].Stop()
err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.procs[0].Config().dataDirPath))
assert.NoError(t, err)
err = epc.procs[0].Restart()
assert.NoError(t, err)
time.Sleep(checkTime * 11 / 10)
alarmResponse, err := cc.AlarmList()
assert.NoError(t, err, "error on alarm list")
// TODO: Investigate why MemberID is 0?
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
}

83
tests/e2e/etcdctl.go Normal file
View File

@ -0,0 +1,83 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"encoding/json"
"fmt"
"strings"
clientv3 "go.etcd.io/etcd/client/v3"
)
type EtcdctlV3 struct {
endpoints []string
}
func NewEtcdctl(endpoints []string) *EtcdctlV3 {
return &EtcdctlV3{
endpoints: endpoints,
}
}
func (ctl *EtcdctlV3) Put(key, value string) error {
args := ctl.cmdArgs()
args = append(args, "put", key, value)
return spawnWithExpect(args, "OK")
}
func (ctl *EtcdctlV3) AlarmList() (*clientv3.AlarmResponse, error) {
var resp clientv3.AlarmResponse
err := ctl.spawnJsonCmd(&resp, "alarm", "list")
return &resp, err
}
func (ctl *EtcdctlV3) MemberList() (*clientv3.MemberListResponse, error) {
var resp clientv3.MemberListResponse
err := ctl.spawnJsonCmd(&resp, "member", "list")
return &resp, err
}
func (ctl *EtcdctlV3) Compact(rev int64) (*clientv3.CompactResponse, error) {
args := ctl.cmdArgs("compact", fmt.Sprint(rev))
return nil, spawnWithExpect(args, fmt.Sprintf("compacted revision %v", rev))
}
func (ctl *EtcdctlV3) spawnJsonCmd(output interface{}, args ...string) error {
args = append(args, "-w", "json")
cmd, err := spawnCmd(append(ctl.cmdArgs(), args...), nil)
if err != nil {
return err
}
line, err := cmd.Expect("header")
if err != nil {
return err
}
return json.Unmarshal([]byte(line), output)
}
func (ctl *EtcdctlV3) cmdArgs(args ...string) []string {
cmdArgs := []string{ctlBinPath + "3"}
for k, v := range ctl.flags() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}
return append(cmdArgs, args...)
}
func (ctl *EtcdctlV3) flags() map[string]string {
fmap := make(map[string]string)
fmap["endpoints"] = strings.Join(ctl.endpoints, ",")
return fmap
}

View File

@ -613,6 +613,10 @@ type member struct {
func (m *member) GRPCURL() string { return m.grpcURL }
func (m *member) CorruptionChecker() etcdserver.CorruptionChecker {
return m.s.CorruptionChecker()
}
type memberConfig struct {
name string
uniqNumber int64

View File

@ -0,0 +1,101 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
)
func TestPeriodicCheck(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
cc, err := clus.ClusterClient()
require.NoError(t, err)
ctx := context.Background()
var totalRevisions int64 = 1210
var rev int64
for ; rev < totalRevisions; rev += testutil.CompactionCycle {
testPeriodicCheck(ctx, t, cc, clus, rev, rev+testutil.CompactionCycle)
}
testPeriodicCheck(ctx, t, cc, clus, rev, rev+totalRevisions)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember(nil), alarmResponse.Alarms)
}
func testPeriodicCheck(ctx context.Context, t *testing.T, cc *clientv3.Client, clus *ClusterV3, start, stop int64) {
for i := start; i <= stop; i++ {
if i%67 == 0 {
_, err := cc.Delete(ctx, testutil.PickKey(i+83))
assert.NoError(t, err, "error on delete")
} else {
_, err := cc.Put(ctx, testutil.PickKey(i), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
}
}
err := clus.Members[0].CorruptionChecker().PeriodicCheck()
assert.NoError(t, err, "error on periodic check (rev %v)", stop)
}
func TestPeriodicCheckDetectsCorruption(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
cc, err := clus.ClusterClient()
require.NoError(t, err)
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
}
err = clus.Members[0].CorruptionChecker().PeriodicCheck()
assert.NoError(t, err, "error on periodic check")
clus.Members[0].Stop(t)
clus.WaitLeader(t)
err = testutil.CorruptBBolt(clus.Members[0].BackendPath())
assert.NoError(t, err)
err = clus.Members[0].Restart(t)
assert.NoError(t, err)
time.Sleep(50 * time.Millisecond)
leader := clus.WaitLeader(t)
err = clus.Members[leader].CorruptionChecker().PeriodicCheck()
assert.NoError(t, err, "error on periodic check")
time.Sleep(50 * time.Millisecond)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
// TODO: Investigate why MemberID is 0?
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
}