diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 830f22a4599c..64ef7c0a1cae 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -1670,6 +1670,7 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns // the code below is a stop-gap to mitigate this issue and unblock or fail query // until we find and fix the bug AtomicBoolean finalTaskInfoReceived = new AtomicBoolean(); + AtomicBoolean taskCompletedEventSent = new AtomicBoolean(); task.addStateChangeListener(taskStatus -> { if (!taskStatus.getState().isDone()) { return; @@ -1683,8 +1684,10 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns }, NO_FINAL_TASK_INFO_CHECK_INTERVAL.toMillis(), MILLISECONDS); case CANCELED, ABORTED, FAILED -> scheduledExecutorService.schedule(() -> { if (!finalTaskInfoReceived.get()) { - log.error("Did not receive final task info for task %s after it %s; internal inconsistency; marking task failed in scheduler to unblock query progression", taskStatus.getState(), task.getTaskId()); - eventQueue.add(new RemoteTaskCompletedEvent(taskStatus)); + if (taskCompletedEventSent.compareAndSet(false, true)) { + log.error("Did not receive final task info for task %s after it %s; internal inconsistency; marking task failed in scheduler to unblock query progression", task.getTaskId(), taskStatus.getState()); + eventQueue.add(new RemoteTaskCompletedEvent(taskStatus)); + } } }, NO_FINAL_TASK_INFO_CHECK_INTERVAL.toMillis(), MILLISECONDS); default -> throw new IllegalStateException("Unexpected task state: " + taskStatus.getState()); @@ -1693,7 +1696,14 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns task.addFinalTaskInfoListener(_ -> finalTaskInfoReceived.set(true)); task.addFinalTaskInfoListener(taskExecutionStats::update); - task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.taskStatus()))); + task.addFinalTaskInfoListener(taskInfo -> { + if (taskCompletedEventSent.compareAndSet(false, true)) { + eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.taskStatus())); + } + else { + log.warn("Final task info for task %s received late; state = %s", task.getTaskId(), taskInfo.taskStatus().getState()); + } + }); nodeLease.attachTaskId(task.getTaskId()); task.start(); if (queryStateMachine.getQueryState() == QueryState.STARTING) {