Skip to content

Commit 9e57090

Browse files
authored
Add integration module for gRPC-go (alibaba#81)
1 parent 2fd68d5 commit 9e57090

File tree

9 files changed

+457
-17
lines changed

9 files changed

+457
-17
lines changed

.gitignore

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ atlassian-ide-plugin.xml
2727
*.out
2828

2929
# Dependency directories (remove the comment below to include it)
30-
# vendor/
30+
vendor/
3131

3232
### macOS template
3333
.DS_Store
@@ -63,4 +63,4 @@ Temporary Items
6363
*.log
6464

6565
# coverage file
66-
coverage.html
66+
coverage.html

adapter/grpc/client.go

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
6+
sentinel "github.com/alibaba/sentinel-golang/api"
7+
"github.com/alibaba/sentinel-golang/core/base"
8+
"google.golang.org/grpc"
9+
)
10+
11+
// SentinelUnaryClientIntercept returns new grpc.UnaryClientInterceptor instance
12+
func SentinelUnaryClientIntercept(opts ...Option) grpc.UnaryClientInterceptor {
13+
options := evaluateOptions(opts)
14+
return func(
15+
ctx context.Context,
16+
method string,
17+
req, reply interface{},
18+
cc *grpc.ClientConn,
19+
invoker grpc.UnaryInvoker,
20+
opts ...grpc.CallOption,
21+
) error {
22+
// method as resource name by default
23+
resourceName := method
24+
if options.unaryClientResourceExtract != nil {
25+
resourceName = options.unaryClientResourceExtract(ctx, method, req, cc)
26+
}
27+
28+
entry, err := sentinel.Entry(
29+
resourceName,
30+
sentinel.WithResourceType(base.ResTypeRPC),
31+
sentinel.WithTrafficType(base.Outbound),
32+
)
33+
if err != nil {
34+
if options.unaryClientBlockFallback != nil {
35+
return options.unaryClientBlockFallback(ctx, method, req, cc, err)
36+
}
37+
return err
38+
}
39+
defer entry.Exit()
40+
41+
return invoker(ctx, method, req, reply, cc, opts...)
42+
}
43+
}
44+
45+
// SentinelStreamClientIntercept returns new grpc.StreamClientInterceptor instance
46+
func SentinelStreamClientIntercept(opts ...Option) grpc.StreamClientInterceptor {
47+
options := evaluateOptions(opts)
48+
return func(
49+
ctx context.Context,
50+
desc *grpc.StreamDesc,
51+
cc *grpc.ClientConn,
52+
method string,
53+
streamer grpc.Streamer,
54+
opts ...grpc.CallOption,
55+
) (grpc.ClientStream, error) {
56+
// method as resource name by default
57+
resourceName := method
58+
if options.streamClientResourceExtract != nil {
59+
resourceName = options.streamClientResourceExtract(ctx, desc, cc, method)
60+
}
61+
62+
entry, err := sentinel.Entry(
63+
resourceName,
64+
sentinel.WithResourceType(base.ResTypeRPC),
65+
sentinel.WithTrafficType(base.Outbound),
66+
)
67+
if err != nil { // blocked
68+
if options.streamClientBlockFallback != nil {
69+
return options.streamClientBlockFallback(ctx, desc, cc, method, err)
70+
}
71+
return nil, err
72+
}
73+
74+
defer entry.Exit()
75+
76+
return streamer(ctx, desc, cc, method, opts...)
77+
}
78+
}

adapter/grpc/client_test.go

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/alibaba/sentinel-golang/core/base"
9+
"github.com/alibaba/sentinel-golang/core/flow"
10+
"github.com/stretchr/testify/assert"
11+
"google.golang.org/grpc"
12+
)
13+
14+
func TestUnaryClientIntercept(t *testing.T) {
15+
const errMsgFake = "fake error"
16+
interceptor := SentinelUnaryClientIntercept()
17+
invoker := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
18+
opts ...grpc.CallOption) error {
19+
return errors.New(errMsgFake)
20+
}
21+
method := "/grpc.testing.TestService/UnaryCall"
22+
t.Run("success", func(t *testing.T) {
23+
var _, err = flow.LoadRules([]*flow.FlowRule{
24+
{
25+
Resource: "/grpc.testing.TestService/UnaryCall",
26+
MetricType: flow.QPS,
27+
Count: 1,
28+
ControlBehavior: flow.Reject,
29+
},
30+
})
31+
assert.Nil(t, err)
32+
err = interceptor(nil, method, nil, nil, nil, invoker)
33+
assert.EqualError(t, err, errMsgFake)
34+
t.Run("second fail", func(t *testing.T) {
35+
err = interceptor(nil, method, nil, nil, nil, invoker)
36+
assert.IsType(t, &base.BlockError{}, err)
37+
})
38+
})
39+
40+
t.Run("fail", func(t *testing.T) {
41+
var _, err = flow.LoadRules([]*flow.FlowRule{
42+
{
43+
Resource: "/grpc.testing.TestService/UnaryCall",
44+
MetricType: flow.QPS,
45+
Count: 0,
46+
ControlBehavior: flow.Reject,
47+
},
48+
})
49+
assert.Nil(t, err)
50+
err = interceptor(nil, method, nil, nil, nil, invoker)
51+
assert.IsType(t, &base.BlockError{}, err)
52+
})
53+
}
54+
55+
func TestStreamClientIntercept(t *testing.T) {
56+
const errMsgFake = "fake error"
57+
interceptor := SentinelStreamClientIntercept()
58+
streamer := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
59+
opts ...grpc.CallOption) (grpc.ClientStream, error) {
60+
return nil, errors.New(errMsgFake)
61+
}
62+
method := "/grpc.testing.TestService/StreamingOutputCall"
63+
t.Run("success", func(t *testing.T) {
64+
var _, err = flow.LoadRules([]*flow.FlowRule{
65+
{
66+
Resource: "/grpc.testing.TestService/StreamingOutputCall",
67+
MetricType: flow.QPS,
68+
Count: 1,
69+
ControlBehavior: flow.Reject,
70+
},
71+
})
72+
assert.Nil(t, err)
73+
rep, err := interceptor(nil, nil, nil, method, streamer)
74+
assert.EqualError(t, err, errMsgFake)
75+
assert.Nil(t, rep)
76+
t.Run("second fail", func(t *testing.T) {
77+
rep, err := interceptor(nil, nil, nil, method, streamer)
78+
assert.IsType(t, &base.BlockError{}, err)
79+
assert.Nil(t, rep)
80+
})
81+
})
82+
83+
t.Run("fail", func(t *testing.T) {
84+
var _, err = flow.LoadRules([]*flow.FlowRule{
85+
{
86+
Resource: "/grpc.testing.TestService/StreamingOutputCall",
87+
MetricType: flow.QPS,
88+
Count: 0,
89+
ControlBehavior: flow.Reject,
90+
},
91+
})
92+
assert.Nil(t, err)
93+
rep, err := interceptor(nil, nil, nil, method, streamer)
94+
assert.IsType(t, &base.BlockError{}, err)
95+
assert.Nil(t, rep)
96+
})
97+
}

adapter/grpc/options.go

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
6+
"github.com/alibaba/sentinel-golang/core/base"
7+
"google.golang.org/grpc"
8+
)
9+
10+
type (
11+
Option func(*options)
12+
13+
options struct {
14+
unaryClientResourceExtract func(context.Context, string, interface{}, *grpc.ClientConn) string
15+
unaryServerResourceExtract func(context.Context, interface{}, *grpc.UnaryServerInfo) string
16+
17+
streamClientResourceExtract func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string) string
18+
streamServerResourceExtract func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo) string
19+
20+
unaryClientBlockFallback func(context.Context, string, interface{}, *grpc.ClientConn, *base.BlockError) error
21+
unaryServerBlockFallback func(context.Context, interface{}, *grpc.UnaryServerInfo, *base.BlockError) (interface{}, error)
22+
23+
streamClientBlockFallback func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string, *base.BlockError) (grpc.ClientStream, error)
24+
streamServerBlockFallback func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo, *base.BlockError) error
25+
}
26+
)
27+
28+
// WithUnaryClientResourceExtractor set unaryClientResourceExtract
29+
func WithUnaryClientResourceExtractor(fn func(context.Context, string, interface{}, *grpc.ClientConn) string) Option {
30+
return func(opts *options) {
31+
opts.unaryClientResourceExtract = fn
32+
}
33+
}
34+
35+
// WithUnaryServerResourceExtractor set unaryServerResourceExtract
36+
func WithUnaryServerResourceExtractor(fn func(context.Context, interface{}, *grpc.UnaryServerInfo) string) Option {
37+
return func(opts *options) {
38+
opts.unaryServerResourceExtract = fn
39+
}
40+
}
41+
42+
// WithStreamClientResourceExtractor set streamClientResourceExtract
43+
func WithStreamClientResourceExtractor(fn func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string) string) Option {
44+
return func(opts *options) {
45+
opts.streamClientResourceExtract = fn
46+
}
47+
}
48+
49+
// WithStreamServerResourceExtractor set streamServerResourceExtract
50+
func WithStreamServerResourceExtractor(fn func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo) string) Option {
51+
return func(opts *options) {
52+
opts.streamServerResourceExtract = fn
53+
}
54+
}
55+
56+
// WithUnaryClientBlockFallback set unaryClientBlockFallback
57+
func WithUnaryClientBlockFallback(fn func(context.Context, string, interface{}, *grpc.ClientConn, *base.BlockError) error) Option {
58+
return func(opts *options) {
59+
opts.unaryClientBlockFallback = fn
60+
}
61+
}
62+
63+
// WithUnaryServerBlockFallback set unaryServerBlockFallback
64+
func WithUnaryServerBlockFallback(fn func(context.Context, interface{}, *grpc.UnaryServerInfo, *base.BlockError) (interface{}, error)) Option {
65+
return func(opts *options) {
66+
opts.unaryServerBlockFallback = fn
67+
}
68+
}
69+
70+
// WithStreamClientBlockFallback set streamClientBlockFallback
71+
func WithStreamClientBlockFallback(fn func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string, *base.BlockError) (grpc.ClientStream, error)) Option {
72+
return func(opts *options) {
73+
opts.streamClientBlockFallback = fn
74+
}
75+
}
76+
77+
// WithStreamServerBlockFallback set streamServerBlockFallback
78+
func WithStreamServerBlockFallback(fn func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo, *base.BlockError) error) Option {
79+
return func(opts *options) {
80+
opts.streamServerBlockFallback = fn
81+
}
82+
}
83+
84+
func evaluateOptions(opts []Option) *options {
85+
optCopy := &options{}
86+
for _, o := range opts {
87+
o(optCopy)
88+
}
89+
return optCopy
90+
}

adapter/grpc/server.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
6+
sentinel "github.com/alibaba/sentinel-golang/api"
7+
"github.com/alibaba/sentinel-golang/core/base"
8+
"google.golang.org/grpc"
9+
)
10+
11+
// SentinelUnaryServerIntercept implements gRPC unary server interceptor interface
12+
func SentinelUnaryServerIntercept(opts ...Option) grpc.UnaryServerInterceptor {
13+
options := evaluateOptions(opts)
14+
return func(
15+
ctx context.Context,
16+
req interface{},
17+
info *grpc.UnaryServerInfo,
18+
handler grpc.UnaryHandler,
19+
) (interface{}, error) {
20+
// method as resource name by default
21+
resourceName := info.FullMethod
22+
if options.unaryServerResourceExtract != nil {
23+
resourceName = options.unaryServerResourceExtract(ctx, req, info)
24+
}
25+
entry, err := sentinel.Entry(
26+
resourceName,
27+
sentinel.WithResourceType(base.ResTypeRPC),
28+
sentinel.WithTrafficType(base.Inbound),
29+
)
30+
if err != nil {
31+
if options.unaryServerBlockFallback != nil {
32+
return options.unaryServerBlockFallback(ctx, req, info, err)
33+
}
34+
return nil, err
35+
}
36+
defer entry.Exit()
37+
return handler(ctx, req)
38+
}
39+
}
40+
41+
// SentinelStreamServerIntercept implements gRPC stream server interceptor interface
42+
func SentinelStreamServerIntercept(opts ...Option) grpc.StreamServerInterceptor {
43+
options := evaluateOptions(opts)
44+
return func(
45+
srv interface{},
46+
ss grpc.ServerStream,
47+
info *grpc.StreamServerInfo,
48+
handler grpc.StreamHandler,
49+
) error {
50+
// method as resource name by default
51+
resourceName := info.FullMethod
52+
if options.streamServerResourceExtract != nil {
53+
resourceName = options.streamServerResourceExtract(srv, ss, info)
54+
}
55+
entry, err := sentinel.Entry(
56+
resourceName,
57+
sentinel.WithResourceType(base.ResTypeRPC),
58+
sentinel.WithTrafficType(base.Inbound),
59+
)
60+
if err != nil { // blocked
61+
if options.streamServerBlockFallback != nil {
62+
return options.streamServerBlockFallback(srv, ss, info, err)
63+
}
64+
return err
65+
}
66+
defer entry.Exit()
67+
return handler(srv, ss)
68+
}
69+
}

0 commit comments

Comments
 (0)