etcd/tests/integration/v3_election_test.go
Piotr Tabor 5ddabfdb24 tests: Make tests operate in /tmp director instead of src.
Thanks to this, unix sockets should be not longer
created by integration tests in the the source code directory,
so potentially trigger IDE reloads and unnecessery load (and mess).
2021-03-09 18:19:52 +01:00

319 lines
8.4 KiB
Go

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
"fmt"
"testing"
"time"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// TestElectionWait tests if followers can correctly wait for elections.
func TestElectionWait(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
leaders := 3
followers := 3
var clients []*clientv3.Client
newClient := MakeMultiNodeClients(t, clus, &clients)
defer func() {
CloseClients(t, clients)
}()
electedc := make(chan string)
nextc := []chan struct{}{}
// wait for all elections
donec := make(chan struct{})
for i := 0; i < followers; i++ {
nextc = append(nextc, make(chan struct{}))
go func(ch chan struct{}) {
for j := 0; j < leaders; j++ {
session, err := concurrency.NewSession(newClient())
if err != nil {
t.Error(err)
}
b := concurrency.NewElection(session, "test-election")
cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
s, ok := <-b.Observe(cctx)
if !ok {
t.Errorf("could not observe election; channel closed")
}
electedc <- string(s.Kvs[0].Value)
// wait for next election round
<-ch
session.Orphan()
}
donec <- struct{}{}
}(nextc[i])
}
// elect some leaders
for i := 0; i < leaders; i++ {
go func() {
session, err := concurrency.NewSession(newClient())
if err != nil {
t.Error(err)
}
defer session.Orphan()
e := concurrency.NewElection(session, "test-election")
ev := fmt.Sprintf("electval-%v", time.Now().UnixNano())
if err := e.Campaign(context.TODO(), ev); err != nil {
t.Errorf("failed volunteer (%v)", err)
}
// wait for followers to accept leadership
for j := 0; j < followers; j++ {
s := <-electedc
if s != ev {
t.Errorf("wrong election value got %s, wanted %s", s, ev)
}
}
// let next leader take over
if err := e.Resign(context.TODO()); err != nil {
t.Errorf("failed resign (%v)", err)
}
// tell followers to start listening for next leader
for j := 0; j < followers; j++ {
nextc[j] <- struct{}{}
}
}()
}
// wait on followers
for i := 0; i < followers; i++ {
<-donec
}
}
// TestElectionFailover tests that an election will
func TestElectionFailover(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ss := make([]*concurrency.Session, 3)
for i := 0; i < 3; i++ {
var err error
ss[i], err = concurrency.NewSession(clus.clients[i])
if err != nil {
t.Error(err)
}
defer ss[i].Orphan()
}
// first leader (elected)
e := concurrency.NewElection(ss[0], "test-election")
if err := e.Campaign(context.TODO(), "foo"); err != nil {
t.Fatalf("failed volunteer (%v)", err)
}
// check first leader
resp, ok := <-e.Observe(cctx)
if !ok {
t.Fatalf("could not wait for first election; channel closed")
}
s := string(resp.Kvs[0].Value)
if s != "foo" {
t.Fatalf("wrong election result. got %s, wanted foo", s)
}
// next leader
electedErrC := make(chan error, 1)
go func() {
ee := concurrency.NewElection(ss[1], "test-election")
eer := ee.Campaign(context.TODO(), "bar")
electedErrC <- eer // If eer != nil, the test will fail by calling t.Fatal(eer)
}()
// invoke leader failover
if err := ss[0].Close(); err != nil {
t.Fatal(err)
}
// check new leader
e = concurrency.NewElection(ss[2], "test-election")
resp, ok = <-e.Observe(cctx)
if !ok {
t.Fatalf("could not wait for second election; channel closed")
}
s = string(resp.Kvs[0].Value)
if s != "bar" {
t.Fatalf("wrong election result. got %s, wanted bar", s)
}
// leader must ack election (otherwise, Campaign may see closed conn)
eer := <-electedErrC
if eer != nil {
t.Fatal(eer)
}
}
// TestElectionSessionRelock ensures that campaigning twice on the same election
// with the same lock will Proclaim instead of deadlocking.
func TestElectionSessionRecampaign(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
session, err := concurrency.NewSession(cli)
if err != nil {
t.Error(err)
}
defer session.Orphan()
e := concurrency.NewElection(session, "test-elect")
if err := e.Campaign(context.TODO(), "abc"); err != nil {
t.Fatal(err)
}
e2 := concurrency.NewElection(session, "test-elect")
if err := e2.Campaign(context.TODO(), "def"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
if resp := <-e.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
t.Fatalf("expected value=%q, got response %v", "def", resp)
}
}
// TestElectionOnPrefixOfExistingKey checks that a single
// candidate can be elected on a new key that is a prefix
// of an existing key. To wit, check for regression
// of bug #6278. https://github.com/etcd-io/etcd/issues/6278
//
func TestElectionOnPrefixOfExistingKey(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
t.Fatal(err)
}
s, serr := concurrency.NewSession(cli)
if serr != nil {
t.Fatal(serr)
}
e := concurrency.NewElection(s, "test")
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
err := e.Campaign(ctx, "abc")
cancel()
if err != nil {
// after 5 seconds, deadlock results in
// 'context deadline exceeded' here.
t.Fatal(err)
}
}
// TestElectionOnSessionRestart tests that a quick restart of leader (resulting
// in a new session with the same lease id) does not result in loss of
// leadership.
func TestElectionOnSessionRestart(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
session, err := concurrency.NewSession(cli)
if err != nil {
t.Fatal(err)
}
e := concurrency.NewElection(session, "test-elect")
if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
t.Fatal(cerr)
}
// ensure leader is not lost to waiter on fail-over
waitSession, werr := concurrency.NewSession(cli)
if werr != nil {
t.Fatal(werr)
}
defer waitSession.Orphan()
waitCtx, waitCancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer waitCancel()
go concurrency.NewElection(waitSession, "test-elect").Campaign(waitCtx, "123")
// simulate restart by reusing the lease from the old session
newSession, nerr := concurrency.NewSession(cli, concurrency.WithLease(session.Lease()))
if nerr != nil {
t.Fatal(nerr)
}
defer newSession.Orphan()
newElection := concurrency.NewElection(newSession, "test-elect")
if ncerr := newElection.Campaign(context.TODO(), "def"); ncerr != nil {
t.Fatal(ncerr)
}
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
if resp := <-newElection.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
t.Errorf("expected value=%q, got response %v", "def", resp)
}
}
// TestElectionObserveCompacted checks that observe can tolerate
// a leader key with a modrev less than the compaction revision.
func TestElectionObserveCompacted(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
session, err := concurrency.NewSession(cli)
if err != nil {
t.Fatal(err)
}
defer session.Orphan()
e := concurrency.NewElection(session, "test-elect")
if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
t.Fatal(cerr)
}
presp, perr := cli.Put(context.TODO(), "foo", "bar")
if perr != nil {
t.Fatal(perr)
}
if _, cerr := cli.Compact(context.TODO(), presp.Header.Revision); cerr != nil {
t.Fatal(cerr)
}
v, ok := <-e.Observe(context.TODO())
if !ok {
t.Fatal("failed to observe on compacted revision")
}
if string(v.Kvs[0].Value) != "abc" {
t.Fatalf(`expected leader value "abc", got %q`, string(v.Kvs[0].Value))
}
}