Merge pull request #2 from etcd-io/master

Rebase master
This commit is contained in:
Yuchen Zhou 2019-10-08 12:52:05 -07:00 committed by GitHub
commit 2263930462
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 610 additions and 345 deletions

View File

@ -6,7 +6,7 @@ sudo: required
services: docker
go:
- 1.13
- 1.13.1
- tip
notifications:
@ -30,13 +30,13 @@ env:
matrix:
fast_finish: true
allow_failures:
- go: 1.13
- go: 1.13.1
env: TARGET=linux-amd64-grpcproxy
- go: 1.13
- go: 1.13.1
env: TARGET=linux-amd64-coverage
- go: tip
env: TARGET=linux-amd64-fmt-unit-go-tip
- go: 1.13
- go: 1.13.1
env: TARGET=linux-386-unit
exclude:
- go: tip
@ -57,7 +57,7 @@ matrix:
env: TARGET=linux-amd64-grpcproxy
- go: tip
env: TARGET=linux-amd64-coverage
- go: 1.13
- go: 1.13.1
env: TARGET=linux-amd64-fmt-unit-go-tip
- go: tip
env: TARGET=linux-386-unit
@ -75,7 +75,7 @@ script:
linux-amd64-fmt)
docker run --rm \
--volume=`pwd`:/go/src/go.etcd.io/etcd gcr.io/etcd-development/etcd-test:go${TRAVIS_GO_VERSION} \
/bin/bash -c "GOARCH=amd64 PASSES='fmt dep' ./test"
/bin/bash -c "GOARCH=amd64 PASSES='fmt bom dep' ./test"
;;
linux-amd64-integration-1-cpu)
docker run --rm \

1
.words
View File

@ -95,6 +95,7 @@ jitter
WithBackoff
BackoffLinearWithJitter
jitter
WithDialer
WithMax
ServerStreams
BidiStreams

View File

@ -10,7 +10,7 @@ The minimum recommended etcd versions to run in **production** are 3.1.11+, 3.2.
## [v3.1.21](https://github.com/etcd-io/etcd/releases/tag/v3.1.21) (2019-TBD)
### etcdctl
### etcdctl v3
- [Strip out insecure endpoints from DNS SRV records when using discovery](https://github.com/etcd-io/etcd/pull/10443) with etcdctl v2
- Add [`etcdctl endpoint health --write-out` support](https://github.com/etcd-io/etcd/pull/9540).

View File

@ -9,9 +9,9 @@ The minimum recommended etcd versions to run in **production** are 3.1.11+, 3.2.
<hr>
## [v3.2.27](https://github.com/etcd-io/etcd/releases/tag/v3.2.27) (2019-TBD)
## [v3.2.27](https://github.com/etcd-io/etcd/releases/tag/v3.2.27) (2019-09-17)
### etcdctl
### etcdctl v3
- [Strip out insecure endpoints from DNS SRV records when using discovery](https://github.com/etcd-io/etcd/pull/10443) with etcdctl v2
- Add [`etcdctl endpoint health --write-out` support](https://github.com/etcd-io/etcd/pull/9540).
@ -19,6 +19,13 @@ The minimum recommended etcd versions to run in **production** are 3.1.11+, 3.2.
- The command output is changed. Previously, if endpoint is unreachable, the command output is
"\<endpoint\> is unhealthy: failed to connect: \<error message\>". This change unified the error message, all error types
now have the same output "\<endpoint\> is unhealthy: failed to commit proposal: \<error message\>".
- Fix [`etcdctl snapshot status` to not modify snapshot file](https://github.com/etcd-io/etcd/pull/11157).
- For example, start etcd `v3.3.10`
- Write some data
- Use etcdctl `v3.3.10` to save snapshot
- Somehow, upgrading Kubernetes fails, thus rolling back to previous version etcd `v3.2.24`
- Run etcdctl `v3.2.24` `snapshot status` against the snapshot file saved from `v3.3.10` server
- Run etcdctl `v3.2.24` `snapshot restore` fails with `"expected sha256 [12..."`
### Metrics, Monitoring
@ -30,6 +37,11 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Add [`etcd_debugging_mvcc_current_revision`](https://github.com/etcd-io/etcd/pull/11126) Prometheus metric.
- Add [`etcd_debugging_mvcc_compact_revision`](https://github.com/etcd-io/etcd/pull/11126) Prometheus metric.
### Go
- Compile with [*Go 1.8.7*](https://golang.org/doc/devel/release.html#go1.8).
<hr>

View File

@ -15,14 +15,6 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.3.15...v3.3.16) an
**Again, before running upgrades from any previous release, please make sure to read change logs below and [v3.3 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_3.md).**
### Dependency
- Upgrade [`github.com/coreos/bbolt`](https://github.com/etcd-io/bbolt/releases) from [**`v1.3.1-coreos.6`**](https://github.com/etcd-io/bbolt/releases/tag/v1.3.1-coreos.6) to [**`v1.3.3`**](https://github.com/etcd-io/bbolt/releases/tag/v1.3.3).
### Go
- Compile with [*Go 1.12.9*](https://golang.org/doc/devel/release.html#go1.12) including [*Go 1.12.8*](https://groups.google.com/d/msg/golang-announce/65QixT3tcmg/DrFiG6vvCwAJ) security fixes.
### Metrics, Monitoring
See [List of metrics](https://github.com/etcd-io/etcd/tree/master/Documentation/metrics) for all metrics per release.
@ -32,6 +24,18 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Add [`etcd_debugging_mvcc_current_revision`](https://github.com/etcd-io/etcd/pull/11126) Prometheus metric.
- Add [`etcd_debugging_mvcc_compact_revision`](https://github.com/etcd-io/etcd/pull/11126) Prometheus metric.
### Dependency
- Upgrade [`github.com/coreos/bbolt`](https://github.com/etcd-io/bbolt/releases) from [**`v1.3.1-coreos.6`**](https://github.com/etcd-io/bbolt/releases/tag/v1.3.1-coreos.6) to [**`v1.3.3`**](https://github.com/etcd-io/bbolt/releases/tag/v1.3.3).
### etcdctl v3
- Fix [`etcdctl member add`](https://github.com/etcd-io/etcd/pull/11194) command to prevent potential timeout.
### Go
- Compile with [*Go 1.12.9*](https://golang.org/doc/devel/release.html#go1.12) including [*Go 1.12.8*](https://groups.google.com/d/msg/golang-announce/65QixT3tcmg/DrFiG6vvCwAJ) security fixes.
<hr>
@ -208,7 +212,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.3.11...v3.3.12) an
**Again, before running upgrades from any previous release, please make sure to read change logs below and [v3.3 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_3.md).**
### etcdctl
### etcdctl v3
- [Strip out insecure endpoints from DNS SRV records when using discovery](https://github.com/etcd-io/etcd/pull/10443) with etcdctl v2
@ -772,6 +776,13 @@ See [security doc](https://github.com/etcd-io/etcd/blob/master/Documentation/op-
- Enable [`clientv3.WithRequireLeader(context.Context)` for `watch`](https://github.com/etcd-io/etcd/pull/8672) command.
- Print [`"del"` instead of `"delete"`](https://github.com/etcd-io/etcd/pull/8297) in `txn` interactive mode.
- Print [`ETCD_INITIAL_ADVERTISE_PEER_URLS` in `member add`](https://github.com/etcd-io/etcd/pull/8332).
- Fix [`etcdctl snapshot status` to not modify snapshot file](https://github.com/etcd-io/etcd/pull/8815).
- For example, start etcd `v3.3.10`
- Write some data
- Use etcdctl `v3.3.10` to save snapshot
- Somehow, upgrading Kubernetes fails, thus rolling back to previous version etcd `v3.2.24`
- Run etcdctl `v3.2.24` `snapshot status` against the snapshot file saved from `v3.3.10` server
- Run etcdctl `v3.2.24` `snapshot restore` fails with `"expected sha256 [12..."`
### etcdctl v3

View File

@ -9,7 +9,29 @@ The minimum recommended etcd versions to run in **production** are 3.1.11+, 3.2.
<hr>
## [v3.4.1](https://github.com/etcd-io/etcd/releases/tag/v3.4.1) (2019-TBD)
## [v3.4.2](https://github.com/etcd-io/etcd/releases/tag/v3.4.2) (2019 TBD)
See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.1...v3.4.2) and [v3.4 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_4.md) for any breaking changes.
**Again, before running upgrades from any previous release, please make sure to read change logs below and [v3.4 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_4.md).**
### Dependency
- Upgrade [`google.golang.org/grpc`](https://github.com/grpc/grpc-go/releases) from [**`v1.23.1`**](https://github.com/grpc/grpc-go/releases/tag/v1.23.1) to [**`v1.24.0`**](https://github.com/grpc/grpc-go/releases/tag/v1.24.0).
### etcdctl v3
- Fix [`etcdctl member add`](https://github.com/etcd-io/etcd/pull/11194) command to prevent potential timeout.
### Go
- Compile with [*Go 1.12.9*](https://golang.org/doc/devel/release.html#go1.12) including [*Go 1.12.8*](https://groups.google.com/d/msg/golang-announce/65QixT3tcmg/DrFiG6vvCwAJ) security fixes.
<hr>
## [v3.4.1](https://github.com/etcd-io/etcd/releases/tag/v3.4.1) (2019-09-17)
See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.4.1) and [v3.4 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_4.md) for any breaking changes.
@ -29,6 +51,14 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Fix [secure server logging message](https://github.com/etcd-io/etcd/commit/8b053b0f44c14ac0d9f39b9b78c17c57d47966eb).
- Remove [redundant `%` characters in file descriptor warning message](https://github.com/etcd-io/etcd/commit/d5f79adc9cea9ec8c93669526464b0aa19ed417b).
### Package `embed`
- Add [`embed.Config.ZapLoggerBuilder`](https://github.com/etcd-io/etcd/pull/11148) to allow creating a custom zap logger.
### Dependency
- Upgrade [`google.golang.org/grpc`](https://github.com/grpc/grpc-go/releases) from [**`v1.23.0`**](https://github.com/grpc/grpc-go/releases/tag/v1.23.0) to [**`v1.23.1`**](https://github.com/grpc/grpc-go/releases/tag/v1.23.1).
### Go
- Compile with [*Go 1.12.9*](https://golang.org/doc/devel/release.html#go1.12) including [*Go 1.12.8*](https://groups.google.com/d/msg/golang-announce/65QixT3tcmg/DrFiG6vvCwAJ) security fixes.

View File

@ -80,6 +80,15 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Remove [`embed.Config.Debug`](https://github.com/etcd-io/etcd/pull/10947).
- Use `embed.Config.LogLevel` instead.
- Add [`embed.Config.ZapLoggerBuilder`](https://github.com/etcd-io/etcd/pull/11147) to allow creating a custom zap logger.
### Package `clientv3`
- Add [TryLock](https://github.com/etcd-io/etcd/pull/11104) method to `clientv3/concurrency/Mutex`. A non-blocking method on `Mutex` which does not wait to get lock on the Mutex, returns immediately if Mutex is locked by another session.
### etcdctl v3
- Fix [`etcdctl member add`](https://github.com/etcd-io/etcd/pull/11194) command to prevent potential timeout.
### gRPC gateway
@ -87,11 +96,18 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Deprecated [`/v3beta`](https://github.com/etcd-io/etcd/pull/9298).
- `curl -L http://localhost:2379/v3beta/kv/put -X POST -d '{"key": "Zm9v", "value": "YmFy"}'` does work in v3.5. Use `curl -L http://localhost:2379/v3/kv/put -X POST -d '{"key": "Zm9v", "value": "YmFy"}'` instead.
### Dependency
- Upgrade [`google.golang.org/grpc`](https://github.com/grpc/grpc-go/releases) from [**`v1.23.0`**](https://github.com/grpc/grpc-go/releases/tag/v1.23.0) to [**`v1.23.1`**](https://github.com/grpc/grpc-go/releases/tag/v1.23.1).
### Go
- Require [*Go 1.13+*](https://github.com/etcd-io/etcd/pull/11110).
- Compile with [*Go 1.13*](https://golang.org/doc/devel/release.html#go1.13)
### Project Governance
- The etcd team has added, a well defined and openly discussed, project [governance](https://github.com/etcd-io/etcd/pull/11175).
<hr>

View File

@ -6,6 +6,7 @@ etcd is Apache 2.0 licensed and accepts contributions via GitHub pull requests.
- Email: [etcd-dev](https://groups.google.com/forum/?hl=en#!forum/etcd-dev)
- IRC: #[etcd](irc://irc.freenode.org:6667/#etcd) IRC channel on freenode.org
- Slack: [#etcd](https://kubernetes.slack.com/messages/C3HD8ARJ5/details/)
## Getting started

View File

@ -411,7 +411,7 @@ Follow the instructions when using these flags.
+ env variable: ETCD_ENABLE_PPROF
### --metrics
+ Set level of detail for exported metrics, specify 'extensive' to include histogram metrics.
+ Set level of detail for exported metrics, specify 'extensive' to include server side grpc histogram metrics.
+ default: basic
+ env variable: ETCD_METRICS

80
GOVERNANCE.md Normal file
View File

@ -0,0 +1,80 @@
# etcd Governance
## Principles
The etcd community adheres to the following principles:
- Open: etcd is open source.
- Welcoming and respectful: See [Code of Conduct](code-of-conduct.md).
- Transparent and accessible: Changes to the etcd code repository and CNCF related
activities (e.g. level, involvement, etc) are done in public.
- Merit: Ideas and contributions are accepted according to their technical merit for
the betterment of the project. For specific guidance on practical contribution steps
please see [CONTRIBUTING](./CONTRIBUTING.md) guide.
## Maintainers
[Maintainers](./MAINTAINERS) are first and foremost contributors that have shown they
are committed to the long term success of a project. Maintainership is about building
trust with the current maintainers of the project and being a person that they can
depend on to make decisions in the best interest of the project in a consistent manner.
The maintainers role can be a top-level or restricted to certain package/feature
depending upon their commitment in fulfilling the expected responsibilities as explained
below.
### Top-level maintainer
- Running the etcd release processes
- Ownership of test and debug infrastructure
- Triage GitHub issues to keep the issue count low (goal: under 100)
- Regularly review GitHub pull requests across all pkgs
- Providing cross pkg design review
- Monitor email aliases
- Participate when called upon in the [security disclosure and release process](security/README.md)
- General project maintenance
### Package/feature maintainer
- Ownership of test and debug failures in a pkg/feature
- Resolution of bugs triaged to a package/feature
- Regularly review pull requests to the pkg subsystem
Contributors who are interested in becoming a maintainer, if performing these
responsibilities, should discuss their interest with the existing maintainers. New
maintainers must be nominated by an existing maintainer and must be elected by a
supermajority of maintainers. Likewise, maintainers can be removed by a supermajority
of the maintainers and moved to emeritus status.
Life priorities, interests, and passions can change. If a maintainer needs to step
down, inform other maintainers about this intention, and if possible, help find someone
to pick up the related work. At the very least, ensure the related work can be continued.
Afterward, create a pull request to remove yourself from the [MAINTAINERS](./MAINTAINERS)
file.
## Reviewers
[Reviewers](./MAINTAINERS) are contributors who have demonstrated greater skill in
reviewing the code contribution from other contributors. Their LGTM counts towards
merging a code change into the project. A reviewer is generally on the ladder towards
maintainership. New reviewers must be nominated by an existing maintainer and must be
elected by a supermajority of maintainers. Likewise, reviewers can be removed by a
supermajority of the maintainers or can resign by notifying the maintainers.
## Decision making process
Decisions are built on consensus between maintainers publicly. Proposals and ideas
can either be submitted for agreement via a GitHub issue or PR, or by sending an email
to `etcd-maintainers@googlegroups.com`.
## Conflict resolution
In general, we prefer that technical issues and maintainer membership are amicably
worked out between the persons involved. However, any technical dispute that has
reached an impasse with a subset of the community, any contributor may open a GitHub
issue or PR or send an email to `etcd-maintainers@googlegroups.com`. If the
maintainers themselves cannot decide an issue, the issue will be resolved by a
supermajority of the maintainers.
## Changes in Governance
Changes in project governance could be initiated by opening a GitHub PR.

View File

@ -1,4 +1,6 @@
# This is the official list of etcd maintainers.
# The official list of maintainers and reviewers for the project maintenance.
#
# Refer to the GOVERNANCE.md for description of the roles.
#
# Names should be added to this file like so:
# Individual's name <submission email address> (@GITHUB_HANDLE) pkg:*
@ -6,6 +8,7 @@
#
# Please keep the list sorted.
# MAINTAINERS
Brandon Philips <bphilips@redhat.com> (@philips) pkg:*
Gyuho Lee <gyuhox@gmail.com> <leegyuho@amazon.com> (@gyuho) pkg:*
Hitoshi Mitake <h.mitake@gmail.com> (@mitake) pkg:*
@ -18,3 +21,5 @@ Xiang Li <xiangli.cs@gmail.com> (@xiang90) pkg:*
Ben Darnell <ben@cockroachlabs.com> (@bdarnell) pkg:go.etcd.io/etcd/raft
Tobias Grieger <tobias.schottdorf@gmail.com> (@tbg) pkg:go.etcd.io/etcd/raft
# REVIEWERS
Wenjia Zhang <wenjiazhang@google.com> (@wenjiaswe) pkg:*

View File

@ -1,16 +0,0 @@
This document describes basic expectations for maintainers. To become a maintainer, start taking on these responsibilities. Consistent contributors then discuss with existing maintainers to become the official [MAINTAINERS](./MAINTAINERS).
### Top-level maintainer
- Running the etcd release processes
- Ownership of test and debug infrastructure
- Resolve or redirect issues to keep the issue count low (goal: under 100)
- Regularly review pull requests across all pkgs
- Providing cross pkg design review
### Package/feature maintainer
- Ownership of test and debug failures in a pkg/feature
- Resolution of bugs triaged to a package/feature
- Regularly review pull requests to the pkg subsystem

View File

@ -51,7 +51,7 @@ docker-remove:
GO_VERSION ?= 1.13
GO_VERSION ?= 1.13.1
ETCD_VERSION ?= $(shell git rev-parse --short HEAD || echo "GitNotFound")
TEST_SUFFIX = $(shell date +%s | base64 | head -c 15)
@ -65,11 +65,11 @@ endif
# Example:
# GO_VERSION=1.13 make build-docker-test
# GO_VERSION=1.13.1 make build-docker-test
# make build-docker-test
#
# gcloud docker -- login -u _json_key -p "$(cat /etc/gcp-key-etcd-development.json)" https://gcr.io
# GO_VERSION=1.13 make push-docker-test
# GO_VERSION=1.13.1 make push-docker-test
# make push-docker-test
#
# gsutil -m acl ch -u allUsers:R -r gs://artifacts.etcd-development.appspot.com

5
NOTICE
View File

@ -1,5 +0,0 @@
CoreOS Project
Copyright 2014 CoreOS, Inc
This product includes software developed at CoreOS, Inc.
(http://www.coreos.com/).

20
OWNERS
View File

@ -1,20 +0,0 @@
approvers:
- heyitsanthony
- philips
- fanminshi
- gyuho
- mitake
- jpbetz
- xiang90
- hexfusion
reviewers:
- heyitsanthony
- philips
- fanminshi
- gyuho
- mitake
- jpbetz
- xiang90
- wenjiaswe
- jingyih
- hexfusion

View File

@ -52,16 +52,13 @@ Time:
- [Jul 11th, 2019 11:00 AM video](https://youtu.be/k_FZEipWD6Y)
- [Jul 25, 2019 11:00 AM video](https://youtu.be/VSUJTACO93I)
- [Aug 22, 2019 11:00 AM video](https://youtu.be/6IBQ-VxQmuM)
- Sep 19, 2019 11:00 AM
- [Sep 19, 2019 11:00 AM video](https://youtu.be/SqfxU9DhBOc)
- Nov 14, 2019 11:00 AM
- Dec 12, 2019 11:00 AM
Join Hangouts Meet
meet.google.com/umg-nrxn-qvs
Join Hangouts Meet: [meet.google.com/umg-nrxn-qvs](https://meet.google.com/umg-nrxn-qvs)
Join by phone
+1 405-792-0633 PIN: 299 906#
More phone numbers
Join by phone: +1 405-792-0633 PIN: 299 906#
## Getting started
@ -72,7 +69,7 @@ The easiest way to get etcd is to use one of the pre-built release binaries whic
For more installation guides, please check out [play.etcd.io](http://play.etcd.io) and [operating etcd](https://github.com/etcd-io/etcd/tree/master/Documentation#operating-etcd-clusters).
For those wanting to try the very latest version, [build the latest version of etcd][dl-build] from the `master` branch. This first needs [*Go*](https://golang.org/) installed (version 1.12+ is required). All development occurs on `master`, including new features and bug fixes. Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
For those wanting to try the very latest version, [build the latest version of etcd][dl-build] from the `master` branch. This first needs [*Go*](https://golang.org/) installed (version 1.13+ is required). All development occurs on `master`, including new features and bug fixes. Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
[github-release]: https://github.com/etcd-io/etcd/releases
[branch-management]: ./Documentation/branch_management.md

View File

@ -16,7 +16,9 @@
package endpoint
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
@ -238,3 +240,19 @@ func ParseHostPort(hostPort string) (host string, port string) {
}
return host, port
}
// Dialer dials a endpoint using net.Dialer.
// Context cancelation and timeout are supported.
func Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
proto, host, _ := ParseEndpoint(dialEp)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
dialer := &net.Dialer{}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
}
return dialer.DialContext(ctx, proto, host)
}

View File

@ -230,24 +230,17 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
}
opts = append(opts, dopts...)
// Provide a net dialer that supports cancelation and timeout.
f := func(dialEp string, t time.Duration) (net.Conn, error) {
proto, host, _ := endpoint.ParseEndpoint(dialEp)
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
default:
}
dialer := &net.Dialer{Timeout: t}
return dialer.DialContext(c.ctx, proto, host)
}
opts = append(opts, grpc.WithDialer(f))
dialer := endpoint.Dialer
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(creds))
// gRPC load balancer workaround. See credentials.transportCredential for details.
if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok {
dialer = credsDialer.Dialer
}
} else {
opts = append(opts, grpc.WithInsecure())
}
opts = append(opts, grpc.WithContextDialer(dialer))
// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
@ -663,3 +656,9 @@ func IsConnCanceled(err error) bool {
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}
// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details.
type TransportCredentialsWithDialer interface {
grpccredentials.TransportCredentials
Dialer(ctx context.Context, dialEp string) (net.Conn, error)
}

View File

@ -23,6 +23,57 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
)
func ExampleMutex_TryLock() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock")
// acquire lock for s1
if err = m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
if err = m2.TryLock(context.TODO()); err == nil {
log.Fatal("should not acquire lock")
}
if err == concurrency.ErrLocked {
fmt.Println("cannot acquire lock for s2, as already locked in another session")
}
if err = m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
if err = m2.TryLock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s2")
// Output:
// acquired lock for s1
// cannot acquire lock for s2, as already locked in another session
// released lock for s1
// acquired lock for s2
}
func ExampleMutex_Lock() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {

View File

@ -16,6 +16,7 @@ package concurrency
import (
"context"
"errors"
"fmt"
"sync"
@ -23,6 +24,9 @@ import (
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
)
// ErrLocked is returned by TryLock when Mutex is already locked by another session.
var ErrLocked = errors.New("mutex: Locked by another session")
// Mutex implements the sync Locker interface with etcd
type Mutex struct {
s *Session
@ -37,9 +41,56 @@ func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}
// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// Cannot lock, so delete the key
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return ErrLocked
}
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()
@ -53,28 +104,13 @@ func (m *Mutex) Lock(ctx context.Context) error {
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
return resp, nil
}
func (m *Mutex) Unlock(ctx context.Context) error {

View File

@ -22,6 +22,7 @@ import (
"net"
"sync"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
grpccredentials "google.golang.org/grpc/credentials"
)
@ -65,38 +66,37 @@ func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
}
// transportCredential implements "grpccredentials.TransportCredentials" interface.
// transportCredential wraps TransportCredentials to track which
// addresses are dialed for which endpoints, and then sets the authority when checking the endpoint's cert to the
// hostname or IP of the dialed endpoint.
// This is a workaround of a gRPC load balancer issue. gRPC uses the dialed target's service name as the authority when
// checking all endpoint certs, which does not work for etcd servers using their hostname or IP as the Subject Alternative Name
// in their TLS certs.
// To enable, include both WithTransportCredentials(creds) and WithContextDialer(creds.Dialer)
// when dialing.
type transportCredential struct {
gtc grpccredentials.TransportCredentials
mu sync.Mutex
// addrToEndpoint maps from the connection addresses that are dialed to the hostname or IP of the
// endpoint provided to the dialer when dialing
addrToEndpoint map[string]string
}
func newTransportCredential(cfg *tls.Config) *transportCredential {
return &transportCredential{
gtc: grpccredentials.NewTLS(cfg),
gtc: grpccredentials.NewTLS(cfg),
addrToEndpoint: map[string]string{},
}
}
func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
// Only overwrite when authority is an IP address!
// Let's say, a server runs SRV records on "etcd.local" that resolves
// to "m1.etcd.local", and its SAN field also includes "m1.etcd.local".
// But what if SAN does not include its resolved IP address (e.g. 127.0.0.1)?
// Then, the server should only authenticate using its DNS hostname "m1.etcd.local",
// instead of overwriting it with its IP address.
// And we do not overwrite "localhost" either. Only overwrite IP addresses!
if isIP(authority) {
target := rawConn.RemoteAddr().String()
if authority != target {
// When user dials with "grpc.WithDialer", "grpc.DialContext" "cc.parsedTarget"
// update only happens once. This is problematic, because when TLS is enabled,
// retries happen through "grpc.WithDialer" with static "cc.parsedTarget" from
// the initial dial call.
// If the server authenticates by IP addresses, we want to set a new endpoint as
// a new authority. Otherwise
// "transport: authentication handshake failed: x509: certificate is valid for 127.0.0.1, 192.168.121.180, not 192.168.223.156"
// when the new dial target is "192.168.121.180" whose certificate host name is also "192.168.121.180"
// but client tries to authenticate with previously set "cc.parsedTarget" field "192.168.223.156"
authority = target
}
// Set the authority when checking the endpoint's cert to the hostname or IP of the dialed endpoint
tc.mu.Lock()
dialEp, ok := tc.addrToEndpoint[rawConn.RemoteAddr().String()]
tc.mu.Unlock()
if ok {
_, host, _ := endpoint.ParseEndpoint(dialEp)
authority = host
}
return tc.gtc.ClientHandshake(ctx, authority, rawConn)
}
@ -115,8 +115,15 @@ func (tc *transportCredential) Info() grpccredentials.ProtocolInfo {
}
func (tc *transportCredential) Clone() grpccredentials.TransportCredentials {
copy := map[string]string{}
tc.mu.Lock()
for k, v := range tc.addrToEndpoint {
copy[k] = v
}
tc.mu.Unlock()
return &transportCredential{
gtc: tc.gtc.Clone(),
gtc: tc.gtc.Clone(),
addrToEndpoint: copy,
}
}
@ -124,6 +131,17 @@ func (tc *transportCredential) OverrideServerName(serverNameOverride string) err
return tc.gtc.OverrideServerName(serverNameOverride)
}
func (tc *transportCredential) Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
// Keep track of which addresses are dialed for which endpoints
conn, err := endpoint.Dialer(ctx, dialEp)
if conn != nil {
tc.mu.Lock()
tc.addrToEndpoint[conn.RemoteAddr().String()] = dialEp
tc.mu.Unlock()
}
return conn, err
}
// perRPCCredential implements "grpccredentials.PerRPCCredentials" interface.
type perRPCCredential struct {
authToken string

View File

@ -249,7 +249,6 @@ func ExampleKV_txn() {
}
gresp, err := kvc.Get(context.TODO(), "key")
cancel()
if err != nil {
log.Fatal(err)
}

View File

@ -276,8 +276,7 @@ func TestMemberPromote(t *testing.T) {
select {
case <-time.After(500 * time.Millisecond):
case <-timeout:
t.Errorf("failed all attempts to promote learner member, last error: %v", err)
break
t.Fatalf("failed all attempts to promote learner member, last error: %v", err)
}
_, err = capi.MemberPromote(context.Background(), learnerID)

View File

@ -1,61 +1,3 @@
## CoreOS Community Code of Conduct
## etcd Community Code of Conduct
### Contributor Code of Conduct
As contributors and maintainers of this project, and in the interest of
fostering an open and welcoming community, we pledge to respect all people who
contribute through reporting issues, posting feature requests, updating
documentation, submitting pull requests or patches, and other activities.
We are committed to making participation in this project a harassment-free
experience for everyone, regardless of level of experience, gender, gender
identity and expression, sexual orientation, disability, personal appearance,
body size, race, ethnicity, age, religion, or nationality.
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery
* Personal attacks
* Trolling or insulting/derogatory comments
* Public or private harassment
* Publishing others' private information, such as physical or electronic addresses, without explicit permission
* Other unethical or unprofessional conduct.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct. By adopting this Code of Conduct,
project maintainers commit themselves to fairly and consistently applying these
principles to every aspect of managing this project. Project maintainers who do
not follow or enforce the Code of Conduct may be permanently removed from the
project team.
This code of conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community.
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting a project maintainer, Brandon Philips
<brandon.philips@coreos.com>, and/or Rithu John <rithu.john@coreos.com>.
This Code of Conduct is adapted from the Contributor Covenant
(http://contributor-covenant.org), version 1.2.0, available at
http://contributor-covenant.org/version/1/2/0/
### CoreOS Events Code of Conduct
CoreOS events are working conferences intended for professional networking and
collaboration in the CoreOS community. Attendees are expected to behave
according to professional standards and in accordance with their employers
policies on appropriate workplace behavior.
While at CoreOS events or related social networking opportunities, attendees
should not engage in discriminatory or offensive speech or actions including
but not limited to gender, sexuality, race, age, disability, or religion.
Speakers should be especially aware of these concerns.
CoreOS does not condone any statements by speakers contrary to these standards.
CoreOS reserves the right to deny entrance and/or eject from an event (without
refund) any individual found to be engaging in discriminatory or offensive
speech or actions.
Please bring any concerns to the immediate attention of designated on-site
staff, Brandon Philips <brandon.philips@coreos.com>, and/or Rithu John <rithu.john@coreos.com>.
etcd follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).

View File

@ -303,8 +303,8 @@ type Config struct {
// It can be multiple when "Logger" is zap.
LogOutputs []string `json:"log-outputs"`
// zapLoggerBuilder is used to build the zap logger.
zapLoggerBuilder func(*Config) error
// ZapLoggerBuilder is used to build the zap logger.
ZapLoggerBuilder func(*Config) error
// logger logs server-side operations. The default is nil,
// and "setupLogging" must be called before starting server.

View File

@ -181,8 +181,8 @@ func (cfg *Config) setupLogging() error {
// TODO: remove "Debug" check in v3.5
grpc.EnableTracing = true
}
if cfg.zapLoggerBuilder == nil {
cfg.zapLoggerBuilder = func(c *Config) error {
if cfg.ZapLoggerBuilder == nil {
cfg.ZapLoggerBuilder = func(c *Config) error {
var err error
c.logger, err = copied.Build()
if err != nil {
@ -235,8 +235,8 @@ func (cfg *Config) setupLogging() error {
syncer,
lvl,
)
if cfg.zapLoggerBuilder == nil {
cfg.zapLoggerBuilder = func(c *Config) error {
if cfg.ZapLoggerBuilder == nil {
cfg.ZapLoggerBuilder = func(c *Config) error {
c.logger = zap.New(cr, zap.AddCaller(), zap.ErrorOutput(syncer))
c.loggerMu.Lock()
defer c.loggerMu.Unlock()
@ -252,7 +252,7 @@ func (cfg *Config) setupLogging() error {
}
}
err := cfg.zapLoggerBuilder(cfg)
err := cfg.ZapLoggerBuilder(cfg)
if err != nil {
return err
}

View File

@ -16,7 +16,7 @@ ETCDCTL_CERT=/tmp/cert.pem
ETCDCTL_KEY=/tmp/key.pem
```
Prefix flag strings with `ETCDCTL_`, convert all letters to upper-case, and replace dash(`-`) with underscore(`_`).
Prefix flag strings with `ETCDCTL_`, convert all letters to upper-case, and replace dash(`-`) with underscore(`_`). Note that the environment variables with the prefix `ETCDCTL_` can only be used with the etcdctl global flags. Also, the environment variable `ETCDCTL_API` is a special case variable for etcdctl internal use only.
## Key-value commands

View File

@ -158,12 +158,14 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
if _, ok := (display).(*simplePrinter); ok {
ctx, cancel = commandCtx(cmd)
listResp, err := cli.MemberList(ctx)
// get latest member list; if there's failover new member might have outdated list
// make sure the member who served member list request has the latest member list.
syncedMemberSet := make(map[uint64]struct{})
syncedMemberSet[resp.Header.MemberId] = struct{}{} // the member who served member add is guaranteed to have the latest member list.
for {
if err != nil {
ExitWithError(ExitError, err)
}
if listResp.Header.MemberId == resp.Header.MemberId {
if _, ok := syncedMemberSet[listResp.Header.MemberId]; ok {
break
}
// quorum get to sync cluster list
@ -171,7 +173,7 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
if gerr != nil {
ExitWithError(ExitError, err)
}
resp.Header.MemberId = gresp.Header.MemberId
syncedMemberSet[gresp.Header.MemberId] = struct{}{}
listResp, err = cli.MemberList(ctx)
}
cancel()

View File

@ -240,7 +240,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
// additional metrics
fs.StringVar(&cfg.ec.Metrics, "metrics", cfg.ec.Metrics, "Set level of detail for exported metrics, specify 'extensive' to include histogram metrics")
fs.StringVar(&cfg.ec.Metrics, "metrics", cfg.ec.Metrics, "Set level of detail for exported metrics, specify 'extensive' to include server side grpc histogram metrics")
// auth
fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")

View File

@ -167,7 +167,7 @@ Profiling and Monitoring:
--enable-pprof 'false'
Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"
--metrics 'basic'
Set level of detail for exported metrics, specify 'extensive' to include histogram metrics.
Set level of detail for exported metrics, specify 'extensive' to include server side grpc histogram metrics.
--listen-metrics-urls ''
List of URLs to listen on for the metrics and health endpoints.

View File

@ -759,16 +759,21 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
if len(ems) != len(lms) {
return fmt.Errorf("member count is unequal")
}
sort.Sort(MembersByPeerURLs(ems))
sort.Sort(MembersByPeerURLs(lms))
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
for i := range ems {
if ok, err := netutil.URLStringsEqual(ctx, lg, ems[i].PeerURLs, lms[i].PeerURLs); !ok {
return fmt.Errorf("unmatched member while checking PeerURLs (%v)", err)
var err error
ok := false
for j := range lms {
if ok, err = netutil.URLStringsEqual(ctx, lg, ems[i].PeerURLs, lms[j].PeerURLs); ok {
lms[j].ID = ems[i].ID
break
}
}
if !ok {
return fmt.Errorf("PeerURLs: no match found for existing member (%v, %v), last resolver error (%v)", ems[i].ID, ems[i].PeerURLs, err)
}
lms[i].ID = ems[i].ID
}
local.members = make(map[types.ID]*Member)
for _, m := range lms {

View File

@ -13,7 +13,7 @@ if ! [[ "${0}" =~ "scripts/docker-local-agent.sh" ]]; then
fi
if [[ -z "${GO_VERSION}" ]]; then
GO_VERSION=1.13
GO_VERSION=1.13.1
fi
echo "Running with GO_VERSION:" ${GO_VERSION}

View File

@ -6,7 +6,7 @@ if ! [[ "${0}" =~ "scripts/docker-local-tester.sh" ]]; then
fi
if [[ -z "${GO_VERSION}" ]]; then
GO_VERSION=1.13
GO_VERSION=1.13.1
fi
echo "Running with GO_VERSION:" ${GO_VERSION}

View File

@ -64,7 +64,6 @@ func Test_read(t *testing.T) {
InitialCorruptCheck: true,
Logger: "zap",
LogOutputs: []string{"/tmp/etcd-functional-1/etcd.log"},
Debug: true,
},
ClientCertData: "",
ClientCertPath: "",
@ -117,7 +116,6 @@ func Test_read(t *testing.T) {
InitialCorruptCheck: true,
Logger: "zap",
LogOutputs: []string{"/tmp/etcd-functional-2/etcd.log"},
Debug: true,
},
ClientCertData: "",
ClientCertPath: "",
@ -170,7 +168,6 @@ func Test_read(t *testing.T) {
InitialCorruptCheck: true,
Logger: "zap",
LogOutputs: []string{"/tmp/etcd-functional-3/etcd.log"},
Debug: true,
},
ClientCertData: "",
ClientCertPath: "",

2
go.mod
View File

@ -43,7 +43,7 @@ require (
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 // indirect
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
google.golang.org/grpc v1.23.0
google.golang.org/grpc v1.24.0
gopkg.in/cheggaaa/pb.v1 v1.0.25
gopkg.in/yaml.v2 v2.2.2
sigs.k8s.io/yaml v1.1.0

4
go.sum
View File

@ -181,8 +181,8 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0 h1:AzbTB6ux+okLTzP8Ru1Xs41C303zdcfEht7MQnYJt5A=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s=
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -7,14 +7,14 @@ Handles cherry-picks of PR(s) from etcd master to a stable etcd release branch a
Set the `UPSTREAM_REMOTE` and `FORK_REMOTE` environment variables.
`UPSTREAM_REMOTE` should be set to git remote name of `github.com/etcd-io/etcd`,
and `FORK_REMOTE` should be set to the git remote name of the forked etcd
repo (`github.com/${github-username}/etcd`). Use `git remotes -v` to
repo (`github.com/${github-username}/etcd`). Use `git remote -v` to
look up the git remote names. If etcd has not been forked, create
one on github.com and register it locally with `git remote add ...`.
```
export UPSTREAM_REMOTE=origin
export FORK_REMOTE=${github-username}
export UPSTREAM_REMOTE=upstream
export FORK_REMOTE=origin
export GITHUB_USER=${github-username}
```

View File

@ -23,30 +23,30 @@ import (
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/contrib/recipes"
recipe "go.etcd.io/etcd/contrib/recipes"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/testutil"
)
func TestMutexSingleNode(t *testing.T) {
func TestMutexLockSingleNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
var clients []*clientv3.Client
testMutex(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
testMutexLock(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}
func TestMutexMultiNode(t *testing.T) {
func TestMutexLockMultiNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
var clients []*clientv3.Client
testMutex(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
testMutexLock(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}
func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
func testMutexLock(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
// stream lock acquisitions
lockedC := make(chan *concurrency.Mutex)
for i := 0; i < waiters; i++ {
@ -82,6 +82,62 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
}
}
func TestMutexTryLockSingleNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
var clients []*clientv3.Client
testMutexTryLock(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}
func TestMutexTryLockMultiNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
var clients []*clientv3.Client
testMutexTryLock(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}
func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.Client) {
lockedC := make(chan *concurrency.Mutex)
notlockedC := make(chan *concurrency.Mutex)
for i := 0; i < lockers; i++ {
go func() {
session, err := concurrency.NewSession(chooseClient())
if err != nil {
t.Error(err)
}
m := concurrency.NewMutex(session, "test-mutex-try-lock")
err = m.TryLock(context.TODO())
if err == nil {
lockedC <- m
} else if err == concurrency.ErrLocked {
notlockedC <- m
} else {
t.Errorf("Unexpected Error %v", err)
}
}()
}
timerC := time.After(time.Second)
select {
case <-lockedC:
for i := 0; i < lockers-1; i++ {
select {
case <-lockedC:
t.Fatalf("Multiple Mutes locked on same key")
case <-notlockedC:
case <-timerC:
t.Errorf("timed out waiting for lock")
}
}
case <-timerC:
t.Errorf("timed out waiting for lock")
}
}
// TestMutexSessionRelock ensures that acquiring the same lock with the same
// session will not result in deadlock.
func TestMutexSessionRelock(t *testing.T) {
@ -219,7 +275,7 @@ func BenchmarkMutex4Waiters(b *testing.B) {
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
defer clus.Terminate(nil)
for i := 0; i < b.N; i++ {
testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
testMutexLock(nil, 4, func() *clientv3.Client { return clus.RandClient() })
}
}

View File

@ -142,9 +142,6 @@ func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.Pro
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
}
if err := checkInvariants(cfg, prs); err != nil {
return tracker.Config{}, tracker.ProgressMap{}, nil
}
return checkAndReturn(cfg, prs)
}

View File

@ -57,6 +57,11 @@ function main {
cd release
setup_env "${PROJ}" "${VER}"
tarcmd=tar
if [[ $(go env GOOS) == "darwin" ]]; then
tarcmd=gtar
fi
for os in darwin windows linux; do
export GOOS=${os}
TARGET_ARCHS=("amd64")
@ -78,7 +83,7 @@ function main {
package "${TARGET}" "${PROJ}"
if [ ${GOOS} == "linux" ]; then
tar cfz "${TARGET}.tar.gz" "${TARGET}"
${tarcmd} cfz "${TARGET}.tar.gz" "${TARGET}"
echo "Wrote release/${TARGET}.tar.gz"
else
zip -qr "${TARGET}.zip" "${TARGET}"

View File

@ -147,7 +147,7 @@ main() {
# Generate SHA256SUMS
echo -e "Generating sha256sums of release artifacts.\n"
pushd ./release
grep . -E '\.tar.gz$|\.zip$' | xargs shasum -a 256 > ./SHA256SUMS
ls . | grep -E '\.tar.gz$|\.zip$' | xargs shasum -a 256 > ./SHA256SUMS
popd
if [ -s ./release/SHA256SUMS ]; then
cat ./release/SHA256SUMS
@ -185,7 +185,7 @@ main() {
docker push "quay.io/coreos/etcd:${RELEASE_VERSION}"
echo "Pushing container images to gcr.io ${RELEASE_VERSION}"
gcloud docker -- "push gcr.io/etcd-development/etcd:${RELEASE_VERSION}"
gcloud docker -- push "gcr.io/etcd-development/etcd:${RELEASE_VERSION}"
for TARGET_ARCH in "-arm64" "-ppc64le"; do
echo "Pushing container images to quay.io ${RELEASE_VERSION}${TARGET_ARCH}"

View File

@ -1,15 +1,16 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: grpc/health/v1/health.proto
package grpc_health_v1 // import "google.golang.org/grpc/health/grpc_health_v1"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
package grpc_health_v1
import (
context "golang.org/x/net/context"
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -21,7 +22,7 @@ var _ = math.Inf
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type HealthCheckResponse_ServingStatus int32
@ -38,6 +39,7 @@ var HealthCheckResponse_ServingStatus_name = map[int32]string{
2: "NOT_SERVING",
3: "SERVICE_UNKNOWN",
}
var HealthCheckResponse_ServingStatus_value = map[string]int32{
"UNKNOWN": 0,
"SERVING": 1,
@ -48,8 +50,9 @@ var HealthCheckResponse_ServingStatus_value = map[string]int32{
func (x HealthCheckResponse_ServingStatus) String() string {
return proto.EnumName(HealthCheckResponse_ServingStatus_name, int32(x))
}
func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_health_6b1a06aa67f91efd, []int{1, 0}
return fileDescriptor_e265fd9d4e077217, []int{1, 0}
}
type HealthCheckRequest struct {
@ -63,16 +66,17 @@ func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} }
func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) }
func (*HealthCheckRequest) ProtoMessage() {}
func (*HealthCheckRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_health_6b1a06aa67f91efd, []int{0}
return fileDescriptor_e265fd9d4e077217, []int{0}
}
func (m *HealthCheckRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HealthCheckRequest.Unmarshal(m, b)
}
func (m *HealthCheckRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HealthCheckRequest.Marshal(b, m, deterministic)
}
func (dst *HealthCheckRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_HealthCheckRequest.Merge(dst, src)
func (m *HealthCheckRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_HealthCheckRequest.Merge(m, src)
}
func (m *HealthCheckRequest) XXX_Size() int {
return xxx_messageInfo_HealthCheckRequest.Size(m)
@ -101,16 +105,17 @@ func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} }
func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m) }
func (*HealthCheckResponse) ProtoMessage() {}
func (*HealthCheckResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_health_6b1a06aa67f91efd, []int{1}
return fileDescriptor_e265fd9d4e077217, []int{1}
}
func (m *HealthCheckResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HealthCheckResponse.Unmarshal(m, b)
}
func (m *HealthCheckResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HealthCheckResponse.Marshal(b, m, deterministic)
}
func (dst *HealthCheckResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_HealthCheckResponse.Merge(dst, src)
func (m *HealthCheckResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_HealthCheckResponse.Merge(m, src)
}
func (m *HealthCheckResponse) XXX_Size() int {
return xxx_messageInfo_HealthCheckResponse.Size(m)
@ -129,9 +134,34 @@ func (m *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus {
}
func init() {
proto.RegisterEnum("grpc.health.v1.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value)
proto.RegisterType((*HealthCheckRequest)(nil), "grpc.health.v1.HealthCheckRequest")
proto.RegisterType((*HealthCheckResponse)(nil), "grpc.health.v1.HealthCheckResponse")
proto.RegisterEnum("grpc.health.v1.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value)
}
func init() { proto.RegisterFile("grpc/health/v1/health.proto", fileDescriptor_e265fd9d4e077217) }
var fileDescriptor_e265fd9d4e077217 = []byte{
// 297 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x4e, 0x2f, 0x2a, 0x48,
0xd6, 0xcf, 0x48, 0x4d, 0xcc, 0x29, 0xc9, 0xd0, 0x2f, 0x33, 0x84, 0xb2, 0xf4, 0x0a, 0x8a, 0xf2,
0x4b, 0xf2, 0x85, 0xf8, 0x40, 0x92, 0x7a, 0x50, 0xa1, 0x32, 0x43, 0x25, 0x3d, 0x2e, 0x21, 0x0f,
0x30, 0xc7, 0x39, 0x23, 0x35, 0x39, 0x3b, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x82,
0x8b, 0xbd, 0x38, 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x33, 0x08,
0xc6, 0x55, 0xda, 0xc8, 0xc8, 0x25, 0x8c, 0xa2, 0xa1, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, 0xc8,
0x93, 0x8b, 0xad, 0xb8, 0x24, 0xb1, 0xa4, 0xb4, 0x18, 0xac, 0x81, 0xcf, 0xc8, 0x50, 0x0f, 0xd5,
0x22, 0x3d, 0x2c, 0x9a, 0xf4, 0x82, 0x41, 0x86, 0xe6, 0xa5, 0x07, 0x83, 0x35, 0x06, 0x41, 0x0d,
0x50, 0xf2, 0xe7, 0xe2, 0x45, 0x91, 0x10, 0xe2, 0xe6, 0x62, 0x0f, 0xf5, 0xf3, 0xf6, 0xf3, 0x0f,
0xf7, 0x13, 0x60, 0x00, 0x71, 0x82, 0x5d, 0x83, 0xc2, 0x3c, 0xfd, 0xdc, 0x05, 0x18, 0x85, 0xf8,
0xb9, 0xb8, 0xfd, 0xfc, 0x43, 0xe2, 0x61, 0x02, 0x4c, 0x42, 0xc2, 0x5c, 0xfc, 0x60, 0x8e, 0xb3,
0x6b, 0x3c, 0x4c, 0x0b, 0xb3, 0xd1, 0x3a, 0x46, 0x2e, 0x36, 0x88, 0xf5, 0x42, 0x01, 0x5c, 0xac,
0x60, 0x27, 0x08, 0x29, 0xe1, 0x75, 0x1f, 0x38, 0x14, 0xa4, 0x94, 0x89, 0xf0, 0x83, 0x50, 0x10,
0x17, 0x6b, 0x78, 0x62, 0x49, 0x72, 0x06, 0xd5, 0x4c, 0x34, 0x60, 0x74, 0x4a, 0xe4, 0x12, 0xcc,
0xcc, 0x47, 0x53, 0xea, 0xc4, 0x0d, 0x51, 0x1b, 0x00, 0x8a, 0xc6, 0x00, 0xc6, 0x28, 0x9d, 0xf4,
0xfc, 0xfc, 0xf4, 0x9c, 0x54, 0xbd, 0xf4, 0xfc, 0x9c, 0xc4, 0xbc, 0x74, 0xbd, 0xfc, 0xa2, 0x74,
0x7d, 0xe4, 0x78, 0x07, 0xb1, 0xe3, 0x21, 0xec, 0xf8, 0x32, 0xc3, 0x55, 0x4c, 0x7c, 0xee, 0x20,
0xd3, 0x20, 0x46, 0xe8, 0x85, 0x19, 0x26, 0xb1, 0x81, 0x93, 0x83, 0x31, 0x20, 0x00, 0x00, 0xff,
0xff, 0x12, 0x7d, 0x96, 0xcb, 0x2d, 0x02, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -239,6 +269,17 @@ type HealthServer interface {
Watch(*HealthCheckRequest, Health_WatchServer) error
}
// UnimplementedHealthServer can be embedded to have forward compatible implementations.
type UnimplementedHealthServer struct {
}
func (*UnimplementedHealthServer) Check(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Check not implemented")
}
func (*UnimplementedHealthServer) Watch(req *HealthCheckRequest, srv Health_WatchServer) error {
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}
func RegisterHealthServer(s *grpc.Server, srv HealthServer) {
s.RegisterService(&_Health_serviceDesc, srv)
}
@ -300,28 +341,3 @@ var _Health_serviceDesc = grpc.ServiceDesc{
},
Metadata: "grpc/health/v1/health.proto",
}
func init() { proto.RegisterFile("grpc/health/v1/health.proto", fileDescriptor_health_6b1a06aa67f91efd) }
var fileDescriptor_health_6b1a06aa67f91efd = []byte{
// 297 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x4e, 0x2f, 0x2a, 0x48,
0xd6, 0xcf, 0x48, 0x4d, 0xcc, 0x29, 0xc9, 0xd0, 0x2f, 0x33, 0x84, 0xb2, 0xf4, 0x0a, 0x8a, 0xf2,
0x4b, 0xf2, 0x85, 0xf8, 0x40, 0x92, 0x7a, 0x50, 0xa1, 0x32, 0x43, 0x25, 0x3d, 0x2e, 0x21, 0x0f,
0x30, 0xc7, 0x39, 0x23, 0x35, 0x39, 0x3b, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x82,
0x8b, 0xbd, 0x38, 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x33, 0x08,
0xc6, 0x55, 0xda, 0xc8, 0xc8, 0x25, 0x8c, 0xa2, 0xa1, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, 0xc8,
0x93, 0x8b, 0xad, 0xb8, 0x24, 0xb1, 0xa4, 0xb4, 0x18, 0xac, 0x81, 0xcf, 0xc8, 0x50, 0x0f, 0xd5,
0x22, 0x3d, 0x2c, 0x9a, 0xf4, 0x82, 0x41, 0x86, 0xe6, 0xa5, 0x07, 0x83, 0x35, 0x06, 0x41, 0x0d,
0x50, 0xf2, 0xe7, 0xe2, 0x45, 0x91, 0x10, 0xe2, 0xe6, 0x62, 0x0f, 0xf5, 0xf3, 0xf6, 0xf3, 0x0f,
0xf7, 0x13, 0x60, 0x00, 0x71, 0x82, 0x5d, 0x83, 0xc2, 0x3c, 0xfd, 0xdc, 0x05, 0x18, 0x85, 0xf8,
0xb9, 0xb8, 0xfd, 0xfc, 0x43, 0xe2, 0x61, 0x02, 0x4c, 0x42, 0xc2, 0x5c, 0xfc, 0x60, 0x8e, 0xb3,
0x6b, 0x3c, 0x4c, 0x0b, 0xb3, 0xd1, 0x3a, 0x46, 0x2e, 0x36, 0x88, 0xf5, 0x42, 0x01, 0x5c, 0xac,
0x60, 0x27, 0x08, 0x29, 0xe1, 0x75, 0x1f, 0x38, 0x14, 0xa4, 0x94, 0x89, 0xf0, 0x83, 0x50, 0x10,
0x17, 0x6b, 0x78, 0x62, 0x49, 0x72, 0x06, 0xd5, 0x4c, 0x34, 0x60, 0x74, 0x4a, 0xe4, 0x12, 0xcc,
0xcc, 0x47, 0x53, 0xea, 0xc4, 0x0d, 0x51, 0x1b, 0x00, 0x8a, 0xc6, 0x00, 0xc6, 0x28, 0x9d, 0xf4,
0xfc, 0xfc, 0xf4, 0x9c, 0x54, 0xbd, 0xf4, 0xfc, 0x9c, 0xc4, 0xbc, 0x74, 0xbd, 0xfc, 0xa2, 0x74,
0x7d, 0xe4, 0x78, 0x07, 0xb1, 0xe3, 0x21, 0xec, 0xf8, 0x32, 0xc3, 0x55, 0x4c, 0x7c, 0xee, 0x20,
0xd3, 0x20, 0x46, 0xe8, 0x85, 0x19, 0x26, 0xb1, 0x81, 0x93, 0x83, 0x31, 0x20, 0x00, 0x00, 0xff,
0xff, 0x12, 0x7d, 0x96, 0xcb, 0x2d, 0x02, 0x00, 0x00,
}

View File

@ -107,8 +107,8 @@ func (*registerStream) isTransportResponseFrame() bool { return false }
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
endStream bool // Valid on server side.
initStream func(uint32) (bool, error) // Used only on the client side.
endStream bool // Valid on server side.
initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
@ -637,21 +637,17 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
sendPing, err := hdr.initStream(str.id)
if err != nil {
if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
return nil
}
if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
}
l.estdStreams[str.id] = str
if sendPing {
return l.pingHandler(&ping{data: [8]byte{}})
}
return nil
}

View File

@ -62,8 +62,6 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
// awakenKeepalive is used to wake up keepalive when after it has gone dormant.
awakenKeepalive chan struct{}
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
@ -110,6 +108,16 @@ type http2Client struct {
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
// A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And
// since the number of active streams is guarded by the above mutex, we use
// the same for this condition variable as well.
kpDormancyCond *sync.Cond
// A boolean to track whether the keepalive goroutine is dormant or not.
// This is checked before attempting to signal the above condition
// variable.
kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
@ -232,7 +240,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@ -264,9 +271,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
updateFlowControl: t.updateFlowControl,
}
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@ -281,6 +285,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
@ -564,7 +569,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
hdr := &headerFrame{
hf: headerFields,
endStream: false,
initStream: func(id uint32) (bool, error) {
initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
t.mu.Unlock()
@ -574,29 +579,19 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
err = ErrConnClosing
}
cleanup(err)
return false, err
return err
}
t.activeStreams[id] = s
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
var sendPing bool
// If the number of active streams change from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
if len(t.activeStreams) == 1 && t.keepaliveEnabled {
select {
case t.awakenKeepalive <- struct{}{}:
sendPing = true
// Fill the awakenKeepalive channel again as this channel must be
// kept non-writable except at the point that the keepalive()
// goroutine is waiting either to be awaken or shutdown.
t.awakenKeepalive <- struct{}{}
default:
}
// If the keepalive goroutine has gone dormant, wake it up.
if t.kpDormant {
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
return sendPing, nil
return nil
},
onOrphaned: cleanup,
wq: s.wq,
@ -778,6 +773,11 @@ func (t *http2Client) Close() error {
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
if t.kpDormant {
// If the keepalive goroutine is blocked on this condition variable, we
// should unblock it so that the goroutine eventually exits.
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
@ -853,11 +853,11 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
return t.controlBuf.put(df)
}
func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
func (t *http2Client) getStream(f http2.Frame) *Stream {
t.mu.Lock()
defer t.mu.Unlock()
s, ok := t.activeStreams[f.Header().StreamID]
return s, ok
s := t.activeStreams[f.Header().StreamID]
t.mu.Unlock()
return s
}
// adjustWindow sends out extra window update over the initial window size
@ -937,8 +937,8 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
t.controlBuf.put(bdpPing)
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
s := t.getStream(f)
if s == nil {
return
}
if size > 0 {
@ -969,8 +969,8 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
}
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
s, ok := t.getStream(f)
if !ok {
s := t.getStream(f)
if s == nil {
return
}
if f.ErrCode == http2.ErrCodeRefusedStream {
@ -1147,8 +1147,8 @@ func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
s, ok := t.getStream(frame)
if !ok {
s := t.getStream(frame)
if s == nil {
return
}
endStream := frame.StreamEnded()
@ -1303,29 +1303,32 @@ func (t *http2Client) keepalive() {
timer.Reset(t.kp.Time)
continue
}
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
if t.state == closing {
// If the transport is closing, we should exit from the
// keepalive goroutine here. If not, we could have a race
// between the call to Signal() from Close() and the call to
// Wait() here, whereby the keepalive goroutine ends up
// blocking on the condition variable which will never be
// signalled again.
t.mu.Unlock()
select {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
case <-t.ctx.Done():
return
}
} else {
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// Send ping.
t.controlBuf.put(p)
return
}
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
t.kpDormant = true
t.kpDormancyCond.Wait()
}
t.kpDormant = false
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// We get here either because we were dormant and a new stream was
// created which unblocked the Wait() call, or because the
// keepalive timer expired. In both cases, we need to send a ping.
t.controlBuf.put(p)
// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:

View File

@ -65,8 +65,7 @@ var (
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
ctxDone <-chan struct{} // Cache the context.Done() chan
cancel context.CancelFunc
done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
@ -138,7 +137,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
var isettings []http2.Setting
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
Val: http2MaxFrameLen,
}}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
@ -203,11 +205,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
t := &http2Server{
ctx: ctx,
cancel: cancel,
ctxDone: ctx.Done(),
ctx: context.Background(),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@ -228,7 +229,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
czData: new(channelzData),
bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.ctxDone)
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@ -359,12 +360,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
s.cancel()
return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
@ -375,12 +378,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
s.cancel()
return true
}
t.maxStreamID = streamID
@ -882,7 +887,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
case <-t.ctx.Done():
case <-t.done:
return ErrConnClosing
default:
}
@ -904,7 +909,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
case <-t.ctx.Done():
case <-t.done:
return ErrConnClosing
default:
}
@ -970,7 +975,7 @@ func (t *http2Server) keepalive() {
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
case <-t.ctx.Done():
case <-t.done:
}
return
case <-keepalive.C:
@ -992,7 +997,7 @@ func (t *http2Server) keepalive() {
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
case <-t.ctx.Done():
case <-t.done:
return
}
}
@ -1012,7 +1017,7 @@ func (t *http2Server) Close() error {
t.activeStreams = nil
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
close(t.done)
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
@ -1152,7 +1157,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
select {
case <-t.drainChan:
case <-timer.C:
case <-t.ctx.Done():
case <-t.done:
return
}
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
@ -1202,7 +1207,7 @@ func (t *http2Server) getOutFlowWindow() int64 {
select {
case sz := <-resp:
return int64(sz)
case <-t.ctxDone:
case <-t.done:
return -1
case <-timer.C:
return -2

View File

@ -667,6 +667,7 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
writer: w,
fr: http2.NewFramer(w, r),
}
f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()

View File

@ -310,6 +310,14 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
}
break
}
if sc.lbConfig == nil {
// We had a loadBalancingConfig field but did not encounter a
// supported policy. The config is considered invalid in this
// case.
err := fmt.Errorf("invalid loadBalancingConfig: no supported policies found")
grpclog.Warningf(err.Error())
return nil, err
}
}
if rsc.MethodConfig == nil {

View File

@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.23.0"
const Version = "1.24.0"