1
1
package com .linkedin .datahub .upgrade .system .entity .steps ;
2
2
3
+ import static com .linkedin .metadata .Constants .*;
4
+
3
5
import com .google .common .collect .ImmutableList ;
4
6
import com .linkedin .common .AuditStamp ;
5
7
import com .linkedin .common .urn .Urn ;
34
36
import lombok .extern .slf4j .Slf4j ;
35
37
import org .jetbrains .annotations .NotNull ;
36
38
37
- import static com . linkedin . metadata . Constants .*;
38
-
39
-
40
- /** This bootstrap step is responsible for upgrading DataHub policy documents with new searchable fields in ES */
39
+ /**
40
+ * This bootstrap step is responsible for upgrading DataHub policy documents with new searchable
41
+ * fields in ES
42
+ */
41
43
@ Slf4j
42
44
public class BackfillPolicyFieldsStep implements UpgradeStep {
43
45
private static final String UPGRADE_ID = "BackfillPolicyFieldsStep" ;
@@ -47,7 +49,9 @@ public class BackfillPolicyFieldsStep implements UpgradeStep {
47
49
private final EntityService <?> entityService ;
48
50
private final SearchService _searchService ;
49
51
50
- public BackfillPolicyFieldsStep (EntityService <?> entityService , SearchService searchService ,
52
+ public BackfillPolicyFieldsStep (
53
+ EntityService <?> entityService ,
54
+ SearchService searchService ,
51
55
boolean reprocessEnabled ,
52
56
Integer batchSize ) {
53
57
this .entityService = entityService ;
@@ -74,8 +78,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
74
78
do {
75
79
log .info (
76
80
String .format (
77
- "Upgrading batch of policies %s-%s" ,
78
- migratedCount , migratedCount + batchSize ));
81
+ "Upgrading batch of policies %s-%s" , migratedCount , migratedCount + batchSize ));
79
82
scrollId = backfillPolicies (auditStamp , scrollId );
80
83
migratedCount += batchSize ;
81
84
} while (scrollId != null );
@@ -169,8 +172,9 @@ private Filter backfillPolicyFieldFilter() {
169
172
private void ingestPolicyFields (Urn urn , AuditStamp auditStamp ) {
170
173
EntityResponse entityResponse = null ;
171
174
try {
172
- entityResponse = entityService .getEntityV2 (
173
- urn .getEntityType (), urn , Collections .singleton (DATAHUB_POLICY_INFO_ASPECT_NAME ));
175
+ entityResponse =
176
+ entityService .getEntityV2 (
177
+ urn .getEntityType (), urn , Collections .singleton (DATAHUB_POLICY_INFO_ASPECT_NAME ));
174
178
} catch (URISyntaxException e ) {
175
179
log .error (
176
180
String .format (
@@ -181,16 +185,20 @@ private void ingestPolicyFields(Urn urn, AuditStamp auditStamp) {
181
185
182
186
if (entityResponse != null
183
187
&& entityResponse .getAspects ().containsKey (DATAHUB_POLICY_INFO_ASPECT_NAME )) {
184
- final DataMap dataMap = entityResponse .getAspects ().get (DATAHUB_POLICY_INFO_ASPECT_NAME ).getValue ().data ();
188
+ final DataMap dataMap =
189
+ entityResponse .getAspects ().get (DATAHUB_POLICY_INFO_ASPECT_NAME ).getValue ().data ();
185
190
final DataHubPolicyInfo infoAspect = new DataHubPolicyInfo (dataMap );
186
- log .debug (String .format ("Restating policy information for urn %s with value %s" , urn , infoAspect ));
191
+ log .debug (
192
+ String .format ("Restating policy information for urn %s with value %s" , urn , infoAspect ));
187
193
MetadataChangeProposal proposal = new MetadataChangeProposal ();
188
194
proposal .setEntityUrn (urn );
189
195
proposal .setEntityType (urn .getEntityType ());
190
196
proposal .setAspectName (DATAHUB_POLICY_INFO_ASPECT_NAME );
191
197
proposal .setChangeType (ChangeType .RESTATE );
192
198
proposal .setSystemMetadata (
193
- new SystemMetadata ().setRunId (DEFAULT_RUN_ID ).setLastObserved (System .currentTimeMillis ()));
199
+ new SystemMetadata ()
200
+ .setRunId (DEFAULT_RUN_ID )
201
+ .setLastObserved (System .currentTimeMillis ()));
194
202
proposal .setAspect (GenericRecordUtils .serializeAspect (infoAspect ));
195
203
entityService .ingestProposal (proposal , auditStamp , true );
196
204
}
@@ -208,5 +216,4 @@ private static ConjunctiveCriterion getCriterionForMissingField(String field) {
208
216
conjunctiveCriterion .setAnd (criterionArray );
209
217
return conjunctiveCriterion ;
210
218
}
211
-
212
- }
219
+ }
0 commit comments