Skip to content

Commit 3d09bc6

Browse files
authored
Timestream Prometheus Connector with AWS PrivateLink (#14)
- Added new config for setting base endpoints - Added SAM template and guide
1 parent 8917c42 commit 3d09bc6

File tree

10 files changed

+379
-184
lines changed

10 files changed

+379
-184
lines changed

.github/workflows/go-build-test.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ jobs:
5050
- uses: actions/checkout@v4
5151
- uses: actions/setup-go@v5
5252
with:
53-
go-version: 'stable'
53+
go-version: '1.23'
5454
cache: false
5555
env:
5656
GO111MODULE: on

configuration.go

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ var (
3535
defaultDatabaseConfig = &configuration{flag: "default-database", envFlag: "default_database", defaultValue: ""}
3636
defaultTableConfig = &configuration{flag: "default-table", envFlag: "default_table", defaultValue: ""}
3737
enableSigV4AuthConfig = &configuration{flag: "enable-sigv4-auth", envFlag: "enable_sigv4_auth", defaultValue: "true"}
38+
queryBaseEndpointConfig = &configuration{flag: "query-base-endpoint", envFlag: "query_base_endpoint", defaultValue: ""}
39+
writeBaseEndpointConfig = &configuration{flag: "write-base-endpoint", envFlag: "write_base_endpoint", defaultValue: ""}
3840
listenAddrConfig = &configuration{flag: "web.listen-address", envFlag: "", defaultValue: ":9201"}
3941
telemetryPathConfig = &configuration{flag: "web.telemetry-path", envFlag: "", defaultValue: "/metrics"}
4042
failOnLabelConfig = &configuration{flag: "fail-on-long-label", envFlag: "fail_on_long_label", defaultValue: "false"}

main.go

+25-6
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ type connectionConfig struct {
103103
telemetryPath string
104104
maxReadRetries int
105105
maxWriteRetries int
106+
queryBaseEndpoint string
107+
writeBaseEndpoint string
106108
certificate string
107109
key string
108110
}
@@ -122,13 +124,14 @@ func main() {
122124
logger := cfg.createLogger()
123125

124126
ctx := context.Background()
125-
awsQueryConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxReadRetries)
127+
128+
awsQueryConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxReadRetries, cfg.queryBaseEndpoint)
126129
if err != nil {
127130
timestream.LogError(logger, "Failed to build AWS configuration for query", err)
128131
os.Exit(1)
129132
}
130133

131-
awsWriteConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxWriteRetries)
134+
awsWriteConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxWriteRetries, cfg.writeBaseEndpoint)
132135
if err != nil {
133136
timestream.LogError(logger, "Failed to build AWS configuration for write", err)
134137
os.Exit(1)
@@ -185,12 +188,12 @@ func lambdaHandler(req events.APIGatewayProxyRequest) (events.APIGatewayProxyRes
185188
return createErrorResponse(errors.NewParseBasicAuthHeaderError().(*errors.ParseBasicAuthHeaderError).Message())
186189
}
187190
}
188-
awsQueryConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxReadRetries)
191+
awsQueryConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxReadRetries, cfg.queryBaseEndpoint)
189192
if err != nil {
190193
timestream.LogError(logger, "Failed to build AWS configuration for query", err)
191194
os.Exit(1)
192195
}
193-
awsWriteConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxWriteRetries)
196+
awsWriteConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxWriteRetries, cfg.writeBaseEndpoint)
194197
if err != nil {
195198
timestream.LogError(logger, "Failed to build AWS configuration for write", err)
196199
os.Exit(1)
@@ -394,6 +397,9 @@ func parseEnvironmentVariables() (*connectionConfig, error) {
394397
return nil, errors.NewParseRetriesError(writeRetries, "write")
395398
}
396399

400+
cfg.queryBaseEndpoint = getOrDefault(queryBaseEndpointConfig)
401+
cfg.writeBaseEndpoint = getOrDefault(writeBaseEndpointConfig)
402+
397403
cfg.promlogConfig = promlog.Config{Level: &promlog.AllowedLevel{}, Format: &promlog.AllowedFormat{}}
398404
cfg.promlogConfig.Level.Set(getOrDefault(promlogLevelConfig))
399405
cfg.promlogConfig.Format.Set(getOrDefault(promlogFormatConfig))
@@ -431,6 +437,10 @@ func parseFlags() *connectionConfig {
431437
a.Flag(certificateConfig.flag, "TLS server certificate file.").Default(certificateConfig.defaultValue).StringVar(&cfg.certificate)
432438
a.Flag(keyConfig.flag, "TLS server private key file.").Default(keyConfig.defaultValue).StringVar(&cfg.key)
433439
a.Flag(enableSigV4AuthConfig.flag, "Whether to enable SigV4 authentication with the API Gateway. Default to 'false'.").Default(enableSigV4AuthConfig.defaultValue).StringVar(&enableSigV4Auth)
440+
a.Flag(queryBaseEndpointConfig.flag, "Override the default Timestream query endpoint (e.g., a VPC Endpoint).").
441+
Default(queryBaseEndpointConfig.defaultValue).StringVar(&cfg.queryBaseEndpoint)
442+
a.Flag(writeBaseEndpointConfig.flag, "Override the default Timestream write endpoint (e.g., a VPC Endpoint).").
443+
Default(writeBaseEndpointConfig.defaultValue).StringVar(&cfg.writeBaseEndpoint)
434444

435445
flag.AddFlags(a, &cfg.promlogConfig)
436446

@@ -439,7 +449,12 @@ func parseFlags() *connectionConfig {
439449
os.Exit(1)
440450
}
441451

442-
if err := cfg.parseBoolFromStrings(enableLogging, failOnLongMetricLabelName, failOnInvalidSample, enableSigV4Auth); err != nil {
452+
if err := cfg.parseBoolFromStrings(
453+
enableLogging,
454+
failOnLongMetricLabelName,
455+
failOnInvalidSample,
456+
enableSigV4Auth,
457+
); err != nil {
443458
os.Exit(1)
444459
}
445460

@@ -457,7 +472,7 @@ func parseFlags() *connectionConfig {
457472
}
458473

459474
// buildAWSConfig builds a aws.Config and return the pointer of the config.
460-
func (cfg *connectionConfig) buildAWSConfig(ctx context.Context, maxRetries int) (aws.Config, error) {
475+
func (cfg *connectionConfig) buildAWSConfig(ctx context.Context, maxRetries int, baseEndpoint string) (aws.Config, error) {
461476
awsConfig, err := config.LoadDefaultConfig(ctx,
462477
config.WithRegion(cfg.clientConfig.region),
463478
config.WithRetryer(func() aws.Retryer {
@@ -469,6 +484,10 @@ func (cfg *connectionConfig) buildAWSConfig(ctx context.Context, maxRetries int)
469484
if err != nil {
470485
return aws.Config{}, fmt.Errorf("failed to build AWS config: %w", err)
471486
}
487+
488+
if baseEndpoint != "" {
489+
awsConfig.BaseEndpoint = aws.String(baseEndpoint)
490+
}
472491
return awsConfig, nil
473492
}
474493

main_test.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -661,16 +661,19 @@ func TestBuildAWSConfig(t *testing.T) {
661661
name string
662662
maxRetries int
663663
expectedMaxAttempts int
664+
baseEndpoint string
664665
}{
665666
{
666667
name: "read config",
667668
maxRetries: 10,
668669
expectedMaxAttempts: 10,
670+
baseEndpoint: "",
669671
},
670672
{
671673
name: "write config",
672674
maxRetries: 3,
673675
expectedMaxAttempts: 3,
676+
baseEndpoint: "https://ingest-cell1.timestream.us-west-2.amazonaws.com",
674677
},
675678
}
676679

@@ -685,7 +688,7 @@ func TestBuildAWSConfig(t *testing.T) {
685688
maxWriteRetries: test.expectedMaxAttempts,
686689
}
687690

688-
actualConfig, err := input.buildAWSConfig(context.Background(), test.maxRetries)
691+
actualConfig, err := input.buildAWSConfig(context.Background(), test.maxRetries, test.baseEndpoint)
689692

690693
assert.Nil(t, err)
691694
assert.NotNil(t, actualConfig)
@@ -700,6 +703,9 @@ func TestBuildAWSConfig(t *testing.T) {
700703
if ok {
701704
assert.Equal(t, test.expectedMaxAttempts, standardRetryer.MaxAttempts())
702705
}
706+
if test.baseEndpoint != "" {
707+
assert.Equal(t, aws.String(test.baseEndpoint), actualConfig.BaseEndpoint)
708+
}
703709
})
704710
}
705711
}

privatelink/DEVELOPER_README.md

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
# Timestream Prometheus Connector with AWS PrivateLink
2+
3+
## Overview
4+
5+
This guide explains how to set up the Prometheus Connector to ingest data to Amazon Timestream from within an isolated VPC environment using [AWS PrivateLink](https://aws.amazon.com/privatelink/).
6+
7+
This [serverless application](https://aws.amazon.com/serverless/) consists of the following:
8+
- [Amazon EC2](https://aws.amazon.com/ec2/getting-started/) instance that will host the Prometheus Connector.
9+
- [VPC Endpoints](https://docs.aws.amazon.com/whitepapers/latest/aws-privatelink/what-are-vpc-endpoints.html) for securely communicating with AWS services using PrivateLink.
10+
11+
This application assumes that the VPC in which the template will be deployed has no internet access and ensures that all communication stays within Amazon's internal network.
12+
13+
## Prerequisites
14+
15+
1. A VPC with at least two private subnets and route tables.
16+
2. A Timestream database and table.
17+
3. [Read and write cells](https://docs.aws.amazon.com/timestream/latest/developerguide/architecture.html#cells) for your Timestream account. Amazon routes requests to the write and query endpoints of the cell that your account has been mapped to for a given region.
18+
19+
To get your assigned cells using `awscli`:
20+
21+
For read endpoint:
22+
```
23+
aws timestream-query describe-endpoints --region <AWS_REGION>
24+
```
25+
26+
For write endpoint:
27+
```
28+
aws timestream-write describe-endpoints --region <AWS_REGION>
29+
```
30+
31+
Example output for the write endpoint:
32+
```
33+
{
34+
"Endpoints": [
35+
{
36+
"Address": "ingest-cell1.timestream.us-west-2.amazonaws.com",
37+
"CachePeriodInMinutes": 1440
38+
}
39+
]
40+
}
41+
```
42+
Take note of your assigned cells (`ingest-cell1` for the above example) for both read and write endpoints.
43+
44+
45+
## Deployment
46+
47+
From your existing VPC, you will need the following values:
48+
- VPC ID: This is the ID of your existing VPC
49+
- VPC CIDR : This is the CIDR range for your VPC
50+
- Private Subnet IDs: This is where the EC2 instance and VPC endpoints will be deployed
51+
- Private Route Table ID(s): This is how the [S3 Gateway endpoint](https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-s3.html) will resolve requests
52+
- Query and Write cells: These are your assigned endpoint cells for Timestream
53+
54+
55+
1. From the `privatelink` directory, run the following command to deploy the SAM template:
56+
57+
```
58+
sam deploy --parameter-overrides "VpcId=<VPC_ID> VpcCidrIp=<VPC_CIDR_IP> PrivateSubnetIds=<PRIVATE_SUBNET_ID_1>,<PRIVATE_SUBNET_ID_2> PrivateRouteTableIds=<PRIVATE_ROUTE_TABLE_ID> TimestreamQueryCell=<QUERY_CELL> TimestreamWriteCell=<WRITE_CELL> --region <AWS_REGION>"
59+
```
60+
61+
To view the full set of `sam deploy` options see the [sam deploy documentation](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-cli-command-reference-sam-deploy.html).
62+
63+
2. The deployment will have the following outputs upon completion:
64+
65+
- `InstanceId`: ID of the EC2 instance
66+
67+
An example of the output:
68+
69+
```
70+
------------------------------------------------------------------------------
71+
Outputs
72+
------------------------------------------------------------------------------
73+
Key InstanceId
74+
Description ID of the EC2 instance
75+
Value i-08a5d7e1700c9be5a
76+
------------------------------------------------------------------------------
77+
```
78+
79+
3. Start an AWS SSM session, replacing `INSTANCE_ID` with your EC2 instance ID from deployment. You can install the [plugin here.](https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-working-with-install-plugin.html)
80+
81+
```shell
82+
aws ssm start-session --target i-<INSTANCE_ID>
83+
```
84+
85+
4. Install the Prometheus Connector.
86+
87+
1. Create a directory for the connector.
88+
```
89+
mkdir ~/connector && cd ~/connector
90+
```
91+
92+
2. Download the precompiled binary from S3 for your region. [See here](https://github.com/awslabs/amazon-timestream-connector-prometheus/tags) for released versions.
93+
94+
```shell
95+
curl -O https://timestreamassets-<AWS_REGION>.s3.<AWS_REGION>.amazonaws.com/timestream-prometheus-connector/timestream-prometheus-connector-linux-arm64-<VERSION>.zip
96+
```
97+
98+
3. Unzip the binary.
99+
100+
```shell
101+
unzip timestream-prometheus-connector-linux-arm64-<VERSION>.zip
102+
```
103+
104+
4. Disable endpoint discovery by setting the `AWS_ENABLE_ENDPOINT_DISCOVERY` environment variable to `false`. This ensures requests from the connector are routed through VPC endpoints.
105+
```
106+
export AWS_ENABLE_ENDPOINT_DISCOVERY=false
107+
```
108+
109+
5. Launch Prometheus Connector
110+
111+
Replace the following variables to configure your Timestream database, region, and assigned cells.
112+
113+
114+
- `DEFAULT_DATABASE`: Specifies the default Timestream database for the Prometheus Connector.
115+
- `DEFAULT_TABLE`: Specifies the default table for storing Prometheus metrics.
116+
- `AWS_REGION`: Defines the AWS region.
117+
- `QUERY_CELL`: Defines the query endpoint cell for Timestream.
118+
- `INGEST_CELL`: Defines the ingestion endpoint cell for Timestream.
119+
120+
Run the Prometheus Connector:
121+
122+
```
123+
./bootstrap \
124+
--default-database=<DEFAULT_DATABASE> \
125+
--default-table=<DEFAULT_TABLE> \
126+
--region=<AWS_REGION> \
127+
--query-base-endpoint=https://<QUERY_CELL>.timestream.<AWS_REGION>.amazonaws.com \
128+
--write-base-endpoint=https://<INGEST_CELL>.timestream.<AWS_REGION>.amazonaws.com
129+
```
130+
131+
The connector is now ready to ingest data to Timestream!
132+
133+
To see an example of how Prometheus can be configured, [see here](https://github.com/awslabs/amazon-timestream-connector-prometheus?tab=readme-ov-file#prometheus-configuration).
134+
135+
### Cleanup
136+
137+
Delete the CloudFormation stack. From the `privatelink` directory, run the following command:
138+
139+
```shell
140+
sam delete --region <AWS_REGION>
141+
```

privatelink/samconfig.toml

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
version = 0.1
2+
[default]
3+
[default.deploy]
4+
[default.deploy.parameters]
5+
capabilities = "CAPABILITY_NAMED_IAM"
6+
confirm_changeset = true
7+
profile = "default"
8+
region = "us-west-2"
9+
stack_name = "PrometheusConnectorPrivateLink"
10+
resolve_s3 = true

0 commit comments

Comments
 (0)