
REQUIREMENT

interface.update



Type NEW
Priority HIGH
Status CLOSED
Topic Clients receive asynchronous messages with the update() method
Description

If clients want to receive callbacks from xmlBlaster they need to install a callback server. The messages arrive in the update() method.
The client side callback interface has to support the following methods.

Callback server method detail (CORBA IDL notation):

   typedef string XmlType;
   typedef sequence<XmlType> XmlTypeArr;
   typedef sequence<MessageUnit> MessageUnitArr;

   XmlTypeArr update(in string cbSessionId, in MessageUnitArr msgUnitArr)
                                                     raises(XmlBlasterException);

   oneway void updateOneway(in string cbSessionId, in MessageUnitArr msgUnitArr);

   string ping(in string qos);
      

As the IDL shows, the client has to provide three methods (see xmlBlaster.idl):

  • update(): the acknowledged update invocation.
  • updateOneway(): has no return value; there is no application level ACK, which increases performance.
  • ping(): allows xmlBlaster to ping the client; currently empty strings "" are expected.

By default messages are sent acknowledged. A client that chooses to receive oneway messages has to configure this with the connect QoS during connect() or during subscribe().

Only update() is discussed here, as updateOneway() is the same call without a return value.

| Parameter | Type | Description |
|---|---|---|
| cbSessionId | string | A session ID provided by the client on connect() by setting the secretCbSessionId in the callback address. It is not the normal sessionId used for the connection. This allows the client's callback server to decide if it trusts the callback. |
| msgUnitArr | MessageUnit[] | An array of messages. |
| return | string | An array of XML encoded strings containing the status of each message. This is the status from the client point of view. The syntax is not specified yet; it will be specified when xmlBlaster gets transaction support and clients want to participate. For the time being return an empty string "" or an empty qos "<qos/>". |
| XmlBlasterException | exception | Thrown on error. The only allowed error codes are of type ErrorCode.USER_UPDATE_..., for example user.update.internalError or user.update.security.authentication.accessDenied. See ErrorCode.java for a complete list of error codes. |
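The contract in the table above can be illustrated with a minimal, self-contained sketch. The classes `CallbackSketch`, `MessageUnit` and `UpdateException` are hypothetical stand-ins invented for this illustration; they are not the xmlBlaster client classes. The sketch assumes the callback server remembers the secret it passed on connect(), rejects callbacks that carry a different cbSessionId, and otherwise returns one empty `<qos/>` per message.

```java
// Hypothetical stand-ins for illustration only; not the xmlBlaster client API.
final class CallbackSketch {

    /** Simplified message unit: key XML, raw content, QoS XML. */
    static final class MessageUnit {
        final String key;
        final byte[] content;
        final String qos;

        MessageUnit(String key, byte[] content, String qos) {
            this.key = key;
            this.content = content;
            this.qos = qos;
        }
    }

    /** Simplified exception carrying an error code string. */
    static final class UpdateException extends RuntimeException {
        final String errorCode;

        UpdateException(String errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }
    }

    // The secret the client configured as secretCbSessionId on connect() (assumed).
    private final String expectedCbSessionId;

    CallbackSketch(String expectedCbSessionId) {
        this.expectedCbSessionId = expectedCbSessionId;
    }

    /** Returns one status string per received message. */
    String[] update(String cbSessionId, MessageUnit[] msgUnitArr) {
        if (!expectedCbSessionId.equals(cbSessionId)) {
            // The caller does not know our secret, so we do not trust the callback.
            throw new UpdateException(
                    "user.update.security.authentication.accessDenied",
                    "Unknown cbSessionId");
        }
        String[] result = new String[msgUnitArr.length];
        for (int i = 0; i < msgUnitArr.length; i++) {
            // Application specific processing of msgUnitArr[i] would go here.
            result[i] = "<qos/>";
        }
        return result;
    }

    public static void main(String[] args) {
        CallbackSketch cb = new CallbackSketch("secret");
        MessageUnit msg = new MessageUnit("<key oid='MyMessage'/>",
                "Hello world".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                "<qos><state id='OK'/></qos>");
        System.out.println(cb.update("secret", new MessageUnit[] { msg })[0]);
    }
}
```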

Behavior on exceptions thrown by a client in update()

If the update() method throws an exception of type ErrorCode.USER_UPDATE_* (for example "user.update.error") the server forwards the rejected message to its error handler. The current error handler is implemented to publish this message as a 'dead letter', that is a message with the key oid='__sys__deadMessage'.
You can write a client which subscribes to such dead messages and handles them appropriately, or at least activate the DeadMessageDumper in xmlBlasterPlugins.xml as described in the requirement admin.errorHandling.
After the message is published as a dead letter it is removed from the client's callback queue and the next one is processed.

If your client throws a USER_UPDATE_HOLDBACK="user.update.holdback" exception the behavior is different: the server keeps the message in its callback queue and sets dispatcherActive=false. The xmlBlaster server then goes to sleep for this client and won't deliver further update messages. It is now up to an administrative command to activate the callback dispatcher again to try to redeliver the queued messages.
This is done manually with jconsole or by sending a key oid __cmd:client/receiver/1/?dispatcherActive=true; for more details please see requirement admin.commands.

If your client throws a USER_UPDATE_DEADMESSAGE="user.update.deadMessage" exception the server removes the message from its callback queue and forwards it as a 'dead message'. In this way a client can reject a poison message and continue processing the next one. It is now up to an administrator to analyze the dead message at a later point.

Please note that in the Java client library all other exception types thrown in your update() method are converted to an ErrorCode.USER_UPDATE_INTERNALERROR="user.update.internalError" before being propagated to the server.
If clients written in other programming languages send an unexpected exception error code, it is handled by the server as "user.update.internalError" (for CORBA access this has always been the case, for SOCKET and XMLRPC since xmlBlaster 1.3; before that it was an "internal.unknown" or "internal.illegalargument").
To be more precise, every exception caught by the server from the callback clients which is not of type ErrorCode.USER_*="user.*" or ErrorCode.COMMUNICATION_*="communication.*" is automatically handled as ErrorCode.USER_UPDATE_INTERNALERROR="user.update.internalError" and the above rules apply.
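The normalization rule above can be written down as a small function. This is only a sketch of the rule as described here, not the server's code; the class and method names are hypothetical.

```java
// Sketch of the rule described above; names are hypothetical, not xmlBlaster code.
final class ErrorCodeSketch {

    static final String INTERNAL_ERROR = "user.update.internalError";

    /**
     * Error codes starting with "user." or "communication." are kept,
     * everything else (including null) is treated as user.update.internalError.
     */
    static String normalize(String errorCode) {
        if (errorCode != null
                && (errorCode.startsWith("user.") || errorCode.startsWith("communication."))) {
            return errorCode;
        }
        return INTERNAL_ERROR;
    }

    public static void main(String[] args) {
        System.out.println(normalize("internal.unknown"));
        System.out.println(normalize("user.update.holdback"));
    }
}
```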

If the server gets a communication exception ErrorCode.COMMUNICATION_* (for example "communication.responseTimeout") from its dispatcher framework, it assumes that the client has never seen or processed the message and retries delivery as configured. In fail safe mode the dispatcher framework goes to status POLLING and queues messages until the client reconnects; otherwise the status goes to DEAD and the message is published as a dead message. A communication exception occurs for example if the network is down or if the client has disappeared.
The main configuration settings to control the redelivery behavior are -dispatch/callback/retries and -dispatch/callback/delay.

Behavior on blocking/timeout during an update() call:

When the update() callback method is invoked, the server waits a given time for a response. This timeout setting depends on the communication protocol used. For method invocations which are not marked oneway, the response is the return value or an exception. On timeout an exception of type communication.* is thrown, which on the server side leads to a dead message (depending on the installed error handler) or, if -dispatch/callback/retries is not 0, to a POLLING dispatcher.
For the SOCKET protocol see plugin/socket/updateResponseTimeout; it defaults to forever, which means the update() callback never times out.

A typical fail safe subscriber needs to set at least:

java javaclients.HelloWorldSubscribe -session.name SUBSCRIBER/1 -dispatch/callback/retries -1

Summary

| Exception errorCode thrown | Message handling | Description |
|---|---|---|
| user.* | dead message | If any user exception is thrown the dispatcher callback state goes to DEAD and all arriving messages will be redirected as dead messages. (See "user.update.holdback" and "user.update.deadMessage" for different behavior). |
| user.update.holdback | queued in callback queue | The dispatcher will change to inactive and all arriving messages are queued (dispatcherActive=false). |
| user.update.deadMessage | dead message | Since xmlBlaster > v2.0: The message is removed from the callback queue and the client can continue processing the follow up messages. |
| communication.* | queued in callback queue | On communication problems the dispatcher goes to POLLING depending on the configuration of -dispatch/callback/retries: if retries is set to -1 we poll forever, if retries is > 0 we poll the given number of times before we give up and go to DEAD. Depending on the configuration setting -dispatch/callback/delay xmlBlaster tries to reconnect again to the client's callback server. During this time all arriving messages are safely queued. |
| communication.* and retries=0 | dead message | On communication problems the dispatcher goes to DEAD if -dispatch/callback/retries is set to zero (which is the default). |
| other exceptions | dead message | All other exceptions coming from the clients are transformed to "user.update.internalError" and the first use case "user.*" applies. |
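The summary can also be read as a decision function from the thrown error code and the configured -dispatch/callback/retries value to the immediate message handling. The following is a hedged sketch of that reading; `HandlingSketch` and its enum are hypothetical names, and the sketch ignores the later transition from POLLING to DEAD once a positive retry count is exhausted.

```java
// Sketch of the summary table as a decision function; names are hypothetical.
final class HandlingSketch {

    enum Handling { DEAD_MESSAGE, QUEUED }

    /**
     * @param errorCode the error code thrown by the client or the dispatcher framework
     * @param retries   the value of -dispatch/callback/retries (-1 means poll forever)
     */
    static Handling handle(String errorCode, int retries) {
        if ("user.update.holdback".equals(errorCode)) {
            return Handling.QUEUED;            // dispatcherActive=false, message stays queued
        }
        if (errorCode != null && errorCode.startsWith("communication.")) {
            // retries == 0: dispatcher goes to DEAD; otherwise it goes to POLLING
            return retries == 0 ? Handling.DEAD_MESSAGE : Handling.QUEUED;
        }
        // user.*, user.update.deadMessage and all other exceptions
        return Handling.DEAD_MESSAGE;
    }

    public static void main(String[] args) {
        System.out.println(handle("communication.responseTimeout", -1));
        System.out.println(handle("user.update.error", -1));
    }
}
```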

Please see requirement admin.errorcodes.listing for a list of error codes.

The server side error handler is hidden by an interface and will be a plugin in a future version of xmlBlaster to support customized error behavior.
For example such a plugin can look at the thrown error code or error message and follow specific recovery strategies.

Note

Be prepared to receive all sorts of messages on update, like normal messages or internal xmlBlaster messages or administrative command messages.

Note for XML-RPC Clients

The update interface of the current XMLRPC implementation differs slightly.
Its update method signature looks like:

        string update( string cbSessionId, string key, byte[] content, string qos )
      

The XmlRpcCallbackImpl.java:update() does not allow sending an array of messages, so the xmlBlaster server calls the client's callback update method once for each message.
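To make the difference concrete, here is a minimal sketch of per-message delivery: a batch of messages results in one update() call per message, each with its own return value. The types `SingleUpdate` and `Message` and the method `deliver` are hypothetical and only mirror the signature shown above; they are not part of the XML-RPC driver.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of per-message delivery; not the XML-RPC driver code.
final class XmlRpcDispatchSketch {

    /** Mirrors the single-message signature shown above. */
    interface SingleUpdate {
        String update(String cbSessionId, String key, byte[] content, String qos);
    }

    static final class Message {
        final String key;
        final byte[] content;
        final String qos;

        Message(String key, byte[] content, String qos) {
            this.key = key;
            this.content = content;
            this.qos = qos;
        }
    }

    /** Invokes the callback once per message and collects the returned status strings. */
    static List<String> deliver(SingleUpdate callback, String cbSessionId, List<Message> batch) {
        List<String> returned = new ArrayList<>();
        for (Message m : batch) {
            returned.add(callback.update(cbSessionId, m.key, m.content, m.qos));
        }
        return returned;
    }

    public static void main(String[] args) {
        List<Message> batch = new ArrayList<>();
        batch.add(new Message("<key oid='A'/>", new byte[0], "<qos/>"));
        batch.add(new Message("<key oid='B'/>", new byte[0], "<qos/>"));
        List<String> ret = deliver((sid, key, content, qos) -> "<qos/>", "secret", batch);
        System.out.println(ret.size() + " calls made");
    }
}
```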

Example
XML

Here is an example of an update message:

| Key | <key oid='MyMessage' /> |
| content | Hello world |
| QoS | <qos><state id='OK'/></qos> |

And a return value from the client (sent back to xmlBlaster):

return "<qos><state id='OK'/></qos>";
Example
XML

Here is an example of an UpdateQos XML markup which you may receive in the update() method.
Usually, the first line of an XML document - the XML declaration - defines the XML version and the character encoding used in the document: <?xml version="1.0" encoding="UTF-8"?>. As we support UTF-8 only, which is the XML default, it can be omitted. The XML version is 1.0.

<qos> <!-- UpdateQos -->

  <state id='OK'/>  <!-- OK,ERASED etc. see Constants.java -->

  <sender>/node/heron/client/Tim/-2</sender>

  <priority>5</priority>  

  <subscribe id='__subId:heron-2'/>
                                 <!-- Showing which subscription forced the update -->
                                 <!-- PtP messages are marked with '__subId:PtP' -->

  <!-- UTC time when message was created in xmlBlaster server with a publish() call,
          in nanoseconds since 1970 -->
  <rcvTimestamp nanos='1007764305862000002'>
        2001-12-07 23:31:45.862000002 <!-- The nanos from above but human readable -->
  </rcvTimestamp>

  <!-- remainingLife is calculated relative to when xmlBlaster has sent the message -->
  <expiration lifeTime='129595811' remainingLife='1200'/> 
  
  <queue index='0' size='1'/> <!-- If queued messages are flushed on login -->
  
  <redeliver>4</redeliver> <!-- Only sent if message sending had previous errors -->

  <!-- Only sent if message is from history queue directly after subscribe -->
  <clientProperty name='__isInitialUpdate'>true</clientProperty>
  
  <clientProperty name='myTransactionId'>0x23345</clientProperty>
  <clientProperty name='myDescription' encoding="base64" charset="windows-1252">
     QUUgaXMgJ8QnDQpPRSBpcyAn1icNCnNzIGlzICffJw==
  </clientProperty>
  <clientProperty name='myAge' type='int'>12</clientProperty>

  <route> <!-- Routing information in cluster environment -->
     <node id='avalon' stratum='1' timestamp='1068026303739000001' dirtyRead='false'/>
     <node id='heron' stratum='0' timestamp='1068026303773000001' dirtyRead='false'/>
  </route>
</qos>
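In the Java client library the UpdateQos class gives access to these values. For clients without such a helper, the following sketch shows one way to read a few of the fields with the JDK's DOM parser, including a base64 encoded clientProperty. The class `UpdateQosSketch` and its methods are hypothetical, the sample is a shortened version of the markup above, and falling back to UTF-8 when no charset attribute is given is an assumption based on the note that only UTF-8 is supported.

```java
import java.io.StringReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper for illustration; the Java client library offers UpdateQos instead.
final class UpdateQosSketch {

    // Shortened version of the UpdateQos markup shown above.
    static final String SAMPLE =
          "<qos>"
        + "<state id='OK'/>"
        + "<sender>/node/heron/client/Tim/-2</sender>"
        + "<priority>5</priority>"
        + "<clientProperty name='myDescription' encoding='base64' charset='windows-1252'>\n"
        + "   QUUgaXMgJ8QnDQpPRSBpcyAn1icNCnNzIGlzICffJw==\n"
        + "</clientProperty>"
        + "<clientProperty name='myAge' type='int'>12</clientProperty>"
        + "</qos>";

    static Document parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a well-formed QoS: " + e.getMessage(), e);
        }
    }

    /** Trimmed text of the first element with the given tag, or null if absent. */
    static String text(Document qos, String tag) {
        NodeList nodes = qos.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    /** The id attribute of the state element, or null if there is no state element. */
    static String stateId(Document qos) {
        NodeList nodes = qos.getElementsByTagName("state");
        return nodes.getLength() == 0 ? null : ((Element) nodes.item(0)).getAttribute("id");
    }

    /** Value of the named clientProperty, decoded if encoding='base64'; null if absent. */
    static String clientProperty(Document qos, String name) {
        NodeList nodes = qos.getElementsByTagName("clientProperty");
        for (int i = 0; i < nodes.getLength(); i++) {
            Element e = (Element) nodes.item(i);
            if (!name.equals(e.getAttribute("name"))) {
                continue;
            }
            String raw = e.getTextContent().trim();
            if (!"base64".equals(e.getAttribute("encoding"))) {
                return raw;
            }
            String cs = e.getAttribute("charset");   // "" if the attribute is missing
            Charset charset = cs.isEmpty() ? StandardCharsets.UTF_8 : Charset.forName(cs);
            return new String(Base64.getDecoder().decode(raw), charset);
        }
        return null;
    }

    public static void main(String[] args) {
        Document qos = parse(SAMPLE);
        System.out.println("state=" + stateId(qos) + " sender=" + text(qos, "sender"));
        System.out.println("myDescription=" + clientProperty(qos, "myDescription"));
    }
}
```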
      
Example
Java

A typical Java client code to handle message updates:

import org.xmlBlaster.client.key.UpdateKey;
import org.xmlBlaster.client.qos.UpdateQos;
...

   public String update(String cbSessionId, UpdateKey updateKey, byte[] content,
                        UpdateQos updateQos) {

      if (updateKey.isInternal()) { // key oids starting with "__" like "__cmd:..."
         System.out.println("Received internal message '" +
              updateKey.getOid() + "' from xmlBlaster");
         return "<qos/>";
      }

      if (updateQos.isOk()) {       // <qos><state id='OK' ...
         System.out.println("Received asynchronous message '" + updateKey.getOid() +
                      "' state=" + updateQos.getState() +
                      " content=" + new String(content) + " from xmlBlaster");
      }
      else if (updateQos.isErased()) {   // <qos><state id='ERASED' ...
         System.out.println("Topic '" + updateKey.getOid() + "' is erased");
         // Remove my subscriptionId if I have cached it ...
         return "<qos/>";
      }
      else {
         System.out.println("Ignoring asynchronous message '" + updateKey.getOid() +
                      "' state=" + updateQos.getState() + " is not handled");
      }
      return "<qos/>";
   }
      

See xmlBlaster/demo/HelloWorld*.java for more examples.

Example
Java

We provide two demo clients which allow playing with the update behavior on the command line. Here is an example of how to simulate a subscriber which throws an exception:


java -jar lib/xmlBlaster.jar

java javaclients.HelloWorldSubscribe -updateException.errorCode user.update.error \
                                     -updateException.message "I don't want this"

java javaclients.HelloWorldPublish -numPublish 100
      

We start a server and a subscriber which is configured to throw an exception if a message arrives, then we start a publisher and publish messages.

If we want to look at the generated dead messages we start another subscriber:


java javaclients.HelloWorldSubscribe -oid __sys__deadMessage
      
Configure

NOTE: Configuration parameters are specified on the command line (-someValue 17) or in the xmlBlaster.properties file (someValue=17). See requirement "util.property" for details.
Columns named Impl tell you if the feature is implemented.
Columns named Hot tell you if the configuration is changeable in hot operation.

See API org.xmlBlaster.util.def.PriorityEnum
See API org.xmlBlaster.client.key.UpdateKey
See API org.xmlBlaster.client.qos.UpdateQos
See API org.xmlBlaster.client.I_XmlBlasterAccess
See API org.xmlBlaster.client.protocol.corba.CorbaCallbackServer
See API org.xmlBlaster.client.protocol.rmi.RmiCallbackServer
See API org.xmlBlaster.client.protocol.xmlrpc.XmlRpcCallbackServer
See API org.xmlBlaster.util.def.Constants
See REQ interface
See REQ engine.qos.update.queue
See REQ engine.qos.update.rcvTimestamp
See REQ engine.qos.update.sender
See REQ engine.qos.update.subscriptionId
See REQ admin.errorHandling
See http://www.xmlBlaster.org/xmlBlaster/src/java/org/xmlBlaster/protocol/corba/xmlBlaster.idl
See TEST org.xmlBlaster.test.qos.TestSub
See TEST org.xmlBlaster.test.qos.TestErase

This page is generated from the requirement XML file xmlBlaster/doc/requirements/interface.update.xml
