In preparation for my DevNexus talk on Thursday I reviewed the commit log for the JAX-RS API for all the commits in the 2.1 cycle, to be sure I didn’t miss anything (See git://java.net/jax-rs-spec~api). There are basically three new big ticket features in JAX-RS 2.1.

  • Integration with Arbitrary Third-Party Reactive Frameworks

  • Server Sent Events, with support for Reactive Streams as specified in the Reactive Streams API

  • Reactive Client API

This blog post outlines each feature. For more details, see the DevNexus talk. Thanks to Pavel Bucek, Marek Potociar and Santiago Pericas-Geertsen for co-leading the JAX-RS 2.1 Spec and in particular for the code samples I cite in this post.

Please note that this blog post documents in-progress features whose API will likely change before JAX-RS 2.1 is final.

Integration with Arbitrary Third-Party Reactive Frameworks

The JAX-RS EG acknowledges that reactive programming has already matured and that several popular frameworks exist, while the Java SE 9 Flow API is not yet available. The reactive features in JAX-RS 2.1 are designed in the spirit of “innovation happens elsewhere”: you can bring your own reactive framework. The following example uses Flowable from RxJava2.

  1. Client client = ClientBuilder.newClient().register(RxFlowableInvokerProvider.class);
  2.  
  3. Flowable<Response> flowable = client.target("http://cloud.oracle.com")
  4.                                     .request()
  5.                                     .rx(RxFlowableInvoker.class)
  6.                                     .get();
  7.  
  8. flowable.subscribe(response -> {
  9.     // do something with a response when it arrives.
  10. });

The work of implementing the RxFlowableInvokerProvider on line 1 and the RxFlowableInvoker on line 5 is beyond the scope of this blog post, but it is expected that reactive framework providers (or third parties) will provide the necessary implementations of RxInvokerProvider for popular frameworks. This work was introduced in “commit f601630 Date: Thu Jan 12 10:31:41 2017 +0100”.

Server Sent Events (SSE)

This feature was introduced in “commit 37f8814 Date: Tue Oct 6 20:15:07 2015 +0200” and has been substantially enhanced several times since, most notably with the addition of support for the Reactive Streams Standard from Lightbend. Reactive support was introduced very recently, in “commit b2b8f3f Date: Tue Jan 10 16:36:28 2017 +0100”. SSE is currently the only JAX-RS feature to use Reactive Streams, but it is specified without being tightly bound to it. SSE support in JAX-RS makes it easy to use this feature in a standard way, in both client and server.

Server Sent Events (SSE) is a popular web transport technique specified by the W3C at https://www.w3.org/TR/eventsource/. The most common usage of SSE is to allow web applications to push one-way asynchronous updates to browsers without having to maintain a million open sockets. This common usage is well documented at https://www.html5rocks.com/en/tutorials/eventsource/basics/. As an aside, because JAX-RS can function as both a client and a server technology, it is possible to use SSE in a two-way fashion if you have JAX-RS on both ends of the connection. Either way, JAX-RS 2.1 has you covered.
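To give a feel for what actually travels over the wire, here is a minimal sketch of the text/event-stream framing from the W3C spec: an optional "event:" line, one "data:" line per line of payload, and a blank line to end the event. It uses only the JDK. The class and method names (SseWireFormat, format) are my own for illustration and are not part of JAX-RS, and the sketch ignores the "id:" and "retry:" fields.

```java
final class SseWireFormat {

    // Renders one event in text/event-stream framing.
    // A null name means "no event: line", which browsers treat as a plain message.
    static String format(String name, String data) {
        StringBuilder sb = new StringBuilder();
        if (name != null) {
            sb.append("event: ").append(name).append('\n');
        }
        // Multi-line payloads become one data: line per payload line.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // the blank line terminates the event
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(format("domain-progress", "50%"));
        System.out.print(format(null, "first line\nsecond line"));
    }
}
```

In JAX-RS 2.1 you would not write this yourself; the runtime is expected to do the equivalent serialization whenever an OutboundSseEvent is handed to the sink.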

Let’s take a look at the most up-to-date manifestation of the SSE API in JAX-RS 2.1. First, a simple REST resource that implements an SSE server:

  1. @Path("server-sent-events")
  2. @Singleton
  3. public class ServerSentEventsResource {
  4.  
  5.     private final Object outputLock = new Object();
  6.     private final Sse sse;
  7.     private volatile SseEventSink eventSink;
  8.  
  9.     @Resource
  10.     private ManagedExecutorService executorService;
  11.  
  12.     @Inject
  13.     public ServerSentEventsResource(Sse sse) {
  14.         this.sse = sse;
  15.     }
  16.  
  17.     @GET
  18.     @Produces(MediaType.SERVER_SENT_EVENTS)
  19.     public void getMessageQueue(@Context SseEventSink eventSink) {
  20.         synchronized (outputLock) {
  21.             if (this.eventSink != null) {
  22.                 throw new IllegalStateException("Server sink already served.");
  23.             }
  24.             this.eventSink = eventSink;
  25.         }
  26.     }
  27.  
  28.     @POST
  29.     public void addMessage(final String message) throws IOException {
  30.         if (eventSink == null) {
  31.             throw new IllegalStateException("No client connected.");
  32.         }
  33.         eventSink.onNext(sse.newEvent("custom-message"));
  34.     }
  35.  
  36.     @DELETE
  37.     public void close() throws IOException {
  38.         synchronized (outputLock) {
  39.             if (eventSink != null) {
  40.                 eventSink.close();
  41.                 eventSink = null;
  42.             }
  43.         }
  44.     }
  45.  
  46.     @POST
  47.     @Path("domains/{id}")
  48.     @Produces(MediaType.SERVER_SENT_EVENTS)
  49.     public void startDomain(@PathParam("id") final String id,
  50.                             @Context SseEventSink eventSink) {
  51.  
  52.         executorService.submit(() -> {
  53.             try {
  54.                 eventSink.onNext(sse.newEventBuilder().name("domain-progress")
  55.                                     .data(String.class, "starting domain " + id + " ...").build());
  56.                 Thread.sleep(200);
  57.                 eventSink.onNext(sse.newEvent("domain-progress", "50%"));
  58.                 Thread.sleep(200);
  59.                 eventSink.onNext(sse.newEvent("domain-progress", "60%"));
  60.                 Thread.sleep(200);
  61.                 eventSink.onNext(sse.newEvent("domain-progress", "70%"));
  62.                 Thread.sleep(200);
  63.                 eventSink.onNext(sse.newEvent("domain-progress", "99%"));
  64.                 Thread.sleep(200);
  65.                 eventSink.onNext(sse.newEvent("domain-progress", "Done."));
  66.                 eventSink.close();
  67.             } catch (final InterruptedException | IOException e) {
  68.                 e.printStackTrace();
  69.             }
  70.         });
  71.     }
  72. }
  73.  

The constructor on line 13 receives the Sse class, formerly known as SseContext. This serves as a factory for instances of OutboundSseEvent, as seen on lines 33 and 54 - 65.

The method that processes GET requests starts on line 17. This method receives an SseEventSink via injection. In this simple example, the GET request is just made to cause the eventSink instance variable to be populated.

Now the magic starts to happen. SseEventSink is functionally a Flow.Subscriber, though in the Java EE 8 delivery of JAX-RS 2.1 the class will not actually extend Flow.Subscriber<T>. The first version of JAX-RS that does support Java SE 9 will do so. In either case, the necessary methods from that interface are implemented so the API will remain unchanged. The Java SE 9 Flow API is a reactive streams compliant API. You can see the onNext method of this interface being invoked within the POST processor on line 33. The meaning of onNext is simply, “here’s another thing in the stream of things for you to process”. In this case, the recipient of the new thing is whatever parties are connected to the other end of the SSE connection.
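To make the onNext contract concrete, here is a small sketch that uses only the JDK's own java.util.concurrent.Flow and SubmissionPublisher (Java SE 9 or later), not the JAX-RS types. FlowSubscriberSketch and CollectingSubscriber are names I made up for this post. The subscriber plays the role that SseEventSink plays above: the publisher pushes items at it one onNext call at a time, and onComplete marks the end of the stream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowSubscriberSketch {

    // Hypothetical subscriber that simply records every item it is handed.
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
        }

        @Override
        public void onNext(String item) {
            received.add(item); // "here's another thing in the stream"
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the items, waits for completion, and returns what arrived.
    static List<String> publish(String... items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() signals onComplete after the submitted items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publish("starting domain 1 ...", "50%", "Done."));
    }
}
```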

A more advanced POST processor is found starting on line 46, for the path of domains/{id}. A POST to this path causes an asynchronous action to commence using the ManagedExecutorService. This example illustrates the process of starting an appserver domain on line 54. Several status updates are sent on lines 57 - 65 and then the eventSink is closed on line 66.

Rounding out the example, a DELETE processor simply closes the eventSink on demand, on line 36.

I feel the SSE Client API is more interesting in a sense because it enables the less common use case of a non-JavaScript SSE client. Let’s take a look at the SSE Client API in JAX-RS 2.1.

  1. public class SseClient {
  2.  
  3.     public static WebTarget target;
  4.  
  5.     public static void main(String[] args) {
  6.         target = ClientBuilder.newClient().target(args[0]);
  7.         consumeEventsViaSubscription();
  8.     }
  9.  
  10.     private static void consumeEventsViaSubscription() {
  11.         try (final SseEventSource eventSource =
  12.                      SseEventSource.target(target)
  13.                                    .build()) {
  14.  
  15.             eventSource.subscribe(System.out::println);
  16.             eventSource.open();
  17.  
  18.             for (int counter = 0; counter < 5; counter++) {
  19.                 target.request().post(Entity.text("message " + counter));
  20.             }
  21.  
  22.             Thread.sleep(500); // make sure all the events have time to arrive
  23.         } catch (InterruptedException e) {
  24.             e.printStackTrace();
  25.         }
  26.     }
  27. }
  28.  

As in the server case, SseEventSource is functionally equivalent to an instance of Flow.Publisher<InboundSseEvent>, and will literally be one in the JAX-RS version that supports Java SE 9. It does, however, have all the necessary methods of that interface. In the above example, the subscribe() method is seen on line 15. Here we are simply printing out the messages received from the server. Then, on lines 18 - 20 we are issuing a few POST requests to the server.
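Under the hood, each event delivered to a subscriber is the result of parsing the text/event-stream response. The following is a rough, JDK-only sketch of that parsing step as the W3C spec describes it, so you can see what a non-JavaScript client has to do. SseParseSketch is a name I invented, events are modelled as simple name/data pairs rather than InboundSseEvent, and the sketch deliberately ignores the "id:" and "retry:" fields and CRLF line endings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SseParseSketch {

    // Parses a text/event-stream body into (event name, data) pairs.
    static List<Map.Entry<String, String>> parse(String stream) {
        List<Map.Entry<String, String>> events = new ArrayList<>();
        String name = "message";              // default event type per the SSE spec
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // A blank line dispatches the event collected so far, if any.
                if (hasData) {
                    events.add(Map.entry(name, data.toString()));
                }
                name = "message";
                data.setLength(0);
                hasData = false;
            } else if (!line.startsWith(":")) { // lines starting with ':' are comments
                int colon = line.indexOf(':');
                String field = colon < 0 ? line : line.substring(0, colon);
                String value = colon < 0 ? "" : line.substring(colon + 1);
                if (value.startsWith(" ")) {
                    value = value.substring(1); // strip one optional leading space
                }
                if (field.equals("event")) {
                    name = value;
                } else if (field.equals("data")) {
                    if (hasData) {
                        data.append('\n');      // multiple data: lines are joined
                    }
                    data.append(value);
                    hasData = true;
                }
            }
        }
        return events;
    }

    public static void main(String[] args) {
        String body = "event: domain-progress\ndata: 50%\n\ndata: a\ndata: b\n\n";
        parse(body).forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```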

Reactive Client API

The other major feature for JAX-RS 2.1, the Reactive Client API, was introduced in “commit dea3922 Date: Wed Aug 26 10:26:26 2015 -0400”. This API builds entirely on two foundations: Java 8 CompletableFuture and the Invocation API from JAX-RS 2.0. The former is capably explained by Tomasz Nurkiewicz, whom I had the pleasure of seeing present at GeekOut 2015. The latter is a way to compose client requests and cause them to be processed in the proper order to enable assembling the responses. In the case of the Reactive Client API, these responses are asynchronous and fully reactive compliant. The root interface of the API is RxInvoker. This interface has Java methods for all of the standard HTTP methods, as well as a generic “method” method for arbitrary methods. The API is usually invoked as part of a fluent call chain.

The full details of both of these foundational features are too deep to describe in this blog post, but the following simple example gives a flavor of their power.

  1. Client client = ClientBuilder.newClient();
  2. URI service = URI.create("http://localhost/service/resource");
  3.  
  4. CompletionStage<Response> cs1 = client.target(service).request().rx().post(Entity.text("1"));
  5. CompletionStage<Response> cs2 = client.target(service).request().rx().post(Entity.text("2"));
  6.  
  7. cs1.thenCombine(cs2, (r1, r2) -> {
  8.   String s1 = r1.readEntity(String.class);
  9.   String s2 = r2.readEntity(String.class);
  10.  
  11.   return client.target(service).request().rx().post(Entity.text(s1 + s2));
  12. }).thenAccept(responseCompletionStage -> responseCompletionStage.thenAccept(r3 -> {
  13.   String s3 = r3.readEntity(String.class);
  14.  
  15.   System.out.println(s3);
  16. }));

This example shows a reactive client that issues three separate POST requests asynchronously, in a non-blocking fashion, and assembles the result for printing on line 15.
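If you want to play with the composition pattern without a JAX-RS 2.1 implementation on hand, the following JDK-only sketch reproduces its shape. Note that fakePost is a hypothetical stand-in I wrote for client.target(service).request().rx().post(...): it just echoes its input asynchronously. Also note that this sketch uses thenCompose to flatten the third stage, which is a different arrangement from the nested thenAccept in the listing above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class CombineSketch {

    // Hypothetical stand-in for an asynchronous POST: completes on another
    // thread with an "echo" of the posted text.
    static CompletionStage<String> fakePost(String body) {
        return CompletableFuture.supplyAsync(() -> "echo(" + body + ")");
    }

    // Fires two requests concurrently, then posts the concatenation of
    // both results as a third request.
    static CompletionStage<String> combined() {
        CompletionStage<String> cs1 = fakePost("1");
        CompletionStage<String> cs2 = fakePost("2");
        return cs1.thenCombine(cs2, (s1, s2) -> s1 + s2)
                  .thenCompose(CombineSketch::fakePost);
    }

    public static void main(String[] args) {
        // join() blocks only here, so the demo does not exit before the result arrives.
        System.out.println(combined().toCompletableFuture().join());
    }
}
```

The first two stages run independently, and the third is only created once both have completed, which is the same ordering the JAX-RS example relies on.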

Appendix: Work in Progress, Non-Blocking IO

Another important feature is in the works for JAX-RS 2.1: a Non-Blocking IO API. An early draft of this feature was introduced in “commit e997a32 Date: Thu Sep 3 14:38:00 2015 -0400”.

In its final form, this work will likely use the Flow API in much the same way that the SSE support does.

Annotated Commit Log Messages

The remainder of the blog post is a review of all the commits to the JAX-RS 2.1 API since the start of the JSR.

commit a2b9dfe Date: Fri Feb 10 19:09:46 2017 +0100

Refine javadoc for javax/ws/rs/sse/SseEventSource.java. Fix the type on the reference to the accept() method.

commit 4d79c56 Date: Fri Feb 10 16:28:15 2017 +0100

Rename Sse.newEvent() to newEventBuilder(). Rename Sse.newDataEvent() to newEvent(). Update example accordingly.

commit d62ed77 Date: Fri Feb 10 15:41:36 2017 +0100

In javax/ws/rs/sse/SseBroadcaster.java, in onException() and onClose(), modify the type of the argument to be less specific in the inheritance hierarchy. It’s just a Flow.Subscriber<> now, of which SseEventSink is an implementation.

Refine jaxrs/examples/sse/ItemStoreResource.java example accordingly.

commit 8a61f14 Date: Wed Feb 8 09:03:40 2017 +0100

Add newDataEvent() to javax/ws/rs/sse/Sse.java. This creates an OutboundSseEvent.

commit b036972 Date: Wed Feb 8 09:43:51 2017 +0100

Refine Javadoc of RxInvokerProvider.

commit 459ddb6 Date: Wed Feb 8 18:14:31 2017 +0100

This was the big refactor of the Sse API.

  • Drop SseClientSubscriber and remove usages. Functionality taken over by new class javax/ws/rs/sse/SseSubscription.java

  • Drop SseEventInput

  • Rename SseEventOutput to SseEventSink.

  • Rename SseContext to Sse.

commit 2ae0169 Date: Thu Feb 2 18:43:14 2017 +0100

Introduce javax/ws/rs/sse/SseClientSubscriber.java, an implementation of Flow.Subscriber. This class was removed shortly after this commit. Drop open() from SseEventSource.

Refine SseClient example to use SseClientSubscriber instead of open().

commit 4e87a23 Date: Fri Jan 13 14:29:09 2017 +0100

Rework commit f601630, to remove Class<? extends RxInvokerProvider> from the signatures, replacing with Class. Added isProviderFor() method to javax/ws/rs/client/RxInvokerProvider.java. Modify getRxInvoker() to take a SyncInvoker instead of Invocation.Builder.

Updated example javax/ws/rs/core/RxClientTest.java

commit f601630 Date: Thu Jan 12 10:31:41 2017 +0100

This was reworked in a subsequent commit.

Refine javax/ws/rs/client/Invocation.java. Add an rx() method that takes an RxInvokerProvider. This enables other types of asynchronous computation models, such as RxJava. There is also a variant that takes an ExecutorService.

Add javax/ws/rs/client/RxInvokerProvider.java.

Add example javax/ws/rs/core/RxClientTest.java.

commit b2b8f3f Date: Tue Jan 10 16:36:28 2017 +0100

Refine jaxrs/examples/sse/ItemStoreResource.java and jaxrs/examples/sse/ServerSentEventsResource.java to use Java SE 8 lambdas.

Added class javax/ws/rs/Flow.java. This is the implementation of the Reactive Streams API. This is the big class here.

Rework Sse feature to use the Flow API. In particular, make SseEventInput an implementation of Flow.Publisher and SseEventOutput an implementation of Flow.Subscriber. Also make SseEventSource an implementation of Flow.Publisher.

Rework Sse feature to flesh out some hitherto missing Javadoc.

Added a package-info.java for Sse.

commit 46d81cf Date: Thu Oct 15 14:52:43 2015 +0200

Refine ServerSentEventsResource example.

commit 37f8814 Date: Tue Oct 6 20:15:07 2015 +0200

First draft of Server Sent Events support. Since this is one of the biggest new features in JAX-RS 2.1, I’ll discuss it in its final form.

This commit added examples of using SSE in classes jaxrs/examples/sse/ItemStoreResource.java, jaxrs/examples/sse/SseClient.java and jaxrs/examples/sse/ServerSentEventsResource.java. These classes are still in the codebase.

  • Added forInstance() that takes an Object and returns GenericType to javax/ws/rs/core/GenericType.java. This uses the GenericEntity.getType() to return the type, or just returns the Java Class.

  • Added LAST_EVENT_ID_HEADER constant to javax/ws/rs/core/HttpHeaders.java. This constant is used for the “Last-Event-ID” header in the SSE spec http://www.w3.org/TR/eventsource/#last-event-id

  • Added text/event-stream to javax/ws/rs/core/MediaType.java.

  • Added javax/ws/rs/sse/FactoryFinder.java. Looks like this is used from SseEventSource to instantiate an SseEventSource.Builder. This class has been moved in a later revision of the proposal.

  • Added javax/ws/rs/sse/InboundSseEvent.java. An SseEvent subclass for events received when a JAX-RS client is listening to a server that is sending SSE.

  • Added javax/ws/rs/sse/OutboundSseEvent.java. An SseEvent subclass for events when JAX-RS is acting as the server sending SSE events.

  • Added javax/ws/rs/sse/SseBroadcaster.java. The thing that causes OutboundSseEvents to be sent to the client.

  • Added javax/ws/rs/sse/SseContext.java. To make the SSE feature injectable. This class has been renamed to Sse in a later version of the feature.

  • Added class java/javax/ws/rs/sse/SseEvent.java, the event base class.

  • Added javax/ws/rs/sse/SseEventInput.java. Lets you read InboundSseEvents as they arrive. This class has been dropped in a later version of the feature.

  • Added javax/ws/rs/sse/SseEventOutput.java. Lets you write OutboundSseEvents. This class has been renamed to SseEventSink in a later version of the feature.

  • Added java/javax/ws/rs/sse/SseEventSource.java. Lets you read InboundSseEvents.

commit f1b7167 Date: Thu Sep 3 14:41:56 2015 -0400

  • Remove onWritePossible() from Response.

commit e997a32 Date: Thu Sep 3 14:38:00 2015 -0400

This commit added some experimental work for the use of non-blocking IO in the JAX-RS spec. There are helpful examples in jaxrs/examples/nio/FileResourceClient.java and jaxrs/examples/nio/FileResource.java.

This commit introduced the following classes:

  • javax/ws/rs/client/NioInvoker.java

  • javax/ws/rs/core/NioCompletionHandler.java

  • javax/ws/rs/core/NioErrorHandler.java

  • javax/ws/rs/core/NioInputStream.java

  • javax/ws/rs/core/NioOutputStream.java

  • javax/ws/rs/core/NioReaderHandler.java

  • javax/ws/rs/core/NioWriterHandler.java

And modified the following classes:

  • javax/ws/rs/core/Request.java and javax/ws/rs/core/Response.java — Add entity methods that take the Nio*Handler classes above. For Response, also add an onWritePossible() method that takes the NioWriterHandler.

  • javax/ws/rs/client/Invocation.java — Return the NioInvoker from Invocation.nio().

Since this is one of the biggest new features in JAX-RS 2.1, I’ll discuss it in its final form later. Note that the above classes are all still present in the API as of today.

commit dea3922 Date: Wed Aug 26 10:26:26 2015 -0400

The first iteration of the RxInvoker proposal. This commit added the following classes:

  • javax/ws/rs/client/CompletionStageRxInvoker.java

  • javax/ws/rs/client/RxInvoker.java

And introduced changes to the following classes

  • javax/ws/rs/client/Invocation.java

A helpful test is at javax/ws/rs/core/RxClientTest.java.

Since this is one of the biggest new features in JAX-RS 2.1, I’ll discuss it in its final form later. Note that the above classes are all still present in the API as of today.

commit 77464a5 Date: Wed Feb 18 16:41:44 2015 +0100

Move the LaTeX sources to a separate repo: git://java.net/jax-rs-spec~spec

commit 0dbabe25 Date: Wed Feb 4 13:28:31 2015 -0500

Section 3.2 of the spec prose document specifies how Java language fields and JavaBean properties of resource classes are injected with values from the incoming request, all of which arrive over HTTP and are therefore just plain strings. That section includes a list of the valid types to which those strings can be converted; let’s call them destinations for discussion. This commit added List<T>, Set<T>, and SortedSet<T> to the set of possible destinations, where T meets at least one of the following requirements.

  • T has a ParamConverter

  • T has a public constructor that takes a single String.

  • T has a public static valueOf() method that takes a String and returns an instance of T.

The commit also tightened up what happens when a WebApplicationException is thrown during the conversion process.
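To illustrate the conversion rules above, here is a rough reflection-based sketch of how a runtime might turn a list of raw request strings into a List<T>. This is my own illustration and not how any particular JAX-RS implementation does it: ParamConversionSketch, convert and convertAll are invented names, the ParamConverter case is left out entirely, and failures are simply wrapped in IllegalArgumentException rather than following the spec's WebApplicationException rules.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class ParamConversionSketch {

    // Converts one raw string to T: first via a public constructor taking a
    // single String, then via a public static valueOf(String).
    static <T> T convert(Class<T> type, String raw) {
        try {
            try {
                return type.getConstructor(String.class).newInstance(raw);
            } catch (NoSuchMethodException noStringConstructor) {
                Method valueOf = type.getMethod("valueOf", String.class);
                if (!Modifier.isStatic(valueOf.getModifiers())) {
                    throw new IllegalArgumentException(type.getName() + ".valueOf is not static");
                }
                return type.cast(valueOf.invoke(null, raw));
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    "Cannot convert '" + raw + "' to " + type.getName(), e);
        }
    }

    // Converts every raw value, preserving order, as for a List<T> destination.
    static <T> List<T> convertAll(Class<T> type, List<String> rawValues) {
        List<T> converted = new ArrayList<>();
        for (String raw : rawValues) {
            converted.add(convert(type, raw));
        }
        return converted;
    }

    public static void main(String[] args) {
        System.out.println(convertAll(Integer.class, List.of("1", "2", "3")));
        System.out.println(convert(TimeUnit.class, "SECONDS")); // enum: valueOf path
    }
}
```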