2
2
3
3
import static com .linkedin .metadata .Constants .ASPECT_LATEST_VERSION ;
4
4
5
+ import com .google .common .annotations .VisibleForTesting ;
5
6
import com .linkedin .datahub .upgrade .UpgradeContext ;
6
7
import com .linkedin .datahub .upgrade .UpgradeStep ;
7
8
import com .linkedin .datahub .upgrade .UpgradeStepResult ;
@@ -80,9 +81,11 @@ private List<RestoreIndicesResult> iterateFutures(List<Future<RestoreIndicesResu
80
81
if (future .isDone ()) {
81
82
try {
82
83
result .add (future .get ());
83
- futures .remove (future );
84
84
} catch (InterruptedException | ExecutionException e ) {
85
85
log .error ("Error iterating futures" , e );
86
+ result .add (null ); // add null to indicate failure
87
+ } finally {
88
+ futures .remove (future );
86
89
}
87
90
}
88
91
}
@@ -101,6 +104,7 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
101
104
result .batchDelayMs = getBatchDelayMs (context .parsedArgs ());
102
105
result .start = getStartingOffset (context .parsedArgs ());
103
106
result .urnBasedPagination = getUrnBasedPagination (context .parsedArgs ());
107
+ result .createDefaultAspects = getCreateDefaultAspects (context .parsedArgs ());
104
108
if (containsKey (context .parsedArgs (), RestoreIndices .ASPECT_NAME_ARG_NAME )) {
105
109
result .aspectName = context .parsedArgs ().get (RestoreIndices .ASPECT_NAME_ARG_NAME ).get ();
106
110
context .report ().addLine (String .format ("aspect is %s" , result .aspectName ));
@@ -151,7 +155,8 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
151
155
return result ;
152
156
}
153
157
154
- private int getRowCount (RestoreIndicesArgs args ) {
158
+ @ VisibleForTesting
159
+ int getRowCount (RestoreIndicesArgs args ) {
155
160
ExpressionList <EbeanAspectV2 > countExp =
156
161
_server
157
162
.find (EbeanAspectV2 .class )
@@ -233,6 +238,10 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
233
238
while (futures .size () > 0 ) {
234
239
List <RestoreIndicesResult > tmpResults = iterateFutures (futures );
235
240
for (RestoreIndicesResult tmpResult : tmpResults ) {
241
+ if (tmpResult == null ) {
242
+ // return error if there was an error processing a future
243
+ return new DefaultUpgradeStepResult (id (), DataHubUpgradeState .FAILED );
244
+ }
236
245
reportStats (context , finalJobResult , tmpResult , rowCount , startTime );
237
246
}
238
247
}
@@ -315,6 +324,16 @@ private long getBatchDelayMs(final Map<String, Optional<String>> parsedArgs) {
315
324
return resolvedBatchDelayMs ;
316
325
}
317
326
327
+ private boolean getCreateDefaultAspects (final Map <String , Optional <String >> parsedArgs ) {
328
+ Boolean createDefaultAspects = null ;
329
+ if (containsKey (parsedArgs , RestoreIndices .CREATE_DEFAULT_ASPECTS_ARG_NAME )) {
330
+ createDefaultAspects =
331
+ Boolean .parseBoolean (
332
+ parsedArgs .get (RestoreIndices .CREATE_DEFAULT_ASPECTS_ARG_NAME ).get ());
333
+ }
334
+ return createDefaultAspects != null ? createDefaultAspects : false ;
335
+ }
336
+
318
337
private int getThreadCount (final Map <String , Optional <String >> parsedArgs ) {
319
338
return getInt (parsedArgs , DEFAULT_THREADS , RestoreIndices .NUM_THREADS_ARG_NAME );
320
339
}
0 commit comments