XmlBlaster Logo

REQUIREMENT

engine.queue



Type NEW
Priority HIGH
Status CLOSED
Topic XmlBlaster has a sophisticated message queueing engine
Description

Messages are queued only when necessary. This typically applies to messages that need to be sent back to clients:

Overview of the internal message queue framework

Queue Features:

  • High performing and thread safe
    The queue is based on a sorted bounded buffer from Doug Lea. It performs better than a thread safe java.util collection TreeSet, taking 100 microseconds instead of 200 microseconds for the latter to insert and remove one message.
  • Priority support
    The priority of a message can be specified when publishing. The default value is 5; 9 is the highest priority and 0 the lowest.
  • Guarantee sequence
    The sequence of incoming messages is recorded with a unique timestamp. On delivery, messages of the same priority are guaranteed to keep this sequence. Different message priorities reorder the sequence of message delivery, since priority is a higher sort criterion than the incoming timestamp.
  • Bounded (max size)
    Every queue has a maximum size. The maximum boundary is adjustable as the number of entries in the queue or as maximum memory consumption. On queue overflow xmlBlaster sends a deadMessage (see below). 'Falling through' - the oldest message is removed when the queue is full and a new message arrives - is currently not supported.
  • Dead message
    On queue overflow or on callback failure a "dead message" mode can be configured (MQSeries calls it dead letter). In this case a dead message is published, containing the lost message. Interested exception-handling clients can subscribe to "__sys__deadMessage" messages to receive them and react accordingly. A log entry is written to the server's log file in such a case.
  • Queue types
    A queue is installed on the fly when needed. If a message destination is not reachable, a queue is created to hold messages addressed to this destination. Two message queue types are created on login.
    1. SessionQueue (callback:)
      The life cycle of this queue is bound to the login session life cycle; it is usually created on login and destroyed on logout (see requirement engine.login.qos.session). It can be used as the ReplyTo address for sent messages. This is the default callback for session based subscriptions.
    2. SubjectQueue (subject:)
      This queue is created on login if it does not already exist. The queue name is similar to the login name of the client. It may outlive the logout, or may be created before login if a PtP message addresses it. It is destroyed when the last session of a specific client logs out and the queue contains no messages (or if forced by the QoS of a disconnect(), see API of DisconnectQos). If the last message in this queue expires and no login session exists, it is destroyed [TODO].
      The specified callback server of the SessionQueue is used if not otherwise specified in the login QoS, see callback attribute useForSubjectQueue='true'. The messages from this queue are delivered to all current sessions of the user having set useForSubjectQueue='true'. This means on multiple logins, the same message is delivered multiple times.
  • Zero or one callback address
    Every message queue has zero or one callback address associated with it: the SessionQueue has exactly one for asynchronous updates, or zero when browsed synchronously. The SubjectQueue has no callback address of its own; it uses the sessions' callbacks.
  • Synchronous browsing
    A message queue may be browsed with the synchronous get() method.
  • Sequence on XPath subscription
    A subscription with XPath may collect many messages with one query. The sequence of those messages is guaranteed (ordered by priority, then by timestamp).
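
The ordering and overflow rules from the feature list can be illustrated with a small sketch. It is not the xmlBlaster implementation: it uses a synchronized java.util.TreeSet instead of the sorted bounded buffer mentioned above, a counter instead of a timestamp, and hypothetical names (BoundedPriorityQueueSketch, Entry, deadMessageHandler). It also assumes that on overflow the newly arriving message is the one handed to the dead message handler, since 'falling through' is not supported.

```java
import java.util.Comparator;
import java.util.TreeSet;
import java.util.function.Consumer;

/** Illustrative sketch only; all names here are hypothetical. */
class BoundedPriorityQueueSketch {

    /** A queued message: priority 0..9 (9 is highest) plus a unique arrival sequence. */
    static final class Entry {
        final int priority;
        final long sequence;
        final String content;

        Entry(int priority, long sequence, String content) {
            this.priority = priority;
            this.sequence = sequence;
            this.content = content;
        }
    }

    // Higher priority first; within one priority, earlier arrival first.
    private static final Comparator<Entry> ORDER =
            Comparator.comparingInt((Entry e) -> -e.priority)
                      .thenComparingLong(e -> e.sequence);

    private final TreeSet<Entry> entries = new TreeSet<>(ORDER);
    private final int maxEntries;
    private final Consumer<Entry> deadMessageHandler;
    private long nextSequence = 0;

    BoundedPriorityQueueSketch(int maxEntries, Consumer<Entry> deadMessageHandler) {
        this.maxEntries = maxEntries;
        this.deadMessageHandler = deadMessageHandler;
    }

    /** Queues the message, or passes it to the dead message handler if the queue is full. */
    synchronized boolean put(int priority, String content) {
        Entry entry = new Entry(priority, nextSequence++, content);
        if (entries.size() >= maxEntries) {
            // Assumption: the arriving message is rejected, older entries stay queued.
            deadMessageHandler.accept(entry);
            return false;
        }
        entries.add(entry);
        return true;
    }

    /** Removes and returns the next message to deliver, or null if the queue is empty. */
    synchronized Entry take() {
        return entries.pollFirst();
    }

    public static void main(String[] args) {
        BoundedPriorityQueueSketch queue = new BoundedPriorityQueueSketch(
                2, dead -> System.out.println("dead message: " + dead.content));
        queue.put(5, "normal");
        queue.put(9, "urgent");
        queue.put(5, "too many");   // queue is full, goes to the handler
        System.out.println(queue.take().content);
        System.out.println(queue.take().content);
    }
}
```

Sorting on priority first and arrival sequence second is what lets a priority 9 message overtake earlier priority 5 messages while messages of equal priority keep their incoming order.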

Example
Java


These are example QoS of a connect() invocation:
================================================

<qos>
   ...
   <!-- SessionQueue callback: Use CORBA to send messages back -->
   <queue relating='callback' maxEntries='1600' expires='360000000'
            onOverflow='deadMessage' onFailure='deadMessage'>
      
      <callback type='IOR' sessionId='sd3lXjs9Fdlggh'
                pingInterval='60000' useForSubjectQueue='true'>
         IOR:00011200070009990000....
      </callback>
   
   </queue>
</qos>


<qos>
   ...
   <!-- specify subjectQueue parameters -->
   <queue relating='subject' maxEntries='1600' expires='360000000'
            onOverflow='deadMessage'>
   </queue>
</qos>


<qos>
   ...
   <!-- SessionQueue callback:
         Use CORBA to send messages back with default queue parameters
   -->
   <callback type='IOR'>
   
      IOR:00011200070009990000....
   
      <compress type='gzip' minSize='1000' />
         <!-- compress messages bigger than 1000 bytes before sending them to me -->
   
      <burstMode collectTime='400' />
         <!-- Collect messages for 400 milliseconds and update
              them in one callback (burst mode) -->
   
   </callback>
</qos>
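
To make the structure of these connect QoS fragments concrete, here is a minimal sketch that reads the queue attributes with the JDK's DOM parser. It does not use xmlBlaster's own ConnectQos or QueueProperty classes; QueueQosReaderSketch and QueueSettings are hypothetical names, and falling back to the values from the Configure table when an attribute is missing is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Illustrative sketch only; not the xmlBlaster QoS parser. */
class QueueQosReaderSketch {

    /** Hypothetical holder for a few of the queue attributes. */
    static final class QueueSettings {
        final String relating;
        final long maxEntries;
        final String onOverflow;
        final String callbackType; // null if the queue has no nested callback

        QueueSettings(String relating, long maxEntries, String onOverflow, String callbackType) {
            this.relating = relating;
            this.maxEntries = maxEntries;
            this.onOverflow = onOverflow;
            this.callbackType = callbackType;
        }
    }

    /** Reads the first queue element of a QoS string. */
    static QueueSettings read(String qosXml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(qosXml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("QoS is not well-formed XML", e);
        }
        NodeList queues = doc.getElementsByTagName("queue");
        if (queues.getLength() == 0) {
            // Assumption: no queue element means default settings.
            return new QueueSettings("callback", 1000, "deadMessage", null);
        }
        Element queue = (Element) queues.item(0);
        NodeList callbacks = queue.getElementsByTagName("callback");
        String callbackType = callbacks.getLength() == 0
                ? null
                : ((Element) callbacks.item(0)).getAttribute("type");
        return new QueueSettings(
                attr(queue, "relating", "callback"),
                Long.parseLong(attr(queue, "maxEntries", "1000")),
                attr(queue, "onOverflow", "deadMessage"),
                callbackType);
    }

    // Element.getAttribute returns "" for a missing attribute.
    private static String attr(Element element, String name, String fallback) {
        String value = element.getAttribute(name);
        return value.isEmpty() ? fallback : value;
    }

    public static void main(String[] args) {
        QueueSettings s = read("<qos><queue relating='subject' maxEntries='1600'/></qos>");
        System.out.println(s.relating + " " + s.maxEntries + " " + s.onOverflow);
    }
}
```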


These are example QoS of subscribe() invocations:
================================================

<!-- The subscribed messages are delivered via the SessionQueue of the subscriber -->
<qos>
</qos>

<!-- Same as above -->
<qos>
   <queue relating='callback'/>
</qos>

Any other queue overrides session delivery:

<!-- The subscribed messages are only delivered via the
     SubjectQueue of the current client -->
<qos>
   <queue relating='subject'/>
   <queue relating='subject:somebodyElse'/>
</qos>

<qos>
   <queue maxEntries='1000' maxBytes='4000000' onOverflow='deadMessage'>
      <callback type='EMAIL'>
         et@mars.universe
         <!-- Sends messages to et with specified queue attributes -->
      </callback>
   </queue>

   <callback type='EMAIL' onFailure='deadMessage'>
      tolkien@mars.universe
      <!-- Sends messages to tolkien, with default queue settings -->
   </callback>

   <callback type='XMLRPC' sessionId='8sd3lXjspx9Fdlggh'>
      http://www.mars.universe:8080/RPC2
   </callback>

   <!-- The session id is passed to the client callback server, which can
         check whether it trusts this sender -->
</qos>


This is an example QoS of a disconnect() invocation:
====================================================

<qos>
   <deleteSubjectQueue>false</deleteSubjectQueue>
</qos>
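
The deleteSubjectQueue flag interacts with the SubjectQueue life cycle described under "Queue types". The rule can be written as a small predicate. This is a sketch with hypothetical names, and it assumes that the forced deletion only applies once the last session of the client has logged out; the text above does not state this explicitly.

```java
/** Illustrative sketch only; the names are hypothetical. */
class SubjectQueueLifecycleSketch {

    /**
     * Decides whether the SubjectQueue is destroyed after a logout.
     *
     * @param remainingSessions  sessions of this client still logged in after the logout
     * @param queuedMessages     messages still held in the SubjectQueue
     * @param deleteSubjectQueue value of deleteSubjectQueue in the disconnect QoS
     */
    static boolean destroyOnLogout(int remainingSessions, int queuedMessages,
                                   boolean deleteSubjectQueue) {
        if (remainingSessions > 0) {
            return false; // other sessions of the client still use the queue
        }
        // Last session gone: destroy if empty, or if the disconnect QoS forces it.
        return queuedMessages == 0 || deleteSubjectQueue;
    }

    public static void main(String[] args) {
        // Last logout, messages pending, deleteSubjectQueue false as in the example QoS.
        System.out.println(destroyOnLogout(0, 3, false));
    }
}
```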


   
Configure

These parameters configure the default behavior of the xmlBlaster server.

Property           Default      Description                                                                              Implemented
queue/maxEntries   1000         The max setting allowed for queue size (number of messages in queue until it overflows)  yes
queue/maxBytes     4000 (4 MB)  The max setting allowed for queue max size in bytes                                      no
queue/expires.min  1000         The min span of life is one second                                                       no
queue/expires.max  0            The max span of life of a queue is currently forever (=0)                                no
queue/expires      0            If not otherwise noted a queue dies after the max value (0 is forever)                   no
queue/onOverflow   deadMessage  Error handling when queue is full                                                        yes
queue/onFailure    deadMessage  Error handling when callback failed (after all retries etc.)                             yes

NOTE: Configuration parameters are specified on the command line (-someValue 17) or in the xmlBlaster.properties file (someValue=17). See requirement "util.property" for details.
Columns named Impl or Implemented tell you whether the feature is implemented.
Columns named Hot tell you whether the configuration can be changed during operation.
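
As a rough illustration of how the two configuration sources could combine with the defaults from the table, the sketch below resolves a setting from built-in defaults, a properties file and command line arguments. QueueConfigSketch is a hypothetical name. That the command line overrides the properties file, and that slash-named keys are passed as "-queue/maxEntries 2000", are assumptions; requirement "util.property" is the authoritative description.

```java
import java.util.Properties;

/** Illustrative sketch only; not the xmlBlaster property handling. */
class QueueConfigSketch {

    /** Defaults of the implemented parameters, copied from the table above. */
    static Properties defaults() {
        Properties p = new Properties();
        p.setProperty("queue/maxEntries", "1000");
        p.setProperty("queue/onOverflow", "deadMessage");
        p.setProperty("queue/onFailure", "deadMessage");
        return p;
    }

    /**
     * Merges defaults, file settings and command line arguments.
     * Assumed precedence: command line over properties file over defaults.
     */
    static Properties resolve(Properties fileProps, String[] args) {
        Properties result = new Properties();
        result.putAll(defaults());
        result.putAll(fileProps);
        // Arguments come in pairs: "-someValue 17" sets someValue=17.
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].startsWith("-")) {
                result.setProperty(args[i].substring(1), args[i + 1]);
                i++; // skip the value that was just consumed
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties fileProps = new Properties();
        fileProps.setProperty("queue/maxEntries", "500");
        Properties effective = resolve(fileProps, args);
        System.out.println("queue/maxEntries = " + effective.getProperty("queue/maxEntries"));
    }
}
```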

See API org.xmlBlaster.engine.queue.MsgQueue
See API org.xmlBlaster.engine.callback.CbWorker
See API org.xmlBlaster.util.qos.address.CallbackAddress
See API org.xmlBlaster.util.qos.storage.QueueProperty
See API org.xmlBlaster.client.qos.ConnectQos
See API org.xmlBlaster.client.qos.DisconnectQos
See REQ engine.callback
See REQ engine.qos.login.callback
See REQ util.property
See REQ util.property.args
See REQ util.property.env
See REQ admin.errorHandling

This page is generated from the requirement XML file xmlBlaster/doc/requirements/engine.queue.xml
