diff --git a/.gitignore b/.gitignore index ec3765b58..37cfcb66b 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,9 @@ bin # High Dynamic Range (HDR) Histogram files *.hdr +# configs +config config.yaml + +# artifacts +data diff --git a/cmd/tsbs_generate_queries/databases/timestream/common.go b/cmd/tsbs_generate_queries/databases/timestream/common.go index ef3bfde5f..e92eec1ce 100644 --- a/cmd/tsbs_generate_queries/databases/timestream/common.go +++ b/cmd/tsbs_generate_queries/databases/timestream/common.go @@ -13,6 +13,7 @@ const goTimeFmt = "2006-01-02 15:04:05.999999 -0700" // BaseGenerator contains settings specific for Timestream type BaseGenerator struct { DBName string + SingleMeasure bool } // GenerateEmptyQuery returns an empty query.TimescaleDB. diff --git a/cmd/tsbs_generate_queries/databases/timestream/devops.go b/cmd/tsbs_generate_queries/databases/timestream/devops.go index 862e323e4..58dca157c 100644 --- a/cmd/tsbs_generate_queries/databases/timestream/devops.go +++ b/cmd/tsbs_generate_queries/databases/timestream/devops.go @@ -17,9 +17,8 @@ func panicIfErr(err error) { } const ( - oneMinute = 60 - oneHour = oneMinute * 60 - + oneMinute = 60 + oneHour = oneMinute * 60 timeBucketFmt = "bin(time, %ds)" ) @@ -49,21 +48,37 @@ func (d *Devops) getHostWhereString(nHosts int) string { return d.getHostWhereWithHostnames(hostnames) } +// getMeasureNameWhereString returns a WHERE clause for the given measure names for single-measure records. 
+// For example, given ["a", "b"], it returns: (measure_name = 'a' OR measure_name = 'b') +func (d *Devops) getMeasureNameWhereString(measureNames []string) string { + var measureClauses []string + for _, s := range measureNames { + measureClauses = append(measureClauses, fmt.Sprintf("measure_name = '%s'", s)) + } + combinedMeasureClause := strings.Join(measureClauses, " OR ") + return "(" + combinedMeasureClause + ")" +} + func (d *Devops) getTimeBucket(seconds int) string { return fmt.Sprintf(timeBucketFmt, seconds) } +// getSelectClausesAggMetrics returns the SELECT clauses for aggregated metrics. +// In single-measure mode the column reference is replaced by a CASE expression. func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []string { selectClauses := make([]string, len(metrics)) for i, m := range metrics { - selectClauses[i] = fmt.Sprintf("%[1]s(%[2]s) as %[1]s_%[2]s", agg, m) + if d.SingleMeasure { + selectClauses[i] = fmt.Sprintf("%s(case when measure_name = '%s' THEN measure_value::double ELSE NULL END) as %s_%s", agg, m, agg, m) + } else { + selectClauses[i] = fmt.Sprintf("%s(%s) as %s_%s", agg, m, agg, m) + } } - return selectClauses } // GroupByTime selects the MAX for numMetrics metrics under 'cpu', -// per minute for nhosts hosts, +// per minute for nHosts hosts, // e.g. in pseudo-SQL: // // SELECT minute, max(metric1), ..., max(metricN) @@ -71,6 +86,7 @@ func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []stri // WHERE (hostname = '$HOSTNAME_1' OR ... OR hostname = '$HOSTNAME_N') // AND time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY minute ORDER BY minute ASC +// In single-measure mode, it adds an extra filter on measure_name. 
func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { interval := d.Interval.MustRandWindow(timeRange) metrics, err := devops.GetCPUMetricsSlice(numMetrics) @@ -80,15 +96,23 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t panic(fmt.Sprintf("invalid number of select clauses: got %d", len(selectClauses))) } + var whereClause string + if d.SingleMeasure { + // When in single-measure mode, filter on both measure_name and host. + whereClause = fmt.Sprintf("%s AND %s", d.getMeasureNameWhereString(metrics), d.getHostWhereString(nHosts)) + } else { + whereClause = d.getHostWhereString(nHosts) + } + sql := fmt.Sprintf(`SELECT %s AS minute, %s FROM "%s"."cpu" - WHERE %s AND %s AND time >= '%s' AND time < '%s' + WHERE %s AND time >= '%s' AND time < '%s' GROUP BY 1 ORDER BY 1 ASC`, d.getTimeBucket(oneMinute), strings.Join(selectClauses, ",\n"), d.DBName, - d.getHostWhereString(nHosts), + whereClause, interval.Start().Format(goTimeFmt), interval.End().Format(goTimeFmt)) @@ -97,23 +121,36 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) } -// GroupByOrderByLimit populates a query.Query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit: +// GroupByOrderByLimit creates a query that groups by a truncated minute, orders by that value, and limits the result: // SELECT time_bucket('1 minute', time) AS t, MAX(cpu) FROM cpu // WHERE time < '$TIME' // GROUP BY t ORDER BY t DESC // LIMIT $LIMIT +// In single-measure mode, it adds a filter on measure_name and casts measure_value. 
func (d *Devops) GroupByOrderByLimit(qi query.Query) { interval := d.Interval.MustRandWindow(time.Hour) - sql := fmt.Sprintf(`SELECT %s AS minute, max(usage_user) as max_usage_user + var sql string + if d.SingleMeasure { + sql = fmt.Sprintf(`SELECT %s AS minute, max(measure_value::double) as max_usage_user + FROM "%s"."cpu" + WHERE time < '%s' AND measure_name = 'usage_user' + GROUP BY 1 + ORDER BY 1 DESC + LIMIT 5`, + d.getTimeBucket(oneMinute), + d.DBName, + interval.End().Format(goTimeFmt)) + } else { + sql = fmt.Sprintf(`SELECT %s AS minute, max(usage_user) as max_usage_user FROM "%s"."cpu" WHERE time < '%s' GROUP BY 1 ORDER BY 1 DESC LIMIT 5`, - d.getTimeBucket(oneMinute), - d.DBName, - interval.End().Format(goTimeFmt)) - + d.getTimeBucket(oneMinute), + d.DBName, + interval.End().Format(goTimeFmt)) + } humanLabel := "Timestream max cpu over last 5 min-intervals (random end)" humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) @@ -126,6 +163,7 @@ func (d *Devops) GroupByOrderByLimit(qi query.Query) { // FROM cpu // WHERE time >= '$HOUR_START' AND time < '$HOUR_END' // GROUP BY hour, hostname ORDER BY hour +// In single-measure mode, it uses a CASE expression to filter by measure_name. 
func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { metrics, err := devops.GetCPUMetricsSlice(numMetrics) panicIfErr(err) @@ -135,7 +173,11 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { meanClauses := make([]string, numMetrics) for i, m := range metrics { meanClauses[i] = "mean_" + m - selectClauses[i] = fmt.Sprintf("avg(%s) as %s", m, meanClauses[i]) + if d.SingleMeasure { + selectClauses[i] = fmt.Sprintf("avg (case when measure_name = '%s' THEN measure_value::double ELSE NULL END) as %s", m, meanClauses[i]) + } else { + selectClauses[i] = fmt.Sprintf("avg(%s) as %s", m, meanClauses[i]) + } } sql := fmt.Sprintf(` @@ -185,9 +227,30 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) } -// LastPointPerHost finds the last row for every host in the dataset +// LastPointPerHost finds the last row for every host in the dataset. +// In single-measure mode, it groups by both hostname and measure_name. 
func (d *Devops) LastPointPerHost(qi query.Query) { - sql := fmt.Sprintf(` + var sql string + if d.SingleMeasure { + sql = fmt.Sprintf(` + WITH latest_recorded_time AS ( + SELECT + hostname, + measure_name, + max(time) as latest_time + FROM "%s"."cpu" + GROUP BY 1, 2 + ) + SELECT b.hostname, + b.measure_name, + b.measure_value::double, + b.time + FROM latest_recorded_time a + JOIN "%s"."cpu" b + ON a.hostname = b.hostname AND a.latest_time = b.time AND a.measure_name = b.measure_name + ORDER BY hostname, measure_name`, d.DBName, d.DBName) + } else { + sql = fmt.Sprintf(` WITH latest_recorded_time AS ( SELECT hostname, @@ -200,19 +263,21 @@ func (d *Devops) LastPointPerHost(qi query.Query) { JOIN "%s"."cpu" b ON a.hostname = b.hostname AND a.latest_time = b.time ORDER BY hostname`, d.DBName, d.DBName) + } humanLabel := "Timestream last row per host" humanDesc := humanLabel d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) } -// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high -// usage between a time period for a number of hosts (if 0, it will search all hosts), +// HighCPUForHosts creates a query that returns CPU metrics when CPU usage is high, +// for a given number of hosts, // e.g. in pseudo-SQL: // // SELECT * FROM cpu // WHERE usage_user > 90.0 // AND time >= '$TIME_START' AND time < '$TIME_END' // AND (hostname = '$HOST' OR hostname = '$HOST2'...) +// In single-measure mode, it adds a measure_name filter and casts measure_value. 
func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { var hostWhereClause string if nHosts == 0 { @@ -221,8 +286,27 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { hostWhereClause = fmt.Sprintf("AND %s", d.getHostWhereString(nHosts)) } interval := d.Interval.MustRandWindow(devops.HighCPUDuration) - - sql := fmt.Sprintf(` + var sql string + if d.SingleMeasure { + sql = fmt.Sprintf(` + WITH usage_over_ninety AS ( + SELECT time, + hostname + FROM "%s"."cpu" + WHERE measure_name = 'usage_user' AND measure_value::double > 90 + AND time >= '%s' AND time < '%s' + %s + ) + SELECT * + FROM "%s"."cpu" a + JOIN usage_over_ninety b ON a.hostname = b.hostname AND a.time = b.time`, + d.DBName, + interval.Start().Format(goTimeFmt), + interval.End().Format(goTimeFmt), + hostWhereClause, + d.DBName) + } else { + sql = fmt.Sprintf(` WITH usage_over_ninety AS ( SELECT time, hostname @@ -234,13 +318,12 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { SELECT * FROM "%s"."cpu" a JOIN usage_over_ninety b ON a.hostname = b.hostname AND a.time = b.time`, - d.DBName, - interval.Start().Format(goTimeFmt), - interval.End().Format(goTimeFmt), - hostWhereClause, - d.DBName, - ) - + d.DBName, + interval.Start().Format(goTimeFmt), + interval.End().Format(goTimeFmt), + hostWhereClause, + d.DBName) + } humanLabel, err := devops.GetHighCPULabel("Timestream", nHosts) panicIfErr(err) humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) diff --git a/cmd/tsbs_generate_queries/databases/timestream/devops_single_measure_test.go b/cmd/tsbs_generate_queries/databases/timestream/devops_single_measure_test.go new file mode 100644 index 000000000..dbecb6f7b --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/timestream/devops_single_measure_test.go @@ -0,0 +1,404 @@ +package timestream + +import ( + // "fmt" + "github.com/andreyvit/diff" + "github.com/timescale/tsbs/pkg/query" + "math/rand" + "strings" + "testing" + "time" + + 
"github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" +) +func TestDevopsSingleMeasureGetMeasureNameWhere(t *testing.T) { + cases := []struct { + desc string + measureNames []string + want string + }{ + { + desc: "single measure", + measureNames: []string{"foo1"}, + want: "(measure_name = 'foo1')", + }, + { + desc: "multi host", + measureNames: []string{"foo1", "foo2"}, + want: "(measure_name = 'foo1' OR measure_name = 'foo2')", + }, + } + + for _, c := range cases { + b := BaseGenerator{SingleMeasure: true} + dq, err := b.NewDevops(time.Now(), time.Now(), 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + if got := d.getMeasureNameWhereString(c.measureNames); got != c.want { + t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want) + } + } +} + +func TestDevopsSingleMeasureGetSelectClausesAggMetrics(t *testing.T) { + cases := []struct { + desc string + agg string + metrics []string + want string + }{ + { + desc: "single metric - max", + agg: "max", + metrics: []string{"foo"}, + want: "max(case when measure_name = 'foo' THEN measure_value::double ELSE NULL END) as max_foo", + }, + { + desc: "multiple metric - max", + agg: "max", + metrics: []string{"foo", "bar"}, + want: "max(case when measure_name = 'foo' THEN measure_value::double ELSE NULL END) as max_foo," + + "max(case when measure_name = 'bar' THEN measure_value::double ELSE NULL END) as max_bar", + }, + { + desc: "multiple metric - avg", + agg: "avg", + metrics: []string{"foo", "bar"}, + want: "avg(case when measure_name = 'foo' THEN measure_value::double ELSE NULL END) as avg_foo," + + "avg(case when measure_name = 'bar' THEN measure_value::double ELSE NULL END) as avg_bar", + }, + } + + for _, c := range cases { + b := BaseGenerator{SingleMeasure: true} + dq, err := b.NewDevops(time.Now(), time.Now(), 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + if got := 
strings.Join(d.getSelectClausesAggMetrics(c.agg, c.metrics), ","); got != c.want { + t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want) + } + } +} + +func TestDevopsSingleMeasureGroupByTime(t *testing.T) { + expectedHumanLabel := "Timestream 1 cpu metric(s), random 1 hosts, random 1s by 1m" + expectedHumanDesc := "Timestream 1 cpu metric(s), random 1 hosts, random 1s by 1m: 1970-01-01T00:05:58Z" + expectedTable := "cpu" + expectedSQLQuery := `SELECT bin(time, 60s) AS minute, + max(case when measure_name = 'usage_user' THEN measure_value::double ELSE NULL END) as max_usage_user + FROM "db"."cpu" + WHERE (measure_name = 'usage_user') AND (hostname = 'host_9') AND time >= '1970-01-01 00:05:58.646325 +0000' AND time < '1970-01-01 00:05:59.646325 +0000' + GROUP BY 1 ORDER BY 1 ASC` + + rand.Seed(123) // Setting seed for testing purposes. + s := time.Unix(0, 0) + e := s.Add(time.Hour) + b := BaseGenerator{DBName: "db", SingleMeasure: true} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + metrics := 1 + nHosts := 1 + duration := time.Second + + q := d.GenerateEmptyQuery() + d.GroupByTime(q, nHosts, metrics, duration) + + verifySingleMeasureQuery(t, q, expectedHumanLabel, expectedHumanDesc, expectedTable, expectedSQLQuery) +} + +func TestSingleMeasureGroupByOrderByLimit(t *testing.T) { + expectedHumanLabel := "Timestream max cpu over last 5 min-intervals (random end)" + expectedHumanDesc := "Timestream max cpu over last 5 min-intervals (random end): 1970-01-01T01:16:22Z" + expectedTable := "cpu" + expectedSQLQuery := `SELECT bin(time, 60s) AS minute, max(measure_value::double) as max_usage_user + FROM "b"."cpu" + WHERE time < '1970-01-01 01:16:22.646325 +0000' AND measure_name = 'usage_user' + GROUP BY 1 + ORDER BY 1 DESC + LIMIT 5` + + rand.Seed(123) // Setting seed for testing purposes. 
+ s := time.Unix(0, 0) + e := s.Add(2 * time.Hour) + b := BaseGenerator{DBName: "b", SingleMeasure: true} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + q := d.GenerateEmptyQuery() + d.GroupByOrderByLimit(q) + + verifySingleMeasureQuery(t, q, expectedHumanLabel, expectedHumanDesc, expectedTable, expectedSQLQuery) +} + +func TestSingleMeasureGroupByTimeAndPrimaryTag(t *testing.T) { + cases := []struct { + desc string + expectedHumanLabel string + expectedHumanDesc string + expectedTable string + expectedSQLQuery string + numMetrics int + }{ + { + desc: "1 metric", + expectedHumanLabel: "Timestream mean of 1 metrics, all hosts, random 12h0m0s by 1h", + expectedHumanDesc: "Timestream mean of 1 metrics, all hosts, random 12h0m0s by 1h: 1970-01-01T00:16:22Z", + expectedTable: "cpu", + expectedSQLQuery: ` + SELECT bin(time, 3600s) as hour, + hostname, + avg (case when measure_name = 'usage_user' THEN measure_value::double ELSE NULL END) as mean_usage_user + FROM "b"."cpu" + WHERE time >= '1970-01-01 00:16:22.646325 +0000' AND time < '1970-01-01 12:16:22.646325 +0000' + GROUP BY 1, 2`, + numMetrics: 1, + }, { + desc: "3 metric", + expectedHumanLabel: "Timestream mean of 3 metrics, all hosts, random 12h0m0s by 1h", + expectedHumanDesc: "Timestream mean of 3 metrics, all hosts, random 12h0m0s by 1h: 1970-01-01T00:54:10Z", + expectedTable: "cpu", + expectedSQLQuery: ` + SELECT bin(time, 3600s) as hour, + hostname, + avg (case when measure_name = 'usage_user' THEN measure_value::double ELSE NULL END) as mean_usage_user, + avg (case when measure_name = 'usage_system' THEN measure_value::double ELSE NULL END) as mean_usage_system, + avg (case when measure_name = 'usage_idle' THEN measure_value::double ELSE NULL END) as mean_usage_idle + FROM "b"."cpu" + WHERE time >= '1970-01-01 00:54:10.138978 +0000' AND time < '1970-01-01 12:54:10.138978 +0000' + GROUP BY 1, 2`, + numMetrics: 3, + }, + } + + 
rand.Seed(123) // Setting seed for testing purposes. + s := time.Unix(0, 0) + e := s.Add(devops.DoubleGroupByDuration).Add(time.Hour) + + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + b := BaseGenerator{DBName: "b",SingleMeasure: true} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + q := d.GenerateEmptyQuery() + d.GroupByTimeAndPrimaryTag(q, c.numMetrics) + + verifySingleMeasureQuery(t, q, c.expectedHumanLabel, c.expectedHumanDesc, c.expectedTable, c.expectedSQLQuery) + }) + } +} + +func TestSingleMeasureMaxAllCPU(t *testing.T) { + expectedHumanLabel := "Timestream max of all CPU metrics, random 1 hosts, random 8h0m0s by 1h" + expectedHumanDesc := "Timestream max of all CPU metrics, random 1 hosts, random 8h0m0s by 1h: 1970-01-01T00:16:22Z" + expectedTable := "cpu" + expectedSQLQuery := `SELECT bin(time, 3600s) AS hour, + max(case when measure_name = 'usage_user' THEN measure_value::double ELSE NULL END) as max_usage_user, + max(case when measure_name = 'usage_system' THEN measure_value::double ELSE NULL END) as max_usage_system, + max(case when measure_name = 'usage_idle' THEN measure_value::double ELSE NULL END) as max_usage_idle, + max(case when measure_name = 'usage_nice' THEN measure_value::double ELSE NULL END) as max_usage_nice, + max(case when measure_name = 'usage_iowait' THEN measure_value::double ELSE NULL END) as max_usage_iowait, + max(case when measure_name = 'usage_irq' THEN measure_value::double ELSE NULL END) as max_usage_irq, + max(case when measure_name = 'usage_softirq' THEN measure_value::double ELSE NULL END) as max_usage_softirq, + max(case when measure_name = 'usage_steal' THEN measure_value::double ELSE NULL END) as max_usage_steal, + max(case when measure_name = 'usage_guest' THEN measure_value::double ELSE NULL END) as max_usage_guest, + max(case when measure_name = 'usage_guest_nice' THEN measure_value::double ELSE NULL END) as 
max_usage_guest_nice + FROM "b"."cpu" + WHERE (hostname = 'host_9') AND time >= '1970-01-01 00:16:22.646325 +0000' AND time < '1970-01-01 08:16:22.646325 +0000' + GROUP BY 1 ORDER BY 1` + rand.Seed(123) // Setting seed for testing purposes. + s := time.Unix(0, 0) + e := s.Add(devops.MaxAllDuration).Add(time.Hour) + + b := BaseGenerator{DBName: "b", SingleMeasure: true} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + q := d.GenerateEmptyQuery() + d.MaxAllCPU(q, 1) + verifySingleMeasureQuery(t, q, expectedHumanLabel, expectedHumanDesc, expectedTable, expectedSQLQuery) +} + +func TestSingleMeasureLastPointPerHost(t *testing.T) { + cases := []struct { + desc string + expectedHumanLabel string + expectedHumanDesc string + expectedTable string + expectedSQLQuery string + }{ + { + desc: "last recorded value per host", + expectedHumanLabel: "Timestream last row per host", + expectedHumanDesc: "Timestream last row per host", + expectedTable: "cpu", + expectedSQLQuery: ` + WITH latest_recorded_time AS ( + SELECT + hostname, + measure_name, + max(time) as latest_time + FROM "b"."cpu" + GROUP BY 1, 2 + ) + SELECT b.hostname, + b.measure_name, + b.measure_value::double, + b.time + FROM latest_recorded_time a + JOIN "b"."cpu" b + ON a.hostname = b.hostname AND a.latest_time = b.time AND a.measure_name = b.measure_name + ORDER BY hostname, measure_name`, + }, + } + + rand.Seed(123) // Setting seed for testing purposes. 
+ + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + b := BaseGenerator{DBName: "b", SingleMeasure: true} + dq, err := b.NewDevops(time.Now(), time.Now(), 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + q := d.GenerateEmptyQuery() + d.LastPointPerHost(q) + verifySingleMeasureQuery(t, q, c.expectedHumanLabel, c.expectedHumanDesc, c.expectedTable, c.expectedSQLQuery) + }) + } +} + +func TestSingleMeasureHighCPUForHosts(t *testing.T) { + cases := []struct { + desc string + nHosts int + expectedHumanLabel string + expectedHumanDesc string + expectedHypertable string + expectedSQLQuery string + }{ + { + desc: "zero hosts", + nHosts: 0, + expectedHumanLabel: "Timestream CPU over threshold, all hosts", + expectedHumanDesc: "Timestream CPU over threshold, all hosts: 1970-01-01T00:16:22Z", + expectedHypertable: "cpu", + expectedSQLQuery: ` + WITH usage_over_ninety AS ( + SELECT time, + hostname + FROM "b"."cpu" + WHERE measure_name = 'usage_user' AND measure_value::double > 90 + AND time >= '1970-01-01 00:16:22.646325 +0000' AND time < '1970-01-01 12:16:22.646325 +0000' + + ) + SELECT * + FROM "b"."cpu" a + JOIN usage_over_ninety b ON a.hostname = b.hostname AND a.time = b.time`, + }, + { + desc: "one host", + nHosts: 1, + expectedHumanLabel: "Timestream CPU over threshold, 1 host(s)", + expectedHumanDesc: "Timestream CPU over threshold, 1 host(s): 1970-01-01T00:47:30Z", + expectedHypertable: "cpu", + expectedSQLQuery: ` + WITH usage_over_ninety AS ( + SELECT time, + hostname + FROM "b"."cpu" + WHERE measure_name = 'usage_user' AND measure_value::double > 90 + AND time >= '1970-01-01 00:47:30.894865 +0000' AND time < '1970-01-01 12:47:30.894865 +0000' + AND (hostname = 'host_9') + ) + SELECT * + FROM "b"."cpu" a + JOIN usage_over_ninety b ON a.hostname = b.hostname AND a.time = b.time`, + }, + { + desc: "five hosts", + nHosts: 5, + expectedHumanLabel: "Timestream CPU over threshold, 5 host(s)", + 
expectedHumanDesc: "Timestream CPU over threshold, 5 host(s): 1970-01-01T00:08:59Z", + expectedHypertable: "cpu", + expectedSQLQuery: ` + WITH usage_over_ninety AS ( + SELECT time, + hostname + FROM "b"."cpu" + WHERE measure_name = 'usage_user' AND measure_value::double > 90 + AND time >= '1970-01-01 00:08:59.080812 +0000' AND time < '1970-01-01 12:08:59.080812 +0000' + AND (hostname = 'host_5' OR hostname = 'host_9' OR hostname = 'host_1' OR hostname = 'host_7' OR hostname = 'host_2') + ) + SELECT * + FROM "b"."cpu" a + JOIN usage_over_ninety b ON a.hostname = b.hostname AND a.time = b.time`, + }, + } + + rand.Seed(123) // Setting seed for testing purposes. + s := time.Unix(0, 0) + e := s.Add(devops.HighCPUDuration).Add(time.Hour) + + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + b := BaseGenerator{DBName: "b", SingleMeasure: true} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + q := d.GenerateEmptyQuery() + d.HighCPUForHosts(q, c.nHosts) + + verifySingleMeasureQuery(t, q, c.expectedHumanLabel, c.expectedHumanDesc, c.expectedHypertable, c.expectedSQLQuery) + }) + } +} + + +func verifySingleMeasureQuery(t *testing.T, q query.Query, humanLabel, humanDesc, table, sqlQuery string) { + tsq, ok := q.(*query.Timestream) + + if !ok { + t.Fatal("Filled query is not *query.Timestream type") + } + + if got := string(tsq.HumanLabel); got != humanLabel { + t.Errorf("incorrect human label:\ngot\n%s\nwant\n%s", got, humanLabel) + } + + if got := string(tsq.HumanDescription); got != humanDesc { + t.Errorf("incorrect human description:\ngot\n%s\nwant\n%s", got, humanDesc) + } + + if got := string(tsq.Table); got != table { + t.Errorf("incorrect table:\ngot\n%s\nwant\n%s", got, table) + } + + if got := string(tsq.SqlQuery); got != sqlQuery { + t.Errorf("incorrect SQL query:\ndiff\n%s\ngot\n%s\nwant\n%s", diff.CharacterDiff(got, sqlQuery), got, sqlQuery) + } +} diff --git 
a/cmd/tsbs_generate_queries/databases/timestream/devops_test.go b/cmd/tsbs_generate_queries/databases/timestream/devops_test.go index 8f173d9c7..d697dee1a 100644 --- a/cmd/tsbs_generate_queries/databases/timestream/devops_test.go +++ b/cmd/tsbs_generate_queries/databases/timestream/devops_test.go @@ -2,18 +2,17 @@ package timestream import ( "fmt" - "github.com/andreyvit/diff" - "github.com/timescale/tsbs/pkg/query" "math/rand" "strings" "testing" "time" + "github.com/andreyvit/diff" "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/pkg/query" ) -// getMeasureNameWhereString -func TestDevopsGetMeasureNameWhereString(t *testing.T) { +func TestDevopsGetHostWhereWithHostnames(t *testing.T) { cases := []struct { desc string hostnames []string @@ -35,7 +34,7 @@ func TestDevopsGetMeasureNameWhereString(t *testing.T) { b := BaseGenerator{} dq, err := b.NewDevops(time.Now(), time.Now(), 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) @@ -45,37 +44,6 @@ func TestDevopsGetMeasureNameWhereString(t *testing.T) { } } -func TestDevopsGetMeasureNameWhere(t *testing.T) { - cases := []struct { - desc string - measureNames []string - want string - }{ - { - desc: "single measure", - measureNames: []string{"foo1"}, - want: "(measure_name = 'foo1')", - }, - { - desc: "multi host", - measureNames: []string{"foo1", "foo2"}, - want: "(measure_name = 'foo1' OR measure_name = 'foo2')", - }, - } - - for _, c := range cases { - b := BaseGenerator{} - dq, err := b.NewDevops(time.Now(), time.Now(), 10) - if err != nil { - t.Fatalf("Error while creating devops generator") - } - d := dq.(*Devops) - - if got := d.getMeasureNameWhereString(c.measureNames); got != c.want { - t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want) - } - } -} func TestDevopsGetHostWhereString(t *testing.T) { cases := []struct { nHosts int @@ -100,7 +68,7 @@ 
func TestDevopsGetHostWhereString(t *testing.T) { b := BaseGenerator{} dq, err := b.NewDevops(time.Now(), time.Now(), 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) @@ -114,7 +82,7 @@ func TestDevopsGetTimeBucket(t *testing.T) { b := BaseGenerator{} dq, err := b.NewDevops(time.Now(), time.Now(), 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) @@ -136,21 +104,19 @@ func TestDevopsGetSelectClausesAggMetrics(t *testing.T) { desc: "single metric - max", agg: "max", metrics: []string{"foo"}, - want: "max(case when measure_name = 'foo' THEN measure_value::double ELSE NULL END) as max_foo", + want: "max(foo) as max_foo", }, { - desc: "multiple metric - max", + desc: "multiple metrics - max", agg: "max", metrics: []string{"foo", "bar"}, - want: "max(case when measure_name = 'foo' THEN measure_value::double ELSE NULL END) as max_foo," + - "max(case when measure_name = 'bar' THEN measure_value::double ELSE NULL END) as max_bar", + want: "max(foo) as max_foo,max(bar) as max_bar", }, { - desc: "multiple metric - avg", + desc: "multiple metrics - avg", agg: "avg", metrics: []string{"foo", "bar"}, - want: "avg(case when measure_name = 'foo' THEN measure_value::double ELSE NULL END) as avg_foo," + - "avg(case when measure_name = 'bar' THEN measure_value::double ELSE NULL END) as avg_bar", + want: "avg(foo) as avg_foo,avg(bar) as avg_bar", }, } @@ -158,11 +124,14 @@ func TestDevopsGetSelectClausesAggMetrics(t *testing.T) { b := BaseGenerator{} dq, err := b.NewDevops(time.Now(), time.Now(), 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) - if got := strings.Join(d.getSelectClausesAggMetrics(c.agg, c.metrics), ","); got != c.want { + clauses := 
d.getSelectClausesAggMetrics(c.agg, c.metrics) + // Remove any newlines for comparison. + got := strings.ReplaceAll(strings.Join(clauses, ","), "\n", "") + if got != c.want { t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want) } } @@ -173,18 +142,18 @@ func TestDevopsGroupByTime(t *testing.T) { expectedHumanDesc := "Timestream 1 cpu metric(s), random 1 hosts, random 1s by 1m: 1970-01-01T00:05:58Z" expectedTable := "cpu" expectedSQLQuery := `SELECT bin(time, 60s) AS minute, - max(case when measure_name = 'usage_user' THEN measure_value::double ELSE NULL END) as max_usage_user + max(usage_user) as max_usage_user FROM "db"."cpu" - WHERE (measure_name = 'usage_user') AND (hostname = 'host_9') AND time >= '1970-01-01 00:05:58.646325 +0000' AND time < '1970-01-01 00:05:59.646325 +0000' + WHERE (hostname = 'host_9') AND time >= '1970-01-01 00:05:58.646325 +0000' AND time < '1970-01-01 00:05:59.646325 +0000' GROUP BY 1 ORDER BY 1 ASC` - rand.Seed(123) // Setting seed for testing purposes. + rand.Seed(123) s := time.Unix(0, 0) e := s.Add(time.Hour) b := BaseGenerator{DBName: "db"} dq, err := b.NewDevops(s, e, 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) @@ -202,22 +171,20 @@ func TestGroupByOrderByLimit(t *testing.T) { expectedHumanLabel := "Timestream max cpu over last 5 min-intervals (random end)" expectedHumanDesc := "Timestream max cpu over last 5 min-intervals (random end): 1970-01-01T01:16:22Z" expectedTable := "cpu" - expectedSQLQuery := `SELECT bin(time, 60s) AS minute, max(measure_value::double) as max_usage_user + expectedSQLQuery := `SELECT bin(time, 60s) AS minute, max(usage_user) as max_usage_user FROM "b"."cpu" - WHERE time < '1970-01-01 01:16:22.646325 +0000' AND measure_name = 'usage_user' + WHERE time < '1970-01-01 01:16:22.646325 +0000' GROUP BY 1 ORDER BY 1 DESC LIMIT 5` - rand.Seed(123) // Setting seed for testing purposes. 
+ rand.Seed(123) s := time.Unix(0, 0) e := s.Add(2 * time.Hour) - b := BaseGenerator{ - DBName: "b", - } + b := BaseGenerator{DBName: "b"} dq, err := b.NewDevops(s, e, 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) @@ -244,12 +211,13 @@ func TestGroupByTimeAndPrimaryTag(t *testing.T) { expectedSQLQuery: ` SELECT bin(time, 3600s) as hour, hostname, - avg (case when measure_name = 'usage_user' THEN measure_value::double ELSE NULL END) as mean_usage_user + avg(usage_user) as mean_usage_user FROM "b"."cpu" WHERE time >= '1970-01-01 00:16:22.646325 +0000' AND time < '1970-01-01 12:16:22.646325 +0000' GROUP BY 1, 2`, numMetrics: 1, - }, { + }, + { desc: "3 metric", expectedHumanLabel: "Timestream mean of 3 metrics, all hosts, random 12h0m0s by 1h", expectedHumanDesc: "Timestream mean of 3 metrics, all hosts, random 12h0m0s by 1h: 1970-01-01T00:54:10Z", @@ -257,9 +225,9 @@ func TestGroupByTimeAndPrimaryTag(t *testing.T) { expectedSQLQuery: ` SELECT bin(time, 3600s) as hour, hostname, - avg (case when measure_name = 'usage_user' THEN measure_value::double ELSE NULL END) as mean_usage_user, - avg (case when measure_name = 'usage_system' THEN measure_value::double ELSE NULL END) as mean_usage_system, - avg (case when measure_name = 'usage_idle' THEN measure_value::double ELSE NULL END) as mean_usage_idle + avg(usage_user) as mean_usage_user, + avg(usage_system) as mean_usage_system, + avg(usage_idle) as mean_usage_idle FROM "b"."cpu" WHERE time >= '1970-01-01 00:54:10.138978 +0000' AND time < '1970-01-01 12:54:10.138978 +0000' GROUP BY 1, 2`, @@ -267,18 +235,16 @@ func TestGroupByTimeAndPrimaryTag(t *testing.T) { }, } - rand.Seed(123) // Setting seed for testing purposes. 
+ rand.Seed(123) s := time.Unix(0, 0) e := s.Add(devops.DoubleGroupByDuration).Add(time.Hour) for _, c := range cases { t.Run(c.desc, func(t *testing.T) { - b := BaseGenerator{ - DBName: "b", - } + b := BaseGenerator{DBName: "b"} dq, err := b.NewDevops(s, e, 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) @@ -295,29 +261,28 @@ func TestMaxAllCPU(t *testing.T) { expectedHumanDesc := "Timestream max of all CPU metrics, random 1 hosts, random 8h0m0s by 1h: 1970-01-01T00:16:22Z" expectedTable := "cpu" expectedSQLQuery := `SELECT bin(time, 3600s) AS hour, - max(case when measure_name = 'usage_user' THEN measure_value::double ELSE NULL END) as max_usage_user, - max(case when measure_name = 'usage_system' THEN measure_value::double ELSE NULL END) as max_usage_system, - max(case when measure_name = 'usage_idle' THEN measure_value::double ELSE NULL END) as max_usage_idle, - max(case when measure_name = 'usage_nice' THEN measure_value::double ELSE NULL END) as max_usage_nice, - max(case when measure_name = 'usage_iowait' THEN measure_value::double ELSE NULL END) as max_usage_iowait, - max(case when measure_name = 'usage_irq' THEN measure_value::double ELSE NULL END) as max_usage_irq, - max(case when measure_name = 'usage_softirq' THEN measure_value::double ELSE NULL END) as max_usage_softirq, - max(case when measure_name = 'usage_steal' THEN measure_value::double ELSE NULL END) as max_usage_steal, - max(case when measure_name = 'usage_guest' THEN measure_value::double ELSE NULL END) as max_usage_guest, - max(case when measure_name = 'usage_guest_nice' THEN measure_value::double ELSE NULL END) as max_usage_guest_nice + max(usage_user) as max_usage_user, + max(usage_system) as max_usage_system, + max(usage_idle) as max_usage_idle, + max(usage_nice) as max_usage_nice, + max(usage_iowait) as max_usage_iowait, + max(usage_irq) as max_usage_irq, + max(usage_softirq) as 
max_usage_softirq, + max(usage_steal) as max_usage_steal, + max(usage_guest) as max_usage_guest, + max(usage_guest_nice) as max_usage_guest_nice FROM "b"."cpu" WHERE (hostname = 'host_9') AND time >= '1970-01-01 00:16:22.646325 +0000' AND time < '1970-01-01 08:16:22.646325 +0000' GROUP BY 1 ORDER BY 1` - rand.Seed(123) // Setting seed for testing purposes. + + rand.Seed(123) s := time.Unix(0, 0) e := s.Add(devops.MaxAllDuration).Add(time.Hour) - b := BaseGenerator{ - DBName: "b", - } + b := BaseGenerator{DBName: "b"} dq, err := b.NewDevops(s, e, 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) @@ -327,56 +292,34 @@ func TestMaxAllCPU(t *testing.T) { } func TestLastPointPerHost(t *testing.T) { - cases := []struct { - desc string - expectedHumanLabel string - expectedHumanDesc string - expectedTable string - expectedSQLQuery string - }{ - { - desc: "last recorded value per host", - expectedHumanLabel: "Timestream last row per host", - expectedHumanDesc: "Timestream last row per host", - expectedTable: "cpu", - expectedSQLQuery: ` + expectedHumanLabel := "Timestream last row per host" + expectedHumanDesc := "Timestream last row per host" + expectedTable := "cpu" + expectedSQLQuery := ` WITH latest_recorded_time AS ( SELECT hostname, - measure_name, max(time) as latest_time FROM "b"."cpu" - GROUP BY 1, 2 + GROUP BY hostname ) - SELECT b.hostname, - b.measure_name, - b.measure_value::double, - b.time + SELECT b.* FROM latest_recorded_time a JOIN "b"."cpu" b - ON a.hostname = b.hostname AND a.latest_time = b.time AND a.measure_name = b.measure_name - ORDER BY hostname, measure_name`, - }, - } - - rand.Seed(123) // Setting seed for testing purposes. 
- - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { - b := BaseGenerator{ - DBName: "b", - } - dq, err := b.NewDevops(time.Now(), time.Now(), 10) - if err != nil { - t.Fatalf("Error while creating devops generator") - } - d := dq.(*Devops) + ON a.hostname = b.hostname AND a.latest_time = b.time + ORDER BY hostname` - q := d.GenerateEmptyQuery() - d.LastPointPerHost(q) - verifyQuery(t, q, c.expectedHumanLabel, c.expectedHumanDesc, c.expectedTable, c.expectedSQLQuery) - }) + rand.Seed(123) + b := BaseGenerator{DBName: "b"} + dq, err := b.NewDevops(time.Now(), time.Now(), 10) + if err != nil { + t.Fatalf("Error while creating devops generator: %v", err) } + d := dq.(*Devops) + + q := d.GenerateEmptyQuery() + d.LastPointPerHost(q) + verifyQuery(t, q, expectedHumanLabel, expectedHumanDesc, expectedTable, expectedSQLQuery) } func TestHighCPUForHosts(t *testing.T) { @@ -399,7 +342,7 @@ func TestHighCPUForHosts(t *testing.T) { SELECT time, hostname FROM "b"."cpu" - WHERE measure_name = 'usage_user' AND measure_value::double > 90 + WHERE usage_user > 90 AND time >= '1970-01-01 00:16:22.646325 +0000' AND time < '1970-01-01 12:16:22.646325 +0000' ) @@ -418,7 +361,7 @@ func TestHighCPUForHosts(t *testing.T) { SELECT time, hostname FROM "b"."cpu" - WHERE measure_name = 'usage_user' AND measure_value::double > 90 + WHERE usage_user > 90 AND time >= '1970-01-01 00:47:30.894865 +0000' AND time < '1970-01-01 12:47:30.894865 +0000' AND (hostname = 'host_9') ) @@ -437,7 +380,7 @@ func TestHighCPUForHosts(t *testing.T) { SELECT time, hostname FROM "b"."cpu" - WHERE measure_name = 'usage_user' AND measure_value::double > 90 + WHERE usage_user > 90 AND time >= '1970-01-01 00:08:59.080812 +0000' AND time < '1970-01-01 12:08:59.080812 +0000' AND (hostname = 'host_5' OR hostname = 'host_9' OR hostname = 'host_1' OR hostname = 'host_7' OR hostname = 'host_2') ) @@ -447,7 +390,7 @@ func TestHighCPUForHosts(t *testing.T) { }, } - rand.Seed(123) // Setting seed for testing 
purposes. + rand.Seed(123) s := time.Unix(0, 0) e := s.Add(devops.HighCPUDuration).Add(time.Hour) @@ -456,7 +399,7 @@ func TestHighCPUForHosts(t *testing.T) { b := BaseGenerator{DBName: "b"} dq, err := b.NewDevops(s, e, 10) if err != nil { - t.Fatalf("Error while creating devops generator") + t.Fatalf("Error while creating devops generator: %v", err) } d := dq.(*Devops) @@ -470,9 +413,8 @@ func TestHighCPUForHosts(t *testing.T) { func verifyQuery(t *testing.T, q query.Query, humanLabel, humanDesc, table, sqlQuery string) { tsq, ok := q.(*query.Timestream) - if !ok { - t.Fatal("Filled query is not *query.TimescaleDB type") + t.Fatal("Filled query is not *query.Timestream type") } if got := string(tsq.HumanLabel); got != humanLabel { @@ -488,6 +430,7 @@ func verifyQuery(t *testing.T, q query.Query, humanLabel, humanDesc, table, sqlQ } if got := string(tsq.SqlQuery); got != sqlQuery { - t.Errorf("incorrect SQL query:\ndiff\n%s\ngot\n%s\nwant\n%s", diff.CharacterDiff(got, sqlQuery), got, sqlQuery) + t.Errorf("incorrect SQL query:\ndiff\n%s\ngot\n%s\nwant\n%s", + diff.CharacterDiff(got, sqlQuery), got, sqlQuery) } } diff --git a/cmd/tsbs_run_queries_timestream/main.go b/cmd/tsbs_run_queries_timestream/main.go index d65411e0f..3934bbc23 100644 --- a/cmd/tsbs_run_queries_timestream/main.go +++ b/cmd/tsbs_run_queries_timestream/main.go @@ -1,15 +1,17 @@ // tsbs_run_queries_timestream speed tests Timestream using requests from stdin or file // // It reads encoded Query objects from stdin or file, and makes concurrent requests -// to the a Timestream database encoded in the queries themselves, only the AWS region is -// required, and valid AWS credentials to be stored in .aws/credentials. +// to a Timestream database encoded in the queries themselves. Only the AWS region is +// required, and valid AWS credentials stored in .aws/credentials. // This program has no knowledge of the internals of the endpoint. 
package main import ( + "context" "encoding/json" "fmt" - "github.com/aws/aws-sdk-go/service/timestreamquery" + "github.com/aws/aws-sdk-go-v2/service/timestreamquery" + "github.com/aws/aws-sdk-go-v2/service/timestreamquery/types" "github.com/timescale/tsbs/pkg/targets/timestream" "time" @@ -83,14 +85,22 @@ func mapRows(page *timestreamquery.QueryOutput) []map[string]string { rowAsMap := make(map[string]string) for i, val := range row.Data { colName := cols[i].Name - rowAsMap[*colName] = val.String() + rowAsMap[*colName] = datumToString(val) } - rows = append(rows, rowAsMap) } return rows } +// datumToString converts a Datum to its string representation. +// For scalar values, it returns the underlying string. +func datumToString(d types.Datum) string { + if d.ScalarValue != nil { + return *d.ScalarValue + } + return "" +} + type queryExecutorOptions struct { showExplain bool debug bool @@ -99,7 +109,7 @@ type queryExecutorOptions struct { type processor struct { _opts *queryExecutorOptions - _readSvc *timestreamquery.TimestreamQuery + _readSvc *timestreamquery.Client } func newProcessor() query.Processor { @@ -107,11 +117,11 @@ func newProcessor() query.Processor { } func (p *processor) Init(_ int) { - awsSession, err := timestream.OpenAWSSession(&awsRegion, queryTimeout) + awsCfg, err := timestream.OpenAWSSession(&awsRegion, queryTimeout) if err != nil { panic("could not open aws session") } - p._readSvc = timestreamquery.New(awsSession) + p._readSvc = timestreamquery.NewFromConfig(awsCfg) p._opts = &queryExecutorOptions{ debug: runner.DebugLevel() > 0, printResponse: runner.DoPrintResponses(), @@ -133,21 +143,20 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { } totalRows := 0 pageNum := 1 - err := p._readSvc.QueryPages(queryInput, - func(page *timestreamquery.QueryOutput, lastPage bool) bool { - // process query response - // making sure all the returned data is read - totalRows += len(page.Rows) - if p._opts.printResponse { - 
prettyPrintResponse(qry, page, pageNum) - } - pageNum++ - // return true to continue to next page - return true - }) - if err != nil { - return nil, err + + paginator := timestreamquery.NewQueryPaginator(p._readSvc, queryInput) + for paginator.HasMorePages() { + page, err := paginator.NextPage(context.Background()) + if err != nil { + return nil, err + } + totalRows += len(page.Rows) + if p._opts.printResponse { + prettyPrintResponse(qry, page, pageNum) + } + pageNum++ } + if p._opts.debug { fmt.Printf("Total rows: %d\n", totalRows) } @@ -155,5 +164,5 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { stat := query.GetStat() stat.Init(q.HumanLabelName(), took) - return []*query.Stat{stat}, err + return []*query.Stat{stat}, nil } diff --git a/go.mod b/go.mod index 14918818f..7e18d8ec6 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,10 @@ require ( github.com/SiriDB/go-siridb-connector v0.0.0-20190110105621-86b34c44c921 github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 github.com/aws/aws-sdk-go v1.55.6 + github.com/aws/aws-sdk-go-v2 v1.36.1 + github.com/aws/aws-sdk-go-v2/config v1.29.6 + github.com/aws/aws-sdk-go-v2/service/timestreamquery v1.29.9 + github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.29.16 github.com/blagojts/viper v1.6.3-0.20200313094124-068f44cf5e69 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 github.com/gocql/gocql v0.0.0-20190810123941-df4b9cc33030 @@ -89,10 +93,17 @@ require ( github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a // indirect github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496 // indirect github.com/aws/aws-lambda-go v1.13.3 // indirect - github.com/aws/aws-sdk-go-v2 v1.36.1 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.59 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 
// indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.15 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 // indirect github.com/aws/smithy-go v1.22.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bgentry/speakeasy v0.1.0 // indirect diff --git a/go.sum b/go.sum index ca5accace..b4d965f57 100644 --- a/go.sum +++ b/go.sum @@ -115,9 +115,32 @@ github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQ github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v1.36.1 h1:iTDl5U6oAhkNPba0e1t1hrwAo02ZMqbrGq4k5JBWM5E= github.com/aws/aws-sdk-go-v2 v1.36.1/go.mod h1:5PMILGVKiW32oDzjj6RU52yrNrDPUHcbZQYr1sM7qmM= +github.com/aws/aws-sdk-go-v2/config v1.29.6 h1:fqgqEKK5HaZVWLQoLiC9Q+xDlSp+1LYidp6ybGE2OGg= +github.com/aws/aws-sdk-go-v2/config v1.29.6/go.mod h1:Ft+WLODzDQmCTHDvqAH1JfC2xxbZ0MxpZAcJqmE1LTQ= +github.com/aws/aws-sdk-go-v2/credentials v1.17.59 h1:9btwmrt//Q6JcSdgJOLI98sdr5p7tssS9yAsGe8aKP4= +github.com/aws/aws-sdk-go-v2/credentials v1.17.59/go.mod h1:NM8fM6ovI3zak23UISdWidyZuI1ghNe2xjzUZAyT+08= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 h1:KwsodFKVQTlI5EyhRSugALzsV6mG/SGrdjlMXSZSdso= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28/go.mod h1:EY3APf9MzygVhKuPXAc5H+MkGb8k/DOSQjWS0LgkKqI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 h1:BjUcr3X3K0wZPGFg2bxOWW3VPN8rkE3/61zhP+IHviA= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32/go.mod h1:80+OGC/bgzzFFTUmcuwD0lb4YutwQeKLFpmt6hoWapU= 
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 h1:m1GeXHVMJsRsUAqG6HjZWx9dj7F5TR+cF1bjyfYyBd4= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32/go.mod h1:IitoQxGfaKdVLNg0hD8/DXmAqNy0H4K2H2Sf91ti8sI= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 h1:Pg9URiobXy85kgFev3og2CuOZ8JZUBENF+dcgWBaYNk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 h1:D4oz8/CzT9bAEYtVhSBmFj2dNOtaHOtMKc2vHBwYizA= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2/go.mod h1:Za3IHqTQ+yNcRHxu1OFucBh0ACZT4j4VQFF0BqpZcLY= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13 h1:eWoHfLIzYeUtJEuoUmD5PwTE+fLaIPN9NZ7UXd9CW0s= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13/go.mod h1:x5t8Ve0J7JK9VHKSPSRAdBrWAgr/5hH3UeCFMLoyUGQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13 h1:SYVGSFQHlchIcy6e7x12bsrxClCXSP5et8cqVhL8cuw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13/go.mod h1:kizuDaLX37bG5WZaoxGPQR/LNFXpxp0vsUnqfkWXfNE= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.15 h1:/eE3DogBjYlvlbhd2ssWyeuovWunHLxfgw3s/OJa4GQ= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.15/go.mod h1:2PCJYpi7EKeA5SkStAmZlF6fi0uUABuhtF8ILHjGc3Y= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 h1:M/zwXiL2iXUrHputuXgmO94TVNmcenPHxgLXLutodKE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14/go.mod h1:RVwIw3y/IqxC2YEXSIkAzRDdEU1iRabDPaYjpGCbCGQ= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 h1:TzeR06UCMUq+KA3bDkujxK1GVGy+G8qQN/QVYzGLkQE= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.14/go.mod h1:dspXf/oYWGWo6DEvj98wpaTeqt5+DMidZD0A9BYTizc= +github.com/aws/aws-sdk-go-v2/service/timestreamquery v1.29.9 h1:41cG0dszpyQ3q004cSwjKQ7MeIcXp83gO9TkUTAXOf0= +github.com/aws/aws-sdk-go-v2/service/timestreamquery v1.29.9/go.mod 
h1:AMkvoxnIiNPs/zuuVhQ71o7A8QZi1xAcbNER62Xjs6A= github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.29.16 h1:A6oLifvrpiy020lUUV38xEbAquPHgqRfrtlqleWKYlo= github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.29.16/go.mod h1:pFiao5K15XNf+tdIBEC7UBv/+mX0AJRJbjXyp16zckA= github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= diff --git a/pkg/query/config/config.go b/pkg/query/config/config.go index 86459e93b..0c0d604af 100644 --- a/pkg/query/config/config.go +++ b/pkg/query/config/config.go @@ -27,8 +27,10 @@ type QueryGeneratorConfig struct { ClickhouseUseTags bool `mapstructure:"clickhouse-use-tags"` - MongoUseNaive bool `mapstructure:"mongo-use-native"` - DbName string `mapstructure:"db-name"` + MongoUseNaive bool `mapstructure:"mongo-use-native"` + + DbName string `mapstructure:"db-name"` + TimestreamUseSingleMeasure bool `mapstructure:"timestream-use-single-measure"` } // Validate checks that the values of the QueryGeneratorConfig are reasonable. @@ -62,5 +64,7 @@ func (c *QueryGeneratorConfig) AddToFlagSet(fs *pflag.FlagSet) { fs.Bool("timescale-use-tags", true, "TimescaleDB only: Use separate tags table when querying") fs.Bool("timescale-use-time-bucket", true, "TimescaleDB only: Use time bucket. Set to false to test on native PostgreSQL") - fs.String("db-name", "benchmark", "Specify database name. Timestream requires it in order to generate the queries") + fs.Bool("timestream-use-single-measure", false, + "Timestream only: Use single-measure records. Set to false to generate queries for multi-measure records.") + fs.String("db-name", "benchmark", "Timestream only: Name of database. 
Timestream requires it in order to generate the queries") } diff --git a/pkg/query/factories/init_factories.go b/pkg/query/factories/init_factories.go index ff3faf47d..c6bf3dfbb 100644 --- a/pkg/query/factories/init_factories.go +++ b/pkg/query/factories/init_factories.go @@ -37,6 +37,7 @@ func InitQueryFactories(config *config.QueryGeneratorConfig) map[string]interfac factories[constants.FormatVictoriaMetrics] = &victoriametrics.BaseGenerator{} factories[constants.FormatTimestream] = ×tream.BaseGenerator{ DBName: config.DbName, + SingleMeasure: config.TimestreamUseSingleMeasure, } factories[constants.FormatQuestDB] = &questdb.BaseGenerator{} return factories diff --git a/pkg/targets/timestream/aws_session.go b/pkg/targets/timestream/aws_session.go index b827433ee..f66164b79 100644 --- a/pkg/targets/timestream/aws_session.go +++ b/pkg/targets/timestream/aws_session.go @@ -1,19 +1,26 @@ package timestream import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "golang.org/x/net/http2" + "context" "net" "net/http" "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/aws/retry" + "golang.org/x/net/http2" ) -func OpenAWSSession(awsRegion *string, timeout time.Duration) (*session.Session, error) { +func OpenAWSSession(awsRegion *string, timeout time.Duration, retryCount ...int) (aws.Config, error) { + maxAttempts := 10 + if len(retryCount) > 0 { + maxAttempts = retryCount[0] + } + tr := &http.Transport{ ResponseHeaderTimeout: 20 * time.Second, - // Using DefaultTransport values for other parameters: https://golang.org/pkg/net/http/#RoundTripper - Proxy: http.ProxyFromEnvironment, + Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ KeepAlive: 30 * time.Second, Timeout: timeout, @@ -25,10 +32,18 @@ func OpenAWSSession(awsRegion *string, timeout time.Duration) (*session.Session, } if err := http2.ConfigureTransport(tr); err != nil { panic("could not configure http 
transport: " + err.Error()) + } + client := &http.Client{Transport: tr} + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithHTTPClient(client), + config.WithRegion(*awsRegion), + config.WithRetryer(func() aws.Retryer { + return retry.AddWithMaxAttempts(retry.NewStandard(), maxAttempts) + }), + ) + if err != nil { + return aws.Config{}, err } - return session.NewSession(&aws.Config{ - Region: awsRegion, - MaxRetries: aws.Int(10), - HTTPClient: &http.Client{Transport: tr}}) + return cfg, nil } diff --git a/pkg/targets/timestream/benchmark.go b/pkg/targets/timestream/benchmark.go index 7472f9de7..f0a9b966a 100644 --- a/pkg/targets/timestream/benchmark.go +++ b/pkg/targets/timestream/benchmark.go @@ -3,7 +3,10 @@ package timestream import ( "bufio" "fmt" - "github.com/aws/aws-sdk-go/service/timestreamwrite" + "log" + "time" + + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite" "github.com/pkg/errors" "github.com/timescale/tsbs/internal/inputs" "github.com/timescale/tsbs/load" @@ -11,8 +14,6 @@ import ( "github.com/timescale/tsbs/pkg/data/source" "github.com/timescale/tsbs/pkg/targets" "github.com/timescale/tsbs/pkg/targets/common" - "log" - "time" ) type benchmark struct { @@ -53,36 +54,36 @@ func (b benchmark) GetPointIndexer(maxPartitions uint) targets.PointIndexer { } func (b benchmark) GetProcessor() targets.Processor { - awsSession, err := OpenAWSSession(&b.config.AwsRegion, time.Minute) + awsCfg, err := OpenAWSSession(&b.config.AwsRegion, time.Minute) if err != nil { panic("could not open aws session") } - if b.config.UseCommonAttributes { + if !b.config.UseSingleMeasureRecords { return &commonDimensionsProcessor{ dbName: b.targetDb, batchPool: b.batchFactory.pool, headers: b.ds.Headers(), - writeService: timestreamwrite.New(awsSession), + writeService: timestreamwrite.NewFromConfig(awsCfg), multiMeasureMeasureName: b.config.MultiMeasureMeasureName, } } return &eachValueARecordProcessor{ batchPool: b.batchFactory.pool, - writeService: 
timestreamwrite.New(awsSession), + writeService: timestreamwrite.NewFromConfig(awsCfg), headers: b.ds.Headers(), dbName: b.targetDb, } } func (b benchmark) GetDBCreator() targets.DBCreator { - awsSession, err := OpenAWSSession(&b.config.AwsRegion, time.Minute) + awsCfg, err := OpenAWSSession(&b.config.AwsRegion, time.Minute) if err != nil { panic("could not open aws session") } return &dbCreator{ ds: b.ds, - writeSvc: timestreamwrite.New(awsSession), + writeSvc: timestreamwrite.NewFromConfig(awsCfg), magneticStoreRetentionPeriodInDays: b.config.MagStoreRetentionInDays, memoryRetentionPeriodInHours: b.config.MemStoreRetentionInHours, customPartitionKeyDimension: b.config.CustomPartitionKeyDimension, @@ -113,9 +114,7 @@ func initDataSource(config *source.DataSourceConfig, useCurrentTs bool) (targets panic("unhandled data source type!!!") } -// createHashProvider creates the function that will take out the -// value used to calculate the hash depending on which is the -// hashProperty. +// createHashProvider creates a function that extracts the value used for hashing from a data point. 
func createHashProvider(ds targets.DataSource, hashTag string) (func(point *data.LoadedPoint) []byte, error) { headers := ds.Headers() tagIndex := -1 diff --git a/pkg/targets/timestream/common_dimensions_processor.go b/pkg/targets/timestream/common_dimensions_processor.go index a7cc0800e..ca6dc6d46 100644 --- a/pkg/targets/timestream/common_dimensions_processor.go +++ b/pkg/targets/timestream/common_dimensions_processor.go @@ -1,22 +1,25 @@ package timestream import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/timestreamwrite" + "context" + "log" + "sync" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite" + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" "github.com/pkg/errors" "github.com/timescale/tsbs/pkg/data/usecases/common" "github.com/timescale/tsbs/pkg/targets" - "log" - "sync" ) type commonDimensionsProcessor struct { dbName string batchPool *sync.Pool headers *common.GeneratedDataHeaders - _dimensionsBuffer []*timestreamwrite.Dimension - _recordsBuffer []*timestreamwrite.Record - writeService *timestreamwrite.TimestreamWrite + _dimensionsBuffer []*types.Dimension + _recordsBuffer []types.Record + writeService *timestreamwrite.Client multiMeasureMeasureName string } @@ -28,7 +31,7 @@ func (c *commonDimensionsProcessor) Init(workerNum int, doLoad, hashWorkers bool maxFields = len(tableFields) } } - c._recordsBuffer = make([]*timestreamwrite.Record, maxFields) + c._recordsBuffer = make([]types.Record, maxFields) } func (c *commonDimensionsProcessor) ProcessBatch(b targets.Batch, doLoad bool) (metricCount, rowCount uint64) { @@ -51,64 +54,64 @@ func (c *commonDimensionsProcessor) ProcessBatch(b targets.Batch, doLoad bool) ( func (c *commonDimensionsProcessor) expandDimensionBuffer(requiredDimensions int) { if len(c._dimensionsBuffer) < requiredDimensions { - c._dimensionsBuffer = make([]*timestreamwrite.Dimension, requiredDimensions) + c._dimensionsBuffer = 
make([]*types.Dimension, requiredDimensions) } } func (c *commonDimensionsProcessor) writeToTable(table string, rows []deserializedPoint) (metricCount uint64, err error) { - var batchedRecords []*timestreamwrite.Record + var batchedRecords []types.Record for _, row := range rows { c.expandDimensionBuffer(len(row.tagKeys)) numDimensions := convertTagsToDimensions(row.tagKeys, row.tags, c._dimensionsBuffer) - measureValues := make([]*timestreamwrite.MeasureValue, 0, len(c.headers.FieldKeys[table])) + measureValues := make([]types.MeasureValue, 0, len(c.headers.FieldKeys[table])) for i, fieldVal := range row.fields { if fieldVal == nil { continue } - mv := ×treamwrite.MeasureValue{ + mv := types.MeasureValue{ Name: aws.String(c.headers.FieldKeys[table][i]), Value: aws.String(*fieldVal), - Type: aws.String(timestreamwrite.MeasureValueTypeDouble), + Type: types.MeasureValueTypeDouble, } measureValues = append(measureValues, mv) } - dims := make([]*timestreamwrite.Dimension, numDimensions) + dims := make([]types.Dimension, numDimensions) for i := 0; i < numDimensions; i++ { - dims[i] = ×treamwrite.Dimension{ + dims[i] = types.Dimension{ Name: c._dimensionsBuffer[i].Name, Value: c._dimensionsBuffer[i].Value, DimensionValueType: c._dimensionsBuffer[i].DimensionValueType, } } - rec := ×treamwrite.Record{ - Dimensions: dims, - Time: aws.String(row.timeUnixNano), - TimeUnit: aws.String(timestreamwrite.TimeUnitNanoseconds), + rec := types.Record{ + MeasureValues: measureValues, + Dimensions: dims, + Time: aws.String(row.timeUnixNano), } - rec.SetMeasureName(c.multiMeasureMeasureName) - rec.SetMeasureValueType(timestreamwrite.MeasureValueTypeMulti) - rec.SetMeasureValues(measureValues) - // Append the record to the batch batchedRecords = append(batchedRecords, rec) metricCount += uint64(len(row.fields)) - // When 100 records have been accumulated, write them + // Write in batches of 100 records if len(batchedRecords) == 100 { input := ×treamwrite.WriteRecordsInput{ - 
DatabaseName: &c.dbName, - TableName: &table, + DatabaseName: aws.String(c.dbName), + TableName: aws.String(table), Records: batchedRecords, + CommonAttributes: &types.Record{ + MeasureName: aws.String(c.multiMeasureMeasureName), + MeasureValueType: types.MeasureValueTypeMulti, + TimeUnit: types.TimeUnitNanoseconds, + }, } - _, err = c.writeService.WriteRecords(input) + _, err = c.writeService.WriteRecords(context.Background(), input) if err != nil { return 0, errors.Wrap(err, "could not write records to db") } - // Reset the slice for the next batch batchedRecords = batchedRecords[:0] } } @@ -116,11 +119,16 @@ func (c *commonDimensionsProcessor) writeToTable(table string, rows []deserializ // Write any remaining records if len(batchedRecords) > 0 { input := ×treamwrite.WriteRecordsInput{ - DatabaseName: &c.dbName, - TableName: &table, + DatabaseName: aws.String(c.dbName), + TableName: aws.String(table), Records: batchedRecords, + CommonAttributes: &types.Record{ + MeasureName: aws.String(c.multiMeasureMeasureName), + MeasureValueType: types.MeasureValueTypeMulti, + TimeUnit: types.TimeUnitNanoseconds, + }, } - _, err = c.writeService.WriteRecords(input) + _, err = c.writeService.WriteRecords(context.Background(), input) if err != nil { return 0, errors.Wrap(err, "could not write records to db") } @@ -129,15 +137,14 @@ func (c *commonDimensionsProcessor) writeToTable(table string, rows []deserializ return metricCount, nil } -func convertTagsToDimensions(tagKeys, tagValues []string, buffer []*timestreamwrite.Dimension) (numDimensions int) { +func convertTagsToDimensions(tagKeys, tagValues []string, buffer []*types.Dimension) (numDimensions int) { for i, value := range tagValues { if buffer[i] == nil { - buffer[i] = ×treamwrite.Dimension{} + buffer[i] = &types.Dimension{} } - buffer[i].Name = &tagKeys[i] - valNewPtr := value - buffer[i].Value = &valNewPtr - buffer[i].DimensionValueType = aws.String(timestreamwrite.DimensionValueTypeVarchar) + buffer[i].Name = 
aws.String(tagKeys[i]) + buffer[i].Value = aws.String(value) + buffer[i].DimensionValueType = types.DimensionValueTypeVarchar } return len(tagValues) } diff --git a/pkg/targets/timestream/config.go b/pkg/targets/timestream/config.go index c88b7b709..7fa592adf 100644 --- a/pkg/targets/timestream/config.go +++ b/pkg/targets/timestream/config.go @@ -6,7 +6,7 @@ import ( ) type SpecificConfig struct { - UseCommonAttributes bool `yaml:"use-common-attributes" mapstructure:"use-common-attributes"` + UseSingleMeasureRecords bool `yaml:"use-single-measure-records" mapstructure:"use-single-measure-records"` AwsRegion string `yaml:"aws-region" mapstructure:"aws-region"` HashProperty string `yaml:"hash-property" mapstructure:"hash-property"` UseCurrentTime bool `yaml:"use-current-time" mapstructure:"use-current-time"` @@ -29,10 +29,9 @@ func parseSpecificConfig(v *viper.Viper) (*SpecificConfig, error) { func targetSpecificFlags(flagPrefix string, flagSet *pflag.FlagSet) { flagSet.Bool( - flagPrefix+"use-common-attributes", - true, - "Timestream client makes write requests with common attributes. "+ - "If false, each value is written as a separate Record and a request of 100 records at once is sent", + flagPrefix+"use-single-measure-records", + false, + "Whether to use single-measure records. 
Set to false to use multi-measure records.", ) flagSet.String(flagPrefix+"aws-region", "us-east-1", "AWS region where the db is located") flagSet.String( diff --git a/pkg/targets/timestream/db_creator.go b/pkg/targets/timestream/db_creator.go index 63bee5a7b..55622568a 100644 --- a/pkg/targets/timestream/db_creator.go +++ b/pkg/targets/timestream/db_creator.go @@ -1,13 +1,16 @@ package timestream import ( + "context" "fmt" - "github.com/aws/aws-sdk-go/service/timestreamwrite" - "github.com/pkg/errors" - "github.com/timescale/tsbs/pkg/targets" - "strings" "log" + "strings" "time" + + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite" + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" + "github.com/pkg/errors" + "github.com/timescale/tsbs/pkg/targets" ) const ( @@ -16,7 +19,7 @@ const ( ) type dbCreator struct { - writeSvc *timestreamwrite.TimestreamWrite + writeSvc *timestreamwrite.Client ds targets.DataSource customPartitionKeyDimension string customPartitionKeyType string @@ -27,34 +30,33 @@ type dbCreator struct { } func (d *dbCreator) Init() { - // read headers from data source so PostCreate can create the tables + // Read headers from data source so PostCreateDB can create the tables. 
d.ds.Headers() } func (d *dbCreator) DBExists(dbName string) bool { + ctx := context.Background() describeDatabaseInput := ×treamwrite.DescribeDatabaseInput{ DatabaseName: &dbName, } - _, err := d.writeSvc.DescribeDatabase(describeDatabaseInput) + _, err := d.writeSvc.DescribeDatabase(ctx, describeDatabaseInput) if err != nil { - // Check if error was "database doesn't exist" - _, ok := err.(*timestreamwrite.ResourceNotFoundException) - if ok { + var rnfe *types.ResourceNotFoundException + if errors.As(err, &rnfe) { return false } panic("could not execute 'describe database': " + err.Error()) } - return true } func (d *dbCreator) CreateDB(dbName string) error { log.Println("Creating database " + dbName) + ctx := context.Background() createDatabaseInput := ×treamwrite.CreateDatabaseInput{ DatabaseName: &dbName, } - - if _, err := d.writeSvc.CreateDatabase(createDatabaseInput); err != nil { + if _, err := d.writeSvc.CreateDatabase(ctx, createDatabaseInput); err != nil { return errors.Wrap(err, "could not create database "+dbName) } return nil @@ -62,8 +64,11 @@ func (d *dbCreator) CreateDB(dbName string) error { func (d *dbCreator) RemoveOldDB(dbName string) error { log.Println("Removing existing database " + dbName) - listTables := ×treamwrite.ListTablesInput{DatabaseName: &dbName} - tablesOutput, err := d.writeSvc.ListTables(listTables) + ctx := context.Background() + listTablesInput := ×treamwrite.ListTablesInput{ + DatabaseName: &dbName, + } + tablesOutput, err := d.writeSvc.ListTables(ctx, listTablesInput) if err != nil { return errors.Wrap(err, "could not check existing tables in "+dbName) } @@ -72,12 +77,14 @@ func (d *dbCreator) RemoveOldDB(dbName string) error { DatabaseName: &dbName, TableName: table.TableName, } - if _, err := d.writeSvc.DeleteTable(deleteTableInput); err != nil { + if _, err := d.writeSvc.DeleteTable(ctx, deleteTableInput); err != nil { return errors.Wrap(err, "could not delete table "+*table.TableName+" in db "+dbName) } } - 
deleteDatabaseInput := ×treamwrite.DeleteDatabaseInput{DatabaseName: &dbName} - if _, err := d.writeSvc.DeleteDatabase(deleteDatabaseInput); err != nil { + deleteDatabaseInput := ×treamwrite.DeleteDatabaseInput{ + DatabaseName: &dbName, + } + if _, err := d.writeSvc.DeleteDatabase(ctx, deleteDatabaseInput); err != nil { return errors.Wrap(err, "could not delete database "+dbName) } return nil @@ -87,48 +94,50 @@ func (d *dbCreator) RemoveOldDB(dbName string) error { func (d *dbCreator) PostCreateDB(dbName string) error { log.Println("Creating Timestream tables") headers := d.ds.Headers() - - var partitionKeyWithDimensionAndOptionalEnforcement []*timestreamwrite.PartitionKey - customPartitionKeyType := strings.ToUpper(d.customPartitionKeyType) - - if customPartitionKeyType == "DIMENSION" { - enforcement := "OPTIONAL" + var partitionKeys []types.PartitionKey + if strings.ToUpper(d.customPartitionKeyType) == "DIMENSION" { + var enforcement types.PartitionKeyEnforcementLevel if d.enforceCustomPartitionKey { - enforcement = "REQUIRED" + enforcement = types.PartitionKeyEnforcementLevelRequired + } else { + enforcement = types.PartitionKeyEnforcementLevelOptional } - partitionKeyWithDimensionAndOptionalEnforcement = []*timestreamwrite.PartitionKey{ + partitionKeys = []types.PartitionKey{ { Name: &d.customPartitionKeyDimension, - EnforcementInRecord: &enforcement, - Type: &customPartitionKeyType, + EnforcementInRecord: enforcement, + Type: types.PartitionKeyTypeDimension, }, } } + var requiredTables []string + ctx := context.Background() for tableName := range headers.FieldKeys { requiredTables = append(requiredTables, tableName) createTableInput := ×treamwrite.CreateTableInput{ DatabaseName: &dbName, - MagneticStoreWriteProperties: ×treamwrite.MagneticStoreWriteProperties{ + // Reference properties from the types package. 
+ MagneticStoreWriteProperties: &types.MagneticStoreWriteProperties{ EnableMagneticStoreWrites: &d.enableMagStoreWrites, }, - RetentionProperties: &timestreamwrite.RetentionProperties{ + RetentionProperties: &types.RetentionProperties{ MagneticStoreRetentionPeriodInDays: &d.magneticStoreRetentionPeriodInDays, MemoryStoreRetentionPeriodInHours: &d.memoryRetentionPeriodInHours, }, TableName: &tableName, } - if len(partitionKeyWithDimensionAndOptionalEnforcement) > 0 { - createTableInput.Schema = &timestreamwrite.Schema{ - CompositePartitionKey: partitionKeyWithDimensionAndOptionalEnforcement, + if len(partitionKeys) > 0 { + createTableInput.Schema = &types.Schema{ + CompositePartitionKey: partitionKeys, } } - _, err := d.writeSvc.CreateTable(createTableInput) + _, err := d.writeSvc.CreateTable(ctx, createTableInput) if err != nil { - // If there's an error, check if it's a ConflictException (i.e., table already exists) - if _, ok := err.(*timestreamwrite.ConflictException); ok { + var conflict *types.ConflictException + if errors.As(err, &conflict) { log.Println("Table " + tableName + " exists, skipping create") } else { // Print the complete error details @@ -165,19 +174,21 @@ func (d *dbCreator) waitForTables(dbName string, requiredTables []string) error time.Sleep(checkTablesSecondsBetweenChecks * time.Second) } return nil - } -func (d *dbCreator) listTableStatus(dbName string) (tableStatus map[string]string, err error) { - listTables := &timestreamwrite.ListTablesInput{DatabaseName: &dbName} - tablesOutput, err := d.writeSvc.ListTables(listTables) +func (d *dbCreator) listTableStatus(dbName string) (map[string]string, error) { + ctx := context.Background() + listTablesInput := &timestreamwrite.ListTablesInput{ + DatabaseName: &dbName, + } + tablesOutput, err := d.writeSvc.ListTables(ctx, listTablesInput) if err != nil { return nil, errors.Wrap(err, "could not check existing tables in "+dbName) } - tableStatus = make(map[string]string, len(tablesOutput.Tables)) + tableStatus := 
make(map[string]string, len(tablesOutput.Tables)) for _, table := range tablesOutput.Tables { tableName := *table.TableName - tableStatus[tableName] = *table.TableStatus + tableStatus[tableName] = string(table.TableStatus) } return tableStatus, nil } @@ -188,7 +199,7 @@ func checkTableStatus(tableStatus map[string]string, requiredTables []string) (b if !ok { return false, fmt.Errorf("required table '%s' not found in db", table) } - if status != timestreamwrite.TableStatusActive { + if status != string(types.TableStatusActive) { return false, nil } } diff --git a/pkg/targets/timestream/each_point_a_record_processor.go b/pkg/targets/timestream/each_point_a_record_processor.go index ef61babe3..46976d8c5 100644 --- a/pkg/targets/timestream/each_point_a_record_processor.go +++ b/pkg/targets/timestream/each_point_a_record_processor.go @@ -1,13 +1,16 @@ package timestream import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/timestreamwrite" + "context" + "log" + "sync" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite" + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" "github.com/pkg/errors" "github.com/timescale/tsbs/pkg/data/usecases/common" "github.com/timescale/tsbs/pkg/targets" - "log" - "sync" ) const maxRecordsPerWriteRequest = 100 @@ -16,7 +19,7 @@ type eachValueARecordProcessor struct { dbName string batchPool *sync.Pool headers *common.GeneratedDataHeaders - writeService *timestreamwrite.TimestreamWrite + writeService *timestreamwrite.Client } func (p *eachValueARecordProcessor) Init(_ int, _, _ bool) {} @@ -40,67 +43,77 @@ func (p *eachValueARecordProcessor) ProcessBatch(b targets.Batch, doLoad bool) ( } func (p *eachValueARecordProcessor) writeBatch(table string, rows []deserializedPoint) (numMetrics uint64, err error) { - records := make([]*timestreamwrite.Record, 0, maxRecordsPerWriteRequest) + ctx := context.Background() + records := make([]types.Record, 0, 
maxRecordsPerWriteRequest) for _, row := range rows { if len(records)+len(row.fields) >= maxRecordsPerWriteRequest { writeRecordsInput := &timestreamwrite.WriteRecordsInput{ - DatabaseName: &p.dbName, - TableName: &table, + DatabaseName: aws.String(p.dbName), + TableName: aws.String(table), Records: records, } - _, err := p.writeService.WriteRecords(writeRecordsInput) + _, err := p.writeService.WriteRecords(ctx, writeRecordsInput) if err != nil { return 0, errors.Wrap(err, "could not write records to db") } numMetrics += uint64(len(records)) - records = make([]*timestreamwrite.Record, 0, maxRecordsPerWriteRequest) + records = make([]types.Record, 0, maxRecordsPerWriteRequest) } records = append(records, p.convertToRecords(table, row)...) } - writeRecordsInput := &timestreamwrite.WriteRecordsInput{ - DatabaseName: &p.dbName, - TableName: &table, - Records: records, - } - _, err = p.writeService.WriteRecords(writeRecordsInput) - if err != nil { - return 0, errors.Wrap(err, "could not write records to db") + // Write any remaining records. 
+ if len(records) > 0 { + writeRecordsInput := &timestreamwrite.WriteRecordsInput{ + DatabaseName: aws.String(p.dbName), + TableName: aws.String(table), + Records: records, + } + _, err = p.writeService.WriteRecords(ctx, writeRecordsInput) + if err != nil { + return 0, errors.Wrap(err, "could not write records to db") + } + numMetrics += uint64(len(records)) } - numMetrics += uint64(len(records)) return numMetrics, nil } -func (p *eachValueARecordProcessor) convertToRecords(table string, row deserializedPoint) []*timestreamwrite.Record { +func (p *eachValueARecordProcessor) convertToRecords(table string, row deserializedPoint) []types.Record { dimensions := createDimensions(row.tagKeys, row.tags) return createRecords(&row, p.headers.FieldKeys[table], dimensions, row.timeUnixNano) } -func createRecords(point *deserializedPoint, fieldKeys []string, dimensions []*timestreamwrite.Dimension, ts string) (buffer []*timestreamwrite.Record) { - buffer = make([]*timestreamwrite.Record, 0, len(fieldKeys)) +func createRecords(point *deserializedPoint, fieldKeys []string, dimensions []types.Dimension, ts string) []types.Record { + buffer := make([]types.Record, 0, len(fieldKeys)) for i, fieldVal := range point.fields { if fieldVal == nil { continue } - newRecord := &timestreamwrite.Record{} - newRecord.SetDimensions(dimensions) - newRecord.SetMeasureName(fieldKeys[i]) - newRecord.SetMeasureValueType(timestreamwrite.MeasureValueTypeDouble) - newRecord.SetMeasureValue(*fieldVal) - newRecord.SetTime(ts) - newRecord.SetTimeUnit(timestreamwrite.TimeUnitNanoseconds) + newRecord := types.Record{ + Dimensions: dimensions, + MeasureName: aws.String(fieldKeys[i]), + MeasureValueType: types.MeasureValueTypeDouble, + MeasureValue: aws.String(*fieldVal), + Time: aws.String(ts), + TimeUnit: types.TimeUnitNanoseconds, + } buffer = append(buffer, newRecord) } return buffer } -func createDimensions(tagKeys, tagValues []string) (buffer []*timestreamwrite.Dimension) { - buffer = 
make([]*timestreamwrite.Dimension, len(tagKeys)) - for i, value := range tagValues { - buffer[i] = &timestreamwrite.Dimension{} - buffer[i].Name = &tagKeys[i] - valNewPtr := value - buffer[i].Value = &valNewPtr - buffer[i].DimensionValueType = aws.String(timestreamwrite.DimensionValueTypeVarchar) + +func createDimensions(tagKeys, tagValues []string) []types.Dimension { + buffer := make([]types.Dimension, len(tagKeys)) + for i, key := range tagKeys { + var value string + if i < len(tagValues) { + value = tagValues[i] + } + buffer[i] = types.Dimension{ + Name: aws.String(key), + Value: aws.String(value), + DimensionValueType: types.DimensionValueTypeVarchar, + } } return buffer }