auth, etcdserver: permission of range requests

Currently the auth mechanism doesn't support permissions of range
request. It just checks exact matching of key names even for range
queries. This commit adds a mechanism for setting permission to range
queries. Range queries are allowed if a range of the query is [begin1,
end1) and the user has a permission of reading [begin2, range2) and
[begin1, end2) is a subset of [begin2, range2). Range delete requests
will follow the same rule.
This commit is contained in:
Hitoshi Mitake 2016-06-07 11:45:29 -07:00
parent 35329a1674
commit 6bb96074da
6 changed files with 407 additions and 48 deletions

View File

@ -72,8 +72,9 @@ func (*User) Descriptor() ([]byte, []int) { return fileDescriptorAuth, []int{0}
// Permission is a single entity
type Permission struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
PermType Permission_Type `protobuf:"varint,2,opt,name=permType,proto3,enum=authpb.Permission_Type" json:"permType,omitempty"`
PermType Permission_Type `protobuf:"varint,1,opt,name=permType,proto3,enum=authpb.Permission_Type" json:"permType,omitempty"`
Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
RangeEnd []byte `protobuf:"bytes,3,opt,name=range_end,json=rangeEnd,proto3" json:"range_end,omitempty"`
}
func (m *Permission) Reset() { *m = Permission{} }
@ -158,16 +159,22 @@ func (m *Permission) MarshalTo(data []byte) (int, error) {
_ = i
var l int
_ = l
if m.PermType != 0 {
data[i] = 0x8
i++
i = encodeVarintAuth(data, i, uint64(m.PermType))
}
if len(m.Key) > 0 {
data[i] = 0xa
data[i] = 0x12
i++
i = encodeVarintAuth(data, i, uint64(len(m.Key)))
i += copy(data[i:], m.Key)
}
if m.PermType != 0 {
data[i] = 0x10
if len(m.RangeEnd) > 0 {
data[i] = 0x1a
i++
i = encodeVarintAuth(data, i, uint64(m.PermType))
i = encodeVarintAuth(data, i, uint64(len(m.RangeEnd)))
i += copy(data[i:], m.RangeEnd)
}
return i, nil
}
@ -258,12 +265,16 @@ func (m *User) Size() (n int) {
func (m *Permission) Size() (n int) {
var l int
_ = l
if m.PermType != 0 {
n += 1 + sovAuth(uint64(m.PermType))
}
l = len(m.Key)
if l > 0 {
n += 1 + l + sovAuth(uint64(l))
}
if m.PermType != 0 {
n += 1 + sovAuth(uint64(m.PermType))
l = len(m.RangeEnd)
if l > 0 {
n += 1 + l + sovAuth(uint64(l))
}
return n
}
@ -468,6 +479,25 @@ func (m *Permission) Unmarshal(data []byte) error {
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field PermType", wireType)
}
m.PermType = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowAuth
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.PermType |= (Permission_Type(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
@ -498,11 +528,11 @@ func (m *Permission) Unmarshal(data []byte) error {
m.Key = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field PermType", wireType)
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field RangeEnd", wireType)
}
m.PermType = 0
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowAuth
@ -512,11 +542,23 @@ func (m *Permission) Unmarshal(data []byte) error {
}
b := data[iNdEx]
iNdEx++
m.PermType |= (Permission_Type(b) & 0x7F) << shift
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthAuth
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.RangeEnd = append(m.RangeEnd[:0], data[iNdEx:postIndex]...)
if m.RangeEnd == nil {
m.RangeEnd = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipAuth(data[iNdEx:])
@ -756,22 +798,23 @@ var (
)
var fileDescriptorAuth = []byte{
// 265 bytes of a gzipped FileDescriptorProto
// 276 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4a, 0x2c, 0x2d, 0xc9,
0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0xb1, 0x0b, 0x92, 0xa4, 0x44, 0xd2, 0xf3,
0xd3, 0xf3, 0xc1, 0x42, 0xfa, 0x20, 0x16, 0x44, 0x56, 0xc9, 0x87, 0x8b, 0x25, 0xb4, 0x38, 0xb5,
0x48, 0x48, 0x88, 0x8b, 0x25, 0x2f, 0x31, 0x37, 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x27, 0x08,
0xcc, 0x16, 0x92, 0xe2, 0xe2, 0x28, 0x48, 0x2c, 0x2e, 0x2e, 0xcf, 0x2f, 0x4a, 0x91, 0x60, 0x02,
0x8b, 0xc3, 0xf9, 0x42, 0x22, 0x5c, 0xac, 0x45, 0xf9, 0x39, 0xa9, 0xc5, 0x12, 0xcc, 0x0a, 0xcc,
0x1a, 0x9c, 0x41, 0x10, 0x8e, 0x52, 0x3d, 0x17, 0x57, 0x40, 0x6a, 0x51, 0x6e, 0x66, 0x71, 0x71,
0x66, 0x7e, 0x9e, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0x25, 0xd4, 0x48, 0x10, 0x53, 0xc8, 0x98,
0x8b, 0xa3, 0x20, 0xb5, 0x28, 0x37, 0xa4, 0xb2, 0x20, 0x15, 0x6c, 0x22, 0x9f, 0x91, 0xb8, 0x1e,
0xc4, 0x79, 0x7a, 0x08, 0x7d, 0x7a, 0x20, 0xe9, 0x20, 0xb8, 0x42, 0x25, 0x2d, 0x2e, 0x16, 0x10,
0x2d, 0xc4, 0xc1, 0xc5, 0x12, 0xe4, 0xea, 0xe8, 0x22, 0xc0, 0x20, 0xc4, 0xc9, 0xc5, 0x1a, 0x1e,
0xe4, 0x19, 0xe2, 0x2a, 0xc0, 0x28, 0xc4, 0xcb, 0xc5, 0x09, 0x12, 0x84, 0x70, 0x99, 0x94, 0x42,
0xb8, 0x58, 0x82, 0xf2, 0x73, 0x52, 0xb1, 0x7a, 0xc7, 0x82, 0x8b, 0x37, 0x3b, 0xb5, 0x12, 0x61,
0x8f, 0x04, 0x93, 0x02, 0xb3, 0x06, 0xb7, 0x91, 0x10, 0xa6, 0x0b, 0x82, 0x50, 0x15, 0x3a, 0x89,
0x9c, 0x78, 0x28, 0xc7, 0x70, 0xe1, 0xa1, 0x1c, 0xc3, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9,
0x31, 0x3e, 0x78, 0x24, 0xc7, 0x98, 0xc4, 0x06, 0x0e, 0x41, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff,
0xff, 0x92, 0x06, 0xa1, 0xed, 0x6d, 0x01, 0x00, 0x00,
0x1a, 0x9c, 0x41, 0x10, 0x8e, 0xd2, 0x1c, 0x46, 0x2e, 0xae, 0x80, 0xd4, 0xa2, 0xdc, 0xcc, 0xe2,
0xe2, 0xcc, 0xfc, 0x3c, 0x21, 0x63, 0xa0, 0x01, 0x40, 0x5e, 0x48, 0x65, 0x01, 0xc4, 0x60, 0x3e,
0x23, 0x71, 0x3d, 0x88, 0x6b, 0xf4, 0x10, 0xaa, 0xf4, 0x40, 0xd2, 0x41, 0x70, 0x85, 0x42, 0x02,
0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x50, 0x0b, 0x41, 0x4c, 0x21, 0x69, 0x2e, 0xce, 0xa2, 0xc4, 0xbc,
0xf4, 0xd4, 0xf8, 0xd4, 0xbc, 0x14, 0xa0, 0x7d, 0x60, 0x87, 0x80, 0x05, 0x5c, 0xf3, 0x52, 0x94,
0xb4, 0xb8, 0x58, 0xc0, 0xda, 0x38, 0xb8, 0x58, 0x82, 0x5c, 0x1d, 0x5d, 0x04, 0x18, 0x84, 0x38,
0xb9, 0x58, 0xc3, 0x83, 0x3c, 0x43, 0x5c, 0x05, 0x18, 0x85, 0x78, 0xb9, 0x38, 0x41, 0x82, 0x10,
0x2e, 0x93, 0x52, 0x08, 0x50, 0x0d, 0xd0, 0x9d, 0x58, 0x3d, 0x6b, 0xc1, 0xc5, 0x0b, 0xb4, 0x0b,
0xe1, 0x2c, 0xa0, 0x03, 0x98, 0x35, 0xb8, 0x8d, 0x84, 0x30, 0x1d, 0x1c, 0x84, 0xaa, 0xd0, 0x49,
0xe4, 0xc4, 0x43, 0x39, 0x86, 0x0b, 0x40, 0x7c, 0xe2, 0x91, 0x1c, 0xe3, 0x05, 0x20, 0x7e, 0x00,
0xc4, 0x49, 0x6c, 0xe0, 0xf0, 0x35, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x9e, 0x31, 0x53, 0xfd,
0x8b, 0x01, 0x00, 0x00,
}

View File

@ -18,14 +18,15 @@ message User {
// Permission is a single entity
message Permission {
bytes key = 1;
enum Type {
READ = 0;
WRITE = 1;
READWRITE = 2;
}
Type permType = 2;
Type permType = 1;
bytes key = 2;
bytes range_end = 3;
}
// Role is a single entry in the bucket authRoles

195
auth/range_perm_cache.go Normal file
View File

@ -0,0 +1,195 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"sort"
"strings"
"github.com/coreos/etcd/auth/authpb"
"github.com/coreos/etcd/mvcc/backend"
)
func isSubset(a, b *rangePerm) bool {
// return true if a is a subset of b
return 0 <= strings.Compare(a.begin, b.begin) && strings.Compare(a.end, b.end) <= 0
}
func reduceSubsets(perms []*rangePerm) []*rangePerm {
// TODO(mitake): currently it is O(n^2), we need a better algorithm
ret := make([]*rangePerm, 0)
for i := range perms {
subset := false
for j := range perms {
if i != j && isSubset(perms[i], perms[j]) {
subset = true
break
}
}
if subset {
continue
}
ret = append(ret, perms[i])
}
return ret
}
func unifyPerms(perms []*rangePerm) []*rangePerm {
ret := make([]*rangePerm, 0)
perms = reduceSubsets(perms)
sort.Sort(RangePermSliceByBegin(perms))
i := 0
for i < len(perms) {
begin := i
for i+1 < len(perms) && perms[i].end >= perms[i+1].begin {
i++
}
if i == begin {
ret = append(ret, &rangePerm{begin: perms[i].begin, end: perms[i].end})
} else {
ret = append(ret, &rangePerm{begin: perms[begin].begin, end: perms[i].end})
}
i++
}
return ret
}
func (as *authStore) makeUnifiedPerms(tx backend.BatchTx, userName string) *unifiedRangePermissions {
user := getUser(tx, userName)
if user == nil {
plog.Errorf("invalid user name %s", userName)
return nil
}
var readPerms, writePerms []*rangePerm
for _, roleName := range user.Roles {
_, vs := tx.UnsafeRange(authRolesBucketName, []byte(roleName), nil, 0)
if len(vs) != 1 {
plog.Errorf("invalid role name %s", roleName)
return nil
}
role := &authpb.Role{}
err := role.Unmarshal(vs[0])
if err != nil {
plog.Errorf("failed to unmarshal a role %s: %s", roleName, err)
return nil
}
for _, perm := range role.KeyPermission {
if len(perm.RangeEnd) == 0 {
continue
}
if perm.PermType == authpb.READWRITE || perm.PermType == authpb.READ {
readPerms = append(readPerms, &rangePerm{begin: string(perm.Key), end: string(perm.RangeEnd)})
}
if perm.PermType == authpb.READWRITE || perm.PermType == authpb.WRITE {
writePerms = append(writePerms, &rangePerm{begin: string(perm.Key), end: string(perm.RangeEnd)})
}
}
}
return &unifiedRangePermissions{readPerms: unifyPerms(readPerms), writePerms: unifyPerms(writePerms)}
}
func checkCachedPerm(cachedPerms *unifiedRangePermissions, userName string, key, rangeEnd string, write, read bool) bool {
var perms []*rangePerm
if write {
perms = cachedPerms.writePerms
} else {
perms = cachedPerms.readPerms
}
for _, perm := range perms {
if strings.Compare(rangeEnd, "") != 0 {
if strings.Compare(perm.begin, key) <= 0 && strings.Compare(rangeEnd, perm.end) <= 0 {
return true
}
} else {
if strings.Compare(perm.begin, key) <= 0 && strings.Compare(key, perm.end) <= 0 {
return true
}
}
}
return false
}
func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd string, write, read bool) bool {
// assumption: tx is Lock()ed
_, ok := as.rangePermCache[userName]
if ok {
return checkCachedPerm(as.rangePermCache[userName], userName, key, rangeEnd, write, read)
}
perms := as.makeUnifiedPerms(tx, userName)
if perms == nil {
plog.Errorf("failed to create a unified permission of user %s", userName)
return false
}
as.rangePermCache[userName] = perms
return checkCachedPerm(as.rangePermCache[userName], userName, key, rangeEnd, write, read)
}
func (as *authStore) clearCachedPerm() {
as.rangePermCache = make(map[string]*unifiedRangePermissions)
}
func (as *authStore) invalidateCachedPerm(userName string) {
delete(as.rangePermCache, userName)
}
type unifiedRangePermissions struct {
// readPerms[i] and readPerms[j] (i != j) don't overlap
readPerms []*rangePerm
// writePerms[i] and writePerms[j] (i != j) don't overlap, too
writePerms []*rangePerm
}
type rangePerm struct {
begin, end string
}
type RangePermSliceByBegin []*rangePerm
func (slice RangePermSliceByBegin) Len() int {
return len(slice)
}
func (slice RangePermSliceByBegin) Less(i, j int) bool {
if slice[i].begin == slice[j].begin {
return slice[i].end < slice[j].end
}
return slice[i].begin < slice[j].begin
}
func (slice RangePermSliceByBegin) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}

View File

@ -0,0 +1,96 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"testing"
)
func isPermsEqual(a, b []*rangePerm) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if len(b) <= i {
return false
}
if a[i].begin != b[i].begin || a[i].end != b[i].end {
return false
}
}
return true
}
func TestUnifyParams(t *testing.T) {
tests := []struct {
params []*rangePerm
want []*rangePerm
}{
{
[]*rangePerm{{"a", "b"}},
[]*rangePerm{{"a", "b"}},
},
{
[]*rangePerm{{"a", "b"}, {"b", "c"}},
[]*rangePerm{{"a", "c"}},
},
{
[]*rangePerm{{"a", "c"}, {"b", "d"}},
[]*rangePerm{{"a", "d"}},
},
{
[]*rangePerm{{"a", "b"}, {"b", "c"}, {"d", "e"}},
[]*rangePerm{{"a", "c"}, {"d", "e"}},
},
{
[]*rangePerm{{"a", "b"}, {"c", "d"}, {"e", "f"}},
[]*rangePerm{{"a", "b"}, {"c", "d"}, {"e", "f"}},
},
{
[]*rangePerm{{"e", "f"}, {"c", "d"}, {"a", "b"}},
[]*rangePerm{{"a", "b"}, {"c", "d"}, {"e", "f"}},
},
{
[]*rangePerm{{"a", "b"}, {"c", "d"}, {"a", "z"}},
[]*rangePerm{{"a", "z"}},
},
{
[]*rangePerm{{"a", "b"}, {"c", "d"}, {"a", "z"}, {"1", "9"}},
[]*rangePerm{{"1", "9"}, {"a", "z"}},
},
{
[]*rangePerm{{"a", "b"}, {"c", "d"}, {"a", "z"}, {"1", "a"}},
[]*rangePerm{{"1", "z"}},
},
{
[]*rangePerm{{"a", "b"}, {"a", "z"}, {"5", "6"}, {"1", "9"}},
[]*rangePerm{{"1", "9"}, {"a", "z"}},
},
{
[]*rangePerm{{"a", "b"}, {"b", "c"}, {"c", "d"}, {"d", "f"}, {"1", "9"}},
[]*rangePerm{{"1", "9"}, {"a", "f"}},
},
}
for i, tt := range tests {
result := unifyPerms(tt.params)
if !isPermsEqual(result, tt.want) {
t.Errorf("#%d: result=%q, want=%q", i, result, tt.want)
}
}
}

View File

@ -103,13 +103,15 @@ type AuthStore interface {
IsPutPermitted(header *pb.RequestHeader, key string) bool
// IsRangePermitted checks range permission of the user
IsRangePermitted(header *pb.RequestHeader, key string) bool
IsRangePermitted(header *pb.RequestHeader, key, rangeEnd string) bool
}
type authStore struct {
be backend.Backend
enabled bool
enabledMu sync.RWMutex
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
}
func (as *authStore) AuthEnable() {
@ -126,6 +128,8 @@ func (as *authStore) AuthEnable() {
as.enabled = true
as.enabledMu.Unlock()
as.rangePermCache = make(map[string]*unifiedRangePermissions)
plog.Noticef("Authentication enabled")
}
@ -301,6 +305,8 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
tx.UnsafePut(authUsersBucketName, user.Name, marshaledUser)
as.invalidateCachedPerm(r.User)
plog.Noticef("granted role %s to user %s", r.Role, r.User)
return &pb.AuthUserGrantRoleResponse{}, nil
}
@ -357,6 +363,8 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
tx.UnsafePut(authUsersBucketName, updatedUser.Name, marshaledUser)
as.invalidateCachedPerm(r.Name)
plog.Noticef("revoked role %s from user %s", r.Role, r.Name)
return &pb.AuthUserRevokeRoleResponse{}, nil
}
@ -424,6 +432,10 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
tx.UnsafePut(authRolesBucketName, updatedRole.Name, marshaledRole)
// TODO(mitake): currently single role update invalidates every cache
// It should be optimized.
as.clearCachedPerm()
plog.Noticef("revoked key %s from role %s", r.Key, r.Role)
return &pb.AuthRoleRevokePermissionResponse{}, nil
}
@ -546,12 +558,16 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
tx.UnsafePut(authRolesBucketName, []byte(r.Name), marshaledRole)
// TODO(mitake): currently single role update invalidates every cache
// It should be optimized.
as.clearCachedPerm()
plog.Noticef("role %s's permission of key %s is updated as %s", r.Name, r.Perm.Key, authpb.Permission_Type_name[int32(r.Perm.PermType)])
return &pb.AuthRoleGrantPermissionResponse{}, nil
}
func (as *authStore) isOpPermitted(userName string, key string, write bool, read bool) bool {
func (as *authStore) isOpPermitted(userName string, key, rangeEnd string, write bool, read bool) bool {
// TODO(mitake): this function would be costly so we need a caching mechanism
if !as.isAuthEnabled() {
return true
@ -567,22 +583,26 @@ func (as *authStore) isOpPermitted(userName string, key string, write bool, read
return false
}
for _, roleName := range user.Roles {
_, vs := tx.UnsafeRange(authRolesBucketName, []byte(roleName), nil, 0)
if len(vs) != 1 {
plog.Errorf("invalid role name %s for permission checking", roleName)
return false
}
if strings.Compare(rangeEnd, "") == 0 {
for _, roleName := range user.Roles {
_, vs := tx.UnsafeRange(authRolesBucketName, []byte(roleName), nil, 0)
if len(vs) != 1 {
plog.Errorf("invalid role name %s for permission checking", roleName)
return false
}
role := &authpb.Role{}
err := role.Unmarshal(vs[0])
if err != nil {
plog.Errorf("failed to unmarshal a role %s: %s", roleName, err)
return false
}
role := &authpb.Role{}
err := role.Unmarshal(vs[0])
if err != nil {
plog.Errorf("failed to unmarshal a role %s: %s", roleName, err)
return false
}
for _, perm := range role.KeyPermission {
if !bytes.Equal(perm.Key, []byte(key)) {
continue
}
for _, perm := range role.KeyPermission {
if bytes.Equal(perm.Key, []byte(key)) {
if perm.PermType == authpb.READWRITE {
return true
}
@ -598,15 +618,19 @@ func (as *authStore) isOpPermitted(userName string, key string, write bool, read
}
}
if as.isRangeOpPermitted(tx, userName, key, rangeEnd, write, read) {
return true
}
return false
}
func (as *authStore) IsPutPermitted(header *pb.RequestHeader, key string) bool {
return as.isOpPermitted(header.Username, key, true, false)
return as.isOpPermitted(header.Username, key, "", true, false)
}
func (as *authStore) IsRangePermitted(header *pb.RequestHeader, key string) bool {
return as.isOpPermitted(header.Username, key, false, true)
func (as *authStore) IsRangePermitted(header *pb.RequestHeader, key, rangeEnd string) bool {
return as.isOpPermitted(header.Username, key, rangeEnd, false, true)
}
func getUser(tx backend.BatchTx, username string) *authpb.User {

View File

@ -81,7 +81,7 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{}
switch {
case r.Range != nil:
if s.AuthStore().IsRangePermitted(r.Header, string(r.Range.Key)) {
if s.AuthStore().IsRangePermitted(r.Header, string(r.Range.Key), string(r.Range.RangeEnd)) {
ar.resp, ar.err = s.applyV3.Range(noTxn, r.Range)
} else {
ar.err = auth.ErrPermissionDenied