Skip to content

Commit 6d88657

Browse files
Merge pull request #21 from C4T-BuT-S4D/pomo/queue-improvements
Add timeouts scaling, add queue size metrics
2 parents 1ba3943 + dc1b77f commit 6d88657

12 files changed

+193
-33
lines changed

cmd/client/cli/run.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func parseJobsFlag(cmd *cobra.Command, name string) int {
2626
logrus.Fatalf("Could not get jobs number: %v", err)
2727
}
2828
if jobs < 0 {
29-
logrus.Fatal("run: job count should be non-negavtive")
29+
logrus.Fatal("run: job count should be non-negative")
3030
}
3131
return jobs
3232
}
@@ -42,13 +42,21 @@ func NewRun(cmd *cobra.Command, _ []string, cfg *client.Config) NeoCLI {
4242

4343
jobs := parseJobsFlag(cmd, "jobs")
4444
endlessJobs := parseJobsFlag(cmd, "endless-jobs")
45+
timeoutScaleTarget, err := cmd.Flags().GetFloat64("timeout-autoscale-target")
46+
if err != nil {
47+
logrus.Fatalf("Could not get timeout-autoscale-target flag: %v", err)
48+
}
49+
if timeoutScaleTarget < 0 {
50+
logrus.Fatalf("timeout-autoscale-target should be non-negative")
51+
}
4552

4653
neocli.Weight = jobs
4754
cli.sender = joblogger.NewRemoteSender(neocli)
4855
cli.run = exploit.NewRunner(
4956
cli.ClientID(),
5057
jobs,
5158
endlessJobs,
59+
timeoutScaleTarget,
5260
cfg,
5361
neocli,
5462
cli.sender,

cmd/client/cmd/run.go

+6
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,10 @@ func init() {
2929
rootCmd.AddCommand(runCmd)
3030
runCmd.Flags().IntP("jobs", "j", runtime.NumCPU()*cli.JobsPerCPU, "workers to run")
3131
runCmd.Flags().IntP("endless-jobs", "e", 0, "workers to run for endless mode. Default is 0 for no endless mode")
32+
runCmd.Flags().Float64(
33+
"timeout-autoscale-target",
34+
1.5,
35+
"target upper bound for recurrent exploit worker utilization by scaling timeouts."+
36+
" Setting this to 0 disables scaling",
37+
)
3238
}

internal/exploit/metrics.go

+18-4
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,16 @@ package exploit
33
import (
44
"github.com/prometheus/client_golang/prometheus"
55
"github.com/prometheus/client_golang/prometheus/promauto"
6-
"github.com/samber/lo"
76
)
87

98
type Metrics struct {
109
FlagsSubmitted *prometheus.CounterVec
1110
Teams prometheus.Gauge
11+
Queue *prometheus.GaugeVec
1212
}
1313

1414
func NewMetrics(namespace string) *Metrics {
1515
const subsystem = "exploit_runner"
16-
targetLabels := []string{"target_id", "target_ip"}
17-
exploitLabels := []string{"exploit_id", "exploit_version", "exploit_type"}
1816

1917
return &Metrics{
2018
FlagsSubmitted: promauto.NewCounterVec(
@@ -24,7 +22,13 @@ func NewMetrics(namespace string) *Metrics {
2422
Name: "flags_submitted_total",
2523
Help: "Number of exploits finished",
2624
},
27-
lo.Union(targetLabels, exploitLabels),
25+
[]string{
26+
"target_id",
27+
"target_ip",
28+
"exploit_id",
29+
"exploit_version",
30+
"exploit_type",
31+
},
2832
),
2933

3034
Teams: promauto.NewGauge(prometheus.GaugeOpts{
@@ -33,5 +37,15 @@ func NewMetrics(namespace string) *Metrics {
3337
Name: "teams",
3438
Help: "Number of teams scheduled for the current runner",
3539
}),
40+
41+
Queue: promauto.NewGaugeVec(
42+
prometheus.GaugeOpts{
43+
Namespace: namespace,
44+
Subsystem: subsystem,
45+
Name: "queue",
46+
Help: "Number of exploits in the queue",
47+
},
48+
[]string{"type"},
49+
),
3650
}
3751
}

internal/exploit/models.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@ func (r *FullResult) MetricLabels() prometheus.Labels {
4242
}
4343

4444
type State struct {
45-
ID string
46-
Version int64
47-
Dir string
48-
Path string
49-
Disabled bool
50-
Endless bool
51-
RunEvery time.Duration
52-
LastRun time.Time
53-
Timeout time.Duration
45+
ID string
46+
Version int64
47+
Dir string
48+
Path string
49+
Disabled bool
50+
Endless bool
51+
RunEvery time.Duration
52+
LastRun time.Time
53+
ScaledTimeout time.Duration
54+
Timeout time.Duration
5455
}
5556

5657
func (s *State) ExploitType() models.ExploitType {

internal/exploit/runner.go

+20-11
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,23 @@ var (
3131
func NewRunner(
3232
clientID string,
3333
maxJobs, maxEndlessJobs int,
34+
timeoutScaleTarget float64,
3435
clientConfig *client.Config,
3536
c *client.Client,
3637
logSender joblogger.Sender,
3738
) *Runner {
3839
return &Runner{
39-
storage: NewStorage(NewCache(), clientConfig.ExploitDir, c),
40-
cfg: &config.ExploitsConfig{},
41-
client: c,
42-
maxJobs: maxJobs,
43-
maxEndlessJobs: maxEndlessJobs,
44-
singleRuns: make(chan *epb.SingleRunSubscribeResponse),
45-
restarts: make(chan struct{}, 1),
46-
logSender: logSender,
40+
storage: NewStorage(NewCache(), clientConfig.ExploitDir, c),
41+
cfg: &config.ExploitsConfig{},
42+
client: c,
43+
44+
maxJobs: maxJobs,
45+
maxEndlessJobs: maxEndlessJobs,
46+
timeoutScaleTarget: timeoutScaleTarget,
47+
48+
singleRuns: make(chan *epb.SingleRunSubscribeResponse),
49+
restarts: make(chan struct{}, 1),
50+
logSender: logSender,
4751
metricsPusher: push.
4852
New(clientConfig.MetricsHost, "neo_runner").
4953
Grouping("client_id", clientID).
@@ -63,8 +67,9 @@ type Runner struct {
6367
metricsPusher *push.Pusher
6468
metrics *Metrics
6569

66-
maxJobs int
67-
maxEndlessJobs int
70+
maxJobs int
71+
maxEndlessJobs int
72+
timeoutScaleTarget float64
6873

6974
simpleLoop *submitLoop
7075
endlessLoop *submitLoop
@@ -371,6 +376,10 @@ func (r *Runner) onServerStateUpdate(ctx context.Context, state *epb.ServerState
371376
}
372377

373378
if r.storage.UpdateExploits(ctx, state.Exploits) {
379+
if r.timeoutScaleTarget > 0 {
380+
r.storage.ScaleTimeouts(r.maxJobs, len(r.teams), r.timeoutScaleTarget)
381+
}
382+
374383
r.logger.Info("Exploits changed, scheduling loops restart")
375384
r.restartLoops()
376385
}
@@ -400,7 +409,7 @@ func CreateExploitJobs(
400409
ex.Path,
401410
ex.Dir,
402411
environ,
403-
ex.Timeout,
412+
ex.ScaledTimeout,
404413
joblogger.New(ex.ID, ex.Version, ip, sender),
405414
))
406415
}

internal/exploit/storage.go

+49-8
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,46 @@ func (s *Storage) UpdateExploits(ctx context.Context, exs []*epb.ExploitState) b
6161
return true
6262
}
6363

64+
func (s *Storage) ScaleTimeouts(workers, teams int, target float64) {
65+
// Alpha is a worker usage coefficient.
66+
// For example, an exploit with timeout 10s and run every 20s
67+
// uses half of a worker's time for each team, so if teams = 4,
68+
// exploit will use 2 full workers.
69+
// Alpha in the case above will be 10/20 = 0.5 after the loop,
70+
// if workers = 2 its final value will be 0.5 * 4 / 2 = 1,
71+
// which means full worker utilization.
72+
// If it's smaller, we could increase the timeouts, if larger --
73+
// decrease them proportionally to their original values.
74+
// Target allows specifying the desired alpha value,
75+
// as in most cases exploits finish before timeout,
76+
// and "safe" case with target = 1 leads to
77+
// suboptimal worker utilization.
78+
// NB 1: endless exploits are not scaled.
79+
// NB 2: timeouts are rounded down to the nearest second.
80+
alpha := 0.0
81+
82+
for _, ex := range s.cache.Exploits() {
83+
if ex.Endless {
84+
continue
85+
}
86+
alpha += ex.Timeout.Seconds() / ex.RunEvery.Seconds()
87+
}
88+
alpha = alpha * float64(teams) / float64(workers)
89+
logrus.Infof("Scaling timeouts: alpha = %.2f, target = %.2f", alpha, target)
90+
for _, ex := range s.cache.Exploits() {
91+
if ex.Endless {
92+
continue
93+
}
94+
newTimeout := time.Duration(float64(ex.Timeout) * target / alpha)
95+
96+
// Round down to the nearest second.
97+
newTimeout -= newTimeout % time.Second
98+
99+
logrus.Infof("Scaling timeout for exploit %s: %s -> %s", ex.ID, ex.ScaledTimeout, newTimeout)
100+
ex.ScaledTimeout = newTimeout
101+
}
102+
}
103+
64104
func (s *Storage) updateExploit(ctx context.Context, exploitID string) (*State, error) {
65105
// Download the current exploit state.
66106
resp, err := s.client.Exploit(ctx, exploitID)
@@ -115,14 +155,15 @@ func (s *Storage) updateExploit(ctx context.Context, exploitID string) (*State,
115155
}
116156

117157
res := &State{
118-
ID: state.ExploitId,
119-
Version: state.Version,
120-
Dir: "",
121-
Path: entryPath,
122-
Disabled: state.Config.Disabled,
123-
Endless: state.Config.Endless,
124-
RunEvery: state.Config.RunEvery.AsDuration(),
125-
Timeout: state.Config.Timeout.AsDuration(),
158+
ID: state.ExploitId,
159+
Version: state.Version,
160+
Dir: "",
161+
Path: entryPath,
162+
Disabled: state.Config.Disabled,
163+
Endless: state.Config.Endless,
164+
RunEvery: state.Config.RunEvery.AsDuration(),
165+
ScaledTimeout: state.Config.Timeout.AsDuration(),
166+
Timeout: state.Config.Timeout.AsDuration(),
126167
}
127168
if state.Config.IsArchive {
128169
res.Dir = oPath

internal/exploit/storage_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path"
88
"strings"
99
"testing"
10+
"time"
1011

1112
"github.com/google/go-cmp/cmp"
1213
"github.com/google/go-cmp/cmp/cmpopts"
@@ -168,3 +169,69 @@ func Test_prepareEntry(t *testing.T) {
168169
// Check that file is executable.
169170
require.NotZero(t, fi.Mode()&0111)
170171
}
172+
173+
func TestStorage_Scale(t *testing.T) {
174+
st, cleanup := mockStorage()
175+
defer func() {
176+
require.NoError(t, cleanup())
177+
}()
178+
179+
// This exploit's timeout should be halved, as teams = 2 * workers.
180+
st.cache.Update([]*State{
181+
{
182+
ID: "1",
183+
Version: 1,
184+
RunEvery: time.Minute,
185+
ScaledTimeout: time.Minute,
186+
Timeout: time.Minute,
187+
},
188+
})
189+
st.ScaleTimeouts(10, 20, 1)
190+
191+
res, ok := st.Exploit("1")
192+
require.True(t, ok)
193+
require.EqualValues(t, 1, res.Version)
194+
require.EqualValues(t, time.Minute, res.RunEvery)
195+
require.EqualValues(t, 30*time.Second, res.ScaledTimeout)
196+
197+
// Now it should be doubled, as workers = 2 * teams.
198+
st.ScaleTimeouts(20, 10, 1)
199+
200+
res, ok = st.Exploit("1")
201+
require.True(t, ok)
202+
require.EqualValues(t, time.Minute, res.RunEvery)
203+
require.EqualValues(t, 2*time.Minute, res.ScaledTimeout)
204+
205+
// Add another exploit, expect scale to work proportionally to original timeouts.
206+
st.cache.Update([]*State{
207+
{
208+
ID: "2",
209+
Version: 1,
210+
RunEvery: time.Minute,
211+
ScaledTimeout: time.Minute,
212+
Timeout: time.Minute,
213+
},
214+
})
215+
st.ScaleTimeouts(20, 10, 1)
216+
217+
res, ok = st.Exploit("1")
218+
require.True(t, ok)
219+
require.EqualValues(t, time.Minute, res.RunEvery)
220+
require.EqualValues(t, time.Minute, res.ScaledTimeout)
221+
222+
res, ok = st.Exploit("2")
223+
require.True(t, ok)
224+
require.EqualValues(t, time.Minute, res.RunEvery)
225+
require.EqualValues(t, time.Minute, res.ScaledTimeout)
226+
227+
// Scale with target = 2, expect exploit timeouts to scale up.
228+
st.ScaleTimeouts(20, 10, 2)
229+
230+
res, ok = st.Exploit("1")
231+
require.True(t, ok)
232+
require.EqualValues(t, 2*time.Minute, res.ScaledTimeout)
233+
234+
res, ok = st.Exploit("2")
235+
require.True(t, ok)
236+
require.EqualValues(t, 2*time.Minute, res.ScaledTimeout)
237+
}

internal/exploit/submit_loop.go

+1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func (l *submitLoop) Start(ctx context.Context) {
126126
}
127127
case <-t.C:
128128
flush()
129+
l.metrics.Queue.WithLabelValues(string(l.q.Type())).Set(float64(l.q.Size()))
129130
case <-ctx.Done():
130131
return
131132
}

internal/exploit/submit_loop_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,10 @@ func (m *mockQueue) Type() queue.Type {
330330
return "mock"
331331
}
332332

333+
func (m *mockQueue) Size() int {
334+
return len(m.in)
335+
}
336+
333337
func (m *mockQueue) Start(ctx context.Context) {
334338
<-ctx.Done()
335339
}

internal/queue/endless.go

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ func (q *endlessQueue) Type() Type {
5050
return TypeEndless
5151
}
5252

53+
func (q *endlessQueue) Size() int {
54+
return len(q.c)
55+
}
56+
5357
// Start is synchronous.
5458
// Cancel the start's context to stop the queue.
5559
func (q *endlessQueue) Start(ctx context.Context) {

internal/queue/queue.go

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type Queue interface {
3131
Add(*Job) error
3232
Results() <-chan *Output
3333
Type() Type
34+
Size() int
3435

3536
fmt.Stringer
3637
}

internal/queue/simple.go

+4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ func (q *simpleQueue) Type() Type {
4949
return TypeSimple
5050
}
5151

52+
func (q *simpleQueue) Size() int {
53+
return len(q.c)
54+
}
55+
5256
// Start is synchronous.
5357
// Cancel the start's context to stop the queue.
5458
func (q *simpleQueue) Start(ctx context.Context) {

0 commit comments

Comments
 (0)