Monday, 3 March 2014

Asynchronous Communication with HTTP

Requirements and Solutions

Sometimes native asynchronous transports such as JMS cannot be used for technical or political reasons, yet asynchronous communication is still required.
CXF provides two facilities to help with these requirements:
  1. Using WS-Addressing decoupled response
  2. Using independent oneway operations

WS-Addressing decoupled response

A decoupled response uses a different HTTP channel to send the response. In this case the client also acts as a service: it opens an independent HTTP endpoint and can receive messages on it. The communication is shown in the following picture (taken from a FuseSource resource):

  1. The consumer implementation invokes an operation and a request message is generated.
  2. The WS-Addressing layer adds the WS-A headers to the message. When a decoupled endpoint is specified in the consumer's configuration, the address of the decoupled endpoint is placed in the WS-A ReplyTo header.
  3. The message is sent to the service provider.
  4. The service provider receives the message.
  5. The request message from the consumer is dispatched to the provider's WS-A layer.
  6. Because the WS-A ReplyTo header is not set to anonymous, the provider sends back a message with the HTTP status code set to 202, acknowledging that the request has been received.
  7. The HTTP layer sends a 202 Accepted message back to the consumer using the original connection's back-channel. The consumer receives the 202 Accepted reply on the back-channel of the HTTP connection used to send the original message.
  8. When the consumer receives the 202 Accepted reply, the HTTP connection closes.
  9. The request is passed to the service provider's implementation where the request is processed.
  10. When the response is ready, it is dispatched to the WS-A layer.
  11. The WS-A layer adds the WS-Addressing headers to the response message.
  12. The HTTP transport sends the response to the consumer's decoupled endpoint.
  13. The consumer's decoupled endpoint receives the response from the service provider.
  14. The response is dispatched to the consumer's WS-A layer where it is correlated to the proper request using the WS-A RelatesTo header.
  15. The correlated response is returned to the client implementation and the invoking call is unblocked.
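
The correlation in steps 14 and 15 can be pictured with a small JDK-only sketch. This is not CXF code: DecoupledCorrelator and its methods are hypothetical names I made up for illustration, and plain strings stand in for the real messages. It only shows the idea of parking the caller on a MessageID until a message with the matching RelatesTo value arrives at the decoupled endpoint.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the consumer's WS-A layer; not CXF code. */
class DecoupledCorrelator {

    // Requests still waiting for a response, keyed by their WS-A MessageID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called before the request is sent: remembers the MessageID of the outgoing request. */
    CompletableFuture<String> register(String messageId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(messageId, future);
        return future;
    }

    /**
     * Called when the decoupled endpoint receives a message.
     * Returns false if the RelatesTo value matches no pending request.
     */
    boolean onResponse(String relatesTo, String payload) {
        CompletableFuture<String> future = pending.remove(relatesTo);
        if (future == null) {
            return false;
        }
        future.complete(payload); // unblocks the invoking call
        return true;
    }

    public static void main(String[] args) throws Exception {
        DecoupledCorrelator correlator = new DecoupledCorrelator();
        CompletableFuture<String> reply = correlator.register("urn:uuid:request-1");

        // Simulates the provider posting its response to the decoupled endpoint later on.
        new Thread(() -> correlator.onResponse("urn:uuid:request-1", "response payload")).start();

        // The invoking call blocks here until the correlated response arrives.
        System.out.println(reply.get(5, TimeUnit.SECONDS));
    }
}
```

In CXF itself this bookkeeping is handled by the WS-Addressing layer, so application code does not need anything like this class.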
Configuring a decoupled response is straightforward: activate the WS-Addressing feature on both the client and the service, and provide the ReplyTo and FaultTo addresses on the client:

Activation of WS-Addressing Feature

For <jaxws:endpoint>:
<jaxws:endpoint id="{your.service.namespace}YourPortName">
  <jaxws:features>
    <wsa:addressing xmlns:wsa="http://cxf.apache.org/ws/addressing"/>
  </jaxws:features>
</jaxws:endpoint>
For <jaxws:client>:
<jaxws:client id="{your.service.namespace}YourPortName">
  <jaxws:features>
    <wsa:addressing xmlns:wsa="http://cxf.apache.org/ws/addressing"/>
  </jaxws:features>
</jaxws:client>

Configuring WSA Properties

The client should set the ReplyTo WS-Addressing property to receive the decoupled response:
((BindingProvider)proxy).getRequestContext()
    .put("org.apache.cxf.ws.addressing.replyto", "http://localhost:9090/decoupled_endpoint");
Alternatively, the client can use the CXF AddressingPropertiesImpl object to control many aspects of WS-Addressing, including ReplyTo:
AddressingProperties maps = new AddressingPropertiesImpl();
EndpointReferenceType ref = new EndpointReferenceType();
AttributedURIType add = new AttributedURIType();
// The address value must be set explicitly; the URL is the same one used for ReplyTo above.
add.setValue("http://localhost:9090/decoupled_endpoint");
ref.setAddress(add);
maps.setReplyTo(ref);
maps.setFaultTo(ref);

((BindingProvider)proxy).getRequestContext()
    .put("javax.xml.ws.addressing.context", maps);

Under the Hood

On the service side, CXF performs the following steps when it receives the client request:
  1. Creates an empty response message (partial response)
  2. Initializes a new interceptor chain: adds all registered interceptors for the endpoint, service, bus and binding
  3. Replaces the current interceptor chain with the new one
  4. Replaces the ConduitSelector in the exchange with a PreexistingConduitSelector that uses the original back channel
  5. Invokes chain.doIntercept() and sends the empty response (HTTP status 202)
  6. Resets the interceptor chain
  7. Creates and initializes a new message for the real response
  8. Creates the decoupled destination for the client
  9. Suspends the current interceptor chain for the full response and resumes it in a new thread
  10. The service handler is called from ServiceInvokerInterceptor in the new thread, and the response is delivered to the decoupled destination
Note that CXF automatically takes care of all communication aspects: it opens the client endpoints, sends responses to the correct endpoint, and correlates requests and responses.

Samples

I have created two samples illustrating decoupled responses in a GitHub repository:
  1. Simple scenario based on Java First client and service
  2. Dispatching scenario based on Provider<T> service and Dispatch client containing code to dispatch responses to registered callbacks

Independent Oneway Operations

The WS-Addressing decoupled response is very useful in most cases, but sometimes you need more flexibility. For example, I have seen requirements to support more than one response to a single client request (multi-response conversation) or to persist the service state during request processing. In this case you can consider using two independent oneway operations: one on the service side and a second one on the client side.
The client registers its own endpoint and uses it to receive responses from the service. This solution is very flexible; you can fulfill requirements like:
  • receive several responses for the same request;
  • dispatch responses to different handlers on the client side;
  • persist the service state after receiving a request, shut down the service, start it up again when the response is ready and send the response to the client endpoint.
The downside of this flexibility is that the approach is more involved to implement: you have to explicitly register the endpoint on the client side, send responses from the service side using a client, and correlate request and response messages yourself.
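
To give an idea of the manual correlation work, here is a minimal JDK-only sketch of a client-side dispatcher. All names (OnewayResponseDispatcher, register, onResponse, complete) are hypothetical and not taken from CXF or from my samples; I assume that every response carries a correlation id chosen by the client and that the client's oneway operation simply forwards incoming messages to onResponse.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical client-side dispatcher for responses received through a oneway operation. */
class OnewayResponseDispatcher {

    // Handler registered for each open conversation, keyed by correlation id.
    private final Map<String, Consumer<String>> handlers = new ConcurrentHashMap<>();

    /** Remembers which handler receives the responses for the given correlation id. */
    void register(String correlationId, Consumer<String> handler) {
        handlers.put(correlationId, handler);
    }

    /**
     * Would be called from the client's oneway operation for every incoming response.
     * The handler stays registered, so several responses may arrive for one request.
     */
    boolean onResponse(String correlationId, String payload) {
        Consumer<String> handler = handlers.get(correlationId);
        if (handler == null) {
            return false; // unknown or already completed conversation
        }
        handler.accept(payload);
        return true;
    }

    /** Ends the conversation; later responses with this id are rejected. */
    void complete(String correlationId) {
        handlers.remove(correlationId);
    }

    public static void main(String[] args) {
        OnewayResponseDispatcher dispatcher = new OnewayResponseDispatcher();
        List<String> received = new ArrayList<>();
        dispatcher.register("request-1", received::add);

        // Two responses for the same request (multi-response conversation).
        dispatcher.onResponse("request-1", "partial result");
        dispatcher.onResponse("request-1", "final result");
        dispatcher.complete("request-1");

        System.out.println(received);
    }
}
```

Registering a different handler per correlation id is also what makes dispatching responses to different handlers possible.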

Samples

A sample illustrating the use of independent oneway operations is available in the GitHub repository.

Sunday, 2 March 2014

Native Asynchronous Transports: JMS

JMS Transport 

The JMS transport is asynchronous by nature and fits asynchronous communication between client and service perfectly. The JMS transport can also be used together with the CXF decoupled response.
The use of the JMS transport is illustrated in the following samples:

CXF Asynchronous API

Client API
WSDL asynchronous binding
If you use the WSDL-first approach and generate the code, you need to create an asynchronous binding declaration using the following template:

<bindings xmlns:xsd="http://www.w3.org/2001/XMLSchema"
          xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
          wsdlLocation="<at:var at:name="WSDL_LOCATION" />/hello_world_async.wsdl"
          xmlns="http://java.sun.com/xml/ns/jaxws">
  <bindings node="wsdl:definitions">
    <enableAsyncMapping>true</enableAsyncMapping>
  </bindings>
</bindings>


The node attribute of the binding is an XPath value that specifies which node (or nodes) of the WSDL contract are affected by this binding declaration. In this template node is set to wsdl:definitions, which means that the entire WSDL contract is affected.
Then add the file containing this binding declaration as a parameter to the cxf-codegen-plugin:

<plugin>
  <groupId>org.apache.cxf</groupId>
  <artifactId>cxf-codegen-plugin</artifactId>
  <version>${project.version}</version>
  <executions>
    <execution>
      <id>generate-sources</id>
      <phase>generate-sources</phase>
      <configuration>
        <wsdlOptions>
          <wsdlOption>
            <wsdl>${basedir}/wsdl/hello_world_async.wsdl</wsdl>
            <frontEnd>jaxws21</frontEnd>
            <extraargs>
              <extraarg>-b</extraarg>
              <extraarg>${basedir}/wsdl/async_binding.xml</extraarg>
            </extraargs>
          </wsdlOption>
        </wsdlOptions>
      </configuration>
      <goals>
        <goal>wsdl2java</goal>
      </goals>
    </execution>
  </executions>
</plugin>


For each synchronous method, the code generator adds two asynchronous ones:

public Future<?> methodNameAsync(Type requestType, AsyncHandler<Type> asyncHandler);

public Response<Type> methodNameAsync(Type requestType);

Now you can invoke the service method in a non-blocking manner on the client side. Both mechanisms, Java Future and callback, are available. CXF illustrates this approach in a sample and documents it in the wiki.
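
The difference between the two calling styles can be shown without any web service stack. In the following sketch CompletableFuture and Consumer are JDK stand-ins for the JAX-WS Response and AsyncHandler types, and AsyncCallStyles with its greetMeAsync methods is a hypothetical class, not generated code. It only mirrors the shape of the two generated methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for a generated proxy; uses JDK types instead of JAX-WS ones. */
class AsyncCallStyles {

    /** Polling variant: returns immediately, the caller checks the Future later. */
    static CompletableFuture<String> greetMeAsync(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello " + name);
    }

    /** Callback variant: the handler is invoked once the response is available. */
    static CompletableFuture<Void> greetMeAsync(String name, Consumer<String> handler) {
        return greetMeAsync(name).thenAccept(handler);
    }

    public static void main(String[] args) throws Exception {
        // Polling: the caller is free to do other work until isDone() reports completion.
        Future<String> response = greetMeAsync("polling");
        while (!response.isDone()) {
            Thread.sleep(10);
        }
        System.out.println(response.get());

        // Callback: the handler prints the response; the returned Future only signals completion.
        Future<?> done = greetMeAsync("callback", System.out::println);
        done.get(5, TimeUnit.SECONDS);
    }
}
```

With a real generated proxy the pattern is the same, only the types come from the JAX-WS API.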

Dispatch and Dynamic Clients
If you use the Dispatch client API, the invokeAsync method is available for non-blocking calls:
Future<?> invokeAsync(T msg, AsyncHandler<T> handler)
A method for non-blocking calls is also available in the CXF dynamic client:
void invoke(ClientCallback callback, BindingOperationInfo oi, Object... params)

Service API
An asynchronous API on the service side means that the service implementation can start a new thread to process the request, return from the operation method, and set the response later from the new thread:

public Future<?> myMethodAsync(final String me,
                               final AsyncHandler<MyType> asyncHandler) {
    final ServerAsyncResponse<MyType> r = new ServerAsyncResponse<MyType>();
    // Process the request on a separate thread so that the operation method can return immediately.
    new Thread() {
        public void run() {
            MyType resp = new MyType();
            resp.setResponseType("[async] How are you " + me);
            // Publish the result, then notify the handler that the response is ready.
            r.set(resp);
            asyncHandler.handleResponse(r);
        }
    }.start();
    return r;
}


CXF provides the @UseAsyncMethod annotation to achieve service-side asynchronicity. How does it work? It is quite simple:

@UseAsyncMethod
public String myMethod(final String me){
...
}

public Future<?> myMethodAsync(final String me,
                               final AsyncHandler<String> asyncHandler) {
...
}


You should do two things:
  • introduce a new method with the original name plus the "Async" suffix. This method returns a Future and takes an AsyncHandler<Type> as an additional argument.
  • add the @UseAsyncMethod annotation to the original method.
The important point is that the second, asynchronous method is invoked ONLY if the transport supports asynchronous request processing; otherwise CXF calls the first, synchronous method. Asynchronous request processing is supported in the following cases:
  1. JMS transport
  2. Jetty Continuations (supported starting from Jetty 6)
  3. Servlet 3.0 API
  4. CXF Decoupled responses
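
The selection rule (use the Async twin only when the method is annotated and the transport supports it) can be sketched with plain reflection. This is my own illustration and not the CXF implementation: the @PreferAsync annotation, the GreeterService class and the transportSupportsAsync flag are hypothetical, and a Consumer stands in for AsyncHandler.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class AsyncMethodSelection {

    /** Hypothetical marker playing the role of @UseAsyncMethod in this sketch. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface PreferAsync { }

    /** Hypothetical service with a synchronous method and its "Async" twin. */
    static class GreeterService {
        @PreferAsync
        public String greet(String name) {
            return "[sync] Hello " + name;
        }

        public void greetAsync(String name, Consumer<String> handler) {
            // The response is produced on another thread and pushed to the handler.
            CompletableFuture.runAsync(() -> handler.accept("[async] Hello " + name));
        }
    }

    /** Calls the Async twin only if the method is annotated and the transport can suspend the request. */
    static CompletableFuture<String> invoke(Object service, String methodName, String arg,
                                            boolean transportSupportsAsync) {
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            Method sync = service.getClass().getMethod(methodName, String.class);
            if (transportSupportsAsync && sync.isAnnotationPresent(PreferAsync.class)) {
                Method async = service.getClass()
                    .getMethod(methodName + "Async", String.class, Consumer.class);
                Consumer<String> handler = result::complete;
                async.invoke(service, arg, handler);
            } else {
                // Fallback: the synchronous method runs on the calling thread.
                result.complete((String) sync.invoke(service, arg));
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        GreeterService service = new GreeterService();
        System.out.println(invoke(service, "greet", "CXF", false).join());
        System.out.println(invoke(service, "greet", "CXF", true).join());
    }
}
```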
Note: this approach works not only for generated code, but also for Java-first and even Provider<T> based services. In the last case you need to add a second method, invokeAsync:

@WebServiceProvider
@ServiceMode(value = Service.Mode.PAYLOAD)
public class ServiceHandler implements Provider<StreamSource> {
   
    @Override
    @UseAsyncMethod
    public StreamSource invoke(StreamSource request) {
     ...
    }
   
    public Future<?> invokeAsync(final StreamSource s, final AsyncHandler<Source> asyncHandler) {
     ...
    }
}


Metro takes a slightly different approach to the service-side asynchronous API, using AsyncProvider for Provider based services. The benefit of the CXF @UseAsyncMethod annotation is that it works for different use cases: generated stubs, Java-first stubs and Provider based services.

This sample and system test illustrate the use of the @UseAsyncMethod annotation in different cases.
 

CXF Asynchronous Features Overview

Introduction
Asynchronous communication is very important for service scalability. It also plays an essential role in use cases where a service responds with a certain delay.
In this post I try to classify and summarize the different aspects of asynchronous communication for the WS-* stack provided by Apache CXF and illustrate them with examples.

Asynchronous Levels
Architecturally, I would distinguish two levels of asynchronous communication:
  1. API level
  2. Transport level
API level
The API level makes it possible to implement client or service code in a non-blocking manner. Normally this is achieved by using:
  • Polling approach based on Java Future
  • Pushing approach using callbacks
A non-blocking API does not automatically mean that the physical communication is asynchronous: the transport can still occupy a separate thread and communicate synchronously even if the user code is based on a non-blocking API.
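
A small JDK-only sketch makes this point visible. NonBlockingFacade is a hypothetical class, and Thread.sleep simulates a synchronous transport exchange. The caller gets its futures back immediately, but with a single transport thread the two calls are still served one after another.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical example: a non-blocking API on top of a blocking (simulated) transport. */
class NonBlockingFacade {

    /** Simulated synchronous transport: the calling thread is blocked for the whole exchange. */
    static String blockingCall(String request, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "echo:" + request;
    }

    /** Non-blocking for the caller, but a pool thread still waits inside blockingCall. */
    static CompletableFuture<String> callAsync(String request, long delayMillis,
                                               ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> blockingCall(request, delayMillis), pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        long start = System.nanoTime();
        CompletableFuture<String> first = callAsync("a", 200, pool);
        CompletableFuture<String> second = callAsync("b", 200, pool);

        // Both calls returned at once, yet the single transport thread serves them sequentially.
        System.out.println(first.join() + ", " + second.join());
        System.out.println("elapsed ms: " + (System.nanoTime() - start) / 1_000_000);
        pool.shutdown();
    }
}
```

Freeing that waiting thread is what the transport level options described next are about.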

Transport level 
There are the following alternatives for asynchronous communication at the transport level:
  • Native asynchronous transports (JMS, SMTP): provide decoupled communication by nature. The response can be sent completely independently of the request.
  • Simulation of decoupled communication using synchronous transports (HTTP decoupled response): simulates natively asynchronous transports using separate communication channels for request and response.
  • Asynchronous thread model in synchronous transports (continuations and non-blocking IO model): makes it possible to reuse threads for multiple communications in synchronous transports.
I summarize these asynchronous options and aspects supported by CXF in the following table:
                               | Client                                            | Server
API                            | WSDL asynchronous binding, Dispatch.invokeAsync() | @UseAsyncMethod annotation
Transport [async]              | JMS                                               | JMS
Transport [simulation sync]    | WSA decoupled responses, independent oneways      | WSA decoupled responses, independent oneways
Transport [async thread model] | HttpAsyncClient (NIO)                             | Servlet 3 API, Jetty Continuations

The next posts describe each of these CXF features.