Skip to content

Commit 267a09b

Browse files
authored
xds/internal/xdsclient: Add counter metrics for valid and invalid resource updates (#8038)
1 parent f227ba9 commit 267a09b

13 files changed

+245
-23
lines changed

internal/internal.go

+3
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ var (
6464
// gRPC server. An xDS-enabled server needs to know what type of credentials
6565
// is configured on the underlying gRPC server. This is set by server.go.
6666
GetServerCredentials any // func (*grpc.Server) credentials.TransportCredentials
67+
// MetricsRecorderForServer returns the MetricsRecorderList derived from a
68+
// server's stats handlers.
69+
MetricsRecorderForServer any // func (*grpc.Server) estats.MetricsRecorder
6770
// CanonicalString returns the canonical string of the code defined here:
6871
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
6972
//

resolver/resolver.go

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
"google.golang.org/grpc/attributes"
3232
"google.golang.org/grpc/credentials"
33+
"google.golang.org/grpc/experimental/stats"
3334
"google.golang.org/grpc/internal"
3435
"google.golang.org/grpc/serviceconfig"
3536
)
@@ -175,6 +176,8 @@ type BuildOptions struct {
175176
// Authority is the effective authority of the clientconn for which the
176177
// resolver is built.
177178
Authority string
179+
// MetricsRecorder is the metrics recorder to do recording.
180+
MetricsRecorder stats.MetricsRecorder
178181
}
179182

180183
// An Endpoint is one network endpoint, or server, which may have multiple

resolver_wrapper.go

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func (ccr *ccResolverWrapper) start() error {
7777
CredsBundle: ccr.cc.dopts.copts.CredsBundle,
7878
Dialer: ccr.cc.dopts.copts.Dialer,
7979
Authority: ccr.cc.authority,
80+
MetricsRecorder: ccr.cc.metricsRecorderList,
8081
}
8182
var err error
8283
// The delegating resolver is used unless:

server.go

+5
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ import (
3737
"google.golang.org/grpc/credentials"
3838
"google.golang.org/grpc/encoding"
3939
"google.golang.org/grpc/encoding/proto"
40+
estats "google.golang.org/grpc/experimental/stats"
4041
"google.golang.org/grpc/grpclog"
4142
"google.golang.org/grpc/internal"
4243
"google.golang.org/grpc/internal/binarylog"
4344
"google.golang.org/grpc/internal/channelz"
4445
"google.golang.org/grpc/internal/grpcsync"
4546
"google.golang.org/grpc/internal/grpcutil"
47+
istats "google.golang.org/grpc/internal/stats"
4648
"google.golang.org/grpc/internal/transport"
4749
"google.golang.org/grpc/keepalive"
4850
"google.golang.org/grpc/mem"
@@ -82,6 +84,9 @@ func init() {
8284
internal.BinaryLogger = binaryLogger
8385
internal.JoinServerOptions = newJoinServerOption
8486
internal.BufferPool = bufferPool
87+
internal.MetricsRecorderForServer = func(srv *Server) estats.MetricsRecorder {
88+
return istats.NewMetricsRecorderList(srv.opts.statsHandlers)
89+
}
8590
}
8691

8792
var statusOK = status.New(codes.OK, "")

xds/internal/resolver/internal/internal.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ var (
2626
NewWRR any // func() wrr.WRR
2727

2828
// NewXDSClient is the function used to create a new xDS client.
29-
NewXDSClient any // func(string) (xdsclient.XDSClient, func(), error)
29+
NewXDSClient any // func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)
3030
)

xds/internal/resolver/xds_resolver.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
rand "math/rand/v2"
2626
"sync/atomic"
2727

28+
estats "google.golang.org/grpc/experimental/stats"
2829
"google.golang.org/grpc/internal"
2930
"google.golang.org/grpc/internal/grpclog"
3031
"google.golang.org/grpc/internal/grpcsync"
@@ -50,13 +51,16 @@ const Scheme = "xds"
5051
// the provided config and a new xDS client in that pool.
5152
func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) {
5253
return &xdsResolverBuilder{
53-
newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) {
54+
newXDSClient: func(name string, mr estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
5455
config, err := bootstrap.NewConfigFromContents(config)
5556
if err != nil {
5657
return nil, nil, err
5758
}
5859
pool := xdsclient.NewPool(config)
59-
return pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name})
60+
return pool.NewClientForTesting(xdsclient.OptionsForTesting{
61+
Name: name,
62+
MetricsRecorder: mr,
63+
})
6064
},
6165
}, nil
6266
}
@@ -66,8 +70,11 @@ func newBuilderWithConfigForTesting(config []byte) (resolver.Builder, error) {
6670
// specific xds client pool being used.
6771
func newBuilderWithPoolForTesting(pool *xdsclient.Pool) (resolver.Builder, error) {
6872
return &xdsResolverBuilder{
69-
newXDSClient: func(name string) (xdsclient.XDSClient, func(), error) {
70-
return pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: name})
73+
newXDSClient: func(name string, mr estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
74+
return pool.NewClientForTesting(xdsclient.OptionsForTesting{
75+
Name: name,
76+
MetricsRecorder: mr,
77+
})
7178
},
7279
}, nil
7380
}
@@ -82,7 +89,7 @@ func init() {
8289
}
8390

8491
type xdsResolverBuilder struct {
85-
newXDSClient func(string) (xdsclient.XDSClient, func(), error)
92+
newXDSClient func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)
8693
}
8794

8895
// Build helps implement the resolver.Builder interface.
@@ -115,11 +122,11 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
115122
r.serializerCancel = cancel
116123

117124
// Initialize the xDS client.
118-
newXDSClient := rinternal.NewXDSClient.(func(string) (xdsclient.XDSClient, func(), error))
125+
newXDSClient := rinternal.NewXDSClient.(func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error))
119126
if b.newXDSClient != nil {
120127
newXDSClient = b.newXDSClient
121128
}
122-
client, closeFn, err := newXDSClient(target.String())
129+
client, closeFn, err := newXDSClient(target.String(), opts.MetricsRecorder)
123130
if err != nil {
124131
return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
125132
}

xds/internal/resolver/xds_resolver_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/google/go-cmp/cmp"
3333
"github.com/google/uuid"
3434
"google.golang.org/grpc/codes"
35+
estats "google.golang.org/grpc/experimental/stats"
3536
"google.golang.org/grpc/internal"
3637
iresolver "google.golang.org/grpc/internal/resolver"
3738
"google.golang.org/grpc/internal/testutils"
@@ -257,7 +258,7 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
257258
// client is closed.
258259
origNewClient := rinternal.NewXDSClient
259260
closeCh := make(chan struct{})
260-
rinternal.NewXDSClient = func(string) (xdsclient.XDSClient, func(), error) {
261+
rinternal.NewXDSClient = func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
261262
bc := e2e.DefaultBootstrapContents(t, uuid.New().String(), "dummy-management-server-address")
262263
config, err := bootstrap.NewConfigFromContents(bc)
263264
if err != nil {

xds/internal/xdsclient/authority.go

+10
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"sync"
2424
"sync/atomic"
2525

26+
"google.golang.org/grpc/experimental/stats"
2627
"google.golang.org/grpc/grpclog"
2728
igrpclog "google.golang.org/grpc/internal/grpclog"
2829
"google.golang.org/grpc/internal/grpcsync"
@@ -87,6 +88,8 @@ type authority struct {
8788
xdsClientSerializer *grpcsync.CallbackSerializer // Serializer to run call ins from the xDS client, owned by this authority.
8889
xdsClientSerializerClose func() // Function to close the above serializer.
8990
logger *igrpclog.PrefixLogger // Logger for this authority.
91+
target string // The gRPC Channel target.
92+
metricsRecorder stats.MetricsRecorder // The metrics recorder used for emitting metrics.
9093

9194
// The below defined fields must only be accessed in the context of the
9295
// serializer callback, owned by this authority.
@@ -120,6 +123,8 @@ type authorityBuildOptions struct {
120123
serializer *grpcsync.CallbackSerializer // Callback serializer for invoking watch callbacks
121124
getChannelForADS xdsChannelForADS // Function to acquire a reference to an xdsChannel
122125
logPrefix string // Prefix for logging
126+
target string // Target for the gRPC Channel that owns xDS Client/Authority
127+
metricsRecorder stats.MetricsRecorder // metricsRecorder to emit metrics
123128
}
124129

125130
// newAuthority creates a new authority instance with the provided
@@ -143,6 +148,8 @@ func newAuthority(args authorityBuildOptions) *authority {
143148
xdsClientSerializerClose: cancel,
144149
logger: igrpclog.NewPrefixLogger(l, logPrefix),
145150
resources: make(map[xdsresource.Type]map[string]*resourceState),
151+
target: args.target,
152+
metricsRecorder: args.metricsRecorder,
146153
}
147154

148155
// Create an ordered list of xdsChannels with their server configs. The
@@ -358,6 +365,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
358365
// On error, keep previous version of the resource. But update status
359366
// and error.
360367
if uErr.Err != nil {
368+
xdsClientResourceUpdatesInvalidMetric.Record(a.metricsRecorder, 1, a.target, serverConfig.ServerURI(), rType.TypeName())
361369
state.md.ErrState = md.ErrState
362370
state.md.Status = md.Status
363371
for watcher := range state.watchers {
@@ -369,6 +377,8 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig
369377
continue
370378
}
371379

380+
xdsClientResourceUpdatesValidMetric.Record(a.metricsRecorder, 1, a.target, serverConfig.ServerURI(), rType.TypeName())
381+
372382
if state.deletionIgnored {
373383
state.deletionIgnored = false
374384
a.logger.Infof("A valid update was received for resource %q of type %q after previously ignoring a deletion", name, rType.TypeName())

xds/internal/xdsclient/client_refcounted_test.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/google/uuid"
2727
"google.golang.org/grpc/internal/testutils"
28+
"google.golang.org/grpc/internal/testutils/stats"
2829
"google.golang.org/grpc/internal/testutils/xds/e2e"
2930
"google.golang.org/grpc/internal/xds/bootstrap"
3031
)
@@ -60,7 +61,7 @@ func (s) TestClientNew_Single(t *testing.T) {
6061
defer func() { xdsClientImplCloseHook = origClientImplCloseHook }()
6162

6263
// The first call to New() should create a new client.
63-
_, closeFunc, err := pool.NewClient(t.Name())
64+
_, closeFunc, err := pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
6465
if err != nil {
6566
t.Fatalf("Failed to create xDS client: %v", err)
6667
}
@@ -76,7 +77,7 @@ func (s) TestClientNew_Single(t *testing.T) {
7677
closeFuncs := make([]func(), count)
7778
for i := 0; i < count; i++ {
7879
func() {
79-
_, closeFuncs[i], err = pool.NewClient(t.Name())
80+
_, closeFuncs[i], err = pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
8081
if err != nil {
8182
t.Fatalf("%d-th call to New() failed with error: %v", i, err)
8283
}
@@ -114,7 +115,7 @@ func (s) TestClientNew_Single(t *testing.T) {
114115

115116
// Calling New() again, after the previous Client was actually closed,
116117
// should create a new one.
117-
_, closeFunc, err = pool.NewClient(t.Name())
118+
_, closeFunc, err = pool.NewClient(t.Name(), &stats.NoopMetricsRecorder{})
118119
if err != nil {
119120
t.Fatalf("Failed to create xDS client: %v", err)
120121
}
@@ -156,7 +157,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
156157

157158
// Create two xDS clients.
158159
client1Name := t.Name() + "-1"
159-
_, closeFunc1, err := pool.NewClient(client1Name)
160+
_, closeFunc1, err := pool.NewClient(client1Name, &stats.NoopMetricsRecorder{})
160161
if err != nil {
161162
t.Fatalf("Failed to create xDS client: %v", err)
162163
}
@@ -171,7 +172,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
171172
}
172173

173174
client2Name := t.Name() + "-2"
174-
_, closeFunc2, err := pool.NewClient(client2Name)
175+
_, closeFunc2, err := pool.NewClient(client2Name, &stats.NoopMetricsRecorder{})
175176
if err != nil {
176177
t.Fatalf("Failed to create xDS client: %v", err)
177178
}
@@ -193,7 +194,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
193194
defer wg.Done()
194195
for i := 0; i < count; i++ {
195196
var err error
196-
_, closeFuncs1[i], err = pool.NewClient(client1Name)
197+
_, closeFuncs1[i], err = pool.NewClient(client1Name, &stats.NoopMetricsRecorder{})
197198
if err != nil {
198199
t.Errorf("%d-th call to New() failed with error: %v", i, err)
199200
}
@@ -203,7 +204,7 @@ func (s) TestClientNew_Multiple(t *testing.T) {
203204
defer wg.Done()
204205
for i := 0; i < count; i++ {
205206
var err error
206-
_, closeFuncs2[i], err = pool.NewClient(client2Name)
207+
_, closeFuncs2[i], err = pool.NewClient(client2Name, &stats.NoopMetricsRecorder{})
207208
if err != nil {
208209
t.Errorf("%d-th call to New() failed with error: %v", i, err)
209210
}

xds/internal/xdsclient/clientimpl.go

+25-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"time"
2828

2929
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
30+
estats "google.golang.org/grpc/experimental/stats"
3031
"google.golang.org/grpc/internal"
3132
"google.golang.org/grpc/internal/backoff"
3233
"google.golang.org/grpc/internal/grpclog"
@@ -60,6 +61,21 @@ var (
6061
xdsClientImplCloseHook = func(string) {}
6162

6263
defaultExponentialBackoff = backoff.DefaultExponential.Backoff
64+
65+
xdsClientResourceUpdatesValidMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
66+
Name: "grpc.xds_client.resource_updates_valid",
67+
Description: "A counter of resources received that were considered valid. The counter will be incremented even for resources that have not changed.",
68+
Unit: "resource",
69+
Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
70+
Default: false,
71+
})
72+
xdsClientResourceUpdatesInvalidMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
73+
Name: "grpc.xds_client.resource_updates_invalid",
74+
Description: "A counter of resources received that were considered invalid.",
75+
Unit: "resource",
76+
Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
77+
Default: false,
78+
})
6379
)
6480

6581
// clientImpl is the real implementation of the xDS client. The exported Client
@@ -78,6 +94,8 @@ type clientImpl struct {
7894
serializer *grpcsync.CallbackSerializer // Serializer for invoking resource watcher callbacks.
7995
serializerClose func() // Function to close the serializer.
8096
logger *grpclog.PrefixLogger // Logger for this client.
97+
metricsRecorder estats.MetricsRecorder // Metrics recorder for metrics.
98+
target string // The gRPC target for this client.
8199

82100
// The clientImpl owns a bunch of channels to individual xDS servers
83101
// specified in the bootstrap configuration. Authorities acquire references
@@ -111,9 +129,11 @@ func init() {
111129
}
112130

113131
// newClientImpl returns a new xdsClient with the given config.
114-
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
132+
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration, mr estats.MetricsRecorder, target string) (*clientImpl, error) {
115133
ctx, cancel := context.WithCancel(context.Background())
116134
c := &clientImpl{
135+
metricsRecorder: mr,
136+
target: target,
117137
done: grpcsync.NewEvent(),
118138
authorities: make(map[string]*authority),
119139
config: config,
@@ -139,6 +159,8 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, s
139159
serializer: c.serializer,
140160
getChannelForADS: c.getChannelForADS,
141161
logPrefix: clientPrefix(c),
162+
target: target,
163+
metricsRecorder: c.metricsRecorder,
142164
})
143165
}
144166
c.topLevelAuthority = newAuthority(authorityBuildOptions{
@@ -147,6 +169,8 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, s
147169
serializer: c.serializer,
148170
getChannelForADS: c.getChannelForADS,
149171
logPrefix: clientPrefix(c),
172+
target: target,
173+
metricsRecorder: c.metricsRecorder,
150174
})
151175
c.logger = prefixLogger(c)
152176
return c, nil

0 commit comments

Comments
 (0)