Wednesday, December 21, 2011

Understanding the Axis2 Non Blocking API


The non-blocking API of Axis2 supports asynchronous communication at both the API level and the transport level.
This document explains how it works. The example discussed here uses dual transport channels, that is, separate channels for the request and the response.
For this feature to work, WS-Addressing must be enabled on both the client and the server hosting the external web service.

Scenario 1: The external web service being invoked provides the response in the same execution thread.

Fig: 1


1) The Axis2 client program prepares the request and invokes sendReceiveNonBlocking(payload, callback) on the ServiceClient.

2) The call is propagated to the AxisEngine, which then starts an HTTP transport listener on a predefined port specified in axis2.xml and registers the callback object on the listener. The response to the request is delivered to this callback listener at a later point.

3) A separate request execution thread is created and all the necessary objects are passed to it for further execution.

4) The call returns to the client program, which can continue processing without waiting for the response. This gives us API-level asynchrony.

5) The request execution thread opens a transport out channel and sends the request data to the external web service.

6) Upon completing its operation, the external web service sends the response to the callback listener through a second transport out channel. Thus two channels are used for communication.

7) Finally, an HTTP 202 acknowledgment is sent to the request execution thread over the initial channel on which the request was made. In practice, the response and the 202 acknowledgment arrive at almost the same time.

Since the request channel waits for the acknowledgment all this time, it can time out if the external web service takes a long time to respond. This problem can be partially solved by setting the connection timeout to an appropriate value.

//connection timeout set to 60 seconds
options.setTimeOutInMilliSeconds(60 * 1000);
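
The effect of the timeout in Scenario 1 can be illustrated with a small model that uses only the Java standard library. This is a rough sketch, not Axis2 code: the class SameThreadAckDemo and the method awaitAck are made-up names, and the sleep stands in for the work done by the external web service. It only shows that when the acknowledgment arrives after the service has finished, a wait shorter than the service time fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stdlib-only model of Scenario 1; it does not use any Axis2 classes.
class SameThreadAckDemo {

    // Models the request channel: the acknowledgment is produced only after
    // the service has finished, so the caller waits up to timeoutMillis.
    static String awaitAck(long serviceMillis, long timeoutMillis) {
        CompletableFuture<String> ack = CompletableFuture.supplyAsync(() -> {
            sleep(serviceMillis);      // the service does its work first
            return "202 Accepted";     // only then is the request acknowledged
        });
        try {
            return ack.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Timeout shorter than the service time: the wait gives up.
        System.out.println(awaitAck(500, 50));
        // Timeout longer than the service time: the acknowledgment arrives.
        System.out.println(awaitAck(50, 2000));
    }
}
```

A longer timeout only moves the limit. A service that runs longer than the configured value would still cause a timeout, which is why the fix above is described as partial.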


Scenario 2: The external web service being invoked provides the response in a separate execution thread.

To enable this feature on a web service developed with Axis2, an additional parameter has to be added to the services.xml of the web service.
<parameter name="messageReceiver.invokeOnSeparateThread">true</parameter>
 

Fig: 2
                                                                 

1) The Axis2 client program prepares the request and invokes sendReceiveNonBlocking(payload, callback) on the ServiceClient.

2) The call is propagated to the AxisEngine, which then starts an HTTP transport listener on a predefined port specified in axis2.xml and registers the callback object on the listener. The response to the request is delivered to this callback listener at a later point.

3) A separate request execution thread is created and all the necessary objects are passed to it for further execution.

4) The call returns to the client program, which can continue processing without waiting for the response. This gives us API-level asynchrony.

5) The request execution thread opens a transport out channel and sends the request data to the external web service.

6) An HTTP 202 acknowledgment is sent to the request execution thread immediately, which closes this channel.

7) Upon completing its operation, the external web service sends the response to the callback listener from a separate thread through a second transport out channel. Thus two channels are used for communication.

Since the request channel is acknowledged immediately and the response is sent from another execution thread over another transport channel, no connection timeout issues are encountered. Thus we achieve fully asynchronous behavior.
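
The order of events in Scenario 2 can be modelled with plain Java threads. The sketch below is an illustration under stated assumptions and uses no Axis2 classes: SeparateThreadAckDemo, invoke and run are made-up names, the 200 ms sleep stands in for a long-running service operation, and the Consumer plays the role of the client-side callback listener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Stdlib-only model of Scenario 2; it does not use any Axis2 classes.
class SeparateThreadAckDemo {

    // Hypothetical stand-in for the service: it acknowledges at once and
    // hands the real work to another thread, which later calls the listener.
    static String invoke(String request, Consumer<String> replyListener, long workMillis) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(workMillis);               // long-running operation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            replyListener.accept("echo: " + request);   // response on the second channel
        });
        worker.start();
        return "202 Accepted";                          // the first channel closes here
    }

    // Runs one exchange and returns the events in the order they occurred.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        String ack = invoke("hello", reply -> {
            events.add(reply);
            done.countDown();
        }, 200);
        events.add(ack);                                // recorded before the reply arrives
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the caller holds the acknowledgment long before the reply is delivered, so the time taken by the service never counts against the timeout of the request channel.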


Sample client code: EchoNonBlockingDualClient.java
        ServiceClient sender = null;
        try {
            OMElement payload = ClientUtil.getEchoOMElement();

            Options options = new Options();
            options.setTo(targetEPR);
            // this is the action mapping we put within the services.xml
            options.setAction("urn:echo");
            options.setTransportInProtocol(Constants.TRANSPORT_HTTP);
            options.setUseSeparateListener(true);

            //timeout needs to be set in case of scenario 1 explained above
            options.setTimeOutInMilliSeconds(60 * 1000);

            //Callback to handle the response
            MyAxisCallback callback = new MyAxisCallback();
           
            //Non-Blocking Invocation
            sender = new ServiceClient();
            sender.engageModule(Constants.MODULE_ADDRESSING);
            sender.setOptions(options);
            
            sender.sendReceiveNonBlocking(payload, callback);
            
            int i=0;
            //Wait till the callback receives the response.           
            while (!callback.isComplete()) {
                Thread.sleep(1000);
                System.out.println("Waiting since:"+ ++i + " seconds");                
            }
            
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            try {
                System.out.println("cleaning called");
                // sender is still null if an exception occurred before it was created
                if (sender != null) {
                    sender.cleanup();
                }
                System.out.println("cleaning completed");
            } catch (AxisFault axisFault) {
                //have to ignore this
                System.out.println("cleaning error");
            }
        }


The use of separate request and response channels is indicated to the Axis2 engine by setting the following option:

options.setUseSeparateListener(true);

To check whether the external web service has delivered the response to the callback, we can poll the callback as shown below.
 

int i = 0;
//Wait till the callback receives the response.
while (!callback.isComplete()) {
    Thread.sleep(1000);
    System.out.println("Waiting since:" + ++i + " seconds");
}
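
The MyAxisCallback class is not listed in this post, so the following is only a sketch of how such a callback might track completion. It is a hypothetical stand-in: a real implementation would implement org.apache.axis2.client.async.AxisCallback, whose onMessage and onFault methods receive a MessageContext. Plain strings are used here so that the sketch compiles without the Axis2 jars. The class name CompletionTrackingCallback and the methods await and getResult are made up for illustration. Besides isComplete() for polling, it offers a blocking wait based on a CountDownLatch, which avoids the sleep loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for MyAxisCallback; method names mirror AxisCallback,
// but the parameters are plain strings instead of MessageContext objects.
class CompletionTrackingCallback {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile String result;

    public void onMessage(String body) { result = body; }

    public void onFault(String fault) { result = "fault: " + fault; }

    // Design choice of this sketch: an error also marks the callback complete.
    public void onError(Exception e) {
        result = "error: " + e.getMessage();
        done.countDown();
    }

    public void onComplete() { done.countDown(); }

    // Non-blocking check, usable from a polling loop.
    public boolean isComplete() { return done.getCount() == 0; }

    // Blocks until completion or timeout; returns true if completed in time.
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }

    public String getResult() { return result; }

    public static void main(String[] args) throws InterruptedException {
        CompletionTrackingCallback callback = new CompletionTrackingCallback();
        // Simulate the listener thread delivering the response later.
        new Thread(() -> {
            callback.onMessage("Axis2 Echo String");
            callback.onComplete();
        }).start();
        if (callback.await(5, TimeUnit.SECONDS)) {
            System.out.println("Response: " + callback.getResult());
        } else {
            System.out.println("No response within 5 seconds");
        }
    }
}
```

With a callback of this shape, the client could wait with a single await call and an upper bound instead of waking up every second.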

Finally, we have to release all the resources, including the client-side callback listener. This is achieved by:

finally {
    try {
        sender.cleanup();
    } catch (AxisFault af) {
        //ignore this
    }
}
 

Here are the steps to compile and run the sample:

1) Download the appropriate Axis2 binary distribution from the Apache Axis2 website. The version used to run this sample was axis2-1.6.0. Extract the archive to a local drive.
 
2) Make sure that the WS-Addressing module is enabled in axis2.xml under the axis2-1.6.0/conf directory.
If the line <module ref="addressing" /> is commented out, uncomment it.

3) cd axis2-1.6.0/samples/userguide

4) axis2-1.6.0/samples/userguide> ant
The deployment archives are copied to the axis2-1.6.0/repository/services

5) Run the Axis2 server by executing the following command in a terminal:
axis2-1.6.0/bin> ./axis2server.sh

6) Run the client program by executing the following command:
axis2-1.6.0/samples/userguide> ant run.client.nonblockingdual



Sample Request-Response:
Request:
POST /axis2/services/MyService HTTP/1.1
Content-Type: text/xml; charset=UTF-8
SOAPAction: "urn:echo"
User-Agent: Axis2
Host: 127.0.0.1:8080
Transfer-Encoding: chunked

288
<?xml version='1.0' encoding='UTF-8'?>
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
        <soapenv:Header xmlns:wsa="http://www.w3.org/2005/08/addressing">
            <wsa:To>http://127.0.0.1:8080/axis2/services/MyService</wsa:To>
            <wsa:ReplyTo>
                <wsa:Address>http://10.10.2.134:7070/axis2/services/anonService2/</wsa:Address>
            </wsa:ReplyTo>
            <wsa:MessageID>urn:uuid:b683fc84-a02e-49a4-a6a6-a4f5ca6b962d</wsa:MessageID>
            <wsa:Action>urn:echo</wsa:Action>
        </soapenv:Header>
        <soapenv:Body>
            <example1:echo xmlns:example1="http://example1.org/example1">
                <example1:Text>Axis2 Echo String </example1:Text>
            </example1:echo>
        </soapenv:Body>
    </soapenv:Envelope>
0
Response:
<?xml version='1.0' encoding='utf-8'?>
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
         <soapenv:Header xmlns:wsa="http://www.w3.org/2005/08/addressing">
             <wsa:To>http://10.10.2.134:7070/axis2/services/anonService2/</wsa:To>
             <wsa:ReplyTo><wsa:Address>http://www.w3.org/2005/08/addressing/none</wsa:Address></wsa:ReplyTo>
             <wsa:MessageID>urn:uuid:bec091be-7b92-40e4-8416-2b49863ceb53</wsa:MessageID>
             <wsa:Action>urn:echoResponse</wsa:Action>
             <wsa:RelatesTo>urn:uuid:b683fc84-a02e-49a4-a6a6-a4f5ca6b962d</wsa:RelatesTo>
         </soapenv:Header>
         <soapenv:Body>
             <example1:echo xmlns:example1="http://example1.org/example1">
                 <example1:Text>Axis2 Echo String </example1:Text>
             </example1:echo>
         </soapenv:Body>
     </soapenv:Envelope>
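
In the messages above, the wsa:RelatesTo value of the response equals the wsa:MessageID of the request. This is what allows a response arriving on the second channel to be matched to the waiting callback. The sketch below illustrates that idea with the standard library only. It is an assumption about how such a lookup can be organised, not the Axis2 implementation, and the names ReplyCorrelator, register and dispatch are invented for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative correlation table: MessageID of a request -> waiting callback.
class ReplyCorrelator {
    private final Map<String, Consumer<String>> pending = new ConcurrentHashMap<>();

    // Called when a request is sent: remember who is waiting for the reply.
    public void register(String messageId, Consumer<String> callback) {
        pending.put(messageId, callback);
    }

    // Called when a reply arrives: look up the callback by RelatesTo.
    // Returns true if a waiting callback was found and invoked.
    public boolean dispatch(String relatesTo, String body) {
        Consumer<String> callback = pending.remove(relatesTo);
        if (callback == null) {
            return false;   // unknown or already answered request
        }
        callback.accept(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        String id = "urn:uuid:b683fc84-a02e-49a4-a6a6-a4f5ca6b962d";
        correlator.register(id, body -> System.out.println("Got reply: " + body));
        System.out.println(correlator.dispatch(id, "Axis2 Echo String"));
        // A second reply for the same MessageID finds no waiting callback.
        System.out.println(correlator.dispatch(id, "duplicate"));
    }
}
```

Removing the entry on dispatch means each request is answered at most once, so a late or duplicate reply is simply reported as unmatched.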