mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

Currently, auth tokens are generated randomly in the replicated state machine layer. This means an auth token generated on node A cannot be used on node B, which is problematic for load balancing and failover. This commit moves the token generation logic from the state machine to the API layer (before raft) and lets all nodes share a single token. The Raft log index is also added to the token to ensure its uniqueness and to detect when the token has been activated in the cluster (some nodes can receive the token before it has been generated and installed in their state machine). This commit also lets authStore hold the simple-token related state. This is required by the unit tests: the simple-token state must be cleaned up after each test, because a succeeding test can otherwise create a duplicated token, which causes a panic.
515 lines
16 KiB
Go
515 lines
16 KiB
Go
// Copyright 2015 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package etcdserver
|
|
|
|
import (
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/auth"
|
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
"github.com/coreos/etcd/lease"
|
|
"github.com/coreos/etcd/lease/leasehttp"
|
|
"github.com/coreos/etcd/mvcc"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
const (
	// maxRequestBytes is the largest marshaled request, in bytes, that is
	// accepted for proposal to raft.
	// TODO: make this a flag? But we probably do not want to accept a
	// large request which might block the raft stream. A user specifying
	// a large value might end up shooting themselves in the foot.
	maxRequestBytes = 1.5 * 1024 * 1024

	// maxV3RequestTimeout bounds how long a v3 request waits to go
	// through raft.
	maxV3RequestTimeout = 5 * time.Second
)
|
|
|
|
type RaftKV interface {
|
|
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
|
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
|
|
DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
|
|
Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
|
|
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
|
|
}
|
|
|
|
type Lessor interface {
|
|
// LeaseGrant sends LeaseGrant request to raft and apply it after committed.
|
|
LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
|
|
// LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
|
|
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
|
|
|
|
// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
|
|
// is returned.
|
|
LeaseRenew(id lease.LeaseID) (int64, error)
|
|
}
|
|
|
|
type Authenticator interface {
|
|
AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
|
|
AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
|
|
Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
|
|
UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
|
|
UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
|
|
UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
|
|
UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
|
|
UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
|
|
UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
|
|
RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
|
|
RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
|
|
RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
|
|
RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
|
|
RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
|
|
}
|
|
|
|
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
|
if r.Serializable {
|
|
user, err := s.usernameFromCtx(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
hdr := &pb.RequestHeader{Username: user}
|
|
if !s.AuthStore().IsRangePermitted(hdr, string(r.Key), string(r.RangeEnd)) {
|
|
return nil, auth.ErrPermissionDenied
|
|
}
|
|
return s.applyV3.Range(noTxn, r)
|
|
}
|
|
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.RangeResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Put: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.PutResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.DeleteRangeResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
if isTxnSerializable(r) {
|
|
return s.applyV3.Txn(r)
|
|
}
|
|
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.TxnResponse), nil
|
|
}
|
|
|
|
func isTxnSerializable(r *pb.TxnRequest) bool {
|
|
for _, u := range r.Success {
|
|
if r := u.GetRequestRange(); r == nil || !r.Serializable {
|
|
return false
|
|
}
|
|
}
|
|
for _, u := range r.Failure {
|
|
if r := u.GetRequestRange(); r == nil || !r.Serializable {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Compaction: r})
|
|
if r.Physical && result != nil && result.physc != nil {
|
|
<-result.physc
|
|
// The compaction is done deleting keys; the hash is now settled
|
|
// but the data is not necessarily committed. If there's a crash,
|
|
// the hash may revert to a hash prior to compaction completing
|
|
// if the compaction resumes. Force the finished compaction to
|
|
// commit so it won't resume following a crash.
|
|
s.be.ForceCommit()
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
resp := result.resp.(*pb.CompactionResponse)
|
|
if resp == nil {
|
|
resp = &pb.CompactionResponse{}
|
|
}
|
|
if resp.Header == nil {
|
|
resp.Header = &pb.ResponseHeader{}
|
|
}
|
|
resp.Header.Revision = s.kv.Rev()
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
|
// no id given? choose one
|
|
for r.ID == int64(lease.NoLease) {
|
|
// only use positive int64 id's
|
|
r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
|
|
}
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseGrant: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.LeaseGrantResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.LeaseRevokeResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
|
|
ttl, err := s.lessor.Renew(id)
|
|
if err == nil {
|
|
return ttl, nil
|
|
}
|
|
if err != lease.ErrNotPrimary {
|
|
return -1, err
|
|
}
|
|
|
|
// renewals don't go through raft; forward to leader manually
|
|
leader := s.cluster.Member(s.Leader())
|
|
for i := 0; i < 5 && leader == nil; i++ {
|
|
// wait an election
|
|
dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
|
|
select {
|
|
case <-time.After(dur):
|
|
leader = s.cluster.Member(s.Leader())
|
|
case <-s.done:
|
|
return -1, ErrStopped
|
|
}
|
|
}
|
|
if leader == nil || len(leader.PeerURLs) == 0 {
|
|
return -1, ErrNoLeader
|
|
}
|
|
|
|
for _, url := range leader.PeerURLs {
|
|
lurl := url + "/leases"
|
|
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
|
|
if err == nil {
|
|
break
|
|
}
|
|
}
|
|
return ttl, err
|
|
}
|
|
|
|
func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Alarm: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AlarmResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthEnable: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthEnableResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthDisableResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
|
|
st, err := s.AuthStore().GenSimpleToken()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
internalReq := &pb.InternalAuthenticateRequest{
|
|
Name: r.Name,
|
|
Password: r.Password,
|
|
SimpleToken: st,
|
|
}
|
|
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthenticateResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthUserAddResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthUserDeleteResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthUserChangePasswordResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthUserGrantRoleResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthUserGetResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthUserRevokeRoleResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthRoleAddResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthRoleGrantPermissionResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthRoleGetResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthRoleRevokePermissionResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if result.err != nil {
|
|
return nil, result.err
|
|
}
|
|
return result.resp.(*pb.AuthRoleDeleteResponse), nil
|
|
}
|
|
|
|
func (s *EtcdServer) isValidSimpleToken(token string) bool {
|
|
splitted := strings.Split(token, ".")
|
|
if len(splitted) != 2 {
|
|
return false
|
|
}
|
|
index, err := strconv.Atoi(splitted[1])
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
// CAUTION: below index synchronization is required because this node
|
|
// might not receive and apply the log entry of Authenticate() RPC.
|
|
authApplied := false
|
|
for i := 0; i < 10; i++ {
|
|
if uint64(index) <= s.getAppliedIndex() {
|
|
authApplied = true
|
|
break
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
if !authApplied {
|
|
plog.Errorf("timeout of waiting Authenticate() RPC")
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (s *EtcdServer) usernameFromCtx(ctx context.Context) (string, error) {
|
|
md, ok := metadata.FromContext(ctx)
|
|
if !ok {
|
|
return "", nil
|
|
}
|
|
|
|
ts, tok := md["token"]
|
|
if !tok {
|
|
return "", nil
|
|
}
|
|
|
|
token := ts[0]
|
|
if !s.isValidSimpleToken(token) {
|
|
return "", ErrInvalidAuthToken
|
|
}
|
|
|
|
username, uok := s.AuthStore().UsernameFromToken(token)
|
|
if !uok {
|
|
plog.Warningf("invalid auth token: %s", token)
|
|
return "", ErrInvalidAuthToken
|
|
}
|
|
return username, nil
|
|
}
|
|
|
|
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
|
|
r.Header = &pb.RequestHeader{
|
|
ID: s.reqIDGen.Next(),
|
|
}
|
|
username, err := s.usernameFromCtx(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.Header.Username = username
|
|
|
|
data, err := r.Marshal()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(data) > maxRequestBytes {
|
|
return nil, ErrRequestTooLarge
|
|
}
|
|
|
|
id := r.ID
|
|
if id == 0 {
|
|
id = r.Header.ID
|
|
}
|
|
ch := s.w.Register(id)
|
|
|
|
cctx, cancel := context.WithTimeout(ctx, maxV3RequestTimeout)
|
|
defer cancel()
|
|
|
|
s.r.Propose(cctx, data)
|
|
|
|
select {
|
|
case x := <-ch:
|
|
return x.(*applyResult), nil
|
|
case <-cctx.Done():
|
|
s.w.Trigger(id, nil) // GC wait
|
|
return nil, cctx.Err()
|
|
case <-s.done:
|
|
return nil, ErrStopped
|
|
}
|
|
}
|
|
|
|
// Watchable returns a watchable interface attached to the etcdserver.
|
|
func (s *EtcdServer) Watchable() mvcc.Watchable { return s.KV() }
|