mirror of
				https://github.com/etcd-io/etcd.git
				synced 2024-09-27 06:25:44 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			179 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			179 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2015 The etcd Authors
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package rafthttp
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"io"
 | |
| 	"runtime"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"go.uber.org/zap"
 | |
| 
 | |
| 	"go.etcd.io/etcd/client/pkg/v3/types"
 | |
| 	"go.etcd.io/etcd/pkg/v3/pbutil"
 | |
| 	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
 | |
| 	"go.etcd.io/raft/v3"
 | |
| 	"go.etcd.io/raft/v3/raftpb"
 | |
| )
 | |
| 
 | |
const (
	// connPerPipeline is the number of handle goroutines that start launches
	// for a single pipeline; each one posts messages independently.
	connPerPipeline = 4

	// pipelineBufSize is the capacity of the pipeline's message buffer. The
	// buffer absorbs temporary network latency: it is sized so that, on the
	// good path, the pipeline does not drop messages when the network is
	// unavailable for less than 1 second.
	pipelineBufSize = 64
)
 | |
| 
 | |
// errStopped is a sentinel error whose message is "stopped".
// NOTE(review): it is not referenced in this file; presumably other files in
// the package return it once a component has been stopped — confirm at the
// call sites.
var errStopped = errors.New("stopped")
 | |
| 
 | |
| type pipeline struct {
 | |
| 	peerID types.ID
 | |
| 
 | |
| 	tr     *Transport
 | |
| 	picker *urlPicker
 | |
| 	status *peerStatus
 | |
| 	raft   Raft
 | |
| 	errorc chan error
 | |
| 	// deprecate when we depercate v2 API
 | |
| 	followerStats *stats.FollowerStats
 | |
| 
 | |
| 	msgc chan raftpb.Message
 | |
| 	// wait for the handling routines
 | |
| 	wg    sync.WaitGroup
 | |
| 	stopc chan struct{}
 | |
| }
 | |
| 
 | |
| func (p *pipeline) start() {
 | |
| 	p.stopc = make(chan struct{})
 | |
| 	p.msgc = make(chan raftpb.Message, pipelineBufSize)
 | |
| 	p.wg.Add(connPerPipeline)
 | |
| 	for i := 0; i < connPerPipeline; i++ {
 | |
| 		go p.handle()
 | |
| 	}
 | |
| 
 | |
| 	if p.tr != nil && p.tr.Logger != nil {
 | |
| 		p.tr.Logger.Info(
 | |
| 			"started HTTP pipelining with remote peer",
 | |
| 			zap.String("local-member-id", p.tr.ID.String()),
 | |
| 			zap.String("remote-peer-id", p.peerID.String()),
 | |
| 		)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *pipeline) stop() {
 | |
| 	close(p.stopc)
 | |
| 	p.wg.Wait()
 | |
| 
 | |
| 	if p.tr != nil && p.tr.Logger != nil {
 | |
| 		p.tr.Logger.Info(
 | |
| 			"stopped HTTP pipelining with remote peer",
 | |
| 			zap.String("local-member-id", p.tr.ID.String()),
 | |
| 			zap.String("remote-peer-id", p.peerID.String()),
 | |
| 		)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *pipeline) handle() {
 | |
| 	defer p.wg.Done()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case m := <-p.msgc:
 | |
| 			start := time.Now()
 | |
| 			err := p.post(pbutil.MustMarshal(&m))
 | |
| 			end := time.Now()
 | |
| 
 | |
| 			if err != nil {
 | |
| 				p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
 | |
| 
 | |
| 				if isMsgApp(m) && p.followerStats != nil {
 | |
| 					p.followerStats.Fail()
 | |
| 				}
 | |
| 				p.raft.ReportUnreachable(m.To)
 | |
| 				if isMsgSnap(m) {
 | |
| 					p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
 | |
| 				}
 | |
| 				sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			p.status.activate()
 | |
| 			if isMsgApp(m) && p.followerStats != nil {
 | |
| 				p.followerStats.Succ(end.Sub(start))
 | |
| 			}
 | |
| 			if isMsgSnap(m) {
 | |
| 				p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
 | |
| 			}
 | |
| 			sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
 | |
| 		case <-p.stopc:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // post POSTs a data payload to a url. Returns nil if the POST succeeds,
 | |
| // error on any failure.
 | |
| func (p *pipeline) post(data []byte) (err error) {
 | |
| 	u := p.picker.pick()
 | |
| 	req := createPostRequest(p.tr.Logger, u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
 | |
| 
 | |
| 	done := make(chan struct{}, 1)
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	req = req.WithContext(ctx)
 | |
| 	go func() {
 | |
| 		select {
 | |
| 		case <-done:
 | |
| 			cancel()
 | |
| 		case <-p.stopc:
 | |
| 			waitSchedule()
 | |
| 			cancel()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	resp, err := p.tr.pipelineRt.RoundTrip(req)
 | |
| 	done <- struct{}{}
 | |
| 	if err != nil {
 | |
| 		p.picker.unreachable(u)
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	b, err := io.ReadAll(resp.Body)
 | |
| 	if err != nil {
 | |
| 		p.picker.unreachable(u)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	err = checkPostResponse(p.tr.Logger, resp, b, req, p.peerID)
 | |
| 	if err != nil {
 | |
| 		p.picker.unreachable(u)
 | |
| 		// errMemberRemoved is a critical error since a removed member should
 | |
| 		// always be stopped. So we use reportCriticalError to report it to errorc.
 | |
| 		if err == errMemberRemoved {
 | |
| 			reportCriticalError(err, p.errorc)
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// waitSchedule yields the processor once via runtime.Gosched so that other
// goroutines get a chance to run before the caller continues.
func waitSchedule() {
	runtime.Gosched()
}
 | 
