Skip to content

Commit e6d3ac2

Browse files
committed
Support full duplex streaming
1 parent 53cb18f commit e6d3ac2

File tree

7 files changed

+319
-122
lines changed

7 files changed

+319
-122
lines changed

cmd/body-based-routing/main.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,16 @@ import (
4444
var (
4545
grpcPort = flag.Int(
4646
"grpcPort",
47-
runserver.DefaultGrpcPort,
47+
9004,
4848
"The gRPC port used for communicating with Envoy proxy")
4949
grpcHealthPort = flag.Int(
5050
"grpcHealthPort",
5151
9005,
5252
"The port used for gRPC liveness and readiness probes")
5353
metricsPort = flag.Int(
5454
"metricsPort", 9090, "The metrics port")
55+
streaming = flag.Bool(
56+
"streaming", false, "Enables streaming support for Envoy full-duplex streaming mode")
5557
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
5658

5759
setupLog = ctrl.Log.WithName("setup")
@@ -92,7 +94,7 @@ func run() error {
9294
ctx := ctrl.SetupSignalHandler()
9395

9496
// Setup runner.
95-
serverRunner := &runserver.ExtProcServerRunner{GrpcPort: *grpcPort}
97+
serverRunner := runserver.NewDefaultExtProcServerRunner(*grpcPort, *streaming)
9698

9799
// Register health server.
98100
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), *grpcHealthPort); err != nil {

pkg/body-based-routing/handlers/request.go

+94-33
Original file line numberDiff line numberDiff line change
@@ -23,55 +23,93 @@ import (
2323

2424
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2525
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
26+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2627
"sigs.k8s.io/controller-runtime/pkg/log"
2728
"sigs.k8s.io/gateway-api-inference-extension/pkg/body-based-routing/metrics"
2829
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2930
)
3031

32+
const modelHeader = "X-Gateway-Model-Name"
33+
3134
// HandleRequestBody handles request bodies.
32-
func (s *Server) HandleRequestBody(ctx context.Context, body *eppb.HttpBody) (*eppb.ProcessingResponse, error) {
35+
func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]*eppb.ProcessingResponse, error) {
3336
logger := log.FromContext(ctx)
37+
var ret []*eppb.ProcessingResponse
3438

35-
var data map[string]any
36-
if err := json.Unmarshal(body.GetBody(), &data); err != nil {
39+
requestBodyBytes, err := json.Marshal(data)
40+
if err != nil {
3741
return nil, err
3842
}
3943

4044
modelVal, ok := data["model"]
4145
if !ok {
4246
metrics.RecordModelNotInBodyCounter()
4347
logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter")
44-
return &eppb.ProcessingResponse{
45-
Response: &eppb.ProcessingResponse_RequestBody{
46-
RequestBody: &eppb.BodyResponse{},
47-
},
48-
}, nil
48+
if s.streaming {
49+
ret = append(ret, &eppb.ProcessingResponse{
50+
Response: &eppb.ProcessingResponse_RequestHeaders{
51+
RequestHeaders: &eppb.HeadersResponse{},
52+
},
53+
})
54+
ret = addStreamedBodyResponse(ret, requestBodyBytes)
55+
return ret, nil
56+
} else {
57+
ret = append(ret, &eppb.ProcessingResponse{
58+
Response: &eppb.ProcessingResponse_RequestBody{
59+
RequestBody: &eppb.BodyResponse{},
60+
},
61+
})
62+
}
63+
return ret, nil
4964
}
5065

5166
modelStr, ok := modelVal.(string)
5267
if !ok {
5368
metrics.RecordModelNotParsedCounter()
5469
logger.V(logutil.DEFAULT).Info("Model parameter value is not a string")
55-
return &eppb.ProcessingResponse{
56-
Response: &eppb.ProcessingResponse_RequestBody{
57-
RequestBody: &eppb.BodyResponse{},
58-
},
59-
}, fmt.Errorf("the model parameter value %v is not a string", modelVal)
70+
return nil, fmt.Errorf("the model parameter value %v is not a string", modelVal)
6071
}
6172

6273
metrics.RecordSuccessCounter()
63-
return &eppb.ProcessingResponse{
64-
Response: &eppb.ProcessingResponse_RequestBody{
65-
RequestBody: &eppb.BodyResponse{
66-
Response: &eppb.CommonResponse{
67-
// Necessary so that the new headers are used in the routing decision.
68-
ClearRouteCache: true,
69-
HeaderMutation: &eppb.HeaderMutation{
70-
SetHeaders: []*basepb.HeaderValueOption{
71-
{
72-
Header: &basepb.HeaderValue{
73-
Key: "X-Gateway-Model-Name",
74-
RawValue: []byte(modelStr),
74+
75+
if s.streaming {
76+
ret = append(ret, &eppb.ProcessingResponse{
77+
Response: &eppb.ProcessingResponse_RequestHeaders{
78+
RequestHeaders: &eppb.HeadersResponse{
79+
Response: &eppb.CommonResponse{
80+
ClearRouteCache: true,
81+
HeaderMutation: &eppb.HeaderMutation{
82+
SetHeaders: []*basepb.HeaderValueOption{
83+
{
84+
Header: &basepb.HeaderValue{
85+
Key: modelHeader,
86+
RawValue: []byte(modelStr),
87+
},
88+
},
89+
},
90+
},
91+
},
92+
},
93+
},
94+
})
95+
ret = addStreamedBodyResponse(ret, requestBodyBytes)
96+
return ret, nil
97+
}
98+
99+
return []*eppb.ProcessingResponse{
100+
{
101+
Response: &eppb.ProcessingResponse_RequestBody{
102+
RequestBody: &eppb.BodyResponse{
103+
Response: &eppb.CommonResponse{
104+
// Necessary so that the new headers are used in the routing decision.
105+
ClearRouteCache: true,
106+
HeaderMutation: &eppb.HeaderMutation{
107+
SetHeaders: []*basepb.HeaderValueOption{
108+
{
109+
Header: &basepb.HeaderValue{
110+
Key: modelHeader,
111+
RawValue: []byte(modelStr),
112+
},
75113
},
76114
},
77115
},
@@ -82,20 +120,43 @@ func (s *Server) HandleRequestBody(ctx context.Context, body *eppb.HttpBody) (*e
82120
}, nil
83121
}
84122

123+
func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBytes []byte) []*eppb.ProcessingResponse {
124+
return append(responses, &extProcPb.ProcessingResponse{
125+
Response: &extProcPb.ProcessingResponse_RequestBody{
126+
RequestBody: &extProcPb.BodyResponse{
127+
Response: &extProcPb.CommonResponse{
128+
BodyMutation: &extProcPb.BodyMutation{
129+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
130+
StreamedResponse: &extProcPb.StreamedBodyResponse{
131+
Body: requestBodyBytes,
132+
EndOfStream: true,
133+
},
134+
},
135+
},
136+
},
137+
},
138+
},
139+
})
140+
}
141+
85142
// HandleRequestHeaders handles request headers.
86-
func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) (*eppb.ProcessingResponse, error) {
87-
return &eppb.ProcessingResponse{
88-
Response: &eppb.ProcessingResponse_RequestHeaders{
89-
RequestHeaders: &eppb.HeadersResponse{},
143+
func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) {
144+
return []*eppb.ProcessingResponse{
145+
{
146+
Response: &eppb.ProcessingResponse_RequestHeaders{
147+
RequestHeaders: &eppb.HeadersResponse{},
148+
},
90149
},
91150
}, nil
92151
}
93152

94153
// HandleRequestTrailers handles request trailers.
95-
func (s *Server) HandleRequestTrailers(trailers *eppb.HttpTrailers) (*eppb.ProcessingResponse, error) {
96-
return &eppb.ProcessingResponse{
97-
Response: &eppb.ProcessingResponse_RequestTrailers{
98-
RequestTrailers: &eppb.TrailersResponse{},
154+
func (s *Server) HandleRequestTrailers(trailers *eppb.HttpTrailers) ([]*eppb.ProcessingResponse, error) {
155+
return []*eppb.ProcessingResponse{
156+
{
157+
Response: &eppb.ProcessingResponse_RequestTrailers{
158+
RequestTrailers: &eppb.TrailersResponse{},
159+
},
99160
},
100161
}, nil
101162
}

0 commit comments

Comments
 (0)