From 99a9ff6fd13eafd53f05a61e4ff7bb9997f9b414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Mon, 17 Mar 2025 19:28:16 +0100 Subject: [PATCH 1/2] Fix parameter ordering --- .../faulttolerant/EventDrivenFaultTolerantQueryScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 830f22a4599c..da07040daf4a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -1683,7 +1683,7 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns }, NO_FINAL_TASK_INFO_CHECK_INTERVAL.toMillis(), MILLISECONDS); case CANCELED, ABORTED, FAILED -> scheduledExecutorService.schedule(() -> { if (!finalTaskInfoReceived.get()) { - log.error("Did not receive final task info for task %s after it %s; internal inconsistency; marking task failed in scheduler to unblock query progression", taskStatus.getState(), task.getTaskId()); + log.error("Did not receive final task info for task %s after it %s; internal inconsistency; marking task failed in scheduler to unblock query progression", task.getTaskId(), taskStatus.getState()); eventQueue.add(new RemoteTaskCompletedEvent(taskStatus)); } }, NO_FINAL_TASK_INFO_CHECK_INTERVAL.toMillis(), MILLISECONDS); From 683e6e381ec5fe008a2ec2fef2eeb678cb8bc984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Mon, 17 Mar 2025 19:39:22 +0100 Subject: [PATCH 2/2] Handle race conditition in receiving final task info It seems to happen that final task info can be received in rare cases much after we taskStatus is marked as completed. In such case RemoteTaskCompletedEvent was processed twice. * first event sent by preventive mechanism; handling tasks for which final task info was lost * second event which is late arrival. In such case we triggered check in taskFailed as single task was marked as failed twice. --- .../EventDrivenFaultTolerantQueryScheduler.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index da07040daf4a..64ef7c0a1cae 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -1670,6 +1670,7 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns // the code below is a stop-gap to mitigate this issue and unblock or fail query // until we find and fix the bug AtomicBoolean finalTaskInfoReceived = new AtomicBoolean(); + AtomicBoolean taskCompletedEventSent = new AtomicBoolean(); task.addStateChangeListener(taskStatus -> { if (!taskStatus.getState().isDone()) { return; @@ -1683,8 +1684,10 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns }, NO_FINAL_TASK_INFO_CHECK_INTERVAL.toMillis(), MILLISECONDS); case CANCELED, ABORTED, FAILED -> scheduledExecutorService.schedule(() -> { if (!finalTaskInfoReceived.get()) { - log.error("Did not receive final task info for task %s after it %s; internal inconsistency; marking task failed in scheduler to unblock query progression", task.getTaskId(), taskStatus.getState()); - eventQueue.add(new RemoteTaskCompletedEvent(taskStatus)); + if (taskCompletedEventSent.compareAndSet(false, true)) { + log.error("Did not receive final task info for task %s after it %s; internal inconsistency; marking task failed in scheduler to unblock query progression", task.getTaskId(), taskStatus.getState()); + eventQueue.add(new RemoteTaskCompletedEvent(taskStatus)); + } } }, NO_FINAL_TASK_INFO_CHECK_INTERVAL.toMillis(), MILLISECONDS); default -> throw new IllegalStateException("Unexpected task state: " + taskStatus.getState()); @@ -1693,7 +1696,14 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns task.addFinalTaskInfoListener(_ -> finalTaskInfoReceived.set(true)); task.addFinalTaskInfoListener(taskExecutionStats::update); - task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.taskStatus()))); + task.addFinalTaskInfoListener(taskInfo -> { + if (taskCompletedEventSent.compareAndSet(false, true)) { + eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.taskStatus())); + } + else { + log.warn("Final task info for task %s received late; state = %s", task.getTaskId(), taskInfo.taskStatus().getState()); + } + }); nodeLease.attachTaskId(task.getTaskId()); task.start(); if (queryStateMachine.getQueryState() == QueryState.STARTING) {