Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poc/sse #14247

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft

Poc/sse #14247

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion modules/http-api/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
exports org.mule.runtime.http.api.client;
exports org.mule.runtime.http.api.client.auth;
exports org.mule.runtime.http.api.client.proxy;
exports org.mule.runtime.http.api.client.sse;
exports org.mule.runtime.http.api.client.ws;
exports org.mule.runtime.http.api.domain;
exports org.mule.runtime.http.api.domain.entity;
Expand All @@ -30,13 +31,15 @@
exports org.mule.runtime.http.api.domain.message.request;
exports org.mule.runtime.http.api.domain.message.response;
exports org.mule.runtime.http.api.domain.request;
exports org.mule.runtime.http.api.domain.sse;
exports org.mule.runtime.http.api.exception;
exports org.mule.runtime.http.api.server;
exports org.mule.runtime.http.api.server.async;
exports org.mule.runtime.http.api.server.sse;
exports org.mule.runtime.http.api.server.ws;
exports org.mule.runtime.http.api.utils;
exports org.mule.runtime.http.api.tcp;
exports org.mule.runtime.http.api.ws;
exports org.mule.runtime.http.api.ws.exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.mule.api.annotation.NoImplement;
import org.mule.runtime.http.api.client.auth.HttpAuthentication;
import org.mule.runtime.http.api.client.sse.ServerSentEventSource;
import org.mule.runtime.http.api.client.ws.WebSocketCallback;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
Expand Down Expand Up @@ -166,4 +167,14 @@ default CompletableFuture<WebSocket> openWebSocket(HttpRequest request,
WebSocketCallback callback) {
throw new UnsupportedOperationException("WebSockets are only supported in Enterprise Edition");
}

/**
* Creates a consumer of Server-sent events. The resulting {@link ServerSentEventSource} is not connected automatically.
*
* @param url the URL of the server.
* @return a non-connected instance of {@link ServerSentEventSource}.
*/
default ServerSentEventSource sseSource(String url) {
throw new UnsupportedOperationException("Server-sent Events are not supported");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.http.api.client.sse;

import org.mule.runtime.http.api.domain.sse.ServerSentEvent;

/**
* An observer of server-sent events.
*/
public interface ServerSentEventListener {

// TODO: Move to another class, this one should only handle events.
default void onOpen() {}

// TODO: Move to another class, this one should only handle events.
default void onClose() {}

// TODO: Move to another class, this one should only handle events.
default void onError(Throwable error) {}

/**
* Method to be invoked for each received {@link ServerSentEvent}.
*
* @param event the received event.
*/
default void onEvent(ServerSentEvent event) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.http.api.client.sse;

/**
* A consumer of server-sent events.
*/
public interface ServerSentEventSource {

// TODO: Reconnection
void connect();

/**
* Registers a {@link ServerSentEventListener listener} for a specific event name (a.k.a. topic, a.k.a. type).
*
* @param eventName The event name that the {@link ServerSentEventListener listener} will handle.
* @param listener The event handler.
*/
void register(String eventName, ServerSentEventListener listener);

/**
* Registers a fallback {@link ServerSentEventListener listener} for all the events that aren't handled by any listener
* registered with {@link #register(String, ServerSentEventListener)}.
*
* @param listener The event handler.
*/
void register(ServerSentEventListener listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.http.api.domain.sse;

import java.io.Serial;
import java.io.Serializable;
import java.util.Objects;

/**
* Server-sent event.
*/
public class ServerSentEvent implements Serializable {

@Serial
private static final long serialVersionUID = -1211505868025654629L;

private final String eventName;
private final String eventData;
private final String id;

public ServerSentEvent(String eventName, String eventData, String id) {
this.eventName = eventName;
this.eventData = eventData;
this.id = id;
}

/**
* @return the event name, the topic of the event.
*/
public String getEventName() {
return eventName;
}

/**
* @return the full data as string. // TODO: Add a method to iterate line-by-line.
*/
public String getEventData() {
return eventData;
}

/**
* @return event id.
*/
public String getId() {
return id;
}

@Override
public String toString() {
return "ServerSentEvent [name=" + eventName + ", data=" + eventData + ", id=" + id + "]";
}

@Override
public int hashCode() {
return Objects.hashCode(eventName) + Objects.hashCode(eventData) + Objects.hashCode(id);
}

@Override
public boolean equals(Object o) {
if (null == o || getClass() != o.getClass()) {
return false;
}
ServerSentEvent that = (ServerSentEvent) o;
return Objects.equals(eventName, that.eventName) && Objects.equals(eventData, that.eventData) && Objects.equals(id, that.id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.mule.api.annotation.NoImplement;
import org.mule.runtime.api.tls.TlsContextFactory;
import org.mule.runtime.http.api.HttpConstants.Protocol;
import org.mule.runtime.http.api.server.sse.SseClientHandler;
import org.mule.runtime.http.api.server.sse.SseHandlerManager;
import org.mule.runtime.http.api.server.ws.WebSocketHandler;
import org.mule.runtime.http.api.server.ws.WebSocketHandlerManager;

Expand Down Expand Up @@ -117,4 +119,8 @@ RequestHandlerManager addRequestHandler(final Collection<String> methods, final
default WebSocketHandlerManager addWebSocketHandler(WebSocketHandler handler) {
throw new UnsupportedOperationException("WebSockets are only supported in Enterprise Edition");
}

default SseHandlerManager sse(String ssePath, SseClientHandler sseClientHandler) {
throw new UnsupportedOperationException("Server-sent events are not supported");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.http.api.server.sse;

import java.io.IOException;

/**
* Server-side abstraction of a connected client.
*/
public interface SseClient extends AutoCloseable {

/**
* Sends an event to the client represented by this interface.
*
* @param name the event name (topic).
* @param data the data as string.
* @param id event id (used to resume an event stream on reconnection).
*/
void sendEvent(String name, String data, String id) throws IOException;

/**
* Equivalent to call {@code sendEvent(name, data, null);}
*
* @param name the event name (topic).
* @param data the data as string.
*/
default void sendEvent(String name, String data) throws IOException {
sendEvent(name, data, null);
}

/**
* Equivalent to call {@code sendEvent("message", data, null);}
* <p>
* Note: If you want to send a message without topic, you can call {@code sendEvent(null, data);}
*
* @param data the data as string.
*/
default void sendEvent(String data) throws IOException {
sendEvent("message", data, null);
}

/**
* Sends a comment.
*
* @param comment the comment. TODO: What's the format of a "comment"?.
*/
void sendComment(String comment);

/**
* The callback will be called when the client closes its connection.
*
* @param callback to be called when the client closes its connection.
*/
void onClose(Runnable callback);

/**
* Closes the connection.
*/
@Override
void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.http.api.server.sse;

/**
* Server-side of client connections.
*/
public interface SseClientHandler {

/**
* Callback to be invoked when each client is connected.
*
* @param sseClient server-side abstraction of a connected client.
*/
void handle(SseClient sseClient);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.runtime.http.api.server.sse;

public interface SseHandlerManager {

// start stop blah
}