Skip to content

Commit a2245fa

Browse files
author
Hugo Correia
authored
Improve error handling (#12)
* Add recovery for panics in goroutines
* Add debug logging support
* Add retry on provisioning and throttling errors while writing to dynamodb
1 parent 2930245 commit a2245fa

12 files changed

+333
-44
lines changed

copier.go

+25-4
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,9 @@
11
package dynamodbcopy
22

3-
import "sync"
3+
import (
4+
"fmt"
5+
"sync"
6+
)
47

58
type Copier interface {
69
Copy(readers, writers int) error
@@ -10,17 +13,20 @@ type copyService struct {
1013
srcTable DynamoDBService
1114
trgTable DynamoDBService
1215
chans CopierChans
16+
logger Logger
1317
}
1418

15-
func NewCopier(srcTableService, trgTableService DynamoDBService, chans CopierChans) Copier {
19+
func NewCopier(srcTableService, trgTableService DynamoDBService, chans CopierChans, logger Logger) Copier {
1620
return copyService{
1721
srcTable: srcTableService,
1822
trgTable: trgTableService,
1923
chans: chans,
24+
logger: logger,
2025
}
2126
}
2227

2328
func (service copyService) Copy(readers, writers int) error {
29+
service.logger.Printf("copying table with %d readers and %d writers", readers, writers)
2430
itemsChan, errChan := service.chans.Items, service.chans.Errors
2531

2632
wgReaders := &sync.WaitGroup{}
@@ -54,7 +60,12 @@ func (service copyService) read(
5460
itemsChan chan<- []DynamoDBItem,
5561
errChan chan<- error,
5662
) {
57-
defer wg.Done()
63+
defer func() {
64+
if err := recover(); err != nil {
65+
errChan <- fmt.Errorf("read recovery: %s", err)
66+
}
67+
wg.Done()
68+
}()
5869

5970
err := service.srcTable.Scan(totalReaders, readerID, itemsChan)
6071
if err != nil {
@@ -63,13 +74,23 @@ func (service copyService) read(
6374
}
6475

6576
func (service copyService) write(wg *sync.WaitGroup, itemsChan <-chan []DynamoDBItem, errChan chan<- error) {
66-
defer wg.Done()
77+
defer func() {
78+
if err := recover(); err != nil {
79+
errChan <- fmt.Errorf("write recovery: %s", err)
80+
}
81+
wg.Done()
82+
}()
6783

84+
totalWritten := 0
6885
for items := range itemsChan {
6986
if err := service.trgTable.BatchWrite(items); err != nil {
7087
errChan <- err
7188
}
89+
90+
totalWritten += len(items)
7291
}
92+
93+
service.logger.Printf("writer wrote a total of %d items", totalWritten)
7394
}
7495

7596
// CopierChans encapsulates the chan that are used by the copier

copier_test.go

+32-1
Original file line number | Diff line number | Diff line change
@@ -2,12 +2,15 @@ package dynamodbcopy_test
22

33
import (
44
"errors"
5+
"io/ioutil"
6+
"log"
57
"strconv"
68
"testing"
79

810
"github.com/aws/aws-sdk-go/aws"
911
"github.com/aws/aws-sdk-go/service/dynamodb"
1012
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/mock"
1114
"github.com/uniplaces/dynamodbcopy"
1215
"github.com/uniplaces/dynamodbcopy/mocks"
1316
)
@@ -87,6 +90,34 @@ func TestCopy(t *testing.T) {
8790
3,
8891
nil,
8992
},
93+
{
94+
"ReadPanic",
95+
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans) {
96+
var readChan chan<- []dynamodbcopy.DynamoDBItem = chans.Items
97+
src.On("Scan", 1, 0, readChan).Run(func(args mock.Arguments) {
98+
panic("read panic")
99+
}).Once()
100+
},
101+
1,
102+
1,
103+
errors.New("read recovery: read panic"),
104+
},
105+
{
106+
"WritePanic",
107+
func(src, trg *mocks.DynamoDBService, chans *dynamodbcopy.CopierChans) {
108+
var readChan chan<- []dynamodbcopy.DynamoDBItem = chans.Items
109+
src.On("Scan", 1, 0, readChan).Return(nil).Once()
110+
111+
items := buildItems(1)
112+
chans.Items <- items
113+
trg.On("BatchWrite", items).Run(func(args mock.Arguments) {
114+
panic("write panic")
115+
}).Once()
116+
},
117+
1,
118+
1,
119+
errors.New("write recovery: write panic"),
120+
},
90121
}
91122

92123
for _, testCase := range testCases {
@@ -100,7 +131,7 @@ func TestCopy(t *testing.T) {
100131

101132
testCase.mocker(src, trg, &copierChans)
102133

103-
service := dynamodbcopy.NewCopier(src, trg, copierChans)
134+
service := dynamodbcopy.NewCopier(src, trg, copierChans, log.New(ioutil.Discard, "", log.Ltime))
104135

105136
err := service.Copy(testCase.totalReaders, testCase.totalWriters)
106137

dynamodb.go

+69-18
Original file line number | Diff line number | Diff line change
@@ -3,15 +3,22 @@ package dynamodbcopy
33
import (
44
"errors"
55
"fmt"
6+
"time"
67

78
"github.com/aws/aws-sdk-go/aws"
9+
"github.com/aws/aws-sdk-go/aws/awserr"
810
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
911
"github.com/aws/aws-sdk-go/aws/session"
1012
"github.com/aws/aws-sdk-go/service/dynamodb"
1113
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
1214
)
1315

14-
const maxBatchWriteSize = 25
16+
const (
17+
maxBatchWriteSize = 25
18+
maxRetryTime = int(time.Minute) * 3
19+
20+
errCodeThrottlingException = "ThrottlingException"
21+
)
1522

1623
// DynamoDBAPI just a wrapper over aws-sdk dynamodbiface.DynamoDBAPI interface for mocking purposes
1724
type DynamoDBAPI interface {
@@ -32,6 +39,7 @@ type dynamoDBSerivce struct {
3239
tableName string
3340
api DynamoDBAPI
3441
sleep Sleeper
42+
logger Logger
3543
}
3644

3745
func NewDynamoDBAPI(roleArn string) DynamoDBAPI {
@@ -49,8 +57,8 @@ func NewDynamoDBAPI(roleArn string) DynamoDBAPI {
4957
return dynamodb.New(currentSession)
5058
}
5159

52-
func NewDynamoDBService(tableName string, api DynamoDBAPI, sleepFn Sleeper) DynamoDBService {
53-
return dynamoDBSerivce{tableName, api, sleepFn}
60+
func NewDynamoDBService(tableName string, api DynamoDBAPI, sleepFn Sleeper, logger Logger) DynamoDBService {
61+
return dynamoDBSerivce{tableName, api, sleepFn, logger}
5462
}
5563

5664
func (db dynamoDBSerivce) DescribeTable() (*dynamodb.TableDescription, error) {
@@ -86,6 +94,7 @@ func (db dynamoDBSerivce) UpdateCapacity(capacity Capacity) error {
8694
},
8795
}
8896

97+
db.logger.Printf("updating %s with read: %d, write: %d", db.tableName, read, write)
8998
_, err := db.api.UpdateTable(input)
9099
if err != nil {
91100
return fmt.Errorf("unable to update table %s: %s", db.tableName, err)
@@ -95,6 +104,7 @@ func (db dynamoDBSerivce) UpdateCapacity(capacity Capacity) error {
95104
}
96105

97106
func (db dynamoDBSerivce) BatchWrite(items []DynamoDBItem) error {
107+
db.logger.Printf("writing batch of %d to %s", len(items), db.tableName)
98108
if len(items) == 0 {
99109
return nil
100110
}
@@ -125,40 +135,76 @@ func (db dynamoDBSerivce) batchWriteItem(requests []*dynamodb.WriteRequest) erro
125135

126136
writeRequests := requests
127137
for len(writeRequests) != 0 {
128-
batchInput := &dynamodb.BatchWriteItemInput{
129-
RequestItems: map[string][]*dynamodb.WriteRequest{
130-
tableName: writeRequests,
131-
},
132-
}
138+
retryHandler := func(attempt, elapsed int) (bool, error) {
139+
batchInput := &dynamodb.BatchWriteItemInput{
140+
RequestItems: map[string][]*dynamodb.WriteRequest{
141+
tableName: writeRequests,
142+
},
143+
}
133144

134-
output, err := db.api.BatchWriteItem(batchInput)
135-
if err != nil {
136-
return fmt.Errorf("unable to batch write to table %s: %s", db.tableName, err)
145+
output, err := db.api.BatchWriteItem(batchInput)
146+
if err == nil {
147+
writeRequests = output.UnprocessedItems[tableName]
148+
149+
return true, nil
150+
}
151+
152+
if awsErr, ok := err.(awserr.Error); ok {
153+
switch awsErr.Code() {
154+
case dynamodb.ErrCodeProvisionedThroughputExceededException:
155+
db.logger.Printf("batch write provisioning error: waited %d ms (attempt %d)", elapsed, attempt)
156+
return false, nil
157+
case errCodeThrottlingException:
158+
db.logger.Printf("batch write throttling error: waited %d ms (attempt %d)", elapsed, attempt)
159+
return false, nil
160+
default:
161+
return false, fmt.Errorf(
162+
"aws %s error in batch write to table %s: %s",
163+
awsErr.Code(),
164+
db.tableName,
165+
awsErr.Error(),
166+
)
167+
}
168+
}
169+
170+
return false, fmt.Errorf("unable to batch write to table %s: %s", db.tableName, err)
137171
}
138172

139-
writeRequests = output.UnprocessedItems[tableName]
173+
if err := db.retry(retryHandler); err != nil {
174+
return err
175+
}
140176
}
141177

142178
return nil
143179
}
144180

145181
func (db dynamoDBSerivce) WaitForReadyTable() error {
146-
elapsed := 0
147-
148-
for attempt := 0; ; attempt++ {
182+
return db.retry(func(attempt, elapsed int) (bool, error) {
149183
description, err := db.DescribeTable()
184+
if err != nil {
185+
return false, err
186+
}
187+
188+
return *description.TableStatus == dynamodb.TableStatusActive, nil
189+
})
190+
}
191+
192+
func (db dynamoDBSerivce) retry(handler func(attempt, elapsed int) (bool, error)) error {
193+
elapsed := 0
194+
for attempt := 0; elapsed < maxRetryTime; attempt++ {
195+
handled, err := handler(attempt, elapsed)
150196
if err != nil {
151197
return err
152198
}
153199

154-
if *description.TableStatus == dynamodb.TableStatusActive {
155-
break
200+
if handled {
201+
return nil
156202
}
157203

158204
elapsed += db.sleep(elapsed * attempt)
159205
}
160206

161-
return nil
207+
return fmt.Errorf("waited for too long (%d ms) to perform operation on %s table", elapsed, db.tableName)
162208
}
163209

164210
func (db dynamoDBSerivce) Scan(totalSegments, segment int, itemsChan chan<- []DynamoDBItem) error {
@@ -175,11 +221,14 @@ func (db dynamoDBSerivce) Scan(totalSegments, segment int, itemsChan chan<- []Dy
175221
input.SetTotalSegments(int64(totalSegments))
176222
}
177223

224+
totalScanned := 0
178225
pagerFn := func(output *dynamodb.ScanOutput, b bool) bool {
179226
var items []DynamoDBItem
180227
for _, item := range output.Items {
181228
items = append(items, item)
229+
totalScanned++
182230
}
231+
db.logger.Printf("%s table scanned page with %d items (reader %d)", db.tableName, len(items), segment)
183232

184233
itemsChan <- items
185234

@@ -190,5 +239,7 @@ func (db dynamoDBSerivce) Scan(totalSegments, segment int, itemsChan chan<- []Dy
190239
return fmt.Errorf("unable to scan table %s: %s", db.tableName, err)
191240
}
192241

242+
db.logger.Printf("%s table scanned a total of %d items (reader %d)", db.tableName, totalScanned, segment)
243+
193244
return nil
194245
}

0 commit comments

Comments
 (0)