Skip to content

Commit 4adabfd

Browse files
authoredMar 21, 2025··
fix: MemoryNotEnough exception when flushing try to release tvlist (#15143)
* fix: MemoryNotEnough exception when flushing try to release tvlist
* retry release tvlist if memory not enough during flush
* synchronized clone method for TVList
* query context list -> query context set
* fix: remove query context when abort finish
* refactor: reorganize AbstractWritableMemChunk
* fix: float/double value
* fix: some testcase bugs
1 parent 421ed66 commit 4adabfd

File tree

16 files changed

+565
-427
lines changed

16 files changed

+565
-427
lines changed
 

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

+37-5
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,22 @@ public static FragmentInstanceContext createFragmentInstanceContext(
198198
return instanceContext;
199199
}
200200

201+
@TestOnly
202+
public static FragmentInstanceContext createFragmentInstanceContext(
203+
FragmentInstanceId id,
204+
FragmentInstanceStateMachine stateMachine,
205+
MemoryReservationManager memoryReservationManager) {
206+
FragmentInstanceContext instanceContext =
207+
new FragmentInstanceContext(
208+
id,
209+
stateMachine,
210+
new SessionInfo(1, "test", ZoneId.systemDefault()),
211+
memoryReservationManager);
212+
instanceContext.initialize();
213+
instanceContext.start();
214+
return instanceContext;
215+
}
216+
201217
private FragmentInstanceContext(
202218
FragmentInstanceId id,
203219
FragmentInstanceStateMachine stateMachine,
@@ -230,6 +246,20 @@ private FragmentInstanceContext(
230246
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
231247
}
232248

249+
/**
 * Creates a context that uses the supplied {@link MemoryReservationManager} rather than
 * constructing its own. Both data-node query context fields are left {@code null}.
 *
 * @param id identifier of the fragment instance
 * @param stateMachine state machine of the fragment instance
 * @param sessionInfo session the context belongs to
 * @param memoryReservationManager memory reservation manager to store in this context
 */
private FragmentInstanceContext(
    FragmentInstanceId id,
    FragmentInstanceStateMachine stateMachine,
    SessionInfo sessionInfo,
    MemoryReservationManager memoryReservationManager) {
  // Values taken straight from the arguments.
  this.id = id;
  this.stateMachine = stateMachine;
  this.sessionInfo = sessionInfo;
  this.memoryReservationManager = memoryReservationManager;

  // This variant carries no data-node query context.
  this.dataNodeQueryContextMap = null;
  this.dataNodeQueryContext = null;

  // Execution end time starts at its initial marker value.
  executionEndTime.set(END_TIME_INITIAL_VALUE);
}
262+
233263
private FragmentInstanceContext(
234264
FragmentInstanceId id,
235265
FragmentInstanceStateMachine stateMachine,
@@ -687,23 +717,25 @@ public void releaseResourceWhenAllDriversAreClosed() {
687717
private void releaseTVListOwnedByQuery() {
688718
for (TVList tvList : tvListSet) {
689719
tvList.lockQueryList();
690-
List<QueryContext> queryContextList = tvList.getQueryContextList();
720+
Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
691721
try {
692-
queryContextList.remove(this);
722+
queryContextSet.remove(this);
693723
if (tvList.getOwnerQuery() == this) {
694-
if (queryContextList.isEmpty()) {
724+
if (queryContextSet.isEmpty()) {
695725
LOGGER.debug(
696726
"TVList {} is released by the query, FragmentInstance Id is {}",
697727
tvList,
698728
this.getId());
699729
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
700730
tvList.clear();
701731
} else {
732+
FragmentInstanceContext queryContext =
733+
(FragmentInstanceContext) queryContextSet.iterator().next();
702734
LOGGER.debug(
703735
"TVList {} is now owned by another query, FragmentInstance Id is {}",
704736
tvList,
705-
((FragmentInstanceContext) queryContextList.get(0)).getId());
706-
tvList.setOwnerQuery(queryContextList.get(0));
737+
queryContext.getId());
738+
tvList.setOwnerQuery(queryContext);
707739
}
708740
}
709741
} finally {

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
140140
try {
141141
LOGGER.debug(
142142
"Flushing/Working MemTable - add current query context to immutable TVList's query list");
143-
tvList.getQueryContextList().add(context);
143+
tvList.getQueryContextSet().add(context);
144144
tvListQueryMap.put(tvList, tvList.rowCount());
145145
} finally {
146146
tvList.unlockQueryList();
@@ -160,13 +160,13 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
160160
if (!isWorkMemTable) {
161161
LOGGER.debug(
162162
"Flushing MemTable - add current query context to mutable TVList's query list");
163-
list.getQueryContextList().add(context);
163+
list.getQueryContextSet().add(context);
164164
tvListQueryMap.put(list, list.rowCount());
165165
} else {
166-
if (list.isSorted() || list.getQueryContextList().isEmpty()) {
166+
if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
167167
LOGGER.debug(
168168
"Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it");
169-
list.getQueryContextList().add(context);
169+
list.getQueryContextSet().add(context);
170170
tvListQueryMap.put(list, list.rowCount());
171171
} else {
172172
/*
@@ -185,7 +185,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
185185
*/
186186
LOGGER.debug(
187187
"Working MemTable - clone mutable TVList and replace old TVList in working MemTable");
188-
QueryContext firstQuery = list.getQueryContextList().get(0);
188+
QueryContext firstQuery = list.getQueryContextSet().iterator().next();
189189
// reserve query memory
190190
if (firstQuery instanceof FragmentInstanceContext) {
191191
MemoryReservationManager memoryReservationManager =
@@ -196,7 +196,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
196196

197197
// clone TVList
198198
cloneList = list.clone();
199-
cloneList.getQueryContextList().add(context);
199+
cloneList.getQueryContextSet().add(context);
200200
tvListQueryMap.put(cloneList, cloneList.rowCount());
201201
}
202202
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.storageengine.dataregion.memtable;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
24+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
25+
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
26+
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
27+
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
28+
import org.apache.iotdb.db.utils.datastructure.TVList;
29+
30+
import org.apache.tsfile.enums.TSDataType;
31+
import org.apache.tsfile.utils.Binary;
32+
import org.apache.tsfile.utils.BitMap;
33+
import org.apache.tsfile.write.chunk.IChunkWriter;
34+
import org.apache.tsfile.write.schema.IMeasurementSchema;
35+
36+
import java.util.Iterator;
37+
import java.util.List;
38+
import java.util.concurrent.BlockingQueue;
39+
40+
public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
41+
protected static long RETRY_INTERVAL_MS = 100L;
42+
protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;
43+
44+
/**
45+
* Release the TVList if there is no query on it. Otherwise, it should set the first query as the
46+
* owner. TVList is released until all queries finish. If it throws memory-not-enough exception
47+
* during owner transfer, retry the release process after 100ms. If the problem is still not
48+
* solved in 60s, it starts to abort first query, kick it out of the query list and retry. This
49+
* method must ensure success because it's part of flushing.
50+
*
51+
* @param tvList
52+
*/
53+
protected void maybeReleaseTvList(TVList tvList) {
54+
long startTimeInMs = System.currentTimeMillis();
55+
boolean succeed = false;
56+
while (!succeed) {
57+
try {
58+
tryReleaseTvList(tvList);
59+
succeed = true;
60+
} catch (MemoryNotEnoughException ex) {
61+
long waitQueryInMs = System.currentTimeMillis() - startTimeInMs;
62+
if (waitQueryInMs > MAX_WAIT_QUERY_MS) {
63+
// Abort first query in the list. When all queries in the list have been aborted,
64+
// tryReleaseTvList will ensure succeed finally.
65+
tvList.lockQueryList();
66+
try {
67+
// fail the first query
68+
Iterator<QueryContext> iterator = tvList.getQueryContextSet().iterator();
69+
if (iterator.hasNext()) {
70+
FragmentInstanceContext firstQuery = (FragmentInstanceContext) iterator.next();
71+
firstQuery.failed(
72+
new MemoryNotEnoughException(
73+
"Memory not enough to clone the tvlist during flush phase"));
74+
}
75+
} finally {
76+
tvList.unlockQueryList();
77+
}
78+
}
79+
80+
// sleep 100ms to retry
81+
try {
82+
Thread.sleep(RETRY_INTERVAL_MS);
83+
} catch (InterruptedException ignore) {
84+
Thread.currentThread().interrupt();
85+
}
86+
}
87+
}
88+
}
89+
90+
private void tryReleaseTvList(TVList tvList) {
91+
tvList.lockQueryList();
92+
try {
93+
if (tvList.getQueryContextSet().isEmpty()) {
94+
tvList.clear();
95+
} else {
96+
QueryContext firstQuery = tvList.getQueryContextSet().iterator().next();
97+
// transfer memory from write process to read process. Here it reserves read memory and
98+
// releaseFlushedMemTable will release write memory.
99+
if (firstQuery instanceof FragmentInstanceContext) {
100+
MemoryReservationManager memoryReservationManager =
101+
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
102+
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
103+
}
104+
// update current TVList owner to first query in the list
105+
tvList.setOwnerQuery(firstQuery);
106+
}
107+
} finally {
108+
tvList.unlockQueryList();
109+
}
110+
}
111+
112+
@Override
113+
public abstract void putLong(long t, long v);
114+
115+
@Override
116+
public abstract void putInt(long t, int v);
117+
118+
@Override
119+
public abstract void putFloat(long t, float v);
120+
121+
@Override
122+
public abstract void putDouble(long t, double v);
123+
124+
@Override
125+
public abstract void putBinary(long t, Binary v);
126+
127+
@Override
128+
public abstract void putBoolean(long t, boolean v);
129+
130+
@Override
131+
public abstract void putAlignedRow(long t, Object[] v);
132+
133+
@Override
134+
public abstract void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end);
135+
136+
@Override
137+
public abstract void putInts(long[] t, int[] v, BitMap bitMap, int start, int end);
138+
139+
@Override
140+
public abstract void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end);
141+
142+
@Override
143+
public abstract void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end);
144+
145+
@Override
146+
public abstract void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end);
147+
148+
@Override
149+
public abstract void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end);
150+
151+
@Override
152+
public abstract void putAlignedTablet(
153+
long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] results);
154+
155+
@Override
156+
public abstract void writeNonAlignedPoint(long insertTime, Object objectValue);
157+
158+
@Override
159+
public abstract void writeAlignedPoints(
160+
long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
161+
162+
@Override
163+
public abstract void writeNonAlignedTablet(
164+
long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end);
165+
166+
@Override
167+
public abstract void writeAlignedTablet(
168+
long[] times,
169+
Object[] valueList,
170+
BitMap[] bitMaps,
171+
List<IMeasurementSchema> schemaList,
172+
int start,
173+
int end,
174+
TSStatus[] results);
175+
176+
@Override
177+
public abstract long count();
178+
179+
@Override
180+
public abstract long rowCount();
181+
182+
@Override
183+
public abstract IMeasurementSchema getSchema();
184+
185+
@Override
186+
public abstract void sortTvListForFlush();
187+
188+
@Override
189+
public abstract int delete(long lowerBound, long upperBound);
190+
191+
@Override
192+
public abstract IChunkWriter createIChunkWriter();
193+
194+
@Override
195+
public abstract void encode(BlockingQueue<Object> ioTaskQueue);
196+
197+
@Override
198+
public abstract void release();
199+
200+
@Override
201+
public abstract boolean isEmpty();
202+
203+
@Override
204+
public abstract List<? extends TVList> getSortedList();
205+
206+
@Override
207+
public abstract TVList getWorkingTVList();
208+
209+
@Override
210+
public abstract void setWorkingTVList(TVList list);
211+
212+
@Override
213+
public abstract void serializeToWAL(IWALByteBufferView buffer);
214+
215+
@Override
216+
public abstract int serializedSize();
217+
}

0 commit comments

Comments
 (0)
Please sign in to comment.