Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 7902d38

Browse files
committedMar 20, 2025·
fix: remove query context when abort finish
1 parent 57f6a0d commit 7902d38

File tree

5 files changed

+93
-7
lines changed

5 files changed

+93
-7
lines changed
 

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

+30
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,22 @@ public static FragmentInstanceContext createFragmentInstanceContext(
198198
return instanceContext;
199199
}
200200

201+
@TestOnly
202+
public static FragmentInstanceContext createFragmentInstanceContext(
203+
FragmentInstanceId id,
204+
FragmentInstanceStateMachine stateMachine,
205+
MemoryReservationManager memoryReservationManager) {
206+
FragmentInstanceContext instanceContext =
207+
new FragmentInstanceContext(
208+
id,
209+
stateMachine,
210+
new SessionInfo(1, "test", ZoneId.systemDefault()),
211+
memoryReservationManager);
212+
instanceContext.initialize();
213+
instanceContext.start();
214+
return instanceContext;
215+
}
216+
201217
private FragmentInstanceContext(
202218
FragmentInstanceId id,
203219
FragmentInstanceStateMachine stateMachine,
@@ -230,6 +246,20 @@ private FragmentInstanceContext(
230246
new ThreadSafeMemoryReservationManager(id.getQueryId(), this.getClass().getName());
231247
}
232248

249+
private FragmentInstanceContext(
250+
FragmentInstanceId id,
251+
FragmentInstanceStateMachine stateMachine,
252+
SessionInfo sessionInfo,
253+
MemoryReservationManager memoryReservationManager) {
254+
this.id = id;
255+
this.stateMachine = stateMachine;
256+
this.executionEndTime.set(END_TIME_INITIAL_VALUE);
257+
this.sessionInfo = sessionInfo;
258+
this.dataNodeQueryContextMap = null;
259+
this.dataNodeQueryContext = null;
260+
this.memoryReservationManager = memoryReservationManager;
261+
}
262+
233263
private FragmentInstanceContext(
234264
FragmentInstanceId id,
235265
FragmentInstanceStateMachine stateMachine,

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ protected void maybeReleaseTvList(TVList tvList) {
6161
firstQuery.failed(
6262
new MemoryNotEnoughException(
6363
"Memory not enough to clone the tvlist during flush phase"));
64-
iterator.remove();
6564
}
6665
} finally {
6766
tvList.unlockQueryList();

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java

-1
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,6 @@ private void sortTVList() {
433433
}
434434
}
435435

436-
// TODO: will remove clone logic later
437436
@Override
438437
public synchronized void sortTvListForFlush() {
439438
AlignedTVList cloneList = null;

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java

-1
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,6 @@ private void sortTVList() {
271271
}
272272
}
273273

274-
// TODO: will remove clone logic later
275274
@Override
276275
public synchronized void sortTvListForFlush() {
277276
TVList cloneList = null;

‎iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java

+63-4
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,34 @@
1818
*/
1919
package org.apache.iotdb.db.storageengine.dataregion.memtable;
2020

21+
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
2122
import org.apache.iotdb.commons.exception.IllegalPathException;
2223
import org.apache.iotdb.commons.exception.MetadataException;
2324
import org.apache.iotdb.commons.path.AlignedFullPath;
2425
import org.apache.iotdb.commons.path.MeasurementPath;
2526
import org.apache.iotdb.commons.path.NonAlignedFullPath;
2627
import org.apache.iotdb.commons.path.PartialPath;
28+
import org.apache.iotdb.db.conf.IoTDBConfig;
29+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2730
import org.apache.iotdb.db.exception.WriteProcessException;
2831
import org.apache.iotdb.db.exception.query.QueryProcessException;
32+
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
33+
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
34+
import org.apache.iotdb.db.queryengine.common.QueryId;
35+
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
2936
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
37+
import org.apache.iotdb.db.queryengine.execution.driver.IDriver;
38+
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
39+
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
3040
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
41+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution;
42+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
3143
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
44+
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3245
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
3346
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
3447
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
48+
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
3549
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
3650
import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
3751
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest;
@@ -66,8 +80,13 @@
6680
import java.util.List;
6781
import java.util.Map;
6882
import java.util.Random;
83+
import java.util.concurrent.ExecutorService;
84+
85+
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
6986

7087
public class PrimitiveMemTableTest {
88+
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
89+
private static final int dataNodeId = 0;
7190

7291
String database = "root.test";
7392
String dataRegionId = "1";
@@ -100,6 +119,7 @@ public class PrimitiveMemTableTest {
100119
@Before
101120
public void setUp() {
102121
delta = Math.pow(0.1, TSFileDescriptor.getInstance().getConfig().getFloatPrecision());
122+
conf.setDataNodeId(dataNodeId);
103123
}
104124

105125
@Test
@@ -194,6 +214,10 @@ public void simpleTest() throws IOException, QueryProcessException, MetadataExce
194214

195215
@Test
196216
public void totalSeriesNumberTest() throws IOException, QueryProcessException, MetadataException {
217+
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
218+
int dataNodeId = 0;
219+
conf.setDataNodeId(dataNodeId);
220+
197221
IMemTable memTable = new PrimitiveMemTable(database, dataRegionId);
198222
int count = 10;
199223
String deviceId = "d1";
@@ -588,7 +612,7 @@ public void testSerializeSize()
588612
}
589613

590614
@Test
591-
public void testReleaseWithNotEnoughMemory() {
615+
public void testReleaseWithNotEnoughMemory() throws CpuNotEnoughException {
592616
TSDataType dataType = TSDataType.INT32;
593617
WritableMemChunk series =
594618
new WritableMemChunk(new MeasurementSchema("s1", dataType, TSEncoding.PLAIN));
@@ -599,16 +623,51 @@ public void testReleaseWithNotEnoughMemory() {
599623

600624
// mock MemoryNotEnoughException exception
601625
TVList list = series.getWorkingTVList();
626+
627+
// mock MemoryReservationManager
602628
MemoryReservationManager memoryReservationManager =
603629
Mockito.mock(MemoryReservationManager.class);
604630
Mockito.doThrow(new MemoryNotEnoughException(""))
605631
.when(memoryReservationManager)
606632
.reserveMemoryCumulatively(list.calculateRamSize());
607633

608-
FragmentInstanceContext queryContext = Mockito.mock(FragmentInstanceContext.class);
609-
Mockito.when(queryContext.getMemoryReservationContext()).thenReturn(memoryReservationManager);
610-
list.getQueryContextSet().add(queryContext);
634+
// create FragmentInstanceId
635+
QueryId queryId = new QueryId("stub_query");
636+
FragmentInstanceId instanceId =
637+
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
638+
ExecutorService instanceNotificationExecutor =
639+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
640+
FragmentInstanceStateMachine stateMachine =
641+
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
642+
FragmentInstanceContext queryContext =
643+
createFragmentInstanceContext(instanceId, stateMachine, memoryReservationManager);
644+
queryContext.initializeNumOfDrivers(1);
645+
DataRegion dataRegion = Mockito.mock(DataRegion.class);
646+
queryContext.setDataRegion(dataRegion);
611647

648+
list.getQueryContextSet().add(queryContext);
649+
Map<TVList, Integer> tvlistMap = new HashMap<>();
650+
tvlistMap.put(list, 100);
651+
queryContext.addTVListToSet(tvlistMap);
652+
653+
// fragment instance execution
654+
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
655+
List<IDriver> drivers = Collections.emptyList();
656+
ISink sinkHandle = Mockito.mock(ISink.class);
657+
MPPDataExchangeManager exchangeManager = Mockito.mock(MPPDataExchangeManager.class);
658+
FragmentInstanceExecution execution =
659+
FragmentInstanceExecution.createFragmentInstanceExecution(
660+
scheduler,
661+
instanceId,
662+
queryContext,
663+
drivers,
664+
sinkHandle,
665+
stateMachine,
666+
-1,
667+
false,
668+
exchangeManager);
669+
670+
queryContext.decrementNumOfUnClosedDriver();
612671
series.release();
613672
}
614673
}

0 commit comments

Comments
 (0)
Please sign in to comment.