
Commit d7b1ff6

[Metrics] Handle vLLM streaming response in streaming server
- Update the streaming integration test: when the response includes usage, the `data: [DONE]` message is returned together with the last data message, and the end-of-stream message has an empty body.
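
For context, here is a minimal standalone sketch of the parsing behavior this commit relies on, assuming the vLLM SSE format shown in the diff: when `"stream_options": {"include_usage": true}` is set, the final `data:` chunk carries the usage object and `data: [DONE]` arrives in the same body, so the handler scans each `data: ` line and skips `[DONE]`. The names below (`parseUsage`, `usage`, `response`) are illustrative stand-ins, not identifiers from the repository.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// usage mirrors the usage object vLLM attaches to the final streamed chunk.
type usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	TotalTokens      int `json:"total_tokens"`
	CompletionTokens int `json:"completion_tokens"`
}

type response struct {
	Usage usage `json:"usage"`
}

// parseUsage is an illustrative stand-in for the ParseRespForUsage helper added
// in this commit: it walks each "data: " line, skips "[DONE]", and unmarshals
// the remaining JSON payloads, keeping whatever usage fields it finds.
func parseUsage(body string) response {
	var resp response
	for _, line := range strings.Split(body, "\n") {
		if !strings.HasPrefix(line, "data: ") {
			continue
		}
		content := strings.TrimPrefix(line, "data: ")
		if content == "[DONE]" {
			continue
		}
		if err := json.Unmarshal([]byte(content), &resp); err != nil {
			continue // malformed or non-JSON lines are ignored in this sketch
		}
	}
	return resp
}

func main() {
	// With include_usage, vLLM returns the usage chunk and [DONE] in one body.
	body := `data: {"id":"cmpl-1","object":"text_completion","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]`
	fmt.Printf("%+v\n", parseUsage(body).Usage)
	// Output: {PromptTokens:7 TotalTokens:17 CompletionTokens:10}
}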

3 files changed: +127 −53 lines changed

pkg/epp/handlers/response.go

+46-32
@@ -30,6 +30,11 @@ import (
 	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
+const (
+	streamingRespPrefix = "data: "
+	streamingEndMsg     = "data: [DONE]"
+)
+
 // HandleResponseHeaders processes response headers from the backend model server.
 func (s *Server) HandleResponseHeaders(
 	ctx context.Context,
@@ -197,39 +202,10 @@ func (s *Server) HandleStreaming(
 	body *extProcPb.ProcessingRequest_ResponseBody,
 	loggerVerbose logr.Logger,
 ) error {
-	respPrefix := "data: "
 	responseText := string(body.ResponseBody.Body)
-	// Example message if "stream_options": {"include_usage": "true"} is included in the request:
-	// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
-	// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
-	//
-	// data: [DONE]
-	//
-	// Noticed that vLLM returns two entries in one response.
-	// We need to strip the `data:` prefix and next Data: [DONE] from the message to fetch response data.
-	//
-	// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
-	// indicates end of streaming.
-	if strings.Contains(responseText, "data: [DONE]") {
-		response := Response{}
-
-		lines := strings.Split(responseText, "\n")
-		for _, line := range lines {
-			if !strings.HasPrefix(line, respPrefix) {
-				continue
-			}
-			content := strings.TrimPrefix(line, respPrefix)
-			if content == "[DONE]" {
-				continue
-			}
-
-			byteSlice := []byte(content)
-			if err := json.Unmarshal(byteSlice, &response); err != nil {
-				loggerVerbose.Error(err, "unmarshaling response body")
-				continue
-			}
-		}
-		reqCtx.Response = response
+	if strings.Contains(responseText, streamingEndMsg) {
+		parsedResp := ParseRespForUsage(ctx, responseText, loggerVerbose)
+		reqCtx.Response = parsedResp
 	}
 
 	if body.ResponseBody.EndOfStream {
@@ -242,6 +218,44 @@ func (s *Server) HandleStreaming(
 	return nil
 }
 
+// Example message if "stream_options": {"include_usage": "true"} is included in the request:
+// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
+// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+//
+// data: [DONE]
+//
+// Note that vLLM returns both entries in one response body.
+// We need to strip the `data: ` prefix and skip the trailing `data: [DONE]` entry to fetch the response data.
+//
+// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
+// indicates the end of streaming.
+func ParseRespForUsage(
+	ctx context.Context,
+	responseText string,
+	loggerVerbose logr.Logger,
+) Response {
+	response := Response{}
+
+	lines := strings.Split(responseText, "\n")
+	for _, line := range lines {
+		if !strings.HasPrefix(line, streamingRespPrefix) {
+			continue
+		}
+		content := strings.TrimPrefix(line, streamingRespPrefix)
+		if content == "[DONE]" {
+			continue
+		}
+
+		byteSlice := []byte(content)
+		if err := json.Unmarshal(byteSlice, &response); err != nil {
+			loggerVerbose.Error(err, "unmarshaling response body")
+			continue
+		}
+	}
+
+	return response
+}
+
 type Response struct {
 	Usage Usage `json:"usage"`
 }

pkg/epp/handlers/streamingserver.go

+28
@@ -167,6 +167,17 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
 		case *extProcPb.ProcessingRequest_ResponseBody:
 			if reqCtx.modelServerStreaming {
 				// Currently we punt on response parsing if the modelServer is streaming, and we just passthrough.
+
+				responseText := string(v.ResponseBody.Body)
+				s.HandleResponseBodyModelStreaming(ctx, reqCtx, responseText)
+				if v.ResponseBody.EndOfStream {
+					loggerVerbose.Info("streaming is completed")
+
+					reqCtx.ResponseCompleteTimestamp = time.Now()
+					metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
+					metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
+				}
+
 				reqCtx.respBodyResp = &extProcPb.ProcessingResponse{
 					Response: &extProcPb.ProcessingResponse_ResponseBody{
 						ResponseBody: &extProcPb.BodyResponse{
@@ -543,3 +554,20 @@ func (s *StreamingServer) HandleResponseBody(
 	}
 	return reqCtx, nil
 }
+
+// HandleResponseBodyModelStreaming handles the response body when the model server is streaming.
+func (s *StreamingServer) HandleResponseBodyModelStreaming(
+	ctx context.Context,
+	reqCtx *StreamingRequestContext,
+	responseText string,
+) {
+	logger := log.FromContext(ctx)
+	loggerVerbose := logger.V(logutil.VERBOSE)
+	loggerVerbose.Info("Processing HandleResponseBody")
+
+	if strings.Contains(responseText, streamingEndMsg) {
+		resp := ParseRespForUsage(ctx, responseText, loggerVerbose)
+		metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.PromptTokens)
+		metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.CompletionTokens)
+	}
+}

test/integration/epp/hermetic_test.go

+53-21
@@ -356,7 +356,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 		requests          []*extProcPb.ProcessingRequest
 		pods              map[backendmetrics.Pod]*backendmetrics.Metrics
 		wantResponses     []*extProcPb.ProcessingResponse
-		wantMetrics       string
+		wantMetrics       map[string]string
 		wantErr           bool
 		immediateResponse *extProcPb.ImmediateResponse
 	}{
@@ -379,11 +379,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					KVCacheUsagePercent: 0.2,
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 			# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 			# TYPE inference_model_request_total counter
 			inference_model_request_total{model_name="my-model",target_model_name="my-model-12345"} 1
-			`,
+			`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -460,11 +460,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 			# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 			# TYPE inference_model_request_total counter
 			inference_model_request_total{model_name="sql-lora",target_model_name="sql-lora-1fdg2"} 1
-			`,
+			`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -541,11 +541,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 			# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 			# TYPE inference_model_request_total counter
 			inference_model_request_total{model_name="sql-lora",target_model_name="sql-lora-1fdg2"} 1
-			`,
+			`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -624,7 +624,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 				},
 			},
 			wantErr: false,
-			wantMetrics: "",
+			wantMetrics: map[string]string{},
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
 					Response: &extProcPb.ProcessingResponse_ImmediateResponse{
@@ -668,11 +668,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 			# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 			# TYPE inference_model_request_total counter
 			inference_model_request_total{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3"} 1
-			`,
+			`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -776,11 +776,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 			# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 			# TYPE inference_model_request_total counter
 			inference_model_request_total{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3"} 1
-			`,
+			`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -884,11 +884,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetrics: `
+			wantMetrics: map[string]string{`inference_model_request_total`: `
 			# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
 			# TYPE inference_model_request_total counter
 			inference_model_request_total{model_name="direct-model",target_model_name="direct-model"} 1
-			`,
+			`},
 			wantErr: false,
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
@@ -1186,19 +1186,47 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 				{
 					Request: &extProcPb.ProcessingRequest_ResponseBody{
 						ResponseBody: &extProcPb.HttpBody{
-							Body:        []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`),
+							Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+data: [DONE]`,
+							),
 							EndOfStream: false},
 					},
 				},
 				{
 					Request: &extProcPb.ProcessingRequest_ResponseBody{
 						ResponseBody: &extProcPb.HttpBody{
-							Body:        []byte("data: [DONE]"),
+							Body:        []byte(""),
 							EndOfStream: true},
 					},
 				},
 			},
 			wantErr: false,
+			wantMetrics: map[string]string{`inference_model_input_tokens`: `
+				# HELP inference_model_input_tokens [ALPHA] Inference model input token count distribution for requests in each model.
+				# TYPE inference_model_input_tokens histogram
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1"} 0
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="8"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="16"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="32"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="64"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="128"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="256"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="512"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1024"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="2048"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="4096"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="8192"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="16384"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="32778"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="65536"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="131072"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="262144"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="524288"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1.048576e+06"} 1
+				inference_model_input_tokens_bucket{model_name="",target_model_name="",le="+Inf"} 1
+				inference_model_input_tokens_sum{model_name="",target_model_name=""} 7
+				inference_model_input_tokens_count{model_name="",target_model_name=""} 1
+			`},
 			wantResponses: []*extProcPb.ProcessingResponse{
 				{
 					Response: &extProcPb.ProcessingResponse_ResponseHeaders{
@@ -1305,7 +1333,9 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 							BodyMutation: &extProcPb.BodyMutation{
 								Mutation: &extProcPb.BodyMutation_StreamedResponse{
 									StreamedResponse: &extProcPb.StreamedBodyResponse{
-										Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`),
+										Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+data: [DONE]`,
+										),
 										EndOfStream: false,
 									},
 								},
@@ -1321,7 +1351,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 							BodyMutation: &extProcPb.BodyMutation{
 								Mutation: &extProcPb.BodyMutation_StreamedResponse{
 									StreamedResponse: &extProcPb.StreamedBodyResponse{
-										Body:        []byte("data: [DONE]"),
+										Body:        []byte(""),
 										EndOfStream: true,
 									},
 								},
@@ -1347,9 +1377,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
 				t.Errorf("Unexpected response, (-want +got): %v", diff)
 			}
 
-			if test.wantMetrics != "" {
-				if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(test.wantMetrics), "inference_model_request_total"); err != nil {
-					t.Error(err)
+			if len(test.wantMetrics) != 0 {
+				for metricName, value := range test.wantMetrics {
+					if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(value), metricName); err != nil {
+						t.Error(err)
+					}
 				}
 			}
13551387
