Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server sent support #14274

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ public class MuleProperties {
public static final String SERVER_NOTIFICATION_MANAGER = "_serverNotificationManager";
public static final String OBJECT_ARTIFACT_TYPE_LOADER = "_artifactTypeLoader";
public static final String INTERCEPTOR_MANAGER_REGISTRY_KEY = "_muleInterceptorManager";
public static final String MULE_HTTP_SERVICE_API_REGISTRY_KEY = "_httpServiceApi";

// Not currently used as these need to be instance variables of the MuleContext.
public static final String OBJECT_NOTIFICATION_MANAGER = "_muleNotificationManager";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@

opens org.mule.runtime.module.extension.internal.resources.documentation to
jakarta.xml.bind;
exports org.mule.runtime.module.extension.api.http;

provides org.mule.runtime.api.connectivity.ConnectivityTestingStrategy with
org.mule.runtime.module.extension.api.tooling.ExtensionConnectivityTestingStrategy;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.module.extension.api.http;

import org.mule.runtime.http.api.HttpService;
import org.mule.sdk.api.http.HttpServiceApi;
import org.mule.sdk.api.http.sse.ClientWithSse;
import org.mule.sdk.api.http.sse.ServerSentEventSource;
import org.mule.sdk.api.http.sse.ServerWithSse;
import org.mule.sdk.api.http.sse.SseClient;
import org.mule.sdk.api.http.sse.SseEndpointManager;
import org.mule.sdk.api.http.sse.SseRetryConfig;

import java.util.function.Consumer;

/**
* Definition of {@link HttpServiceApi} that just delegates all to the {@link HttpService}.
*/
public class HttpServiceApiDelegate implements HttpServiceApi {

@Override
public SseEndpointManager sseEndpoint(ServerWithSse httpServer, String ssePath, Consumer<SseClient> sseClientHandler) {
return httpServer.sse(ssePath, sseClientHandler);
}

@Override
public ServerSentEventSource sseSource(ClientWithSse httpClient, String url, SseRetryConfig retryConfig) {
return httpClient.sseSource(url, retryConfig);
}
}
4 changes: 4 additions & 0 deletions modules/http-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>org.mule.sdk</groupId>
<artifactId>mule-sdk-api</artifactId>
</dependency>

<!-- For the deprecated API methods still relying on org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate -->
<dependency>
Expand Down
4 changes: 3 additions & 1 deletion modules/http-api/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// For the deprecated API methods still relying on org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate
requires org.mule.runtime.core;
requires com.github.benmanes.caffeine;
requires org.mule.sdk.api;

exports org.mule.runtime.http.api;
exports org.mule.runtime.http.api.client;
Expand All @@ -30,6 +31,7 @@
exports org.mule.runtime.http.api.domain.message.request;
exports org.mule.runtime.http.api.domain.message.response;
exports org.mule.runtime.http.api.domain.request;
exports org.mule.runtime.http.api.domain.sse;
exports org.mule.runtime.http.api.exception;
exports org.mule.runtime.http.api.server;
exports org.mule.runtime.http.api.server.async;
Expand All @@ -38,5 +40,5 @@
exports org.mule.runtime.http.api.tcp;
exports org.mule.runtime.http.api.ws;
exports org.mule.runtime.http.api.ws.exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.runtime.http.api.ws.WebSocket;
import org.mule.sdk.api.http.sse.ClientWithSse;
import org.mule.sdk.api.http.sse.ServerSentEventSource;
import org.mule.sdk.api.http.sse.SseRetryConfig;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
Expand All @@ -27,7 +30,7 @@
* @since 4.0
*/
@NoImplement
public interface HttpClient {
public interface HttpClient extends ClientWithSse {

/**
* Fully configures the client, leaving it ready to use. Must be executed before any requests are attempted.
Expand Down Expand Up @@ -166,4 +169,18 @@ default CompletableFuture<WebSocket> openWebSocket(HttpRequest request,
WebSocketCallback callback) {
throw new UnsupportedOperationException("WebSockets are only supported in Enterprise Edition");
}

/**
* Creates a consumer of Server-sent events. The resulting {@link ServerSentEventSource} is not connected automatically.
*
* @param url the URL of the server.
* @param retryConfig configuration for the retry mechanism.
* @return a non-connected instance of {@link ServerSentEventSource}.
*
* @since 4.10.0
*/
@Override
default ServerSentEventSource sseSource(String url, SseRetryConfig retryConfig) {
throw new UnsupportedOperationException("Server-sent Events are not supported");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.http.api.domain.sse;

import static java.util.Objects.requireNonNull;
import static java.util.Optional.ofNullable;
import static java.util.OptionalLong.empty;
import static java.util.OptionalLong.of;

import java.io.Serial;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;

/**
* Server-sent event.
*
* TODO: Move to service...
*/
public class ServerSentEventImpl implements Serializable, org.mule.sdk.api.http.sse.ServerSentEvent {

@Serial
private static final long serialVersionUID = -1211505868025654629L;

private final String eventName;
private final String eventData;
private final String id;
private final Long retryDelay;

public ServerSentEventImpl(String eventName, String eventData, String id, Long retryDelay) {
requireNonNull(eventName, "eventName cannot be null");
requireNonNull(eventData, "eventData cannot be null");

this.eventName = eventName;
this.eventData = eventData;
this.id = id;
this.retryDelay = retryDelay;
}

@Override
public String getEventName() {
return eventName;
}

@Override
public String getEventData() {
return eventData;
}

@Override
public Optional<String> getId() {
return ofNullable(id);
}

@Override
public OptionalLong getRetryDelay() {
return null != retryDelay ? of(retryDelay) : empty();
}

@Override
public String toString() {
return "ServerSentEvent [name=" + eventName + ", data=" + eventData + ", id=" + id + ", retryDelay=" + retryDelay + "]";
}

@Override
public int hashCode() {
return Objects.hashCode(eventName) + Objects.hashCode(eventData) + Objects.hashCode(id);
}

@Override
public boolean equals(Object o) {
if (null == o || getClass() != o.getClass()) {
return false;
}
ServerSentEventImpl that = (ServerSentEventImpl) o;
return Objects.equals(eventName, that.eventName) && Objects.equals(eventData, that.eventData) && Objects.equals(id, that.id)
&& Objects.equals(retryDelay, that.retryDelay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
import org.mule.runtime.http.api.HttpConstants.Protocol;
import org.mule.runtime.http.api.server.ws.WebSocketHandler;
import org.mule.runtime.http.api.server.ws.WebSocketHandlerManager;
import org.mule.sdk.api.http.sse.ServerWithSse;
import org.mule.sdk.api.http.sse.SseClient;
import org.mule.sdk.api.http.sse.SseEndpointManager;

import java.io.IOException;
import java.util.Collection;
import java.util.function.Consumer;

/**
* Represents a ServerSocket connection. Notice it should be started to be bound, stopped to be unbound and finally disposed to
Expand All @@ -23,7 +27,7 @@
* @since 4.0
*/
@NoImplement
public interface HttpServer {
public interface HttpServer extends ServerWithSse {

/**
* Binds the ServerSocket to the network interface and starts listening for requests.
Expand Down Expand Up @@ -117,4 +121,16 @@ RequestHandlerManager addRequestHandler(final Collection<String> methods, final
default WebSocketHandlerManager addWebSocketHandler(WebSocketHandler handler) {
throw new UnsupportedOperationException("WebSockets are only supported in Enterprise Edition");
}

/**
* Adds an endpoint to produce server-sent events.
*
* @param ssePath path to match.
* @param sseClientHandler callback to be executed for each received {@link SseClient}.
* @return an object that can be used to enable/disable/remove the endpoint from the server.
*/
@Override
default SseEndpointManager sse(String ssePath, Consumer<SseClient> sseClientHandler) {
throw new UnsupportedOperationException("Server-sent events are not supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mule.runtime.core.api.config.MuleProperties.MULE_CORE_EXPORTER_FACTORY_KEY;
import static org.mule.runtime.core.api.config.MuleProperties.MULE_CORE_SPAN_FACTORY_KEY;
import static org.mule.runtime.core.api.config.MuleProperties.MULE_ERROR_METRICS_FACTORY_KEY;
import static org.mule.runtime.core.api.config.MuleProperties.MULE_HTTP_SERVICE_API_REGISTRY_KEY;
import static org.mule.runtime.core.api.config.MuleProperties.MULE_MEMORY_MANAGEMENT_SERVICE;
import static org.mule.runtime.core.api.config.MuleProperties.MULE_METER_EXPORTER_CONFIGURATION_KEY;
import static org.mule.runtime.core.api.config.MuleProperties.MULE_METER_EXPORTER_FACTORY_KEY;
Expand Down Expand Up @@ -144,6 +145,7 @@
import org.mule.runtime.metrics.exporter.impl.optel.config.OpenTelemetryAutoConfigurableMeterExporterConfiguration;
import org.mule.runtime.metrics.impl.DefaultMeterProvider;
import org.mule.runtime.metrics.impl.meter.error.DefaultErrorMetricsFactory;
import org.mule.runtime.module.extension.api.http.HttpServiceApiDelegate;
import org.mule.runtime.module.extension.api.runtime.compatibility.DefaultForwardCompatibilityHelper;
import org.mule.runtime.module.extension.internal.data.sample.MuleSampleDataService;
import org.mule.runtime.module.extension.internal.store.SdkObjectStoreManagerAdapter;
Expand All @@ -162,7 +164,6 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
Expand Down Expand Up @@ -256,6 +257,7 @@ public class SpringMuleContextServiceConfigurator extends AbstractSpringMuleCont
.put(MULE_TRACER_INITIAL_SPAN_INFO_PROVIDER_KEY, getBeanDefinition(DefaultInitialSpanInfoProvider.class))
.put(PROFILING_FEATURE_MANAGEMENT_SERVICE_KEY, getBeanDefinition(DefaultFeatureManagementService.class))
.put(FORWARD_COMPATIBILITY_HELPER_KEY, getBeanDefinition(DefaultForwardCompatibilityHelper.class))
.put(MULE_HTTP_SERVICE_API_REGISTRY_KEY, getBeanDefinition(HttpServiceApiDelegate.class))
.build();

// Do not use static field. BeanDefinitions are reused and produce weird behaviour
Expand Down