Skip to content

Commit e5df53d

Browse files
author
Hugo Correia
authored
Prepare for release (#14)
1 parent a2245fa commit e5df53d

12 files changed

+162
-68
lines changed

README.md

+52-12
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,59 @@
11
# dynamodbcopy
2+
23
[![Build Status](https://travis-ci.org/uniplaces/dynamodbcopy.svg?branch=master)](https://travis-ci.org/uniplaces/dynamodbcopy)
34
[![Go Report Card](https://goreportcard.com/badge/github.com/uniplaces/dynamodbcopy)](https://goreportcard.com/report/github.com/uniplaces/dynamodbcopy)
45
[![codecov](https://codecov.io/gh/uniplaces/dynamodbcopy/branch/master/graph/badge.svg)](https://codecov.io/gh/uniplaces/dynamodbcopy)
56
[![GoDoc](https://godoc.org/github.com/uniplaces/dynamodbcopy?status.svg)](https://godoc.org/github.com/uniplaces/dynamodbcopy)
67
[![License](http://img.shields.io/:license-apache-blue.svg)](http://www.apache.org/licenses/LICENSE-2.0.html)
78

8-
## Development
9-
To build and run this cmd, you'll need go (1.11.x) with mod support enabled `export GO111MODULE=on`
10-
11-
### Running
12-
To run:
13-
```
14-
go run cmd/main.go
15-
```
16-
Alternatively, you can easily build the binary by:
17-
```
18-
go build -o dynamodbcopy cmd/main.go
19-
```
9+
Dynamodbcopy is a cli tool wrapper around the [aws-sdk](https://github.com/aws/aws-sdk-go) that allows you to copy information from one dynamodb table to another.
10+
11+
## Main Features
12+
13+
- Provides a CLI to easily copy dynamodb records from one place to another
14+
- Allows you to set read and write capacity units for the source and target table
15+
- Integrates with [aws-sdk](https://github.com/aws/aws-sdk-go), sharing its credentials
16+
- Allows you to parameterize the source and target table with specific roles, enabling you to perform cross-account copies
17+
- Stores current provisioning values before performing a copy, restoring the initial values at the end of the copy or if any error occurs during the copy.
18+
19+
## Usage
20+
21+
> Use "dynamodbcopy [command] --help" for more information about a command.
22+
23+
## Installing
24+
25+
Use go get to retrieve `dynamodbcopy` to add it to your GOPATH workspace, or project's Go module dependencies.
26+
27+
> go get github.com/uniplaces/dynamodbcopy/cmd/dynamodbcopy
28+
29+
To update run with `-u`
30+
31+
> go get -u github.com/uniplaces/dynamodbcopy/cmd/dynamodbcopy
32+
33+
### Go Modules
34+
35+
If you are using Go modules, your go get will default to the latest tagged version. To get a specific release version of the `dynamodbcopy` use `@<tag>` in your go get command.
36+
37+
> go get github.com/uniplaces/dynamodbcopy/cmd/dynamodbcopy@<tag>
38+
39+
To get the latest repository change use the `@latest` tag.
40+
41+
> go get github.com/uniplaces/dynamodbcopy/cmd/dynamodbcopy@latest
42+
43+
## Opening Issues
44+
45+
If you encounter a bug, please start by searching the existing issues and see if others are also experiencing the issue before opening a new one. Please include the version for `dynamodbcopy` and Go that you are using. Please also include a reproduction case when appropriate.
46+
47+
## Contributing
48+
49+
Please feel free to make suggestions, create issues, fork the repository and send pull requests!
50+
51+
## Licence
52+
53+
Copyright 2018 UNIPLACES
54+
55+
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
56+
57+
http://www.apache.org/licenses/LICENSE-2.0
58+
59+
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
File renamed without changes.

config.go

+5
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package dynamodbcopy
22

3+
// Config encapsulates the values needed for the command
34
type Config struct {
45
readCapacityUnits int64
56
writeCapacityUnits int64
67
readWorkers int
78
writeWorkers int
89
}
910

11+
// NewConfig creates a new Config to store the user defined parameters
1012
func NewConfig(readUnits, writeUnits, readWorkers, writeWorkers int) Config {
1113
return Config{
1214
readCapacityUnits: int64(readUnits),
@@ -16,6 +18,8 @@ func NewConfig(readUnits, writeUnits, readWorkers, writeWorkers int) Config {
1618
}
1719
}
1820

21+
// Provisioning calculates a new Provisioning value based on the passed argument and the current Config (receiver).
22+
// The returned Provisioning value will have the higher of the two values for read and write capacity units.
1923
func (c Config) Provisioning(current Provisioning) Provisioning {
2024
src := current.Source
2125
if src != nil && c.readCapacityUnits > src.Read {
@@ -30,6 +34,7 @@ func (c Config) Provisioning(current Provisioning) Provisioning {
3034
return Provisioning{Source: src, Target: trg}
3135
}
3236

37+
// Workers returns the Config read and write worker count
3338
func (c Config) Workers() (int, int) {
3439
return c.readWorkers, c.writeWorkers
3540
}

copier.go

+19-14
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,33 @@ import (
55
"sync"
66
)
77

8+
// Copier is the interface that allows you to copy records from the source to target table
89
type Copier interface {
910
Copy(readers, writers int) error
1011
}
1112

1213
type copyService struct {
13-
srcTable DynamoDBService
14-
trgTable DynamoDBService
15-
chans CopierChans
16-
logger Logger
14+
srcTable DynamoDBService
15+
trgTable DynamoDBService
16+
copierChan CopierChan
17+
logger Logger
1718
}
1819

19-
func NewCopier(srcTableService, trgTableService DynamoDBService, chans CopierChans, logger Logger) Copier {
20+
// NewCopier returns a new Copier to copy records
21+
func NewCopier(srcTableService, trgTableService DynamoDBService, copierChan CopierChan, logger Logger) Copier {
2022
return copyService{
21-
srcTable: srcTableService,
22-
trgTable: trgTableService,
23-
chans: chans,
24-
logger: logger,
23+
srcTable: srcTableService,
24+
trgTable: trgTableService,
25+
copierChan: copierChan,
26+
logger: logger,
2527
}
2628
}
2729

30+
// Copy will copy all records from the source to target table.
31+
// This method will create a worker pool according to the number of readers and writers that are passed as arguments
2832
func (service copyService) Copy(readers, writers int) error {
2933
service.logger.Printf("copying table with %d readers and %d writers", readers, writers)
30-
itemsChan, errChan := service.chans.Items, service.chans.Errors
34+
itemsChan, errChan := service.copierChan.Items, service.copierChan.Errors
3135

3236
wgReaders := &sync.WaitGroup{}
3337
wgReaders.Add(readers)
@@ -93,14 +97,15 @@ func (service copyService) write(wg *sync.WaitGroup, itemsChan <-chan []DynamoDB
9397
service.logger.Printf("writer wrote a total of %d items", totalWritten)
9498
}
9599

96-
// CopierChans encapsulates the chan that are used by the copier
97-
type CopierChans struct {
100+
// CopierChan encapsulates the value and error channel used by the copier
101+
type CopierChan struct {
98102
Items chan []DynamoDBItem
99103
Errors chan error
100104
}
101105

102-
func NewCopierChans(itemsChanSize int) CopierChans {
103-
return CopierChans{
106+
// NewCopierChan creates a new CopierChan with a buffered chan []DynamoDBItem of itemsChanSize
107+
func NewCopierChan(itemsChanSize int) CopierChan {
108+
return CopierChan{
104109
Items: make(chan []DynamoDBItem, itemsChanSize),
105110
Errors: make(chan error),
106111
}

copier_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ func TestCopy(t *testing.T) {
2323

2424
testCases := []struct {
2525
subTestName string
26-
mocker func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans)
26+
mocker func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChan)
2727
totalReaders int
2828
totalWriters int
2929
expectedError error
3030
}{
3131
{
3232
"ScanError",
33-
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans) {
33+
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChan) {
3434
var readChan chan<- []dynamodbcopy.DynamoDBItem = chans.Items
3535
src.On("Scan", 1, 0, readChan).Return(scanError).Once()
3636
},
@@ -40,7 +40,7 @@ func TestCopy(t *testing.T) {
4040
},
4141
{
4242
"BatchWriteError",
43-
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans) {
43+
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChan) {
4444
var readChan chan<- []dynamodbcopy.DynamoDBItem = chans.Items
4545
src.On("Scan", 1, 0, readChan).Return(nil).Once()
4646

@@ -54,7 +54,7 @@ func TestCopy(t *testing.T) {
5454
},
5555
{
5656
"Success",
57-
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans) {
57+
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChan) {
5858
var readChan chan<- []dynamodbcopy.DynamoDBItem = chans.Items
5959
src.On("Scan", 1, 0, readChan).Return(nil).Once()
6060

@@ -68,7 +68,7 @@ func TestCopy(t *testing.T) {
6868
},
6969
{
7070
"MultipleWorkers",
71-
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans) {
71+
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChan) {
7272
var readChan chan<- []dynamodbcopy.DynamoDBItem = chans.Items
7373
src.On("Scan", 3, 0, readChan).Return(nil).Once()
7474
src.On("Scan", 3, 1, readChan).Return(nil).Once()
@@ -92,7 +92,7 @@ func TestCopy(t *testing.T) {
9292
},
9393
{
9494
"ReadPanic",
95-
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans) {
95+
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChan) {
9696
var readChan chan<- []dynamodbcopy.DynamoDBItem = chans.Items
9797
src.On("Scan", 1, 0, readChan).Run(func(args mock.Arguments) {
9898
panic("read panic")
@@ -104,7 +104,7 @@ func TestCopy(t *testing.T) {
104104
},
105105
{
106106
"WritePanic",
107-
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans) {
107+
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChan) {
108108
var readChan chan<- []dynamodbcopy.DynamoDBItem = chans.Items
109109
src.On("Scan", 1, 0, readChan).Return(nil).Once()
110110

@@ -127,7 +127,7 @@ func TestCopy(t *testing.T) {
127127
src := &mocks.DynamoDBService{}
128128
trg := &mocks.DynamoDBService{}
129129

130-
copierChans := dynamodbcopy.NewCopierChans(testCase.totalWriters)
130+
copierChans := dynamodbcopy.NewCopierChan(testCase.totalWriters)
131131

132132
testCase.mocker(src, trg, &copierChans)
133133

doc.go

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package dynamodbcopy provides a simple interface for you to copy info across dynamodb tables
2+
package dynamodbcopy

dynamodb.go

+48-24
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,39 @@ const (
2020
errCodeThrottlingException = "ThrottlingException"
2121
)
2222

23-
// DynamoDBAPI just a wrapper over aws-sdk dynamodbiface.DynamoDBAPI interface for mocking purposes
24-
type DynamoDBAPI interface {
23+
// DynamoDBClient is a wrapper interface over aws-sdk dynamodbiface.DynamoDBAPI for mocking purposes
24+
type DynamoDBClient interface {
2525
dynamodbiface.DynamoDBAPI
2626
}
2727

28+
// NewDynamoClient creates a DynamoDB client wrapper around the AWS-SDK with a predefined Session.
29+
// By default, it creates a new Session with SharedConfigEnable,
30+
// so you can use AWS SDK's environment variables and AWS credentials to connect to DynamoDB.
31+
//
32+
// The provided ARN role allows you to configure the Session to assume a specific IAM Role.
33+
//
34+
// If an empty string is provided, it will create a new session with SharedConfigEnable.
35+
// Please refer to https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html for more
36+
// information on how you can set up the SDK
37+
func NewDynamoClient(roleArn string) DynamoDBClient {
38+
options := session.Options{
39+
SharedConfigState: session.SharedConfigEnable,
40+
}
41+
42+
currentSession := session.Must(session.NewSessionWithOptions(options))
43+
if roleArn != "" {
44+
roleCredentials := stscreds.NewCredentials(currentSession, roleArn)
45+
46+
return dynamodb.New(currentSession, &aws.Config{Credentials: roleCredentials})
47+
}
48+
49+
return dynamodb.New(currentSession)
50+
}
51+
52+
// DynamoDBItem type to abstract a DynamoDB item
2853
type DynamoDBItem map[string]*dynamodb.AttributeValue
2954

55+
// DynamoDBService interface provides methods to call the aws sdk
3056
type DynamoDBService interface {
3157
DescribeTable() (*dynamodb.TableDescription, error)
3258
UpdateCapacity(capacity Capacity) error
@@ -37,43 +63,31 @@ type DynamoDBService interface {
3763

3864
type dynamoDBSerivce struct {
3965
tableName string
40-
api DynamoDBAPI
66+
client DynamoDBClient
4167
sleep Sleeper
4268
logger Logger
4369
}
4470

45-
func NewDynamoDBAPI(roleArn string) DynamoDBAPI {
46-
options := session.Options{
47-
SharedConfigState: session.SharedConfigEnable,
48-
}
49-
50-
currentSession := session.Must(session.NewSessionWithOptions(options))
51-
if roleArn != "" {
52-
roleCredentials := stscreds.NewCredentials(currentSession, roleArn)
53-
54-
return dynamodb.New(currentSession, &aws.Config{Credentials: roleCredentials})
55-
}
56-
57-
return dynamodb.New(currentSession)
58-
}
59-
60-
func NewDynamoDBService(tableName string, api DynamoDBAPI, sleepFn Sleeper, logger Logger) DynamoDBService {
61-
return dynamoDBSerivce{tableName, api, sleepFn, logger}
71+
// NewDynamoDBService creates new service for a given DynamoDB table with a previously configured DynamoDB client
72+
func NewDynamoDBService(tableName string, client DynamoDBClient, sleepFn Sleeper, logger Logger) DynamoDBService {
73+
return dynamoDBSerivce{tableName, client, sleepFn, logger}
6274
}
6375

76+
// DescribeTable returns the current table metadata for the DynamoDB table
6477
func (db dynamoDBSerivce) DescribeTable() (*dynamodb.TableDescription, error) {
6578
input := &dynamodb.DescribeTableInput{
6679
TableName: aws.String(db.tableName),
6780
}
6881

69-
output, err := db.api.DescribeTable(input)
82+
output, err := db.client.DescribeTable(input)
7083
if err != nil {
7184
return nil, fmt.Errorf("unable to describe table %s: %s", db.tableName, err)
7285
}
7386

7487
return output.Table, nil
7588
}
7689

90+
// UpdateCapacity sets the tables read and write capacity, waiting for the table to be ready for processing
7791
func (db dynamoDBSerivce) UpdateCapacity(capacity Capacity) error {
7892
read := capacity.Read
7993
write := capacity.Write
@@ -95,14 +109,21 @@ func (db dynamoDBSerivce) UpdateCapacity(capacity Capacity) error {
95109
}
96110

97111
db.logger.Printf("updating %s with read: %d, write: %d", db.tableName, read, write)
98-
_, err := db.api.UpdateTable(input)
112+
_, err := db.client.UpdateTable(input)
99113
if err != nil {
100114
return fmt.Errorf("unable to update table %s: %s", db.tableName, err)
101115
}
102116

103117
return db.WaitForReadyTable()
104118
}
105119

120+
// BatchWrite writes the given DynamoDBItem slice into the DynamoDB table.
121+
//
122+
// The given items will be written in groups of 25 each.
123+
//
124+
// This method will retry:
125+
// 1 - if there are any unprocessed items when performing the BatchWrite
126+
// 2 - if there is a Provisioning or Throttling aws error (tries for a max time of 3 minutes)
106127
func (db dynamoDBSerivce) BatchWrite(items []DynamoDBItem) error {
107128
db.logger.Printf("writing batch of %d to %s", len(items), db.tableName)
108129
if len(items) == 0 {
@@ -142,7 +163,7 @@ func (db dynamoDBSerivce) batchWriteItem(requests []*dynamodb.WriteRequest) erro
142163
},
143164
}
144165

145-
output, err := db.api.BatchWriteItem(batchInput)
166+
output, err := db.client.BatchWriteItem(batchInput)
146167
if err == nil {
147168
writeRequests = output.UnprocessedItems[tableName]
148169

@@ -178,6 +199,7 @@ func (db dynamoDBSerivce) batchWriteItem(requests []*dynamodb.WriteRequest) erro
178199
return nil
179200
}
180201

202+
// WaitForReadyTable will wait for the table status to be active (waits for a max time of 3 minutes)
181203
func (db dynamoDBSerivce) WaitForReadyTable() error {
182204
return db.retry(func(attempt, elapsed int) (bool, error) {
183205
description, err := db.DescribeTable()
@@ -207,6 +229,8 @@ func (db dynamoDBSerivce) retry(handler func(attempt, elapsed int) (bool, error)
207229
return fmt.Errorf("waited for too long (%d ms) to perform operation on %s table", elapsed, db.tableName)
208230
}
209231

232+
// Scan allows you to perform a parallel scan over the table, writing the scanned items into the provided itemsChan
233+
// If totalSegments is equal to 1, it will perform a sequential scan.
210234
func (db dynamoDBSerivce) Scan(totalSegments, segment int, itemsChan chan<- []DynamoDBItem) error {
211235
if totalSegments == 0 {
212236
return errors.New("totalSegments has to be greater than 0")
@@ -235,7 +259,7 @@ func (db dynamoDBSerivce) Scan(totalSegments, segment int, itemsChan chan<- []Dy
235259
return !b
236260
}
237261

238-
if err := db.api.ScanPages(&input, pagerFn); err != nil {
262+
if err := db.client.ScanPages(&input, pagerFn); err != nil {
239263
return fmt.Errorf("unable to scan table %s: %s", db.tableName, err)
240264
}
241265

0 commit comments

Comments
 (0)