Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Fair lock & reduce the granularity for DataNodeSchemaLockManager #15137

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,23 +566,32 @@ public TSStatus invalidateLastCache(final String database) {

@Override
public TSStatus invalidateSchemaCache(final TInvalidateCacheReq req) {
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TABLE);
try {
TreeDeviceSchemaCacheManager.getInstance().takeWriteLock();
DataNodeSchemaLockManager.getInstance()
.takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE);
try {
final String database = req.getFullPath();
// req.getFullPath() is a database path
ClusterTemplateManager.getInstance().invalid(database);
// clear table related cache
DataNodeTableCache.getInstance().invalid(database);
tableDeviceSchemaCache.invalidate(database);
LOGGER.info("Schema cache of {} has been invalidated", req.getFullPath());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
TreeDeviceSchemaCacheManager.getInstance().takeWriteLock();
try {
final String database = req.getFullPath();
// req.getFullPath() is a database path
ClusterTemplateManager.getInstance().invalid(database);
// clear table related cache
DataNodeTableCache.getInstance().invalid(database);
tableDeviceSchemaCache.invalidate(database);
LOGGER.info("Schema cache of {} has been invalidated", req.getFullPath());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
TreeDeviceSchemaCacheManager.getInstance().releaseWriteLock();
}
} finally {
TreeDeviceSchemaCacheManager.getInstance().releaseWriteLock();
DataNodeSchemaLockManager.getInstance()
.releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE);
}
} finally {
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TABLE);
}
}

Expand Down Expand Up @@ -649,7 +658,7 @@ public TSStatus rollbackSchemaBlackList(TRollbackSchemaBlackListReq req) {
@Override
public TSStatus invalidateMatchedSchemaCache(final TInvalidateMatchedSchemaCacheReq req) {
final TreeDeviceSchemaCacheManager cache = TreeDeviceSchemaCacheManager.getInstance();
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE);
try {
cache.takeWriteLock();
try {
Expand All @@ -658,7 +667,8 @@ public TSStatus invalidateMatchedSchemaCache(final TInvalidateMatchedSchemaCache
cache.releaseWriteLock();
}
} finally {
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE);
}
return RpcUtils.SUCCESS_STATUS;
}
Expand Down Expand Up @@ -1570,13 +1580,15 @@ public TSStatus updateTable(final TUpdateTableReq req) {

@Override
public TSStatus invalidateTableCache(final TInvalidateTableCacheReq req) {
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TABLE);
try {
TableDeviceSchemaCache.getInstance()
.invalidate(PathUtils.unQualifyDatabaseName(req.getDatabase()), req.getTableName());
return StatusUtils.OK;
} finally {
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TABLE);
}
}

Expand Down Expand Up @@ -1652,7 +1664,8 @@ public TSStatus rollbackTableDeviceBlackList(final TTableDeviceDeletionWithPatte

@Override
public TSStatus invalidateMatchedTableDeviceCache(final TTableDeviceInvalidateCacheReq req) {
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TABLE);
try {
TableDeviceSchemaCache.getInstance()
.invalidate(
Expand All @@ -1662,7 +1675,8 @@ public TSStatus invalidateMatchedTableDeviceCache(final TTableDeviceInvalidateCa
req.getDatabase(), req.getTableName(), req.getPatternInfo()));
return StatusUtils.OK;
} finally {
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TABLE);
}
}

Expand Down Expand Up @@ -1701,7 +1715,8 @@ public TSStatus deleteTableDeviceInBlackList(final TTableDeviceDeletionWithPatte

@Override
public TSStatus invalidateColumnCache(final TInvalidateColumnCacheReq req) {
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TABLE);
try {
TableDeviceSchemaCache.getInstance()
.invalidate(
Expand All @@ -1711,7 +1726,8 @@ public TSStatus invalidateColumnCache(final TInvalidateColumnCacheReq req) {
req.isIsAttributeColumn());
return StatusUtils.OK;
} finally {
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
DataNodeSchemaLockManager.getInstance()
.releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TABLE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,11 @@ private ExecutionResult execution(
}
return result;
} finally {
// Final logic to guarantee safety
DataNodeSchemaLockManager.getInstance().releaseReadLock(queryContext);
if (queryContext != null) {
queryContext.releaseAllMemoryReservedForFrontEnd();
}
DataNodeSchemaLockManager.getInstance().releaseReadLock(queryContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2610,12 +2610,16 @@ private void checkIsTemplateCompatible(
final PartialPath timeSeriesPath, final String alias, final MPPQueryContext context) {
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE);
final Pair<Template, PartialPath> templateInfo =
schemaFetcher.checkTemplateSetAndPreSetInfo(timeSeriesPath, alias);
if (templateInfo != null) {
throw new SemanticException(
new TemplateIncompatibleException(
timeSeriesPath.getFullPath(), templateInfo.left.getName(), templateInfo.right));
try {
final Pair<Template, PartialPath> templateInfo =
schemaFetcher.checkTemplateSetAndPreSetInfo(timeSeriesPath, alias);
if (templateInfo != null) {
throw new SemanticException(
new TemplateIncompatibleException(
timeSeriesPath.getFullPath(), templateInfo.left.getName(), templateInfo.right));
}
} finally {
DataNodeSchemaLockManager.getInstance().releaseReadLock(context);
}
}

Expand All @@ -2626,18 +2630,22 @@ private void checkIsTemplateCompatible(
final MPPQueryContext context) {
DataNodeSchemaLockManager.getInstance()
.takeReadLock(context, SchemaLockType.TIMESERIES_VS_TEMPLATE);
for (int i = 0; i < measurements.size(); i++) {
final Pair<Template, PartialPath> templateInfo =
schemaFetcher.checkTemplateSetAndPreSetInfo(
devicePath.concatAsMeasurementPath(measurements.get(i)),
aliasList == null ? null : aliasList.get(i));
if (templateInfo != null) {
throw new SemanticException(
new TemplateIncompatibleException(
devicePath.getFullPath() + measurements,
templateInfo.left.getName(),
templateInfo.right));
try {
for (int i = 0; i < measurements.size(); i++) {
final Pair<Template, PartialPath> templateInfo =
schemaFetcher.checkTemplateSetAndPreSetInfo(
devicePath.concatAsMeasurementPath(measurements.get(i)),
aliasList == null ? null : aliasList.get(i));
if (templateInfo != null) {
throw new SemanticException(
new TemplateIncompatibleException(
devicePath.getFullPath() + measurements,
templateInfo.left.getName(),
templateInfo.right));
}
}
} finally {
DataNodeSchemaLockManager.getInstance().releaseReadLock(context);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private DataNodeSchemaLockManager() {
final int lockNum = SchemaLockType.values().length;
this.locks = new ReentrantReadWriteLock[lockNum];
for (int i = 0; i < lockNum; i++) {
locks[i] = new ReentrantReadWriteLock(false);
locks[i] = new ReentrantReadWriteLock(true);
}
}

Expand All @@ -49,11 +49,14 @@ public void takeReadLock(final MPPQueryContext context, final SchemaLockType loc
}
}

// This is called at the very last to guarantee safety
// And can be also called when the read's safety is guaranteed to reduce lock granularity
public void releaseReadLock(final MPPQueryContext queryContext) {
if (queryContext != null && !queryContext.getAcquiredLocks().isEmpty()) {
queryContext
.getAcquiredLocks()
.forEach(lockType -> locks[lockType.ordinal()].readLock().unlock());
queryContext.getAcquiredLocks().clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,17 @@ public enum SchemaLockType {
* <li>Release write lock after finishing invalidating schema cache.
* </ol>
*/
VALIDATE_VS_DELETION,
VALIDATE_VS_DELETION_TREE,

/**
* This lock is used for guarantee no data without schema after table related deletion.
*
* <ol>
* <li>Take read lock before validating schema during inserting data or loading TsFile.
* <li>Release read lock after finishing inserting data or loading TsFile.
* <li>Take write lock before invalidating table device cache during table related deletion.
* <li>Release write lock after finishing invalidating table device cache.
* </ol>
*/
VALIDATE_VS_DELETION_TABLE,
}
Loading
Loading