Update module google.golang.org/grpc to v1.81.0

Signed-off-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
This commit is contained in:
renovate[bot]
2026-05-04 06:00:01 +00:00
committed by GitHub
parent 9f02e222c4
commit afd4d3d71f
19 changed files with 542 additions and 50 deletions

2
go.mod
View File

@@ -72,7 +72,7 @@ require (
golang.org/x/sync v0.20.0
golang.org/x/sys v0.43.0
golang.org/x/term v0.42.0
google.golang.org/grpc v1.80.0
google.golang.org/grpc v1.81.0
google.golang.org/protobuf v1.36.11
gopkg.in/inf.v0 v0.9.1
gopkg.in/yaml.v3 v3.0.1

4
go.sum
View File

@@ -560,8 +560,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/grpc v1.81.0 h1:W3G9N3KQf3BU+YuCtGKJk0CmxQNbAISICD/9AORxLIw=
google.golang.org/grpc v1.81.0/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -24,10 +24,12 @@ import (
"fmt"
"math"
"net/url"
"os"
"slices"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"google.golang.org/grpc/balancer"
@@ -1268,8 +1270,9 @@ type addrConn struct {
channelz *channelz.SubChannel
localityLabel string
backendServiceLabel string
localityLabel string
backendServiceLabel string
disconnectErrorLabel string
}
// Note: this requires a lock on ac.mu.
@@ -1286,9 +1289,14 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
// part of the if condition below once the issue is fixed.
if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, "unknown")
disconnectError := ac.disconnectErrorLabel
if disconnectError == "" {
disconnectError = "unknown"
}
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, disconnectError)
openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
}
ac.disconnectErrorLabel = "" // Reset for next time
ac.state = s
ac.channelz.ChannelMetrics.State.Store(&s)
if lastErr == nil {
@@ -1483,11 +1491,11 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
addr.ServerName = ac.cc.getServerName(addr)
hctx, hcancel := context.WithCancel(ctx)
onClose := func(r transport.GoAwayReason) {
onClose := func(info transport.GoAwayInfo) {
ac.mu.Lock()
defer ac.mu.Unlock()
// adjust params based on GoAwayReason
ac.adjustParams(r)
ac.adjustParams(info.Reason)
if ctx.Err() != nil {
// Already shut down or connection attempt canceled. tearDown() or
// updateAddrs() already cleared the transport and canceled hctx
@@ -1504,6 +1512,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
return
}
ac.transport = nil
ac.disconnectErrorLabel = disconnectErrorString(info)
// Refresh the name resolver on any connection loss.
ac.cc.resolveNow(resolver.ResolveNowOptions{})
// Always go idle and wait for the LB policy to initiate a new
@@ -1560,6 +1569,32 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
return nil
}
// disconnectErrorString returns the grpc.disconnect_error metric label corresponding
// to the provided transport.GoAwayInfo, as specified by gRFC A94:
// https://github.com/grpc/proposal/blob/master/A94-grpc-subchannel-disconnections-metrics.md
func disconnectErrorString(info transport.GoAwayInfo) string {
err := info.Err
var sysErr syscall.Errno
switch {
case info.Reason != transport.GoAwayInvalid:
return fmt.Sprintf("GOAWAY %s", info.GoAwayCode.String())
case err == nil:
return "unknown"
case errors.Is(err, context.Canceled):
return "subchannel shutdown"
case errors.Is(err, syscall.ECONNRESET):
return "connection reset"
case errors.Is(err, syscall.ETIMEDOUT), errors.Is(err, context.DeadlineExceeded), errors.Is(err, os.ErrDeadlineExceeded):
return "connection timed out"
case errors.Is(err, syscall.ECONNABORTED):
return "connection aborted"
case errors.As(err, &sysErr):
return "socket error"
default:
return "unknown"
}
}
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
@@ -1663,6 +1698,9 @@ func (ac *addrConn) tearDown(err error) {
}
curTr := ac.transport
ac.transport = nil
if ac.disconnectErrorLabel == "" {
ac.disconnectErrorLabel = "subchannel shutdown"
}
// We have to set the state to Shutdown before anything else to prevent races
// between setting the state and logic that waits on context cancellation / etc.
ac.updateConnectivityState(connectivity.Shutdown, nil)

View File

@@ -20,10 +20,27 @@
package stats
import (
"context"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/stats"
)
type customLabelKey struct{}
// NewContextWithCustomLabel returns a new context with the provided custom label
// attached. The label will be propagated to all metric instruments specified in gRFC A108.
func NewContextWithCustomLabel(ctx context.Context, label string) context.Context {
return context.WithValue(ctx, customLabelKey{}, label)
}
// CustomLabelFromContext returns the custom label from the context if it exists.
// If the custom label is not present, it returns an empty string.
func CustomLabelFromContext(ctx context.Context) string {
label, _ := ctx.Value(customLabelKey{}).(string)
return label
}
// MetricsRecorder records on metrics derived from metric registry.
// Implementors must embed UnimplementedMetricsRecorder.
type MetricsRecorder interface {

View File

@@ -126,6 +126,16 @@ var (
// enabled by setting the env variable
// GRPC_EXPERIMENTAL_ENABLE_PRIORITY_LB_CHILD_POLICY_CACHE to true.
EnablePriorityLBChildPolicyCache = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_PRIORITY_LB_CHILD_POLICY_CACHE", false)
// EnableHTTPFramerReadBufferPooling enables the use of the
// readyreader.Reader interface to perform non-memory-pinning reads,
// provided the underlying net.Conn supports it. This reduces memory usage
// when subchannels are idle.
//
// This environment variable serves as an escape hatch to disable the
// feature if unforeseen issues arise, and it will be removed in a future
// release.
EnableHTTPFramerReadBufferPooling = boolFromEnv("GRPC_GO_EXPERIMENTAL_HTTP_FRAMER_READ_BUFFER_POOLING", true)
)
func boolFromEnv(envVar string, def bool) bool {

View File

@@ -79,4 +79,14 @@ var (
// xDS bootstrap configuration via the `call_creds` field. For more details,
// see: https://github.com/grpc/proposal/blob/master/A97-xds-jwt-call-creds.md
XDSBootstrapCallCredsEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_BOOTSTRAP_CALL_CREDS", false)
// XDSSNIEnabled controls if gRPC should send SNI information in xDS
// configured TLS handshakes. For more details, see:
// https://github.com/grpc/proposal/blob/master/A101-SNI-setting-and-SNI-SAN-validation.md
XDSSNIEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_SNI", false)
// XDSORCAToLRSPropEnabled controls whether ORCA metrics are explicitly
// filtered and prefix-propagated to the LRS server. For more details, see:
// https://github.com/grpc/proposal/blob/master/A85-lrs-custom-metrics-changes.md
XDSORCAToLRSPropEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION", false)
)

View File

@@ -73,7 +73,7 @@ type BinaryTieredBufferPool struct {
func NewBinaryTieredBufferPool(powerOfTwoExponents ...uint8) (*BinaryTieredBufferPool, error) {
return newBinaryTiered(func(size int) bufferPool {
return newSizedBufferPool(size, true)
}, &simpleBufferPool{shouldZero: true}, powerOfTwoExponents...)
}, &SimpleBufferPool{shouldZero: true}, powerOfTwoExponents...)
}
// NewDirtyBinaryTieredBufferPool returns a BufferPool backed by multiple
@@ -82,7 +82,7 @@ func NewBinaryTieredBufferPool(powerOfTwoExponents ...uint8) (*BinaryTieredBuffe
func NewDirtyBinaryTieredBufferPool(powerOfTwoExponents ...uint8) (*BinaryTieredBufferPool, error) {
return newBinaryTiered(func(size int) bufferPool {
return newSizedBufferPool(size, false)
}, &simpleBufferPool{shouldZero: false}, powerOfTwoExponents...)
}, NewDirtySimplePool(), powerOfTwoExponents...)
}
func newBinaryTiered(sizedPoolFactory func(int) bufferPool, fallbackPool bufferPool, powerOfTwoExponents ...uint8) (*BinaryTieredBufferPool, error) {
@@ -258,7 +258,7 @@ func newSizedBufferPool(size int, zero bool) *sizedBufferPool {
// buffer pools for different sizes of buffers.
type TieredBufferPool struct {
sizedPools []*sizedBufferPool
fallbackPool simpleBufferPool
fallbackPool SimpleBufferPool
}
// NewTieredBufferPool returns a BufferPool implementation that uses multiple
@@ -271,7 +271,7 @@ func NewTieredBufferPool(poolSizes ...int) *TieredBufferPool {
}
return &TieredBufferPool{
sizedPools: pools,
fallbackPool: simpleBufferPool{shouldZero: true},
fallbackPool: SimpleBufferPool{shouldZero: true},
}
}
@@ -297,16 +297,26 @@ func (p *TieredBufferPool) getPool(size int) bufferPool {
return p.sizedPools[poolIdx]
}
// simpleBufferPool is an implementation of the BufferPool interface that
// SimpleBufferPool is an implementation of the mem.BufferPool interface that
// attempts to pool buffers with a sync.Pool. When Get is invoked, it tries to
// acquire a buffer from the pool but if that buffer is too small, it returns it
// to the pool and creates a new one.
type simpleBufferPool struct {
type SimpleBufferPool struct {
pool sync.Pool
shouldZero bool
}
func (p *simpleBufferPool) Get(size int) *[]byte {
// NewDirtySimplePool constructs a [SimpleBufferPool]. It does not initialize
// the buffers before returning them. Callers must ensure they don't read the
// buffers before writing data to them.
func NewDirtySimplePool() *SimpleBufferPool {
return &SimpleBufferPool{
shouldZero: false,
}
}
// Get returns a buffer with specified length from the pool.
func (p *SimpleBufferPool) Get(size int) *[]byte {
bs, ok := p.pool.Get().(*[]byte)
if ok && cap(*bs) >= size {
if p.shouldZero {
@@ -333,6 +343,7 @@ func (p *simpleBufferPool) Get(size int) *[]byte {
return &b
}
func (p *simpleBufferPool) Put(buf *[]byte) {
// Put returns a buffer to the pool.
func (p *SimpleBufferPool) Put(buf *[]byte) {
p.pool.Put(buf)
}

View File

@@ -115,6 +115,9 @@ type ClientInterceptor interface {
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations. done must never be nil when called.
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
// Close closes the interceptor. Once called, no new calls to NewStream are
// accepted. Ongoing calls to NewStream are allowed to complete.
Close()
}
// ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
@@ -123,6 +126,9 @@ type ServerInterceptor interface {
// information about connection RPC was received on, and HTTP Headers. This
// information will be piped into context.
AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
// Close closes the interceptor. Once called, no new calls to NewStream are
// accepted. Ongoing calls to NewStream are allowed to complete.
Close()
}
type csKeyType string

View File

@@ -134,6 +134,8 @@ type http2Client struct {
// goAwayDebugMessage contains a detailed human readable string about a
// GoAway frame, useful for error messages.
goAwayDebugMessage string
// goAwayCode records the http2.ErrCode received with the GoAway frame.
goAwayCode http2.ErrCode
// A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And
@@ -147,7 +149,7 @@ type http2Client struct {
channelz *channelz.Socket
onClose func(GoAwayReason)
onClose OnCloseFunc
bufferPool mem.BufferPool
@@ -204,7 +206,7 @@ func isTemporary(err error) bool {
// NewHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ ClientTransport, err error) {
func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose OnCloseFunc) (_ ClientTransport, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@@ -1015,7 +1017,7 @@ func (t *http2Client) Close(err error) {
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
if t.state != draining {
t.onClose(GoAwayInvalid)
t.onClose(GoAwayInfo{Reason: GoAwayInvalid, GoAwayCode: http2.ErrCodeNo, Err: err})
}
t.state = closing
streams := t.activeStreams
@@ -1086,7 +1088,7 @@ func (t *http2Client) GracefulClose() {
if t.logger.V(logLevel) {
t.logger.Infof("GracefulClose called")
}
t.onClose(GoAwayInvalid)
t.onClose(GoAwayInfo{Reason: GoAwayInvalid, GoAwayCode: http2.ErrCodeNo})
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
@@ -1236,7 +1238,10 @@ func (t *http2Client) handleData(f *parsedDataFrame) {
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
if f.StreamEnded() {
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
// If client received END_STREAM from server while stream was still
// active, send RST_STREAM.
rstStream := s.getState() == streamActive
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}
}
@@ -1372,7 +1377,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
if t.state != draining {
t.onClose(t.goAwayReason)
t.onClose(GoAwayInfo{Reason: t.goAwayReason, GoAwayCode: t.goAwayCode})
t.state = draining
}
}
@@ -1422,6 +1427,7 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
} else {
t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
}
t.goAwayCode = f.ErrCode
}
func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {

View File

@@ -36,6 +36,9 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/envconfig"
imem "google.golang.org/grpc/internal/mem"
"google.golang.org/grpc/internal/transport/readyreader"
"google.golang.org/grpc/mem"
)
@@ -296,7 +299,7 @@ func decodeGrpcMessageUnchecked(msg string) string {
}
type bufWriter struct {
pool *sync.Pool
pool *imem.SimpleBufferPool
buf []byte
offset int
batchSize int
@@ -304,7 +307,7 @@ type bufWriter struct {
err error
}
func newBufWriter(conn io.Writer, batchSize int, pool *sync.Pool) *bufWriter {
func newBufWriter(conn io.Writer, batchSize int, pool *imem.SimpleBufferPool) *bufWriter {
w := &bufWriter{
batchSize: batchSize,
conn: conn,
@@ -326,7 +329,7 @@ func (w *bufWriter) Write(b []byte) (int, error) {
return n, toIOError(err)
}
if w.buf == nil {
b := w.pool.Get().(*[]byte)
b := w.pool.Get(w.batchSize)
w.buf = *b
}
written := 0
@@ -407,22 +410,32 @@ type framer struct {
errDetail error
}
var writeBufferPoolMap = make(map[int]*sync.Pool)
var writeBufferMutex sync.Mutex
var ioBufferPoolMap = make(map[int]*imem.SimpleBufferPool)
var ioBufferMutex sync.Mutex
func bufferedReader(r io.Reader, bufSize int) io.Reader {
if bufSize <= 0 {
return r
}
if envconfig.EnableHTTPFramerReadBufferPooling {
if rr := readyreader.NewNonBlocking(r); rr != nil {
readPool := ioBufferPool(bufSize)
return readyreader.NewBuffered(rr, bufSize, readPool)
}
}
return bufio.NewReaderSize(r, bufSize)
}
func newFramer(conn io.ReadWriter, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32, memPool mem.BufferPool) *framer {
if writeBufferSize < 0 {
writeBufferSize = 0
}
var r io.Reader = conn
if readBufferSize > 0 {
r = bufio.NewReaderSize(r, readBufferSize)
}
var pool *sync.Pool
r := bufferedReader(conn, readBufferSize)
var writePool *imem.SimpleBufferPool
if sharedWriteBuffer {
pool = getWriteBufferPool(writeBufferSize)
writePool = ioBufferPool(writeBufferSize)
}
w := newBufWriter(conn, writeBufferSize, pool)
w := newBufWriter(conn, writeBufferSize, writePool)
f := &framer{
writer: w,
fr: http2.NewFramer(w, r),
@@ -578,20 +591,15 @@ func (df *parsedDataFrame) Header() http2.FrameHeader {
return df.FrameHeader
}
func getWriteBufferPool(size int) *sync.Pool {
writeBufferMutex.Lock()
defer writeBufferMutex.Unlock()
pool, ok := writeBufferPoolMap[size]
func ioBufferPool(size int) *imem.SimpleBufferPool {
ioBufferMutex.Lock()
defer ioBufferMutex.Unlock()
pool, ok := ioBufferPoolMap[size]
if ok {
return pool
}
pool = &sync.Pool{
New: func() any {
b := make([]byte, size)
return &b
},
}
writeBufferPoolMap[size] = pool
pool = imem.NewDirtySimplePool()
ioBufferPoolMap[size] = pool
return pool
}

View File

@@ -0,0 +1,39 @@
/*
*
* Copyright 2026 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package readyreader
import "syscall"
func isRawConnSupported() bool {
return true
}
// sysRead uses the standard syscall package rather than the modern unix package
// to avoid triggering the race detector. Because both packages perform sync
// operations on a local variable to satisfy the race detector, mixing them
// for read and write syscalls causes data races. We use syscall here to remain
// consistent with net.Conn implementations in standard library.
func sysRead(fd uintptr, p []byte) (int, error) {
return syscall.Read(int(fd), p)
}
// wouldBlock checks standard Unix non-blocking errors.
func wouldBlock(err error) bool {
return err == syscall.EAGAIN || err == syscall.EWOULDBLOCK
}

View File

@@ -0,0 +1,35 @@
//go:build !linux
/*
*
* Copyright 2026 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package readyreader
func isRawConnSupported() bool {
return false
}
// sysRead is not implemented. Support can be added in the future if necessary.
func sysRead(uintptr, []byte) (int, error) {
panic("RawConn functionality is not implemented for non-unix platforms.")
}
// wouldBlock is not implemented. Support can be added in the future if necessary.
func wouldBlock(error) bool {
panic("RawConn functionality is not implemented for non-unix platforms.")
}

View File

@@ -0,0 +1,253 @@
/*
*
* Copyright 2026 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package readyreader provides utilities to perform non-memory-pinning reads.
package readyreader
import (
"io"
"net"
"syscall"
"google.golang.org/grpc/mem"
)
// Reader is an optional interface that can be implemented by [net.Conn]
// implementations to enable gRPC to perform non-memory-pinning reads.
type Reader interface {
// ReadOnReady waits for data to arrive, fetches a buffer, and performs a
// read. When the underlying IO is readable, it allocates a buffer of size
// bufSize from the pool and reads up to bufSize bytes into the buffer.
//
// It returns a pointer to the buffer so it can be returned to the pool
// later, the number of bytes read, and an error.
//
// Callers should always process the n > 0 bytes returned before considering
// the error. Doing so correctly handles I/O errors that happen after
// reading some bytes, as well as both of the allowed EOF behaviors.
ReadOnReady(bufSize int, pool mem.BufferPool) (b *[]byte, n int, err error)
}
// nonBlockingReader is optimized for non-memory-pinning reads using the RawConn
// interface.
type nonBlockingReader struct {
raw syscall.RawConn
// The following fields are stored as field to avoid heap allocations.
state readState
doRead func(fd uintptr) bool
}
type readState struct {
// Request params.
bufSize int
pool mem.BufferPool
// Response params.
readError error
bytesRead int
buf *[]byte
}
// NewNonBlocking returns a ReadyReader if the passed reader supports
// non-memory-pinning reads, else nil.
func NewNonBlocking(r io.Reader) Reader {
if rr, ok := r.(Reader); ok {
return rr
}
if !isRawConnSupported() {
return nil
}
// We restrict the types before asserting syscall.Conn. The credentials
// package may return a wrapper that implements syscall.Conn by embedding
// both the raw connection and the encrypted connection. If the code
// attempts to read directly from the raw syscall.RawConn, it would read
// encrypted data.
switch r.(type) {
case *net.TCPConn, *net.UDPConn, *net.UnixConn, *net.IPConn:
default:
return nil
}
sysConn, ok := r.(syscall.Conn)
if !ok {
return nil
}
raw, err := sysConn.SyscallConn()
if err != nil {
return nil
}
rr := &nonBlockingReader{raw: raw}
rr.doRead = func(fd uintptr) bool {
s := &rr.state
s.buf = s.pool.Get(s.bufSize)
s.bytesRead, s.readError = sysRead(fd, *s.buf)
if s.readError != nil {
s.pool.Put(s.buf)
s.buf = nil
}
return !wouldBlock(s.readError)
}
return rr
}
func (c *nonBlockingReader) ReadOnReady(bufSize int, pool mem.BufferPool) (*[]byte, int, error) {
c.state = readState{
pool: pool,
bufSize: bufSize,
}
err := c.raw.Read(c.doRead)
buf := c.state.buf
n := c.state.bytesRead
readErr := c.state.readError
c.state = readState{}
if err != nil {
if buf != nil {
pool.Put(buf)
}
return nil, 0, err
}
if readErr != nil {
// buffer is already released in the callback.
return nil, 0, readErr
}
if n == 0 {
// syscall.Read doesn't consider a graceful socket closure to be an
// error condition, but Go's io.Reader expects an EOF error.
pool.Put(buf)
return nil, 0, io.EOF
}
return buf, n, nil
}
type blockingReader struct {
reader io.Reader
}
func (c *blockingReader) ReadOnReady(bufSize int, pool mem.BufferPool) (*[]byte, int, error) {
buf := pool.Get(bufSize)
n, err := c.reader.Read(*buf)
if err != nil {
pool.Put(buf)
return nil, 0, err
}
return buf, n, nil
}
// New detects if [syscall.RawConn] is available for non-memory-pinning reads.
// If [syscall.RawConn] is unavailable, it falls back to using the simpler
// [io.Reader] interface for reads.
func New(r io.Reader) Reader {
if r := NewNonBlocking(r); r != nil {
return r
}
return &blockingReader{reader: r}
}
// bufReadyReader implements buffering for a ReadyReader object.
// A new bufReadyReader is created by calling [NewBuffered].
type bufReadyReader struct {
buf *[]byte
pool mem.BufferPool
bufSize int
rd Reader // reader provided by the caller
r, w int // buf read and write positions
err error
constPool constBufferPool // stored as a field to avoid heap allocations.
}
// NewBuffered returns a new [io.Reader] with a buffer of the specified size
// which is allocated from the provided pool.
func NewBuffered(rd Reader, size int, pool mem.BufferPool) io.Reader {
return &bufReadyReader{
rd: rd,
pool: pool,
bufSize: size,
}
}
func (b *bufReadyReader) readErr() error {
err := b.err
b.err = nil
return err
}
func (b *bufReadyReader) buffered() int { return b.w - b.r }
// Read reads data into p. It returns the number of bytes read into p. The
// bytes are taken from at most one Read on the underlying [ReadyReader],
// hence n may be less than len(p). If the underlying [ReadyReader] can return
// a non-zero count with io.EOF, then this Read method can do so as well; see
// the [io.Reader] docs.
func (b *bufReadyReader) Read(p []byte) (n int, err error) {
n = len(p)
if n == 0 {
if b.buffered() > 0 {
return 0, nil
}
return 0, b.readErr()
}
if b.r == b.w {
if b.err != nil {
return 0, b.readErr()
}
if len(p) >= b.bufSize {
// Large read, empty buffer.
// Read directly into p to avoid copy.
b.constPool.buffer = p
_, n, b.err = b.rd.ReadOnReady(len(p), &b.constPool)
return n, b.readErr()
}
// One read.
b.r = 0
b.w = 0
b.buf, n, b.err = b.rd.ReadOnReady(b.bufSize, b.pool)
if n == 0 {
if b.buf != nil {
b.pool.Put(b.buf)
b.buf = nil
}
return 0, b.readErr()
}
b.w += n
}
// copy as much as we can
// b.buf must be non-nil since b.r != b.w.
buf := *b.buf
n = copy(p, buf[b.r:b.w])
b.r += n
if b.r == b.w {
// Consumed entire buffer, release it.
b.pool.Put(b.buf)
b.buf = nil
}
return n, nil
}
type constBufferPool struct {
buffer []byte
}
func (p *constBufferPool) Get(int) *[]byte {
return &p.buffer
}
func (p *constBufferPool) Put(*[]byte) {}

View File

@@ -31,6 +31,7 @@ import (
"sync/atomic"
"time"
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
@@ -742,6 +743,22 @@ const (
GoAwayTooManyPings GoAwayReason = 2
)
// GoAwayInfo contains metadata about why a connection was closed.
type GoAwayInfo struct {
// Reason is the parsed reason for an HTTP/2 GOAWAY frame.
Reason GoAwayReason
// GoAwayCode is the raw HTTP/2 error code received in a GOAWAY frame.
GoAwayCode http2.ErrCode
// Err is the underlying error that caused the connection to close. It is
// populated if the connection was closed due to a socket error or context
// cancellation without receiving a GOAWAY frame. If the connection was
// closed due to a GOAWAY frame, this field will be nil.
Err error
}
// OnCloseFunc is a callback invoked when a ClientTransport closes.
type OnCloseFunc func(GoAwayInfo)
// ContextErr converts the error from context package into a status error.
func ContextErr(err error) error {
switch err {

View File

@@ -165,7 +165,7 @@ func (r *Reader) Close() error {
}
func (r *Reader) freeFirstBufferIfEmpty() bool {
if len(r.data) == 0 || r.bufferIdx != len(r.data[0].ReadOnlyData()) {
if len(r.data) == 0 || r.bufferIdx != r.data[0].Len() {
return false
}

View File

@@ -53,6 +53,10 @@ type Buffer interface {
Free()
// Len returns the Buffer's size.
Len() int
// Slice returns a new Buffer that is a view into this buffer's data
// from [start:end). The buffer is not modified. Panics if the buffer
// has been freed or if start/end are out of bounds.
Slice(start, end int) Buffer
split(n int) (left, right Buffer)
read(buf []byte) (int, Buffer)
@@ -180,6 +184,32 @@ func (b *buffer) Len() int {
return len(b.ReadOnlyData())
}
func (b *buffer) Slice(start, end int) Buffer {
if b.rootBuf == nil {
panic("Cannot slice freed buffer")
}
data := b.data[start:end] // access the data to check slice bounds
if len(data) == 0 {
return emptyBuffer{}
}
if len(data) == len(b.data) {
b.Ref()
return b
}
// We are creating a new reference (view) to a portion of the root buffer's
// data. Therefore, we must increment the reference count of the root buffer
// to ensure the underlying data is not freed while this view is still in
// use.
b.rootBuf.Ref()
s := newBuffer()
s.data = data
s.rootBuf = b.rootBuf
s.refs.Store(1)
return s
}
func (b *buffer) split(n int) (Buffer, Buffer) {
if b.rootBuf == nil || b.rootBuf.refs.Add(1) <= 1 {
panic("Cannot split freed buffer")
@@ -240,6 +270,13 @@ func (e emptyBuffer) Len() int {
return 0
}
func (e emptyBuffer) Slice(start, end int) Buffer {
if start != 0 || end != 0 {
panic(fmt.Sprintf("slice bounds out of range [%d:%d] with length 0", start, end))
}
return e
}
func (e emptyBuffer) split(int) (left, right Buffer) {
return e, e
}
@@ -264,6 +301,9 @@ func (s SliceBuffer) Free() {}
// Len is a noop implementation of Len.
func (s SliceBuffer) Len() int { return len(s) }
// Slice returns a new SliceBuffer that is a view into the receiver from [start:end).
func (s SliceBuffer) Slice(start, end int) Buffer { return s[start:end] }
func (s SliceBuffer) split(n int) (left, right Buffer) {
return s[:n], s[n:]
}

View File

@@ -21,6 +21,7 @@ package grpc
import (
"context"
"errors"
"fmt"
"io"
"math"
rand "math/rand/v2"
@@ -749,7 +750,7 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {
return false, err
}
if cs.numRetries+1 >= rp.MaxAttempts {
return false, err
return false, fmt.Errorf("max retries exhausted: failed after %d attempts: %w", cs.numRetries+1, err)
}
var dur time.Duration

View File

@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.80.0"
const Version = "1.81.0"

5
vendor/modules.txt vendored
View File

@@ -1054,8 +1054,8 @@ google.golang.org/genproto/googleapis/api/annotations
# google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9
## explicit; go 1.25.0
google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.80.0
## explicit; go 1.24.0
# google.golang.org/grpc v1.81.0
## explicit; go 1.25.0
google.golang.org/grpc
google.golang.org/grpc/attributes
google.golang.org/grpc/backoff
@@ -1108,6 +1108,7 @@ google.golang.org/grpc/internal/status
google.golang.org/grpc/internal/syscall
google.golang.org/grpc/internal/transport
google.golang.org/grpc/internal/transport/networktype
google.golang.org/grpc/internal/transport/readyreader
google.golang.org/grpc/keepalive
google.golang.org/grpc/mem
google.golang.org/grpc/metadata