Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: MemoryNotEnough exception when flushing try to release tvlist #15143

Merged
merged 5 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,22 @@ public static FragmentInstanceContext createFragmentInstanceContext(
return instanceContext;
}

@TestOnly
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
MemoryReservationManager memoryReservationManager) {
FragmentInstanceContext instanceContext =
new FragmentInstanceContext(
id,
stateMachine,
new SessionInfo(1, "test", ZoneId.systemDefault()),
memoryReservationManager);
instanceContext.initialize();
instanceContext.start();
return instanceContext;
}

private FragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
Expand Down Expand Up @@ -230,6 +246,20 @@ private FragmentInstanceContext(
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
}

private FragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
SessionInfo sessionInfo,
MemoryReservationManager memoryReservationManager) {
this.id = id;
this.stateMachine = stateMachine;
this.executionEndTime.set(END_TIME_INITIAL_VALUE);
this.sessionInfo = sessionInfo;
this.dataNodeQueryContextMap = null;
this.dataNodeQueryContext = null;
this.memoryReservationManager = memoryReservationManager;
}

private FragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
Expand Down Expand Up @@ -687,23 +717,25 @@ public void releaseResourceWhenAllDriversAreClosed() {
private void releaseTVListOwnedByQuery() {
for (TVList tvList : tvListSet) {
tvList.lockQueryList();
List<QueryContext> queryContextList = tvList.getQueryContextList();
Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
try {
queryContextList.remove(this);
queryContextSet.remove(this);
if (tvList.getOwnerQuery() == this) {
if (queryContextList.isEmpty()) {
if (queryContextSet.isEmpty()) {
LOGGER.debug(
"TVList {} is released by the query, FragmentInstance Id is {}",
tvList,
this.getId());
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
tvList.clear();
} else {
FragmentInstanceContext queryContext =
(FragmentInstanceContext) queryContextSet.iterator().next();
LOGGER.debug(
"TVList {} is now owned by another query, FragmentInstance Id is {}",
tvList,
((FragmentInstanceContext) queryContextList.get(0)).getId());
tvList.setOwnerQuery(queryContextList.get(0));
queryContext.getId());
tvList.setOwnerQuery(queryContext);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
try {
LOGGER.debug(
"Flushing/Working MemTable - add current query context to immutable TVList's query list");
tvList.getQueryContextList().add(context);
tvList.getQueryContextSet().add(context);
tvListQueryMap.put(tvList, tvList.rowCount());
} finally {
tvList.unlockQueryList();
Expand All @@ -160,13 +160,13 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
if (!isWorkMemTable) {
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's query list");
list.getQueryContextList().add(context);
list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
if (list.isSorted() || list.getQueryContextList().isEmpty()) {
if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
LOGGER.debug(
"Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it");
list.getQueryContextList().add(context);
list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
/*
Expand All @@ -185,7 +185,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
*/
LOGGER.debug(
"Working MemTable - clone mutable TVList and replace old TVList in working MemTable");
QueryContext firstQuery = list.getQueryContextList().get(0);
QueryContext firstQuery = list.getQueryContextSet().iterator().next();
// reserve query memory
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
Expand All @@ -196,7 +196,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(

// clone TVList
cloneList = list.clone();
cloneList.getQueryContextList().add(context);
cloneList.getQueryContextSet().add(context);
tvListQueryMap.put(cloneList, cloneList.rowCount());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.memtable;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.utils.datastructure.TVList;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.chunk.IChunkWriter;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
protected static long RETRY_INTERVAL_MS = 100L;
protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;

/**
* Release the TVList if there is no query on it. Otherwise, it should set the first query as the
* owner. TVList is released until all queries finish. If it throws memory-not-enough exception
* during owner transfer, retry the release process after 100ms. If the problem is still not
* solved in 60s, it starts to abort first query, kick it out of the query list and retry. This
* method must ensure success because it's part of flushing.
*
* @param tvList
*/
protected void maybeReleaseTvList(TVList tvList) {
long startTimeInMs = System.currentTimeMillis();
boolean succeed = false;
while (!succeed) {
try {
tryReleaseTvList(tvList);
succeed = true;
} catch (MemoryNotEnoughException ex) {
long waitQueryInMs = System.currentTimeMillis() - startTimeInMs;
if (waitQueryInMs > MAX_WAIT_QUERY_MS) {
// Abort first query in the list. When all queries in the list have been aborted,
// tryReleaseTvList will ensure succeed finally.
tvList.lockQueryList();
try {
// fail the first query
Iterator<QueryContext> iterator = tvList.getQueryContextSet().iterator();
if (iterator.hasNext()) {
FragmentInstanceContext firstQuery = (FragmentInstanceContext) iterator.next();
firstQuery.failed(
new MemoryNotEnoughException(
"Memory not enough to clone the tvlist during flush phase"));
}
} finally {
tvList.unlockQueryList();
}
}

// sleep 100ms to retry
try {
Thread.sleep(RETRY_INTERVAL_MS);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}
}

private void tryReleaseTvList(TVList tvList) {
tvList.lockQueryList();
try {
if (tvList.getQueryContextSet().isEmpty()) {
tvList.clear();
} else {
QueryContext firstQuery = tvList.getQueryContextSet().iterator().next();
// transfer memory from write process to read process. Here it reserves read memory and
// releaseFlushedMemTable will release write memory.
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
}
// update current TVList owner to first query in the list
tvList.setOwnerQuery(firstQuery);
}
} finally {
tvList.unlockQueryList();
}
}

@Override
public abstract void putLong(long t, long v);

@Override
public abstract void putInt(long t, int v);

@Override
public abstract void putFloat(long t, float v);

@Override
public abstract void putDouble(long t, double v);

@Override
public abstract void putBinary(long t, Binary v);

@Override
public abstract void putBoolean(long t, boolean v);

@Override
public abstract void putAlignedRow(long t, Object[] v);

@Override
public abstract void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end);

@Override
public abstract void putInts(long[] t, int[] v, BitMap bitMap, int start, int end);

@Override
public abstract void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end);

@Override
public abstract void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end);

@Override
public abstract void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end);

@Override
public abstract void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end);

@Override
public abstract void putAlignedTablet(
long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] results);

@Override
public abstract void writeNonAlignedPoint(long insertTime, Object objectValue);

@Override
public abstract void writeAlignedPoints(
long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);

@Override
public abstract void writeNonAlignedTablet(
long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end);

@Override
public abstract void writeAlignedTablet(
long[] times,
Object[] valueList,
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
int end,
TSStatus[] results);

@Override
public abstract long count();

@Override
public abstract long rowCount();

@Override
public abstract IMeasurementSchema getSchema();

@Override
public abstract void sortTvListForFlush();

@Override
public abstract int delete(long lowerBound, long upperBound);

@Override
public abstract IChunkWriter createIChunkWriter();

@Override
public abstract void encode(BlockingQueue<Object> ioTaskQueue);

@Override
public abstract void release();

@Override
public abstract boolean isEmpty();

@Override
public abstract List<? extends TVList> getSortedList();

@Override
public abstract TVList getWorkingTVList();

@Override
public abstract void setWorkingTVList(TVList list);

@Override
public abstract void serializeToWAL(IWALByteBufferView buffer);

@Override
public abstract int serializedSize();
}
Loading
Loading