Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SVLS-3545] Add Serverless metric origins to dogstatsd package #876

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
64c60bc
init test origins
DylanLovesCoffee Feb 3, 2025
237eebb
test more than one check
DylanLovesCoffee Feb 5, 2025
0084b32
sanity check
DylanLovesCoffee Feb 5, 2025
83fcbd2
apply to sketches
DylanLovesCoffee Feb 5, 2025
0b62b66
add origins list
DylanLovesCoffee Feb 5, 2025
e086e35
metrics origins
DylanLovesCoffee Feb 5, 2025
c7c588c
other serverless types
DylanLovesCoffee Feb 6, 2025
472b077
cleanup
DylanLovesCoffee Feb 6, 2025
8667213
add tests
DylanLovesCoffee Feb 10, 2025
4c3a355
test
DylanLovesCoffee Feb 10, 2025
4d6e311
test again
DylanLovesCoffee Feb 10, 2025
8a48e8b
test again
DylanLovesCoffee Feb 11, 2025
0e5e492
check tags
DylanLovesCoffee Feb 11, 2025
0e73ade
fix tags
DylanLovesCoffee Feb 11, 2025
803b412
only custom metrics
DylanLovesCoffee Feb 11, 2025
f35aa56
test for series
DylanLovesCoffee Feb 12, 2025
f02d604
exclude DD
DylanLovesCoffee Feb 12, 2025
ffd31df
testing
DylanLovesCoffee Feb 12, 2025
04a8055
rename
DylanLovesCoffee Feb 12, 2025
5407fc5
Merge branch 'main' into dylan/metric-origins
DylanLovesCoffee Feb 12, 2025
b0a8dd8
update tests
DylanLovesCoffee Feb 12, 2025
7f2f889
cleanup
DylanLovesCoffee Feb 12, 2025
5c19950
update
DylanLovesCoffee Feb 12, 2025
ffd91cb
todo
DylanLovesCoffee Feb 12, 2025
932f15e
update aggregator
DylanLovesCoffee Feb 18, 2025
68543b5
refactor origin
DylanLovesCoffee Feb 18, 2025
384a5b7
clippy
DylanLovesCoffee Feb 18, 2025
e0ebca2
refactor tag checking
DylanLovesCoffee Feb 20, 2025
1e36f6f
update with azure functions
DylanLovesCoffee Feb 20, 2025
2868e8a
Merge branch 'main' into dylan/metric-origins
hghotra Mar 12, 2025
7db1305
Add timestamp to metric constructor in test
hghotra Mar 12, 2025
50faf7c
Refactor based on alex's comment
hghotra Mar 13, 2025
68c4e83
Factor out reduntant functions
hghotra Mar 13, 2025
c0853de
Update vars & add comments
hghotra Mar 13, 2025
147216e
Better naming
hghotra Mar 13, 2025
e45ec69
Remove ambiguity in tag search
hghotra Mar 13, 2025
95b94ed
Pacify clippy
hghotra Mar 13, 2025
d491ddd
Fix logic when tag value not available
hghotra Mar 13, 2025
927967c
elide the lifetimes
hghotra Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions dogstatsd/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
//! The aggregation of metrics.

use crate::constants;
use crate::datadog::{self, Metric as MetricToShip, Series};
use crate::datadog::{
self, Metadata as MetadataToShip, Metric as MetricToShip, Origin as OriginToShip, Series,
};
use crate::errors;
use crate::metric::{self, Metric, MetricValue, SortedTags};
use crate::origin::get_origin;
use std::time;

use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload};
use datadog_protos::metrics::{Dogsketch, Metadata, Origin, Sketch, SketchPayload};
use ddsketch_agent::DDSketch;
use hashbrown::hash_table;
use protobuf::Message;
use protobuf::{Message, MessageField, SpecialFields};
use tracing::{error, warn};
use ustr::Ustr;

Expand Down Expand Up @@ -268,6 +271,15 @@ fn build_sketch(now: i64, entry: &Metric, mut base_tag_vec: SortedTags) -> Optio
base_tag_vec.extend(&tags);
}
sketch.set_tags(base_tag_vec.to_chars());

let origin: Option<Origin> = get_origin(entry, base_tag_vec);
if let Some(origin) = origin {
sketch.set_metadata(Metadata {
origin: MessageField::some(origin),
special_fields: SpecialFields::default(),
});
}

Some(sketch)
}

Expand Down Expand Up @@ -296,12 +308,21 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<MetricTo
base_tag_vec.extend(&tags);
}

let origin: Option<Origin> = get_origin(entry, base_tag_vec.clone());

Some(MetricToShip {
metric: entry.name.as_str(),
resources,
kind,
points: [point; 1],
tags: base_tag_vec.to_strings(),
metadata: Some(MetadataToShip {
origin: origin.map(|o| OriginToShip {
origin_product: o.origin_product,
origin_sub_product: o.origin_category,
origin_product_detail: o.origin_service,
}),
}),
})
}

Expand All @@ -318,7 +339,7 @@ pub mod tests {

const PRECISION: f64 = 0.000_000_01;

const SINGLE_METRIC_SIZE: usize = 193; // taken from the test, size of a serialized metric with one tag and 1 digit counter value
const SINGLE_METRIC_SIZE: usize = 220; // taken from the test, size of a serialized metric with one tag and 1 digit counter value
const SINGLE_DISTRIBUTION_SIZE: u64 = 140;
const DEFAULT_TAGS: &str =
"dd_extension_version:63-next,architecture:x86_64,_dd.compute_stats:1";
Expand Down Expand Up @@ -664,7 +685,7 @@ pub mod tests {
fn consume_metrics_batch_bytes() {
let expected_metrics_per_batch = 2;
let total_number_of_metrics = 5;
let two_metrics_size = 374;
let two_metrics_size = 428;
let max_bytes = SINGLE_METRIC_SIZE * expected_metrics_per_batch + 13;
let mut aggregator = Aggregator {
tags: to_sorted_tags(),
Expand Down
14 changes: 14 additions & 0 deletions dogstatsd/src/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,20 @@ pub(crate) struct Metric {
/// The kind of metric
pub(crate) kind: DdMetricKind,
pub(crate) tags: Vec<String>,
/// Optional metadata associated with the metric
pub(crate) metadata: Option<Metadata>,
}

#[derive(Debug, Serialize)]
pub struct Metadata {
pub(crate) origin: Option<Origin>,
}

#[derive(Debug, Serialize)]
pub struct Origin {
pub(crate) origin_product: u32,
pub(crate) origin_sub_product: u32,
pub(crate) origin_product_detail: u32,
}

#[derive(Debug, Serialize)]
Expand Down
1 change: 1 addition & 0 deletions dogstatsd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pub mod dogstatsd;
pub mod errors;
pub mod flusher;
pub mod metric;
pub mod origin;
29 changes: 29 additions & 0 deletions dogstatsd/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ impl SortedTags {
tags_as_vec
}

pub fn contains(&self, key: &str) -> bool {
self.values.iter().any(|(k, _)| k.as_str() == key)
}

pub fn get(&self, key: &str) -> Option<&str> {
self.values
.iter()
.find(|(k, _)| k.as_str() == key)
.map(|(_, v)| v.as_str())
}

pub(crate) fn to_resources(&self) -> Vec<datadog::Resource> {
let mut resources = Vec::with_capacity(constants::MAX_TAGS);
for (key, val) in &self.values {
Expand Down Expand Up @@ -561,4 +572,22 @@ mod tests {
assert_eq!(first_element.0, Ustr::from("a"));
assert_eq!(first_element.1, Ustr::from("a1"));
}

#[test]
fn sorted_tags_contains_key() {
let tags = SortedTags::parse("a:1,b:2,c:3").unwrap();
assert!(tags.contains("a"));
assert!(tags.contains("b"));
assert!(tags.contains("c"));
assert!(!tags.contains("d"));
}

#[test]
fn sorted_tags_get_value() {
let tags = SortedTags::parse("a:1,b:2,c:3").unwrap();
assert_eq!(tags.get("a"), Some("1"));
assert_eq!(tags.get("b"), Some("2"));
assert_eq!(tags.get("c"), Some("3"));
assert_eq!(tags.get("d"), None);
}
}
170 changes: 170 additions & 0 deletions dogstatsd/src/origin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::metric::{Metric, SortedTags};
use datadog_protos::metrics::Origin;

const DD_ORIGIN_TAG_KEY: &str = "origin";
const AWS_LAMBDA_TAG_KEY: &str = "function_arn";
const AWS_STEP_FUNCTIONS_TAG_KEY: &str = "statemachinearn";

const AZURE_APP_SERVICES_TAG_VALUE: &str = "appservice";
const GOOGLE_CLOUD_RUN_TAG_VALUE: &str = "cloudrun";
const AZURE_CONTAINER_APP_TAG_VALUE: &str = "containerapp";

const DATADOG_PREFIX: &str = "datadog";
const AZURE_APP_SERVICES_PREFIX: &str = "azure.app_services";
const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run";
const AZURE_CONTAINER_APP_PREFIX: &str = "azure.app_containerapps";
const AWS_LAMBDA_PREFIX: &str = "aws.lambda";
const AWS_STEP_FUNCTIONS_PREFIX: &str = "aws.states";

/// Represents the product origin of a metric.
/// The full enum is exhaustive so we only include what we need. Please reference the corresponding
/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L161
pub enum OriginProduct {
Other = 0,
Serverless = 1,
}

impl From<OriginProduct> for u32 {
fn from(product: OriginProduct) -> u32 {
product as u32
}
}

/// Represents the category origin of a metric.
/// The full enum is exhaustive so we only include what we need. Please reference the corresponding
/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L276
pub enum OriginCategory {
Other = 0,
AppServicesMetrics = 35,
CloudRunMetrics = 36,
ContainerAppMetrics = 37,
LambdaMetrics = 38,
StepFunctionsMetrics = 41,
}

impl From<OriginCategory> for u32 {
fn from(category: OriginCategory) -> u32 {
category as u32
}
}

/// Represents the service origin of a metric.
/// The full enum is exhaustive so we only include what we need. Please reference the corresponding
/// enum for all possible values https://github.com/DataDog/dd-source/blob/573dee9b5f7ee13935cb3ad11b16dde970528983/domains/metrics/shared/libs/proto/origin/origin.proto#L417
pub enum OriginService {
Other = 0,
}

impl From<OriginService> for u32 {
fn from(service: OriginService) -> u32 {
service as u32
}
}

pub fn get_origin(metric: &Metric, tags: SortedTags) -> Option<Origin> {
let name = metric.name.to_string();
let prefix = name.split('.').take(2).collect::<Vec<&str>>().join(".");

// TODO (dylan): expand origin service to differentiate custom and standard metrics
let origin: Option<Origin> = match tags.get(DD_ORIGIN_TAG_KEY) {
Some(AZURE_APP_SERVICES_TAG_VALUE) if prefix != AZURE_APP_SERVICES_PREFIX => Some(Origin {
origin_product: OriginProduct::Serverless.into(),
origin_category: OriginCategory::AppServicesMetrics.into(),
origin_service: OriginService::Other.into(),
..Default::default()
}),
Some(GOOGLE_CLOUD_RUN_TAG_VALUE) if prefix != GOOGLE_CLOUD_RUN_PREFIX => Some(Origin {
origin_product: OriginProduct::Serverless.into(),
origin_category: OriginCategory::CloudRunMetrics.into(),
origin_service: OriginService::Other.into(),
..Default::default()
}),
Some(AZURE_CONTAINER_APP_TAG_VALUE) if prefix != AZURE_CONTAINER_APP_PREFIX => {
Some(Origin {
origin_product: OriginProduct::Serverless.into(),
origin_category: OriginCategory::ContainerAppMetrics.into(),
origin_service: OriginService::Other.into(),
..Default::default()
})
}
_ if tags.contains(AWS_LAMBDA_TAG_KEY) && prefix != AWS_LAMBDA_PREFIX => Some(Origin {
origin_product: OriginProduct::Serverless.into(),
origin_category: OriginCategory::LambdaMetrics.into(),
origin_service: OriginService::Other.into(),
..Default::default()
}),
_ if tags.contains(AWS_STEP_FUNCTIONS_TAG_KEY) && prefix != AWS_STEP_FUNCTIONS_PREFIX => {
Some(Origin {
origin_product: OriginProduct::Serverless.into(),
origin_category: OriginCategory::StepFunctionsMetrics.into(),
origin_service: OriginService::Other.into(),
..Default::default()
})
}
_ if prefix == DATADOG_PREFIX => return None,
_ => return None,
};
origin
}

#[cfg(test)]
mod tests {
use crate::metric::MetricValue;

use super::*;

#[test]
fn test_origin_product() {
let origin_product: u32 = OriginProduct::Serverless.into();
assert_eq!(origin_product, 1);
}

#[test]
fn test_origin_category() {
let origin_category: u32 = OriginCategory::LambdaMetrics.into();
assert_eq!(origin_category, 38);
}

#[test]
fn test_origin_service() {
let origin_service: u32 = OriginService::Other.into();
assert_eq!(origin_service, 0);
}

#[test]
fn test_get_origin_aws_lambda_standard_metric() {
let tags = SortedTags::parse("function_arn:hello123").unwrap();
let metric = Metric {
id: 0,
name: "aws.lambda.enhanced.invocations".into(),
value: MetricValue::Gauge(1.0),
tags: Some(tags.clone()),
};
let origin = get_origin(&metric, tags);
assert_eq!(origin, None);
}

#[test]
fn test_get_origin_aws_lambda_custom_metric() {
let tags = SortedTags::parse("function_arn:hello123").unwrap();
let metric = Metric {
id: 0,
name: "my.custom.aws.lambda.invocations".into(),
value: MetricValue::Gauge(1.0),
tags: Some(tags.clone()),
};
let origin = get_origin(&metric, tags);
assert_eq!(
origin,
Some(Origin {
origin_product: OriginProduct::Serverless.into(),
origin_category: OriginCategory::LambdaMetrics.into(),
origin_service: OriginService::Other.into(),
..Default::default()
})
);
}
}
Loading