Skip to content

Commit 5b8dcc3

Browse files
committed
feat: search cache invalidation for iceberg entities
1 parent 4714f46 commit 5b8dcc3

File tree

15 files changed

+635
-20
lines changed

15 files changed

+635
-20
lines changed

metadata-io/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ dependencies {
4141
implementation externalDependency.slf4jApi
4242
runtimeOnly externalDependency.logbackClassic
4343
compileOnly externalDependency.lombok
44+
compileOnly externalDependency.hazelcast
4445
implementation externalDependency.commonsCollections
4546
api externalDependency.datastaxOssNativeProtocol
4647
api(externalDependency.datastaxOssCore) {
@@ -98,7 +99,6 @@ dependencies {
9899
testImplementation spec.product.pegasus.restliServer
99100
testImplementation externalDependency.ebeanTest
100101
testImplementation externalDependency.opentelemetrySdk
101-
102102
// logback >=1.3 required due to `testcontainers` only
103103
testImplementation 'ch.qos.logback:logback-classic:1.4.7'
104104
testImplementation 'net.datafaker:datafaker:1.9.0'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package com.linkedin.metadata.search.client;
2+
3+
import com.hazelcast.map.IMap;
4+
import com.linkedin.common.urn.Urn;
5+
import java.util.Collection;
6+
import java.util.Collections;
7+
import java.util.List;
8+
import java.util.Set;
9+
import lombok.RequiredArgsConstructor;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.springframework.cache.Cache;
12+
import org.springframework.cache.CacheManager;
13+
14+
@RequiredArgsConstructor
15+
@Slf4j
16+
public class CacheEvictionService {
17+
private final CacheManager cacheManager;
18+
private final Boolean cachingEnabled;
19+
private final Boolean enableEviction;
20+
21+
// invalidates all caches
22+
public void invalidateAll() {
23+
if (cachingEnabled && enableEviction) {
24+
cacheManager.getCacheNames().forEach(this::invalidate);
25+
}
26+
}
27+
28+
// invalidates a specific cache
29+
public void invalidate(String cacheName) {
30+
if (cachingEnabled && enableEviction) {
31+
Cache cache = cacheManager.getCache(cacheName);
32+
if (cache != null) {
33+
cache.invalidate();
34+
} else {
35+
throw new AssertionError(String.format("Invalid cache name %s supplied", cacheName));
36+
}
37+
}
38+
}
39+
40+
public void evict(CacheKeyMatcher matcher){
41+
42+
if (cachingEnabled && enableEviction) {
43+
Collection<String> cacheNames = cacheManager.getCacheNames();
44+
for (String cacheName : cacheNames) {
45+
long evictCount = 0;
46+
if (matcher.supportsCache(cacheName)) {
47+
Cache cache = cacheManager.getCache(cacheName);
48+
assert (cache != null);
49+
Set<Object> keys = getKeys(cacheName);
50+
for (Object key : keys) {
51+
if (matcher.match(cacheName, key)) {
52+
cache.evict(key);
53+
evictCount ++;
54+
log.debug("From cache '{}' evicting key {}", cacheName, key);
55+
}
56+
}
57+
if (evictCount>0){
58+
log.info("Evicted {} keys from cache {}", evictCount, cacheName);
59+
}
60+
}
61+
}
62+
}
63+
}
64+
65+
private Set<Object> getKeys(String cacheName) {
66+
// Enumerating cache keys is not part of the standard Cache interface, but needs is native cache
67+
// implementation
68+
// dependent and so must be implemented for all cache implementations we may use.
69+
70+
Cache springCache = cacheManager.getCache(cacheName);
71+
if (springCache == null) {
72+
return Collections.emptySet();
73+
}
74+
75+
Object nativeCache = springCache.getNativeCache();
76+
if (nativeCache instanceof com.github.benmanes.caffeine.cache.Cache) {
77+
com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache =
78+
(com.github.benmanes.caffeine.cache.Cache<Object, Object>) nativeCache;
79+
return caffeineCache.asMap().keySet();
80+
} else if (nativeCache instanceof IMap) {
81+
IMap<Object, Object> hazelCache = (IMap<Object, Object>) nativeCache;
82+
return hazelCache.keySet();
83+
}
84+
85+
log.warn("Unhandled cache type {} of type {}", cacheName, nativeCache.getClass());
86+
return Collections.emptySet();
87+
}
88+
89+
//Useful during matcher debugging, but voluminous
90+
private void dumpCache(String message){
91+
log.debug("Begin Dump {}", message);
92+
cacheManager.getCacheNames()
93+
.forEach(cacheName -> {
94+
log.debug("Dump cache: {}", cacheName);
95+
Cache cache = cacheManager.getCache(cacheName);
96+
getKeys(cacheName).forEach(key -> {
97+
log.debug(" key {} : {}", key, cache.get(key));
98+
});
99+
});
100+
}
101+
102+
public void evict(List<Urn> urns) {
103+
log.info("Attempting eviction of search cache due to updates to {}", urns);
104+
UrnCacheKeyMatcher matcher = new UrnCacheKeyMatcher(urns);
105+
evict(matcher);
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.linkedin.metadata.search.client;
2+
3+
/** Strategy used to select which cache entries should be evicted. */
public interface CacheKeyMatcher {

  /**
   * Tells whether this matcher knows how to inspect keys of the named cache.
   *
   * @param cacheName name of the cache being considered
   * @return {@code true} if keys of this cache should be offered to {@link #match}
   */
  boolean supportsCache(String cacheName);

  /**
   * Called for each supported cache, with each key.
   *
   * @param cacheName name of the cache the key belongs to
   * @param key the cache key to test
   * @return {@code true} if the key matches
   */
  boolean match(String cacheName, Object key);
}

metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030

3131
@RequiredArgsConstructor
3232
public class CachingEntitySearchService {
33-
private static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
34-
private static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME =
33+
static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
34+
static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME =
3535
"entitySearchServiceAutoComplete";
36-
private static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
37-
public static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll";
36+
static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
37+
static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll";
3838

3939
private final CacheManager cacheManager;
4040
private final EntitySearchService
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package com.linkedin.metadata.search.client;
2+
3+
import static com.linkedin.metadata.search.client.CachingEntitySearchService.*;
4+
5+
import com.linkedin.common.urn.Urn;
6+
import java.util.Arrays;
7+
import java.util.HashSet;
8+
import java.util.List;
9+
import java.util.Set;
10+
import org.javatuples.Octet;
11+
import org.javatuples.Septet;
12+
13+
class UrnCacheKeyMatcher implements CacheKeyMatcher {
14+
private final List<Urn> urns;
15+
private final Set<String> entityTypes;
16+
17+
final List<String> SUPPORTED_CACHE_NAMES =
18+
Arrays.asList(
19+
ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME, ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME);
20+
21+
UrnCacheKeyMatcher(List<Urn> urns) {
22+
this.urns = urns;
23+
this.entityTypes = new HashSet<>();
24+
urns.forEach(
25+
urn -> {
26+
this.entityTypes.add(urn.getEntityType());
27+
});
28+
}
29+
30+
@Override
31+
public boolean supportsCache(String cacheName) {
32+
return SUPPORTED_CACHE_NAMES.contains(cacheName);
33+
}
34+
35+
@Override
36+
public boolean match(String cacheName, Object key) {
37+
if (!SUPPORTED_CACHE_NAMES.contains(cacheName)) {
38+
return false;
39+
}
40+
41+
switch (cacheName) {
42+
case ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME:
43+
return matchSearchServiceCacheKey(key);
44+
case ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME:
45+
return matchSearchServiceScrollCacheKey(key);
46+
}
47+
return false;
48+
}
49+
50+
private boolean matchSearchServiceScrollCacheKey(Object key) {
51+
Octet<?, List<String>, String, String, ?, ?, List<String>, ?> cacheKey
52+
= (Octet<?, List<String>, String, String, ?, ?, List<String>, ?>) key;
53+
// @Nonnull OperationContext opContext,
54+
// @Nonnull List<String> entities,
55+
// @Nonnull String query,
56+
// @Nullable Filter filters,
57+
// List<SortCriterion> sortCriteria,
58+
// @Nullable String scrollId,
59+
// @Nonnull List<String> facets
60+
// int size,
61+
List<String> entitiesInCacheKey = (List<String>) cacheKey.getValue(1);
62+
String filter = (String) cacheKey.getValue(3);
63+
String query = (String) cacheKey.getValue(2);
64+
List<String> facets = (List<String>) cacheKey.getValue(6);
65+
66+
//Facets may contain urns. Since the check for urns in filters is similar, can append it to the filter.
67+
return isKeyImpactedByEntity(entitiesInCacheKey, query, filter + " " + String.join(" ",facets ));
68+
}
69+
70+
private boolean matchSearchServiceCacheKey(Object key) {
71+
Septet<?, List<String>, ?, String, ?, ?, ?> cacheKey =
72+
(Septet<?, List<String>, ?, String, ?, ?, ?>) key;
73+
74+
List<String> entitiesInCacheKey = (List<String>) cacheKey.getValue(1);
75+
String filter = (String) cacheKey.getValue(3);
76+
String query = (String) cacheKey.getValue(2);
77+
78+
return isKeyImpactedByEntity(entitiesInCacheKey, query, filter);
79+
}
80+
81+
boolean isKeyImpactedByEntity(List<String> entitiesInCacheKey, String query, String filter){
82+
boolean entityMatch = entitiesInCacheKey.stream().anyMatch(entityTypes::contains);
83+
if (!entityMatch) {
84+
return false;
85+
}
86+
87+
if (filter == null){ //No filter, but already established there is an entity match
88+
return true;
89+
}
90+
91+
// Ignoring query for now. A query could make this cache entry more targeted, but till there is a quick way to
92+
// evaluate if the entities that were updated are affected by this query, ignoring it may mean some cache entries
93+
// are invalidated even if they may not be a match, and an uncached query result will still be fetched.
94+
95+
boolean containsUrn = filter.contains("urn:li");
96+
if (!containsUrn) {
97+
return true; // Entity match, has a filter, but not on urn. this may be a suboptimal
98+
}
99+
100+
return urns.stream()
101+
.anyMatch(
102+
urn ->
103+
filter.contains(
104+
urn.toString())); // If we found an exact URN match, this is to be evicted. If
105+
106+
// this entry was for some other urn, do not evict.
107+
}
108+
}

0 commit comments

Comments
 (0)