From d80e6f98d307087957ea94b182728b9b7a33ca7d Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 5 Mar 2025 09:32:03 -0800 Subject: [PATCH] Move process Info creation into NewManager The Manager manages probes for a process. Have the manager handle the process info creation and coordination instead of extraneously passing it from the Instrumentation. --- instrumentation.go | 35 +----- internal/pkg/instrumentation/manager.go | 86 +++++++------- .../pkg/instrumentation/manager_load_test.go | 32 +++++- internal/pkg/instrumentation/manager_test.go | 107 ++++++++---------- 4 files changed, 119 insertions(+), 141 deletions(-) diff --git a/instrumentation.go b/instrumentation.go index bdc7d0e61..4d241c0c4 100644 --- a/instrumentation.go +++ b/instrumentation.go @@ -53,9 +53,7 @@ const ( // Instrumentation manages and controls all OpenTelemetry Go // auto-instrumentation. type Instrumentation struct { - target *process.Info - analyzer *process.Analyzer - manager *instrumentation.Manager + manager *instrumentation.Manager stopMu sync.Mutex stop context.CancelFunc @@ -97,42 +95,17 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In } cp := convertConfigProvider(c.cp) - mngr, err := instrumentation.NewManager(c.logger, ctrl, cp, p...) + mngr, err := instrumentation.NewManager(c.logger, ctrl, c.pid, cp, p...) if err != nil { return nil, err } - pa := process.NewAnalyzer(c.logger, c.pid) - pi, err := pa.Analyze(mngr.GetRelevantFuncs()) - if err != nil { - return nil, err - } - - alloc, err := process.Allocate(c.logger, c.pid) - if err != nil { - return nil, err - } - pi.Allocation = alloc - - c.logger.Info( - "target process analysis completed", - "pid", pi.ID, - "go_version", pi.GoVersion, - "dependencies", pi.Modules, - "total_functions_found", len(pi.Functions), - ) - mngr.FilterUnusedProbes(pi) - - return &Instrumentation{ - target: pi, - analyzer: pa, - manager: mngr, - }, nil + return &Instrumentation{manager: mngr}, nil } // Load loads and attaches the relevant probes to the target process. func (i *Instrumentation) Load(ctx context.Context) error { - return i.manager.Load(ctx, i.target) + return i.manager.Load(ctx) } // Run starts the instrumentation. It must be called after [Instrumentation.Load]. diff --git a/internal/pkg/instrumentation/manager.go b/internal/pkg/instrumentation/manager.go index 875f30d53..8bac20dd5 100644 --- a/internal/pkg/instrumentation/manager.go +++ b/internal/pkg/instrumentation/manager.go @@ -55,7 +55,7 @@ type Manager struct { } // NewManager returns a new [Manager]. -func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, cp ConfigProvider, probes ...probe.Probe) (*Manager, error) { +func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, pid process.ID, cp ConfigProvider, probes ...probe.Probe) (*Manager, error) { m := &Manager{ logger: logger, probes: make(map[probe.ID]probe.Probe), @@ -63,11 +63,35 @@ func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, c cp: cp, } - err := m.registerProbes(probes) + funcs := make(map[string]any) + for _, p := range probes { + if err := m.registerProbe(p); err != nil { + return nil, err + } + + for _, s := range p.Manifest().Symbols { + funcs[s.Symbol] = nil + } + } + + pa := process.NewAnalyzer(logger, pid) + + var err error + m.proc, err = pa.Analyze(funcs) if err != nil { return nil, err } + alloc, err := process.Allocate(logger, pid) + if err != nil { + return nil, err + } + m.proc.Allocation = alloc + + m.logger.Info("loaded process info", "process", m.proc) + + m.filterUnusedProbes() + return m, nil } @@ -103,23 +127,11 @@ func (m *Manager) registerProbe(p probe.Probe) error { return nil } -// GetRelevantFuncs returns the instrumented functions for all managed probes. -func (m *Manager) GetRelevantFuncs() map[string]interface{} { - funcsMap := make(map[string]interface{}) - for _, i := range m.probes { - for _, s := range i.Manifest().Symbols { - funcsMap[s.Symbol] = nil - } - } - - return funcsMap -} - -// FilterUnusedProbes filterers probes whose functions are already instrumented +// filterUnusedProbes filterers probes whose functions are already instrumented // out of the Manager. -func (m *Manager) FilterUnusedProbes(target *process.Info) { +func (m *Manager) filterUnusedProbes() { existingFuncMap := make(map[string]interface{}) - for _, f := range target.Functions { + for _, f := range m.proc.Functions { existingFuncMap[f.Name] = nil } @@ -237,14 +249,14 @@ func (m *Manager) ConfigLoop(ctx context.Context) { } } -func (m *Manager) Load(ctx context.Context, target *process.Info) error { +func (m *Manager) Load(ctx context.Context) error { if len(m.probes) == 0 { return errors.New("no instrumentation for target process") } if m.cp == nil { return errors.New("no config provider set") } - if target == nil { + if m.proc == nil { return errors.New("target details not set - load is called on non-initialized instrumentation") } m.stateMu.Lock() @@ -255,12 +267,11 @@ func (m *Manager) Load(ctx context.Context, target *process.Info) error { } m.currentConfig = m.cp.InitialConfig(ctx) - err := m.loadProbes(target) + err := m.loadProbes() if err != nil { return err } - m.proc = target m.state = managerStateLoaded return nil @@ -330,7 +341,7 @@ func (m *Manager) Stop() error { defer m.probeMu.Unlock() m.logger.Debug("Shutting down all probes") - err := m.cleanup(m.proc) + err := m.cleanup() // Wait for all probes to stop. m.runningProbesWG.Wait() @@ -339,19 +350,19 @@ func (m *Manager) Stop() error { return err } -func (m *Manager) loadProbes(target *process.Info) error { +func (m *Manager) loadProbes() error { // Remove resource limits for kernels <5.11. if err := rlimitRemoveMemlock(); err != nil { return err } - exe, err := openExecutable(target.ID.ExePath()) + exe, err := openExecutable(m.proc.ID.ExePath()) if err != nil { return err } m.exe = exe - if err := m.mount(target); err != nil { + if err := m.mount(); err != nil { return err } @@ -359,10 +370,10 @@ func (m *Manager) loadProbes(target *process.Info) error { for name, i := range m.probes { if isProbeEnabled(name, m.currentConfig) { m.logger.Info("loading probe", "name", name) - err := i.Load(exe, target, m.currentConfig.SamplingConfig) + err := i.Load(exe, m.proc, m.currentConfig.SamplingConfig) if err != nil { m.logger.Error("error while loading probes, cleaning up", "error", err, "name", name) - return errors.Join(err, m.cleanup(target)) + return errors.Join(err, m.cleanup()) } } } @@ -371,16 +382,16 @@ func (m *Manager) loadProbes(target *process.Info) error { return nil } -func (m *Manager) mount(target *process.Info) error { - if target.Allocation != nil { - m.logger.Debug("Mounting bpffs", "allocation", target.Allocation) +func (m *Manager) mount() error { + if m.proc.Allocation != nil { + m.logger.Debug("Mounting bpffs", "allocation", m.proc.Allocation) } else { m.logger.Debug("Mounting bpffs") } - return bpffsMount(target) + return bpffsMount(m.proc) } -func (m *Manager) cleanup(target *process.Info) error { +func (m *Manager) cleanup() error { ctx := context.Background() err := m.cp.Shutdown(context.Background()) for _, i := range m.probes { @@ -394,14 +405,5 @@ func (m *Manager) cleanup(target *process.Info) error { } m.logger.Debug("Cleaning bpffs") - return errors.Join(err, bpffsCleanup(target)) -} - -func (m *Manager) registerProbes(probes []probe.Probe) error { - for _, p := range probes { - if err := m.registerProbe(p); err != nil { - return err - } - } - return nil + return errors.Join(err, bpffsCleanup(m.proc)) } diff --git a/internal/pkg/instrumentation/manager_load_test.go b/internal/pkg/instrumentation/manager_load_test.go index 3e9b93184..24258c1ce 100644 --- a/internal/pkg/instrumentation/manager_load_test.go +++ b/internal/pkg/instrumentation/manager_load_test.go @@ -22,8 +22,11 @@ import ( grpcServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server" httpClient "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/client" httpServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/server" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/testutils" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" + "go.opentelemetry.io/auto/internal/pkg/process" + "go.opentelemetry.io/auto/internal/pkg/process/binary" ) func TestLoadProbes(t *testing.T) { @@ -51,10 +54,9 @@ func TestLoadProbes(t *testing.T) { } } -func fakeManager(t *testing.T) *Manager { +func fakeManager(t *testing.T, fnNames ...string) *Manager { logger := slog.Default() - m, err := NewManager( - logger, nil, NewNoopConfigProvider(nil), + probes := []probe.Probe{ grpcClient.New(logger, ""), grpcServer.New(logger, ""), httpServer.New(logger, ""), @@ -64,9 +66,27 @@ func fakeManager(t *testing.T) *Manager { kafkaConsumer.New(logger, ""), autosdk.New(logger), otelTraceGlobal.New(logger), - ) - assert.NoError(t, err) - assert.NotNil(t, m) + } + ver := semver.New(1, 20, 0, "", "") + var fn []*binary.Func + for _, name := range fnNames { + fn = append(fn, &binary.Func{Name: name}) + } + m := &Manager{ + logger: slog.Default(), + cp: NewNoopConfigProvider(nil), + probes: make(map[probe.ID]probe.Probe), + proc: &process.Info{ + ID: 1, + Functions: fn, + GoVersion: ver, + Modules: map[string]*semver.Version{}, + }, + } + for _, p := range probes { + m.probes[p.Manifest().ID] = p + } + m.filterUnusedProbes() return m } diff --git a/internal/pkg/instrumentation/manager_test.go b/internal/pkg/instrumentation/manager_test.go index 2b68bcae0..1b0670a2f 100644 --- a/internal/pkg/instrumentation/manager_test.go +++ b/internal/pkg/instrumentation/manager_test.go @@ -39,72 +39,32 @@ import ( ) func TestProbeFiltering(t *testing.T) { - ver := semver.New(1, 20, 0, "", "") - t.Run("empty target details", func(t *testing.T) { m := fakeManager(t) - - info := process.Info{ - ID: 1, - Functions: []*binary.Func{}, - GoVersion: ver, - Modules: map[string]*semver.Version{}, - } - m.FilterUnusedProbes(&info) assert.Empty(t, m.probes) }) t.Run("only HTTP client target details", func(t *testing.T) { - m := fakeManager(t) - - httpFuncs := []*binary.Func{ - {Name: "net/http.(*Transport).roundTrip"}, - } - - info := process.Info{ - ID: 1, - Functions: httpFuncs, - GoVersion: ver, - Modules: map[string]*semver.Version{}, - } - m.FilterUnusedProbes(&info) + m := fakeManager(t, "net/http.(*Transport).roundTrip") assert.Len(t, m.probes, 1) // one function, single probe }) t.Run("HTTP server and client target details", func(t *testing.T) { - m := fakeManager(t) - - httpFuncs := []*binary.Func{ - {Name: "net/http.(*Transport).roundTrip"}, - {Name: "net/http.serverHandler.ServeHTTP"}, - } - - info := process.Info{ - ID: 1, - Functions: httpFuncs, - GoVersion: ver, - Modules: map[string]*semver.Version{}, - } - m.FilterUnusedProbes(&info) + m := fakeManager( + t, + "net/http.(*Transport).roundTrip", + "net/http.serverHandler.ServeHTTP", + ) assert.Len(t, m.probes, 2) }) t.Run("HTTP server and client dependent function only target details", func(t *testing.T) { - m := fakeManager(t) - - httpFuncs := []*binary.Func{ + m := fakeManager( + t, // writeSubset depends on "net/http.(*Transport).roundTrip", it should be ignored without roundTrip - {Name: "net/http.Header.writeSubset"}, - {Name: "net/http.serverHandler.ServeHTTP"}, - } - - info := process.Info{ - ID: 1, - Functions: httpFuncs, - GoVersion: ver, - Modules: map[string]*semver.Version{}, - } - m.FilterUnusedProbes(&info) + "net/http.Header.writeSubset", + "net/http.serverHandler.ServeHTTP", + ) assert.Len(t, m.probes, 1) }) } @@ -192,10 +152,9 @@ func TestDependencyChecks(t *testing.T) { }) } -func fakeManager(t *testing.T) *Manager { +func fakeManager(t *testing.T, fnNames ...string) *Manager { logger := slog.Default() - m, err := NewManager( - logger, nil, NewNoopConfigProvider(nil), + probes := []probe.Probe{ grpcClient.New(logger, ""), grpcServer.New(logger, ""), httpServer.New(logger, ""), @@ -205,9 +164,27 @@ func fakeManager(t *testing.T) *Manager { kafkaConsumer.New(logger, ""), autosdk.New(logger), otelTraceGlobal.New(logger), - ) - assert.NoError(t, err) - assert.NotNil(t, m) + } + ver := semver.New(1, 20, 0, "", "") + var fn []*binary.Func + for _, name := range fnNames { + fn = append(fn, &binary.Func{Name: name}) + } + m := &Manager{ + logger: slog.Default(), + cp: NewNoopConfigProvider(nil), + probes: make(map[probe.ID]probe.Probe), + proc: &process.Info{ + ID: 1, + Functions: fn, + GoVersion: ver, + Modules: map[string]*semver.Version{}, + }, + } + for _, p := range probes { + m.probes[p.Manifest().ID] = p + } + m.filterUnusedProbes() return m } @@ -264,6 +241,7 @@ func TestRunStoppingByContext(t *testing.T) { logger: slog.Default(), probes: map[probe.ID]probe.Probe{{}: p}, cp: NewNoopConfigProvider(nil), + proc: new(process.Info), } mockExeAndBpffs(t) @@ -271,7 +249,7 @@ func TestRunStoppingByContext(t *testing.T) { ctx, stopCtx := context.WithCancel(context.Background()) errCh := make(chan error, 1) - err = m.Load(ctx, &process.Info{ID: 1000}) + err = m.Load(ctx) require.NoError(t, err) go func() { errCh <- m.Run(ctx) }() @@ -313,6 +291,7 @@ func TestRunStoppingByStop(t *testing.T) { logger: slog.Default(), probes: map[probe.ID]probe.Probe{{}: &p}, cp: NewNoopConfigProvider(nil), + proc: new(process.Info), } mockExeAndBpffs(t) @@ -320,7 +299,7 @@ func TestRunStoppingByStop(t *testing.T) { ctx := context.Background() errCh := make(chan error, 1) - err = m.Load(ctx, &process.Info{ID: 1000}) + err = m.Load(ctx) require.NoError(t, err) time.AfterFunc(100*time.Millisecond, func() { @@ -445,12 +424,13 @@ func TestConfigProvider(t *testing.T) { netHTTPClientLibID: {TracesEnabled: &falseVal}, }, }), + proc: new(process.Info), } mockExeAndBpffs(t) runCtx, cancel := context.WithCancel(context.Background()) - err := m.Load(runCtx, &process.Info{ID: 1000}) + err := m.Load(runCtx) require.NoError(t, err) runErr := make(chan error, 1) @@ -577,6 +557,7 @@ func TestRunStopDeadlock(t *testing.T) { logger: slog.Default(), probes: map[probe.ID]probe.Probe{{}: p}, cp: NewNoopConfigProvider(nil), + proc: new(process.Info), } mockExeAndBpffs(t) @@ -584,7 +565,7 @@ func TestRunStopDeadlock(t *testing.T) { ctx, stopCtx := context.WithCancel(context.Background()) errCh := make(chan error, 1) - err = m.Load(ctx, &process.Info{ID: 1000}) + err = m.Load(ctx) require.NoError(t, err) go func() { errCh <- m.Run(ctx) }() @@ -625,6 +606,7 @@ func TestStopBeforeLoad(t *testing.T) { logger: slog.Default(), probes: map[probe.ID]probe.Probe{{}: &p}, cp: NewNoopConfigProvider(nil), + proc: new(process.Info), } mockExeAndBpffs(t) @@ -645,11 +627,12 @@ func TestStopBeforeRun(t *testing.T) { logger: slog.Default(), probes: map[probe.ID]probe.Probe{{}: &p}, cp: NewNoopConfigProvider(nil), + proc: new(process.Info), } mockExeAndBpffs(t) - err = m.Load(context.Background(), &process.Info{ID: 1000}) + err = m.Load(context.Background()) require.NoError(t, err) require.True(t, p.loaded.Load())