diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 13fd46ba540e..7e75f74e6ac9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -198,6 +198,22 @@ public static FragmentInstanceContext createFragmentInstanceContext( return instanceContext; } + @TestOnly + public static FragmentInstanceContext createFragmentInstanceContext( + FragmentInstanceId id, + FragmentInstanceStateMachine stateMachine, + MemoryReservationManager memoryReservationManager) { + FragmentInstanceContext instanceContext = + new FragmentInstanceContext( + id, + stateMachine, + new SessionInfo(1, "test", ZoneId.systemDefault()), + memoryReservationManager); + instanceContext.initialize(); + instanceContext.start(); + return instanceContext; + } + private FragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, @@ -230,6 +246,20 @@ private FragmentInstanceContext( new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName()); } + private FragmentInstanceContext( + FragmentInstanceId id, + FragmentInstanceStateMachine stateMachine, + SessionInfo sessionInfo, + MemoryReservationManager memoryReservationManager) { + this.id = id; + this.stateMachine = stateMachine; + this.executionEndTime.set(END_TIME_INITIAL_VALUE); + this.sessionInfo = sessionInfo; + this.dataNodeQueryContextMap = null; + this.dataNodeQueryContext = null; + this.memoryReservationManager = memoryReservationManager; + } + private FragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, @@ -687,11 +717,11 @@ public void releaseResourceWhenAllDriversAreClosed() { private void releaseTVListOwnedByQuery() { for (TVList tvList : tvListSet) { tvList.lockQueryList(); - List queryContextList = tvList.getQueryContextList(); + Set queryContextSet = tvList.getQueryContextSet(); try { - queryContextList.remove(this); + queryContextSet.remove(this); if (tvList.getOwnerQuery() == this) { - if (queryContextList.isEmpty()) { + if (queryContextSet.isEmpty()) { LOGGER.debug( "TVList {} is released by the query, FragmentInstance Id is {}", tvList, @@ -699,11 +729,13 @@ private void releaseTVListOwnedByQuery() { memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize()); tvList.clear(); } else { + FragmentInstanceContext queryContext = + (FragmentInstanceContext) queryContextSet.iterator().next(); LOGGER.debug( "TVList {} is now owned by another query, FragmentInstance Id is {}", tvList, - ((FragmentInstanceContext) queryContextList.get(0)).getId()); - tvList.setOwnerQuery(queryContextList.get(0)); + queryContext.getId()); + tvList.setOwnerQuery(queryContext); } } } finally { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index c6c7e3386696..b0c4daa85c28 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -140,7 +140,7 @@ protected Map prepareTvListMapForQuery( try { LOGGER.debug( "Flushing/Working MemTable - add current query context to immutable TVList's query list"); - tvList.getQueryContextList().add(context); + tvList.getQueryContextSet().add(context); tvListQueryMap.put(tvList, tvList.rowCount()); } finally { tvList.unlockQueryList(); @@ -160,13 +160,13 @@ protected Map prepareTvListMapForQuery( if (!isWorkMemTable) { LOGGER.debug( "Flushing MemTable - add current query context to mutable TVList's query list"); - list.getQueryContextList().add(context); + list.getQueryContextSet().add(context); tvListQueryMap.put(list, list.rowCount()); } else { - if (list.isSorted() || list.getQueryContextList().isEmpty()) { + if (list.isSorted() || list.getQueryContextSet().isEmpty()) { LOGGER.debug( "Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it"); - list.getQueryContextList().add(context); + list.getQueryContextSet().add(context); tvListQueryMap.put(list, list.rowCount()); } else { /* @@ -185,7 +185,7 @@ protected Map prepareTvListMapForQuery( */ LOGGER.debug( "Working MemTable - clone mutable TVList and replace old TVList in working MemTable"); - QueryContext firstQuery = list.getQueryContextList().get(0); + QueryContext firstQuery = list.getQueryContextSet().iterator().next(); // reserve query memory if (firstQuery instanceof FragmentInstanceContext) { MemoryReservationManager memoryReservationManager = @@ -196,7 +196,7 @@ protected Map prepareTvListMapForQuery( // clone TVList cloneList = list.clone(); - cloneList.getQueryContextList().add(context); + cloneList.getQueryContextSet().add(context); tvListQueryMap.put(cloneList, cloneList.rowCount()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java new file mode 100644 index 000000000000..d4b4cfde3833 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.memtable; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; +import org.apache.iotdb.db.utils.datastructure.TVList; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.chunk.IChunkWriter; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +public abstract class AbstractWritableMemChunk implements IWritableMemChunk { + protected static long RETRY_INTERVAL_MS = 100L; + protected static long MAX_WAIT_QUERY_MS = 60 * 1000L; + + /** + * Release the TVList if there is no query on it. Otherwise, it should set the first query as the + * owner. TVList is released until all queries finish. If it throws memory-not-enough exception + * during owner transfer, retry the release process after 100ms. If the problem is still not + * solved in 60s, it starts to abort first query, kick it out of the query list and retry. This + * method must ensure success because it's part of flushing. + * + * @param tvList + */ + protected void maybeReleaseTvList(TVList tvList) { + long startTimeInMs = System.currentTimeMillis(); + boolean succeed = false; + while (!succeed) { + try { + tryReleaseTvList(tvList); + succeed = true; + } catch (MemoryNotEnoughException ex) { + long waitQueryInMs = System.currentTimeMillis() - startTimeInMs; + if (waitQueryInMs > MAX_WAIT_QUERY_MS) { + // Abort first query in the list. When all queries in the list have been aborted, + // tryReleaseTvList will ensure succeed finally. + tvList.lockQueryList(); + try { + // fail the first query + Iterator iterator = tvList.getQueryContextSet().iterator(); + if (iterator.hasNext()) { + FragmentInstanceContext firstQuery = (FragmentInstanceContext) iterator.next(); + firstQuery.failed( + new MemoryNotEnoughException( + "Memory not enough to clone the tvlist during flush phase")); + } + } finally { + tvList.unlockQueryList(); + } + } + + // sleep 100ms to retry + try { + Thread.sleep(RETRY_INTERVAL_MS); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + } + } + + private void tryReleaseTvList(TVList tvList) { + tvList.lockQueryList(); + try { + if (tvList.getQueryContextSet().isEmpty()) { + tvList.clear(); + } else { + QueryContext firstQuery = tvList.getQueryContextSet().iterator().next(); + // transfer memory from write process to read process. Here it reserves read memory and + // releaseFlushedMemTable will release write memory. + if (firstQuery instanceof FragmentInstanceContext) { + MemoryReservationManager memoryReservationManager = + ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); + memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize()); + } + // update current TVList owner to first query in the list + tvList.setOwnerQuery(firstQuery); + } + } finally { + tvList.unlockQueryList(); + } + } + + @Override + public abstract void putLong(long t, long v); + + @Override + public abstract void putInt(long t, int v); + + @Override + public abstract void putFloat(long t, float v); + + @Override + public abstract void putDouble(long t, double v); + + @Override + public abstract void putBinary(long t, Binary v); + + @Override + public abstract void putBoolean(long t, boolean v); + + @Override + public abstract void putAlignedRow(long t, Object[] v); + + @Override + public abstract void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end); + + @Override + public abstract void putInts(long[] t, int[] v, BitMap bitMap, int start, int end); + + @Override + public abstract void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end); + + @Override + public abstract void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end); + + @Override + public abstract void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end); + + @Override + public abstract void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end); + + @Override + public abstract void putAlignedTablet( + long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] results); + + @Override + public abstract void writeNonAlignedPoint(long insertTime, Object objectValue); + + @Override + public abstract void writeAlignedPoints( + long insertTime, Object[] objectValue, List schemaList); + + @Override + public abstract void writeNonAlignedTablet( + long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end); + + @Override + public abstract void writeAlignedTablet( + long[] times, + Object[] valueList, + BitMap[] bitMaps, + List schemaList, + int start, + int end, + TSStatus[] results); + + @Override + public abstract long count(); + + @Override + public abstract long rowCount(); + + @Override + public abstract IMeasurementSchema getSchema(); + + @Override + public abstract void sortTvListForFlush(); + + @Override + public abstract int delete(long lowerBound, long upperBound); + + @Override + public abstract IChunkWriter createIChunkWriter(); + + @Override + public abstract void encode(BlockingQueue ioTaskQueue); + + @Override + public abstract void release(); + + @Override + public abstract boolean isEmpty(); + + @Override + public abstract List getSortedList(); + + @Override + public abstract TVList getWorkingTVList(); + + @Override + public abstract void setWorkingTVList(TVList list); + + @Override + public abstract void serializeToWAL(IWALByteBufferView buffer); + + @Override + public abstract int serializedSize(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index b61f2a56cf43..19a054d06253 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -22,9 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; -import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; -import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; @@ -61,7 +58,7 @@ import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; -public class AlignedWritableMemChunk implements IWritableMemChunk { +public class AlignedWritableMemChunk extends AbstractWritableMemChunk { private final Map measurementIndexMap; private final List dataTypes; @@ -196,32 +193,10 @@ public void writeNonAlignedTablet( } protected void handoverAlignedTvList() { - // ensure query contexts won't be removed from list during handover process. - list.lockQueryList(); - try { - if (list.isSorted()) { - sortedList.add(list); - } else if (list.getQueryContextList().isEmpty()) { - list.sort(); - sortedList.add(list); - } else { - QueryContext firstQuery = list.getQueryContextList().get(0); - // reserve query memory - if (firstQuery instanceof FragmentInstanceContext) { - MemoryReservationManager memoryReservationManager = - ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize()); - } - // update current TVList owner to first query in the list - list.setOwnerQuery(firstQuery); - // clone tv list - AlignedTVList cloneList = list.clone(); - cloneList.sort(); - sortedList.add(cloneList); - } - } finally { - list.unlockQueryList(); + if (!list.isSorted()) { + list.sort(); } + sortedList.add(list); this.list = AlignedTVList.newAlignedList(dataTypes); } @@ -255,114 +230,6 @@ public void writeAlignedTablet( } } - /** - * Check metadata of columns and return array that mapping existed metadata to index of data - * column. - * - * @param schemaListInInsertPlan Contains all existed schema in InsertPlan. If some timeseries - * have been deleted, there will be null in its slot. - * @return columnIndexArray: schemaList[i] is schema of columns[columnIndexArray[i]] - */ - private Pair checkAndReorderColumnValuesInInsertPlan( - List schemaListInInsertPlan, Object[] columnValues, BitMap[] bitMaps) { - Object[] reorderedColumnValues = new Object[schemaList.size()]; - BitMap[] reorderedBitMaps = bitMaps == null ? null : new BitMap[schemaList.size()]; - for (int i = 0; i < schemaListInInsertPlan.size(); i++) { - IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i); - if (measurementSchema != null) { - Integer index = this.measurementIndexMap.get(measurementSchema.getMeasurementName()); - // Index is null means this measurement was not in this AlignedTVList before. - // We need to extend a new column in AlignedMemChunk and AlignedTVList. - // And the reorderedColumnValues should extend one more column for the new measurement - if (index == null) { - index = - measurementIndexMap.isEmpty() - ? 0 - : measurementIndexMap.values().stream() - .mapToInt(Integer::intValue) - .max() - .getAsInt() - + 1; - this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementName(), index); - this.schemaList.add(schemaListInInsertPlan.get(i)); - this.list.extendColumn(schemaListInInsertPlan.get(i).getType()); - reorderedColumnValues = - Arrays.copyOf(reorderedColumnValues, reorderedColumnValues.length + 1); - if (reorderedBitMaps != null) { - reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, reorderedBitMaps.length + 1); - } - } - reorderedColumnValues[index] = columnValues[i]; - if (bitMaps != null) { - reorderedBitMaps[index] = bitMaps[i]; - } - } - } - return new Pair<>(reorderedColumnValues, reorderedBitMaps); - } - - private void filterDeletedTimeStamp( - AlignedTVList alignedTVList, - List> valueColumnsDeletionList, - boolean ignoreAllNullRows, - Map timestampWithBitmap) { - BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap(); - - int rowCount = alignedTVList.rowCount(); - List valueColumnDeleteCursor = new ArrayList<>(); - if (valueColumnsDeletionList != null) { - valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0})); - } - - for (int row = 0; row < rowCount; row++) { - // the row is deleted - if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) { - continue; - } - long timestamp = alignedTVList.getTime(row); - - BitMap bitMap = new BitMap(schemaList.size()); - for (int column = 0; column < schemaList.size(); column++) { - if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) { - bitMap.mark(column); - } - - // skip deleted row - if (valueColumnsDeletionList != null - && !valueColumnsDeletionList.isEmpty() - && isPointDeleted( - timestamp, - valueColumnsDeletionList.get(column), - valueColumnDeleteCursor.get(column))) { - bitMap.mark(column); - } - - // skip all-null row - if (ignoreAllNullRows && bitMap.isAllMarked()) { - continue; - } - timestampWithBitmap.put(timestamp, bitMap); - } - } - } - - public long[] getFilteredTimestamp( - List> deletionList, List bitMaps, boolean ignoreAllNullRows) { - Map timestampWithBitmap = new TreeMap<>(); - - filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap); - for (AlignedTVList alignedTVList : sortedList) { - filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap); - } - - List filteredTimestamps = new ArrayList<>(); - for (Map.Entry entry : timestampWithBitmap.entrySet()) { - filteredTimestamps.add(entry.getKey()); - bitMaps.add(entry.getValue()); - } - return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray(); - } - @Override public AlignedTVList getWorkingTVList() { return list; @@ -420,41 +287,6 @@ public long getMinTime() { return minTime; } - @Override - public synchronized TVList getSortedTvListForQuery() { - sortTVList(); - // increase reference count - list.increaseReferenceCount(); - return list; - } - - @Override - public synchronized TVList getSortedTvListForQuery( - List schemaList, boolean ignoreAllNullRows) { - sortTVList(); - // increase reference count - list.increaseReferenceCount(); - List columnIndexList = new ArrayList<>(); - List dataTypeList = new ArrayList<>(); - for (IMeasurementSchema measurementSchema : schemaList) { - columnIndexList.add( - measurementIndexMap.getOrDefault(measurementSchema.getMeasurementName(), -1)); - dataTypeList.add(measurementSchema.getType()); - } - return list.getTvListByColumnIndex(columnIndexList, dataTypeList, ignoreAllNullRows); - } - - private void sortTVList() { - // check reference count - if ((list.getReferenceCount() > 0 && !list.isSorted())) { - list = list.clone(); - } - - if (!list.isSorted()) { - list.sort(); - } - } - @Override public synchronized void sortTvListForFlush() { if (!list.isSorted()) { @@ -808,28 +640,6 @@ public synchronized void encode(BlockingQueue ioTaskQueue) { } } - private void maybeReleaseTvList(AlignedTVList alignedTvList) { - alignedTvList.lockQueryList(); - try { - if (alignedTvList.getQueryContextList().isEmpty()) { - alignedTvList.clear(); - } else { - QueryContext firstQuery = alignedTvList.getQueryContextList().get(0); - // transfer memory from write process to read process. Here it reserves read memory and - // releaseFlushedMemTable will release write memory. - if (firstQuery instanceof FragmentInstanceContext) { - MemoryReservationManager memoryReservationManager = - ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(alignedTvList.calculateRamSize()); - } - // update current TVList owner to first query in the list - alignedTvList.setOwnerQuery(firstQuery); - } - } finally { - alignedTvList.unlockQueryList(); - } - } - @Override public void release() { maybeReleaseTvList(list); @@ -863,14 +673,19 @@ public boolean isEmpty() { if (measurementIndexMap.isEmpty()) { return true; } - if (list.getAllValueColDeletedMap() == null - || !list.getAllValueColDeletedMap().isAllMarked()) { - return false; + + if (list.rowCount() > 0) { + BitMap allValueColDeletedMap = list.getAllValueColDeletedMap(); + if (allValueColDeletedMap == null || !allValueColDeletedMap.isAllMarked()) { + return false; + } } for (AlignedTVList alignedTvList : sortedList) { - if (alignedTvList.getAllValueColDeletedMap() == null - || !alignedTvList.getAllValueColDeletedMap().isAllMarked()) { - return false; + if (alignedTvList.rowCount() > 0) { + BitMap allValueColDeletedMap = alignedTvList.getAllValueColDeletedMap(); + if (allValueColDeletedMap == null || !allValueColDeletedMap.isAllMarked()) { + return false; + } } } return true; @@ -970,4 +785,112 @@ public List buildColumnIndexList(List schemaList) { } return columnIndexList; } + + /** + * Check metadata of columns and return array that mapping existed metadata to index of data + * column. + * + * @param schemaListInInsertPlan Contains all existed schema in InsertPlan. If some timeseries + * have been deleted, there will be null in its slot. + * @return columnIndexArray: schemaList[i] is schema of columns[columnIndexArray[i]] + */ + private Pair checkAndReorderColumnValuesInInsertPlan( + List schemaListInInsertPlan, Object[] columnValues, BitMap[] bitMaps) { + Object[] reorderedColumnValues = new Object[schemaList.size()]; + BitMap[] reorderedBitMaps = bitMaps == null ? null : new BitMap[schemaList.size()]; + for (int i = 0; i < schemaListInInsertPlan.size(); i++) { + IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i); + if (measurementSchema != null) { + Integer index = this.measurementIndexMap.get(measurementSchema.getMeasurementName()); + // Index is null means this measurement was not in this AlignedTVList before. + // We need to extend a new column in AlignedMemChunk and AlignedTVList. + // And the reorderedColumnValues should extend one more column for the new measurement + if (index == null) { + index = + measurementIndexMap.isEmpty() + ? 0 + : measurementIndexMap.values().stream() + .mapToInt(Integer::intValue) + .max() + .getAsInt() + + 1; + this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementName(), index); + this.schemaList.add(schemaListInInsertPlan.get(i)); + this.list.extendColumn(schemaListInInsertPlan.get(i).getType()); + reorderedColumnValues = + Arrays.copyOf(reorderedColumnValues, reorderedColumnValues.length + 1); + if (reorderedBitMaps != null) { + reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, reorderedBitMaps.length + 1); + } + } + reorderedColumnValues[index] = columnValues[i]; + if (bitMaps != null) { + reorderedBitMaps[index] = bitMaps[i]; + } + } + } + return new Pair<>(reorderedColumnValues, reorderedBitMaps); + } + + private void filterDeletedTimeStamp( + AlignedTVList alignedTVList, + List> valueColumnsDeletionList, + boolean ignoreAllNullRows, + Map timestampWithBitmap) { + BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap(); + + int rowCount = alignedTVList.rowCount(); + List valueColumnDeleteCursor = new ArrayList<>(); + if (valueColumnsDeletionList != null) { + valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0})); + } + + for (int row = 0; row < rowCount; row++) { + // the row is deleted + if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) { + continue; + } + long timestamp = alignedTVList.getTime(row); + + BitMap bitMap = new BitMap(schemaList.size()); + for (int column = 0; column < schemaList.size(); column++) { + if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) { + bitMap.mark(column); + } + + // skip deleted row + if (valueColumnsDeletionList != null + && !valueColumnsDeletionList.isEmpty() + && isPointDeleted( + timestamp, + valueColumnsDeletionList.get(column), + valueColumnDeleteCursor.get(column))) { + bitMap.mark(column); + } + + // skip all-null row + if (ignoreAllNullRows && bitMap.isAllMarked()) { + continue; + } + timestampWithBitmap.put(timestamp, bitMap); + } + } + } + + public long[] getFilteredTimestamp( + List> deletionList, List bitMaps, boolean ignoreAllNullRows) { + Map timestampWithBitmap = new TreeMap<>(); + + filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap); + for (AlignedTVList alignedTVList : sortedList) { + filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap); + } + + List filteredTimestamps = new ArrayList<>(); + for (Map.Entry entry : timestampWithBitmap.entrySet()) { + filteredTimestamps.add(entry.getKey()); + bitMaps.add(entry.getValue()); + } + return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 3e5cd3a8b160..26abdf1d39c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -88,37 +88,6 @@ void writeAlignedTablet( IMeasurementSchema getSchema(); - /** - * served for read requests. - * - *

if tv list has been sorted, just return reference of it - * - *

if tv list hasn't been sorted and has no reference, sort and return reference of it - * - *

if tv list hasn't been sorted and has reference we should copy and sort it, then return ths - * list - * - *

the mechanism is just like copy on write - * - *

This interface should be synchronized for concurrent with sortTvListForFlush - * - * @return sorted tv list - */ - TVList getSortedTvListForQuery(); - - /** - * served for vector read requests. - * - *

the mechanism is just like copy on write - * - *

This interface should be synchronized for concurrent with sortTvListForFlush - * - * @param ignoreAllNullRows whether to ignore all null rows, true for tree model, false for table - * model - * @return sorted tv list - */ - TVList getSortedTvListForQuery(List schemaList, boolean ignoreAllNullRows); - /** * served for flush requests. The logic is just same as getSortedTVListForQuery, but without add * reference count diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 56fd922b2067..5c8a41621ffd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -22,9 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; -import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; -import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.iotdb.db.utils.datastructure.MemPointIterator; @@ -55,7 +52,7 @@ import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; -public class WritableMemChunk implements IWritableMemChunk { +public class WritableMemChunk extends AbstractWritableMemChunk { private IMeasurementSchema schema; private TVList list; @@ -78,46 +75,10 @@ public WritableMemChunk(IMeasurementSchema schema) { private WritableMemChunk() {} protected void handoverTvList() { - // ensure query contexts won't be removed from list during handover process. - list.lockQueryList(); - try { - if (list.isSorted()) { - sortedList.add(list); - } else if (list.getQueryContextList().isEmpty()) { - list.sort(); - sortedList.add(list); - } else { - /* - * +----------------------+ - * | MemTable | - * | | - * | +---------------+ | +----------+ - * | | sorted TVList | | +---+ Query | - * | +------^--------+ | | +----------+ - * | | | | - * +----------+-----------+ | - * | Clone + Sort | - * +-----+------+ | - * | TVList | <---------+ - * +------------+ - */ - QueryContext firstQuery = list.getQueryContextList().get(0); - // reserve query memory - if (firstQuery instanceof FragmentInstanceContext) { - MemoryReservationManager memoryReservationManager = - ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize()); - } - // update current TVList owner to first query in the list - list.setOwnerQuery(firstQuery); - // clone tv list - TVList cloneList = list.clone(); - cloneList.sort(); - sortedList.add(cloneList); - } - } finally { - list.unlockQueryList(); + if (!list.isSorted()) { + list.sort(); } + sortedList.add(list); this.list = TVList.newList(schema.getType()); } @@ -283,31 +244,6 @@ public void putAlignedTablet( throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } - @Override - public synchronized TVList getSortedTvListForQuery() { - sortTVList(); - // increase reference count - list.increaseReferenceCount(); - return list; - } - - @Override - public synchronized TVList getSortedTvListForQuery( - List measurementSchema, boolean ignoreAllNullRows) { - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType()); - } - - private void sortTVList() { - // check reference count - if ((list.getReferenceCount() > 0 && !list.isSorted())) { - list = list.clone(); - } - - if (!list.isSorted()) { - list.sort(); - } - } - @Override public synchronized void sortTvListForFlush() { if (!list.isSorted()) { @@ -315,43 +251,6 @@ public synchronized void sortTvListForFlush() { } } - private void filterDeletedTimestamp( - TVList tvlist, List deletionList, List timestampList) { - long lastTime = Long.MIN_VALUE; - int[] deletionCursor = {0}; - int rowCount = tvlist.rowCount(); - for (int i = 0; i < rowCount; i++) { - if (tvlist.getBitMap() != null && tvlist.isNullValue(tvlist.getValueIndex(i))) { - continue; - } - long curTime = tvlist.getTime(i); - if (deletionList != null - && ModificationUtils.isPointDeleted(curTime, deletionList, deletionCursor)) { - continue; - } - - if (i == rowCount - 1 || curTime != lastTime) { - timestampList.add(curTime); - } - lastTime = curTime; - } - } - - public long[] getFilteredTimestamp(List deletionList) { - List timestampList = new ArrayList<>(); - filterDeletedTimestamp(list, deletionList, timestampList); - for (TVList tvList : sortedList) { - filterDeletedTimestamp(tvList, deletionList, timestampList); - } - - // remove duplicated time - List distinctTimestamps = timestampList.stream().distinct().collect(Collectors.toList()); - // sort timestamps - long[] filteredTimestamps = distinctTimestamps.stream().mapToLong(Long::longValue).toArray(); - Arrays.sort(filteredTimestamps); - return filteredTimestamps; - } - @Override public TVList getWorkingTVList() { return list; @@ -651,34 +550,6 @@ public synchronized void encode(BlockingQueue ioTaskQueue) { } } - /** - * Release process for memtable flush. Release the TVList if there is no query on it, otherwise - * set query owner and release the TVList until query finishes. - * - * @param tvList - */ - private void maybeReleaseTvList(TVList tvList) { - tvList.lockQueryList(); - try { - if (tvList.getQueryContextList().isEmpty()) { - tvList.clear(); - } else { - QueryContext firstQuery = tvList.getQueryContextList().get(0); - // transfer memory from write process to read process. Here it reserves read memory and - // releaseFlushedMemTable will release write memory. - if (firstQuery instanceof FragmentInstanceContext) { - MemoryReservationManager memoryReservationManager = - ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize()); - } - // update current TVList owner to first query in the list - tvList.setOwnerQuery(firstQuery); - } - } finally { - tvList.unlockQueryList(); - } - } - @Override public void release() { maybeReleaseTvList(list); @@ -734,4 +605,41 @@ public static WritableMemChunk deserializeSingleTVListMemChunks(DataInputStream public List getSortedList() { return sortedList; } + + private void filterDeletedTimestamp( + TVList tvlist, List deletionList, List timestampList) { + long lastTime = Long.MIN_VALUE; + int[] deletionCursor = {0}; + int rowCount = tvlist.rowCount(); + for (int i = 0; i < rowCount; i++) { + if (tvlist.getBitMap() != null && tvlist.isNullValue(tvlist.getValueIndex(i))) { + continue; + } + long curTime = tvlist.getTime(i); + if (deletionList != null + && ModificationUtils.isPointDeleted(curTime, deletionList, deletionCursor)) { + continue; + } + + if (i == rowCount - 1 || curTime != lastTime) { + timestampList.add(curTime); + } + lastTime = curTime; + } + } + + public long[] getFilteredTimestamp(List deletionList) { + List timestampList = new ArrayList<>(); + filterDeletedTimestamp(list, deletionList, timestampList); + for (TVList tvList : sortedList) { + filterDeletedTimestamp(tvList, deletionList, timestampList); + } + + // remove duplicated time + List distinctTimestamps = timestampList.stream().distinct().collect(Collectors.toList()); + // sort timestamps + long[] filteredTimestamps = distinctTimestamps.stream().mapToLong(Long::longValue).toArray(); + Arrays.sort(filteredTimestamps); + return filteredTimestamps; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index f73880c2310c..3ce296e694f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -146,7 +146,7 @@ public TVList getTvListByColumnIndex( } @Override - public AlignedTVList clone() { + public synchronized AlignedTVList clone() { AlignedTVList cloneList = AlignedTVList.newAlignedList(dataTypes); cloneAs(cloneList); cloneList.timeDeletedCnt = this.timeDeletedCnt; @@ -1547,7 +1547,7 @@ public class AlignedTVListIterator extends TVListIterator implements MemPointIte private final List columnIndexList; private final List timeColumnDeletion; private final List> valueColumnsDeletionList; - private final Integer floatPrecision; + private final int floatPrecision; private final List encodingList; private final boolean ignoreAllNullRows; @@ -1576,7 +1576,7 @@ public AlignedTVListIterator( ? IntStream.range(0, dataTypes.size()).boxed().collect(Collectors.toList()) : columnIndexList; this.allValueColDeletedMap = ignoreAllNullRows ? getAllValueColDeletedMap() : null; - this.floatPrecision = floatPrecision; + this.floatPrecision = floatPrecision != null ? floatPrecision : 0; this.encodingList = encodingList; this.timeColumnDeletion = timeColumnDeletion; this.valueColumnsDeletionList = valueColumnsDeletionList; @@ -1677,7 +1677,7 @@ public TimeValuePair currentTimeValuePair() { } public TsPrimitiveType getPrimitiveTypeObject(int rowIndex, int columnIndex) { - int valueIndex = getValueIndex(index); + int valueIndex = getValueIndex(rowIndex); if (valueIndex < 0 || valueIndex >= rows) { return null; } @@ -1701,19 +1701,19 @@ public TsPrimitiveType getPrimitiveTypeObject(int rowIndex, int columnIndex) { return TsPrimitiveType.getByType( TSDataType.INT64, getLongByValueIndex(valueIndex, validColumnIndex)); case FLOAT: - return TsPrimitiveType.getByType( - TSDataType.FLOAT, - roundValueWithGivenPrecision( - getFloatByValueIndex(valueIndex, validColumnIndex), - floatPrecision, - encodingList.get(columnIndex))); + float valueF = getFloatByValueIndex(valueIndex, validColumnIndex); + if (encodingList != null) { + valueF = + roundValueWithGivenPrecision(valueF, floatPrecision, encodingList.get(columnIndex)); + } + return TsPrimitiveType.getByType(TSDataType.FLOAT, valueF); case DOUBLE: - return TsPrimitiveType.getByType( - TSDataType.DOUBLE, - roundValueWithGivenPrecision( - getDoubleByValueIndex(valueIndex, validColumnIndex), - floatPrecision, - encodingList.get(columnIndex))); + double valueD = getDoubleByValueIndex(valueIndex, validColumnIndex); + if (encodingList != null) { + valueD = + roundValueWithGivenPrecision(valueD, floatPrecision, encodingList.get(columnIndex)); + } + return TsPrimitiveType.getByType(TSDataType.DOUBLE, valueD); case TEXT: case BLOB: case STRING: @@ -1861,18 +1861,22 @@ public TsBlock nextBatch() { valueBuilder.writeLong(getLongByValueIndex(originRowIndex, validColumnIndex)); break; case FLOAT: - valueBuilder.writeFloat( - roundValueWithGivenPrecision( - getFloatByValueIndex(originRowIndex, validColumnIndex), - floatPrecision, - encodingList.get(columnIndex))); + float valueF = getFloatByValueIndex(originRowIndex, validColumnIndex); + if (encodingList != null) { + valueF = + roundValueWithGivenPrecision( + valueF, floatPrecision, encodingList.get(columnIndex)); + } + valueBuilder.writeFloat(valueF); break; case DOUBLE: - valueBuilder.writeDouble( - roundValueWithGivenPrecision( - getDoubleByValueIndex(originRowIndex, validColumnIndex), - floatPrecision, - encodingList.get(columnIndex))); + double valueD = getDoubleByValueIndex(originRowIndex, validColumnIndex); + if (encodingList != null) { + valueD = + roundValueWithGivenPrecision( + valueD, floatPrecision, encodingList.get(columnIndex)); + } + valueBuilder.writeDouble(valueD); break; case TEXT: case BLOB: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index fd26eded1ac2..dc4ff5529d45 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -64,7 +64,7 @@ public static BinaryTVList newList() { } @Override - public BinaryTVList clone() { + public synchronized BinaryTVList clone() { BinaryTVList cloneList = BinaryTVList.newList(); cloneAs(cloneList); cloneBitMap(cloneList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java index 2a84c13f5462..b8eb0e508bfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java @@ -63,7 +63,7 @@ public static BooleanTVList newList() { } @Override - public BooleanTVList clone() { + public synchronized BooleanTVList clone() { BooleanTVList cloneList = BooleanTVList.newList(); cloneAs(cloneList); cloneBitMap(cloneList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java index 5f82527ca765..f61995ef0628 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java @@ -64,7 +64,7 @@ public static DoubleTVList newList() { } @Override - public DoubleTVList clone() { + public synchronized DoubleTVList clone() { DoubleTVList cloneList = DoubleTVList.newList(); cloneAs(cloneList); cloneBitMap(cloneList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java index de79955a2e73..3623fa49a3ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java @@ -64,7 +64,7 @@ public static FloatTVList newList() { } @Override - public FloatTVList clone() { + public synchronized FloatTVList clone() { FloatTVList cloneList = FloatTVList.newList(); cloneAs(cloneList); cloneBitMap(cloneList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java index f3312ebd9c20..758cd64053bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java @@ -63,7 +63,7 @@ public static IntTVList newList() { } @Override - public IntTVList clone() { + public synchronized IntTVList clone() { IntTVList cloneList = IntTVList.newList(); cloneAs(cloneList); cloneBitMap(cloneList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java index bea59266fcd4..7b4bd8d82d26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java @@ -63,7 +63,7 @@ public static LongTVList newList() { } @Override - public LongTVList clone() { + public synchronized LongTVList clone() { LongTVList cloneList = LongTVList.newList(); cloneAs(cloneList); cloneBitMap(cloneList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java index 82c3563411ab..1201f54a5090 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java @@ -151,8 +151,9 @@ public TsBlock nextBatch() { continue; } + int valueIndex = alignedTVList.getValueIndex(currentRowIndex(columnIndex)); // null value - if (alignedTVList.isNullValue(currentRowIndex(columnIndex), validColumnIndex)) { + if (alignedTVList.isNullValue(valueIndex, validColumnIndex)) { valueBuilder.appendNull(); continue; } @@ -160,22 +161,18 @@ public TsBlock nextBatch() { switch (tsDataTypeList.get(columnIndex)) { case BOOLEAN: valueBuilder.writeBoolean( - alignedTVList.getBooleanByValueIndex( - currentRowIndex(columnIndex), validColumnIndex)); + alignedTVList.getBooleanByValueIndex(valueIndex, validColumnIndex)); break; case INT32: case DATE: - valueBuilder.writeInt( - alignedTVList.getIntByValueIndex(currentRowIndex(columnIndex), validColumnIndex)); + valueBuilder.writeInt(alignedTVList.getIntByValueIndex(valueIndex, validColumnIndex)); break; case INT64: case TIMESTAMP: - valueBuilder.writeLong( - alignedTVList.getLongByValueIndex(currentRowIndex(columnIndex), validColumnIndex)); + valueBuilder.writeLong(alignedTVList.getLongByValueIndex(valueIndex, validColumnIndex)); break; case FLOAT: - float valueF = - alignedTVList.getFloatByValueIndex(currentRowIndex(columnIndex), validColumnIndex); + float valueF = alignedTVList.getFloatByValueIndex(valueIndex, validColumnIndex); if (encodingList != null) { valueF = alignedTVList.roundValueWithGivenPrecision( @@ -184,8 +181,7 @@ public TsBlock nextBatch() { valueBuilder.writeFloat(valueF); break; case DOUBLE: - double valueD = - alignedTVList.getDoubleByValueIndex(currentRowIndex(columnIndex), validColumnIndex); + double valueD = alignedTVList.getDoubleByValueIndex(valueIndex, validColumnIndex); if (encodingList != null) { valueD = alignedTVList.roundValueWithGivenPrecision( @@ -197,8 +193,7 @@ public TsBlock nextBatch() { case BLOB: case STRING: valueBuilder.writeBinary( - alignedTVList.getBinaryByValueIndex( - currentRowIndex(columnIndex), validColumnIndex)); + alignedTVList.getBinaryByValueIndex(valueIndex, validColumnIndex)); break; default: throw new UnSupportedDataTypeException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 45ddce42a9fb..a4b3e71ab2f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -42,7 +42,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -71,8 +73,8 @@ public abstract class TVList implements WALEntryValue { // lock to provide synchronization for query list private final ReentrantLock queryListLock = new ReentrantLock(); - // list of query that this TVList is used - protected final List queryContextList; + // set of query that this TVList is used + protected final Set queryContextSet; // the owner query which is obligated to release the TVList. // When it is null, the TVList is owned by insert thread and released after flush. @@ -94,7 +96,7 @@ protected TVList() { seqRowCount = 0; maxTime = Long.MIN_VALUE; minTime = Long.MAX_VALUE; - queryContextList = new ArrayList<>(); + queryContextSet = new HashSet<>(); referenceCount = new AtomicInteger(); } @@ -419,7 +421,7 @@ public int delete(long lowerBound, long upperBound) { } // common clone for both TVList and AlignedTVList - protected synchronized void cloneAs(TVList cloneList) { + protected void cloneAs(TVList cloneList) { // clone timestamps for (long[] timestampArray : timestamps) { cloneList.timestamps.add(cloneTime(timestampArray)); @@ -444,7 +446,7 @@ public void clear() { sorted = true; maxTime = Long.MIN_VALUE; minTime = Long.MAX_VALUE; - queryContextList.clear(); + queryContextSet.clear(); ownerQuery = null; clearTime(); clearValue(); @@ -622,8 +624,8 @@ public QueryContext getOwnerQuery() { return ownerQuery; } - public List getQueryContextList() { - return queryContextList; + public Set getQueryContextSet() { + return queryContextSet; } public List getBitMap() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index 3d2387674899..d64e7c7e5cd7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -18,17 +18,34 @@ */ package org.apache.iotdb.db.storageengine.dataregion.memtable; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.NonAlignedFullPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException; +import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; +import org.apache.iotdb.db.queryengine.execution.driver.IDriver; +import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager; +import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest; @@ -52,6 +69,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; import java.nio.ByteBuffer; @@ -62,8 +80,13 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.ExecutorService; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; public class PrimitiveMemTableTest { + private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); + private static final int dataNodeId = 0; String database = "root.test"; String dataRegionId = "1"; @@ -96,6 +119,7 @@ public class PrimitiveMemTableTest { @Before public void setUp() { delta = Math.pow(0.1, TSFileDescriptor.getInstance().getConfig().getFloatPrecision()); + conf.setDataNodeId(dataNodeId); } @Test @@ -190,6 +214,10 @@ public void simpleTest() throws IOException, QueryProcessException, MetadataExce @Test public void totalSeriesNumberTest() throws IOException, QueryProcessException, MetadataException { + IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); + int dataNodeId = 0; + conf.setDataNodeId(dataNodeId); + IMemTable memTable = new PrimitiveMemTable(database, dataRegionId); int count = 10; String deviceId = "d1"; @@ -582,4 +610,64 @@ public void testSerializeSize() // TODO: revert until TsFile is updated // assertEquals(0, walBuffer.getBuffer().remaining()); } + + @Test + public void testReleaseWithNotEnoughMemory() throws CpuNotEnoughException { + TSDataType dataType = TSDataType.INT32; + WritableMemChunk series = + new WritableMemChunk(new MeasurementSchema("s1", dataType, TSEncoding.PLAIN)); + int count = 100; + for (int i = 0; i < count; i++) { + series.writeNonAlignedPoint(i, i); + } + + // mock MemoryNotEnoughException exception + TVList list = series.getWorkingTVList(); + + // mock MemoryReservationManager + MemoryReservationManager memoryReservationManager = + Mockito.mock(MemoryReservationManager.class); + Mockito.doThrow(new MemoryNotEnoughException("")) + .when(memoryReservationManager) + .reserveMemoryCumulatively(list.calculateRamSize()); + + // create FragmentInstanceId + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext queryContext = + createFragmentInstanceContext(instanceId, stateMachine, memoryReservationManager); + queryContext.initializeNumOfDrivers(1); + DataRegion dataRegion = Mockito.mock(DataRegion.class); + queryContext.setDataRegion(dataRegion); + + list.getQueryContextSet().add(queryContext); + Map tvlistMap = new HashMap<>(); + tvlistMap.put(list, 100); + queryContext.addTVListToSet(tvlistMap); + + // fragment instance execution + IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class); + List drivers = Collections.emptyList(); + ISink sinkHandle = Mockito.mock(ISink.class); + MPPDataExchangeManager exchangeManager = Mockito.mock(MPPDataExchangeManager.class); + FragmentInstanceExecution execution = + FragmentInstanceExecution.createFragmentInstanceExecution( + scheduler, + instanceId, + queryContext, + drivers, + sinkHandle, + stateMachine, + -1, + false, + exchangeManager); + + queryContext.decrementNumOfUnClosedDriver(); + series.release(); + } }