After the changes that fix issue #25151, facing java.lang.IllegalStateException: task ... already marked as finished #25217

Closed
Albel-Analyst opened this issue Mar 5, 2025 · 11 comments · Fixed by #25333

@Albel-Analyst

I am facing a problem after building from the current master branch. There was no such problem on 471, but I had to build from master because of this issue: #25151

Now I periodically encounter this problem:

java.lang.IllegalStateException: task 20250305_065017_09674_pwtt5.3.1.0 already marked as finished
	at com.google.common.base.Preconditions.checkState(Preconditions.java:602)
	at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$StagePartition.taskFailed(EventDrivenFaultTolerantQueryScheduler.java:2973)
	at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$StageExecution.taskFailed(EventDrivenFaultTolerantQueryScheduler.java:2533)
	at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$Scheduler.onRemoteTaskCompleted(EventDrivenFaultTolerantQueryScheduler.java:1796)
	at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$Scheduler.onRemoteTaskCompleted(EventDrivenFaultTolerantQueryScheduler.java:675)
	at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$RemoteTaskCompletedEvent.accept(EventDrivenFaultTolerantQueryScheduler.java:3410)
	at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$Scheduler.processEvents(EventDrivenFaultTolerantQueryScheduler.java:928)
	at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$Scheduler.run(EventDrivenFaultTolerantQueryScheduler.java:848)
	at io.trino.$gen.Trino_v1_3_1____20250304_070114_2.run(Unknown Source)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1575)

If it's relevant, I'm using:
connector.name=iceberg
iceberg.catalog.type=rest
retry-policy=TASK
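
For reference, the "already marked as finished" message comes from Guava's Preconditions.checkState inside StagePartition.taskFailed. A minimal illustrative sketch of that pattern (not the actual Trino code; the class and field names below are made up):

import static com.google.common.base.Preconditions.checkState;

// Sketch only: a task partition that rejects a second terminal transition,
// in the same way the scheduler's state check does.
class PartitionStateSketch
{
    private boolean finished;

    void taskFailed(String taskId)
    {
        // Throws IllegalStateException("task <id> already marked as finished")
        // if the task was already moved to a terminal state.
        checkState(!finished, "task %s already marked as finished", taskId);
        finished = true;
    }
}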

@wendigo
Contributor

wendigo commented Mar 5, 2025

@losipiuk ptal

@Albel-Analyst
Author

After I removed the retry-policy, the errors stopped, but everything worked stably on previous versions.
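
In case it helps others, a sketch of the workaround, assuming retry-policy is set in the coordinator's etc/config.properties (NONE is the default):

# Disable task-level retries until the fix lands; this gives up
# fault-tolerant task retries for the affected queries.
# retry-policy=TASK
retry-policy=NONE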

@grumbler

Having the same issue with 471: Trino in k8s, Iceberg with S3 and Glue.

It occurs 100% of the time when running alter table execute optimize on a large table.
We haven't tried without retry-policy yet, as we need it for other queries.

Is there anything I could help with? E.g. logs / test other versions?

@wufh43

wufh43 commented Mar 17, 2025

We are also experiencing this issue. It's hard to pin down, but it happens when we have large queries that call slow functions, with retry-policy=TASK.

@losipiuk losipiuk self-assigned this Mar 17, 2025
@losipiuk
Member

@grumbler do you have log lines starting with

Did not receive final task info for task

in your logs? I expect you do. Can you quote those? I'm fairly confident I know which new code path you are hitting, but I'm not sure yet what the root cause is.
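
If it's easier, something like this should find them (the log path is just an assumption; adjust it for your deployment):

grep -n "Did not receive final task info for task" /path/to/trino/var/log/server.log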

@losipiuk
Member

Also, can you build the version from #25333 and see how it looks there? I would expect it to still fail, but probably with a more reasonable error message. We will probably need to tweak one timeout too.

@losipiuk
Member

We are also experiencing this issue. It's hard to pin down, but it happens when we have large queries that call slow functions, with retry-policy=TASK.

@wufh43 How long do the functions run? Also, can you look in the logs and see if you have anything with Did not receive final task info for task, and paste the results here with some context?

@losipiuk
Member

After I removed the retry-policy, the errors stopped, but everything worked stably on previous versions.

@Albel-Analyst yeah - the issue is FTE-specific. Would you be able to build from #25333 and see how it behaves?

@grumbler

@losipiuk sure, let me spin up a test instance on 472 and grab the logs.

Yep, I have quite a lot of those:

2025-03-18T12:49:33.360Z	ERROR	stage-scheduler	io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler	Did not receive final task info for task FAILED after it 20250318_124101_00000_piwx3.2.345.0; internal inconsistency; marking task failed in scheduler to unblock query progression

From the client side:

Query 20250318_124101_00000_piwx3, FAILED, 11 nodes
Splits: 6,412 total, 3,621 done (56.47%)
5:40 [8.27M rows, 12.7GiB] [24.3K rows/s, 38.2MiB/s]

Query 20250318_124101_00000_piwx3 failed: task 20250318_124101_00000_piwx3.2.325.0 already marked as finished

As for building from the MR - I won't have time for that until next week, unfortunately.

@losipiuk
Member

Thanks.

Would you be fine with sharing the query text and plan (output of EXPLAIN ...)? It can be stripped of any parts you consider sensitive, of course.

@grumbler

Sure!
The query is pretty simple - it's OPTIMIZE on an Iceberg table.

I've replaced catalogName.schemaName.tableName with full_table_name.

alter table full_table_name EXECUTE optimize;

The EXPLAIN output:

Trino version: 472
Fragment 0 [COORDINATOR_ONLY]
    Output layout: [rows]
    Output partitioning: SINGLE []
    Output[columnNames = [rows]]
    │   Layout: [rows:bigint]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    └─ TableCommit[target = iceberg:schemaTableName:full_table_name, procedureId:OPTIMIZE, procedureHandle:{IcebergOptimizeHandle[snapshotId=Optional[2040365380846482166], schemaAsJson={"type":"struct","schema-id":0,"fields":[{"id":1,"name":"key","required":true,"type":"string"},{"id":2,"name":"offset","required":true,"type":"long"},{"id":3,"name":"partition","required":true,"type":"int"},{"id":4,"name":"tombstone","required":true,"type":"boolean"},{"id":5,"name":"topic","required":true,"type":"string"},{"id":6,"name":"value","required":false,"type":"binary"}]}, partitionSpecAsJson={"spec-id":0,"fields":[]}, tableColumns=[1:key:varchar, 2:offset:bigint, 3:partition:integer, 4:tombstone:boolean, 5:topic:varchar, 6:value:varbinary], sortOrder=[], fileFormat=PARQUET, tableStorageProperties={write.parquet.compression-codec=zstd}, maxScannedFileSize=100MB, retriesEnabled=true]}]
       │   Layout: [rows:bigint]
       └─ LocalExchange[partitioning = SINGLE]
          │   Layout: [partialrows:bigint, fragment:varbinary]
          │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [1]]
                 Layout: [partialrows:bigint, fragment:varbinary]

Fragment 1 [ROUND_ROBIN (scale writers)]
    Output layout: [partialrows, fragment]
    Output partitioning: SINGLE []
    TableExecute[]
    │   Layout: [partialrows:bigint, fragment:varbinary]
    │   key := key
    │   offset := offset
    │   partition := partition
    │   tombstone := tombstone
    │   topic := topic
    │   value := value
    └─ LocalExchange[partitioning = SINGLE]
       │   Layout: [key:varchar, offset:bigint, partition:integer, tombstone:boolean, topic:varchar, value:varbinary]
       │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
       └─ RemoteSource[sourceFragmentIds = [2]]
              Layout: [key:varchar, offset:bigint, partition:integer, tombstone:boolean, topic:varchar, value:varbinary]

Fragment 2 [SOURCE]
    Output layout: [key, offset, partition, tombstone, topic, value]
    Output partitioning: ROUND_ROBIN (scale writers) []
    TableScan[table = iceberg:full_table_name$data@2040365380846482166]
        Layout: [key:varchar, offset:bigint, partition:integer, tombstone:boolean, topic:varchar, value:varbinary]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        key := 1:key:varchar
        offset := 2:offset:bigint
        partition := 3:partition:integer
        value := 6:value:varbinary
        tombstone := 4:tombstone:boolean
        topic := 5:topic:varchar
