
Commit dba4a1b

[Metrics] Handle vLLM streaming response in streaming server
- Update the streaming integration test: when the response includes usage, the [DONE] message is returned together with the last message, and the end-of-stream message has an empty body.
1 parent dc5f7aa commit dba4a1b


3 files changed: +127 -53


pkg/epp/handlers/response.go

+46-32
@@ -30,6 +30,11 @@ import (
   logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
+const (
+  streamingRespPrefix = "data: "
+  streamingEndMsg     = "data: [DONE]"
+)
+
 // HandleResponseHeaders processes response headers from the backend model server.
 func (s *Server) HandleResponseHeaders(
   ctx context.Context,
@@ -197,39 +202,10 @@ func (s *Server) HandleStreaming(
   body *extProcPb.ProcessingRequest_ResponseBody,
   loggerVerbose logr.Logger,
 ) error {
-  respPrefix := "data: "
   responseText := string(body.ResponseBody.Body)
-  // Example message if "stream_options": {"include_usage": "true"} is included in the request:
-  // data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
-  // "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
-  //
-  // data: [DONE]
-  //
-  // Noticed that vLLM returns two entries in one response.
-  // We need to strip the `data:` prefix and next Data: [DONE] from the message to fetch response data.
-  //
-  // If include_usage is not included in the request, `data: [DONE]` is returned separately, which
-  // indicates end of streaming.
-  if strings.Contains(responseText, "data: [DONE]") {
-    response := Response{}
-
-    lines := strings.Split(responseText, "\n")
-    for _, line := range lines {
-      if !strings.HasPrefix(line, respPrefix) {
-        continue
-      }
-      content := strings.TrimPrefix(line, respPrefix)
-      if content == "[DONE]" {
-        continue
-      }
-
-      byteSlice := []byte(content)
-      if err := json.Unmarshal(byteSlice, &response); err != nil {
-        loggerVerbose.Error(err, "unmarshaling response body")
-        continue
-      }
-    }
-    reqCtx.Response = response
+  if strings.Contains(responseText, streamingEndMsg) {
+    parsedResp := ParseRespForUsage(ctx, responseText, loggerVerbose)
+    reqCtx.Response = parsedResp
   }
 
   if body.ResponseBody.EndOfStream {
@@ -242,6 +218,44 @@ func (s *Server) HandleStreaming(
   return nil
 }
 
+// Example message if "stream_options": {"include_usage": "true"} is included in the request:
+// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
+// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+//
+// data: [DONE]
+//
+// Noticed that vLLM returns two entries in one response.
+// We need to strip the `data:` prefix and next Data: [DONE] from the message to fetch response data.
+//
+// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
+// indicates end of streaming.
+func ParseRespForUsage(
+  ctx context.Context,
+  responseText string,
+  loggerVerbose logr.Logger,
+) Response {
+  response := Response{}
+
+  lines := strings.Split(responseText, "\n")
+  for _, line := range lines {
+    if !strings.HasPrefix(line, streamingRespPrefix) {
+      continue
+    }
+    content := strings.TrimPrefix(line, streamingRespPrefix)
+    if content == "[DONE]" {
+      continue
+    }
+
+    byteSlice := []byte(content)
+    if err := json.Unmarshal(byteSlice, &response); err != nil {
+      loggerVerbose.Error(err, "unmarshaling response body")
+      continue
+    }
+  }
+
+  return response
+}
+
 type Response struct {
   Usage Usage `json:"usage"`
 }
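
For reference, a minimal, self-contained sketch of the parsing behavior introduced above. It mirrors ParseRespForUsage with local stand-in types and a plain helper (the real implementation lives in pkg/epp/handlers and logs unmarshal errors through logr), and shows that a chunk carrying both the usage entry and `data: [DONE]` yields the token counts in one pass.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Local stand-ins for the handlers package types; illustrative only.
type Usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	TotalTokens      int `json:"total_tokens"`
	CompletionTokens int `json:"completion_tokens"`
}

type Response struct {
	Usage Usage `json:"usage"`
}

const (
	streamingRespPrefix = "data: "
	streamingEndMsg     = "data: [DONE]"
)

// parseRespForUsage mirrors the logic of ParseRespForUsage above: walk the SSE
// lines, skip anything without the "data: " prefix and the [DONE] sentinel,
// and unmarshal the remaining JSON payload to pick up the usage counts.
func parseRespForUsage(responseText string) Response {
	response := Response{}
	for _, line := range strings.Split(responseText, "\n") {
		if !strings.HasPrefix(line, streamingRespPrefix) {
			continue
		}
		content := strings.TrimPrefix(line, streamingRespPrefix)
		if content == "[DONE]" {
			continue
		}
		if err := json.Unmarshal([]byte(content), &response); err != nil {
			continue // the real code logs the unmarshal error and keeps going
		}
	}
	return response
}

func main() {
	// With "stream_options": {"include_usage": true}, the usage entry and
	// [DONE] arrive in the same response body.
	chunk := `data: {"id":"cmpl-1","object":"text_completion","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]`

	if strings.Contains(chunk, streamingEndMsg) {
		resp := parseRespForUsage(chunk)
		fmt.Println(resp.Usage.PromptTokens, resp.Usage.CompletionTokens) // 7 10
	}
}
```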

pkg/epp/handlers/streamingserver.go

+28
@@ -167,6 +167,17 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
     case *extProcPb.ProcessingRequest_ResponseBody:
       if reqCtx.modelServerStreaming {
         // Currently we punt on response parsing if the modelServer is streaming, and we just passthrough.
+
+        responseText := string(v.ResponseBody.Body)
+        s.HandleResponseBodyModelStreaming(ctx, reqCtx, responseText)
+        if v.ResponseBody.EndOfStream {
+          loggerVerbose.Info("streaming is completed")
+
+          reqCtx.ResponseCompleteTimestamp = time.Now()
+          metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
+          metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
+        }
+
         reqCtx.respBodyResp = &extProcPb.ProcessingResponse{
           Response: &extProcPb.ProcessingResponse_ResponseBody{
             ResponseBody: &extProcPb.BodyResponse{
@@ -543,3 +554,20 @@ func (s *StreamingServer) HandleResponseBody(
   }
   return reqCtx, nil
 }
+
+// The function is to handle streaming response if the modelServer is streaming.
+func (s *StreamingServer) HandleResponseBodyModelStreaming(
+  ctx context.Context,
+  reqCtx *StreamingRequestContext,
+  responseText string,
+) {
+  logger := log.FromContext(ctx)
+  loggerVerbose := logger.V(logutil.VERBOSE)
+  loggerVerbose.Info("Processing HandleResponseBody")
+
+  if strings.Contains(responseText, streamingEndMsg) {
+    resp := ParseRespForUsage(ctx, responseText, loggerVerbose)
+    metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.PromptTokens)
+    metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.CompletionTokens)
+  }
+}
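
A compact sketch of the ordering the Process handler now follows, with stubbed recorders standing in for the epp metrics package (the stub names are illustrative, not the real API): token usage is recorded whenever a chunk carries the `data: [DONE]` sentinel, while request latency is recorded only once, on the EndOfStream frame, whose body is empty.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

const streamingEndMsg = "data: [DONE]"

// Stub recorders standing in for the epp metrics package; illustrative only.
func recordInputTokens(model, target string, n int)              { fmt.Println("input tokens:", n) }
func recordOutputTokens(model, target string, n int)             { fmt.Println("output tokens:", n) }
func recordRequestLatency(model, target string, d time.Duration) { fmt.Println("latency:", d) }

// handleStreamedChunk mirrors the ordering added to StreamingServer.Process:
// usage metrics for the chunk that carries [DONE], latency exactly once at
// end of stream.
func handleStreamedChunk(chunk string, endOfStream bool, start time.Time, model, target string) {
	if strings.Contains(chunk, streamingEndMsg) {
		// The real handler calls ParseRespForUsage over the SSE lines;
		// hard-coded counts keep this sketch short.
		recordInputTokens(model, target, 7)
		recordOutputTokens(model, target, 10)
	}
	if endOfStream {
		recordRequestLatency(model, target, time.Since(start))
	}
}

func main() {
	start := time.Now()
	// Last data chunk: usage entry and [DONE] arrive together.
	handleStreamedChunk("data: {\"usage\":{\"prompt_tokens\":7,\"completion_tokens\":10}}\ndata: [DONE]", false, start, "my-model", "my-model-12345")
	// End-of-stream frame: empty body, latency gets recorded.
	handleStreamedChunk("", true, start, "my-model", "my-model-12345")
}
```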

test/integration/epp/hermetic_test.go

+53-21
@@ -387,7 +387,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
   requests          []*extProcPb.ProcessingRequest
   pods              map[backendmetrics.Pod]*backendmetrics.Metrics
   wantResponses     []*extProcPb.ProcessingResponse
-  wantMetrics       string
+  wantMetrics       map[string]string
   wantErr           bool
   immediateResponse *extProcPb.ImmediateResponse
 }{
@@ -410,11 +410,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
           KVCacheUsagePercent: 0.2,
         },
       },
-      wantMetrics: `
+      wantMetrics: map[string]string{`inference_model_request_total`: `
    # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
    # TYPE inference_model_request_total counter
    inference_model_request_total{model_name="my-model",target_model_name="my-model-12345"} 1
-    `,
+    `},
       wantErr: false,
       wantResponses: []*extProcPb.ProcessingResponse{
         {
@@ -491,11 +491,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
          },
        },
      },
-      wantMetrics: `
+      wantMetrics: map[string]string{`inference_model_request_total`: `
    # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
    # TYPE inference_model_request_total counter
    inference_model_request_total{model_name="sql-lora",target_model_name="sql-lora-1fdg2"} 1
-    `,
+    `},
       wantErr: false,
       wantResponses: []*extProcPb.ProcessingResponse{
         {
@@ -572,11 +572,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
          },
        },
      },
-      wantMetrics: `
+      wantMetrics: map[string]string{`inference_model_request_total`: `
    # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
    # TYPE inference_model_request_total counter
    inference_model_request_total{model_name="sql-lora",target_model_name="sql-lora-1fdg2"} 1
-    `,
+    `},
       wantErr: false,
       wantResponses: []*extProcPb.ProcessingResponse{
         {
@@ -655,7 +655,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
        },
      },
       wantErr: false,
-      wantMetrics: "",
+      wantMetrics: map[string]string{},
       wantResponses: []*extProcPb.ProcessingResponse{
         {
           Response: &extProcPb.ProcessingResponse_ImmediateResponse{
@@ -699,11 +699,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
          },
        },
      },
-      wantMetrics: `
+      wantMetrics: map[string]string{`inference_model_request_total`: `
    # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
    # TYPE inference_model_request_total counter
    inference_model_request_total{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3"} 1
-    `,
+    `},
       wantErr: false,
       wantResponses: []*extProcPb.ProcessingResponse{
         {
@@ -807,11 +807,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
          },
        },
      },
-      wantMetrics: `
+      wantMetrics: map[string]string{`inference_model_request_total`: `
    # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
    # TYPE inference_model_request_total counter
    inference_model_request_total{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3"} 1
-    `,
+    `},
       wantErr: false,
       wantResponses: []*extProcPb.ProcessingResponse{
         {
@@ -915,11 +915,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
          },
        },
      },
-      wantMetrics: `
+      wantMetrics: map[string]string{`inference_model_request_total`: `
    # HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
    # TYPE inference_model_request_total counter
    inference_model_request_total{model_name="direct-model",target_model_name="direct-model"} 1
-    `,
+    `},
       wantErr: false,
       wantResponses: []*extProcPb.ProcessingResponse{
         {
@@ -1217,19 +1217,47 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
         {
           Request: &extProcPb.ProcessingRequest_ResponseBody{
             ResponseBody: &extProcPb.HttpBody{
-              Body:        []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`),
+              Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+data: [DONE]`,
+              ),
               EndOfStream: false},
           },
         },
         {
           Request: &extProcPb.ProcessingRequest_ResponseBody{
             ResponseBody: &extProcPb.HttpBody{
-              Body:        []byte("data: [DONE]"),
+              Body:        []byte(""),
               EndOfStream: true},
           },
         },
       },
       wantErr: false,
+      wantMetrics: map[string]string{`inference_model_input_tokens`: `
+    # HELP inference_model_input_tokens [ALPHA] Inference model input token count distribution for requests in each model.
+    # TYPE inference_model_input_tokens histogram
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1"} 0
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="8"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="16"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="32"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="64"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="128"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="256"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="512"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1024"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="2048"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="4096"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="8192"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="16384"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="32778"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="65536"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="131072"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="262144"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="524288"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1.048576e+06"} 1
+    inference_model_input_tokens_bucket{model_name="",target_model_name="",le="+Inf"} 1
+    inference_model_input_tokens_sum{model_name="",target_model_name=""} 7
+    inference_model_input_tokens_count{model_name="",target_model_name=""} 1
+    `},
       wantResponses: []*extProcPb.ProcessingResponse{
         {
           Response: &extProcPb.ProcessingResponse_ResponseHeaders{
@@ -1336,7 +1364,9 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
           BodyMutation: &extProcPb.BodyMutation{
             Mutation: &extProcPb.BodyMutation_StreamedResponse{
               StreamedResponse: &extProcPb.StreamedBodyResponse{
-                Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`),
+                Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+data: [DONE]`,
+                ),
                 EndOfStream: false,
               },
             },
@@ -1352,7 +1382,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
           BodyMutation: &extProcPb.BodyMutation{
             Mutation: &extProcPb.BodyMutation_StreamedResponse{
               StreamedResponse: &extProcPb.StreamedBodyResponse{
-                Body: []byte("data: [DONE]"),
+                Body: []byte(""),
                 EndOfStream: true,
               },
             },
@@ -1378,9 +1408,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
         t.Errorf("Unexpected response, (-want +got): %v", diff)
       }
 
-      if test.wantMetrics != "" {
-        if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(test.wantMetrics), "inference_model_request_total"); err != nil {
-          t.Error(err)
+      if len(test.wantMetrics) != 0 {
+        for metricName, value := range test.wantMetrics {
+          if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(value), metricName); err != nil {
+            t.Error(err)
+          }
         }
       }
 
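For illustration, a standalone sketch of the map-driven metric assertion pattern the test now uses. It is written against prometheus/client_golang's testutil and a toy registry rather than the component-base metricsutils and legacyregistry wrappers used in the actual test, and the metric name and value are taken from the first test case above; the loop shape is the same as the updated assertion: one GatherAndCompare call per map entry, keyed by the metric to gather.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Toy registry standing in for legacyregistry.DefaultGatherer.
	reg := prometheus.NewRegistry()
	requestTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "inference_model_request_total",
		Help: "Counter of inference model requests broken out for each model and target model.",
	}, []string{"model_name", "target_model_name"})
	reg.MustRegister(requestTotal)
	requestTotal.WithLabelValues("my-model", "my-model-12345").Inc()

	// Expected exposition text per metric name, mirroring the wantMetrics map.
	wantMetrics := map[string]string{
		"inference_model_request_total": `
# HELP inference_model_request_total Counter of inference model requests broken out for each model and target model.
# TYPE inference_model_request_total counter
inference_model_request_total{model_name="my-model",target_model_name="my-model-12345"} 1
`,
	}
	for metricName, want := range wantMetrics {
		if err := testutil.GatherAndCompare(reg, strings.NewReader(want), metricName); err != nil {
			fmt.Println("metric mismatch:", err)
			continue
		}
		fmt.Println(metricName, "matches")
	}
}
```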