Skip to content

Commit f053188

Browse files
authored
fix: search cache invalidation for iceberg entities (#12805)
1 parent a101c27 commit f053188

File tree

15 files changed

+717
-15
lines changed

15 files changed

+717
-15
lines changed

metadata-io/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ dependencies {
4141
implementation externalDependency.slf4jApi
4242
runtimeOnly externalDependency.logbackClassic
4343
compileOnly externalDependency.lombok
44+
compileOnly externalDependency.hazelcast
4445
implementation externalDependency.commonsCollections
4546
api externalDependency.datastaxOssNativeProtocol
4647
api(externalDependency.datastaxOssCore) {
@@ -98,7 +99,6 @@ dependencies {
9899
testImplementation spec.product.pegasus.restliServer
99100
testImplementation externalDependency.ebeanTest
100101
testImplementation externalDependency.opentelemetrySdk
101-
102102
// logback >=1.3 required due to `testcontainers` only
103103
testImplementation 'ch.qos.logback:logback-classic:1.4.7'
104104
testImplementation 'net.datafaker:datafaker:1.9.0'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package com.linkedin.metadata.search.client;
2+
3+
import com.hazelcast.map.IMap;
4+
import com.linkedin.common.urn.Urn;
5+
import java.util.Collection;
6+
import java.util.Collections;
7+
import java.util.List;
8+
import java.util.Set;
9+
import lombok.RequiredArgsConstructor;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.springframework.cache.Cache;
12+
import org.springframework.cache.CacheManager;
13+
14+
/**
15+
* A framework to enable search cache invalidation. The cache keys in the search cache are queries
16+
* of different forms and when an entity is modified, there isnt a simple direct correlation of that
17+
* entity to the queries in the cache (except for fully evaluating that search). This service
18+
* provides a mechanism to implement some a CacheKeyMatcher that implements some approximations to
19+
* check if a cache key is likely related to some entity that was updated and clear those entries.
20+
* The evict method can be called when entities are updated and it is important for those updates to
21+
* be visible in the UI without waiting for cache expiration. The eviction is disabled by default
22+
* and enabled via a spring application property searchService.enableEviction
23+
*/
24+
@RequiredArgsConstructor
25+
@Slf4j
26+
public class CacheEvictionService {
27+
private final CacheManager cacheManager;
28+
private final Boolean cachingEnabled;
29+
private final Boolean enableEviction;
30+
31+
// invalidates all caches
32+
public void invalidateAll() {
33+
if (cachingEnabled && enableEviction) {
34+
cacheManager.getCacheNames().forEach(this::invalidate);
35+
}
36+
}
37+
38+
// invalidates a specific cache
39+
public void invalidate(String cacheName) {
40+
if (cachingEnabled && enableEviction) {
41+
Cache cache = cacheManager.getCache(cacheName);
42+
if (cache != null) {
43+
cache.invalidate();
44+
} else {
45+
throw new AssertionError(String.format("Invalid cache name %s supplied", cacheName));
46+
}
47+
}
48+
}
49+
50+
// Runs all cache keys through the supplied matcher implementation and clear the cache keys
51+
// identified by the matcher.
52+
public void evict(CacheKeyMatcher matcher) {
53+
54+
if (cachingEnabled && enableEviction) {
55+
Collection<String> cacheNames = cacheManager.getCacheNames();
56+
for (String cacheName : cacheNames) {
57+
long evictCount = 0;
58+
if (matcher.supportsCache(cacheName)) {
59+
Cache cache = cacheManager.getCache(cacheName);
60+
assert (cache != null);
61+
Set<Object> keys = getKeys(cacheName);
62+
for (Object key : keys) {
63+
if (matcher.match(cacheName, key)) {
64+
cache.evict(key);
65+
evictCount++;
66+
log.debug("From cache '{}' evicting key {}", cacheName, key);
67+
}
68+
}
69+
if (evictCount > 0) {
70+
log.info("Evicted {} keys from cache {}", evictCount, cacheName);
71+
}
72+
}
73+
}
74+
}
75+
}
76+
77+
// Use a UrnCacheKeyMatcher implement to evict cache keys that are likely related to the supplied
78+
// list of urns
79+
public void evict(List<Urn> urns) {
80+
log.info("Attempting eviction of search cache due to updates to {}", urns);
81+
UrnCacheKeyMatcher matcher = new UrnCacheKeyMatcher(urns);
82+
evict(matcher);
83+
}
84+
85+
private Set<Object> getKeys(String cacheName) {
86+
// Enumerating cache keys is not part of the standard Cache interface, but needs is native cache
87+
// implementation
88+
// dependent and so must be implemented for all cache implementations we may use.
89+
90+
Cache springCache = cacheManager.getCache(cacheName);
91+
assert (springCache != null);
92+
Object nativeCache = springCache.getNativeCache();
93+
if (nativeCache instanceof com.github.benmanes.caffeine.cache.Cache) {
94+
com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache =
95+
(com.github.benmanes.caffeine.cache.Cache<Object, Object>) nativeCache;
96+
return caffeineCache.asMap().keySet();
97+
} else if (nativeCache instanceof IMap) {
98+
IMap<Object, Object> hazelCache = (IMap<Object, Object>) nativeCache;
99+
return hazelCache.keySet();
100+
}
101+
102+
log.warn("Unhandled cache type {} of type {}", cacheName, nativeCache.getClass());
103+
return Collections.emptySet();
104+
}
105+
106+
// Useful during matcher debugging, but voluminous
107+
void dumpCache(String message) {
108+
log.debug("Begin Dump {}", message);
109+
cacheManager
110+
.getCacheNames()
111+
.forEach(
112+
cacheName -> {
113+
log.debug("Dump cache: {}", cacheName);
114+
Cache cache = cacheManager.getCache(cacheName);
115+
getKeys(cacheName)
116+
.forEach(
117+
key -> {
118+
log.debug(" key {} : {}", key, cache.get(key));
119+
});
120+
});
121+
}
122+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.linkedin.metadata.search.client;
2+
3+
public interface CacheKeyMatcher {
4+
boolean supportsCache(String cacheName);
5+
6+
// Called for each supported cache, with each key
7+
boolean match(String cacheName, Object key);
8+
}

metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030

3131
@RequiredArgsConstructor
3232
public class CachingEntitySearchService {
33-
private static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
34-
private static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME =
33+
public static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
34+
public static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME =
3535
"entitySearchServiceAutoComplete";
36-
private static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
36+
public static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
3737
public static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll";
3838

3939
private final CacheManager cacheManager;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.linkedin.metadata.search.client;
2+
3+
import static com.linkedin.metadata.search.client.CachingEntitySearchService.*;
4+
5+
import com.linkedin.common.urn.Urn;
6+
import java.util.Arrays;
7+
import java.util.HashSet;
8+
import java.util.List;
9+
import java.util.Set;
10+
import org.javatuples.Octet;
11+
import org.javatuples.Septet;
12+
13+
class UrnCacheKeyMatcher implements CacheKeyMatcher {
14+
private final List<Urn> urns;
15+
private final Set<String> entityTypes;
16+
17+
final List<String> SUPPORTED_CACHE_NAMES =
18+
Arrays.asList(
19+
ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME, ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME);
20+
21+
UrnCacheKeyMatcher(List<Urn> urns) {
22+
this.urns = urns;
23+
this.entityTypes = new HashSet<>();
24+
urns.forEach(
25+
urn -> {
26+
this.entityTypes.add(urn.getEntityType());
27+
});
28+
}
29+
30+
@Override
31+
public boolean supportsCache(String cacheName) {
32+
return SUPPORTED_CACHE_NAMES.contains(cacheName);
33+
}
34+
35+
@Override
36+
public boolean match(String cacheName, Object key) {
37+
switch (cacheName) {
38+
case ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME:
39+
return matchSearchServiceCacheKey(key);
40+
case ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME:
41+
return matchSearchServiceScrollCacheKey(key);
42+
}
43+
return false;
44+
}
45+
46+
private boolean matchSearchServiceScrollCacheKey(Object key) {
47+
Octet<?, List<String>, String, String, ?, ?, List<String>, ?> cacheKey =
48+
(Octet<?, List<String>, String, String, ?, ?, List<String>, ?>) key;
49+
// For reference - cache key contents
50+
// @Nonnull OperationContext opContext,
51+
// @Nonnull List<String> entities,
52+
// @Nonnull String query,
53+
// @Nullable Filter filters,
54+
// List<SortCriterion> sortCriteria,
55+
// @Nullable String scrollId,
56+
// @Nonnull List<String> facets
57+
// int size,
58+
List<String> entitiesInCacheKey = (List<String>) cacheKey.getValue(1);
59+
String filter = (String) cacheKey.getValue(3);
60+
String query = (String) cacheKey.getValue(2);
61+
List<String> facets = (List<String>) cacheKey.getValue(6);
62+
63+
if (filter == null) {
64+
filter = "";
65+
}
66+
filter += " " + String.join(" ", facets);
67+
// Facets may contain urns. Since the check for urns in filters is similar, can append it to the
68+
// filter.
69+
return isKeyImpactedByEntity(entitiesInCacheKey, query, filter);
70+
}
71+
72+
private boolean matchSearchServiceCacheKey(Object key) {
73+
Septet<?, List<String>, ?, String, ?, ?, ?> cacheKey =
74+
(Septet<?, List<String>, ?, String, ?, ?, ?>) key;
75+
// For reference
76+
// @Nonnull OperationContext opContext,
77+
// @Nonnull List<String> entityNames,
78+
// @Nonnull String query,
79+
// @Nullable Filter filters,
80+
// List<SortCriterion> sortCriteria,
81+
// @Nonnull List<String> facets
82+
// querySize
83+
84+
List<String> entitiesInCacheKey = (List<String>) cacheKey.getValue(1);
85+
String filter = (String) cacheKey.getValue(3);
86+
String query = (String) cacheKey.getValue(2);
87+
List<String> facets = (List<String>) cacheKey.getValue(5);
88+
89+
// Facets may contain urns. Since the check for urns in filters is similar, can append it to the
90+
// filter.
91+
if (filter == null) {
92+
filter = "";
93+
}
94+
filter += " " + String.join(" ", facets);
95+
96+
return isKeyImpactedByEntity(entitiesInCacheKey, query, filter);
97+
}
98+
99+
boolean isKeyImpactedByEntity(List<String> entitiesInCacheKey, String query, String filter) {
100+
boolean entityMatch = entitiesInCacheKey.stream().anyMatch(entityTypes::contains);
101+
if (!entityMatch) {
102+
return false;
103+
}
104+
105+
// Ignoring query for now. A query could make this cache entry more targeted, but till there is
106+
// a quick way to evaluate if the entities that were updated are affected by this query,
107+
// ignoring it may mean some cache entries are invalidated even if they may not be a match,
108+
// and an uncached query result will still be fetched.
109+
110+
boolean containsUrn = filter.contains("urn:li");
111+
if (!containsUrn) {
112+
return true; // Entity match, has a filter, but not on urn. this may be a suboptimal
113+
}
114+
115+
return urns.stream()
116+
.anyMatch(
117+
urn ->
118+
filter.contains(
119+
urn.toString())); // If we found an exact URN match, this is to be evicted. If
120+
121+
// this entry was for some other urn, do not evict.
122+
}
123+
}

0 commit comments

Comments
 (0)