Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: search cache invalidation for iceberg entities #12805

Merged
merged 3 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {
implementation externalDependency.slf4jApi
runtimeOnly externalDependency.logbackClassic
compileOnly externalDependency.lombok
compileOnly externalDependency.hazelcast
implementation externalDependency.commonsCollections
api externalDependency.datastaxOssNativeProtocol
api(externalDependency.datastaxOssCore) {
Expand Down Expand Up @@ -98,7 +99,6 @@ dependencies {
testImplementation spec.product.pegasus.restliServer
testImplementation externalDependency.ebeanTest
testImplementation externalDependency.opentelemetrySdk

// logback >=1.3 required due to `testcontainers` only
testImplementation 'ch.qos.logback:logback-classic:1.4.7'
testImplementation 'net.datafaker:datafaker:1.9.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.linkedin.metadata.search.client;

import com.hazelcast.map.IMap;
import com.linkedin.common.urn.Urn;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;

/**
* A framework to enable search cache invalidation. The cache keys in the search cache are queries
* of different forms, and when an entity is modified there isn't a simple, direct correlation
* between that entity and the queries in the cache (short of fully evaluating each search). This
* service provides a mechanism to plug in a CacheKeyMatcher that uses approximations to check
* whether a cache key is likely related to an entity that was updated, and to clear those entries.
* The evict method can be called when entities are updated and it is important for those updates
* to be visible in the UI without waiting for cache expiration. Eviction is disabled by default
* and is enabled via the Spring application property searchService.enableEviction.
*/
@RequiredArgsConstructor
@Slf4j
public class CacheEvictionService {
private final CacheManager cacheManager;
private final Boolean cachingEnabled;
private final Boolean enableEviction;

/**
 * Invalidates every cache known to the cache manager by delegating to {@link
 * #invalidate(String)} for each cache name. Does nothing unless both caching and eviction are
 * enabled.
 */
public void invalidateAll() {
  if (!(cachingEnabled && enableEviction)) {
    return;
  }
  for (String name : cacheManager.getCacheNames()) {
    invalidate(name);
  }
}

/**
 * Invalidates a single cache. Does nothing unless both caching and eviction are enabled.
 *
 * @param cacheName name of the cache to invalidate
 * @throws AssertionError if the cache manager has no cache with the supplied name
 */
public void invalidate(String cacheName) {
  if (!(cachingEnabled && enableEviction)) {
    return;
  }
  Cache target = cacheManager.getCache(cacheName);
  if (target == null) {
    throw new AssertionError(String.format("Invalid cache name %s supplied", cacheName));
  }
  target.invalidate();
}

/**
 * Offers every key of every cache the matcher supports to the matcher, and evicts the keys it
 * selects. Does nothing unless both caching and eviction are enabled.
 *
 * @param matcher decides which caches are inspected and which of their keys are evicted
 */
public void evict(CacheKeyMatcher matcher) {
  if (!(cachingEnabled && enableEviction)) {
    return;
  }

  for (String cacheName : cacheManager.getCacheNames()) {
    if (!matcher.supportsCache(cacheName)) {
      continue;
    }

    Cache cache = cacheManager.getCache(cacheName);
    assert (cache != null);

    long evictCount = 0;
    for (Object key : getKeys(cacheName)) {
      if (!matcher.match(cacheName, key)) {
        continue;
      }
      cache.evict(key);
      evictCount++;
      log.debug("From cache '{}' evicting key {}", cacheName, key);
    }

    // Only report caches that actually lost entries, to keep the info log quiet.
    if (evictCount > 0) {
      log.info("Evicted {} keys from cache {}", evictCount, cacheName);
    }
  }
}

/**
 * Evicts cache keys that are likely related to the supplied urns, using a {@link
 * UrnCacheKeyMatcher} built from them.
 *
 * @param urns urns of the entities that were updated
 */
public void evict(List<Urn> urns) {
  log.info("Attempting eviction of search cache due to updates to {}", urns);
  evict(new UrnCacheKeyMatcher(urns));
}

/**
 * Returns the keys of the named cache by unwrapping its native cache. Caffeine caches and
 * Hazelcast {@code IMap}s are handled; for any other native cache type a warning is logged and an
 * empty set is returned.
 *
 * @param cacheName name of the cache whose keys are wanted
 * @return the keys of the cache, or an empty set when the native cache type is not handled
 */
private Set<Object> getKeys(String cacheName) {
// Enumerating cache keys is not part of the standard Cache interface; it depends on the native
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unfortunate but I don't see any alternative either.

// cache implementation,
// so a branch must be added below for every cache implementation we may use.

Cache springCache = cacheManager.getCache(cacheName);
// Only checked when JVM assertions are enabled.
assert (springCache != null);
Object nativeCache = springCache.getNativeCache();
if (nativeCache instanceof com.github.benmanes.caffeine.cache.Cache) {
com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache =
(com.github.benmanes.caffeine.cache.Cache<Object, Object>) nativeCache;
return caffeineCache.asMap().keySet();
} else if (nativeCache instanceof IMap) {
IMap<Object, Object> hazelCache = (IMap<Object, Object>) nativeCache;
return hazelCache.keySet();
}

// Unrecognised native cache: no keys can be listed, so callers see the cache as empty.
log.warn("Unhandled cache type {} of type {}", cacheName, nativeCache.getClass());
return Collections.emptySet();
}

/**
 * Logs, at debug level, every key and its cached value for every cache. Useful while debugging a
 * matcher, but the output is voluminous.
 *
 * @param message label written at the start of the dump
 */
void dumpCache(String message) {
  log.debug("Begin Dump {}", message);
  for (String cacheName : cacheManager.getCacheNames()) {
    log.debug("Dump cache: {}", cacheName);
    Cache cache = cacheManager.getCache(cacheName);
    for (Object key : getKeys(cacheName)) {
      log.debug(" key {} : {}", key, cache.get(key));
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.linkedin.metadata.search.client;

/**
 * Decides which cache entries should be evicted. CacheEvictionService.evict first asks {@link
 * #supportsCache(String)} for each cache name and, for supported caches, passes every key to
 * {@link #match(String, Object)}; keys for which it returns true are evicted.
 */
public interface CacheKeyMatcher {
/**
 * @param cacheName name of a cache
 * @return true if keys of this cache should be offered to {@link #match(String, Object)}
 */
boolean supportsCache(String cacheName);

// Called for each supported cache, with each key
/**
 * @param cacheName name of the cache the key belongs to
 * @param key a key currently in that cache
 * @return true if the entry for this key should be evicted
 */
boolean match(String cacheName, Object key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@

@RequiredArgsConstructor
public class CachingEntitySearchService {
private static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
private static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME =
public static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
public static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME =
"entitySearchServiceAutoComplete";
private static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
public static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
public static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll";

private final CacheManager cacheManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.linkedin.metadata.search.client;

import static com.linkedin.metadata.search.client.CachingEntitySearchService.*;

import com.linkedin.common.urn.Urn;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.javatuples.Octet;
import org.javatuples.Septet;

/**
 * A {@link CacheKeyMatcher} that selects search and scroll cache entries likely to be affected by
 * updates to a given list of urns.
 *
 * <p>Matching is approximate. A key is considered impacted when it targets at least one entity
 * type of the updated urns and either its filter/facets mention no urn at all, or they mention one
 * of the updated urns. The query text of the key is not evaluated.
 */
class UrnCacheKeyMatcher implements CacheKeyMatcher {
  // Caches whose key layout this matcher knows how to read. Identical for every instance.
  static final List<String> SUPPORTED_CACHE_NAMES =
      Arrays.asList(
          ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME, ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME);

  private final List<Urn> urns;
  private final Set<String> entityTypes;

  /**
   * @param urns urns of the entities that were updated; their entity types are collected up front
   *     so that each key can be checked against them quickly
   */
  UrnCacheKeyMatcher(List<Urn> urns) {
    this.urns = urns;
    this.entityTypes = new HashSet<>();
    for (Urn urn : urns) {
      this.entityTypes.add(urn.getEntityType());
    }
  }

  @Override
  public boolean supportsCache(String cacheName) {
    return SUPPORTED_CACHE_NAMES.contains(cacheName);
  }

  /**
   * Dispatches to the key reader for the given cache.
   *
   * @return true if the key should be evicted; always false for a cache this matcher does not
   *     support
   */
  @Override
  public boolean match(String cacheName, Object key) {
    switch (cacheName) {
      case ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME:
        return matchSearchServiceCacheKey(key);
      case ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME:
        return matchSearchServiceScrollCacheKey(key);
      default:
        return false;
    }
  }

  @SuppressWarnings("unchecked") // key layout is fixed by the code that populates the scroll cache
  private boolean matchSearchServiceScrollCacheKey(Object key) {
    Octet<?, ?, ?, ?, ?, ?, ?, ?> cacheKey = (Octet<?, ?, ?, ?, ?, ?, ?, ?>) key;
    // For reference - cache key contents
    // 0: @Nonnull OperationContext opContext,
    // 1: @Nonnull List<String> entities,
    // 2: @Nonnull String query,
    // 3: @Nullable Filter filters,
    // 4: List<SortCriterion> sortCriteria,
    // 5: @Nullable String scrollId,
    // 6: @Nonnull List<String> facets
    // 7: int size,
    List<String> entitiesInCacheKey = (List<String>) cacheKey.getValue(1);
    String query = (String) cacheKey.getValue(2);
    String filter = (String) cacheKey.getValue(3);
    List<String> facets = (List<String>) cacheKey.getValue(6);

    return isKeyImpactedByEntity(entitiesInCacheKey, query, filterWithFacets(filter, facets));
  }

  @SuppressWarnings("unchecked") // key layout is fixed by the code that populates the search cache
  private boolean matchSearchServiceCacheKey(Object key) {
    Septet<?, ?, ?, ?, ?, ?, ?> cacheKey = (Septet<?, ?, ?, ?, ?, ?, ?>) key;
    // For reference - cache key contents
    // 0: @Nonnull OperationContext opContext,
    // 1: @Nonnull List<String> entityNames,
    // 2: @Nonnull String query,
    // 3: @Nullable Filter filters,
    // 4: List<SortCriterion> sortCriteria,
    // 5: @Nonnull List<String> facets
    // 6: querySize
    List<String> entitiesInCacheKey = (List<String>) cacheKey.getValue(1);
    String query = (String) cacheKey.getValue(2);
    String filter = (String) cacheKey.getValue(3);
    List<String> facets = (List<String>) cacheKey.getValue(5);

    return isKeyImpactedByEntity(entitiesInCacheKey, query, filterWithFacets(filter, facets));
  }

  /**
   * Builds the text that is scanned for urns: the filter followed by the space-separated facets.
   * Facets may contain urns and are checked the same way as the filter, so they are appended to
   * it. A null filter is treated as empty. A null facets list is also treated as empty instead of
   * failing in String.join (NOTE(review): the key is documented as carrying non-null facets;
   * confirm against the code that builds the key).
   */
  private static String filterWithFacets(String filter, List<String> facets) {
    String base = filter == null ? "" : filter;
    String joinedFacets = facets == null ? "" : String.join(" ", facets);
    return base + " " + joinedFacets;
  }

  /**
   * Decides whether a cache key is likely affected by the updated urns.
   *
   * @param entitiesInCacheKey entity names the cached request targeted
   * @param query query text of the cached request; currently ignored
   * @param filter filter (and facet) text of the cached request; null is treated as empty
   * @return true if the key should be evicted
   */
  boolean isKeyImpactedByEntity(List<String> entitiesInCacheKey, String query, String filter) {
    if (entitiesInCacheKey.stream().noneMatch(entityTypes::contains)) {
      return false;
    }

    // Ignoring query for now. A query could make this cache entry more targeted, but till there is
    // a quick way to evaluate if the entities that were updated are affected by this query,
    // ignoring it may mean some cache entries are invalidated even if they may not be a match,
    // and an uncached query result will still be fetched.

    final String filterText = filter == null ? "" : filter;
    if (!filterText.contains("urn:li")) {
      // Entity type matches and the filter does not restrict on any urn. Evicting here may be
      // broader than strictly necessary.
      return true;
    }

    // The filter names specific urns: evict only if one of the updated urns is among them. If the
    // entry was for some other urn, do not evict.
    return urns.stream().anyMatch(urn -> filterText.contains(urn.toString()));
  }
}
Loading
Loading