2
2
3
3
import static com .linkedin .metadata .Constants .DEFAULT_RUN_ID ;
4
4
5
+ import com .linkedin .entity .EnvelopedAspect ;
5
6
import com .linkedin .entity .EnvelopedAspectMap ;
6
7
import com .linkedin .mxe .SystemMetadata ;
7
- import java .util .ArrayList ;
8
- import java .util .List ;
8
+ import java .util .Comparator ;
9
+ import java .util .Objects ;
10
+ import java .util .Optional ;
9
11
import javax .annotation .Nonnull ;
10
12
import javax .annotation .Nullable ;
11
13
@@ -15,51 +17,34 @@ private SystemMetadataUtils() {}
15
17
16
18
@ Nullable
17
19
public static Long getLastIngestedTime (@ Nonnull EnvelopedAspectMap aspectMap ) {
18
- RunInfo lastIngestionRun = getLastIngestionRun (aspectMap );
19
- return lastIngestionRun != null ? lastIngestionRun .getTime () : null ;
20
+ return getLastIngestionRun (aspectMap ).map (RunInfo ::getTime ).orElse (null );
20
21
}
21
22
22
23
@ Nullable
23
24
public static String getLastIngestedRunId (@ Nonnull EnvelopedAspectMap aspectMap ) {
24
- RunInfo lastIngestionRun = getLastIngestionRun (aspectMap );
25
- return lastIngestionRun != null ? lastIngestionRun .getId () : null ;
25
+ return getLastIngestionRun (aspectMap ).map (RunInfo ::getId ).orElse (null );
26
26
}
27
27
28
28
/**
29
- * Returns a sorted list of all of the most recent ingestion runs based on the most recent aspects
30
- * present for the entity.
29
+ * Returns the most recent ingestion run based on the most recent aspects present for the entity.
31
30
*/
32
31
@ Nonnull
33
- public static List <RunInfo > getLastIngestionRuns (@ Nonnull EnvelopedAspectMap aspectMap ) {
34
- final List <RunInfo > runs = new ArrayList <>();
35
- for (String aspect : aspectMap .keySet ()) {
36
- if (aspectMap .get (aspect ).hasSystemMetadata ()) {
37
- SystemMetadata systemMetadata = aspectMap .get (aspect ).getSystemMetadata ();
38
- if (systemMetadata .hasLastRunId ()
39
- && !systemMetadata .getLastRunId ().equals (DEFAULT_RUN_ID )
40
- && systemMetadata .hasLastObserved ()) {
41
- Long lastObserved = systemMetadata .getLastObserved ();
42
- String runId = systemMetadata .getLastRunId ();
43
- RunInfo run = new RunInfo (runId , lastObserved );
44
- runs .add (run );
45
- } else if (systemMetadata .hasRunId ()
46
- && !systemMetadata .getRunId ().equals (DEFAULT_RUN_ID )
47
- && systemMetadata .hasLastObserved ()) {
48
- // Handle the legacy case: Check original run ids.
49
- Long lastObserved = systemMetadata .getLastObserved ();
50
- String runId = systemMetadata .getRunId ();
51
- RunInfo run = new RunInfo (runId , lastObserved );
52
- runs .add (run );
53
- }
54
- }
55
- }
56
- runs .sort ((a , b ) -> Long .compare (b .getTime (), a .getTime ()));
57
- return runs ;
58
- }
59
-
60
- @ Nullable
61
- private static RunInfo getLastIngestionRun (@ Nonnull EnvelopedAspectMap aspectMap ) {
62
- List <RunInfo > runs = getLastIngestionRuns (aspectMap );
63
- return !runs .isEmpty () ? runs .get (0 ) : null ; // Just take the first, to get the most recent run.
32
+ private static Optional <RunInfo > getLastIngestionRun (@ Nonnull EnvelopedAspectMap aspectMap ) {
33
+ return aspectMap .values ().stream ()
34
+ .filter (EnvelopedAspect ::hasSystemMetadata )
35
+ .map (EnvelopedAspect ::getSystemMetadata )
36
+ .filter (SystemMetadata ::hasLastObserved )
37
+ .map (
38
+ systemMetadata ->
39
+ Optional .ofNullable (systemMetadata .getLastRunId ())
40
+ .filter (lastRunId -> !lastRunId .equals (DEFAULT_RUN_ID ))
41
+ .or (
42
+ () ->
43
+ Optional .ofNullable (systemMetadata .getRunId ())
44
+ .filter (runId -> !runId .equals (DEFAULT_RUN_ID )))
45
+ .map (runId -> new RunInfo (runId , systemMetadata .getLastObserved ()))
46
+ .orElse (null ))
47
+ .filter (Objects ::nonNull )
48
+ .max (Comparator .comparingLong (RunInfo ::getTime ));
64
49
}
65
50
}
0 commit comments