diff --git a/CHANGELOG.md b/CHANGELOG.md index 3afaca3ff..cb27f211c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http ### Fixed - Fix spans parsing from eBPF for the legacy (go version < 1.24 otel-go < 1.33) otel global instrumentation. ([#1960](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1960)) +- Reset Kafka producer span underlying memory before each span. ([#1937](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1937)) ## [v0.21.0] - 2025-02-18 diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c index 278ccf14a..0b3b8ac7a 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c @@ -52,7 +52,7 @@ struct __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); __uint(key_size, sizeof(u32)); __uint(value_size, sizeof(struct kafka_request_t)); - __uint(max_entries, 1); + __uint(max_entries, 2); } kafka_request_storage_map SEC(".maps"); // https://github.com/segmentio/kafka-go/blob/main/protocol/record.go#L48 @@ -144,13 +144,24 @@ int uprobe_WriteMessages(struct pt_regs *ctx) { return 0; } - u32 map_id = 0; - struct kafka_request_t *kafka_request = bpf_map_lookup_elem(&kafka_request_storage_map, &map_id); - if (kafka_request == NULL) + u32 zero_id = 0; + struct kafka_request_t *zero_kafka_request = bpf_map_lookup_elem(&kafka_request_storage_map, &zero_id); + if (zero_kafka_request == NULL) { - bpf_printk("uuprobe/WriteMessages: kafka_request is NULL"); + bpf_printk("uuprobe/WriteMessages: zero_kafka_request is NULL"); return 0; } + + u32 actual_id = 1; + // Zero the span we are about to build, eBPF doesn't support memset of large structs (more than 1024 bytes) + bpf_map_update_elem(&kafka_request_storage_map, &actual_id, zero_kafka_request, BPF_ANY); + // Get a pointer to the zeroed span + struct kafka_request_t *kafka_request = bpf_map_lookup_elem(&kafka_request_storage_map, &actual_id); + if (kafka_request == NULL) { + bpf_printk("uprobe/WriteMessages: Failed to get kafka_request"); + return 0; + } + kafka_request->start_time = bpf_ktime_get_ns(); start_span_params_t start_span_params = {