// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"bytes"
	"encoding/binary"
	"io"
	"strconv"
	"strings"
	"time"

	"github.com/coreos/etcd/auth"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/lease/leasehttp"
	"github.com/coreos/etcd/lease/leasepb"
	"github.com/coreos/etcd/mvcc"
	"github.com/coreos/etcd/raft"

	"github.com/coreos/go-semver/semver"
	"golang.org/x/net/context"
	"google.golang.org/grpc/metadata"
)

const (
	// the max request size that raft accepts.
	// TODO: make this a flag? We probably do not want to accept a large
	// request that could block the raft stream; a user specifying too
	// large a value might end up shooting themselves in the foot.
	maxRequestBytes = 1.5 * 1024 * 1024

	// In the healthy case, there might be a small gap (tens of entries)
	// between the applied index and the committed index. However, if the
	// committed entries are very heavy to apply, the gap can grow. We
	// should stop accepting new proposals once the gap grows past a
	// certain point.
	maxGapBetweenApplyAndCommitIndex = 5000
)

var (
	newRangeClusterVersion = *semver.Must(semver.NewVersion("3.1.0"))
)

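// RaftKV is the interface of the server's v3 key-value requests; writes are
// replicated through raft, while reads may be served locally or via raft
// depending on the requested consistency.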
type RaftKV interface {
	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
	DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}

type Lessor interface {
	// LeaseGrant sends a LeaseGrant request through raft and applies it once committed.
	LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
	// LeaseRevoke sends a LeaseRevoke request through raft and applies it once committed.
	LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

	// LeaseRenew renews the lease with the given ID. It returns the renewed TTL, or an error.
	LeaseRenew(id lease.LeaseID) (int64, error)

	// LeaseTimeToLive retrieves lease information.
	LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
}

type Authenticator interface {
	AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
	AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
	Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
	UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
	UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
	UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
	UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
	UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
	UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
	RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
	RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
	RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
	RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
	RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
	UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
	RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}

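// Range handles a read request. A linearizable range first waits for the
// read state to be confirmed through raft (see linearizableReadLoop); a
// serializable range is served directly from the locally applied state.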
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
	// TODO: remove this checking when we release etcd 3.2
	if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
		return s.legacyRange(ctx, r)
	}

	if !r.Serializable {
		err := s.linearizableReadNotify(ctx)
		if err != nil {
			return nil, err
		}
	}
	var resp *pb.RangeResponse
	var err error
	chk := func(ai *auth.AuthInfo) error {
		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
	}
	get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
	if serr := s.doSerialize(ctx, chk, get); serr != nil {
		return nil, serr
	}
	return resp, err
}

// TODO: remove this func when we release etcd 3.2
func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
	if r.Serializable {
		var resp *pb.RangeResponse
		var err error
		chk := func(ai *auth.AuthInfo) error {
			return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
		}
		get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
		if serr := s.doSerialize(ctx, chk, get); serr != nil {
			return nil, serr
		}
		return resp, err
	}
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.RangeResponse), nil
}

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Put: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.PutResponse), nil
}

func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.DeleteRangeResponse), nil
}

func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
	// TODO: remove this checking when we release etcd 3.2
	if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
		return s.legacyTxn(ctx, r)
	}

	if isTxnReadonly(r) {
		if !isTxnSerializable(r) {
			err := s.linearizableReadNotify(ctx)
			if err != nil {
				return nil, err
			}
		}
		var resp *pb.TxnResponse
		var err error
		chk := func(ai *auth.AuthInfo) error {
			return checkTxnAuth(s.authStore, ai, r)
		}
		get := func() { resp, err = s.applyV3Base.Txn(r) }
		if serr := s.doSerialize(ctx, chk, get); serr != nil {
			return nil, serr
		}
		return resp, err
	}
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.TxnResponse), nil
}

// TODO: remove this func when we release etcd 3.2
func (s *EtcdServer) legacyTxn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
	if isTxnSerializable(r) {
		var resp *pb.TxnResponse
		var err error
		chk := func(ai *auth.AuthInfo) error {
			return checkTxnAuth(s.authStore, ai, r)
		}
		get := func() { resp, err = s.applyV3Base.Txn(r) }
		if serr := s.doSerialize(ctx, chk, get); serr != nil {
			return nil, serr
		}
		return resp, err
	}
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.TxnResponse), nil
}

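// isTxnSerializable reports whether every operation in the transaction, on
// both the success and failure branches, is a serializable range request;
// only such read-only txns may skip the linearizable read barrier.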
func isTxnSerializable(r *pb.TxnRequest) bool {
	for _, u := range r.Success {
		if r := u.GetRequestRange(); r == nil || !r.Serializable {
			return false
		}
	}
	for _, u := range r.Failure {
		if r := u.GetRequestRange(); r == nil || !r.Serializable {
			return false
		}
	}
	return true
}

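// isTxnReadonly reports whether every operation on both branches of the
// transaction is a range request, i.e. the txn contains no writes.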
func isTxnReadonly(r *pb.TxnRequest) bool {
	for _, u := range r.Success {
		if r := u.GetRequestRange(); r == nil {
			return false
		}
	}
	for _, u := range r.Failure {
		if r := u.GetRequestRange(); r == nil {
			return false
		}
	}
	return true
}

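// Compact proposes a compaction of the key-value store's revision history
// through raft. For a physical compaction it additionally waits for the
// backend to finish deleting the compacted keys before returning.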
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
	if r.Physical && result != nil && result.physc != nil {
		<-result.physc
		// The compaction is done deleting keys; the hash is now settled
		// but the data is not necessarily committed. If there's a crash,
		// the hash may revert to a hash prior to compaction completing
		// if the compaction resumes. Force the finished compaction to
		// commit so it won't resume following a crash.
		s.be.ForceCommit()
	}
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	resp := result.resp.(*pb.CompactionResponse)
	if resp == nil {
		resp = &pb.CompactionResponse{}
	}
	if resp.Header == nil {
		resp.Header = &pb.ResponseHeader{}
	}
	resp.Header.Revision = s.kv.Rev()
	return resp, nil
}

func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	// no id given? choose one
	for r.ID == int64(lease.NoLease) {
		// only use positive int64 id's
		r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
	}
	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.LeaseGrantResponse), nil
}

func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.LeaseRevokeResponse), nil
}

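// LeaseRenew renews the lease with the given ID. Renewals are not replicated
// through raft; if this member is not the leader, the request is forwarded
// to the leader over the peer HTTP transport.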
func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
	ttl, err := s.lessor.Renew(id)
	if err == nil { // already requested to primary lessor(leader)
		return ttl, nil
	}
	if err != lease.ErrNotPrimary {
		return -1, err
	}

	// renewals don't go through raft; forward to leader manually
	leader, err := s.waitLeader()
	if err != nil {
		return -1, err
	}

	for _, url := range leader.PeerURLs {
		lurl := url + leasehttp.LeasePrefix
		ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
		if err == nil {
			break
		}
		err = convertEOFToNoLeader(err)
	}
	return ttl, err
}

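// LeaseTimeToLive retrieves the remaining TTL (and, optionally, the attached
// keys) of a lease. The leader answers from its local lessor; a follower
// forwards the request to the leader over the peer HTTP transport.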
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
	if s.Leader() == s.ID() {
		// primary; timetolive directly from leader
		le := s.lessor.Lookup(lease.LeaseID(r.ID))
		if le == nil {
			return nil, lease.ErrLeaseNotFound
		}
		// TODO: fill out ResponseHeader
		resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
		if r.Keys {
			ks := le.Keys()
			kbs := make([][]byte, len(ks))
			for i := range ks {
				kbs[i] = []byte(ks[i])
			}
			resp.Keys = kbs
		}
		return resp, nil
	}

	// manually request to leader
	leader, err := s.waitLeader()
	if err != nil {
		return nil, err
	}

	for _, url := range leader.PeerURLs {
		lurl := url + leasehttp.LeaseInternalPrefix
		var iresp *leasepb.LeaseInternalResponse
		iresp, err = leasehttp.TimeToLiveHTTP(ctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
		if err == nil {
			return iresp.LeaseTimeToLiveResponse, nil
		}
		err = convertEOFToNoLeader(err)
	}
	return nil, err
}

// convertEOFToNoLeader converts EOF errors to ErrNoLeader because lease
// renew and timetolive requests to followers are forwarded to the leader,
// and a follower might not be able to reach the leader due to transient
// network errors (often surfacing as EOF). Returning ErrNoLeader signals
// clients to retry their requests.
func convertEOFToNoLeader(err error) error {
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return ErrNoLeader
	}
	return err
}

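// waitLeader returns the current leader member, waiting up to five election
// intervals for one to be elected before giving up with ErrNoLeader.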
func (s *EtcdServer) waitLeader() (*membership.Member, error) {
	leader := s.cluster.Member(s.Leader())
	for i := 0; i < 5 && leader == nil; i++ {
		// wait an election
		dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
		select {
		case <-time.After(dur):
			leader = s.cluster.Member(s.Leader())
		case <-s.stopping:
			return nil, ErrStopped
		}
	}
	if leader == nil || len(leader.PeerURLs) == 0 {
		return nil, ErrNoLeader
	}
	return leader, nil
}

func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AlarmResponse), nil
}

func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthEnableResponse), nil
}

func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthDisableResponse), nil
}

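// Authenticate checks the given credentials and, on success, commits a
// simple token through raft. If the auth store's revision changes between
// the password check and the commit, the check may be based on stale state,
// so the whole sequence is retried.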
func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
	var result *applyResult

	err := s.linearizableReadNotify(ctx)
	if err != nil {
		return nil, err
	}

	for {
		checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
		if err != nil {
			plog.Errorf("invalid authentication request to user %s was issued", r.Name)
			return nil, err
		}

		st, err := s.AuthStore().GenSimpleToken()
		if err != nil {
			return nil, err
		}

		internalReq := &pb.InternalAuthenticateRequest{
			Name:        r.Name,
			Password:    r.Password,
			SimpleToken: st,
		}

		result, err = s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
		if err != nil {
			return nil, err
		}
		if result.err != nil {
			return nil, result.err
		}

		if checkedRevision != s.AuthStore().Revision() {
			plog.Infof("revision when password checked is obsolete, retrying")
			continue
		}

		break
	}

	return result.resp.(*pb.AuthenticateResponse), nil
}

func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthUserAddResponse), nil
}

func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthUserDeleteResponse), nil
}

func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthUserChangePasswordResponse), nil
}

func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthUserGrantRoleResponse), nil
}

func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthUserGetResponse), nil
}

func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthUserListResponse), nil
}

func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthUserRevokeRoleResponse), nil
}

func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthRoleAddResponse), nil
}

func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthRoleGrantPermissionResponse), nil
}

func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthRoleGetResponse), nil
}

func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthRoleListResponse), nil
}

func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthRoleRevokePermissionResponse), nil
}

func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
	if err != nil {
		return nil, err
	}
	if result.err != nil {
		return nil, result.err
	}
	return result.resp.(*pb.AuthRoleDeleteResponse), nil
}

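// isValidSimpleToken checks that a simple token has the expected
// "<token>.<index>" form and, if so, waits until this server has applied
// the raft index encoded in the suffix before treating the token as valid.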
func (s *EtcdServer) isValidSimpleToken(token string) bool {
	splitted := strings.Split(token, ".")
	if len(splitted) != 2 {
		return false
	}
	index, err := strconv.Atoi(splitted[1])
	if err != nil {
		return false
	}

	select {
	case <-s.applyWait.Wait(uint64(index)):
		return true
	case <-s.stop:
		return true
	}
}

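// authInfoFromCtx extracts authentication information from the gRPC
// metadata attached to ctx. It returns (nil, nil) when no token is present,
// i.e. the request is unauthenticated.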
func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
	md, ok := metadata.FromContext(ctx)
	if !ok {
		return nil, nil
	}

	ts, tok := md["token"]
	if !tok {
		return nil, nil
	}

	token := ts[0]
	if !s.isValidSimpleToken(token) {
		return nil, ErrInvalidAuthToken
	}

	authInfo, uok := s.AuthStore().AuthInfoFromToken(token)
	if !uok {
		plog.Warningf("invalid auth token: %s", token)
		return nil, ErrInvalidAuthToken
	}
	return authInfo, nil
}

// doSerialize handles the auth logic, with permissions checked by "chk", for
// a serialized request "get". Returns a non-nil error on authentication
// failure.
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
	for {
		ai, err := s.authInfoFromCtx(ctx)
		if err != nil {
			return err
		}
		if ai == nil {
			// chk expects non-nil AuthInfo; use empty credentials
			ai = &auth.AuthInfo{}
		}
		if err = chk(ai); err != nil {
			if err == auth.ErrAuthOldRevision {
				continue
			}
			return err
		}
		// fetch response for serialized request
		get()
		// empty credentials or current auth info means no need to retry
		if ai.Revision == 0 || ai.Revision == s.authStore.Revision() {
			return nil
		}
		// the auth revision changed since the check; retry to avoid a TOCTOU error
	}
}

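// processInternalRaftRequestOnce proposes an internal request through raft
// exactly once and waits for it to be applied. It rejects the request up
// front when the apply side has fallen too far behind the committed index,
// or when the marshaled request exceeds maxRequestBytes.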
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
	ai := s.getAppliedIndex()
	ci := s.getCommittedIndex()
	if ci > ai+maxGapBetweenApplyAndCommitIndex {
		return nil, ErrTooManyRequests
	}

	r.Header = &pb.RequestHeader{
		ID: s.reqIDGen.Next(),
	}

	authInfo, err := s.authInfoFromCtx(ctx)
	if err != nil {
		return nil, err
	}
	if authInfo != nil {
		r.Header.Username = authInfo.Username
		r.Header.AuthRevision = authInfo.Revision
	}

	data, err := r.Marshal()
	if err != nil {
		return nil, err
	}

	if len(data) > maxRequestBytes {
		return nil, ErrRequestTooLarge
	}

	id := r.ID
	if id == 0 {
		id = r.Header.ID
	}
	ch := s.w.Register(id)

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	start := time.Now()
	s.r.Propose(cctx, data)
	proposalsPending.Inc()
	defer proposalsPending.Dec()

	select {
	case x := <-ch:
		return x.(*applyResult), nil
	case <-cctx.Done():
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, s.parseProposeCtxErr(cctx.Err(), start)
	case <-s.done:
		return nil, ErrStopped
	}
}

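// processInternalRaftRequest keeps retrying the request as long as the apply
// layer rejects it with auth.ErrAuthOldRevision, which indicates the request
// carried a stale auth revision.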
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
	var result *applyResult
	var err error
	for {
		result, err = s.processInternalRaftRequestOnce(ctx, r)
		if err != auth.ErrAuthOldRevision {
			break
		}
	}

	return result, err
}

// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

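// linearizableReadLoop serves linearizable read requests. For each batch of
// waiters it issues a raft ReadIndex request tagged with a unique request
// ID, waits for the matching read state from raft, waits for the local
// applied index to catch up to the returned read index, and then notifies
// the waiters, so reads served afterwards observe all entries committed
// before the ReadIndex call.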
func (s *EtcdServer) linearizableReadLoop() {
	var rs raft.ReadState
	internalTimeout := time.Second

	for {
		ctx := make([]byte, 8)
		binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())

		select {
		case <-s.readwaitc:
		case <-s.stopping:
			return
		}

		nextnr := newNotifier()

		s.readMu.Lock()
		nr := s.readNotifier
		s.readNotifier = nextnr
		s.readMu.Unlock()

		cctx, cancel := context.WithTimeout(context.Background(), internalTimeout)
		if err := s.r.ReadIndex(cctx, ctx); err != nil {
			cancel()
			if err == raft.ErrStopped {
				return
			}
			plog.Errorf("failed to get read index from raft: %v", err)
			nr.notify(err)
			continue
		}
		cancel()

		var (
			timeout bool
			done    bool
		)
		for !timeout && !done {
			select {
			case rs = <-s.r.readStateC:
				done = bytes.Equal(rs.RequestCtx, ctx)
				if !done {
					// a previous request might have timed out; ignore its
					// response and keep waiting for the response to the
					// current request.
					plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
				}
			case <-time.After(internalTimeout):
				plog.Warningf("timed out waiting for read index response")
				nr.notify(ErrTimeout)
				timeout = true
			case <-s.stopping:
				return
			}
		}
		if !done {
			continue
		}

		if ai := s.getAppliedIndex(); ai < rs.Index {
			select {
			case <-s.applyWait.Wait(rs.Index):
			case <-s.stopping:
				return
			}
		}
		// unblock all l-reads requested at indices before rs.Index
		nr.notify(nil)
	}
}

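// linearizableReadNotify registers the caller with the current read
// notifier, signals linearizableReadLoop to issue a ReadIndex if it has not
// already, and blocks until the read state is confirmed, the context
// expires, or the server stops.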
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
	s.readMu.RLock()
	nc := s.readNotifier
	s.readMu.RUnlock()

	// signal linearizable loop for current notify if it hasn't been already
	select {
	case s.readwaitc <- struct{}{}:
	default:
	}

	// wait for read state notification
	select {
	case <-nc.c:
		return nc.err
	case <-ctx.Done():
		return ctx.Err()
	case <-s.done:
		return ErrStopped
	}
}