Update grpc-gateway to 1.3.1 (#11843)

This commit is contained in:
Changxin Miao
2020-05-07 06:32:08 +08:00
committed by GitHub
parent 7e20b9ff91
commit 1b5e2f4305
9 changed files with 127 additions and 24 deletions

View File

@@ -2,6 +2,10 @@ package runtime
import (
"strconv"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/ptypes/duration"
"github.com/golang/protobuf/ptypes/timestamp"
)
// String just returns the given string.
@@ -56,3 +60,17 @@ func Uint32(val string) (uint32, error) {
}
return uint32(i), nil
}
// Timestamp converts the given RFC3339 formatted string into a timestamp.Timestamp.
func Timestamp(val string) (*timestamp.Timestamp, error) {
var r *timestamp.Timestamp
err := jsonpb.UnmarshalString(val, r)
return r, err
}
// Duration converts the given string into a timestamp.Duration.
func Duration(val string) (*duration.Duration, error) {
var r *duration.Duration
err := jsonpb.UnmarshalString(val, r)
return r, err
}

View File

@@ -34,34 +34,47 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", marshaler.ContentType())
if err := handleForwardResponseOptions(ctx, w, nil, opts); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
HTTPError(ctx, mux, marshaler, w, req, err)
return
}
w.WriteHeader(http.StatusOK)
f.Flush()
var delimiter []byte
if d, ok := marshaler.(Delimited); ok {
delimiter = d.Delimiter()
} else {
delimiter = []byte("\n")
}
var wroteHeader bool
for {
resp, err := recv()
if err == io.EOF {
return
}
if err != nil {
handleForwardResponseStreamError(marshaler, w, err)
handleForwardResponseStreamError(wroteHeader, marshaler, w, err)
return
}
if err := handleForwardResponseOptions(ctx, w, resp, opts); err != nil {
handleForwardResponseStreamError(marshaler, w, err)
handleForwardResponseStreamError(wroteHeader, marshaler, w, err)
return
}
buf, err := marshaler.Marshal(streamChunk(resp, nil))
if err != nil {
grpclog.Printf("Failed to marshal response chunk: %v", err)
handleForwardResponseStreamError(wroteHeader, marshaler, w, err)
return
}
if _, err = w.Write(buf); err != nil {
grpclog.Printf("Failed to send response chunk: %v", err)
return
}
wroteHeader = true
if _, err = w.Write(delimiter); err != nil {
grpclog.Printf("Failed to send delimiter chunk: %v", err)
return
}
f.Flush()
}
}
@@ -134,13 +147,20 @@ func handleForwardResponseOptions(ctx context.Context, w http.ResponseWriter, re
return nil
}
func handleForwardResponseStreamError(marshaler Marshaler, w http.ResponseWriter, err error) {
func handleForwardResponseStreamError(wroteHeader bool, marshaler Marshaler, w http.ResponseWriter, err error) {
buf, merr := marshaler.Marshal(streamChunk(nil, err))
if merr != nil {
grpclog.Printf("Failed to marshal an error: %v", merr)
return
}
if _, werr := fmt.Fprintf(w, "%s\n", buf); werr != nil {
if !wroteHeader {
s, ok := status.FromError(err)
if !ok {
s = status.New(codes.Unknown, err.Error())
}
w.WriteHeader(HTTPStatusFromCode(s.Code()))
}
if _, werr := w.Write(buf); werr != nil {
grpclog.Printf("Failed to notify error to client: %v", werr)
return
}

View File

@@ -35,3 +35,8 @@ func (j *JSONBuiltin) NewDecoder(r io.Reader) Decoder {
func (j *JSONBuiltin) NewEncoder(w io.Writer) Encoder {
return json.NewEncoder(w)
}
// Delimiter for newline encoded JSON streams.
func (j *JSONBuiltin) Delimiter() []byte {
return []byte("\n")
}

View File

@@ -182,3 +182,8 @@ type protoEnum interface {
}
var typeProtoMessage = reflect.TypeOf((*proto.Message)(nil)).Elem()
// Delimiter for newline encoded JSON streams.
func (j *JSONPb) Delimiter() []byte {
return []byte("\n")
}

View File

@@ -40,3 +40,9 @@ type EncoderFunc func(v interface{}) error
// Encode delegates invocations to the underlying function itself.
func (f EncoderFunc) Encode(v interface{}) error { return f(v) }
// Delimited defines the streaming delimiter.
type Delimited interface {
// Delimiter returns the record seperator for the stream.
Delimiter() []byte
}