|
2 | 2 |
|
3 | 3 | import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
|
4 | 4 |
|
| 5 | +import com.google.common.annotations.VisibleForTesting; |
5 | 6 | import com.linkedin.datahub.upgrade.UpgradeContext;
|
6 | 7 | import com.linkedin.datahub.upgrade.UpgradeStep;
|
7 | 8 | import com.linkedin.datahub.upgrade.UpgradeStepResult;
|
@@ -80,9 +81,11 @@ private List<RestoreIndicesResult> iterateFutures(List<Future<RestoreIndicesResu
|
80 | 81 | if (future.isDone()) {
|
81 | 82 | try {
|
82 | 83 | result.add(future.get());
|
83 |
| - futures.remove(future); |
84 | 84 | } catch (InterruptedException | ExecutionException e) {
|
85 | 85 | log.error("Error iterating futures", e);
|
| 86 | + result.add(null); // add null to indicate failure |
| 87 | + } finally { |
| 88 | + futures.remove(future); |
86 | 89 | }
|
87 | 90 | }
|
88 | 91 | }
|
@@ -152,7 +155,8 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
|
152 | 155 | return result;
|
153 | 156 | }
|
154 | 157 |
|
155 |
| - private int getRowCount(RestoreIndicesArgs args) { |
| 158 | + @VisibleForTesting |
| 159 | + int getRowCount(RestoreIndicesArgs args) { |
156 | 160 | ExpressionList<EbeanAspectV2> countExp =
|
157 | 161 | _server
|
158 | 162 | .find(EbeanAspectV2.class)
|
@@ -234,6 +238,10 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
|
234 | 238 | while (futures.size() > 0) {
|
235 | 239 | List<RestoreIndicesResult> tmpResults = iterateFutures(futures);
|
236 | 240 | for (RestoreIndicesResult tmpResult : tmpResults) {
|
| 241 | + if (tmpResult == null) { |
| 242 | + // return error if there was an error processing a future |
| 243 | + return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED); |
| 244 | + } |
237 | 245 | reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
|
238 | 246 | }
|
239 | 247 | }
|
|
0 commit comments