[Metrics] Handle vLLM streaming response in streaming server #518

Merged · 1 commit · Mar 20, 2025
78 changes: 46 additions & 32 deletions pkg/epp/handlers/response.go
@@ -30,6 +30,11 @@ import (
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const (
streamingRespPrefix = "data: "
streamingEndMsg = "data: [DONE]"
)

// HandleResponseHeaders processes response headers from the backend model server.
func (s *Server) HandleResponseHeaders(
ctx context.Context,
@@ -197,39 +202,10 @@ func (s *Server) HandleStreaming(
body *extProcPb.ProcessingRequest_ResponseBody,
loggerVerbose logr.Logger,
) error {
respPrefix := "data: "
responseText := string(body.ResponseBody.Body)
// Example message if "stream_options": {"include_usage": "true"} is included in the request:
// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
//
// data: [DONE]
//
// Noticed that vLLM returns two entries in one response.
// We need to strip the `data:` prefix and next Data: [DONE] from the message to fetch response data.
//
// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
// indicates end of streaming.
if strings.Contains(responseText, "data: [DONE]") {
response := Response{}

lines := strings.Split(responseText, "\n")
for _, line := range lines {
if !strings.HasPrefix(line, respPrefix) {
continue
}
content := strings.TrimPrefix(line, respPrefix)
if content == "[DONE]" {
continue
}

byteSlice := []byte(content)
if err := json.Unmarshal(byteSlice, &response); err != nil {
loggerVerbose.Error(err, "unmarshaling response body")
continue
}
}
reqCtx.Response = response
if strings.Contains(responseText, streamingEndMsg) {
parsedResp := ParseRespForUsage(ctx, responseText, loggerVerbose)
reqCtx.Response = parsedResp
}

if body.ResponseBody.EndOfStream {
@@ -242,6 +218,44 @@ func (s *Server) HandleStreaming(
return nil
}

// Example message if "stream_options": {"include_usage": "true"} is included in the request:
// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
//
// data: [DONE]
//
// Note that vLLM may return both entries in a single response chunk.
// We need to strip the `data: ` prefix and the trailing `data: [DONE]` line from the message to extract the response data.
//
// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
// indicates end of streaming.
func ParseRespForUsage(
ctx context.Context,
responseText string,
loggerVerbose logr.Logger,
) Response {
response := Response{}

lines := strings.Split(responseText, "\n")
for _, line := range lines {
if !strings.HasPrefix(line, streamingRespPrefix) {
continue
}
content := strings.TrimPrefix(line, streamingRespPrefix)
if content == "[DONE]" {
continue
}

byteSlice := []byte(content)
if err := json.Unmarshal(byteSlice, &response); err != nil {
loggerVerbose.Error(err, "unmarshaling response body")
continue
}
}

return response
}

type Response struct {
Usage Usage `json:"usage"`
}
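For context, here is a self-contained sketch of the server-sent-events parsing that the new `ParseRespForUsage` helper performs. The payload mirrors the vLLM example quoted in the comments above; the type and function names in the sketch are illustrative stand-ins, not part of this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// usageOnly mirrors the Response/Usage shapes declared in response.go.
type usageOnly struct {
	Usage struct {
		PromptTokens     int `json:"prompt_tokens"`
		CompletionTokens int `json:"completion_tokens"`
		TotalTokens      int `json:"total_tokens"`
	} `json:"usage"`
}

func main() {
	// A vLLM chunk carrying usage, followed by the end-of-stream marker,
	// as produced when stream_options.include_usage is set on the request.
	body := `data: {"id":"cmpl-1","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]`

	var resp usageOnly
	for _, line := range strings.Split(body, "\n") {
		content, ok := strings.CutPrefix(line, "data: ")
		if !ok || content == "[DONE]" {
			continue // skip non-data lines and the end-of-stream marker
		}
		if err := json.Unmarshal([]byte(content), &resp); err != nil {
			continue // ignore chunks that are not valid JSON
		}
	}
	fmt.Printf("prompt=%d completion=%d\n", resp.Usage.PromptTokens, resp.Usage.CompletionTokens)
	// prints: prompt=7 completion=10
}
```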
28 changes: 28 additions & 0 deletions pkg/epp/handlers/streamingserver.go
@@ -167,6 +167,17 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
case *extProcPb.ProcessingRequest_ResponseBody:
if reqCtx.modelServerStreaming {
// Currently we punt on response parsing if the modelServer is streaming, and we just passthrough.

responseText := string(v.ResponseBody.Body)
s.HandleResponseBodyModelStreaming(ctx, reqCtx, responseText)
if v.ResponseBody.EndOfStream {
loggerVerbose.Info("streaming is completed")

reqCtx.ResponseCompleteTimestamp = time.Now()
metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
}

reqCtx.respBodyResp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
ResponseBody: &extProcPb.BodyResponse{
@@ -543,3 +554,20 @@ func (s *StreamingServer) HandleResponseBody(
}
return reqCtx, nil
}

// HandleResponseBodyModelStreaming handles the response body when the model server is streaming.
func (s *StreamingServer) HandleResponseBodyModelStreaming(
ctx context.Context,
reqCtx *StreamingRequestContext,
responseText string,
) {
logger := log.FromContext(ctx)
loggerVerbose := logger.V(logutil.VERBOSE)
loggerVerbose.Info("Processing HandleResponseBody")

if strings.Contains(responseText, streamingEndMsg) {
resp := ParseRespForUsage(ctx, responseText, loggerVerbose)
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.CompletionTokens)
}
}
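As a usage note: the token metrics above only fire when vLLM attaches usage to the stream, which requires the client to opt in via `stream_options`. Below is a minimal sketch of such a request body, assuming the OpenAI-compatible completions API that vLLM serves; the model name and prompt are illustrative, not taken from this PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Ask vLLM to append a usage-bearing chunk before "data: [DONE]".
	reqBody := map[string]any{
		"model":  "tweet-summary-1",
		"prompt": "Summarize this tweet in one sentence.",
		"stream": true,
		"stream_options": map[string]any{
			"include_usage": true,
		},
	}
	b, _ := json.Marshal(reqBody)
	fmt.Println(string(b))
	// Without include_usage, "data: [DONE]" arrives on its own and
	// HandleResponseBodyModelStreaming finds no usage chunk to record.
}
```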
74 changes: 53 additions & 21 deletions test/integration/epp/hermetic_test.go
@@ -387,7 +387,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
requests []*extProcPb.ProcessingRequest
pods map[backendmetrics.Pod]*backendmetrics.Metrics
wantResponses []*extProcPb.ProcessingResponse
wantMetrics string
wantMetrics map[string]string
wantErr bool
immediateResponse *extProcPb.ImmediateResponse
}{
@@ -410,11 +410,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
KVCacheUsagePercent: 0.2,
},
},
wantMetrics: `
wantMetrics: map[string]string{`inference_model_request_total`: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
# TYPE inference_model_request_total counter
inference_model_request_total{model_name="my-model",target_model_name="my-model-12345"} 1
`,
`},
wantErr: false,
wantResponses: []*extProcPb.ProcessingResponse{
{
@@ -491,11 +491,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetrics: `
wantMetrics: map[string]string{`inference_model_request_total`: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
# TYPE inference_model_request_total counter
inference_model_request_total{model_name="sql-lora",target_model_name="sql-lora-1fdg2"} 1
`,
`},
wantErr: false,
wantResponses: []*extProcPb.ProcessingResponse{
{
@@ -572,11 +572,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetrics: `
wantMetrics: map[string]string{`inference_model_request_total`: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
# TYPE inference_model_request_total counter
inference_model_request_total{model_name="sql-lora",target_model_name="sql-lora-1fdg2"} 1
`,
`},
wantErr: false,
wantResponses: []*extProcPb.ProcessingResponse{
{
@@ -655,7 +655,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
},
},
wantErr: false,
wantMetrics: "",
wantMetrics: map[string]string{},
wantResponses: []*extProcPb.ProcessingResponse{
{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
@@ -699,11 +699,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetrics: `
wantMetrics: map[string]string{`inference_model_request_total`: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
# TYPE inference_model_request_total counter
inference_model_request_total{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3"} 1
`,
`},
wantErr: false,
wantResponses: []*extProcPb.ProcessingResponse{
{
@@ -807,11 +807,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetrics: `
wantMetrics: map[string]string{`inference_model_request_total`: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
# TYPE inference_model_request_total counter
inference_model_request_total{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3"} 1
`,
`},
wantErr: false,
wantResponses: []*extProcPb.ProcessingResponse{
{
@@ -915,11 +915,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetrics: `
wantMetrics: map[string]string{`inference_model_request_total`: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
# TYPE inference_model_request_total counter
inference_model_request_total{model_name="direct-model",target_model_name="direct-model"} 1
`,
`},
wantErr: false,
wantResponses: []*extProcPb.ProcessingResponse{
{
@@ -1217,19 +1217,47 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
{
Request: &extProcPb.ProcessingRequest_ResponseBody{
ResponseBody: &extProcPb.HttpBody{
Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`),
Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]`,
),
EndOfStream: false},
},
},
{
Request: &extProcPb.ProcessingRequest_ResponseBody{
ResponseBody: &extProcPb.HttpBody{
Body: []byte("data: [DONE]"),
Body: []byte(""),
EndOfStream: true},
},
},
},
wantErr: false,
wantMetrics: map[string]string{`inference_model_input_tokens`: `
# HELP inference_model_input_tokens [ALPHA] Inference model input token count distribution for requests in each model.
# TYPE inference_model_input_tokens histogram
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1"} 0
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="8"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="16"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="32"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="64"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="128"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="256"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="512"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1024"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="2048"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="4096"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="8192"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="16384"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="32778"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="65536"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="131072"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="262144"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="524288"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="1.048576e+06"} 1
inference_model_input_tokens_bucket{model_name="",target_model_name="",le="+Inf"} 1
inference_model_input_tokens_sum{model_name="",target_model_name=""} 7
inference_model_input_tokens_count{model_name="",target_model_name=""} 1
`},
wantResponses: []*extProcPb.ProcessingResponse{
{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
@@ -1336,7 +1364,9 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
BodyMutation: &extProcPb.BodyMutation{
Mutation: &extProcPb.BodyMutation_StreamedResponse{
StreamedResponse: &extProcPb.StreamedBodyResponse{
Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}`),
Body: []byte(`data: {"id":"cmpl-0fee233f-7d56-404a-acd3-4dad775d03d9","object":"text_completion","created":1741379018,"model":"tweet-summary-1","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]`,
),
EndOfStream: false,
},
},
@@ -1352,7 +1382,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
BodyMutation: &extProcPb.BodyMutation{
Mutation: &extProcPb.BodyMutation_StreamedResponse{
StreamedResponse: &extProcPb.StreamedBodyResponse{
Body: []byte("data: [DONE]"),
Body: []byte(""),
EndOfStream: true,
},
},
@@ -1378,9 +1408,11 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
t.Errorf("Unexpected response, (-want +got): %v", diff)
}

if test.wantMetrics != "" {
if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(test.wantMetrics), "inference_model_request_total"); err != nil {
t.Error(err)
if len(test.wantMetrics) != 0 {
for metricName, value := range test.wantMetrics {
if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(value), metricName); err != nil {
t.Error(err)
}
}
}
