diff --git a/mule-http-connector/pom.xml b/mule-http-connector/pom.xml index d7d2fe368..f618519bb 100644 --- a/mule-http-connector/pom.xml +++ b/mule-http-connector/pom.xml @@ -41,8 +41,8 @@ 1.1.3 1.1.3 1.0.0 - 0.7.4 - 0.7.0 + 0.12.0-SNAPSHOT + 0.12.0-SNAPSHOT src/test/munit diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/api/SseClientAttributes.java b/mule-http-connector/src/main/java/org/mule/extension/http/api/SseClientAttributes.java new file mode 100644 index 000000000..e2dab7a40 --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/api/SseClientAttributes.java @@ -0,0 +1,20 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.api; + +public class SseClientAttributes { + + private final Long clientId; + + public SseClientAttributes(Long clientId) { + this.clientId = clientId; + } + + public Long getClientId() { + return clientId; + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/api/SseEventAttributes.java b/mule-http-connector/src/main/java/org/mule/extension/http/api/SseEventAttributes.java new file mode 100644 index 000000000..43a6fc07c --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/api/SseEventAttributes.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.api; + +import org.mule.sdk.api.http.sse.ServerSentEvent; + +public class SseEventAttributes { + + private final String eventName; + private final String eventId; + private final Long retryDelay; + + public SseEventAttributes(ServerSentEvent event) { + eventName = event.getEventName(); + eventId = event.getId().orElse(null); + retryDelay = event.getRetryDelay().isPresent() ? event.getRetryDelay().getAsLong() : null; + } + + public String getEventName() { + return eventName; + } + + public String getId() { + return eventId; + } + + public Long getRetryDelay() { + return retryDelay; + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/api/listener/server/HttpListenerConfig.java b/mule-http-connector/src/main/java/org/mule/extension/http/api/listener/server/HttpListenerConfig.java index af8246656..88c457c8b 100644 --- a/mule-http-connector/src/main/java/org/mule/extension/http/api/listener/server/HttpListenerConfig.java +++ b/mule-http-connector/src/main/java/org/mule/extension/http/api/listener/server/HttpListenerConfig.java @@ -19,6 +19,7 @@ import org.mule.extension.http.internal.listener.ListenerPath; import org.mule.extension.http.internal.listener.intercepting.HttpListenerInterceptor; import org.mule.extension.http.api.listener.intercepting.cors.CorsInterceptorWrapper; +import org.mule.extension.http.internal.sse.SseEndpoint; import org.mule.runtime.api.lifecycle.Initialisable; import org.mule.runtime.api.lifecycle.InitialisationException; import org.mule.runtime.api.util.MultiMap; @@ -36,7 +37,7 @@ */ @Configuration(name = "listenerConfig") @ConnectionProviders(HttpListenerProvider.class) -@Sources(HttpListener.class) +@Sources({HttpListener.class, SseEndpoint.class}) public class HttpListenerConfig implements Initialisable { /** diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/delegate/HttpServiceApiProxy.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/delegate/HttpServiceApiProxy.java new file mode 100644 index 000000000..867415ab6 --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/delegate/HttpServiceApiProxy.java @@ -0,0 +1,50 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.internal.delegate; + +import org.mule.runtime.http.api.client.HttpClient; +import org.mule.runtime.http.api.server.HttpServer; +import org.mule.sdk.api.http.HttpServiceApi; +import org.mule.sdk.api.http.sse.ClientWithSse; +import org.mule.sdk.api.http.sse.ServerSentEventSource; +import org.mule.sdk.api.http.sse.ServerWithSse; +import org.mule.sdk.api.http.sse.SseClient; +import org.mule.sdk.api.http.sse.SseEndpointManager; +import org.mule.sdk.api.http.sse.SseRetryConfig; + +import java.util.Optional; +import java.util.function.Consumer; + +import javax.inject.Inject; +import javax.inject.Named; + +public class HttpServiceApiProxy implements HttpServiceApi { + + @Inject + @Named("_httpServiceApi") + private Optional forwardCompatibilityApi; + + @Override + public SseEndpointManager sseEndpoint(ServerWithSse httpServer, String ssePath, Consumer sseClientHandler) { + if (forwardCompatibilityApi.isPresent()) { + HttpServiceApi httpServiceApi = forwardCompatibilityApi.get(); + return httpServiceApi.sseEndpoint(httpServer, ssePath, sseClientHandler); + } else { + throw new IllegalStateException("Feature not implemented in this Mule Version"); + } + } + + @Override + public ServerSentEventSource sseSource(ClientWithSse httpClient, String url, SseRetryConfig retryConfig) { + if (forwardCompatibilityApi.isPresent()) { + HttpServiceApi httpServiceApi = forwardCompatibilityApi.get(); + return httpServiceApi.sseSource(httpClient, url, retryConfig); + } else { + throw new IllegalStateException("Feature not implemented in this Mule Version"); + } + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/listener/HttpServerDelegate.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/listener/HttpServerDelegate.java index 57019b25b..1ce1f889b 100644 --- a/mule-http-connector/src/main/java/org/mule/extension/http/internal/listener/HttpServerDelegate.java +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/listener/HttpServerDelegate.java @@ -11,14 +11,18 @@ import org.mule.runtime.http.api.server.RequestHandler; import org.mule.runtime.http.api.server.RequestHandlerManager; import org.mule.runtime.http.api.server.ServerAddress; +import org.mule.sdk.api.http.sse.ServerWithSse; +import org.mule.sdk.api.http.sse.SseClient; +import org.mule.sdk.api.http.sse.SseEndpointManager; import java.io.IOException; import java.util.Collection; +import java.util.function.Consumer; /** * Base class for applying the delegate design pattern around an {@link HttpServer} */ -public class HttpServerDelegate implements HttpServer { +public class HttpServerDelegate implements HttpServer, ServerWithSse { private final HttpServer delegate; @@ -73,7 +77,12 @@ public RequestHandlerManager addRequestHandler(String path, RequestHandler reque return delegate.addRequestHandler(path, requestHandler); } - protected HttpServer getDelegate() { - return delegate; + @Override + public SseEndpointManager sse(String ssePath, Consumer sseClientHandler) { + if (delegate instanceof ServerWithSse) { + return ((ServerWithSse) delegate).sse(ssePath, sseClientHandler); + } else { + throw new UnsupportedOperationException("Server-sent events are not supported"); + } } } diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/HttpRequesterConfig.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/HttpRequesterConfig.java index 3bda22f3a..8823a83ac 100644 --- a/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/HttpRequesterConfig.java +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/HttpRequesterConfig.java @@ -12,6 +12,7 @@ import org.mule.extension.http.api.request.builder.QueryParam; import org.mule.extension.http.api.request.builder.RequestHeader; import org.mule.extension.http.api.streaming.HttpStreamingType; +import org.mule.extension.http.internal.sse.SseSource; import org.mule.runtime.api.lifecycle.Initialisable; import org.mule.runtime.api.lifecycle.InitialisationException; import org.mule.runtime.core.api.MuleContext; @@ -35,7 +36,7 @@ @Configuration(name = "requestConfig") @ConnectionProviders(HttpRequesterProvider.class) @Operations({HttpRequestOperations.class}) -@Sources(HttpPollingSource.class) +@Sources({HttpPollingSource.class, SseSource.class}) public class HttpRequesterConfig implements Initialisable, HttpRequesterCookieConfig { @ParameterGroup(name = URL_CONFIGURATION) diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/ShareableHttpClient.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/ShareableHttpClient.java index 84333a5b9..ff7252c5a 100644 --- a/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/ShareableHttpClient.java +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/ShareableHttpClient.java @@ -11,6 +11,9 @@ import org.mule.runtime.http.api.client.auth.HttpAuthentication; import org.mule.runtime.http.api.domain.message.request.HttpRequest; import org.mule.runtime.http.api.domain.message.response.HttpResponse; +import org.mule.sdk.api.http.sse.ClientWithSse; +import org.mule.sdk.api.http.sse.ServerSentEventSource; +import org.mule.sdk.api.http.sse.SseRetryConfig; import java.util.concurrent.CompletableFuture; @@ -18,7 +21,7 @@ * Wrapper implementation of an {@link HttpClient} that allows being shared by only configuring the client when first required and * only disabling it when last required. */ -public class ShareableHttpClient { +public class ShareableHttpClient implements ClientWithSse { private HttpClient delegate; private Integer usageCount = new Integer(0); @@ -52,4 +55,13 @@ public CompletableFuture sendAsync(HttpRequest request, int respon HttpSendBodyMode sendBodyMode) { return HttpClientReflection.sendAsync(delegate, request, responseTimeout, followRedirects, authentication, sendBodyMode); } + + @Override + public ServerSentEventSource sseSource(String url, SseRetryConfig retryConfig) { + if (delegate instanceof ClientWithSse) { + return ((ClientWithSse) delegate).sseSource(url, retryConfig); + } else { + throw new UnsupportedOperationException("SSE is not supported"); + } + } } diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/client/HttpExtensionClient.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/client/HttpExtensionClient.java index 966471524..f688d14b7 100644 --- a/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/client/HttpExtensionClient.java +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/request/client/HttpExtensionClient.java @@ -6,6 +6,7 @@ */ package org.mule.extension.http.internal.request.client; +import static org.mule.extension.http.internal.request.UriUtils.resolveUri; import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.startIfNeeded; import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.stopIfNeeded; @@ -19,8 +20,13 @@ import org.mule.runtime.http.api.client.auth.HttpAuthentication; import org.mule.runtime.http.api.domain.message.request.HttpRequest; import org.mule.runtime.http.api.domain.message.response.HttpResponse; +import org.mule.sdk.api.http.sse.ClientWithSse; +import org.mule.sdk.api.http.sse.ServerSentEventSource; +import org.mule.sdk.api.http.sse.SseRetryConfig; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; /** * Composition of a {@link ShareableHttpClient} with URI and authentication parameters that allow falling back to connection @@ -28,11 +34,12 @@ * * @since 1.0 */ -public class HttpExtensionClient implements Startable, Stoppable { +public class HttpExtensionClient implements Startable, Stoppable, ClientWithSse { private final HttpRequestAuthentication authentication; private final ShareableHttpClient httpClient; private final UriParameters uriParameters; + private final Map sseSourcesByPath = new ConcurrentHashMap<>(); public HttpExtensionClient(ShareableHttpClient httpClient, UriParameters uriParameters, HttpRequestAuthentication authentication) { @@ -74,4 +81,12 @@ public CompletableFuture send(HttpRequest request, int responseTim HttpSendBodyMode sendBodyMode) { return httpClient.sendAsync(request, responseTimeout, followRedirects, authentication, sendBodyMode); } + + @Override + public ServerSentEventSource sseSource(String path, SseRetryConfig retryConfig) { + return sseSourcesByPath.computeIfAbsent(path, p -> { + String uri = resolveUri(uriParameters.getScheme(), uriParameters.getHost().trim(), uriParameters.getPort(), path); + return httpClient.sseSource(uri, retryConfig); + }); + } } diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/PassClientToFlow.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/PassClientToFlow.java new file mode 100644 index 000000000..8758c0bf9 --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/PassClientToFlow.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.internal.sse; + +import org.mule.extension.http.api.SseClientAttributes; +import org.mule.runtime.extension.api.runtime.operation.Result; +import org.mule.runtime.extension.api.runtime.source.SourceCallback; +import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext; +import org.mule.sdk.api.http.sse.SseClient; + +import java.util.function.Consumer; + +public class PassClientToFlow implements Consumer { + + private final SourceCallback sourceCallback; + private final SseClientsRepository sseClientsRepository; + + public PassClientToFlow(SourceCallback sourceCallback, SseClientsRepository sseClientsRepository) { + this.sourceCallback = sourceCallback; + this.sseClientsRepository = sseClientsRepository; + } + + @Override + public void accept(SseClient sseClient) { + sseClientsRepository.addClient(sseClient); + + SourceCallbackContext sourceContext = sourceCallback.createContext(); + sourceCallback.handle(Result.builder() + .output(null) + .attributes(getAttributes(sseClient)) + .build(), sourceContext); + } + + private SseClientAttributes getAttributes(SseClient sseClient) { + return new SseClientAttributes(sseClient.getClientId()); + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/PassEventToFlow.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/PassEventToFlow.java new file mode 100644 index 000000000..c04cbb42d --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/PassEventToFlow.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.internal.sse; + +import org.mule.extension.http.api.SseEventAttributes; +import org.mule.runtime.extension.api.runtime.operation.Result; +import org.mule.runtime.extension.api.runtime.source.SourceCallback; +import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext; +import org.mule.sdk.api.http.sse.ServerSentEvent; +import org.mule.sdk.api.http.sse.ServerSentEventListener; + +public class PassEventToFlow implements ServerSentEventListener { + + private final SourceCallback sourceCallback; + + public PassEventToFlow(SourceCallback sourceCallback) { + this.sourceCallback = sourceCallback; + } + + @Override + public void onEvent(ServerSentEvent event) { + SourceCallbackContext sourceContext = sourceCallback.createContext(); + sourceCallback.handle(Result.builder() + .output(event.getEventData()) + .attributes(getAttributes(event)) + .build(), sourceContext); + } + + private SseEventAttributes getAttributes(ServerSentEvent event) { + return new SseEventAttributes(event); + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseClientsRepository.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseClientsRepository.java new file mode 100644 index 000000000..edb5c1bb7 --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseClientsRepository.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.internal.sse; + +import org.mule.sdk.api.http.sse.SseClient; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SseClientsRepository { + + private final Map sseClients = new ConcurrentHashMap<>(); + + public void addClient(SseClient sseClient) { + sseClients.put(sseClient.getClientId(), sseClient); + } + + public SseClient getClient(long id) { + return sseClients.get(id); + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseEndpoint.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseEndpoint.java new file mode 100644 index 000000000..9025bd372 --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseEndpoint.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.internal.sse; + +import static org.mule.runtime.extension.api.annotation.param.MediaType.ANY; + +import static org.slf4j.LoggerFactory.getLogger; + +import org.mule.extension.http.api.SseClientAttributes; +import org.mule.extension.http.internal.delegate.HttpServiceApiProxy; +import org.mule.runtime.api.connection.ConnectionProvider; +import org.mule.runtime.api.exception.MuleException; +import org.mule.runtime.extension.api.annotation.Alias; +import org.mule.runtime.extension.api.annotation.param.Connection; +import org.mule.runtime.extension.api.annotation.param.MediaType; +import org.mule.runtime.extension.api.annotation.param.Optional; +import org.mule.runtime.extension.api.annotation.param.Parameter; +import org.mule.runtime.extension.api.annotation.param.display.Placement; +import org.mule.runtime.extension.api.runtime.source.Source; +import org.mule.runtime.extension.api.runtime.source.SourceCallback; +import org.mule.runtime.http.api.server.HttpServer; +import org.mule.sdk.api.http.sse.ServerWithSse; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.slf4j.Logger; + +@Alias("sseEndpoint") +@MediaType(value = ANY, strict = false) +public class SseEndpoint extends Source { + + private static final Logger LOGGER = getLogger(SseEndpoint.class); + + @Inject + private HttpServiceApiProxy httpService; + + @Connection + private ConnectionProvider serverProvider; + + @Inject + @Named("_sseClientsRepository") + private SseClientsRepository sseClientsRepository; + + @Parameter + @Placement(order = 1) + @Optional(defaultValue = "/") + private String path = "/"; + + + @Override + public void onStart(SourceCallback sourceCallback) throws MuleException { + HttpServer server = serverProvider.connect(); + if (server instanceof ServerWithSse) { + httpService.sseEndpoint((ServerWithSse) server, path, new PassClientToFlow(sourceCallback, sseClientsRepository)); + } else { + throw new IllegalStateException("Server is not an instance of ServerWithSse"); + } + } + + @Override + public void onStop() { + // TODO: implement + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseOperations.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseOperations.java new file mode 100644 index 000000000..1507586ff --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseOperations.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.internal.sse; + +import static org.slf4j.LoggerFactory.getLogger; + +import org.mule.runtime.extension.api.annotation.param.Optional; +import org.mule.sdk.api.http.sse.SseClient; + +import java.io.IOException; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.slf4j.Logger; + +public class SseOperations { + + private static final Logger LOGGER = getLogger(SseOperations.class); + + @Inject + @Named("_sseClientsRepository") + private SseClientsRepository sseClientsRepository; + + // TODO: inject -> param + // TODO: other params + // TODO: client not found + // TODO: close client + + public void sendSseEvent(long clientId, String eventName, String data, @Optional String id, @Optional Long retry) + throws IOException { + LOGGER.trace("SSE event {} received for client {}", eventName, clientId); + SseClient sseClient = sseClientsRepository.getClient(clientId); + sseClient.sendEvent(eventName, data); + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseSource.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseSource.java new file mode 100644 index 000000000..4fefb3e9b --- /dev/null +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/sse/SseSource.java @@ -0,0 +1,77 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extension.http.internal.sse; + +import static org.mule.runtime.extension.api.annotation.param.MediaType.ANY; +import static org.mule.sdk.api.http.sse.SseRetryConfig.DEFAULT; + +import static org.slf4j.LoggerFactory.getLogger; + +import org.mule.extension.http.api.SseEventAttributes; +import org.mule.extension.http.internal.delegate.HttpServiceApiProxy; +import org.mule.extension.http.internal.request.client.HttpExtensionClient; +import org.mule.runtime.api.connection.ConnectionProvider; +import org.mule.runtime.api.exception.MuleException; +import org.mule.runtime.extension.api.annotation.Alias; +import org.mule.runtime.extension.api.annotation.param.Connection; +import org.mule.runtime.extension.api.annotation.param.MediaType; +import org.mule.runtime.extension.api.annotation.param.Optional; +import org.mule.runtime.extension.api.annotation.param.Parameter; +import org.mule.runtime.extension.api.annotation.param.display.Placement; +import org.mule.runtime.extension.api.runtime.source.Source; +import org.mule.runtime.extension.api.runtime.source.SourceCallback; +import org.mule.sdk.api.http.sse.ClientWithSse; +import org.mule.sdk.api.http.sse.ServerSentEventSource; + +import javax.inject.Inject; + +import org.slf4j.Logger; + +@Alias("sseSource") +@MediaType(value = ANY, strict = false) +public class SseSource extends Source { + + private static final Logger LOGGER = getLogger(SseSource.class); + + @Connection + private ConnectionProvider clientProvider; + + @Inject + private HttpServiceApiProxy httpService; + + @Parameter + @Placement(order = 1) + @Optional(defaultValue = "/") + private String path = "/"; + + /** + * Event name to listen. Defaults to "*", which means that it's a fallback for all non-listened event names. + */ + @Parameter + @Placement(order = 2) + @Optional(defaultValue = "*") + private String eventName = "*"; + + private ServerSentEventSource serverSentEventSource; + + @Override + public void onStart(SourceCallback sourceCallback) throws MuleException { + HttpExtensionClient client = clientProvider.connect(); + serverSentEventSource = httpService.sseSource(client, path, DEFAULT); + if ("*".equals(eventName)) { + serverSentEventSource.register(new PassEventToFlow(sourceCallback)); + } else { + serverSentEventSource.register(eventName, new PassEventToFlow(sourceCallback)); + } + serverSentEventSource.open(); + } + + @Override + public void onStop() { + // serverSentEventSource.stop(); + } +} diff --git a/mule-http-connector/src/main/java/org/mule/extension/http/internal/temporary/HttpConnector.java b/mule-http-connector/src/main/java/org/mule/extension/http/internal/temporary/HttpConnector.java index 1748f0fee..21674db30 100644 --- a/mule-http-connector/src/main/java/org/mule/extension/http/internal/temporary/HttpConnector.java +++ b/mule-http-connector/src/main/java/org/mule/extension/http/internal/temporary/HttpConnector.java @@ -29,6 +29,7 @@ import org.mule.extension.http.api.request.validator.SuccessStatusCodeValidator; import org.mule.extension.http.internal.HttpOperations; import org.mule.extension.http.internal.request.HttpRequesterConfig; +import org.mule.extension.http.internal.sse.SseOperations; import org.mule.extension.socket.api.socket.tcp.TcpClientSocketProperties; import org.mule.extension.socket.api.socket.tcp.TcpServerSocketProperties; import org.mule.modules.cors.api.configuration.origin.EveryOrigin; @@ -56,7 +57,7 @@ @Extension(name = "HTTP") @JavaVersionSupport({JAVA_8, JAVA_11, JAVA_17}) @Configurations({HttpListenerConfig.class, HttpRequesterConfig.class}) -@Operations(HttpOperations.class) +@Operations({HttpOperations.class, SseOperations.class}) @SubTypeMapping(baseType = HttpRequestAuthentication.class, subTypes = {BasicAuthentication.class, DigestAuthentication.class, NtlmAuthentication.class}) @SubTypeMapping(baseType = HttpProxyConfig.class, subTypes = {DefaultProxyConfig.class, DefaultNtlmProxyConfig.class}) diff --git a/mule-http-connector/src/main/resources/META-INF/org/mule/runtime/core/config/registry-bootstrap.properties b/mule-http-connector/src/main/resources/META-INF/org/mule/runtime/core/config/registry-bootstrap.properties index 2013d056b..52aee9bc5 100644 --- a/mule-http-connector/src/main/resources/META-INF/org/mule/runtime/core/config/registry-bootstrap.properties +++ b/mule-http-connector/src/main/resources/META-INF/org/mule/runtime/core/config/registry-bootstrap.properties @@ -1,4 +1,6 @@ +_httpServiceProxy=org.mule.extension.http.internal.delegate.HttpServiceApiProxy _httpRequesterConnectionManager=org.mule.extension.http.internal.request.HttpRequesterConnectionManager +_sseClientsRepository=org.mule.extension.http.internal.sse.SseClientsRepository http.policy.listener.transformer=org.mule.extension.http.api.policy.HttpListenerPolicyParametersTransformer http.policy.request.transformer=org.mule.extension.http.api.policy.HttpPolicyRequestParametersTransformer diff --git a/mule-http-connector/src/test/munit/sse/sse-event-source-test-case.xml b/mule-http-connector/src/test/munit/sse/sse-event-source-test-case.xml new file mode 100644 index 000000000..36ee5a291 --- /dev/null +++ b/mule-http-connector/src/test/munit/sse/sse-event-source-test-case.xml @@ -0,0 +1,68 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + #[payload] + + + + + + + + #[payload] + + + + diff --git a/mule-http-connector/src/test/resources/log4j2-test.xml b/mule-http-connector/src/test/resources/log4j2-test.xml index 79217cd8d..bfe279511 100644 --- a/mule-http-connector/src/test/resources/log4j2-test.xml +++ b/mule-http-connector/src/test/resources/log4j2-test.xml @@ -7,6 +7,7 @@ +