Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server sent events basic support #1010

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mule-http-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
<muleHttpPolicyApiVersion>1.1.3</muleHttpPolicyApiVersion>
<mulePolicyApiVersion>1.1.3</mulePolicyApiVersion>
<muleProfilingApiVersion>1.0.0</muleProfilingApiVersion>
<muleSdkCompatibilityApiVersion>0.7.4</muleSdkCompatibilityApiVersion>
<muleSdkApiVersion>0.7.0</muleSdkApiVersion>
<muleSdkCompatibilityApiVersion>0.12.0-SNAPSHOT</muleSdkCompatibilityApiVersion>
<muleSdkApiVersion>0.12.0-SNAPSHOT</muleSdkApiVersion>

<!-- Remove when a new parent version with MTF is available -->
<munit.input.directory>src/test/munit</munit.input.directory>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extension.http.api;

public class SseClientAttributes {

private final Long clientId;

public SseClientAttributes(Long clientId) {
this.clientId = clientId;
}

public Long getClientId() {
return clientId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extension.http.api;

import org.mule.sdk.api.http.sse.ServerSentEvent;

public class SseEventAttributes {

private final String eventName;
private final String eventId;
private final Long retryDelay;

public SseEventAttributes(ServerSentEvent event) {
eventName = event.getEventName();
eventId = event.getId().orElse(null);
retryDelay = event.getRetryDelay().isPresent() ? event.getRetryDelay().getAsLong() : null;
}

public String getEventName() {
return eventName;
}

public String getId() {
return eventId;
}

public Long getRetryDelay() {
return retryDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.mule.extension.http.internal.listener.ListenerPath;
import org.mule.extension.http.internal.listener.intercepting.HttpListenerInterceptor;
import org.mule.extension.http.api.listener.intercepting.cors.CorsInterceptorWrapper;
import org.mule.extension.http.internal.sse.SseEndpoint;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.util.MultiMap;
Expand All @@ -36,7 +37,7 @@
*/
@Configuration(name = "listenerConfig")
@ConnectionProviders(HttpListenerProvider.class)
@Sources(HttpListener.class)
@Sources({HttpListener.class, SseEndpoint.class})
public class HttpListenerConfig implements Initialisable {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extension.http.internal.delegate;

import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.server.HttpServer;
import org.mule.sdk.api.http.HttpServiceApi;
import org.mule.sdk.api.http.sse.ClientWithSse;
import org.mule.sdk.api.http.sse.ServerSentEventSource;
import org.mule.sdk.api.http.sse.ServerWithSse;
import org.mule.sdk.api.http.sse.SseClient;
import org.mule.sdk.api.http.sse.SseEndpointManager;
import org.mule.sdk.api.http.sse.SseRetryConfig;

import java.util.Optional;
import java.util.function.Consumer;

import javax.inject.Inject;
import javax.inject.Named;

public class HttpServiceApiProxy implements HttpServiceApi {

@Inject
@Named("_httpServiceApi")
private Optional<HttpServiceApi> forwardCompatibilityApi;

@Override
public SseEndpointManager sseEndpoint(ServerWithSse httpServer, String ssePath, Consumer<SseClient> sseClientHandler) {
if (forwardCompatibilityApi.isPresent()) {
HttpServiceApi httpServiceApi = forwardCompatibilityApi.get();
return httpServiceApi.sseEndpoint(httpServer, ssePath, sseClientHandler);
} else {
throw new IllegalStateException("Feature not implemented in this Mule Version");
}
}

@Override
public ServerSentEventSource sseSource(ClientWithSse httpClient, String url, SseRetryConfig retryConfig) {
if (forwardCompatibilityApi.isPresent()) {
HttpServiceApi httpServiceApi = forwardCompatibilityApi.get();
return httpServiceApi.sseSource(httpClient, url, retryConfig);
} else {
throw new IllegalStateException("Feature not implemented in this Mule Version");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
import org.mule.runtime.http.api.server.RequestHandler;
import org.mule.runtime.http.api.server.RequestHandlerManager;
import org.mule.runtime.http.api.server.ServerAddress;
import org.mule.sdk.api.http.sse.ServerWithSse;
import org.mule.sdk.api.http.sse.SseClient;
import org.mule.sdk.api.http.sse.SseEndpointManager;

import java.io.IOException;
import java.util.Collection;
import java.util.function.Consumer;

/**
* Base class for applying the delegate design pattern around an {@link HttpServer}
*/
public class HttpServerDelegate implements HttpServer {
public class HttpServerDelegate implements HttpServer, ServerWithSse {

private final HttpServer delegate;

Expand Down Expand Up @@ -73,7 +77,12 @@ public RequestHandlerManager addRequestHandler(String path, RequestHandler reque
return delegate.addRequestHandler(path, requestHandler);
}

protected HttpServer getDelegate() {
return delegate;
@Override
public SseEndpointManager sse(String ssePath, Consumer<SseClient> sseClientHandler) {
if (delegate instanceof ServerWithSse) {
return ((ServerWithSse) delegate).sse(ssePath, sseClientHandler);
} else {
throw new UnsupportedOperationException("Server-sent events are not supported");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.mule.extension.http.api.request.builder.QueryParam;
import org.mule.extension.http.api.request.builder.RequestHeader;
import org.mule.extension.http.api.streaming.HttpStreamingType;
import org.mule.extension.http.internal.sse.SseSource;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.core.api.MuleContext;
Expand All @@ -35,7 +36,7 @@
@Configuration(name = "requestConfig")
@ConnectionProviders(HttpRequesterProvider.class)
@Operations({HttpRequestOperations.class})
@Sources(HttpPollingSource.class)
@Sources({HttpPollingSource.class, SseSource.class})
public class HttpRequesterConfig implements Initialisable, HttpRequesterCookieConfig {

@ParameterGroup(name = URL_CONFIGURATION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@
import org.mule.runtime.http.api.client.auth.HttpAuthentication;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.sdk.api.http.sse.ClientWithSse;
import org.mule.sdk.api.http.sse.ServerSentEventSource;
import org.mule.sdk.api.http.sse.SseRetryConfig;

import java.util.concurrent.CompletableFuture;

/**
* Wrapper implementation of an {@link HttpClient} that allows being shared by only configuring the client when first required and
* only disabling it when last required.
*/
public class ShareableHttpClient {
public class ShareableHttpClient implements ClientWithSse {

private HttpClient delegate;
private Integer usageCount = new Integer(0);
Expand Down Expand Up @@ -52,4 +55,13 @@ public CompletableFuture<HttpResponse> sendAsync(HttpRequest request, int respon
HttpSendBodyMode sendBodyMode) {
return HttpClientReflection.sendAsync(delegate, request, responseTimeout, followRedirects, authentication, sendBodyMode);
}

@Override
public ServerSentEventSource sseSource(String url, SseRetryConfig retryConfig) {
if (delegate instanceof ClientWithSse) {
return ((ClientWithSse) delegate).sseSource(url, retryConfig);
} else {
throw new UnsupportedOperationException("SSE is not supported");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.mule.extension.http.internal.request.client;

import static org.mule.extension.http.internal.request.UriUtils.resolveUri;
import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.startIfNeeded;
import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.stopIfNeeded;

Expand All @@ -19,20 +20,26 @@
import org.mule.runtime.http.api.client.auth.HttpAuthentication;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.sdk.api.http.sse.ClientWithSse;
import org.mule.sdk.api.http.sse.ServerSentEventSource;
import org.mule.sdk.api.http.sse.SseRetryConfig;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
* Composition of a {@link ShareableHttpClient} with URI and authentication parameters that allow falling back to connection
* default values for them.
*
* @since 1.0
*/
public class HttpExtensionClient implements Startable, Stoppable {
public class HttpExtensionClient implements Startable, Stoppable, ClientWithSse {

private final HttpRequestAuthentication authentication;
private final ShareableHttpClient httpClient;
private final UriParameters uriParameters;
private final Map<String, ServerSentEventSource> sseSourcesByPath = new ConcurrentHashMap<>();

public HttpExtensionClient(ShareableHttpClient httpClient, UriParameters uriParameters,
HttpRequestAuthentication authentication) {
Expand Down Expand Up @@ -74,4 +81,12 @@ public CompletableFuture<HttpResponse> send(HttpRequest request, int responseTim
HttpSendBodyMode sendBodyMode) {
return httpClient.sendAsync(request, responseTimeout, followRedirects, authentication, sendBodyMode);
}

@Override
public ServerSentEventSource sseSource(String path, SseRetryConfig retryConfig) {
return sseSourcesByPath.computeIfAbsent(path, p -> {
String uri = resolveUri(uriParameters.getScheme(), uriParameters.getHost().trim(), uriParameters.getPort(), path);
return httpClient.sseSource(uri, retryConfig);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extension.http.internal.sse;

import org.mule.extension.http.api.SseClientAttributes;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.sdk.api.http.sse.SseClient;

import java.util.function.Consumer;

public class PassClientToFlow implements Consumer<SseClient> {

private final SourceCallback<Void, SseClientAttributes> sourceCallback;
private final SseClientsRepository sseClientsRepository;

public PassClientToFlow(SourceCallback<Void, SseClientAttributes> sourceCallback, SseClientsRepository sseClientsRepository) {
this.sourceCallback = sourceCallback;
this.sseClientsRepository = sseClientsRepository;
}

@Override
public void accept(SseClient sseClient) {
sseClientsRepository.addClient(sseClient);

SourceCallbackContext sourceContext = sourceCallback.createContext();
sourceCallback.handle(Result.<Void, SseClientAttributes>builder()
.output(null)
.attributes(getAttributes(sseClient))
.build(), sourceContext);
}

private SseClientAttributes getAttributes(SseClient sseClient) {
return new SseClientAttributes(sseClient.getClientId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extension.http.internal.sse;

import org.mule.extension.http.api.SseEventAttributes;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.sdk.api.http.sse.ServerSentEvent;
import org.mule.sdk.api.http.sse.ServerSentEventListener;

public class PassEventToFlow implements ServerSentEventListener {

private final SourceCallback<String, SseEventAttributes> sourceCallback;

public PassEventToFlow(SourceCallback<String, SseEventAttributes> sourceCallback) {
this.sourceCallback = sourceCallback;
}

@Override
public void onEvent(ServerSentEvent event) {
SourceCallbackContext sourceContext = sourceCallback.createContext();
sourceCallback.handle(Result.<String, SseEventAttributes>builder()
.output(event.getEventData())
.attributes(getAttributes(event))
.build(), sourceContext);
}

private SseEventAttributes getAttributes(ServerSentEvent event) {
return new SseEventAttributes(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extension.http.internal.sse;

import org.mule.sdk.api.http.sse.SseClient;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SseClientsRepository {

private final Map<Long, SseClient> sseClients = new ConcurrentHashMap<>();

public void addClient(SseClient sseClient) {
sseClients.put(sseClient.getClientId(), sseClient);
}

public SseClient getClient(long id) {
return sseClients.get(id);
}
}
Loading