Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS Go SDK V2 Upgrade, Single Measure Support #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ bin
# High Dynamic Range (HDR) Histogram files
*.hdr

# configs
config
config.yaml

# artifacts
data
1 change: 1 addition & 0 deletions cmd/tsbs_generate_queries/databases/timestream/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const goTimeFmt = "2006-01-02 15:04:05.999999 -0700"
// BaseGenerator contains settings specific for Timestream.
type BaseGenerator struct {
// DBName is the database name. NOTE(review): presumably the value the query
// builders read as d.DBName and interpolate into FROM "<db>"."cpu" — confirm
// how Devops exposes this field.
DBName string
// SingleMeasure selects single-measure query generation. NOTE(review):
// presumably the flag the query builders read as d.SingleMeasure to switch to
// measure_name / measure_value::double based SQL — confirm.
SingleMeasure bool
}

// GenerateEmptyQuery returns an empty query.TimescaleDB.
Expand Down
139 changes: 111 additions & 28 deletions cmd/tsbs_generate_queries/databases/timestream/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ func panicIfErr(err error) {
}

// Window sizes, expressed in seconds, and the bucketing expression template
// used when building Timestream queries.
const (
	// oneMinute is the number of seconds in one minute.
	oneMinute = 60
	// oneHour is the number of seconds in one hour.
	oneHour = oneMinute * 60
	// timeBucketFmt is the fmt template for the bin() expression; its single
	// %d verb receives the bucket width in seconds.
	timeBucketFmt = "bin(time, %ds)"
)

Expand Down Expand Up @@ -49,28 +48,45 @@ func (d *Devops) getHostWhereString(nHosts int) string {
return d.getHostWhereWithHostnames(hostnames)
}

// getMeasureNameWhereString builds a parenthesized, OR-joined filter on
// measure_name for single-measure records.
// For example, ["a", "b"] yields: (measure_name = 'a' OR measure_name = 'b')
// An empty or nil slice yields "()".
func (d *Devops) getMeasureNameWhereString(measureNames []string) string {
	var b strings.Builder
	b.WriteByte('(')
	for i, name := range measureNames {
		// Separator goes between clauses only, never before the first one.
		if i > 0 {
			b.WriteString(" OR ")
		}
		fmt.Fprintf(&b, "measure_name = '%s'", name)
	}
	b.WriteByte(')')
	return b.String()
}

// getTimeBucket renders the time-bucketing expression for a bucket width
// given in seconds by filling in the timeBucketFmt template.
func (d *Devops) getTimeBucket(seconds int) string {
	bucketExpr := fmt.Sprintf(timeBucketFmt, seconds)
	return bucketExpr
}

// getSelectClausesAggMetrics returns one SELECT clause per entry in metrics,
// in the same order. Each clause applies the aggregate function agg and
// aliases the result as "<agg>_<metric>".
//
// When d.SingleMeasure is false the aggregate is applied directly to the
// metric column, e.g. max(usage_user) as max_usage_user. When it is true the
// column reference is replaced by a CASE expression that yields
// measure_value::double for rows whose measure_name equals the metric and
// NULL otherwise.
//
// An empty metrics slice yields an empty (non-nil) slice.
func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []string {
	// The template depends only on the mode, so choose it once instead of
	// formatting a clause per metric and then overwriting it. Indexed verbs
	// let agg (%[1]s) and the metric (%[2]s) each be passed a single time.
	clauseFmt := "%[1]s(%[2]s) as %[1]s_%[2]s"
	if d.SingleMeasure {
		clauseFmt = "%[1]s(case when measure_name = '%[2]s' THEN measure_value::double ELSE NULL END) as %[1]s_%[2]s"
	}

	selectClauses := make([]string, len(metrics))
	for i, m := range metrics {
		selectClauses[i] = fmt.Sprintf(clauseFmt, agg, m)
	}
	return selectClauses
}

// GroupByTime selects the MAX for numMetrics metrics under 'cpu',
// per minute for nhosts hosts,
// per minute for nHosts hosts,
// e.g. in pseudo-SQL:
//
// SELECT minute, max(metric1), ..., max(metricN)
// FROM cpu
// WHERE (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N')
// AND time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY minute ORDER BY minute ASC
// In single-measure mode, it adds an extra filter on measure_name.
func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) {
interval := d.Interval.MustRandWindow(timeRange)
metrics, err := devops.GetCPUMetricsSlice(numMetrics)
Expand All @@ -80,15 +96,23 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t
panic(fmt.Sprintf("invalid number of select clauses: got %d", len(selectClauses)))
}

var whereClause string
if d.SingleMeasure {
// When in single-measure mode, filter on both measure_name and host.
whereClause = fmt.Sprintf("%s AND %s", d.getMeasureNameWhereString(metrics), d.getHostWhereString(nHosts))
} else {
whereClause = d.getHostWhereString(nHosts)
}

sql := fmt.Sprintf(`SELECT %s AS minute,
%s
FROM "%s"."cpu"
WHERE %s AND %s AND time >= '%s' AND time < '%s'
WHERE %s AND time >= '%s' AND time < '%s'
GROUP BY 1 ORDER BY 1 ASC`,
d.getTimeBucket(oneMinute),
strings.Join(selectClauses, ",\n"),
d.DBName,
d.getHostWhereString(nHosts),
whereClause,
interval.Start().Format(goTimeFmt),
interval.End().Format(goTimeFmt))

Expand All @@ -97,23 +121,36 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}

// GroupByOrderByLimit populates a query.Query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit:
// GroupByOrderByLimit creates a query that groups by a truncated minute, orders by that value, and limits the result:
// SELECT bin(time, 60s) AS t, MAX(usage_user) FROM cpu
// WHERE time < '$TIME'
// GROUP BY t ORDER BY t DESC
// LIMIT $LIMIT
// In single-measure mode, it adds a filter on measure_name and casts measure_value.
func (d *Devops) GroupByOrderByLimit(qi query.Query) {
interval := d.Interval.MustRandWindow(time.Hour)
sql := fmt.Sprintf(`SELECT %s AS minute, max(usage_user) as max_usage_user
var sql string
if d.SingleMeasure {
sql = fmt.Sprintf(`SELECT %s AS minute, max(measure_value::double) as max_usage_user
FROM "%s"."cpu"
WHERE time < '%s' AND measure_name = 'usage_user'
GROUP BY 1
ORDER BY 1 DESC
LIMIT 5`,
d.getTimeBucket(oneMinute),
d.DBName,
interval.End().Format(goTimeFmt))
} else {
sql = fmt.Sprintf(`SELECT %s AS minute, max(usage_user) as max_usage_user
FROM "%s"."cpu"
WHERE time < '%s'
GROUP BY 1
ORDER BY 1 DESC
LIMIT 5`,
d.getTimeBucket(oneMinute),
d.DBName,
interval.End().Format(goTimeFmt))

d.getTimeBucket(oneMinute),
d.DBName,
interval.End().Format(goTimeFmt))
}
humanLabel := "Timestream max cpu over last 5 min-intervals (random end)"
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString())
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
Expand All @@ -126,6 +163,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) {
// FROM cpu
// WHERE time >= '$HOUR_START' AND time < '$HOUR_END'
// GROUP BY hour, hostname ORDER BY hour
// In single-measure mode, it uses a CASE expression to filter by measure_name.
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
metrics, err := devops.GetCPUMetricsSlice(numMetrics)
panicIfErr(err)
Expand All @@ -135,7 +173,11 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
meanClauses := make([]string, numMetrics)
for i, m := range metrics {
meanClauses[i] = "mean_" + m
selectClauses[i] = fmt.Sprintf("avg(%s) as %s", m, meanClauses[i])
if d.SingleMeasure {
selectClauses[i] = fmt.Sprintf("avg (case when measure_name = '%s' THEN measure_value::double ELSE NULL END) as %s", m, meanClauses[i])
} else {
selectClauses[i] = fmt.Sprintf("avg(%s) as %s", m, meanClauses[i])
}
}

sql := fmt.Sprintf(`
Expand Down Expand Up @@ -185,9 +227,30 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}

// LastPointPerHost finds the last row for every host in the dataset
// LastPointPerHost finds the last row for every host in the dataset.
// In single-measure mode, it groups by both hostname and measure_name.
func (d *Devops) LastPointPerHost(qi query.Query) {
sql := fmt.Sprintf(`
var sql string
if d.SingleMeasure {
sql = fmt.Sprintf(`
WITH latest_recorded_time AS (
SELECT
hostname,
measure_name,
max(time) as latest_time
FROM "%s"."cpu"
GROUP BY 1, 2
)
SELECT b.hostname,
b.measure_name,
b.measure_value::double,
b.time
FROM latest_recorded_time a
JOIN "%s"."cpu" b
ON a.hostname = b.hostname AND a.latest_time = b.time AND a.measure_name = b.measure_name
ORDER BY hostname, measure_name`, d.DBName, d.DBName)
} else {
sql = fmt.Sprintf(`
WITH latest_recorded_time AS (
SELECT
hostname,
Expand All @@ -200,19 +263,21 @@ func (d *Devops) LastPointPerHost(qi query.Query) {
JOIN "%s"."cpu" b
ON a.hostname = b.hostname AND a.latest_time = b.time
ORDER BY hostname`, d.DBName, d.DBName)
}
humanLabel := "Timestream last row per host"
humanDesc := humanLabel
d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql)
}

// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high
// usage between a time period for a number of hosts (if 0, it will search all hosts),
// HighCPUForHosts creates a query that returns CPU metrics when CPU usage is high,
// for a given number of hosts,
// e.g. in pseudo-SQL:
//
// SELECT * FROM cpu
// WHERE usage_user > 90.0
// AND time >= '$TIME_START' AND time < '$TIME_END'
// AND (hostname = '$HOST' OR hostname = '$HOST2'...)
// In single-measure mode, it adds a measure_name filter and casts measure_value.
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
var hostWhereClause string
if nHosts == 0 {
Expand All @@ -221,8 +286,27 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
hostWhereClause = fmt.Sprintf("AND %s", d.getHostWhereString(nHosts))
}
interval := d.Interval.MustRandWindow(devops.HighCPUDuration)

sql := fmt.Sprintf(`
var sql string
if d.SingleMeasure {
sql = fmt.Sprintf(`
WITH usage_over_ninety AS (
SELECT time,
hostname
FROM "%s"."cpu"
WHERE measure_name = 'usage_user' AND measure_value::double > 90
AND time >= '%s' AND time < '%s'
%s
)
SELECT *
FROM "%s"."cpu" a
JOIN usage_over_ninety b ON a.hostname = b.hostname AND a.time = b.time`,
d.DBName,
interval.Start().Format(goTimeFmt),
interval.End().Format(goTimeFmt),
hostWhereClause,
d.DBName)
} else {
sql = fmt.Sprintf(`
WITH usage_over_ninety AS (
SELECT time,
hostname
Expand All @@ -234,13 +318,12 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
SELECT *
FROM "%s"."cpu" a
JOIN usage_over_ninety b ON a.hostname = b.hostname AND a.time = b.time`,
d.DBName,
interval.Start().Format(goTimeFmt),
interval.End().Format(goTimeFmt),
hostWhereClause,
d.DBName,
)

d.DBName,
interval.Start().Format(goTimeFmt),
interval.End().Format(goTimeFmt),
hostWhereClause,
d.DBName)
}
humanLabel, err := devops.GetHighCPULabel("Timestream", nHosts)
panicIfErr(err)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
Expand Down
Loading