From 21c151883305e083b6d221a817322d9914896266 Mon Sep 17 00:00:00 2001 From: Ezequiel Werner Date: Thu, 20 Feb 2025 15:59:49 -0300 Subject: [PATCH] Only control retry policy template lifecycle if the source is its actual owner --- .../runtime/source/ExtensionMessageSource.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/modules/extensions-support/src/main/java/org/mule/runtime/module/extension/internal/runtime/source/ExtensionMessageSource.java b/modules/extensions-support/src/main/java/org/mule/runtime/module/extension/internal/runtime/source/ExtensionMessageSource.java index ae5000e164c3..824cd56e2915 100644 --- a/modules/extensions-support/src/main/java/org/mule/runtime/module/extension/internal/runtime/source/ExtensionMessageSource.java +++ b/modules/extensions-support/src/main/java/org/mule/runtime/module/extension/internal/runtime/source/ExtensionMessageSource.java @@ -188,6 +188,7 @@ public class ExtensionMessageSource extends ExtensionComponent impl private final String applicationName; private final AtomicBoolean started = new AtomicBoolean(false); + private boolean shouldControlRetryPolicyLifecycle; public ExtensionMessageSource(ExtensionModel extensionModel, SourceModel sourceModel, @@ -203,6 +204,7 @@ public ExtensionMessageSource(ExtensionModel extensionModel, this.sourceModel = sourceModel; this.sourceAdapterFactory = sourceAdapterFactory; this.customRetryPolicyTemplate = retryPolicyTemplate; + this.shouldControlRetryPolicyLifecycle = retryPolicyTemplate != null; this.primaryNodeOnly = primaryNodeOnly; this.backPressureStrategy = backPressureStrategy; this.notificationDispatcher = notificationDispatcher; @@ -226,7 +228,9 @@ private synchronized void createSource(boolean restarting) throws Exception { restarting); muleContext.getInjector().inject(sourceAdapter); retryPolicyTemplate = createRetryPolicyTemplate(customRetryPolicyTemplate); - initialiseIfNeeded(retryPolicyTemplate, true, muleContext); + if (shouldControlRetryPolicyLifecycle) { + initialiseIfNeeded(retryPolicyTemplate, true, muleContext); + } } finally { if (initialiserEvent != null) { ((BaseEventContext) initialiserEvent.getContext()).success(); @@ -486,7 +490,9 @@ private void reallyDoStart() throws MuleException { LOGGER.debug("Message source '{}' on flow '{}' is starting", sourceModel.getName(), getLocation().getRootContainerName()); lifecycle(() -> lifecycleManager.fireStartPhase((phase, o) -> { - startIfNeeded(retryPolicyTemplate); + if (shouldControlRetryPolicyLifecycle) { + startIfNeeded(retryPolicyTemplate); + } if (retryScheduler == null) { retryScheduler = schedulerService.ioScheduler(); @@ -519,8 +525,10 @@ public void doDispose() { try { safeLifecycle(() -> lifecycleManager.fireDisposePhase((phase, o) -> { disposeSource(); - stopIfNeeded(retryPolicyTemplate); - disposeIfNeeded(retryPolicyTemplate, LOGGER); + if (shouldControlRetryPolicyLifecycle) { + stopIfNeeded(retryPolicyTemplate); + disposeIfNeeded(retryPolicyTemplate, LOGGER); + } stopSchedulers(); })); } catch (MuleException e) {