Merge pull request #7634 from heyitsanthony/election-rpc

Election RPC service
This commit is contained in:
Anthony Romano 2017-04-07 20:03:09 -07:00 committed by GitHub
commit 25acdbf41b
11 changed files with 2520 additions and 12 deletions

View File

@ -19,6 +19,7 @@ import (
"fmt"
v3 "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
@ -36,6 +37,7 @@ type Election struct {
leaderKey string
leaderRev int64
leaderSession *Session
hdr *pb.ResponseHeader
}
// NewElection returns a new election on a given key prefix.
@ -43,6 +45,16 @@ func NewElection(s *Session, pfx string) *Election {
return &Election{session: s, keyPrefix: pfx + "/"}
}
// ResumeElection initializes an election with a known leader.
func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
return &Election{
session: s,
leaderKey: leaderKey,
leaderRev: leaderRev,
leaderSession: s,
}
}
// Campaign puts a value as eligible for the election. It blocks until
// it is elected, an error occurs, or the context is cancelled.
func (e *Election) Campaign(ctx context.Context, val string) error {
@ -80,6 +92,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
}
return err
}
e.hdr = resp.Header
return nil
}
@ -101,6 +114,8 @@ func (e *Election) Proclaim(ctx context.Context, val string) error {
e.leaderKey = ""
return ErrElectionNotLeader
}
e.hdr = tresp.Header
return nil
}
@ -110,23 +125,27 @@ func (e *Election) Resign(ctx context.Context) (err error) {
return nil
}
client := e.session.Client()
_, err = client.Delete(ctx, e.leaderKey)
cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
if err == nil {
e.hdr = resp.Header
}
e.leaderKey = ""
e.leaderSession = nil
return err
}
// Leader returns the leader value for the current election.
func (e *Election) Leader(ctx context.Context) (string, error) {
func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
client := e.session.Client()
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil {
return "", err
return nil, err
} else if len(resp.Kvs) == 0 {
// no leader currently elected
return "", ErrElectionNoLeader
return nil, ErrElectionNoLeader
}
return string(resp.Kvs[0].Value), nil
return resp, nil
}
// Observe returns a channel that observes all leader proposal values as
@ -142,20 +161,21 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
client := e.session.Client()
defer close(ch)
lastRev := int64(0)
for {
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev))
resp, err := client.Get(ctx, e.keyPrefix, opts...)
if err != nil {
return
}
var kv *mvccpb.KeyValue
cctx, cancel := context.WithCancel(ctx)
if len(resp.Kvs) == 0 {
cctx, cancel := context.WithCancel(ctx)
// wait for first key put on prefix
opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
wch := client.Watch(cctx, e.keyPrefix, opts...)
for kv == nil {
wr, ok := <-wch
if !ok || wr.Err() != nil {
@ -170,10 +190,12 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
}
}
}
cancel()
} else {
kv = resp.Kvs[0]
}
cctx, cancel := context.WithCancel(ctx)
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
keyDeleted := false
for !keyDeleted {
@ -183,6 +205,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
}
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
lastRev = ev.Kv.ModRevision
keyDeleted = true
break
}
@ -201,3 +224,9 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
// Key returns the leader key if elected, empty string otherwise.
func (e *Election) Key() string { return e.leaderKey }
// Rev returns the leader key's creation revision, if elected.
func (e *Election) Rev() int64 { return e.leaderRev }
// Header is the response header from the last successful election proposal.
func (m *Election) Header() *pb.ResponseHeader { return m.hdr }

View File

@ -25,6 +25,8 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3client"
"github.com/coreos/etcd/etcdserver/api/v3election"
"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
"github.com/coreos/etcd/etcdserver/api/v3lock"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
@ -66,10 +68,14 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
plog.Info("ready to serve client requests")
m := cmux.New(sctx.l)
v3c := v3client.New(s)
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)
if sctx.insecure {
gs := v3rpc.Server(s, nil)
v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
@ -97,7 +103,8 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
if sctx.secure {
gs := v3rpc.Server(s, tlscfg)
v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}

View File

@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package v3election provides a v3 election service from an etcdserver.
package v3election

View File

@ -0,0 +1,123 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3election
import (
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
)
type electionServer struct {
c *clientv3.Client
}
func NewElectionServer(c *clientv3.Client) epb.ElectionServer {
return &electionServer{c}
}
func (es *electionServer) Campaign(ctx context.Context, req *epb.CampaignRequest) (*epb.CampaignResponse, error) {
s, err := es.session(ctx, req.Lease)
if err != nil {
return nil, err
}
e := concurrency.NewElection(s, string(req.Name))
if err = e.Campaign(ctx, string(req.Value)); err != nil {
return nil, err
}
return &epb.CampaignResponse{
Header: e.Header(),
Leader: &epb.LeaderKey{
Name: req.Name,
Key: []byte(e.Key()),
Rev: e.Rev(),
Lease: int64(s.Lease()),
},
}, nil
}
func (es *electionServer) Proclaim(ctx context.Context, req *epb.ProclaimRequest) (*epb.ProclaimResponse, error) {
s, err := es.session(ctx, req.Leader.Lease)
if err != nil {
return nil, err
}
e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
if err := e.Proclaim(ctx, string(req.Value)); err != nil {
return nil, err
}
return &epb.ProclaimResponse{Header: e.Header()}, nil
}
func (es *electionServer) Observe(req *epb.LeaderRequest, stream epb.Election_ObserveServer) error {
s, err := es.session(stream.Context(), -1)
if err != nil {
return err
}
e := concurrency.NewElection(s, string(req.Name))
ch := e.Observe(stream.Context())
for stream.Context().Err() == nil {
select {
case <-stream.Context().Done():
case resp, ok := <-ch:
if !ok {
return nil
}
lresp := &epb.LeaderResponse{Header: resp.Header, Kv: resp.Kvs[0]}
if err := stream.Send(lresp); err != nil {
return err
}
}
}
return stream.Context().Err()
}
func (es *electionServer) Leader(ctx context.Context, req *epb.LeaderRequest) (*epb.LeaderResponse, error) {
s, err := es.session(ctx, -1)
if err != nil {
return nil, err
}
l, lerr := concurrency.NewElection(s, string(req.Name)).Leader(ctx)
if lerr != nil {
return nil, lerr
}
return &epb.LeaderResponse{Header: l.Header, Kv: l.Kvs[0]}, nil
}
func (es *electionServer) Resign(ctx context.Context, req *epb.ResignRequest) (*epb.ResignResponse, error) {
s, err := es.session(ctx, req.Leader.Lease)
if err != nil {
return nil, err
}
e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
if err := e.Resign(ctx); err != nil {
return nil, err
}
return &epb.ResignResponse{Header: e.Header()}, nil
}
func (es *electionServer) session(ctx context.Context, lease int64) (*concurrency.Session, error) {
s, err := concurrency.NewSession(
es.c,
concurrency.WithLease(clientv3.LeaseID(lease)),
concurrency.WithContext(ctx),
)
if err != nil {
return nil, err
}
s.Orphan()
return s, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,119 @@
syntax = "proto3";
package v3electionpb;
import "gogoproto/gogo.proto";
import "etcd/etcdserver/etcdserverpb/rpc.proto";
import "etcd/mvcc/mvccpb/kv.proto";
// for grpc-gateway
import "google/api/annotations.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
// The election service exposes client-side election facilities as a gRPC interface.
service Election {
// Campaign waits to acquire leadership in an election, returning a LeaderKey
// representing the leadership if successful. The LeaderKey can then be used
// to issue new values on the election, transactionally guard API requests on
// leadership still being held, and resign from the election.
rpc Campaign(CampaignRequest) returns (CampaignResponse) {
option (google.api.http) = {
post: "/v3alpha/election/campaign"
body: "*"
};
}
// Proclaim updates the leader's posted value with a new value.
rpc Proclaim(ProclaimRequest) returns (ProclaimResponse) {
option (google.api.http) = {
post: "/v3alpha/election/proclaim"
body: "*"
};
}
// Leader returns the current election proclamation, if any.
rpc Leader(LeaderRequest) returns (LeaderResponse) {
option (google.api.http) = {
post: "/v3alpha/election/leader"
body: "*"
};
}
// Observe streams election proclamations in-order as made by the election's
// elected leaders.
rpc Observe(LeaderRequest) returns (stream LeaderResponse) {
option (google.api.http) = {
post: "/v3alpha/election/observe"
body: "*"
};
}
// Resign releases election leadership so other campaigners may acquire
// leadership on the election.
rpc Resign(ResignRequest) returns (ResignResponse) {
option (google.api.http) = {
post: "/v3alpha/election/resign"
body: "*"
};
}
}
message CampaignRequest {
// name is the election's identifier for the campaign.
bytes name = 1;
// lease is the ID of the lease attached to leadership of the election. If the
// lease expires or is revoked before resigning leadership, then the
// leadership is transferred to the next campaigner, if any.
int64 lease = 2;
// value is the initial proclaimed value set when the campaigner wins the
// election.
bytes value = 3;
}
message CampaignResponse {
etcdserverpb.ResponseHeader header = 1;
// leader describes the resources used for holding leadereship of the election.
LeaderKey leader = 2;
}
message LeaderKey {
// name is the election identifier that correponds to the leadership key.
bytes name = 1;
// key is an opaque key representing the ownership of the election. If the key
// is deleted, then leadership is lost.
bytes key = 2;
// rev is the creation revision of the key. It can be used to test for ownership
// of an election during transactions by testing the key's creation revision
// matches rev.
int64 rev = 3;
// lease is the lease ID of the election leader.
int64 lease = 4;
}
message LeaderRequest {
// name is the election identifier for the leadership information.
bytes name = 1;
}
message LeaderResponse {
etcdserverpb.ResponseHeader header = 1;
// kv is the key-value pair representing the latest leader update.
mvccpb.KeyValue kv = 2;
}
message ResignRequest {
// leader is the leadership to relinquish by resignation.
LeaderKey leader = 1;
}
message ResignResponse {
etcdserverpb.ResponseHeader header = 1;
}
message ProclaimRequest {
// leader is the leadership hold on the election.
LeaderKey leader = 1;
// value is an update meant to overwrite the leader's current value.
bytes value = 2;
}
message ProclaimResponse {
etcdserverpb.ResponseHeader header = 1;
}

View File

@ -171,6 +171,11 @@ func (sws *serverWatchStream) recvLoop() error {
// \x00 is the smallest key
creq.Key = []byte{0}
}
if len(creq.RangeEnd) == 0 {
// force nil since watchstream.Watch distinguishes
// between nil and []byte{} for single key / >=
creq.RangeEnd = nil
}
if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
// support >= key queries
creq.RangeEnd = []byte{}

View File

@ -39,6 +39,8 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http"
"github.com/coreos/etcd/etcdserver/api/v3client"
"github.com/coreos/etcd/etcdserver/api/v3election"
epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
"github.com/coreos/etcd/etcdserver/api/v3lock"
lockpb "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
@ -668,6 +670,7 @@ func (m *member) Launch() error {
m.grpcServer = v3rpc.Server(m.s, tlscfg)
m.serverClient = v3client.New(m.s)
lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
go m.grpcServer.Serve(m.grpcListener)
}

View File

@ -0,0 +1,173 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"fmt"
"testing"
"time"
epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
"golang.org/x/net/context"
)
// TestV3ElectionCampaign checks that Campaign will not give
// simultaneous leadership to multiple campaigners.
func TestV3ElectionCampaign(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
lease1, err1 := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
if err1 != nil {
t.Fatal(err1)
}
lease2, err2 := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
if err2 != nil {
t.Fatal(err2)
}
lc := epb.NewElectionClient(clus.Client(0).ActiveConnection())
req1 := &epb.CampaignRequest{Name: []byte("foo"), Lease: lease1.ID, Value: []byte("abc")}
l1, lerr1 := lc.Campaign(context.TODO(), req1)
if lerr1 != nil {
t.Fatal(lerr1)
}
campaignc := make(chan struct{})
go func() {
defer close(campaignc)
req2 := &epb.CampaignRequest{Name: []byte("foo"), Lease: lease2.ID, Value: []byte("def")}
l2, lerr2 := lc.Campaign(context.TODO(), req2)
if lerr2 != nil {
t.Fatal(lerr2)
}
if l1.Header.Revision >= l2.Header.Revision {
t.Fatalf("expected l1 revision < l2 revision, got %d >= %d", l1.Header.Revision, l2.Header.Revision)
}
}()
select {
case <-time.After(200 * time.Millisecond):
case <-campaignc:
t.Fatalf("got leadership before resign")
}
if _, uerr := lc.Resign(context.TODO(), &epb.ResignRequest{Leader: l1.Leader}); uerr != nil {
t.Fatal(uerr)
}
select {
case <-time.After(200 * time.Millisecond):
t.Fatalf("campaigner unelected after resign")
case <-campaignc:
}
lval, lverr := lc.Leader(context.TODO(), &epb.LeaderRequest{Name: []byte("foo")})
if lverr != nil {
t.Fatal(lverr)
}
if string(lval.Kv.Value) != "def" {
t.Fatalf("got election value %q, expected %q", string(lval.Kv.Value), "def")
}
}
// TestV3ElectionObserve checks that an Observe stream receives
// proclamations from different leaders uninterrupted.
func TestV3ElectionObserve(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
lc := epb.NewElectionClient(clus.Client(0).ActiveConnection())
// observe 10 leadership events
observec := make(chan struct{})
go func() {
defer close(observec)
s, err := lc.Observe(context.Background(), &epb.LeaderRequest{Name: []byte("foo")})
observec <- struct{}{}
if err != nil {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
resp, rerr := s.Recv()
if rerr != nil {
t.Fatal(rerr)
}
if string(resp.Kv.Value) != fmt.Sprintf("%d", i) {
t.Fatalf(`got observe value %q, expected "%d"`, string(resp.Kv.Value), i)
}
}
}()
select {
case <-observec:
case <-time.After(time.Second):
t.Fatalf("observe stream took too long to start")
}
lease1, err1 := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
if err1 != nil {
t.Fatal(err1)
}
c1, cerr1 := lc.Campaign(context.TODO(), &epb.CampaignRequest{Name: []byte("foo"), Lease: lease1.ID, Value: []byte("0")})
if cerr1 != nil {
t.Fatal(cerr1)
}
// overlap other leader so it waits on resign
leader2c := make(chan struct{})
go func() {
defer close(leader2c)
lease2, err2 := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
if err2 != nil {
t.Fatal(err2)
}
c2, cerr2 := lc.Campaign(context.TODO(), &epb.CampaignRequest{Name: []byte("foo"), Lease: lease2.ID, Value: []byte("5")})
if cerr2 != nil {
t.Fatal(cerr2)
}
for i := 6; i < 10; i++ {
v := []byte(fmt.Sprintf("%d", i))
req := &epb.ProclaimRequest{Leader: c2.Leader, Value: v}
if _, err := lc.Proclaim(context.TODO(), req); err != nil {
t.Fatal(err)
}
}
}()
for i := 1; i < 5; i++ {
v := []byte(fmt.Sprintf("%d", i))
req := &epb.ProclaimRequest{Leader: c1.Leader, Value: v}
if _, err := lc.Proclaim(context.TODO(), req); err != nil {
t.Fatal(err)
}
}
// start second leader
lc.Resign(context.TODO(), &epb.ResignRequest{Leader: c1.Leader})
select {
case <-observec:
case <-time.After(time.Second):
t.Fatalf("observe did not observe all events in time")
}
<-leader2c
}

View File

@ -17,7 +17,7 @@ if ! [[ $(protoc --version) =~ "3.2.0" ]]; then
fi
# directories containing protos to be built
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb"
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./mvcc/mvccpb ./lease/leasepb ./auth/authpb ./etcdserver/api/v3lock/v3lockpb ./etcdserver/api/v3election/v3electionpb"
# exact version of protoc-gen-gogo to build
GOGO_PROTO_SHA="8d70fb3182befc465c4a1eac8ad4d38ff49778e2"

View File

@ -94,7 +94,7 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
}
}
rcs[i].validate = func() error {
if l, err := e.Leader(context.TODO()); err == nil && l != observedLeader {
if l, err := e.Leader(context.TODO()); err == nil && string(l.Kvs[0].Value) != observedLeader {
return fmt.Errorf("expected leader %q, got %q", observedLeader, l)
}
validatec <- struct{}{}